spark-user mailing list archives

From Nicolas Phung <nicolas.ph...@gmail.com>
Subject Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing
Date Fri, 24 Jul 2015 07:46:59 GMT
Hello,

I managed to read all my data back by skipping the offsets that contain a
corrupt message. I have one more question about best practices: the
messageHandler method vs dstream.foreachRDD.map vs dstream.map.foreachRDD.
I'm using a function that reads the serialized message from Kafka and
converts it into my own object with some enrichments, sometimes followed by
a filter. Where is the best place to put this logic: inside the
messageHandler method (converting each message within the handler), in
dstream.foreachRDD.map (mapping the RDD), or in dstream.map.foreachRDD
(mapping the DStream)?

Thank you for your help, Cody.
Regards,
Nicolas PHUNG

On Tue, Jul 21, 2015 at 4:53 PM, Cody Koeninger <cody@koeninger.org> wrote:

> Yeah, I'm referring to that api.
>
> If you want to filter messages in addition to catching that exception,
> have your messageHandler return an Option, so the type R would end up being
> Option[WhateverYourClassIs], then filter out None before doing the rest of
> your processing.
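A minimal sketch of that pattern in plain Scala, runnable without Spark or Kafka. `Event`, `parseEvent`, `toEvent` and the sample batch are hypothetical names for illustration. The comment shows how the body might be wired into a messageHandler of type `MessageAndMetadata[String, String] => Option[Event]`; that wiring is an assumption based on the createDirectStream signature quoted further down in this thread.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical target type standing in for "WhateverYourClassIs".
final case class Event(id: String, date: String)

// Hypothetical decoder: expects "id,date" and throws on anything else.
def parseEvent(payload: String): Event = payload.split(",", -1) match {
  case Array(id, date) if id.nonEmpty && date.nonEmpty => Event(id, date)
  case _ => throw new IllegalArgumentException(s"cannot parse: $payload")
}

// Body of a messageHandler whose result type R is Option[Event].
// In a real job it might be wrapped as (assumption):
//   (mmd: MessageAndMetadata[String, String]) =>
//     toEvent(mmd.offset, () => mmd.message(), minDate)
// decode() stands in for mmd.message(), which decodes the payload when called
// and is where the exception in this thread's stack trace was thrown.
def toEvent(offset: Long, decode: () => String, minDate: String): Option[Event] =
  Try(parseEvent(decode())) match {
    case Success(e) if e.date.compareTo(minDate) >= 0 => Some(e) // keep
    case Success(_) => None                                      // filtered out by date
    case Failure(err) =>                                         // corrupt or unparseable
      System.err.println(s"skipping offset $offset: ${err.getMessage}")
      None
  }

// A local Seq stands in for one batch of the stream. On the DStream the
// Nones could be dropped the same way, e.g. stream.flatMap(_.toList).
val batch: Seq[(Long, () => String)] = Seq(
  (0L, () => "a,2015-07-01"),
  (1L, () => throw new IllegalArgumentException("Message is corrupt")),
  (2L, () => "b,2015-06-01"),
  (3L, () => "c,2015-07-20")
)

val kept: Seq[Event] =
  batch.flatMap { case (offset, decode) => toEvent(offset, decode, "2015-07-01") }
```

Catching the exception and applying the filter in one place keeps R a simple `Option`, so downstream code only ever sees successfully decoded, already-filtered records.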
>
> If you aren't already recording offsets somewhere, and need to find the
> offsets at the beginning of the topic, you can take a look at this
>
>
> https://github.com/apache/spark/blob/branch-1.3/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala#L143
>
> as an example of querying offsets from Kafka.
>
> That code is private, but you can either use it as an example, or remove
> the private[spark] modifier and recompile just the spark-streaming-kafka
> package. That artifact is included in your job assembly, so you won't have
> to redeploy Spark if you go that route.
>
>
> On Tue, Jul 21, 2015 at 6:42 AM, Nicolas Phung <nicolas.phung@gmail.com>
> wrote:
>
>> Hi Cody,
>>
>> Thanks for your answer. I'm on Spark 1.3.0. I don't quite understand
>> how to use the messageHandler parameter/function in the createDirectStream
>> method. You are referring to this, aren't you?
>>
>> def createDirectStream[
>>     K: ClassTag, V: ClassTag,
>>     KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag,
>>     R: ClassTag](
>>     ssc: StreamingContext,
>>     kafkaParams: Map[String, String],
>>     fromOffsets: Map[TopicAndPartition, Long],
>>     messageHandler: MessageAndMetadata[K, V] => R
>>   ): InputDStream[R] = {
>>   new DirectKafkaInputDStream[K, V, KD, VD, R](
>>     ssc, kafkaParams, fromOffsets, messageHandler)
>> }
>>
>> So I must supply the fromOffsets parameter too, but how do I tell this
>> method to read from the beginning of my topic?
>>
>> If I want to filter on a field of my R class (e.g. R.date), can I put
>> that filter in the messageHandler function too?
>>
>> Regards,
>> Nicolas P.
>>
>> On Mon, Jul 20, 2015 at 7:08 PM, Cody Koeninger <cody@koeninger.org>
>> wrote:
>>
>>> Yeah, in the function you supply for the messageHandler parameter to
>>> createDirectStream, catch the exception and do whatever makes sense for
>>> your application.
>>>
>>> On Mon, Jul 20, 2015 at 11:58 AM, Nicolas Phung <nicolas.phung@gmail.com>
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> Using the old Spark Streaming Kafka API, I got the following around the
>>>> same offset:
>>>>
>>>> kafka.message.InvalidMessageException: Message is corrupt (stored crc = 3561357254, computed crc = 171652633)
>>>>         at kafka.message.Message.ensureValid(Message.scala:166)
>>>>         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:102)
>>>>         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
>>>>         at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
>>>>         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
>>>>         at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
>>>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>         at java.lang.Thread.run(Thread.java:745)
>>>> 15/07/20 15:56:57 INFO BlockManager: Removing broadcast 4641
>>>> 15/07/20 15:56:57 ERROR ReliableKafkaReceiver: Error handling message
>>>> java.lang.IllegalStateException: Iterator is in failed state
>>>>         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
>>>>         at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
>>>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> I found some old threads about possibly corrupt Kafka messages
>>>> produced by the new producer API with Snappy compression enabled. My
>>>> question is: is it possible to skip/ignore those offsets when doing a full
>>>> reprocessing with KafkaUtils.createStream or KafkaUtils.createDirectStream?
>>>>
>>>> Regards,
>>>> Nicolas PHUNG
>>>>
>>>> On Mon, Jul 20, 2015 at 3:46 PM, Cody Koeninger <cody@koeninger.org>
>>>> wrote:
>>>>
>>>>> I'd try logging the offsets for each message, see where problems
>>>>> start, then try using the console consumer starting at those offsets and
>>>>> see if you can reproduce the problem.
>>>>>
>>>>> On Mon, Jul 20, 2015 at 2:15 AM, Nicolas Phung <
>>>>> nicolas.phung@gmail.com> wrote:
>>>>>
>>>>>> Hi Cody,
>>>>>>
>>>>>> Thanks for your help. It seems there's something wrong with some
>>>>>> messages within my Kafka topics, then. I don't understand how I can get
>>>>>> bigger or incomplete messages, since I use the default configuration,
>>>>>> which only accepts messages up to 1 MB in my Kafka topic. If you have any
>>>>>> other information or suggestions, please let me know.
>>>>>>
>>>>>> Regards,
>>>>>> Nicolas PHUNG
>>>>>>
>>>>>> On Thu, Jul 16, 2015 at 7:08 PM, Cody Koeninger <cody@koeninger.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Not exactly the same issue, but possibly related:
>>>>>>>
>>>>>>> https://issues.apache.org/jira/browse/KAFKA-1196
>>>>>>>
>>>>>>> On Thu, Jul 16, 2015 at 12:03 PM, Cody Koeninger <cody@koeninger.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Well, working backwards down the stack trace...
>>>>>>>>
>>>>>>>> at java.nio.Buffer.limit(Buffer.java:275)
>>>>>>>>
>>>>>>>> That exception gets thrown if the limit is negative or greater than
>>>>>>>> the buffer's capacity.
>>>>>>>>
>>>>>>>>
>>>>>>>> at kafka.message.Message.sliceDelimited(Message.scala:236)
>>>>>>>>
>>>>>>>> If size had been negative, it would have just returned null, so we
>>>>>>>> know the exception got thrown because the size was greater than the
>>>>>>>> buffer's capacity.
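The reasoning above can be reproduced with `java.nio.ByteBuffer` alone. The sketch below is a simplified, hypothetical re-creation of a length-prefixed read in the style of `Message.sliceDelimited`, not Kafka's actual source: a 4-byte signed size precedes the field, a negative size yields null, and a size larger than the remaining bytes makes `Buffer.limit` throw `IllegalArgumentException`, the same exception seen at the top of the trace.

```scala
import java.nio.ByteBuffer
import scala.util.Try

// Simplified re-creation (assumption, not Kafka's code): read a 4-byte signed
// size at `start`, then return a view over the `size` bytes that follow it.
def sliceDelimited(buffer: ByteBuffer, start: Int): ByteBuffer = {
  val size = buffer.getInt(start)   // absolute read, does not move position
  if (size < 0) {
    null                            // negative size: treated as "no field"
  } else {
    val b = buffer.duplicate()
    b.position(start + 4)
    val slice = b.slice()           // capacity = bytes remaining after the prefix
    slice.limit(size)               // throws IllegalArgumentException if size > capacity
    slice
  }
}

// Well-formed: size prefix 3 followed by exactly 3 bytes.
val ok = ByteBuffer.allocate(7).putInt(3).put(Array[Byte](1, 2, 3))
ok.flip()
val payload = sliceDelimited(ok, 0)

// Corrupt: the prefix claims 100 bytes but only 3 follow.
val bad = ByteBuffer.allocate(7).putInt(100).put(Array[Byte](1, 2, 3))
bad.flip()
val result = Try(sliceDelimited(bad, 0))
```

Here `payload.remaining()` is 3, while `result` is a `Failure` wrapping `IllegalArgumentException`, which is consistent with a corrupted size field rather than a genuinely oversized message.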
>>>>>>>>
>>>>>>>>
>>>>>>>> I haven't seen that before... maybe a corrupted message of some
>>>>>>>> kind?
>>>>>>>>
>>>>>>>> If that problem is reproducible, try providing an explicit argument
>>>>>>>> for messageHandler, with a function that logs the message offset.
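A minimal sketch of such an offset-logging handler in plain Scala. `loggingHandler` is a hypothetical generic wrapper: with the real API, `M` would presumably be `MessageAndMetadata[K, V]` and `coords` would be `mmd => (mmd.topic, mmd.partition, mmd.offset)` (an assumption). `Rec` is a hypothetical stand-in record so the sketch runs without Kafka. In a real job `log` would be a logger call, since the handler runs on the executors.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Try

// Wraps any handler so every record's coordinates are logged before it is
// decoded; the last "reading" line before a failure points at the bad record.
def loggingHandler[M, R](
    coords: M => (String, Int, Long),
    handle: M => R,
    log: String => Unit): M => R = { m =>
  val (topic, partition, offset) = coords(m)
  log(s"reading $topic/$partition@$offset")
  try handle(m)
  catch {
    case e: Exception =>
      log(s"failed at $topic/$partition@$offset: $e")
      throw e // rethrow: this sketch only records where processing broke
  }
}

// Hypothetical stand-in for MessageAndMetadata, for local testing only.
final case class Rec(topic: String, partition: Int, offset: Long, payload: String)

val lines = ArrayBuffer.empty[String]
val handler = loggingHandler[Rec, Int](
  r => (r.topic, r.partition, r.offset),
  r => r.payload.toInt,        // hypothetical decode step that can fail
  msg => lines += msg)

val first = handler(Rec("events", 0, 41L, "7"))
val attempt = Try(handler(Rec("events", 0, 42L, "not-a-number")))
```

Logging every record is expensive, so this is meant for a debugging run; the reported topic/partition/offset can then be fed to a console consumer to try to reproduce the problem outside Spark.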
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jul 16, 2015 at 11:28 AM, Nicolas Phung <
>>>>>>>> nicolas.phung@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hello,
>>>>>>>>>
>>>>>>>>> When I reprocess the data from Kafka (about 40 GB) with the new
>>>>>>>>> Spark Streaming Kafka method createDirectStream, everything is fine
>>>>>>>>> until a driver error happens (driver killed, connection lost...).
>>>>>>>>> When the driver comes back up, it resumes processing from the
>>>>>>>>> checkpoint in HDFS, but then I get this:
>>>>>>>>>
>>>>>>>>> 15/07/16 15:23:41 ERROR TaskSetManager: Task 4 in stage 4.0 failed 4 times; aborting job
>>>>>>>>> 15/07/16 15:23:41 ERROR JobScheduler: Error running job streaming job 1437032118000 ms.0
>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 4.0 failed 4 times, most recent failure: Lost task 4.3 in stage 4.0 (TID 16, slave05.local): java.lang.IllegalArgumentException
>>>>>>>>> 	at java.nio.Buffer.limit(Buffer.java:275)
>>>>>>>>> 	at kafka.message.Message.sliceDelimited(Message.scala:236)
>>>>>>>>> 	at kafka.message.Message.payload(Message.scala:218)
>>>>>>>>> 	at kafka.message.MessageAndMetadata.message(MessageAndMetadata.scala:32)
>>>>>>>>> 	at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
>>>>>>>>> 	at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
>>>>>>>>> 	at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:176)
>>>>>>>>> 	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>> 	at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
>>>>>>>>> 	at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172)
>>>>>>>>> 	at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79)
>>>>>>>>> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>>>>>>>>> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>>>>> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>>>>> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>>>>> 	at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:93)
>>>>>>>>> 	at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:92)
>>>>>>>>> 	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>> 	at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:48)
>>>>>>>>> 	at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>>>>>>>>> 	at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>>>>>>>>> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>>>>>>>> 	at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>>>>>>> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>>>>>>> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>>>> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>>>> 	at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>
>>>>>>>>> This only happens when I'm doing a full reprocessing of the data
>>>>>>>>> from Kafka. If there's no load and I kill the driver and then restart
>>>>>>>>> it, it resumes from the checkpoint as expected without missing data.
>>>>>>>>> Has anyone encountered something similar? How did you solve it?
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>>
>>>>>>>>> Nicolas PHUNG
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
