spark-issues mailing list archives

From "Josh Rosen (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-3105) Calling cache() after RDDs are pipelined has no effect in PySpark
Date Thu, 02 Oct 2014 05:12:33 GMT

    [ https://issues.apache.org/jira/browse/SPARK-3105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14156088#comment-14156088 ]

Josh Rosen commented on SPARK-3105:
-----------------------------------

I discussed this with [~tdas] and [~davies] today.

As it stands now, there's a small discrepancy between the Scala and Python semantics for cache()
and persist().  In Scala, calling cache() or persist() both returns an RDD _and_ changes the
persistence of the RDD instance it was called on, so running

{code}
val a = sc.parallelize(...).map(...)
val b = a.map(...)
a.count()
b.count()

a.cache()
a.count()
b.count()
{code}

will result in {{b.count()}} using a cached copy of {{a}}.

In Python, as described above, changing the persistence level of an RDD will not automatically
cause that persistence change to be reflected in existing RDDs that were generated by transforming
the original non-persisted RDD.

In all languages, calling cache() or persist() and performing subsequent transformations on
the RDDs returned from cache() / persist() will work as expected.  

One scenario where the Python semantics might be annoying is in an IPython notebook.  Say
that a user defines some base RDDs in cell #1, some transformed RDDs in cell #2, then performs
actions in cell #3, calls cache() in cell #4, then goes back and re-runs cell #3.  In this
case, the cache() won't have any effect.

On the one hand, I suppose we could argue that it's best to keep the semantics as close as
possible across all languages.

Unfortunately, this would require us to make some large internal changes to PySpark in order
to implement the Scala/Java semantics.  If we decide that this is worth pursuing, I can post
a more detailed proposal outlining the necessary changes (the "tl;dr" of my proposed approach
is "establish a one-to-one mapping between JavaRDDs and PySpark RDDs and defer the pipelining
of Python functions to execution time").

In my interpretation, the Spark Programming Guide [seems to suggest|https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence]
that persist() and cache() should modify the instance that they're called on rather than returning
a new, different RDD which happens to be persisted:

{quote}
You can mark an RDD to be persisted using the persist() or cache() methods on it. 
{quote}

"Marking" an RDD sounds like it modifies the metadata of that RDD rather than returning a
new one (maybe I'm reading too much into this, though).

Does anyone have strong opinions on whether we should change the PySpark semantics to match
Scala's, or examples of real use-cases where PySpark's current cache() semantics are confusing?

> Calling cache() after RDDs are pipelined has no effect in PySpark
> -----------------------------------------------------------------
>
>                 Key: SPARK-3105
>                 URL: https://issues.apache.org/jira/browse/SPARK-3105
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 1.0.0, 1.1.0
>            Reporter: Josh Rosen
>            Assignee: Josh Rosen
>
> PySpark's PipelinedRDD decides whether to pipeline transformations by checking whether
those transformations are pipelinable _at the time that the PipelinedRDD objects are created_
rather than at the time that we invoke actions.  This might lead to problems if we call {{cache()}}
on an RDD after it's already been used in a pipeline:
> {code}
> rdd1 = sc.parallelize(range(100)).map(lambda x: x)
> rdd2 = rdd1.map(lambda x: 2 * x)
> rdd1.cache()
> rdd2.collect()
> {code}
> When I run this code, I'd expect {{cache()}} to break the pipeline and cache intermediate
results, but instead the two transformations are pipelined together in Python, effectively
ignoring the {{cache()}}.
> Note that {{cache()}} works properly if we call it before performing any other transformations
on the RDD:
> {code}
> rdd1 = sc.parallelize(range(100)).map(lambda x: x).cache()
> rdd2 = rdd1.map(lambda x: 2 * x)
> rdd2.collect()
> {code}
> This works as expected and caches {{rdd1}}.
> To fix this, I think we should dynamically decide whether to pipeline when we actually perform
actions, rather than statically deciding when we create the RDDs.
> We should also add tests for this.
> (Thanks to [~tdas] for pointing out this issue.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

