spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Koert Kuipers <ko...@tresata.com>
Subject Re: performance improvement on second operation...without caching?
Date Sun, 04 May 2014 03:44:16 GMT
Hey Matei,
Not sure i understand that. These are 2 separate jobs. So the second job
takes advantage of the fact that there is map output left somewhere on disk
from the first job, and re-uses that?


On Sat, May 3, 2014 at 8:29 PM, Matei Zaharia <matei.zaharia@gmail.com>wrote:

> Hi Diana,
>
> Apart from these reasons, in a multi-stage job, Spark saves the map output
> files from map stages to the filesystem, so it only needs to rerun the last
> reduce stage. This is why you only saw one stage executing. These files are
> saved for fault recovery but they speed up subsequent runs.
>
> Matei
>
> On May 3, 2014, at 5:21 PM, Patrick Wendell <pwendell@gmail.com> wrote:
>
> Ethan,
>
> What you said is actually not true, Spark won't cache RDD's unless you ask
> it to.
>
> The observation here - that running the same job can speed up
> substantially even without caching - is common. This is because other
> components in the stack are performing caching and optimizations. Two that
> can make a huge difference are:
>
> 1. The OS buffer cache. Which will keep recently read disk blocks in
> memory.
> 2. The Java just-in-time compiler (JIT) which will use runtime profiling
> to significantly speed up execution speed.
>
> These can make a huge difference if you are running the same job
> over-and-over. And there are other things like the OS network stack
> increasing TCP windows and so fourth. These will all improve response time
> as a spark program executes.
>
>
> On Fri, May 2, 2014 at 9:27 AM, Ethan Jewett <esjewett@gmail.com> wrote:
>
>> I believe Spark caches RDDs it has memory for regardless of whether you
>> actually call the 'cache' method on the RDD. The 'cache' method just tips
>> off Spark that the RDD should have higher priority. At least, that is my
>> experience and it seems to correspond with your experience and with my
>> recollection of other discussions on this topic on the list. However, going
>> back and looking at the programming guide, this is not the way the
>> cache/persist behavior is described. Does the guide need to be updated?
>>
>>
>> On Fri, May 2, 2014 at 9:04 AM, Diana Carroll <dcarroll@cloudera.com>wrote:
>>
>>> I'm just Posty McPostalot this week, sorry folks! :-)
>>>
>>> Anyway, another question today:
>>> I have a bit of code that is pretty time consuming (pasted at the end of
>>> the message):
>>> It reads in a bunch of XML files, parses them, extracts some data in a
>>> map, counts (using reduce), and then sorts.   All stages are executed when
>>> I do a final operation (take).  The first stage is the most expensive: on
>>> first run it takes 30s to a minute.
>>>
>>> I'm not caching anything.
>>>
>>> When I re-execute that take at the end, I expected it to re-execute all
>>> the same stages, and take approximately the same amount of time, but it
>>> didn't.  The second "take" executes only a single stage which collectively
>>> run very fast: the whole operation takes less than 1 second (down from 5
>>> minutes!)
>>>
>>> While this is awesome (!) I don't understand it.  If I'm not caching
>>> data, why would I see such a marked performance improvement on subsequent
>>> execution?
>>>
>>> (or is this related to the known .9.1 bug about sortByKey executing an
>>> action when it shouldn't?)
>>>
>>> Thanks,
>>> Diana
>>> <sparkdev_04-23_KEEP_FOR_BUILDS.png>
>>>
>>> # load XML files containing device activation records.
>>> # Find the most common device models activated
>>> import xml.etree.ElementTree as ElementTree
>>>
>>> # Given a partition containing multi-line XML, parse the contents.
>>> # Return an iterator of activation Elements contained in the partition
>>> def getactivations(fileiterator):
>>>     s = ''
>>>     for i in fileiterator: s = s + str(i)
>>>     filetree = ElementTree.fromstring(s)
>>>     return filetree.getiterator('activation')
>>>
>>> # Get the model name from a device activation record
>>> def getmodel(activation):
>>>     return activation.find('model').text
>>>
>>> filename="hdfs://localhost/user/training/activations/*.xml"
>>>
>>> # parse each partition as a file into an activation XML record
>>> activations = sc.textFile(filename)
>>> activationTrees = activations.mapPartitions(lambda xml:
>>> getactivations(xml))
>>> models = activationTrees.map(lambda activation: getmodel(activation))
>>>
>>> # count and sort activations by model
>>> topmodels = models.map(lambda model: (model,1))\
>>>     .reduceByKey(lambda v1,v2: v1+v2)\
>>>     .map(lambda (model,count): (count,model))\
>>>     .sortByKey(ascending=False)
>>>
>>> # display the top 10 models
>>> for (count,model) in topmodels.take(10):
>>>     print "Model %s (%s)" % (model,count)
>>>
>>>  # repeat!
>>> for (count,model) in topmodels.take(10):
>>>     print "Model %s (%s)" % (model,count)
>>>
>>>
>>
>
>

Mime
View raw message