kafka-users mailing list archives

From Anuj Kumar <anujs...@gmail.com>
Subject Re: Can the sequence be preserved?
Date Tue, 28 Feb 2012 17:47:41 GMT
Thanks Jun for the explanation.

Hi Neha,

Yes, we have the num.partitions set to 1 in server.properties-

# The number of logical partitions per topic per server. More partitions allow greater parallelism
# for consumption, but also mean more files.
num.partitions=1

Yes, it is clear from the send() documentation but isn't it true that the
above property will guarantee a single partition?
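
A minimal plain-Java sketch of the reasoning behind this question. It does not use the Kafka API: the class and method names are hypothetical, and the assumption that the topic-wide partition count is simply brokers times num.partitions is only a reading of the "per topic per server" comment above.

```java
import java.util.Random;

// Illustration only; none of these names come from the Kafka API.
class PartitionSketch {
    // Assumption: num.partitions applies per broker, so the topic-wide
    // count is the product of broker count and num.partitions.
    static int totalPartitions(int brokers, int numPartitionsPerBroker) {
        return brokers * numPartitionsPerBroker;
    }

    // Hypothetical key-based choice: equal keys always map to the same partition.
    static int partitionForKey(String key, int totalPartitions) {
        return Math.floorMod(key.hashCode(), totalPartitions);
    }

    // Hypothetical random choice, standing in for a partitioner that ignores the key.
    static int randomPartition(Random rng, int totalPartitions) {
        return rng.nextInt(totalPartitions);
    }

    public static void main(String[] args) {
        // One broker with num.partitions=1: every choice collapses to partition 0.
        int single = totalPartitions(1, 1);
        System.out.println("single-broker pick: " + randomPartition(new Random(), single));

        // Two brokers with num.partitions=1 each: a random pick may now vary,
        // while a key-based pick stays stable for a given key.
        int two = totalPartitions(2, 1);
        System.out.println("two-broker keyed pick: " + partitionForKey("sensor-1", two));
    }
}
```

Under that assumption, a single partition is only guaranteed when there is also a single broker; with more brokers, a fixed key (or a custom partitioner) would be needed to keep all messages in one partition.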

Thanks,
Anuj

On Tue, Feb 28, 2012 at 11:08 PM, Neha Narkhede <neha.narkhede@gmail.com> wrote:

> Anuj,
>
> Here are the API docs for the Producer -
> http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/docs/
>
> The API docs will show that the send() request is a ProducerData
> object, that takes in a topic, list of messages, and an optional key.
>
> When you say you've set the number of partitions to 1, do you mean
> you have one broker with num.partitions=1?
>
> Thanks,
> Neha
>
> On Mon, Feb 27, 2012 at 11:56 PM, Anuj Kumar <anujsays@gmail.com> wrote:
> > Thanks Jun and Neha.
> >
> > Setting reconnect.interval to a higher value works but this doesn't look
> > like a good solution to me. I would like to know why we have
> > reconnect.interval? If the socket is already active and sending data, why
> > do we want to disconnect it and connect again? I am trying to understand
> > the use case where reconnect.interval will be useful.
> >
> > Regarding the partition key, we already have set the number of partitions
> > to 1, so do we need to explicitly use that ProducerData constructor?
> >
> > Thanks for your help.
> >
> > Regards,
> > Anuj
> >
> > On Mon, Feb 27, 2012 at 11:31 PM, Neha Narkhede <neha.narkhede@gmail.com> wrote:
> >
> >> >> Further, we looked into the quick start guide of Kafka to produce data to
> >> >> the same partition but the constructor mentioned in the point-6 that takes
> >> >> an extra parameter for the partition key is not available in kafka-0.7. Am
> >> >> I missing something here?
> >>
> >> Instead of that, the ProducerData object takes in a key for a set of
> >> messages. It uses the "partitioner.class" to pick a partition id. The
> >> default partitioner picks a partition id at random. You can also plug
> >> in your own custom partitioner if you want a different partitioning
> >> scheme. Messages are delivered in order per partition.
> >>
> >> Thanks,
> >> Neha
> >>
> >> On Mon, Feb 27, 2012 at 9:43 AM, Jun Rao <junrao@gmail.com> wrote:
> >> > Anuj,
> >> >
> >> > Sequencing should be guaranteed within a partition. The problem you saw
> >> > could be related to reconnect.interval, which controls how frequently the
> >> > producer gets a new socket. Every time a producer gets a new socket, there
> >> > is a small window in which events sent over the old socket and over the new
> >> > one may be out of order. You can set reconnect.interval to maxInt to avoid
> >> > new sockets being created.
> >> >
> >> > Thanks,
> >> >
> >> > Jun
> >> >
> >> > On Sun, Feb 26, 2012 at 8:06 PM, Anuj Kumar <anujsays@gmail.com> wrote:
> >> >
> >> >> Hello Everyone,
> >> >>
> >> >> We are using Kafka-0.7.0 to push a sequence of numbers and we want to
> >> >> preserve the exact sequence in which they are produced by the producer. We
> >> >> wrote a sample producer that simply produces a million points in a
> >> >> sequence. The code snippet is-
> >> >>
> >> >> int t = 0;
> >> >> while( t<=1000000)
> >> >> {
> >> >>     producer.send(new ProducerData<String,String>(topic, new Integer(t).toString()));
> >> >>     ++t;
> >> >> }
> >> >>
> >> >> where, the producer is initialized as-
> >> >>
> >> >> Properties props = new Properties();
> >> >> props.put("serializer.class", "kafka.serializer.StringEncoder");
> >> >> props.put("zk.connect", "localhost:2181");
> >> >> producer = new Producer<String, String>(new ProducerConfig(props));
> >> >>
> >> >> and topic is taken as an input to the constructor of the Producer class.
> >> >>
> >> >> On the consumer side, our code looks like-
> >> >>
> >> >> Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> >> >> topicCountMap.put(topic, new Integer(1));
> >> >> Map<String, List<KafkaMessageStream<Message>>> consumerMap =
> >> >>     consumer.createMessageStreams(topicCountMap);
> >> >> KafkaMessageStream<Message> stream =  consumerMap.get(topic).get(0);
> >> >> ConsumerIterator<Message> it = stream.iterator();
> >> >> Integer prev = null;
> >> >> while(it.hasNext())
> >> >> {
> >> >>     String msg = getMessage(it.next());
> >> >>     int current = Integer.parseInt(msg);
> >> >>     ...
> >> >> }
> >> >>
> >> >> The consumer is initialized as-
> >> >>
> >> >> public SimpleKafkaQueueConsumer(String _topic)
> >> >> {
> >> >>    topic = _topic;
> >> >>    consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
> >> >> }
> >> >>
> >> >> private static ConsumerConfig createConsumerConfig()
> >> >> {
> >> >>    Properties props = new Properties();
> >> >>    props.put("zk.connect", "localhost:2181");
> >> >>    props.put("groupid", "simpleconsumer");
> >> >>    return new ConsumerConfig(props);
> >> >> }
> >> >>
> >> >> We started both the producer and consumer with the same topic but we
> >> >> noticed that on the consumer side the sequence of the numbers changes (only
> >> >> for some of the numbers). We would like the sequence to be exactly the
> >> >> same.
> >> >>
> >> >> We initially thought that it might be an issue with the data being stored
> >> >> in different partitions. Is that the case?
> >> >>
> >> >> Further, we looked into the quick start guide of Kafka to produce data to
> >> >> the same partition but the constructor mentioned in the point-6 that takes
> >> >> an extra parameter for the partition key is not available in kafka-0.7. Am
> >> >> I missing something here?
> >> >>
> >> >> Is there a way to preserve the sequence?
> >> >>
> >> >> Regards,
> >> >> Anuj
> >> >>
> >>
>
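
For anyone reproducing the ordering check from the consumer loop quoted above, here is a minimal plain-Java sketch of one way to detect reordered numbers. It is not the original poster's code and does not touch the Kafka API; the SequenceChecker class and its methods are hypothetical, and it assumes the producer emits strictly increasing integers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: records every value that arrives after a larger
// value has already been seen, i.e. values delivered out of order.
class SequenceChecker {
    private int maxSeen = Integer.MIN_VALUE;
    private final List<Integer> late = new ArrayList<>();

    // Call once per consumed message, in the order the consumer receives them.
    void accept(int current) {
        if (current < maxSeen) {
            late.add(current);      // arrived behind a larger number
        } else {
            maxSeen = current;      // still in non-decreasing order
        }
    }

    // Values that were delivered late, in arrival order.
    List<Integer> late() {
        return late;
    }

    public static void main(String[] args) {
        SequenceChecker checker = new SequenceChecker();
        // Simulated delivery in which 2 and 3 were swapped.
        for (int value : new int[] {0, 1, 3, 2, 4}) {
            checker.accept(value);
        }
        System.out.println(checker.late()); // prints [2]
    }
}
```

In the consumer loop, accept(current) would be called right after the message is parsed into an int; an empty late() list at the end of the run means no reordering was observed.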
