kafka-users mailing list archives

From <Viswanath_Ponn...@Dell.com>
Subject Kafka Message Distribution/Load Balancing
Date Fri, 14 Sep 2012 09:57:18 GMT
Hello,

I am working on an experimental project on message distribution and load balancing across a cluster
using Apache Kafka and Zookeeper. The goal of the project is to distribute messages equally
across the cluster for concurrent processing.

For example, the server cluster contains 3 servers: kafkaserver1, kafkaserver2, and kafkaserver3.
When the producer sends 300 messages to a particular topic (demo), I expect each
server to get 100 messages.
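
To make the expected outcome concrete, here is a minimal sketch of the arithmetic only. It does not use Kafka; the class and method names are hypothetical, and it simply assumes message i is assigned to server i % 3.

```java
import java.util.Arrays;

// Hypothetical helper, not part of Kafka: counts how many messages each
// server would receive if message i were assigned to server (i % serverCount).
class EvenDistributionSketch {

    static int[] distribute(int messageCount, int serverCount) {
        int[] perServer = new int[serverCount];
        for (int i = 0; i < messageCount; i++) {
            perServer[i % serverCount]++;
        }
        return perServer;
    }

    public static void main(String[] args) {
        // 300 messages over kafkaserver1, kafkaserver2, kafkaserver3
        System.out.println(Arrays.toString(distribute(300, 3))); // prints [100, 100, 100]
    }
}
```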

The project setup:

-          Started the Kafka and Zookeeper processes on the 3 servers.

-          Started 3 consumer clients listening for messages, e.g. client1 connects to
kafkaserver1, client2 connects to kafkaserver2, and client3 connects to kafkaserver3.

-          Started the producer, which connects through the Zookeeper cluster (zk.connect) and pushes messages to the topic.

The code is as follows:

Sample Producer.java

        System.out.println("ProdMain - starting..");
        Properties props = new Properties();
        String broker = "kafkaserver1:2181,kafkaserver2:2181,kafkaserver3:2181";

        props.put("zk.connect", broker);
        props.put("zk.connectiontimeout.ms", "1000000");
        props.put("zk.sessiontimeout.ms", "1000000");
        props.put("partitioner.class", "com.esg.ganges.kafka.MemberIdPartitioner");
        props.put("serializer.class", StringEncoder.class.getName());

        System.out.println("ProdMain - Initializing..");
        ProducerConfig config = new ProducerConfig(props);
        System.out.println("ProdMain - con time: " + config.getZkConnectionTimeoutMs());

        System.out.println("ProdMain - Producer:start");
        Producer<String, String> producer = new Producer<String, String>(config);

        System.out.println("ProdMain - Creating the data");
        StringProducerData prodData = new StringProducerData("demo");
        System.out.println("ProdMain - Start sending messages...");
        try {
            long start = System.currentTimeMillis();
            prodData.add("Hello world");
            for (int i = 0; i < Integer.MAX_VALUE; i++) {
                producer.send(prodData);
            }
            long cost = System.currentTimeMillis() - start;
            System.out.println("send message cost: "+cost+" ms");
        } finally {
            producer.close();
        }

        System.out.println("ProdMain - End");


Consumer.java
        Properties props = new Properties();
        String broker = args[0]+":2181";
        System.out.println("Connecting to the Server:" + broker);

        props.put("zk.connect", broker);
        props.put("groupid", "test_group");
        // Create the consumer connector from the configuration.
        ConsumerConfig consumerConfig = new ConsumerConfig(props);
        ConsumerConnector connector = Consumer.create(consumerConfig);
        // Request 2 message streams for the "demo" topic.
        Map<String, List<MessageStream<String>>> topicMessageStreams =
                connector.createMessageStreams(ImmutableMap.of("demo", 2), new StringDecoder());
        List<MessageStream<String>> streams = topicMessageStreams.get("demo");
        // Consume each stream on its own thread.
        ExecutorService executor = Executors.newFixedThreadPool(2);
        final AtomicInteger count = new AtomicInteger();
        for (final MessageStream<String> stream : streams) {
            executor.submit(new Runnable() {

                public void run() {
                    for (String message : stream) {
                        System.out.println(count.incrementAndGet() + " => " + message);
                    }
                }
            });
        }
        //
        System.out.println("Connected to the broker:" + broker + ", waiting for messages..");
        executor.awaitTermination(1, TimeUnit.HOURS);
    }

public class MemberIdPartitioner implements Partitioner{
      public MemberIdPartitioner() {
            System.out.println("Initialized: MemberIdPartitioner..");
      }

      @Override
      public int partition(Object arg0, int numberOfPartitions) {
            System.out.println("Type: " + arg0.getClass().getName());
            // TODO Auto-generated method stub
            return 3;
      }

}
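
For comparison, the partitioner above returns the constant 3 for every message, so every message maps to the same partition value. A minimal standalone sketch of a key-dependent alternative follows. It is an illustration only: it does not implement the Kafka Partitioner interface, the class name is hypothetical, and it assumes the returned value is expected to fall in the range 0 to numberOfPartitions - 1.

```java
import java.util.Arrays;

// Hypothetical standalone sketch; it does not implement Kafka's Partitioner interface.
class KeyHashPartitionSketch {

    // Maps a key to a partition index in [0, numberOfPartitions).
    static int partition(Object key, int numberOfPartitions) {
        // Clear the sign bit so the result is never negative, then wrap into range.
        return (key.hashCode() & 0x7fffffff) % numberOfPartitions;
    }

    public static void main(String[] args) {
        int partitions = 3;
        int[] counts = new int[partitions];
        // Assumption for illustration: each message carries an integer member id as its key.
        for (int memberId = 0; memberId < 300; memberId++) {
            counts[partition(memberId, partitions)]++;
        }
        System.out.println(Arrays.toString(counts)); // prints [100, 100, 100]
    }
}
```

The even split here depends on the keys being consecutive integers; with other keys the spread would only be approximately even.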


The behavior I see with this implementation is that all the messages are consumed by a single
consumer. When one of the consumers is shut down, the next consumer becomes active and I see it
consuming messages. My goal is to distribute messages equally across all three servers. Please
let me know if I am doing something wrong.

Thank you,
Vish
