spark-user mailing list archives

From Colin Kincaid Williams <disc...@uw.edu>
Subject Re: Improving performance of a kafka spark streaming app
Date Sat, 18 Jun 2016 20:00:59 GMT
I'm attaching a picture from the streaming UI.

On Sat, Jun 18, 2016 at 7:59 PM, Colin Kincaid Williams <discord@uw.edu> wrote:
> There are 25 nodes in the spark cluster.
>
> On Sat, Jun 18, 2016 at 7:53 PM, Mich Talebzadeh
> <mich.talebzadeh@gmail.com> wrote:
>> How many nodes are in your cluster?
>>
>> --num-executors 6 \
>>  --driver-memory 4G \
>>  --executor-memory 2G \
>>  --total-executor-cores 12 \
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn
>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>>
>> On 18 June 2016 at 20:40, Colin Kincaid Williams <discord@uw.edu> wrote:
>>>
>>> I updated my app to Spark 1.5.2 streaming so that it consumes from
>>> Kafka using the direct api and inserts content into an hbase cluster,
>>> as described in this thread. I was away from this project for a while
>>> due to events in my family.
>>>
>>> Currently my scheduling delay is high, but the processing time is
>>> stable around a second. I changed my setup to use 6 kafka partitions
>>> on a set of smaller kafka brokers, with fewer disks. I've included
>>> some details below, including the script I use to launch the
>>> application. I'm using a Spark-on-Hbase library whose version matches
>>> my Hbase cluster. Is there anything obviously wrong with my launch
>>> method that could be causing the delay, perhaps related to the
>>> included jars?
>>>
>>> Or is there something wrong with the very simple approach I'm taking
>>> for the application?
>>>
>>> Any advice is appreciated.
>>>
>>>
>>> The application:
>>>
>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
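>>>
>>> The core of it is the stream feeding puts into Hbase, roughly like the
>>> following (a simplified sketch, not the exact code from the gist; the
>>> table, column family, and qualifier names are placeholders, and I'm
>>> assuming the message key is used as the row key):
>>>
>>>     // imports elided: org.apache.spark.api.java.function.*,
>>>     // org.apache.hadoop.hbase.client.*, org.apache.hadoop.hbase.HBaseConfiguration,
>>>     // org.apache.hadoop.hbase.util.Bytes, scala.Tuple2, java.util.Iterator
>>>     dstream.foreachRDD(new Function<JavaPairRDD<byte[], byte[]>, Void>() {
>>>       public Void call(JavaPairRDD<byte[], byte[]> rdd) {
>>>         rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<byte[], byte[]>>>() {
>>>           public void call(Iterator<Tuple2<byte[], byte[]>> records) throws Exception {
>>>             // One Hbase connection per partition, not per record.
>>>             HConnection conn = HConnectionManager.createConnection(HBaseConfiguration.create());
>>>             HTableInterface table = conn.getTable("ToTable");
>>>             table.setAutoFlush(false, false);  // buffer puts client-side
>>>             while (records.hasNext()) {
>>>               Tuple2<byte[], byte[]> kv = records.next();
>>>               Put put = new Put(kv._1());      // row key = message key (assumption)
>>>               put.add(Bytes.toBytes("cf"), Bytes.toBytes("payload"), kv._2());
>>>               table.put(put);
>>>             }
>>>             table.flushCommits();
>>>             table.close();
>>>             conn.close();
>>>           }
>>>         });
>>>         return null;
>>>       }
>>>     });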
>>>
>>>
>>> From the streaming UI I get something like:
>>>
>>> Completed Batches (last 1000 out of 27136):
>>>
>>> Batch Time           Input Size   Scheduling Delay  Processing Time  Total Delay  Output Ops: Succeeded/Total
>>> 2016/06/18 11:21:32  3000 events  1.2 h             1 s              1.2 h        1/1
>>> 2016/06/18 11:21:31  3000 events  1.2 h             1 s              1.2 h        1/1
>>> 2016/06/18 11:21:30  3000 events  1.2 h             1 s              1.2 h        1/1
>>>
>>>
>>> Here's how I'm launching the spark application.
>>>
>>>
>>> #!/usr/bin/env bash
>>>
>>> export SPARK_CONF_DIR=/home/colin.williams/spark
>>> export HADOOP_CONF_DIR=/etc/hadoop/conf
>>> export HADOOP_CLASSPATH=/home/colin.williams/hbase/conf/:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/*:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/hbase-protocol-0.98.6-cdh5.3.0.jar
>>>
>>> /opt/spark-1.5.2-bin-hadoop2.4/bin/spark-submit \
>>>   --class com.example.KafkaToHbase \
>>>   --master spark://spark_master:7077 \
>>>   --deploy-mode client \
>>>   --num-executors 6 \
>>>   --driver-memory 4G \
>>>   --executor-memory 2G \
>>>   --total-executor-cores 12 \
>>>   --jars /home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/zookeeper/zookeeper-3.4.5-cdh5.3.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/guava-12.0.1.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/protobuf-java-2.5.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-protocol.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-client.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-common.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop2-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-server.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/htrace-core.jar \
>>>   --conf spark.app.name="Kafka To Hbase" \
>>>   --conf spark.eventLog.dir="hdfs:///user/spark/applicationHistory" \
>>>   --conf spark.eventLog.enabled=false \
>>>   --conf spark.eventLog.overwrite=true \
>>>   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
>>>   --conf spark.streaming.backpressure.enabled=false \
>>>   --conf spark.streaming.kafka.maxRatePerPartition=500 \
>>>   --driver-class-path /home/colin.williams/kafka-hbase.jar \
>>>   --driver-java-options -Dspark.executor.extraClassPath=/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/* \
>>>   /home/colin.williams/kafka-hbase.jar "FromTable" "ToTable" "broker1:9092,broker2:9092"
>>>
>>> On Tue, May 3, 2016 at 8:20 PM, Colin Kincaid Williams <discord@uw.edu>
>>> wrote:
>>> > Thanks Cody, I can see that the partitions are well distributed...
>>> > Next I'm in the process of moving to the direct api.
>>> >
>>> > On Tue, May 3, 2016 at 6:51 PM, Cody Koeninger <cody@koeninger.org>
>>> > wrote:
>>> >> 60 partitions in and of itself shouldn't be a big performance issue
>>> >> (as long as producers are distributing across partitions evenly).
>>> >>
>>> >> On Tue, May 3, 2016 at 1:44 PM, Colin Kincaid Williams <discord@uw.edu>
>>> >> wrote:
>>> >>> Thanks again Cody. Regarding the details: 66 kafka partitions on 3
>>> >>> kafka servers, likely 8-core systems with 10 disks each. Maybe the
>>> >>> issue with the receiver was the large number of partitions. I had
>>> >>> miscounted the disks, and so 11*3*2 is how I decided to partition my
>>> >>> topic on insertion (by my own, unjustified reasoning, on a first
>>> >>> attempt). This worked well enough for me: I put 1.7 billion entries
>>> >>> into Kafka in a map reduce job in 5 and a half hours.
>>> >>>
>>> >>> I was concerned about using spark 1.5.2 because I'm currently putting my
>>> >>> data into a CDH 5.3 HDFS cluster, using hbase-spark .98 library jars
>>> >>> built for spark 1.2 on CDH 5.3. But after debugging quite a bit
>>> >>> yesterday, I tried building against 1.5.2. So far it's running without
>>> >>> issue on a Spark 1.5.2 cluster. I'm not sure there was much
>>> >>> improvement using the same code, but I'll see how the direct api
>>> >>> handles it. In the end I can reduce the number of partitions in Kafka
>>> >>> if it causes big performance issues.
>>> >>>
>>> >>> On Tue, May 3, 2016 at 4:08 AM, Cody Koeninger <cody@koeninger.org>
>>> >>> wrote:
>>> >>>> print() isn't really the best way to benchmark things, since it calls
>>> >>>> take(10) under the covers, but 380 records / second for a single
>>> >>>> receiver doesn't sound right in any case.
>>> >>>>
>>> >>>> Am I understanding correctly that you're trying to process a large
>>> >>>> number of already-existing kafka messages, not keep up with an
>>> >>>> incoming stream?  Can you give any details (e.g. hardware, number of
>>> >>>> topicpartitions, etc)?
>>> >>>>
>>> >>>> Really though, I'd try to start with spark 1.6 and direct streams, or
>>> >>>> even just kafkacat, as a baseline.
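>>> >>>>
>>> >>>> A minimal direct stream looks roughly like this (untested sketch
>>> >>>> against the 0.8 integration API; substitute your own brokers and
>>> >>>> topic):
>>> >>>>
>>> >>>>     import java.util.*;
>>> >>>>     import kafka.serializer.DefaultDecoder;
>>> >>>>     import org.apache.spark.streaming.api.java.JavaPairInputDStream;
>>> >>>>     import org.apache.spark.streaming.kafka.KafkaUtils;
>>> >>>>
>>> >>>>     Map<String, String> kafkaParams = new HashMap<String, String>();
>>> >>>>     kafkaParams.put("metadata.broker.list", "broker1:9092,broker2:9092");
>>> >>>>     Set<String> topics = new HashSet<String>(Arrays.asList("FromTable"));
>>> >>>>
>>> >>>>     // No receiver: one spark partition per kafka partition, and the
>>> >>>>     // stream tracks its own offsets instead of using zookeeper.
>>> >>>>     JavaPairInputDStream<byte[], byte[]> stream = KafkaUtils.createDirectStream(
>>> >>>>         jssc, byte[].class, byte[].class,
>>> >>>>         DefaultDecoder.class, DefaultDecoder.class,
>>> >>>>         kafkaParams, topics);
>>> >>>>
>>> >>>> And for a raw consumer baseline outside spark entirely, something
>>> >>>> like: kafkacat -C -b broker1:9092 -t FromTable -e > /dev/null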
>>> >>>>
>>> >>>>
>>> >>>>
>>> >>>> On Mon, May 2, 2016 at 7:01 PM, Colin Kincaid Williams
>>> >>>> <discord@uw.edu> wrote:
>>> >>>>> Hello again. I searched for "backport kafka" in the list archives
>>> >>>>> but couldn't find anything except a post from Spark 0.7.2. I was
>>> >>>>> going to use accumulators to make a counter, but then saw the
>>> >>>>> Receiver Statistics on the Streaming tab. Then I removed all other
>>> >>>>> "functionality" except:
>>> >>>>>
>>> >>>>>
>>> >>>>>     // signature: createStream(JavaStreamingContext jssc, Class<K> keyTypeClass,
>>> >>>>>     //   Class<V> valueTypeClass, Class<U> keyDecoderClass, Class<T> valueDecoderClass,
>>> >>>>>     //   java.util.Map<String,String> kafkaParams, java.util.Map<String,Integer> topics,
>>> >>>>>     //   StorageLevel storageLevel)
>>> >>>>>     JavaPairReceiverInputDStream<byte[], byte[]> dstream = KafkaUtils
>>> >>>>>       .createStream(jssc, byte[].class, byte[].class,
>>> >>>>>         kafka.serializer.DefaultDecoder.class,
>>> >>>>>         kafka.serializer.DefaultDecoder.class, kafkaParamsMap, topicMap,
>>> >>>>>         StorageLevel.MEMORY_AND_DISK_SER());
>>> >>>>>
>>> >>>>>     dstream.print();
>>> >>>>>
>>> >>>>> Then in the Receiver Stats for the single receiver, I'm seeing
>>> >>>>> around 380 records / second. To get anywhere near my 10% mentioned
>>> >>>>> above, I'd need to run around 21 receivers at 380 records / second
>>> >>>>> (21 * 380 ~= 8000, 10% of 80000), just using the print output. This
>>> >>>>> seems awfully high to me, considering that I wrote 80000+ records a
>>> >>>>> second to Kafka from a mapreduce job, and that my bottleneck was
>>> >>>>> likely Hbase. Again using the 380 estimate, I would need 200+
>>> >>>>> receivers to reach a similar amount of reads.
>>> >>>>>
>>> >>>>> Even given the issues with the 1.2 receivers, is this the expected
>>> >>>>> way to use the Kafka streaming API, or am I doing something terribly
>>> >>>>> wrong?
>>> >>>>>
>>> >>>>> My application looks like
>>> >>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>> >>>>>
>>> >>>>> On Mon, May 2, 2016 at 6:09 PM, Cody Koeninger <cody@koeninger.org>
>>> >>>>> wrote:
>>> >>>>>> Have you tested for read throughput (without writing to hbase, just
>>> >>>>>> deserialize)?
>>> >>>>>>
>>> >>>>>> Are you limited to using spark 1.2, or is upgrading possible?  The
>>> >>>>>> kafka direct stream is available starting with 1.3.  If you're
>>> >>>>>> stuck on 1.2, I believe there have been some attempts to backport
>>> >>>>>> it; search the mailing list archives.
>>> >>>>>>
>>> >>>>>> On Mon, May 2, 2016 at 12:54 PM, Colin Kincaid Williams
>>> >>>>>> <discord@uw.edu> wrote:
>>> >>>>>>> I've written an application to get content from a kafka topic with
>>> >>>>>>> 1.7 billion entries, get the protobuf serialized entries, and
>>> >>>>>>> insert them into hbase. Currently the environment that I'm running
>>> >>>>>>> in is Spark 1.2.
>>> >>>>>>>
>>> >>>>>>> With 8 executors and 2 cores, and 2 jobs, I'm only getting between
>>> >>>>>>> 0-2500 writes / second. This will take much too long to consume
>>> >>>>>>> the entries.
>>> >>>>>>>
>>> >>>>>>> I currently believe that the spark kafka receiver is the
>>> >>>>>>> bottleneck. I've tried both 1.2 receivers, with the WAL and
>>> >>>>>>> without, and didn't notice any large performance difference. I've
>>> >>>>>>> tried many different spark configuration options, but can't seem
>>> >>>>>>> to get better performance.
>>> >>>>>>>
>>> >>>>>>> I saw 80000 requests / second inserting these records into kafka
>>> >>>>>>> using yarn / hbase / protobuf / kafka in a bulk fashion.
>>> >>>>>>>
>>> >>>>>>> While hbase inserts might not deliver the same throughput, I'd
>>> >>>>>>> like to at least get 10% of that.
>>> >>>>>>>
>>> >>>>>>> My application looks like
>>> >>>>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>> >>>>>>>
>>> >>>>>>> This is my first spark application. I'd appreciate any assistance.
>>> >>>>>>>
>>> >>>>
>>>
>>
