spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Cody Koeninger <c...@koeninger.org>
Subject Re: Spark streaming from Kafka best fit
Date Tue, 01 Mar 2016 18:04:10 GMT
You don't need an equal number of executor cores to partitions.  An
executor can and will work on multiple partitions within a batch, one after
the other.  The real issue is whether you are able to keep your processing
time under your batch time, so that delay doesn't increase.

On Tue, Mar 1, 2016 at 11:59 AM, Jatin Kumar <jkumar@rocketfuelinc.com>
wrote:

> Thanks Cody!
>
> I understand what you said and if I am correct it will be using 224
> executor cores just for fetching + stage-1 processing of 224 partitions. I
> will obviously need more cores for processing further stages and fetching
> next batch.
>
> I will start with higher number of executor cores and see how it goes.
>
> --
> Thanks
> Jatin Kumar | Rocket Scientist
> +91-7696741743 m
>
> On Tue, Mar 1, 2016 at 9:07 PM, Cody Koeninger <cody@koeninger.org> wrote:
>
>> > "How do I keep a balance of executors which receive data from Kafka
>> and which process data"
>>
>> I think you're misunderstanding how the direct stream works.  The
>> executor which receives data is also the executor which processes data,
>> there aren't separate receivers.  If it's a single stage worth of work
>> (e.g. straight map / filter), the processing of a given partition is going
>> to be done by the executor that read it from kafka.  If you do something
>> involving a shuffle (e.g. reduceByKey), other executors will do additional
>> processing.  The question of which executor works on which tasks is up to
>> the scheduler (and getPreferredLocations, which only matters if you're
>> running spark on the same nodes as kafka)
>>
>> On Tue, Mar 1, 2016 at 2:36 AM, Jatin Kumar <
>> jkumar@rocketfuelinc.com.invalid> wrote:
>>
>>> Hello all,
>>>
>>> I see that there are as of today 3 ways one can read from Kafka in spark
>>> streaming:
>>> 1. KafkaUtils.createStream() (here
>>> <https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html>)
>>> 2. KafkaUtils.createDirectStream() (here
>>> <https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html>)
>>> 3. Kafka-spark-consumer (here
>>> <https://github.com/dibbhatt/kafka-spark-consumer>)
>>>
>>> My spark streaming application has to read from 1 kafka topic with
>>> around 224 partitions, consuming data at around 150MB/s (~90,000
>>> messages/sec) which reduces to around 3MB/s (~1400 messages/sec) after
>>> filtering. After filtering I need to maintain top 10000 URL counts. I don't
>>> really care about exactly once semantics as I am interested in rough
>>> estimate.
>>>
>>> Code:
>>>
>>> sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "false")
>>> sparkConf.setAppName("KafkaReader")
>>> val ssc = StreamingContext.getOrCreate(kCheckPointDir, createStreamingContext)
>>>
>>> val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
>>> val kafkaParams = Map[String, String](
>>>   "metadata.broker.list" -> "kafka.server.ip:9092",
>>>   "group.id" -> consumer_group
>>> )
>>>
>>> val lineStreams = (1 to N).map{ _ =>
>>>   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
>>>     ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK).map(_._2)
>>> }
>>>
>>> ssc.union(
>>>   lineStreams.map(stream => {
>>>   stream.map(ParseStringToLogRecord)
>>>     .filter(record => isGoodRecord(record))
>>>     .map(record => record.url)
>>>   })
>>> ).window(Seconds(120), Seconds(120))  // 2 Minute window
>>>   .countByValueAndWindow(Seconds(1800), Seconds(120), 28) // 30 Minute moving
window, 28 will probably help in parallelism
>>>   .filter(urlCountTuple => urlCountTuple._2 > MIN_COUNT_THRESHHOLD)
>>>   .mapPartitions(iter => {
>>>     iter.toArray.sortWith((r1, r2) => r1._2.compare(r2._2) < 0).slice(0,
1000).iterator
>>>   }, true)
>>>   .foreachRDD((latestRDD, rddTime) => {
>>>       printTopFromRDD(rddTime, latestRDD.map(record => (record._2, record._1)).sortByKey(false).take(1000))
>>>   })
>>>
>>> ssc.start()
>>> ssc.awaitTermination()
>>>
>>> Questions:
>>>
>>> a) I used #2 but I found that I couldn't control how many executors will
>>> be actually fetching from Kafka. How do I keep a balance of executors which
>>> receive data from Kafka and which process data? Do they keep changing for
>>> every batch?
>>>
>>> b) Now I am trying to use #1 creating multiple DStreams, filtering them
>>> and then doing a union. I don't understand why would the number of events
>>> processed per 120 seconds batch will change drastically. PFA the events/sec
>>> graph while running with 1 receiver. How to debug this?
>>>
>>> c) What will be the most suitable method to integrate with Kafka from
>>> above 3? Any recommendations for getting maximum performance, running the
>>> streaming application reliably in production environment?
>>>
>>> --
>>> Thanks
>>> Jatin Kumar
>>>
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>> For additional commands, e-mail: user-help@spark.apache.org
>>>
>>
>>
>

Mime
View raw message