kafka-users mailing list archives

From Robert Metzger <rmetz...@apache.org>
Subject Manually controlling the start offset in the high level API
Date Wed, 22 Apr 2015 08:09:37 GMT
Hi,

I'm a committer at the Apache Flink project.
I'm working on adding support for exactly-once semantics for Flink's stream
processing component.
Therefore, we want to keep track of the read offset from the KafkaSource
and restart the consumption from the last known offset (tracked within
Flink).

Over the past few weeks, we tried implementing our own low-level Kafka
consumer that allows setting our own offsets on recovery. Our consumer is
more or less working, but our users basically need the same features as
the high-level consumer.
For example, they want to set various custom configuration values (like the
max fetch size), and they would like to use the ./kafka-run-class.sh
kafka.tools.ConsumerOffsetChecker tool to check the "lag" of the
consumer.

I'm not sure it's the best approach for us to re-implement a
well-engineered consumer just because we cannot pass an initial offset.

I found the link to the upcoming Consumer API (in 0.9), which allows us to
manually manage offsets (
http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
).
Will the new Consumer API be compatible with older Kafka installations, or
will we need to implement our own Consumer for pre-0.9 users in any case?
I know it's always hard to predict release dates, but can you give me a
rough estimate of when you'll release 0.9?

Is there a way to "hack" something for making our users happy in the
meantime?
For the "hack" I had two ideas:
a) copy/extend your consumer code to pass the offset
b) set the offset manually in Zookeeper before starting the consumer.
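To illustrate hack (b): with the default ZooKeeper-based offset storage, the
high-level consumer reads its starting offset from a well-known znode, so
writing the desired offset there before the consumer group starts should make
it resume from that position. A minimal sketch of building that path (the
group and topic names are made up for illustration):

```java
public class OffsetZnode {

    // Builds the znode path where the pre-0.9 high-level consumer keeps
    // its committed offset: /consumers/<group>/offsets/<topic>/<partition>
    static String offsetPath(String group, String topic, int partition) {
        return String.format("/consumers/%s/offsets/%s/%d", group, topic, partition);
    }

    public static void main(String[] args) {
        // Hypothetical group/topic names, purely for illustration.
        String path = offsetPath("flink-group", "events", 0);
        System.out.println(path);
        // With a ZooKeeper client one would then do roughly:
        //   zk.setData(path, "12345".getBytes(), -1);  // start from offset 12345
        // before (re)starting the consumer group.
    }
}
```

The same write can of course be done from zkCli.sh; the important part is
that the znode is set before the consumer group registers itself.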

I also have a quick conceptual question: committing offsets (either
manually through commitOffsets() or via the autocommit feature) only
maintains the offset in the broker or in ZooKeeper for that consumer.
Kafka uses this offset to recover in case of a consumer restart.
Offset committing is independent of log compaction/message retention.
Are these assumptions correct?


Best regards,
Robert
