I’m attempting to leverage flatMapGroupsWithState to handle some arbitrary aggregations and am noticing a couple of things:

I’ve come to this conclusion because I never hit a conditional check (with log output) on the hasTimedOut property. Each of these scenarios was tested in isolation, and all three exhibited the same behavior: the timeout event was never reached, and there was a huge, Spark-induced delay between batches.

The test was 2000 messages read from a Kafka topic with two distinct groups (1000 messages / group).

To give an idea of what I’m attempting to do: aggregate all events into a single bucket given some timeout expiry.

Also, note that in this example I’m attempting to get the final value of the GroupState object as it times out. This is why I attempt a second pass on the timeout, but that doesn’t really matter since I’m not even getting the timeout event.
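
The code in this post relies on Data, SessionInfo, and Result, whose definitions are not shown. As a rough sketch only, inferred from how the code uses them, they could look something like the following. The field types, and the idea that add sums values into the bucket, are assumptions rather than the real definitions:

```scala
import java.sql.Timestamp

// Hypothetical shapes, inferred from usage only; all field types are assumptions.
final case class Data(name: String, when: Timestamp, value: Long)

// The per-group bucket kept in GroupState.
final case class SessionInfo(when: Timestamp, name: String, value: Long, finalized: Boolean) {
  // Assumption: "add" accumulates by summing the incoming value.
  def add(v: Long): SessionInfo = copy(value = value + v)
}

object SessionInfo {
  // Empty, not-yet-finalized bucket for a (when, name) key.
  def zero(when: Timestamp, name: String): SessionInfo =
    SessionInfo(when, name, 0L, finalized = false)
}

// The row emitted downstream.
final case class Result(when: Timestamp, name: String, value: Long)
```

With definitions along these lines, folding a group's values starting from SessionInfo.zero(when, name) and calling add on each one gives the running bucket, and copy(finalized = true) marks it as done.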

My code is here:

    val stream = reader
      .selectExpr(
        "CAST(key AS STRING)",
        "CAST(value AS BINARY)",
      )
      .as[KafkaLoadType]
      .map(el => getJacksonReader(classOf[Data]).readValue[Data](new String(el._3)))
      .withWatermark("when", "10 seconds")
      .groupByKey(f => (f.name, f.when))
      .flatMapGroupsWithState[SessionInfo, Result](OutputMode.Append, GroupStateTimeout.EventTimeTimeout()) {
      case ((name, when), events: Iterator[Data], state: GroupState[SessionInfo]) => {

        info("Starting flatMapGroupsWithState func")

        val asList = events.toList

        info(s"${name} iterator size: ${asList.size}")

        if (state.exists) {
          info(s"State exists: ${state.get}")
        }

        var session = state.getOption.getOrElse(SessionInfo.zero(when, name))

        asList.foreach(e => {
          session = session.add(e.value)
        })

        info(s"Updating value to ${session}")


        val result = if (state.hasTimedOut && !state.get.finalized) {
          info("State has timedout ... finalizing")

          state.update(state.get.copy(finalized = true))

          Iterator(Option(state.get).map(r => Result(r.when, r.name, r.value)).get)
        } else if (state.hasTimedOut && state.get.finalized) {
          info("State has timedout AND is finalized")

          val r = state.get


          Iterator(Option(r).map(r => Result(r.when, r.name, r.value)).get)
        } else {
          val result = state.get

          info(s"Returning ${result}")

          // state.remove()

          Iterator(Option(result).map(r => Result(r.when, r.name, r.value)).get)
        }

        info("Exiting flatMapGroupsWithState func")

        result
      }
      }
      .writeStream
      .format("console").option("truncate", false)

Thanks for any help.