spark-user mailing list archives

From Tathagata Das <tathagata.das1...@gmail.com>
Subject Re: flatMapGroupsWithState not timing out (spark 2.2.1)
Date Fri, 12 Jan 2018 23:39:27 GMT
Hello Dan,

From your code, it seems like you are setting the timeout timestamp based
on the current processing time / wall-clock time, while the watermark is
being calculated on the event-time (the "when" column). The semantics of
EventTimeTimeout are that when the last set timeout timestamp of a group
becomes older than the watermark (which is calculated across all groups)
because that group has not received any new data for a while, the group
times out and the function is called with hasTimedOut set to true.
However, in this case the timeout timestamp is being set from a different
source of time (the wall clock) than the watermark (event-time), so the
two may not correlate correctly. For example, if the event-time in the
test data is always one hour behind the wall-clock time, the watermark
will be at least one hour older than the set timeout timestamp, and the
group would have to receive no data for more than an hour before it
times out.
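
To make the mismatch concrete, here is a rough sketch with hypothetical
numbers (the 10-second watermark delay and the 1-minute timeout are taken
from your snippet below):

    // Hypothetical values: event-time in the data runs one hour behind
    // the wall clock.
    val hourMs = 60L * 60 * 1000
    val maxEventTimeMs = 12 * hourMs              // latest "when" seen: 12:00
    val wallClockMs    = 13 * hourMs              // processing time:    13:00

    val watermarkMs = maxEventTimeMs - 10 * 1000L // watermark = max event-time - 10s
    val timeoutMs   = wallClockMs + 60 * 1000L    // timeout set from wall clock + 1 min

    // EventTimeTimeout fires only once the watermark passes the timeout
    // timestamp, so this group will not time out until event-time has
    // advanced by roughly an hour.
    val timedOut = watermarkMs > timeoutMs        // false for a long time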

So I would verify the gap between the event-time in the data and the
wall-clock time being used to set the timeout, to understand what is
going on. Or, even better, just use the event-time in the data to
calculate the timeout timestamp and do not use processing time anywhere.
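
For example, something along these lines (just a sketch against your
code, assuming "when" on Data is a java.sql.Timestamp and that one minute
after the latest event is the expiry you want; SessionInfo, Result and
Data are your types):

    .flatMapGroupsWithState[SessionInfo, Result](
        OutputMode.Append, GroupStateTimeout.EventTimeTimeout()) {
      case ((name, when), events: Iterator[Data], state: GroupState[SessionInfo]) =>
        if (state.hasTimedOut) {
          // The watermark (built from "when") has passed the event-time
          // timeout, so emit the final value and drop the state.
          val r = state.get
          state.remove()
          Iterator(Result(r.when, r.name, r.value))
        } else {
          val batch = events.toList
          var session = state.getOption.getOrElse(SessionInfo.zero(when, name))
          batch.foreach(e => session = session.add(e.value))
          state.update(session)

          // Derive the timeout from the event-time in the data, not the
          // wall clock: latest "when" in this batch plus one minute.
          if (batch.nonEmpty) {
            val maxEventTimeMs = batch.map(_.when.getTime).max
            state.setTimeoutTimestamp(maxEventTimeMs + 60 * 1000L)
          }

          Iterator.empty
        }
    }

That way the watermark and the timeout are driven by the same clock, and
the group should time out roughly one minute (plus the 10-second
watermark delay) of event-time after its last event.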

Let me know how it goes.

TD



On Fri, Jan 12, 2018 at 2:36 PM, daniel williams <daniel.williams@gmail.com>
wrote:

> Hi,
>
> I’m attempting to leverage flatMapGroupsWithState to handle some
> arbitrary aggregations and am noticing a couple of things:
>
>    - *ProcessingTimeTimeout* + *setTimeoutDuration* timeout not being honored
>    - *EventTimeTimeout* + watermark value not being honored
>    - *EventTimeTimeout* + *setTimeoutTimestamp* not being honored
>
> I’ve come to this conclusion because I never hit a conditional check
> (with log output) on the *hasTimedOut* property. Each of these scenarios
> was tested in isolation, and all three exhibited the same behavior: no
> timeout event was ever reached, and Spark introduced huge durations
> between batches.
>
> The test was 2000 messages read from a Kafka topic with two distinct
> groups (1000 messages / group).
>
> To give an idea of what I’m attempting to do: aggregate all events into a
> single bucket given some timeout expiry.
>
> Also, it should be noted that in this example I’m attempting to get the
> *final* value of the GroupState object as it times out. This is why I
> attempt to do a second pass on the timeout, but that doesn’t really
> matter since I’m not even getting the timeout event.
>
> My code is here:
>
>     val stream = reader
>       .load()
>       .selectExpr(
>         "CAST(key AS STRING)",
>         "topic",
>         "CAST(value AS BINARY)",
>         "timestamp"
>       )
>       .as[KafkaLoadType]
>       .map(el => getJacksonReader(classOf[Data]).readValue[Data](new String(el._3)))
>       .withWatermark("when", "10 seconds")
>       .groupByKey(f => (f.name, f.when))
>       .flatMapGroupsWithState[SessionInfo, Result](
>         OutputMode.Append, GroupStateTimeout.EventTimeTimeout()) {
>       case ((name, when), events: Iterator[Data], state: GroupState[SessionInfo]) => {
>
>         state.setTimeoutTimestamp(DateTime.now.plusMinutes(1).getMillis)
>
>         info("Starting flatMapGroupsWithState func")
>
>         val asList = events.toList
>
>         info(s"${name} iterator size: ${asList.size}")
>
>         if (state.exists) {
>           info(s"State exists: ${state.get}")
>         }
>
>         var session = state.getOption.getOrElse(SessionInfo.zero(when, name))
>
>         asList.foreach(e => {
>           session = session.add(e.value)
>         })
>
>         info(s"Updating value to ${session}")
>
>         state.update(session)
>
>         val result = if (state.hasTimedOut && !state.get.finalized) {
>           info("State has timedout ... finalizing")
>
>           state.update(state.get.copy(finalized = true))
>
>           Iterator(Option(state.get).map(r => Result(r.when, r.name, r.value)).get)
>         } else if (state.hasTimedOut && state.get.finalized) {
>           info("State has timedout AND is finalized")
>
>           val r = state.get
>
>           state.remove()
>
>           Iterator(Option(r).map(r => Result(r.when, r.name, r.value)).get)
>         } else {
>           val result = state.get
>
>           info(s"Returning ${result}")
>
>           //          state.remove()
>
>           Iterator(Option(result).map(r => Result(r.when, r.name, r.value)).get)
>         }
>
>         info("Exiting flatMapGroupsWithState func")
>
>         result
>       }
>     }.writeStream.trigger(Trigger.ProcessingTime(500))
>       .format("console").option("truncate", false)
>       .outputMode(OutputMode.Append)
>       .start()
>
> Thanks for any help.
>
> dan
>
