kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Anuj Kumar <anujs...@gmail.com>
Subject Re: Can the sequence be preserved?
Date Tue, 28 Feb 2012 07:56:57 GMT
Thanks Jun and Neha.

Setting reconnect.interval to a higher value works but this doesn't look
like a good solution to me. I would like to know why we have
reconnect.interval? If the socket is already active and sending data, why
do we want to disconnect it and connect again? I am trying to understand
the use case where reconnect.interval will be useful.

Regarding the partition key, we already have set the number of partitions
to 1, so do we need to explicitly use that ProducerData constructor?

Thanks for your help.

Regards,
Anuj

On Mon, Feb 27, 2012 at 11:31 PM, Neha Narkhede <neha.narkhede@gmail.com>wrote:

> >> Further, we looked into the quick start guide of Kafka to produce data
> to
> the same partition but the constructor mentioned in the point-6 that takes
> an extra parameter for the partition key is not available in kafka-0.7. Am
> I missing something here?
>
> Instead of that, the ProducerData object takes in a key, for a set of
> messages. It uses the "partitioner.class" to pick a partition id. The
> default partitioner picks a partiiton id at random. You can also plug
> in your own custom partitioner if you want a different partitioning
> scheme. Messages are delivered in order per partition.
>
> Thanks,
> Neha
>
> On Mon, Feb 27, 2012 at 9:43 AM, Jun Rao <junrao@gmail.com> wrote:
> > Anuj,
> >
> > Sequencing should be guaranteed within a partition. The problem you saw
> > could be related to reconnect.interval, which controls how frequently the
> > producer gets a new socket. Everytime a producer gets a new socket, there
> > is a small window that events sent over the old socket and over the new
> one
> > may be out of order. You can set reconnect.interval to maxInt to avoid
> new
> > sockets being created.
> >
> > Thanks,
> >
> > Jun
> >
> > On Sun, Feb 26, 2012 at 8:06 PM, Anuj Kumar <anujsays@gmail.com> wrote:
> >
> >> Hello Everyone,
> >>
> >> We are using Kafka-0.7.0 to push a sequence of numbers and we want to
> >> preserve the exact sequence in which they are produced by the producer.
> We
> >> wrote a sample producer that simply produces a million points in a
> >> sequence. The code snippet is-
> >>
> >> int t = 0;
> >> while( t<=1000000)
> >> {
> >>    producer.send( new ProducerData<String,String>( topic,new
> >> Integer(t).toString()) );
> >>     ++t;
> >> }
> >>
> >> where, the producer is initialized as-
> >>
> >> Properties props = new Properties();
> >> props.put("serializer.class", "kafka.serializer.StringEncoder");
> >> props.put("zk.connect", "localhost:2181");
> >> producer = new Producer<String, String>(new ProducerConfig(props));
> >>
> >> and topic is taken as an input to the constructor of the Producer class.
> >>
> >> On the consumer side, our code looks like-
> >>
> >> Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> >> topicCountMap.put(topic, new Integer(1));
> >> Map<String, List<KafkaMessageStream<Message>>> consumerMap
=
> >> consumer.createMessageStreams(topicCountMap);
> >> KafkaMessageStream<Message> stream =  consumerMap.get(topic).get(0);
> >> ConsumerIterator<Message> it = stream.iterator();
> >> Integer prev = null;
> >> while(it.hasNext())
> >> {
> >>     String msg = getMessage(it.next());
> >>     int current = Integer.parseInt(msg);
> >>     ...
> >> }
> >>
> >> The consumer is initialized as-
> >>
> >> public SimpleKafkaQueueConsumer(String _topic)
> >> {
> >>    topic = _topic;
> >>    consumer =
> Consumer.createJavaConsumerConnector(createConsumerConfig());
> >> }
> >>
> >> private static ConsumerConfig createConsumerConfig()
> >> {
> >>    Properties props = new Properties();
> >>    props.put("zk.connect", "localhost:2181");
> >>    props.put("groupid", "simpleconsumer");
> >>    return new ConsumerConfig(props);
> >> }
> >>
> >> We started both the producer and consumer with the same topic but we
> >> noticed that on the consumer side the sequence of the number changes
> (only
> >> for some of the numbers). We would like the sequence to be exactly the
> >> same.
> >>
> >> We initially thought that it might be an issue with the data being
> stored
> >> in different partitions. Is that the case?
> >>
> >> Further, we looked into the quick start guide of Kafka to produce data
> to
> >> the same partition but the constructor mentioned in the point-6 that
> takes
> >> an extra parameter for the partition key is not available in kafka-0.7.
> Am
> >> I missing something here?
> >>
> >> Is there a way to preserve the sequence?
> >>
> >> Regards,
> >> Anuj
> >>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message