spark-user mailing list archives

From "" <>
Subject RE: About StorageLevel
Date Fri, 27 Jun 2014 03:14:55 GMT
Thanks, Raymond!
I was just reading the source code of ShuffledRDD and found that the ShuffleFetcher, which
wraps the BlockManager, does the magic.
The shuffled partitions are stored on disk (?), just as the cacheManager does for a persisted RDD.
Does that mean that whenever there is a shuffle stage, Spark behaves as if "persist(StorageLevel.DISK_ONLY)"
had been called implicitly?

Kang Liu
From: Liu, Raymond
Date: 2014-06-27 11:02
Subject: RE: About StorageLevel
I think there is a shuffle stage involved. Subsequent count jobs depend directly on the output
of the first job's shuffle stage, as long as that output is still available, instead of
recomputing it. That is why they are much faster.
Best Regards,
Raymond Liu
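To make Raymond's point concrete, here is a sketch to paste into the same local spark-shell session Kang used (so `sc` is the shell's SparkContext). It assumes nothing beyond the public RDD API; the variable names `first` and `second` are illustrative. It shows that the shuffled RDD never gets a storage level of its own: what later jobs reuse is the map-side shuffle output, not a persisted copy of `r2`.

```scala
import org.apache.spark.SparkContext._          // pair-RDD implicits (already imported in spark-shell)
import org.apache.spark.storage.StorageLevel

// Kang's trial: keyBy + groupByKey introduces a shuffle stage.
val r  = sc.parallelize(0 to 500000)
val r2 = r.keyBy(x => x).groupByKey(10)

// Nothing called persist/cache, so r2 itself carries no storage level.
println(r2.getStorageLevel == StorageLevel.NONE)   // true

// The first job runs both the map side and the reduce side of the shuffle.
val first = r2.count()

// Later jobs find the first job's map output still available, so the map
// stage does not rerun; the grouping on the reduce side is still redone.
val second = r2.count()
```

So, as far as this sketch shows, the answer to the follow-up question is "partly": the reuse resembles an implicit on-disk cache of the shuffle files, but the grouped RDD itself is not persisted. Calling `r2.persist(...)` is still the way to keep the grouped result.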
From: [] 
Sent: Friday, June 27, 2014 10:08 AM
To: user
Subject: Re: About StorageLevel
Thank you, Andrew, that's very helpful.
I still have some doubts about a simple trial. I opened a Spark shell in local mode
and typed in:
val r = sc.parallelize(0 to 500000)
val r2 = r.keyBy(x => x).groupByKey(10)
and then I invoked the count action on it (r2.count) several times.
The first job clearly takes more time than the later ones. Is there some magic underneath?
Kang Liu
From: Andrew Or
Date: 2014-06-27 02:25
To: user
Subject: Re: About StorageLevel
Hi Kang,
You raise a good point. Spark does not automatically cache all your RDDs. Why? Simply because
an application may create many RDDs, and not all of them will be reused. There is only so
much memory available to each executor, and caching an RDD adds some overhead, especially
if we have to evict old blocks with LRU. As an example, say you run the following chain:
You might be interested in reusing only the final result, but each step of the chain actually
creates an RDD. If we automatically cached all RDDs, we would end up doing extra work for
the RDDs we don't care about. The effect can be much worse if the RDDs are big and there are
many of them, in which case there may be a lot of churn in the cache as we constantly evict
the RDDs we actually want to reuse. Ultimately, users know best which RDDs they are most
interested in, so it makes sense to give them control over caching behavior.
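The original chain did not survive in this archive, so the one below is a hypothetical stand-in (the names `nums`, `evens`, `squares` and `buckets` are made up for illustration). Run in a spark-shell with an existing `sc`, it persists only the final RDD that is actually reused; the intermediate RDDs keep `StorageLevel.NONE` and never occupy cache space.

```scala
import org.apache.spark.SparkContext._          // pair-RDD implicits (already imported in spark-shell)
import org.apache.spark.storage.StorageLevel

// Each transformation creates a new RDD (hypothetical chain for illustration).
val nums    = sc.parallelize(1 to 1000000)
val evens   = nums.filter(_ % 2 == 0)
val squares = evens.map(x => x.toLong * x)
val buckets = squares.map(x => (x % 10, x))     // key = last digit of the square

// Only the final result is reused, so only it is persisted.
buckets.persist(StorageLevel.MEMORY_ONLY)

// The first action computes the whole chain and fills the cache for buckets;
// the second action reads the cached partitions instead of recomputing them.
val total  = buckets.count()
val perKey = buckets.countByKey()
```

The design choice is the one Andrew describes: the user, not Spark, decides that `buckets` is worth the memory while `evens` and `squares` are not.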
2014-06-26 5:36 GMT-07:00 <>:
Hi all,
I have a newbie question about the StorageLevel in Spark. I came across these sentences in the Spark documentation:

If your RDDs fit comfortably with the default storage level (MEMORY_ONLY), leave them that
way. This is the most CPU-efficient option, allowing operations on the RDDs to run as fast
as possible.


Spark automatically monitors cache usage on each node and drops out old data partitions in
a least-recently-used (LRU) fashion. If you would like to manually remove an RDD instead of
waiting for it to fall out of the cache, use the RDD.unpersist() method. 
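A minimal spark-shell sketch of the manual route the guide mentions, assuming an existing `sc` (the RDD `data` is a made-up example): `cache()` is shorthand for `persist(StorageLevel.MEMORY_ONLY)`, and `unpersist()` drops the cached blocks right away and resets the RDD's storage level to `NONE`.

```scala
import org.apache.spark.storage.StorageLevel

val data = sc.parallelize(1 to 100000).map(_ * 2)

// Mark the RDD for in-memory caching; nothing is stored until an action runs.
data.cache()
val before = data.getStorageLevel      // MEMORY_ONLY

// The first action computes the partitions and stores them in the cache.
data.count()

// Remove the blocks now instead of waiting for LRU eviction.
data.unpersist()
val after = data.getStorageLevel       // NONE again
```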
But I found in the source code that the default storageLevel is NONE, and if I never call 'persist(somelevel)',
it stays NONE. The 'iterator' method reads:
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
        // Persisted RDD: the CacheManager returns cached blocks, or computes and stores them
        SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
    } else {
        // Not persisted: compute the partition (or read it from a checkpoint) every time
        computeOrReadCheckpoint(split, context)
    }
}
Does that mean that RDDs are cached in memory (or somewhere else) if and only if the 'persist'
or 'cache' method is called explicitly,
and that otherwise they are recomputed every time, even in an iterative computation?
This confused me, because my first impression was that Spark is super-fast because it prefers
to store intermediate results in memory automatically.
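The branch in `iterator` can be illustrated without Spark at all. The following is a toy model in plain Scala, not Spark's code: `ToyRDD`, `Level`, `NoneLevel`, `MemoryOnly` and the partition-keyed map standing in for the CacheManager are all hypothetical. It counts how often `compute` runs when the same RDD is used by three jobs, with and without a storage level.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for StorageLevel.NONE / MEMORY_ONLY; not Spark APIs.
sealed trait Level
case object NoneLevel  extends Level
case object MemoryOnly extends Level

final class ToyRDD(numPartitions: Int, computePartition: Int => Seq[Int]) {
  var storageLevel: Level = NoneLevel
  var computeCalls = 0                                   // how often compute actually ran
  private val cache = mutable.Map.empty[Int, Seq[Int]]   // partition index -> cached block

  def persist(level: Level): this.type = { storageLevel = level; this }

  private def compute(split: Int): Seq[Int] = {
    computeCalls += 1
    computePartition(split)
  }

  // Mirrors the branch in RDD.iterator: consult the cache only if a level was set.
  def iterator(split: Int): Iterator[Int] =
    if (storageLevel != NoneLevel) cache.getOrElseUpdate(split, compute(split)).iterator
    else compute(split).iterator

  // A "job": touch every partition once.
  def count(): Long = (0 until numPartitions).map(p => iterator(p).size.toLong).sum
}

val plain  = new ToyRDD(4, p => p * 10 until p * 10 + 10)
val cached = new ToyRDD(4, p => p * 10 until p * 10 + 10).persist(MemoryOnly)

for (_ <- 1 to 3) { plain.count(); cached.count() }
println(plain.computeCalls)    // 12: every partition recomputed on every job
println(cached.computeCalls)   // 4: each partition computed once, then served from the cache
```

In this model an RDD without a storage level is recomputed on every job, matching the reading of the source above; the shuffle-output reuse discussed elsewhere in the thread is a separate mechanism that the toy does not model.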

Forgive me if I asked a stupid question.

Kang Liu