spark-dev mailing list archives

From Reynold Xin <>
Subject Re: [VOTE] [SPIP] SPARK-15689: Data Source API V2
Date Wed, 30 Aug 2017 11:28:23 GMT
That might be good to do, but it seems orthogonal to this effort itself.
It would be a completely different interface.

On Wed, Aug 30, 2017 at 1:10 PM Wenchen Fan <> wrote:

> OK, I agree. How about we add a new interface to push down the
> query plan, based on the current framework? We can mark the
> query-plan-push-down interface as unstable, to save the effort of designing
> a stable representation of the query plan and maintaining forward
> compatibility.
> On Wed, Aug 30, 2017 at 10:53 AM, James Baker <> wrote:
>> I'll just focus on the one-by-one thing for now - it's the thing that
>> blocks me the most.
>> I think the place where we're most confused here is on the cost of
>> determining whether I can push down a filter. For me, in order to work out
>> whether I can push down a filter or satisfy a sort, I might have to read
>> plenty of data. That said, it's worth me doing this because I can use this
>> information to avoid reading that much data.
>> If you give me all the orderings, I will have to read that data many
>> times (we stream it to avoid keeping it in memory).
>> There's also a thing where our typical use cases have many filters (20+
>> is common). So, it's likely not going to work to pass us all the
>> combinations. That said, if I can tell you a cost, then I know what optimal
>> looks like, so why can't I just pick that myself?
>> The current design is friendly to simple datasources, but does not have
>> the potential to support this.
>> So the main problem we have with datasources v1 is that it's essentially
>> impossible to leverage a bunch of Spark features - I don't get to use
>> bucketing or row batches or all the nice things that I really want to use
>> to get decent performance. Provided I can leverage these in a moderately
>> supported way which won't break in any given commit, I'll be pretty happy
>> with anything that lets me opt out of the restrictions.
>> My suggestion here is that if you make a mode which works well for
>> complicated use cases, you end up being able to write simple mode in terms
>> of it very easily. So we could actually provide two APIs, one that lets
>> people who have more interesting datasources leverage the cool Spark
>> features, and one that lets people who just want to implement basic
>> features do that - I'd try to include some kind of layering here. I could
>> probably sketch out something here if that'd be useful?
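The layering James suggests (writing the simple mode in terms of the complicated one) could be sketched roughly as follows. This is purely an illustrative sketch; the interface and type names here are invented for the example and are not part of any Spark API:

```java
import java.util.List;

interface Filter {}

// Hypothetical advanced mode: accept the full set of filters at once,
// and return the subset the source cannot handle.
interface AdvancedScan {
    List<Filter> pushDown(List<Filter> filters);
}

// Hypothetical simple mode: implementors just produce rows and
// ignore pushdowns entirely.
interface SimpleScan {
    Iterable<Object[]> scan();
}

// The adapter shows how "simple" falls out of "advanced": a simple
// source handles no pushdowns, so every filter stays in Spark.
final class SimpleAsAdvanced implements AdvancedScan {
    private final SimpleScan delegate;

    SimpleAsAdvanced(SimpleScan delegate) {
        this.delegate = delegate;
    }

    @Override
    public List<Filter> pushDown(List<Filter> filters) {
        return filters;  // nothing handled; Spark applies all filters itself
    }
}
```

The point of the sketch is that the adapter is trivial: the advanced contract strictly subsumes the simple one, so only one underlying API needs designing.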
>> James
>> On Tue, 29 Aug 2017 at 18:59 Wenchen Fan <> wrote:
>>> Hi James,
>>> Thanks for your feedback! I think your concerns are all valid, but we
>>> need to make a tradeoff here.
>>> > Explicitly here, what I'm looking for is a convenient mechanism to
>>> accept a fully specified set of arguments
>>> The problem with this approach is: 1) if we want to add more arguments in
>>> the future, it's really hard to do so without changing the existing
>>> interface. 2) if a user wants to implement a very simple data source, he
>>> has to look at all the arguments and understand them, which may be a burden
>>> for him. I don't have a solution to these 2 problems; comments are welcome.
>>> > There are loads of cases like this - you can imagine someone being
>>> able to push down a sort before a filter is applied, but not afterwards.
>>> However, maybe the filter is so selective that it's better to push down the
>>> filter and not handle the sort. I don't get to make this decision, Spark
>>> does (but doesn't have good enough information to do it properly, whilst I
>>> do). I want to be able to choose the parts I push down given knowledge of
>>> my datasource - as defined the APIs don't let me do that, they're strictly
>>> more restrictive than the V1 APIs in this way.
>>> This is true: the current framework applies push downs one by one,
>>> incrementally. If a data source wants to go back and accept a sort push
>>> down after it accepts a filter push down, that's impossible with the
>>> current data source V2.
>>> Fortunately, we have a solution for this problem. On the Spark side, we
>>> actually do have a fully specified set of arguments waiting to be pushed
>>> down, but Spark doesn't know the best order in which to push them into the
>>> data source. Spark can try every combination and ask the data source to
>>> report a cost, then pick the combination with the lowest cost. This can be
>>> implemented as a cost-report interface, so that advanced data sources can
>>> implement it for optimal performance, while simple data sources don't need
>>> to care about it and can stay simple.
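The cost-report idea described here could look roughly like the following sketch. All names are illustrative stand-ins, not part of the actual proposal: Spark enumerates candidate pushdown combinations, the source prices each, and Spark keeps the cheapest.

```java
import java.util.Comparator;
import java.util.List;

// Stand-in for one ordering/subset of pushdowns Spark might offer.
interface PushDownCombination {}

// Hypothetical trait: an advanced source can price a combination;
// a simple source would just not implement it.
interface CostReporter {
    double costOf(PushDownCombination combination);
}

final class PushDownPlanner {
    // Spark-side selection: ask the source for a cost per candidate
    // combination and take the minimum.
    static PushDownCombination pickBest(List<PushDownCombination> candidates,
                                        CostReporter source) {
        return candidates.stream()
                .min(Comparator.comparingDouble(source::costOf))
                .orElseThrow(() -> new IllegalArgumentException("no candidates"));
    }
}
```

Note the combinatorial caveat James raises below: with 20+ filters the candidate list Spark would have to enumerate grows far too large for this to be exhaustive.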
>>> The current design is very friendly to simple data sources, and has the
>>> potential to support complex data sources, so I prefer the current design
>>> over the plan push down one. What do you think?
>>> On Wed, Aug 30, 2017 at 5:53 AM, James Baker <>
>>> wrote:
>>>> Yeah, for sure.
>>>> With the stable representation - agree that in the general case this is
>>>> pretty intractable, it restricts the modifications that you can do in the
>>>> future too much. That said, it shouldn't be as hard if you restrict
>>>> yourself to the parts of the plan which are supported by the datasources
>>>> API (which, after all, need to be translatable properly into the future to
>>>> support the mixins proposed). This should have a pretty small scope in
>>>> comparison. As long as the user can bail out of nodes they don't
>>>> understand, they should be ok, right?
>>>> That said, what would also be fine for us is a place to plug into an
>>>> unstable query plan.
>>>> Explicitly here, what I'm looking for is a convenient mechanism to
>>>> accept a fully specified set of arguments (of which I can choose to ignore
>>>> some), and return the information as to which of them I'm ignoring. Taking
>>>> a query plan of sorts is a way of doing this which IMO is intuitive to the
>>>> user. It also provides a convenient location to plug in things like stats.
>>>> Not at all married to the idea of using a query plan here; it just seemed
>>>> convenient.
>>>> Regarding the users who just want to be able to pump data into Spark,
>>>> my understanding is that replacing isolated nodes in a query plan is easy.
>>>> That said, our goal here is to be able to push down as much as possible
>>>> into the underlying datastore.
>>>> To your second question:
>>>> The issue is that if you build up pushdowns incrementally and not all
>>>> at once, you end up having to reject pushdowns and filters that you
>>>> actually can do, which unnecessarily increases overheads.
>>>> For example, the dataset
>>>> a b c
>>>> 1 2 3
>>>> 1 3 3
>>>> 1 3 4
>>>> 2 1 1
>>>> 2 0 1
>>>> can efficiently push down sort(b, c) if I have already applied the
>>>> filter a = 1, but otherwise will force a sort in Spark. On the PR I detail
>>>> a case I see where I can push down two equality filters iff I am given
>>>> them at the same time, whilst not being able to push them one at a time.
>>>> There are loads of cases like this - you can imagine someone being able
>>>> to push down a sort before a filter is applied, but not afterwards.
>>>> However, maybe the filter is so selective that it's better to push down the
>>>> filter and not handle the sort. I don't get to make this decision, Spark
>>>> does (but doesn't have good enough information to do it properly, whilst I
>>>> do). I want to be able to choose the parts I push down given knowledge of
>>>> my datasource - as defined the APIs don't let me do that, they're strictly
>>>> more restrictive than the V1 APIs in this way.
>>>> The pattern of not considering things that can be done in bulk bites us
>>>> in other ways. The retrieval methods end up being trickier to implement
>>>> than is necessary because frequently a single operation provides the result
>>>> of many of the getters, but the state is mutable, so you end up with odd
>>>> caches.
>>>> For example, the work I need to do to answer unhandledFilters in V1 is
>>>> roughly the same as the work I need to do to buildScan, so I want to cache
>>>> it. This means that I end up with code that looks like:
>>>> import static java.util.Collections.emptyList;
>>>>
>>>> import java.util.List;
>>>> import com.google.common.base.Supplier;
>>>> import com.google.common.base.Suppliers;
>>>>
>>>> public final class CachingFoo implements Foo {
>>>>     private final Foo delegate;
>>>>     private List<Filter> currentFilters = emptyList();
>>>>     private Supplier<Bar> barSupplier = newSupplier(currentFilters);
>>>>
>>>>     public CachingFoo(Foo delegate) {
>>>>         this.delegate = delegate;
>>>>     }
>>>>
>>>>     private Supplier<Bar> newSupplier(List<Filter> filters) {
>>>>         // Memoize so the work shared between unhandledFilters and
>>>>         // buildScan is only done once per filter set.
>>>>         return Suppliers.memoize(() -> delegate.computeBar(filters));
>>>>     }
>>>>
>>>>     @Override
>>>>     public Bar computeBar(List<Filter> filters) {
>>>>         // Invalidate the cache only when the filter set changes.
>>>>         if (!filters.equals(currentFilters)) {
>>>>             currentFilters = filters;
>>>>             barSupplier = newSupplier(filters);
>>>>         }
>>>>         return barSupplier.get();
>>>>     }
>>>> }
>>>> which caches the result required in unhandledFilters, on the expectation
>>>> that Spark will call buildScan afterwards and get to use the cached result.
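For illustration, here is a self-contained variant of that caching pattern, with plain Java standing in for Guava's Suppliers.memoize and a call counter to show that repeated calls with the same filters compute the underlying result only once. The Foo, Filter, and Bar-as-String types are stand-ins invented for the example:

```java
import java.util.List;

interface Filter {}

interface Foo {
    String computeBar(List<Filter> filters);
}

final class CachingFoo implements Foo {
    private final Foo delegate;
    private List<Filter> currentFilters = null;  // nothing cached yet
    private String cachedBar = null;

    CachingFoo(Foo delegate) {
        this.delegate = delegate;
    }

    @Override
    public String computeBar(List<Filter> filters) {
        // Recompute only when the filter set changes, mirroring the
        // unhandledFilters-then-buildScan call sequence described above.
        if (!filters.equals(currentFilters)) {
            currentFilters = filters;
            cachedBar = delegate.computeBar(filters);
        }
        return cachedBar;
    }
}
```

The fragility being described is visible even in this toy: the cache key is the exact filter list, so any new dimension (sorts, limits) means more invalidation logic in every implementor.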
>>>> This kind of cache becomes more prominent, but harder to deal with in
>>>> the new APIs. As one example here, the state I will need in order to
>>>> compute accurate column stats internally will likely be a subset of the
>>>> work required in order to get the read tasks, tell you if I can handle
>>>> filters, etc, so I'll want to cache them for reuse. However, the cached
>>>> information needs to be appropriately invalidated when I add a new filter
>>>> or sort order or limit, and this makes implementing the APIs harder and
>>>> more error-prone.
>>>> One thing that'd be great is a defined contract of the order in which
>>>> Spark calls the methods on your datasource (ideally this contract could be
>>>> implied by the way the Java class structure works, but otherwise I can just
>>>> throw).
>>>> James
>>>> On Tue, 29 Aug 2017 at 02:56 Reynold Xin <> wrote:
>>>>> James,
>>>>> Thanks for the comment. I think you just pointed out a trade-off
>>>>> between expressiveness and API simplicity, compatibility and evolvability.
>>>>> For the max expressiveness, we'd want the ability to expose full query
>>>>> plans, and let the data source decide which parts of the query plan can be
>>>>> pushed down.
>>>>> The downsides to that (full query plan push down) are:
>>>>> 1. It is extremely difficult to design a stable representation for a
>>>>> logical / physical plan. It is doable, but we'd be the first to do it; I'm
>>>>> not sure of any mainstream databases being able to do that in the past. The
>>>>> design of that API itself, to make sure we have a good story for backward
>>>>> and forward compatibility, would probably take months if not years. It
>>>>> might still be good to do, or to offer an experimental trait without
>>>>> compatibility guarantees that uses the current Catalyst internal logical
>>>>> plan.
>>>>> 2. Most data source developers simply want a way to offer some data,
>>>>> without any pushdown. Having to understand query plans is a burden rather
>>>>> than a gift.
>>>>> Re: your point about the proposed v2 being worse than v1 for your use
>>>>> case.
>>>>> Can you say more? You used the argument that in v2 there is more
>>>>> support for broader pushdown and as a result it is harder to implement.
>>>>> That's how it is supposed to be. If a data source simply implements one of
>>>>> the traits, it'd be logically identical to v1. I don't see why it would be
>>>>> worse or better, other than that v2 provides much stronger forward
>>>>> compatibility guarantees than v1.
>>>>> On Tue, Aug 29, 2017 at 4:54 AM, James Baker <>
>>>>> wrote:
>>>>>> Copying from the code review comments I just submitted on the draft
>>>>>> API (
>>>>>> Context here is that I've spent some time implementing a Spark
>>>>>> datasource and have had some issues with the current API which are
>>>>>> worse in V2.
>>>>>> The general conclusion I’ve come to here is that this is very hard to
>>>>>> actually implement (in a similar but more aggressive way than DataSources
>>>>>> V1, because of the extra methods and dimensions we get in V2).
>>>>>> In DataSources V1 PrunedFilteredScan, the issue is that you are
>>>>>> passed in the filters with the buildScan method, and then passed in again
>>>>>> with the unhandledFilters method.
>>>>>> However, the filters that you can’t handle might be data dependent,
>>>>>> which the current API does not handle well. Suppose I can handle filter A
>>>>>> some of the time, and filter B some of the time. If I’m passed in both,
>>>>>> then either A and B are unhandled, or A, or B, or neither. The work I have
>>>>>> to do to work this out is essentially the same as I have to do while
>>>>>> actually generating my RDD (essentially I have to generate my partitions),
>>>>>> so I end up doing some weird caching work.
>>>>>> This V2 API proposal has the same issues, but perhaps more so. In
>>>>>> PrunedFilteredScan, there is essentially one degree of freedom
>>>>>> (filters), so you just have to implement caching between unhandledFilters
>>>>>> and buildScan. However, here we have many degrees of freedom: sorts,
>>>>>> individual filters, clustering, sampling, maybe aggregations eventually;
>>>>>> and these operations are not all commutative, and computing my support
>>>>>> one-by-one can easily end up being more expensive than computing it all
>>>>>> in one go.
>>>>>> For some trivial examples:
>>>>>> - After filtering, I might be sorted, whilst before filtering I might
>>>>>> not be.
>>>>>> - Filtering with certain filters might affect my ability to push
>>>>>> others.
>>>>>> - Filtering with aggregations (as mooted) might not be possible to
>>>>>> push down.
>>>>>> And with the API as currently mooted, I need to be able to go back
>>>>>> and change my results because they might change later.
>>>>>> Really what would be good here is to pass all of the filters and
>>>>>> sorts etc. all at once, and then I return the parts I can’t handle.
>>>>>> I’d prefer in general that this be implemented by passing some part
>>>>>> of the query plan to the datasource which enables this kind of replacement.
>>>>>> I explicitly don’t want to give the whole query plan - that sounds
>>>>>> painful - I’d prefer we push down only the parts of the query plan we
>>>>>> deem to be stable. With the mix-in approach, I don’t think we can
>>>>>> guarantee the properties we want without a two-phase thing - I’d really
>>>>>> love to be able to just define a straightforward union type which is our
>>>>>> supported stuff, and then the user can transform and return it.
>>>>>> I think this ends up being a more elegant API for consumers, and
>>>>>> far more intuitive.
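The "all at once" contract described in this message could be sketched as follows. The names are hypothetical, invented for illustration and not taken from the SPIP: the source inspects the complete set of requested operations together and returns only the subset it cannot handle.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-ins for the operations Spark might want to push down.
interface Operation {}
record FilterOp(String expr) implements Operation {}
record SortOp(String column) implements Operation {}

// Hypothetical contract: receive every requested operation at once,
// return the operations Spark must still apply itself.
interface BulkPushDownScan {
    List<Operation> pushDown(List<Operation> requested);
}

// Example source that can absorb filters but not sorts. Because it sees
// the whole set together, it could also make data-dependent choices
// (e.g. accept a sort only when a particular filter is also present).
final class FilterOnlySource implements BulkPushDownScan {
    @Override
    public List<Operation> pushDown(List<Operation> requested) {
        List<Operation> unhandled = new ArrayList<>();
        for (Operation op : requested) {
            if (!(op instanceof FilterOp)) {
                unhandled.add(op);
            }
        }
        return unhandled;
    }
}
```

This single-round-trip shape is what makes the decision local to the source: it chooses its own optimal subset instead of reacting to pushdowns one at a time.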
>>>>>> James
>>>>>> On Mon, 28 Aug 2017 at 18:00 蒋星博 <>
>>>>>>> +1 (Non-binding)
>>>>>>>> Xiao Li <> wrote on Mon, Aug 28, 2017 at 5:38 PM:
>>>>>>>> +1
>>>>>>>> 2017-08-28 12:45 GMT-07:00 Cody Koeninger <>:
>>>>>>>>> Just wanted to point out that because the jira isn't labeled SPIP, it
>>>>>>>>> won't have shown up linked from
>>>>>>>>> On Mon, Aug 28, 2017 at 2:20 PM, Wenchen Fan <>
>>>>>>>>> wrote:
>>>>>>>>> > Hi all,
>>>>>>>>> >
>>>>>>>>> > It has been almost 2 weeks since I proposed the data source V2 for
>>>>>>>>> > discussion, and we already got some feedback on the JIRA ticket and
>>>>>>>>> > the prototype PR, so I'd like to call for a vote.
>>>>>>>>> >
>>>>>>>>> > The full document of the Data Source API V2 is:
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> > Note that this vote should focus on the high-level design/framework,
>>>>>>>>> > not specific APIs, as we can always change/improve specific APIs
>>>>>>>>> > during development.
>>>>>>>>> >
>>>>>>>>> > The vote will be up for the next 72 hours. Please reply with your
>>>>>>>>> > vote:
>>>>>>>>> >
>>>>>>>>> > +1: Yeah, let's go forward and implement the SPIP.
>>>>>>>>> > +0: Don't really care.
>>>>>>>>> > -1: I don't think this is a good idea because of the following
>>>>>>>>> > technical reasons.
>>>>>>>>> >
>>>>>>>>> > Thanks!
>>>>>>>>> ---------------------------------------------------------------------
>>>>>>>>> To unsubscribe e-mail:
