spark-user mailing list archives

From Diana Carroll <dcarr...@cloudera.com>
Subject Re: performance improvement on second operation...without caching?
Date Mon, 05 May 2014 19:47:44 GMT
Ethan, you're not the only one, which is why I was asking about this! :-)

Matei, thanks for your response. Your answer explains the performance jump
in my code, but shows I've missed something key in my understanding of
Spark!

I was not aware until just now that map output was saved to disk (other
than if explicitly told to do so using persist).  It raises almost as many
questions as it answers.

Where are the shuffle files saved?  Locally on the mapper nodes?  Is it the
same location that disk-spilled cache is saved to?  Doesn't the necessity
of saving to disk result in increased i/o that would slow the job down?  I
thought part of the goal of Spark was to do everything in memory unless the
user specifically chose to persist...thereby making a choice to incur
time/disk space expense up front in return for fast failure recovery?

Not that I'm complaining, mind you, but I do think this should be made
clear to people: it affects not only performance, but also, for instance,
whether the data is fresh/out of date.  I had assumed if I did not set caching,
that each time I performed an operation on an RDD, it would re-compute
based on lineage, including re-reading the files...so I didn't have to
worry about the possibility of my file content changing.  But if it's
auto-caching shuffle files, my base files won't get re-read even if the
content has changed. (Or does it check timestamps?)

Thanks,
Diana
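
The reuse Matei describes in the quoted thread, and the staleness concern
raised above, can be illustrated with a toy model in plain Python. This is
only a sketch, not Spark's scheduler or shuffle code: ToyShuffleStore,
run_job, and the in-memory source list are hypothetical names invented for
the illustration, and the model assumes saved map output is reused with no
timestamp check on the input.

```python
# Toy model only: plain Python, not Spark's real scheduler or shuffle code.
# All names here (ToyShuffleStore, run_job, source) are hypothetical.


class ToyShuffleStore:
    """Stands in for map output files kept after a job finishes."""

    def __init__(self):
        self.outputs = {}   # shuffle id -> saved map-side output
        self.map_runs = 0   # how many times the map stage really ran


def run_job(store, shuffle_id, read_source, map_fn, reduce_fn):
    """Run a two-stage job, reusing saved map output when it exists."""
    if shuffle_id not in store.outputs:
        # Map stage: read the source and group values by key.
        grouped = {}
        for record in read_source():
            key, value = map_fn(record)
            grouped.setdefault(key, []).append(value)
        store.outputs[shuffle_id] = grouped
        store.map_runs += 1
    # Reduce stage: always runs, but reads the saved map output.
    saved = store.outputs[shuffle_id]
    return {key: reduce_fn(values) for key, values in saved.items()}


source = ["a", "b", "a"]   # stands in for the input files
store = ToyShuffleStore()
job = dict(
    store=store,
    shuffle_id=0,
    read_source=lambda: list(source),
    map_fn=lambda model: (model, 1),
    reduce_fn=sum,
)

first = run_job(**job)
source.append("b")          # the "file" changes between the two jobs
second = run_job(**job)     # map stage is skipped, so the change is not seen

print(first, second, store.map_runs)
```

In this model the second job returns the same counts as the first even
though the source changed, because only the reduce stage runs again. Using a
new shuffle_id (loosely analogous to building a new RDD from the source)
would force the map stage to re-read the input.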

On Mon, May 5, 2014 at 11:07 AM, Ethan Jewett <esjewett@gmail.com> wrote:

> Thanks Patrick and Matei for the clarification. I actually have to update
> some code now, as I was apparently relying on the fact that the output
> files are being re-used. Explains some edge-case behavior that I've seen.
>
> For me, at least, I read the guide, did some tests on fairly extensive RDD
> dependency graphs, saw that tasks earlier in the dependency graphs were not
> being regenerated and assumed (very much incorrectly I just found out!)
> that it was because the RDDs themselves were being cached. I wonder if
> there is a way to explain this distinction concisely in the programming
> guide. Or maybe I'm the only one that went down this incorrect learning
> path :-)
>
> Ethan
>
>
> On Sun, May 4, 2014 at 12:05 AM, Matei Zaharia <matei.zaharia@gmail.com> wrote:
>
>> Yes, this happens as long as you use the same RDD. For example, say you do
>> the following:
>>
>> data1 = sc.textFile(…).map(…).reduceByKey(…)
>> data1.count()
>> data1.filter(…).count()
>>
>> The first count() causes outputs of the map/reduce pair in there to be
>> written out to shuffle files. Next time you do a count, on either this RDD
>> or a child (e.g. after the filter), we notice that output files were
>> already generated for this shuffle so we don’t rerun the map stage. Note
>> that the output does get read again over the network, which is kind of
>> wasteful (if you really wanted to reuse this as quickly as possible you’d
>> use cache()).
>>
>> Matei
>>
>> On May 3, 2014, at 8:44 PM, Koert Kuipers <koert@tresata.com> wrote:
>>
>> Hey Matei,
>> Not sure I understand that. These are 2 separate jobs. So the second job
>> takes advantage of the fact that there is map output left somewhere on disk
>> from the first job, and re-uses that?
>>
>>
>> On Sat, May 3, 2014 at 8:29 PM, Matei Zaharia <matei.zaharia@gmail.com> wrote:
>>
>>> Hi Diana,
>>>
>>> Apart from these reasons, in a multi-stage job, Spark saves the map
>>> output files from map stages to the filesystem, so it only needs to rerun
>>> the last reduce stage. This is why you only saw one stage executing. These
>>> files are saved for fault recovery but they speed up subsequent runs.
>>>
>>> Matei
>>>
>>> On May 3, 2014, at 5:21 PM, Patrick Wendell <pwendell@gmail.com> wrote:
>>>
>>> Ethan,
>>>
>>> What you said is actually not true: Spark won't cache RDDs unless you
>>> ask it to.
>>>
>>> The observation here - that running the same job can speed up
>>> substantially even without caching - is common. This is because other
>>> components in the stack are performing caching and optimizations. Two that
>>> can make a huge difference are:
>>>
>>> 1. The OS buffer cache, which will keep recently read disk blocks in
>>> memory.
>>> 2. The Java just-in-time compiler (JIT), which will use runtime profiling
>>> to significantly speed up execution.
>>>
>>> These can make a huge difference if you are running the same job
>>> over and over. And there are other things like the OS network stack
>>> increasing TCP windows and so forth. These will all improve response time
>>> as a Spark program executes.
>>>
>>>
>>> On Fri, May 2, 2014 at 9:27 AM, Ethan Jewett <esjewett@gmail.com> wrote:
>>>
>>>> I believe Spark caches RDDs it has memory for regardless of whether you
>>>> actually call the 'cache' method on the RDD. The 'cache' method just tips
>>>> off Spark that the RDD should have higher priority. At least, that is my
>>>> experience and it seems to correspond with your experience and with my
>>>> recollection of other discussions on this topic on the list. However, going
>>>> back and looking at the programming guide, this is not the way the
>>>> cache/persist behavior is described. Does the guide need to be updated?
>>>>
>>>>
>>>> On Fri, May 2, 2014 at 9:04 AM, Diana Carroll <dcarroll@cloudera.com> wrote:
>>>>
>>>>> I'm just Posty McPostalot this week, sorry folks! :-)
>>>>>
>>>>> Anyway, another question today:
>>>>> I have a bit of code that is pretty time consuming (pasted at the end
>>>>> of the message):
>>>>> It reads in a bunch of XML files, parses them, extracts some data in a
>>>>> map, counts (using reduce), and then sorts.  All stages are executed when
>>>>> I do a final operation (take).  The first stage is the most expensive: on
>>>>> first run it takes 30s to a minute.
>>>>>
>>>>> I'm not caching anything.
>>>>>
>>>>> When I re-execute that take at the end, I expected it to re-execute
>>>>> all the same stages, and take approximately the same amount of time, but
>>>>> it didn't.  The second "take" executes only a single stage, which runs
>>>>> very fast: the whole operation takes less than 1 second (down from 5
>>>>> minutes!)
>>>>>
>>>>> While this is awesome (!) I don't understand it.  If I'm not caching
>>>>> data, why would I see such a marked performance improvement on subsequent
>>>>> execution?
>>>>>
>>>>> (Or is this related to the known 0.9.1 bug about sortByKey executing an
>>>>> action when it shouldn't?)
>>>>>
>>>>> Thanks,
>>>>> Diana
>>>>> <sparkdev_04-23_KEEP_FOR_BUILDS.png>
>>>>>
>>>>> # load XML files containing device activation records.
>>>>> # Find the most common device models activated
>>>>> import xml.etree.ElementTree as ElementTree
>>>>>
>>>>> # Given a partition containing multi-line XML, parse the contents.
>>>>> # Return an iterator of activation Elements contained in the partition
>>>>> def getactivations(fileiterator):
>>>>>     # Join the partition's lines into one XML document string
>>>>>     s = ''.join(str(i) for i in fileiterator)
>>>>>     filetree = ElementTree.fromstring(s)
>>>>>     # iter() replaces getiterator(), which was removed in Python 3.9
>>>>>     return filetree.iter('activation')
>>>>>
>>>>> # Get the model name from a device activation record
>>>>> def getmodel(activation):
>>>>>     return activation.find('model').text
>>>>>
>>>>> filename="hdfs://localhost/user/training/activations/*.xml"
>>>>>
>>>>> # parse each partition as a file into an activation XML record
>>>>> activations = sc.textFile(filename)
>>>>> activationTrees = activations.mapPartitions(lambda xml:
>>>>> getactivations(xml))
>>>>> models = activationTrees.map(lambda activation: getmodel(activation))
>>>>>
>>>>> # count and sort activations by model
>>>>> topmodels = models.map(lambda model: (model,1))\
>>>>>     .reduceByKey(lambda v1,v2: v1+v2)\
>>>>>     .map(lambda kv: (kv[1],kv[0]))\
>>>>>     .sortByKey(ascending=False)
>>>>>
>>>>> # display the top 10 models
>>>>> for (count,model) in topmodels.take(10):
>>>>>     print("Model %s (%s)" % (model,count))
>>>>>
>>>>> # repeat!
>>>>> for (count,model) in topmodels.take(10):
>>>>>     print("Model %s (%s)" % (model,count))
>>>>>
>>>>>
>>>>
>>>
>>>
>>
>>
>
