spark-user mailing list archives

From Reynold Xin <>
Subject Re: examples of map-side join of two hadoop sequence files
Date Mon, 21 Oct 2013 04:54:05 GMT
How about the following:

val smallFile = sc.sequenceFile(...).collect()
val largeFile = sc.sequenceFile(...)

val small = sc.broadcast(smallFile)
largeFile.mapPartitions { iter =>
  // build up a hash table from small.value; call it smallTable
  val smallTable = small.value.toMap
  iter.filter(row => smallTable.contains(row.joinKey)).map { row =>
    // join smallTable.get(row.joinKey) with row itself
    (row, smallTable(row.joinKey))
  }
}
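The per-partition logic above can be sketched without Spark as plain iterator code; this is a minimal, self-contained illustration of the hash-join step inside mapPartitions. The name joinPartition and the sample data are ours, and the rows are assumed to be (key, value) pairs:

```scala
// Hash join of one partition of the large side against an in-memory
// table built from the broadcast small side. Keys missing from the
// small side are dropped, matching the filter-then-map above.
def joinPartition[K, V, W](smallTable: Map[K, V],
                           iter: Iterator[(K, W)]): Iterator[(K, (V, W))] =
  iter.flatMap { case (k, w) =>
    // keep only rows whose key appears in the small side
    smallTable.get(k).map(v => (k, (v, w)))
  }

val smallTable = Map(1 -> "a", 2 -> "b")               // broadcast side
val partition  = Iterator((1, "x"), (3, "y"), (2, "z")) // one large-side partition
val out = joinPartition(smallTable, partition).toList
// out == List((1, ("a", "x")), (2, ("b", "z")))
```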

On Fri, Oct 18, 2013 at 2:22 PM, Ameet Kini <> wrote:

> Forgot to add an important point. My sequence files are sorted (they're
> actually Hadoop map files). Since they're sorted, it makes sense to do a
> fetch at the partition-level of the inner sequence file.
> Thanks,
> Ameet
> On Fri, Oct 18, 2013 at 5:20 PM, Ameet Kini <> wrote:
>> I've seen discussions where the suggestion is to do a map-side join, but
>> haven't seen an example yet, and can certainly use one. I have two sequence
>> files where the key is unique within each file, so the join is a one-to-one
>> join, and can hence benefit from a map-side join. However both sequence
>> files can be large, so reading one of them completely in the driver and
>> broadcasting it out would be expensive.
>> I don't think there is a map-side join implementation in Spark, but
>> earlier suggestions have been to write one using mapPartitions on one of
>> the operands as the outer loop. If that is the case, how would I fetch the
>> split corresponding to the keys in the outer's partition? I'd prefer to do
>> a fetch-per-partition rather than a fetch-per-tuple.
>> In any case, some feedback, and preferably, an example of a map-side join
>> without broadcasting would help.
>> Thanks,
>> Ameet
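Since both files are sorted and keys are unique within each file, one way to avoid broadcasting is a per-partition merge join. Assuming the two RDDs are partitioned identically (same partitioner, same number of partitions, sorted within each partition), the merge below could run inside `rddA.zipPartitions(rddB) { (itA, itB) => mergeJoin(itA, itB) }`. The mergeJoin name and sample data are ours; this is a sketch under those assumptions, not a drop-in implementation:

```scala
// One-to-one sorted merge join of two iterators, each sorted by key
// with unique keys. Non-matching keys on either side are skipped.
def mergeJoin[K, V, W](a: Iterator[(K, V)], b: Iterator[(K, W)])
                      (implicit ord: Ordering[K]): Iterator[(K, (V, W))] = {
  val pa = a.buffered
  val pb = b.buffered
  new Iterator[(K, (V, W))] {
    // advance past keys present on only one side until the heads match
    private def align(): Unit =
      while (pa.hasNext && pb.hasNext &&
             !ord.equiv(pa.head._1, pb.head._1)) {
        if (ord.lt(pa.head._1, pb.head._1)) pa.next() else pb.next()
      }
    def hasNext: Boolean = { align(); pa.hasNext && pb.hasNext }
    def next(): (K, (V, W)) = {
      align()
      val (k, v)  = pa.next()
      val (_, w)  = pb.next()
      (k, (v, w))
    }
  }
}

val left  = Iterator((1, "a"), (2, "b"), (4, "d"))
val right = Iterator((2, "x"), (3, "y"), (4, "z"))
val out = mergeJoin(left, right).toList
// out == List((2, ("b", "x")), (4, ("d", "z")))
```

This streams both sides a row at a time, so neither partition needs to fit in memory, which is the point of avoiding the collect-and-broadcast approach for two large files.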
