kafka-users mailing list archives

From Eric Sites <Eric.Si...@threattrack.com>
Subject Re: Buffer sizing for consumers
Date Thu, 15 Aug 2013 04:41:12 GMT
Lots of malware. 210 MB is our upper limit that I want to support in the
system we are building.


Most of the messages are metrics, command messages, metadata about the
malware, etc.

But so far Kafka 0.8 has worked out really well for moving the malware
files, whitelist files, and the unknown stuff around, placing them in
storage, and running them through our metadata collection workers. 100s of
servers.

I am just trying to track down some memory issues in one class of workers
that stores the whitelist files for testing our AV engine definitions
before release.

Cheers,
Eric Sites

On 8/15/13 12:27 AM, "Jun Rao" <junrao@gmail.com> wrote:

>You need to set fetch.size in the consumer config to be at least 210 MB
>plus message overhead (about 10 bytes) in Kafka. What data are you
>sending? 210 MB for a single message is a bit unusual for Kafka.
>
>Thanks,
>
>Jun
>
>
>On Wed, Aug 14, 2013 at 9:11 PM, Eric Sites
><Eric.Sites@threattrack.com>wrote:
>
>> Everyone,
>>
>> I need a little help figuring out how buffers are allocated in Kafka
>> 0.8 consumers.
>>
>> What are the proper settings for a consumer that needs to receive a
>> single message that is 210 MB in size?
>>
>> The consumer listens to multiple topics, all with a single partition.
>> One of the topics is where the 210 MB message will come from, and the
>> other topics will carry very small messages.
>>
>>     consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
>>     Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
>>     topicCountMap.put(paradiso_scan_job_topic, 1);
>>     topicCountMap.put(paradiso_scan_cancel_topic, 1);
>>     topicCountMap.put(paradiso_add_worker_name_topic, 1);
>>     topicCountMap.put(paradiso_file_delete_topic, 1);
>>
>>     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
>>         consumer.createMessageStreams(topicCountMap);
>>
>>     // Start worker status threads to watch other topics
>>     ParadisoWorkerCancelConsumer cancel_consumer = new
>>         ParadisoWorkerCancelConsumer(consumerMap.get(paradiso_scan_cancel_topic).get(0));
>>     cancel_consumer.start();
>>     ParadisoWorkerFileAdd file_add = new
>>         ParadisoWorkerFileAdd(consumerMap.get(paradiso_add_worker_name_topic).get(0));
>>     file_add.start();
>>     ParadisoWorkerFileDelete file_delete = new
>>         ParadisoWorkerFileDelete(consumerMap.get(paradiso_file_delete_topic).get(0));
>>     file_delete.start();
>>
>>     KafkaStream<byte[], byte[]> stream = consumerMap.get(paradiso_scan_job_topic).get(0);
>>     ConsumerIterator<byte[], byte[]> it = stream.iterator();
>>
>>     while (it.hasNext() && !time_to_shutdown) {
>>
>> Thanks,
>> Eric Sites
>>
>>
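For anyone hitting the same limit: the fetch size Jun describes goes into the consumer Properties before `createConsumerConfig()` builds the config. A minimal sketch, with some assumptions: the property name `fetch.message.max.bytes` is the 0.8-era spelling of what Jun calls `fetch.size` (check it against your Kafka version's consumer config docs), and the ZooKeeper address, group id, and class name below are placeholders, not from the thread.

```java
import java.util.Properties;

public class ConsumerFetchConfig {
    // 210 MB payload plus ~10 bytes of message overhead, rounded up to 220 MB
    // to leave headroom.
    static final int MAX_MESSAGE_BYTES = 220 * 1024 * 1024;

    static Properties buildConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // placeholder address
        props.put("group.id", "paradiso-workers");        // hypothetical group name
        // The 0.8 consumer-side fetch limit (assumption: property name for your
        // version); must be >= the largest message the broker will hand back.
        props.put("fetch.message.max.bytes", Integer.toString(MAX_MESSAGE_BYTES));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildConfig().getProperty("fetch.message.max.bytes"));
    }
}
```

Note that each stream the consumer creates can buffer fetched chunks of this size, so with several topics on one connector the worker's heap needs to account for more than one such buffer at once.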

