spark-user mailing list archives

From ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
Subject Re: How to speed up Spark process
Date Tue, 14 Jul 2015 05:37:28 GMT
I stopped at 35 partitions, as the repartition then takes around 12-14 minutes.
I cached an RDD because it is used in the next two tasks; however, that slowed
down the process.
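
A minimal sketch of one alternative, assuming the genericRecordsAndKeys RDD
from the code below and untested against this job: a serialized storage level
keeps the cached partitions smaller than the default MEMORY_ONLY, and
unpersisting after the last reuse gives the memory back to the shuffle.

    import org.apache.spark.storage.StorageLevel

    // Serialized cache: smaller in memory than the default, at the cost of
    // deserializing the records each time the RDD is reused.
    genericRecordsAndKeys.persist(StorageLevel.MEMORY_ONLY_SER)

    // ... run the two downstream jobs (quantiles and the join) ...

    // Drop the cached blocks once the last job that reads them has finished.
    genericRecordsAndKeys.unpersist()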

Code:

    // Imports used by this snippet; the t-digest classes come from Ted
    // Dunning's t-digest library (com.tdunning.math.stats).
    import java.nio.ByteBuffer
    import com.tdunning.math.stats.{AVLTreeDigest, TDigest}
    import org.apache.avro.mapred.AvroKey
    import org.apache.hadoop.io.NullWritable
    import org.apache.spark.rdd.RDD

    // Key each input record by the concatenation of its dimension values.
    val genericRecordsAndKeys = inputRecords.map { record =>
      val rec = new MasterPrimeRecord(detail, record)
      val keyToOutput = new StringBuilder
      dimensions.foreach { dim =>
        keyToOutput.append("_").append(rec.get(dim).toString)
      }
      (keyToOutput.toString, rec)
    }

    // Cached because it is reused by both the quantiles job and the join below.
    genericRecordsAndKeys.cache()

    // Build a one-point t-digest per record, serialize it to bytes, and merge
    // the per-record digests into one digest per key.
    val quantiles = genericRecordsAndKeys
      .map {
        case (keyToOutput, rec) =>
          val digest: TDigest = TDigest.createAvlTreeDigest(10)
          digest.add(rec.get("fpPaidGMB").asInstanceOf[Double])
          val bbuf = ByteBuffer.allocate(digest.byteSize())
          digest.asBytes(bbuf)
          (keyToOutput, bbuf.array())
      }
      .reduceByKey {
        case (v1, v2) =>
          // Deserialize both partial digests, merge, and re-serialize.
          val tree1 = AVLTreeDigest.fromBytes(ByteBuffer.wrap(v1))
          val tree2 = AVLTreeDigest.fromBytes(ByteBuffer.wrap(v2))
          tree1.add(tree2)
          tree1.compress()
          val bbuf = ByteBuffer.allocate(tree1.byteSize())
          tree1.asBytes(bbuf)
          bbuf.array()
      }

    // Join every record with its key's merged digest and cap fpPaidGMB at the
    // 99.9th percentile of that key.
    val outputRecords: RDD[(AvroKey[MasterPrimeRecord], NullWritable)] =
      genericRecordsAndKeys.join(quantiles).map {
        case (_, (masterPrimeRec, digestBytes)) =>
          val mergedTree = AVLTreeDigest.fromBytes(ByteBuffer.wrap(digestBytes))
          val capVal = mergedTree.quantile(0.999)
          if (masterPrimeRec.get("fpPaidGMB").asInstanceOf[Double] > capVal) {
            masterPrimeRec.put("fpPaidGMB", capVal)
          }
          (new AvroKey[MasterPrimeRecord](masterPrimeRec), NullWritable.get)
      }
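
A possible variation, sketched only and not measured on this data: since each
key only needs its 99.9th-percentile cap, the merged digests can be collapsed
to a Double before the join, so the smaller side of the join carries an 8-byte
value per key instead of the serialized digest.

    // Reduce each merged digest to its cap value first, then join on that.
    val capsPerKey: RDD[(String, Double)] = quantiles.mapValues { bytes =>
      AVLTreeDigest.fromBytes(ByteBuffer.wrap(bytes)).quantile(0.999)
    }

    val cappedRecords: RDD[(AvroKey[MasterPrimeRecord], NullWritable)] =
      genericRecordsAndKeys.join(capsPerKey).map {
        case (_, (rec, capVal)) =>
          if (rec.get("fpPaidGMB").asInstanceOf[Double] > capVal) {
            rec.put("fpPaidGMB", capVal)
          }
          (new AvroKey[MasterPrimeRecord](rec), NullWritable.get)
      }

The records side of the join is unchanged, so this mostly helps when keys have
many records or the serialized digests are large.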

On Mon, Jul 13, 2015 at 9:48 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepujain@gmail.com> wrote:

> My guess worked out fine. The repartition took approximately 1/4 the time once
> I reduced the number of partitions to 1/4.
> And the rest of the process took about 1/4 extra time, but that is OK.
>
> On Mon, Jul 13, 2015 at 9:46 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepujain@gmail.com>
> wrote:
>
>> I reduced the number of partitions to 1/4, i.e. to 76, in order to reduce the
>> time to 1/4 (from 33 minutes to 8), but the repartition is still running beyond
>> 15 minutes.
>>
>> @Nirmal
>> Clicking on Details shows the code lines but does not show why it is slow.
>> I know that the repartition is slow and want to speed it up.
>>
>> @Sharma
>> I have seen that increasing the cores speeds up the repartition, but it slows
>> down the rest of the stages in the job plan.
>>
>>
>> I need some logical explanation and math to know beforehand; otherwise, with
>> Spark I am always firing in the dark. Spark has been depressingly lackluster
>> so far (the join use case, and now a simple outlier detection using TDigest).
>>
>> On Mon, Jul 13, 2015 at 9:37 PM, Aniruddh Sharma <asharma.gd@gmail.com>
>> wrote:
>>
>>> Hi Deepak
>>>
>>> Not 100% sure, but please try increasing --executor-cores to twice
>>> the number of physical cores on your machines.
>>>
>>> Thanks and Regards
>>> Aniruddh
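
(A sketch of the same knob set programmatically, for reference: --executor-cores
on spark-submit and the spark.executor.cores property are equivalent, and the
value below only illustrates Aniruddh's suggestion rather than a measured
recommendation.)

    import org.apache.spark.SparkConf

    // Ask YARN for more cores per executor so several shuffle/repartition tasks
    // can run in parallel on each executor (the spark-submit further down uses
    // --executor-cores 1).
    val conf = new SparkConf().set("spark.executor.cores", "8")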
>>>
>>> On Tue, Jul 14, 2015 at 9:49 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepujain@gmail.com>
>>> wrote:
>>>
>>>> It's been 30 minutes and the repartition still has not completed; it's
>>>> taking forever.
>>>>
>>>> Without repartition, I see this error:
>>>> https://issues.apache.org/jira/browse/SPARK-5928
>>>>
>>>>
>>>>  FetchFailed(BlockManagerId(1, imran-2.ent.cloudera.com, 55028), shuffleId=1, mapId=0, reduceId=0, message=
>>>> org.apache.spark.shuffle.FetchFailedException: Adjusted frame length exceeds 2147483647: 3021252889 - discarded
>>>> 	at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
>>>> 	at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>>>> 	at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>>>> 	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>> 	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
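
(A note on the error above: "Adjusted frame length exceeds 2147483647" is the
2 GB limit on a single shuffle block that SPARK-5928 tracks; here one reduce
task is fetching roughly 3 GB. The usual workaround is more reduce-side
partitions so that each block stays well under 2 GB. A minimal sketch, where
keyedDigestBytes and mergeDigests are placeholders for the keyed digest RDD
and the merge function from the code at the top of this message, and 2000 is
only an illustrative count:)

    // Spread the digest-merging shuffle over more, smaller partitions by
    // passing an explicit partition count to reduceByKey.
    val quantiles = keyedDigestBytes.reduceByKey(mergeDigests _, 2000)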
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, Jul 13, 2015 at 8:34 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepujain@gmail.com>
>>>> wrote:
>>>>
>>>>> I have 100 MB of Avro data, and repartition(307) is taking
>>>>> forever.
>>>>>
>>>>> 2. val x = input.repartition(7907).map( {k1,k2,k3,k4}, {inputRecord} )
>>>>> 3. val quantiles = x.map( {k1,k2,k3,k4}, TDigest(inputRecord).asBytes ).reduceByKey() [ This was groupBy earlier ]
>>>>> 4. x.join(quantiles).coalesce(100).writeInAvro
>>>>>
>>>>>
>>>>> Attached is the full Scala code.
>>>>>
>>>>> I have a 340-node YARN cluster with 14G RAM on each node, and the input
>>>>> data is just 100 MB. (Hadoop takes 2.5 hours on a 1 TB dataset.)
>>>>>
>>>>>
>>>>> ./bin/spark-submit -v --master yarn-cluster  --jars
>>>>> /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.4/lib/spark_reporting_dep_only-1.0-SNAPSHOT.jar
>>>>>  --num-executors 330 --driver-memory 14g --driver-java-options
>>>>> "-XX:MaxPermSize=512M -Xmx4096M -Xms4096M -verbose:gc -XX:+PrintGCDetails
>>>>> -XX:+PrintGCTimeStamps" --executor-memory 14g --executor-cores 1 --queue
>>>>> hdmi-others --class com.ebay.ep.poc.spark.reporting.SparkApp
>>>>> /home/dvasthimal/spark1.4/lib/spark_reporting-1.0-SNAPSHOT.jar
>>>>> startDate=2015-06-20 endDate=2015-06-21
>>>>> input=/apps/hdmi-prod/b_um/epdatasets/exptsession subcommand=ppwmasterprime
>>>>> output=/user/dvasthimal/epdatasets/ppwmasterprime buffersize=128
>>>>> maxbuffersize=1068 maxResultSize=200G
>>>>>
>>>>>
>>>>> I see this in the stdout of the task on that executor:
>>>>>
>>>>> 15/07/13 19:58:48 WARN hdfs.BlockReaderLocal: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
>>>>> 15/07/13 20:00:08 INFO collection.ExternalSorter: Thread 47 spilling in-memory map of 2.2 GB to disk (1 time so far)
>>>>> 15/07/13 20:01:31 INFO collection.ExternalSorter: Thread 47 spilling in-memory map of 2.2 GB to disk (2 times so far)
>>>>> 15/07/13 20:03:07 INFO collection.ExternalSorter: Thread 47 spilling in-memory map of 2.2 GB to disk (3 times so far)
>>>>> 15/07/13 20:04:32 INFO collection.ExternalSorter: Thread 47 spilling in-memory map of 2.2 GB to disk (4 times so far)
>>>>> 15/07/13 20:06:21 INFO collection.ExternalSorter: Thread 47 spilling in-memory map of 2.2 GB to disk (5 times so far)
>>>>> 15/07/13 20:08:09 INFO collection.ExternalSorter: Thread 47 spilling in-memory map of 2.2 GB to disk (6 times so far)
>>>>> 15/07/13 20:09:51 INFO collection.ExternalSorter: Thread 47 spilling in-memory map of 2.2 GB to disk (7 times so far)
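
(An observation on the spills above rather than a measurement: on a 14g
executor, the Spark 1.x defaults spark.shuffle.memoryFraction = 0.2 and
spark.shuffle.safetyFraction = 0.8 give roughly 14g * 0.2 * 0.8, about 2.2 GB,
for the shuffle's in-memory map, which matches the spill size in the log. A
sketch of raising that fraction, with an illustrative value:)

    import org.apache.spark.SparkConf

    // Give ExternalSorter more heap before it spills to disk (legacy Spark 1.x
    // memory manager); this trades away space otherwise reserved for cached RDDs.
    val conf = new SparkConf().set("spark.shuffle.memoryFraction", "0.4")  // default is 0.2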
>>>>>
>>>>>
>>>>>
>>>>> Also attached is the thread dump
>>>>>
>>>>>
>>>>> --
>>>>> Deepak
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Deepak
>>>>
>>>>
>>>
>>
>>
>> --
>> Deepak
>>
>>
>
>
> --
> Deepak
>
>


-- 
Deepak
