kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shangan chen <chenshangan...@gmail.com>
Subject question about usage of SimpleConsumer
Date Fri, 31 May 2013 06:31:39 GMT
In Kafka, the consumers are responsible for maintaining state information
(offset) on what has been consumed (refer from kafka design
page).high-level consumer api will store its consumption state in
zookeeper, while simple consumer shoud deal with these things itself.
My doubt is  what happened when I call getOffsetsBefore(topic,
partition,OffsetRequest.LatestTime(), maxNumOffsets) ? Where did it fetch
offset as I didn't store the offset, it seems that kafka maintain the
offset, anybody can give some explanation.


public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
_collector = collector;
_consumer = new SimpleConsumer(host, port, soTimeout, buffersize);
long[] offsets = _consumer.getOffsetsBefore(topic, partition,
OffsetRequest.LatestTime(), maxNumOffsets);
offset = offsets[0];
new StringScheme();
}

@Override
public void nextTuple() {
FetchRequest fetch = new FetchRequest(topic, partition, offset, maxSize);
ByteBufferMessageSet msgSet = _consumer.fetch(fetch);
for (MessageAndOffset msgAndOffset : msgSet) {
String msg = getMessage(msgAndOffset.message());
// log spout process time
Debug.log(this.getClass().getSimpleName(), msg);
Debug.incr(topic + "_" + this.getClass().getSimpleName(), 1);
_collector
.emit(new Values(msg), new KafkaMessageId(msg, offset, 1));
offset = msgAndOffset.offset();
}
}


-- 
have a good day!
chenshang'an

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message