kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Amir Shahinpour <a...@holisticlabs.net>
Subject Re: Round Robin for several consumers in KAFKA
Date Tue, 19 Sep 2017 06:06:46 GMT
Can you provide your docker file, or compose file?

On Wed, Sep 13, 2017 at 1:55 AM, Liel Shraga (lshraga) <lshraga@cisco.com>
wrote:

> Hi,
>
> I didn’t define the partition size. How can I do it with kafka-clients API?
>
> Thanks,
>
>
>
>
>
> Liel Shraga
> ENGINEER.SOFTWARE ENGINEERING
> lshraga@cisco.com
> Tel: +972 2 588 6394
> Cisco Systems, Inc.
> 32 HaMelacha St., (HaSharon Bldg) P.O.Box 8735, I.Z.Sapir
> SOUTH NETANYA
> 42504
> Israel
> cisco.com
>
>
> Think before you print.
> This email may contain confidential and privileged material for the sole
> use of the intended recipient. Any review, use, distribution or disclosure
> by others is strictly prohibited. If you are not the intended recipient (or
> authorized to receive for the recipient), please contact the sender by
> reply email and delete all copies of this message.
> Please click here for Company Registration Information.
>
>
>
> -----Original Message-----
> From: Manikumar [mailto:manikumar.reddy@gmail.com]
> Sent: Wednesday, September 13, 2017 11:47 AM
> To: users@kafka.apache.org
> Subject: Re: Round Robin for several consumers in KAFKA
>
> what is the partition size? you need at least 2 partitions to distribute
> across two consumers
>
>
> On Wed, Sep 13, 2017 at 1:24 PM, Liel Shraga (lshraga) <lshraga@cisco.com>
> wrote:
>
> > Hi,
> >
> >
> >
> > I have 5 separate docker images : 1 for kafka broker, 1 zookeeper , 1
> > producer and 2 consumers.
> >
> > I publish messages to the topic via the producer.
> >
> > Basically, I would likw that the messages will be published in a round
> > robin algorithm,
> >
> > so for that purpose I defined the consumers with the same group.id and
> > added config of partition.assignment.strategy to be
> > org.apache.kafka.clients.consumer.RoundRobinAssignor,
> >
> > but actually only 1 consumer receive all the messages.
> >
> > *My Producer Code: *
> >
> > *public* *class* DiscoveryKafkaProducer{
> >
> >    Producer<String, String> producer;
> >
> >
> >
> >    *public* DiscoveryKafkaProducer(Properties configs) {
> >
> >        producer = *new* KafkaProducer<String, String>(configs);
> >
> >    }
> >
> >    *public* *void* send(String topic, List<String> records) {
> >
> >          *for*(String record: records){
> >
> >                producer.send(*new* ProducerRecord<String,
> > String>(topic, record));
> >
> >          }
> >
> >        producer.flush();
> >
> >    }
> >
> >
> >
> >    *public* *void* close() *throws* Exception {
> >
> >        producer.close();
> >
> >    }
> >
> > }
> >
> >
> >
> > *My Consumer Code:*
> >
> > *public* *static* *void* main(String[] args) {
> >
> >             String server = "lshraga-ubuntu-sp-nac:9092";
> >
> >             Properties consumerConfigs = *new* Properties();
> >
> >             consumerConfigs.put("bootstrap.servers", server);
> >
> >             consumerConfigs.put("key.deserializer",
> > "org.apache.kafka.common.serialization.StringDeserializer");
> >
> >             consumerConfigs.put("value.deserializer",
> > "org.apache.kafka.common.serialization.StringDeserializer");
> >
> >             consumerConfigs.put("group.id", "discovery");
> >
> >             consumerConfigs.put("client.id", "discovery");
> >
> >             consumerConfigs.put("partition.assignment.strategy",
> > "org.apache.kafka.clients.consumer.RoundRobinAssignor");
> >
> >             List<String> *list* = *new* ArrayList<String>();
> >
> >             DiscoveryKafkaConsumer consumer1 = *new*
> > DiscoveryKafkaConsumer(Collections.*singletonList*(topicName),
> > consumerConfigs);
> >
> >
> >
> >             *try* {
> >
> >                   *while* (*true*) {
> >
> >                         System.*out*.println("Start to consume");
> >
> >                       consumer1.poll(1000L);
> >
> >                   }
> >
> >             } *catch* (InterruptedException e) {
> >
> >                   // *TODO* Auto-generated catch block
> >
> >                   e.printStackTrace();
> >
> >             }
> >
> >
> >
> > *public* *class* DiscoveryKafkaConsumer {
> >
> >    Consumer<String, String> consumer;
> >
> >    Integer id;
> >
> >    *public* DiscoveryKafkaConsumer(List<String> topics, Properties
> > configs) {
> >
> >        consumer = *new* KafkaConsumer<String, String>(configs);
> >
> >        consumer.subscribe(topics);
> >
> >    }
> >
> >
> >
> >    *public* DiscoveryKafkaConsumer(*int* i, List<String> topics,
> > Properties configs) {
> >
> >          consumer = *new* KafkaConsumer<String, String>(configs);
> >
> >        consumer.subscribe(topics);
> >
> >        *this*.id = i;
> >
> > }
> >
> >
> >
> >    *public* *void* poll(*long* timeout) *throws* InterruptedException
> > {
> >
> >        ConsumerRecords<String,String> records =
> > consumer.poll(timeout);
> >
> >        System.*out*.println("Hey!Consumer #" + id + "got records:" +
> > records);
> >
> >        Map<String, List<String>> results = *new* HashMap<String,
> > List<String>>();
> >
> >        records.forEach((cr) -> {
> >
> >          System.*out*.println("cr.topic()=" + cr.topic());
> >
> >            List<String> list = results.get(cr.topic());
> >
> >            *if*(list == *null*) {
> >
> >                list = *new* ArrayList<>();
> >
> >                results.put(cr.topic(), list);
> >
> >                }
> >
> >                   list.add(cr.value());
> >
> >                   System.*out*.println("list=" + list);
> >
> >            });
> >
> >    }
> >
> >    *public* *void* close() *throws* Exception {
> >
> >        consumer.close();
> >
> >    }
> >
> > *What I need to add/condig in order to consume the messages in a Round
> > Robin ?*
> >
> >
> >
> >
> >
> > Thanks,
> >
> >
> >
> >
> >
> > [image: banner14]
> >
> >
> >
> > *Liel Shraga*
> >
> > ENGINEER.SOFTWARE ENGINEERING
> >
> > lshraga@cisco.com
> >
> > Tel: *+972 2 588 6394*
> >
> > *Cisco Systems, Inc.*
> >
> > 32 HaMelacha St., (HaSharon
> > <https://maps.google.com/?q=32+HaMelacha+St.,+(HaSharon&entry=gmail&so
> > urce=g>
> > Bldg) P.O.Box 8735, I.Z.Sapir
> > SOUTH NETANYA
> > 42504
> > Israel
> > cisco.com
> >
> >
> >
> > [image:
> > http://www.cisco.com/assets/swa/img/thinkbeforeyouprint.gif]Think
> > before you print.
> >
> > This email may contain confidential and privileged material for the
> > sole use of the intended recipient. Any review, use, distribution or
> > disclosure by others is strictly prohibited. If you are not the
> > intended recipient (or authorized to receive for the recipient),
> > please contact the sender by reply email and delete all copies of this
> message.
> >
> > Please click here
> > <http://www.cisco.com/web/about/doing_business/legal/cri/index.html>
> > for Company Registration Information.
> >
> >
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message