kafka-users mailing list archives

From Jun Rao <jun...@gmail.com>
Subject Re: Buffer sizing for consumers
Date Thu, 15 Aug 2013 04:27:02 GMT
You need to set fetch.size in the consumer config to at least 210 MB plus
message overhead (about 10 bytes per message) in Kafka. What data are you
sending? A single 210 MB message is a bit unusual for Kafka.
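
A minimal sketch of what that consumer-side setting could look like, assuming the 0.8 high-level consumer (where the fetch size is controlled by the fetch.message.max.bytes property); the ZooKeeper address and group id below are placeholders, and the 220 MB value simply leaves headroom above the 210 MB payload for message overhead. The brokers likewise need message.max.bytes (and, with replication, replica.fetch.max.bytes) raised to at least the same value, or the message will be rejected before it ever reaches the consumer.

```java
import java.util.Properties;

public class LargeMessageConsumerConfig {

    // Build consumer Properties sized for a single ~210 MB message.
    // zookeeper.connect and group.id are placeholder values.
    public static Properties buildConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // assumption: local ZooKeeper
        props.put("group.id", "paradiso-workers");        // hypothetical group id
        // 220 MB: headroom above the 210 MB message for per-message overhead.
        props.put("fetch.message.max.bytes", String.valueOf(220 * 1024 * 1024));
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildConfig();
        System.out.println(props.getProperty("fetch.message.max.bytes"));
    }
}
```

The resulting Properties object would then be passed to new ConsumerConfig(props) when creating the connector, as in the code below.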

Thanks,

Jun


On Wed, Aug 14, 2013 at 9:11 PM, Eric Sites <Eric.Sites@threattrack.com> wrote:

> Everyone,
>
> I need a little help figuring out how buffers are allocated in Kafka
> consumers, version 0.8.
>
> What are the proper settings for a consumer that needs to receive a single
> message that is 210 MB in size?
>
> The consumer listens to multiple topics, each with a single partition. One
> of the topics is where the 210 MB message will come from; the other topics
> carry very small messages.
>
>     consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
>
>     Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
>     topicCountMap.put(paradiso_scan_job_topic, 1);
>     topicCountMap.put(paradiso_scan_cancel_topic, 1);
>     topicCountMap.put(paradiso_add_worker_name_topic, 1);
>     topicCountMap.put(paradiso_file_delete_topic, 1);
>
>     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
>         consumer.createMessageStreams(topicCountMap);
>
>     // Start worker status threads to watch other topics
>     ParadisoWorkerCancelConsumer cancel_consumer = new ParadisoWorkerCancelConsumer(
>         consumerMap.get(paradiso_scan_cancel_topic).get(0));
>     cancel_consumer.start();
>
>     ParadisoWorkerFileAdd file_add = new ParadisoWorkerFileAdd(
>         consumerMap.get(paradiso_add_worker_name_topic).get(0));
>     file_add.start();
>
>     ParadisoWorkerFileDelete file_delete = new ParadisoWorkerFileDelete(
>         consumerMap.get(paradiso_file_delete_topic).get(0));
>     file_delete.start();
>
>     KafkaStream<byte[], byte[]> stream = consumerMap.get(paradiso_scan_job_topic).get(0);
>     ConsumerIterator<byte[], byte[]> it = stream.iterator();
>
>     while (it.hasNext() && !time_to_shutdown) {
>
> Thanks,
> Eric Sites
>
>
