kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jun Rao <jun...@gmail.com>
Subject Re: ery slow producer and consumer throughput: Using Kafka 0.7.0
Date Fri, 10 Feb 2012 15:48:21 GMT
It could be that flush.interval in the broker is 1. Try increasing that to
a larger number like 1000.

Thanks,

Jun

On Fri, Feb 10, 2012 at 1:57 AM, Praveen Ramachandra <praveen27@gmail.com>wrote:

> Hi All,
>
>
> I am getting ridiculously low producer and consumer throughput.
>
> I am using default config values for producer, consumer and broker
> which are very good starting points, as they should yield sufficient
> throughput.
>
> Only config that I changed on the server is "num-partitions". Changed
> it to 20 (instead of 1). With this change the throughput increased to
> 2k messages per second (size 1k), but still it is far lower than what
> I would have expected.
>
> Appreciate if you can point to settings/changes-in-code needs to be done
> to get higher throughput.
>
>
>
> ====Consumer Code=====
>        long startTime = System.currentTimeMillis();
>        long endTime = startTime + runDuration*1000l;
>
>        Properties props = new Properties();
>        props.put("zk.connect", "localhost:2181");
>        props.put("groupid", subscriptionName); // to support multiple
> subscribers
>        props.put("zk.sessiontimeout.ms", "400");
>        props.put("zk.synctime.ms", "200");
>        props.put("autocommit.interval.ms", "1000");
>
>        consConfig =  new ConsumerConfig(props);
>        consumer =
> kafka.consumer.Consumer.createJavaConsumerConnector(consConfig);
>
>        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
>        topicCountMap.put(topicName, new Integer(1)); // has the topic
> to which to subscribe to
>        Map<String, List<KafkaMessageStream<Message>>> consumerMap =
> consumer.createMessageStreams(topicCountMap);
>        KafkaMessageStream<Message> stream =
>  consumerMap.get(topicName).get(0);
>        ConsumerIterator<Message> it = stream.iterator();
>
>        while(System.currentTimeMillis() <= endTime )
>        {
>            it.next(); // discard data
>            consumeMsgCount.incrementAndGet();
>        }
>
> ====End consumer CODE============================
>
>
> =====Producer CODE========================
>        props.put("serializer.class", "kafka.serializer.StringEncoder");
>        props.put("zk.connect", "localhost:2181");
>            // Use random partitioner. Don't need the key type. Just
> set it to Integer.
>            // The message is of type String.
>            producer = new kafka.javaapi.producer.Producer<Integer,
> String>(new ProducerConfig(props));
>
>
>        long endTime = startTime + runDuration*1000l; // run duration
> is in seconds
>        while(System.currentTimeMillis() <= endTime )
>        {
>            String msg =
> org.apache.commons.lang.RandomStringUtils.random(msgSizes.get(0));
>            producer.send(new ProducerData<Integer, String>(topicName,
> msg));
>            pc.incrementAndGet();
>
>        }
>        java.util.Date date = new
> java.util.Date(System.currentTimeMillis());
>        System.out.println(date+" :: stopped producer for topic"+topicName);
>
> =====END Producer CODE========================
>
>
> --
> Regards,
> Praveen Ramachandra
>
>
>
> --
> --
> Regards,
> Praveen Ramachandra
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message