spark-user mailing list archives

From Ameet Kini <>
Subject Re: examples of map-side join of two hadoop sequence files
Date Wed, 23 Oct 2013 13:52:18 GMT
So for now, I solved this problem of doing a map-side join by range
partitioning my sequence files (actually, map files) on the key. For every
new Spark partition (as seen in mapPartitions), I look at the very first key
and, based on that key, I open a MapFile.Reader on the right map file. The
successive keys are guaranteed to be in that same map file, and hence can be
served by the already opened reader.
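
As a rough sketch of the per-partition logic described above (not the actual code from this thread), the idea can be written in plain Scala without Spark or Hadoop on the classpath. KeyedReader, joinPartition and openFor are hypothetical names introduced for illustration; in a real job KeyedReader would presumably wrap MapFile.Reader, and openFor would pick the map file whose key range contains the given key.

```scala
object MapSideJoinSketch {
  // Hypothetical minimal lookup interface; with Hadoop this would wrap
  // a MapFile.Reader (get by key, then close).
  trait KeyedReader[K, W] {
    def get(key: K): Option[W]
    def close(): Unit
  }

  // Joins one partition of the outer data set against a single inner reader.
  // `openFor` (an assumption of this sketch) opens a reader on the inner file
  // whose key range contains the given key.
  def joinPartition[K, V, W](
      iter: Iterator[(K, V)],
      openFor: K => KeyedReader[K, W]): Iterator[(K, (V, W))] = {
    val outer = iter.buffered
    if (!outer.hasNext) Iterator.empty
    else {
      // Peek at the first key only; range partitioning is assumed to
      // guarantee that the rest of the partition is in the same inner file.
      val reader = openFor(outer.head._1)
      new Iterator[(K, (V, W))] {
        private var closed = false
        private var pending: Option[(K, (V, W))] = None

        // Advance to the next outer row that has a match in the inner file;
        // close the reader once the outer partition is exhausted.
        private def advance(): Unit = {
          while (pending.isEmpty && outer.hasNext) {
            val (k, v) = outer.next()
            pending = reader.get(k).map(w => (k, (v, w)))
          }
          if (pending.isEmpty && !closed) { reader.close(); closed = true }
        }

        def hasNext: Boolean = { advance(); pending.isDefined }

        def next(): (K, (V, W)) = {
          advance()
          val result = pending.getOrElse(
            throw new NoSuchElementException("next on empty iterator"))
          pending = None
          result
        }
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // In-memory stand-in for one inner map file.
    val inner = Map(1 -> "a", 2 -> "b", 4 -> "d")
    val open: Int => KeyedReader[Int, String] = _ =>
      new KeyedReader[Int, String] {
        def get(key: Int): Option[String] = inner.get(key)
        def close(): Unit = ()
      }
    // Key 3 has no match in the inner file and is dropped.
    val joined = joinPartition(Iterator(1 -> "x", 2 -> "y", 3 -> "z"), open).toList
    println(joined)
  }
}
```

With Spark, the same function would presumably be passed along the lines of outer.mapPartitions(iter => joinPartition(iter, openFor)), so that exactly one reader is opened per partition rather than per tuple.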

While this works, I would like to explore overriding HadoopRDD as per
Reynold's suggestion, as it would make for a cleaner implementation.

Thanks for your suggestions.


On Mon, Oct 21, 2013 at 2:12 PM, Reynold Xin <> wrote:

> Maybe you can override HadoopRDD's compute method to do that?
> On Mon, Oct 21, 2013 at 8:16 AM, Ameet Kini <> wrote:
>> Right, except both my sequence files are large and so doing a "collect()"
>> and then broadcasting one of them would be costly. Since I have two large
>> sorted sequence files with a one-to-one relationship among the keys, I need
>> to perform the "merge" portion of a good old "sort-merge" join. And it is
>> actually a very simple merge, since each key is unique within the file.
>> I was looking at the mapPartitions API:
>> def mapPartitions[U](f: (Iterator[T]) => Iterator[U],
>>     preservesPartitioning: Boolean)(implicit arg0: ClassManifest[U]): RDD[U]
>> If somehow the function f has access to the underlying partition
>> information (e.g., HadoopPartition.inputSplit), then it could open a reader
>> on the actual hdfs file corresponding to that inputSplit, and manually do
>> the join. But it looks like HadoopPartition is declared private. Is there some
>> other way to figure out which underlying HDFS file corresponds to the
>> partition being iterated upon in mapPartitions?
>> Ameet
>> On Mon, Oct 21, 2013 at 12:54 AM, Reynold Xin <> wrote:
>>> How about the following:
>>> val smallFile = sc.sequenceFile(....).collect()
>>> val largeFile = sc.sequenceFile(...)
>>> val small = sc.broadcast(smallFile)
>>> largeFile.mapPartitions { iter =>
>>>   // build a hash table from the broadcast (key, value) pairs
>>>   val smallTable = small.value.toMap
>>>   iter.filter { case (key, _) => smallTable.contains(key) }
>>>     .map { case (key, value) => (key, (value, smallTable(key))) }
>>> }
>>> On Fri, Oct 18, 2013 at 2:22 PM, Ameet Kini <> wrote:
>>>> Forgot to add an important point. My sequence files are sorted (they're
>>>> actually Hadoop map files). Since they're sorted, it makes sense to do a
>>>> fetch at the partition-level of the inner sequence file.
>>>> Thanks,
>>>> Ameet
>>>> On Fri, Oct 18, 2013 at 5:20 PM, Ameet Kini <> wrote:
>>>>> I've seen discussions where the suggestion is to do a map-side join,
>>>>> but haven't seen an example yet, and can certainly use one. I have two
>>>>> sequence files where the key is unique within each file, so the join
>>>>> is a one-to-one join, and can hence benefit from a map-side join.
>>>>> However, sequence files can be large, so reading one of them completely
>>>>> in the driver and broadcasting it out would be expensive.
>>>>> I don't think there is a map-side join implementation in Spark, but
>>>>> earlier suggestions have been to write one using mapPartitions on one
>>>>> of the operands as the outer loop. If that is the case, how would I
>>>>> fetch the split corresponding to the keys in the outer's partition?
>>>>> I'd prefer to do a fetch-per-partition rather than a fetch-per-tuple.
>>>>> In any case, some feedback, and preferably, an example of a map-side
>>>>> join without broadcasting would help.
>>>>> Thanks,
>>>>> Ameet
