kafka-users mailing list archives

From Jiangjie Qin <j...@linkedin.com.INVALID>
Subject Re: Questions re: auto-commit and camel-kafka
Date Sat, 04 Jul 2015 07:59:02 GMT
If you disable auto offset commit, offsets won't be committed unless you commit them manually.
So in your shutdown case, if there is an exception you should not commit offsets. Similarly,
if something outbound is still in flight, you need to wait for it rather than commit its
offsets.

As for the initial message loss: Kafka actually creates topics asynchronously, which means the
topic might not have been created yet when you start producing (we know this is not ideal). I would
try increasing the retry backoff time or the number of retries to see if that resolves the issue.
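
A minimal sketch of that suggestion as producer properties. It assumes the old (0.8.x) Scala producer, where the retry count and backoff are set through `message.send.max.retries` and `retry.backoff.ms`; the class name, broker address, and chosen values are illustrative only, and the camel-kafka endpoint may expose these settings under different option names.

```java
import java.util.Properties;

public class ProducerRetrySketch {
    // Builds producer properties that retry more often and wait longer
    // between attempts than the defaults (3 retries, 100 ms backoff).
    static Properties retryProps() {
        Properties props = new Properties();
        // Assumed broker address for an embedded/local broker.
        props.setProperty("metadata.broker.list", "localhost:9092");
        // Illustrative values, not a recommendation.
        props.setProperty("message.send.max.retries", "10");
        props.setProperty("retry.backoff.ms", "500");
        return props;
    }

    public static void main(String[] args) {
        retryProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

A longer backoff gives the asynchronous topic creation (or a slow-starting embedded broker) time to finish before the producer gives up on the first message.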

Jiangjie (Becket) Qin

From: "Michael J. Kitchin" <mcoyote.jr@gmail.com>
Reply-To: "mcoyote@mcoyote.com" <mcoyote@mcoyote.com>
Date: Friday, July 3, 2015 at 3:01 PM
To: Jiangjie Qin <jqin@linkedin.com>
Cc: "users@kafka.apache.org" <users@kafka.apache.org>
Subject: Re: Questions re: auto-commit and camel-kafka

Hi there,

Thanks for getting back to me so quickly.

Per auto-commit:
That's as I thought. Here is the setup, so far:
- Auto-commit off
- Same/fixed consumer group
- Auto offset reset set to "largest"
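
For reference, a sketch of how this setup might look as raw consumer properties. It assumes the 0.8.x high-level consumer property names (`auto.commit.enable`, `group.id`, `auto.offset.reset`); the class name, group id, and ZooKeeper address are placeholders, and camel-kafka may surface the same settings as differently named endpoint options.

```java
import java.util.Properties;

public class ConsumerConfigSketch {
    // Mirrors the three settings listed above as consumer properties.
    static Properties consumerProps() {
        Properties props = new Properties();
        // Placeholder ZooKeeper address for a local/embedded setup.
        props.setProperty("zookeeper.connect", "localhost:2181");
        // Auto-commit off: offsets move only on an explicit commit.
        props.setProperty("auto.commit.enable", "false");
        // Same/fixed consumer group across restarts (placeholder name).
        props.setProperty("group.id", "my-fixed-group");
        // With no committed offset for the group, start at the log end.
        props.setProperty("auto.offset.reset", "largest");
        return props;
    }

    public static void main(String[] args) {
        consumerProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```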

When the camel route is shut down in an orderly fashion, the consumer and the pending operation
appear to clean up and commit the results, even though exchanges are in flight.

I walked through this in the debugger, FWIW. If interested, you can see the code, here:
- https://github.com/apache/camel/blob/master/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java

- Processing is underway at line 148: processor.process(exchange);
- Camel triggers a shutdown and this returns as normal, without any exchange exception check
- The barrier await hits at line 165: berrier.await(endpoint.getBarrierAwaitTimeoutMs(), TimeUnit.MILLISECONDS);
- This aligns with the rest of the streams and triggers the barrier action, which in turn
  performs the commit at line 193: consumer.commitOffsets();

Since any exception from line 148 is suppressed and there's no subsequent interrupted() or
exchange exception check, it looks like there's no way to signal not to commit, and in-flight
exchanges are a guaranteed loss.

Does this sound correct?

If so, barring a change from the maintainers I figure I might fork this code and (optionally)
bypass the consumer.commitOffsets(); during shutdown. Thoughts?
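
A rough sketch of what such a bypass could look like, independent of Camel and Kafka. This is not the camel-kafka implementation: `GuardedCommitSketch`, `OffsetCommitter`, `markFailed`, and `beginShutdown` are hypothetical names, and the committer stands in for consumer.commitOffsets(). The barrier action commits only when no stream reported a failure and no shutdown is in progress.

```java
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class GuardedCommitSketch {
    /** Hypothetical stand-in for the real consumer's commitOffsets(). */
    interface OffsetCommitter { void commitOffsets(); }

    private final AtomicBoolean batchFailed = new AtomicBoolean(false);
    private final AtomicBoolean shuttingDown = new AtomicBoolean(false);
    private final CyclicBarrier barrier;

    GuardedCommitSketch(int streams, OffsetCommitter committer) {
        // The barrier action runs once, after every stream has arrived.
        this.barrier = new CyclicBarrier(streams, () -> {
            boolean failed = batchFailed.getAndSet(false); // reset for next batch
            if (!failed && !shuttingDown.get()) {
                committer.commitOffsets();
            }
        });
    }

    void markFailed() { batchFailed.set(true); }      // call when an exchange fails
    void beginShutdown() { shuttingDown.set(true); }  // call when the route stops

    void awaitBatch(long timeoutMs) throws Exception {
        barrier.await(timeoutMs, TimeUnit.MILLISECONDS);
    }

    // Simulates two streams finishing one batch; returns how many commits ran.
    static int run(boolean fail, boolean shutdown) {
        AtomicInteger commits = new AtomicInteger();
        GuardedCommitSketch guard = new GuardedCommitSketch(2, commits::incrementAndGet);
        if (shutdown) guard.beginShutdown();
        Runnable stream = () -> {
            try {
                if (fail) guard.markFailed();
                guard.awaitBatch(5000);
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        };
        Thread a = new Thread(stream);
        Thread b = new Thread(stream);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return commits.get();
    }

    public static void main(String[] args) {
        System.out.println("normal batch commits:   " + run(false, false));
        System.out.println("failed batch commits:   " + run(true, false));
        System.out.println("shutdown batch commits: " + run(false, true));
    }
}
```

Skipping the commit means the uncommitted batch is re-delivered after restart, which fits a pipeline that tolerates duplicates but not loss.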

Per the lost initial message:
The consumer is started first and uses a consistent/fixed group id, as mentioned. I did notice
a suppressed exception when I walked through in the debugger, however: "failed to send after
3 tries".

This may be a simpler nut to crack, however -- the observed behavior isn't consistent and doesn't
occur on camel route restart (i.e., when not restarting the process), suggesting a race condition.
I'm using an embedded broker, so it's plausible it isn't completely started when this
happens. I'll configure the producer to retry again with longer delays and/or delay route
startup to see if that helps.

Please let me know if I may provide any additional information. Thanks.


On Fri, Jul 3, 2015 at 1:24 AM, Jiangjie Qin <jqin@linkedin.com> wrote:
Hi Michael,

For the consumer side question: yes, turning off auto offset commit is
what you want. But you should also commit offsets manually after you have
written the processed data somewhere else. Also, an offset is only
associated with a particular consumer group. So if you restart your
consumer with a different consumer group, by default it will consume from
the log end of the partitions. This might also be the reason why you see
the first message "eaten" by the producer - if you start the producer before
starting the consumer and the consumer uses a new group id, it will consume
from the log end, which might miss messages that were already produced.

Can you clarify what you did with regard to the above?


Jiangjie (Becket) Qin

On 7/2/15, 9:47 PM, "Michael J. Kitchin" <mcoyote.jr@gmail.com> wrote:

>Hi there,
>These are questions re: the official camel-kafka integration. Since the
>issues touch on both camel and kafka I'm sending to both users lists in
>search of answers.
>- - - -
>I have a simple, inonly, point-to-point, synchronous camel route (a)
>consuming from kafka topic (consumer), (b) running the resulting exchanges
>(messages) through a processor chain and (c) forwarding the outcome on to
>somewhere else.
>If the runtime dies while exchanges are in this pipeline, I want things to
>pick up where they left off when restarted. I'm not picky about seeing the
>same data more than once (as long as it's recent), I just can't lose
>In brief, everything's working great except this failure/recovery part --
>in-flight exchanges are getting lost and there is no apparent re-delivery
>on restart. My reading of the JMX data suggests the kafka logs are intact.
>I think this has to do with consumer auto-commit, which is the default
>behavior. My reading of the kafka and camel-kafka docs suggests disabling
>auto-commit will give me what I want, but when I try it I'm not seeing
>re-delivery kick off when I restart.
>So, first question:
>(1) Is auto-commit off the key to getting what I want and/or what else
>might I need to do?
>- - - - -
>Meanwhile, on the producer side I'm seeing the first (and only the first)
>message apparently get eaten. It's possible it's being buffered, but it
>never seems to time out. There are no error messages on startup and the
>camel context, routes, etc. appear to have started successfully. The
>message and everything that follows is golden.
>The payloads are ~70-character byte arrays, if it makes a difference.
>Second question, then:
>(2) Is there a batching setting or something else I might be overlooking
>behind this behavior?
>- - - - -
>Thanks in advance for your time and consideration. We've been impressed
>with kafka so far and are looking forward to employing it in production.
>Please let me know if I may provide any additional information. Thanks.
>- - - - -
>*Michael J. Kitchin*
>Senior Software Engineer
>Operational Systems, Inc.
>4450 Arapahoe Avenue, Suite 100
>Boulder, CO 80303
>Phone: 719-271-6476
>Email: michael.kitchin@opsysinc.com
>Web: www.opsysinc.com
