kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Alexei Levashov <alexei.levas...@arrayent.com>
Subject Re: Questions about single consumer per partition approach
Date Thu, 22 Dec 2016 04:02:32 GMT
Hello,

I appreciate your advice.

My major concern is - if consumer thread didn’t send heartbeat for time
period set in *session.timeout.ms <http://session.timeout.ms>* property it
should look dead for the broker. Re-balancing process is not triggered for
manually assigned consumers as API doc says.

Broker may or may NOT allow consumer/client side to be responsible for
handling these failures.

If broker does revoke partition ownership on failed heartbeats I can figure
out that consumer thread is unresponsive, remove it from the execution
queue, and assign a brand new consumer thread.

If the broker doesn’t revoke partition ownership and maps a dead consumer
thread to some partition – will it accept assignment of a new consumer to
this partition?

Thank you.

On Wed, Dec 21, 2016 at 4:00 PM, R Krishna <krishna81m@gmail.com> wrote:

> Newbie here,
> Q2) Think, there is no rebalance if you go with manual partition assignment
> (assign(List)
> <https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/
> KafkaConsumer.html#assign%28java.util.List%29>),
> but were you looking for subscribe(List, ConsumerRebalanceListener)
> <https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/
> KafkaConsumer.html#subscribe%28java.util.List,%20org.
> apache.kafka.clients.consumer.ConsumerRebalanceListener%29>
> to notify on rebalance to cleanup? Don't think you can do both, see here:
> https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/
> KafkaConsumer.html#assign(java.util.List)
> where it mentions that it is not possible to use both manual partition
> assignment and group assignment, but, I do wonder what happens if consumers
> do different approaches?
>
> Other consumer related exceptions can still happen. And unlike Producer
> callback, don't think there is a callback other than try/catch for example
> in poll, commit, and other calls. I could not find the best way to
> recover/continue from these (non rebalance) exceptions other than retry.
>
> https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/
> KafkaConsumer.html#poll(long)
> https://kafka.apache.org/090/javadoc/org/apache/kafka/
> common/errors/package-frame.html
>
>
> On Wed, Dec 21, 2016 at 2:32 PM, Alexei Levashov <
> alexei.levashov@arrayent.com> wrote:
>
> > Thank you Ben, I appreciate your answer.
> >
> > I AM using the key to send messages from the selected producer to a
> > selected partition – Q1 covered.
> >
> >
> > If Q2 seems obscure I could add that I am talking about Java client
> library
> > and basically asked for clarification to the API doc statement that in
> case
> > of manual partition assignment consumer failure doesn’t trigger the
> > re-balancing.
> >
> >  Does the broker register this failure in any shape or form?
> >
> > Is there a way to notify the application that started consumer thread
> (in
> > group topic subscription I can subscribe the consumer with a callback
> that
> > IS called on revocation of ownership) about failure of this thread from
> > broker point of view?
> >
> > Thank you
> >
> > On Wed, Dec 21, 2016 at 6:06 AM, Ben Stopford <ben@confluent.io> wrote:
> >
> > > Hi Alexi
> > >
> > > Typically you would use a key to guarantee that messages with the same
> > key
> > > have a global ordering, rather than using manual assignment. Kafka will
> > > send all messages with the same key to the same partition. If you need
> > > global ordering, spanning all messages from a single producer, you can
> > use
> > > a single partition topic. This will limit you to one active consumer
> per
> > > consumer group as the consumer group protocol guarantees that a
> partition
> > > can only be assigned to one consumer, within a group, at one time.
> > >
> > > B
> > >
> > > On Wed, Dec 21, 2016 at 4:36 AM Alexei Levashov <
> > > alexei.levashov@arrayent.com> wrote:
> > >
> > > > Hello,
> > > > I have a few newbie questions about usage of Kafka as a messaging
> > system.
> > > > Kafka version - 0.10.1.0.
> > > >
> > > > 1 - Let's assume that I want to ensure time sequence of events i.e.
> if
> > > > message A from producer was published at time t1 to partition P and
> > > message
> > > > B from the same producer published to partition P at time t2,
> > > > I want to consume message A before message B, provided t1<t2.
> > > >
> > > > Question1,
> > > > Do I have any choice except one consumer per partition?
> > > >
> > > > 2. - If I have one consumer per partition and use
> > > >  consumer.assign(partitionList) call to assign consumer to a
> partition
> > > do I
> > > > still need group membership for this single consumer?
> > > >      I didn't find clear description what is the protocol of
> > interaction
> > > > between GroupCoordinator and PartitionLeader
> > > > <
> > > > https://cwiki.apache.org/confluence/display/KAFKA/
> > > Kafka+Client-side+Assignment+Proposal
> > > > >
> > > > will be in case of  "manual" partition assignment.
> > > >      On one hand the API documentation
> > > > <
> > > > https://kafka.apache.org/0101/javadoc/index.html?org/apache/
> > > kafka/clients/consumer/KafkaConsumer.html
> > > > >
> > > > says
> > > > that :
> > > >      "Manual partition assignment does not use group coordination, so
> > > > consumer failures will not cause assigned partitions to be
> rebalanced.
> > > >       Each consumer acts independently even if it shares a groupId
> with
> > > > another consumer.
> > > >       To avoid offset commit conflicts, you should usually ensure
> that
> > > the
> > > > groupId is unique for each consumer instance."
> > > >
> > > >       On the other hand I am still consuming messages in
> > > > consumer.poll(timeout) loop and inside this poll() call consumer
> should
> > > > send heartbeats to coordinator.
> > > >
> > > > Question 2 .
> > > >      If consumer doesn't send these heartbeats for [*
> > session.timeout.ms
> > > > <http://session.timeout.ms>*] period of time  should the partition
> > > > ownership be revoked or not?
> > > >
> > > >      If no - does it mean I have to use homegrown heartbeats for
> > consumer
> > > > state monitoring? How would the application know that the consumer
> > thread
> > > > is dead?
> > > >      If yes - what callback to notify the application can I use?
> > > > ConsumerRebalanceListener is available only for group subscription.
> > > >
> > > > Thank you.
> > > >
> > >
> >
>
>
>
> --
> Radha Krishna, Proddaturi
> 253-234-5657
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message