kafka-users mailing list archives

From Adam Dubiel <dubiel.a...@gmail.com>
Subject Re: Issue with corrupt message in Topic
Date Tue, 21 Jul 2015 08:15:31 GMT
Hi Nicolas,

In my experience there are only two ways out:
1) wait for the retention time to pass, so that the data gets deleted (this is
usually unacceptable)
2) find the offset of the corrupt message for every affected subscription and
skip the message by overwriting the committed offset with offset+1
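
Option 2 can be sketched roughly as follows. This is a self-contained simulation, not the Kafka client API: the log is a plain Python list of (stored_crc, payload) pairs, and `make_record`, `first_corrupt_offset` and `resume_offset` are hypothetical helper names. It uses `zlib.crc32` only to mimic the stored-versus-computed CRC comparison reported in the exception.

```python
import zlib


def make_record(payload: bytes):
    """Build a (stored_crc, payload) pair, as a stand-in for a log entry."""
    return (zlib.crc32(payload) & 0xFFFFFFFF, payload)


def is_valid(record) -> bool:
    """Recompute the CRC over the payload and compare it with the stored one."""
    stored_crc, payload = record
    return stored_crc == (zlib.crc32(payload) & 0xFFFFFFFF)


def first_corrupt_offset(log, start=0):
    """Return the offset of the first record that fails the CRC check, or None."""
    for offset in range(start, len(log)):
        if not is_valid(log[offset]):
            return offset
    return None


def resume_offset(corrupt_offset: int) -> int:
    """Offset to commit so that consumption restarts just past the bad record."""
    return corrupt_offset + 1


log = [make_record(b"a"), make_record(b"b"), make_record(b"c")]
# Simulate corruption at offset 1: the payload changes, the stored CRC does not.
log[1] = (log[1][0], b"X")

bad = first_corrupt_offset(log)
print("corrupt offset:", bad, "resume from:", resume_offset(bad))
```

In a real deployment the resume offset would have to be written wherever each consumer group keeps its committed offsets, once per affected partition.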

The problem is that when the high-level consumer iterator encounters a corrupt
message, it goes into an invalid state and closes. There is no way to skip the
message or recover from it without skipping offsets. You might try using
SimpleConsumer, though. Maybe someone knows other ways to deal with this
problem, but we haven't found any.
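
To illustrate the failure mode, here is a toy model of an iterator that stays failed after hitting a corrupt record, together with the only recovery it allows: throwing the iterator away and starting a new one at offset+1. `ToyIterator` and `consume_skipping_corrupt` are hypothetical names, the log is an in-memory list, and none of this is the actual Kafka consumer code; it only mirrors the two exceptions seen in the stack traces.

```python
import zlib


def crc(payload: bytes) -> int:
    return zlib.crc32(payload) & 0xFFFFFFFF


class ToyIterator:
    """Toy iterator over (stored_crc, payload) pairs that fails permanently."""

    def __init__(self, log, start_offset=0):
        self.log = log
        self.offset = start_offset
        self.failed = False

    def has_next(self) -> bool:
        if self.failed:
            # Once failed, every later call is rejected.
            raise RuntimeError("Iterator is in failed state")
        if self.offset >= len(self.log):
            return False
        stored_crc, payload = self.log[self.offset]
        if stored_crc != crc(payload):
            self.failed = True
            raise ValueError("Message is corrupt at offset %d" % self.offset)
        return True

    def next(self) -> bytes:
        if not self.has_next():
            raise StopIteration
        payload = self.log[self.offset][1]
        self.offset += 1
        return payload


def consume_skipping_corrupt(log):
    """Read the whole log, replacing the iterator whenever it fails."""
    consumed, skipped = [], []
    it = ToyIterator(log)
    while True:
        try:
            if not it.has_next():
                break
            consumed.append(it.next())
        except ValueError:
            # The old iterator is unusable; restart just past the bad offset.
            skipped.append(it.offset)
            it = ToyIterator(log, start_offset=it.offset + 1)
    return consumed, skipped


log = [(crc(p), p) for p in (b"m0", b"m1", b"m2", b"m3")]
log[2] = (log[2][0], b"garbled")  # corrupt the record at offset 2

consumed, skipped = consume_skipping_corrupt(log)
print(consumed, skipped)
```

The sketch makes the trade-off explicit: the corrupt record is lost, and the skip has to be driven from outside the iterator, because the iterator itself offers no way to step over it.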

BR,
Adam

2015-07-21 9:38 GMT+02:00 Nicolas Phung <nicolas.phung@gmail.com>:

> Hello,
>
> I'm using Confluent Kafka (0.8.2.0-cp). When I try to process messages
> from my Kafka topic with Spark Streaming, I get the following error:
>
> kafka.message.InvalidMessageException: Message is corrupt (stored crc = 3561357254, computed crc = 171652633)
>         at kafka.message.Message.ensureValid(Message.scala:166)
>         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:102)
>         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
>         at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
>         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
>         at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> 15/07/20 15:56:57 INFO BlockManager: Removing broadcast 4641
> 15/07/20 15:56:57 ERROR ReliableKafkaReceiver: Error handling message
> java.lang.IllegalStateException: Iterator is in failed state
>         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
>         at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
>
> From my understanding, there is a corrupt message in my topic. I'm using
> the new Producer API to send messages compressed with Snappy. I found an old
> thread about this, but it gave no further steps to resolve the issue. Do
> you have any information about this?
>
> Is it possible in Kafka to somehow reread the topic and drop corrupt
> messages?
>
> Regards,
> Nicolas PHUNG
>
