kafka-users mailing list archives

From Palur Sandeep <psand...@hawk.iit.edu>
Subject Re: How many messages does each broker have?
Date Wed, 03 Dec 2014 22:08:40 GMT
Thanks, Guozhang and others, for helping me. I am now able to send and receive
10 MB messages. In case anybody has the same requirement, please make the
following changes:

*server.properties*
message.max.bytes=10485800
replica.fetch.max.bytes=10485800
socket.send.buffer.bytes=104857600
socket.receive.buffer.bytes=104857600
socket.request.max.bytes=104857600

*consumer.properties* (set here through a java.util.Properties object)
props.put("fetch.message.max.bytes", "10485800");

*Topic*
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 --topic big_topic \
  --config max.message.bytes=10485800

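The settings above only work together if every stage can carry the largest message the broker accepts. Below is a minimal sketch of that rule of thumb in plain Java, with no Kafka dependency. The class and method names (LargeMessageSettings, consumerProps, canDeliver) are hypothetical helpers for illustration only, and the 1 MiB value in main assumes the 0.8.x consumer default for fetch.message.max.bytes.

```java
import java.util.Properties;

public class LargeMessageSettings {
    // Size used in this thread: slightly above 10 MiB (10 * 1024 * 1024 = 10485760).
    public static final int MAX_MESSAGE_BYTES = 10485800;

    /** Builds consumer properties; only the key name is taken from the thread. */
    public static Properties consumerProps(int fetchMaxBytes) {
        Properties props = new Properties();
        props.put("fetch.message.max.bytes", Integer.toString(fetchMaxBytes));
        return props;
    }

    /**
     * Simplified rule of thumb (an assumption, not a Kafka API): replica fetch,
     * consumer fetch and socket request limits must each be at least as large
     * as the broker's message.max.bytes.
     */
    public static boolean canDeliver(int brokerMessageMax, int replicaFetchMax,
                                     int consumerFetchMax, int socketRequestMax) {
        return replicaFetchMax >= brokerMessageMax
            && consumerFetchMax >= brokerMessageMax
            && socketRequestMax >= brokerMessageMax;
    }

    public static void main(String[] args) {
        Properties props = consumerProps(MAX_MESSAGE_BYTES);
        int consumerFetchMax = Integer.parseInt(props.getProperty("fetch.message.max.bytes"));

        // Values from this message: every limit covers a 10 MB message.
        System.out.println(canDeliver(MAX_MESSAGE_BYTES, 10485800, consumerFetchMax, 104857600));

        // Assumed default consumer fetch size of 1 MiB: too small for a 10 MB message.
        System.out.println(canDeliver(MAX_MESSAGE_BYTES, 10485800, 1024 * 1024, 104857600));
    }
}
```

The second check mirrors the symptom described in the quoted messages below: the broker accepts the large message, but a consumer with a smaller fetch size never receives it.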

On Tue, Dec 2, 2014 at 7:15 PM, Guozhang Wang <wangguoz@gmail.com> wrote:

> Palur,
>
> First you need to make sure the message is received at Kafka:
>
> message.max.bytes
>
> controls the maximum size of a message that can be accepted, and
>
> fetch.message.max.bytes
>
> controls the maximum number of bytes a consumer requests in one fetch.
>
>
> Guozhang
>
>
> On Mon, Dec 1, 2014 at 7:25 PM, Palur Sandeep <psandeep@hawk.iit.edu>
> wrote:
>
> > Thank you so much, Jiangjie. I got it working.
> >
> > I have another problem: the consumer doesn't receive a message if it is big.
> > When the producer sends 256 KB messages to the broker, the consumer is able to
> > retrieve them, but when the producer sends 10 MB messages to the broker, the
> > consumer doesn't receive any messages.
> >
> > Please tell me how to make the consumer receive 10MB messages.
> >
> > On Mon, Dec 1, 2014 at 10:24 AM, Jiangjie Qin <jqin@linkedin.com.invalid>
> > wrote:
> >
> > > I think you are printing the class Message instead of MessageAndMetadata.
> > > The output you got was from Message.toString.
> > >
> > > Can you just try something like below?
> > >
> > > ...
> > > // assuming you have got a consumer stream
> > > ConsumerIterator iter = consumerStream.iterator();
> > > MessageAndMetadata messageAndMetadata = iter.next();
> > > System.out.println("topic: " + messageAndMetadata.topic()
> > >     + " partition: " + messageAndMetadata.partition());
> > >
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On 11/26/14, 12:56 PM, "Palur Sandeep" <psandeep@hawk.iit.edu> wrote:
> > >
> > > >Hi Jiangjie,
> > > >
> > > >
> > > >Thanks for the information. This is what I get when I print
> > > >MessageAndMetadata:
> > > >
> > > >*Thread 0: Message(magic = 0, attributes = 0, crc = 127991357, key =
> > > >java.nio.HeapByteBuffer[pos=0 lim=1 cap=55], payload =
> > > >java.nio.HeapByteBuffer[pos=0 lim=50 cap=50])*
> > > >
> > > >Can you please tell me where I can find the partition number in this?
> > > >
> > > >
> > > >
> > > >On Wed, Nov 26, 2014 at 1:29 PM, Jiangjie Qin <jqin@linkedin.com.invalid>
> > > >wrote:
> > > >
> > > >> Hi Sandeep,
> > > >>
> > > >> For the old producer, I don't think you can achieve strictly even
> > > >> distribution of messages across partitions within the same topic. But you
> > > >> can potentially reduce the sticking time by setting
> > > >> topic.metadata.refresh.interval.ms lower, e.g. to 1 second.
> > > >>
> > > >> KAFKA-544 added the partition information to MessageAndMetadata. That
> > > >> dates back to 11/15/12, so it should have been included in 0.8.1.1. Do you
> > > >> mean the MessageAndMetadata you got does not have a partition member, or
> > > >> that MessageAndMetadata.partition gives you nothing?
> > > >>
> > > >> Jiangjie (Becket) Qin
> > > >>
> > > >> On 11/26/14, 10:31 AM, "Palur Sandeep" <psandeep@hawk.iit.edu> wrote:
> > > >>
> > > >> >Hi Jiangjie,
> > > >> >
> > > >> >I am using the high level consumer (ZookeeperConsumerConnector). After
> > > >> >getting the message from the stream, I don't see this "message.Partition".
> > > >> >Please help me get the partition id from the message.
> > > >> >
> > > >> >What can I do to get messages evenly distributed among partitions?
> > > >> >Do you mean that it is not possible in version 0.8.1.1?
> > > >> >
> > > >> >On Wed, Nov 26, 2014 at 12:03 PM, Jiangjie Qin <jqin@linkedin.com.invalid>
> > > >> >wrote:
> > > >> >
> > > >> >> Hi Sandeep,
> > > >> >>
> > > >> >> If you are sending messages to different topics, each topic will stick to
> > > >> >> a random partition for 10 min. Since they are likely sticking to different
> > > >> >> brokers, you will still see messages roughly evenly distributed.
> > > >> >> If you are using high level consumer (ZookeeperConsumerConnector), after
> > > >> >> getting the message from stream, you can simply call message.Partition to
> > > >> >> get the partition id.
> > > >> >>
> > > >> >> Jiangjie (Becket) Qin
> > > >> >>
> > > >> >> On 11/25/14, 5:30 PM, "Palur Sandeep" <psandeep@hawk.iit.edu> wrote:
> > > >> >>
> > > >> >> >Hi Jiangjie,
> > > >> >> >
> > > >> >> >This is what I have understood. Please correct me if I am wrong.
> > > >> >> >
> > > >> >> >I don't use the partitioner class at all (KeyedMessage<String,String> data =
> > > >> >> >new KeyedMessage<String, String>(topic_name,new_mes)). It partitions
> > > >> >> >messages randomly to different partitions. I don't see it sticking to any
> > > >> >> >broker for 10 mins. I guess it follows some random partitioning logic. I am
> > > >> >> >using version 0.8.1.1.
> > > >> >> >
> > > >> >> >MessageAndMetadata on the consumer side prints the following message. Can you
> > > >> >> >help me find the metadata regarding the partition number?
> > > >> >> >
> > > >> >> >*Thread 0: Message(magic = 0, attributes = 0, crc = 127991357, key =
> > > >> >> >java.nio.HeapByteBuffer[pos=0 lim=1 cap=55], payload =
> > > >> >> >java.nio.HeapByteBuffer[pos=0 lim=50 cap=50])*
> > > >> >> >
> > > >> >> >Thanks
> > > >> >> >Sandeep
> > > >> >> >
> > > >> >> >On Tue, Nov 25, 2014 at 7:07 PM, Jiangjie Qin
> > > >> >><jqin@linkedin.com.invalid>
> > > >> >> >wrote:
> > > >> >> >
> > > >> >> >> Palur,
> > > >> >> >>
> > > >> >> >> Just adding to what Guozhang said, the answer to your question might
> > > >> >> >> depend on which producer you are using.
> > > >> >> >> Assuming you are producing messages without keys to the same topic, in the
> > > >> >> >> new producer (KafkaProducer), the messages will go to brokers in a round
> > > >> >> >> robin way, so the messages will end up evenly distributed across brokers.
> > > >> >> >> Whereas the old producer actually sticks to a particular partition for
> > > >> >> >> 10 min (by default), then switches to another random partition. In that
> > > >> >> >> case, if you send messages fast enough, you might see uneven distribution
> > > >> >> >> across brokers.
> > > >> >> >>
> > > >> >> >> For the consumer, if you are using the high level consumer, when reading
> > > >> >> >> from KafkaStream you will get MessageAndMetadata; the topic and partition
> > > >> >> >> information is included in it as well as the raw message.
> > > >> >> >>
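The difference between the two producer behaviours described above can be illustrated with a small simulation. This is only a sketch in plain Java under stated assumptions: it does not use the Kafka producer API, time is abstracted into "messages sent per sticking interval", and the class and method names (PartitionSpreadDemo, roundRobin, sticky) as well as the interval of 40000 messages and the seed are made up for illustration.

```java
import java.util.Arrays;
import java.util.Random;

public class PartitionSpreadDemo {
    /** Round robin: message i goes to partition i % numPartitions. */
    public static int[] roundRobin(int messages, int numPartitions) {
        int[] counts = new int[numPartitions];
        for (int i = 0; i < messages; i++) {
            counts[i % numPartitions]++;
        }
        return counts;
    }

    /**
     * Sticky: pick a random partition, keep it for messagesPerInterval sends
     * (standing in for the metadata refresh interval), then pick again.
     */
    public static int[] sticky(int messages, int numPartitions,
                               int messagesPerInterval, long seed) {
        Random rng = new Random(seed);
        int[] counts = new int[numPartitions];
        int current = rng.nextInt(numPartitions);
        for (int i = 0; i < messages; i++) {
            if (i > 0 && i % messagesPerInterval == 0) {
                current = rng.nextInt(numPartitions);
            }
            counts[current]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // 100000 messages over 8 partitions: exactly 12500 per partition.
        System.out.println(Arrays.toString(roundRobin(100000, 8)));
        // A fast sender that fits 40000 messages into one sticking interval
        // touches at most 3 of the 8 partitions.
        System.out.println(Arrays.toString(sticky(100000, 8, 40000, 42L)));
    }
}
```

With a shorter sticking interval (fewer messages per interval in this model), the sticky counts move closer to the round robin result, which matches the suggestion elsewhere in the thread to lower topic.metadata.refresh.interval.ms.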
> > > >> >> >> Jiangjie (Becket) Qin
> > > >> >> >>
> > > >> >> >>
> > > >> >> >>
> > > >> >> >> On 11/25/14, 10:01 AM, "Guozhang Wang" <wangguoz@gmail.com> wrote:
> > > >> >> >>
> > > >> >> >> >Palur,
> > > >> >> >> >
> > > >> >> >> >If the 8 partitions are hosted one on each of the nodes, assuming
> > > >> >> >> >replication factor 1, then each node will get roughly 100000 / 8 messages
> > > >> >> >> >due to the random partitioner. If you want to know exactly how many
> > > >> >> >> >messages are on each broker, then you can use a simple consumer, which
> > > >> >> >> >allows you to specify the partition id you want to consume from.
> > > >> >> >> >
> > > >> >> >> >In the new consumer (0.9), each consumed message will contain the
> > > >> >> >> >partition id as part of its message metadata.
> > > >> >> >> >
> > > >> >> >> >Guozhang
> > > >> >> >> >
> > > >> >> >> >On Tue, Nov 25, 2014 at 7:47 AM, Palur Sandeep
> > > >> >><psandeep@hawk.iit.edu>
> > > >> >> >> >wrote:
> > > >> >> >> >
> > > >> >> >> >> Dear Developers,
> > > >> >> >> >>
> > > >> >> >> >> I am using the default partitioning logic (random partitioning) to
> > > >> >> >> >> produce messages into brokers. That is, I don't use a partitioner.class.
> > > >> >> >> >>
> > > >> >> >> >> My question is: if I produce 100000 messages using the code below to a
> > > >> >> >> >> topic that has 8 partitions across 8 nodes, how many messages will each
> > > >> >> >> >> partition have? Is there any API that can help me find the broker id of
> > > >> >> >> >> each message I consume on the consumer side?
> > > >> >> >> >>
> > > >> >> >> >> PS: I don't want to use partitioner.class. I want to use Kafka's default
> > > >> >> >> >> partitioning logic.
> > > >> >> >> >>
> > > >> >> >> >>   KeyedMessage<String,String> data = new KeyedMessage<String, String>(topic_name,new_mes);
> > > >> >> >> >>
> > > >> >> >> >> producer.send(data);
> > > >> >> >> >>
> > > >> >> >> >> --
> > > >> >> >> >> Regards,
> > > >> >> >> >> Sandeep Palur
> > > >> >> >> >> Data-Intensive Distributed Systems Laboratory, CS/IIT
> > > >> >> >> >> Department of Computer Science, Illinois Institute of Technology (IIT)
> > > >> >> >> >> Phone : 312-647-9833
> > > >> >> >> >> Email : psandeep@hawk.iit.edu <srajasur@hawk.iit.edu>
> > > >> >> >> >>
> > > >> >> >> >
> > > >> >> >> >
> > > >> >> >> >
> > > >> >> >> >--
> > > >> >> >> >-- Guozhang
> > > >> >> >>
> > > >> >> >>
> > > >> >> >
> > > >> >> >
> > > >> >>
> > > >> >>
> > > >> >
> > > >> >
> > > >>
> > > >>
> > > >
> > > >
> > >
> > >
> >
> >
> >
>
>
>
> --
> -- Guozhang
>



-- 
Regards,
Sandeep Palur
Data-Intensive Distributed Systems Laboratory, CS/IIT
Department of Computer Science, Illinois Institute of Technology (IIT)
Phone : 312-647-9833
Email : psandeep@hawk.iit.edu <srajasur@hawk.iit.edu>
