spark-user mailing list archives

From ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
Subject Re: Join highly skewed datasets
Date Sun, 28 Jun 2015 22:09:50 GMT
My code:

    val viEvents = details
      .filter(_.get(14).asInstanceOf[Long] != NULL_VALUE)
      .map { vi => (vi.get(14).asInstanceOf[Long], vi) } // AVRO (150G)

    val lstgItem = DataUtil.getDwLstgItem(sc, DateUtil.addDaysToDate(startDate, -89))
      .filter(_.getItemId().toLong != NULL_VALUE)
      .map { lstg => (lstg.getItemId().toLong, lstg) } // SEQUENCE (2TB)

    val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
      viEvents.blockJoin(lstgItem, 3, 1, new HashPartitioner(2141)).map {

      }
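
For reference, the storage level and partition count asked about further down in this thread can be set through stock Spark APIs roughly as follows. This is only a minimal sketch: it uses the plain `join` rather than `blockJoin` (which is only available in the custom build discussed in this thread), and the object name `JoinSettingsSketch`, the helper `joinWithSettings`, the toy data, and the partition count of 8 are all hypothetical.

```scala
import scala.reflect.ClassTag

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

object JoinSettingsSketch {

  // Hypothetical helper: persists both inputs with a spill-to-disk storage
  // level and joins them with an explicit partitioner.
  def joinWithSettings[V: ClassTag, W: ClassTag](
      left: RDD[(Long, V)],
      right: RDD[(Long, W)],
      numPartitions: Int): RDD[(Long, (V, W))] = {
    // Storage level: MEMORY_AND_DISK spills partitions that do not fit in
    // memory; StorageLevel.DISK_ONLY is the other option mentioned here.
    left.persist(StorageLevel.MEMORY_AND_DISK)
    right.persist(StorageLevel.MEMORY_AND_DISK)

    // Number of partitions: pass a partitioner (or an Int) to the
    // shuffling operation itself.
    left.join(right, new HashPartitioner(numPartitions))
  }

  def main(args: Array[String]): Unit = {
    // Local master only so the sketch runs standalone; on a cluster the
    // master would come from spark-submit instead.
    val conf = new SparkConf().setAppName("join-settings-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      // Toy stand-ins for the keyed RDDs (hypothetical data).
      val left = sc.parallelize(Seq((1L, "a"), (1L, "b"), (2L, "c")))
      val right = sc.parallelize(Seq((1L, 10), (2L, 20), (3L, 30)))

      val joined = joinWithSettings(left, right, numPartitions = 8)
      println(s"partitions: ${joined.partitions.length}")
      joined.collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

The partition count is chosen per shuffle operation, so each join (or other shuffling call) can be given its own value.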



On Sun, Jun 28, 2015 at 3:03 PM, Koert Kuipers <koert@tresata.com> wrote:

> specify numPartitions or partitioner for operations that shuffle.
>
> so use:
> def join[W](other: RDD[(K, W)], numPartitions: Int)
>
> or
> def blockJoin[W](
>   other: JavaPairRDD[K, W],
>   leftReplication: Int,
>   rightReplication: Int,
>   partitioner: Partitioner)
>
> for example:
> left.blockJoin(right, 3, 1, new HashPartitioner(numPartitions))
>
>
>
> On Sun, Jun 28, 2015 at 5:57 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepujain@gmail.com>
> wrote:
>
>> You mentioned that the storage level should be memory-and-disk or
>> disk-only, and that the number of partitions should be large (a multiple
>> of the number of executors).
>>
>> How do I specify that?
>>
>> On Sun, Jun 28, 2015 at 2:35 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepujain@gmail.com>
>> wrote:
>>
>>> I am able to use the blockJoin API, and it compiles without errors:
>>>
>>> val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary,
>>> Long))] = lstgItem.blockJoin(viEvents,1,1).map {
>>>
>>> }
>>>
>>> Here viEvents is highly skewed and both are on HDFS.
>>>
>>> What are the optimal replication values? I used 1, 1.
>>>
>>>
>>>
>>> On Sun, Jun 28, 2015 at 1:47 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepujain@gmail.com>
>>> wrote:
>>>
>>>> I incremented the version of spark from 1.4.0 to 1.4.0.1 and ran
>>>>
>>>>  ./make-distribution.sh  --tgz -Phadoop-2.4 -Pyarn  -Phive
>>>> -Phive-thriftserver
>>>>
>>>> The build was successful but the script failed. Is there a way to pass the
>>>> incremented version?
>>>>
>>>>
>>>> [INFO] BUILD SUCCESS
>>>> [INFO] ------------------------------------------------------------------------
>>>> [INFO] Total time: 09:56 min
>>>> [INFO] Finished at: 2015-06-28T13:45:29-07:00
>>>> [INFO] Final Memory: 84M/902M
>>>> [INFO] ------------------------------------------------------------------------
>>>> + rm -rf /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist
>>>> + mkdir -p /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib
>>>> + echo 'Spark 1.4.0.1 built for Hadoop 2.4.0'
>>>> + echo 'Build flags: -Phadoop-2.4' -Pyarn -Phive -Phive-thriftserver
>>>> + cp /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/assembly/target/scala-2.10/spark-assembly-1.4.0.1-hadoop2.4.0.jar /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/
>>>> + cp /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/target/scala-2.10/spark-examples-1.4.0.1-hadoop2.4.0.jar /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/
>>>> + cp /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/network/yarn/target/scala-2.10/spark-1.4.0.1-yarn-shuffle.jar /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/
>>>> + mkdir -p /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/examples/src/main
>>>> + cp -r /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/src/main /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/examples/src/
>>>> + '[' 1 == 1 ']'
>>>> + cp '/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/lib_managed/jars/datanucleus*.jar' /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/
>>>> cp: /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/lib_managed/jars/datanucleus*.jar: No such file or directory
>>>>
>>>> LM-SJL-00877532:spark-1.4.0 dvasthimal$ ./make-distribution.sh  --tgz -Phadoop-2.4 -Pyarn  -Phive -Phive-thriftserver
>>>>
>>>>
>>>>
>>>> On Sun, Jun 28, 2015 at 1:41 PM, Koert Kuipers <koert@tresata.com>
>>>> wrote:
>>>>
>>>>> you need 1) to publish to inhouse maven, so your application can
>>>>> depend on your version, and 2) use the spark distribution you compiled
>>>>> to launch your job (assuming you run with yarn so you can launch multiple
>>>>> versions of spark on same cluster)
>>>>>
>>>>> On Sun, Jun 28, 2015 at 4:33 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepujain@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> How can I import this pre-built Spark into my application via Maven?
>>>>>> I want to use the block join API.
>>>>>>
>>>>>> On Sun, Jun 28, 2015 at 1:31 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepujain@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I ran this w/o maven options
>>>>>>>
>>>>>>> ./make-distribution.sh  --tgz -Phadoop-2.4 -Pyarn  -Phive
>>>>>>> -Phive-thriftserver
>>>>>>>
>>>>>>> I got this spark-1.4.0-bin-2.4.0.tgz in the same working directory.
>>>>>>>
>>>>>>> I hope this is built with 2.4.x hadoop as i did specify -P
>>>>>>>
>>>>>>> On Sun, Jun 28, 2015 at 1:10 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepujain@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>>  ./make-distribution.sh  --tgz --mvn "-Phadoop-2.4 -Pyarn
>>>>>>>> -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests
>>>>>>>> clean package"
>>>>>>>>
>>>>>>>>
>>>>>>>> or
>>>>>>>>
>>>>>>>>
>>>>>>>>  ./make-distribution.sh  --tgz --mvn -Phadoop-2.4 -Pyarn
>>>>>>>> -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests
>>>>>>>> clean package"
>>>>>>>>
>>>>>>>> Both fail with
>>>>>>>>
>>>>>>>> + echo -e 'Specify the Maven command with the --mvn flag'
>>>>>>>>
>>>>>>>> Specify the Maven command with the --mvn flag
>>>>>>>>
>>>>>>>> + exit -1
>>>>>>>>


-- 
Deepak
