spark-user mailing list archives

From Cody Koeninger <c...@koeninger.org>
Subject Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing
Date Mon, 20 Jul 2015 17:08:33 GMT
Yeah, in the function you supply for the messageHandler parameter to
createDirectStream, catch the exception and do whatever makes sense for
your application.
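
For example, here is a minimal sketch (assuming String keys and values, and
that ssc, kafkaParams, and fromOffsets already exist in your application) that
logs the offset of a corrupt record and skips it. The decoding happens inside
mmd.message(), which is exactly where the stack trace below blows up:

import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Assumed to already exist in your application:
//   ssc: StreamingContext
//   kafkaParams: Map[String, String]
//   fromOffsets: Map[TopicAndPartition, Long]
val handler = (mmd: MessageAndMetadata[String, String]) => {
  try {
    Some(mmd.message()) // decoding the payload is what throws on a corrupt record
  } catch {
    case e: Exception =>
      // log where the bad record lives so it can be inspected later, then skip it
      System.err.println(
        s"Skipping corrupt message at ${mmd.topic}-${mmd.partition} offset ${mmd.offset}: $e")
      None
  }
}

val stream = KafkaUtils.createDirectStream[
    String, String, StringDecoder, StringDecoder, Option[String]](
  ssc, kafkaParams, fromOffsets, handler
).flatMap(_.toSeq) // drop the skipped records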

On Mon, Jul 20, 2015 at 11:58 AM, Nicolas Phung <nicolas.phung@gmail.com>
wrote:

> Hello,
>
> Using the old Spark Streaming Kafka API, I got the following around the
> same offset:
>
> kafka.message.InvalidMessageException: Message is corrupt (stored crc =
> 3561357254, computed crc = 171652633)
>         at kafka.message.Message.ensureValid(Message.scala:166)
>         at
> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:102)
>         at
> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
>         at
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
>         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
>         at
> org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
>         at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> 15/07/20 15:56:57 INFO BlockManager: Removing broadcast 4641
> 15/07/20 15:56:57 ERROR ReliableKafkaReceiver: Error handling message
> java.lang.IllegalStateException: Iterator is in failed state
>         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
>         at
> org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
>         at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
>
> I found an old thread about possibly corrupt Kafka messages produced
> by the new producer API with Snappy compression on. My question is: is it
> possible to skip/ignore those offsets when doing a full reprocessing with
> KafkaUtils.createStream or KafkaUtils.createDirectStream?
>
> Regards,
> Nicolas PHUNG
>
> On Mon, Jul 20, 2015 at 3:46 PM, Cody Koeninger <cody@koeninger.org>
> wrote:
>
>> I'd try logging the offsets for each message, see where problems start,
>> then try using the console consumer starting at those offsets and see if
>> you can reproduce the problem.
>>
>> On Mon, Jul 20, 2015 at 2:15 AM, Nicolas Phung <nicolas.phung@gmail.com>
>> wrote:
>>
>>> Hi Cody,
>>>
>>> Thanks for your help. It seems there's something wrong with some messages
>>> within my Kafka topics then. I don't understand how I can get a bigger or
>>> incomplete message, since I use the default configuration that accepts only
>>> 1 MB messages in my Kafka topic. If you have any other information or
>>> suggestions, please tell me.
>>>
>>> Regards,
>>> Nicolas PHUNG
>>>
>>> On Thu, Jul 16, 2015 at 7:08 PM, Cody Koeninger <cody@koeninger.org>
>>> wrote:
>>>
>>>> Not exactly the same issue, but possibly related:
>>>>
>>>> https://issues.apache.org/jira/browse/KAFKA-1196
>>>>
>>>> On Thu, Jul 16, 2015 at 12:03 PM, Cody Koeninger <cody@koeninger.org>
>>>> wrote:
>>>>
>>>>> Well, working backwards down the stack trace...
>>>>>
>>>>> at java.nio.Buffer.limit(Buffer.java:275)
>>>>>
>>>>> That exception gets thrown if the limit is negative or greater than the
>>>>> buffer's capacity
>>>>>
>>>>>
>>>>> at kafka.message.Message.sliceDelimited(Message.scala:236)
>>>>>
>>>>> If size had been negative, it would have just returned null, so we
>>>>> know the exception got thrown because the size was greater than the
>>>>> buffer's capacity
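>>>>>
>>>>> As a rough paraphrase of that logic (illustrative only, not the actual
>>>>> Kafka source):
>>>>>
>>>>> import java.nio.ByteBuffer
>>>>>
>>>>> def sliceDelimited(buffer: ByteBuffer, start: Int): ByteBuffer = {
>>>>>   val size = buffer.getInt(start) // 4-byte payload size
>>>>>   if (size < 0) {
>>>>>     null // a negative size just returns null
>>>>>   } else {
>>>>>     val b = buffer.duplicate()
>>>>>     b.position(start + 4)
>>>>>     b.limit(start + 4 + size) // a size beyond the capacity makes
>>>>>     b.slice()                 // limit() throw IllegalArgumentException
>>>>>   }
>>>>> }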
>>>>>
>>>>>
>>>>> I haven't seen that before... maybe a corrupted message of some kind?
>>>>>
>>>>> If that problem is reproducible, try providing an explicit argument
>>>>> for messageHandler, with a function that logs the message offset.
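>>>>>
>>>>> For instance, a minimal sketch (assuming String keys and values) that
>>>>> prints each record's offset before decoding it:
>>>>>
>>>>> import kafka.message.MessageAndMetadata
>>>>>
>>>>> val logOffsets = (mmd: MessageAndMetadata[String, String]) => {
>>>>>   // the last offset printed before the failure points at the bad record
>>>>>   println(s"${mmd.topic}-${mmd.partition} offset ${mmd.offset}")
>>>>>   mmd.message() // decoding throws here on a corrupt record
>>>>> }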
>>>>>
>>>>>
>>>>> On Thu, Jul 16, 2015 at 11:28 AM, Nicolas Phung <
>>>>> nicolas.phung@gmail.com> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> When I'm reprocessing the data from Kafka (about 40 GB) with the
>>>>>> new Spark Streaming Kafka method createDirectStream, everything is fine
>>>>>> until a driver error happens (driver is killed, connection lost...). When
>>>>>> the driver comes back up, it resumes processing from the checkpoint in
>>>>>> HDFS. Except I get this:
>>>>>>
>>>>>> 15/07/16 15:23:41 ERROR TaskSetManager: Task 4 in stage 4.0 failed 4 times; aborting job
>>>>>> 15/07/16 15:23:41 ERROR JobScheduler: Error running job streaming job 1437032118000 ms.0
>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 4.0 failed 4 times, most recent failure: Lost task 4.3 in stage 4.0 (TID 16, slave05.local): java.lang.IllegalArgumentException
>>>>>> 	at java.nio.Buffer.limit(Buffer.java:275)
>>>>>> 	at kafka.message.Message.sliceDelimited(Message.scala:236)
>>>>>> 	at kafka.message.Message.payload(Message.scala:218)
>>>>>> 	at kafka.message.MessageAndMetadata.message(MessageAndMetadata.scala:32)
>>>>>> 	at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
>>>>>> 	at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
>>>>>> 	at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:176)
>>>>>> 	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>> 	at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
>>>>>> 	at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172)
>>>>>> 	at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79)
>>>>>> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>>>>>> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>> 	at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:93)
>>>>>> 	at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:92)
>>>>>> 	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>> 	at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:48)
>>>>>> 	at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>>>>>> 	at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>>>>>> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>>>>> 	at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>>>> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>>>> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>> 	at java.lang.Thread.run(Thread.java:745)
>>>>>>
>>>>>> This happens only when I'm doing a full data reprocessing from
>>>>>> Kafka. If there's no load, when you kill the driver and then restart, it
>>>>>> resumes from the checkpoint as expected without missing data. Has anyone
>>>>>> encountered something similar? How did you solve this?
>>>>>>
>>>>>> Regards,
>>>>>>
>>>>>> Nicolas PHUNG
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
