kafka-users mailing list archives

From Palur Sandeep <psand...@hawk.iit.edu>
Subject Re: How many messages does each broker have?
Date Tue, 02 Dec 2014 03:25:13 GMT
Thank you so much, Jiangjie. I got it working.

I have another problem: the consumer doesn't receive a message if it is large.
When the producer sends 256 KB messages to the broker, the consumer is able to
retrieve them, but when the producer sends 10 MB messages to the broker, the
consumer doesn't receive any messages.

Please tell me how to make the consumer receive 10 MB messages.
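
A likely cause, offered as a hedged sketch rather than a confirmed diagnosis: in 0.8.x the high-level consumer only fetches up to fetch.message.max.bytes per partition (about 1 MB by default), so a 10 MB message would not fit in a fetch. The snippet below only builds the java.util.Properties that would be handed to kafka.consumer.ConsumerConfig; the class name, the ZooKeeper address, the group id, and the 11 MB headroom figure are all illustrative assumptions.

```java
import java.util.Properties;

public class LargeMessageConsumerProps {
    // Assumed limit: 10 MB payload plus 1 MB of headroom for message overhead.
    static final int MAX_MESSAGE_BYTES = 11 * 1024 * 1024;

    // Builds consumer properties for the 0.8.x high-level consumer.
    // The caller would pass the result to new ConsumerConfig(props).
    static Properties consumerProps(String zookeeper, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        // Raise the per-partition fetch size so one large message fits.
        props.put("fetch.message.max.bytes", Integer.toString(MAX_MESSAGE_BYTES));
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical connection values, for illustration only.
        Properties props = consumerProps("localhost:2181", "example-group");
        System.out.println("fetch.message.max.bytes = "
                + props.getProperty("fetch.message.max.bytes"));
    }
}
```

The broker would also need to accept messages of this size in the first place: message.max.bytes in the broker configuration, plus replica.fetch.max.bytes if the topic is replicated, should be at least as large as the biggest message.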

On Mon, Dec 1, 2014 at 10:24 AM, Jiangjie Qin <jqin@linkedin.com.invalid>
wrote:

> I think you are printing the class Message instead of MessageAndMetadata.
> The output you got was from Message.toString.
>
> Can you just try something like below?
>
> ...
> // assuming you have already obtained a consumer stream
> ConsumerIterator<byte[], byte[]> iter = consumerStream.iterator();
> MessageAndMetadata<byte[], byte[]> messageAndMetadata = iter.next();
> System.out.println("topic: " + messageAndMetadata.topic() + " partition: "
> + messageAndMetadata.partition());
>
>
> Jiangjie (Becket) Qin
>
> On 11/26/14, 12:56 PM, "Palur Sandeep" <psandeep@hawk.iit.edu> wrote:
>
> >Hi Jiangjie,
> >
> >
> >Thanks for the information. This is what I get when I print
> >MessageAndMetadata:
> >
> >*Thread 0: Message(magic = 0, attributes = 0, crc = 127991357, key =
> >java.nio.HeapByteBuffer[pos=0 lim=1 cap=55], payload =
> >java.nio.HeapByteBuffer[pos=0 lim=50 cap=50])*
> >
> >Can you please tell me where I can find the partition number in this?
> >
> >
> >
> >On Wed, Nov 26, 2014 at 1:29 PM, Jiangjie Qin <jqin@linkedin.com.invalid>
> >wrote:
> >
> >> Hi Sandeep,
> >>
> >> For the old producer, I don't think you can achieve a strictly even
> >> distribution of messages across partitions within the same topic. But you
> >> can potentially reduce the sticking time by setting
> >> topic.metadata.refresh.interval.ms to a lower value, e.g. 1 second.
> >>
> >> KAFKA-544 added the partition information to MessageAndMetadata. That
> >> dates back to 11/15/12, so it should have been included in 0.8.1.1. Do you
> >> mean the MessageAndMetadata you got does not have a partition member, or
> >> that MessageAndMetadata.partition gives you nothing?
> >>
> >> Jiangjie (Becket) Qin
> >>
> >> On 11/26/14, 10:31 AM, "Palur Sandeep" <psandeep@hawk.iit.edu> wrote:
> >>
> >> >Hi Jiangjie,
> >> >
> >> >I am using the high-level consumer (ZookeeperConsumerConnector), but after
> >> >getting the message from the stream, I don't see this "message.Partition".
> >> >Please help me get the partition id from the message.
> >> >
> >> >What can I do to get messages evenly distributed among partitions?
> >> >Do you mean that it is not possible in version 0.8.1.1?
> >> >
> >> >On Wed, Nov 26, 2014 at 12:03 PM, Jiangjie Qin <jqin@linkedin.com.invalid>
> >> >wrote:
> >> >
> >> >> Hi Sandeep,
> >> >>
> >> >> If you are sending messages to different topics, each topic will stick
> >> >> to a random partition for 10 min. Since they are likely sticking to
> >> >> different brokers, you will still see messages roughly evenly
> >> >> distributed.
> >> >> If you are using the high-level consumer (ZookeeperConsumerConnector),
> >> >> after getting the message from the stream, you can simply call
> >> >> message.Partition to get the partition id.
> >> >>
> >> >> Jiangjie (Becket) Qin
> >> >>
> >> >> On 11/25/14, 5:30 PM, "Palur Sandeep" <psandeep@hawk.iit.edu> wrote:
> >> >>
> >> >> >Hi Jiangjie,
> >> >> >
> >> >> >This is what I have understood. Please correct me if I am wrong.
> >> >> >
> >> >> >I don't use the partitioner class at all (KeyedMessage<String,String>
> >> >> >data = new KeyedMessage<String, String>(topic_name,new_mes)). It
> >> >> >partitions messages randomly to different partitions. I don't see it
> >> >> >sticking to any broker for 10 mins. I guess it follows some random
> >> >> >partitioning logic. I am using version 0.8.1.1.
> >> >> >
> >> >> >MessageAndMetadata on the consumer side prints the following message.
> >> >> >Can you help me find the metadata regarding the partition number?
> >> >> >
> >> >> >*Thread 0: Message(magic = 0, attributes = 0, crc = 127991357, key =
> >> >> >java.nio.HeapByteBuffer[pos=0 lim=1 cap=55], payload =
> >> >> >java.nio.HeapByteBuffer[pos=0 lim=50 cap=50])*
> >> >> >
> >> >> >Thanks
> >> >> >Sandeep
> >> >> >
> >> >> >On Tue, Nov 25, 2014 at 7:07 PM, Jiangjie Qin <jqin@linkedin.com.invalid>
> >> >> >wrote:
> >> >> >
> >> >> >> Palur,
> >> >> >>
> >> >> >> Just adding to what Guozhang said, the answer to your question might
> >> >> >> depend on which producer you are using.
> >> >> >> Assuming you are producing messages without keys to the same topic, in
> >> >> >> the new producer (KafkaProducer) the messages will go to brokers in a
> >> >> >> round-robin way, so the messages will end up evenly distributed across
> >> >> >> brokers. The old producer, by contrast, sticks to a particular broker
> >> >> >> for 10 min (by default) and then switches to another random partition.
> >> >> >> In that case, if you send messages fast enough, you might see an
> >> >> >> uneven distribution across brokers.
> >> >> >>
> >> >> >> For the consumer, if you are using the high-level consumer, when
> >> >> >> reading from KafkaStream you will get MessageAndMetadata; the topic and
> >> >> >> partition information is included in it as well as the raw message.
> >> >> >>
> >> >> >> Jiangjie (Becket) Qin
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >> On 11/25/14, 10:01 AM, "Guozhang Wang" <wangguoz@gmail.com> wrote:
> >> >> >>
> >> >> >> >Palur,
> >> >> >> >
> >> >> >> >If the 8 partitions are each hosted on one of the nodes, assuming
> >> >> >> >replication factor 1, then each node will get roughly 100000 / 8
> >> >> >> >messages due to the random partitioner. If you want to know exactly how
> >> >> >> >many messages are on each broker, you can use a simple consumer, which
> >> >> >> >allows you to specify the partition id you want to consume from.
> >> >> >> >
> >> >> >> >In the new consumer (0.9), each consumed message will contain the
> >> >> >> >partition id as part of its message metadata.
> >> >> >> >
> >> >> >> >Guozhang
> >> >> >> >
> >> >> >> >On Tue, Nov 25, 2014 at 7:47 AM, Palur Sandeep <psandeep@hawk.iit.edu>
> >> >> >> >wrote:
> >> >> >> >
> >> >> >> >> Dear Developers,
> >> >> >> >>
> >> >> >> >> I am using the default partitioning logic (random partitioning) to
> >> >> >> >> produce messages into brokers. That is, I don't use a
> >> >> >> >> partitioner.class.
> >> >> >> >>
> >> >> >> >> My question is: if I produce 100000 messages using the code below
> >> >> >> >> for a topic that has 8 partitions across 8 nodes, how many messages
> >> >> >> >> will each partition have? Is there any API that can help me find the
> >> >> >> >> broker id of each message I consume on the consumer side?
> >> >> >> >>
> >> >> >> >> PS: I don't want to use partitioner.class. I want to use Kafka's
> >> >> >> >> default partitioning logic.
> >> >> >> >>
> >> >> >> >>   KeyedMessage<String,String> data = new KeyedMessage<String, String>(topic_name,new_mes);
> >> >> >> >>
> >> >> >> >> producer.send(data);
> >> >> >> >>
> >> >> >> >> --
> >> >> >> >> Regards,
> >> >> >> >> Sandeep Palur
> >> >> >> >> Data-Intensive Distributed Systems Laboratory, CS/IIT
> >> >> >> >> Department of Computer Science, Illinois Institute of Technology
> >> >> >> >> (IIT)
> >> >> >> >> Phone : 312-647-9833
> >> >> >> >> Email : psandeep@hawk.iit.edu <srajasur@hawk.iit.edu>
> >> >> >> >>
> >> >> >> >
> >> >> >> >
> >> >> >> >
> >> >> >> >--
> >> >> >> >-- Guozhang
> >> >> >>
> >> >> >>
> >> >> >
> >> >> >
> >> >> >--
> >> >> >Regards,
> >> >> >Sandeep Palur
> >> >> >Data-Intensive Distributed Systems Laboratory, CS/IIT
> >> >> >Department of Computer Science, Illinois Institute of Technology (IIT)
> >> >> >Phone : 312-647-9833
> >> >> >Email : psandeep@hawk.iit.edu <srajasur@hawk.iit.edu>
> >> >>
> >> >>
> >> >
> >> >
> >> >--
> >> >Regards,
> >> >Sandeep Palur
> >> >Data-Intensive Distributed Systems Laboratory, CS/IIT
> >> >Department of Computer Science, Illinois Institute of Technology (IIT)
> >> >Phone : 312-647-9833
> >> >Email : psandeep@hawk.iit.edu <srajasur@hawk.iit.edu>
> >>
> >>
> >
> >
> >--
> >Regards,
> >Sandeep Palur
> >Data-Intensive Distributed Systems Laboratory, CS/IIT
> >Department of Computer Science, Illinois Institute of Technology (IIT)
> >Phone : 312-647-9833
> >Email : psandeep@hawk.iit.edu <srajasur@hawk.iit.edu>
>
>


-- 
Regards,
Sandeep Palur
Data-Intensive Distributed Systems Laboratory, CS/IIT
Department of Computer Science, Illinois Institute of Technology (IIT)
Phone : 312-647-9833
Email : psandeep@hawk.iit.edu <srajasur@hawk.iit.edu>
