spark-user mailing list archives

From ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
Subject Re: Join highly skewed datasets
Date Mon, 29 Jun 2015 04:36:41 GMT
The attached image shows the current run with blockJoin.

On Sun, Jun 28, 2015 at 7:24 PM, Koert Kuipers <koert@tresata.com> wrote:

> other people might disagree, but i have had better luck with a model that
> looks more like traditional map-red if you use spark for disk-to-disk
> computations: more cores per executor (and so less RAM per core/task). so i
> would suggest trying --executor-cores 4 and adjusting numPartitions
> accordingly.
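A minimal sketch of that configuration (values illustrative, taken from numbers mentioned elsewhere in this thread):

    // fewer, fatter executors: 4 cores each rather than many single-core ones
    val conf = new org.apache.spark.SparkConf()
      .set("spark.executor.cores", "4")     // more cores per executor
      .set("spark.executor.memory", "14g")  // now shared by 4 tasks, so less RAM per task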
>
> On Sun, Jun 28, 2015 at 6:45 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepujain@gmail.com>
> wrote:
>
>> Regarding # of executors.
>>
>> I get 342 executors in parallel each time, and I set executor-cores to 1.
>> Hence I need to set 342 * 2 * x (x = 1, 2, 3, ...) as the number of
>> partitions while running blockJoin. Is this correct?
>>
>> And are my assumptions on replication levels correct?
>>
>> Did you get a chance to look at my processing?
>>
>>
>>
>> On Sun, Jun 28, 2015 at 3:31 PM, Koert Kuipers <koert@tresata.com> wrote:
>>
>>> regarding your calculation of executors... RAM in executor is not really
>>> comparable to size on disk.
>>>
>>> if you read from file and write to file you do not have to set a
>>> storage level.
>>>
>>> in the join or blockJoin, specify the number of partitions as a multiple
>>> (say 2 times) of the number of cores available to you across all executors
>>> (so not just the number of executors; on yarn you can have many cores per
>>> executor).
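For example, with the numbers discussed in this thread (342 executors at 4 cores each, times 2; all values illustrative), a minimal sketch using the RDD names from this thread:

    val numExecutors = 342                  // hypothetical, from the run described here
    val coresPerExecutor = 4                // per the --executor-cores suggestion above
    val numPartitions = numExecutors * coresPerExecutor * 2  // = 2736
    // the standard RDD join accepts a partition count directly:
    val joined = lstgItem.join(viEvents, numPartitions)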
>>>
>>> On Sun, Jun 28, 2015 at 6:04 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepujain@gmail.com>
>>> wrote:
>>>
>>>> Could you please advise and help me understand further?
>>>>
>>>> These are the actual sizes:
>>>>
>>>> -sh-4.1$ hadoop fs -count dw_lstg_item
>>>>            1          764      2041084436189
>>>> /sys/edw/dw_lstg_item/snapshot/2015/06/25/00
>>>> *This is not skewed; there is exactly one entry for each item, but it is
>>>> 2 TB.*
>>>> So should its replication be set to 1?
>>>>
>>>> The two datasets (RDDs) below are unioned; their total size is 150 GB.
>>>> These can be skewed, and hence we use a block join with Scoobi + MR.
>>>> *So should its replication be set to 3?*
>>>> -sh-4.1$ hadoop fs -count
>>>> /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/20
>>>>            1          101        73796345977
>>>> /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/20
>>>> -sh-4.1$ hadoop fs -count
>>>> /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/21
>>>>            1          101        85559964549
>>>> /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/21
>>>>
>>>> Also, can you suggest the number of executors to be used in this case?
>>>> Each executor is started with at most 14 GB of memory.
>>>>
>>>> Is it equal to 2 TB + 150 GB (total data) = 2150 GB / 14 GB ≈ 154
>>>> executors? Is this calculation correct?
>>>>
>>>> And also, please advise on the
>>>> "(should be memory-and-disk or disk-only), number of partitions
>>>> (should be large, multiple of num executors),"
>>>>
>>>>
>>>> https://spark.apache.org/docs/latest/programming-guide.html#which-storage-level-to-choose
>>>>
>>>> When do I choose this setting? (Attached is my code for reference.)
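A storage level only matters when an RDD is reused; as noted above, a single read-from-HDFS, join, write-to-HDFS pass needs no explicit persist. A sketch of the two levels mentioned, using an RDD name from this thread (standard Spark API):

    import org.apache.spark.storage.StorageLevel

    // spill partitions to disk when they do not fit in memory:
    viEvents.persist(StorageLevel.MEMORY_AND_DISK)
    // or, for RDDs far larger than available memory:
    // viEvents.persist(StorageLevel.DISK_ONLY)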
>>>>
>>>>
>>>>
>>>> On Sun, Jun 28, 2015 at 2:57 PM, Koert Kuipers <koert@tresata.com>
>>>> wrote:
>>>>
>>>>> a blockJoin spreads out one side while replicating the other. i would
>>>>> suggest replicating the smaller side. so if lstgItem is smaller try
>>>>> 3,1 or else 1,3. this should spread the "fat" keys out over multiple (3)
>>>>> executors...
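A sketch of that suggestion, following the call shape used elsewhere in this thread (blockJoin is not part of stock Spark; the argument order, left replication then right replication, is assumed from this discussion):

    // lstgItem (~2 TB) is the larger side here and viEvents (~150 GB) the
    // smaller, skewed one, so spread lstgItem out and replicate viEvents:
    val joined = lstgItem.blockJoin(viEvents, 1, 3)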
>>>>>
>>>>>
>>>>> On Sun, Jun 28, 2015 at 5:35 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepujain@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I am able to use the blockJoin API and it does not throw a compilation
>>>>>> error.
>>>>>>
>>>>>> val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary,
>>>>>> Long))] = lstgItem.blockJoin(viEvents, 1, 1).map {
>>>>>>
>>>>>> }
>>>>>>
>>>>>> Here viEvents is highly skewed and both are on HDFS.
>>>>>>
>>>>>> What should be the optimal values of replication? I gave 1,1.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Sun, Jun 28, 2015 at 1:47 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepujain@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I incremented the version of Spark from 1.4.0 to 1.4.0.1 and ran
>>>>>>>
>>>>>>>  ./make-distribution.sh  --tgz -Phadoop-2.4 -Pyarn  -Phive
>>>>>>> -Phive-thriftserver
>>>>>>>
>>>>>>> The build was successful but the script failed. Is there a way to
>>>>>>> pass the incremented version?
>>>>>>>
>>>>>>>
>>>>>>> [INFO] BUILD SUCCESS
>>>>>>>
>>>>>>> [INFO]
>>>>>>> ------------------------------------------------------------------------
>>>>>>>
>>>>>>> [INFO] Total time: 09:56 min
>>>>>>>
>>>>>>> [INFO] Finished at: 2015-06-28T13:45:29-07:00
>>>>>>>
>>>>>>> [INFO] Final Memory: 84M/902M
>>>>>>>
>>>>>>> [INFO]
>>>>>>> ------------------------------------------------------------------------
>>>>>>>
>>>>>>> + rm -rf /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist
>>>>>>>
>>>>>>> + mkdir -p /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib
>>>>>>>
>>>>>>> + echo 'Spark 1.4.0.1 built for Hadoop 2.4.0'
>>>>>>>
>>>>>>> + echo 'Build flags: -Phadoop-2.4' -Pyarn -Phive -Phive-thriftserver
>>>>>>>
>>>>>>> + cp
>>>>>>> /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/assembly/target/scala-2.10/spark-assembly-1.4.0.1-hadoop2.4.0.jar
>>>>>>> /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/
>>>>>>>
>>>>>>> + cp
>>>>>>> /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/target/scala-2.10/spark-examples-1.4.0.1-hadoop2.4.0.jar
>>>>>>> /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/
>>>>>>>
>>>>>>> + cp
>>>>>>> /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/network/yarn/target/scala-2.10/spark-1.4.0.1-yarn-shuffle.jar
>>>>>>> /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/
>>>>>>>
>>>>>>> + mkdir -p
>>>>>>> /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/examples/src/main
>>>>>>>
>>>>>>> + cp -r
>>>>>>> /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/src/main
>>>>>>> /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/examples/src/
>>>>>>>
>>>>>>> + '[' 1 == 1 ']'
>>>>>>>
>>>>>>> + cp
>>>>>>> '/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/lib_managed/jars/datanucleus*.jar'
>>>>>>> /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/
>>>>>>>
>>>>>>> cp:
>>>>>>> /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/lib_managed/jars/datanucleus*.jar:
>>>>>>> No such file or directory
>>>>>>>
>>>>>>> LM-SJL-00877532:spark-1.4.0 dvasthimal$ ./make-distribution.sh
>>>>>>> --tgz -Phadoop-2.4 -Pyarn  -Phive -Phive-thriftserver
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Sun, Jun 28, 2015 at 1:41 PM, Koert Kuipers <koert@tresata.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> you need 1) to publish to your in-house maven, so your application
>>>>>>>> can depend on your version, and 2) use the spark distribution you
>>>>>>>> compiled to launch your job (assuming you run with yarn, so you can
>>>>>>>> launch multiple versions of spark on the same cluster)
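A sketch of what 1) could look like in sbt (the version string matches the custom build mentioned below; the resolver URL is a placeholder):

    // point the build at the in-house repository hosting the custom Spark
    resolvers += "inhouse-releases" at "https://maven.example.internal/releases"  // placeholder URL
    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.0.1" % "provided"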
>>>>>>>>
>>>>>>>> On Sun, Jun 28, 2015 at 4:33 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepujain@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> How can I import this pre-built Spark into my application via
>>>>>>>>> Maven, as I want to use the block join API?
>>>>>>>>>
>>>>>>>>> On Sun, Jun 28, 2015 at 1:31 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepujain@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> I ran this w/o maven options
>>>>>>>>>>
>>>>>>>>>> ./make-distribution.sh --tgz -Phadoop-2.4 -Pyarn -Phive
>>>>>>>>>> -Phive-thriftserver
>>>>>>>>>>
>>>>>>>>>> I got this spark-1.4.0-bin-2.4.0.tgz in the same working
>>>>>>>>>> directory.
>>>>>>>>>>
>>>>>>>>>> I hope this is built with 2.4.x Hadoop, as I did specify -P.
>>>>>>>>>>
>>>>>>>>>> On Sun, Jun 28, 2015 at 1:10 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepujain@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> ./make-distribution.sh --tgz --mvn "-Phadoop-2.4 -Pyarn
>>>>>>>>>>> -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package"
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> or
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> ./make-distribution.sh --tgz --mvn -Phadoop-2.4 -Pyarn
>>>>>>>>>>> -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package"
>>>>>>>>>>>
>>>>>>>>>>> Both fail with
>>>>>>>>>>>
>>>>>>>>>>> + echo -e 'Specify the Maven command with the --mvn flag'
>>>>>>>>>>>
>>>>>>>>>>> Specify the Maven command with the --mvn flag
>>>>>>>>>>>
>>>>>>>>>>> + exit -1
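For reference, in Spark's make-distribution.sh the --mvn flag expects the path to the mvn executable rather than the profile string; the profiles are passed as ordinary arguments, e.g. ./make-distribution.sh --tgz --mvn /usr/local/bin/mvn -Phadoop-2.4 -Pyarn -Phive -Phive-thriftserver (mvn path illustrative).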
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Deepak
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Deepak
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Deepak
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Deepak
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Deepak
>>>>
>>>>
>>>
>>
>>
>> --
>> Deepak
>>
>>
>


-- 
Deepak
