spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Chris Teoh <chris.t...@gmail.com>
Subject Re: Map side join without broadcast
Date Mon, 01 Jul 2019 21:12:28 GMT
Hey there,

I think it's overcomplicating the partitioning by explicitly specifying the
partitioning when using the hash is the default behaviour of the
partitioner in Spark. You could simply do a partitionBy and it would
implement the hash partitioner by default.

Let me know if I've misinterpreted the code. I think also using map after
partitioning will also cause Spark to lose the partitioner.

On Sun, 30 Jun 2019 at 20:56, jelmer <jkuperus@gmail.com> wrote:

> Does something like the code below make any sense or would there be a more
> efficient way to do it ?
>
>     val wordsOnOnePartition = input
>>       .map { word => Math.abs(word.id.hashCode) % numPartitions -> word }
>>       .partitionBy(new PartitionIdPassthrough(numPartitions))
>>     val indices = wordsOnOnePartition
>>         .mapPartitions(it => new IndexIterator(it, m))
>>         .cache()
>>     val wordsOnEachPartition = input
>>       .flatMap(word => 0 until numPartitions map { partition => partition
>> -> word } )
>>       .partitionBy(new PartitionIdPassthrough(numPartitions))
>>     val nearest = indices.join(wordsOnEachPartition)
>>       .flatMap { case (_, (index, Word(word, vector))) =>
>>         index.findNearest(vector, k + 1).collect {
>>           case SearchResult(Word(relatedWord, _), score) if relatedWord
>> != word =>
>>             RelatedItem(word, relatedWord, score)
>>         }
>>         .take(k)
>>       }
>>     val result = nearest.groupBy(_.word).map { case (word, relatedItems)
>> =>
>>         word +: relatedItems.toSeq
>>             .sortBy(_.similarity)(Ordering[Double].reverse)
>>             .map(_.relatedWord)
>>             .take(k)
>>             .mkString("\t")
>>     }
>>
>
> I manually assign a partition to each word of a list of words, and
> repartition the rdd by this partition key
>
> There i use mapPartitions to construct a partial index so i end up with
> one index in each partition.
>
> Then i read the words again but this time assign every partition to each
> word and join it on the indices rdd by partition key. So effectively every
> index will be queries
>
> Finally i merge the results from each index into a single  list keeping
> only the most relevant items by doing a groupBy
>
>
>
> On Sun, 30 Jun 2019 at 01:45, Chris Teoh <chris.teoh@gmail.com> wrote:
>
>> The closest thing I can think of here is if you have both dataframes
>> written out using buckets. Hive uses this technique for join optimisation
>> such that both datasets of the same bucket are read by the same mapper to
>> achieve map side joins.
>>
>> On Sat., 29 Jun. 2019, 9:10 pm jelmer, <jkuperus@gmail.com> wrote:
>>
>>> I have 2 dataframes,
>>>
>>> Dataframe A which contains 1 element per partition that is gigabytes big
>>> (an index)
>>>
>>> Dataframe B which is made up out of millions of small rows.
>>>
>>> I want to join B on A but i want all the work to be done on the
>>> executors holding the partitions of dataframe A
>>>
>>> Is there a way to accomplish this without putting dataframe B in a
>>> broadcast variable or doing a broadcast join ?
>>>
>>>

-- 
Chris

Mime
View raw message