kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Neha Narkhede <neha.narkh...@gmail.com>
Subject Re: New Consumer API discussion
Date Tue, 25 Mar 2014 00:29:56 GMT
Hey Chris,

Really sorry for the late reply, wonder how this fell through the cracks.
Anyhow, thanks for the great feedback! Here are my comments -

1. Why is the config String->Object instead of String->String?

This is probably more of a feedback about the new config management that
we adopted in the new clients. I think it is more convenient to write
configs.put("a", 42);
instead of
configs.put("a", Integer.toString(42));

2. Are these Java docs correct?

  KafkaConsumer(java.util.Map<
java.lang.String,java.lang.Object> configs)
  A consumer is instantiated by providing a set of key-value pairs as
configuration and a ConsumerRebalanceCallback implementation

There is no ConsumerRebalanceCallback parameter.

Fixed.

3. Would like to have a method:

  poll(long timeout, java.util.concurrent.TimeUnit timeUnit,
TopicPartition... topicAndPartitionsToPoll)

I see I can effectively do this by just fiddling with subscribe and
unsubscribe before each poll. Is this a low-overhead operation? Can I just
unsubscribe from everything after each poll, then re-subscribe to a topic
the next iteration. I would probably be doing this in a fairly tight loop.

The subscribe and unsubscribe will be very lightweight in-memory operations,
so it shouldn't be a problem to just use those APIs directly.
Let me know if you think otherwise.

4. The behavior of AUTO_OFFSET_RESET_CONFIG is overloaded. I think there
are use cases for decoupling "what to do when no offset exists" from "what
to do when I'm out of range". I might want to start from smallest the
first time I run, but fail if I ever get offset out of range.

How about adding a third option "disable" to "auto.offset.reset"?
What this says is that never automatically reset the offset, either if one
is not found or if the offset
falls out of range. Presumably, you would want to turn this off when you
want to control the offsets
yourself and use custom rewind/replay logic to reset the consumer's offset.
In this case, you would
want to turn this feature off so Kafka does not accidentally reset the
offset to something else.

I'm not so sure when you would want to make the distinction regarding
startup and offset falling out
of range. Presumably, if you don't trust Kafka to reset the offset, then
you can always turn this off
and use commit/commitAsync and seek() to set the consumer to the right
offset on startup and every
time your consumer falls out of range.

Does that make sense?

5. ENABLE_JMX could use Java docs, even though it's fairly
self-explanatory.

Fixed.

6. Clarity about whether FETCH_BUFFER_CONFIG is per-topic/partition, or
across all topic/partitions is useful. I believe it's per-topic/partition,
right? That is, setting to 2 megs with two TopicAndPartitions would result
in 4 megs worth of data coming in per fetch, right?

Good point, clarified that. Take a look again to see if it makes sense now.

7. What does the consumer do if METADATA_FETCH_TIMEOUT_CONFIG times out?
Retry, or throw exception?

Throw a TimeoutException. Clarified that in the
docs<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html>
.

8. Does RECONNECT_BACKOFF_MS_CONFIG apply to both metadata requests and
fetch requests?

Applies to all requests. Clarified that in the docs.

9. What does SESSION_TIMEOUT_MS default to?

Defaults are largely TODO, but session.timeout.ms currently defaults to
1000.

10. Is this consumer thread-safe?

It should be. Updated the
docs<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html>to
clarify that.

11. How do you use a different offset management strategy? Your email
implies that it's pluggable, but I don't see how. "The offset management
strategy defaults to Kafka based offset management and the API provides a
way for the user to use a customized offset store to manage the consumer's
offsets."

12. If I wish to decouple the consumer from the offset checkpointing, is
it OK to use Joel's offset management stuff directly, rather than through
the consumer's commit API?

For #11 and #12, I updated the
docs<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html>to
include actual usage examples.
Could you take a look and see if answers your questions?

Thanks,
Neha



On Mon, Mar 3, 2014 at 10:28 AM, Chris Riccomini <criccomini@linkedin.com>wrote:

