kafka-users mailing list archives

From Ewen Cheslack-Postava <e...@confluent.io>
Subject Re: How to disable auto commit for SimpleConsumer kafka 0.8.1
Date Sun, 11 Dec 2016 03:42:45 GMT
The simple consumer doesn't do auto-commit. It really only issues
individual low-level Kafka protocol request types, so `commitOffsets` is
the only way it should write offsets.

Is it possible it crashed after the request was sent but before finishing
reading the response?
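
To illustrate the window I mean, here is a toy model, not Kafka code. `FakeOffsetStore` is a hypothetical in-memory stand-in for wherever the offsets are stored, and the exception stands in for the process dying. It only shows that a commit can take effect even though the client never saw the response:

```java
import java.util.HashMap;
import java.util.Map;

public class CommitCrashWindow {

    /** Hypothetical stand-in for the server-side offset storage. */
    static final class FakeOffsetStore {
        private final Map<String, Long> committed = new HashMap<>();

        void handleCommit(String group, long offset) {
            committed.put(group, offset);
        }

        long fetch(String group) {
            // -1 means "no offset stored for this group" in this toy model.
            return committed.getOrDefault(group, -1L);
        }
    }

    /** The commit request is applied, then the client dies before reading the response. */
    static void commitThenCrash(FakeOffsetStore store, String group, long offset) {
        store.handleCommit(group, offset);
        throw new IllegalStateException("crashed before reading the commit response");
    }

    /** Returns the offset a restarted client would fetch after such a crash. */
    public static long offsetSeenOnRestart(long offset) {
        FakeOffsetStore store = new FakeOffsetStore();
        try {
            commitThenCrash(store, "testGroup", offset);
        } catch (IllegalStateException crash) {
            // The client never observed a successful commit.
        }
        return store.fetch("testGroup");
    }
}
```

From the client's point of view the commit "never happened", yet the next run fetches the new offset, which would look the same as an auto-commit.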

Side-note: I know you mentioned 0.8.1, but if at all possible, we'd highly
recommend moving to the new consumer. It supports both simple and consumer
group modes and is what will be supported in the long term.
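
For reference, a minimal sketch of how auto-commit is turned off with the new consumer (the `org.apache.kafka.clients.consumer.KafkaConsumer` client, which needs 0.9+ brokers). The class name `ManualCommitConfig` and its `build` method are made up for illustration; only the property keys and the deserializer class name are real:

```java
import java.util.Properties;

public class ManualCommitConfig {

    /** Builds new-consumer properties with automatic offset commits disabled. */
    public static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Offsets are then written only when the application commits explicitly.
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```

These properties would be passed to `new KafkaConsumer<>(props)`. With `enable.auto.commit` set to `false`, offsets are committed only when the application calls `commitSync()` or `commitAsync()`, typically after it has finished processing the records returned by `poll()`.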

-Ewen

On Tue, Dec 6, 2016 at 12:47 PM, Anjani Gupta <anjani.gupta@salesforce.com>
wrote:

> I want to disable auto commit for the Kafka SimpleConsumer. I am using
> version 0.8.1. For the high-level consumer, config options can be set and
> passed via consumerConfig as follows:
> kafka.consumer.Consumer.createJavaConsumerConnector(this.consumerConfig);
>
> How can I achieve the same for SimpleConsumer? I mainly want to disable
> auto commit. I tried setting auto commit to false in consumer.properties
> and restarted the Kafka server, ZooKeeper and the producer, but that does
> not work. I think I need to apply this setting through code, not in
> consumer.properties. Can anyone help here?
>
> Here is what my code looks like:
>
> List<TopicAndPartition> topicAndPartitionList = new ArrayList<>();
> topicAndPartitionList.add(topicAndPartition);
> OffsetFetchResponse offsetFetchResponse = consumer.fetchOffsets(
>         new OffsetFetchRequest("testGroup", topicAndPartitionList, (short) 0,
>                 correlationId, clientName));
>
> Map<TopicAndPartition, OffsetMetadataAndError> offsets =
>         offsetFetchResponse.offsets();
> long readOffset = offsets.get(topicAndPartition).offset();
> FetchRequest req = new FetchRequestBuilder()
>         .clientId(clientName)
>         .addFetch(a_topic, a_partition, readOffset, 100000)
>         .build();
> FetchResponse fetchResponse = consumer.fetch(req);
>
> // Consume messages from fetchResponse
>
>
> Map<TopicAndPartition, OffsetMetadataAndError> requestInfo = new HashMap<>();
> requestInfo.put(topicAndPartition,
>         new OffsetMetadataAndError(readOffset, "metadata", (short) 0));
> OffsetCommitResponse offsetCommitResponse = consumer.commitOffsets(
>         new OffsetCommitRequest("testGroup", requestInfo, (short) 0,
>                 correlationId, clientName));
>
>
> If the above code crashes before committing the offset, I still get the
> latest offset from offsets.get(topicAndPartition).offset() on the next run,
> which makes me think that the offset is auto-committed as the code executes.
>



-- 
Thanks,
Ewen
