kafka-users mailing list archives

From Praveen Ramachandra <pravee...@gmail.com>
Subject Very slow producer and consumer throughput: Using Kafka 0.7.0
Date Fri, 10 Feb 2012 09:57:10 GMT
Hi All,


I am getting ridiculously low producer and consumer throughput.

I am using the default config values for the producer, consumer and
broker, which should be good starting points and yield sufficient
throughput.

The only config that I changed on the server is "num.partitions". I
changed it to 20 (instead of 1). With this change the throughput
increased to 2k messages per second (message size 1k), but that is still
far lower than I would have expected.
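
To put that figure in bytes, here is a minimal sketch of the arithmetic. It assumes "1k" means 1024 bytes per message, and the names ThroughputReport, messagesPerSecond and mebibytesPerSecond are hypothetical; they are not part of the test code in this message. The counter value and run length in main are illustrative only.

```java
public class ThroughputReport {

    // Messages per second over a run of the given length.
    static double messagesPerSecond(long messageCount, long runDurationSeconds) {
        return (double) messageCount / runDurationSeconds;
    }

    // Payload throughput in MiB/s, assuming every message has the same size.
    static double mebibytesPerSecond(long messageCount, int messageSizeBytes,
                                     long runDurationSeconds) {
        double bytesPerSecond = (double) messageCount * messageSizeBytes / runDurationSeconds;
        return bytesPerSecond / (1024.0 * 1024.0);
    }

    public static void main(String[] args) {
        long messageCount = 120000L;   // illustrative counter value after the run
        long runDurationSeconds = 60L; // illustrative run length
        int messageSizeBytes = 1024;   // assumption: "1k" means 1024 bytes

        System.out.printf("%.0f msgs/s, %.2f MiB/s%n",
                messagesPerSecond(messageCount, runDurationSeconds),
                mebibytesPerSecond(messageCount, messageSizeBytes, runDurationSeconds));
    }
}
```

Under these assumptions, 2k messages per second at 1k each is roughly 2 MiB/s of payload.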

I would appreciate it if you could point me to the settings or code
changes needed to get higher throughput.



====Consumer Code=====
        long startTime = System.currentTimeMillis();
        long endTime = startTime + runDuration*1000L;

        Properties props = new Properties();
        props.put("zk.connect", "localhost:2181");
        props.put("groupid", subscriptionName); // to support multiple subscribers
        props.put("zk.sessiontimeout.ms", "400");
        props.put("zk.synctime.ms", "200");
        props.put("autocommit.interval.ms", "1000");

        consConfig =  new ConsumerConfig(props);
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consConfig);

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topicName, new Integer(1)); // the topic to subscribe to, with one stream
        Map<String, List<KafkaMessageStream<Message>>> consumerMap =
            consumer.createMessageStreams(topicCountMap);
        KafkaMessageStream<Message> stream =  consumerMap.get(topicName).get(0);
        ConsumerIterator<Message> it = stream.iterator();

        while(System.currentTimeMillis() <= endTime )
        {
            it.next(); // discard data
            consumeMsgCount.incrementAndGet();
        }

====End consumer CODE============================


=====Producer CODE========================
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("zk.connect", "localhost:2181");
        // Use random partitioner. Don't need the key type. Just set it to Integer.
        // The message is of type String.
        producer = new kafka.javaapi.producer.Producer<Integer, String>(
            new ProducerConfig(props));


        long endTime = startTime + runDuration*1000L; // run duration is in seconds
        while(System.currentTimeMillis() <= endTime )
        {
            String msg = org.apache.commons.lang.RandomStringUtils.random(msgSizes.get(0));
            producer.send(new ProducerData<Integer, String>(topicName, msg));
            pc.incrementAndGet();

        }
        java.util.Date date = new java.util.Date(System.currentTimeMillis());
        System.out.println(date+" :: stopped producer for topic "+topicName);

=====END Producer CODE========================


--
Regards,
Praveen Ramachandra



