kafka-users mailing list archives

From francois.garil...@typesafe.com
Subject Re: KafkaUtils not consuming all the data from all partitions
Date Wed, 07 Jan 2015 15:43:15 GMT
- You are launching up to 10 threads per topic on each Receiver. Are you sure your receivers
can support 10 threads each (i.e., in the default configuration, do they have 10 cores)? If
they have 2 cores, that would explain why this works with 20 partitions or fewer.
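The cores-vs-threads limit can be seen with back-of-the-envelope arithmetic; the class name and numbers below are illustrative only, not Spark internals:

```java
// Illustrative arithmetic only: a receiver's consumer threads can make
// progress concurrently only up to the number of cores its executor has.
public class ReceiverMath {
    static int maxConcurrentPartitions(int receivers, int coresPerReceiver) {
        return receivers * coresPerReceiver;
    }

    public static void main(String[] args) {
        // 10 receivers with 2 cores each: at most ~20 partitions are served
        // concurrently, matching the "works with ~20 partitions" symptom.
        System.out.println(maxConcurrentPartitions(10, 2));
    }
}
```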




- If you have 90 partitions, why start 10 streams, each consuming 10 partitions, and then
remove the stream at index 0? Why not simply start 10 streams with 9 partitions each? Or,
more simply:




val kafkaStreams = (1 to numPartitions).map { _ =>
  KafkaUtils.createStream(ssc, …, kafkaConf, Map(topic -> 1),
    StorageLevel.MEMORY_ONLY_SER)
}




- You’re creating up to 10 local consumer threads *per topic* on each of your 10 receivers.
That’s a lot of threads (10 × the size of kafkaTopicsList) co-located on a single machine.
You mentioned having a single Kafka topic with 90 partitions, so why not use a single-element
topicMap?
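The thread math in that last point can be made concrete. `ThreadCount` and the topic name are hypothetical; this only mirrors the arithmetic, not Kafka's actual consumer implementation:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: the high-level consumer starts one thread per unit
// in each topicMap value, so the per-receiver thread count is the sum of
// the values; multiply by co-located receivers for the per-machine total.
public class ThreadCount {
    static int threadsPerReceiver(Map<String, Integer> topicMap) {
        return topicMap.values().stream().mapToInt(Integer::intValue).sum();
    }

    public static void main(String[] args) {
        Map<String, Integer> topicMap = new HashMap<>();
        topicMap.put("some-topic", 10); // topic name is illustrative
        int receiversPerMachine = 10;   // worst case: all receivers co-located
        System.out.println(threadsPerReceiver(topicMap) * receiversPerMachine);
    }
}
```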


—
FG

On Wed, Jan 7, 2015 at 4:05 PM, Mukesh Jha <me.mukesh.jha@gmail.com>
wrote:

> I understand that I have to create 10 parallel streams. My code runs fine
> when the number of partitions is ~20, but when I increase the number of
> partitions I keep running into this issue.
> Below is my code to create kafka streams, along with the configs used.
>     Map<String, String> kafkaConf = new HashMap<String, String>();
>     kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
>     kafkaConf.put("group.id", kafkaConsumerGroup);
>     kafkaConf.put("consumer.timeout.ms", "30000");
>     kafkaConf.put("auto.offset.reset", "largest");
>     kafkaConf.put("fetch.message.max.bytes", "20000000");
>     kafkaConf.put("zookeeper.session.timeout.ms", "6000");
>     kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
>     kafkaConf.put("zookeeper.sync.time.ms", "2000");
>     kafkaConf.put("rebalance.backoff.ms", "10000");
>     kafkaConf.put("rebalance.max.retries", "20");
>     String[] topics = kafkaTopicsList;
>     int numStreams = numKafkaThreads; // this is *10*
>     Map<String, Integer> topicMap = new HashMap<>();
>     for (String topic: topics) {
>       topicMap.put(topic, numStreams);
>     }
>     List<JavaPairDStream<byte[], byte[]>> kafkaStreams = new
> ArrayList<>(numStreams);
>     for (int i = 0; i < numStreams; i++) {
>       kafkaStreams.add(KafkaUtils.createStream(sc, byte[].class,
> byte[].class, DefaultDecoder.class, DefaultDecoder.class, kafkaConf,
> topicMap, StorageLevel.MEMORY_ONLY_SER()));
>     }
>     JavaPairDStream<byte[], byte[]> ks = sc.union(kafkaStreams.remove(0),
> kafkaStreams);
> On Wed, Jan 7, 2015 at 8:21 PM, Gerard Maas <gerard.maas@gmail.com> wrote:
>> Hi,
>>
>> Could you add the code where you create the Kafka consumer?
>>
>> -kr, Gerard.
>>
>> On Wed, Jan 7, 2015 at 3:43 PM, <francois.garillot@typesafe.com> wrote:
>>
>>> Hi Mukesh,
>>>
>>> If my understanding is correct, each Stream only has a single Receiver.
>>> So, if you have each receiver consuming 9 partitions, you need 10 input
>>> DStreams to create 10 concurrent receivers:
>>>
>>>
>>> https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
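
The 90-partitions-across-10-receivers split can be sketched with a toy round-robin assignment; this is illustrative only, since Kafka's high-level consumer performs its own rebalancing:

```java
import java.util.ArrayList;
import java.util.List;

// Toy round-robin assignment: 90 partitions spread over 10 receivers
// gives 9 partitions each. Not Kafka's real rebalancing algorithm.
public class PartitionSpread {
    static List<List<Integer>> assign(int partitions, int receivers) {
        List<List<Integer>> out = new ArrayList<>();
        for (int r = 0; r < receivers; r++) out.add(new ArrayList<>());
        for (int p = 0; p < partitions; p++) out.get(p % receivers).add(p);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(assign(90, 10).get(0).size());
    }
}
```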
>>>
>>> Would you mind sharing a bit more on how you achieve this?
>>>
>>> --
>>> FG
>>>
>>>
>>> On Wed, Jan 7, 2015 at 3:00 PM, Mukesh Jha <me.mukesh.jha@gmail.com>
>>> wrote:
>>>
>>>> Hi Guys,
>>>>
>>>> I have a Kafka topic with 90 partitions and I am running Spark
>>>> Streaming (1.2.0) to read from Kafka via KafkaUtils, creating 10
>>>> kafka-receivers.
>>>>
>>>> My streaming job runs fine and there is no processing delay; it's just
>>>> that data from some partitions is never picked up. From the Kafka
>>>> console I can see that each receiver is consuming data from 9 partitions,
>>>> but the lag for some offsets keeps increasing.
>>>>
>>>> Below are my kafka-consumer parameters.
>>>>
>>>> Have any of you faced this kind of issue? If so, do you have any
>>>> pointers to fix it?
>>>>
>>>>  Map<String, String> kafkaConf = new HashMap<String, String>();
>>>>  kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
>>>>  kafkaConf.put("group.id", kafkaConsumerGroup);
>>>>  kafkaConf.put("consumer.timeout.ms", "30000");
>>>>  kafkaConf.put("auto.offset.reset", "largest");
>>>>  kafkaConf.put("fetch.message.max.bytes", "20000000");
>>>>  kafkaConf.put("zookeeper.session.timeout.ms", "6000");
>>>>  kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
>>>>  kafkaConf.put("zookeeper.sync.time.ms", "2000");
>>>>  kafkaConf.put("rebalance.backoff.ms", "10000");
>>>>  kafkaConf.put("rebalance.max.retries", "20");
>>>>
>>>> --
>>>> Thanks & Regards,
>>>>
>>>> Mukesh Jha <me.mukesh.jha@gmail.com>
>>>>
>>>
>>>
>>
> -- 
> Thanks & Regards,
> *Mukesh Jha <me.mukesh.jha@gmail.com>*