It says:
Caused by: java.net.ConnectException: Connection refused: slave2/...:54845
Could you look in the executor logs (stderr on slave2) and see what made it shut down? Since you are doing a join, there is a high possibility of an OOM, etc.
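If it does turn out to be memory pressure on the executors, a few settings worth experimenting with are the executor memory, the YARN memory overhead, and the network timeout. A minimal sketch, assuming the conf is built before the SparkContext is created (the same keys can also be passed as --conf to spark-submit; the values below are illustrative, not tuned recommendations):

val conf = new org.apache.spark.SparkConf()
  .set("spark.executor.memory", "6g")                 // more heap for the outer join's hash tables
  .set("spark.yarn.executor.memoryOverhead", "1024")  // MB of off-heap headroom so YARN doesn't kill the container
  .set("spark.network.timeout", "600s")               // raises the heartbeat / shuffle-fetch timeouts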


Thanks
Best Regards

On Wed, Jul 1, 2015 at 10:20 AM, Pooja Jain <pooja.jain8@gmail.com> wrote:
Hi,

We are using Spark 1.4.0 on Hadoop in yarn-cluster mode via spark-submit. We are facing a Parquet write issue after doing dataframe joins.

We have a full data set and an incremental data set. We are reading them as dataframes, joining them, and then writing the result to HDFS in Parquet format. We are getting a timeout error on the last partition.

But if we do a count on the joined data it works (sketched after the code flow below), which gives us confidence that the join itself is happening properly. It only times out when writing to HDFS.

Code flow:
// join two data frames - dfBase and dfIncr on primaryKey
val joinedDF = dfBase.join(dfIncr, dfBase(primaryKey) === dfIncr(primaryKey), "outer")

// applying a reduce function on each row
val mergedDF = joinedDF.map(x => reduceFunc(x))

// converting back to dataframe
val newdf = Spark.getSqlContext().createDataFrame(mergedDF, dfSchema)

//writing to parquet file
newdf.write.parquet(hdfsfilepath)
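For reference, the count check mentioned above is roughly the following (a sketch; the exact call in our job may differ):

// sanity check: materializing the joined data via count completes without error
val joinedCount = joinedDF.count()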

We are getting the following exception:

15/06/30 22:47:04 WARN spark.HeartbeatReceiver: Removing executor 26 with no recent heartbeats: 255766 ms exceeds timeout 240000 ms
15/06/30 22:47:04 ERROR cluster.YarnClusterScheduler: Lost executor 26 on slave2: Executor heartbeat timed out after 255766 ms
15/06/30 22:47:04 INFO scheduler.TaskSetManager: Re-queueing tasks for 26 from TaskSet 7.0
15/06/30 22:47:04 WARN scheduler.TaskSetManager: Lost task 6.0 in stage 7.0 (TID 216, slave2): ExecutorLostFailure (executor 26 lost)
15/06/30 22:47:04 INFO scheduler.TaskSetManager: Starting task 6.1 in stage 7.0 (TID 310, slave2, PROCESS_LOCAL, 1910 bytes)
15/06/30 22:47:04 INFO scheduler.DAGScheduler: Executor lost: 26 (epoch 3)
15/06/30 22:47:04 INFO cluster.YarnClusterSchedulerBackend: Requesting to kill executor(s) 26
15/06/30 22:47:04 INFO storage.BlockManagerMasterEndpoint: Trying to remove executor 26 from BlockManagerMaster.
15/06/30 22:47:04 INFO storage.BlockManagerMasterEndpoint: Removing block manager BlockManagerId(26, slave2, 54845)
15/06/30 22:47:04 INFO storage.BlockManagerMaster: Removed 26 successfully in removeExecutor
15/06/30 22:47:04 INFO yarn.YarnAllocator: Driver requested a total number of 26 executor(s).
15/06/30 22:47:04 INFO scheduler.ShuffleMapStage: ShuffleMapStage 6 is now unavailable on executor 26 (193/200, false)
15/06/30 22:47:04 INFO yarn.ApplicationMaster$AMEndpoint: Driver requested to kill executor(s) 26.
15/06/30 22:47:06 INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated or disconnected! Shutting down. slave2:51849
15/06/30 22:47:06 ERROR cluster.YarnClusterScheduler: Lost executor 26 on slave2: remote Rpc client disassociated
15/06/30 22:47:06 INFO scheduler.TaskSetManager: Re-queueing tasks for 26 from TaskSet 7.0
15/06/30 22:47:06 INFO scheduler.DAGScheduler: Executor lost: 26 (epoch 5)
15/06/30 22:47:06 INFO storage.BlockManagerMasterEndpoint: Trying to remove executor 26 from BlockManagerMaster.
15/06/30 22:47:06 INFO storage.BlockManagerMaster: Removed 26 successfully in removeExecutor
15/06/30 22:47:06 WARN remote.ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkExecutor@slave2:51849] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
15/06/30 22:47:06 INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated or disconnected! Shutting down. slave2:51849
15/06/30 22:47:21 WARN scheduler.TaskSetManager: Lost task 6.1 in stage 7.0 (TID 310, slave2): org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:161)
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.shuffle.FetchFailedException: Failed to connect to slave2/...:54845
at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:84)
at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:84)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.sql.execution.joins.HashOuterJoin.org$apache$spark$sql$execution$joins$HashOuterJoin$$buildHashTable(HashOuterJoin.scala:170)
at org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$doExecute$1.apply(HashOuterJoin.scala:211)
at org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$doExecute$1.apply(HashOuterJoin.scala:188)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:93)
at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:92)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:152)
... 8 more
Caused by: java.io.IOException: Failed to connect to slave2/...:54845
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
... 3 more
Caused by: java.net.ConnectException: Connection refused: slave2/...:54845
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
... 1 more