kafka-users mailing list archives

From Neha Narkhede <neha.narkh...@gmail.com>
Subject Re: Can the sequence be preserved?
Date Tue, 28 Feb 2012 18:06:37 GMT
>> Yes, it is clear from the send() documentation but isn't it true that
>> the above property will guarantee a single partition?

Yes, it is a little tricky to understand. Currently, the Kafka server
behavior is that every topic lives on every broker. So, if you have
one Kafka server with one partition, you truly have only a single
partition for that topic. Now, if you bring up another Kafka server
with a single partition, the topic *can* have 2 partitions, if you
don't use your own custom partitioner. If you stay with our default
partitioner, it will think you have 2 available partitions for that
topic, and will try to send data to both of those partitions.

Thanks,
Neha
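[Editorial note: as a hedged illustration of the point above, not code from this thread. Kafka 0.7's pluggable partitioner contract is a single method mapping a key and a partition count to a partition id. The standalone class below mirrors that shape without the Kafka jar so the routing logic can be seen in isolation; the class name and the null-key fallback are assumptions.]

```java
// Hedged sketch, not Kafka source: mirrors the shape of Kafka 0.7's
// Partitioner contract (partition(key, numPartitions) -> int) without
// depending on the Kafka jar. Class name and null handling are assumed.
public class SingleKeyPartitioner {
    public int partition(String key, int numPartitions) {
        if (key == null) {
            return 0; // assumption: send keyless messages to partition 0
        }
        // Hash the key, then take the modulus so the same key always
        // lands on the same partition; abs of the remainder (not of the
        // hash) avoids a negative result when hashCode() is MIN_VALUE.
        return Math.abs(key.hashCode() % numPartitions);
    }
}
```

Plugged in via "partitioner.class" (mentioned later in this thread), a class of this shape would pin all messages sharing a key to one partition even after a second broker makes a second partition visible.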

On Tue, Feb 28, 2012 at 9:47 AM, Anuj Kumar <anujsays@gmail.com> wrote:
> Thanks Jun for the explanation.
>
> Hi Neha,
>
> Yes, we have the num.partitions set to 1 in server.properties-
>
> # The number of logical partitions per topic per server. More partitions
> allow greater parallelism
> # for consumption, but also mean more files.
> num.partitions=1
>
> Yes, it is clear from the send() documentation but isn't it true that the
> above property will guarantee a single partition?
>
> Thanks,
> Anuj
>
> On Tue, Feb 28, 2012 at 11:08 PM, Neha Narkhede <neha.narkhede@gmail.com>
> wrote:
>
>> Anuj,
>>
>> Here are the API docs for the Producer -
>> http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/docs/
>>
>> The API docs will show that the send() request is a ProducerData
>> object, that takes in a topic, list of messages, and an optional key.
>>
>> When you say, you've set the number of partitions to 1, do you mean
>> you have one broker with num.partitions=1?
>>
>> Thanks,
>> Neha
>>
>> On Mon, Feb 27, 2012 at 11:56 PM, Anuj Kumar <anujsays@gmail.com> wrote:
>> > Thanks Jun and Neha.
>> >
>> > Setting reconnect.interval to a higher value works, but this doesn't
>> > look like a good solution to me. I would like to know why we have
>> > reconnect.interval. If the socket is already active and sending data,
>> > why do we want to disconnect it and connect again? I am trying to
>> > understand the use case where reconnect.interval is useful.
>> >
>> > Regarding the partition key, we already have set the number of partitions
>> > to 1, so do we need to explicitly use that ProducerData constructor?
>> >
>> > Thanks for your help.
>> >
>> > Regards,
>> > Anuj
>> >
>> > On Mon, Feb 27, 2012 at 11:31 PM, Neha Narkhede
>> > <neha.narkhede@gmail.com> wrote:
>> >
>> >> >> Further, we looked into the quick start guide of Kafka to produce
>> >> >> data to the same partition, but the constructor mentioned in
>> >> >> point-6 that takes an extra parameter for the partition key is not
>> >> >> available in kafka-0.7. Am I missing something here?
>> >>
>> >> Instead of that, the ProducerData object takes in a key for a set of
>> >> messages. It uses the "partitioner.class" to pick a partition id. The
>> >> default partitioner picks a partition id at random. You can also plug
>> >> in your own custom partitioner if you want a different partitioning
>> >> scheme. Messages are delivered in order per partition.
>> >>
>> >> Thanks,
>> >> Neha
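[Editorial note: to make "messages are delivered in order per partition" concrete, here is a hedged, Kafka-free simulation. All class and method names are illustrative, not the Kafka 0.7 API; the key-hash routing stands in for a custom partitioner, since the default 0.7 partitioner chooses at random.]

```java
import java.util.ArrayList;
import java.util.List;

// Hedged simulation, no Kafka dependency: messages sharing a key hash to
// one partition, so per-partition delivery order equals send order. The
// names here are illustrative, not the Kafka 0.7 API.
public class PerPartitionOrderDemo {
    public static List<List<String>> route(List<String> messages,
                                           String key, int numPartitions) {
        List<List<String>> partitions = new ArrayList<List<String>>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<String>());
        }
        // Deterministic key -> partition mapping, as a custom partitioner
        // would provide; abs of the remainder keeps the index in range.
        int target = Math.abs(key.hashCode() % numPartitions);
        for (String m : messages) {
            partitions.get(target).add(m); // same key => same partition
        }
        return partitions;
    }
}
```

With one fixed key, every message lands on one partition in send order, which is exactly the single-partition ordering guarantee discussed in this thread.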
>> >>
>> >> On Mon, Feb 27, 2012 at 9:43 AM, Jun Rao <junrao@gmail.com> wrote:
>> >> > Anuj,
>> >> >
>> >> > Sequencing should be guaranteed within a partition. The problem
>> >> > you saw could be related to reconnect.interval, which controls how
>> >> > frequently the producer gets a new socket. Every time a producer
>> >> > gets a new socket, there is a small window where events sent over
>> >> > the old socket and over the new one may be out of order. You can
>> >> > set reconnect.interval to maxInt to avoid new sockets being
>> >> > created.
>> >> >
>> >> > Thanks,
>> >> >
>> >> > Jun
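[Editorial note: Jun's workaround can be sketched as producer configuration. This is a hedged sketch built only from the properties already shown in this thread plus the reconnect.interval setting he names; it does not run the producer itself.]

```java
import java.util.Properties;

// Hedged sketch: the serializer/zk settings come from the code quoted
// later in this thread; raising reconnect.interval to Integer.MAX_VALUE
// follows Jun's suggestion so the producer keeps a single socket and
// avoids the small reordering window during reconnects.
Properties props = new Properties();
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("zk.connect", "localhost:2181");
props.put("reconnect.interval", String.valueOf(Integer.MAX_VALUE));
// new Producer<String, String>(new ProducerConfig(props)) would then be
// constructed as in the original snippet (requires the Kafka 0.7 jar).
```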
>> >> >
>> >> > On Sun, Feb 26, 2012 at 8:06 PM, Anuj Kumar <anujsays@gmail.com>
>> >> > wrote:
>> >> >
>> >> >> Hello Everyone,
>> >> >>
>> >> >> We are using Kafka-0.7.0 to push a sequence of numbers and we
>> >> >> want to preserve the exact sequence in which they are produced by
>> >> >> the producer. We wrote a sample producer that simply produces a
>> >> >> million points in a sequence. The code snippet is-
>> >> >>
>> >> >> int t = 0;
>> >> >> while (t <= 1000000)
>> >> >> {
>> >> >>     producer.send(new ProducerData<String, String>(topic,
>> >> >>         Integer.toString(t)));
>> >> >>     ++t;
>> >> >> }
>> >> >>
>> >> >> where, the producer is initialized as-
>> >> >>
>> >> >> Properties props = new Properties();
>> >> >> props.put("serializer.class", "kafka.serializer.StringEncoder");
>> >> >> props.put("zk.connect", "localhost:2181");
>> >> >> producer = new Producer<String, String>(new ProducerConfig(props));
>> >> >>
>> >> >> and topic is taken as an input to the constructor of the Producer
>> >> >> class.
>> >> >>
>> >> >> On the consumer side, our code looks like-
>> >> >>
>> >> >> Map<String, Integer> topicCountMap =
>> >> >>     new HashMap<String, Integer>();
>> >> >> topicCountMap.put(topic, new Integer(1));
>> >> >> Map<String, List<KafkaMessageStream<Message>>> consumerMap =
>> >> >>     consumer.createMessageStreams(topicCountMap);
>> >> >> KafkaMessageStream<Message> stream = consumerMap.get(topic).get(0);
>> >> >> ConsumerIterator<Message> it = stream.iterator();
>> >> >> Integer prev = null;
>> >> >> while(it.hasNext())
>> >> >> {
>> >> >>     String msg = getMessage(it.next());
>> >> >>     int current = Integer.parseInt(msg);
>> >> >>     ...
>> >> >> }
>> >> >>
>> >> >> The consumer is initialized as-
>> >> >>
>> >> >> public SimpleKafkaQueueConsumer(String _topic)
>> >> >> {
>> >> >>    topic = _topic;
>> >> >>    consumer =
>> >> >>        Consumer.createJavaConsumerConnector(createConsumerConfig());
>> >> >> }
>> >> >>
>> >> >> private static ConsumerConfig createConsumerConfig()
>> >> >> {
>> >> >>    Properties props = new Properties();
>> >> >>    props.put("zk.connect", "localhost:2181");
>> >> >>    props.put("groupid", "simpleconsumer");
>> >> >>    return new ConsumerConfig(props);
>> >> >> }
>> >> >>
>> >> >> We started both the producer and the consumer with the same topic,
>> >> >> but we noticed that on the consumer side the sequence of the
>> >> >> numbers changes (only for some of the numbers). We would like the
>> >> >> sequence to be exactly the same.
>> >> >>
>> >> >> We initially thought that it might be an issue with the data being
>> >> >> stored in different partitions. Is that the case?
>> >> >>
>> >> >> Further, we looked into the quick start guide of Kafka to produce
>> >> >> data to the same partition, but the constructor mentioned in
>> >> >> point-6 that takes an extra parameter for the partition key is not
>> >> >> available in kafka-0.7. Am I missing something here?
>> >> >>
>> >> >> Is there a way to preserve the sequence?
>> >> >>
>> >> >> Regards,
>> >> >> Anuj
>> >> >>
>> >>
>>
