[ https://issues.apache.org/jira/browse/SPARK-23961?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Michel Lemay updated SPARK-23961:
---------------------------------
Description:
Given a DataFrame, call toLocalIterator on it. If we do not consume all records, it throws:
{quote}ERROR PythonRDD: Error while sending iterator
java.net.SocketException: Connection reset by peer: socket write error
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:497)
at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:509)
at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:509)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:509)
at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:705)
at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:705)
at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:705)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:706)
{quote}
To reproduce, here is a simple pyspark shell script that shows the error:
{quote}import itertools
df = spark.read.parquet("large parquet folder").cache()
print(df.count())
b = df.toLocalIterator()
print(len(list(itertools.islice(b, 20))))
b = None  # Make the iterator go out of scope. Throws here.
{quote}
Observations:
 * Consuming all records does not throw; taking only a subset of the partitions triggers the error (see the workaround sketch after this list).
 * In another experiment, doing the same on a regular RDD works if we cache/materialize it. If we do not cache the RDD, it throws in the same way.
 * It works in the Scala shell.
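Based on the first observation, a possible workaround is to drain the remaining records before dropping the iterator, presumably so the JVM writer thread is not cut off mid-stream. This is only an untested sketch (it reuses the same placeholder parquet path as above), and it gives up the memory benefit of toLocalIterator for the drained remainder:
{quote}import itertools
from collections import deque

df = spark.read.parquet("large parquet folder").cache()  # placeholder path, as above
print(df.count())
it = df.toLocalIterator()
first20 = list(itertools.islice(it, 20))  # take only the records we need
deque(it, maxlen=0)  # drain the rest so the sender is not interrupted mid-partition
print(len(first20))
{quote}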
was:
Given a DataFrame, take its rdd and call toLocalIterator. If we do not consume all records, it throws:
{quote}ERROR PythonRDD: Error while sending iterator
java.net.SocketException: Connection reset by peer: socket write error
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:497)
at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:509)
at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:509)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:509)
at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:705)
at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:705)
at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:705)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:706)
{quote}
To reproduce, here is a simple pyspark shell script that shows the error:
{quote}import itertools
df = spark.read.parquet("large parquet folder")
cachedRDD = df.rdd.cache()
print(cachedRDD.count())  # materialize
b = cachedRDD.toLocalIterator()
print(len(list(itertools.islice(b, 20))))
b = None  # Make the iterator go out of scope. Throws here.
{quote}
Observations:
 * Consuming all records does not throw; taking only a subset of the partitions triggers the error.
 * In another experiment, doing the same on a regular RDD works if we cache/materialize it. If we do not cache the RDD, it throws in the same way.
 * It works in the Scala shell.
> pyspark toLocalIterator throws an exception
> -------------------------------------------
>
> Key: SPARK-23961
> URL: https://issues.apache.org/jira/browse/SPARK-23961
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 2.2.1
> Reporter: Michel Lemay
> Priority: Minor
> Labels: DataFrame, pyspark
>
> Given a DataFrame, call toLocalIterator on it. If we do not consume all records, it throws:
> {quote}ERROR PythonRDD: Error while sending iterator
> java.net.SocketException: Connection reset by peer: socket write error
> at java.net.SocketOutputStream.socketWrite0(Native Method)
> at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
> at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
> at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
> at java.io.DataOutputStream.write(DataOutputStream.java:107)
> at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
> at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:497)
> at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:509)
> at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:509)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:509)
> at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:705)
> at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:705)
> at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:705)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
> at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:706)
> {quote}
>
> To reproduce, here is a simple pyspark shell script that shows the error:
> {quote}import itertools
> df = spark.read.parquet("large parquet folder").cache()
> print(df.count())
> b = df.toLocalIterator()
> print(len(list(itertools.islice(b, 20))))
> b = None  # Make the iterator go out of scope. Throws here.
> {quote}
>
> Observations:
> * Consuming all records does not throw; taking only a subset of the partitions triggers the error.
> * In another experiment, doing the same on a regular RDD works if we cache/materialize it. If we do not cache the RDD, it throws in the same way.
> * It works in the Scala shell.
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org