kafka-users mailing list archives

From anand jain <anandjain1...@gmail.com>
Subject Re: Reading messages offset in Apache Kafka
Date Tue, 05 Aug 2014 05:13:21 GMT
Hi Guozhang,

By explicitly stopping the consumer, do you mean calling *System.exit(-1)*?
Also, in the solution you provided, the code loops on the ack variable. Is
that correct? Even if the ack received from writeToDB is true, isn't there a
possibility that the next message may have arrived, setting the ack variable
to false, so the while condition becomes true again and the offset won't be
committed?

Please clarify.
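[Editor's note: the worry above can be checked with a stand-in. In the
suggested loop, `acked` is a local variable created afresh for each message,
so a later message can never flip it back; the inner loop retries only the
current message. A minimal runnable sketch, with writeToDB stubbed (the stub's
fail-twice-then-succeed behaviour is an assumption for illustration, not the
real DB):]

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class AckLoopSketch {
    int attempts = 0;                         // counts writeToDB calls
    List<String> committed = new ArrayList<>();

    // Stand-in for the real DB write: fails twice, then always succeeds,
    // so the retry behaviour is visible.
    boolean writeToDB(String msg) {
        attempts++;
        return attempts >= 3;
    }

    void consume(Iterator<String> iter) {
        while (iter.hasNext()) {
            String message = iter.next();
            // 'acked' is a fresh local for every message, so a later message
            // can never reset it; the inner loop retries only the *current*
            // message until its DB write succeeds.
            boolean acked = false;
            while (!acked) {
                acked = writeToDB(message);
            }
            committed.add(message);           // commitOffsets() would go here
        }
    }

    public static void main(String[] args) {
        AckLoopSketch s = new AckLoopSketch();
        s.consume(Arrays.asList("m1", "m2").iterator());
        System.out.println(s.committed + " after " + s.attempts + " write attempts");
    }
}
```

Here "m1" is committed only after its third write attempt, while "m2" is
unaffected by "m1"'s earlier failures.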

Here is the piece of code written by me,

for (final KafkaStream stream : streams) {

            ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();

            while (consumerIte.hasNext()) {
                String msg = new String(consumerIte.next().message());
                System.out.println("Message from Topic :: " + topic + " is : " + msg);
                boolean ack = writeToDB(topic, msg);
                if (ack) {
                    System.out.println("committing offset");
                    consumer.commitOffsets();
                } else {
                    System.err.println("something has gone bad with saving message : "
                            + msg + " from topic : " + topic + " so exiting....");
                    System.exit(-1);
                }
            }
        }


Is the above correct?
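[Editor's note: one alternative to calling System.exit(-1) inside the loop is
a bounded retry, as in the suggestion quoted below, after which the caller
shuts the consumer down cleanly (consumer.shutdown() in the high-level API).
A minimal runnable sketch; the iterator and the writeToDB stub stand in for
the Kafka stream and the real DB, and MAX_RETRIES is an assumed policy:]

```java
import java.util.Arrays;
import java.util.Iterator;

public class BoundedRetrySketch {
    static final int MAX_RETRIES = 3;

    // Stand-in for writeToDB; 'dbFailing' simulates a permanently broken DB.
    static boolean writeToDB(String msg, boolean dbFailing) {
        return !dbFailing;
    }

    // Returns true if every message was stored (so its offset could be
    // committed); false means the caller should stop the consumer cleanly
    // rather than calling System.exit(-1) from inside the loop.
    static boolean consume(Iterator<String> iter, boolean dbFailing) {
        while (iter.hasNext()) {
            String msg = iter.next();
            int tries = 0;
            boolean acked = false;
            while (!acked && tries < MAX_RETRIES) {
                acked = writeToDB(msg, dbFailing);
                tries++;
            }
            if (!acked) {
                return false;   // give up; do NOT commit this offset
            }
            // consumer.commitOffsets() would go here
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(consume(Arrays.asList("m1").iterator(), false));
        System.out.println(consume(Arrays.asList("m1").iterator(), true));
    }
}
```

Because the offset is never committed for the failed message, it will be
redelivered when the consumer group restarts.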

Regards
Anand

On Mon, Aug 4, 2014 at 9:58 PM, Guozhang Wang <wangguoz@gmail.com> wrote:

> Hi,
>
> In your case the exceptions thrown from the database operation may not stop
> the consumer automatically, hence you may need to catch the exception and
> explicitly stop the consumer if you want it to behave that way.
>
> With iter.hasNext() the code becomes
>
> while (iter.hasNext()) {
>
>   message = consumer.iter.next();
>   bool acked = false
>   while (!acked) {
>     process(message)
>     acked = writeToDB();
>   }
>   consumer.commit();
> }
>
> For your multiple-topic case, if the topics are totally isolated (i.e. their
> corresponding updates to the DB do not interfere with each other), then you
> can have 5 JVM processes, each consuming one topic.
>
>
> On Mon, Aug 4, 2014 at 12:22 AM, anand jain <anandjain1984@gmail.com>
> wrote:
>
> > Also, is there any need to check any acknowledgement variable? On any
> > exception while dealing with the database, the consumer program would
> > stop, and hence *consumer.commit()* wouldn't have been called..right??
> >
> > The above question is for a single topic. Now, let's assume there are 5
> > topics. First, I thought I could start a Java program which spawns 5
> > threads, one for each topic, but the issue I see is that if an exception
> > occurs even for a single thread (topic), then I need to terminate the
> > Java process explicitly to resolve the issue (maybe due to
> > database-related code), which will make all the other threads come to a
> > halt. Isn't it better to have 5 Java processes, one for each topic?
> >
> > Regards
> > Anand
> >
> > On Mon, Aug 4, 2014 at 12:31 PM, anand jain <anandjain1984@gmail.com>
> > wrote:
> >
> > > Thanks Guozhang!!
> > >
> > > Below is the code for iterating over log messages:
> > > .........................................................
> > > .........................................................
> > > for (final KafkaStream stream : streams) {
> > >     ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
> > >     while (consumerIte.hasNext()) {
> > >         System.out.println("Message from Topic :: "
> > >                 + new String(consumerIte.next().message()));
> > >     }
> > > }
> > > ........................................................
> > > .........................................................
> > >
> > > As far as I understand, the statement *while (consumerIte.hasNext())*
> > > runs in an infinite loop and returns true whenever a message is
> > > published.
> > >
> > > How should I fit your piece of code (the solution you suggested) here?
> > >
> > > Regards
> > > Anand
> > >
> > > On Fri, Aug 1, 2014 at 8:46 PM, Guozhang Wang <wangguoz@gmail.com>
> > wrote:
> > >
> > >> Hi Anand,
> > >>
> > >> You can use the high-level consumer and turn off auto.offset.commit,
> > >> and do sth. like:
> > >>
> > >> message = consumer.iter.next();
> > >> bool acked = false
> > >> while (!acked) {
> > >> process(message)
> > >> acked = writeToDB();
> > >> }
> > >> consumer.commit()
> > >>
> > >>
> > >> Guozhang
> > >>
> > >>
> > >> On Fri, Aug 1, 2014 at 3:30 AM, anand jain <anandjain1984@gmail.com>
> > >> wrote:
> > >>
> > >> > I am very much new to Kafka and we are using Kafka 0.8.1.
> > >> >
> > >> > What I need to do is to consume a message from a topic. For that, I
> > >> > will have to write one consumer in Java which will consume a message
> > >> > from the topic and then save that message to a database. After a
> > >> > message is saved, an acknowledgement will be sent to the Java
> > >> > consumer. If the acknowledgement is true, then the next message
> > >> > should be consumed from the topic. If the acknowledgement is false
> > >> > (which means that, due to some error, the message read from the
> > >> > topic couldn't be saved into the database), then that message should
> > >> > be read again.
> > >> >
> > >> > I think I need to use the Simple Consumer, to have control over the
> > >> > message offset, and I have gone through the Simple Consumer example
> > >> > as given in this link
> > >> >
> > >> > https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
> > >> > .
> > >> >
> > >> > In this example, the offset is evaluated in the run method as
> > >> > 'readOffset'. Do I need to play with that? For e.g. I can use
> > >> > LatestTime() instead of EarliestTime() and, in case of false, reset
> > >> > the offset to the one before, using offset - 1.
> > >> >
> > >> > Is this how I should proceed? Or can the same be done using the High
> > >> > Level API?
> > >>
> > >>
> > >>
> > >> --
> > >> -- Guozhang
> > >>
> > >
> > >
> >
>
>
>
> --
> -- Guozhang
>
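[Editor's note on the multi-topic discussion quoted above: within a single
JVM, one worker per topic can fail independently if each worker runs as its
own task, so one topic's DB failure need not halt the others. A minimal
runnable sketch using java.util.concurrent; consumeTopic is a hypothetical
stand-in for a per-topic consume loop, and the "bad" topic simulates a
failing DB:]

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PerTopicWorkers {
    // Stand-in for a per-topic consume loop; the "bad" topic simulates a
    // topic whose DB writes keep failing.
    static boolean consumeTopic(String topic) {
        if (topic.equals("bad")) {
            throw new RuntimeException("DB write failed for " + topic);
        }
        return true;
    }

    public static void main(String[] args) throws Exception {
        List<String> topics = Arrays.asList("good1", "bad", "good2");
        ExecutorService pool = Executors.newFixedThreadPool(topics.size());
        Map<String, Future<Boolean>> results = new ConcurrentHashMap<>();
        for (String t : topics) {
            results.put(t, pool.submit(() -> consumeTopic(t)));
        }
        pool.shutdown();
        for (String t : topics) {
            try {
                System.out.println(t + " -> " + results.get(t).get());
            } catch (ExecutionException e) {
                // Only this topic's worker died; the other futures still
                // completed normally.
                System.out.println(t + " -> failed: " + e.getCause().getMessage());
            }
        }
    }
}
```

Separate processes remain the simpler option when one topic's failure must be
allowed to restart independently at the OS level, as suggested in the thread.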
