spark-user mailing list archives

From ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
Subject Re: Join highly skewed datasets
Date Mon, 29 Jun 2015 13:56:22 GMT
It failed as always, with similar exceptions in the UI.

Attached is the UI screen state right now; it has been running for 9 hours
now. (On Hadoop it finishes in 2 hours, so there is a 10x loss in
performance, and with failures.) I have had this error all along, and with
the regular join() API as well.

Attached is my code.

*Someone please help me debug this; I have not been able to get it running
for 3 months now.*

*Command:*

export SPARK_HOME=/home/dvasthimal/spark1.4/spark-1.4.0-bin-2.4.0
export SPARK_JAR=/home/dvasthimal/spark1.4/spark-1.4.0-bin-2.4.0/lib/spark-assembly-1.4.0-hadoop2.4.0.jar
export HADOOP_CONF_DIR=/apache/hadoop/conf
cd $SPARK_HOME
hadoop fs -rmr -skipTrash /user/dvasthimal/*
cp ~/spark_reporting-1.0-SNAPSHOT.jar /home/dvasthimal/spark1.4/lib/
export SPARK_CLASSPATH=/apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar
./bin/spark-submit -v --master yarn-cluster \
  --driver-class-path /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar \
  --jars /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.4/lib/spark_reporting_dep_only-1.0-SNAPSHOT.jar \
  --num-executors 9973 --driver-memory 14g \
  --driver-java-options "-XX:MaxPermSize=512M -Xmx4096M -Xms4096M -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
  --executor-memory 14g --executor-cores 1 --queue hdmi-others \
  --class com.ebay.ep.poc.spark.reporting.SparkApp \
  /home/dvasthimal/spark1.4/lib/spark_reporting-1.0-SNAPSHOT.jar \
  startDate=2015-06-20 endDate=2015-06-21 input=/apps/hdmi-prod/b_um/epdatasets/exptsession subcommand=viewItem output=/user/dvasthimal/epdatasets/viewItem buffersize=128 maxbuffersize=1068 maxResultSize=200G

Trace:
org.apache.spark.shuffle.FetchFailedException: Failed to connect to
executorHost/executorIP:42048
	at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
	at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:84)
	at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:84)
	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)


On Sun, Jun 28, 2015 at 9:36 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepujain@gmail.com> wrote:

> Attached image shows the current run with blockJoin.
>
> On Sun, Jun 28, 2015 at 7:24 PM, Koert Kuipers <koert@tresata.com> wrote:
>
>> Other people might disagree, but I have had better luck with a model that
>> looks more like traditional map-reduce if you use Spark for disk-to-disk
>> computations: more cores per executor (and so less RAM per core/task). So I
>> would suggest trying --executor-cores 4 and adjusting numPartitions
>> accordingly.
>>
>> On Sun, Jun 28, 2015 at 6:45 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepujain@gmail.com>
>> wrote:
>>
>>> Regarding # of executors:
>>>
>>> I get 342 executors in parallel each time, and I set executor-cores to 1.
>>> Hence I need to set 342 * 2 * x (x = 1, 2, 3, ...) as the number of
>>> partitions while running blockJoin. Is this correct?
>>>
>>> And are my assumptions on replication levels correct?
>>>
>>> Did you get a chance to look at my processing?
>>>
>>>
>>>
>>> On Sun, Jun 28, 2015 at 3:31 PM, Koert Kuipers <koert@tresata.com>
>>> wrote:
>>>
>>>> Regarding your calculation of executors... RAM in an executor is not
>>>> really comparable to size on disk.
>>>>
>>>> If you read from file and write to file, you do not have to set a
>>>> storage level.
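>>>>
>>>> (If you did need one, a minimal sketch would look like this; someRdd is
>>>> just a placeholder, and MEMORY_AND_DISK or DISK_ONLY are the levels I
>>>> would consider for a disk-heavy job of this size:
>>>>
>>>> import org.apache.spark.storage.StorageLevel
>>>> // keep partitions in memory, spilling to disk when they do not fit:
>>>> val cached = someRdd.persist(StorageLevel.MEMORY_AND_DISK)
>>>> // or avoid recomputation without holding executor memory at all:
>>>> val onDisk = someRdd.persist(StorageLevel.DISK_ONLY)
>>>>
>>>> But again, for a straight file-to-file job you can skip this.)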
>>>>
>>>> In the join or blockJoin, specify the number of partitions as a multiple
>>>> (say 2 times) of the number of cores available to you across all executors
>>>> (so not just the number of executors; on YARN you can have many cores per
>>>> executor).
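>>>>
>>>> To make that concrete, a rough sketch (342 executors and 4 cores are
>>>> just the numbers floated in this thread, not a recommendation; with
>>>> more cores per executor you will likely be granted fewer executors):
>>>>
>>>> val numExecutors = 342     // executors actually granted in parallel
>>>> val coresPerExecutor = 4   // e.g. with --executor-cores 4
>>>> val multiplier = 2         // "a multiple (say 2 times)" of total cores
>>>> val numPartitions = numExecutors * coresPerExecutor * multiplier
>>>> // pass numPartitions to join/blockJoin so every core has work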
>>>>
>>>> On Sun, Jun 28, 2015 at 6:04 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepujain@gmail.com>
>>>> wrote:
>>>>
>>>>> Could you please advise, and help me understand further?
>>>>>
>>>>> These are the actual sizes:
>>>>>
>>>>> -sh-4.1$ hadoop fs -count dw_lstg_item
>>>>>            1          764      2041084436189
>>>>> /sys/edw/dw_lstg_item/snapshot/2015/06/25/00
>>>>> *This is not skewed; there is exactly one entry for each item, but it
>>>>> is 2 TB.*
>>>>> So should its replication be set to 1?
>>>>>
>>>>> The two datasets (RDDs) below are unioned, and their total size is 150 GB.
>>>>> These can be skewed, and hence we use a block join with Scoobi + MR.
>>>>> *So should their replication be set to 3?*
>>>>> -sh-4.1$ hadoop fs -count
>>>>> /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/20
>>>>>            1          101        73796345977
>>>>> /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/20
>>>>> -sh-4.1$ hadoop fs -count
>>>>> /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/21
>>>>>            1          101        85559964549
>>>>> /apps/hdmi-prod/b_um/epdatasets/exptsession/2015/06/21
>>>>>
>>>>> Also, can you suggest the number of executors to be used in this case,
>>>>> given that each executor is started with at most 14 GB of memory?
>>>>>
>>>>> Is it equal to 2 TB + 150 GB (total data) = 2150 GB / 14 GB ≈ 150
>>>>> executors? Is this calculation correct?
>>>>>
>>>>> And also, please advise on:
>>>>> "(should be memory-and-disk or disk-only), number of partitions
>>>>> (should be large, multiple of num executors),"
>>>>>
>>>>>
>>>>> https://spark.apache.org/docs/latest/programming-guide.html#which-storage-level-to-choose
>>>>>
>>>>> When do I choose this setting? (Attached is my code for reference.)
>>>>>
>>>>>
>>>>>
>>>>> On Sun, Jun 28, 2015 at 2:57 PM, Koert Kuipers <koert@tresata.com>
>>>>> wrote:
>>>>>
>>>>>> A blockJoin spreads out one side while replicating the other. I would
>>>>>> suggest replicating the smaller side. So if lstgItem is smaller, try
>>>>>> 3,1, or else 1,3. This should spread the "fat" keys out over multiple
>>>>>> (3) executors...
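>>>>>>
>>>>>> For example, a sketch using the blockJoin signature from your own
>>>>>> snippet (with lstgItem as the larger 2 TB side and viEvents as the
>>>>>> smaller side to replicate):
>>>>>>
>>>>>> // replicate the smaller side (viEvents) 3x -> replication (1, 3)
>>>>>> val joined = lstgItem.blockJoin(viEvents, 1, 3)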
>>>>>>
>>>>>>
>>>>>> On Sun, Jun 28, 2015 at 5:35 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepujain@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I am able to use the blockJoin API, and it does not throw a
>>>>>>> compilation error:
>>>>>>>
>>>>>>> val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary,
>>>>>>> Long))] = lstgItem.blockJoin(viEvents, 1, 1).map {
>>>>>>>   // map body omitted in the original message
>>>>>>> }
>>>>>>>
>>>>>>> Here viEvents is highly skewed, and both are on HDFS.
>>>>>>>
>>>>>>> What should be the optimal values of replication? I gave 1,1.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Sun, Jun 28, 2015 at 1:47 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepujain@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I incremented the version of Spark from 1.4.0 to 1.4.0.1 and ran:
>>>>>>>>
>>>>>>>>  ./make-distribution.sh  --tgz -Phadoop-2.4 -Pyarn  -Phive
>>>>>>>> -Phive-thriftserver
>>>>>>>>
>>>>>>>> The build was successful, but the script failed. Is there a way to
>>>>>>>> pass the incremented version?
>>>>>>>>
>>>>>>>>
>>>>>>>> [INFO] BUILD SUCCESS
>>>>>>>>
>>>>>>>> [INFO]
>>>>>>>> ------------------------------------------------------------------------
>>>>>>>>
>>>>>>>> [INFO] Total time: 09:56 min
>>>>>>>>
>>>>>>>> [INFO] Finished at: 2015-06-28T13:45:29-07:00
>>>>>>>>
>>>>>>>> [INFO] Final Memory: 84M/902M
>>>>>>>>
>>>>>>>> [INFO]
>>>>>>>> ------------------------------------------------------------------------
>>>>>>>>
>>>>>>>> + rm -rf /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist
>>>>>>>>
>>>>>>>> + mkdir -p /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib
>>>>>>>>
>>>>>>>> + echo 'Spark 1.4.0.1 built for Hadoop 2.4.0'
>>>>>>>>
>>>>>>>> + echo 'Build flags: -Phadoop-2.4' -Pyarn -Phive -Phive-thriftserver
>>>>>>>>
>>>>>>>> + cp
>>>>>>>> /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/assembly/target/scala-2.10/spark-assembly-1.4.0.1-hadoop2.4.0.jar
>>>>>>>> /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/
>>>>>>>>
>>>>>>>> + cp
>>>>>>>> /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/target/scala-2.10/spark-examples-1.4.0.1-hadoop2.4.0.jar
>>>>>>>> /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/
>>>>>>>>
>>>>>>>> + cp
>>>>>>>> /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/network/yarn/target/scala-2.10/spark-1.4.0.1-yarn-shuffle.jar
>>>>>>>> /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/
>>>>>>>>
>>>>>>>> + mkdir -p
>>>>>>>> /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/examples/src/main
>>>>>>>>
>>>>>>>> + cp -r
>>>>>>>> /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/examples/src/main
>>>>>>>> /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/examples/src/
>>>>>>>>
>>>>>>>> + '[' 1 == 1 ']'
>>>>>>>>
>>>>>>>> + cp
>>>>>>>> '/Users/dvasthimal/ebay/projects/ep/spark-1.4.0/lib_managed/jars/datanucleus*.jar'
>>>>>>>> /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/dist/lib/
>>>>>>>>
>>>>>>>> cp:
>>>>>>>> /Users/dvasthimal/ebay/projects/ep/spark-1.4.0/lib_managed/jars/datanucleus*.jar:
>>>>>>>> No such file or directory
>>>>>>>>
>>>>>>>> LM-SJL-00877532:spark-1.4.0 dvasthimal$ ./make-distribution.sh
>>>>>>>> --tgz -Phadoop-2.4 -Pyarn  -Phive -Phive-thriftserver
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sun, Jun 28, 2015 at 1:41 PM, Koert Kuipers <koert@tresata.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> You need 1) to publish to your in-house Maven repository, so your
>>>>>>>>> application can depend on your version, and 2) to use the Spark
>>>>>>>>> distribution you compiled to launch your job (assuming you run on
>>>>>>>>> YARN, so you can launch multiple versions of Spark on the same
>>>>>>>>> cluster).
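>>>>>>>>>
>>>>>>>>> For the dependency side, a minimal sketch in sbt (or the Maven
>>>>>>>>> equivalent); the resolver URL is a placeholder for your in-house
>>>>>>>>> repository, and 1.4.0.1 is the version you mentioned building:
>>>>>>>>>
>>>>>>>>> // build.sbt
>>>>>>>>> resolvers += "inhouse" at "http://repo.example.com/releases"
>>>>>>>>> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.0.1"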
>>>>>>>>>
>>>>>>>>> On Sun, Jun 28, 2015 at 4:33 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <
>>>>>>>>> deepujain@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> How can I import this pre-built Spark into my application via
>>>>>>>>>> Maven, as I want to use the block join API?
>>>>>>>>>>
>>>>>>>>>> On Sun, Jun 28, 2015 at 1:31 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <
>>>>>>>>>> deepujain@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> I ran this without the Maven options:
>>>>>>>>>>>
>>>>>>>>>>> ./make-distribution.sh  --tgz -Phadoop-2.4 -Pyarn  -Phive
>>>>>>>>>>> -Phive-thriftserver
>>>>>>>>>>>
>>>>>>>>>>> I got spark-1.4.0-bin-2.4.0.tgz in the same working directory.
>>>>>>>>>>>
>>>>>>>>>>> I hope this is built with 2.4.x Hadoop, as I did specify -P.
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Jun 28, 2015 at 1:10 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <
>>>>>>>>>>> deepujain@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>>  ./make-distribution.sh  --tgz --mvn "-Phadoop-2.4 -Pyarn
>>>>>>>>>>>> -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package"
>>>>>>>>>>>>
>>>>>>>>>>>> or
>>>>>>>>>>>>
>>>>>>>>>>>>  ./make-distribution.sh  --tgz --mvn -Phadoop-2.4 -Pyarn
>>>>>>>>>>>> -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package"
>>>>>>>>>>>>
>>>>>>>>>>>> Both fail with:
>>>>>>>>>>>>
>>>>>>>>>>>> + echo -e 'Specify the Maven command with the --mvn flag'
>>>>>>>>>>>>
>>>>>>>>>>>> Specify the Maven command with the --mvn flag
>>>>>>>>>>>>
>>>>>>>>>>>> + exit -1
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Deepak
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Deepak
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Deepak
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Deepak
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Deepak
>>>>>
>>>>>
>>>>
>>>
>>>
>>> --
>>> Deepak
>>>
>>>
>>
>
>
> --
> Deepak
>
>


-- 
Deepak
