kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Liel Shraga (lshraga)" <lshr...@cisco.com>
Subject Round Robin for several consumers in KAFKA
Date Wed, 13 Sep 2017 07:54:14 GMT
Hi,

I have 5 separate docker images : 1 for kafka broker, 1 zookeeper , 1 producer and 2 consumers.
I publish messages to the topic via the producer.
Basically, I would likw that the messages will be published in a round robin algorithm,
so for that purpose I defined the consumers with the same group.id and added config of partition.assignment.strategy
to be org.apache.kafka.clients.consumer.RoundRobinAssignor,
but actually only 1 consumer receive all the messages.
My Producer Code:
public class DiscoveryKafkaProducer{
   Producer<String, String> producer;

   public DiscoveryKafkaProducer(Properties configs) {
       producer = new KafkaProducer<String, String>(configs);
   }
   public void send(String topic, List<String> records) {
         for(String record: records){
               producer.send(new ProducerRecord<String, String>(topic, record));
         }
       producer.flush();
   }

   public void close() throws Exception {
       producer.close();
   }
}

My Consumer Code:
public static void main(String[] args) {
            String server = "lshraga-ubuntu-sp-nac:9092";
            Properties consumerConfigs = new Properties();
            consumerConfigs.put("bootstrap.servers", server);
            consumerConfigs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            consumerConfigs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            consumerConfigs.put("group.id", "discovery");
            consumerConfigs.put("client.id", "discovery");
            consumerConfigs.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");
            List<String> list = new ArrayList<String>();
            DiscoveryKafkaConsumer consumer1 = new DiscoveryKafkaConsumer(Collections.singletonList(topicName),
consumerConfigs);

            try {
                  while (true) {
                        System.out.println("Start to consume");
                      consumer1.poll(1000L);
                  }
            } catch (InterruptedException e) {
                  // TODO Auto-generated catch block
                  e.printStackTrace();
            }

public class DiscoveryKafkaConsumer {
   Consumer<String, String> consumer;
   Integer id;
   public DiscoveryKafkaConsumer(List<String> topics, Properties configs) {
       consumer = new KafkaConsumer<String, String>(configs);
       consumer.subscribe(topics);
   }

   public DiscoveryKafkaConsumer(int i, List<String> topics, Properties configs) {
         consumer = new KafkaConsumer<String, String>(configs);
       consumer.subscribe(topics);
       this.id = i;
}

   public void poll(long timeout) throws InterruptedException {
       ConsumerRecords<String,String> records = consumer.poll(timeout);
       System.out.println("Hey!Consumer #" + id + "got records:" + records);
       Map<String, List<String>> results = new HashMap<String, List<String>>();
       records.forEach((cr) -> {
         System.out.println("cr.topic()=" + cr.topic());
           List<String> list = results.get(cr.topic());
           if(list == null) {
               list = new ArrayList<>();
               results.put(cr.topic(), list);
               }
                  list.add(cr.value());
                  System.out.println("list=" + list);
           });
   }
   public void close() throws Exception {
       consumer.close();
   }
What I need to add/condig in order to consume the messages in a Round Robin ?


Thanks,


[banner14]



Liel Shraga
ENGINEER.SOFTWARE ENGINEERING
lshraga@cisco.com<mailto:lshraga@cisco.com>
Tel: +972 2 588 6394

Cisco Systems, Inc.
32 HaMelacha St., (HaSharon Bldg) P.O.Box 8735, I.Z.Sapir
SOUTH NETANYA
42504
Israel
cisco.com


[http://www.cisco.com/assets/swa/img/thinkbeforeyouprint.gif]Think before you print.

This email may contain confidential and privileged material for the sole use of the intended
recipient. Any review, use, distribution or disclosure by others is strictly prohibited. If
you are not the intended recipient (or authorized to receive for the recipient), please contact
the sender by reply email and delete all copies of this message.
Please click here<http://www.cisco.com/web/about/doing_business/legal/cri/index.html>
for Company Registration Information.



Mime
  • Unnamed multipart/related (inline, None, 0 bytes)
View raw message