spark-user mailing list archives

From Raghavendra Pandey <raghavendra.pan...@gmail.com>
Subject Re: Issue with parquet write after join (Spark 1.4.0)
Date Wed, 01 Jul 2015 15:34:56 GMT
By any chance, are you using a time field in your DataFrame? Time fields are
known to be notorious in RDD conversion.
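A minimal sketch of that workaround, assuming a hypothetical timestamp column named `eventTime` on dfBase (your column names will differ): cast it to a primitive before dropping down to an RDD, so no java.sql.Timestamp instance travels through the Row-based conversion.

```scala
// Sketch only: "eventTime" is a hypothetical column name.
// Casting a TimestampType column to long yields epoch seconds, which
// survives the DataFrame -> RDD -> DataFrame round trip cleanly.
val dfNoTs = dfBase
  .withColumn("eventTimeSec", dfBase("eventTime").cast("long"))
  .drop("eventTime")
```

The long column can be cast back to a timestamp after the final createDataFrame, if the original type is needed in the Parquet output.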
On Jul 1, 2015 6:13 PM, "Pooja Jain" <pooja.jain8@gmail.com> wrote:

> The join is happening successfully, as I am able to do count() after the join.
>
> The error occurs only while trying to write in Parquet format to HDFS.
>
> Thanks,
> Pooja.
>
> On Wed, Jul 1, 2015 at 1:06 PM, Akhil Das <akhil@sigmoidanalytics.com>
> wrote:
>
>> It says:
>>
>> Caused by: java.net.ConnectException: Connection refused: slave2/...:54845
>>
>> Could you look in the executor logs (stderr on slave2) and see what made
>> it shut down? Since you are doing a join, there's a high possibility of
>> OOM etc.
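If memory pressure or heartbeat timeouts turn out to be the cause, one hedged starting point is to raise executor memory and the relevant timeouts on spark-submit. The values below are illustrative, not recommendations, and the class and jar names are hypothetical placeholders:

```shell
# Illustrative values only - tune for your cluster and data volume.
# com.example.MergeJob and mergejob.jar are hypothetical placeholders.
spark-submit \
  --master yarn-cluster \
  --executor-memory 6g \
  --conf spark.network.timeout=600s \
  --conf spark.executor.heartbeatInterval=60s \
  --class com.example.MergeJob \
  mergejob.jar
```

Raising timeouts only masks a slow or dying executor, so the executor stderr on slave2 is still the first thing to check.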
>>
>>
>> Thanks
>> Best Regards
>>
>> On Wed, Jul 1, 2015 at 10:20 AM, Pooja Jain <pooja.jain8@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> We are using Spark 1.4.0 on Hadoop in yarn-cluster mode via
>>> spark-submit. We are facing a Parquet write issue after doing DataFrame
>>> joins.
>>>
>>> We have a full data set and an incremental data set. We are reading
>>> them as DataFrames, joining them, and then writing the result to HDFS
>>> in Parquet format. We are getting the timeout error on the last
>>> partition.
>>>
>>> But if we do a count on the joined data it works - which gives us
>>> confidence that the join is happening properly. It is timing out only
>>> when writing to HDFS.
>>>
>>> Code flow:
>>>
>>> // join the two data frames - dfBase and dfIncr - on primaryKey
>>> val joinedDF = dfBase.join(dfIncr, dfBase(primaryKey) === dfIncr(primaryKey), "outer")
>>>
>>> // apply a reduce function to each row; note this yields an RDD[Row]
>>> val mergedDF = joinedDF.map(x => reduceFunc(x))
>>>
>>> // convert the RDD back to a DataFrame
>>> val newdf = Spark.getSqlContext().createDataFrame(mergedDF, dfSchema)
>>>
>>> // write to Parquet on HDFS
>>> newdf.write.parquet(hdfsfilepath)
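Since the failure surfaces as a shuffle fetch during the write stage, one hedged variant of the flow above (same names as in the snippet; the StorageLevel choice is illustrative) is to persist the join output and materialize it with count() before writing, so the write job does not re-fetch the join's shuffle blocks from a lost executor:

```scala
import org.apache.spark.storage.StorageLevel

// Sketch only: persist the merged rows so the Parquet write reads
// them from cache/disk instead of re-fetching shuffle output from
// an executor that may since have died.
val mergedRows = joinedDF.map(x => reduceFunc(x))
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

mergedRows.count()  // materialize before writing

val newdf = Spark.getSqlContext().createDataFrame(mergedRows, dfSchema)
newdf.write.parquet(hdfsfilepath)
```

This narrows the failure window rather than fixing the executor loss itself, but it can tell you whether the join or the write is the stage that kills the executor.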
>>>
>>> Getting following exception:
>>>
>>> 15/06/30 22:47:04 WARN spark.HeartbeatReceiver: Removing executor 26 with no recent heartbeats: 255766 ms exceeds timeout 240000 ms
>>> 15/06/30 22:47:04 ERROR cluster.YarnClusterScheduler: Lost executor 26 on slave2: Executor heartbeat timed out after 255766 ms
>>> 15/06/30 22:47:04 INFO scheduler.TaskSetManager: Re-queueing tasks for 26 from TaskSet 7.0
>>> 15/06/30 22:47:04 WARN scheduler.TaskSetManager: Lost task 6.0 in stage 7.0 (TID 216, slave2): ExecutorLostFailure (executor 26 lost)
>>> 15/06/30 22:47:04 INFO scheduler.TaskSetManager: Starting task 6.1 in stage 7.0 (TID 310, slave2, PROCESS_LOCAL, 1910 bytes)
>>> 15/06/30 22:47:04 INFO scheduler.DAGScheduler: Executor lost: 26 (epoch 3)
>>> 15/06/30 22:47:04 INFO cluster.YarnClusterSchedulerBackend: Requesting to kill executor(s) 26
>>> 15/06/30 22:47:04 INFO storage.BlockManagerMasterEndpoint: Trying to remove executor 26 from BlockManagerMaster.
>>> 15/06/30 22:47:04 INFO storage.BlockManagerMasterEndpoint: Removing block manager BlockManagerId(26, slave2, 54845)
>>> 15/06/30 22:47:04 INFO storage.BlockManagerMaster: Removed 26 successfully in removeExecutor
>>> 15/06/30 22:47:04 INFO yarn.YarnAllocator: Driver requested a total number of 26 executor(s).
>>> 15/06/30 22:47:04 INFO scheduler.ShuffleMapStage: ShuffleMapStage 6 is now unavailable on executor 26 (193/200, false)
>>> 15/06/30 22:47:04 INFO yarn.ApplicationMaster$AMEndpoint: Driver requested to kill executor(s) 26.
>>> 15/06/30 22:47:06 INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated or disconnected! Shutting down. slave2:51849
>>> 15/06/30 22:47:06 ERROR cluster.YarnClusterScheduler: Lost executor 26 on slave2: remote Rpc client disassociated
>>> 15/06/30 22:47:06 INFO scheduler.TaskSetManager: Re-queueing tasks for 26 from TaskSet 7.0
>>> 15/06/30 22:47:06 INFO scheduler.DAGScheduler: Executor lost: 26 (epoch 5)
>>> 15/06/30 22:47:06 INFO storage.BlockManagerMasterEndpoint: Trying to remove executor 26 from BlockManagerMaster.
>>> 15/06/30 22:47:06 INFO storage.BlockManagerMaster: Removed 26 successfully in removeExecutor
>>> 15/06/30 22:47:06 WARN remote.ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkExecutor@slave2:51849] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
>>> 15/06/30 22:47:06 INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated or disconnected! Shutting down. slave2:51849
>>> 15/06/30 22:47:21 WARN scheduler.TaskSetManager: Lost task 6.1 in stage 7.0 (TID 310, slave2): org.apache.spark.SparkException: Task failed while writing rows.
>>> 	at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:161)
>>> 	at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
>>> 	at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
>>> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>>> 	at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>> 	at java.lang.Thread.run(Thread.java:745)
>>> Caused by: org.apache.spark.shuffle.FetchFailedException: Failed to connect to slave2/...:54845
>>> 	at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
>>> 	at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:84)
>>> 	at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:84)
>>> 	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>> 	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>> 	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> 	at org.apache.spark.sql.execution.joins.HashOuterJoin.org$apache$spark$sql$execution$joins$HashOuterJoin$$buildHashTable(HashOuterJoin.scala:170)
>>> 	at org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$doExecute$1.apply(HashOuterJoin.scala:211)
>>> 	at org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$doExecute$1.apply(HashOuterJoin.scala:188)
>>> 	at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
>>> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>> 	at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:93)
>>> 	at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:92)
>>> 	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>> 	at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:152)
>>> 	... 8 more
>>> Caused by: java.io.IOException: Failed to connect to slave2/...:54845
>>> 	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
>>> 	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
>>> 	at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
>>> 	at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>>> 	at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
>>> 	at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
>>> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>> 	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>>> 	... 3 more
>>> Caused by: java.net.ConnectException: Connection refused: slave2/...:54845
>>> 	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>>> 	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
>>> 	at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
>>> 	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
>>> 	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
>>> 	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>>> 	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>>> 	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>>> 	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
>>> 	... 1 more
>>>
>>>
>>
>
