spark-user mailing list archives

From kant kodali <kanth...@gmail.com>
Subject Re: Spark 2.2 structured streaming with mapGroupsWithState + window functions
Date Wed, 06 Sep 2017 01:34:45 GMT
Hi Daniel,

I think you could use groupByKey with mapGroupsWithState to emit
whatever updates ("updated state") you want, and then apply .groupBy(window)
to the emitted updates. Would that work as expected?
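Something like the sketch below, where the state-update logic is kept as a plain function so it can be tested outside a streaming job. The names RequestStats, Event, updateStats, and SessionUpdate are made up for illustration and stand in for your SessionInfo/Log types; treating statusCode >= 400 as an error is also just an assumption:

```scala
// RequestStats and Event are hypothetical names for illustration; the
// SessionInfo/Log types from your job would play these roles.
case class RequestStats(totalRequests: Long, totalErrors: Long)
case class Event(statusCode: Int, eventCount: Long)

// Pure state-update logic, kept outside the Spark API so it is
// unit-testable: fold one group's batch of events into the running totals.
def updateStats(prev: RequestStats, events: Iterator[Event]): RequestStats =
  events.foldLeft(prev) { (acc, e) =>
    RequestStats(
      totalRequests = acc.totalRequests + e.eventCount,
      // Assumption: any 4xx/5xx status counts as an error.
      totalErrors = acc.totalErrors +
        (if (e.statusCode >= 400) e.eventCount else 0L)
    )
  }

// Rough wiring of the ordering I mean (a sketch only, not compiled
// against Spark 2.2): the stateful step runs first, then the windowed
// aggregation is applied to the stream of emitted updates.
//
//   ds.groupByKey(l => (l.account, l.verb))
//     .mapGroupsWithState[RequestStats, SessionUpdate](
//         GroupStateTimeout.EventTimeTimeout()) { (key, events, state) =>
//       val next = updateStats(
//         state.getOption.getOrElse(RequestStats(0L, 0L)), events)
//       state.update(next)
//       SessionUpdate(key, next)   // emit the updated state downstream
//     }
//     .groupBy(window($"when", "10 minutes"), $"account", $"verb")
//     .agg(...)
```

One caveat I'm not sure about: Spark 2.2 may reject an aggregation after mapGroupsWithState too, in which case the same shape with flatMapGroupsWithState in append output mode might be needed; the pure updateStats function stays the same either way.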

Thanks,
Kant


On Mon, Aug 28, 2017 at 7:06 AM, daniel williams <daniel.williams@gmail.com>
wrote:

> Hi all,
>
> I've been looking heavily into Spark 2.2 to solve a problem I have by
> specifically using mapGroupsWithState.  What I've discovered is that a
> *groupBy(window(..))* does not work when being used with a subsequent
> *mapGroupsWithState* and produces an AnalysisException of:
>
> *"mapGroupsWithState is not supported with aggregation on a streaming
> DataFrame/Dataset;;"*
>
> I have http logs that have been rolled up via a previous job's window
> function in the form of:
>
> {"when": {"from": "2017-01-01T00:00:10", "to": "2017-01-01T00:00:11"},
> "account": "A", "verb": "GET","statusCode": 500, "eventCount": 10}
> {"when": {"from": "2017-01-01T00:00:10", "to": "2017-01-01T00:00:11"},
> "account": "A", "verb": "GET","statusCode": 200, "eventCount": 89}
>
> In this data the *when* sub-object represents one-minute blocks.  I'd like
> to use a *window* function to aggregate that to 10-minute windows and sum
> the eventCount by grouping on account, verb, and statusCode.  From there
> I'd like to *mapGroupsWithState* for each *account* and *verb* to produce
> buckets for some configurable window, say 10 minutes for example's sake, of
> the form:
>
> {"when": {"from": "2017-01-01T00:00:10", "to": "2017-01-01T00:00:20"},
> "account": "A", "verb": "GET", "totalRequests": 999, "totalErrors": 198}
>
> *mapGroupsWithState* is perfect for this but, as stated, I've not found a
> way to apply a window function *and* use mapGroupsWithState.
>
> Example:
>
> ds.withColumn("bucket", $"when.from")
>   .withWatermark("bucket", "1 minutes")
>   // buckets and sums smaller windowed events into a rolled-up larger
>   // window event with a summed eventCount
>   .groupBy(
>     window($"bucket", "10 minutes"),
>     $"account",
>     $"verb",
>     $"statusCode")
>   .agg(sum($"eventCount"))
>   .map(r => Log(....))
>   // groups by (window, account, verb)
>   .groupByKey(l => (l.when, l.account, l.verb))
>   // will calculate totalErrors / totalRequests per bucket
>   .mapGroupsWithState[SessionInfo, SessionUpdate](
>       GroupStateTimeout.EventTimeTimeout()) {
>     case ((when: Window, account: String, verb: String),
>           events: Iterator[Log],
>           state: GroupState[SessionInfo]) => {
>       ..........
>     }
>   }
>
>
> Any suggestions would be greatly appreciated.
>
> I've also noticed that *groupByKey().reduceGroups()* does not work with
> *mapGroupsWithState*, which is another strategy that I've tried.
>
> Thanks.
>
> dan
>
