kafka-users mailing list archives

From Ian Friedman <...@flurry.com>
Subject Re: High Level Consumer error handling and clean exit
Date Tue, 09 Jul 2013 21:51:07 GMT
Hey Chris, 

The way I handled this in my application using the High Level Consumer was to turn off auto-commit
and commit manually after finishing a batch of messages (obviously you could do it after every
message, but for my purposes it was better to have batches).
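
A minimal sketch of that pattern, not the actual application code: with auto-commit disabled (auto.commit.enable=false in the 0.8 consumer configuration), offsets are committed only after a whole batch has been handled. To stay self-contained, the sketch does not use the Kafka classes. OffsetCommitter and BatchHandler are hypothetical stand-ins; in a real consumer the committer would presumably delegate to ConsumerConnector.commitOffsets() and the iterator would come from a KafkaStream.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for the single connector call this sketch needs. */
interface OffsetCommitter {
    void commitOffsets();
}

/** Hypothetical stand-in for whatever the application does with a batch. */
interface BatchHandler<T> {
    void handle(List<T> batch) throws Exception;
}

final class BatchCommitSketch {
    /**
     * Reads messages in batches and commits only after a batch was handled.
     * Returns the number of messages covered by a commit.
     */
    static <T> int consume(Iterator<T> messages, int batchSize,
                           BatchHandler<T> handler, OffsetCommitter committer) {
        int committed = 0;
        List<T> batch = new ArrayList<>();
        try {
            while (messages.hasNext()) {
                batch.add(messages.next());
                if (batch.size() == batchSize) {
                    handler.handle(batch);      // process first ...
                    committer.commitOffsets();  // ... commit only on success
                    committed += batch.size();
                    batch.clear();
                }
            }
            // Flush a trailing partial batch once the stream ends.
            if (!batch.isEmpty()) {
                handler.handle(batch);
                committer.commitOffsets();
                committed += batch.size();
            }
        } catch (Exception e) {
            // Processing failed: stop without committing. The messages of the
            // failed batch stay uncommitted and would be read again on restart.
        }
        return committed;
    }
}
```

The trade-off is at-least-once delivery: a failure in the middle of a batch means the whole batch is read again after a restart, so the handler has to tolerate duplicates. As discussed further down the thread, commitOffsets() is understood to apply to every stream of the connector, so with several consumer threads a commit from one thread also covers whatever the others have pulled.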

-- 
Ian Friedman


On Tuesday, July 9, 2013 at 4:09 PM, Chris Curtin wrote:

> Enhancement submitted: https://issues.apache.org/jira/browse/KAFKA-966
> 
> 
> 
> On Tue, Jul 9, 2013 at 3:53 PM, Chris Curtin <curtin.chris@gmail.com> wrote:
> 
> > Thanks. I know I can write a SimpleConsumer to do this, but it feels like
> > the High Level consumer is _so_ close to being robust enough to handle
> > what I'd think people want to do in most applications. I'm going to submit
> > an enhancement request.
> > 
> > I'm trying to understand the level of data loss in this situation, so I
> > looked deeper into the KafkaStream logic: it looks like a KafkaStream
> > includes a BlockingQueue for transferring the messages to my code from
> > Kafka. If I call shutdown() when I detect the problem, are the messages
> > already in the BlockingQueue considered 'read' by Kafka, or does the
> > shutdown peek into the Queue to see what is still there before updating
> > ZooKeeper?
> > 
> > My concern is if that queue is not empty I'll be losing more than the one
> > message that led to the failure.
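
The question above comes down to which of two positions a shutdown would record: the last message placed in the queue, or the last message actually handed to the application. The toy model below is not Kafka's code and makes no claim about which position the High Level Consumer commits. The class and method names are hypothetical, and it only shows how far apart the two positions can be while messages sit in the buffer.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Toy model of a stream that buffers messages between a fetcher and the application. */
final class QueuedStreamModel {
    private final BlockingQueue<Long> queue = new LinkedBlockingQueue<>();
    private long fetchedOffset = -1;   // last offset put into the queue
    private long consumedOffset = -1;  // last offset handed to the application

    /** Fetcher side: buffer a message (represented here by its offset only). */
    void enqueue(long offset) {
        queue.add(offset);
        fetchedOffset = offset;
    }

    /** Application side: take the next buffered message, or null if none is buffered. */
    Long next() {
        Long offset = queue.poll();
        if (offset != null) {
            consumedOffset = offset;
        }
        return offset;
    }

    long fetchedOffset() { return fetchedOffset; }

    long consumedOffset() { return consumedOffset; }

    /** Messages fetched but not yet seen by the application. */
    int buffered() { return queue.size(); }
}
```

If offsets 0 through 4 are enqueued and the application has called next() twice, the consumed position is 1, the fetched position is 4, and three messages are still buffered. Committing the fetched position at shutdown would skip those three on restart; committing the consumed position would affect only messages the application has already been handed.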
> > 
> > I'm also curious how others are handling this situation. Do you assume the
> > message that is causing problems is lost or somehow know to go get it
> > later? I'd think others would have this problem too.
> > 
> > Thanks,
> > 
> > Chris
> > 
> > 
> > 
> > On Tue, Jul 9, 2013 at 3:23 PM, Philip O'Toole <philip@loggly.com> wrote:
> > 
> > > OK.
> > > 
> > > It sounds like you're requesting functionality that the high-level
> > > consumer simply doesn't have. As I am sure you know, there is no API
> > > call that supports "handing back a message".
> > > 
> > > I might be missing something, but if you need this kind of control, I
> > > think you need to code your application differently. You could try
> > > creating a ConsumerConnector per partition (your clients will then need
> > > to know the number of partitions out there). That way commitOffsets()
> > > will actually only apply to that partition. Auto-commit works the same
> > > way. It might give you the level of control you need.
> > > 
> > > Philip
> > > 
> > > On Tue, Jul 9, 2013 at 2:22 PM, Chris Curtin <curtin.chris@gmail.com> wrote:
> > > 
> > > > Hi Philip,
> > > > 
> > > > Correct, I don't want to explicitly control the offset committing. The
> > > > ConsumerConnector handles that well enough except for when I want to
> > > > shut down and NOT have Kafka think I consumed that last message for a
> > > > stream. This isn't the crash case; it is a case where the logic
> > > > consuming the message detects an error and wants to cleanly exit until
> > > > that issue can be resolved, but not lose the message it was trying to
> > > > process when the problem is resolved.
> > > > 
> > > > My understanding is that the commitOffsets() call is across all threads,
> > > > not just for the stream my thread is reading from. So knowing it is
> > > > okay to call this requires coordination across all my threads, which
> > > > makes a High Level Consumer a lot harder to write correctly.
> > > > 
> > > > Thinking about what I'd like to happen: my code hands the message
> > > > back to the KafkaStream (or whatever level knows about the consumed
> > > > offsets) and says:
> > > > - set the next start offset for this topic/partition to this message in
> > > >   ZooKeeper
> > > > - cleanly shut down the stream from the broker(s)
> > > > - don't force a rebalance on the consumer since something is wrong with
> > > >   processing of the data in the message, not the message.
> > > > - If I try to use the stream again I should get an exception
> > > > - I don't think I would want this to cause a complete shutdown of the
> > > >   ConsumerConnector, in case other threads are still processing. If all
> > > >   threads have the same issue they will all fail soon enough and do the
> > > >   same logic. But if only one thread fails, our Operations teams will
> > > >   need to resolve the issue then do a clean restart to recover.
> > > > 
> > > > I think this logic would only happen when the downstream system was
> > > > having issues, since the iterator would be drained correctly when the
> > > > 'shutdown' call to ConsumerConnector is made.
> > > > 
> > > > Thanks,
> > > > 
> > > > Chris
> > > > 
> > > > 
> > > > 
> > > > On Tue, Jul 9, 2013 at 11:21 AM, Philip O'Toole <philip@loggly.com> wrote:
> > > > 
> > > > > It seems like you're not explicitly controlling the offsets. Is that
> > > > > correct?
> > > > > 
> > > > > If so, the moment you pull a message from the stream, the client
> > > > > framework considers it processed. So if your app subsequently crashes
> > > > > before the message is fully processed, and "auto-commit" updates the
> > > > > offsets in ZooKeeper, you will drop that message.
> > > > > 
> > > > > The solution to this is to call commitOffsets() explicitly.
> > > > > 
> > > > > Philip
> > > > > 
> > > > > On Tue, Jul 9, 2013 at 11:16 AM, Chris Curtin <curtin.chris@gmail.com> wrote:
> > > > > 
> > > > > 
> > > > > > Hi,
> > > > > > 
> > > > > > I'm working through a production-level High Level Consumer app and
> > > > > > have a couple of error/shutdown questions to understand how the
> > > > > > offset storage is handled.
> > > > > > 
> > > > > > Test case - simulate an error writing to the destination application,
> > > > > > for example a database, so that the offset is 'lost'
> > > > > > 
> > > > > > Scenario
> > > > > > - write 500 messages for each topic/partition
> > > > > > - use the example High Level Consumer code I wrote for the Wiki
> > > > > > - Change the code so that every 10th read from the 'hasNext()'
> > > > > >   ConsumerIterator breaks out of the loop and returns from the thread,
> > > > > >   simulating a hard error. I write the offset to System.out to see
> > > > > >   what was provided
> > > > > > - start up again and look to see what offset was first emitted for a
> > > > > >   partition
> > > > > > 
> > > > > > Issue: Kafka treats the offset for the message read that caused me to
> > > > > > break out of the loop as processed (as expected), but I really failed.
> > > > > > How do I tell Kafka that I didn't really consume that offset?
> > > > > > 
> > > > > > Here is the example code in the 'business logic':
> > > > > > 
> > > > > > public void run() {
> > > > > >     ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
> > > > > >     int counter = 0;
> > > > > >     while (it.hasNext()) {
> > > > > >         MessageAndMetadata<byte[], byte[]> msg = it.next();
> > > > > >         if (counter == 10) {
> > > > > >             System.out.println("Stopping Thread " + m_threadNumber + ": Partition: " + msg.partition() +
> > > > > >                     ": Offset: " + msg.offset() + " :" + new String(msg.message()));
> > > > > >             break;
> > > > > >         }
> > > > > >         System.out.println("Thread " + m_threadNumber + ": Partition: " + msg.partition() +
> > > > > >                 ": Offset: " + msg.offset() + " :" + new String(msg.message()));
> > > > > >         counter++;
> > > > > >     }
> > > > > > 
> > > > > >     System.out.println("Shutting down Thread: " + m_threadNumber);
> > > > > > }
> > > > > > 
> > > > > > I understand that handling 'hard' errors like JVM crashes, kill -9
> > > > > > etc. may leave the offsets in ZooKeeper incorrect, but I'm trying to
> > > > > > understand what happens in a clean shutdown where Kafka and the
> > > > > > Consumer are behaving correctly but I can't process what I read.
> > > > > > 
> > > > > > This also feels like I'm blurring SimpleConsumer theory into this,
> > > > > > but except for the exception/shutdown case High Level Consumer does
> > > > > > everything I want.
> > > > > > 
> > > > > > 
> > > > > > Thanks,
> > > > > > 
> > > > > > Chris 

