spark-user mailing list archives

From Colin Kincaid Williams <disc...@uw.edu>
Subject Re: Improving performance of a kafka spark streaming app
Date Sat, 18 Jun 2016 21:40:56 GMT
Hi Mich again,

Regarding the batch window, etc.: I have provided the source, and I'm not
currently calling the window function. Did you see the program source?
It's only 100 lines.

https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877

Then I would expect I'm using defaults, other than what has been shown
in the configuration.

For example:

In the launcher configuration I set --conf
spark.streaming.kafka.maxRatePerPartition=500, and I believe that gives
500 messages per partition for the batch duration set in the application:

JavaStreamingContext jssc = new JavaStreamingContext(jsc, new Duration(1000));


Then with the --num-executors 6 submit flag and
spark.streaming.kafka.maxRatePerPartition=500, I think that's how we
arrive at the 3000 events per batch in the UI, pasted in the quoted
message below.

Feel free to correct me if I'm wrong.
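
A quick check of this arithmetic may help. As far as I know,
spark.streaming.kafka.maxRatePerPartition is a per-second, per-partition
limit for the direct stream, so the cap on one batch is the rate times the
batch length in seconds times the number of Kafka partitions. The factor of
6 would then come from the 6 Kafka partitions mentioned in the quoted
message below rather than from --num-executors 6 (which, to my knowledge,
spark-submit only honors on YARN). The sketch below is plain Java with no
Spark dependency; the class and method names are invented for illustration.

```java
// Sketch only: BatchCap and maxEventsPerBatch are hypothetical names,
// not part of Spark or of the application in this thread.
final class BatchCap {

    /**
     * Upper bound on the records one batch can contain when
     * spark.streaming.kafka.maxRatePerPartition is set.
     */
    static long maxEventsPerBatch(long maxRatePerPartition,
                                  int kafkaPartitions,
                                  long batchMillis) {
        // The limit is per second and per partition, so scale it
        // by the batch length first...
        long perPartition = maxRatePerPartition * batchMillis / 1000;
        // ...and then by the number of Kafka partitions (not executors).
        return perPartition * kafkaPartitions;
    }

    public static void main(String[] args) {
        // Values from this thread: 500 records/s, 6 partitions, Duration(1000).
        System.out.println(maxEventsPerBatch(500, 6, 1000));
        // Hypothetical variation: the same rate with a 2 second batch.
        System.out.println(maxEventsPerBatch(500, 6, 2000));
    }
}
```

Under these assumptions, changing the executor count alone would leave the
per-batch cap unchanged; only the rate, the batch duration, or the number of
partitions would move it.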

Then are you suggesting that I set the window?

Maybe following this as reference:

https://databricks.gitbooks.io/databricks-spark-reference-applications/content/logs_analyzer/chapter1/windows.html

On Sat, Jun 18, 2016 at 8:08 PM, Mich Talebzadeh
<mich.talebzadeh@gmail.com> wrote:
> Ok
>
> What is the setup for these, please?
>
> batch window
> window length
> sliding interval
>
> Also, how much data do you get in each batch window (number of messages
> in the topic, or similar)?
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
>
> On 18 June 2016 at 21:01, Mich Talebzadeh <mich.talebzadeh@gmail.com> wrote:
>>
>> I believe you have an issue with performance?
>>
>> Have you checked the Spark UI (default port 4040) for details, including
>> shuffles?
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>> On 18 June 2016 at 20:59, Colin Kincaid Williams <discord@uw.edu> wrote:
>>>
>>> There are 25 nodes in the spark cluster.
>>>
>>> On Sat, Jun 18, 2016 at 7:53 PM, Mich Talebzadeh
>>> <mich.talebzadeh@gmail.com> wrote:
>>> > How many nodes are in your cluster?
>>> >
>>> > --num-executors 6 \
>>> >  --driver-memory 4G \
>>> >  --executor-memory 2G \
>>> >  --total-executor-cores 12 \
>>> >
>>> >
>>> > Dr Mich Talebzadeh
>>> >
>>> > On 18 June 2016 at 20:40, Colin Kincaid Williams <discord@uw.edu>
>>> > wrote:
>>> >>
>>> >> I updated my app to Spark 1.5.2 streaming so that it consumes from
>>> >> Kafka using the direct api and inserts content into an hbase cluster,
>>> >> as described in this thread. I was away from this project for a while
>>> >> due to events in my family.
>>> >>
>>> >> Currently my scheduling delay is high, but the processing time is
>>> >> stable around a second. I changed my setup to use 6 kafka partitions
>>> >> on a set of smaller kafka brokers, with fewer disks. I've included
>>> >> some details below, including the script I use to launch the
>>> >> application. I'm using a Spark on Hbase library, whose version is
>>> >> relevant to my Hbase cluster. Is it apparent there is something wrong
>>> >> with my launch method that could be causing the delay, related to the
>>> >> included jars?
>>> >>
>>> >> Or is there something wrong with the very simple approach I'm taking
>>> >> for the application?
>>> >>
>>> >> Any advice is appreciated.
>>> >>
>>> >>
>>> >> The application:
>>> >>
>>> >> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>> >>
>>> >>
>>> >> From the streaming UI I get something like:
>>> >>
>>> >> Completed Batches (last 1000 out of 27136)
>>> >>
>>> >> Batch Time           Input Size   Scheduling Delay  Processing Time  Total Delay  Output Ops: Succeeded/Total
>>> >> 2016/06/18 11:21:32  3000 events  1.2 h             1 s              1.2 h        1/1
>>> >> 2016/06/18 11:21:31  3000 events  1.2 h             1 s              1.2 h        1/1
>>> >> 2016/06/18 11:21:30  3000 events  1.2 h             1 s              1.2 h        1/1
>>> >>
>>> >>
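
As a rough sanity check on these numbers: with every batch capped at 3000
events and a 1 second batch interval, the application reads at most about
3000 events per second. Assuming the backlog is still on the order of the
1.7 billion entries mentioned elsewhere in this thread (an assumption; the
current topic size is not stated), the time to drain it can be estimated as
in the sketch below. The class and method names are hypothetical.

```java
// Sketch only: BacklogEstimate and hoursToDrain are hypothetical names.
final class BacklogEstimate {

    /** Hours needed to read backlogRecords at a fixed rate of recordsPerSecond. */
    static double hoursToDrain(long backlogRecords, long recordsPerSecond) {
        double seconds = backlogRecords / (double) recordsPerSecond;
        return seconds / 3600.0;
    }

    public static void main(String[] args) {
        long backlog = 1_700_000_000L; // assumed backlog, taken from the thread
        // Current cap: 3000 events per 1 second batch.
        System.out.println(hoursToDrain(backlog, 3000L));
        // Target from the thread: 10% of 80000 records per second.
        System.out.println(hoursToDrain(backlog, 8000L));
    }
}
```

At the current cap that works out to roughly 157 hours; at 8000 records per
second it would be roughly 59 hours.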
>>> >> Here's how I'm launching the spark application.
>>> >>
>>> >>
>>> >> #!/usr/bin/env bash
>>> >>
>>> >> export SPARK_CONF_DIR=/home/colin.williams/spark
>>> >> export HADOOP_CONF_DIR=/etc/hadoop/conf
>>> >> export HADOOP_CLASSPATH=/home/colin.williams/hbase/conf/:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/*:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/hbase-protocol-0.98.6-cdh5.3.0.jar
>>> >>
>>> >> /opt/spark-1.5.2-bin-hadoop2.4/bin/spark-submit \
>>> >>   --class com.example.KafkaToHbase \
>>> >>   --master spark://spark_master:7077 \
>>> >>   --deploy-mode client \
>>> >>   --num-executors 6 \
>>> >>   --driver-memory 4G \
>>> >>   --executor-memory 2G \
>>> >>   --total-executor-cores 12 \
>>> >>   --jars /home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/zookeeper/zookeeper-3.4.5-cdh5.3.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/guava-12.0.1.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/protobuf-java-2.5.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-protocol.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-client.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-common.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop2-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-server.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/htrace-core.jar \
>>> >>   --conf spark.app.name="Kafka To Hbase" \
>>> >>   --conf spark.eventLog.dir="hdfs:///user/spark/applicationHistory" \
>>> >>   --conf spark.eventLog.enabled=false \
>>> >>   --conf spark.eventLog.overwrite=true \
>>> >>   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
>>> >>   --conf spark.streaming.backpressure.enabled=false \
>>> >>   --conf spark.streaming.kafka.maxRatePerPartition=500 \
>>> >>   --driver-class-path /home/colin.williams/kafka-hbase.jar \
>>> >>   --driver-java-options -Dspark.executor.extraClassPath=/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/* \
>>> >>   /home/colin.williams/kafka-hbase.jar "FromTable" "ToTable" "broker1:9092,broker2:9092"
>>> >>
>>> >>
>>> >> On Tue, May 3, 2016 at 8:20 PM, Colin Kincaid Williams
>>> >> <discord@uw.edu>
>>> >> wrote:
>>> >> > Thanks Cody, I can see that the partitions are well distributed.
>>> >> > I'm now in the process of switching to the direct api.
>>> >> >
>>> >> > On Tue, May 3, 2016 at 6:51 PM, Cody Koeninger <cody@koeninger.org>
>>> >> > wrote:
>>> >> >> 60 partitions in and of itself shouldn't be a big performance issue
>>> >> >> (as long as producers are distributing across partitions evenly).
>>> >> >>
>>> >> >> On Tue, May 3, 2016 at 1:44 PM, Colin Kincaid Williams
>>> >> >> <discord@uw.edu>
>>> >> >> wrote:
>>> >> >>> Thanks again Cody. Regarding the details: 66 kafka partitions on 3
>>> >> >>> kafka servers, likely 8 core systems with 10 disks each. Maybe the
>>> >> >>> issue with the receiver was the large number of partitions. I had
>>> >> >>> miscounted the disks and so 11*3*2 is how I decided to partition
>>> >> >>> my topic on insertion (by my own, unjustified reasoning, on a first
>>> >> >>> attempt). This worked well enough for me; I put 1.7 billion
>>> >> >>> entries into Kafka on a map reduce job in 5 and a half hours.
>>> >> >>>
>>> >> >>> I was concerned using spark 1.5.2 because I'm currently putting my
>>> >> >>> data into a CDH 5.3 HDFS cluster, using hbase-spark .98 library
>>> >> >>> jars built for spark 1.2 on CDH 5.3. But after debugging quite a
>>> >> >>> bit yesterday, I tried building against 1.5.2. So far it's running
>>> >> >>> without issue on a Spark 1.5.2 cluster. I'm not sure there was too
>>> >> >>> much improvement using the same code, but I'll see how the direct
>>> >> >>> api handles it. In the end I can reduce the number of partitions
>>> >> >>> in Kafka if it causes big performance issues.
>>> >> >>>
>>> >> >>> On Tue, May 3, 2016 at 4:08 AM, Cody Koeninger
>>> >> >>> <cody@koeninger.org>
>>> >> >>> wrote:
>>> >> >>>> print() isn't really the best way to benchmark things, since it
>>> >> >>>> calls take(10) under the covers, but 380 records / second for a
>>> >> >>>> single receiver doesn't sound right in any case.
>>> >> >>>>
>>> >> >>>> Am I understanding correctly that you're trying to process a
>>> >> >>>> large number of already-existing kafka messages, not keep up with
>>> >> >>>> an incoming stream? Can you give any details (e.g. hardware,
>>> >> >>>> number of topicpartitions, etc)?
>>> >> >>>>
>>> >> >>>> Really though, I'd try to start with spark 1.6 and direct
>>> >> >>>> streams, or even just kafkacat, as a baseline.
>>> >> >>>>
>>> >> >>>>
>>> >> >>>>
>>> >> >>>> On Mon, May 2, 2016 at 7:01 PM, Colin Kincaid Williams
>>> >> >>>> <discord@uw.edu> wrote:
>>> >> >>>>> Hello again. I searched for "backport kafka" in the list archives
>>> >> >>>>> but couldn't find anything but a post from Spark 0.7.2. I was
>>> >> >>>>> going to use accumulators to make a counter, but then saw the
>>> >> >>>>> Receiver Statistics on the Streaming tab. Then I removed all other
>>> >> >>>>> "functionality" except:
>>> >> >>>>>
>>> >> >>>>>     JavaPairReceiverInputDStream<byte[], byte[]> dstream = KafkaUtils
>>> >> >>>>>       // createStream(JavaStreamingContext jssc, Class<K> keyTypeClass,
>>> >> >>>>>       //   Class<V> valueTypeClass, Class<U> keyDecoderClass,
>>> >> >>>>>       //   Class<T> valueDecoderClass,
>>> >> >>>>>       //   java.util.Map<String,String> kafkaParams,
>>> >> >>>>>       //   java.util.Map<String,Integer> topics, StorageLevel storageLevel)
>>> >> >>>>>       .createStream(jssc, byte[].class, byte[].class,
>>> >> >>>>>           kafka.serializer.DefaultDecoder.class,
>>> >> >>>>>           kafka.serializer.DefaultDecoder.class,
>>> >> >>>>>           kafkaParamsMap, topicMap,
>>> >> >>>>>           StorageLevel.MEMORY_AND_DISK_SER());
>>> >> >>>>>
>>> >> >>>>>     dstream.print();
>>> >> >>>>>
>>> >> >>>>> Then in the Receiver Stats for the single receiver, I'm seeing
>>> >> >>>>> around 380 records / second. Then to get anywhere near my 10%
>>> >> >>>>> mentioned above, I'd need to run around 21 receivers, assuming 380
>>> >> >>>>> records / second, just using the print output. This seems awfully
>>> >> >>>>> high to me, considering that I wrote 80000+ records a second to
>>> >> >>>>> Kafka from a mapreduce job, and that my bottleneck was likely
>>> >> >>>>> Hbase. Again using the 380 estimate, I would need 200+ receivers to
>>> >> >>>>> reach a similar amount of reads.
>>> >> >>>>>
>>> >> >>>>> Even given the issues with the 1.2 receivers, is this the expected
>>> >> >>>>> way to use the Kafka streaming API, or am I doing something
>>> >> >>>>> terribly wrong?
>>> >> >>>>>
>>> >> >>>>> My application looks like
>>> >> >>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>> >> >>>>>
>>> >> >>>>> On Mon, May 2, 2016 at 6:09 PM, Cody Koeninger
>>> >> >>>>> <cody@koeninger.org>
>>> >> >>>>> wrote:
>>> >> >>>>>> Have you tested for read throughput (without writing to hbase,
>>> >> >>>>>> just deserialize)?
>>> >> >>>>>>
>>> >> >>>>>> Are you limited to using spark 1.2, or is upgrading possible? The
>>> >> >>>>>> kafka direct stream is available starting with 1.3. If you're
>>> >> >>>>>> stuck on 1.2, I believe there have been some attempts to backport
>>> >> >>>>>> it; search the mailing list archives.
>>> >> >>>>>>
>>> >> >>>>>> On Mon, May 2, 2016 at 12:54 PM, Colin Kincaid Williams
>>> >> >>>>>> <discord@uw.edu> wrote:
>>> >> >>>>>>> I've written an application to get content from a kafka topic
>>> >> >>>>>>> with 1.7 billion entries, get the protobuf serialized entries,
>>> >> >>>>>>> and insert into hbase. Currently the environment that I'm
>>> >> >>>>>>> running in is Spark 1.2.
>>> >> >>>>>>>
>>> >> >>>>>>> With 8 executors and 2 cores, and 2 jobs, I'm only getting
>>> >> >>>>>>> between 0-2500 writes / second. This will take much too long to
>>> >> >>>>>>> consume the entries.
>>> >> >>>>>>>
>>> >> >>>>>>> I currently believe that the spark kafka receiver is the
>>> >> >>>>>>> bottleneck. I've tried both 1.2 receivers, with the WAL and
>>> >> >>>>>>> without, and didn't notice any large performance difference.
>>> >> >>>>>>> I've tried many different spark configuration options, but
>>> >> >>>>>>> can't seem to get better performance.
>>> >> >>>>>>>
>>> >> >>>>>>> I saw 80000 requests / second inserting these records into
>>> >> >>>>>>> kafka using yarn / hbase / protobuf / kafka in a bulk fashion.
>>> >> >>>>>>>
>>> >> >>>>>>> While hbase inserts might not deliver the same throughput, I'd
>>> >> >>>>>>> like to at least get 10%.
>>> >> >>>>>>>
>>> >> >>>>>>> My application looks like
>>> >> >>>>>>>
>>> >> >>>>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>> >> >>>>>>>
>>> >> >>>>>>> This is my first spark application. I'd appreciate any
>>> >> >>>>>>> assistance.
>>> >> >>>>>>>
>>> >> >>>>>>>
>>> >> >>>>>>>
>>> >> >>>>>>> ---------------------------------------------------------------------
>>> >> >>>>>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>> >> >>>>>>> For additional commands, e-mail: user-help@spark.apache.org
>>> >> >>>>>>>