kafka-users mailing list archives

From Neha Narkhede <neha.narkh...@gmail.com>
Subject Re: New Consumer API discussion
Date Sat, 01 Mar 2014 00:40:36 GMT
1. "The new
consumer API is non blocking and instead of returning a blocking iterator,
the consumer provides a poll() API that returns a list of records. "

So this means the consumer polls, and if there are new messages it pulls
them down and then disconnects?

Not really. The point I was trying to make is that the consumer now
just returns a list of records instead of an iterator. If there are no
more messages available, it returns an empty list of records. Under
the covers, it keeps a connection open to every broker.
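To make that contract concrete, here is a tiny self-contained sketch. It is a mock, not the real consumer API; the class and method names below are invented for illustration only:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy model of the poll() contract described above: poll() returns
// whatever records are buffered as a List, and returns an empty list
// (it does not block, and nothing disconnects) when none are available.
class MockConsumer {
    private final Queue<String> buffered = new ArrayDeque<>();

    void deliver(String record) { buffered.add(record); }

    List<String> poll() {
        List<String> records = new ArrayList<>(buffered);
        buffered.clear();
        return records; // empty list, never null, once drained
    }
}

public class PollSketch {
    public static void main(String[] args) {
        MockConsumer consumer = new MockConsumer();
        consumer.deliver("m1");
        consumer.deliver("m2");
        System.out.println(consumer.poll()); // [m1, m2]
        System.out.println(consumer.poll()); // []
    }
}
```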

2.
" The consumer also allows long poll
to reduce the end-to-end message latency for low throughput data."

How is this different from blocking?  Is it event based, meaning it
keeps a long-poll connection open, and if/when a new message arrives
it triggers an event on the consumer side?

It means that you can invoke poll with a timeout. If a message becomes
available before the timeout is hit, poll returns early with it.
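A miniature model of that long-poll behavior, using a BlockingQueue purely as a stand-in for the consumer (the real client does this over its broker connections, not a local queue):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// poll(timeout) waits up to the timeout but unblocks the moment
// data arrives, which is what keeps end-to-end latency low on
// low-throughput topics.
public class LongPollSketch {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // Deliver a "record" after 100 ms, from another thread.
        new Thread(() -> {
            try {
                Thread.sleep(100);
                queue.put("record-1");
            } catch (InterruptedException ignored) { }
        }).start();

        long start = System.nanoTime();
        // Ask for up to 5 seconds, but return as soon as data shows up.
        String record = queue.poll(5, TimeUnit.SECONDS);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;

        System.out.println(record);           // record-1
        System.out.println(elapsedMs < 5000); // returned well before the timeout
    }
}
```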


3.
" The consumer batches
data and multiplexes I/O over TCP connections to each of the brokers it
communicates with, for high throughput. "

If it is single threaded, does each TCP broker connection block?  Not
sure I understand how this works if it is single threaded.

Take a look at this tutorial that explains non-blocking socket I/O -
http://rox-xmlrpc.sourceforge.net/niotut/
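The core idea from that tutorial, in a self-contained sketch: one thread, one Selector, several non-blocking channels. The "server" below is just a localhost stand-in for a broker; no Kafka code is involved.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

// A single thread multiplexes I/O over every registered channel:
// select() reports whichever connections are ready, so no individual
// connection ever blocks the thread.
public class NioSketch {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();

        // Local stand-in for a broker.
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress("127.0.0.1", 0));
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT);

        // Non-blocking client connection, registered with the same selector.
        SocketChannel client = SocketChannel.open();
        client.configureBlocking(false);
        client.connect(server.getLocalAddress());
        client.register(selector,
                SelectionKey.OP_CONNECT | SelectionKey.OP_READ);

        String received = null;
        while (received == null) {
            selector.select(); // the one thread waits on ALL channels here
            for (SelectionKey key : selector.selectedKeys()) {
                if (key.isAcceptable()) {
                    SocketChannel peer = server.accept();
                    peer.write(ByteBuffer.wrap(
                            "hello".getBytes(StandardCharsets.UTF_8)));
                    peer.close();
                } else if (key.isConnectable()) {
                    client.finishConnect();             // complete async connect
                    key.interestOps(SelectionKey.OP_READ);
                } else if (key.isReadable()) {
                    ByteBuffer buf = ByteBuffer.allocate(64);
                    if (client.read(buf) > 0) {
                        buf.flip();
                        received = StandardCharsets.UTF_8.decode(buf).toString();
                    }
                }
            }
            selector.selectedKeys().clear();
        }
        System.out.println(received); // hello
    }
}
```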

Thanks,
Neha


On Fri, Feb 28, 2014 at 12:44 PM, S Ahmed <sahmed1020@gmail.com> wrote:

> Few clarifications:
>
> 1. "The new
> consumer API is non blocking and instead of returning a blocking iterator,
> the consumer provides a poll() API that returns a list of records. "
>
> So this means the consumer polls, and if there are new messages it pulls
> them down and then disconnects?
>
> 2.
> " The consumer also allows long poll
> to reduce the end-to-end message latency for low throughput data."
>
> How is this different from blocking?  Is it event based, meaning it
> keeps a long-poll connection open, and if/when a new message arrives
> it triggers an event on the consumer side?
>
>
> 3.
> " The consumer batches
> data and multiplexes I/O over TCP connections to each of the brokers it
> communicates with, for high throughput. "
>
> If it is single threaded, does each TCP broker connection block?  Not
> sure I understand how this works if it is single threaded.
>
>
>
> On Thu, Feb 27, 2014 at 11:38 PM, Robert Withers <
> robert.w.withers@gmail.com
> > wrote:
>
> > Thank you, Neha, that makes it clear.  Really, the aspect of all this
> that
> > we could really use is a way to do exactly once processing.  We are
> looking
> > at more critical data.  What are the latest thoughts on how to achieve
> > exactly once and how might that affect a consumer API?
> >
> > Thanks,
> > Rob
> >
> > On Feb 27, 2014, at 10:29 AM, Neha Narkhede <neha.narkhede@gmail.com>
> > wrote:
> >
> > > Is this
> > > <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html#seek%28kafka.common.TopicPartitionOffset...%29>
> > > what
> > > you are looking for? Basically, I think from the overall feedback, it
> > > looks like code snippets don't seem to work for overall understanding
> of
> > > the APIs. I plan to update the javadoc with more complete examples that
> > > have been discussed so far on this thread and generally on the mailing
> > list.
> > >
> > > Thanks,
> > > Neha
> > >
> > >
> > >
> > >
> > > On Thu, Feb 27, 2014 at 4:17 AM, Robert Withers
> > > <robert.w.withers@gmail.com> wrote:
> > >
> > >> Neha,
> > >>
> > >> I see how one might wish to implement onPartitionsAssigned and
> > >> onPartitionsRevoked, but I don't have a sense for how I might supply
> > these
> > >> implementations to a running consumer.  What would the setup code look
> > like
> > >> to start a high-level consumer with these provided implementations?
> > >>
> > >> thanks,
> > >> Rob
> > >>
> > >>
> > >> On Feb 27, 2014, at 3:48 AM, Neha Narkhede <neha.narkhede@gmail.com>
> > >> wrote:
> > >>
> > >>> Rob,
> > >>>
> > >>> The use of the callbacks is explained in the javadoc here -
> > >>>
> > >>> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerRebalanceCallback.html
> > >>>
> > >>> Let me know if it makes sense. The hope is to improve the javadoc so
> > that
> > >>> it is self explanatory.
> > >>>
> > >>> Thanks,
> > >>> Neha
> > >>>
> > >>>
> > >>> On Wed, Feb 26, 2014 at 9:16 AM, Robert Withers
> > >>> <robert.w.withers@gmail.com> wrote:
> > >>>
> > >>>> Neha, what does the use of the RebalanceBeginCallback and
> > >>>> RebalanceEndCallback look like?
> > >>>>
> > >>>> thanks,
> > >>>> Rob
> > >>>>
> > >>>> On Feb 25, 2014, at 3:51 PM, Neha Narkhede <neha.narkhede@gmail.com
> >
> > >>>> wrote:
> > >>>>
> > >>>>> How do you know n? The whole point is that you need to be able to
> > fetch
> > >>>> the
> > >>>>> end offset. You can't a priori decide you will load 1m messages
> > without
> > >>>>> knowing what is there.
> > >>>>>
> > >>>>> Hmm. I think what you are pointing out is that in the new consumer
> > API,
> > >>>> we
> > >>>>> don't have a way to issue the equivalent of the existing
> > >>>> getOffsetsBefore()
> > >>>>> API. Agree that is a flaw that we should fix.
> > >>>>>
> > >>>>> Will update the docs/wiki with a few use cases that I've collected
> so
> > >> far
> > >>>>> and see if the API covers those.
> > >>>>>
> > >>>>> I would prefer PartitionsAssigned and PartitionsRevoked as that
> seems
> > >>>>> clearer to me
> > >>>>>
> > >>>>> Well the RebalanceBeginCallback interface will have
> > >>>> onPartitionsAssigned()
> > >>>>> as the callback. Similarly, the RebalanceEndCallback interface will
> > >> have
> > >>>>> onPartitionsRevoked() as the callback. Makes sense?
> > >>>>>
> > >>>>> Thanks,
> > >>>>> Neha
> > >>>>>
> > >>>>>
> > >>>>> On Tue, Feb 25, 2014 at 2:38 PM, Jay Kreps <jay.kreps@gmail.com>
> > >> wrote:
> > >>>>>
> > >>>>>> 1. I would prefer PartitionsAssigned and PartitionsRevoked as that
> > >> seems
> > >>>>>> clearer to me.
> > >>>>>>
> > >>>>>> -Jay
> > >>>>>>
> > >>>>>>
> > >>>>>> On Tue, Feb 25, 2014 at 10:19 AM, Neha Narkhede <
> > >>>> neha.narkhede@gmail.com
> > >>>>>>> wrote:
> > >>>>>>
> > >>>>>>> Thanks for the reviews so far! There are a few outstanding
> > questions
> > >> -
> > >>>>>>>
> > >>>>>>> 1.  It will be good to make the rebalance callbacks forward
> > >> compatible
> > >>>>>> with
> > >>>>>>> Java 8 capabilities. We can change it to
> PartitionsAssignedCallback
> > >>>>>>> and PartitionsRevokedCallback or RebalanceBeginCallback and
> > >>>>>>> RebalanceEndCallback?
> > >>>>>>>
> > >>>>>>> If there are no objections, I will change it to
> > >> RebalanceBeginCallback
> > >>>>>> and
> > >>>>>>> RebalanceEndCallback.
> > >>>>>>>
> > >>>>>>> 2.  The return type for committed() is
> List<TopicPartitionOffset>.
> > >>>> There
> > >>>>>>> was a suggestion to change it to either be
> Map<TopicPartition,Long>
> > >> or
> > >>>>>>> Map<TopicPartition, TopicPartitionOffset>
> > >>>>>>>
> > >>>>>>> Do people have feedback on this suggestion?
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On Tue, Feb 25, 2014 at 9:56 AM, Neha Narkhede <
> > >>>> neha.narkhede@gmail.com
> > >>>>>>>> wrote:
> > >>>>>>>
> > >>>>>>>> Robert,
> > >>>>>>>>
> > >>>>>>>> Are you saying it is possible to get events from the high-level
> > >>>>>>> consumer regarding various state machine changes?  For instance,
> can
> > >> we
> > >>>>>> get a
> > >>>>>>>> notification when a rebalance starts and ends, when a partition
> is
> > >>>>>>>> assigned/unassigned, when an offset is committed on a partition,
> > >> when
> > >>>> a
> > >>>>>>>> leader changes and so on?  I call this OOB traffic, since they
> are
> > >> not
> > >>>>>>> the
> > >>>>>>>> core messages streaming, but side-band events, yet they are
> still
> > >>>>>>>> potentially useful to consumers.
> > >>>>>>>>
> > >>>>>>>> In the current proposal, you get notified when the state machine
> > >>>>>> changes
> > >>>>>>>> i.e. before and after a rebalance is triggered. Look at
> > >>>>>>>> ConsumerRebalanceCallback
> > >>>>>>>> <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerRebalanceCallback.html>
> > >>>>>>>>
> > >>>>>>>> .Leader changes do not count as state machine changes for
> consumer
> > >>>>>>>> rebalance purposes.
> > >>>>>>>>
> > >>>>>>>> Thanks,
> > >>>>>>>> Neha
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> On Tue, Feb 25, 2014 at 9:54 AM, Neha Narkhede <
> > >>>>>> neha.narkhede@gmail.com
> > >>>>>>>> wrote:
> > >>>>>>>>
> > >>>>>>>>> Jay/Robert -
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> I think what Robert is saying is that we need to think through
> > the
> > >>>>>>> offset
> > >>>>>>>>> API to enable "batch processing" of topic data. Think of a
> > process
> > >>>>>> that
> > >>>>>>>>> periodically kicks off to compute a data summary or do a data
> > load
> > >> or
> > >>>>>>>>> something like that. I think what we need to support this is an
> > api
> > >>>> to
> > >>>>>>>>> fetch the last offset from the server for a partition.
> Something
> > >> like
> > >>>>>>>>> long lastOffset(TopicPartition tp)
> > >>>>>>>>> and for symmetry
> > >>>>>>>>> long firstOffset(TopicPartition tp)
> > >>>>>>>>>
> > >>>>>>>>> Likely this would have to be batched.
> > >>>>>>>>>
> > >>>>>>>>> A fixed range of data load can be done using the existing APIs
> as
> > >>>>>>>>> follows. This assumes you know the endOffset which can be
> > >>>>>> currentOffset
> > >>>>>>> + n
> > >>>>>>>>> (number of messages in the load)
> > >>>>>>>>>
> > >>>>>>>>> long startOffset = consumer.position(partition);
> > >>>>>>>>> long endOffset = startOffset + n;
> > >>>>>>>>> while(consumer.position(partition) <= endOffset) {
> > >>>>>>>>>   List<ConsumerRecord> messages = consumer.poll(timeout,
> > >>>>>>>>> TimeUnit.MILLISECONDS);
> > >>>>>>>>>   process(messages, endOffset);          // processes messages
> > >>>>>> until
> > >>>>>>>>> endOffset
> > >>>>>>>>> }
> > >>>>>>>>>
> > >>>>>>>>> Does that make sense?
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> On Tue, Feb 25, 2014 at 9:49 AM, Neha Narkhede <
> > >>>>>> neha.narkhede@gmail.com
> > >>>>>>>> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> Thanks for the review, Jun. Here are some comments -
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> 1. The use of ellipsis: This may make passing a list of
> items
> > >> from
> > >>>>>> a
> > >>>>>>>>>> collection to the api a bit harder. Suppose that you have a
> list
> > >> of
> > >>>>>>>>>> topics
> > >>>>>>>>>> stored in
> > >>>>>>>>>>
> > >>>>>>>>>> ArrayList<String> topics;
> > >>>>>>>>>>
> > >>>>>>>>>> If you want to subscribe to all topics in one call, you will have
> > to
> > >>>> do:
> > >>>>>>>>>>
> > >>>>>>>>>> String[] topicArray = new String[topics.size()];
> > >>>>>>>>>> consumer.subscribe(topics.toArray(topicArray));
> > >>>>>>>>>>
> > >>>>>>>>>> A similar argument can be made for arguably the more common
> use
> > >> case
> > >>>>>> of
> > >>>>>>>>>> subscribing to a single topic as well. In these cases, the user is
> > >>>>>> required
> > >>>>>>>>>> to write more
> > >>>>>>>>>> code to create a single item collection and pass it in. Since
> > >>>>>>>>>> subscription is extremely lightweight
> > >>>>>>>>>> invoking it multiple times also seems like a workable
> solution,
> > >> no?
> > >>>>>>>>>>
> > >>>>>>>>>> 2. It would be good to document that the following apis are
> > >> mutually
> > >>>>>>>>>> exclusive. Also, if the partition level subscription is
> > specified,
> > >>>>>>> there
> > >>>>>>>>>> is
> > >>>>>>>>>> no group management. Finally, unsubscribe() can only be used
> to
> > >>>>>> cancel
> > >>>>>>>>>> subscriptions with the same pattern. For example, you can't
> > >>>>>> unsubscribe
> > >>>>>>>>>> at
> > >>>>>>>>>> the partition level if the subscription is done at the topic
> > >> level.
> > >>>>>>>>>>
> > >>>>>>>>>> *subscribe*(java.lang.String... topics)
> > >>>>>>>>>> *subscribe*(java.lang.String topic, int... partitions)
> > >>>>>>>>>>
> > >>>>>>>>>> Makes sense. Made the suggested improvements to the docs
> > >>>>>>>>>> <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/Consumer.html#subscribe%28java.lang.String...%29>
> > >>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> 3.commit(): The following comment in the doc should probably
> say
> > >>>>>>> "commit
> > >>>>>>>>>> offsets for partitions assigned to this consumer".
> > >>>>>>>>>>
> > >>>>>>>>>> If no partitions are specified, commits offsets for the
> > subscribed
> > >>>>>>> list
> > >>>>>>>>>> of
> > >>>>>>>>>> topics and partitions to Kafka.
> > >>>>>>>>>>
> > >>>>>>>>>> Could you give more context on this suggestion? Here is the
> > entire
> > >>>>>> doc
> > >>>>>>> -
> > >>>>>>>>>>
> > >>>>>>>>>> Synchronously commits the specified offsets for the specified
> > list
> > >>>> of
> > >>>>>>>>>> topics and partitions to *Kafka*. If no partitions are
> > specified,
> > >>>>>>>>>> commits offsets for the subscribed list of topics and
> > partitions.
> > >>>>>>>>>>
> > >>>>>>>>>> The hope is to convey that if no partitions are specified,
> > offsets
> > >>>>>> will
> > >>>>>>>>>> be committed for the subscribed list of partitions. One
> > >> improvement
> > >>>>>>> could
> > >>>>>>>>>> be to
> > >>>>>>>>>> explicitly state that the offsets returned on the last poll
> will
> > >> be
> > >>>>>>>>>> committed. I updated this to -
> > >>>>>>>>>>
> > >>>>>>>>>> Synchronously commits the specified offsets for the specified
> > list
> > >>>> of
> > >>>>>>>>>> topics and partitions to *Kafka*. If no offsets are specified,
> > >>>>>> commits
> > >>>>>>>>>> offsets returned on the last {@link #poll(long, TimeUnit)
> > poll()}
> > >>>> for
> > >>>>>>>>>> the subscribed list of topics and partitions.
> > >>>>>>>>>>
> > >>>>>>>>>> 4. There is inconsistency in specifying partitions. Sometimes
> we
> > >> use
> > >>>>>>>>>> TopicPartition and some other times we use String and int (see
> > >>>>>>>>>> examples below).
> > >>>>>>>>>>
> > >>>>>>>>>> void onPartitionsAssigned(Consumer consumer,
> > >>>>>>>>>> TopicPartition...partitions)
> > >>>>>>>>>>
> > >>>>>>>>>> public void *subscribe*(java.lang.String topic, int...
> > partitions)
> > >>>>>>>>>>
> > >>>>>>>>>> Yes, this was discussed previously. I think generally the
> > >> consensus
> > >>>>>>>>>> seems to be to use the higher level
> > >>>>>>>>>> classes everywhere. Made those changes.
> > >>>>>>>>>>
> > >>>>>>>>>> What's the use case of position()? Isn't that just the
> > >> nextOffset()
> > >>>>>> on
> > >>>>>>>>>> the
> > >>>>>>>>>> last message returned from poll()?
> > >>>>>>>>>>
> > >>>>>>>>>> Yes, except in the case where a rebalance is triggered and
> > poll()
> > >> is
> > >>>>>>> not
> > >>>>>>>>>> yet invoked. Here, you would use position() to get the new
> fetch
> > >>>>>>> position
> > >>>>>>>>>> for the specific partition. Even if this is not a common use
> > case,
> > >>>>>> IMO
> > >>>>>>> it
> > >>>>>>>>>> is much easier to use position() to get the fetch offset than
> > >>>>>> invoking
> > >>>>>>>>>> nextOffset() on the last message. This also keeps the APIs
> > >>>> symmetric,
> > >>>>>>> which
> > >>>>>>>>>> is nice.
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> On Mon, Feb 24, 2014 at 7:06 PM, Withers, Robert <
> > >>>>>>>>>> Robert.Withers@dish.com> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>>> That's wonderful.  Thanks for kafka.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Rob
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Feb 24, 2014, at 9:58 AM, Guozhang Wang
> > >>>>>>>>>>> <wangguoz@gmail.com> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>> Hi Robert,
> > >>>>>>>>>>>
> > >>>>>>>>>>> Yes, you can check out the callback functions in the new API
> > >>>>>>>>>>>
> > >>>>>>>>>>> onPartitionDesigned
> > >>>>>>>>>>> onPartitionAssigned
> > >>>>>>>>>>>
> > >>>>>>>>>>> and see if they meet your needs.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Guozhang
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Mon, Feb 24, 2014 at 8:18 AM, Withers, Robert
> > >>>>>>>>>>> <Robert.Withers@dish.com> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>> Jun,
> > >>>>>>>>>>>
> > >>>>>>>>>>> Are you saying it is possible to get events from the
> high-level
> > >>>>>>> consumer
> > >>>>>>>>>>> regarding various state machine changes?  For instance, can
> we
> > >> get
> > >>>> a
> > >>>>>>>>>>> notification when a rebalance starts and ends, when a
> partition
> > >> is
> > >>>>>>>>>>> assigned/unassigned, when an offset is committed on a
> > partition,
> > >>>>>> when
> > >>>>>>> a
> > >>>>>>>>>>> leader changes and so on?  I call this OOB traffic, since
> they
> > >> are
> > >>>>>> not
> > >>>>>>>>>>> the
> > >>>>>>>>>>> core messages streaming, but side-band events, yet they are
> > still
> > >>>>>>>>>>> potentially useful to consumers.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Thank you,
> > >>>>>>>>>>> Robert
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> Robert Withers
> > >>>>>>>>>>> Staff Analyst/Developer
> > >>>>>>>>>>> o: (720) 514-8963
> > >>>>>>>>>>> c:  (571) 262-1873
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> -----Original Message-----
> > >>>>>>>>>>> From: Jun Rao [mailto:junrao@gmail.com]
> > >>>>>>>>>>> Sent: Sunday, February 23, 2014 4:19 PM
> > >>>>>>>>>>> To: users@kafka.apache.org
> > >>>>>>>>>>> Subject: Re: New Consumer API discussion
> > >>>>>>>>>>>
> > >>>>>>>>>>> Robert,
> > >>>>>>>>>>>
> > >>>>>>>>>>> For the push orient api, you can potentially implement your
> own
> > >>>>>>>>>>> MessageHandler with those methods. In the main loop of our
> new
> > >>>>>>> consumer
> > >>>>>>>>>>> api, you can just call those methods based on the events you
> > get.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Also, we already have an api to get the first and the last
> > offset
> > >>>>>> of a
> > >>>>>>>>>>> partition (getOffsetBefore).
> > >>>>>>>>>>>
> > >>>>>>>>>>> Thanks,
> > >>>>>>>>>>>
> > >>>>>>>>>>> Jun
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Sat, Feb 22, 2014 at 11:29 AM, Withers, Robert
> > >>>>>>>>>>> <Robert.Withers@dish.com> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>> This is a good idea, too.  I would modify it to include
> stream
> > >>>>>>>>>>> marking, then you can have:
> > >>>>>>>>>>>
> > >>>>>>>>>>> long end = consumer.lastOffset(tp);
> > >>>>>>>>>>> consumer.setMark(end);
> > >>>>>>>>>>> while(consumer.beforeMark()) {
> > >>>>>>>>>>> process(consumer.pollToMark());
> > >>>>>>>>>>> }
> > >>>>>>>>>>>
> > >>>>>>>>>>> or
> > >>>>>>>>>>>
> > >>>>>>>>>>> long end = consumer.lastOffset(tp);
> > >>>>>>>>>>> consumer.setMark(end);
> > >>>>>>>>>>> for(Object msg : consumer.iteratorToMark()) {
> > >>>>>>>>>>> process(msg);
> > >>>>>>>>>>> }
> > >>>>>>>>>>>
> > >>>>>>>>>>> I actually have 4 suggestions, then:
> > >>>>>>>>>>>
> > >>>>>>>>>>> *   pull: stream marking
> > >>>>>>>>>>> *   pull: finite streams, bound by time range (up-to-now,
> > >>>> yesterday)
> > >>>>>>> or
> > >>>>>>>>>>> offset
> > >>>>>>>>>>> *   pull: async api
> > >>>>>>>>>>> *   push: KafkaMessageSource, for a push model, with msg and
> > OOB
> > >>>>>>> events.
> > >>>>>>>>>>> Build one in either individual or chunk mode and have a
> > listener
> > >>>> for
> > >>>>>>>>>>> each msg or a listener for a chunk of msgs.  Make it
> composable
> > >> and
> > >>>>>>>>>>> policy driven (chunked, range, commitOffsets policy, retry
> > >> policy,
> > >>>>>>>>>>> transactional)
> > >>>>>>>>>>>
> > >>>>>>>>>>> Thank you,
> > >>>>>>>>>>> Robert
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Feb 22, 2014, at 11:21 AM, Jay Kreps
> > >>>>>>>>>>> <jay.kreps@gmail.com> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>> I think what Robert is saying is that we need to think
> through
> > >> the
> > >>>>>>>>>>> offset API to enable "batch processing" of topic data. Think
> > of a
> > >>>>>>>>>>> process that periodically kicks off to compute a data summary
> > or
> > >> do
> > >>>>>> a
> > >>>>>>>>>>> data load or something like that. I think what we need to
> > support
> > >>>>>> this
> > >>>>>>>>>>> is an api to fetch the last offset from the server for a
> > >> partition.
> > >>>>>>>>>>> Something like
> > >>>>>>>>>>> long lastOffset(TopicPartition tp)
> > >>>>>>>>>>> and for symmetry
> > >>>>>>>>>>> long firstOffset(TopicPartition tp)
> > >>>>>>>>>>>
> > >>>>>>>>>>> Likely this would have to be batched. Essentially we should
> add
> > >>>> this
> > >>>>>>>>>>> use case to our set of code examples to write and think
> > through.
> > >>>>>>>>>>>
> > >>>>>>>>>>> The usage would be something like
> > >>>>>>>>>>>
> > >>>>>>>>>>> long end = consumer.lastOffset(tp);
> > >>>>>>>>>>> while(consumer.position < end)
> > >>>>>>>>>>> process(consumer.poll());
> > >>>>>>>>>>>
> > >>>>>>>>>>> -Jay
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Sat, Feb 22, 2014 at 1:52 AM, Withers, Robert
> > >>>>>>>>>>> <Robert.Withers@dish.com> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>> Jun,
> > >>>>>>>>>>>
> > >>>>>>>>>>> I was originally thinking a non-blocking read from a
> > distributed
> > >>>>>>>>>>> stream should distinguish between "no local messages, but a
> > fetch
> > >>>> is
> > >>>>>>>>>>> occurring"
> > >>>>>>>>>>> versus "you have drained the stream".  The reason this may be
> > >>>>>> valuable
> > >>>>>>>>>>> to me is so I can write consumers that read all known traffic
> > >> then
> > >>>>>>>>>>> terminate.
> > >>>>>>>>>>> You caused me to reconsider and I think I am conflating 2
> > things.
> > >>>>>> One
> > >>>>>>>>>>> is a sync/async api while the other is whether to have an
> > >> infinite
> > >>>>>> or
> > >>>>>>>>>>> finite stream.  Is it possible to build a finite KafkaStream
> > on a
> > >>>>>>>>>>> range of messages?
> > >>>>>>>>>>>
> > >>>>>>>>>>> Perhaps a Simple Consumer would do just fine and then I could
> > >> start
> > >>>>>>>>>>> off getting the writeOffset from zookeeper and tell it to
> read
> > a
> > >>>>>>>>>>> specified range per partition.  I've done this and forked a
> > >> simple
> > >>>>>>>>>>> consumer runnable for each partition, for one of our
> analyzers.
> > >>>> The
> > >>>>>>>>>>> great thing about the high-level consumer is that rebalance,
> > so I
> > >>>>>> can
> > >>>>>>>>>>> fork however many stream readers I want and you just figure
> it
> > >> out
> > >>>>>> for
> > >>>>>>>>>>> me.  In that way you offer us the control over the resource
> > >>>>>>>>>>> consumption within a pull model.  This is best to regulate
> > >> message
> > >>>>>>>>>>> pressure, they say.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Combining that high-level rebalance ability with a ranged
> > >> partition
> > >>>>>>>>>>> drain could be really nice...build the stream with an ending
> > >>>>>> position
> > >>>>>>>>>>> and it is a finite stream, but retain the high-level
> rebalance.
> > >>>>>> With
> > >>>>>>>>>>> a finite stream, you would know the difference of the 2 async
> > >>>>>>>>>>> scenarios: fetch-in-progress versus end-of-stream.  With an
> > >>>> infinite
> > >>>>>>>>>>> stream, you never get end-of-stream.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Aside from a high-level consumer over a finite range within
> > each
> > >>>>>>>>>>> partition, the other feature I can think of is more
> > complicated.
> > >> A
> > >>>>>>>>>>> high-level consumer has state machine changes that the client
> > >>>> cannot
> > >>>>>>>>>>> access, to my knowledge.  Our use of kafka has us invoke a
> > >> message
> > >>>>>>>>>>> handler with each message we consume from the KafkaStream,
> so
> > we
> > >>>>>>>>>>> convert a pull-model to a push-model.  Including the idea of
> > >>>>>> receiving
> > >>>>>>>>>>> notifications from state machine changes, what would be
> really
> > >> nice
> > >>>>>> is
> > >>>>>>>>>>> to have a KafkaMessageSource, that is an eventful push model.
> >  If
> > >>>> it
> > >>>>>>>>>>> were thread-safe, then we could register listeners for
> various
> > >>>>>> events:
> > >>>>>>>>>>>
> > >>>>>>>>>>> *   opening-stream
> > >>>>>>>>>>> *   closing-stream
> > >>>>>>>>>>> *   message-arrived
> > >>>>>>>>>>> *   end-of-stream/no-more-messages-in-partition (for finite
> > >>>> streams)
> > >>>>>>>>>>> *   rebalance started
> > >>>>>>>>>>> *   partition assigned
> > >>>>>>>>>>> *   partition unassigned
> > >>>>>>>>>>> *   rebalance finished
> > >>>>>>>>>>> *   partition-offset-committed
> > >>>>>>>>>>>
> > >>>>>>>>>>> Perhaps that is just our use, but instead of a pull-oriented
> > >>>>>>>>>>> KafkaStream, is there any sense in your providing a
> > push-oriented
> > >>>>>>>>>>> KafkaMessageSource publishing OOB messages?
> > >>>>>>>>>>>
> > >>>>>>>>>>> thank you,
> > >>>>>>>>>>> Robert
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Feb 21, 2014, at 5:59 PM, Jun Rao <junrao@gmail.com>
> > >>>>>>>>>>> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>> Robert,
> > >>>>>>>>>>>
> > >>>>>>>>>>> Could you explain why you want to distinguish btw
> > >>>>>>>>>>> FetchingInProgressException and NoMessagePendingException?
> The
> > >>>>>>>>>>> nextMsgs() method that you want is exactly what poll() does.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Thanks,
> > >>>>>>>>>>>
> > >>>>>>>>>>> Jun
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Wed, Feb 19, 2014 at 8:45 AM, Withers, Robert
> > >>>>>>>>>>> <Robert.Withers@dish.com> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>> I am not clear on why the consumer stream should be
> > positionable,
> > >>>>>>>>>>> especially if it is limited to the in-memory fetched
> messages.
> > >>>>>> Could
> > >>>>>>>>>>> someone explain to me, please?  I really like the idea of
> > >>>> committing
> > >>>>>>>>>>> the offset specifically on those partitions with changed read
> > >>>>>> offsets,
> > >>>>>>>>>>> only.
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> 2 items I would like to see added to the KafkaStream are:
> > >>>>>>>>>>>
> > >>>>>>>>>>> *         a non-blocking next(), throws several exceptions
> > >>>>>>>>>>> (FetchingInProgressException and a NoMessagePendingException
> or
> > >>>>>>>>>>> something) to differentiate between fetching or no messages
> > left.
> > >>>>>>>>>>>
> > >>>>>>>>>>> *         A nextMsgs() method which returns all locally
> > available
> > >>>>>>>>>>> messages
> > >>>>>>>>>>> and kicks off a fetch for the next chunk.
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> If you are trying to add transactional features, then
> formally
> > >>>>>> define
> > >>>>>>>>>>> a DTP capability and pull in other server frameworks to share
> > the
> > >>>>>>>>>>> implementation.  Should it be XA/Open?  How about a new
> > peer2peer
> > >>>>>> DTP
> > >>>>>>>>>>> protocol?
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> Thank you,
> > >>>>>>>>>>>
> > >>>>>>>>>>> Robert
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> Robert Withers
> > >>>>>>>>>>>
> > >>>>>>>>>>> Staff Analyst/Developer
> > >>>>>>>>>>>
> > >>>>>>>>>>> o: (720) 514-8963
> > >>>>>>>>>>>
> > >>>>>>>>>>> c:  (571) 262-1873
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> -----Original Message-----
> > >>>>>>>>>>> From: Jay Kreps [mailto:jay.kreps@gmail.com]
> > >>>>>>>>>>> Sent: Sunday, February 16, 2014 10:13 AM
> > >>>>>>>>>>> To: users@kafka.apache.org
> > >>>>>>>>>>> Subject: Re: New Consumer API discussion
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> +1 I think those are good. It is a little weird that
> > >>>>>>>>>>> changing the fetch point is not batched but changing the
> > >>>>>>>>>>> commit point is, but I suppose there is no helping that.
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> -Jay
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Sat, Feb 15, 2014 at 7:52 AM, Neha Narkhede
> > >>>>>>>>>>> <neha.narkhede@gmail.com> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> Jay,
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> That makes sense. position/seek deal with changing the
> > consumers
> > >>>>>>>>>>>
> > >>>>>>>>>>> in-memory data, so there is no remote rpc there. For some
> > >> reason, I
> > >>>>>>>>>>>
> > >>>>>>>>>>> got committed and seek mixed up in my head at that time :)
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> So we still end up with
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> long position(TopicPartition tp)
> > >>>>>>>>>>>
> > >>>>>>>>>>> void seek(TopicPartitionOffset p)
> > >>>>>>>>>>>
> > >>>>>>>>>>> Map<TopicPartition, Long> committed(TopicPartition tp);
> > >>>>>>>>>>>
> > >>>>>>>>>>> void commit(TopicPartitionOffset...);
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> Thanks,
> > >>>>>>>>>>>
> > >>>>>>>>>>> Neha
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Friday, February 14, 2014, Jay Kreps
> > >>>>>>>>>>> <jay.kreps@gmail.com> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> Oh, interesting. So I am assuming the following
> implementation:
> > >>>>>>>>>>>
> > >>>>>>>>>>> 1. We have an in-memory fetch position which controls the
> next
> > >>>> fetch
> > >>>>>>>>>>>
> > >>>>>>>>>>> offset.
> > >>>>>>>>>>>
> > >>>>>>>>>>> 2. Changing this has no effect until you poll again at which
> > >> point
> > >>>>>>>>>>>
> > >>>>>>>>>>> your fetch request will be from the newly specified offset 3.
> > We
> > >>>>>>>>>>>
> > >>>>>>>>>>> then have an in-memory but also remotely stored committed
> > offset.
> > >>>>>>>>>>>
> > >>>>>>>>>>> 4. Calling commit has the effect of saving the fetch position
> > as
> > >>>>>>>>>>>
> > >>>>>>>>>>> both the in memory committed position and in the remote store
> > 5.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Auto-commit is the same as periodically calling commit on all
> > >>>>>>>>>>>
> > >>>>>>>>>>> positions.
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> So batching on commit as well as getting the committed
> position
> > >>>>>>>>>>>
> > >>>>>>>>>>> makes sense, but batching the fetch position wouldn't,
> right? I
> > >>>>>>>>>>>
> > >>>>>>>>>>> think you are actually thinking of a different approach.
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> -Jay
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Thu, Feb 13, 2014 at 10:40 PM, Neha Narkhede <neha.narkhede@gmail.com> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>> I think you are saying both, i.e. if you have committed on a partition it returns you that value, but if you haven't it does a remote lookup?
> > >>>>>>>>>>>
> > >>>>>>>>>>> Correct.
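A minimal sketch of that cache-then-remote-lookup behavior for committed(). The names and the remote-lookup stand-in are hypothetical, not Kafka's actual code:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Sketch: return the locally cached committed offset if this consumer has
// committed on the partition; otherwise fall back to a remote lookup
// (standing in for an OffsetFetch-style request to the broker).
class CommittedLookup {
    private final Map<String, Long> localCommits = new HashMap<>();
    private final Function<String, Long> remoteLookup;

    CommittedLookup(Function<String, Long> remoteLookup) {
        this.remoteLookup = remoteLookup;
    }

    void recordCommit(String tp, long offset) { localCommits.put(tp, offset); }

    long committed(String tp) {
        Long cached = localCommits.get(tp);
        return (cached != null) ? cached : remoteLookup.apply(tp);
    }
}
```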
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> The other argument for making committed batched is that commit() is batched, so there is symmetry.
> > >>>>>>>>>>>
> > >>>>>>>>>>> position() and seek() are always in-memory changes (I assume), so there is no need to batch them.
> > >>>>>>>>>>>
> > >>>>>>>>>>> I'm not as sure as you are about that assumption being true. Basically, in my example above, the batching argument for committed() also applies to position(), since one purpose of fetching a partition's offset is to use it to set the position of the consumer to that offset. Since that might lead to a remote OffsetRequest call, I think we probably would be better off batching it.
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> Another option for naming would be position/reposition instead of position/seek.
> > >>>>>>>>>>>
> > >>>>>>>>>>> I think position/seek is better since it aligns with Java file APIs.
> > >>>>>>>>>>>
> > >>>>>>>>>>> I also think your suggestion about ConsumerPosition makes sense.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Thanks,
> > >>>>>>>>>>> Neha
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Feb 13, 2014 9:22 PM, "Jay Kreps" <jay.kreps@gmail.com> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>> Hey Neha,
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> I actually wasn't proposing the name TopicOffsetPosition; that was just a typo. I meant TopicPartitionOffset, and I was just referencing what was in the javadoc. So to restate my proposal without the typo, using just the existing classes (that naming is a separate question):
> > >>>>>>>>>>>
> > >>>>>>>>>>> long position(TopicPartition tp)
> > >>>>>>>>>>> void seek(TopicPartitionOffset p)
> > >>>>>>>>>>> long committed(TopicPartition tp)
> > >>>>>>>>>>> void commit(TopicPartitionOffset...);
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> So I may be unclear on committed() (AKA lastCommittedOffset). Is it returning the in-memory value from the last commit by this consumer, or is it doing a remote fetch, or both? I think you are saying both, i.e. if you have committed on a partition it returns you that value, but if you haven't it does a remote lookup?
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> The other argument for making committed batched is that commit() is batched, so there is symmetry.
> > >>>>>>>>>>>
> > >>>>>>>>>>> position() and seek() are always in-memory changes (I assume), so there is no need to batch them.
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> So taking all that into account, what if we revise it to:
> > >>>>>>>>>>>
> > >>>>>>>>>>> long position(TopicPartition tp)
> > >>>>>>>>>>> void seek(TopicPartitionOffset p)
> > >>>>>>>>>>> Map<TopicPartition, Long> committed(TopicPartition tp);
> > >>>>>>>>>>> void commit(TopicPartitionOffset...);
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> This is not symmetric between position/seek and commit/committed, but it is convenient. Another option for naming would be position/reposition instead of position/seek.
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> With respect to the name TopicPartitionOffset, what I was trying to say is that I recommend we change that to something shorter. I think TopicPosition or ConsumerPosition might be better. Position does not refer to the variables in the object; it refers to the meaning of the object--it represents a position within a topic. The offset field in that object is still called the offset. TopicOffset, PartitionOffset, or ConsumerOffset would all be workable too. Basically I am just objecting to concatenating three nouns together. :-)
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> -Jay
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Thu, Feb 13, 2014 at 1:54 PM, Neha Narkhede <neha.narkhede@gmail.com> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> 2. It returns a list of results. But how can you use the list? The only way to use the list is to make a map of tp=>offset and then look up results in this map (or do a for loop over the list for the partition you want). I recommend that if this is an in-memory check we just do one at a time. E.g.
> > >>>>>>>>>>>
> > >>>>>>>>>>> long committedPosition(TopicPosition).
> > >>>>>>>>>>>
> > >>>>>>>>>>> This was discussed in the previous emails. There is a choice
> > >>>>>>>>>>>
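The list-to-map pattern the quoted text describes (turning a returned list of partition/offset results into a tp => offset map for lookup) might look like this. The class and method names are illustrative stand-ins, not the proposed API:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the TopicPartitionOffset class from the thread.
class TpOffset {
    final String topic; final int partition; final long offset;
    TpOffset(String topic, int partition, long offset) {
        this.topic = topic; this.partition = partition; this.offset = offset;
    }
}

class BatchedCommittedExample {
    // Build a "topic-partition" => offset map so individual partitions can be
    // looked up from a batched list of results.
    static Map<String, Long> toMap(List<TpOffset> results) {
        Map<String, Long> byTp = new HashMap<>();
        for (TpOffset r : results) {
            byTp.put(r.topic + "-" + r.partition, r.offset);
        }
        return byTp;
    }
}
```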
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> --
> > >>>>>>>>>>> Robert Withers
> > >>>>>>>>>>> robert.withers@dish.com
> > >>>>>>>>>>> c: 303.919.5856
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> --
> > >>>>>>>>>>> -- Guozhang
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
