spark-user mailing list archives

From Kapil Malik <kma...@adobe.com>
Subject RE: does calling cache()/persist() on a RDD trigger its immediate evaluation?
Date Sun, 04 Jan 2015 14:19:55 GMT
Hi Pengcheng YIN,
RDD cache() / persist() calls do not trigger evaluation; they only mark the RDD for storage, and the data is materialized by the first action that computes it.
The unpersist() call, by contrast, is blocking by default (there is an asynchronous flavor, unpersist(blocking = false), but I am not sure what guarantees it makes about when the blocks are actually dropped).
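To make the laziness concrete, here is a minimal sketch (the input path and transformations are placeholders, not from your app) showing that persist() does nothing by itself and the first action both computes and caches the RDD:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val sc = new SparkContext(
  new SparkConf().setAppName("persist-demo").setMaster("local[2]"))

val rdd = sc.textFile("data.txt").map(_.toUpperCase) // hypothetical input file
rdd.persist(StorageLevel.MEMORY_ONLY) // only marks the RDD for caching; nothing runs yet

val n = rdd.count() // first action: computes the lineage AND stores the partitions
val m = rdd.count() // second action: served from the cache, no recomputation

rdd.unpersist() // blocking by default: returns after the cached blocks are removed
// rdd.unpersist(blocking = false) // asynchronous variant
```

You can watch the Storage tab of the Spark UI while stepping through this: nothing shows up after persist(), and the RDD appears only after the first count().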

var rdd = sc.textFile().map()
rdd.persist() // this does NOT trigger actual storage
while (true) {
    val count = rdd.filter().count // this action triggers storage of `rdd`; so far so good
    if (count == 0)
        break

    val newRdd = /* some code that uses `rdd` several times and produces a new RDD */
    rdd.unpersist() // this is immediate! If `newRdd` has not been evaluated and stored yet,
                    // `rdd` will be recomputed from scratch, which is not good
    rdd = newRdd.persist() // this does nothing until the next iteration of the loop (at count)
}

IMHO, the last three lines can be replaced with:

    val newRdd = /* some code that uses `rdd` several times and produces a new RDD */
    newRdd.persist()           // ADDED: mark newRdd for storage
    newRdd.filter( ... ).count // ADDED: action to trigger storage of newRdd
    rdd.unpersist()            // now safe: newRdd is already materialized
    rdd = newRdd
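Putting it together, a sketch of the revised loop (assuming an existing SparkContext `sc`; the input path and the map/filter bodies are stand-ins for your elided code, and `breakable` is used because Scala has no built-in break):

```scala
import scala.util.control.Breaks.{break, breakable}

var rdd = sc.textFile("input.txt").map(_.trim) // hypothetical input and transformation
rdd.persist()

breakable {
  while (true) {
    val count = rdd.filter(_.nonEmpty).count() // action: materializes rdd's cache
    if (count == 0) break

    val newRdd = rdd.map(_.toLowerCase) // stand-in for the code that uses `rdd` several times
    newRdd.persist()                    // mark newRdd for storage
    newRdd.count()                      // cheap action to force materialization now
    rdd.unpersist()                     // safe: newRdd no longer needs to recompute rdd
    rdd = newRdd
  }
}
```

The key ordering is persist-then-action-then-unpersist: the old cache is still available while newRdd is computed from it, so no partition of `rdd` is ever recomputed.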

Others can correct me if I am mistaken, though. You can also verify this yourself with a small dataset.

Thanks,

Kapil 
-----Original Message-----
From: Pengcheng YIN [mailto:pcyin1992@gmail.com] 
Sent: 04 January 2015 12:53
To: user@spark.apache.org
Subject: does calling cache()/persist() on a RDD trigger its immediate evaluation?

Hi Pro,

I have a question regarding calling cache()/persist() on an RDD. All RDDs in Spark are lazily
evaluated, but does calling cache()/persist() on an RDD trigger its immediate evaluation?

My spark app is something like this:

var rdd = sc.textFile().map()
rdd.persist()
while (true) {
    val count = rdd.filter().count
    if (count == 0)
        break

    val newRdd = /* some code that uses `rdd` several times and produces a new RDD */
    rdd.unpersist()
    rdd = newRdd.persist()
}

In each iteration, I persist `rdd` and unpersist it at the end of the iteration, replacing
`rdd` with the persisted `newRdd`. My concern is that, if the RDD is not actually evaluated and
stored at the point where persist() is called, I need to move the persist()/unpersist() calls
to make the loop more efficient.

Thanks,
Pengcheng




---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


