spark-user mailing list archives

From Eugene Morozov <fathers...@list.ru>
Subject Re: Sorted Multiple Outputs
Date Thu, 13 Aug 2015 00:06:17 GMT
Yiannis, 

Sorry for the late response.
It is indeed not possible to create a new RDD inside of foreachPartition, so you have to write
the data manually. I haven’t tried that and haven’t hit such an exception, but I’d suggest
you try writing locally first and then uploading the file into HDFS. FileSystem has a specific
method for that: “copyFromLocalFile”.
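
A minimal sketch of that upload step (the helper name and the paths are illustrative):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical helper: call it from inside foreachPartition once the
// partition's data has been written to a local temp file.
static void uploadToHdfs(String localPath, String hdfsPath) throws IOException {
    FileSystem fs = FileSystem.get(new Configuration());
    fs.copyFromLocalFile(new Path(localPath), new Path(hdfsPath));
}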

Another approach would be to split the RDD into multiple RDDs by key: get the distinct keys,
collect them on the driver, then loop over the keys and filter a new RDD out of the original
one for each key. Roughly (assuming a JavaPairRDD<String, String> called pairRdd):

List<String> keys = pairRdd.keys().distinct().collect();
for (String key : keys) {
    pairRdd.filter(t -> t._1().equals(key))
           .saveAsTextFile("output/" + key);
}

It might help to cache the original RDD first, so it is not recomputed for every key.

On 16 Jul 2015, at 12:21, Yiannis Gkoufas <johngouf85@gmail.com> wrote:

> Hi Eugene,
> 
> thanks for your response!
> Your recommendation makes sense, that's what I more or less tried.
> The problem I am facing is that inside foreachPartition() I cannot create a new RDD and use saveAsTextFile.
> It would probably make sense to write directly to HDFS using the Java API.
> When I tried that I was getting errors similar to this:
> 
> Failed on local exception: java.io.InterruptedIOException: Interruped while waiting for IO on channel java.nio.channels.SocketChannel
> 
> Probably it's hitting a race condition.
> 
> Has anyone else faced this situation? Any suggestions?
> 
> Thanks a lot! 
> 
> On 15 July 2015 at 14:04, Eugene Morozov <fathersson@list.ru> wrote:
> Yiannis,
> 
> It looks like you might explore another approach.
> 
> sc.textFile("input/path")
>     .mapToPair(...)   // your own implementation: extract (key, value) pairs
>     .partitionBy(new HashPartitioner(num))
>     .groupByKey()     // result: a PairRDD of key vs Iterable of values
>     .foreachPartition(...)   // sort and write each key's values, as sketched below
> 
> In the last step you could sort all the values for each key and store them in a separate file,
even in the same directory as the files for all the other keys.
> HashPartitioner guarantees that all values for a specific key will reside in just one
partition, though a partition may well contain more than one key (with its values). That
shouldn't be a big deal, as you would iterate over each tuple<key, Iterable<value>> and store
each key to its own file.
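> 
> A rough sketch of that last step (assuming the result of the chain above is held in a JavaPairRDD<String, Iterable<String>> called grouped; the output directory is illustrative):
> 
> import java.io.BufferedWriter;
> import java.io.OutputStreamWriter;
> import java.nio.charset.StandardCharsets;
> import java.util.ArrayList;
> import java.util.Collections;
> import java.util.List;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
> import scala.Tuple2;
> 
> grouped.foreachPartition(partition -> {
>     // One HDFS client per partition, created on the executor.
>     FileSystem fs = FileSystem.get(new Configuration());
>     while (partition.hasNext()) {
>         Tuple2<String, Iterable<String>> entry = partition.next();
>         List<String> values = new ArrayList<>();
>         entry._2().forEach(values::add);
>         Collections.sort(values); // plug in any custom ordering here
>         Path out = new Path("hdfs:///output/" + entry._1());
>         try (BufferedWriter writer = new BufferedWriter(
>                 new OutputStreamWriter(fs.create(out), StandardCharsets.UTF_8))) {
>             for (String value : values) {
>                 writer.write(value);
>                 writer.newLine();
>             }
>         }
>     }
> });
> 
> Sorting in memory assumes each key's values fit on one executor; very large keys would need a different approach.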
> 
> On 15 Jul 2015, at 03:23, Yiannis Gkoufas <johngouf85@gmail.com> wrote:
> 
>> Hi there,
>> 
>> I have been using the approach described here:
>> 
>> http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job
>> 
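>> For reference, that approach boils down to roughly the following (the class name here is made up):
>> 
>> import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
>> 
>> // Hypothetical class name; routes each record to a file named after its key.
>> public class KeyBasedOutput extends MultipleTextOutputFormat<String, String> {
>>     @Override
>>     protected String generateFileNameForKeyValue(String key, String value, String name) {
>>         return key;
>>     }
>> }
>> 
>> // used as:
>> // pairRdd.saveAsHadoopFile("output/path", String.class, String.class, KeyBasedOutput.class);
>> 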
>> In addition to that, I was wondering if there is a way to customize the order of the values contained in each file.
>> 
>> Thanks a lot!
> 
> Eugene Morozov
> fathersson@list.ru

Eugene Morozov
fathersson@list.ru




