spark-user mailing list archives

From Koert Kuipers <ko...@tresata.com>
Subject Re: Join highly skewed datasets
Date Mon, 29 Jun 2015 02:24:13 GMT
other people might disagree, but i have had better luck with a model that
looks more like traditional map-red if you use spark for disk-to-disk
computations: more cores per executor (and so less RAM per core/task). so i
would suggest trying --executor-cores 4 and adjust numPartitions
accordingly.
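
a rough sketch of the sizing arithmetic from this thread, in shell: partitions = executors x cores per executor x a small multiplier (2 was suggested earlier). the helper name num_partitions is hypothetical, and keeping the executor count at 342 when going to 4 cores per executor is only an assumption for illustration; yarn may grant a different number of executors once each one asks for more cores.

```shell
# Hypothetical helper: number of partitions as a multiple of the total
# cores available across all executors.
# Arguments: <executors> <cores per executor> <multiplier>
num_partitions() {
  echo $(( $1 * $2 * $3 ))
}

# 342 executors with --executor-cores 1 and a multiplier of 2
num_partitions 342 1 2

# Same executor count (an assumption) with --executor-cores 4
num_partitions 342 4 2
```

the result would be passed as the numPartitions argument of the join; treat it as a starting point and adjust it after looking at task sizes in the UI.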

On Sun, Jun 28, 2015 at 6:45 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepujain@gmail.com> wrote:

> Regarding # of executors.
>
> I get 342 executors in parallel each time and i set executor-cores to 1.
> Hence i need to set 342 * 2 * x (x = 1,2,3, ..) as number of partitions
> while running blockJoin. Is this correct?
>
> And are my assumptions on replication levels correct?
>
> Did you get a chance to look at my processing?
>
>
>
> On Sun, Jun 28, 2015 at 3:31 PM, Koert Kuipers <koert@tresata.com> wrote:
>
>> regarding your calculation of executors... RAM in executor is not really
>> comparable to size on disk.
>>
>> if you read from file and write to file you do not have to set
>> storage level.
>>
>> in the join or blockJoin specify number of partitions as a multiple (say
>> 2 times) of number of cores available to you across all executors (so not
>> just number of executors, on yarn you can have many cores per executor).
>>
>> On Sun, Jun 28, 2015 at 6:04 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepujain@gmail.com>
>> wrote:
>>
>>> Could you please suggest and help me understand further.
>>>
>>> This is the actual sizes
>>>
>>> -sh-4.1$ hadoop fs -count dw_lstg_item
>>>            1          764      2041084436189
>>> /sys/edw/dw_lstg_item/snapshot/2015/06/25/00
>>> *This is not skewed, there is exactly one entry for each item, but it's
>>> 2TB*
>>> So should its replication be set to 1?
>>>
>>> The below two datasets (RDD) are unioned and their total size is 150G.
>>> These can be skewed and
>>> hence we use block join with Scoobi + MR.
>>> *So should its replication be set to 3 ?*
>>> -sh-4.1$ hadoop fs -count
>>> /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/20
>>>            1          101        73796345977
>>> /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/20
>>> -sh-4.1$ hadoop fs -count
>>> /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/21
>>>            1          101        85559964549
>>> /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/21
>>>
>>> Also can you suggest the number of executors to be used in this case?
>>> Each executor is started with max 14G of memory.
>>>
>>> Is it equal to 2TB + 150G (Total data) = 20150 GB/14GB = 1500 executors
>>> ? Is this calculation correct ?
>>>
>>> And also please suggest on the
>>> "(should be memory-and-disk or disk-only), number of partitions (should
>>> be large, multiple of num executors),"
>>>
>>>
>>> https://spark.apache.org/docs/latest/programming-guide.html#which-storage-level-to-choose
>>>
>>> When do i choose this setting ?  (Attached is my code for reference)
>>>
>>>
>>>
>>> On Sun, Jun 28, 2015 at 2:57 PM, Koert Kuipers <koert@tresata.com>
>>> wrote:
>>>
>>>> a blockJoin spreads out one side while replicating the other. i would
>>>> suggest replicating the smaller side. so if lstgItem is smaller try
>>>> 3,1 or else 1,3. this should spread the "fat" keys out over multiple (3)
>>>> executors...
>>>>
>>>>
>>>> On Sun, Jun 28, 2015 at 5:35 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepujain@gmail.com>
>>>> wrote:
>>>>
>>>>> I am able to use blockjoin API and it does not throw compilation error
>>>>>
>>>>> val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary,
>>>>> Long))] = lstgItem.blockJoin(viEvents,1,1).map {
>>>>>
>>>>> }
>>>>>
>>>>> Here viEvents is highly skewed and both are on HDFS.
>>>>>
>>>>> What should be the optimal values of replication, i gave 1,1
>>>>>
>>>>>
>>>>>
>>>>> On Sun, Jun 28, 2015 at 1:47 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepujain@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I incremented the version of spark from 1.4.0 to 1.4.0.1 and ran
>>>>>>
>>>>>>  ./make-distribution.sh  --tgz -Phadoop-2.4 -Pyarn  -Phive
>>>>>> -Phive-thriftserver
>>>>>>
>>>>>> Build was successful but the script failed. Is there a way to pass the
>>>>>> incremented version?
>>>>>>
>>>>>>
>>>>>> [INFO] BUILD SUCCESS
>>>>>>
>>>>>> [INFO]
>>>>>> ------------------------------------------------------------------------
>>>>>>
>>>>>> [INFO] Total time: 09:56 min
>>>>>>
>>>>>> [INFO] Finished at: 2015-06-28T13:45:29-07:00
>>>>>>
>>>>>> [INFO] Final Memory: 84M/902M
>>>>>>
>>>>>> [INFO]
>>>>>> ------------------------------------------------------------------------
>>>>>>
>>>>>> + rm -rf /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist
>>>>>>
>>>>>> + mkdir -p /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib
>>>>>>
>>>>>> + echo 'Spark 1.4.0.1 built for Hadoop 2.4.0'
>>>>>>
>>>>>> + echo 'Build flags: -Phadoop-2.4' -Pyarn -Phive -Phive-thriftserver
>>>>>>
>>>>>> + cp
>>>>>> /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/assembly/target/scala-2.10/spark-assembly-1.4.0.1-hadoop2.4.0.jar
>>>>>> /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/
>>>>>>
>>>>>> + cp
>>>>>> /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/target/scala-2.10/spark-examples-1.4.0.1-hadoop2.4.0.jar
>>>>>> /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/
>>>>>>
>>>>>> + cp
>>>>>> /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/network/yarn/target/scala-2.10/spark-1.4.0.1-yarn-shuffle.jar
>>>>>> /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/
>>>>>>
>>>>>> + mkdir -p
>>>>>> /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/examples/src/main
>>>>>>
>>>>>> + cp -r
>>>>>> /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/src/main
>>>>>> /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/examples/src/
>>>>>>
>>>>>> + '[' 1 == 1 ']'
>>>>>>
>>>>>> + cp
>>>>>> '/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/lib_managed/jars/datanucleus*.jar'
>>>>>> /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/
>>>>>>
>>>>>> cp:
>>>>>> /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/lib_managed/jars/datanucleus*.jar:
>>>>>> No such file or directory
>>>>>>
>>>>>> LM-SJL-00877532:spark-1.4.0 dvasthimal$ ./make-distribution.sh  --tgz
>>>>>> -Phadoop-2.4 -Pyarn  -Phive -Phive-thriftserver
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Sun, Jun 28, 2015 at 1:41 PM, Koert Kuipers <koert@tresata.com>
>>>>>> wrote:
>>>>>>
>>>>>>> you need 1) to publish to inhouse maven, so your application can
>>>>>>> depend on your version, and 2) use the spark distribution you
>>>>>>> compiled to launch your job (assuming you run with yarn so you can
>>>>>>> launch multiple versions of spark on same cluster)
>>>>>>>
>>>>>>> On Sun, Jun 28, 2015 at 4:33 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepujain@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> How can i import this pre-built spark into my application via maven
>>>>>>>> as i want to use the block join API.
>>>>>>>>
>>>>>>>> On Sun, Jun 28, 2015 at 1:31 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepujain@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I ran this w/o maven options
>>>>>>>>>
>>>>>>>>> ./make-distribution.sh  --tgz -Phadoop-2.4 -Pyarn  -Phive
>>>>>>>>> -Phive-thriftserver
>>>>>>>>>
>>>>>>>>> I got this spark-1.4.0-bin-2.4.0.tgz in the same working directory.
>>>>>>>>>
>>>>>>>>> I hope this is built with 2.4.x hadoop as i did specify -P
>>>>>>>>>
>>>>>>>>> On Sun, Jun 28, 2015 at 1:10 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepujain@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>>  ./make-distribution.sh  --tgz --*mvn* "-Phadoop-2.4 -Pyarn
>>>>>>>>>> -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package"
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> or
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>  ./make-distribution.sh  --tgz --*mvn* -Phadoop-2.4 -Pyarn
>>>>>>>>>> -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package"
>>>>>>>>>>
>>>>>>>>>> Both fail with
>>>>>>>>>>
>>>>>>>>>> + echo -e 'Specify the Maven command with the --mvn flag'
>>>>>>>>>>
>>>>>>>>>> Specify the Maven command with the --mvn flag
>>>>>>>>>>
>>>>>>>>>> + exit -1
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Deepak
