kafka-users mailing list archives

From Gwen Shapira <gshap...@cloudera.com>
Subject Re: High Level Consumer and Commit
Date Wed, 03 Sep 2014 22:20:28 GMT
Thanks, Balaji!

It looks like your approach depends on specific implementation
details, such as the directory structure in ZK.
In this case it doesn't matter much since the APIs are not stable yet,
but in general, wouldn't you prefer to use public APIs, even if it
means multiple consumers without threads?

Gwen

On Wed, Sep 3, 2014 at 3:06 PM, Seshadri, Balaji
<Balaji.Seshadri@dish.com> wrote:
> We can still do this with a single ConsumerConnector and multiple threads.
>
> Each thread updates its own offset data in ZooKeeper. The method below is our own
> implementation of commitOffset.
>
> public void commitOffset(DESMetadata metaData) {
>     log.debug("Update offsets only for -> " + metaData.toString());
>     String key = metaData.getTopic() + "/" + metaData.getPartitionNumber();
>     Long nextOffset = metaData.getOffSet() + 1;
>     // Skip the ZooKeeper write if this offset was already checkpointed.
>     if (!nextOffset.equals(checkPointedOffset.get(key))) {
>         ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(metaData.getGroupId(), metaData.getTopic());
>         // Writes under /consumers/<group>/offsets/<topic>/<partition>
>         ZkUtils.updatePersistentPath(zkClient,
>             topicDirs.consumerOffsetDir() + "/" + metaData.getPartitionNumber(), nextOffset + "");
>         checkPointedOffset.put(key, nextOffset);
>     }
> }
>
> -----Original Message-----
> From: Gwen Shapira [mailto:gshapira@cloudera.com]
> Sent: Tuesday, September 02, 2014 11:38 PM
> To: users@kafka.apache.org; Philip O'Toole
> Subject: Re: High Level Consumer and Commit
>
> I believe a simpler solution would be to create multiple ConsumerConnectors, each with
> 1 thread (a single stream), and use the commitOffsets API to commit all partitions managed
> by each ConsumerConnector after the thread finishes processing its messages.
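
[Editorial note: for illustration, a minimal sketch of the pattern Gwen describes, assuming
the 0.8.x high-level consumer API (kafka.consumer.*, kafka.javaapi.consumer.ConsumerConnector).
The batch size, the indexBatch() helper, and the worker class name are placeholders, not part
of the original mail.]

    import java.util.*;
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.MessageAndMetadata;

    public class SingleStreamWorker implements Runnable {
        private final ConsumerConnector connector;
        private final KafkaStream<byte[], byte[]> stream;

        SingleStreamWorker(Properties props, String topic) {
            // One connector per worker thread; auto-commit is disabled in props.
            connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
            Map<String, Integer> topicCount = Collections.singletonMap(topic, 1);
            stream = connector.createMessageStreams(topicCount).get(topic).get(0);
        }

        public void run() {
            List<byte[]> batch = new ArrayList<byte[]>();
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                MessageAndMetadata<byte[], byte[]> msg = it.next();
                batch.add(msg.message());
                if (batch.size() == 50) {           // placeholder batch size
                    indexBatch(batch);              // e.g. an Elasticsearch bulk request
                    connector.commitOffsets();      // commits only this connector's partitions
                    batch.clear();
                }
            }
        }

        private void indexBatch(List<byte[]> batch) { /* application-specific */ }
    }

Because each connector owns exactly one stream, commitOffsets() only ever advances the
partitions that this thread has actually finished processing.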
>
> Does that solve the problem, Bhavesh?
>
> Gwen
>
> On Tue, Sep 2, 2014 at 5:47 PM, Philip O'Toole <philip.otoole@yahoo.com.invalid> wrote:
>> Yeah, from reading that I suspect you need the SimpleConsumer. Try it out and see.
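
[Editorial note: a minimal sketch of a SimpleConsumer fetch loop, assuming the 0.8.x API
(kafka.javaapi.consumer.SimpleConsumer). The broker host, topic, partition, and client id are
placeholders; leader lookup and error handling are omitted.]

    import kafka.api.FetchRequest;
    import kafka.api.FetchRequestBuilder;
    import kafka.javaapi.FetchResponse;
    import kafka.javaapi.consumer.SimpleConsumer;
    import kafka.message.MessageAndOffset;

    // Connect directly to the partition leader (placeholder host/port).
    SimpleConsumer consumer = new SimpleConsumer("broker1", 9092, 100000, 64 * 1024, "es-indexer");
    long offset = 0L;  // would be loaded from your own offset store
    while (true) {
        FetchRequest req = new FetchRequestBuilder()
                .clientId("es-indexer")
                .addFetch("my-topic", 0, offset, 100000)  // topic, partition, offset, fetch size
                .build();
        FetchResponse resp = consumer.fetch(req);
        for (MessageAndOffset mo : resp.messageSet("my-topic", 0)) {
            // process mo.message(), then record where to resume
            offset = mo.nextOffset();  // persist this only once the batch succeeds
        }
    }
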
>>
>> Philip
>>
>>
>> -----------------------------------------
>> http://www.philipotoole.com
>>
>>
>> On Tuesday, September 2, 2014 5:43 PM, Bhavesh Mistry <mistry.p.bhavesh@gmail.com> wrote:
>>
>>
>>
>> Hi Philip,
>>
>> Yes, we have disabled auto-commit, but we need to be able to read from a
>> particular offset if we manage the offsets ourselves in some external storage (a DB).
>> The High Level Consumer does not allow pluggable per-partition offset management.
>>
>> I would like to keep the High Level Consumer's failover and auto-rebalancing;
>> we just need pluggability of offset management.
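
[Editorial note: as an illustration of offsets managed in external storage, a minimal sketch
assuming a relational table keyed by (group, topic, partition). The table name, column names,
and MySQL-style REPLACE upsert are hypothetical; any durable keyed store would do.]

    import java.sql.*;

    public class OffsetStore {
        private final Connection conn;

        public OffsetStore(Connection conn) { this.conn = conn; }

        // Record the next offset to read for a partition, after a batch succeeds.
        public void save(String group, String topic, int partition, long nextOffset) throws SQLException {
            try (PreparedStatement ps = conn.prepareStatement(
                    "REPLACE INTO consumer_offsets (group_id, topic, partition_id, next_offset) VALUES (?, ?, ?, ?)")) {
                ps.setString(1, group);
                ps.setString(2, topic);
                ps.setInt(3, partition);
                ps.setLong(4, nextOffset);
                ps.executeUpdate();
            }
        }

        // Look up where to resume; -1 means "no stored offset yet".
        public long load(String group, String topic, int partition) throws SQLException {
            try (PreparedStatement ps = conn.prepareStatement(
                    "SELECT next_offset FROM consumer_offsets WHERE group_id = ? AND topic = ? AND partition_id = ?")) {
                ps.setString(1, group);
                ps.setString(2, topic);
                ps.setInt(3, partition);
                try (ResultSet rs = ps.executeQuery()) {
                    return rs.next() ? rs.getLong(1) : -1L;
                }
            }
        }
    }
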
>>
>> Thanks,
>>
>> Bhavesh
>>
>>
>>
>> On Tue, Sep 2, 2014 at 5:20 PM, Philip O'Toole <
>> philip.otoole@yahoo.com.invalid> wrote:
>>
>>> No, you'll need to write your own failover.
>>>
>>> I'm not sure I follow your second question, but the high-level
>>> Consumer should be able to do what you want if you disable
>>> auto-commit. I'm not sure what else you're asking.
>>>
>>>
>>> Philip
>>>
>>>
>>> -----------------------------------------
>>> http://www.philipotoole.com
>>>
>>>
>>> On Tuesday, September 2, 2014 5:15 PM, Bhavesh Mistry <
>>> mistry.p.bhavesh@gmail.com> wrote:
>>>
>>>
>>>
>>> Hi Philip,
>>>
>>> Thanks for the update.  With the Simple Consumer I will not get the failover
>>> and rebalancing that are provided out of the box.  What other option is there
>>> to keep reading without blocking, and commit only when a batch is done?
>>>
>>> Thanks,
>>>
>>> Bhavesh
>>>
>>>
>>>
>>> On Tue, Sep 2, 2014 at 4:43 PM, Philip O'Toole <
>>> philip.otoole@yahoo.com.invalid> wrote:
>>>
>>> > Either use the SimpleConsumer, which gives you much finer-grained
>>> > control, or (this worked with 0.7) spin up a ConsumerConnector (a
>>> > high-level consumer concept) per partition, with auto-commit turned off.
>>> >
>>> > Philip
>>> >
>>> >
>>> > -----------------------------------------
>>> > http://www.philipotoole.com
>>> >
>>> >
>>> > On Tuesday, September 2, 2014 4:38 PM, Bhavesh Mistry <
>>> > mistry.p.bhavesh@gmail.com> wrote:
>>> >
>>> >
>>> >
>>> > Hi Kafka Group,
>>> >
>>> > I have to pull data from a topic and index it into Elasticsearch with the
>>> > Bulk API.  I want to commit only the batches that have been indexed, and
>>> > still continue reading further from the same topic.  I have auto-commit
>>> > turned off.
>>> >
>>> >
>>> > List<Message> batch = ...;
>>> >
>>> > while (iterator.hasNext()) {
>>> >     batch.add(iterator.next().message());
>>> >     if (batch.size() == 50) {
>>> >         // ===>>> once the bulk API call succeeds, this thread commits the
>>> >         // offsets to ZooKeeper...
>>> >         executor.submit(/* process batch, then commit it via consumerConnector */);
>>> >         batch = new ArrayList<Message>();  // start a new batch buffer
>>> >     }
>>> > }
>>> >
>>> > This commitOffsets API commits all messages that have been read so far.
>>> > What is the best way to continue reading, and commit only once another
>>> > thread has finished processing a batch successfully?  This will lead to
>>> > fragmentation of the consumer offsets, so what is the best way to implement
>>> > continuous reading from the stream and commit a range of offsets?
>>> >
>>> > Is the Simple Consumer a better approach for this?
>>> >
>>> >
>>> > Thanks,
>>> >
>>> > Bhavesh
>>> >
>>>
