Ah, I see. In that case, groupByKey does guarantee that each key ends up in exactly one partition, together with all of its values. This can be improved depending on what you want to do afterwards: groupByKey only combines the data after shipping all of it across the cluster. reduceByKey, on the other hand, aggregates on each node first, then ships those partial results to the node and partition that owns each key to finish the aggregation there. Hope that makes sense.

So say Node 1 has pairs: (a, 1), (b, 2), (b, 6)
Node 2 has pairs: (a, 2), (a, 3), (b, 4)

groupByKey would send all of the a pairs and b pairs across the network. If you used reduceByKey with sum as the reduce function, you'd expect Node 1 to ship (a, 1) and (b, 8), and Node 2 to ship (a, 5) and (b, 4), since each did the local aggregation first. Either way, the final results are (a, 6) and (b, 12).
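To make the difference concrete, here is a small plain-Java simulation of the two-node example (Java, since that is what the original poster is using). It does not use Spark at all: `ShuffleDemo` and `Pair` are hypothetical names, sum is assumed as the reduce function, and it only counts which records would be sent to the reducers. It also ignores the fact that some records may already sit on the node that owns their key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation (no Spark) of what each approach ships across the
// network for the two-node example. All names here are hypothetical.
final class ShuffleDemo {

    // A (key, value) pair, standing in for a Tuple2<String, Integer> in a pair RDD.
    static final class Pair {
        final String key;
        final int value;
        Pair(String key, int value) { this.key = key; this.value = value; }
        @Override public String toString() { return "(" + key + ", " + value + ")"; }
    }

    // The data from the example: one list of pairs per node.
    static List<List<Pair>> exampleNodes() {
        return List.of(
            List.of(new Pair("a", 1), new Pair("b", 2), new Pair("b", 6)),   // Node 1
            List.of(new Pair("a", 2), new Pair("a", 3), new Pair("b", 4)));  // Node 2
    }

    // groupByKey-style: every pair on every node is shipped unchanged.
    static List<Pair> shippedByGroupByKey(List<List<Pair>> nodes) {
        List<Pair> shipped = new ArrayList<>();
        for (List<Pair> node : nodes) shipped.addAll(node);
        return shipped;
    }

    // reduceByKey-style with sum: each node first combines its own pairs per key
    // (the "combiner" step), and only those partial sums are shipped.
    static List<Pair> shippedByReduceByKey(List<List<Pair>> nodes) {
        List<Pair> shipped = new ArrayList<>();
        for (List<Pair> node : nodes) {
            Map<String, Integer> local = new TreeMap<>();
            for (Pair p : node) local.merge(p.key, p.value, Integer::sum);
            local.forEach((k, v) -> shipped.add(new Pair(k, v)));
        }
        return shipped;
    }

    // The receiving side: sum whatever arrived, per key.
    static Map<String, Integer> finalSums(List<Pair> shipped) {
        Map<String, Integer> result = new TreeMap<>();
        for (Pair p : shipped) result.merge(p.key, p.value, Integer::sum);
        return result;
    }

    public static void main(String[] args) {
        List<List<Pair>> nodes = exampleNodes();
        System.out.println("groupByKey ships:  " + shippedByGroupByKey(nodes));
        System.out.println("reduceByKey ships: " + shippedByReduceByKey(nodes));
        System.out.println("final sums:        " + finalSums(shippedByReduceByKey(nodes)));
    }
}
```

Running `main` shows 6 records shipped in the groupByKey case versus 4 in the reduceByKey case, with the same final sums {a=6, b=12}.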

You are correct that doing something with expensive side effects, like writing to a database (connection setup, network, and I/O), is best done with the mapPartitions or foreachPartition family of functions on RDD, so you can share one database connection per partition and potentially batch statements.
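A minimal sketch of the per-partition write pattern, in plain Java with no Spark or JDBC dependency. `PartitionWriter` and `FakeConnection` are hypothetical: `FakeConnection` only records batches so the logic can be checked; with real JDBC you would use a `PreparedStatement`'s `addBatch()`/`executeBatch()` instead. `writePartition` has roughly the shape of the function you might pass to `foreachPartition`, in that it receives an iterator over one partition's rows.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Sketch of a per-partition write: one connection per partition, rows sent in
// batches. FakeConnection is HYPOTHETICAL and stands in for a real DB connection.
final class PartitionWriter {

    static final class FakeConnection {
        final List<List<String>> executedBatches = new ArrayList<>();
        private final List<String> pending = new ArrayList<>();
        boolean closed = false;

        void addBatch(String row) { pending.add(row); }

        // Sends the pending rows as one batch (no-op if nothing is pending).
        void executeBatch() {
            if (!pending.isEmpty()) {
                executedBatches.add(new ArrayList<>(pending));
                pending.clear();
            }
        }

        void close() { closed = true; }
    }

    // Opens ONE connection for the whole partition, flushes every batchSize rows,
    // flushes the final partial batch, and always closes the connection.
    static FakeConnection writePartition(Iterator<String> rows,
                                         Supplier<FakeConnection> connect,
                                         int batchSize) {
        FakeConnection conn = connect.get();
        try {
            int inBatch = 0;
            while (rows.hasNext()) {
                conn.addBatch(rows.next());
                if (++inBatch == batchSize) {
                    conn.executeBatch();
                    inBatch = 0;
                }
            }
            conn.executeBatch();   // leftover rows, if any
        } finally {
            conn.close();
        }
        return conn;
    }
}
```

The design choice is one connection per partition rather than per record, with the batch size bounding how many rows are buffered before each round trip to the database.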

On Tue, Sep 8, 2015 at 7:37 PM, Mike Wright <mwright@snl.com> wrote:
Thanks for the response!

Well, in retrospect each partition doesn't need to be restricted to a single key. But I cannot have the values associated with a key span partitions, since they all need to be processed together for that key to support the cumulative calculations. So as long as each key has all of its values in a single partition, I'm OK.

Additionally, the values will be written to the database, and from what I have read, doing this at the partition level is the best compromise between 1) writing the calculated values for each key individually (lots of connects/disconnects) and 2) collecting them all at the end and writing them all at once.

I am using groupBy against the filtered RDD to get the grouping I want, but apparently this may not be the most efficient way, and it seems that everything always ends up in a single partition in this scenario.


Mike Wright
Principal Architect, Software Engineering

SNL Financial LC
434-951-7816 p
434-244-4466 f

On Tue, Sep 8, 2015 at 5:38 PM, Richard Marscher <rmarscher@localytics.com> wrote:
That seems like it could work, although I don't think `partitionByKey` is a thing, at least for RDD. You might be able to merge step #2 and step #3 into one step by using the `reduceByKey` function signature that takes in a Partitioner implementation.

def reduceByKey(partitioner: Partitioner, func: (V, V) ⇒ V): RDD[(K, V)]

Merge the values for each key using an associative reduce function. This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a "combiner" in MapReduce.
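Here is a rough plain-Java model (no Spark) of the result of `reduceByKey(partitioner, func)`: each key is routed to the partition the partitioner picks, and its values are merged there with `func`. `ReduceByKeyModel` is a hypothetical name and the partitioner is modeled as a simple key-to-index function; the model reproduces only the output, not the map-side combining Spark does before the shuffle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BinaryOperator;
import java.util.function.ToIntFunction;

// Plain-Java model of what reduceByKey(partitioner, func) produces: one map per
// partition, and each key appears in exactly one of them with its merged value.
final class ReduceByKeyModel {

    static <V> List<Map<String, V>> reduceByKey(List<Map.Entry<String, V>> pairs,
                                                int numPartitions,
                                                ToIntFunction<String> getPartition,
                                                BinaryOperator<V> func) {
        List<Map<String, V>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) partitions.add(new TreeMap<>());
        for (Map.Entry<String, V> p : pairs) {
            // The partitioner depends only on the key, so all values of a key
            // land in the same partition and are merged there.
            int target = getPartition.applyAsInt(p.getKey());
            partitions.get(target).merge(p.getKey(), p.getValue(), func);
        }
        return partitions;
    }
}
```

Because the target partition depends only on the key, grouping (step #2) and placement (step #3) happen in a single pass.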

The tricky part might be getting the partitioner to know the number of partitions, which I think it needs to know up front via `abstract def numPartitions: Int`. The `HashPartitioner`, for example, takes the number as a constructor argument; maybe you could use that with an upper bound on the size if you don't mind empty partitions. Otherwise, you might have to do some extra work to extract the exact number of keys if it's not readily available.
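Two partitioning strategies, sketched in plain Java without Spark. `hashPartition` mimics the idea behind `HashPartitioner`: a non-negative modulus of the key's `hashCode()` over a fixed number of partitions. `KeyIndexPartitioner` (a hypothetical name) gives each distinct key its own partition, which requires knowing the keys up front. In Spark either would live in a subclass of `org.apache.spark.Partitioner` overriding `numPartitions()` and `getPartition(Object)`; that wiring is not shown. Note that hash-style placement can put several keys in one partition, which is acceptable as long as no single key's values are split.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketches of two partitioning strategies (no Spark dependency).
final class PartitionerSketches {

    // Hash-style: fixed upper bound on partitions. Several keys may share a
    // partition and some partitions may stay empty, but a given key always maps
    // to the same partition. floorMod keeps negative hash codes in [0, n).
    // null keys go to partition 0 in this sketch.
    static int hashPartition(Object key, int numPartitions) {
        return key == null ? 0 : Math.floorMod(key.hashCode(), numPartitions);
    }

    // Exactly one key per partition. Needs the distinct keys up front
    // (for example, collected to the driver beforehand).
    static final class KeyIndexPartitioner {
        private final Map<Object, Integer> index = new HashMap<>();

        KeyIndexPartitioner(List<?> distinctKeys) {
            // Assign 0, 1, 2, ... in first-seen order; duplicates are ignored.
            for (Object k : distinctKeys) index.putIfAbsent(k, index.size());
        }

        int numPartitions() { return index.size(); }

        int getPartition(Object key) {
            Integer p = index.get(key);
            if (p == null) throw new IllegalArgumentException("unknown key: " + key);
            return p;
        }
    }
}
```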

Aside: what is the requirement to have each partition only contain the data related to one key?

On Fri, Sep 4, 2015 at 11:06 AM, mmike87 <mwright@snl.com> wrote:
Hello, I am new to Apache Spark and this is my company's first Spark project.
Essentially, we are calculating models dealing with mining data using Spark.

I am holding all the source data in a persisted RDD that we will refresh
periodically. When a "scenario" is passed to the Spark job (we're using Job
Server), the persisted RDD is filtered to the relevant mines. For example, we
may want all mines in Chile and the 1990-2015 data for each.

Many of the calculations are cumulative; that is, when we apply user-input
"adjustment factors" to a value, we also need the "flexed" value we
calculated for that mine previously.
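To illustrate why all of a mine's values must be processed together, here is a plain-Java sketch of a cumulative calculation. The formula (each year's flexed value is the previous year's flexed value plus the current value times an adjustment factor) is a hypothetical placeholder, not the actual model, and `CumulativeCalc` and `YearValue` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Each year's result depends on the previous year's result, so one mine's rows
// must be sorted and processed in a single pass. The formula is HYPOTHETICAL.
final class CumulativeCalc {

    static final class YearValue {
        final int year;
        final double value;
        YearValue(int year, double value) { this.year = year; this.value = value; }
    }

    // rows: every row for ONE mine, in any order; factor: a user-input adjustment.
    static List<Double> flexedSeries(List<YearValue> rows, double factor) {
        List<YearValue> sorted = new ArrayList<>(rows);
        sorted.sort(Comparator.comparingInt((YearValue r) -> r.year));  // year order
        List<Double> flexed = new ArrayList<>();
        double previous = 0.0;                                          // no prior year yet
        for (YearValue r : sorted) {
            double current = previous + r.value * factor;               // placeholder formula
            flexed.add(current);
            previous = current;
        }
        return flexed;
    }
}
```

With rows (2000, 5.0) and (2001, 10.0) and a factor of 2.0, the series is [10.0, 30.0]; if the 2001 row were processed on its own in another partition, it would come out as 20.0 instead.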

To ensure that this works, the idea is to:

1) Filter the superset to relevant mines (done)
2) Group the subset by the unique identifier for the mine. So, a group may
be all the rows for mine "A" for 1990-2015
3) I then want to ensure that the RDD is partitioned by the Mine Identifier
(an Integer).
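The three steps can be modeled in plain Java (no Spark) to check the intent: filter, choose a partition from the mine identifier alone, then group rows by mine inside that partition. `MinePipeline` and `MineRow`, including its fields, are hypothetical stand-ins for the real data (the mine id is a `String` here for simplicity), and the hash-style placement is just one possible choice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of steps 1-3. MineRow and its fields are HYPOTHETICAL.
final class MinePipeline {

    static final class MineRow {
        final String mineId;
        final String country;
        final int year;
        final double value;
        MineRow(String mineId, String country, int year, double value) {
            this.mineId = mineId; this.country = country; this.year = year; this.value = value;
        }
    }

    // Returns partition index -> (mine id -> that mine's rows).
    static Map<Integer, Map<String, List<MineRow>>> run(List<MineRow> all, String country,
                                                         int fromYear, int toYear,
                                                         int numPartitions) {
        Map<Integer, Map<String, List<MineRow>>> partitions = new TreeMap<>();
        for (MineRow r : all) {
            // Step 1: filter the superset to the relevant mines and years.
            if (!r.country.equals(country) || r.year < fromYear || r.year > toYear) continue;
            // Steps 2 and 3: the partition depends only on the mine id, so all rows
            // for a mine land in the same partition, where they are grouped by mine.
            int p = Math.floorMod(r.mineId.hashCode(), numPartitions);
            partitions.computeIfAbsent(p, k -> new TreeMap<>())
                      .computeIfAbsent(r.mineId, k -> new ArrayList<>())
                      .add(r);
        }
        return partitions;
    }
}
```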

It's step 3 that is confusing me. I suspect it's very easy ... do I simply
use PartitionByKey?

We're using Java if that makes any difference.


View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/New-to-Spark-Paritioning-Question-tp24580.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Richard Marscher
Software Engineer
Localytics.com | Our Blog | Twitter | Facebook | LinkedIn
