spark-user mailing list archives

From Colin Kincaid Williams <disc...@uw.edu>
Subject Re: Improving performance of a kafka spark streaming app
Date Thu, 23 Jun 2016 05:09:14 GMT
Streaming UI tab showing empty events and very different metrics than on 1.5.2

On Thu, Jun 23, 2016 at 5:06 AM, Colin Kincaid Williams <discord@uw.edu> wrote:
> After a bit of effort I moved from a Spark cluster running 1.5.2 to a
> Yarn cluster running 1.6.1 jars. I'm still setting maxRatePerPartition.
> The completed batches no longer show the number of events processed in
> the Streaming UI tab. I'm getting around 4k inserts per second into
> hbase, but I haven't yet tried to remove or reset maxRatePerPartition.
> I will attach a screenshot of the UI tab. It shows significantly lower
> figures for processing and delay times than the previously posted shot.
> It also shows the batches as empty, even though I see the requests
> hitting hbase.
>
> So it's possible my earlier issues were related to running on the Spark
> 1.5.2 cluster. Also, is the missing event count in the completed
> batches a bug? Should I file an issue?
>
> On Tue, Jun 21, 2016 at 9:04 PM, Colin Kincaid Williams <discord@uw.edu> wrote:
>> Thanks @Cody, I will try that out. In the interim, I tried to validate
>> my Hbase cluster by running a random write test, and I see 30-40K
>> writes per second. This suggests there is considerable room for
>> improvement.
>>
>> On Tue, Jun 21, 2016 at 8:32 PM, Cody Koeninger <cody@koeninger.org> wrote:
>>> Take HBase out of the equation and just measure what your read
>>> performance is by doing something like
>>>
>>> createDirectStream(...).foreach(_.println)
>>>
>>> not take() or print()
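>>>
>>> In the Java api used by the app in this thread, that read-only
>>> benchmark might look roughly like the following (an untested sketch;
>>> the jssc streaming context is the one from the gist, and the broker
>>> and topic strings are placeholders taken from the launch script):
>>>
>>>     Map<String, String> kafkaParams = new HashMap<>();
>>>     kafkaParams.put("metadata.broker.list", "broker1:9092,broker2:9092");
>>>     Set<String> topics = new HashSet<>(Arrays.asList("FromTable"));
>>>
>>>     JavaPairInputDStream<byte[], byte[]> stream = KafkaUtils.createDirectStream(
>>>         jssc, byte[].class, byte[].class,
>>>         kafka.serializer.DefaultDecoder.class,
>>>         kafka.serializer.DefaultDecoder.class,
>>>         kafkaParams, topics);
>>>
>>>     // Unlike print(), which only takes the first 10 records, count()
>>>     // forces a full read and deserialize of every batch.
>>>     stream.foreachRDD(rdd -> System.out.println("read " + rdd.count() + " records"));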
>>>
>>> On Tue, Jun 21, 2016 at 3:19 PM, Colin Kincaid Williams <discord@uw.edu> wrote:
>>>> @Cody I was able to bring my processing time down to a second by
>>>> setting maxRatePerPartition as discussed. My bad that I didn't
>>>> recognize it as the cause of my scheduling delay.
>>>>
>>>> Since then I've tried experimenting with a larger batch Duration for
>>>> the streaming context. I've been trying to get some noticeable
>>>> improvement inserting messages from Kafka -> Hbase using the above
>>>> application. I'm currently getting around 3500 inserts / second on a
>>>> 9 node hbase cluster. So far, I haven't been able to get much more
>>>> throughput, so I'm looking for advice here on how I should tune Kafka
>>>> and Spark for this job.
>>>>
>>>> I can create a kafka topic with as many partitions as I want. I can
>>>> set the Duration and maxRatePerPartition. I have 1.7 billion messages
>>>> that I can insert rather quickly into the Kafka queue, and I'd like to
>>>> get them into Hbase as quickly as possible.
>>>>
>>>> I'm looking for advice regarding # Kafka Topic Partitions / Streaming
>>>> Duration / maxRatePerPartition / any other spark settings or code
>>>> changes that I should make to try to get a better consumption rate.
>>>>
>>>> Thanks for all the help so far, this is the first Spark application I
>>>> have written.
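>>>>
>>>> For reference, with the direct stream these knobs bound the read rate
>>>> roughly as follows (a back-of-envelope sketch using the June 18
>>>> figures from this thread, not new measurements):
>>>>
>>>>     int partitions = 6;             // kafka topic partitions
>>>>     int maxRatePerPartition = 500;  // spark.streaming.kafka.maxRatePerPartition
>>>>     long batchMillis = 1000;        // new Duration(1000)
>>>>
>>>>     // Records admitted per batch, and the resulting per-second ceiling:
>>>>     long perBatch = partitions * maxRatePerPartition * batchMillis / 1000;  // 3000
>>>>     long perSecond = partitions * maxRatePerPartition;                      // 3000 / second
>>>>
>>>>     // Draining 1.7 billion messages at the observed ~3500 inserts / second:
>>>>     double hours = 1.7e9 / 3500 / 3600;  // roughly 135 hours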
>>>>
>>>> On Mon, Jun 20, 2016 at 12:32 PM, Colin Kincaid Williams <discord@uw.edu> wrote:
>>>>> I'll try dropping maxRatePerPartition to 400, or maybe even lower.
>>>>> However, even at application startup I have this large scheduling
>>>>> delay. I will report my progress later on.
>>>>>
>>>>> On Mon, Jun 20, 2016 at 2:12 PM, Cody Koeninger <cody@koeninger.org> wrote:
>>>>>> If your batch time is 1 second and your average processing time is
>>>>>> 1.16 seconds, you're always going to be falling behind.  That would
>>>>>> explain why you've built up an hour of scheduling delay after eight
>>>>>> hours of running.
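>>>>>>
>>>>>> (Back-of-envelope: each 1 s batch falls about 0.16 s further behind,
>>>>>> and eight hours is 28800 batches, so the backlog grows to roughly
>>>>>> 28800 * 0.16 = 4608 s, about 1.3 hours.)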
>>>>>>
>>>>>> On Sat, Jun 18, 2016 at 4:40 PM, Colin Kincaid Williams <discord@uw.edu> wrote:
>>>>>>> Hi Mich again,
>>>>>>>
>>>>>>> Regarding batch window, etc. I have provided the sources, but I'm
>>>>>>> not currently calling the window function. Did you see the program
>>>>>>> source? It's only 100 lines.
>>>>>>>
>>>>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>>>>>
>>>>>>> Then I would expect I'm using defaults, other than what has been
>>>>>>> shown in the configuration.
>>>>>>>
>>>>>>> For example:
>>>>>>>
>>>>>>> In the launcher configuration I set --conf
>>>>>>> spark.streaming.kafka.maxRatePerPartition=500 \ and I believe there
>>>>>>> are 500 messages for the duration set in the application:
>>>>>>>
>>>>>>>     JavaStreamingContext jssc = new JavaStreamingContext(jsc, new Duration(1000));
>>>>>>>
>>>>>>> Then with the --num-executors 6 \ submit flag, and the
>>>>>>> spark.streaming.kafka.maxRatePerPartition=500, I think that's how
>>>>>>> we arrive at the 3000 events per batch in the UI, pasted above.
>>>>>>>
>>>>>>> Feel free to correct me if I'm wrong.
>>>>>>>
>>>>>>> So are you suggesting that I set a window?
>>>>>>>
>>>>>>> Maybe following this as a reference:
>>>>>>>
>>>>>>> https://databricks.gitbooks.io/databricks-spark-reference-applications/content/logs_analyzer/chapter1/windows.html
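>>>>>>>
>>>>>>> If I did add one, I gather it would look something like this
>>>>>>> (untested sketch against the dstream in the gist; as I understand
>>>>>>> it, window() only regroups batches and would not raise the kafka
>>>>>>> read rate):
>>>>>>>
>>>>>>>     JavaPairDStream<byte[], byte[]> windowed = dstream.window(
>>>>>>>         new Duration(30000),   // window length: 30 s
>>>>>>>         new Duration(10000));  // sliding interval: 10 s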
>>>>>>>
>>>>>>> On Sat, Jun 18, 2016 at 8:08 PM, Mich Talebzadeh
>>>>>>> <mich.talebzadeh@gmail.com> wrote:
>>>>>>>> Ok
>>>>>>>>
>>>>>>>> What is the set up for these please?
>>>>>>>>
>>>>>>>> batch window
>>>>>>>> window length
>>>>>>>> sliding interval
>>>>>>>>
>>>>>>>> And also in each batch window how much data do you get in (no of
>>>>>>>> messages in the topic, whatever)?
>>>>>>>>
>>>>>>>>
>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>
>>>>>>>> LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>>>
>>>>>>>> On 18 June 2016 at 21:01, Mich Talebzadeh <mich.talebzadeh@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> I believe you have an issue with performance?
>>>>>>>>>
>>>>>>>>> have you checked spark GUI (default 4040) for details including
>>>>>>>>> shuffles etc?
>>>>>>>>>
>>>>>>>>> HTH
>>>>>>>>>
>>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>>
>>>>>>>>> LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>>>>
>>>>>>>>> On 18 June 2016 at 20:59, Colin Kincaid Williams <discord@uw.edu> wrote:
>>>>>>>>>>
>>>>>>>>>> There are 25 nodes in the spark cluster.
>>>>>>>>>>
>>>>>>>>>> On Sat, Jun 18, 2016 at 7:53 PM, Mich Talebzadeh
>>>>>>>>>> <mich.talebzadeh@gmail.com> wrote:
>>>>>>>>>> > how many nodes are in your cluster?
>>>>>>>>>> >
>>>>>>>>>> > --num-executors 6 \
>>>>>>>>>> >  --driver-memory 4G \
>>>>>>>>>> >  --executor-memory 2G \
>>>>>>>>>> >  --total-executor-cores 12 \
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> > Dr Mich Talebzadeh
>>>>>>>>>> >
>>>>>>>>>> > LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>>>> > http://talebzadehmich.wordpress.com
>>>>>>>>>> >
>>>>>>>>>> > On 18 June 2016 at 20:40, Colin Kincaid Williams <discord@uw.edu> wrote:
>>>>>>>>>> >>
>>>>>>>>>> >> I updated my app to Spark 1.5.2 streaming so that it consumes
>>>>>>>>>> >> from Kafka using the direct api and inserts content into an
>>>>>>>>>> >> hbase cluster, as described in this thread. I was away from
>>>>>>>>>> >> this project for a while due to events in my family.
>>>>>>>>>> >>
>>>>>>>>>> >> Currently my scheduling delay is high, but the processing time
>>>>>>>>>> >> is stable around a second. I changed my setup to use 6 kafka
>>>>>>>>>> >> partitions on a set of smaller kafka brokers, with fewer
>>>>>>>>>> >> disks. I've included some details below, including the script
>>>>>>>>>> >> I use to launch the application. I'm using a Spark on Hbase
>>>>>>>>>> >> library, whose version is relevant to my Hbase cluster. Is it
>>>>>>>>>> >> apparent there is something wrong with my launch method that
>>>>>>>>>> >> could be causing the delay, related to the included jars?
>>>>>>>>>> >>
>>>>>>>>>> >> Or is there something wrong with the very simple approach I'm
>>>>>>>>>> >> taking for the application?
>>>>>>>>>> >>
>>>>>>>>>> >> Any advice is appreciated.
>>>>>>>>>> >>
>>>>>>>>>> >>
>>>>>>>>>> >> The application:
>>>>>>>>>> >>
>>>>>>>>>> >> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>>>>>>>> >>
>>>>>>>>>> >>
>>>>>>>>>> >> From the streaming UI I get something like:
>>>>>>>>>> >>
>>>>>>>>>> >> Completed Batches (last 1000 out of 27136):
>>>>>>>>>> >>
>>>>>>>>>> >> Batch Time           Input Size   Scheduling Delay   Processing Time   Total Delay   Output Ops: Succeeded/Total
>>>>>>>>>> >> 2016/06/18 11:21:32  3000 events  1.2 h              1 s               1.2 h         1/1
>>>>>>>>>> >> 2016/06/18 11:21:31  3000 events  1.2 h              1 s               1.2 h         1/1
>>>>>>>>>> >> 2016/06/18 11:21:30  3000 events  1.2 h              1 s               1.2 h         1/1
>>>>>>>>>> >>
>>>>>>>>>> >>
>>>>>>>>>> >> Here's how I'm launching the spark application.
>>>>>>>>>> >>
>>>>>>>>>> >>
>>>>>>>>>> >> #!/usr/bin/env bash
>>>>>>>>>> >>
>>>>>>>>>> >> export SPARK_CONF_DIR=/home/colin.williams/spark
>>>>>>>>>> >> export HADOOP_CONF_DIR=/etc/hadoop/conf
>>>>>>>>>> >> export HADOOP_CLASSPATH=/home/colin.williams/hbase/conf/:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/*:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/hbase-protocol-0.98.6-cdh5.3.0.jar
>>>>>>>>>> >>
>>>>>>>>>> >> /opt/spark-1.5.2-bin-hadoop2.4/bin/spark-submit \
>>>>>>>>>> >>   --class com.example.KafkaToHbase \
>>>>>>>>>> >>   --master spark://spark_master:7077 \
>>>>>>>>>> >>   --deploy-mode client \
>>>>>>>>>> >>   --num-executors 6 \
>>>>>>>>>> >>   --driver-memory 4G \
>>>>>>>>>> >>   --executor-memory 2G \
>>>>>>>>>> >>   --total-executor-cores 12 \
>>>>>>>>>> >>   --jars /home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/zookeeper/zookeeper-3.4.5-cdh5.3.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/guava-12.0.1.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/protobuf-java-2.5.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-protocol.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-client.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-common.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop2-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-server.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/htrace-core.jar \
>>>>>>>>>> >>   --conf spark.app.name="Kafka To Hbase" \
>>>>>>>>>> >>   --conf spark.eventLog.dir="hdfs:///user/spark/applicationHistory" \
>>>>>>>>>> >>   --conf spark.eventLog.enabled=false \
>>>>>>>>>> >>   --conf spark.eventLog.overwrite=true \
>>>>>>>>>> >>   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
>>>>>>>>>> >>   --conf spark.streaming.backpressure.enabled=false \
>>>>>>>>>> >>   --conf spark.streaming.kafka.maxRatePerPartition=500 \
>>>>>>>>>> >>   --driver-class-path /home/colin.williams/kafka-hbase.jar \
>>>>>>>>>> >>   --driver-java-options -Dspark.executor.extraClassPath=/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/* \
>>>>>>>>>> >>   /home/colin.williams/kafka-hbase.jar "FromTable" "ToTable" "broker1:9092,broker2:9092"
>>>>>>>>>> >>
>>>>>>>>>> >> On Tue, May 3, 2016 at 8:20 PM, Colin Kincaid Williams <discord@uw.edu> wrote:
>>>>>>>>>> >> > Thanks Cody, I can see that the partitions are well
>>>>>>>>>> >> > distributed... Now I'm in the process of moving to the
>>>>>>>>>> >> > direct api.
>>>>>>>>>> >> >
>>>>>>>>>> >> > On Tue, May 3, 2016 at 6:51 PM, Cody Koeninger <cody@koeninger.org> wrote:
>>>>>>>>>> >> >> 60 partitions in and of itself shouldn't be a big
>>>>>>>>>> >> >> performance issue (as long as producers are distributing
>>>>>>>>>> >> >> across partitions evenly).
>>>>>>>>>> >> >>
>>>>>>>>>> >> >> On Tue, May 3, 2016 at 1:44 PM, Colin Kincaid Williams <discord@uw.edu> wrote:
>>>>>>>>>> >> >>> Thanks again Cody. Regarding the details: 66 kafka
>>>>>>>>>> >> >>> partitions on 3 kafka servers, likely 8 core systems with
>>>>>>>>>> >> >>> 10 disks each. Maybe the issue with the receiver was the
>>>>>>>>>> >> >>> large number of partitions. I had miscounted the disks,
>>>>>>>>>> >> >>> and so 11*3*2 is how I decided to partition my topic on
>>>>>>>>>> >> >>> insertion (by my own, unjustified reasoning, on a first
>>>>>>>>>> >> >>> attempt). This worked well enough for me: I put 1.7
>>>>>>>>>> >> >>> billion entries into Kafka with a map reduce job in 5 and
>>>>>>>>>> >> >>> a half hours.
>>>>>>>>>> >> >>>
>>>>>>>>>> >> >>> I was concerned about using spark 1.5.2 because I'm
>>>>>>>>>> >> >>> currently putting my data into a CDH 5.3 HDFS cluster,
>>>>>>>>>> >> >>> using hbase-spark .98 library jars built for spark 1.2 on
>>>>>>>>>> >> >>> CDH 5.3. But after debugging quite a bit yesterday, I
>>>>>>>>>> >> >>> tried building against 1.5.2. So far it's running without
>>>>>>>>>> >> >>> issue on a Spark 1.5.2 cluster. I'm not sure there was
>>>>>>>>>> >> >>> much improvement using the same code, but I'll see how
>>>>>>>>>> >> >>> the direct api handles it. In the end I can reduce the
>>>>>>>>>> >> >>> number of partitions in Kafka if it causes big
>>>>>>>>>> >> >>> performance issues.
>>>>>>>>>> >> >>>
>>>>>>>>>> >> >>> On Tue, May 3, 2016 at 4:08 AM, Cody Koeninger <cody@koeninger.org> wrote:
>>>>>>>>>> >> >>>> print() isn't really the best way to benchmark things,
>>>>>>>>>> >> >>>> since it calls take(10) under the covers, but 380
>>>>>>>>>> >> >>>> records / second for a single receiver doesn't sound
>>>>>>>>>> >> >>>> right in any case.
>>>>>>>>>> >> >>>>
>>>>>>>>>> >> >>>> Am I understanding correctly that you're trying to
>>>>>>>>>> >> >>>> process a large number of already-existing kafka
>>>>>>>>>> >> >>>> messages, not keep up with an incoming stream? Can you
>>>>>>>>>> >> >>>> give any details (e.g. hardware, number of
>>>>>>>>>> >> >>>> topicpartitions, etc)?
>>>>>>>>>> >> >>>>
>>>>>>>>>> >> >>>> Really though, I'd try to start with spark 1.6 and
>>>>>>>>>> >> >>>> direct streams, or even just kafkacat, as a baseline.
>>>>>>>>>> >> >>>>
>>>>>>>>>> >> >>>>
>>>>>>>>>> >> >>>>
>>>>>>>>>> >> >>>> On Mon, May 2, 2016 at 7:01 PM, Colin Kincaid Williams <discord@uw.edu> wrote:
>>>>>>>>>> >> >>>>> Hello again. I searched for "backport kafka" in the
>>>>>>>>>> >> >>>>> list archives but couldn't find anything but a post
>>>>>>>>>> >> >>>>> from Spark 0.7.2. I was going to use accumulators to
>>>>>>>>>> >> >>>>> make a counter, but then saw the Receiver Statistics
>>>>>>>>>> >> >>>>> on the Streaming tab. Then I removed all other
>>>>>>>>>> >> >>>>> "functionality" except:
>>>>>>>>>> >> >>>>>
>>>>>>>>>> >> >>>>>
>>>>>>>>>> >> >>>>>     JavaPairReceiverInputDStream<byte[], byte[]> dstream = KafkaUtils
>>>>>>>>>> >> >>>>>         // createStream(JavaStreamingContext jssc, Class<K> keyTypeClass,
>>>>>>>>>> >> >>>>>         //     Class<V> valueTypeClass, Class<U> keyDecoderClass,
>>>>>>>>>> >> >>>>>         //     Class<T> valueDecoderClass, java.util.Map<String,String> kafkaParams,
>>>>>>>>>> >> >>>>>         //     java.util.Map<String,Integer> topics, StorageLevel storageLevel)
>>>>>>>>>> >> >>>>>         .createStream(jssc, byte[].class, byte[].class,
>>>>>>>>>> >> >>>>>             kafka.serializer.DefaultDecoder.class,
>>>>>>>>>> >> >>>>>             kafka.serializer.DefaultDecoder.class,
>>>>>>>>>> >> >>>>>             kafkaParamsMap, topicMap,
>>>>>>>>>> >> >>>>>             StorageLevel.MEMORY_AND_DISK_SER());
>>>>>>>>>> >> >>>>>
>>>>>>>>>> >> >>>>>     dstream.print();
>>>>>>>>>> >> >>>>>
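>>>>>>>>>> >> >>>>> The accumulator counter I had in mind would have looked
>>>>>>>>>> >> >>>>> roughly like this (an untested sketch against the spark
>>>>>>>>>> >> >>>>> 1.2 Java api; it counts every record rather than the
>>>>>>>>>> >> >>>>> take(10) behind print()):
>>>>>>>>>> >> >>>>>
>>>>>>>>>> >> >>>>>     final Accumulator<Integer> seen = jssc.sparkContext().accumulator(0);
>>>>>>>>>> >> >>>>>     dstream.foreachRDD(new Function<JavaPairRDD<byte[], byte[]>, Void>() {
>>>>>>>>>> >> >>>>>       public Void call(JavaPairRDD<byte[], byte[]> rdd) {
>>>>>>>>>> >> >>>>>         seen.add((int) rdd.count());  // forces a full read of the batch
>>>>>>>>>> >> >>>>>         System.out.println("records so far: " + seen.value());
>>>>>>>>>> >> >>>>>         return null;
>>>>>>>>>> >> >>>>>       }
>>>>>>>>>> >> >>>>>     });
>>>>>>>>>> >> >>>>>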
>>>>>>>>>> >> >>>>> Then in the Receiver Stats for the single receiver,
>>>>>>>>>> >> >>>>> I'm seeing around 380 records / second. To get anywhere
>>>>>>>>>> >> >>>>> near my 10% target mentioned above, I'd need to run
>>>>>>>>>> >> >>>>> around 21 receivers, assuming 380 records / second,
>>>>>>>>>> >> >>>>> just using the print output. This seems awfully high to
>>>>>>>>>> >> >>>>> me, considering that I wrote 80000+ records a second to
>>>>>>>>>> >> >>>>> Kafka from a mapreduce job, and that my bottleneck was
>>>>>>>>>> >> >>>>> likely Hbase. Again using the 380 estimate, I would
>>>>>>>>>> >> >>>>> need 200+ receivers to reach a similar amount of reads.
>>>>>>>>>> >> >>>>>
>>>>>>>>>> >> >>>>> Even given the issues with the 1.2 receivers, is this
>>>>>>>>>> >> >>>>> the expected way to use the Kafka streaming API, or am
>>>>>>>>>> >> >>>>> I doing something terribly wrong?
>>>>>>>>>> >> >>>>>
>>>>>>>>>> >> >>>>> My application looks like
>>>>>>>>>> >> >>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>>>>>>>> >> >>>>> On Mon, May 2, 2016 at 6:09 PM, Cody Koeninger <cody@koeninger.org> wrote:
>>>>>>>>>> >> >>>>>> Have you tested for read throughput (without writing
>>>>>>>>>> >> >>>>>> to hbase, just deserialize)?
>>>>>>>>>> >> >>>>>>
>>>>>>>>>> >> >>>>>> Are you limited to using spark 1.2, or is upgrading
>>>>>>>>>> >> >>>>>> possible? The kafka direct stream is available
>>>>>>>>>> >> >>>>>> starting with 1.3. If you're stuck on 1.2, I believe
>>>>>>>>>> >> >>>>>> there have been some attempts to backport it; search
>>>>>>>>>> >> >>>>>> the mailing list archives.
>>>>>>>>>> >> >>>>>>
>>>>>>>>>> >> >>>>>> On Mon, May 2, 2016 at 12:54 PM, Colin Kincaid Williams <discord@uw.edu> wrote:
>>>>>>>>>> >> >>>>>>> I've written an application to get content from a
>>>>>>>>>> >> >>>>>>> kafka topic with 1.7 billion entries, get the
>>>>>>>>>> >> >>>>>>> protobuf serialized entries, and insert into hbase.
>>>>>>>>>> >> >>>>>>> Currently the environment that I'm running in is
>>>>>>>>>> >> >>>>>>> Spark 1.2.
>>>>>>>>>> >> >>>>>>>
>>>>>>>>>> >> >>>>>>> With 8 executors and 2 cores, and 2 jobs, I'm only
>>>>>>>>>> >> >>>>>>> getting between 0-2500 writes / second. This will
>>>>>>>>>> >> >>>>>>> take much too long to consume the entries.
>>>>>>>>>> >> >>>>>>>
>>>>>>>>>> >> >>>>>>> I currently believe that the spark kafka receiver is
>>>>>>>>>> >> >>>>>>> the bottleneck. I've tried both 1.2 receivers, with
>>>>>>>>>> >> >>>>>>> the WAL and without, and didn't notice any large
>>>>>>>>>> >> >>>>>>> performance difference. I've tried many different
>>>>>>>>>> >> >>>>>>> spark configuration options, but can't seem to get
>>>>>>>>>> >> >>>>>>> better performance.
>>>>>>>>>> >> >>>>>>>
>>>>>>>>>> >> >>>>>>> I saw 80000 requests / second inserting these
>>>>>>>>>> >> >>>>>>> records into kafka using yarn / hbase / protobuf /
>>>>>>>>>> >> >>>>>>> kafka in a bulk fashion.
>>>>>>>>>> >> >>>>>>>
>>>>>>>>>> >> >>>>>>> While hbase inserts might not deliver the same
>>>>>>>>>> >> >>>>>>> throughput, I'd like to at least get 10%.
>>>>>>>>>> >> >>>>>>>
>>>>>>>>>> >> >>>>>>> My application looks like
>>>>>>>>>> >> >>>>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>>>>>>>> >> >>>>>>>
>>>>>>>>>> >> >>>>>>> This is my first spark application. I'd appreciate
>>>>>>>>>> >> >>>>>>> any assistance.