kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Seshadri, Balaji" <Balaji.Sesha...@dish.com>
Subject RE: commitOffsets by partition 0.8-beta
Date Wed, 23 Apr 2014 02:20:43 GMT
Please find my code to commit offset;

public void handleAfterConsumption(MessageAndMetadata<K, P> mAndM) {
				String commitPerThread = props.getProperty("commitperthread","N");
				DESMetadata metadata= new DESMetadata(mAndM.topic(), consumerGroup, mAndM.partition(),mAndM.offset());
				if(isRunning()){
					if(commitPerThread.equals("Y")){
						commitOffset(metadata);
					}
					else{
						kafkaConsumer.commitOffsets();
					}
				}
			}


public void commitOffset(DESMetadata metaData) {
		log.info("Update offsets only for ->"+ metaData.toString());
		ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(metaData.getGroupId(),metaData.getTopic());
		ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir()+"/"+metaData.getPartitionNumber(),metaData.getOffSet()+"");
	}

Thanks,

Balaji
-----Original Message-----
From: Seshadri, Balaji [mailto:Balaji.Seshadri@dish.com] 
Sent: Tuesday, April 22, 2014 8:10 PM
To: 'users@kafka.apache.org'
Subject: RE: commitOffsets by partition 0.8-beta

I'm updating the latest offset consumed to the zookeeper directory.

Say for eg if my last consumed message has offset of 5 i update it in the path,but when i
check zookeeper path it has 6 after sometimes.

Does any other process updates it ?.

________________________________________
From: Seshadri, Balaji
Sent: Friday, April 18, 2014 11:50 AM
To: 'users@kafka.apache.org'
Subject: RE: commitOffsets by partition 0.8-beta

Thanks Jun.


-----Original Message-----
From: Jun Rao [mailto:junrao@gmail.com]
Sent: Friday, April 18, 2014 11:37 AM
To: users@kafka.apache.org
Subject: Re: commitOffsets by partition 0.8-beta

We don't have the ability to commit offset at the partition level now. This feature probably
won't be available until we are done with the consumer rewrite, which is 3-4 months away.

If you want to do sth now and don't want to use SimpleConsumer, another hacky way is to turn
off auto offset commit and write the offset to ZK in the right path yourself in the app.

Thanks,

Jun


On Fri, Apr 18, 2014 at 10:02 AM, Seshadri, Balaji <Balaji.Seshadri@dish.com
> wrote:

> Hi,
>
> We have use case in DISH where we need to stop the consumer when we 
> have issues in proceeding further to database or another back end.
>
> We update offset manually for each consumed message. There are 4
> threads(e.g) consuming from same connector and when one thread commits 
> the offset there is chance that data for all other threads also get committed.
>
> We don't want to go with this to prod as we are going to take first 
> step of replacing traditional broker with Kafka for business critical 
> process, is it ok if we add commit Offset(Topic,partition) method that 
> commits only the consumed data for that particular thread.
>
> At this point we don't want to change our framework to use Simple 
> Consumer as it is lots of work for us.
>
> Please let us know the effect of committing the offset per partition 
> being consumed by the thread. We have around 131 partitions per topic 
> and around
>  20 topics.
>
> Thanks,
>
> Balaji
>
>
>

Mime
View raw message