kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Snehalata Nagaje <snehalata.nag...@harbingergroup.com>
Subject Re: How to fetch old messages from kafka
Date Tue, 10 Feb 2015 04:36:25 GMT

Hi Mayuresh,


Does it mean , we need to maintain the offset in db, After every message post, we can check
the last offset maintained in db, for e.g it is 20, then on every message post we can check
latest offset present in given topic, if it greater that 30, then we can add one more entry
in db.

And while consuming in reverse direction, we can give offset 20, with certain size of bytes,
using simple comsumer, once while reading once we reach next offset 30, stop consuming, display
those set of messages?

But is the offset numbers are sequential?, Can I assume one message is represented by one
offset?

Thanks,
Snehalata



----- Original Message -----
From: "Mayuresh Gharat" <gharatmayuresh15@gmail.com>
To: users@kafka.apache.org
Sent: Friday, February 6, 2015 3:48:42 AM
Subject: Re: How to fetch old messages from kafka

If you see the code for getOffsetsBefore() :

 /**
   *  Get a list of valid offsets (up to maxSize) before the given time.
   *
   *  @param request a [[kafka.javaapi.OffsetRequest]] object.
   *  @return a [[kafka.javaapi.OffsetResponse]] object.
   */
  def getOffsetsBefore(request: *OffsetRequest*):
kafka.javaapi.OffsetResponse = {
    import kafka.javaapi.Implicits._
    underlying.getOffsetsBefore(request.underlying)
  }


It says get Offsets before a valid time.

Also if you dig deeper in to OffsetRequest :

case class OffsetRequest(requestInfo: Map[TopicAndPartition,
*PartitionOffsetRequestInfo*],
                         versionId: Short = OffsetRequest.CurrentVersion,
                         correlationId: Int = 0,
                         clientId: String = OffsetRequest.DefaultClientId,
                         replicaId: Int = Request.OrdinaryConsumerId)
    extends RequestOrResponse(Some(RequestKeys.OffsetsKey))


case class PartitionOffsetRequestInfo(*time: Long*, maxNumOffsets: Int)


So I don't think you can use it.
What you can do is maintain the beginning and end offsets for each page,
for your example it would be range 10 and use those to do the resetting.
You can maintain the max offsets the user can go back, it would be like
maintaining a sliding window and moving the window back only some
configurable max number of times.


Thanks,

Mayuresh



On Wed, Feb 4, 2015 at 1:29 AM, Snehalata Nagaje <
snehalata.nagaje@harbingergroup.com> wrote:

>
>
> Hi Mayuresh,
>
>
> Thanks for quick response.
>
> We can reset the offset and get first 10 messages, but since we need to
> back in reverse sequence, suppose user has consumed messages upto 100
> offset , currently there are only last 10 messages are visible, from 100
> -90, now I want to retrieve messages from 80 to 90, how can we do that?
>
> Can we use getOffsetBefore() function to get valid offset before given
> time, this will return all valid offsets. we can get all valid offsets
> before latest time.
>
> Then we can fetch messages from any given valid offset returned from
> getOffsetBefore().
>
> is this correct approach?
>
> Thanks,
> Snehalata
>
> ----- Original Message -----
> From: gharatmayuresh15@gmail.com
> To: users@kafka.apache.org
> Sent: Wednesday, February 4, 2015 11:24:46 AM
> Subject: Re: How to fetch old messages from kafka
>
> In that case you will have to maintain the offsets consumed and reset the
> offsets in case you need to consume from past.
>
> For example, suppose you have a userA for which you have a partitionA for
> topic TopicA. Each page shown to user increments the offset by 10. You have
> consumed till offset 100 and the user wants to go back 1 page you will have
> to reset the offset for TopicA partitionA in the zookeeper. Since you are
> using simple consumer the offset management has to be done by your
> application.
>
> Thanks,
>
> Mayuresh
>
> Sent from my iPhone
>
> > On Feb 3, 2015, at 9:18 PM, Snehalata Nagaje <
> snehalata.nagaje@harbingergroup.com> wrote:
> >
> >
> >
> > Hi ,
> >
> >
> > We are using kafka for storing messages in chat application.
> >
> > Currently we divided each topic in multiple partitions. each partition
> stores data for given customer who uses the application.
> >
> > Right now on very first request, application fetches log from kafka from
> earliest valid offset to maxiumum 100000 bytes. hence it reads all messages
> for given topic
> >
> > for given partition. Now we want to apply pagination as linkedin,
> facebook does. Only latest 10-15 messages should be displayed. And then on
> scroll down
> >
> > fetch next set of previous messages, we are using Simple consumer to
> fetch messages.
> >
> > Can you please guide on this?
> >
> >
> > Thanks,
> > Snehalata
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125

Mime
View raw message