spark-user mailing list archives

From Diana Carroll <dcarr...@cloudera.com>
Subject performance improvement on second operation...without caching?
Date Fri, 02 May 2014 14:04:30 GMT
I'm just Posty McPostalot this week, sorry folks! :-)

Anyway, another question today:
I have a bit of code that is pretty time consuming (pasted at the end of
this message). It reads in a bunch of XML files, parses them, extracts some
data in a map, counts by key (using reduceByKey), and then sorts.  All
stages are executed when I do a final operation (take).  The first stage is
the most expensive: on first run it takes 30s to a minute.

I'm not caching anything.
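To be concrete: nowhere in the script do I call cache() or persist(). Just so
we're talking about the same thing, this is roughly what explicit caching
would look like (a sketch only, using the same RDD names as the code pasted
below):

# Sketch only -- I am NOT doing this anywhere; shown just to be explicit
# about what "caching" would mean for this pipeline.
topmodels.cache()     # keep the final (count, model) RDD in memory after first compute
# or, with an explicit storage level:
# from pyspark import StorageLevel
# topmodels.persist(StorageLevel.MEMORY_ONLY)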

When I re-execute that take at the end, I expected it to re-execute all the
same stages and take approximately the same amount of time, but it didn't.
The second "take" executes only a single stage, whose tasks collectively run
very fast: the whole operation takes less than 1 second (down from 5 minutes!).

While this is awesome (!) I don't understand it.  If I'm not caching data,
why would I see such a marked performance improvement on subsequent
execution?

(Or is this related to the known 0.9.1 bug where sortByKey executes an
action when it shouldn't?)
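
One way I was thinking of checking that: build the pipeline up to sortByKey
but call no action, and watch the web UI to see whether a job launches anyway
(a sketch, reusing the models RDD from the code below):

# Sketch: transformations only, no action called. If a job still shows up in
# the Spark web UI at this point, then sortByKey itself triggered work.
sorted_only = models.map(lambda model: (model,1))\
    .reduceByKey(lambda v1,v2: v1+v2)\
    .sortByKey(ascending=False)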

Thanks,
Diana

# load XML files containing device activation records.
# Find the most common device models activated
import xml.etree.ElementTree as ElementTree

# Given a partition containing multi-line XML, parse the contents.
# Return an iterator of activation Elements contained in the partition
def getactivations(fileiterator):
    s = ''
    for i in fileiterator: s = s + str(i)
    filetree = ElementTree.fromstring(s)
    return filetree.getiterator('activation')

# Get the model name from a device activation record
def getmodel(activation):
    return activation.find('model').text

filename="hdfs://localhost/user/training/activations/*.xml"

# parse each partition as a file into an activation XML record
activations = sc.textFile(filename)
activationTrees = activations.mapPartitions(lambda xml: getactivations(xml))
models = activationTrees.map(lambda activation: getmodel(activation))

# count and sort activations by model
topmodels = models.map(lambda model: (model,1))\
    .reduceByKey(lambda v1,v2: v1+v2)\
    .map(lambda (model,count): (count,model))\
    .sortByKey(ascending=False)

# display the top 10 models
for (count,model) in topmodels.take(10):
    print "Model %s (%s)" % (model,count)

# repeat!
for (count,model) in topmodels.take(10):
    print "Model %s (%s)" % (model,count)
