kafka-users mailing list archives

From Jun Rao <jun...@gmail.com>
Subject Re: Can the sequence be preserved?
Date Mon, 27 Feb 2012 17:43:36 GMT
Anuj,

Sequencing should be guaranteed within a partition. The problem you saw
could be related to reconnect.interval, which controls how frequently the
producer gets a new socket. Every time a producer gets a new socket, there
is a small window in which events sent over the old socket and over the new
one may be out of order. You can set reconnect.interval to the maximum int
value (Integer.MAX_VALUE) to avoid new sockets being created.
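A minimal sketch of the suggestion above, assuming the producer reads reconnect.interval from the same Properties object used in the original question (the class and method names below are illustrative only). It only builds the Properties; in the real program they would then be passed to new ProducerConfig(props) as in the question.

```java
import java.util.Properties;

// Sketch: the producer Properties from the question, plus reconnect.interval
// set to the largest int so the producer (practically) never rotates its socket.
class OrderedProducerProps {

    static Properties build(String zkConnect) {
        Properties props = new Properties();
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("zk.connect", zkConnect);
        // Store the value as a String: Properties.getProperty() returns null
        // for non-String values, so putting an Integer here would be ignored
        // by any code that reads the setting through getProperty().
        props.put("reconnect.interval", String.valueOf(Integer.MAX_VALUE));
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:2181");
        System.out.println(props.getProperty("reconnect.interval")); // prints 2147483647
    }
}
```

Storing the number with String.valueOf rather than as an Integer is the main design point: it keeps the value visible to getProperty-based readers.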

Thanks,

Jun

On Sun, Feb 26, 2012 at 8:06 PM, Anuj Kumar <anujsays@gmail.com> wrote:

> Hello Everyone,
>
> We are using Kafka-0.7.0 to push a sequence of numbers, and we want to
> preserve the exact order in which the producer produces them. We
> wrote a sample producer that simply produces a million numbers in
> sequence. The code snippet is:
>
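> int t = 0;
> while (t < 1000000)  // 0..999999, i.e. one million numbers
> {
>     // Send each number as a String message; no partition key is given.
>     producer.send(new ProducerData<String, String>(topic, Integer.toString(t)));
>     ++t;
> }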
>
> where the producer is initialized as:
>
> Properties props = new Properties();
> props.put("serializer.class", "kafka.serializer.StringEncoder");
> props.put("zk.connect", "localhost:2181");
> producer = new Producer<String, String>(new ProducerConfig(props));
>
> and topic is passed in as an argument to the constructor of our producer class.
>
> On the consumer side, our code looks like this:
>
> Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> topicCountMap.put(topic, 1); // request a single stream for this topic
> Map<String, List<KafkaMessageStream<Message>>> consumerMap =
>     consumer.createMessageStreams(topicCountMap);
> KafkaMessageStream<Message> stream = consumerMap.get(topic).get(0);
> ConsumerIterator<Message> it = stream.iterator();
> Integer prev = null;
> while (it.hasNext())
> {
>     String msg = getMessage(it.next()); // our helper (not shown) that decodes the payload
>     int current = Integer.parseInt(msg);
>     ...
> }
>
> The consumer is initialized as:
>
> public SimpleKafkaQueueConsumer(String _topic)
> {
>    topic = _topic;
>    consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
> }
>
> private static ConsumerConfig createConsumerConfig()
> {
>    Properties props = new Properties();
>    props.put("zk.connect", "localhost:2181");
>    props.put("groupid", "simpleconsumer");
>    return new ConsumerConfig(props);
> }
>
> We started both the producer and the consumer with the same topic, but we
> noticed that on the consumer side the order of the numbers changes (only
> for some of the numbers). We would like the order to be exactly the
> same.
>
> We initially thought that it might be caused by the data being stored
> in different partitions. Is that the case?
>
> Further, we looked at the Kafka quick start guide to see how to produce data
> to the same partition, but the constructor mentioned in point 6, which takes
> an extra parameter for the partition key, is not available in Kafka 0.7. Am
> I missing something here?
>
> Is there a way to preserve the sequence?
>
> Regards,
> Anuj
>
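The partition question in the message above, and the point in the reply that sequencing is guaranteed within a partition, can be illustrated with a toy simulation. This is not Kafka code: partitionFor is a hypothetical hash-modulo-N partitioner chosen for illustration, a random partition per message stands in for whatever the producer does when no key is given, and the round-robin reader is a simplified consumer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Toy simulation, no Kafka involved: each partition is a FIFO list, so order
// is kept inside a partition, but a consumer that reads several partitions in
// turn can see the overall sequence out of order.
class PartitionOrderDemo {

    // Hypothetical key-based partitioner: hash of the key modulo the partition count.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Sends 0..count-1; a null key picks a random partition for each message.
    static List<List<Integer>> produce(int count, int numPartitions, String key, Random rnd) {
        List<List<Integer>> partitions = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            partitions.add(new ArrayList<>());
        }
        for (int t = 0; t < count; t++) {
            int p = (key == null) ? rnd.nextInt(numPartitions) : partitionFor(key, numPartitions);
            partitions.get(p).add(t);
        }
        return partitions;
    }

    // Reads one message from each non-empty partition in turn (round-robin) and
    // counts how often a number is smaller than the one received just before it.
    static int countOutOfOrder(List<List<Integer>> partitions) {
        int[] pos = new int[partitions.size()];
        int remaining = 0;
        for (List<Integer> part : partitions) {
            remaining += part.size();
        }
        Integer prev = null;
        int outOfOrder = 0;
        while (remaining > 0) {
            for (int p = 0; p < partitions.size(); p++) {
                if (pos[p] < partitions.get(p).size()) {
                    int current = partitions.get(p).get(pos[p]++);
                    remaining--;
                    if (prev != null && current < prev) {
                        outOfOrder++;
                    }
                    prev = current;
                }
            }
        }
        return outOfOrder;
    }

    public static void main(String[] args) {
        // Two partitions, each in order; reading 0, 1, 3, 2 gives one inversion.
        List<List<Integer>> twoParts = Arrays.asList(Arrays.asList(0, 3), Arrays.asList(1, 2));
        System.out.println(countOutOfOrder(twoParts)); // prints 1

        Random rnd = new Random(42);
        // Same key for every message: everything lands in one partition, so no reordering.
        System.out.println(countOutOfOrder(produce(1000, 4, "sequence", rnd))); // prints 0
        // No key: messages are spread over 4 partitions and usually come back reordered.
        System.out.println(countOutOfOrder(produce(1000, 4, null, rnd)));
    }
}
```

The sketch only shows why per-partition ordering does not imply global ordering; it does not model the socket-reconnect window described in the reply.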
