This functionality is parallel to the stream being consumed into an indexed store: it serves
a different point in the latency/completeness/fault-tolerance space. It's actually combined
with skipping forward and back, to prioritize new data over old, much as Otis describes, I
think.
On Dec 6, 2013, at 8:07 AM, Joe Stein <joe.stein@stealth.ly> wrote:
> Steven, you might be better off reading the Kafka stream into Cassandra and
> then doing the reads that way
>
> /*******************************************
> Joe Stein
> Founder, Principal Consultant
> Big Data Open Source Security LLC
> http://www.stealth.ly
> Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> ********************************************/
>
>
> On Fri, Dec 6, 2013 at 11:04 AM, Steven Parkes <smparkes@smparkes.net> wrote:
>
>> I've figured I may end up doing something like this for reasons that sound
>> similar to what Otis describes.
>>
>> In my case, I may end up binary searching the log to match msg #s to dates
>> in the actual messages to find a particular time range of interest.
>>
>> So far it's just theory: I haven't gotten to the point of POC/sanity
>> checking it. But it sounds like it should work ...
>>
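(Concretely, the binary search I have in mind would look something like the sketch below,
assuming the 0.8 Scala SimpleConsumer and that each message body carries a timestamp the
producer put there. parseTimestamp, the client id, and the offset bounds are stand-ins I made
up; the only Kafka calls are the FetchRequestBuilder/fetch pattern from SimpleConsumerShell.scala.)

    import kafka.api.FetchRequestBuilder
    import kafka.consumer.SimpleConsumer
    import kafka.message.Message

    // Hypothetical: pull the timestamp the producer embedded in the payload.
    def parseTimestamp(message: Message): Long =
      sys.error("depends entirely on your message format")

    // Binary search offsets [low, high) for the first message at or after `target`.
    def findOffsetForTime(consumer: SimpleConsumer, topic: String, partition: Int,
                          low: Long, high: Long, target: Long): Long = {
      var lo = low
      var hi = high
      while (lo < hi) {
        val mid = lo + (hi - lo) / 2
        val request = new FetchRequestBuilder()
          .clientId("time-range-search")              // made-up client id
          .addFetch(topic, partition, mid, 64 * 1024) // enough bytes for at least one message
          .build()
        // Only look at the first message returned for offset `mid`.
        val messages = consumer.fetch(request).messageSet(topic, partition)
        if (parseTimestamp(messages.head.message) < target) lo = mid + 1 else hi = mid
      }
      lo
    }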
>> On Dec 6, 2013, at 7:56 AM, Joe Stein <joe.stein@stealth.ly> wrote:
>>
>>> You got the hamster on the wheel with this one :)
>>>
>>> So one way to make it work without any changes (or at least maybe very
>>> minor changes if at all) would possibly be to use your max offset and
>>> fetch size this way
>>>
>>> M1 M2 M3 M4 M5 M6 M7 M8 M9 M10 M11 M12
>>>
>>> to get
>>>
>>> get last 3: M10 M11 M12
>>> get last 3: M7 M8 M9
>>> get last 3: M4 M5 M6
>>> get last 3: M1 M2 M3
>>>
>>> you would start at the end to get the high-water mark offset, then you
>>> would do
>>>
>>> maxOffset (since you're at the end) - 3 with a fetch size of 3, and then
>>> keep doing that
>>>
>>> so technically you are still reading forward, but you are moving the
>>> start position of your offset 3 further back each time
>>>
>>> so if the offset numbers matched your numbers (so offset of M1 is 1 and
>>> offset of M2 is 2) for this example...
>>>
>>> fetch((12-3), 3)
>>> fetch((12-3-3), 3)
>>> fetch((12-3-3-3), 3)
>>> fetch((12-3-3-3-3), 3)
>>>
>>> would produce
>>>
>>> M10 M11 M12
>>> M7 M8 M9
>>> M4 M5 M6
>>> M1 M2 M3
>>>
>>> This would mean no broker changes :) and just "tricking" the val
>>> fetchRequest = fetchRequestBuilder in your implementation of
>>> SimpleConsumerShell.scala to "look backwards": you are really just moving
>>> the start offset backwards from the end while still fetching forward for
>>> your fetch size
>>>
>>> make sense?
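(To make sure I follow, here's a minimal sketch of that windowed backwards fetch against the
0.8 Scala SimpleConsumer, cribbed from SimpleConsumerShell.scala. The client id and the 1 MB
fetch size are made up, the "window" is an offset window rather than an exact message count,
and the off-by-one details would need checking.)

    import kafka.api.FetchRequestBuilder
    import kafka.consumer.SimpleConsumer

    // Walk a partition from the high-water mark back toward the log start in
    // fixed-size offset windows, so the newest messages come out first.
    def consumeBackwards(consumer: SimpleConsumer, topic: String, partition: Int,
                         highWatermark: Long, window: Int): Unit = {
      var start = math.max(highWatermark - window, 0L)
      var done = false
      while (!done) {
        val request = new FetchRequestBuilder()
          .clientId("backwards-reader")                   // made-up client id
          .addFetch(topic, partition, start, 1024 * 1024) // enough bytes to cover the window
          .build()
        // The broker still reads forward; we just keep moving the start offset back
        // and drop anything past the current window.
        consumer.fetch(request).messageSet(topic, partition)
          .filter(_.offset < start + window)
          .foreach(m => println("offset " + m.offset))
        if (start == 0L) done = true
        else start = math.max(start - window, 0L)
      }
    }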
>>>
>>>
>>> /*******************************************
>>> Joe Stein
>>> Founder, Principal Consultant
>>> Big Data Open Source Security LLC
>>> http://www.stealth.ly
>>> Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
>>> ********************************************/
>>>
>>>
>>> On Fri, Dec 6, 2013 at 10:43 AM, Joe Stein <joe.stein@stealth.ly> wrote:
>>>
>>>> hmmm, I just realized that wouldn't work actually (starting at the end
>>>> is fine)... the fetch size being taken in is still going to increment
>>>> forward ...
>>>>
>>>> The KafkaApi would have to change because in readMessageSet it is doing
>>>> a log.read of the FileMessageSet ...
>>>>
>>>> it should be possible, though not without changing the way the log is
>>>> read when getting the partition with ReplicaManager
>>>>
>>>> so let me take that all back and say... it can't be done now, but I think
>>>> it is feasible with some broker modifications to read the log
>>>> differently... off the top of my head I can't think of how to change the
>>>> log.read to do this without digging more down into the code
>>>>
>>>>
>>>>
>>>> /*******************************************
>>>> Joe Stein
>>>> Founder, Principal Consultant
>>>> Big Data Open Source Security LLC
>>>> http://www.stealth.ly
>>>> Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
>>>> ********************************************/
>>>>
>>>>
>>>> On Fri, Dec 6, 2013 at 10:26 AM, Joe Stein <joe.stein@stealth.ly> wrote:
>>>>
>>>>> The fetch requests are very flexible to do what you want with them.
>>>>>
>>>>> Take a look at SimpleConsumerShell.scala as a reference
>>>>>
>>>>> You could pass in OffsetRequest.LatestTime (-1) with a fetch size of 3
>>>>> and then just keep doing that over and over again.
>>>>>
>>>>> I think that will do exactly what you are looking to do.
>>>>>
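(For reference, asking for that latest offset with the 0.8 Scala API would look roughly like
the sketch below, if I'm reading the OffsetRequest/getOffsetsBefore code right; the host,
port, topic, and partition values are made up.)

    import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
    import kafka.common.TopicAndPartition
    import kafka.consumer.SimpleConsumer

    // Ask the broker for the current log-end (high-water mark) offset of one partition.
    val consumer = new SimpleConsumer("localhost", 9092, 100000, 64 * 1024, "latest-offset-check")
    val tp = TopicAndPartition("metrics", 0) // made-up topic/partition
    val request = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
    val latestOffset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(tp).offsets.head
    println("high-water mark: " + latestOffset)
    consumer.close()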
>>>>> /*******************************************
>>>>> Joe Stein
>>>>> Founder, Principal Consultant
>>>>> Big Data Open Source Security LLC
>>>>> http://www.stealth.ly
>>>>> Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
>>>>> ********************************************/
>>>>>
>>>>>
>>>>> On Fri, Dec 6, 2013 at 10:04 AM, Otis Gospodnetic <otis.gospodnetic@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> On Fri, Dec 6, 2013 at 9:38 AM, Tom Brown <tombrown52@gmail.com> wrote:
>>>>>>
>>>>>>> Do you mean you want to start from the most recent data and go
>>>>>>> backwards to the oldest data, or that you want to start with old data
>>>>>>> and consume forwards?
>>>>>>>
>>>>>>
>>>>>> Forwards is the "normal way". I'm looking for the "abnormal way", of
>>>>>> course ;) i.e. backwards.
>>>>>> If the following are the messages that came in, oldest to newest:
>>>>>>
>>>>>> M1 M2 M3 M4 M5 M6 M7 M8 M9 M10 M11 M12
>>>>>>
>>>>>> Then I'd love to be able to consume from the end, say in batches of 3,
>>>>>> like this:
>>>>>>
>>>>>> get last 3: M10 M11 M12
>>>>>> get last 3: M7 M8 M9
>>>>>> get last 3: M4 M5 M6
>>>>>> get last 3: M1 M2 M3
>>>>>>
>>>>>> Of course, if messages keep coming in, then the new ones that arrive
>>>>>> would get picked up first and, eventually, assuming the consumer can
>>>>>> consume faster than messages are produced, all messages will get
>>>>>> consumed.
>>>>>>
>>>>>> But the important/key part is that any new ones that arrive will get
>>>>>> picked up first.
>>>>>>
>>>>>>> If the former, it would be difficult or impossible in 0.7.x, but I
>>>>>>> think doable in 0.8.x. (They added some sort of message index.) If
>>>>>>> the latter, that is easily accomplished in both versions.
>>>>>>>
>>>>>>
>>>>>> I'd love to know if that's really so and how to do it!
>>>>>>
>>>>>> We are looking to move to Kafka 0.8 in January and to add performance
>>>>>> monitoring for Kafka 0.8 to SPM (see
>>>>>> http://blog.sematext.com/2013/10/16/announcement-spm-performance-monitoring-for-kafka/
>>>>>> )
>>>>>>
>>>>>> Thanks,
>>>>>> Otis
>>>>>> --
>>>>>> Performance Monitoring * Log Analytics * Search Analytics
>>>>>> Solr & Elasticsearch Support * http://sematext.com/
>>>>>>
>>>>>>
>>>>>>
>>>>>>> On Friday, December 6, 2013, Otis Gospodnetic wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Does Kafka offer a way to consume messages in batches, but "from
>>>>>>>> the end"?
>>>>>>>>
>>>>>>>> This would be valuable to have in all systems where the most recent
>>>>>>>> data is a lot more important than older data, such as performance
>>>>>>>> metrics, and maybe even logs... maybe also trading/financial data,
>>>>>>>> and such.
>>>>>>>>
>>>>>>>> Any chance of this sort of functionality ever making it into Kafka,
>>>>>>>> or is this simply not implementable due to some underlying
>>>>>>>> assumptions, or data structures, or ... ?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Otis
>>>>>>>> --
>>>>>>>> Performance Monitoring * Log Analytics * Search Analytics
>>>>>>>> Solr & Elasticsearch Support * http://sematext.com/
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>
>>