spark-dev mailing list archives

From James Baker <>
Subject Re: [VOTE] [SPIP] SPARK-15689: Data Source API V2
Date Tue, 29 Aug 2017 21:48:20 GMT
Yeah, for sure.

With the stable representation: I agree that in the general case this is pretty intractable,
since it restricts the modifications you can make in the future too much. That said, it shouldn't
be as hard if you restrict yourself to the parts of the plan that are supported by the Data Source
V2 API (which, after all, need to remain properly translatable in the future to support the
proposed mixins). This should have a pretty small scope in comparison. As long as the user
can bail out of nodes they don't understand, they should be OK, right?

That said, what would also be fine for us is a place to plug into an unstable query plan.

Explicitly here, what I'm looking for is a convenient mechanism to accept a fully specified
set of arguments (of which I can choose to ignore some), and return the information as to
which of them I'm ignoring. Taking a query plan of sorts is a way of doing this which IMO
is intuitive to the user. It also provides a convenient location to plug in things like stats.
Not at all married to the idea of using a query plan here; it just seemed convenient.
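A minimal sketch of the "fully specified arguments in, ignored parts out" shape. It assumes a hypothetical source that looks rows up by a composite key (x, y) and stores each key's rows sorted by ts. `Request`, `Result`, `EqualTo` and `pushDown` are illustrative names, not part of the proposed V2 API. Because the source sees every filter and the sort in one call, it can accept both equality filters and the sort together, and reject all of them when it is given only one of the filters.

```java
import java.util.ArrayList;
import java.util.List;

public class AllAtOncePushdown {
    // Hypothetical stand-in for an equality filter (column = value).
    static final class EqualTo {
        final String column;
        final Object value;

        EqualTo(String column, Object value) {
            this.column = column;
            this.value = value;
        }

        @Override
        public String toString() {
            return column + " = " + value;
        }
    }

    // The full request: every filter plus the desired sort order, in one call.
    static final class Request {
        final List<EqualTo> filters;
        final List<String> sortColumns;

        Request(List<EqualTo> filters, List<String> sortColumns) {
            this.filters = filters;
            this.sortColumns = sortColumns;
        }
    }

    // The response: exactly the parts the source is ignoring, which Spark must apply.
    static final class Result {
        final List<EqualTo> unhandledFilters;
        final boolean sortUnhandled;

        Result(List<EqualTo> unhandledFilters, boolean sortUnhandled) {
            this.unhandledFilters = unhandledFilters;
            this.sortUnhandled = sortUnhandled;
        }
    }

    // Hypothetical source: rows are found via the composite key (x, y), and each key's
    // rows are stored sorted by "ts". Equality on x or y alone cannot use the key.
    static Result pushDown(Request req) {
        boolean hasX = false;
        boolean hasY = false;
        for (EqualTo f : req.filters) {
            hasX |= f.column.equals("x");
            hasY |= f.column.equals("y");
        }
        boolean keyLookup = hasX && hasY;

        List<EqualTo> unhandled = new ArrayList<>();
        for (EqualTo f : req.filters) {
            boolean servedByKey = keyLookup && (f.column.equals("x") || f.column.equals("y"));
            if (!servedByKey) {
                unhandled.add(f);
            }
        }
        // A single-key lookup already yields rows in ts order; anything else needs a sort.
        boolean sortHandled = req.sortColumns.isEmpty()
            || (keyLookup && req.sortColumns.equals(List.of("ts")));
        return new Result(unhandled, !sortHandled);
    }

    public static void main(String[] args) {
        Result both = pushDown(new Request(
            List.of(new EqualTo("x", 1), new EqualTo("y", 2)), List.of("ts")));
        System.out.println(both.unhandledFilters + " sortUnhandled=" + both.sortUnhandled);

        Result onlyX = pushDown(new Request(List.of(new EqualTo("x", 1)), List.of("ts")));
        System.out.println(onlyX.unhandledFilters + " sortUnhandled=" + onlyX.sortUnhandled);
    }
}
```

The point of the design is that the decision about each piece can depend on the others, which an incremental, one-call-per-filter protocol cannot express without the source revising earlier answers.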

Regarding the users who just want to be able to pump data into Spark, my understanding is
that replacing isolated nodes in a query plan is easy. That said, our goal here is to be able
to push down as much as possible into the underlying datastore.

To your second question:

The issue is that if you build up pushdowns incrementally and not all at once, you end up
having to reject pushdowns and filters that you actually can do, which unnecessarily increases

For example, the dataset

a b c
1 2 3
1 3 3
1 3 4
2 1 1
2 0 1

can efficiently push down sort(b, c) if I have already applied the filter a = 1, but otherwise
it will force a sort in Spark. On the PR I detail a case where I can push down two equality
filters iff I am given them at the same time, but cannot push them down one at a time.
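To make the dataset example concrete, here is a small self-contained Java check (plain Java, not Spark code; the class and method names are illustrative). It shows that the rows matching a = 1 are already in (b, c) order in the stored layout while the full dataset is not, which is why the sort is only pushable once the filter has been applied.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.function.Predicate;

public class SortAfterFilterDemo {
    // The example dataset, as rows of {a, b, c} in stored order.
    static final List<int[]> ROWS = Arrays.asList(
        new int[] {1, 2, 3},
        new int[] {1, 3, 3},
        new int[] {1, 3, 4},
        new int[] {2, 1, 1},
        new int[] {2, 0, 1});

    // Orders rows by b, then by c.
    static final Comparator<int[]> BY_B_THEN_C =
        Comparator.<int[]>comparingInt(r -> r[1]).thenComparingInt(r -> r[2]);

    // True if the rows, in their stored order, already satisfy the ordering.
    static boolean isSorted(List<int[]> rows, Comparator<int[]> order) {
        for (int i = 1; i < rows.size(); i++) {
            if (order.compare(rows.get(i - 1), rows.get(i)) > 0) {
                return false;
            }
        }
        return true;
    }

    // Keeps matching rows without reordering them.
    static List<int[]> filter(List<int[]> rows, Predicate<int[]> p) {
        List<int[]> out = new ArrayList<>();
        for (int[] r : rows) {
            if (p.test(r)) {
                out.add(r);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Without the filter, sort(b, c) would have to run in Spark.
        System.out.println(isSorted(ROWS, BY_B_THEN_C));
        // After a = 1, the stored order already satisfies sort(b, c).
        System.out.println(isSorted(filter(ROWS, r -> r[0] == 1), BY_B_THEN_C));
    }
}
```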

There are loads of cases like this: you can imagine someone being able to push down a sort
before a filter is applied, but not afterwards. However, maybe the filter is so selective
that it's better to push down the filter and not handle the sort. I don't get to make this
decision; Spark does (but it doesn't have good enough information to do it properly, whereas I
do). I want to be able to choose the parts I push down given knowledge of my datasource. As
currently defined, the APIs don't let me do that; in this respect they're strictly more
restrictive than the V1 APIs.

The pattern of not considering things that can be done in bulk bites us in other ways. The
retrieval methods end up being trickier to implement than is necessary because frequently
a single operation provides the result of many of the getters, but the state is mutable, so
you end up with odd caches.

For example, the work I need to do to answer unhandledFilters in V1 is roughly the same as
the work I need to do to buildScan, so I want to cache it. This means that I end up with code
that looks like:

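import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import java.util.List;

import static java.util.Collections.emptyList;

// Memoizes delegate.computeBar for the most recent filter list, so the work done
// for unhandledFilters can be reused by buildScan.
public final class CachingFoo implements Foo {
    private final Foo delegate;

    private List<Filter> currentFilters = emptyList();
    private Supplier<Bar> barSupplier;

    public CachingFoo(Foo delegate) {
        this.delegate = delegate;
        this.barSupplier = newSupplier(currentFilters);
    }

    // Computes Bar lazily, at most once, for the given filters.
    private Supplier<Bar> newSupplier(List<Filter> filters) {
        return Suppliers.memoize(() -> delegate.computeBar(filters));
    }

    @Override
    public Bar computeBar(List<Filter> filters) {
        // Invalidate the cache only when the filters actually change.
        if (!filters.equals(currentFilters)) {
            currentFilters = filters;
            barSupplier = newSupplier(filters);
        }
        return barSupplier.get();
    }
}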

which caches the result computed for unhandledFilters, on the expectation that Spark will call
buildScan afterwards and can reuse it.

This kind of cache becomes more prevalent, but harder to deal with, in the new APIs. As one
example here, the state I will need in order to compute accurate column stats internally will
likely be a subset of the work required in order to get the read tasks, tell you if I can
handle filters, etc, so I'll want to cache them for reuse. However, the cached information
needs to be appropriately invalidated when I add a new filter or sort order or limit, and
this makes implementing the APIs harder and more error-prone.

One thing that'd be great is a defined contract of the order in which Spark calls the methods
on your datasource (ideally this contract could be implied by the way the Java class structure
works, but otherwise I can just throw).
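A sketch of the "otherwise I can just throw" fallback, assuming a hypothetical reader (`OrderedReader`, `pushFilter`, `estimatedRows` and `readTaskCount` are illustrative, not the proposed V2 method names). Pushdown is rejected once planning has started, and statistics and read tasks share a single planning computation, so no cache invalidation is needed. The row-count arithmetic is a placeholder cost model.

```java
import java.util.ArrayList;
import java.util.List;

public class OrderedReader {
    // The only legal order for this hypothetical reader: configure, then plan.
    enum Phase { CONFIGURING, PLANNED }

    private Phase phase = Phase.CONFIGURING;
    private final List<String> filters = new ArrayList<>();
    private long estimatedRows;
    // Counts how often the expensive planning work ran.
    int planComputations = 0;

    // Pushdown is accepted only before planning, so shared results can never go stale.
    public void pushFilter(String filter) {
        if (phase != Phase.CONFIGURING) {
            throw new IllegalStateException("pushFilter called after planning started");
        }
        filters.add(filter);
    }

    // Statistics and read tasks both come from the same one-off planning step.
    public long estimatedRows() {
        plan();
        return estimatedRows;
    }

    public int readTaskCount() {
        plan();
        // Placeholder sizing: one task per 1,000 estimated rows, at least one task.
        return (int) Math.max(1L, estimatedRows / 1_000L);
    }

    private void plan() {
        if (phase == Phase.CONFIGURING) {
            planComputations++;
            // Placeholder cost model: 100,000 rows, and each filter keeps a tenth.
            long rows = 100_000L;
            for (int i = 0; i < filters.size(); i++) {
                rows /= 10;
            }
            estimatedRows = rows;
            phase = Phase.PLANNED;
        }
    }

    public static void main(String[] args) {
        OrderedReader reader = new OrderedReader();
        reader.pushFilter("a = 1");
        System.out.println(reader.estimatedRows() + " rows, " + reader.readTaskCount() + " tasks");
        try {
            reader.pushFilter("b = 2");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Encoding the phases in the type system instead (for example, a pushdown step that returns a separate planned-reader type with no pushdown methods) would move the same check to compile time, which corresponds to the "implied by the Java class structure" option.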


On Tue, 29 Aug 2017 at 02:56 Reynold Xin <<>>

Thanks for the comment. I think you just pointed out a trade-off between expressiveness and
API simplicity, compatibility and evolvability. For the max expressiveness, we'd want the
ability to expose full query plans, and let the data source decide which part of the query
plan can be pushed down.

The downsides to that (full query plan push down) are:

1. It is extremely difficult to design a stable representation for logical / physical plan.
It is doable, but we'd be the first to do it. I'm not aware of any mainstream database having
done that in the past. The design of that API itself, to make sure we have a good story
for backward and forward compatibility, would probably take months if not years. It might
still be good to do, or offer an experimental trait without compatibility guarantee that uses
the current Catalyst internal logical plan.

2. Most data source developers simply want a way to offer some data, without any pushdown.
Having to understand query plans is a burden rather than a gift.

Re: your point about the proposed v2 being worse than v1 for your use case.

Can you say more? You used the argument that in v2 there is more support for broader pushdown
and as a result it is harder to implement. That's how it is supposed to be. If a data source
simply implements one of the traits, it'd be logically identical to v1. I don't see why it
would be worse or better, other than v2 provides much stronger forward compatibility guarantees
than v1.

On Tue, Aug 29, 2017 at 4:54 AM, James Baker <<>>
Copying from the code review comments I just submitted on the draft API (

Context here is that I've spent some time implementing a Spark datasource and have had some
issues with the current API which are made worse in V2.

The general conclusion I’ve come to here is that this is very hard to actually implement
(in a similar but more aggressive way than DataSource V1, because of the extra methods and
dimensions we get in V2).

In Data Source V1's PrunedFilteredScan, the issue is that you are passed the filters in the
buildScan method, and passed them again in the unhandledFilters method.

However, the filters that you can’t handle might be data dependent, which the current API
does not handle well. Suppose I can handle filter A some of the time, and filter B some of
the time. If I’m passed in both, then either A and B are unhandled, or A, or B, or neither.
The work I have to do to work this out is essentially the same as I have to do while actually
generating my RDD (essentially I have to generate my partitions), so I end up doing some weird
caching work.
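To illustrate why the answer is data dependent, here is a Java sketch under the assumption that each partition advertises which filters it can evaluate natively (the `Partition` class and the string filter names "A" and "B" are hypothetical). With identical filters, different partition layouts yield different unhandled sets, and computing them requires the full partition list, which is the same work building the scan needs anyway.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class DataDependentFilters {
    // Hypothetical partition metadata: the filters this partition can evaluate natively
    // (for example, because only some files carry an index on the filtered column).
    static final class Partition {
        final String name;
        final Set<String> supportedFilters;

        Partition(String name, Set<String> supportedFilters) {
            this.name = name;
            this.supportedFilters = supportedFilters;
        }
    }

    // A filter counts as handled only if every partition to be scanned can apply it.
    // Answering this needs the full partition list: the same work as building the scan.
    static List<String> unhandledFilters(List<Partition> partitions, List<String> filters) {
        List<String> unhandled = new ArrayList<>();
        for (String filter : filters) {
            for (Partition p : partitions) {
                if (!p.supportedFilters.contains(filter)) {
                    unhandled.add(filter);
                    break;
                }
            }
        }
        return unhandled;
    }

    public static void main(String[] args) {
        List<String> filters = List.of("A", "B");
        // Same filters, different data, different answers.
        System.out.println(unhandledFilters(List.of(
            new Partition("p1", Set.of("A", "B")), new Partition("p2", Set.of("A"))), filters));
        System.out.println(unhandledFilters(List.of(
            new Partition("p1", Set.of("B")), new Partition("p2", Set.of("A", "B"))), filters));
    }
}
```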

This V2 API proposal has the same issues, but perhaps more so. In PrunedFilteredScan, there
is essentially one degree of freedom for pruning (filters), so you just have to implement
caching between unhandledFilters and buildScan. However, here we have many degrees of freedom:
sorts, individual filters, clustering, sampling, and maybe aggregations eventually. These
operations are not all commutative, and computing my support one by one can easily end up
being more expensive than computing it all in one go.

For some trivial examples:

- After filtering, I might be sorted, whilst before filtering I might not be.

- Filtering with certain filters might affect my ability to push down others.

- Filtering with aggregations (as mooted) might not be possible to push down.

And with the API as currently mooted, I need to be able to go back and change my results because
they might change later.

Really, what would be good here is to pass all of the filters, sorts, etc. at once, and
then I return the parts I can’t handle.

I’d prefer in general that this be implemented by passing some kind of query plan to the
datasource which enables this kind of replacement. I explicitly don’t want to be given the whole
query plan (that sounds painful); I would prefer we push down only the parts of the query plan
we deem to be stable. With the mix-in approach, I don’t think we can guarantee the properties
we want without a two-phase thing. I’d really love to be able to just define a straightforward
union type which is our supported pushdown stuff, and then the user can transform and return

I think this ends up being a more elegant API for consumers, and also far more intuitive.
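One way the "union type plus bail-out" idea could look, sketched in Java with hypothetical node types (`FilterOp`, `SortOp`, `UnknownOp`) rather than Catalyst classes. The source sees the whole chain of operations at once, absorbs the longest prefix it can execute, stops at the first node it cannot handle or does not recognize, and hands the remainder back to Spark. The decision rule assumes data stored sorted by (a, b, c), matching the earlier dataset example.

```java
import java.util.ArrayList;
import java.util.List;

public class PlanPushdown {
    // Hypothetical union of the plan nodes a pushdown API might expose.
    interface Op {}

    // Equality filter: column = value.
    static final class FilterOp implements Op {
        final String column;
        final Object value;
        FilterOp(String column, Object value) { this.column = column; this.value = value; }
    }

    static final class SortOp implements Op {
        final List<String> columns;
        SortOp(List<String> columns) { this.columns = columns; }
    }

    // Anything outside the supported set; a source must be able to bail out on it.
    static final class UnknownOp implements Op {
        final String description;
        UnknownOp(String description) { this.description = description; }
    }

    // ops are ordered from the scan upward. Because the operations do not all commute,
    // the source absorbs only a prefix: it stops at the first op it cannot handle (or
    // does not recognize) and returns the rest for Spark to execute.
    static List<Op> pushDown(List<Op> ops) {
        boolean filteredOnA = false;
        int absorbed = 0;
        for (Op op : ops) {
            boolean handled;
            if (op instanceof FilterOp) {
                handled = ((FilterOp) op).column.equals("a");
                filteredOnA |= handled;
            } else if (op instanceof SortOp) {
                // Stored order is (b, c) only once a single value of a is selected.
                handled = filteredOnA && ((SortOp) op).columns.equals(List.of("b", "c"));
            } else {
                handled = false; // bail out on anything not understood
            }
            if (!handled) {
                break;
            }
            absorbed++;
        }
        return new ArrayList<>(ops.subList(absorbed, ops.size()));
    }

    public static void main(String[] args) {
        List<Op> filterThenSort = List.of(new FilterOp("a", 1), new SortOp(List.of("b", "c")));
        System.out.println(pushDown(filterThenSort).size()); // 0: both absorbed
        List<Op> sortThenFilter = List.of(new SortOp(List.of("b", "c")), new FilterOp("a", 1));
        System.out.println(pushDown(sortThenFilter).size()); // 2: nothing absorbed
    }
}
```

Absorbing only a prefix is a deliberately conservative choice: handling a node that sits above one the source rejected could change the result when the two operations do not commute.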


On Mon, 28 Aug 2017 at 18:00 蒋星博 <<>>
+1 (Non-binding)

Xiao Li <<>> on 28 August 2017

2017-08-28 12:45 GMT-07:00 Cody Koeninger <<>>:
Just wanted to point out that because the jira isn't labeled SPIP, it
won't have shown up linked from

On Mon, Aug 28, 2017 at 2:20 PM, Wenchen Fan <<>>
> Hi all,
> It has been almost 2 weeks since I proposed the data source V2 for
> discussion, and we already got some feedback on the JIRA ticket and the
> prototype PR, so I'd like to call for a vote.
> The full document of the Data Source API V2 is:
> Note that this vote should focus on the high-level design/framework, not
> specific APIs, as we can always change/improve specific APIs during
> development.
> The vote will be up for the next 72 hours. Please reply with your vote:
> +1: Yeah, let's go forward and implement the SPIP.
> +0: Don't really care.
> -1: I don't think this is a good idea because of the following technical
> reasons.
> Thanks!

