spark-user mailing list archives

From Uri Laserson <laser...@cloudera.com>
Subject Re: java.net.SocketException on reduceByKey() in pyspark
Date Thu, 20 Mar 2014 03:25:57 GMT
I get the exact same error running on a bare-metal cluster with CentOS 6
and Python 2.6.6. Any other thoughts on the problem here? I only get the
error on operations that require communication, like reduceByKey or groupBy.
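
One way to narrow this down is to run the same small dataset through one operation that needs no shuffle and a few that do. The following is only a sketch: it assumes an existing SparkContext named `sc` (as in the pyspark shell), and `probe_actions` and `spark_probes` are hypothetical helper names, not part of PySpark.

```python
def probe_actions(actions):
    """Run each (name, callable) pair and record whether it raised.

    Returns a list of (name, status) pairs, where status is "ok" or
    "<ExceptionType>: <first line of the exception message>".
    """
    report = []
    for name, action in actions:
        try:
            action()
            report.append((name, "ok"))
        except Exception as exc:  # py4j's Py4JJavaError is an Exception subclass
            text = str(exc)
            first_line = text.splitlines()[0] if text else ""
            report.append((name, "%s: %s" % (type(exc).__name__, first_line)))
    return report


def spark_probes(sc):
    """Build probes against an existing SparkContext `sc` (an assumption)."""
    pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)], 2)
    return [
        # No shuffle: each partition is processed on its own.
        ("map", lambda: pairs.map(lambda kv: kv).collect()),
        # Shuffle: values for the same key have to move between partitions.
        ("reduceByKey", lambda: pairs.reduceByKey(lambda x, y: x + y).collect()),
        ("groupByKey", lambda: pairs.groupByKey().collect()),
    ]
```

In a pyspark shell, `probe_actions(spark_probes(sc))` would then list which operations succeed. If only the shuffle operations fail, that matches the pattern described above.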


On Sun, Mar 2, 2014 at 1:29 PM, Nicholas Chammas <nicholas.chammas@gmail.com> wrote:

> Alright, so this issue is related to the upgrade to Python 2.7, which
> relates it to the other Python 2.7 issue I reported in this thread
> <http://apache-spark-user-list.1001560.n3.nabble.com/Python-2-7-numpy-break-sortByKey-td2214.html>.
>
> I modified my code not to rely on Python 2.7, spun up a new cluster and
> did *not* upgrade its version of Python from 2.6.8. The code ran fine.
>
> I'd open a JIRA issue about this, but I cannot provide a simple repro that
> anyone can walk through.
>
> Nick
>
>
> On Fri, Feb 28, 2014 at 11:44 PM, Nicholas Chammas <nicholas.chammas@gmail.com> wrote:
>
>> Even a count() on the result of the flatMap() fails with the same error.
>> Somehow the formatting of the error output got messed up in my previous
>> email, so here's a relevant snippet of the output again.
>>
>> 14/03/01 04:39:01 INFO scheduler.DAGScheduler: Failed to run count at <stdin>:1
>> Traceback (most recent call last):
>>   File "<stdin>", line 1, in <module>
>>   File "/root/spark/python/pyspark/rdd.py", line 542, in count
>>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>>   File "/root/spark/python/pyspark/rdd.py", line 533, in sum
>>     return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
>>   File "/root/spark/python/pyspark/rdd.py", line 499, in reduce
>>     vals = self.mapPartitions(func).collect()
>>   File "/root/spark/python/pyspark/rdd.py", line 463, in collect
>>     bytesInJava = self._jrdd.collect().iterator()
>>   File "/root/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 537, in __call__
>>   File "/root/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line 300, in get_return_value
>> py4j.protocol.Py4JJavaError: An error occurred while calling o396.collect.
>> : org.apache.spark.SparkException: Job aborted: Task 29.0:0 failed 4 times (most recent failure: Exception failure: java.net.SocketException: Connection reset)
>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
>>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
>>     at scala.Option.foreach(Option.scala:236)
>>     at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> Any pointers to where I should look, or things to try?
>>
>> Nick
>>
>>
>>
>> On Fri, Feb 28, 2014 at 6:33 PM, nicholas.chammas <nicholas.chammas@gmail.com> wrote:
>>
>>> I've done a whole bunch of things to this RDD, and now when I try to
>>> sortByKey(), this is what I get:
>>>
>>> >>> flattened_po.flatMap(lambda x: map_to_database_types(x)).sortByKey()
>>> 14/02/28 23:18:41 INFO spark.SparkContext: Starting job: sortByKey at <stdin>:1
>>> 14/02/28 23:18:41 INFO scheduler.DAGScheduler: Got job 22 (sortByKey at <stdin>:1) with 1 output partitions (allowLocal=false)
>>> 14/02/28 23:18:41 INFO scheduler.DAGScheduler: Final stage: Stage 23 (sortByKey at <stdin>:1)
>>> 14/02/28 23:18:41 INFO scheduler.DAGScheduler: Parents of final stage: List()
>>> 14/02/28 23:18:41 INFO scheduler.DAGScheduler: Missing parents: List()
>>> 14/02/28 23:18:41 INFO scheduler.DAGScheduler: Submitting Stage 23 (PythonRDD[41] at sortByKey at <stdin>:1), which has no missing parents
>>> 14/02/28 23:18:41 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 23 (PythonRDD[41] at sortByKey at <stdin>:1)
>>> 14/02/28 23:18:41 INFO scheduler.TaskSchedulerImpl: Adding task set 23.0 with 1 tasks
>>> 14/02/28 23:18:41 INFO scheduler.TaskSetManager: Starting task 23.0:0 as TID 32 on executor 0: ip-<blah>.ec2.internal (PROCESS_LOCAL)
>>> 14/02/28 23:18:41 INFO scheduler.TaskSetManager: Serialized task 23.0:0 as 4985 bytes in 1 ms
>>> 14/02/28 23:18:41 WARN scheduler.TaskSetManager: Lost TID 32 (task 23.0:0)
>>> 14/02/28 23:18:41 WARN scheduler.TaskSetManager: Loss was due to java.net.SocketException
>>> java.net.SocketException: Connection reset
>>>     at java.net.SocketInputStream.read(SocketInputStream.java:196)
>>>     at java.net.SocketInputStream.read(SocketInputStream.java:122)
>>>     at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
>>>     at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
>>>     at java.io.DataInputStream.readInt(DataInputStream.java:387)
>>>     at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:110)
>>>     at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:153)
>>>     at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
>>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
>>>     at org.apache.spark.scheduler.Task.run(Task.scala:53)
>>>     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
>>>     at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>     at java.lang.Thread.run(Thread.java:744)
>>> 14/02/28 23:18:41 INFO scheduler.TaskSetManager: Starting task 23.0:0 as TID 33 on executor 0: ip-<blah>.ec2.internal (PROCESS_LOCAL)
>>> 14/02/28 23:18:41 INFO scheduler.TaskSetManager: Serialized task 23.0:0 as 4985 bytes in 1 ms
>>> 14/02/28 23:18:41 WARN scheduler.TaskSetManager: Lost TID 33 (task 23.0:0)
>>> 14/02/28 23:18:41 INFO scheduler.TaskSetManager: Loss was due to java.net.SocketException: Connection reset [duplicate 1]
>>> 14/02/28 23:18:41 INFO scheduler.TaskSetManager: Starting task 23.0:0 as TID 34 on executor 0: ip-<blah>.ec2.internal (PROCESS_LOCAL)
>>> 14/02/28 23:18:41 INFO scheduler.TaskSetManager: Serialized task 23.0:0 as 4985 bytes in 1 ms
>>> 14/02/28 23:18:41 WARN scheduler.TaskSetManager: Lost TID 34 (task 23.0:0)
>>> 14/02/28 23:18:41 INFO scheduler.TaskSetManager: Loss was due to java.net.SocketException: Connection reset [duplicate 2]
>>> 14/02/28 23:18:41 INFO scheduler.TaskSetManager: Starting task 23.0:0 as TID 35 on executor 0: ip-<blah>.ec2.internal (PROCESS_LOCAL)
>>> 14/02/28 23:18:41 INFO scheduler.TaskSetManager: Serialized task 23.0:0 as 4985 bytes in 1 ms
>>> 14/02/28 23:18:41 WARN scheduler.TaskSetManager: Lost TID 35 (task 23.0:0)
>>> 14/02/28 23:18:41 INFO scheduler.TaskSetManager: Loss was due to java.net.SocketException: Connection reset [duplicate 3]
>>> 14/02/28 23:18:41 ERROR scheduler.TaskSetManager: Task 23.0:0 failed 4 times; aborting job
>>> 14/02/28 23:18:41 INFO scheduler.TaskSchedulerImpl: Remove TaskSet 23.0 from pool
>>> 14/02/28 23:18:41 INFO scheduler.DAGScheduler: Failed to run sortByKey at <stdin>:1
>>> Traceback (most recent call last):
>>>   File "<stdin>", line 1, in <module>
>>>   File "/root/spark/python/pyspark/rdd.py", line 361, in sortByKey
>>>     rddSize = self.count()
>>>   File "/root/spark/python/pyspark/rdd.py", line 542, in count
>>>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>>>   File "/root/spark/python/pyspark/rdd.py", line 533, in sum
>>>     return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
>>>   File "/root/spark/python/pyspark/rdd.py", line 499, in reduce
>>>     vals = self.mapPartitions(func).collect()
>>>   File "/root/spark/python/pyspark/rdd.py", line 463, in collect
>>>     bytesInJava = self._jrdd.collect().iterator()
>>>   File "/root/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 537, in __call__
>>>   File "/root/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line 300, in get_return_value
>>> py4j.protocol.Py4JJavaError: An error occurred while calling o332.collect.
>>> : org.apache.spark.SparkException: Job aborted: Task 23.0:0 failed 4 times (most recent failure: Exception failure: java.net.SocketException: Connection reset)
>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
>>>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
>>>     at scala.Option.foreach(Option.scala:236)
>>>     at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> >>>
>>>
>>>
>>> The lambda passed to flatMap() returns a list of tuples; take() works
>>> fine just on the flatMap().
>>>
>>> Where would I start to troubleshoot this error?
>>>
>>> The error output includes mention of reset connections, so I naively
>>> confirmed that the master node can reach its 1 slave. Dunno if those are
>>> related things.
>>>
>>> If it matters any, I upgraded the cluster to Python 2.7 using the
>>> instructions here <https://spark-project.atlassian.net/browse/SPARK-922>.
>>> Also, I am running Spark 0.9.0, though I notice that the error output
>>> mentions 0.8.1 files.
>>>
>>> Nick
>>>
>>>
>>
>>
>
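
Since the quoted messages tie the failure to a Python upgrade on the cluster, it may be worth confirming that the driver and the workers run the same interpreter. The sketch below assumes an existing SparkContext named `sc`; the helper names are hypothetical and not part of PySpark. It uses only map() and collect(), so it does not depend on a shuffle. If the interpreters differ enough, the check may itself fail, which would also be informative.

```python
def interpreter_version(_=None):
    """Return the running interpreter's version as a (major, minor) tuple."""
    # Imported inside the function so it is self-contained when shipped to workers.
    import sys
    return tuple(sys.version_info[:2])


def mismatched_versions(driver_version, worker_versions):
    """Return the sorted, de-duplicated worker versions that differ from the driver's."""
    return sorted(set(v for v in worker_versions if v != driver_version))


def worker_versions(sc, num_tasks=8):
    """Ask `num_tasks` tasks which interpreter they run under.

    `sc` is assumed to be an existing SparkContext; `num_tasks` is an
    arbitrary illustrative value and does not guarantee every worker is hit.
    """
    return (sc.parallelize(range(num_tasks), num_tasks)
              .map(interpreter_version)
              .collect())
```

In a pyspark shell, `mismatched_versions(interpreter_version(), worker_versions(sc))` returning an empty list would suggest that the sampled workers match the driver.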


-- 
Uri Laserson, PhD
Data Scientist, Cloudera
Twitter/GitHub: @laserson
+1 617 910 0447
laserson@cloudera.com
