parquet-dev mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (PARQUET-1531) Page row count limit causes empty pages to be written from MessageColumnIO
Date Wed, 13 Mar 2019 07:26:00 GMT

    [ https://issues.apache.org/jira/browse/PARQUET-1531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16791398#comment-16791398 ]

ASF GitHub Bot commented on PARQUET-1531:
-----------------------------------------

gszadovszky commented on pull request #620: PARQUET-1531: Page row count limit causes empty pages to be written from MessageColumnIO
URL: https://github.com/apache/parquet-mr/pull/620
 
 
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Page row count limit causes empty pages to be written from MessageColumnIO
> --------------------------------------------------------------------------
>
>                 Key: PARQUET-1531
>                 URL: https://issues.apache.org/jira/browse/PARQUET-1531
>             Project: Parquet
>          Issue Type: Bug
>    Affects Versions: 1.11.0
>            Reporter: Matt Cheah
>            Assignee: Gabor Szadovszky
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.11.0
>
>
> This originally manifested as https://issues.apache.org/jira/browse/SPARK-26874, but we realized that it is fundamentally an issue in how the fix for PARQUET-1414 interacts with {{MessageColumnIO}}; Spark is one user of that API.
> In {{MessageColumnIO#endMessage()}}, we first check whether any fields are missing and fill in their values with null via {{MessageColumnIO#writeNullForMissingFieldsAtCurrentLevel}}. However, this method might not actually write any nulls to the underlying page: {{MessageColumnIO}} can buffer nulls in memory and flush them to the page store lazily.
> Regardless of whether the nulls are flushed to the page store, {{MessageColumnIO#endMessage}} always calls {{columns#endRecord()}}, which signals to the {{ColumnWriteStore}} that a record was written. At that point, the write store increments the row count for the current page by 1 and then checks whether the page needs to be flushed because it has hit the page row count limit.
> The problem is that, with the above writing scheme, {{MessageColumnIO}} can cause empty pages to be written to Parquet files, and empty pages are not readable by Parquet readers. Suppose the page row count limit is N and the {{MessageColumnIO}} receives N nulls for a column. The {{MessageColumnIO}} buffers those nulls in memory and does not necessarily flush them to the writer yet. On the Nth call to {{endMessage()}}, however, the column store considers the page to have hit the row count limit even though no values have actually been written, and the underlying page writer writes an empty page regardless.
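> A tiny, hypothetical driver for the sketch classes above makes the scenario concrete: with a page row count limit of N, N all-null records trigger a flush while zero values have been written, i.e. an empty page:
> {code:java}
> // Hypothetical driver for the sketch classes above (not real parquet-mr code).
> public class EmptyPageScenario {
>   public static void main(String[] args) {
>     int n = 3;                                           // page row count limit N
>     SketchColumnWriteStore store = new SketchColumnWriteStore(n);
>     SketchMessageColumnIO io = new SketchMessageColumnIO(store);
>     for (int i = 0; i < n; i++) {
>       io.endMessage();                                   // each record carries only nulls
>     }
>     // prints: flushing page with 0 values
>   }
> }
> {code}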
> To illustrate the problem, one can run this simple example inserted into Spark's {{ParquetIOSuite}} once Spark has been upgraded to use the master branch of Parquet. Attach a debugger to {{MessageColumnIO#endMessage()}} and trace the logic: the column writer will push a page with 0 values:
> {code:java}
> test("PARQUET-1414 Problems") {
>   // Manually adjust the maximum row count to reproduce the issue on small data
>   sparkContext.hadoopConfiguration.set("parquet.page.row.count.limit", "1")
>   withTempPath { location =>
>     val path = new Path(location.getCanonicalPath + "/parquet-data")
>     val schema = StructType(
>       Array(StructField("timestamps1", ArrayType(TimestampType))))
>     val rows = ListBuffer[Row]()
>     for (j <- 0 until 10) {
>       rows += Row(
>         null.asInstanceOf[Array[java.sql.Timestamp]])
>     }
>     val srcDf = spark.createDataFrame(
>       sparkContext.parallelize(rows, 3),
>       schema,
>       true)
>     srcDf.write.parquet(path.toString)
>     assert(spark.read.parquet(path.toString).collect.size > 0)
>   }
> }{code}
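> For reference, {{parquet.page.row.count.limit}} is the same knob the test above tweaks; set directly on a Hadoop {{Configuration}}, it makes the page boundary easy to hit on tiny data sets (a minimal sketch, assuming plain Hadoop configuration access):
> {code:java}
> // Minimal sketch: shrink the page row count limit via Hadoop configuration,
> // mirroring the sparkContext.hadoopConfiguration.set(...) call in the test above.
> import org.apache.hadoop.conf.Configuration;
>
> public class RowCountLimitConfig {
>   public static void main(String[] args) {
>     Configuration conf = new Configuration();
>     conf.set("parquet.page.row.count.limit", "1");       // default is much larger
>     System.out.println(conf.get("parquet.page.row.count.limit"));
>   }
> }
> {code}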



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
