spark-user mailing list archives

From Julien CHAMP <jch...@tellmeplus.com>
Subject Re: Spark | Window Function |
Date Wed, 19 Jul 2017 10:56:30 GMT
Hi, and thanks a lot for your example!

OK, I've found my problem.
There was too much data (1000 ids / 1000 timestamps) for my test, and it
does not seem to work in such cases :/
This does not seem to scale linearly with the number of ids. With a small
example, with 1000 timestamps per id:
- 50 ids: around 1 min to compute
- 100 ids: around 4 min to compute

I've tried to repartition my data, without any success :(
As my use cases can grow to a really large number of ids / timestamps, this
is problematic for me.
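
One possible reading of these timings, as a rough sketch only: if the range join compares every row with every other row regardless of id (an assumption about the query, not something measured here), the number of candidate pairs grows with the square of the row count. Comparing rows only within the same id would grow linearly with the number of ids. The helpers `pairsWithoutIdKey` and `pairsWithIdKey` are hypothetical names used for this arithmetic only.

```scala
object JoinPairCount {
  // Candidate pairs when every row is compared with every row (no id equality).
  def pairsWithoutIdKey(ids: Long, timestampsPerId: Long): Long = {
    val rows = ids * timestampsPerId
    rows * rows
  }

  // Candidate pairs when rows are only compared within the same id.
  def pairsWithIdKey(ids: Long, timestampsPerId: Long): Long =
    ids * timestampsPerId * timestampsPerId

  def main(args: Array[String]): Unit = {
    val timestampsPerId = 1000L
    for (ids <- Seq(50L, 100L)) {
      println(
        s"ids=$ids without id key: ${pairsWithoutIdKey(ids, timestampsPerId)} pairs, " +
          s"with id key: ${pairsWithIdKey(ids, timestampsPerId)} pairs")
    }
  }
}
```

Going from 50 to 100 ids multiplies the first count by 4, which is in line with the jump from about 1 min to about 4 min, and the second count by only 2.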

Regards
Julien

On Wed, 19 Jul 2017 at 00:22, Radhwane Chebaane <r.chebaane@mindlytix.com>
wrote:

> Hi Julien,
>
> Could you give more details about the problems you faced?
> Here is a working example with Spark dataframe and Spark SQL:
> https://gist.github.com/radcheb/d16042d8bb3815d3dd42030ecedc43cf
>
>
> Cheers,
> Radhwane Chebaane
>
>
> 2017-07-18 18:21 GMT+02:00 Julien CHAMP <jchamp@tellmeplus.com>:
>
>> Hi Radhwane !
>>
>> I've tested both of your solutions, using the DataFrame API and Spark SQL,
>> and in both cases Spark gets stuck :/
>> Did you test the code that you gave me? I don't know if I've done
>> something wrong...
>>
>> Regards,
>> Julien
>>
>> On Mon, 10 Jul 2017 at 10:53, Radhwane Chebaane <
>> r.chebaane@mindlytix.com> wrote:
>>
>>> Hi Julien,
>>>
>>>
>>> - Usually, window functions require less shuffling than a cross join, so
>>> they are a little faster, depending on the use case. For large windows, the
>>> performance of cross joins and window functions is close.
>>> - You can use UDFs and UDAFs as in any Spark SQL query (a geometric
>>> mean was tested successfully).
>>>
>>> Regards,
>>> Radhwane
>>>
>>> 2017-07-06 16:22 GMT+02:00 Julien CHAMP <jchamp@tellmeplus.com>:
>>>
>>>> Thx a lot for your answer Radhwane :)
>>>>
>>>>
>>>> I have some (many) use cases that need a Long in a window function.
>>>> As said in the bug report, I can store events in ms in a dataframe, and
>>>> want to count the number of events in the past 10 years (requiring a Long
>>>> value).
>>>>
>>>> -> *Let's imagine that this window is used on timestamp values in ms:
>>>> I can ask for a window with a range of [-2160000000L, 0] and only have
>>>> a few values inside, not necessarily 2160000000 of them. I can understand the
>>>> limitation for the rowsBetween() method, but the rangeBetween() method is nice
>>>> for this kind of usage.*
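
To make the limit concrete, here is a small arithmetic sketch (plain Scala, no Spark needed) of how many days of milliseconds fit in an Int, compared with the two ranges mentioned above. The 365-day year is a simplifying assumption, and `WindowRangeInMs` is a hypothetical name for this illustration.

```scala
object WindowRangeInMs {
  val msPerDay: Long = 24L * 60 * 60 * 1000 // 86,400,000 ms

  // Largest range, in whole days, that still fits in an Int number of ms.
  val maxIntRangeDays: Long = Int.MaxValue / msPerDay

  // 10 years of milliseconds, assuming 365-day years.
  val tenYearsMs: Long = 10L * 365 * msPerDay

  def fitsInInt(rangeMs: Long): Boolean = rangeMs <= Int.MaxValue

  def main(args: Array[String]): Unit = {
    println(s"Int.MaxValue ms covers $maxIntRangeDays full days")
    println(s"2160000000 ms fits in Int: ${fitsInInt(2160000000L)}")
    println(s"10 years = $tenYearsMs ms, fits in Int: ${fitsInInt(tenYearsMs)}")
  }
}
```

An Int range covers only 24 full days of milliseconds, so the 25-day range of 2160000000 ms already needs a Long, and 10 years is far beyond it.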
>>>>
>>>>
>>>> The solution with a self-join seems nice, but I have 2 questions:
>>>>
>>>> - regarding performance, will it be as fast as a window function?
>>>>
>>>> - can I use my own aggregate function (for example, a geometric mean)
>>>> with your solution, using this:
>>>> https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html ?
>>>>
>>>>
>>>>
>>>> Thanks again,
>>>>
>>>> Regards,
>>>>
>>>>
>>>> Julien
>>>>
>>>>
>>>>
>>>> On Wed, 5 Jul 2017 at 19:18, Radhwane Chebaane <
>>>> r.chebaane@mindlytix.com> wrote:
>>>>
>>>>> Hi Julien,
>>>>>
>>>>>
>>>>> Although this is a strange bug in Spark, it's rare to need a window
>>>>> size larger than the Integer max value.
>>>>>
>>>>> Nevertheless, most window functions can be expressed with
>>>>> self-joins. Hence, your problem may be solved with this example.
>>>>>
>>>>> If the input data is as follows:
>>>>>
>>>>> +---+-------------+-----+
>>>>> | id|    timestamp|value|
>>>>> +---+-------------+-----+
>>>>> |  B|1000000000000|  100|
>>>>> |  B|1001000000000|   50|
>>>>> |  B|1002000000000|  200|
>>>>> |  B|2500000000000|  500|
>>>>> +---+-------------+-----+
>>>>>
>>>>> And the window is  (-2000000000L, 0)
>>>>>
>>>>> Then this code will give the wanted result:
>>>>>
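>>>>> // assumes: import org.apache.spark.sql.functions and spark.implicits._
>>>>> df.as("df1").join(df.as("df2"),
>>>>>   // same id, and df2.timestamp within [df1.timestamp - 2000000000L, df1.timestamp]
>>>>>   $"df1.id" === $"df2.id" &&
>>>>>     $"df2.timestamp".between($"df1.timestamp" - 2000000000L, $"df1.timestamp"))
>>>>>   .groupBy($"df1.id", $"df1.timestamp", $"df1.value")
>>>>>   .agg(functions.min($"df2.value").as("min___value"))
>>>>>   .orderBy($"df1.timestamp")
>>>>>   .show()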
>>>>>
>>>>> +---+-------------+-----+-----------+
>>>>> | id|    timestamp|value|min___value|
>>>>> +---+-------------+-----+-----------+
>>>>> |  B|1000000000000|  100|        100|
>>>>> |  B|1001000000000|   50|         50|
>>>>> |  B|1002000000000|  200|         50|
>>>>> |  B|2500000000000|  500|        500|
>>>>> +---+-------------+-----+-----------+
>>>>>
>>>>> Or with Spark SQL:
>>>>>
>>>>> SELECT c.id AS id, c.timestamp AS timestamp, c.value,
>>>>>        min(c._value) AS min___value
>>>>> FROM (
>>>>>   SELECT a.id AS id, a.timestamp AS timestamp, a.value AS value,
>>>>>          b.timestamp AS _timestamp, b.value AS _value
>>>>>   FROM df a JOIN df b
>>>>>   -- same id, and b.timestamp within [a.timestamp - 2000000000, a.timestamp]
>>>>>   ON a.id = b.id
>>>>>      AND b.timestamp >= a.timestamp - 2000000000L
>>>>>      AND b.timestamp <= a.timestamp
>>>>> ) c
>>>>> GROUP BY c.id, c.timestamp, c.value
>>>>> ORDER BY c.timestamp
>>>>>
>>>>>
>>>>> This should also be possible with Spark Streaming; however, don't expect
>>>>> high performance.
>>>>>
>>>>>
>>>>> Cheers,
>>>>> Radhwane
>>>>>
>>>>>
>>>>>
>>>>> 2017-07-05 10:41 GMT+02:00 Julien CHAMP <jchamp@tellmeplus.com>:
>>>>>
>>>>>> Hi there !
>>>>>>
>>>>>> Let me explain my problem to see if you have a good solution to help
>>>>>> me :)
>>>>>>
>>>>>> Let's imagine that I have all my data in a DB or a file, that I load
>>>>>> in a dataframe DF with the following columns :
>>>>>> *id | timestamp(ms) | value*
>>>>>> A | 1000000 |  100
>>>>>> A | 1000010 |  50
>>>>>> B | 1000000 |  100
>>>>>> B | 1000010 |  50
>>>>>> B | 1000020 |  200
>>>>>> B | 2500000 |  500
>>>>>> C | 1000000 |  200
>>>>>> C | 1000010 |  500
>>>>>>
>>>>>> The timestamp is a *long value*, so as to be able to express dates in
>>>>>> ms from 0000-01-01 to today!
>>>>>>
>>>>>> I want to compute operations such as min, max, average on the *value
>>>>>> column*, for a given window function, and grouped by id ( Bonus :
>>>>>>  if possible for only some timestamps... )
>>>>>>
>>>>>> For example, if I have these 4 tuples:
>>>>>>
>>>>>> id | timestamp(ms) | value
>>>>>> B | 1000000 |  100
>>>>>> B | 1000010 |  50
>>>>>> B | 1000020 |  200
>>>>>> B | 2500000 |  500
>>>>>>
>>>>>> I would like to be able to compute the min value over a time window
>>>>>> of 20 ms. This would result in a DF like this:
>>>>>>
>>>>>> id | timestamp(ms) | value | min___value
>>>>>> B | 1000000 |  100 | 100
>>>>>> B | 1000010 |  50  | 50
>>>>>> B | 1000020 |  200 | 50
>>>>>> B | 2500000 |  500 | 500
>>>>>>
>>>>>> This seems the perfect use case for window functions in Spark (cf.
>>>>>> https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
>>>>>> ).
>>>>>> I can use:
>>>>>>
>>>>>> val tw = Window.partitionBy("id").orderBy("timestamp").rangeBetween(-20, 0)
>>>>>> df.withColumn("min___value", min(df.col("value")).over(tw))
>>>>>>
>>>>>> This leads to the perfect answer !
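
For reference, the semantics expected from this window can be sketched with plain Scala collections, without Spark: for each row, take the minimum value over the rows with the same id whose timestamp lies in [timestamp - 20, timestamp]. `Event` and `minOverRange` are hypothetical names for this illustration, and this is a quadratic in-memory sketch, not a description of how Spark evaluates windows.

```scala
object RangeWindowSketch {
  final case class Event(id: String, timestamp: Long, value: Int)

  // For each event, min of `value` over events with the same id and a
  // timestamp in [event.timestamp + lower, event.timestamp + upper].
  // Assumes lower <= 0 <= upper, so each event is inside its own window.
  def minOverRange(events: Seq[Event], lower: Long, upper: Long): Seq[(Event, Int)] =
    events.map { e =>
      val inWindow = events.filter { o =>
        o.id == e.id &&
          o.timestamp >= e.timestamp + lower &&
          o.timestamp <= e.timestamp + upper
      }
      (e, inWindow.map(_.value).min)
    }

  def main(args: Array[String]): Unit = {
    val events = Seq(
      Event("B", 1000000L, 100),
      Event("B", 1000010L, 50),
      Event("B", 1000020L, 200),
      Event("B", 2500000L, 500)
    )
    minOverRange(events, -20L, 0L).foreach { case (e, m) =>
      println(s"${e.id} | ${e.timestamp} | ${e.value} | $m")
    }
  }
}
```

On the four B rows this yields the minimums 100, 50, 50 and 500, the same as the min___value column in the table above.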
>>>>>>
>>>>>> However, there is a big bug with window functions, as reported here
>>>>>> (https://issues.apache.org/jira/browse/SPARK-19451), when working
>>>>>> with Long values! So I can't use this...
>>>>>>
>>>>>> So my question is (of course): how can I resolve my problem?
>>>>>> If I use Spark Streaming, will I face the same issue?
>>>>>>
>>>>>> I'll be glad to discuss this problem with you, feel free to answer :)
>>>>>>
>>>>>> Regards,
>>>>>>
>>>>>> Julien
>>>>>> --
>>>>>>
>>>>>>
>>>>>> Julien CHAMP — Data Scientist
>>>>>>
>>>>>>
>>>>>> Web: www.tellmeplus.com <http://tellmeplus.com/>
>>>>>> Email: jchamp@tellmeplus.com
>>>>>> Phone: 06 89 35 01 89
>>>>>> LinkedIn: https://www.linkedin.com/in/julienchamp
>>>>>>
>>>>>> TellMePlus S.A — Predictive Objects
>>>>>>
>>>>>> *Paris* : 7 rue des Pommerots, 78400 Chatou
>>>>>> *Montpellier* : 51 impasse des églantiers, 34980 St Clément de
>>>>>> Rivière
>>>>>>
>>>>>>
>>>>>> This email may contain confidential and/or privileged information for
>>>>>> the intended recipient. If you are not the intended recipient, please
>>>>>> contact the sender and delete all copies.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> Radhwane Chebaane
>>>>> Distributed systems engineer, Mindlytix
>>>>>
>>>>> Mail: radhwane@mindlytix.com
>>>>> Mobile: +33 695 588 906
>>>>> Skype: rad.cheb
>>>>> LinkedIn: https://fr.linkedin.com/in/radhwane-chebaane-483b3a7b