spark-user mailing list archives

From daniel williams
Subject flatMapGroupsWithState not timing out (spark 2.2.1)
Date Fri, 12 Jan 2018 22:36:27 GMT

I’m attempting to use flatMapGroupsWithState to handle some arbitrary
aggregations and am noticing the following:

   - *ProcessingTimeTimeout* + *setTimeoutDuration* timeout not being
   honored.
   - *EventTimeTimeout* + watermark value not being honored.
   - *EventTimeTimeout* + *setTimeoutTimestamp* not being honored.

I’ve come to this conclusion because a conditional check (with log output)
on the *hasTimedOut* property is never hit. Each of these scenarios was
tested in isolation, and all three exhibited the same behavior: no timeout
event was ever reached, and Spark introduced very long gaps between
batches.

The test was 2000 messages read from a Kafka topic with two distinct groups
(1000 messages / group).

To give an idea of what I’m attempting to do: aggregate all events into a
single bucket given some timeout expiry.

Also, note that in this example I’m attempting to get the *final* value of
the GroupState object as it times out. This is why I attempt a second pass
on the timeout, but that doesn’t really matter as I’m not even getting the
timeout event.

My code is here:

    val stream = reader
        "CAST(key AS STRING)",
        "CAST(value AS BINARY)",
      .as[KafkaLoadType]
      .map(el => getJacksonReader(classOf[Data]).readValue[Data](new String(el._3)))
      .withWatermark("when", "10 seconds")
      .groupByKey(f => (, f.when))
      .flatMapGroupsWithState[SessionInfo, Result](OutputMode.Append,
        GroupStateTimeout.EventTimeTimeout()) {
      case ((name, when),
      events: Iterator[Data], state: GroupState[SessionInfo]) => {


        info("Starting flatMapGroupsWithState func")

        val asList = events.toList

        info(s"${name} iterator size: ${asList.size}")

        if (state.exists) {
          info(s"State exists: ${state.get}")

        var session = state.getOption.getOrElse(, name))

        asList.foreach(e => {
          session = session.add(e.value)

        info(s"Updating value to ${session}")


        val result = if (state.hasTimedOut && !state.get.finalized) {
          info("State has timedout ... finalizing")

          state.update(state.get.copy(finalized = true))

          Iterator(Option(state.get).map(r => Result(r.when,,
        } else if (state.hasTimedOut && state.get.finalized) {
          info("State has timedout AND is finalized")

          val r = state.get


          Iterator(Option(r).map(r => Result(r.when,, r.value)).get)
        } else {
          val result = state.get

          info(s"Returning ${result}")

          //          state.remove()

          Iterator(Option(result).map(r => Result(r.when,, r.value)).get)

        info("Exiting flatMapGroupsWithState func")

      .format("console").option("truncate", false)
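
For comparison, here is a minimal sketch of the timeout-handling shape being aimed for: check *hasTimedOut* first, emit the final value and remove the state, and otherwise fold the new events in and set a fresh timeout timestamp. It is not the code above. The Event, Session and Summary classes, the TimeoutSketch object, the GapMs value, and grouping by name alone are all assumptions made for illustration, since the original Data, SessionInfo and Result definitions are not shown.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.streaming.GroupState

// Hypothetical types for illustration only; they loosely mirror the Data,
// SessionInfo and Result classes in the post, whose definitions are not shown.
case class Event(name: String, when: Timestamp, value: Long)
case class Session(name: String, total: Long, lastSeenMs: Long)
case class Summary(name: String, total: Long)

object TimeoutSketch {
  // Assumed gap (ms) the watermark must move past the last event seen
  // before the group is considered expired.
  val GapMs: Long = 10000L

  // Pure fold of one batch of events into the running session.
  def fold(session: Session, events: Seq[Event]): Session =
    events.foldLeft(session) { (s, e) =>
      s.copy(
        total = s.total + e.value,
        lastSeenMs = math.max(s.lastSeenMs, e.when.getTime))
    }

  // State function intended for flatMapGroupsWithState with EventTimeTimeout.
  def update(
      name: String,
      events: Iterator[Event],
      state: GroupState[Session]): Iterator[Summary] = {
    if (state.hasTimedOut) {
      // Timeout invocation: the iterator is empty and only the state is
      // available. Emit the final value once and drop the state.
      val last = state.get
      state.remove()
      Iterator(Summary(last.name, last.total))
    } else {
      // Regular invocation: fold new events in, save the state, then push
      // the timeout out relative to the latest event time seen.
      val start = state.getOption.getOrElse(Session(name, 0L, 0L))
      val updated = fold(start, events.toSeq)
      state.update(updated)
      state.setTimeoutTimestamp(updated.lastSeenMs + GapMs)
      Iterator.empty
    }
  }
}
```

Under these assumptions the function would be wired in with something like `.groupByKey(_.name).flatMapGroupsWithState(OutputMode.Append, GroupStateTimeout.EventTimeTimeout)(TimeoutSketch.update _)` after the `withWatermark` call. As far as I understand the documented behavior, the timeout invocation only happens when a later trigger runs with the watermark past the timestamp that was set, so it cannot fire in the same batch that sets it.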


Thanks for any help.

