spark-user mailing list archives

From Colin Kincaid Williams <disc...@uw.edu>
Subject Re: Improving performance of a kafka spark streaming app
Date Sat, 18 Jun 2016 19:59:46 GMT
There are 25 nodes in the spark cluster.

On Sat, Jun 18, 2016 at 7:53 PM, Mich Talebzadeh
<mich.talebzadeh@gmail.com> wrote:
> how many nodes are in your cluster?
>
> --num-executors 6 \
>  --driver-memory 4G \
>  --executor-memory 2G \
>  --total-executor-cores 12 \
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
>
> On 18 June 2016 at 20:40, Colin Kincaid Williams <discord@uw.edu> wrote:
>>
>> I updated my app to Spark 1.5.2 streaming so that it consumes from
>> Kafka using the direct api and inserts content into an hbase cluster,
>> as described in this thread. I was away from this project for a while
>> due to events in my family.
>>
>> Currently my scheduling delay is high, but the processing time is
>> stable around a second. I changed my setup to use 6 kafka partitions
>> on a set of smaller kafka brokers, with fewer disks. I've included
>> some details below, including the script I use to launch the
>> application. I'm using a Spark on Hbase library, whose version is
>> relevant to my Hbase cluster. Is it apparent there is something wrong
>> with my launch method that could be causing the delay, related to the
>> included jars?
>>
>> Or is there something wrong with the very simple approach I'm taking
>> for the application?
>>
>> Any advice is appreciated.
>>
>>
>> The application:
>>
>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>
>>
>> From the streaming UI I get something like:
>>
>> table Completed Batches (last 1000 out of 27136)
>>
>>
>> Batch Time Input Size Scheduling Delay (?) Processing Time (?) Total
>> Delay (?) Output Ops: Succeeded/Total
>>
>> 2016/06/18 11:21:32 3000 events 1.2 h 1 s 1.2 h 1/1
>>
>> 2016/06/18 11:21:31 3000 events 1.2 h 1 s 1.2 h 1/1
>>
>> 2016/06/18 11:21:30 3000 events 1.2 h 1 s 1.2 h 1/1
>>
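One way to read the table above: if each batch takes slightly longer than the batch interval, the scheduling delay grows by that difference on every batch. The sketch below assumes a 1 second batch interval (inferred from the batch timestamps), that the 1.2 h delay built up steadily from zero, and that 27136 is the number of batches since the application started. None of these are confirmed in the thread, and the function name is made up for illustration.

```python
def implied_avg_processing_s(batch_interval_s, scheduling_delay_s, batches_completed):
    """Average per-batch processing time implied by an accumulated scheduling delay.

    Assumes the batch queue was never empty, so every batch added
    (processing time - batch interval) to the delay.
    """
    return batch_interval_s + scheduling_delay_s / batches_completed


# Assumed inputs: 1 s interval, 1.2 h of delay, 27136 completed batches.
avg = implied_avg_processing_s(1.0, 1.2 * 3600, 27136)
print(f"implied average processing time: {avg:.2f} s")
```

Under those assumptions the average comes out near 1.16 s, which the UI would still show as "1 s". A processing time that sits just above the batch interval is enough to produce an hours-long delay.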
>>
>> Here's how I'm launching the spark application.
>>
>>
>> #!/usr/bin/env bash
>>
>> export SPARK_CONF_DIR=/home/colin.williams/spark
>> export HADOOP_CONF_DIR=/etc/hadoop/conf
>> export HADOOP_CLASSPATH=/home/colin.williams/hbase/conf/:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/*:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/hbase-protocol-0.98.6-cdh5.3.0.jar
>>
>> /opt/spark-1.5.2-bin-hadoop2.4/bin/spark-submit \
>>   --class com.example.KafkaToHbase \
>>   --master spark://spark_master:7077 \
>>   --deploy-mode client \
>>   --num-executors 6 \
>>   --driver-memory 4G \
>>   --executor-memory 2G \
>>   --total-executor-cores 12 \
>>   --jars /home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/zookeeper/zookeeper-3.4.5-cdh5.3.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/guava-12.0.1.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/protobuf-java-2.5.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-protocol.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-client.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-common.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop2-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-server.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/htrace-core.jar \
>>   --conf spark.app.name="Kafka To Hbase" \
>>   --conf spark.eventLog.dir="hdfs:///user/spark/applicationHistory" \
>>   --conf spark.eventLog.enabled=false \
>>   --conf spark.eventLog.overwrite=true \
>>   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
>>   --conf spark.streaming.backpressure.enabled=false \
>>   --conf spark.streaming.kafka.maxRatePerPartition=500 \
>>   --driver-class-path /home/colin.williams/kafka-hbase.jar \
>>   --driver-java-options -Dspark.executor.extraClassPath=/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/* \
>>   /home/colin.williams/kafka-hbase.jar "FromTable" "ToTable" "broker1:9092,broker2:9092"
>>
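With the direct API, spark.streaming.kafka.maxRatePerPartition limits the records read per second from each Kafka partition, so the settings in the launch script put a ceiling on every batch. The rough calculation below assumes a 1 second batch interval and that the topic still holds the 1.7 billion entries mentioned earlier in the thread. The helper names are hypothetical.

```python
import math


def max_records_per_batch(partitions, max_rate_per_partition, batch_interval_s):
    """Upper bound on records per batch under maxRatePerPartition."""
    return partitions * max_rate_per_partition * batch_interval_s


def hours_to_drain(total_records, records_per_s):
    """Hours needed to read total_records at a steady rate."""
    return total_records / records_per_s / 3600


# 6 partitions and maxRatePerPartition=500 come from the message above;
# the 1 s batch interval is an assumption.
cap = max_records_per_batch(partitions=6, max_rate_per_partition=500, batch_interval_s=1)
hours = hours_to_drain(1_700_000_000, cap)

# Per-partition rate needed to reach 8000 records/s (10% of 80000) on 6 partitions.
needed_rate = math.ceil(8000 / 6)

print(cap, round(hours, 1), needed_rate)
```

The cap of 3000 records per batch matches the "3000 events" input size in the streaming UI, which suggests every batch is hitting the rate limit rather than running out of data.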
>> On Tue, May 3, 2016 at 8:20 PM, Colin Kincaid Williams <discord@uw.edu>
>> wrote:
>> > Thanks Cody, I can see that the partitions are well distributed...
>> > Then I'm in the process of using the direct api.
>> >
>> > On Tue, May 3, 2016 at 6:51 PM, Cody Koeninger <cody@koeninger.org>
>> > wrote:
>> >> 60 partitions in and of itself shouldn't be a big performance issue
>> >> (as long as producers are distributing across partitions evenly).
>> >>
>> >> On Tue, May 3, 2016 at 1:44 PM, Colin Kincaid Williams <discord@uw.edu>
>> >> wrote:
>> >>> Thanks again Cody. Regarding the details 66 kafka partitions on 3
>> >>> kafka servers, likely 8 core systems with 10 disks each. Maybe the
>> >>> issue with the receiver was the large number of partitions. I had
>> >>> miscounted the disks and so 11*3*2 is how I decided to partition my
>> >>> topic on insertion, ( by my own, unjustified reasoning, on a first
>> >>> attempt ) . This worked well enough for me, I put 1.7 billion entries
>> >>> into Kafka on a map reduce job in 5 and a half hours.
>> >>>
>> >>> I was concerned using spark 1.5.2 because I'm currently putting my
>> >>> data into a CDH 5.3 HDFS cluster, using hbase-spark .98 library jars
>> >>> built for spark 1.2 on CDH 5.3. But after debugging quite a bit
>> >>> yesterday, I tried building against 1.5.2. So far it's running without
>> >>> issue on a Spark 1.5.2 cluster. I'm not sure there was too much
>> >>> improvement using the same code, but I'll see how the direct api
>> >>> handles it. In the end I can reduce the number of partitions in Kafka
>> >>> if it causes big performance issues.
>> >>>
>> >>> On Tue, May 3, 2016 at 4:08 AM, Cody Koeninger <cody@koeninger.org>
>> >>> wrote:
>> >>>> print() isn't really the best way to benchmark things, since it calls
>> >>>> take(10) under the covers, but 380 records / second for a single
>> >>>> receiver doesn't sound right in any case.
>> >>>>
>> >>>> Am I understanding correctly that you're trying to process a large
>> >>>> number of already-existing kafka messages, not keep up with an
>> >>>> incoming stream?  Can you give any details (e.g. hardware, number of
>> >>>> topicpartitions, etc)?
>> >>>>
>> >>>> Really though, I'd try to start with spark 1.6 and direct streams, or
>> >>>> even just kafkacat, as a baseline.
>> >>>>
>> >>>>
>> >>>>
>> >>>> On Mon, May 2, 2016 at 7:01 PM, Colin Kincaid Williams
>> >>>> <discord@uw.edu> wrote:
>> >>>>> Hello again. I searched for "backport kafka" in the list archives
>> >>>>> but couldn't find anything but a post from Spark 0.7.2. I was going to
>> >>>>> use accumulators to make a counter, but then saw on the Streaming
>> >>>>> tab the Receiver Statistics. Then I removed all other "functionality"
>> >>>>> except:
>> >>>>>
>> >>>>>
>> >>>>>     // createStream(JavaStreamingContext jssc, Class<K> keyTypeClass,
>> >>>>>     //     Class<V> valueTypeClass, Class<U> keyDecoderClass,
>> >>>>>     //     Class<T> valueDecoderClass, java.util.Map<String,String> kafkaParams,
>> >>>>>     //     java.util.Map<String,Integer> topics, StorageLevel storageLevel)
>> >>>>>     JavaPairReceiverInputDStream<byte[], byte[]> dstream = KafkaUtils
>> >>>>>       .createStream(jssc, byte[].class, byte[].class,
>> >>>>>         kafka.serializer.DefaultDecoder.class,
>> >>>>>         kafka.serializer.DefaultDecoder.class, kafkaParamsMap, topicMap,
>> >>>>>         StorageLevel.MEMORY_AND_DISK_SER());
>> >>>>>
>> >>>>>     dstream.print();
>> >>>>>
>> >>>>> Then in the Receiver Stats for the single receiver, I'm seeing
>> >>>>> around 380 records / second. Then to get anywhere near my 10% mentioned
>> >>>>> above, I'd need to run around 21 receivers, assuming 380 records /
>> >>>>> second, just using the print output. This seems awfully high to me,
>> >>>>> considering that I wrote 80000+ records a second to Kafka from a
>> >>>>> mapreduce job, and that my bottleneck was likely Hbase. Again using
>> >>>>> the 380 estimate, I would need 200+ receivers to reach a similar
>> >>>>> amount of reads.
>> >>>>>
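The receiver counts in the paragraph above follow from simple division. This sketch reproduces them, assuming throughput scales linearly with the number of receivers, which the thread does not establish. The function name is made up for illustration.

```python
import math


def receivers_needed(target_records_per_s, per_receiver_records_per_s):
    """Receivers required to reach a target rate, assuming linear scaling."""
    return math.ceil(target_records_per_s / per_receiver_records_per_s)


kafka_write_rate = 80000   # records/s written to Kafka by the mapreduce job
per_receiver = 380         # records/s seen in the Receiver Statistics

ten_percent = receivers_needed(kafka_write_rate // 10, per_receiver)
full_rate = receivers_needed(kafka_write_rate, per_receiver)
print(ten_percent, full_rate)
```

Rounding up gives 22 receivers for the 10% target (8000 / 380 is about 21.05) and 211 for the full write rate, in line with the "around 21" and "200+" figures quoted.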
>> >>>>> Even given the issues with the 1.2 receivers, is this the expected
>> >>>>> way
>> >>>>> to use the Kafka streaming API, or am I doing something terribly
>> >>>>> wrong?
>> >>>>>
>> >>>>> My application looks like
>> >>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>> >>>>>
>> >>>>> On Mon, May 2, 2016 at 6:09 PM, Cody Koeninger <cody@koeninger.org>
>> >>>>> wrote:
>> >>>>>> Have you tested for read throughput (without writing to hbase, just
>> >>>>>> deserialize)?
>> >>>>>>
>> >>>>>> Are you limited to using spark 1.2, or is upgrading possible?  The
>> >>>>>> kafka direct stream is available starting with 1.3.  If you're
>> >>>>>> stuck on 1.2, I believe there have been some attempts to backport it,
>> >>>>>> search the mailing list archives.
>> >>>>>>
>> >>>>>> On Mon, May 2, 2016 at 12:54 PM, Colin Kincaid Williams
>> >>>>>> <discord@uw.edu> wrote:
>> >>>>>>> I've written an application to get content from a kafka topic with
>> >>>>>>> 1.7 billion entries, get the protobuf serialized entries, and insert
>> >>>>>>> into hbase. Currently the environment that I'm running in is Spark 1.2.
>> >>>>>>>
>> >>>>>>> With 8 executors and 2 cores, and 2 jobs, I'm only getting between
>> >>>>>>> 0-2500 writes / second. This will take much too long to consume
>> >>>>>>> the entries.
>> >>>>>>>
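To put "much too long" in numbers, here is a back-of-the-envelope estimate. It assumes the best observed rate of 2500 writes / second is sustained for the whole run, which is optimistic given the reported range of 0-2500. The function name is hypothetical.

```python
def days_to_consume(total_records, records_per_s):
    """Days needed to process total_records at a constant rate."""
    return total_records / records_per_s / 86400


total = 1_700_000_000                       # entries in the topic
at_observed = days_to_consume(total, 2500)  # best observed write rate
at_target = days_to_consume(total, 8000)    # 10% of the 80000/s Kafka write rate
print(f"{at_observed:.2f} days at 2500/s, {at_target:.2f} days at 8000/s")
```

Even at the top of the observed range the job would run for close to eight days, against roughly two and a half days at the 10% target.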
>> >>>>>>> I currently believe that the spark kafka receiver is the
>> >>>>>>> bottleneck. I've tried both 1.2 receivers, with the WAL and without,
>> >>>>>>> and didn't notice any large performance difference. I've tried
>> >>>>>>> many different spark configuration options, but can't seem to get
>> >>>>>>> better performance.
>> >>>>>>>
>> >>>>>>> I saw 80000 requests / second inserting these records into kafka
>> >>>>>>> using yarn / hbase / protobuf / kafka in a bulk fashion.
>> >>>>>>>
>> >>>>>>> While hbase inserts might not deliver the same throughput, I'd
>> >>>>>>> like to at least get 10%.
>> >>>>>>>
>> >>>>>>> My application looks like
>> >>>>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>> >>>>>>>
>> >>>>>>> This is my first spark application. I'd appreciate any assistance.
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> ---------------------------------------------------------------------
>> >>>>>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> >>>>>>> For additional commands, e-mail: user-help@spark.apache.org
>> >>>>>>>
>> >>>>
>> >>>>
>>
>>
>


