spark-user mailing list archives

From David Thomas <>
Subject Re: Are all transformations lazy?
Date Wed, 12 Mar 2014 05:49:11 GMT
Perfect! That answers my question. I was under the impression that map and
reduceByKey were Scala collection functions, but they aren't. Now it makes
sense.

On Tue, Mar 11, 2014 at 10:38 PM, Ewen Cheslack-Postava <> wrote:

> Ah, I see. You need to follow those other calls through to their
> implementations to see what ultimately happens. For example, the map()
> calls are to RDD.map, not to one of Scala's built-in map methods for
> collections. The implementation looks like this:
> /**
>  * Return a new RDD by applying a function to all elements of this RDD.
>  */
> def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
> So once you get to one of the most primitive operations, like map(),
> you'll see the function actually generates a specific type of RDD
> representing the transformation. MappedRDD just stores a reference to the
> previous RDD and the function it needs to apply -- it doesn't actually contain
> any data. Of course the idea is that it *looks* like the normal map(),
> filter(), etc. in Scala, but it doesn't work the same way.
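The idea in the paragraph above can be sketched with a toy model. This is not Spark's code: ToyRDD, ToySourceRDD and ToyMappedRDD are hypothetical names made up for illustration, and everything runs locally as a plain Scala script. It only shows that map() builds a node holding the parent and the function, and that the function is not called until an action asks for data.

```scala
// Toy model only: these classes are hypothetical, not part of Spark.
abstract class ToyRDD[T] {
  // Produces the elements; only actions call this.
  def compute(): Iterator[T]

  // Transformation: builds a new node and runs nothing.
  def map[U](f: T => U): ToyRDD[U] = new ToyMappedRDD(this, f)

  // Action: forces evaluation and hands data back to the caller.
  def collect(): List[T] = compute().toList
}

class ToySourceRDD[T](data: Seq[T]) extends ToyRDD[T] {
  def compute(): Iterator[T] = data.iterator
}

// Holds a reference to the parent and the function; no data of its own.
class ToyMappedRDD[T, U](parent: ToyRDD[T], f: T => U) extends ToyRDD[U] {
  def compute(): Iterator[U] = parent.compute().map(f)
}

var calls = 0
val doubled = new ToySourceRDD(Seq(1, 2, 3)).map { x => calls += 1; x * 2 }
val callsBeforeAction = calls   // map() only built a node
val result = doubled.collect()  // the action is what runs the function
val callsAfterAction = calls
```

Here callsBeforeAction stays at zero because constructing ToyMappedRDD never touches the elements; the counter only moves once collect() pulls data through the chain.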
> By calling a bunch of these functions, you end up generating a graph,
> specifically a DAG, of RDDs. This graph describes all the steps needed to
> perform the operation, but no data. The final action, e.g. count() or
> collect(), that triggers computation is called on one of these RDDs. To get
> the value out, the Spark runtime/scheduler traverses the DAG starting from
> that RDD and triggers evaluation of any parent RDDs it needs that
> aren't computed and cached yet.
> Any future operations build on the same DAG as long as you use the same
> RDD objects and, if you used cache() or persist(), can reuse the same data
> after it has been computed the first time.
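A rough sketch of the traversal and reuse described above, again as a local toy and not Spark's scheduler or storage layer. Node, SourceNode and MapNode are hypothetical names, and cache() here just keeps a List in memory. An action walks from the node it is called on up through its parents, and a node that was marked with cache() and has already been computed is not recomputed.

```scala
// Hypothetical toy classes; assumptions for illustration, not Spark APIs.
abstract class Node[T] {
  protected def computeUncached(): List[T]
  private var cacheRequested = false
  private var cached: Option[List[T]] = None

  // Marks the node for caching; nothing is computed here.
  def cache(): this.type = { cacheRequested = true; this }

  // Returns the cached result if present, otherwise computes
  // (which in turn asks the parent) and stores it if caching was requested.
  def evaluate(): List[T] = cached.getOrElse {
    val out = computeUncached()
    if (cacheRequested) cached = Some(out)
    out
  }

  def map[U](f: T => U): Node[U] = new MapNode(this, f)

  // Action: triggers evaluation starting from this node.
  def count(): Long = evaluate().size.toLong
}

class SourceNode[T](data: List[T]) extends Node[T] {
  protected def computeUncached(): List[T] = data
}

class MapNode[T, U](parent: Node[T], f: T => U) extends Node[U] {
  protected def computeUncached(): List[U] = parent.evaluate().map(f)
}

var applications = 0
val squares = new SourceNode(List(1, 2, 3)).map { x => applications += 1; x * x }.cache()
val scaled = squares.map(_ * 10)  // builds on the same graph

val firstCount = squares.count()  // first action computes and caches squares
val afterFirst = applications
val secondCount = scaled.count()  // reuses the cached squares
val afterSecond = applications
```

The second action does not re-run the squaring function because it reaches squares through the same object, which already holds its result. Building a fresh chain from the source would start over.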
> -Ewen
>   David Thomas <>
>  March 11, 2014 at 10:15 PM
> I think you misunderstood my question - I should have stated it better.
> I'm not saying it should be applied immediately, but I'm trying to
> understand how Spark achieves this lazy computation of transformations. Maybe
> this is due to my ignorance of how Scala works, but when I see the code, I
> see that the function is applied to the elements of RDD when I call
> distinct - or is it not applied immediately? How does the returned RDD
> 'keep track of the operation'?
>   Ewen Cheslack-Postava <>
>  March 11, 2014 at 10:06 PM
>  You should probably be asking the opposite question: why do you think it
> *should* be applied immediately? Since the driver program hasn't requested
> any data back (distinct generates a new RDD, it doesn't return any data),
> there's no need to actually compute anything yet.
> As the documentation describes, if the call returns an RDD, it's
> transforming the data and will just keep track of the operation it
> eventually needs to perform. Only methods that return data back to the
> driver should trigger any computation.
> (The one known exception is sortByKey, which really should be lazy, but
> apparently uses an RDD.count call in its implementation:
>   David Thomas <>
>  March 11, 2014 at 9:49 PM
> For example, is the distinct() transformation lazy?
> When I look at the Spark source code, distinct applies a map -> reduceByKey ->
> map function to the RDD elements. Why is this lazy? Won't the function be
> applied immediately to the elements of RDD when I call someRDD.distinct?
>   /**
>    * Return a new RDD containing the distinct elements in this RDD.
>    */
>   def distinct(numPartitions: Int): RDD[T] =
>     map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
>   /**
>    * Return a new RDD containing the distinct elements in this RDD.
>    */
>   def distinct(): RDD[T] = distinct(partitions.size)
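To see why the distinct() shown above can still be lazy, here is a minimal local sketch of the same map -> reduceByKey -> map shape. It assumes a made-up Lazy wrapper; reduceByKey and distinct below are hypothetical single-machine stand-ins, not Spark's implementations. Each step only wraps the previous one in a new function, so the source is not read until collect() is called.

```scala
// Hypothetical toy wrapper: holds a recipe for producing data, not the data.
class Lazy[T](val compute: () => List[T]) {
  // Transformation: returns a new recipe that wraps this one.
  def map[U](f: T => U): Lazy[U] = new Lazy(() => compute().map(f))
  // Action: runs the whole chain of recipes.
  def collect(): List[T] = compute()
}

// Local stand-in for reduceByKey: groups by key when (and only when) evaluated.
def reduceByKey[K, V](rdd: Lazy[(K, V)])(f: (V, V) => V): Lazy[(K, V)] =
  new Lazy(() =>
    rdd.compute().groupBy(_._1).toList.map { case (k, kvs) =>
      (k, kvs.map(_._2).reduce(f))
    })

// Same shape as the distinct in the quoted source: map -> reduceByKey -> map.
def distinct[T](rdd: Lazy[T]): Lazy[T] =
  reduceByKey(rdd.map(x => (x, null)))((x, y) => x).map(_._1)

var sourceReads = 0
val source = new Lazy(() => { sourceReads += 1; List(3, 1, 3, 2, 1) })

val unique = distinct(source)         // three wrappers built, nothing read
val readsBeforeAction = sourceReads
val values = unique.collect().sorted  // sorted only to get a stable order
val readsAfterAction = sourceReads
```

Calling distinct(source) returns immediately with sourceReads still at zero; the single read happens inside collect().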
