spark-user mailing list archives

From Tathagata Das <t...@databricks.com>
Subject Re: Breaking lineage and reducing stages in Spark Streaming
Date Thu, 09 Jul 2015 20:46:33 GMT
Summarizing the main problems discussed by Dean:

1. If you have an infinitely growing lineage, bad things will eventually
happen. You HAVE TO checkpoint the information periodically (say, every 10th
batch).
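
For example, a rough sketch of the idea (this assumes the checkpoint directory
has already been set with sc.setCheckpointDir, that `current` is the running
RDD, and that batchCount is a counter you keep across batches):

if (batchCount % 10 == 0) {
  current.checkpoint()   // mark for checkpointing; lineage is truncated once it is materialized
}
current.count()          // checkpointing only takes effect after an action runs on the RDD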

2. Unpersist the previous `current` RDD ONLY AFTER running an action on
`newCurrent`. Otherwise you are throwing `current` out of the cache before
`newCurrent` has been computed. Modifying Dean's example:

val newCurrent = rdd.union(current).reduceByKey(_+_).cache()
...
// join with newCurrent
// collect or count or any action that uses newCurrent

// Now you can unpersist, because newCurrent has been persisted and won't
// require falling back to this cached current RDD.
current.unpersist()
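
Putting both points together with your variable names, a rough sketch (not
tested; assumes sc, dstream and the checkpoint directory are set up as in
your snippet):

import org.apache.spark.rdd.RDD

var current: RDD[(String, Long)] = sc.emptyRDD[(String, Long)].cache()
var batchCount = 0L

dstream.countByValue().foreachRDD { rdd =>
  batchCount += 1   // foreachRDD bodies run on the driver, so a plain var is fine here
  val newCurrent = rdd.union(current).reduceByKey(_ + _).cache()
  if (batchCount % 10 == 0) newCurrent.checkpoint()  // break the growing lineage every 10th batch
  newCurrent.count()   // action: materializes newCurrent (and the checkpoint) while current is still cached
  current.unpersist()  // safe now; newCurrent no longer needs to fall back to current
  current = newCurrent
}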


On Thu, Jul 9, 2015 at 6:36 AM, Dean Wampler <deanwampler@gmail.com> wrote:

> I think you're complicating the cache behavior by aggressively re-using
> vars when temporary vals would be more straightforward. For example,
> newBase = newBase.unpersist()... effectively means that newBase's data is
> not actually cached when the subsequent .union(...) is performed, so it
> probably goes back to the lineage... Same with the current.unpersist logic
> before it.
>
> Names are cheap, so just use local vals:
>
> val newCurrent = rdd.union(current).reduceByKey(_+_)
> current.unpersist()
>
> Also, what happens if you omit the "2" argument for the number of
> partitions in reduceByKey?
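>
> For comparison (as I understand the defaults, untested):
>
> rdd.union(current).reduceByKey(_+_)     // partition count follows the parents / spark.default.parallelism
> rdd.union(current).reduceByKey(_+_, 2)  // always 2 partitions, however large the data grows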
>
> Other minor points:
>
> I would change the joined, toUpdate, toNotUpdate logic to this:
>
> val joined = current.leftOuterJoin(newBase).map(mymap).cache()
>
> val toUpdate = joined.filter(myfilter).cache()
> val toNotUpdate = joined.filter(mynotfilter).cache()
>
>
> Maybe it's just for this email example, but you don't need to call collect
> on toUpdate before using foreach(println). If the RDD is huge, you
> definitely don't want to do that.
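>
> In other words (both print, but only the second copies everything to the driver):
>
> toUpdate.foreach(println)            // println runs on the executors
> toUpdate.collect().foreach(println)  // materializes the whole RDD on the driver first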
>
> Hope this helps.
>
> dean
>
> Dean Wampler, Ph.D.
> Author: Programming Scala, 2nd Edition
> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
> Typesafe <http://typesafe.com>
> @deanwampler <http://twitter.com/deanwampler>
> http://polyglotprogramming.com
>
> On Thu, Jul 9, 2015 at 8:06 AM, Anand Nalya <anand.nalya@gmail.com> wrote:
>
>> Yes, myRDD is outside of the DStream. Following is the actual code, where
>> newBase and current are the RDDs being updated with each batch:
>>
>>   val base = sc.textFile...
>>   var newBase = base.cache()
>>
>>   val dstream: DStream[String] = ssc.textFileStream...
>>   var current: RDD[(String, Long)] = sc.emptyRDD.cache()
>>
>>   dstream.countByValue().checkpoint(Seconds(120)).foreachRDD(rdd => {
>>
>>     current = rdd.union(current.unpersist()).reduceByKey(_+_, 2)
>>
>>     val joined = current.leftOuterJoin(newBase).cache()
>>     val toUpdate = joined.filter(myfilter).map(mymap).cache()
>>     val toNotUpdate = joined.filter(mynotfilter).map(mymap).cache()
>>
>>     toUpdate.collect().foreach(println) // this goes to some store
>>
>>     newBase = newBase.unpersist().union(toUpdate).reduceByKey(_+_, 2).cache()
>>
>>     current = toNotUpdate.cache()
>>
>>     toUpdate.unpersist()
>>     joined.unpersist()
>>     rdd.unpersist()
>>   })
>>
>>
>> Regards,
>>
>> Anand
>>
>>
>> On 9 July 2015 at 18:16, Dean Wampler <deanwampler@gmail.com> wrote:
>>
>>> Is myRDD outside a DStream? If so, are you persisting it on each batch
>>> iteration? It should be checkpointed frequently too.
>>>
>>> Dean Wampler, Ph.D.
>>> Author: Programming Scala, 2nd Edition
>>> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
>>> Typesafe <http://typesafe.com>
>>> @deanwampler <http://twitter.com/deanwampler>
>>> http://polyglotprogramming.com
>>>
>>> On Thu, Jul 9, 2015 at 5:56 AM, Anand Nalya <anand.nalya@gmail.com>
>>> wrote:
>>>
>>>> The data coming from the dstream has the same keys that are in myRDD, so
>>>> the reduceByKey after the union keeps the overall tuple count in myRDD
>>>> fixed. Or even with a fixed tuple count, will it keep consuming more
>>>> resources?
>>>>
>>>> On 9 July 2015 at 16:19, Tathagata Das <tdas@databricks.com> wrote:
>>>>
>>>>> If you are continuously unioning RDDs, then you are accumulating
>>>>> ever-increasing data, and you are processing an ever-increasing amount of
>>>>> data in every batch. Obviously this is not going to last for very long.
>>>>> You fundamentally cannot keep processing an ever-increasing amount of
>>>>> data with finite resources, can you?
>>>>>
>>>>> On Thu, Jul 9, 2015 at 3:17 AM, Anand Nalya <anand.nalya@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> That's from the Streaming tab of the Spark 1.4 WebUI.
>>>>>>
>>>>>> On 9 July 2015 at 15:35, Michel Hubert <michelh@vsnsystemen.nl>
>>>>>> wrote:
>>>>>>
>>>>>>>  Hi,
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> I was just wondering how you generated the second image with the
>>>>>>> charts.
>>>>>>>
>>>>>>> What product?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> *From:* Anand Nalya [mailto:anand.nalya@gmail.com]
>>>>>>> *Sent:* donderdag 9 juli 2015 11:48
>>>>>>> *To:* spark users
>>>>>>> *Subject:* Breaking lineage and reducing stages in Spark Streaming
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> I've an application in which an RDD is being updated with tuples
>>>>>>> coming from RDDs in a DStream with the following pattern.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> dstream.foreachRDD(rdd => {
>>>>>>>
>>>>>>>   myRDD = myRDD.union(rdd.filter(myfilter)).reduceByKey(_+_)
>>>>>>>
>>>>>>> })
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> I'm using cache() and checkpointing to cache results. Over time, the
>>>>>>> lineage of myRDD keeps increasing and the number of stages in each batch
>>>>>>> of the dstream keeps increasing, even though all the earlier stages are
>>>>>>> skipped. When the number of stages grows big enough, the overall delay
>>>>>>> due to scheduling delay starts increasing. The processing time for each
>>>>>>> batch is still fixed.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Following figures illustrate the problem:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Job execution: https://i.imgur.com/GVHeXH3.png?1
>>>>>>>
>>>>>>>
>>>>>>> Delays: https://i.imgur.com/1DZHydw.png?1
>>>>>>>
>>>>>>>
>>>>>>> Is there some pattern that I can use to avoid this?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Regards,
>>>>>>>
>>>>>>> Anand
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
