spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Gerard Maas <gerard.m...@gmail.com>
Subject Re: Key not valid / already cancelled using Spark Streaming
Date Thu, 11 Dec 2014 09:42:30 GMT
If the timestamps in the logs are to be trusted It looks like your driver
is dying with that *java.io.FileNotFoundException*: and therefore the
workers loose their connection and close down.

-kr, Gerard.

On Thu, Dec 11, 2014 at 7:39 AM, Akhil Das <akhil@sigmoidanalytics.com>
wrote:

> Try to add the following to the sparkConf
>
>  .set("spark.core.connection.ack.wait.timeout","6000")
>
>       .set("spark.akka.frameSize","60")
>
> Used to face that issue with spark 1.1.0
>
> Thanks
> Best Regards
>
> On Thu, Dec 11, 2014 at 2:14 AM, Flávio Santos <barata@chaordicsystems.com
> > wrote:
>
>> Dear Spark'ers,
>>
>> I'm trying to run a simple job using Spark Streaming (version 1.1.1) and
>> YARN (yarn-cluster), but unfortunately I'm facing many issues. In short, my
>> job does the following:
>> - Consumes a specific Kafka topic
>> - Writes its content to S3 or HDFS
>>
>> Records in Kafka are in the form:
>> {"key": "someString"}
>>
>> This is important because I use the value of "key" to define the output
>> file name in S3.
>> Here are the Spark and Kafka parameters I'm using:
>>
>> val sparkConf = new SparkConf()
>>>   .setAppName("MyDumperApp")
>>>   .set("spark.task.maxFailures", "100")
>>>   .set("spark.hadoop.validateOutputSpecs", "false")
>>>   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>>   .set("spark.executor.extraJavaOptions", "-XX:+UseCompressedOops")
>>> val kafkaParams = Map(
>>>   "zookeeper.connect" -> zkQuorum,
>>>   "zookeeper.session.timeout.ms" -> "10000",
>>>   "rebalance.backoff.ms" -> "8000",
>>>   "rebalance.max.retries" -> "10",
>>>   "group.id" -> group,
>>>   "auto.offset.reset" -> "largest"
>>> )
>>
>>
>> My application is the following:
>>
>> KafkaUtils.createStream[String, String, StringDecoder,
>>> StringDecoder](ssc, kafkaParams, Map(topic -> 1),
>>> StorageLevel.MEMORY_AND_DISK_SER_2)
>>>   .foreachRDD((rdd, time) =>
>>>     rdd.map {
>>>       case (_, line) =>
>>>         val json = parse(line)
>>>         val key = extract(json, "key").getOrElse("key_not_found")
>>>         (key, dateFormatter.format(time.milliseconds)) -> line
>>>     }
>>>       .partitionBy(new HashPartitioner(10))
>>>       .saveAsHadoopFile[KeyBasedOutput[(String,String),
>>> String]]("s3://BUCKET", classOf[BZip2Codec])
>>>   )
>>
>>
>> And the last piece:
>>
>> class KeyBasedOutput[T >: Null, V <: AnyRef] extends
>>> MultipleTextOutputFormat[T , V] {
>>>   override protected def generateFileNameForKeyValue(key: T, value: V,
>>> leaf: String) = key match {
>>>     case (myKey, batchId) =>
>>>       "somedir" + "/" + myKey + "/" +
>>>         "prefix-" + myKey + "_" + batchId + "_" + leaf
>>>   }
>>>   override protected def generateActualKey(key: T, value: V) = null
>>> }
>>
>>
>> I use batch sizes of 5 minutes with checkpoints activated.
>> The job fails nondeterministically (I think it never ran longer than ~5
>> hours). I have no clue why, it simply fails.
>> Please find below the exceptions thrown by my application.
>>
>> I really appreciate any kind of hint.
>> Thank you very much in advance.
>>
>> Regards,
>> -- Flávio
>>
>> ==== Executor 1
>>
>> 2014-12-10 19:05:15,150 INFO  [handle-read-write-executor-3]
>> network.ConnectionManager (Logging.scala:logInfo(59)) - Removing
>> SendingConnection
>>  to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163)
>> 2014-12-10 19:05:15,201 INFO  [Thread-6] storage.MemoryStore
>> (Logging.scala:logInfo(59)) - ensureFreeSpace(98665) called with
>> curMem=194463488,
>>  maxMem=4445479895
>> 2014-12-10 19:05:15,202 INFO  [Thread-6] storage.MemoryStore
>> (Logging.scala:logInfo(59)) - Block input-0-1418238315000 stored as bytes
>> in memor
>> y (estimated size 96.4 KB, free 4.0 GB)
>> 2014-12-10 19:05:16,506 INFO  [handle-read-write-executor-2]
>> network.ConnectionManager (Logging.scala:logInfo(59)) - Removing
>> ReceivingConnecti
>> on to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,51443)
>> 2014-12-10 19:05:16,506 INFO  [handle-read-write-executor-1]
>> network.ConnectionManager (Logging.scala:logInfo(59)) - Removing
>> SendingConnection
>>  to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,51443)
>> 2014-12-10 19:05:16,506 INFO  [handle-read-write-executor-2]
>> network.ConnectionManager (Logging.scala:logInfo(59)) - Removing
>> SendingConnection
>>  to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,51443)
>> 2014-12-10 19:05:16,649 INFO  [handle-read-write-executor-0]
>> network.ConnectionManager (Logging.scala:logInfo(59)) - Removing
>> ReceivingConnecti
>> on to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39644)
>> 2014-12-10 19:05:16,649 INFO  [handle-read-write-executor-3]
>> network.ConnectionManager (Logging.scala:logInfo(59)) - Removing
>> SendingConnection
>>  to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39644)
>> 2014-12-10 19:05:16,649 INFO  [handle-read-write-executor-0]
>> network.ConnectionManager (Logging.scala:logInfo(59)) - Removing
>> SendingConnection
>>  to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39644)
>> 2014-12-10 19:05:16,650 INFO  [connection-manager-thread]
>> network.ConnectionManager (Logging.scala:logInfo(59)) - Key not valid ?
>> sun.nio.ch.Se
>> lectionKeyImpl@da2e041
>> 2014-12-10 19:05:16,651 INFO  [connection-manager-thread]
>> network.ConnectionManager (Logging.scala:logInfo(80)) - key already
>> cancelled ? sun.n
>> io.ch.SelectionKeyImpl@da2e041
>> *java.nio.channels.CancelledKeyException*
>>         at
>> org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:316)
>>         at
>> org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:145)
>> 2014-12-10 19:05:16,679 INFO  [handle-read-write-executor-1]
>> network.ConnectionManager (Logging.scala:logInfo(59)) - Removing
>> ReceivingConnection to ConnectionManagerId(
>> ec2-EXECUTOR.compute-1.amazonaws.com,39444)
>> 2014-12-10 19:05:16,679 INFO  [handle-read-write-executor-2]
>> network.ConnectionManager (Logging.scala:logInfo(59)) - Removing
>> SendingConnection to ConnectionManagerId(
>> ec2-EXECUTOR.compute-1.amazonaws.com,39444)
>> 2014-12-10 19:05:16,679 INFO  [handle-read-write-executor-1]
>> network.ConnectionManager (Logging.scala:logInfo(59)) - Removing
>> SendingConnection to ConnectionManagerId(
>> ec2-EXECUTOR.compute-1.amazonaws.com,39444)
>> 2014-12-10 19:05:16,680 INFO  [connection-manager-thread]
>> network.ConnectionManager (Logging.scala:logInfo(59)) - Key not valid ?
>> sun.nio.ch.SelectionKeyImpl@6a0dd98a
>> 2014-12-10 19:05:16,681 INFO  [connection-manager-thread]
>> network.ConnectionManager (Logging.scala:logInfo(80)) - key already
>> cancelled ? sun.nio.ch.SelectionKeyImpl@6a0dd98a
>> *java.nio.channels.CancelledKeyException*
>>         at
>> org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:392)
>>         at
>> org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:145)
>> 2014-12-10 19:05:16,717 INFO  [handle-read-write-executor-3]
>> network.ConnectionManager (Logging.scala:logInfo(59)) - Removing
>> ReceivingConnection to ConnectionManagerId(
>> ec2-EXECUTOR.compute-1.amazonaws.com,57984)
>> 2014-12-10 19:05:16,717 INFO  [handle-read-write-executor-0]
>> network.ConnectionManager (Logging.scala:logInfo(59)) - Removing
>> SendingConnection to ConnectionManagerId(
>> ec2-EXECUTOR.compute-1.amazonaws.com,57984)
>> 2014-12-10 19:05:16,717 INFO  [handle-read-write-executor-3]
>> network.ConnectionManager (Logging.scala:logInfo(59)) - Removing
>> SendingConnection to ConnectionManagerId(
>> ec2-EXECUTOR.compute-1.amazonaws.com,57984)
>> 2014-12-10 19:05:17,182 ERROR [SIGTERM handler]
>> executor.CoarseGrainedExecutorBackend (SignalLogger.scala:handle(57)) -
>> RECEIVED SIGNAL 15: SIGTERM
>>
>> ==== Executor 2
>>
>> 2014-12-10 19:05:15,010 INFO  [handle-message-executor-11]
>> storage.BlockManagerMaster (Logging.scala:logInfo(59)) - Updated info of
>> block input
>> -0-1418238314800
>> 2014-12-10 19:05:15,157 INFO  [connection-manager-thread]
>> network.ConnectionManager (Logging.scala:logInfo(59)) - Key not valid ?
>> sun.nio.ch.Se
>> lectionKeyImpl@66ea19c
>> 2014-12-10 19:05:15,157 INFO  [handle-read-write-executor-2]
>> network.ConnectionManager (Logging.scala:logInfo(59)) - Removing
>> SendingConnection
>>  to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163)
>> 2014-12-10 19:05:15,157 INFO  [handle-read-write-executor-0]
>> network.ConnectionManager (Logging.scala:logInfo(59)) - Removing
>> ReceivingConnecti
>> on to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163)
>> 2014-12-10 19:05:15,158 ERROR [handle-read-write-executor-0]
>> network.ConnectionManager (Logging.scala:logError(75)) - Corresponding
>> SendingConn
>> ection to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163)
>> not found
>> 2014-12-10 19:05:15,158 INFO  [connection-manager-thread]
>> network.ConnectionManager (Logging.scala:logInfo(80)) - key already
>> cancelled ? sun.n
>> io.ch.SelectionKeyImpl@66ea19c
>> *java.nio.channels.CancelledKeyException*
>>         at
>> org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:392)
>>         at
>> org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:145)
>>
>> ==== Driver
>>
>> 2014-12-10 19:05:13,805 INFO
>>  [sparkDriver-akka.actor.default-dispatcher-13] storage.BlockManagerInfo
>> (Logging.scala:logInfo(59)) - Added input
>> -0-1418238313600 in memory on ec2-EXECUTOR.compute-1.amazonaws.com:39444
>> (size: 38.2 KB, free: 4.1 GB)
>> 2014-12-10 19:05:13,823 ERROR
>> [sparkDriver-akka.actor.default-dispatcher-16] scheduler.JobScheduler
>> (Logging.scala:logError(96)) - Error runnin
>> g job streaming job 1418238300000 ms.0
>> *java.io.FileNotFoundException*: File
>> s3n://BUCKET/_temporary/0/task_201412101900_0039_m_000033 does not exist.
>>         at
>> org.apache.hadoop.fs.s3native.NativeS3FileSystem.listStatus(NativeS3FileSystem.java:506)
>>         at
>> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:360)
>>         at
>> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310)
>>         at
>> org.apache.hadoop.mapred.FileOutputCommitter.commitJob(FileOutputCommitter.java:136)
>>         at
>> org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:126)
>>         at
>> org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:995)
>>         at
>> org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:878)
>>         at
>> org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:845)
>>         at
>> org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:803)
>>         at
>> MyDumperClass$$anonfun$main$1.apply(IncrementalDumpsJenkins.scala:100)
>>         at
>> MyDumperClass$$anonfun$main$1.apply(IncrementalDumpsJenkins.scala:79)
>>         at
>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
>>         at
>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>         at
>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>         at scala.util.Try$.apply(Try.scala:161)
>>         at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
>>         at
>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
>>         at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>         at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>         at java.lang.Thread.run(Thread.java:745)
>> 2014-12-10 19:05:13,829 INFO  [Driver] yarn.ApplicationMaster
>> (Logging.scala:logInfo(59)) - Unregistering ApplicationMaster with FAILED
>>
>>
>> *--Flávio R. Santos*
>>
>> Chaordic | *Platform*
>> *www.chaordic.com.br <http://www.chaordic.com.br/>*
>> +55 48 3232.3200
>>
>
>

Mime
View raw message