kafka-users mailing list archives

From Jun Rao <jun...@gmail.com>
Subject Re: Can the sequence be preserved?
Date Tue, 28 Feb 2012 15:05:31 GMT
Anuj,

The reason we have reconnect.interval is for people who use a VIP in a load
balancer. By periodically reconnecting, it gives the load balancer a chance
to spread the load among brokers, especially after a broker restart.

Thanks,

Jun
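[Editor's note: a minimal sketch of the workaround discussed in this thread, reusing the 0.7 property names from Anuj's own snippet; pinning reconnect.interval at maxInt keeps one socket per producer, but note it also trades away the VIP load-spreading Jun describes above.]

```java
import java.util.Properties;

public class ProducerProps {
    // Build producer properties mirroring Anuj's snippet, with
    // reconnect.interval pinned at Integer.MAX_VALUE so the producer
    // keeps a single socket and messages cannot straddle an old/new
    // socket out of order.
    public static Properties buildProps() {
        Properties props = new Properties();
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("zk.connect", "localhost:2181");
        props.put("reconnect.interval", String.valueOf(Integer.MAX_VALUE));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildProps().getProperty("reconnect.interval"));
    }
}
```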

On Mon, Feb 27, 2012 at 11:56 PM, Anuj Kumar <anujsays@gmail.com> wrote:

> Thanks Jun and Neha.
>
> Setting reconnect.interval to a higher value works, but this doesn't look
> like a good solution to me. I would like to know why we have
> reconnect.interval. If the socket is already active and sending data, why
> do we want to disconnect it and connect again? I am trying to understand
> the use case where reconnect.interval would be useful.
>
> Regarding the partition key, we already have set the number of partitions
> to 1, so do we need to explicitly use that ProducerData constructor?
>
> Thanks for your help.
>
> Regards,
> Anuj
>
> On Mon, Feb 27, 2012 at 11:31 PM, Neha Narkhede <neha.narkhede@gmail.com> wrote:
>
> > >> Further, we looked into the quick start guide of Kafka to produce data
> > >> to the same partition, but the constructor mentioned in point 6 that
> > >> takes an extra parameter for the partition key is not available in
> > >> kafka-0.7. Am I missing something here?
> >
> > Instead of that, the ProducerData object takes in a key for a set of
> > messages. It uses the "partitioner.class" to pick a partition id. The
> > default partitioner picks a partition id at random. You can also plug
> > in your own custom partitioner if you want a different partitioning
> > scheme. Messages are delivered in order per partition.
> >
> > Thanks,
> > Neha
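[Editor's note: a plain-Java sketch of the kind of key-based scheme Neha describes; the class and method names here are illustrative, not the actual kafka.producer interface. Hashing the key modulo the partition count sends every message with the same key to the same partition, where ordering holds.]

```java
public class KeyModPartitioner {
    // Map a message key to a partition id deterministically, so all
    // messages carrying the same key land in the same partition and
    // therefore keep their relative order.
    public static int partition(String key, int numPartitions) {
        // Mask off the sign bit so negative hashCodes still map to
        // a non-negative partition id.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // The same key always yields the same partition id.
        System.out.println(partition("sensor-42", 4) == partition("sensor-42", 4));
    }
}
```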
> >
> > On Mon, Feb 27, 2012 at 9:43 AM, Jun Rao <junrao@gmail.com> wrote:
> > > Anuj,
> > >
> > > Sequencing should be guaranteed within a partition. The problem you saw
> > > could be related to reconnect.interval, which controls how frequently
> > > the producer gets a new socket. Every time a producer gets a new socket,
> > > there is a small window during which events sent over the old socket
> > > and over the new one may be out of order. You can set reconnect.interval
> > > to maxInt to avoid new sockets being created.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Sun, Feb 26, 2012 at 8:06 PM, Anuj Kumar <anujsays@gmail.com> wrote:
> > >
> > >> Hello Everyone,
> > >>
> > >> We are using Kafka-0.7.0 to push a sequence of numbers, and we want to
> > >> preserve the exact sequence in which they are produced by the producer.
> > >> We wrote a sample producer that simply produces a million points in
> > >> sequence. The code snippet is-
> > >>
> > >> int t = 0;
> > >> while (t < 1000000)
> > >> {
> > >>     // with a single partition, consume order should match send order
> > >>     producer.send(new ProducerData<String, String>(topic, Integer.toString(t)));
> > >>     ++t;
> > >> }
> > >>
> > >> where the producer is initialized as-
> > >>
> > >> Properties props = new Properties();
> > >> props.put("serializer.class", "kafka.serializer.StringEncoder");
> > >> props.put("zk.connect", "localhost:2181");
> > >> producer = new Producer<String, String>(new ProducerConfig(props));
> > >>
> > >> and topic is taken as an input to the constructor of the Producer class.
> > >>
> > >> On the consumer side, our code looks like-
> > >>
> > >> Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> > >> topicCountMap.put(topic, new Integer(1));
> > >> Map<String, List<KafkaMessageStream<Message>>> consumerMap =
> > >> consumer.createMessageStreams(topicCountMap);
> > >> KafkaMessageStream<Message> stream =  consumerMap.get(topic).get(0);
> > >> ConsumerIterator<Message> it = stream.iterator();
> > >> Integer prev = null;
> > >> while(it.hasNext())
> > >> {
> > >>     String msg = getMessage(it.next());
> > >>     int current = Integer.parseInt(msg);
> > >>     ...
> > >> }
> > >>
> > >> The consumer is initialized as-
> > >>
> > >> public SimpleKafkaQueueConsumer(String _topic)
> > >> {
> > >>    topic = _topic;
> > >>    consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
> > >> }
> > >>
> > >> private static ConsumerConfig createConsumerConfig()
> > >> {
> > >>    Properties props = new Properties();
> > >>    props.put("zk.connect", "localhost:2181");
> > >>    props.put("groupid", "simpleconsumer");
> > >>    return new ConsumerConfig(props);
> > >> }
> > >>
> > >> We started both the producer and the consumer with the same topic, but
> > >> we noticed that on the consumer side the sequence of the numbers changes
> > >> (only for some of the numbers). We would like the sequence to be exactly
> > >> the same.
> > >>
> > >> We initially thought that it might be an issue with the data being
> > >> stored in different partitions. Is that the case?
> > >>
> > >> Further, we looked into the quick start guide of Kafka to produce data
> > >> to the same partition, but the constructor mentioned in point 6 that
> > >> takes an extra parameter for the partition key is not available in
> > >> kafka-0.7. Am I missing something here?
> > >>
> > >> Is there a way to preserve the sequence?
> > >>
> > >> Regards,
> > >> Anuj
> > >>
> >
>
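[Editor's note: independent of the Kafka APIs, the consumer-side ordering check Anuj describes can be sketched as plain Java. The messages are assumed to already be parsed to ints, as in his loop; the helper name is illustrative.]

```java
import java.util.List;

public class SequenceCheck {
    // Return the index of the first element that is not exactly one
    // greater than its predecessor, or -1 if the whole list is the
    // expected consecutive sequence.
    public static int firstGap(List<Integer> received) {
        for (int i = 1; i < received.size(); i++) {
            if (received.get(i) != received.get(i - 1) + 1) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println(firstGap(List.of(0, 1, 2, 3)));  // in order: -1
        System.out.println(firstGap(List.of(0, 2, 1, 3)));  // out of order at index 1
    }
}
```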
