In Kafka, consumers are responsible for maintaining state information
(the offset) about what has been consumed (see the Kafka design page).
The high-level consumer API stores its consumption state in ZooKeeper,
while a simple consumer has to manage this itself.

My question is: what happens when I call getOffsetsBefore(topic,
partition, OffsetRequest.LatestTime(), maxNumOffsets)? Where does it fetch
the offset from, given that I never stored one? It seems that Kafka itself
maintains the offset. Can anybody explain?

    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        _collector = collector;
        _consumer = new SimpleConsumer(host, port, soTimeout, buffersize);
        // Ask the broker for the latest offset(s); nothing was stored by me.
        long[] offsets = _consumer.getOffsetsBefore(topic, partition,
                OffsetRequest.LatestTime(), maxNumOffsets);
        offset = offsets[0];
        // Note: this instance is created but never assigned or used.
        new StringScheme();
    }

    @Override
    public void nextTuple() {
        // Fetch from the offset that this spout tracks itself.
        FetchRequest fetch = new FetchRequest(topic, partition, offset, maxSize);
        ByteBufferMessageSet msgSet = _consumer.fetch(fetch);
        for (MessageAndOffset msgAndOffset : msgSet) {
            String msg = getMessage(msgAndOffset.message());
            // log spout process time
            Debug.log(this.getClass().getSimpleName(), msg);
            Debug.incr(topic + "_" + this.getClass().getSimpleName(), 1);
            // The message id carries the offset held before this message was read.
            _collector.emit(new Values(msg), new KafkaMessageId(msg, offset, 1));
            // Advance to the offset returned with this message for the next fetch.
            offset = msgAndOffset.offset();
        }
    }

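To make clear what I mean by a simple consumer tracking the offset itself, here is a minimal, Kafka-free sketch of that bookkeeping. ToyLog, OffsetTrackingConsumer and ToyDemo are hypothetical names invented for illustration, not Kafka classes. The sketch also assumes that an offset is a byte position in the log and that the "latest" offset is simply the current end of the log; that is my guess at the model, not something I have confirmed.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for one partition's log (not Kafka API).
class ToyLog {
    private final List<byte[]> messages = new ArrayList<>();
    private final List<Long> startOffsets = new ArrayList<>();
    private long endOffset = 0; // byte position just past the last message

    void append(String msg) {
        byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);
        startOffsets.add(endOffset);
        messages.add(bytes);
        endOffset += bytes.length;
    }

    // Assumption: "latest" is derived from the log itself,
    // not from anything a consumer has stored.
    long latestOffset() {
        return endOffset;
    }

    // Returns every message whose start offset is >= the requested offset.
    List<String> fetch(long offset) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < messages.size(); i++) {
            if (startOffsets.get(i) >= offset) {
                out.add(new String(messages.get(i), StandardCharsets.UTF_8));
            }
        }
        return out;
    }
}

// The consumer, not the log, remembers how far it has read.
class OffsetTrackingConsumer {
    private final ToyLog log;
    private long offset;

    OffsetTrackingConsumer(ToyLog log, long startOffset) {
        this.log = log;
        this.offset = startOffset;
    }

    List<String> poll() {
        List<String> batch = log.fetch(offset);
        for (String msg : batch) {
            // Advance past each message that was handed out.
            offset += msg.getBytes(StandardCharsets.UTF_8).length;
        }
        return batch;
    }

    long offset() {
        return offset;
    }
}

class ToyDemo {
    public static void main(String[] args) {
        ToyLog log = new ToyLog();
        log.append("old-1");
        log.append("old-2");
        // Start at the end of the log, as my spout does with LatestTime().
        OffsetTrackingConsumer consumer =
                new OffsetTrackingConsumer(log, log.latestOffset());
        log.append("new-1");
        System.out.println(consumer.poll());
        System.out.println(consumer.offset());
    }
}
```

In this toy the starting offset comes from the log, and everything after that is kept by the consumer. My question is whether getOffsetsBefore works along the same lines.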
--
have a good day!
chenshang'an