spark-user mailing list archives

From Hugh Hyndman <h...@redleaf.ca>
Subject PushDown Filter Not Reset
Date Sun, 09 Sep 2018 10:56:14 GMT
Hi,

I am developing my own data source reader and have implemented pushdown filters. I am struggling
with a case where pushFilters() is not called when a query has no filter, so I have no chance to
reset a previously pushed filter. Here is a Spark shell session that demonstrates the issue, with
debug log statements showing each call.

1) Initial unfiltered load/show. Note that the data source provides the schema.

scala> val df = spark.read.format("MyDataSource").
    option("function", "testSpartan").
    option("loglevel", "debug").load
18/08/26 07:42:56.580 DEBUG dr: MyDataSourceReader()
18/08/26 07:42:56.580 DEBUG dr:   function: testSpartan
18/08/26 07:42:56.580 DEBUG dr:   loglevel: debug
df: org.apache.spark.sql.DataFrame = [jcolumn: bigint, ccolumn: string]

scala> df.show
18/08/26 07:43:33.659 DEBUG dr: pruneColums()
18/08/26 07:43:33.659 DEBUG dr:   StructField(jcolumn,LongType,false)
18/08/26 07:43:33.659 DEBUG dr:   StructField(ccolumn,StringType,false)
18/08/26 07:43:33.659 DEBUG dr: pushedFilters()
18/08/26 07:43:33.659 DEBUG dr: pushedFilters()
18/08/26 07:43:33.659 DEBUG dr: pushedFilters()
18/08/26 07:43:33.659 DEBUG dr: pushedFilters()
18/08/26 07:43:33.678 DEBUG dr: createBatchDataReaderFactories()
18/08/26 07:43:33.699 DEBUG dr: next()
18/08/26 07:43:33.701 DEBUG dr: get()
18/08/26 07:43:33.701 DEBUG dr: next()
+-------+-------+
|jcolumn|ccolumn|
+-------+-------+
|      0|      a|
|      1|      b|
|      2|      c|
|      3|      a|
|      4|      b|
|      5|      c|
|      6|      a|
|      7|      b|
|      8|      c|
|      9|      a|
+-------+-------+

2) Call with a simple filter. Note the call to pushFilters().

scala> df.filter("jcolumn<2").show
18/08/26 07:45:42.500 DEBUG dr: pushedFilters()
18/08/26 07:45:42.500 DEBUG dr: pushedFilters()
18/08/26 07:45:42.500 DEBUG dr: pushedFilters()
18/08/26 07:45:42.500 DEBUG dr: pushedFilters()
18/08/26 07:45:42.501 DEBUG dr: pushFilters()
18/08/26 07:45:42.501 DEBUG dr:   LessThan(jcolumn,2)
18/08/26 07:45:42.501 DEBUG dr: pruneColums()
18/08/26 07:45:42.501 DEBUG dr:   StructField(jcolumn,LongType,false)
18/08/26 07:45:42.501 DEBUG dr:   StructField(ccolumn,StringType,false)
18/08/26 07:45:42.501 DEBUG dr: pushedFilters()
18/08/26 07:45:42.501 DEBUG dr: pushedFilters()
18/08/26 07:45:42.501 DEBUG dr: pushedFilters()
18/08/26 07:45:42.501 DEBUG dr: pushedFilters()
18/08/26 07:45:42.512 DEBUG dr: createBatchDataReaderFactories()
18/08/26 07:45:42.529 DEBUG dr: next()
18/08/26 07:45:42.532 DEBUG dr: get()
18/08/26 07:45:42.532 DEBUG dr: next()
+-------+-------+
|jcolumn|ccolumn|
+-------+-------+
|      0|      a|
|      1|      b|
+-------+-------+

3) Subsequent call with no filter. You'll see that I don't get a call to pushFilters() with
an empty Filter array, and the result still reflects the filter from step 2. I am unsure what
"signal" I should get in order to reset the pushed filters.

scala> df.show
18/08/26 07:46:21.442 DEBUG dr: pruneColums()
18/08/26 07:46:21.442 DEBUG dr:   StructField(jcolumn,LongType,false)
18/08/26 07:46:21.442 DEBUG dr:   StructField(ccolumn,StringType,false)
18/08/26 07:46:21.443 DEBUG dr: pushedFilters()
18/08/26 07:46:21.443 DEBUG dr: pushedFilters()
18/08/26 07:46:21.443 DEBUG dr: pushedFilters()
18/08/26 07:46:21.443 DEBUG dr: pushedFilters()
18/08/26 07:46:21.452 DEBUG dr: createBatchDataReaderFactories()
18/08/26 07:46:21.468 DEBUG dr: next()
18/08/26 07:46:21.470 DEBUG dr: get()
18/08/26 07:46:21.471 DEBUG dr: next()
+-------+-------+
|jcolumn|ccolumn|
+-------+-------+
|      0|      a|
|      1|      b|
+-------+-------+
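
The three steps above can be reproduced in miniature with a plain-Scala model. This is only a
sketch under stated assumptions, not Spark code: ModelReader, Filter and LessThan below are
hypothetical stand-ins for the reader and filter classes, and the model assumes that one reader
instance is reused across queries (the log shows MyDataSourceReader() only once, at load) and
that pushFilters() is invoked only when the query actually carries a filter.

```scala
// Hypothetical stand-ins; these are NOT the org.apache.spark.sql.sources classes.
sealed trait Filter
final case class LessThan(attribute: String, value: Long) extends Filter

// Simplified reader that keeps pushed filters as mutable state between scans.
final class ModelReader(rows: Seq[(Long, String)]) {
  private var pushed: Array[Filter] = Array.empty

  // Remembers the filters and reports that none are left for the caller to evaluate.
  def pushFilters(filters: Array[Filter]): Array[Filter] = {
    pushed = filters
    Array.empty
  }

  def pushedFilters(): Array[Filter] = pushed

  // Applies whatever filters are currently remembered.
  def scan(): Seq[(Long, String)] = rows.filter { case (j, _) =>
    pushed.forall {
      case LessThan("jcolumn", v) => j < v
      case _                      => true
    }
  }
}

object StaleFilterDemo {
  // Returns the row counts of the three queries from the shell session.
  def run(): (Int, Int, Int) = {
    val data   = (0L until 10L).map(i => (i, "abc".charAt((i % 3).toInt).toString))
    val reader = new ModelReader(data)

    val unfiltered = reader.scan().size        // step 1: no filter pushed yet

    reader.pushFilters(Array(LessThan("jcolumn", 2)))
    val filtered = reader.scan().size          // step 2: filter pushed

    // Step 3: the query has no filter, so (by assumption) pushFilters() is skipped
    // and the filter remembered from step 2 is still applied.
    val afterwards = reader.scan().size

    (unfiltered, filtered, afterwards)
  }

  def main(args: Array[String]): Unit =
    println(StaleFilterDemo.run())             // prints (10,2,2)
}
```

In this model a newly constructed ModelReader starts with no remembered filters. Whether the same
holds for the real reader when the DataFrame is loaded again has not been verified here.
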
Thanks

/Hugh

