How about the following:

// assuming Text keys and values; adjust to your actual types
val smallFile = sc.sequenceFile(..., classOf[Text], classOf[Text]).collect()
val largeFile = sc.sequenceFile(..., classOf[Text], classOf[Text])

val small = sc.broadcast(smallFile)
largeFile.mapPartitions { iter =>
  // Build a hash table over the broadcast small side, once per partition.
  val smallTable = small.value.toMap
  iter.filter { case (key, _) => smallTable.contains(key) }
      .map { case (key, value) => (key, (smallTable(key), value)) }
}
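
The broadcast ships the collected small side to each worker once, and building the hash table inside mapPartitions means you pay the build cost once per partition rather than once per record. The caveat is that the small side has to fit in memory on the driver and on each worker.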

On Fri, Oct 18, 2013 at 2:22 PM, Ameet Kini <ameetkini@gmail.com> wrote:
Forgot to add an important point. My sequence files are sorted (they're actually Hadoop map files). Since they're sorted, it makes sense to do the fetch at the partition level of the inner sequence file.
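
A sketch of what that partition-level fetch could look like, assuming the inner table is a single Hadoop MapFile (with a directory of part files you would first pick the part whose key range covers the partition); innerPath, the outer path, and the Text key/value types are illustrative, not from this thread:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.io.{MapFile, Text}

val innerPath = "hdfs:///data/inner"  // hypothetical location of the sorted inner map file
val outer = sc.sequenceFile("hdfs:///data/outer", classOf[Text], classOf[Text])

outer.mapPartitions { iter =>
  val conf = new Configuration()
  // One reader opened per partition, not per tuple.
  val reader = new MapFile.Reader(FileSystem.get(conf), innerPath, conf)
  val value = new Text()
  iter.flatMap { case (k, v) =>
    // get() consults the MapFile's in-memory index and seeks in the data file;
    // because this partition's keys arrive sorted, the seeks advance forward.
    Option(reader.get(k, value)).map(_ => (k, (v, new Text(value))))
  }
  // In real code, close the reader once the iterator is exhausted.
}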

Thanks,
Ameet


On Fri, Oct 18, 2013 at 5:20 PM, Ameet Kini <ameetkini@gmail.com> wrote:

I've seen discussions where the suggestion is to do a map-side join, but I haven't seen an example yet and could certainly use one. I have two sequence files where the key is unique within each file, so the join is one-to-one and can hence benefit from a map-side join. However, both sequence files can be large, so reading one of them completely in the driver and broadcasting it out would be expensive.

I don't think there is a map-side join implementation in Spark, but earlier suggestions have been to write one using mapPartitions on one of the operands as the outer loop. If that is the case, how would I fetch the split corresponding to the keys in the outer's partition? I'd prefer to do a fetch per partition rather than a fetch per tuple.

In any case, some feedback, and preferably an example of a map-side join without broadcasting, would help.
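
For reference, one broadcast-free shape this could take, assuming both inputs are already sorted and identically partitioned, is a per-partition merge join via RDD.zipPartitions. This is a sketch under those assumptions; mergeJoin is an illustrative helper, not an existing Spark API:

import org.apache.spark.rdd.RDD

// One-to-one merge join of two RDDs whose corresponding partitions hold the
// same sorted key ranges. Keys are unique within each side, per the use case.
def mergeJoin[K: Ordering, V, W](left: RDD[(K, V)], right: RDD[(K, W)]): RDD[(K, (V, W))] =
  left.zipPartitions(right) { (lIter, rIter) =>
    val ord = implicitly[Ordering[K]]
    val l = lIter.buffered
    val r = rIter.buffered
    new Iterator[(K, (V, W))] {
      // Advance whichever side is behind until the heads match or a side ends.
      def hasNext: Boolean = {
        while (l.hasNext && r.hasNext) {
          val c = ord.compare(l.head._1, r.head._1)
          if (c == 0) return true
          if (c < 0) l.next() else r.next()
        }
        false
      }
      def next(): (K, (V, W)) = {
        val (k, v) = l.next()
        (k, (v, r.next()._2))
      }
    }
  }

The precondition (same partitioning and sort order on both sides) is exactly what the sorted map files would need to guarantee; if the inputs are not co-partitioned, the shuffle to get them there defeats the point.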

Thanks,
Ameet