kafka-users mailing list archives

From James Englert <jengl...@gilt.com>
Subject Re: Consume from X messages ago
Date Fri, 22 Mar 2013 15:12:15 GMT
Thanks for the help.

FWIW, I ended up writing a simple util that I run as my consumer is starting
up, to move the offset back. It *seems* to work decently.
Thoughts? Would this be worth contributing back to Kafka, or is the idea
just a poor one?

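import com.google.common.base.Preconditions
import javax.annotation.Nonnull
import kafka.utils.{Logging, ZkUtils}
import org.I0Itec.zkclient.ZkClient

// Hypothetical wrapper class so the method compiles on its own. zkClient is
// assumed to be created with Kafka's ZKStringSerializer, as Kafka's own
// clients are, so the offset written here is readable by the consumer.
class ConsumerOffsetUtil(zkClient: ZkClient) extends Logging {

  /**
   * Attempts to move a consumer group's committed offset for one topic
   * partition back by approximately `approximateMessagesBack` messages.
   * Intended to run as the consumer is starting up. Throws an exception if
   * the offset node cannot be found in ZooKeeper.
   */
  def moveConsumerOffsetBack(@Nonnull groupId: String, @Nonnull topic: String,
                             partition: Int, approximateMessagesBack: Long): Unit = {
    Preconditions.checkNotNull(groupId)
    Preconditions.checkNotNull(topic)
    Preconditions.checkArgument(partition >= 0)
    Preconditions.checkArgument(approximateMessagesBack > 0)

    // In 0.8 this is /consumers/<group>/offsets/<topic>/<partition>.
    val path = ZkUtils.ConsumersPath + "/" + groupId + "/offsets/" + topic + "/" + partition

    if (zkClient.exists(path)) {
      // Read as Any because the exact stored type is unpredictable (it might
      // be a Long or a String); toString normalizes both.
      val currentOffset = zkClient.readData[Any](path)
      // Clamp at 0. The result is approximate: with compressed data you may
      // get back more than approximateMessagesBack messages.
      val desiredOffset = math.max(0L, currentOffset.toString.toLong - approximateMessagesBack)
      zkClient.writeData(path, desiredOffset.toString)
      warn("Reset the " + topic + " consumer to " + desiredOffset)
    } else {
      throw new RuntimeException("Unable to move the consumer back: offset path not found in ZK. " +
        "This may or may not be an issue, depending on whether you expect the path to exist. " +
        "Path: " + path)
    }
  }
}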


On Tue, Mar 19, 2013 at 2:05 PM, Neha Narkhede <neha.narkhede@gmail.com> wrote:

> I guess I missed a step between 4 and 5 -
>
> 4. Replace the exported offsets with these offsets
> *Use ImportZkOffsets to import the offsets from the modified export file.*
> 5. Restart the consumer.
>
> Thanks,
> Neha
>
>
> On Tue, Mar 19, 2013 at 11:00 AM, S Ahmed <sahmed1020@gmail.com> wrote:
>
> > I thought that since offsets in 0.8 are sequential message numbers rather
> > than byte offsets as in 0.7.x, you could simply take, say, the current
> > offset minus 10000.
> >
> >
> > On Tue, Mar 19, 2013 at 12:16 PM, Neha Narkhede <neha.narkhede@gmail.com> wrote:
> >
> > > Jim,
> > >
> > > You can leverage the ExportZkOffsets/ImportZkOffsets tools to do this.
> > > ExportZkOffsets exports the consumer offsets for your group to a file in a
> > > certain format. You can then edit the exported file, setting each
> > > partition to the offset you want your consumer to reset to.
> > >
> > > 1. Shutdown the consumer
> > > 2. Export current offsets
> > > 3. Get the desired offset (current offset - 10K). As David mentions, this
> > > is approximate and might get you more than 10K messages if the data is
> > > compressed.
> > > 4. Replace the exported offsets with these offsets
> > > 5. Restart the consumer.
> > >
> > > HTH,
> > > Neha
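
Steps 3 and 4 above can be scripted. The following is a minimal Scala sketch, not part of Kafka: `RewindExportedOffsets` and `rewindLine` are hypothetical names, and it assumes each line of the ExportZkOffsets output has the form `<zk offset path>:<offset>`, so check your exported file before relying on it. It moves every partition's offset back by a fixed number of messages, clamped at 0, and writes a new file that can then be imported.

```scala
import java.io.PrintWriter
import scala.io.Source

// Hypothetical helper for steps 3-4: rewind every offset in an exported file.
// ASSUMPTION: each non-empty line looks like "<zk offset path>:<offset>".
object RewindExportedOffsets {

  // Rewrites one line, moving its offset back by messagesBack (never below 0).
  // Lines without a ':' separator are returned unchanged.
  def rewindLine(line: String, messagesBack: Long): String = {
    val sep = line.lastIndexOf(':')
    if (sep < 0) line
    else {
      val path = line.substring(0, sep)
      val offset = line.substring(sep + 1).trim.toLong
      path + ":" + math.max(0L, offset - messagesBack)
    }
  }

  def main(args: Array[String]): Unit = {
    require(args.length == 3, "usage: RewindExportedOffsets <in-file> <out-file> <messagesBack>")
    val messagesBack = args(2).toLong
    val src = Source.fromFile(args(0), "UTF-8")
    val rewritten =
      try src.getLines().filter(_.trim.nonEmpty).map(rewindLine(_, messagesBack)).toList
      finally src.close()
    val out = new PrintWriter(args(1), "UTF-8")
    try rewritten.foreach(l => out.println(l))
    finally out.close()
  }
}
```

Run it with the exported file, an output file, and 10000 as arguments while the consumer is still shut down. As noted above, the result is approximate when the data is compressed.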
> > >
> > >
> > > On Tue, Mar 19, 2013 at 8:49 AM, David Arthur <mumrah@gmail.com> wrote:
> > >
> > > > This API is exposed through the SimpleConsumer Scala class. See
> > > > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/consumer/SimpleConsumer.scala#L60
> > > >
> > > >
> > > > You will need to set earliestOrLatest to -1 for the latest offset.
> > > >
> > > > There is also a command-line tool:
> > > > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/GetOffsetShell.scala
> > > >
> > > >
> > > > -David
> > > >
> > > >
> > > > On 3/19/13 11:25 AM, James Englert wrote:
> > > >
> > > >> I'm still a bit lost.  Where is the offsets API?  I.e. which class?
> > > >>
> > > >>
> > > >> On Tue, Mar 19, 2013 at 11:16 AM, David Arthur <mumrah@gmail.com> wrote:
> > > >>
> > > >>> Using the Offsets API, you can get the latest offset by setting time to
> > > >>> -1. Then you subtract 10000.
> > > >>>
> > > >>> There is no guarantee that 10k prior messages exist, of course, so you'd
> > > >>> need to handle that case.
> > > >>>
> > > >>> -David
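
A minimal sketch of the "handle that case" part, assuming you have already fetched both the latest offset (time -1) and the earliest offset (time -2) for the partition through the Offsets API: clamp the start offset so it never falls before the earliest message still retained. `StartOffset` and `startOffset` are hypothetical names; fetching the two offsets is not shown.

```scala
// Hypothetical helper: pick a start offset that replays roughly the last n
// messages without going below the earliest offset still in the log.
// ASSUMPTION: earliest/latest come from the Offsets API (time -2 / -1),
// where latest is the offset of the next message to be written.
object StartOffset {
  def startOffset(earliest: Long, latest: Long, n: Long): Long = {
    require(n >= 0, "n must be non-negative")
    require(earliest <= latest, "earliest must not exceed latest")
    // If fewer than n messages are retained, start at the earliest one.
    math.max(earliest, latest - n)
  }

  def main(args: Array[String]): Unit = {
    println(startOffset(0L, 25000L, 10000L))     // 15000
    println(startOffset(20000L, 25000L, 10000L)) // 20000: only 5000 retained
  }
}
```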
> > > >>>
> > > >>>
> > > >>> On 3/19/13 11:04 AM, James Englert wrote:
> > > >>>
> > > >>>> Hi,
> > > >>>>
> > > >>>> I'm using Kafka 0.8. I would like to set up a consumer to fetch the last
> > > >>>> 10,000 messages and then start consuming messages.
> > > >>>>
> > > >>>> I see the configuration auto.offset.reset, but that isn't quite what I
> > > >>>> want. I want only the last 10,000 messages.
> > > >>>>
> > > >>>> Is there a good way to achieve this in 0.8, besides just hacking the data
> > > >>>> in ZK?
> > > >>>>
> > > >>>> Thanks,
> > > >>>> Jim
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>
> > > >
> > >
> >
>



-- 
Jim Englert
Gilt Groupe
