spark-user mailing list archives

From Reynold Xin <r...@apache.org>
Subject Re: examples of map-side join of two hadoop sequence files
Date Mon, 21 Oct 2013 18:12:31 GMT
Maybe you can override HadoopRDD's compute method to do that?
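
Something along these lines might work. This is a rough, untested sketch (MergeJoinRDD and PairedPartition are made-up names); it assumes the two files were written with identical, aligned splits, and it leans on the sorted one-to-one mergeJoin sketched further down the thread:

import org.apache.spark.{OneToOneDependency, Partition, TaskContext}
import org.apache.spark.rdd.RDD

// Pairs the i-th partition of each parent; assumes the splits line up.
class PairedPartition(val index: Int, val left: Partition, val right: Partition)
  extends Partition

class MergeJoinRDD[K: Ordering, V, W](left: RDD[(K, V)], right: RDD[(K, W)])
  extends RDD[(K, (V, W))](left.context,
    Seq(new OneToOneDependency(left), new OneToOneDependency(right))) {

  override def getPartitions: Array[Partition] =
    left.partitions.map(p => new PairedPartition(p.index, p, right.partitions(p.index)))

  override def compute(split: Partition, context: TaskContext): Iterator[(K, (V, W))] = {
    val p = split.asInstanceOf[PairedPartition]
    // Merge the two sorted partition iterators (see the mergeJoin sketch below).
    mergeJoin(left.iterator(p.left, context), right.iterator(p.right, context))
  }
}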


On Mon, Oct 21, 2013 at 8:16 AM, Ameet Kini <ameetkini@gmail.com> wrote:

> Right, except both my sequence files are large and so doing a "collect()"
> and then broadcasting one of them would be costly. Since I have two large
> sorted sequence files with a one-to-one relationship among the keys, I need
> to perform the "merge" portion of a good old "sort-merge" join. And it is
> actually a very simple merge, since each key is unique within the file.
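>
> Concretely, the merge step I have in mind looks something like this (an
> illustrative sketch, assuming both iterators are sorted by the same key
> ordering and each key appears at most once per side):
>
>   def mergeJoin[K, V, W](left: Iterator[(K, V)], right: Iterator[(K, W)])
>                         (implicit ord: Ordering[K]): Iterator[(K, (V, W))] = {
>     val l = left.buffered
>     val r = right.buffered
>     new Iterator[(K, (V, W))] {
>       // Skip keys present on only one side until the two heads match.
>       private def align(): Unit =
>         while (l.hasNext && r.hasNext && ord.compare(l.head._1, r.head._1) != 0) {
>           if (ord.compare(l.head._1, r.head._1) < 0) l.next() else r.next()
>         }
>       def hasNext: Boolean = { align(); l.hasNext && r.hasNext }
>       def next(): (K, (V, W)) = {
>         align()
>         val (k, v) = l.next()
>         (k, (v, r.next()._2))
>       }
>     }
>   }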
>
> I was looking at the mapPartitions API:
> def mapPartitions[U](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean)
>     (implicit arg0: ClassManifest[U]): RDD[U]
> (RDD docs: http://spark.incubator.apache.org/docs/latest/api/core/org/apache/spark/rdd/RDD.html)
>
> If the function f somehow had access to the underlying partition
> information (e.g., HadoopPartition.inputSplit), it could open a reader on
> the actual HDFS file corresponding to that inputSplit and do the join
> manually. But it looks like HadoopPartition is declared private. Is there
> some other way to figure out which underlying HDFS file corresponds to the
> partition being iterated over in mapPartitions?
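>
> One workaround I can imagine (an untested sketch; the path and the
> Text/IntWritable key and value types are illustrative, and "outer" stands
> for the sequence-file RDD of the outer side): recompute the inner file's
> splits on the driver with the same InputFormat, which is deterministic for
> the same configuration, and pair them with the outer RDD's partitions by
> index:
>
>   import org.apache.hadoop.conf.Configuration
>   import org.apache.hadoop.fs.Path
>   import org.apache.hadoop.io.{IntWritable, SequenceFile, Text}
>   import org.apache.hadoop.mapred.{FileInputFormat, FileSplit, JobConf, SequenceFileInputFormat}
>
>   val job = new JobConf(sc.hadoopConfiguration)
>   FileInputFormat.setInputPaths(job, "hdfs:///data/inner")  // hypothetical path
>   val splits = new SequenceFileInputFormat[Text, IntWritable]()
>     .getSplits(job, outer.partitions.length)
>
>   // FileSplit is a Writable, not Java-serializable, so ship only primitives.
>   val splitInfo = splits.map { s =>
>     val f = s.asInstanceOf[FileSplit]
>     (f.getPath.toString, f.getStart, f.getLength)
>   }
>
>   outer.mapPartitionsWithIndex { (i, outerIter) =>
>     val (pathStr, start, length) = splitInfo(i)
>     val path = new Path(pathStr)
>     val conf = new Configuration()
>     val reader = new SequenceFile.Reader(path.getFileSystem(conf), path, conf)
>     if (start > 0) reader.sync(start)  // seek to the first sync point in this split
>     val key = new Text()
>     val value = new IntWritable()
>     val innerIter = Iterator
>       .continually {
>         if (reader.getPosition < start + length && reader.next(key, value))
>           Some((new Text(key), new IntWritable(value.get)))  // copy reused objects
>         else { reader.close(); None }
>       }
>       .takeWhile(_.isDefined)
>       .map(_.get)
>     // String order stands in for the file's byte order in this sketch.
>     implicit val textOrdering: Ordering[Text] = Ordering.by[Text, String](_.toString)
>     mergeJoin(outerIter, innerIter)  // the sorted merge sketched above
>   }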
>
> Ameet
>
>
>
>
> On Mon, Oct 21, 2013 at 12:54 AM, Reynold Xin <rxin@apache.org> wrote:
>
>> How about the following:
>>
>> val smallFile = sc.sequenceFile(...).collect()
>> val largeFile = sc.sequenceFile(...)
>>
>> val small = sc.broadcast(smallFile)
>> largeFile.mapPartitions { iter =>
>>   // Build a hash table for the small side, once per partition.
>>   val smallTable = small.value.toMap
>>   iter.filter { case (key, _) => smallTable.contains(key) }
>>       .map { case (key, value) => (key, (smallTable(key), value)) }
>> }
>>
>>
>>
>>
>> On Fri, Oct 18, 2013 at 2:22 PM, Ameet Kini <ameetkini@gmail.com> wrote:
>>
>>> Forgot to add an important point: my sequence files are sorted (they're
>>> actually Hadoop map files). Since they're sorted, it makes sense to do a
>>> partition-level fetch on the inner sequence file.
>>>
>>> Thanks,
>>> Ameet
>>>
>>>
>>> On Fri, Oct 18, 2013 at 5:20 PM, Ameet Kini <ameetkini@gmail.com> wrote:
>>>
>>>>
>>>> I've seen discussions suggesting a map-side join, but I haven't seen an
>>>> example yet and could certainly use one. I have two sequence files where
>>>> the key is unique within each file, so the join is one-to-one and can
>>>> hence benefit from a map-side join. However, both sequence files can be
>>>> large, so reading one of them completely in the driver and broadcasting
>>>> it out would be expensive.
>>>>
>>>> I don't think there is a map-side join implementation in Spark, but
>>>> earlier suggestions have been to write one using mapPartitions on one of
>>>> the operands as the outer loop. If that is the case, how would I fetch
>>>> the split corresponding to the keys in the outer's partition? I'd prefer
>>>> to do a fetch per partition rather than a fetch per tuple.
>>>>
>>>> In any case, some feedback, and preferably, an example of a map-side
>>>> join without broadcasting would help.
>>>>
>>>> Thanks,
>>>> Ameet
>>>>
>>>
>>>
>>
>
