kafka-users mailing list archives

From Surendranauth Hiraman <suren.hira...@sociocast.com>
Subject Re: Consuming "backwards"?
Date Fri, 06 Dec 2013 16:14:27 GMT
If there is a TTL/expiration hook, that's another possibility as well.

Add a hook so that when a message times out, it gets put onto another "low
priority" queue. Or something along those lines.

So the main queue only has recent messages to consume.

-Suren
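
Kafka has no built-in expiration hook, so something along those lines would
have to live outside the broker, e.g. a relay consumer that republishes
anything older than a TTL to a low-priority topic. A minimal sketch against
the 0.8 Java client, assuming made-up topic names and addresses, and assuming
each payload embeds its creation time (read by a hypothetical createdAt()
helper):

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.producer.Producer;
import kafka.message.MessageAndMetadata;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ExpiryRelay {

    // Hypothetical helper: assumes the producer embedded a creation time in the payload.
    static long createdAt(byte[] payload) { return 0L; }

    public static void main(String[] args) {
        long ttlMs = 60 * 1000L; // assumed TTL for "recent"

        Properties cprops = new Properties();
        cprops.put("zookeeper.connect", "zookeeper:2181"); // assumed address
        cprops.put("group.id", "expiry-relay");
        ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(cprops));

        Properties pprops = new Properties();
        pprops.put("metadata.broker.list", "broker1:9092"); // assumed address
        Producer<byte[], byte[]> producer =
            new Producer<byte[], byte[]>(new ProducerConfig(pprops));

        // Read the main topic and check each message's age.
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
            connector.createMessageStreams(Collections.singletonMap("events", 1));
        ConsumerIterator<byte[], byte[]> it = streams.get("events").get(0).iterator();

        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> msg = it.next();
            if (System.currentTimeMillis() - createdAt(msg.message()) > ttlMs) {
                // "Timed out": shunt it onto the low-priority topic instead.
                producer.send(new KeyedMessage<byte[], byte[]>("events-low-priority", msg.message()));
            }
        }
    }
}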



On Fri, Dec 6, 2013 at 11:11 AM, Surendranauth Hiraman <
suren.hiraman@sociocast.com> wrote:

> Depending on your exact requirements, you may consider priority queues as
> well. I'm not sure if Kafka has this (without rolling your own) but it is
> in JMS implementations.
>
> I guess you aren't looking for strict LIFO order, since messages are
> constantly being put on the queue, so "LIFO" is only with respect to a very
> transient moment in time.
>
> -Suren
>
>
>
> On Fri, Dec 6, 2013 at 11:07 AM, Joe Stein <joe.stein@stealth.ly> wrote:
>
>> Steven, you might be better off reading the Kafka stream into Cassandra
>> and then doing the reads that way.
>>
>> /*******************************************
>>  Joe Stein
>>  Founder, Principal Consultant
>>  Big Data Open Source Security LLC
>>  http://www.stealth.ly
>>  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
>> ********************************************/
>>
>>
>> On Fri, Dec 6, 2013 at 11:04 AM, Steven Parkes <smparkes@smparkes.net
>> >wrote:
>>
>> > I've figured I may end up doing something like this for reasons that
>> > sound similar to what Otis describes.
>> >
>> > In my case, I may end up binary searching the log to match msg #s to
>> > dates in the actual messages to find a particular time range of interest.
>> >
>> > So far it's just theory: I haven't gotten to the point of POC/sanity
>> > checking it. But it sounds like it should work ...
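
A minimal sketch of that search, assuming a hypothetical timestampAt() helper
that would fetch the single message at a given offset and parse the date out
of its payload (0.8 messages carry no timestamp of their own), with the
earliest/latest bounds taken from an OffsetRequest:

public class OffsetTimeSearch {

    // Hypothetical: fetch the one message at `offset` and return its embedded time.
    static long timestampAt(long offset) {
        throw new UnsupportedOperationException("fetch one message, parse its date");
    }

    // First offset whose embedded timestamp is >= targetMillis, searched over
    // [earliest, latest) as reported by the broker.
    static long firstOffsetAtOrAfter(long earliest, long latest, long targetMillis) {
        long lo = earliest, hi = latest;
        while (lo < hi) {
            long mid = lo + (hi - lo) / 2;
            if (timestampAt(mid) < targetMillis) {
                lo = mid + 1;   // target lies in the newer part of the log
            } else {
                hi = mid;       // mid could be the first match
            }
        }
        return lo;              // == latest if nothing is new enough
    }
}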
>> >
>> > On Dec 6, 2013, at 7:56 AM, Joe Stein <joe.stein@stealth.ly> wrote:
>> >
>> > > You got the hamster on the wheel with this one :)
>> > >
>> > > So one way to make it work without any changes (or at least maybe very
>> > > minor changes if at all) would possibly be to use your max offset and
>> > > fetch size this way
>> > >
>> > > M1 M2 M3 M4 M5 M6 M7 M8 M9 M10 M11 M12
>> > >
>> > > to get
>> > >
>> > > get last 3: M10 M11 M12
>> > > get last 3: M7 M8 M9
>> > > get last 3: M4 M5 M6
>> > > get last 3: M1 M2 M3
>> > >
>> > > you would start at the end to get the high watermark offset then you
>> > > would do
>> > >
>> > > maxOffset (since you're at the end) - 3 with a fetch size of 3 and then
>> > > keep doing that
>> > >
>> > > so technically you are still looking forward but you are making the
>> > > start position of your offset 3 behind
>> > >
>> > > so if the offset numbers matched your numbers (so offset of M1 is 1
>> > > and offset of M2 is 2) for this example...
>> > >
>> > > fetch((12-3),3)
>> > > fetch((12-3-3),3)
>> > > fetch((12-3-3-3),3)
>> > > fetch((12-3-3-3-3),3)
>> > >
>> > > would produce
>> > >
>> > > M10 M11 M12
>> > > M7 M8 M9
>> > > M4 M5 M6
>> > > M1 M2 M3
>> > >
>> > > This would mean no broker changes :) and just "tricking" the val
>> > > fetchRequest = fetchRequestBuilder in your implementation of the
>> > > SimpleConsumerShell.scala to "look backwards" but you are just moving
>> > > the offset backwards from the end looking forward for your fetch size
>> > >
>> > > make sense?
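
A rough sketch of that windowing against the 0.8 kafka.javaapi SimpleConsumer
(broker address, topic name, and client id here are made up). One caveat: the
real fetch size parameter is in bytes, not messages, so the three-message
window is enforced below by filtering on offset rather than by the fetch size
itself:

import java.util.HashMap;
import java.util.Map;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class BackwardsReader {
    public static void main(String[] args) {
        String topic = "events";   // assumed topic name
        int partition = 0;
        long window = 3;           // messages per backwards "batch"
        SimpleConsumer consumer =
            new SimpleConsumer("broker1", 9092, 100000, 64 * 1024, "backwards-reader");

        // 1. Ask the broker for the latest offset (the end of the log).
        TopicAndPartition tp = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> info =
            new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        info.put(tp, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
        OffsetResponse offsets = consumer.getOffsetsBefore(new kafka.javaapi.OffsetRequest(
            info, kafka.api.OffsetRequest.CurrentVersion(), "backwards-reader"));
        long end = offsets.offsets(topic, partition)[0];

        // 2. Walk backwards: fetch((end - 3), ...), fetch((end - 6), ...), and so on.
        //    Error handling is omitted to keep the sketch short.
        while (end > 0) {
            long start = Math.max(0L, end - window);
            FetchRequest req = new FetchRequestBuilder()
                .clientId("backwards-reader")
                .addFetch(topic, partition, start, 64 * 1024)  // fetch size is in bytes
                .build();
            FetchResponse resp = consumer.fetch(req);
            for (MessageAndOffset mo : resp.messageSet(topic, partition)) {
                if (mo.offset() >= end) break;                 // keep only this window
                System.out.println("consumed offset " + mo.offset());
            }
            end = start;                                       // slide the window back
        }
        consumer.close();
    }
}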
>> > >
>> > >
>> > > /*******************************************
>> > > Joe Stein
>> > > Founder, Principal Consultant
>> > > Big Data Open Source Security LLC
>> > > http://www.stealth.ly
>> > > Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
>> > > ********************************************/
>> > >
>> > >
>> > > On Fri, Dec 6, 2013 at 10:43 AM, Joe Stein <joe.stein@stealth.ly>
>> wrote:
>> > >
>> > >> hmmm, I just realized that wouldn't work actually (starting at the
>> > >> end is fine)... the fetch size being taken in is still going to
>> > >> increment forward ...
>> > >>
>> > >> The KafkaApi would have to change because in readMessageSet it is
>> > >> doing a log.read of the FileMessageSet ...
>> > >>
>> > >> it should be possible, though not without changing the way the log
>> > >> is read when getting the partition with ReplicaManager
>> > >>
>> > >> so let me take that all back and say... can't be done now but I
>> > >> think it is feasible to be done with some broker modifications to
>> > >> read the log differently... off the top of my head I can't think of
>> > >> how to change the log.read to do this without digging more down into
>> > >> the code
>> > >>
>> > >>
>> > >>
>> > >> /*******************************************
>> > >> Joe Stein
>> > >> Founder, Principal Consultant
>> > >> Big Data Open Source Security LLC
>> > >> http://www.stealth.ly
>> > >> Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
>> > >> ********************************************/
>> > >>
>> > >>
>> > >> On Fri, Dec 6, 2013 at 10:26 AM, Joe Stein <joe.stein@stealth.ly>
>> > wrote:
>> > >>
>> > >>> The fetch requests are very flexible to do what you want with them.
>> > >>>
>> > >>> Take a look at SimpleConsumerShell.scala as a reference
>> > >>>
>> > >>> You could pass in OffsetRequest.LatestTime (-1) with a fetch size
>> > >>> of 3 and then just keep doing that over and over again.
>> > >>>
>> > >>> I think that will do exactly what you are looking to do.
>> > >>>
>> > >>> /*******************************************
>> > >>>  Joe Stein
>> > >>> Founder, Principal Consultant
>> > >>> Big Data Open Source Security LLC
>> > >>> http://www.stealth.ly
>> > >>> Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
>> > >>> ********************************************/
>> > >>>
>> > >>>
>> > >>> On Fri, Dec 6, 2013 at 10:04 AM, Otis Gospodnetic <
>> > >>> otis.gospodnetic@gmail.com> wrote:
>> > >>>
>> > >>>> Hi,
>> > >>>>
>> > >>>> On Fri, Dec 6, 2013 at 9:38 AM, Tom Brown <tombrown52@gmail.com>
>> > wrote:
>> > >>>>
>> > >>>>> Do you mean you want to start from the most recent data and go
>> > >>>>> backwards to the oldest data, or that you want to start with old
>> > >>>>> data and consume forwards?
>> > >>>>>
>> > >>>>
>> > >>>> Forwards is the "normal way".  I'm looking for the "abnormal way",
>> > >>>> of course ;) i.e. backwards.
>> > >>>> If the following are the messages that came in, oldest to newest:
>> > >>>>
>> > >>>> M1 M2 M3 M4 M5 M6 M7 M8 M9 M10 M11 M12
>> > >>>>
>> > >>>> Then I'd love to be able to consume from the end, say in batches
>> > >>>> of 3, like this:
>> > >>>>
>> > >>>> get last 3: M10 M11 M12
>> > >>>> get last 3: M7 M8 M9
>> > >>>> get last 3: M4 M5 M6
>> > >>>> get last 3: M1 M2 M3
>> > >>>>
>> > >>>> Of course, if messages keep coming in, then the new ones that
>> > >>>> arrive would get picked up first and, eventually, assuming Consumer
>> > >>>> can consume faster than messages are produced, all messages will
>> > >>>> get consumed.
>> > >>>>
>> > >>>> But the important/key part is that any new ones that arrive will
>> > >>>> get picked up first.
>> > >>>>
>> > >>>>> If the former, it would be difficult or impossible in 0.7.x, but I
>> > >>>>> think doable in 0.8.x. (They added some sort of message index.) If
>> > >>>>> the latter, that is easily accomplished in both versions.
>> > >>>>>
>> > >>>>
>> > >>>> I'd love to know if that's really so and how to do it!
>> > >>>>
>> > >>>> We are looking to move to Kafka 0.8 in January and to add
>> > >>>> performance monitoring for Kafka 0.8 to SPM (see
>> > >>>> http://blog.sematext.com/2013/10/16/announcement-spm-performance-monitoring-for-kafka/
>> > >>>> )
>> > >>>>
>> > >>>> Thanks,
>> > >>>> Otis
>> > >>>> --
>> > >>>> Performance Monitoring * Log Analytics * Search Analytics
>> > >>>> Solr & Elasticsearch Support * http://sematext.com/
>> > >>>>
>> > >>>>
>> > >>>>
>> > >>>>> On Friday, December 6, 2013, Otis Gospodnetic wrote:
>> > >>>>>
>> > >>>>>> Hi,
>> > >>>>>>
>> > >>>>>> Does Kafka offer a way to consume messages in batches, but "from
>> > >>>>>> the end"?
>> > >>>>>>
>> > >>>>>> This would be valuable to have in all systems where the most
>> > >>>>>> recent data is a lot more important than older data, such as
>> > >>>>>> performance metrics, and maybe even logs....maybe also
>> > >>>>>> trading/financial data, and such.
>> > >>>>>>
>> > >>>>>> Any chance of this sort of functionality ever making it into
>> > >>>>>> Kafka, or is this simply not implementable due to some underlying
>> > >>>>>> assumptions, or data structures, or ... ?
>> > >>>>>>
>> > >>>>>> Thanks,
>> > >>>>>> Otis
>> > >>>>>> --
>> > >>>>>> Performance Monitoring * Log Analytics * Search Analytics
>> > >>>>>> Solr & Elasticsearch Support * http://sematext.com/
>> > >>>>>>
>> > >>>>>
>> > >>>>
>> > >>>
>> > >>>
>> > >>
>> >
>> >
>>
>
>
>



-- 

SUREN HIRAMAN, VP TECHNOLOGY
SOCIOCAST
Simple. Powerful. Predictions.

96 SPRING STREET, 7TH FLOOR
NEW YORK, NY 10012
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@sociocast.com
W: www.sociocast.com

Increase Conversion Rates up to 500%. Go to www.sociocast.com and enter
your URL for a free trial!
