spark-user mailing list archives

From Mich Talebzadeh <mich.talebza...@gmail.com>
Subject Re: Improving performance of a kafka spark streaming app
Date Sat, 18 Jun 2016 20:08:20 GMT
Ok

What are the settings for the following, please?

batch window
window length
sliding interval

Also, how much data arrives in each batch window (the number of messages
in the topic, or similar)?
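
A rough check of the figures quoted further down, offered as a sketch rather than a diagnosis. For the direct stream, spark.streaming.kafka.maxRatePerPartition limits the records read per second from each partition. With the value of 500 and the 6 partitions from the quoted launch details, and a batch interval of 1 second (an assumption read off the one-second spacing of the batch times in the quoted UI table), the ceiling works out to the 3000 events per batch that the UI reports. The class and method names below are hypothetical.

```java
public class BatchCap {
    // Upper bound on records per batch for the Kafka direct stream:
    // maxRatePerPartition (records/s/partition) * partitions * batch interval (s).
    static long maxEventsPerBatch(long maxRatePerPartition, int partitions, double batchSeconds) {
        return Math.round(maxRatePerPartition * partitions * batchSeconds);
    }

    // Hours needed to read totalRecords at a fixed rate of records per second.
    static double hoursToDrain(long totalRecords, double recordsPerSecond) {
        return totalRecords / recordsPerSecond / 3600.0;
    }

    public static void main(String[] args) {
        // 500 and 6 come from the quoted launch details; the 1.0 s batch
        // interval is an assumption inferred from the quoted UI output.
        double batchSeconds = 1.0;
        long cap = maxEventsPerBatch(500, 6, batchSeconds);
        double recordsPerSecond = cap / batchSeconds;
        System.out.println("max events per batch: " + cap);
        System.out.printf("hours to read 1.7 billion records at that rate: %.1f%n",
                hoursToDrain(1_700_000_000L, recordsPerSecond));
    }
}
```

If the assumption holds, every batch is already at the configured cap, and at 3000 records per second a topic of 1.7 billion records takes on the order of 157 hours to read, independent of the scheduling delay.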




Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 18 June 2016 at 21:01, Mich Talebzadeh <mich.talebzadeh@gmail.com> wrote:

> I believe you have an issue with performance?
>
> Have you checked the Spark web UI (default port 4040) for details,
> including shuffles etc.?
>
> HTH
>
> Dr Mich Talebzadeh
>
> On 18 June 2016 at 20:59, Colin Kincaid Williams <discord@uw.edu> wrote:
>
>> There are 25 nodes in the spark cluster.
>>
>> On Sat, Jun 18, 2016 at 7:53 PM, Mich Talebzadeh
>> <mich.talebzadeh@gmail.com> wrote:
>> > how many nodes are in your cluster?
>> >
>> > --num-executors 6 \
>> >  --driver-memory 4G \
>> >  --executor-memory 2G \
>> >  --total-executor-cores 12 \
>> >
>> >
>> > Dr Mich Talebzadeh
>> >
>> > On 18 June 2016 at 20:40, Colin Kincaid Williams <discord@uw.edu> wrote:
>> >>
>> >> I updated my app to Spark 1.5.2 streaming so that it consumes from
>> >> Kafka using the direct api and inserts content into an hbase cluster,
>> >> as described in this thread. I was away from this project for a while
>> >> due to events in my family.
>> >>
>> >> Currently my scheduling delay is high, but the processing time is
>> >> stable around a second. I changed my setup to use 6 kafka partitions
>> >> on a set of smaller kafka brokers, with fewer disks. I've included
>> >> some details below, including the script I use to launch the
>> >> application. I'm using a Spark on Hbase library, whose version is
>> >> relevant to my Hbase cluster. Is it apparent there is something wrong
>> >> with my launch method that could be causing the delay, related to the
>> >> included jars?
>> >>
>> >> Or is there something wrong with the very simple approach I'm taking
>> >> for the application?
>> >>
>> >> Any advice is appreciated.
>> >>
>> >>
>> >> The application:
>> >>
>> >> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>> >>
>> >>
>> >> From the streaming UI I get something like:
>> >>
>> >> Completed Batches (last 1000 out of 27136)
>> >>
>> >> Batch Time           Input Size   Scheduling Delay  Processing Time  Total Delay  Output Ops: Succeeded/Total
>> >> 2016/06/18 11:21:32  3000 events  1.2 h             1 s              1.2 h        1/1
>> >> 2016/06/18 11:21:31  3000 events  1.2 h             1 s              1.2 h        1/1
>> >> 2016/06/18 11:21:30  3000 events  1.2 h             1 s              1.2 h        1/1
>> >>
>> >>
>> >> Here's how I'm launching the spark application.
>> >>
>> >>
>> >> #!/usr/bin/env bash
>> >>
>> >> export SPARK_CONF_DIR=/home/colin.williams/spark
>> >> export HADOOP_CONF_DIR=/etc/hadoop/conf
>> >> export HADOOP_CLASSPATH=/home/colin.williams/hbase/conf/:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/*:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/hbase-protocol-0.98.6-cdh5.3.0.jar
>> >>
>> >> /opt/spark-1.5.2-bin-hadoop2.4/bin/spark-submit \
>> >> --class com.example.KafkaToHbase \
>> >> --master spark://spark_master:7077 \
>> >> --deploy-mode client \
>> >> --num-executors 6 \
>> >> --driver-memory 4G \
>> >> --executor-memory 2G \
>> >> --total-executor-cores 12 \
>> >> --jars /home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/zookeeper/zookeeper-3.4.5-cdh5.3.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/guava-12.0.1.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/protobuf-java-2.5.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-protocol.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-client.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-common.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop2-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-server.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/htrace-core.jar \
>> >> --conf spark.app.name="Kafka To Hbase" \
>> >> --conf spark.eventLog.dir="hdfs:///user/spark/applicationHistory" \
>> >> --conf spark.eventLog.enabled=false \
>> >> --conf spark.eventLog.overwrite=true \
>> >> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
>> >> --conf spark.streaming.backpressure.enabled=false \
>> >> --conf spark.streaming.kafka.maxRatePerPartition=500 \
>> >> --driver-class-path /home/colin.williams/kafka-hbase.jar \
>> >> --driver-java-options -Dspark.executor.extraClassPath=/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/* \
>> >> /home/colin.williams/kafka-hbase.jar "FromTable" "ToTable" "broker1:9092,broker2:9092"
>> >>
>> >> On Tue, May 3, 2016 at 8:20 PM, Colin Kincaid Williams <discord@uw.edu> wrote:
>> >> > Thanks Cody, I can see that the partitions are well distributed...
>> >> > Then I'm in the process of using the direct api.
>> >> >
>> >> > On Tue, May 3, 2016 at 6:51 PM, Cody Koeninger <cody@koeninger.org>
>> >> > wrote:
>> >> >> 60 partitions in and of itself shouldn't be a big performance issue
>> >> >> (as long as producers are distributing across partitions evenly).
>> >> >>
>> >> >> On Tue, May 3, 2016 at 1:44 PM, Colin Kincaid Williams <discord@uw.edu>
>> >> >> wrote:
>> >> >>> Thanks again Cody. Regarding the details: 66 Kafka partitions on 3
>> >> >>> Kafka servers, likely 8-core systems with 10 disks each. Maybe the
>> >> >>> issue with the receiver was the large number of partitions. I had
>> >> >>> miscounted the disks, so 11*3*2 is how I decided to partition my
>> >> >>> topic on insertion (by my own unjustified reasoning, on a first
>> >> >>> attempt). This worked well enough for me: I put 1.7 billion entries
>> >> >>> into Kafka with a MapReduce job in five and a half hours.
>> >> >>>
>> >> >>> I was concerned about using Spark 1.5.2 because I'm currently putting
>> >> >>> my data into a CDH 5.3 HDFS cluster, using hbase-spark .98 library
>> >> >>> jars built for Spark 1.2 on CDH 5.3. But after debugging quite a bit
>> >> >>> yesterday, I tried building against 1.5.2. So far it's running
>> >> >>> without issue on a Spark 1.5.2 cluster. I'm not sure there was much
>> >> >>> improvement using the same code, but I'll see how the direct API
>> >> >>> handles it. In the end I can reduce the number of partitions in
>> >> >>> Kafka if it causes big performance issues.
>> >> >>>
>> >> >>> On Tue, May 3, 2016 at 4:08 AM, Cody Koeninger <cody@koeninger.org>
>> >> >>> wrote:
>> >> >>>> print() isn't really the best way to benchmark things, since it
>> >> >>>> calls take(10) under the covers, but 380 records / second for a
>> >> >>>> single receiver doesn't sound right in any case.
>> >> >>>>
>> >> >>>> Am I understanding correctly that you're trying to process a large
>> >> >>>> number of already-existing kafka messages, not keep up with an
>> >> >>>> incoming stream?  Can you give any details (e.g. hardware, number
>> >> >>>> of topicpartitions, etc)?
>> >> >>>>
>> >> >>>> Really though, I'd try to start with spark 1.6 and direct streams,
>> >> >>>> or even just kafkacat, as a baseline.
>> >> >>>>
>> >> >>>>
>> >> >>>>
>> >> >>>> On Mon, May 2, 2016 at 7:01 PM, Colin Kincaid Williams
>> >> >>>> <discord@uw.edu> wrote:
>> >> >>>>> Hello again. I searched for "backport kafka" in the list archives
>> >> >>>>> but couldn't find anything but a post from Spark 0.7.2. I was
>> >> >>>>> going to use accumulators to make a counter, but then saw on the
>> >> >>>>> Streaming tab the Receiver Statistics. Then I removed all other
>> >> >>>>> "functionality" except:
>> >> >>>>>
>> >> >>>>>
>> >> >>>>>     JavaPairReceiverInputDStream<byte[], byte[]> dstream = KafkaUtils
>> >> >>>>>       // createStream(JavaStreamingContext jssc, Class<K> keyTypeClass,
>> >> >>>>>       //   Class<V> valueTypeClass, Class<U> keyDecoderClass,
>> >> >>>>>       //   Class<T> valueDecoderClass,
>> >> >>>>>       //   java.util.Map<String,String> kafkaParams,
>> >> >>>>>       //   java.util.Map<String,Integer> topics, StorageLevel storageLevel)
>> >> >>>>>       .createStream(jssc, byte[].class, byte[].class,
>> >> >>>>>         kafka.serializer.DefaultDecoder.class,
>> >> >>>>>         kafka.serializer.DefaultDecoder.class,
>> >> >>>>>         kafkaParamsMap, topicMap,
>> >> >>>>>         StorageLevel.MEMORY_AND_DISK_SER());
>> >> >>>>>
>> >> >>>>>     dstream.print();
>> >> >>>>>
>> >> >>>>> Then in the Receiver Stats for the single receiver, I'm seeing
>> >> >>>>> around 380 records / second. Then to get anywhere near my 10%
>> >> >>>>> mentioned above, I'd need to run around 21 receivers, assuming
>> >> >>>>> 380 records / second, just using the print output. This seems
>> >> >>>>> awfully high to me, considering that I wrote 80000+ records a
>> >> >>>>> second to Kafka from a mapreduce job, and that my bottleneck was
>> >> >>>>> likely Hbase. Again using the 380 estimate, I would need 200+
>> >> >>>>> receivers to reach a similar amount of reads.
>> >> >>>>>
>> >> >>>>> Even given the issues with the 1.2 receivers, is this the expected
>> >> >>>>> way to use the Kafka streaming API, or am I doing something
>> >> >>>>> terribly wrong?
>> >> >>>>>
>> >> >>>>> My application looks like
>> >> >>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>> >> >>>>>
>> >> >>>>> On Mon, May 2, 2016 at 6:09 PM, Cody Koeninger <cody@koeninger.org>
>> >> >>>>> wrote:
>> >> >>>>>> Have you tested for read throughput (without writing to hbase,
>> >> >>>>>> just deserialize)?
>> >> >>>>>>
>> >> >>>>>> Are you limited to using spark 1.2, or is upgrading possible?
>> >> >>>>>> The kafka direct stream is available starting with 1.3.  If
>> >> >>>>>> you're stuck on 1.2, I believe there have been some attempts to
>> >> >>>>>> backport it, search the mailing list archives.
>> >> >>>>>>
>> >> >>>>>> On Mon, May 2, 2016 at 12:54 PM, Colin Kincaid Williams
>> >> >>>>>> <discord@uw.edu> wrote:
>> >> >>>>>>> I've written an application to get content from a kafka topic
>> >> >>>>>>> with 1.7 billion entries, get the protobuf serialized entries,
>> >> >>>>>>> and insert into hbase. Currently the environment that I'm
>> >> >>>>>>> running in is Spark 1.2.
>> >> >>>>>>>
>> >> >>>>>>> With 8 executors and 2 cores, and 2 jobs, I'm only getting
>> >> >>>>>>> between 0-2500 writes / second. This will take much too long to
>> >> >>>>>>> consume the entries.
>> >> >>>>>>>
>> >> >>>>>>> I currently believe that the spark kafka receiver is the
>> >> >>>>>>> bottleneck. I've tried both 1.2 receivers, with the WAL and
>> >> >>>>>>> without, and didn't notice any large performance difference.
>> >> >>>>>>> I've tried many different spark configuration options, but
>> >> >>>>>>> can't seem to get better performance.
>> >> >>>>>>>
>> >> >>>>>>> I saw 80000 requests / second inserting these records into
>> >> >>>>>>> kafka using yarn / hbase / protobuf / kafka in a bulk fashion.
>> >> >>>>>>>
>> >> >>>>>>> While hbase inserts might not deliver the same throughput, I'd
>> >> >>>>>>> like to at least get 10%.
>> >> >>>>>>>
>> >> >>>>>>> My application looks like
>> >> >>>>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>> >> >>>>>>>
>> >> >>>>>>> This is my first spark application. I'd appreciate any
>> >> >>>>>>> assistance.
>> >> >>>>>>>
>> >> >>>>>>>
>> >> >>>>>>>
>> >> >>>>>>> ---------------------------------------------------------------------
>> >> >>>>>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> >> >>>>>>> For additional commands, e-mail: user-help@spark.apache.org
>> >> >>>>>>>
>> >
>>
>
>
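
The throughput estimates in the earliest quoted messages can be checked with a few lines of arithmetic. This is only a sketch: it assumes that read throughput scales linearly with the number of receivers, which the thread does not establish, and the class and method names are hypothetical.

```java
public class ReceiverEstimate {
    // Receivers needed to reach a target rate, assuming (hypothetically)
    // that throughput scales linearly with the number of receivers.
    static double receiversNeeded(double targetPerSecond, double perReceiverPerSecond) {
        return targetPerSecond / perReceiverPerSecond;
    }

    // Average records per second when `records` are written in `hours`.
    static double recordsPerSecond(long records, double hours) {
        return records / (hours * 3600.0);
    }

    public static void main(String[] args) {
        double perReceiver = 380.0;   // records/s observed for one receiver
        double bulkRate = 80000.0;    // records/s quoted for the bulk load into Kafka
        System.out.printf("receivers for 10%% of the bulk rate: %.1f%n",
                receiversNeeded(0.10 * bulkRate, perReceiver));
        System.out.printf("receivers for the full bulk rate: %.1f%n",
                receiversNeeded(bulkRate, perReceiver));
        System.out.printf("average rate for 1.7 billion records in 5.5 hours: %.0f records/s%n",
                recordsPerSecond(1_700_000_000L, 5.5));
    }
}
```

Under that assumption the ratios come to roughly 21 and 210 receivers, matching the "around 21" and "200+" figures in the thread, and 1.7 billion records in five and a half hours averages a little under 86,000 records per second, consistent with the quoted "80000+".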
