spark-user mailing list archives

From Anton Puzanov
Subject Predicate Pushdown Doesn't Work With Data Source API
Date Mon, 28 Aug 2017 12:48:46 GMT
Hi everyone,

I am trying to improve the performance of data loading from disk. For that 
I have implemented my own RDD and now I am trying to increase the 
performance with predicate pushdown.
I have used many sources including the documentation and

What I hope to achieve is to receive the filters in `public RDD<Row> 
buildScan(String[] requiredColumns, Filter[] filters)` and use them to 
filter the data loaded into the DataFrame. I have not implemented any 
Filters myself; from my understanding, all the basic filters (eq, gt, 
etc.) should be built in.
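
To make the goal concrete, here is a minimal sketch of what "using the filters" inside buildScan could look like. It is not the code from my project and it does not depend on Spark: `Filter`, `EqualTo` and `GreaterThan` below are hypothetical stand-ins that only mimic the shape of the classes in org.apache.spark.sql.sources, rows are plain maps instead of Row objects, and the result is a List instead of an RDD. It assumes the filter value and the column value have the same type.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PushdownSketch {
    // Hypothetical stand-ins for Spark's source filters (attribute name + literal value).
    interface Filter {}

    static final class EqualTo implements Filter {
        final String attribute;
        final Object value;
        EqualTo(String attribute, Object value) { this.attribute = attribute; this.value = value; }
    }

    static final class GreaterThan implements Filter {
        final String attribute;
        final Object value;
        GreaterThan(String attribute, Object value) { this.attribute = attribute; this.value = value; }
    }

    // Returns true when the row passes the filter. Filters this sketch does not
    // recognise keep the row, on the assumption that the engine re-checks them later.
    @SuppressWarnings("unchecked")
    static boolean matches(Filter f, Map<String, Object> row) {
        if (f instanceof EqualTo) {
            EqualTo eq = (EqualTo) f;
            return eq.value.equals(row.get(eq.attribute));
        }
        if (f instanceof GreaterThan) {
            GreaterThan gt = (GreaterThan) f;
            Object v = row.get(gt.attribute);
            // Assumes v and gt.value are mutually comparable (same type).
            return v instanceof Comparable && ((Comparable<Object>) v).compareTo(gt.value) > 0;
        }
        return true;
    }

    // Applies every filter to every row, then projects the surviving rows
    // onto requiredColumns, in the order the columns were requested.
    static List<Object[]> buildScan(List<Map<String, Object>> rows,
                                    String[] requiredColumns, Filter[] filters) {
        List<Object[]> out = new ArrayList<>();
        for (Map<String, Object> row : rows) {
            boolean keep = true;
            for (Filter f : filters) {
                if (!matches(f, row)) { keep = false; break; }
            }
            if (!keep) continue;
            Object[] projected = new Object[requiredColumns.length];
            for (int i = 0; i < requiredColumns.length; i++) {
                projected[i] = row.get(requiredColumns[i]);
            }
            out.add(projected);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> rows = new ArrayList<>();
        Map<String, Object> a = new HashMap<>();
        a.put("field1", 2); a.put("field2", "x");
        Map<String, Object> b = new HashMap<>();
        b.put("field1", 5); b.put("field2", "y");
        rows.add(a); rows.add(b);

        Filter[] filters = { new GreaterThan("field1", 3) };
        List<Object[]> result = buildScan(rows, new String[] {"field1", "field2"}, filters);
        System.out.println("rows kept: " + result.size());
    }
}
```

In the real relation the same loop would run over the source data before the rows are turned into an RDD, which is only useful if the filters array actually arrives non-empty.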

The interesting part of my code:

public class MyProvider implements RelationProvider
    public BaseRelation createRelation(SQLContext sqlContext, 
scala.collection.immutable.Map<String, String> parameters) {
        return new MyRelation(sqlContext, JavaConversions.mapAsJavaMap

public class MyRelation extends BaseRelation implements TableScan, 
PrunedScan, PrunedFilteredScan {
public StructType schema() {
    return ExpressionEncoder.javaBean(EventBean.class).schema();

public RDD<Row> buildScan(String[] requiredColumns, Filter[] filters) {

Of course, I also implemented the other two versions of buildScan. 
I use it in the following manner:

SQLContext sqc = sparkSession.sqlContext();
Dataset<Row> eventDataFrame = sqc.load("com.MyProvider", loadingOptions);
eventDataFrame.createOrReplaceTempView( "events" );
Dataset<Row> sqlResult = sparkSession.sql( query );

Where the query looks like this: "SELECT field1,field2,field3 from events 
WHERE field1>3"

The original Bean has 26 fields, and the requiredColumns array contains 
only the relevant fields, which is good. The filters array, however, is 
empty regardless of what I do.

I also tried:
SQLContext sqc = sparkSession.sqlContext();
Dataset<Row> eventDataFrame = sqc.load("com.myProvider", loadingOptions);
eventDataFrame.createOrReplaceTempView( "events" );
Dataset<Row> sqlResult ="field1", "field2").filter(

Another thing that bothers me is the query plan: I don't see any Catalyst 
optimization output, so maybe the query is not being optimized? 
The plan looks like this:
*Filter (cast(field1#27 as int) > 4)
+- *Scan MyRelation @3a709cc7 [field1#27,field2#26] ReadSchema: 

Thank you for your time and support.
Anton P.