spark-issues mailing list archives

From "jiangyu (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-28482) Data incomplete when using pandas udf in pyspark
Date Tue, 23 Jul 2019 08:44:00 GMT

     [ https://issues.apache.org/jira/browse/SPARK-28482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

jiangyu updated SPARK-28482:
----------------------------
    Description: 
Hi,
  
 Since Spark 2.3.x, pandas UDFs have been available, using Arrow as the serialization/deserialization method between the JVM and the Python workers. However, an issue arises with Python >= 3.5.
 We use pandas UDFs to process batches of data, but we find that the data is incomplete with Python 3.x. At first I thought our processing logic might be wrong, so I reduced the code to a very simple case, and it shows the same problem. After investigating for a week, I found it is related to pyarrow.
  
  
 *Reproduce procedure:*

1. Prepare the data
 The data has seven columns, a, b, c, d, e, f and g; the data type is Integer:
 a,b,c,d,e,f,g
 1,2,3,4,5,6,7
 1,2,3,4,5,6,7
 1,2,3,4,5,6,7
 1,2,3,4,5,6,7
  Produce 1,000,000 rows in total, name the file test.csv and upload it to HDFS.
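 For anyone reproducing this without the attached test.csv, an equivalent file can be generated with a few lines of Python (a sketch only; the actual file we used is attached to this issue) and uploaded with hdfs dfs -put test.csv /test.csv:
{code:python}
# Sketch: generate an equivalent test.csv (the real file is attached to this issue).
with open('test.csv', 'w') as f:
    f.write('a,b,c,d,e,f,g\n')
    for _ in range(1000000):
        f.write('1,2,3,4,5,6,7\n')
{code}
 Then load the file and repartition it to a single partition: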
  
{code:python}
# imports assumed (not shown in the original snippet)
from pyspark import SparkContext
from pyspark.sql.functions import col

df = spark.read.format('csv').option("header", "true").load('/test.csv')
df = df.select(*(col(c).cast("int").alias(c) for c in df.columns))
df = df.repartition(1)
spark_context = SparkContext.getOrCreate()
{code}
 
 2. Register the pandas UDF
  
{code:python}
# imports assumed (not shown in the original snippet)
from pyspark.sql.functions import pandas_udf, col
from pyspark.sql.types import IntegerType

def add_func(a, b, c, d, e, f, g):
    print('iterator one time')
    return a

add = pandas_udf(add_func, returnType=IntegerType())
df_result = df.select(add(col("a"), col("b"), col("c"), col("d"),
                          col("e"), col("f"), col("g")))
{code}
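 For reference, a scalar pandas_udf receives one pandas.Series per column for each Arrow record batch and must return a Series of the same length, so add_func can be sanity-checked locally (a standalone sketch, independent of Spark; the sample values are made up):
{code:python}
import pandas as pd

# Standalone check of the UDF body: one pandas.Series per column in, one Series out.
a = pd.Series([1, 1, 1])
b = c = d = e = f = g = pd.Series([2, 2, 2])

result = add_func(a, b, c, d, e, f, g)   # prints 'iterator one time' once
assert len(result) == len(a)
{code}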
 
 3. Apply the pandas UDF
  
{code:python}
def trigger_func(iterator):
    yield iterator

# foreachPartition is an action: it forces evaluation of the plan, which runs the pandas UDF
df_result.rdd.foreachPartition(trigger_func)
{code}
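 To make the incompleteness measurable, a hypothetical variant of trigger_func (not part of the original repro) can materialize each partition and report how many rows actually reach the Python side:
{code:python}
def counting_trigger_func(iterator):
    # Hypothetical helper: count the rows that actually arrive in this partition.
    rows = list(iterator)
    print('rows received in this partition: %d' % len(rows))

# Printed on the executors' stdout; with one partition the expected count is 1,000,000.
df_result.rdd.foreachPartition(counting_trigger_func)
{code}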
 
 4. Execute it in pyspark (local or YARN)
 Run it with spark.sql.execution.arrow.maxRecordsPerBatch=100000. As mentioned before, the total row count is 1,000,000, and a scalar pandas UDF is invoked once per Arrow batch, so it should print "iterator one time" 10 times.
 (1) Python 2.7 environment:
  
{code:bash}
PYSPARK_PYTHON=/usr/lib/conda/envs/py2.7/bin/python pyspark \
  --conf spark.sql.execution.arrow.maxRecordsPerBatch=100000 \
  --conf spark.executor.pyspark.memory=2g \
  --conf spark.sql.execution.arrow.enabled=true \
  --executor-cores 1
{code}
 
!py2.7.png!
 The result is correct: "iterator one time" is printed 10 times.

(2) Python 3.5 or 3.6 environment:
{code:bash}
PYSPARK_PYTHON=/usr/lib/conda/envs/python3.6/bin/python pyspark \
  --conf spark.sql.execution.arrow.maxRecordsPerBatch=100000 \
  --conf spark.executor.pyspark.memory=2g \
  --conf spark.sql.execution.arrow.enabled=true \
  --executor-cores 1
{code}
 

!py3.6.png!

The data is incomplete. The exception shown is printed by the Spark JVM side, through logging that we added ourselves; I will explain it below.
  
  
h3. *Investigation*

The "process done" message comes from a print we added in worker.py.
!worker.png!
 To capture the exception, we changed the Spark code in core/src/main/scala/org/apache/spark/util/Utils.scala, adding the following lines to print the exception:
{code:java}
@@ -1362,6 +1362,8 @@ private[spark] object Utils extends Logging {
       case t: Throwable =>
         // Purposefully not using NonFatal, because even fatal exceptions
         // we don't want to have our finallyBlock suppress
+        logInfo(t.getLocalizedMessage)
+        t.printStackTrace()
         originalThrowable = t
         throw originalThrowable
     } finally {
{code}
 

 
 It seems PySpark receives the data from the JVM, but pyarrow reads it incompletely: the pyarrow side thinks the stream is finished and shuts down the socket, while the JVM side is still writing to the same socket and therefore gets a socket-closed exception.
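 The effect can be illustrated outside Spark with plain sockets (a minimal sketch of the failure mode only; it does not use Spark's actual transport code): the reader closes its end early, and the writer's next write fails.
{code:python}
import socket
import threading

# Minimal sketch of the race described above: the reader decides the stream is
# finished and closes the socket while the writer still has data to send.
reader_sock, writer_sock = socket.socketpair()

def writer():
    try:
        for _ in range(1000):
            writer_sock.sendall(b'x' * 65536)     # keep writing "batches"
    except OSError as e:                          # e.g. Broken pipe
        print('writer failed:', e)

t = threading.Thread(target=writer)
t.start()

reader_sock.recv(65536)   # read only part of the stream ...
reader_sock.close()       # ... then close early, as if end-of-stream had been seen
t.join()
{code}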
 The relevant pyarrow code is in ipc.pxi:
  
{code:python}
cdef class _RecordBatchReader:
    cdef:
        shared_ptr[CRecordBatchReader] reader
        shared_ptr[InputStream] in_stream

    cdef readonly:
        Schema schema

    def __cinit__(self):
        pass

    def _open(self, source):
        get_input_stream(source, &self.in_stream)
        with nogil:
            check_status(CRecordBatchStreamReader.Open(
                self.in_stream.get(), &self.reader))

        self.schema = pyarrow_wrap_schema(self.reader.get().schema())

    def __iter__(self):
        while True:
            yield self.read_next_batch()

    def get_next_batch(self):
        import warnings
        warnings.warn('Please use read_next_batch instead of '
                      'get_next_batch', FutureWarning)
        return self.read_next_batch()

    def read_next_batch(self):
        """
        Read next RecordBatch from the stream. Raises StopIteration at end of
        stream
        """
        cdef shared_ptr[CRecordBatch] batch

        with nogil:
            check_status(self.reader.get().ReadNext(&batch))

        if batch.get() == NULL:
            raise StopIteration

        return pyarrow_wrap_batch(batch)
{code}
 

The read_next_batch function gets a NULL batch and concludes the stream is over.
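 For comparison, the expected behavior of this reader can be reproduced outside Spark with the public pyarrow stream API (a standalone sketch; the batch shape mirrors the repro, but none of this is Spark's internal code): ten written batches come back as ten batches, and iteration stops only at the end-of-stream marker written by the writer's close().
{code:python}
import pyarrow as pa

# Standalone sketch of the stream reader behavior discussed above (public pyarrow API).
batch = pa.RecordBatch.from_arrays(
    [pa.array([1] * 100000) for _ in range(7)],   # 7 int columns, 100,000 rows
    names=list('abcdefg'))

sink = pa.BufferOutputStream()
writer = pa.RecordBatchStreamWriter(sink, batch.schema)
for _ in range(10):                # 10 batches * 100,000 rows = 1,000,000 rows
    writer.write_batch(batch)
writer.close()                     # writes the end-of-stream marker

reader = pa.RecordBatchStreamReader(pa.BufferReader(sink.getvalue()))
print(sum(b.num_rows for b in reader))   # expected: 1000000
{code}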
  
h3. *RESULT*

Our environment is Spark 2.4.3; we have tried pyarrow 0.10.0 and 0.14.0 with Python 2.7, 3.5 and 3.6.
 With Python 2.7 everything is fine, but after switching to Python 3.5 or 3.6 the data is wrong.
 The number of columns is critical for triggering this bug: with fewer than 5 columns it probably will not happen, but with more columns, for example 7 or above, it happens every time.
 So we wonder whether there is some conflict between Python 3.x and the pyarrow version?
 I have attached the code and data.



> Data incomplete when using pandas udf in pyspark
> ------------------------------------------------
>
>                 Key: SPARK-28482
>                 URL: https://issues.apache.org/jira/browse/SPARK-28482
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.4.3
>         Environment: centos 7.4   
> pyarrow 0.10.0 0.14.0
> python 2.7 3.5 3.6
>            Reporter: jiangyu
>            Priority: Major
>         Attachments: py2.7.png, py3.6.png, test.csv, test.py, worker.png
>



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

