kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Anurup Raveendran <anurup.raveend...@fluturasolutions.com>
Subject Re: kafka multi threaded consumer with very low throughput
Date Mon, 08 Jul 2013 15:16:13 GMT
here is the source code of the program

package kafka.examples;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.consumer.ConsumerIterator;

import java.util.HashMap;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ExecutionException;
import java.io.*;

import java.util.Date;
import java.text.DateFormat;
import java.text.SimpleDateFormat;

public class KafkaConsumerMultithread
{
    private final ConsumerConnector consumer;
    private final String topic;
    private  ExecutorService executor;
 public static DateFormat dateFormat = new
SimpleDateFormat("yyyyMMddhhmmss");
        //get current date time with Date()
        public static Date date = new Date();
public static String filename =  dateFormat.format(date)+"-topicname.csv";
 public static long starttime = 0;
public static long endtime = 0;
public List<ConsumerPrintMessage> cpm_list;
      public static class ConsumerPrintMessage implements Runnable
{
    ConsumerIterator<Message> it;
 int threadNumber;
public long messageCount=0;
ConsumerPrintMessage(KafkaStream<Message> stream, int threadNumber)
 {
it = stream.iterator();
this.threadNumber = threadNumber;
 }
  public void run()
{

try
 {


while(it.hasNext())
 {

System.out.println("\nThread Consuming Data : "+threadNumber+"\n");
 String msg = "";
msg = ExampleUtils.getMessage(it.next().message()) ;
 this.messageCount++;
 if(System.currentTimeMillis() >= starttime+36000000)
 {
System.out.println("\n\n Code to Send data to hdfs is commented - uncomment
\n\n");
 System.out.println("\n\nNext Hour Data \n\n");
date = new Date();
 filename =  dateFormat.format(date)+"-topicname.csv";
starttime = System.currentTimeMillis();
 }



}

  }
    catch(Exception ex)
    {
            ex.printStackTrace();
 System.out.println("\n\t\t\tError : IO Exception : Unable to write data to
temp.\n");
    }
  }
  }


    public KafkaConsumerMultithread(String a_zookeeper, String a_groupId,
String a_topic)
 {
        consumer =
kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper,
a_groupId));
        this.topic = a_topic;
    }

/* To shutdown consumer and producer */
    public void shutdown()
{
        int count = 0;
         // get the total message count
         for(int i = 0; i<cpm_list.size();i++){
               ConsumerPrintMessage cpm = cpm_list.get(i);
               count+=cpm.messageCount;
         }
         System.out.println("MessageCount : "+count);

 //System.out.println("consumer : "+consumer);
//System.out.println("executer : "+executor);
 System.out.println("thread job");
System.out.println("end time :"+ ( System.currentTimeMillis() - starttime )
);
        if (consumer != null)
{
consumer.shutdown();
 }
        if (executor != null)
{
executor.shutdown();
 }
    }

    public void run(int a_numThreads)
 {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        Map<String, List<KafkaStream<Message>>> consumerMap =
consumer.createMessageStreams(topicCountMap);
         List<KafkaStream<Message>> streams = consumerMap.get(topic);

        // now launch all the threads
        //
        executor = Executors.newFixedThreadPool(a_numThreads);

        // now create an object to consume the messages
        //
        int threadNumber = 0;
//Future< ? > fut_1 = null;
 starttime = System.currentTimeMillis();
 cpm_list = new ArrayList<ConsumerPrintMessage>();
        for (final KafkaStream<Message> stream : streams)
{
             //fut_1 =
             ConsumerPrintMessage cx = new ConsumerPrintMessage(stream,
threadNumber);
             cpm_list.add(cx);
 executor.submit(cx);
            threadNumber++;
        }
    }

    private static ConsumerConfig createConsumerConfig(String a_zookeeper,
String a_groupId)
 {
        Properties props = new Properties();
//props.put("broker.list", "1:192.168.0.46:9092,2:192.168.0.47:9092");
        props.put("zk.connect", a_zookeeper);
        props.put("groupid", a_groupId);
        props.put("zk.sessiontimeout.ms", "400");
        props.put("zk.synctime.ms", "200");
        props.put("autocommit.interval.ms", "1000");

        return new ConsumerConfig(props);
    }

    public static void main(String[] args)
{

/*
Note : This part specifies your consumer properties - dynamic
Uncomment following lines for specifying values dynamically

execution : KafkaConsumerMultithread <zk: arg[0]> <gid: arg[1]> <topic:
arg[2]> <num_threads: arg[3]>
*/
 /*
String zooKeeper = args[0];     // First argument is your zookeer
connection address
        String groupId = args[1];       // Second argument will be your
groupid
        String topic = args[2];         // Third argument topic to consume
        int threads = Integer.parseInt(args[3]); // Fourth argument Number
of threads to run
*/

/*
Note : This part specifies your consumer properties - hard coded
*/
        String zooKeeper = "zoo1:2181,zoo2:2181,zoo3:2181";
        String groupId = "group1";
        String topic = "topic1";
        int threads = Integer.parseInt(args[4]);
//Process mktemp = Runtime.getRuntime().exec("mkdir temp");
        KafkaConsumerMultithread example = new
KafkaConsumerMultithread(zooKeeper, groupId, topic);
        example.run(threads);
         try
{
System.out.println("main thread going to sleep for 10 seconds");
            Thread.sleep(900);
        }
 catch (InterruptedException ie)
{
System.out.println("\n\nNote : Execution Interrupted..!!\n\n");
        }
System.out.println("main thread woke up");
        example.shutdown();
    }
}


On Mon, Jul 8, 2013 at 8:43 PM, Anurup Raveendran <
anurup.raveendran@fluturasolutions.com> wrote:

> 305 bytes
>
>
> On Mon, Jul 8, 2013 at 8:41 PM, Tom Brown <tombrown52@gmail.com> wrote:
>
>> What is the size of each message?
>>
>> --Tom
>>
>>
>> On Mon, Jul 8, 2013 at 9:04 AM, Anurup Raveendran <
>> anurup.raveendran@fluturasolutions.com> wrote:
>>
>> > I have 2 kafka brokers running on two systems with the same
>> configuration
>> >
>> > CPU - Dual Core
>> > RAM - 4 GB
>> >
>> > I'm trying to benchmark my kafka setup
>> > Number of messages - 10000
>> >
>> > for 2 partitions & 2 threads configuration - consumer consumes in 1.175
>> > seconds
>> >
>> > for 1 partition & 1 thread configuration - consumer consumes in 1.253
>> > seconds
>> >
>> > Is there some specific configuration which would enable better
>> throughput?
>> >
>> >
>> >
>> >
>>
>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message