kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From anand jain <anandjain1...@gmail.com>
Subject Re: Reading messages offset in Apache Kafka
Date Mon, 04 Aug 2014 07:01:47 GMT
Thanks Guozhang!!

Below is the code for iterating over log messages:
.........................................................
.........................................................
for (final KafkaStream stream : streams) {
            ConsumerIterator<byte[], byte[]> consumerIte =
stream.iterator();
            *while (consumerIte.hasNext()){*
                System.out.println("Message from Topic :: "    + new
String(consumerIte.next().message()));
            }
   }
........................................................
.........................................................

As far as I understand, the statement *while (consumerIte.hasNext())* runs
in an infinite loop and returns true whenever a message is published.

How should I fit your piece of code(solution as suggested by you) here?

Regards
Anand

On Fri, Aug 1, 2014 at 8:46 PM, Guozhang Wang <wangguoz@gmail.com> wrote:

> Hi Anand,
>
> You can use the high-level consumer and turn of auto.offset.commit, and do
> sth. like:
>
> message = consumer.iter.next();
> bool acked = false
> while (!acked) {
> process(message)
> acked = writeToDB();
> }
> consumer.commit()
>
>
> Guozhang
>
>
> On Fri, Aug 1, 2014 at 3:30 AM, anand jain <anandjain1984@gmail.com>
> wrote:
>
> > I am very much new to Kafka and we are using Kafka 0.8.1.
> >
> > What I need to do is to consume a message from topic. For that, I will
> have
> > to write one consumer in Java which will consume a message from topic and
> > then save that message to database. After a message is saved, some
> > acknowledgement will be sent to Java consumer. If acknowledgement is
> true,
> > then next message should be consumed from the topic. If acknowldgement is
> > false(which means due to some error message,read from the topic, couldn't
> > be saved into the database), then again that message should be read.
> >
> > I think I need to use Simple Consumer,to have control over message offset
> > and have gone through the Simple Consumer example as given in this link
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
> > .
> >
> > In this example, offset is evaluated in run method as 'readOffset'. Do I
> > need to play with that? For e.g. I can use LatestTime() instead of
> > EarliestTime() and in case of false, I will reset the offset to the one
> > before using offset - 1.
> >
> > Is this how I should proceed? Or can the same be done using High Level
> API?
> >
>
>
>
> --
> -- Guozhang
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message