kafka-users mailing list archives

From shangan chen <chenshangan...@gmail.com>
Subject Re: question about usage of SimpleConsumer
Date Sat, 01 Jun 2013 02:44:02 GMT
You mean the Kafka broker maintains the earliest/latest offset for each
partition of each topic? Does this offset information have anything to do
with consumers, or is it just the broker's own information? I mean, the simple
consumer won't get the offset where it stopped, but the broker's latest
offset.
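The distinction being asked about can be sketched as follows, assuming the simple consumer keeps its own position in a local file. `FileOffsetStore`, `FileOffsetStoreDemo`, the file name and the numbers are hypothetical illustrations, not Kafka APIs; the value 5000 merely stands in for what `getOffsetsBefore(..., OffsetRequest.LatestTime(), 1)[0]` might return. The broker's latest offset serves only as a fallback for a consumer with no saved state, while a restarted consumer resumes from the offset it saved itself.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper (not a Kafka API): keeps the next offset to fetch for one
// topic/partition in a local file, so a restarted simple consumer resumes where
// it stopped instead of jumping to the broker's latest offset.
class FileOffsetStore {
    private final Path file;

    FileOffsetStore(Path file) {
        this.file = file;
    }

    // Returns the saved offset, or `fallback` (e.g. the broker's latest offset)
    // when this consumer has never saved one.
    long loadOrDefault(long fallback) {
        try {
            if (!Files.exists(file)) {
                return fallback;
            }
            String text = new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim();
            return Long.parseLong(text);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Overwrites the saved offset; call it once messages before `nextOffset` are processed.
    void save(long nextOffset) {
        try {
            Files.write(file, Long.toString(nextOffset).getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

class FileOffsetStoreDemo {
    public static void main(String[] args) {
        Path file = Paths.get(System.getProperty("java.io.tmpdir"),
                "mytopic-0-" + System.nanoTime() + ".offset");
        FileOffsetStore store = new FileOffsetStore(file);

        long brokerLatest = 5000L; // stand-in for a getOffsetsBefore(..., LatestTime(), 1) result
        System.out.println("first run starts at " + store.loadOrDefault(brokerLatest)); // 5000

        store.save(5120L); // pretend everything before offset 5120 was processed
        long brokerLatestLater = 5300L; // the broker's log kept growing meanwhile
        System.out.println("restart resumes at " + store.loadOrDefault(brokerLatestLater)); // 5120

        file.toFile().delete(); // clean up the demo file
    }
}
```

A file is simply the easiest place to keep the position in a sketch; ZooKeeper, which the high-level consumer uses, or any other durable store would play the same role.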

Another thing: if I have multiple simple consumers (in Storm there are
multiple spouts) consuming messages from a specific partition of a specific
topic on a single broker, will they share the same stream or consume the
messages independently? In testing like the last example, I found that each
consumer established its own stream rather than all consumers sharing the same
stream. This seems to confirm that the latest offset is the broker's queue
information rather than the consumer's state.
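A minimal in-memory model of that observation, under stated simplifications: `PartitionLog`, `IndependentConsumer` and `IndependentConsumersDemo` are hypothetical classes, not Kafka's, and offsets here count messages, whereas Kafka 0.7 offsets are byte positions in the log. The broker side knows only where its log starts and ends; each consumer carries its own offset (like the spout's `offset` field), so two consumers reading the same partition each see every message instead of splitting one stream.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for one partition's log on the broker. It knows where
// the log starts and ends, but keeps no per-consumer read position.
class PartitionLog {
    private final long baseOffset;
    private final List<String> messages = new ArrayList<>();

    PartitionLog(long baseOffset) {
        this.baseOffset = baseOffset;
    }

    void append(String msg) {
        messages.add(msg);
    }

    long earliestOffset() {
        return baseOffset;
    }

    // Offset the next appended message will get.
    long latestOffset() {
        return baseOffset + messages.size();
    }

    // Returns up to maxCount messages starting at `offset`; the caller tracks its position.
    List<String> fetch(long offset, int maxCount) {
        if (offset < baseOffset || offset > latestOffset()) {
            throw new IllegalArgumentException("offset out of range: " + offset);
        }
        int from = (int) (offset - baseOffset);
        int to = Math.min(messages.size(), from + maxCount);
        return new ArrayList<>(messages.subList(from, to));
    }
}

// Each consumer owns its offset; nothing is shared through the log.
class IndependentConsumer {
    private final PartitionLog log;
    private long offset;
    final List<String> seen = new ArrayList<>();

    IndependentConsumer(PartitionLog log, long startOffset) {
        this.log = log;
        this.offset = startOffset;
    }

    void poll(int maxCount) {
        List<String> batch = log.fetch(offset, maxCount);
        seen.addAll(batch);
        offset += batch.size(); // advance this consumer's own position only
    }

    long offset() {
        return offset;
    }
}

class IndependentConsumersDemo {
    public static void main(String[] args) {
        PartitionLog log = new PartitionLog(0);
        for (int i = 0; i < 5; i++) {
            log.append("m" + i);
        }
        IndependentConsumer a = new IndependentConsumer(log, log.earliestOffset());
        IndependentConsumer b = new IndependentConsumer(log, log.earliestOffset());
        a.poll(3);
        b.poll(5);
        a.poll(5);
        System.out.println(a.seen); // [m0, m1, m2, m3, m4]
        System.out.println(b.seen); // [m0, m1, m2, m3, m4]
    }
}
```

Both consumers end up with all five messages even though they poll in different batch sizes, which matches the "each consumer established its own stream" result described above.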




On Fri, May 31, 2013 at 11:49 PM, Neha Narkhede <neha.narkhede@gmail.com> wrote:

> getOffsetsBefore sends an RPC request to the Kafka broker to find out the
> earliest/latest offset for that topic and partition. In your example, it
> returns the latest offset at the time of the request.
>
> Thanks,
> Neha
>
>
> On Thu, May 30, 2013 at 11:31 PM, shangan chen <chenshangan521@gmail.com> wrote:
>
> > In Kafka, the consumers are responsible for maintaining state information
> > (offsets) on what has been consumed (see the Kafka design page). The
> > high-level consumer API stores its consumption state in ZooKeeper, while
> > the simple consumer has to handle this itself.
> > My question is: what happens when I call getOffsetsBefore(topic,
> > partition, OffsetRequest.LatestTime(), maxNumOffsets)? Where does it get
> > the offset from, given that I never stored one? It seems that Kafka
> > maintains the offset. Can anybody explain?
> >
> >
> > @Override
> > public void open(Map conf, TopologyContext context,
> >         SpoutOutputCollector collector) {
> >     _collector = collector;
> >     _consumer = new SimpleConsumer(host, port, soTimeout, buffersize);
> >     // Ask the broker for the latest offset of this topic/partition; this is
> >     // broker-side log information, not a saved consumer position.
> >     long[] offsets = _consumer.getOffsetsBefore(topic, partition,
> >             OffsetRequest.LatestTime(), maxNumOffsets);
> >     offset = offsets[0];
> >     new StringScheme(); // no effect: the instance is created and discarded
> > }
> >
> > @Override
> > public void nextTuple() {
> >     // Fetch a batch starting at the offset this spout tracks itself.
> >     FetchRequest fetch = new FetchRequest(topic, partition, offset, maxSize);
> >     ByteBufferMessageSet msgSet = _consumer.fetch(fetch);
> >     for (MessageAndOffset msgAndOffset : msgSet) {
> >         String msg = getMessage(msgAndOffset.message());
> >         // log spout process time
> >         Debug.log(this.getClass().getSimpleName(), msg);
> >         Debug.incr(topic + "_" + this.getClass().getSimpleName(), 1);
> >         _collector.emit(new Values(msg), new KafkaMessageId(msg, offset, 1));
> >         // Advance the spout's own position (the offset to fetch next).
> >         // It lives only in memory, so it is lost when the spout restarts.
> >         offset = msgAndOffset.offset();
> >     }
> > }
> >
> >
> > --
> > have a good day!
> > chenshang'an
> >
>
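Neha's answer can be made concrete with a simplified, hypothetical model of what the broker computes for one partition. `BrokerOffsetModel`, its constructor and the sample offsets are illustrative only, not Kafka's implementation, and lookups by real timestamps are left out; the sentinel values -1 and -2 mirror `OffsetRequest.LatestTime()` and `OffsetRequest.EarliestTime()`. In this model the answer lists offsets newest first, so with `LatestTime()` the element `offsets[0]` is the log end. Because it is computed from the broker's log alone, two spouts asking at the same moment get the same number, and neither learns where the other stopped.

```java
import java.util.Arrays;

// Simplified, hypothetical model of a broker answering getOffsetsBefore for one
// partition. Only the two sentinel times used in this thread are modeled.
class BrokerOffsetModel {
    static final long LATEST_TIME = -1L;   // sentinel value of OffsetRequest.LatestTime()
    static final long EARLIEST_TIME = -2L; // sentinel value of OffsetRequest.EarliestTime()

    private final long[] segmentStarts; // start offset of each log segment, ascending
    private final long logEndOffset;    // offset the next appended message will get

    BrokerOffsetModel(long[] segmentStarts, long logEndOffset) {
        this.segmentStarts = segmentStarts.clone();
        this.logEndOffset = logEndOffset;
    }

    // The answer depends only on the broker's log, never on which consumer asks.
    long[] getOffsetsBefore(long time, int maxNumOffsets) {
        // Candidate offsets, ascending: every segment start, then the log end.
        long[] candidates = Arrays.copyOf(segmentStarts, segmentStarts.length + 1);
        candidates[segmentStarts.length] = logEndOffset;

        int startIndex;
        if (time == LATEST_TIME) {
            startIndex = candidates.length - 1;
        } else if (time == EARLIEST_TIME) {
            startIndex = 0;
        } else {
            throw new UnsupportedOperationException("timestamp lookups are not modeled");
        }

        // Walk backwards from startIndex, newest first, up to maxNumOffsets entries.
        int size = Math.min(maxNumOffsets, startIndex + 1);
        long[] result = new long[size];
        for (int j = 0; j < size; j++) {
            result[j] = candidates[startIndex - j];
        }
        return result;
    }
}

class BrokerOffsetModelDemo {
    public static void main(String[] args) {
        BrokerOffsetModel broker = new BrokerOffsetModel(new long[] {0L, 1000L, 2000L}, 2500L);
        System.out.println(Arrays.toString(broker.getOffsetsBefore(BrokerOffsetModel.LATEST_TIME, 1)));   // [2500]
        System.out.println(Arrays.toString(broker.getOffsetsBefore(BrokerOffsetModel.LATEST_TIME, 3)));   // [2500, 2000, 1000]
        System.out.println(Arrays.toString(broker.getOffsetsBefore(BrokerOffsetModel.EARLIEST_TIME, 3))); // [0]
    }
}
```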



