spark-user mailing list archives

From Ted Yu <yuzhih...@gmail.com>
Subject Re: Spark streaming with Kafka, multiple partitions fail, single partition ok
Date Mon, 30 Mar 2015 17:05:46 GMT
Nicolas:
Check whether the following exception occurred earlier in the log:
          errs => throw new SparkException(
            s"Couldn't connect to leader for topic ${part.topic} ${part.partition}: " +
              errs.mkString("\n")),

Cheers
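
For context, the snippet above appears to be the error branch of a `fold` over an `Either`: the left side carries the errors collected while looking up the partition leader, the right side the connected consumer. A minimal, self-contained sketch of that pattern follows. `Part`, `FakeConsumer`, `LeaderException` and `LeaderLookup` are hypothetical stand-ins invented for illustration, not Spark or Kafka classes, so this only shows where such a message would come from and is not the actual KafkaRDD code.

```scala
// Hypothetical stand-ins; none of these are Spark or Kafka classes.
final case class Part(topic: String, partition: Int)
final class FakeConsumer(val host: String)
final class LeaderException(msg: String) extends RuntimeException(msg)

object LeaderLookup {
  // Left: every error collected while looking for the leader.
  // Right: a consumer connected to the leader.
  def connectLeader(
      part: Part,
      leaders: Map[Part, String]): Either[Seq[Throwable], FakeConsumer] =
    leaders.get(part) match {
      case Some(host) => Right(new FakeConsumer(host))
      case None       => Left(Seq(new RuntimeException(s"no leader known for $part")))
    }

  // Same shape as the quoted snippet: the errors are folded into one exception.
  def consumerFor(part: Part, leaders: Map[Part, String]): FakeConsumer =
    connectLeader(part, leaders).fold(
      errs => throw new LeaderException(
        s"Couldn't connect to leader for topic ${part.topic} ${part.partition}: " +
          errs.mkString("\n")),
      consumer => consumer
    )
}
```

If a message of this form shows up before the NullPointerException, the consumer was never created for that partition, which would explain the later failure in `close`.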

On Mon, Mar 30, 2015 at 9:40 AM, Cody Koeninger <cody@koeninger.org> wrote:

> This line
>
> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(KafkaRDD.scala:158)
>
> is the attempt to close the underlying kafka simple consumer.
>
> We can add a null pointer check, but the underlying issue of the consumer
> being null probably indicates a problem earlier.  Do you see any previous
> error messages?
>
> Also, can you clarify for the successful and failed cases which topics you
> are attempting this on, how many partitions there are, and whether there
> are messages in the partitions?  There's an existing jira regarding empty
> partitions.
>
> On Mon, Mar 30, 2015 at 11:05 AM, Nicolas Phung <nicolas.phung@gmail.com>
> wrote:
>
>> Hello,
>>
>> I'm using spark-streaming-kafka 1.3.0 with the new consumer, "Approach 2:
>> Direct Approach (No Receivers)"
>> (http://spark.apache.org/docs/latest/streaming-kafka-integration.html).
>> I'm using the following code snippet:
>>
>> // Create direct Kafka stream with brokers and topics
>> val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
>>   ssc, kafkaParams, topicsSet)
>>
>> // Get the stuff from Kafka and print them
>> val raw = messages.map(_._2)
>> val dStream: DStream[RawScala] = raw.map(
>>   byte => {
>>     // Avro Decoder
>>     println("Byte length: " + byte.length)
>>     val rawDecoder = new AvroDecoder[Raw](schema = Raw.getClassSchema)
>>     RawScala.toScala(rawDecoder.fromBytes(byte))
>>   }
>> )
>> // Useful for debug
>> dStream.print()
>>
>> I launch my Spark Streaming job and everything is fine as long as there are
>> no incoming logs from Kafka. When I send a log, I get the following error:
>>
>> 15/03/30 17:34:40 ERROR TaskContextImpl: Error in TaskCompletionListener
>> java.lang.NullPointerException
>> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(KafkaRDD.scala:158)
>> at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63)
>> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
>> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
>> at org.apache.spark.TaskContextImpl$$anon$1.onTaskCompletion(TaskContextImpl.scala:49)
>> at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:68)
>> at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:66)
>> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:66)
>> at org.apache.spark.scheduler.Task.run(Task.scala:58)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>> 15/03/30 17:34:40 INFO TaskSetManager: Finished task 3.0 in stage 28.0 (TID 94) in 12 ms on localhost (2/4)
>> 15/03/30 17:34:40 INFO TaskSetManager: Finished task 2.0 in stage 28.0 (TID 93) in 13 ms on localhost (3/4)
>> 15/03/30 17:34:40 ERROR Executor: Exception in task 0.0 in stage 28.0 (TID 91)
>> org.apache.spark.util.TaskCompletionListenerException
>> at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
>> at org.apache.spark.scheduler.Task.run(Task.scala:58)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>> 15/03/30 17:34:40 WARN TaskSetManager: Lost task 0.0 in stage 28.0 (TID 91, localhost): org.apache.spark.util.TaskCompletionListenerException
>> at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
>> at org.apache.spark.scheduler.Task.run(Task.scala:58)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> 15/03/30 17:34:40 ERROR TaskSetManager: Task 0 in stage 28.0 failed 1 times; aborting job
>> 15/03/30 17:34:40 INFO TaskSchedulerImpl: Removed TaskSet 28.0, whose tasks have all completed, from pool
>> 15/03/30 17:34:40 INFO TaskSchedulerImpl: Cancelling stage 28
>> 15/03/30 17:34:40 INFO DAGScheduler: Job 28 failed: print at HotFCANextGen.scala:63, took 0,041068 s
>> 15/03/30 17:34:40 INFO JobScheduler: Starting job streaming job 1427729680000 ms.1 from job set of time 1427729680000 ms
>> 15/03/30 17:34:40 ERROR JobScheduler: Error running job streaming job 1427729680000 ms.0
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 28.0 failed 1 times, most recent failure: Lost task 0.0 in stage 28.0 (TID 91, localhost): org.apache.spark.util.TaskCompletionListenerException
>> at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
>> at org.apache.spark.scheduler.Task.run(Task.scala:58)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Driver stacktrace:
>> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
>> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>> at scala.Option.foreach(Option.scala:236)
>> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
>> at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> The same code snippet works well if the topic has a single partition.
>> The issue happens when the topic has multiple partitions. Am I doing
>> something wrong? Can you help me find the right way to write this for a
>> Kafka topic with multiple partitions?
>>
>> Regards,
>> Nicolas PHUNG
>>
>
>
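
The null pointer check Cody mentions for `KafkaRDDIterator.close` could look roughly like the sketch below. It is self-contained and deliberately avoids the Spark and Kafka APIs: `StubConsumer` and `PartitionReader` are hypothetical names invented here, and the real iterator is structured differently. The only point is the guard in `close`, which lets a task-completion callback run safely when the consumer was never created.

```scala
// Hypothetical stand-in for the Kafka consumer; not the real SimpleConsumer API.
final class StubConsumer {
  var closed = false
  def close(): Unit = { closed = true }
}

// Sketch of an iterator-like holder whose consumer may never have been created.
final class PartitionReader(connect: () => StubConsumer) {
  // Stays null if open() is never called or if connect() throws.
  private var consumer: StubConsumer = null

  def open(): Unit = { consumer = connect() }

  // Guarded close: safe to call from a completion callback even when open() failed.
  def close(): Unit = if (consumer != null) consumer.close()
}
```

As noted in the thread, a guard like this only hides the symptom. A null consumer still points to an earlier failure, such as a connection error or an empty partition, that needs to be found in the log.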
