kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Manish Khettry <man...@ooyala.com>
Subject Dealing with errors when using Kafka Consumer
Date Mon, 26 Mar 2012 23:43:25 GMT
We have a fairly simple class that runs in a loop and consumes
messages from Kafka and feeds it to our stream processing system.

    consumerConnector = Consumer.create(new ConsumerConfig(props))
    val topicMessageStreams =
consumerConnector.createMessageStreams(Map(topic -> 1))

    // We only care about the first streamList from topicMessageStreams ...
    kafkaStream = topicMessageStreams(topic)(0)
    while (true) {
       val logMessage: String =
Utils.toString(kafkaStream.head.payload, "UTF-8")
       // do stuff with the message.

When this code gets an exception, it swallows it on the assumption
that the error is transient, and continues on its merry way. Obviously
this isn't the right thing to do in all cases (or even any case
perhaps)-- over a weekend, this code kept getting the same exception
and eventually logged many hundred gigs of error messages before it
got restarted. The exception we were getting from Kafka was:

java.lang.IllegalStateException: Iterator is in failed state
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:46)
at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:35)
at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:38)"

I was wondering what exceptions are transient, which ones need special
handing (say reconnecting to kafka? or just exiting the JVM and have
our job monitors restart the process again). For example, with the
iterator in an invalid state, would creating a new connector have
helped? Any help would be appreciated.


View raw message