spark-user mailing list archives

From Tathagata Das
Subject Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?
Date Wed, 30 Aug 2017 23:09:24 GMT
The per-key state S is kept in memory. It has to be of a type that can
be encoded by Datasets. All you have to do is update S every time the
function is called, and the engine takes care of serializing and
checkpointing the state value, and of retrieving the correct version of
the value when restarting after a failure. So you don't have to explicitly
"store" the state anywhere; the engine takes care of it under the hood.
Internally, there is an interface called StateStore, which defines the
component that is actually responsible for checkpointing the values, etc.
There is a single implementation of the store, which keeps the values in a
hashmap and writes all changes to the values to an HDFS-API-compatible
fault-tolerant filesystem for checkpointing. With this default, you really
don't have to worry about externalizing the state, and you don't have to
override anything in GroupState. You just use it as the example shows.
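
To make the "just update S" point concrete, here is a minimal plain-Scala
sketch of the pattern. InMemoryState is a hypothetical stand-in written only
for this illustration, not Spark's class; it mirrors just the exists/get/update
calls of GroupState[S] so the snippet runs without Spark. TrainEvent,
keepLatest and the epoch-millisecond time field are assumed names as well.

```scala
object TrainStateSketch {
  // Hypothetical stand-in for Spark's GroupState[S]; it exists only so that
  // this sketch runs without Spark on the classpath.
  final class InMemoryState[S] {
    private var value: Option[S] = None
    def exists: Boolean = value.isDefined
    def get: S = value.get // throws NoSuchElementException if nothing was stored
    def update(newValue: S): Unit = { value = Some(newValue) }
  }

  // Assumed event shape: train id, destination, time in epoch milliseconds.
  final case class TrainEvent(train: Int, dest: String, timeMs: Long)

  // Same shape as a mapGroupsWithState function: (key, new events, state) => output.
  // The state is just the row with the largest time seen so far for this train.
  // Assumes it is called with at least one new event or with existing state.
  def keepLatest(train: Int,
                 events: Iterator[TrainEvent],
                 state: InMemoryState[TrainEvent]): TrainEvent = {
    val stored = if (state.exists) Some(state.get) else None
    val latest = (stored.toVector ++ events).maxBy(_.timeMs)
    state.update(latest) // with the real GroupState, the engine checkpoints this value
    latest
  }
}
```

Nothing is overridden here: the function only reads and updates the state
object it is handed. Keeping a single row per train, rather than a full
history, also keeps the per-key state small.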

It's important to note that all the state of all the keys is distributed
over the executors, so each executor holds in its memory a fraction of
all the train state. Depending on the number of trains and the amount
of data in the state, you will have to size the cluster and the workers
accordingly. If you keep a lot of state for each train, then your overall
memory requirements are going to increase. So you have to be judicious
about how much data to keep as state data for each key.

Regarding aggregation vs mapGroupsWithState, it's a trade-off between
efficiency and flexibility. With aggregation, you can use a sliding window
of "24 hours" sliding every "1 hour", which gives the max over the "last 24
hours", updated every "1 hour". If you are okay with this approximation,
then this is the easiest to implement (don't forget to set watermarks) and
the most efficient. If you really want something more precise than that,
then mapGroupsWithState is the ultimate flexible tool. However, you have to
do the bookkeeping of the "last 24 hours" and calculate the max yourself. :)
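
As a rough illustration of that bookkeeping, here is a plain-Scala sketch with
no Spark dependency. The names (TrainEvent, TrainHistory, updateHistory) and
the epoch-millisecond fields are assumptions made for this example. It also
assumes that "last 24 hours" is measured back from the newest
activity timestamp seen for the train; a real job might use the watermark or
processing time instead.

```scala
object RollingMaxSketch {
  // Assumed event shape: `time` is the value to maximise, `activityTs` is the
  // event timestamp used for the 24-hour window (both in epoch milliseconds).
  final case class TrainEvent(train: Int, dest: String, time: Long, activityTs: Long)

  // Per-train state: the events that are still inside the rolling window.
  final case class TrainHistory(events: Vector[TrainEvent])

  val WindowMs: Long = 24L * 60 * 60 * 1000 // 24 hours

  // Merges the new events into the previous history, drops every event whose
  // activityTs is 24 hours or more behind the newest activityTs seen, and
  // returns the new history plus the retained row with the max `time`.
  def updateHistory(previous: Option[TrainHistory],
                    newEvents: Iterator[TrainEvent]): (TrainHistory, Option[TrainEvent]) = {
    val all = previous.map(_.events).getOrElse(Vector.empty[TrainEvent]) ++ newEvents
    if (all.isEmpty) {
      (TrainHistory(Vector.empty), None)
    } else {
      val cutoff = all.map(_.activityTs).max - WindowMs
      val kept = all.filter(_.activityTs > cutoff) // never empty: the newest event stays
      (TrainHistory(kept), Some(kept.maxBy(_.time)))
    }
  }
}
```

Inside a mapGroupsWithState function, the previous history would come from
state.getOption and the returned history would be passed to state.update.
Note that the state grows with the number of events per train in any 24 hour
period, which is exactly the memory cost to keep in mind when sizing the
cluster.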

Hope this helps.

On Wed, Aug 30, 2017 at 10:58 AM, kant kodali wrote:

> I think I understand groupByKey/mapGroupsWithState, and I am still
> trying to wrap my head around GroupState<S>. So I believe I have a
> naive question to ask about GroupState<S>.
> If I were to represent a state that has a history of events (say 24 hours),
> and the number of events can be big for a given 24 hour period, where
> do I store the state S? In an external store like Kafka, a database, or a
> distributed file system? I wonder if I can represent the state S using a
> Dataset that represents the history of events? GroupState also has
> .exists() and .get(), and if I am not wrong I should override these methods
> so that comparisons and retrieval from the external store can work, right?
> Thanks!
> On Wed, Aug 30, 2017 at 1:39 AM, kant kodali wrote:
>> Hi TD,
>> Thanks for the explanation and for the clear pseudo code and an example!
>> mapGroupsWithState is cool and looks very flexible; however, I have a few
>> concerns and questions. For example:
>> Say I store TrainHistory as a max heap from the Java Collections library
>> and I keep adding to this heap for 24 hours. At some point I will run
>> out of Java heap space, right? Do I need to store TrainHistory as a
>> Dataset or DataFrame instead of an in-memory max heap object from the Java
>> Collections library?
>> I wonder which approach, nested query or groupByKey/mapGroupsWithState,
>> is more efficient for solving this particular problem?
>> Thanks!
>> On Tue, Aug 29, 2017 at 9:50 PM, Tathagata Das wrote:
>>> Aah, I might have misinterpreted. The groupBy + window solution would
>>> give the max time for each train over 24 hours (non-overlapping windows) of
>>> event data (timestamped by activity_timestamp). So the output would look
>>> like this:
>>> Train     Dest   Window(activity_timestamp)    max(Time)
>>> 1         HK     Aug28-00:00 to Aug29-00:00    10:00    <- updating currently through Aug29
>>> 1         HK     Aug27-00:00 to Aug28-00:00    09:00    <- not updating, as no new updates are coming in with activity_timestamp in this range
>>> The drawback of this approach is that as soon as Aug28 starts, you have
>>> to wait for a new event about a train to get a new max(time). You may rather
>>> want a rolling 24 hour period, that is, the max time known over events in
>>> the last 24 hours.
>>> In that case, maintaining your own custom state using mapGroupsWithState/
>>> flatMapGroupsWithState() is the best and most flexible option.
>>> It is available in Spark 2.2 in Scala and Java.
>>> Here is an example that tracks sessions based on events.
>>> Scala -
>>> n/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala
>>> You will have to create a custom per-train state that keeps track of
>>> the last 24 hours of the train's history, and use that state to calculate
>>> the max time for each train.
>>> import org.apache.spark.sql.streaming.GroupState
>>> def updateHistoryAndGetMax(train: String, events: Iterator[TrainEvents],
>>>     state: GroupState[TrainHistory]): Long = {
>>>   // for every event, update history (i.e. last 24 hours of events)
>>>   // and return the max time from the history
>>>   ???  // placeholder: the bookkeeping logic goes here
>>> }
>>> trainTimesDataset     // Dataset[TrainEvents]
>>>   .groupByKey(_.train)
>>>   .mapGroupsWithState(updateHistoryAndGetMax _)
>>> Hope this helps.
>>> On Tue, Aug 29, 2017 at 5:25 PM, Burak Yavuz wrote:
>>>> Hey TD,
>>>> If I understood the question correctly, your solution wouldn't return
>>>> the exact answer, since it also groups by destination. I would say the
>>>> easiest solution would be to use flatMapGroupsWithState, where you:
>>>> .groupByKey(_.train)
>>>> and keep in state the row with the maximum time.
>>>> On Tue, Aug 29, 2017 at 5:18 PM, Tathagata Das wrote:
>>>>> Yes. And in that case, if you care only about the max over the last few
>>>>> days, then you should set a watermark on the timestamp column.
>>>>> // assumes: import org.apache.spark.sql.functions.{max, window} and spark.implicits._
>>>>> trainTimesDataset
>>>>>   .withWatermark("activity_timestamp", "5 days")
>>>>>   .groupBy(window($"activity_timestamp", "24 hours", "24 hours"), $"train", $"dest")
>>>>>   .agg(max($"time"))  // .max("time") on grouped data accepts only numeric columns
>>>>> Any aggregates that are more than 5 days old will be dropped from the
>>>>> streaming state.
>>>>> On Tue, Aug 29, 2017 at 2:06 PM, kant kodali wrote:
>>>>>> Hi,
>>>>>> Thanks for the response. Since this is a streaming based query, and in
>>>>>> my case I need to hold state for 24 hours, which I forgot to mention in my
>>>>>> previous email, can I do the following?
>>>>>> trainTimesDataset.groupBy(window(activity_timestamp, "24 hours",
>>>>>> "24 hours"), "train", "dest").max("time")
>>>>>> On Tue, Aug 29, 2017 at 1:38 PM, Tathagata Das wrote:
>>>>>>> Say trainTimesDataset is the streaming Dataset with schema [train:
>>>>>>> Int, dest: String, time: Timestamp].
>>>>>>> Scala: trainTimesDataset.groupBy("train", "dest").agg(max("time"))    // with import org.apache.spark.sql.functions.max
>>>>>>> SQL: "select train, dest, max(time) from trainTimesView group by
>>>>>>> train, dest"    // after calling
>>>>>>> trainTimesDataset.createOrReplaceTempView("trainTimesView")
>>>>>>> On Tue, Aug 29, 2017 at 12:59 PM, kant kodali wrote:
>>>>>>>> Hi All,
>>>>>>>> I am wondering what is the easiest and most concise way to express the
>>>>>>>> computation below in Spark Structured Streaming, given that it supports
>>>>>>>> both imperative and declarative styles?
>>>>>>>> I am just trying to select the rows that have the max timestamp for each
>>>>>>>> train. Instead of doing some sort of nested query like we normally do in
>>>>>>>> any relational database, I am trying to see if I can leverage
>>>>>>>> imperative and declarative styles at the same time. If nested queries or
>>>>>>>> joins are not required, then I would like to see how this can be done.
>>>>>>>> I am using Spark 2.1.1.
>>>>>>>> Dataset
>>>>>>>> Train    Dest      Time
>>>>>>>> 1        HK        10:00
>>>>>>>> 1        SH        12:00
>>>>>>>> 1        SZ        14:00
>>>>>>>> 2        HK        13:00
>>>>>>>> 2        SH        09:00
>>>>>>>> 2        SZ        07:00
>>>>>>>> The desired result should be:
>>>>>>>> Train    Dest      Time
>>>>>>>> 1        SZ        14:00
>>>>>>>> 2        HK