> Hey Guys,
>
> Also, for reference, we'll be looking to implement new Samza consumers
> which have these APIs:
>
> http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/or
> g/apache/samza/system/SystemConsumer.html
>
> http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/or
> g/apache/samza/checkpoint/CheckpointManager.html
>
>
> Question (3) below is a result of having Samza's SystemConsumers poll
> allow specific topic/partitions to be specified.
>
> The split between consumer and checkpoint manager is the reason for
> question (12) below.
>
> Cheers,
> Chris
>
> On 3/3/14 10:19 AM, "Chris Riccomini" <criccomini@linkedin.com> wrote:
>
> >Hey Guys,
> >
> >Sorry for the late follow up. Here are my questions/thoughts on the API:
> >
> >1. Why is the config String->Object instead of String->String?
> >
> >2. Are these Java docs correct?
> >
> >  KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
> >  A consumer is instantiated by providing a set of key-value pairs as
> >configuration and a ConsumerRebalanceCallback implementation
> >
> >There is no ConsumerRebalanceCallback parameter.
> >
> >3. Would like to have a method:
> >
> >  poll(long timeout, java.util.concurrent.TimeUnit timeUnit,
> >TopicPartition... topicAndPartitionsToPoll)
> >
> >I see I can effectively do this by just fiddling with subscribe and
> >unsubscribe before each poll. Is this a low-overhead operation? Can I just
> >unsubscribe from everything after each poll, then re-subscribe to a topic
> >the next iteration. I would probably be doing this in a fairly tight loop.
> >
> >4. The behavior of AUTO_OFFSET_RESET_CONFIG is overloaded. I think there
> >are use cases for decoupling "what to do when no offset exists" from "what
> >to do when I'm out of range". I might want to start from smallest the
> >first time I run, but fail if I ever get offset out of range.
> >
> >5. ENABLE_JMX could use Java docs, even though it's fairly
> >self-explanatory.
> >
> >6. Clarity about whether FETCH_BUFFER_CONFIG is per-topic/partition, or
> >across all topic/partitions is useful. I believe it's per-topic/partition,
> >right? That is, setting to 2 megs with two TopicAndPartitions would result
> >in 4 megs worth of data coming in per fetch, right?
> >
> >7. What does the consumer do if METADATA_FETCH_TIMEOUT_CONFIG times out?
> >Retry, or throw exception?
> >
> >8. Does RECONNECT_BACKOFF_MS_CONFIG apply to both metadata requests and
> >fetch requests?
> >
> >9. What does SESSION_TIMEOUT_MS default to?
> >
> >10. Is this consumer thread-safe?
> >
> >11. How do you use a different offset management strategy? Your email
> >implies that it's pluggable, but I don't see how. "The offset management
> >strategy defaults to Kafka based offset management and the API provides a
> >way for the user to use a customized offset store to manage the consumer's
> >offsets."
> >
> >12. If I wish to decouple the consumer from the offset checkpointing, is
> >it OK to use Joel's offset management stuff directly, rather than through
> >the consumer's commit API?
> >
> >
> >Cheers,
> >Chris
> >
> >On 2/10/14 10:54 AM, "Neha Narkhede" <neha.narkhede@gmail.com> wrote:
> >
> >>As mentioned in previous emails, we are also working on a
> >>re-implementation
> >>of the consumer. I would like to use this email thread to discuss the
> >>details of the public API. I would also like us to be picky about this
> >>public api now so it is as good as possible and we don't need to break it
> >>in the future.
> >>
> >>The best way to get a feel for the API is actually to take a look at the
> >>javadoc<
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc
> >>/
> >>doc/kafka/clients/consumer/KafkaConsumer.html>,
> >>the hope is to get the api docs good enough so that it is
> >>self-explanatory.
> >>You can also take a look at the configs
> >>here<
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/do
> >>c
> >>/kafka/clients/consumer/ConsumerConfig.html>
> >>
> >>Some background info on implementation:
> >>
> >>At a high level the primary difference in this consumer is that it
> >>removes
> >>the distinction between the "high-level" and "low-level" consumer. The
> >>new
> >>consumer API is non blocking and instead of returning a blocking
> >>iterator,
> >>the consumer provides a poll() API that returns a list of records. We
> >>think
> >>this is better compared to the blocking iterators since it effectively
> >>decouples the threading strategy used for processing messages from the
> >>consumer. It is worth noting that the consumer is entirely single
> >>threaded
> >>and runs in the user thread. The advantage is that it can be easily
> >>rewritten in less multi-threading-friendly languages. The consumer
> >>batches
> >>data and multiplexes I/O over TCP connections to each of the brokers it
> >>communicates with, for high throughput. The consumer also allows long
> >>poll
> >>to reduce the end-to-end message latency for low throughput data.
> >>
> >>The consumer provides a group management facility that supports the
> >>concept
> >>of a group with multiple consumer instances (just like the current
> >>consumer). This is done through a custom heartbeat and group management
> >>protocol transparent to the user. At the same time, it allows users the
> >>option to subscribe to a fixed set of partitions and not use group
> >>management at all. The offset management strategy defaults to Kafka based
> >>offset management and the API provides a way for the user to use a
> >>customized offset store to manage the consumer's offsets.
> >>
> >>A key difference in this consumer also is the fact that it does not
> >>depend
> >>on zookeeper at all.
> >>
> >>More details about the new consumer design are
> >>here<
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer
> >>+
> >>Rewrite+Design>
> >>
> >>Please take a look at the new
> >>API<
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc
> >>/
> >>kafka/clients/consumer/KafkaConsumer.html>and
> >>give us any thoughts you may have.
> >>
> >>Thanks,
> >>Neha
> >
>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message