kafka-users mailing list archives

From Florin Trofin <ftro...@adobe.com>
Subject Re: how to force a consumer to start at the beginning
Date Thu, 25 Jul 2013 22:35:32 GMT
You can set "consumer.timeout.ms" to have a ConsumerTimeoutException thrown if no message
becomes available within that time period:

      import kafka.consumer.ConsumerTimeoutException

      var done = false
      val consumerIterator = initConsumer()
      while (!done) {
        try {
          // Blocks until a new message is available or the timeout is reached
          val messageAndMetadata = consumerIterator.next()
          val message = messageAndMetadata.message
          val offset = messageAndMetadata.offset
          System.out.println(new String(message, "UTF-8")) // For debugging purposes

          parseMessage(message, offset)
        }
        catch {
          // No message arrived within consumer.timeout.ms: stop consuming
          case t: ConsumerTimeoutException => done = true
          case e: Throwable => {
            println("unexpected exception: ")
            e.printStackTrace()
          }
        }
      } // End while loop
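
For reference, here is a minimal sketch of how the two properties mentioned in this thread might be set together for the 0.8 high-level consumer. The ZooKeeper address, the group name, the 5-second timeout, and the ConsumerProps object name are placeholders I made up for illustration, not values from anyone's setup:

```scala
import java.util.Properties

object ConsumerProps {
  // Builds the properties for a Kafka 0.8 high-level consumer.
  def build(): Properties = {
    val props = new Properties()
    // Placeholder connection settings (assumptions, adjust for your cluster)
    props.put("zookeeper.connect", "localhost:2181")
    props.put("group.id", "drain-example")
    // Start from the earliest available offset when the group has no stored offset
    props.put("auto.offset.reset", "smallest")
    // Make the consumer iterator throw ConsumerTimeoutException after 5s without a message
    props.put("consumer.timeout.ms", "5000")
    props
  }
}

// The result would then be passed to: new kafka.consumer.ConsumerConfig(ConsumerProps.build())
```

Note that "auto.offset.reset" only takes effect when the consumer group has no stored offset (or the stored offset is out of range), so re-reading a topic from the beginning generally means using a group.id that has not consumed it before.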

HTH,

Florin



On Jul 25, 2013, at 10:56 AM, Rob Withers <reefedjib@gmail.com> wrote:

> Oh boy, is my mind slow today.  The tamasic cells woke up but the rajasic ones stayed
> asleep, which is rather ironic, if you know what I mean.   My only hope is the sattvasic few.
> 
> The issue of threading is secondary to the blocking API.  How can I know the traffic
> is drained from a topic/partition, clean up, and return the response to the REST call?
> 
> thanks,
> rob
> 
> On Jul 25, 2013, at 11:49 AM, Rob Withers <reefedjib@gmail.com> wrote:
> 
>> Thanks, Joe, I also see the answer to my other question, that the KafkaStream is
>> not on a different thread, but I automatically expect it to be since all other uses we have
>> had of the KafkaStream are stuffed in a Runnable.  duh.
>> 
>> thanks,
>> rob
>> 
>> On Jul 25, 2013, at 11:41 AM, Joe Stein <cryptcom@gmail.com> wrote:
>> 
>>> in 0.8 you can set the property "auto.offset.reset" = "smallest" when
>>> creating your ConsumerConfig ... this will override the default value of
>>> "largest"
>>> 
>>> take a look at ConsoleConsumer.scala for more examples if need be
>>> 
>>> 
>>> /*******************************************
>>> 
>>> Joe Stein
>>> Founder, Principal Consultant
>>> Big Data Open Source Security LLC
>>> http://www.stealth.ly
>>> Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
>>> 
>>> ********************************************/
>>> 
>>> 
>>> On Thu, Jul 25, 2013 at 1:24 PM, James A. Robinson <
>>> jimr@highwire.stanford.edu> wrote:
>>> 
>>>> On Thu, Jul 25, 2013 at 9:11 AM, Withers, Robert
>>>> <Robert.Withers@dish.com> wrote:
>>>>> We are creating a consumer with properties and I did not see a
>>>>> property that screamed that it was to start at the beginning of a
>>>>> topic.  Is there such a property?
>>>> 
>>>> In v0.7, set 'autooffset.reset' to 'smallest'.
>>>> 
>>>> Jim
>>>> 
>>>> - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
>>>> James A. Robinson                       jimr@highwire.stanford.edu
>>>> HighWire | Stanford University          http://highwire.stanford.edu/
>>>> +1 650 7237294 (Work)                   +1 650 7259335 (Fax)
>>>> 
>> 
> 

