spark-user mailing list archives

From: Reynold Xin <r...@apache.org>
Subject: Re: examples of map-side join of two hadoop sequence files
Date: Mon, 21 Oct 2013 04:54:05 GMT
How about the following:

val smallFile = sc.sequenceFile(....).collect()
val largeFile = sc.sequenceFile(...)

val small = sc.broadcast(smallFile)
largeFile.mapPartitions { iter =>
  // Build up a hash table for the small side; call it smallTable.
  val smallTable = small.value.toMap
  iter.filter { case (key, _) => smallTable.contains(key) }
      .map { case (key, value) => (key, (smallTable(key), value)) }
}
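
A more complete version of the same idea, as a sketch (the paths and the
String key/value types below are placeholders, not from the original
message; keys are assumed unique on the small side, as in the question):

import org.apache.spark.SparkContext._

// The small file fits in memory on the driver; the large one stays distributed.
val smallFile = sc.sequenceFile[String, String]("hdfs:///path/to/small").collect()
val largeFile = sc.sequenceFile[String, String]("hdfs:///path/to/large")

// Broadcast the small side as a hash map built once on the driver, so
// each partition only does lookups instead of rebuilding the table.
val small = sc.broadcast(smallFile.toMap)
val joined = largeFile.mapPartitions { iter =>
  val smallTable = small.value
  iter.flatMap { case (key, value) =>
    smallTable.get(key).map(smallValue => (key, (smallValue, value)))
  }
}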

On Fri, Oct 18, 2013 at 2:22 PM, Ameet Kini <ameetkini@gmail.com> wrote:

> Forgot to add an important point. My sequence files are sorted (they're
> actually Hadoop map files). Since they're sorted, it makes sense to do a
> fetch at the partition-level of the inner sequence file.
>
> Thanks,
> Ameet
>
>
> On Fri, Oct 18, 2013 at 5:20 PM, Ameet Kini <ameetkini@gmail.com> wrote:
>
>>
>> I've seen discussions where the suggestion is to do a map-side join, but
>> haven't seen an example yet, and can certainly use one. I have two sequence
>> files where the key is unique within each file, so the join is a one-to-one
>> join, and can hence benefit from a map-side join. However both sequence
>> files can be large, so reading one of them completely in the driver and
>> broadcasting it out would be expensive.
>>
>> I don't think there is a map-side join implementation in Spark, but
>> earlier suggestions have been to write one using mapPartitions on one of
>> the operands as the outer loop. If that is the case, how would I fetch the
>> split corresponding to the keys in the outer's partition? I'd prefer to do
>> a fetch-per-partition rather than a fetch-per-tuple.
>>
>> In any case, some feedback, and preferably, an example of a map-side join
>> without broadcasting would help.
>>
>> Thanks,
>> Ameet
>>
>
>
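
For the sorted-MapFile case Ameet describes above, a fetch-per-partition
sketch without broadcasting might look like the following (the inner path
is a placeholder, and largeFile is the RDD from the earlier sketch). Each
partition opens one MapFile.Reader and probes it with sorted lookups; note
this opens the reader once per partition but still seeks once per key,
rather than fetching a whole split at once:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.io.{MapFile, Text}

val joined = largeFile.mapPartitions { iter =>
  val conf = new Configuration()
  // One reader per partition; MapFile.Reader.get binary-searches the
  // in-memory index and then seeks, so sorted inputs keep lookups cheap.
  val reader = new MapFile.Reader(FileSystem.get(conf), "hdfs:///path/to/inner", conf)
  val out = iter.flatMap { case (key, value) =>
    val matched = new Text()
    if (reader.get(new Text(key), matched) != null)
      Some((key, (value, matched.toString)))
    else
      None
  }.toArray // force evaluation so the reader can be closed here
  reader.close()
  out.iterator
}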
