spark-user mailing list archives

From Nicolas Phung <nicolas.ph...@gmail.com>
Subject Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing
Date Mon, 20 Jul 2015 07:15:31 GMT
Hi Cody,

Thanks for your help. It seems there's something wrong with some messages
within my Kafka topics, then. I don't understand how I can get an oversized or
incomplete message, since I use the default configuration, which only accepts
messages up to 1 MB in my Kafka topic. If you have any other information or
suggestions, please tell me.
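
Roughly, the consumer side of that limit in the direct stream looks like this
(a sketch; the broker list is a placeholder, and I haven't overridden
fetch.message.max.bytes):

    // kafkaParams as passed to createDirectStream. fetch.message.max.bytes
    // (consumer side, 1 MB by default) must be at least message.max.bytes on
    // the broker (~1 MB by default in 0.8.x), or oversized messages can't be
    // fetched at all.
    val kafkaParams = Map(
      "metadata.broker.list" -> "broker1:9092,broker2:9092", // placeholder
      "fetch.message.max.bytes" -> (1024 * 1024).toString
    )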

Regards,
Nicolas PHUNG

On Thu, Jul 16, 2015 at 7:08 PM, Cody Koeninger <cody@koeninger.org> wrote:

> Not exactly the same issue, but possibly related:
>
> https://issues.apache.org/jira/browse/KAFKA-1196
>
> On Thu, Jul 16, 2015 at 12:03 PM, Cody Koeninger <cody@koeninger.org>
> wrote:
>
>> Well, working backwards down the stack trace...
>>
>> at java.nio.Buffer.limit(Buffer.java:275)
>>
>> That exception gets thrown if the limit is negative or greater than the
>> buffer's capacity.
>>
>>
>> at kafka.message.Message.sliceDelimited(Message.scala:236)
>>
>> If size had been negative, it would have just returned null, so we know
>> the exception got thrown because the size was greater than the buffer's
>> capacity.
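>>
>> For reference, sliceDelimited does roughly this (paraphrased from the Kafka
>> 0.8.x source, not verbatim):
>>
>>     // paraphrase of kafka.message.Message.sliceDelimited(start)
>>     def sliceDelimited(buffer: java.nio.ByteBuffer, start: Int): java.nio.ByteBuffer = {
>>       val size = buffer.getInt(start)  // 4-byte size field before the payload
>>       if (size < 0) {
>>         null                           // negative size => null payload
>>       } else {
>>         var b = buffer.duplicate()
>>         b.position(start + 4)
>>         b = b.slice()
>>         b.limit(size)                  // throws IllegalArgumentException
>>                                        // if size > the slice's capacity
>>         b.rewind()
>>         b
>>       }
>>     }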
>>
>>
>> I haven't seen that before... maybe a corrupted message of some kind?
>>
>> If that problem is reproducible, try providing an explicit argument for
>> messageHandler, with a function that logs the message offset.
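>>
>> For example, a minimal sketch (ssc, kafkaParams and fromOffsets are assumed
>> to be defined already; string keys/values are just placeholders):
>>
>>     import kafka.message.MessageAndMetadata
>>     import kafka.serializer.StringDecoder
>>     import org.apache.spark.streaming.kafka.KafkaUtils
>>
>>     // Log the offset *before* touching the payload, since it is
>>     // mmd.message() that throws on a corrupt message.
>>     val messageHandler = (mmd: MessageAndMetadata[String, String]) => {
>>       println(s"topic=${mmd.topic} partition=${mmd.partition} offset=${mmd.offset}")
>>       (mmd.key, mmd.message)
>>     }
>>
>>     val stream = KafkaUtils.createDirectStream[
>>       String, String, StringDecoder, StringDecoder, (String, String)](
>>       ssc, kafkaParams, fromOffsets, messageHandler)
>>
>> That way, the last offset logged before the exception points you at the bad
>> message.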
>>
>>
>> On Thu, Jul 16, 2015 at 11:28 AM, Nicolas Phung <nicolas.phung@gmail.com>
>> wrote:
>>
>>> Hello,
>>>
>>> When I'm reprocessing the data from Kafka (about 40 GB) with the new Spark
>>> Streaming Kafka method createDirectStream, everything is fine until a driver
>>> error happens (driver killed, connection lost, ...). When the driver comes
>>> back up, it resumes processing from the checkpoint in HDFS. Except I get this:
>>>
>>> 15/07/16 15:23:41 ERROR TaskSetManager: Task 4 in stage 4.0 failed 4 times; aborting job
>>> 15/07/16 15:23:41 ERROR JobScheduler: Error running job streaming job 1437032118000 ms.0
>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 4.0 failed 4 times, most recent failure: Lost task 4.3 in stage 4.0 (TID 16, slave05.local): java.lang.IllegalArgumentException
>>> 	at java.nio.Buffer.limit(Buffer.java:275)
>>> 	at kafka.message.Message.sliceDelimited(Message.scala:236)
>>> 	at kafka.message.Message.payload(Message.scala:218)
>>> 	at kafka.message.MessageAndMetadata.message(MessageAndMetadata.scala:32)
>>> 	at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
>>> 	at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
>>> 	at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:176)
>>> 	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> 	at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
>>> 	at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172)
>>> 	at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79)
>>> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>>> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>> 	at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:93)
>>> 	at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:92)
>>> 	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> 	at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:48)
>>> 	at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>>> 	at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>>> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>> 	at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>> 	at java.lang.Thread.run(Thread.java:745)
>>>
>>> This is happening only when I'm doing a full data reprocessing from Kafka.
>>> If there's no load, when you kill the driver and then restart it, it resumes
>>> from the checkpoint as expected without missing data. Did someone encounter
>>> something similar? How did you solve it?
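>>>
>>> For context, the driver recovery follows the standard getOrCreate checkpoint
>>> pattern, roughly like this (a sketch; createContext stands in for the
>>> function that builds my full streaming job, and the path and batch interval
>>> are placeholders):
>>>
>>>     import org.apache.spark.streaming.{Seconds, StreamingContext}
>>>
>>>     val checkpointDir = "hdfs:///user/spark/checkpoints/my-job" // placeholder
>>>
>>>     def createContext(): StreamingContext = {
>>>       val ssc = new StreamingContext(sparkConf, Seconds(10)) // sparkConf assumed defined
>>>       ssc.checkpoint(checkpointDir)
>>>       // ... createDirectStream + processing + saveToEs ...
>>>       ssc
>>>     }
>>>
>>>     // On a clean start this calls createContext; after a driver crash it
>>>     // rebuilds the context (including the direct stream offsets) from the
>>>     // checkpoint data in HDFS.
>>>     val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
>>>     ssc.start()
>>>     ssc.awaitTermination()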
>>>
>>> Regards,
>>>
>>> Nicolas PHUNG
>>>
>>
>>
>
