+1 

Is this ticket related https://issues.apache.org/jira/browse/SPARK-21641 ?

On Mon, Aug 28, 2017 at 7:06 AM, daniel williams <daniel.williams@gmail.com> wrote:
Hi all,

I've been looking heavily into Spark 2.2 to solve a problem I have by specifically using mapGroupsWithState.  What I've discovered is that a groupBy(window(..)) does not work when being used with a subsequent mapGroupsWithState and produces an AnalysisException of :

"mapGroupsWithState is not supported with aggregation on a streaming DataFrame/Dataset;;"

I have http logs that have been rolled up via a previous jobs window function in the form of:

{"when": {"from": "2017-01-01T00:00:10", "to": "2017-01-01T00:00:11"}, "account": "A", "verb": "GET","statusCode": 500, "eventCount": 10}
{"when": {"from": "2017-01-01T00:00:10", "to": "2017-01-01T00:00:11"}, "account": "A", "verb": "GET","statusCode": 200, "eventCount": 89}

In this data the when sub-object is of one minute blocks.  I'd lock to use a window function to aggregate that to 10 minute windows and sum the eventCount by grouping on account, verb, and statusCode.  From there I'd like to mapGroupsWithState for each account and verb to produce buckets for some configurable window, say 10 minutes for example's sake, of the form:

{"when": {"from": "2017-01-01T00:00:10", "to": "2017-01-01T00:00:20"}, "account": "A", "verb": "GET", "totalRequests": 999, "totalErrors": 198}

mapGroupsWithState is perfect for this but, as stated, I've not found a way to apply a window function and use the mapsGroupsWithState.

Example:

ds.withColumn("bucket", $"when.from")

.withWatermark("bucket", "1 minutes")

.groupBy(window($"bucket", "10 minutes"), -- buckets and sums smaller windowed events into a rolled up larger window event with summed eventCount

  $"account",

  $"verb",

  $"statusCode")

.agg(

  sum($"eventCount")

)

.map(r => Log(....))

.groupByKey(l => (l.when, l.account, l.verb)) -- maps

.mapGroupsWithState[SessionInfo, SessionUpdate](GroupStateTimeout -- will calculate totalErrors / totalRequests per bucket

       .EventTimeTimeout()) {

   case ((when: Window, account: String, verb: String),

             events: Iterator[Log],

             state: GroupState[SessionInfo]) => {

        ..........

  }

}    



Any suggestions would be greatly appreciated.

I've also noticed that groupByKey().reduceGroups() does not work with mapGroupsWithState which is another strategy that I've tried.  

Thanks.

dan