kafka-users mailing list archives

From Gerard Maas <gerard.m...@gmail.com>
Subject Re: KafkaUtils not consuming all the data from all partitions
Date Wed, 07 Jan 2015 14:51:13 GMT
Hi,

Could you add the code where you create the Kafka consumer?

-kr, Gerard.

On Wed, Jan 7, 2015 at 3:43 PM, <francois.garillot@typesafe.com> wrote:

> Hi Mukesh,
>
> If my understanding is correct, each input DStream has only a single
> Receiver. So, if you want each receiver to consume 9 partitions, you need
> 10 input DStreams to create 10 concurrent receivers:
>
>
> https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
>
> Would you mind sharing a bit more on how you achieve this?
>
> —
> FG
>
>
> On Wed, Jan 7, 2015 at 3:00 PM, Mukesh Jha <me.mukesh.jha@gmail.com>
> wrote:
>
>> Hi Guys,
>>
>> I have a Kafka topic with 90 partitions, and I am running Spark
>> Streaming (1.2.0) to read from Kafka via KafkaUtils, creating 10
>> Kafka receivers.
>>
>> My streaming job is running fine and there is no delay in processing,
>> except that data from some partitions is never picked up. From the Kafka
>> console I can see that each receiver is consuming data from 9 partitions,
>> but the lag for some offsets keeps increasing.
>>
>> Below are my Kafka consumer parameters.
>>
>> Has any of you faced this kind of issue? If so, do you have any
>> pointers to fix it?
>>
>>  Map<String, String> kafkaConf = new HashMap<String, String>();
>>  kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
>>  kafkaConf.put("group.id", kafkaConsumerGroup);
>>  kafkaConf.put("consumer.timeout.ms", "30000");
>>  kafkaConf.put("auto.offset.reset", "largest");
>>  kafkaConf.put("fetch.message.max.bytes", "20000000");
>>  kafkaConf.put("zookeeper.session.timeout.ms", "6000");
>>  kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
>>  kafkaConf.put("zookeeper.sync.time.ms", "2000");
>>  kafkaConf.put("rebalance.backoff.ms", "10000");
>>  kafkaConf.put("rebalance.max.retries", "20");
>>
>> --
>> Thanks & Regards,
>>
>> Mukesh Jha <me.mukesh.jha@gmail.com>
>>
>
>

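The arithmetic in the quoted messages (90 partitions, 10 receivers, 9 partitions per receiver) can be sketched as below. This is only an illustration: it assumes the consumer group splits partitions into contiguous ranges, which is my understanding of how the Kafka 0.8 high-level consumer behaves, and that each receiver runs a single consumer thread. The class and method names are hypothetical and are not part of Spark or Kafka.

```java
/** Hypothetical helper for illustration; not part of Spark or Kafka. */
final class ReceiverPartitionSketch {

    /**
     * Splits partition ids 0..partitions-1 into contiguous ranges, one per consumer.
     * Returns, for each consumer index, a pair {firstPartition, partitionCount}.
     * When the division is uneven, the first (partitions % consumers) consumers
     * each take one extra partition.
     */
    static int[][] assignRanges(int partitions, int consumers) {
        if (partitions < 0 || consumers <= 0) {
            throw new IllegalArgumentException("need partitions >= 0 and consumers > 0");
        }
        int base = partitions / consumers;   // partitions every consumer gets
        int extra = partitions % consumers;  // leftover partitions to spread out
        int[][] ranges = new int[consumers][2];
        for (int i = 0; i < consumers; i++) {
            ranges[i][0] = base * i + Math.min(i, extra);
            ranges[i][1] = base + (i < extra ? 1 : 0);
        }
        return ranges;
    }

    public static void main(String[] args) {
        // The setup described in the thread: 90 partitions, 10 receivers.
        int[][] ranges = assignRanges(90, 10);
        for (int i = 0; i < ranges.length; i++) {
            int first = ranges[i][0];
            int last = first + ranges[i][1] - 1;
            System.out.println("receiver " + i + " -> partitions " + first + ".." + last);
        }
    }
}
```

Under these assumptions every partition has an owner, so a partition whose lag keeps growing would point at a receiver that is not draining its share rather than at a gap in the assignment.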