spark-issues mailing list archives

From "Nicholas Chammas (JIRA)" <>
Subject [jira] [Commented] (SPARK-3105) Calling cache() after RDDs are pipelined has no effect in PySpark
Date Thu, 02 Oct 2014 18:26:33 GMT


Nicholas Chammas commented on SPARK-3105:

I think it's definitely important for the larger project that the 3 APIs (Scala, Java, and
Python) have semantics that are as consistent as possible. And for what it's worth, the Scala/Java
semantics in this case seem nicer. 

People learn about the DAG as a distinguishing feature of Spark, so it might seem strange
in PySpark that caching an RDD earlier in a lineage confers no benefit on descendant RDDs.
Whether the descendant RDDs were defined before or after the caching seems like something
people shouldn't have to think about.

It sounds like this is a non-trivial change to make, and I don't fully understand the other
implications it might have, but it seems like a good thing to me.

> Calling cache() after RDDs are pipelined has no effect in PySpark
> -----------------------------------------------------------------
>                 Key: SPARK-3105
>                 URL:
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 1.0.0, 1.1.0
>            Reporter: Josh Rosen
>            Assignee: Josh Rosen
> PySpark's PipelinedRDD decides whether to pipeline transformations by checking whether
> those transformations are pipelinable _at the time that the PipelinedRDD objects are created_
> rather than at the time that we invoke actions.  This might lead to problems if we call {{cache()}}
> on an RDD after it's already been used in a pipeline:
> {code}
> rdd1 = sc.parallelize(range(100)).map(lambda x: x)
> rdd2 = rdd1.map(lambda x: 2 * x)
> rdd1.cache()
> rdd2.collect()
> {code}
> When I run this code, I'd expect {{cache()}} to break the pipeline and cache intermediate
> results, but instead the two transformations are pipelined together in Python, effectively
> ignoring the {{cache()}}.
> Note that {{cache()}} works properly if we call it before performing any other transformations
> on the RDD:
> {code}
> rdd1 = sc.parallelize(range(100)).map(lambda x: x).cache()
> rdd2 = rdd1.map(lambda x: 2 * x)
> rdd2.collect()
> {code}
> This works as expected and caches {{rdd1}}.
> To fix this, I think we should dynamically decide whether to pipeline when we actually perform
> actions, rather than statically deciding when we create the RDDs.
> We should also add tests for this.
> (Thanks to [~tdas] for pointing out this issue.)
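
The difference between deciding at creation time and deciding at action time can be
sketched with a toy model in plain Python. This is not PySpark's PipelinedRDD code and
it does not need Spark to run; ToyRDD, fuse_static, materialized, and the dynamic flag
are hypothetical names invented for illustration, and "caching" here just means keeping
a Python list around.

```python
class ToyRDD:
    """Toy stand-in for an RDD (hypothetical; not PySpark's implementation)."""

    def __init__(self, parent=None, func=None, data=None):
        self.parent = parent
        self.func = func
        self.data = data
        self.is_cached = False
        self.materialized = None  # set only when the cache flag is honored
        # Static decision, frozen when this object is created: fuse with the
        # parent unless the parent was ALREADY marked as cached.
        self.fuse_static = parent is not None and not parent.is_cached

    def map(self, func):
        return ToyRDD(parent=self, func=func)

    def cache(self):
        self.is_cached = True
        return self

    def _compute(self, dynamic, honor_cache):
        if honor_cache and self.materialized is not None:
            return self.materialized
        if self.parent is None:
            out = list(self.data)
        else:
            if dynamic:
                # Dynamic decision: inspect the parent's flag at action time.
                fuse = not self.parent.is_cached
            else:
                fuse = self.fuse_static
            # A fused parent is computed inline, so its cache flag is ignored.
            upstream = self.parent._compute(dynamic, honor_cache=not fuse)
            out = [self.func(x) for x in upstream]
        if honor_cache and self.is_cached:
            self.materialized = out
        return out

    def collect(self, dynamic=False):
        return self._compute(dynamic, honor_cache=True)


def cache_after(dynamic):
    """Mirror the failing example: cache() is called after rdd2 is defined."""
    rdd1 = ToyRDD(data=range(5)).map(lambda x: x)
    rdd2 = rdd1.map(lambda x: 2 * x)
    rdd1.cache()
    result = rdd2.collect(dynamic=dynamic)
    return result, rdd1.materialized is not None


def cache_before():
    """Mirror the working example: cache() is called before rdd2 is defined."""
    rdd1 = ToyRDD(data=range(5)).map(lambda x: x).cache()
    rdd2 = rdd1.map(lambda x: 2 * x)
    result = rdd2.collect(dynamic=False)
    return result, rdd1.materialized is not None
```

In this toy model, cache_after(dynamic=False) returns the right values but leaves rdd1
unmaterialized, matching the reported behavior, while cache_after(dynamic=True) and
cache_before() both materialize rdd1. The collected values are identical in all three
cases; only whether the intermediate result is kept differs.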

This message was sent by Atlassian JIRA
