kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Liel Shraga (lshraga)" <lshr...@cisco.com>
Subject RE: Round Robin for several consumers in KAFKA
Date Wed, 13 Sep 2017 08:55:42 GMT
Hi,

I didn’t define the partition size. How can I do it with kafka-clients API?

Thanks,


 


Liel Shraga
ENGINEER.SOFTWARE ENGINEERING
lshraga@cisco.com
Tel: +972 2 588 6394
Cisco Systems, Inc.
32 HaMelacha St., (HaSharon Bldg) P.O.Box 8735, I.Z.Sapir
SOUTH NETANYA
42504
Israel
cisco.com


Think before you print.
This email may contain confidential and privileged material for the sole use of the intended
recipient. Any review, use, distribution or disclosure by others is strictly prohibited. If
you are not the intended recipient (or authorized to receive for the recipient), please contact
the sender by reply email and delete all copies of this message.
Please click here for Company Registration Information.



-----Original Message-----
From: Manikumar [mailto:manikumar.reddy@gmail.com] 
Sent: Wednesday, September 13, 2017 11:47 AM
To: users@kafka.apache.org
Subject: Re: Round Robin for several consumers in KAFKA

what is the partition size? you need at least 2 partitions to distribute across two consumers


On Wed, Sep 13, 2017 at 1:24 PM, Liel Shraga (lshraga) <lshraga@cisco.com>
wrote:

> Hi,
>
>
>
> I have 5 separate docker images : 1 for kafka broker, 1 zookeeper , 1 
> producer and 2 consumers.
>
> I publish messages to the topic via the producer.
>
> Basically, I would likw that the messages will be published in a round 
> robin algorithm,
>
> so for that purpose I defined the consumers with the same group.id and 
> added config of partition.assignment.strategy to be 
> org.apache.kafka.clients.consumer.RoundRobinAssignor,
>
> but actually only 1 consumer receive all the messages.
>
> *My Producer Code: *
>
> *public* *class* DiscoveryKafkaProducer{
>
>    Producer<String, String> producer;
>
>
>
>    *public* DiscoveryKafkaProducer(Properties configs) {
>
>        producer = *new* KafkaProducer<String, String>(configs);
>
>    }
>
>    *public* *void* send(String topic, List<String> records) {
>
>          *for*(String record: records){
>
>                producer.send(*new* ProducerRecord<String, 
> String>(topic, record));
>
>          }
>
>        producer.flush();
>
>    }
>
>
>
>    *public* *void* close() *throws* Exception {
>
>        producer.close();
>
>    }
>
> }
>
>
>
> *My Consumer Code:*
>
> *public* *static* *void* main(String[] args) {
>
>             String server = "lshraga-ubuntu-sp-nac:9092";
>
>             Properties consumerConfigs = *new* Properties();
>
>             consumerConfigs.put("bootstrap.servers", server);
>
>             consumerConfigs.put("key.deserializer",
> "org.apache.kafka.common.serialization.StringDeserializer");
>
>             consumerConfigs.put("value.deserializer",
> "org.apache.kafka.common.serialization.StringDeserializer");
>
>             consumerConfigs.put("group.id", "discovery");
>
>             consumerConfigs.put("client.id", "discovery");
>
>             consumerConfigs.put("partition.assignment.strategy",
> "org.apache.kafka.clients.consumer.RoundRobinAssignor");
>
>             List<String> *list* = *new* ArrayList<String>();
>
>             DiscoveryKafkaConsumer consumer1 = *new* 
> DiscoveryKafkaConsumer(Collections.*singletonList*(topicName),
> consumerConfigs);
>
>
>
>             *try* {
>
>                   *while* (*true*) {
>
>                         System.*out*.println("Start to consume");
>
>                       consumer1.poll(1000L);
>
>                   }
>
>             } *catch* (InterruptedException e) {
>
>                   // *TODO* Auto-generated catch block
>
>                   e.printStackTrace();
>
>             }
>
>
>
> *public* *class* DiscoveryKafkaConsumer {
>
>    Consumer<String, String> consumer;
>
>    Integer id;
>
>    *public* DiscoveryKafkaConsumer(List<String> topics, Properties 
> configs) {
>
>        consumer = *new* KafkaConsumer<String, String>(configs);
>
>        consumer.subscribe(topics);
>
>    }
>
>
>
>    *public* DiscoveryKafkaConsumer(*int* i, List<String> topics, 
> Properties configs) {
>
>          consumer = *new* KafkaConsumer<String, String>(configs);
>
>        consumer.subscribe(topics);
>
>        *this*.id = i;
>
> }
>
>
>
>    *public* *void* poll(*long* timeout) *throws* InterruptedException 
> {
>
>        ConsumerRecords<String,String> records = 
> consumer.poll(timeout);
>
>        System.*out*.println("Hey!Consumer #" + id + "got records:" + 
> records);
>
>        Map<String, List<String>> results = *new* HashMap<String, 
> List<String>>();
>
>        records.forEach((cr) -> {
>
>          System.*out*.println("cr.topic()=" + cr.topic());
>
>            List<String> list = results.get(cr.topic());
>
>            *if*(list == *null*) {
>
>                list = *new* ArrayList<>();
>
>                results.put(cr.topic(), list);
>
>                }
>
>                   list.add(cr.value());
>
>                   System.*out*.println("list=" + list);
>
>            });
>
>    }
>
>    *public* *void* close() *throws* Exception {
>
>        consumer.close();
>
>    }
>
> *What I need to add/condig in order to consume the messages in a Round 
> Robin ?*
>
>
>
>
>
> Thanks,
>
>
>
>
>
> [image: banner14]
>
>
>
> *Liel Shraga*
>
> ENGINEER.SOFTWARE ENGINEERING
>
> lshraga@cisco.com
>
> Tel: *+972 2 588 6394*
>
> *Cisco Systems, Inc.*
>
> 32 HaMelacha St., (HaSharon
> <https://maps.google.com/?q=32+HaMelacha+St.,+(HaSharon&entry=gmail&so
> urce=g>
> Bldg) P.O.Box 8735, I.Z.Sapir
> SOUTH NETANYA
> 42504
> Israel
> cisco.com
>
>
>
> [image: 
> http://www.cisco.com/assets/swa/img/thinkbeforeyouprint.gif]Think
> before you print.
>
> This email may contain confidential and privileged material for the 
> sole use of the intended recipient. Any review, use, distribution or 
> disclosure by others is strictly prohibited. If you are not the 
> intended recipient (or authorized to receive for the recipient), 
> please contact the sender by reply email and delete all copies of this message.
>
> Please click here
> <http://www.cisco.com/web/about/doing_business/legal/cri/index.html> 
> for Company Registration Information.
>
>
>
Mime
View raw message