kafka-users mailing list archives

From Steven Parkes <smpar...@smparkes.net>
Subject Re: Consuming "backwards"?
Date Fri, 06 Dec 2013 16:17:56 GMT
This functionality is parallel to consuming the stream into an indexed store: it serves
a different point in the latency/completeness/fault-tolerance space. It's actually combined
with skipping forward and back, to prioritize new data over old, much as Otis describes, I
think.
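
To make the ordering concrete, here is a toy sketch of that scheduling: plain Scala
over an in-memory stand-in for the log, no Kafka API involved, and the message names
and arrival pattern are made up.

object NewFirstOrder extends App {
  // In-memory stand-in for the log, oldest message first.
  val log = scala.collection.mutable.ArrayBuffer("M1", "M2", "M3", "M4", "M5", "M6")
  val batch = 3
  var tail = log.length // next unseen "new" offset
  var back = log.length // exclusive upper bound of unread history

  // Pretend two more messages arrive while we are backfilling.
  val arrivals = Iterator(Seq("M7", "M8"), Seq.empty[String], Seq.empty[String])

  while (back > 0 || tail < log.length) {
    if (arrivals.hasNext) log ++= arrivals.next() // simulated producer
    if (tail < log.length) { // new messages always win
      println("new: " + log.slice(tail, log.length).mkString(" "))
      tail = log.length
    } else { // then one window of history
      val start = math.max(0, back - batch)
      println("old: " + log.slice(start, back).mkString(" "))
      back = start
    }
  }
}

The invariant is just that the forward (tail) cursor drains before the backward
(history) cursor takes another step.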

On Dec 6, 2013, at 8:07 AM, Joe Stein <joe.stein@stealth.ly> wrote:

> Steven, you might be better off reading the Kafka stream into Cassandra and
> then doing the reads that way
> 
> /*******************************************
> Joe Stein
> Founder, Principal Consultant
> Big Data Open Source Security LLC
> http://www.stealth.ly
> Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> ********************************************/
> 
> 
> On Fri, Dec 6, 2013 at 11:04 AM, Steven Parkes <smparkes@smparkes.net> wrote:
> 
>> I've figured I may end up doing something like this for reasons that sound
>> similar to what Otis describes.
>> 
>> In my case, I may end up binary searching the log to match msg #s to dates
>> in the actual messages to find a particular time range of interest.
>> 
>> So far it's just theory: I haven't gotten to the point of POC/sanity
>> checking it. But it sounds like it should work ...
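>> 
>> The search itself would look something like this sketch, where fetchDate is a
>> stand-in for fetching the single message at an offset and parsing the date out
>> of its body, and offsets are the 0.8-style sequential message numbers:
>> 
>> object OffsetSearch extends App {
>>   // Find the first offset whose embedded date is >= target.
>>   def firstAtOrAfter(earliest: Long, latest: Long, target: Long)
>>                     (fetchDate: Long => Long): Long = {
>>     var lo = earliest
>>     var hi = latest // exclusive upper bound
>>     while (lo < hi) {
>>       val mid = lo + (hi - lo) / 2
>>       if (fetchDate(mid) < target) lo = mid + 1 else hi = mid
>>     }
>>     lo // first offset whose date is >= target, or `latest` if none
>>   }
>> 
>>   // Toy check against an in-memory "log" of millisecond timestamps:
>>   val dates = Vector(100L, 200L, 300L, 400L, 500L)
>>   println(firstAtOrAfter(0, dates.length, 250L)(o => dates(o.toInt))) // 2
>> }
>> 
>> Two searches like that bound the time range, at O(log n) fetches each.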
>> 
>> On Dec 6, 2013, at 7:56 AM, Joe Stein <joe.stein@stealth.ly> wrote:
>> 
>>> You got the hamster on the wheel with this one :)
>>> 
>>> So one way to make it work without any changes (or at least maybe very
>>> minor changes if at all) would possibly be to use your max offset and fetch
>>> size this way
>>> 
>>> M1 M2 M3 M4 M5 M6 M7 M8 M9 M10 M11 M12
>>> 
>>> to get
>>> 
>>> get last 3: M10 M11 M12
>>> get last 3: M7 M8 M9
>>> get last 3: M4 M5 M6
>>> get last 3: M1 M2 M3
>>> 
>>> you would start at the end to get the high-water mark offset, then you
>>> would do
>>> 
>>> maxOffset (since you're at the end) - 3 with a fetch size of 3, and then
>>> keep doing that
>>> 
>>> so technically you are still looking forward but you are making the start
>>> position of your offset 3 behind
>>> 
>>> so if the offset numbers matched your numbers (so offset of M1 is 1 and
>>> offset of M2 is 2) for this example...
>>> 
>>> fetch((12-3),3)
>>> fetch((12-3-3),3)
>>> fetch((12-3-3-3),3)
>>> fetch((12-3-3-3-3),3)
>>> 
>>> would produce
>>> 
>>> M10 M11 M12
>>> M7 M8 M9
>>> M4 M5 M6
>>> M1 M2 M3
>>> 
>>> This would mean no broker changes :) and just "tricking" the val
>>> fetchRequest = fetchRequestBuilder in your implementation of
>>> SimpleConsumerShell.scala to "look backwards": you are just moving the
>>> start offset backwards from the end while each fetch still reads forward
>>> by your fetch size
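>>> 
>>> In code, that loop would look roughly like this (a sketch against the 0.8
>>> SimpleConsumer API; broker host, topic, and partition are placeholders, and
>>> error handling is elided; note the fetch size on the wire is in bytes, so
>>> you trim to the window by offset rather than counting on exactly 3 messages
>>> coming back):
>>> 
>>> import kafka.api.{FetchRequestBuilder, OffsetRequest, PartitionOffsetRequestInfo}
>>> import kafka.common.TopicAndPartition
>>> import kafka.consumer.SimpleConsumer
>>> 
>>> object BackwardsReader extends App {
>>>   val (topic, part, batch) = ("my-topic", 0, 3) // placeholders
>>>   val consumer = new SimpleConsumer("broker-host", 9092, 100000, 64 * 1024, "backwards")
>>>   val tp = TopicAndPartition(topic, part)
>>> 
>>>   // Ask the broker for a boundary offset of the partition's log.
>>>   def boundary(time: Long): Long = {
>>>     val req = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(time, 1)))
>>>     consumer.getOffsetsBefore(req).partitionErrorAndOffsets(tp).offsets.head
>>>   }
>>> 
>>>   val earliest = boundary(OffsetRequest.EarliestTime)
>>>   var end = boundary(OffsetRequest.LatestTime) // high-water mark
>>> 
>>>   while (end > earliest) {
>>>     val start = math.max(earliest, end - batch)
>>>     val fetch = new FetchRequestBuilder()
>>>       .clientId("backwards")
>>>       .addFetch(topic, part, start, 1024 * 1024)
>>>       .build()
>>>     // Each fetch still reads forward from `start`; trim to our window.
>>>     for (mo <- consumer.fetch(fetch).messageSet(topic, part) if mo.offset < end)
>>>       println("consumed offset " + mo.offset)
>>>     end = start
>>>   }
>>>   consumer.close()
>>> }
>>> 
>>> Each window comes back oldest-first; reverse it if you want strict
>>> newest-first within the batch.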
>>> 
>>> make sense?
>>> 
>>> 
>>> /*******************************************
>>> Joe Stein
>>> Founder, Principal Consultant
>>> Big Data Open Source Security LLC
>>> http://www.stealth.ly
>>> Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
>>> ********************************************/
>>> 
>>> 
>>> On Fri, Dec 6, 2013 at 10:43 AM, Joe Stein <joe.stein@stealth.ly> wrote:
>>> 
>>>> hmmm, I just realized that wouldn't work actually (starting at the end is
>>>> fine)... the fetch size being taken in is still going to increment forward
>>>> ...
>>>> 
>>>> The KafkaApis would have to change because in readMessageSet it is doing a
>>>> log.read of the FileMessageSet ...
>>>> 
>>>> it should be possible, though not without changing the way the log is
>>>> read when getting the partition with ReplicaManager
>>>> 
>>>> so let me take that all back and say... it can't be done now, but I think
>>>> it is feasible with some broker modifications to read the log
>>>> differently... off the top of my head I can't think of how to change the
>>>> log.read to do this without digging further down into the code
>>>> 
>>>> 
>>>> 
>>>> /*******************************************
>>>> Joe Stein
>>>> Founder, Principal Consultant
>>>> Big Data Open Source Security LLC
>>>> http://www.stealth.ly
>>>> Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
>>>> ********************************************/
>>>> 
>>>> 
>>>> On Fri, Dec 6, 2013 at 10:26 AM, Joe Stein <joe.stein@stealth.ly> wrote:
>>>> 
>>>>> The fetch requests are very flexible to do what you want with them.
>>>>> 
>>>>> Take a look at SimpleConsumerShell.scala as a reference
>>>>> 
>>>>> You could pass in OffsetRequest.LatestTime (-1) with a fetch size of 3
>>>>> and then just keep doing that over and over again.
>>>>> 
>>>>> I think that will do exactly what you are looking to do.
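>>>>> 
>>>>> Getting that starting point looks something like this (a sketch against
>>>>> the 0.8 SimpleConsumer; broker host and topic are placeholders):
>>>>> 
>>>>> import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
>>>>> import kafka.common.TopicAndPartition
>>>>> import kafka.consumer.SimpleConsumer
>>>>> 
>>>>> object LatestOffset extends App {
>>>>>   val consumer = new SimpleConsumer("broker-host", 9092, 100000, 64 * 1024, "client")
>>>>>   val tp = TopicAndPartition("my-topic", 0)
>>>>>   // OffsetRequest.LatestTime (-1) asks for the offset at the end of the log.
>>>>>   val req = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
>>>>>   println("latest: " + consumer.getOffsetsBefore(req).partitionErrorAndOffsets(tp).offsets.head)
>>>>>   consumer.close()
>>>>> }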
>>>>> 
>>>>> /*******************************************
>>>>> Joe Stein
>>>>> Founder, Principal Consultant
>>>>> Big Data Open Source Security LLC
>>>>> http://www.stealth.ly
>>>>> Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
>>>>> ********************************************/
>>>>> 
>>>>> 
>>>>> On Fri, Dec 6, 2013 at 10:04 AM, Otis Gospodnetic <
>>>>> otis.gospodnetic@gmail.com> wrote:
>>>>> 
>>>>>> Hi,
>>>>>> 
>>>>>> On Fri, Dec 6, 2013 at 9:38 AM, Tom Brown <tombrown52@gmail.com> wrote:
>>>>>> 
>>>>>>> Do you mean you want to start from the most recent data and go backwards
>>>>>>> to the oldest data, or that you want to start with old data and consume
>>>>>>> forwards?
>>>>>>> 
>>>>>> 
>>>>>> Forwards is the "normal way". I'm looking for the "abnormal way", of
>>>>>> course ;) i.e. backwards.
>>>>>> If the following are the messages that came in, oldest to newest:
>>>>>> 
>>>>>> M1 M2 M3 M4 M5 M6 M7 M8 M9 M10 M11 M12
>>>>>> 
>>>>>> Then I'd love to be able to consume from the end, say in batches of 3,
>>>>>> like this:
>>>>>> 
>>>>>> get last 3: M10 M11 M12
>>>>>> get last 3: M7 M8 M9
>>>>>> get last 3: M4 M5 M6
>>>>>> get last 3: M1 M2 M3
>>>>>> 
>>>>>> Of course, if messages keep coming in, then the new ones that arrive
>>>>>> would get picked up first and, eventually, assuming the Consumer can
>>>>>> consume faster than messages are produced, all messages will get consumed.
>>>>>> 
>>>>>> But the important/key part is that any new ones that arrive will get
>>>>>> picked up first.
>>>>>> 
>>>>>>> If the former, it would be difficult or impossible in 0.7.x, but I think
>>>>>>> doable in 0.8.x. (They added some sort of message index). If the latter,
>>>>>>> that is easily accomplished in both versions.
>>>>>>> 
>>>>>> 
>>>>>> I'd love to know if that's really so and how to do it!
>>>>>> 
>>>>>> We are looking to move to Kafka 0.8 in January and to add performance
>>>>>> monitoring for Kafka 0.8 to SPM (see
>>>>>> http://blog.sematext.com/2013/10/16/announcement-spm-performance-monitoring-for-kafka/ )
>>>>>> 
>>>>>> Thanks,
>>>>>> Otis
>>>>>> --
>>>>>> Performance Monitoring * Log Analytics * Search Analytics
>>>>>> Solr & Elasticsearch Support * http://sematext.com/
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>>> On Friday, December 6, 2013, Otis Gospodnetic wrote:
>>>>>>> 
>>>>>>>> Hi,
>>>>>>>> 
>>>>>>>> Does Kafka offer a way to consume messages in batches, but "from the
>>>>>>>> end"?
>>>>>>>> 
>>>>>>>> This would be valuable to have in all systems where the most recent
>>>>>>>> data is a lot more important than older data, such as performance
>>>>>>>> metrics, and maybe even logs... maybe also trading/financial data, and
>>>>>>>> such.
>>>>>>>> 
>>>>>>>> Any chance of this sort of functionality ever making it into Kafka, or
>>>>>>>> is this simply not implementable due to some underlying assumptions, or
>>>>>>>> data structures, or ... ?
>>>>>>>> 
>>>>>>>> Thanks,
>>>>>>>> Otis
>>>>>>>> --
>>>>>>>> Performance Monitoring * Log Analytics * Search Analytics
>>>>>>>> Solr & Elasticsearch Support * http://sematext.com/
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>> 
>> 
>> 

