spark-user mailing list archives

From Tathagata Das <tathagata.das1...@gmail.com>
Subject Re: Stateful Structured Spark Streaming: Timeout is not getting triggered
Date Thu, 05 Mar 2020 02:30:48 GMT
Make sure that you are continuously feeding data into the query to trigger
the batches. Timeouts are processed only when a batch runs.
See the timeout behavior details here:
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.streaming.GroupState
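
A minimal sketch of that behavior, not taken from the original query. Event,
Count, TimeoutSketch, the keys "early" and "steady", the built-in "rate"
source, and the 10-second timeout are all illustrative assumptions. The rate
source keeps emitting rows, so batches keep firing; the "early" key stops
receiving data after its first few rows and should eventually reach the
hasTimedOut branch in a later batch.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// Hypothetical types for illustration only; they do not appear in the thread.
case class Event(key: String, value: Long)
case class Count(key: String, count: Long, expired: Boolean)

object TimeoutSketch {

  // Called once per key per batch, either with new events or because the key timed out.
  def update(key: String, events: Iterator[Event], state: GroupState[Long]): Count = {
    if (state.hasTimedOut) {
      // Timed-out call: the events iterator is empty and the state still exists.
      val finalCount = state.get
      state.remove() // drop the state so the key does not linger
      Count(key, finalCount, expired = true)
    } else {
      val newCount = state.getOption.getOrElse(0L) + events.size
      state.update(newCount)
      // Re-arm the timeout once per call, after the state has been updated.
      state.setTimeoutDuration("10 seconds")
      Count(key, newCount, expired = false)
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]") // assumption: local run for experimentation
      .appName("timeout-sketch")
      .getOrCreate()
    import spark.implicits._

    // The rate source emits (timestamp, value) rows continuously, so the query
    // keeps running batches. Only the first five values go to the "early" key.
    val events = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "1")
      .load()
      .select($"value")
      .as[Long]
      .map(v => Event(if (v < 5) "early" else "steady", v))

    val counts = events
      .groupByKey(_.key)
      .mapGroupsWithState[Long, Count](GroupStateTimeout.ProcessingTimeTimeout())(update)

    // mapGroupsWithState requires the Update output mode.
    counts.writeStream
      .outputMode(OutputMode.Update())
      .format("console")
      .start()
      .awaitTermination()
  }
}
```

If the source stopped producing rows altogether, no further batch would run
and the "early" key would stay in state without the timeout branch ever being
called, which matches the symptom described below.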

On Wed, Mar 4, 2020 at 2:51 PM Something Something <mailinglists19@gmail.com>
wrote:

> I've set the timeout duration to "2 minutes" as follows:
>
> def updateAcrossEvents (tuple3: Tuple3[String, String, String], inputs: Iterator[R00tJsonObject],
>                           oldState: GroupState[MyState]): OutputRow = {
>
>     println("$$$$ Inside updateAcrossEvents with : " + tuple3._1 + ", " + tuple3._2 + ", " + tuple3._3)
>     var state: MyState = if (oldState.exists) oldState.get else MyState(tuple3._1, tuple3._2, tuple3._3)
>
>     if (oldState.hasTimedOut) {
>       println("@@@@@ oldState has timed out @@@@")
>       // Logic to Write OutputRow
>       OutputRow("some values here...")
>     } else {
>       for (input <- inputs) {
>         state = updateWithEvent(state, input)
>         oldState.update(state)
>         oldState.setTimeoutDuration("2 minutes")
>       }
>       OutputRow(null, null, null)
>     }
>
>   }
>
> I have also specified ProcessingTimeTimeout in 'mapGroupsWithState' as follows...
>
> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(updateAcrossEvents)
>
> But 'hasTimedOut' is never true so I don't get any output! What am I doing wrong?
