kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Neha Narkhede <neha.narkh...@gmail.com>
Subject Re: Can the sequence be preserved?
Date Mon, 27 Feb 2012 18:01:25 GMT
>> Further, we looked into the quick start guide of Kafka to produce data to
the same partition but the constructor mentioned in the point-6 that takes
an extra parameter for the partition key is not available in kafka-0.7. Am
I missing something here?

Instead of that, the ProducerData object takes in a key, for a set of
messages. It uses the "partitioner.class" to pick a partition id. The
default partitioner picks a partiiton id at random. You can also plug
in your own custom partitioner if you want a different partitioning
scheme. Messages are delivered in order per partition.

Thanks,
Neha

On Mon, Feb 27, 2012 at 9:43 AM, Jun Rao <junrao@gmail.com> wrote:
> Anuj,
>
> Sequencing should be guaranteed within a partition. The problem you saw
> could be related to reconnect.interval, which controls how frequently the
> producer gets a new socket. Everytime a producer gets a new socket, there
> is a small window that events sent over the old socket and over the new one
> may be out of order. You can set reconnect.interval to maxInt to avoid new
> sockets being created.
>
> Thanks,
>
> Jun
>
> On Sun, Feb 26, 2012 at 8:06 PM, Anuj Kumar <anujsays@gmail.com> wrote:
>
>> Hello Everyone,
>>
>> We are using Kafka-0.7.0 to push a sequence of numbers and we want to
>> preserve the exact sequence in which they are produced by the producer. We
>> wrote a sample producer that simply produces a million points in a
>> sequence. The code snippet is-
>>
>> int t = 0;
>> while( t<=1000000)
>> {
>>    producer.send( new ProducerData<String,String>( topic,new
>> Integer(t).toString()) );
>>     ++t;
>> }
>>
>> where, the producer is initialized as-
>>
>> Properties props = new Properties();
>> props.put("serializer.class", "kafka.serializer.StringEncoder");
>> props.put("zk.connect", "localhost:2181");
>> producer = new Producer<String, String>(new ProducerConfig(props));
>>
>> and topic is taken as an input to the constructor of the Producer class.
>>
>> On the consumer side, our code looks like-
>>
>> Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
>> topicCountMap.put(topic, new Integer(1));
>> Map<String, List<KafkaMessageStream<Message>>> consumerMap =
>> consumer.createMessageStreams(topicCountMap);
>> KafkaMessageStream<Message> stream =  consumerMap.get(topic).get(0);
>> ConsumerIterator<Message> it = stream.iterator();
>> Integer prev = null;
>> while(it.hasNext())
>> {
>>     String msg = getMessage(it.next());
>>     int current = Integer.parseInt(msg);
>>     ...
>> }
>>
>> The consumer is initialized as-
>>
>> public SimpleKafkaQueueConsumer(String _topic)
>> {
>>    topic = _topic;
>>    consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
>> }
>>
>> private static ConsumerConfig createConsumerConfig()
>> {
>>    Properties props = new Properties();
>>    props.put("zk.connect", "localhost:2181");
>>    props.put("groupid", "simpleconsumer");
>>    return new ConsumerConfig(props);
>> }
>>
>> We started both the producer and consumer with the same topic but we
>> noticed that on the consumer side the sequence of the number changes (only
>> for some of the numbers). We would like the sequence to be exactly the
>> same.
>>
>> We initially thought that it might be an issue with the data being stored
>> in different partitions. Is that the case?
>>
>> Further, we looked into the quick start guide of Kafka to produce data to
>> the same partition but the constructor mentioned in the point-6 that takes
>> an extra parameter for the partition key is not available in kafka-0.7. Am
>> I missing something here?
>>
>> Is there a way to preserve the sequence?
>>
>> Regards,
>> Anuj
>>

Mime
View raw message