kafka-users mailing list archives

From Steven Parkes <smpar...@smparkes.net>
Subject Re: Consuming "backwards"?
Date Fri, 06 Dec 2013 16:04:33 GMT
I figure I may end up doing something like this, for reasons that sound
similar to what Otis describes.

In my case, I may end up binary-searching the log, matching message offsets
to the dates embedded in the messages themselves, to find a particular time
range of interest.

So far it's just theory: I haven't gotten to the point of POC/sanity checking it. But it sounds
like it should work ...
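
Roughly what I have in mind, as an untested sketch against the 0.8
SimpleConsumer API (the clientId and parseTimestamp are placeholders for
whatever the real setup and message format look like):

import kafka.api.FetchRequestBuilder
import kafka.consumer.SimpleConsumer
import kafka.message.Message

// placeholder decoder: assumes each payload starts with an epoch-millis
// timestamp; swap in whatever the real message format actually uses
def parseTimestamp(m: Message): Long = {
  val buf = m.payload
  buf.getLong(buf.position())
}

// fetch the single message at `offset` and pull out its timestamp
def timestampAt(c: SimpleConsumer, topic: String, part: Int,
                offset: Long): Long = {
  val req = new FetchRequestBuilder()
    .clientId("offset-bisect")
    .addFetch(topic, part, offset, 100000) // fetch size is in bytes
    .build()
  parseTimestamp(c.fetch(req).messageSet(topic, part).head.message)
}

// binary search over the logical offsets [lo, hi): returns the first
// offset whose message timestamp is >= target
def findOffsetFor(c: SimpleConsumer, topic: String, part: Int,
                  target: Long, lowest: Long, highest: Long): Long = {
  var lo = lowest; var hi = highest
  while (lo < hi) {
    val mid = lo + (hi - lo) / 2
    if (timestampAt(c, topic, part, mid) < target) lo = mid + 1
    else hi = mid
  }
  lo
}

With the earliest and latest offsets from an OffsetRequest as the bounds,
two of these searches should bracket the time range.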

On Dec 6, 2013, at 7:56 AM, Joe Stein <joe.stein@stealth.ly> wrote:

> You got the hamster on the wheel with this one :)
> 
> So one way to make it work without any changes (or at most very minor
> changes, if any) would be to use your max offset and fetch size this way:
> 
> M1 M2 M3 M4 M5 M6 M7 M8 M9 M10 M11 M12
> 
> to get
> 
> get last 3: M10 M11 M12
> get last 3: M7 M8 M9
> get last 3: M4 M5 M6
> get last 3: M1 M2 M3
> 
> you would start at the end to get the high watermark offset, then you
> would do
> 
> maxOffset (since you're at the end) - 3 with a fetch size of 3, and then
> keep doing that
> 
> so technically you are still looking forward but you are making the start
> position of your offset 3 behind
> 
> so if the offset numbers matched your numbers (so offset of M1 is 1 and
> offset of M2 is 2) for this example...
> 
> fetch((12-3),3)
> fetch((12-3-3),3)
> fetch((12-3-3-3),3)
> fetch((12-3-3-3-3),3)
> 
> would produce
> 
> M10 M11 M12
> M7 M8 M9
> M4 M5 M6
> M1 M2 M3
> 
> This would mean no broker changes :) and just "tricking" the val
> fetchRequest = fetchRequestBuilder in your implementation of
> SimpleConsumerShell.scala to "look backwards": you are just moving the
> start offset backwards from the end while still fetching forward for your
> fetch size
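> 
> here is a rough, untested sketch of that against the 0.8 SimpleConsumer
> API (broker host, topic, and partition are made up; note the real fetch
> size is in bytes, not messages, so you step the start offset back by your
> batch size and trim what comes back):
> 
> import kafka.api.{FetchRequestBuilder, OffsetRequest, PartitionOffsetRequestInfo}
> import kafka.common.TopicAndPartition
> import kafka.consumer.SimpleConsumer
> 
> val topic = "metrics"; val partition = 0; val batchSize = 3L
> val consumer = new SimpleConsumer("broker", 9092, 100000, 64 * 1024, "backwards")
> val tap = TopicAndPartition(topic, partition)
> 
> // start at the end: ask the broker for the high watermark offset
> val offReq = OffsetRequest(
>   Map(tap -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
> var pos = consumer.getOffsetsBefore(offReq)
>   .partitionErrorAndOffsets(tap).offsets.head
> 
> // walk backwards: fetch [pos - batchSize, pos), then step back again
> while (pos > 0) {
>   val start = math.max(0L, pos - batchSize)
>   val fetch = new FetchRequestBuilder()
>     .clientId("backwards")
>     .addFetch(topic, partition, start, 1000000) // bytes, sized generously
>     .build()
>   consumer.fetch(fetch).messageSet(topic, partition)
>     .takeWhile(_.offset < pos)       // trim anything past this window
>     .foreach(m => println(m.offset)) // M10 M11 M12, then M7 M8 M9, ...
>   pos = start
> }
> consumer.close()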
> 
> make sense?
> 
> 
> /*******************************************
> Joe Stein
> Founder, Principal Consultant
> Big Data Open Source Security LLC
> http://www.stealth.ly
> Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> ********************************************/
> 
> 
> On Fri, Dec 6, 2013 at 10:43 AM, Joe Stein <joe.stein@stealth.ly> wrote:
> 
>> hmmm, I just realized that wouldn't work actually (starting at the end is
>> fine)... the fetch size being taken in is still going to increment forward
>> ...
>> 
>> The KafkaApi would have to change because in readMessageSet it is doing a
>> log.read of the FileMessageSet ...
>> 
>> it should be possible, though not without changing the way the log is
>> read when getting the partition with the ReplicaManager
>> 
>> so let me take that all back and say... it can't be done now, but I think
>> it is feasible with some broker modifications to read the log
>> differently... off the top of my head I can't think of how to change the
>> log.read to do this without digging deeper into the code
>> 
>> 
>> 
>> /*******************************************
>> Joe Stein
>> Founder, Principal Consultant
>> Big Data Open Source Security LLC
>> http://www.stealth.ly
>> Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
>> ********************************************/
>> 
>> 
>> On Fri, Dec 6, 2013 at 10:26 AM, Joe Stein <joe.stein@stealth.ly> wrote:
>> 
>>> The fetch requests are very flexible to do what you want with them.
>>> 
>>> Take a look at SimpleConsumerShell.scala as a reference
>>> 
>>> You could pass in OffsetRequest.LatestTime (-1) with a fetch size of 3
>>> and then just keep doing that over and over again.
>>> 
>>> I think that will do exactly what you are looking to do.
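>>> 
>>> e.g., the "start at the end" part, untested and loosely modeled on
>>> SimpleConsumerShell (broker and topic names are made up):
>>> 
>>> import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
>>> import kafka.common.TopicAndPartition
>>> import kafka.consumer.SimpleConsumer
>>> 
>>> val consumer = new SimpleConsumer("broker", 9092, 100000, 64 * 1024, "tail")
>>> val tap = TopicAndPartition("metrics", 0)
>>> // OffsetRequest.LatestTime is the -1 sentinel: give me the newest offset
>>> val req = OffsetRequest(
>>>   Map(tap -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
>>> val latest = consumer.getOffsetsBefore(req)
>>>   .partitionErrorAndOffsets(tap).offsets.head
>>> // then issue fetches from `latest` over and over as new messages arrive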
>>> 
>>> /*******************************************
>>> Joe Stein
>>> Founder, Principal Consultant
>>> Big Data Open Source Security LLC
>>> http://www.stealth.ly
>>> Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
>>> ********************************************/
>>> 
>>> 
>>> On Fri, Dec 6, 2013 at 10:04 AM, Otis Gospodnetic <
>>> otis.gospodnetic@gmail.com> wrote:
>>> 
>>>> Hi,
>>>> 
>>>> On Fri, Dec 6, 2013 at 9:38 AM, Tom Brown <tombrown52@gmail.com> wrote:
>>>> 
>>>>> Do you mean you want to start from the most recent data and go
>>>>> backwards to the oldest data, or that you want to start with old data
>>>>> and consume forwards?
>>>>> 
>>>> 
>>>> Forwards is the "normal way".  I'm looking for the "abnormal way", of
>>>> course ;) i.e. backwards.
>>>> If the following are the messages that came in, oldest to newest:
>>>> 
>>>> M1 M2 M3 M4 M5 M6 M7 M8 M9 M10 M11 M12
>>>> 
>>>> Then I'd love to be able to consume from the end, say in batches of 3,
>>>> like
>>>> this:
>>>> 
>>>> get last 3: M10 M11 M12
>>>> get last 3: M7 M8 M9
>>>> get last 3: M4 M5 M6
>>>> get last 3: M1 M2 M3
>>>> 
>>>> Of course, if messages keep coming in, then the new ones that arrive
>>>> would
>>>> get picked up first and, eventually, assuming Consumer can consume faster
>>>> than messages are produced, all messages will get consumed.
>>>> 
>>>> But the important/key part is that any new ones that arrive will get
>>>> picked
>>>> up first.
>>>> 
>>>>> If the former, it would be difficult or impossible in 0.7.x, but I
>>>>> think doable in 0.8.x (they added some sort of message index). If the
>>>>> latter, that is easily accomplished in both versions.
>>>>> 
>>>> 
>>>> I'd love to know if that's really so and how to do it!
>>>> 
>>>> We are looking to move to Kafka 0.8 in January and to add performance
>>>> monitoring for Kafka 0.8 to SPM (see
>>>> 
>>>> http://blog.sematext.com/2013/10/16/announcement-spm-performance-monitoring-for-kafka/
>>>> )
>>>> 
>>>> Thanks,
>>>> Otis
>>>> --
>>>> Performance Monitoring * Log Analytics * Search Analytics
>>>> Solr & Elasticsearch Support * http://sematext.com/
>>>> 
>>>> 
>>>> 
>>>>> On Friday, December 6, 2013, Otis Gospodnetic wrote:
>>>>> 
>>>>>> Hi,
>>>>>> 
>>>>>> Does Kafka offer a way to consume messages in batches, but "from the
>>>>>> end"?
>>>>>> 
>>>>>> This would be valuable to have in all systems where the most recent
>>>>>> data is a lot more important than older data, such as performance
>>>>>> metrics, and maybe even logs... maybe also trading/financial data,
>>>>>> and such.
>>>>>> 
>>>>>> Any chance of this sort of functionality ever making it into Kafka,
>>>>>> or is this simply not implementable due to some underlying
>>>>>> assumptions, or data structures, or ... ?
>>>>>> 
>>>>>> Thanks,
>>>>>> Otis
>>>>>> --
>>>>>> Performance Monitoring * Log Analytics * Search Analytics
>>>>>> Solr & Elasticsearch Support * http://sematext.com/
>>>>>> 
>>>>> 
>>>> 
>>> 
>>> 
>> 

