kafka-users mailing list archives

From Amir Shahinpour <a...@holisticlabs.net>
Subject Re: Round Robin for several consumers in KAFKA
Date Tue, 19 Sep 2017 06:28:15 GMT
Does it mean that the number of brokers changes from time to time?

On Mon, Sep 18, 2017 at 11:10 PM, Liel Shraga (lshraga) <lshraga@cisco.com>
wrote:

> Hi,
>
> My Docker Compose file is:
>
> version: '2'
> services:
>   zookeeper:
>     image: wurstmeister/zookeeper
>     ports:
>       - "2181:2181"
>   kafka:
>     image: wurstmeister/kafka
>     ports:
>       - "9092:9092"
>     environment:
>       KAFKA_ADVERTISED_HOST_NAME: lshraga-ubuntu-sp-nac
>       KAFKA_ADVERTISED_PORT: 9092
>       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
>       KAFKA_NUM_PARTITIONS: 10
>     volumes:
>       - /var/run/docker.sock:/var/run/docker.sock
>
> Basically, I increased KAFKA_NUM_PARTITIONS to 10, but I
> need to do it dynamically, since I have several microservices which are
> consumers, and their number grows and shrinks dynamically at runtime.
> Is there a way to change the number of partitions dynamically via Java code?
>
> Thanks,
>
>
>
>
> Liel Shraga
> ENGINEER.SOFTWARE ENGINEERING
> lshraga@cisco.com
> Tel: +972 2 588 6394
> Cisco Systems, Inc.
> 32 HaMelacha St., (HaSharon Bldg) P.O.Box 8735, I.Z.Sapir
> SOUTH NETANYA
> 42504
> Israel
> cisco.com
>
>
> Think before you print.
> This email may contain confidential and privileged material for the sole
> use of the intended recipient. Any review, use, distribution or disclosure
> by others is strictly prohibited. If you are not the intended recipient (or
> authorized to receive for the recipient), please contact the sender by
> reply email and delete all copies of this message.
> Please click here for Company Registration Information.
>
>
>
> -----Original Message-----
> From: Amir Shahinpour [mailto:amir@holisticlabs.net]
> Sent: Tuesday, September 19, 2017 9:07 AM
> To: users@kafka.apache.org
> Subject: Re: Round Robin for several consumers in KAFKA
>
> Can you provide your docker file, or compose file?
>
> On Wed, Sep 13, 2017 at 1:55 AM, Liel Shraga (lshraga) <lshraga@cisco.com>
> wrote:
>
> > Hi,
> >
> > I didn't define the partition count. How can I do it with the
> > kafka-clients API?
> >
> > Thanks,
> >
> >
> > -----Original Message-----
> > From: Manikumar [mailto:manikumar.reddy@gmail.com]
> > Sent: Wednesday, September 13, 2017 11:47 AM
> > To: users@kafka.apache.org
> > Subject: Re: Round Robin for several consumers in KAFKA
> >
> > What is the partition count? You need at least 2 partitions to
> > distribute messages across two consumers.
> >
> >
> > On Wed, Sep 13, 2017 at 1:24 PM, Liel Shraga (lshraga)
> > <lshraga@cisco.com>
> > wrote:
> >
> > > Hi,
> > >
> > >
> > >
> > > I have 5 separate Docker images: 1 Kafka broker, 1 ZooKeeper,
> > > 1 producer and 2 consumers.
> > >
> > > I publish messages to the topic via the producer.
> > >
> > > Basically, I would like the messages to be distributed between the
> > > two consumers in round-robin fashion,
> > >
> > > so for that purpose I defined the consumers with the same group.id
> > > and set partition.assignment.strategy to
> > > org.apache.kafka.clients.consumer.RoundRobinAssignor,
> > >
> > > but actually only 1 consumer receives all the messages.
> > >
> > > My Producer Code:
> > >
> > > import java.util.List;
> > > import java.util.Properties;
> > > import org.apache.kafka.clients.producer.KafkaProducer;
> > > import org.apache.kafka.clients.producer.Producer;
> > > import org.apache.kafka.clients.producer.ProducerRecord;
> > >
> > > public class DiscoveryKafkaProducer {
> > >
> > >     Producer<String, String> producer;
> > >
> > >     public DiscoveryKafkaProducer(Properties configs) {
> > >         producer = new KafkaProducer<String, String>(configs);
> > >     }
> > >
> > >     public void send(String topic, List<String> records) {
> > >         for (String record : records) {
> > >             // No key is set, so the producer's partitioner picks the partition.
> > >             producer.send(new ProducerRecord<String, String>(topic, record));
> > >         }
> > >         producer.flush();
> > >     }
> > >
> > >     public void close() throws Exception {
> > >         producer.close();
> > >     }
> > > }
> > >
> > >
> > >
> > > My Consumer Code:
> > >
> > > public static void main(String[] args) {
> > >     String server = "lshraga-ubuntu-sp-nac:9092";
> > >     Properties consumerConfigs = new Properties();
> > >     consumerConfigs.put("bootstrap.servers", server);
> > >     consumerConfigs.put("key.deserializer",
> > >         "org.apache.kafka.common.serialization.StringDeserializer");
> > >     consumerConfigs.put("value.deserializer",
> > >         "org.apache.kafka.common.serialization.StringDeserializer");
> > >     consumerConfigs.put("group.id", "discovery");
> > >     consumerConfigs.put("client.id", "discovery");
> > >     consumerConfigs.put("partition.assignment.strategy",
> > >         "org.apache.kafka.clients.consumer.RoundRobinAssignor");
> > >
> > >     List<String> list = new ArrayList<String>();
> > >
> > >     // topicName is defined elsewhere; its definition is not shown in this message.
> > >     DiscoveryKafkaConsumer consumer1 = new DiscoveryKafkaConsumer(
> > >         Collections.singletonList(topicName), consumerConfigs);
> > >
> > >     try {
> > >         while (true) {
> > >             System.out.println("Start to consume");
> > >             consumer1.poll(1000L);
> > >         }
> > >     } catch (InterruptedException e) {
> > >         // TODO Auto-generated catch block
> > >         e.printStackTrace();
> > >     }
> > > }
> > >
> > > public class DiscoveryKafkaConsumer {
> > >
> > >     Consumer<String, String> consumer;
> > >     Integer id;
> > >
> > >     public DiscoveryKafkaConsumer(List<String> topics, Properties configs) {
> > >         consumer = new KafkaConsumer<String, String>(configs);
> > >         consumer.subscribe(topics);
> > >     }
> > >
> > >     public DiscoveryKafkaConsumer(int i, List<String> topics, Properties configs) {
> > >         consumer = new KafkaConsumer<String, String>(configs);
> > >         consumer.subscribe(topics);
> > >         this.id = i;
> > >     }
> > >
> > >     public void poll(long timeout) throws InterruptedException {
> > >         ConsumerRecords<String, String> records = consumer.poll(timeout);
> > >         System.out.println("Hey! Consumer #" + id + " got records: " + records);
> > >         // Group the received values by topic.
> > >         Map<String, List<String>> results = new HashMap<String, List<String>>();
> > >         records.forEach((cr) -> {
> > >             System.out.println("cr.topic()=" + cr.topic());
> > >             List<String> list = results.get(cr.topic());
> > >             if (list == null) {
> > >                 list = new ArrayList<>();
> > >                 results.put(cr.topic(), list);
> > >             }
> > >             list.add(cr.value());
> > >             System.out.println("list=" + list);
> > >         });
> > >     }
> > >
> > >     public void close() throws Exception {
> > >         consumer.close();
> > >     }
> > > }
> > >
> > > What do I need to add or configure in order to consume the messages
> > > in round-robin fashion?
> > >
> > >
> > >
> > >
> > >
> > > Thanks,
> > >
> >
>
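
The thread notes that at least 2 partitions are needed before two consumers in the same group can share the messages. A minimal sketch of why, assuming a single topic and a simplified round-robin rule (partitions dealt to consumers in turn). This is a plain-Java model, not Kafka's RoundRobinAssignor code, and the class name and consumer names are made up for illustration:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RoundRobinSketch {

    // Deals partition numbers 0..numPartitions-1 to the consumers in turn.
    // Simplified model for a single topic; not Kafka's actual implementation.
    static Map<String, List<Integer>> assign(int numPartitions, List<String> consumers) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        for (int partition = 0; partition < numPartitions; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Hypothetical member names for the two consumers in the group.
        List<String> consumers = Arrays.asList("consumer-1", "consumer-2");

        // One partition: only the first consumer owns anything.
        System.out.println(assign(1, consumers));
        // prints {consumer-1=[0], consumer-2=[]}

        // Ten partitions (as with KAFKA_NUM_PARTITIONS: 10): five each.
        System.out.println(assign(10, consumers));
        // prints {consumer-1=[0, 2, 4, 6, 8], consumer-2=[1, 3, 5, 7, 9]}
    }
}
```

In this model, a single partition leaves the second consumer with nothing to read, which is consistent with the behavior reported in the original question. A partition is owned by at most one consumer in a group, so the partition count caps how many consumers can receive messages at once.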
