kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jun Rao <jun...@gmail.com>
Subject Re: Dealing with errors when using Kafka Consumer
Date Tue, 27 Mar 2012 04:40:25 GMT
Manish,

If you get IllegalStateException, this is a serious problem and it's
permanent. This typically means (1) the iterator is not used properly
(e.g., calling next before checking hasNext is true) or (2) a bug in Kafka.
We recently fixed the following issue in trunk. Not sure if this is exactly
your problem. Your usage of calling stream.head is a bit unusual. Normally,
people do "for (msg <- stream) { }".

https://issues.apache.org/jira/browse/KAFKA-241

Thanks,

Jun

On Mon, Mar 26, 2012 at 4:43 PM, Manish Khettry <manish@ooyala.com> wrote:

> We have a fairly simple class that runs in a loop and consumes
> messages from Kafka and feeds it to our stream processing system.
>
> {
>   .....
>    consumerConnector = Consumer.create(new ConsumerConfig(props))
>    val topicMessageStreams =
> consumerConnector.createMessageStreams(Map(topic -> 1))
>
>    // We only care about the first streamList from topicMessageStreams ...
>    kafkaStream = topicMessageStreams(topic)(0)
>    while (true) {
>       val logMessage: String =
> Utils.toString(kafkaStream.head.payload, "UTF-8")
>       // do stuff with the message.
>     }
>  }
>
> When this code gets an exception, it swallows it on the assumption
> that the error is transient, and continues on its merry way. Obviously
> this isn't the right thing to do in all cases (or even any case
> perhaps)-- over a weekend, this code kept getting the same exception
> and eventually logged many hundred gigs of error messages before it
> got restarted. The exception we were getting from Kafka was:
>
> java.lang.IllegalStateException: Iterator is in failed state
> at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:46)
> at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:35)
> at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:38)"
>
> I was wondering what exceptions are transient, which ones need special
> handing (say reconnecting to kafka? or just exiting the JVM and have
> our job monitors restart the process again). For example, with the
> iterator in an invalid state, would creating a new connector have
> helped? Any help would be appreciated.
>
> Thanks,
> Manish
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message