spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ethan Jewett <esjew...@gmail.com>
Subject Re: performance improvement on second operation...without caching?
Date Mon, 05 May 2014 15:07:01 GMT
Thanks Patrick and Matei for the clarification. I actually have to update
some code now, as I was apparently relying on the fact that the output
files are being re-used. Explains some edge-case behavior that I've seen.

For me, at least, I read the guide, did some tests on fairly extensive RDD
dependency graphs, saw that tasks earlier in the dependency graphs were not
being regenerated and assumed (very much incorrectly I just found out!)
that it was because the RDDs themselves were being cached. I wonder if
there is a way to explain this distinction concisely in the programming
guide. Or maybe I'm the only one that went down this incorrect learning
path :-)

Ethan


On Sun, May 4, 2014 at 12:05 AM, Matei Zaharia <matei.zaharia@gmail.com>wrote:

> Yes, this happens as long as you use the same RDD. For example say you do
> the following:
>
> data1 = sc.textFile(…).map(…).reduceByKey(…)
> data1.count()
> data1.filter(…).count()
>
> The first count() causes outputs of the map/reduce pair in there to be
> written out to shuffle files. Next time you do a count, on either this RDD
> or a child (e.g. after the filter), we notice that output files were
> already generated for this shuffle so we don’t rerun the map stage. Note
> that the output does get read again over the network, which is kind of
> wasteful (if you really wanted to reuse this as quickly as possible you’d
> use cache()).
>
> Matei
>
> On May 3, 2014, at 8:44 PM, Koert Kuipers <koert@tresata.com> wrote:
>
> Hey Matei,
> Not sure i understand that. These are 2 separate jobs. So the second job
> takes advantage of the fact that there is map output left somewhere on disk
> from the first job, and re-uses that?
>
>
> On Sat, May 3, 2014 at 8:29 PM, Matei Zaharia <matei.zaharia@gmail.com>wrote:
>
>> Hi Diana,
>>
>> Apart from these reasons, in a multi-stage job, Spark saves the map
>> output files from map stages to the filesystem, so it only needs to rerun
>> the last reduce stage. This is why you only saw one stage executing. These
>> files are saved for fault recovery but they speed up subsequent runs.
>>
>> Matei
>>
>> On May 3, 2014, at 5:21 PM, Patrick Wendell <pwendell@gmail.com> wrote:
>>
>> Ethan,
>>
>> What you said is actually not true, Spark won't cache RDD's unless you
>> ask it to.
>>
>> The observation here - that running the same job can speed up
>> substantially even without caching - is common. This is because other
>> components in the stack are performing caching and optimizations. Two that
>> can make a huge difference are:
>>
>> 1. The OS buffer cache. Which will keep recently read disk blocks in
>> memory.
>> 2. The Java just-in-time compiler (JIT) which will use runtime profiling
>> to significantly speed up execution speed.
>>
>> These can make a huge difference if you are running the same job
>> over-and-over. And there are other things like the OS network stack
>> increasing TCP windows and so fourth. These will all improve response time
>> as a spark program executes.
>>
>>
>> On Fri, May 2, 2014 at 9:27 AM, Ethan Jewett <esjewett@gmail.com> wrote:
>>
>>> I believe Spark caches RDDs it has memory for regardless of whether you
>>> actually call the 'cache' method on the RDD. The 'cache' method just tips
>>> off Spark that the RDD should have higher priority. At least, that is my
>>> experience and it seems to correspond with your experience and with my
>>> recollection of other discussions on this topic on the list. However, going
>>> back and looking at the programming guide, this is not the way the
>>> cache/persist behavior is described. Does the guide need to be updated?
>>>
>>>
>>> On Fri, May 2, 2014 at 9:04 AM, Diana Carroll <dcarroll@cloudera.com>wrote:
>>>
>>>> I'm just Posty McPostalot this week, sorry folks! :-)
>>>>
>>>> Anyway, another question today:
>>>> I have a bit of code that is pretty time consuming (pasted at the end
>>>> of the message):
>>>> It reads in a bunch of XML files, parses them, extracts some data in a
>>>> map, counts (using reduce), and then sorts.   All stages are executed when
>>>> I do a final operation (take).  The first stage is the most expensive: on
>>>> first run it takes 30s to a minute.
>>>>
>>>> I'm not caching anything.
>>>>
>>>> When I re-execute that take at the end, I expected it to re-execute all
>>>> the same stages, and take approximately the same amount of time, but it
>>>> didn't.  The second "take" executes only a single stage which collectively
>>>> run very fast: the whole operation takes less than 1 second (down from 5
>>>> minutes!)
>>>>
>>>> While this is awesome (!) I don't understand it.  If I'm not caching
>>>> data, why would I see such a marked performance improvement on subsequent
>>>> execution?
>>>>
>>>> (or is this related to the known .9.1 bug about sortByKey executing an
>>>> action when it shouldn't?)
>>>>
>>>> Thanks,
>>>> Diana
>>>> <sparkdev_04-23_KEEP_FOR_BUILDS.png>
>>>>
>>>> # load XML files containing device activation records.
>>>> # Find the most common device models activated
>>>> import xml.etree.ElementTree as ElementTree
>>>>
>>>> # Given a partition containing multi-line XML, parse the contents.
>>>> # Return an iterator of activation Elements contained in the partition
>>>> def getactivations(fileiterator):
>>>>     s = ''
>>>>     for i in fileiterator: s = s + str(i)
>>>>     filetree = ElementTree.fromstring(s)
>>>>     return filetree.getiterator('activation')
>>>>
>>>> # Get the model name from a device activation record
>>>> def getmodel(activation):
>>>>     return activation.find('model').text
>>>>
>>>> filename="hdfs://localhost/user/training/activations/*.xml"
>>>>
>>>> # parse each partition as a file into an activation XML record
>>>> activations = sc.textFile(filename)
>>>> activationTrees = activations.mapPartitions(lambda xml:
>>>> getactivations(xml))
>>>> models = activationTrees.map(lambda activation: getmodel(activation))
>>>>
>>>> # count and sort activations by model
>>>> topmodels = models.map(lambda model: (model,1))\
>>>>     .reduceByKey(lambda v1,v2: v1+v2)\
>>>>     .map(lambda (model,count): (count,model))\
>>>>     .sortByKey(ascending=False)
>>>>
>>>> # display the top 10 models
>>>> for (count,model) in topmodels.take(10):
>>>>     print "Model %s (%s)" % (model,count)
>>>>
>>>>  # repeat!
>>>> for (count,model) in topmodels.take(10):
>>>>     print "Model %s (%s)" % (model,count)
>>>>
>>>>
>>>
>>
>>
>
>

Mime
View raw message