kafka-users mailing list archives

From Jiangjie Qin <j...@linkedin.com.INVALID>
Subject Re: How many messages does each broker have?
Date Mon, 01 Dec 2014 16:24:24 GMT
I think you are printing the class Message instead of MessageAndMetadata.
The output you got was from Message.toString.

Can you just try something like below?

...
// Assuming you already have a consumer stream with byte[] keys and values
// (what the default decoder gives you).
ConsumerIterator<byte[], byte[]> iter = consumerStream.iterator();
MessageAndMetadata<byte[], byte[]> messageAndMetadata = iter.next();
System.out.println("topic: " + messageAndMetadata.topic() + ", partition: "
    + messageAndMetadata.partition());
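
Once you have the partition id for each message, answering the original
question (how many messages landed in each partition) is just a matter of
counting. Below is a minimal sketch in plain Java. PartitionTally and its
methods are hypothetical names made up for illustration and are not part of
Kafka. It assumes you call record(...) once per consumed message with the
value of messageAndMetadata.partition().

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper (not a Kafka class): counts messages per partition id.
final class PartitionTally {
    // TreeMap keeps the partitions sorted by id when the counts are printed.
    private final Map<Integer, Long> counts = new TreeMap<>();

    // Call once per consumed message, passing the partition id of that message.
    void record(int partition) {
        counts.merge(partition, 1L, Long::sum);
    }

    // Number of messages recorded so far for the given partition (0 if none).
    long count(int partition) {
        return counts.getOrDefault(partition, 0L);
    }

    // Copy of the current per-partition counts, safe to print or iterate.
    Map<Integer, Long> snapshot() {
        return new TreeMap<>(counts);
    }
}
```

In the consumer loop this would be roughly
tally.record(messageAndMetadata.partition()) for each message, and
tally.snapshot() at the end gives the per-partition counts.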


Jiangjie (Becket) Qin

On 11/26/14, 12:56 PM, "Palur Sandeep" <psandeep@hawk.iit.edu> wrote:

>Hi Jiangjie,
>
>
>Thanks for the information. This is what I get when I print
>MessageAndMetadata:
>
>*Thread 0: Message(magic = 0, attributes = 0, crc = 127991357, key =
>java.nio.HeapByteBuffer[pos=0 lim=1 cap=55], payload =
>java.nio.HeapByteBuffer[pos=0 lim=50 cap=50])*
>
>Can you please tell me where I can find the partition number in this?
>
>
>
>On Wed, Nov 26, 2014 at 1:29 PM, Jiangjie Qin <jqin@linkedin.com.invalid>
>wrote:
>
>> Hi Sandeep,
>>
>> For the old producer, I don't think you can achieve a strictly even
>> distribution of messages across partitions within the same topic. But you
>> can potentially reduce the sticking time by setting
>> topic.metadata.refresh.interval.ms to a lower value, e.g. 1 second.
>>
>> KAFKA-544 added the partition information to MessageAndMetadata. That
>> dates back to 11/15/12, so it should have been included in 0.8.1.1. Do you
>> mean that the MessageAndMetadata you got does not have a partition member,
>> or that MessageAndMetadata.partition gives you nothing?
>>
>> Jiangjie (Becket) Qin
>>
>> On 11/26/14, 10:31 AM, "Palur Sandeep" <psandeep@hawk.iit.edu> wrote:
>>
>> >Hi Jiangjie,
>> >
>> >I am using the high level consumer (ZookeeperConsumerConnector). After
>> >getting the message from the stream, I don't see this "message.Partition".
>> >Please help me get the partition id from the message.
>> >
>> >What can I do to get messages evenly distributed among partitions?
>> >Do you mean that it is not possible in version 0.8.1.1?
>> >
>> >On Wed, Nov 26, 2014 at 12:03 PM, Jiangjie Qin <jqin@linkedin.com.invalid>
>> >wrote:
>> >
>> >> Hi Sandeep,
>> >>
>> >> If you are sending messages to different topics, each topic will stick
>> >> to a random partition for 10 min. Since they are likely sticking to
>> >> different brokers, you will still see messages roughly evenly
>> >> distributed.
>> >> If you are using high level consumer (ZookeeperConsumerConnector), after
>> >> getting the message from stream, you can simply call message.Partition
>> >> to get the partition id.
>> >>
>> >> Jiangjie (Becket) Qin
>> >>
>> >> On 11/25/14, 5:30 PM, "Palur Sandeep" <psandeep@hawk.iit.edu> wrote:
>> >>
>> >> >Hi Jiangjie,
>> >> >
>> >> >This is what I have understood. Please correct me if I am wrong.
>> >> >
>> >> >I don't use the partitioner class at all (KeyedMessage<String,String>
>> >> >data = new KeyedMessage<String, String>(topic_name, new_mes)). It
>> >> >partitions messages randomly to different partitions. I don't see it
>> >> >sticking to any broker for 10 mins. I guess it follows some random
>> >> >partitioning logic. I am using version 0.8.1.1.
>> >> >
>> >> >MessageAndMetadata on the consumer side prints the following message.
>> >> >Can you help me find the metadata regarding the partition number?
>> >> >
>> >> >*Thread 0: Message(magic = 0, attributes = 0, crc = 127991357, key =
>> >> >java.nio.HeapByteBuffer[pos=0 lim=1 cap=55], payload =
>> >> >java.nio.HeapByteBuffer[pos=0 lim=50 cap=50])*
>> >> >
>> >> >Thanks
>> >> >Sandeep
>> >> >
>> >> >On Tue, Nov 25, 2014 at 7:07 PM, Jiangjie Qin <jqin@linkedin.com.invalid>
>> >> >wrote:
>> >> >
>> >> >> Palur,
>> >> >>
>> >> >> Just adding to what Guozhang said, the answer to your question might
>> >> >> depend on which producer you are using.
>> >> >> Assuming you are producing messages without keys to the same topic, in
>> >> >> the new producer (KafkaProducer) the messages will go to brokers in a
>> >> >> round-robin way, so the messages will end up evenly distributed across
>> >> >> brokers. Whereas the old producer actually sticks to a particular
>> >> >> broker for 10 min (by default) and then switches to another random
>> >> >> partition. In that case, if you send messages fast enough, you might
>> >> >> see uneven distribution across brokers.
>> >> >>
>> >> >> For the consumer, if you are using the high level consumer, when
>> >> >> reading from KafkaStream you will get MessageAndMetadata; the topic and
>> >> >> partition information is included in it, as well as the raw message.
>> >> >>
>> >> >> Jiangjie (Becket) Qin
>> >> >>
>> >> >>
>> >> >>
>> >> >> On 11/25/14, 10:01 AM, "Guozhang Wang" <wangguoz@gmail.com> wrote:
>> >> >>
>> >> >> >Palur,
>> >> >> >
>> >> >> >If the 8 partitions are each hosted on one of the nodes, assuming
>> >> >> >replication factor 1, then each node will get roughly 100000 / 8
>> >> >> >messages due to the random partitioner. If you want to know exactly
>> >> >> >how many messages are on each broker, you can use a simple consumer,
>> >> >> >which allows you to specify the partition id you want to consume from.
>> >> >> >
>> >> >> >In the new consumer (0.9), each consumed message will contain the
>> >> >> >partition id as part of its message metadata.
>> >> >> >
>> >> >> >Guozhang
>> >> >> >
>> >> >> >On Tue, Nov 25, 2014 at 7:47 AM, Palur Sandeep <psandeep@hawk.iit.edu>
>> >> >> >wrote:
>> >> >> >
>> >> >> >> Dear Developers,
>> >> >> >>
>> >> >> >> I am using the default partitioning logic (random partitioning) to
>> >> >> >> produce messages into brokers. That is, I don't use a
>> >> >> >> partitioner.class.
>> >> >> >>
>> >> >> >> My question is: if I produce 100000 messages using the code below
>> >> >> >> for a topic that has 8 partitions across 8 nodes, how many messages
>> >> >> >> will each partition have? Is there any API that can help me find the
>> >> >> >> broker id of each message I consume on the consumer side?
>> >> >> >>
>> >> >> >> PS: I don't want to use partitioner.class. I want to use Kafka's
>> >> >> >> default partitioning logic.
>> >> >> >>
>> >> >> >>   KeyedMessage<String, String> data =
>> >> >> >>       new KeyedMessage<String, String>(topic_name, new_mes);
>> >> >> >>
>> >> >> >>   producer.send(data);
>> >> >> >>
>> >> >> >> --
>> >> >> >> Regards,
>> >> >> >> Sandeep Palur
>> >> >> >> Data-Intensive Distributed Systems Laboratory, CS/IIT
>> >> >> >> Department of Computer Science, Illinois Institute of Technology (IIT)
>> >> >> >> Phone : 312-647-9833
>> >> >> >> Email : psandeep@hawk.iit.edu <srajasur@hawk.iit.edu>
>> >> >> >>
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> >--
>> >> >> >-- Guozhang
>> >> >>
>> >> >>
>> >> >
>> >> >
>> >> >--
>> >> >Regards,
>> >> >Sandeep Palur
>> >> >Data-Intensive Distributed Systems Laboratory, CS/IIT
>> >> >Department of Computer Science, Illinois Institute of Technology (IIT)
>> >> >Phone : 312-647-9833
>> >> >Email : psandeep@hawk.iit.edu <srajasur@hawk.iit.edu>
>> >>
>> >>
>> >
>> >
>> >--
>> >Regards,
>> >Sandeep Palur
>> >Data-Intensive Distributed Systems Laboratory, CS/IIT
>> >Department of Computer Science, Illinois Institute of Technology (IIT)
>> >Phone : 312-647-9833
>> >Email : psandeep@hawk.iit.edu <srajasur@hawk.iit.edu>
>>
>>
>
>
>-- 
>Regards,
>Sandeep Palur
>Data-Intensive Distributed Systems Laboratory, CS/IIT
>Department of Computer Science, Illinois Institute of Technology (IIT)
>Phone : 312-647-9833
>Email : psandeep@hawk.iit.edu <srajasur@hawk.iit.edu>
