spark-user mailing list archives

From Anton Puzanov <ANT...@il.ibm.com>
Subject Predicate Pushdown Doesn't Work With Data Source API
Date Mon, 28 Aug 2017 12:48:46 GMT
Hi everyone,

I am trying to improve the performance of data loading from disk. To that end 
I have implemented my own RDD, and now I am trying to increase performance 
further with predicate pushdown.
I have consulted many sources, including the documentation and 
https://www.slideshare.net/databricks/yin-huai-20150325meetupwithdemos.

What I hope to achieve is to receive the filters in `public RDD<Row> 
buildScan(String[] requiredColumns, Filter[] filters)` and use them to 
filter the data loaded into the DataFrame. I have not implemented any 
Filters myself; from my understanding, all the basic filters (eq, gt, 
etc.) are built in.
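For reference, my understanding of how pushed filters would be consumed is roughly the following. This is a self-contained sketch using simplified stand-in classes (not the real org.apache.spark.sql.sources types), with rows modeled as plain maps:

```java
import java.util.Map;

// Simplified stand-ins for org.apache.spark.sql.sources filter classes.
// Illustration only; the real Spark types differ.
interface SimpleFilter {
    boolean eval(Map<String, Object> row);
}

final class SimpleEqualTo implements SimpleFilter {
    final String attribute;
    final Object value;
    SimpleEqualTo(String attribute, Object value) {
        this.attribute = attribute;
        this.value = value;
    }
    public boolean eval(Map<String, Object> row) {
        return value.equals(row.get(attribute));
    }
}

final class SimpleGreaterThan implements SimpleFilter {
    final String attribute;
    final int value; // int-valued for brevity
    SimpleGreaterThan(String attribute, int value) {
        this.attribute = attribute;
        this.value = value;
    }
    public boolean eval(Map<String, Object> row) {
        return ((Number) row.get(attribute)).intValue() > value;
    }
}

public class FilterSketch {
    // A row survives the scan only if every pushed filter accepts it.
    static boolean accepts(Map<String, Object> row, SimpleFilter[] filters) {
        for (SimpleFilter f : filters) {
            if (!f.eval(row)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Object> row = Map.of("field1", 5, "field2", 7);
        SimpleFilter[] filters = { new SimpleGreaterThan("field1", 3) };
        System.out.println(accepts(row, filters)); // true: 5 > 3
    }
}
```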

The interesting part of my code:

public class MyProvider implements RelationProvider {
    @Override
    public BaseRelation createRelation(SQLContext sqlContext,
            scala.collection.immutable.Map<String, String> parameters) {
        System.out.println("createRelation");
        return new MyRelation(sqlContext, JavaConversions.mapAsJavaMap(parameters));
    }
}
public class MyRelation extends BaseRelation implements TableScan,
        PrunedScan, PrunedFilteredScan {
    @Override
    public StructType schema() {
        return ExpressionEncoder.javaBean(EventBean.class).schema();
    }

    @Override
    public RDD<Row> buildScan(String[] requiredColumns, Filter[] filters) {
        System.out.println(Arrays.toString(requiredColumns));
        System.out.println(Arrays.toString(filters));
        // ... RDD construction elided ...
    }
}

Of course, I implemented the other two versions of buildScan as well.
I use it in the following manner:

SQLContext sqc = sparkSession.sqlContext();
Dataset<Row> eventDataFrame = sqc.load("com.MyProvider", loadingOptions);
eventDataFrame.createOrReplaceTempView("events");
Dataset<Row> sqlResult = sparkSession.sql(query);
System.out.println(sqlResult.queryExecution().executedPlan().toString());

Where the query looks like this: "SELECT field1,field2,field3 from events 
WHERE field1>3"

The original Bean has 26 fields, and the requiredColumns array contains 
only the relevant fields, which is good. The Filters array, however, is 
empty regardless of what I do.
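The column pruning that does work amounts to projecting each row onto requiredColumns. A minimal stand-alone sketch of that idea, independent of Spark and again modeling rows as plain maps:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PruneSketch {
    // Keep only the columns the query actually references.
    static Map<String, Object> project(Map<String, Object> row, String[] requiredColumns) {
        Map<String, Object> pruned = new LinkedHashMap<>();
        for (String col : requiredColumns) {
            pruned.put(col, row.get(col));
        }
        return pruned;
    }

    public static void main(String[] args) {
        Map<String, Object> row =
                Map.of("field1", 5, "field2", 7, "field3", 9, "field4", 0);
        // Only field1..field3 survive the projection.
        System.out.println(project(row, new String[]{"field1", "field2", "field3"}));
    }
}
```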

I also tried:
SQLContext sqc = sparkSession.sqlContext();
Dataset<Row> eventDataFrame = sqc.load("com.MyProvider", loadingOptions);
eventDataFrame.createOrReplaceTempView("events");
Dataset<Row> sqlResult = eventDataFrame.select("field1", "field2").filter("field1 > 4");

Another thing that bothers me: in the plan output I don't see any sign of 
Catalyst optimization; maybe the query is not being optimized?
The plan looks like this:
*Filter (cast(field1#27 as int) > 4)
+- *Scan MyRelation @3a709cc7 [field1#27,field2#26] ReadSchema: struct<field1:smallint,field2:smallint>

Thank you for your time and support.
Anton P.