spark-user mailing list archives

From Mich Talebzadeh <mich.talebza...@gmail.com>
Subject Re: Does Rollups work with spark structured streaming with state.
Date Thu, 17 Jun 2021 07:37:33 GMT
OK, let us start with the basic cube().

Create a DataFrame first:

scala> val df = Seq(
     |   ("bar", 2L),
     |   ("bar", 2L),
     |   ("foo", 1L),
     |   ("foo", 2L)
     | ).toDF("word", "num")
df: org.apache.spark.sql.DataFrame = [word: string, num: bigint]


Now try cube() on it:


scala> df.cube($"word", $"num").count.sort(asc("word"), asc("num")).show

+----+----+-----+
|word| num|count|
+----+----+-----+
|null|null|    4| Total rows in df
|null|   1|    1| Count where num equals 1
|null|   2|    3| Count where num equals 2
| bar|null|    2| Where word equals bar
| bar|   2|    2| Where word equals bar and num equals 2
| foo|null|    2| Where word equals foo
| foo|   1|    1| Where word equals foo and num equals 1
| foo|   2|    1| Where word equals foo and num equals 2
+----+----+-----+
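Those eight rows are simply one count per grouping set: cube() aggregates over every subset of the grouping columns. As a quick sanity check, the same result can be reproduced in plain Python (illustrative only, not Spark code):

```python
from collections import Counter
from itertools import combinations

# The same four rows as the DataFrame above
rows = [("bar", 2), ("bar", 2), ("foo", 1), ("foo", 2)]

# cube(word, num) aggregates over every subset of {word, num}: 2^2 = 4 grouping sets
cols = ("word", "num")
grouping_sets = [set(c) for r in range(len(cols) + 1)
                 for c in combinations(cols, r)]

cube_counts = Counter()
for gs in grouping_sets:
    for word, num in rows:
        key = (word if "word" in gs else None,   # None plays the role of NULL
               num if "num" in gs else None)
        cube_counts[key] += 1

# 4 grouping sets over 4 input rows yield the 8 result rows shown above
for key in sorted(cube_counts, key=lambda k: (k[0] or "", k[1] or 0)):
    print(key, cube_counts[key])
```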


and now rollup():


scala> df.rollup($"word", $"num").count.sort(asc("word"), asc("num")).show


+----+----+-----+
|word| num|count|
+----+----+-----+
|null|null|    4| Count of all rows
| bar|null|    2| Count when word is bar
| bar|   2|    2| When word is bar and num is 2
| foo|null|    2| Count when word is foo
| foo|   1|    1| When word is foo and num is 1
| foo|   2|    1| When word is foo and num is 2
+----+----+-----+


So rollup() returns a subset of the rows returned by cube(): from the
above, rollup returns 6 rows whereas cube returns 8. Here are the two
missing rows:

+----+----+-----+
|word| num|count|
+----+----+-----+
|null|   1|    1| Word is null and num is 1
|null|   2|    3| Word is null and num is 2
+----+----+-----+
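Mechanically, rollup(word, num) aggregates only over the hierarchical prefixes of the column list, namely (), (word) and (word, num), so the (num)-only grouping set that produces those two rows never appears. The same plain-Python check (again illustrative, not Spark code):

```python
from collections import Counter

rows = [("bar", 2), ("bar", 2), ("foo", 1), ("foo", 2)]

# rollup(word, num) uses only the prefixes of (word, num): (), (word,), (word, num)
rollup_counts = Counter()
for prefix_len in range(3):
    for word, num in rows:
        key = (word if prefix_len >= 1 else None,
               num if prefix_len >= 2 else None)
        rollup_counts[key] += 1

# 6 rows: the (None, 1) and (None, 2) keys produced by cube() are absent
for key in sorted(rollup_counts, key=lambda k: (k[0] or "", k[1] or 0)):
    print(key, rollup_counts[key])
```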

Now back to Spark Structured Streaming (SSS), where we have basic
aggregations:

            """
            We work out the window and the AVG(temperature) in the window's
timeframe below
            This should return back the following Dataframe as struct

             root
             |-- window: struct (nullable = false)
             |    |-- start: timestamp (nullable = true)
             |    |-- end: timestamp (nullable = true)
             |-- avg(temperature): double (nullable = true)

            """
            resultM = resultC. \
                     withWatermark("timestamp", "5 minutes"). \
                     groupBy(window(resultC.timestamp, "5 minutes", "5
minutes")). \
                     avg('temperature')

            # We take the above Dataframe and flatten it to get the columns
aliased as "startOfWindowFrame", "endOfWindowFrame" and "AVGTemperature"
            resultMF = resultM. \
                       select( \

F.col("window.start").alias("startOfWindowFrame") \
                          , F.col("window.end").alias("endOfWindowFrame") \
                          ,
F.col("avg(temperature)").alias("AVGTemperature"))

Now basic aggregations on single columns can be done like
avg('temperature'), max(), stddev(), etc.


For cube() and rollup() I will require additional columns like location etc.
in my Kafka topic. Personally I have not tried it, but it will be
interesting to see if it works.
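For what it is worth, the state such a streaming rollup would have to carry is easy to picture: one running counter per grouping-set key, updated on every arriving event. A toy plain-Python simulation of that state (not Spark code; the events and column names country/area are made up from your example):

```python
from collections import Counter

# Hypothetical events as they might arrive from the Kafka topic: (country, area)
events = [("UK", "London"), ("UK", "Leeds"), ("FR", "Paris"), ("UK", "London")]

state = Counter()  # one counter per grouping-set key, as streaming state would hold
for country, area in events:
    # rollup(country, area) maintains the prefixes: (), (country,), (country, area)
    state[(None, None)] += 1
    state[(country, None)] += 1
    state[(country, area)] += 1

print(state[("UK", None)])       # running count per country
print(state[("UK", "London")])   # running count per (country, area)
```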


Have you tried cube() first?


HTH


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 17 Jun 2021 at 07:44, Amit Joshi <mailtojoshiamit@gmail.com> wrote:

> Hi Mich,
>
> Yes, you may think of cube rollups.
> Let me try to give an example:
> If we have a stream of data like (country,area,count, time), we would be
> able to get the updated count with different combinations of keys.
>
>> As example -
>>  (country - count)
>>  (country , area - count)
>
>
> We may need to store the state to update the count. So Spark Structured
> Streaming states will come into the picture.
>
> As now with batch programming, we can do it with
>
>>     df.rollup(col1,col2).count
>
>
> But if I try to use it with spark structured streaming state, will it
> store the state of all the groups as well?
> I hope I was able to make my point clear.
>
> Regards
> Amit Joshi
>
> On Wed, Jun 16, 2021 at 11:36 PM Mich Talebzadeh <
> mich.talebzadeh@gmail.com> wrote:
>
>>
>>
>> Hi,
>>
>> Just to clarify
>>
>> Are we talking about *rollup* as a subset of a cube that computes
>> hierarchical subtotals from left to right?
>>
>> On Wed, 16 Jun 2021 at 16:37, Amit Joshi <mailtojoshiamit@gmail.com>
>> wrote:
>>
>>> Appreciate if someone could give some pointers in the question below.
>>>
>>> ---------- Forwarded message ---------
>>> From: Amit Joshi <mailtojoshiamit@gmail.com>
>>> Date: Tue, Jun 15, 2021 at 12:19 PM
>>> Subject: [Spark]Does Rollups work with spark structured streaming with
>>> state.
>>> To: spark-user <user@spark.apache.org>
>>>
>>>
>>> Hi Spark-Users,
>>>
>>> Hope you are all doing well.
>>> Recently I was looking into rollup operations in spark.
>>>
>>> As we know state based aggregation is supported in spark structured
>>> streaming.
>>> I was wondering if rollup operations are also supported?
>>> Like the state of previous aggregation on the rollups are saved.
>>>
>>> If rollups are not supported, then what is the standard way to handle
>>> this?
>>>
>>>
>>> Regards
>>> Amit Joshi
>>>
>>
