spark-issues mailing list archives

From "Matt Cheah (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-26874) With PARQUET-1414, Spark can erroneously write empty pages
Date Thu, 14 Feb 2019 02:06:00 GMT

     [ https://issues.apache.org/jira/browse/SPARK-26874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matt Cheah updated SPARK-26874:
-------------------------------
    Description: 
This issue will only come up when Spark upgrades its Parquet dependency to the latest off
of parquet-mr/master. It is being filed to proactively fix the bug before we upgrade: it is
not something the current unit tests would easily catch, and it could go unnoticed until the
community scale-tests a release candidate.

Parquet introduced a new feature to limit the number of rows written to a page in a column
chunk - see PARQUET-1414. Previously, Parquet would only flush a page to the column store
once the page writer's buffer reached a certain number of bytes. The Parquet patch additionally
makes page writers flush to the column store once they have been given a certain number of
rows - the default limit is 20000.
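
For reference, the row-count limit added by PARQUET-1414 is controlled by the {{parquet.page.row.count.limit}}
Hadoop configuration key (the same key the reproduction below lowers). A minimal sketch of
setting it before a Spark write follows - the DataFrame and output path are hypothetical:
{code:java}
// Sketch only: tuning the per-page row-count limit added by PARQUET-1414.
// someDf and the output path are placeholders.
spark.sparkContext.hadoopConfiguration
  .set("parquet.page.row.count.limit", "20000") // the PARQUET-1414 default

someDf.write.parquet("/tmp/parquet-row-count-limit-demo")
{code}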

The patch causes the Spark Parquet Data Source to erroneously write empty pages to column chunks,
ultimately making the Parquet file unreadable with exceptions like these:

 
{code:java}
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block
-1 in file file:/private/var/folders/7f/rrg37sj15r33cwlxbhg7fyg5080xxg/T/spark-2c1b1e5d-c132-42fe-98e1-b523b9baa176/parquet-data/part-00002-b8b4bb3c-b49f-440b-b96c-53fcd19786ad-c000.snappy.parquet
 at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:251)
 at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:207)
 at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
 at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
 at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:181)
 ... 18 more
Caused by: java.lang.IllegalArgumentException: Reading past RLE/BitPacking stream.
 at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:53)
 at org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder.readNext(RunLengthBitPackingHybridDecoder.java:80)
 at org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder.readInt(RunLengthBitPackingHybridDecoder.java:62)
 at org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesReader.readInteger(RunLengthBitPackingHybridValuesReader.java:53)
 at org.apache.parquet.column.impl.ColumnReaderBase$ValuesReaderIntIterator.nextInt(ColumnReaderBase.java:733)
 at org.apache.parquet.column.impl.ColumnReaderBase.checkRead(ColumnReaderBase.java:567)
 at org.apache.parquet.column.impl.ColumnReaderBase.consume(ColumnReaderBase.java:705)
 at org.apache.parquet.column.impl.ColumnReaderImpl.consume(ColumnReaderImpl.java:30)
 at org.apache.parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:47)
 at org.apache.parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:84)
 at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:271)
 at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
 at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
 at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165)
 at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
 at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)
 at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:222)
 ... 22 more
{code}
What is happening here is that the reader is handed a page with no values, which Parquet
cannot handle.

The root cause is the way Spark treats empty (null) records in optional fields. Concretely,
in {{ParquetWriteSupport#write}}, Spark always tells the record consumer that it is starting
a message ({{recordConsumer#startMessage}}). If a field in the row is null, Spark skips writing
that field, but it still tells the record consumer that the message is finished
({{recordConsumer#endMessage}}). Ending the message causes every column writer to increment
its row count for the current page by 1, even though Spark did not necessarily send any values
to the underlying page writer. Now suppose the page's maximum row count is N: if Spark does
this N times within one page - in particular, if Spark cuts a page boundary and is then given
N consecutive null values for an optional column - the column writer will conclude it needs
to flush the page to the column chunk store and will write out an empty page.

This will most likely manifest in very sparse columns.
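
To make this concrete, below is a heavily simplified sketch of the write path described above.
It is not the actual {{ParquetWriteSupport}} source - the class shape and the {{fieldWriters}}
parameter are illustrative - but it shows the {{startMessage}}/{{endMessage}} bookkeeping that
inflates the page row count:
{code:java}
// Illustrative sketch only - NOT the real ParquetWriteSupport implementation.
import org.apache.parquet.io.api.RecordConsumer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType

class SketchWriteSupport(
    schema: StructType,
    recordConsumer: RecordConsumer,
    fieldWriters: Seq[(InternalRow, Int) => Unit]) {

  def write(row: InternalRow): Unit = {
    recordConsumer.startMessage()
    var i = 0
    while (i < schema.length) {
      // A null field is skipped entirely: nothing reaches the page writer.
      if (!row.isNullAt(i)) {
        recordConsumer.startField(schema(i).name, i)
        fieldWriters(i)(row, i)
        recordConsumer.endField(schema(i).name, i)
      }
      i += 1
    }
    // endMessage() still runs, so every column writer counts one more row in
    // its current page even when it received no values. After N such rows the
    // PARQUET-1414 row-count check can flush an empty page.
    recordConsumer.endMessage()
  }
}
{code}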

A simple demonstration of the issue is given below. Assume this code is manually inserted
into {{ParquetIOSuite}}:
{code:java}
test("PARQUET-1414 Problems") {
  // Manually adjust the maximum row count to reproduce the issue on small data
  sparkContext.hadoopConfiguration.set("parquet.page.row.count.limit", "1")
  withTempPath { location =>
    val path = new Path(location.getCanonicalPath + "/parquet-data")
    // A single nullable (optional) column; every row below leaves it null.
    val schema = StructType(
      Array(StructField("timestamps1", ArrayType(TimestampType))))
    // Requires scala.collection.mutable.ListBuffer to be imported in the suite.
    val rows = ListBuffer[Row]()
    for (_ <- 0 until 10) {
      rows += Row(null.asInstanceOf[Array[java.sql.Timestamp]])
    }
    val srcDf = spark.createDataFrame(
      sparkContext.parallelize(rows, 3),
      schema,
      true)
    srcDf.write.parquet(path.toString)
    // The write succeeds, but reading the file back fails with the exception above.
    assert(spark.read.parquet(path.toString).collect().length > 0)
  }
}{code}
Reverting the Parquet patch makes the above test pass.

 

> With PARQUET-1414, Spark can erroneously write empty pages
> ----------------------------------------------------------
>
>                 Key: SPARK-26874
>                 URL: https://issues.apache.org/jira/browse/SPARK-26874
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output, SQL
>    Affects Versions: 2.4.0
>            Reporter: Matt Cheah
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
