Yiannis, 

Sorry for the late response.
It is indeed not possible to create a new RDD inside foreachPartition, so you have to write the data manually. I haven’t tried that myself and haven’t hit such an exception, but you might try writing locally and then uploading the file to HDFS. FileSystem has a specific method for that: “copyFromLocalFile”.
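
A minimal sketch of the write-locally step, in plain Java with no Spark or Hadoop on the classpath. The class name LocalKeyWriter, the method writePerKey and the <key>.txt file naming are hypothetical, and it assumes that one partition's records fit in memory and that keys are safe to use as file names. The upload itself appears only as a comment, since it needs a configured Hadoop FileSystem.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: the kind of body you might call from inside foreachPartition().
public class LocalKeyWriter {

    // Writes all values of each key to <dir>/<key>.txt and returns key -> file written.
    public static Map<String, Path> writePerKey(
            Iterator<Map.Entry<String, String>> records, Path dir) throws IOException {
        // Group this partition's records by key (assumption: they fit in memory).
        Map<String, List<String>> byKey = new LinkedHashMap<>();
        while (records.hasNext()) {
            Map.Entry<String, String> record = records.next();
            byKey.computeIfAbsent(record.getKey(), k -> new ArrayList<>())
                 .add(record.getValue());
        }

        Map<String, Path> written = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> entry : byKey.entrySet()) {
            // Assumption: the key is usable as a file name.
            Path file = dir.resolve(entry.getKey() + ".txt");
            // One value per line.
            Files.write(file, entry.getValue(), StandardCharsets.UTF_8);
            // Upload step, not executed in this sketch:
            //   fs.copyFromLocalFile(localHadoopPath, hdfsTargetPath);
            written.put(entry.getKey(), file);
        }
        return written;
    }
}
```

Keeping the local write separate from the upload means it can be tried out without a cluster; only the commented line needs HDFS.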

Another approach would be to split the RDD into multiple RDDs by key. You can get the distinct keys, collect them on the driver, loop over the keys, and filter a new RDD out of the original one for each key.

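// rdd is assumed to be a JavaPairRDD<String, String>; outputDir is a placeholder
for (String key : keys) {
    rdd.filter(t -> t._1().equals(key))
       .saveAsTextFile(outputDir + "/" + key);
}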

It might help to cache the original RDD, since every filter pass scans it again.

On 16 Jul 2015, at 12:21, Yiannis Gkoufas <johngouf85@gmail.com> wrote:

Hi Eugene,

thanks for your response!
Your recommendation makes sense, that's what I more or less tried.
The problem that I am facing is that inside foreachPartition() I cannot create a new rdd and use saveAsTextFile.
It would probably make sense to write directly to HDFS using the Java API.
When I tried that I was getting errors similar to this:

Failed on local exception: java.io.InterruptedIOException: Interruped while waiting for IO on channel java.nio.channels.SocketChannel

Probably it's hitting a race condition.

Has anyone else faced this situation? Any suggestions?

Thanks a lot!

On 15 July 2015 at 14:04, Eugene Morozov <fathersson@list.ru> wrote:
Yiannis,

You might explore another approach.

sc.textFile("input/path")
  .mapToPair()      // your own implementation: line -> (key, value)
  .partitionBy(new HashPartitioner(num))
  .groupByKey()     // as a result - PairRDD of key vs Iterable of values
  .foreachPartition()   // your own implementation

In the last step you could sort all values for a key and store them in a separate file, even in the same directory as the files for the other keys.
HashPartitioner guarantees that all values for a specific key reside in just one partition, but one partition may contain more than one key (with its values). That shouldn’t be a big deal, as you would iterate over tuple<key, Iterable<value>> and store each key to its own file.
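
To illustrate the partitioning point, here is a small plain-Java sketch (no Spark needed). It mirrors the rule HashPartitioner applies, a non-negative hashCode modulo the number of partitions, and shows sorting one key's values before writing them. The class name PartitionSketch and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of how keys map to partitions.
public class PartitionSketch {

    // Non-negative hashCode modulo partition count; null keys go to partition 0.
    public static int partitionFor(Object key, int numPartitions) {
        if (key == null) {
            return 0;
        }
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Returns partition index -> keys assigned to that partition.
    public static Map<Integer, List<String>> assign(List<String> keys, int numPartitions) {
        Map<Integer, List<String>> result = new TreeMap<>();
        for (String key : keys) {
            result.computeIfAbsent(partitionFor(key, numPartitions), p -> new ArrayList<>())
                  .add(key);
        }
        return result;
    }

    // Returns a sorted copy of one key's values, in whatever order the comparator defines.
    public static <V> List<V> sortedValues(Iterable<V> values, Comparator<? super V> order) {
        List<V> copy = new ArrayList<>();
        for (V value : values) {
            copy.add(value);
        }
        copy.sort(order);
        return copy;
    }
}
```

With two partitions, the keys "a" and "c" end up in the same partition while "b" goes to the other one, so the code inside foreachPartition has to handle several keys per partition. The comparator passed to sortedValues is where the order of the values within each file would be customized.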

On 15 Jul 2015, at 03:23, Yiannis Gkoufas <johngouf85@gmail.com> wrote:

Hi there,

I have been using the approach described here:


In addition to that, I was wondering if there is a way to customize the order of the values contained in each file.

Thanks a lot!

Eugene Morozov
fathersson@list.ru