spark-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ryan Blue <>
Subject Re: data source api v2 refactoring
Date Fri, 07 Sep 2018 19:19:57 GMT
There are a few v2-related changes that we can work in parallel, at least
for reviews:

* SPARK-25006, #21978 <>: Add
catalog to TableIdentifier - this proposes how to incrementally add
multi-catalog support without breaking existing code paths
* SPARK-24253, #21308 <>: Add
DeleteSupport API - this is a small API addition, which doesn't affect the
* SPARK-24252, #21306 <>: Add v2
Catalog API - this is a different way to create v2 tables, also doesn't
affect the refactor

I agree that the PR for adding SQL support should probably wait on the
refactor. I have also been meaning to share our implementation, which isn't
based on the refactor. It handles CTAS, RTAS, InsertInto, DeleteFrom, and
AlterTable from both SQL and the other methods in the DF API, saveAsTable
and insertInto. It follows the structure that I proposed on the SQL support
PR to convert SQL plans to v2 plans and uses the new TableCatalog to
implement CTAS and RTAS.


On Fri, Sep 7, 2018 at 12:27 AM Wenchen Fan <> wrote:

> Hi Ryan,
> You are right that the `LogicalWrite` mirrors the read side API. I just
> don't have a good naming yet, and write side changes will be a different PR.
> Hi Hyukjin,
> That's my expectation, otherwise we keep rebasing the refactor PR and
> never get it done.
> On Fri, Sep 7, 2018 at 3:02 PM Hyukjin Kwon <> wrote:
>> BTW, do we hold Datasource V2 related PRs for now until we finish this
>> refactoring just for clarification?
>> 2018년 9월 7일 (금) 오전 12:52, Ryan Blue <>님이
>>> Wenchen,
>>> I'm not really sure what you're proposing here. What is a
>>> `LogicalWrite`? Is it something that mirrors the read side in your PR?
>>> I think that I agree that if we have a Write independent of the Table
>>> that carries the commit and abort methods, then we can create it directly
>>> without a WriteConfig. So I tentatively agree with what you propose,
>>> assuming that I understand it correctly.
>>> rb
>>> On Tue, Sep 4, 2018 at 8:42 PM Wenchen Fan <> wrote:
>>>> I'm switching to my another Gmail account, let's see if it still gets
>>>> dropped this time.
>>>> Hi Ryan,
>>>> I'm thinking about the write path and feel the abstraction should be
>>>> the same.
>>>> We still have logical and physical writing. And the table can create
>>>> different logical writing based on how to write. e.g., append, delete,
>>>> replaceWhere, etc.
>>>> One thing I'm not sure about is the WriteConfig. With the WriteConfig,
>>>> the API would look like
>>>> trait Table {
>>>>   WriteConfig newAppendWriteConfig();
>>>>   WriteConfig newDeleteWriteConfig(deleteExprs);
>>>>   LogicalWrite newLogicalWrite(writeConfig);
>>>> }
>>>> Without WriteConfig, the API looks like
>>>> trait Table {
>>>>   LogicalWrite newAppendWrite();
>>>>   LogicalWrite newDeleteWrite(deleteExprs);
>>>> }
>>>> It looks to me that the API is simpler without WriteConfig, what do you
>>>> think?
>>>> Thanks,
>>>> Wenchen
>>>> On Wed, Sep 5, 2018 at 4:24 AM Ryan Blue <>
>>>> wrote:
>>>>> Latest from Wenchen in case it was dropped.
>>>>> ---------- Forwarded message ---------
>>>>> From: Wenchen Fan <>
>>>>> Date: Mon, Sep 3, 2018 at 6:16 AM
>>>>> Subject: Re: data source api v2 refactoring
>>>>> To: <>
>>>>> Cc: Ryan Blue <>, Reynold Xin <>,
>>>>> <>
>>>>> Hi Mridul,
>>>>> I'm not sure what's going on, my email was CC'ed to the dev list.
>>>>> Hi Ryan,
>>>>> The logical and physical scan idea sounds good. To add more color
>>>>> to Jungtaek's question, both micro-batch and continuous mode have
>>>>> the logical and physical scan, but there is a difference: for micro-batch
>>>>> mode, a physical scan outputs data for one epoch, but it's not true for
>>>>> continuous mode.
>>>>> I'm not sure if it's necessary to include streaming epoch in the API
>>>>> abstraction, for features like metrics reporting.
>>>>> On Sun, Sep 2, 2018 at 12:31 PM Mridul Muralidharan <>
>>>>> wrote:
>>>>>> Is it only me or are all others getting Wenchen’s mails ? (Obviously
>>>>>> Ryan did :-) )
>>>>>> I did not see it in the mail thread I received or in archives ...
>>>>>> Wondering which othersenderswere getting dropped (if yes).
>>>>>> Regards
>>>>>> Mridul
>>>>>> [1]
>>>>>> On Sat, Sep 1, 2018 at 8:58 PM Ryan Blue <>
>>>>>> wrote:
>>>>>>> Thanks for clarifying, Wenchen. I think that's what I expected.
>>>>>>> As for the abstraction, here's the way that I think about it:
>>>>>>> are two important parts of a scan: the definition of what will
be read, and
>>>>>>> task sets that actually perform the read. In batch, there's one
>>>>>>> of the scan and one task set so it makes sense that there's one
scan object
>>>>>>> that encapsulates both of these concepts. For streaming, we need
>>>>>>> separate the two into the definition of what will be read (the
stream or
>>>>>>> streaming read) and the task sets that are run (scans). That
way, the
>>>>>>> streaming read behaves like a factory for scans, producing scans
>>>>>>> handle the data either in micro-batches or using continuous tasks.
>>>>>>> To address Jungtaek's question, I think that this does work with
>>>>>>> continuous. In continuous mode, the query operators keep running
and send
>>>>>>> data to one another directly. The API still needs a streaming
read layer
>>>>>>> because it may still produce more than one continuous scan. That
>>>>>>> happen when the underlying source changes and Spark needs to
reconfigure. I
>>>>>>> think the example here is when partitioning in a Kafka topic
changes and
>>>>>>> Spark needs to re-map Kafka partitions to continuous tasks.
>>>>>>> rb
>>>>>>> On Fri, Aug 31, 2018 at 5:12 PM Wenchen Fan <>
>>>>>>> wrote:
>>>>>>>> Hi Ryan,
>>>>>>>> Sorry I may use a wrong wording. The pushdown is done with
>>>>>>>> ScanConfig, which is not table/stream/scan, but something
between them. The
>>>>>>>> table creates ScanConfigBuilder, and table creates stream/scan
>>>>>>>> ScanConfig. For streaming source, stream is the one to take
care of the
>>>>>>>> pushdown result. For batch source, it's the scan.
>>>>>>>> It's a little tricky because stream is an abstraction for
>>>>>>>> source only. Better ideas are welcome!
>>>>>>>> On Sat, Sep 1, 2018 at 7:26 AM Ryan Blue <>
>>>>>>>>> Thanks, Reynold!
>>>>>>>>> I think your API sketch looks great. I appreciate having
the Table
>>>>>>>>> level in the abstraction to plug into as well. I think
this makes it clear
>>>>>>>>> what everything does, particularly having the Stream
level that represents
>>>>>>>>> a configured (by ScanConfig) streaming read and can act
as a factory for
>>>>>>>>> individual batch scans or for continuous scans.
>>>>>>>>> Wenchen, I'm not sure what you mean by doing pushdown
at the table
>>>>>>>>> level. It seems to mean that pushdown is specific to
a batch scan or
>>>>>>>>> streaming read, which seems to be what you're saying
as well. Wouldn't the
>>>>>>>>> pushdown happen to create a ScanConfig, which is then
used as Reynold
>>>>>>>>> suggests? Looking forward to seeing this PR when you
get it posted. Thanks
>>>>>>>>> for all of your work on this!
>>>>>>>>> rb
>>>>>>>>> On Fri, Aug 31, 2018 at 3:52 PM Wenchen Fan <
>>>>>>>>>> wrote:
>>>>>>>>>> Thank Reynold for writing this and starting the discussion!
>>>>>>>>>> Data source v2 was started with batch only, so we
didn't pay much
>>>>>>>>>> attention to the abstraction and just follow the
v1 API. Now we are
>>>>>>>>>> designing the streaming API and catalog integration,
the abstraction
>>>>>>>>>> becomes super important.
>>>>>>>>>> I like this proposed abstraction and have successfully
>>>>>>>>>> it to make sure it works.
>>>>>>>>>> During prototyping, I have to work around the issue
that the
>>>>>>>>>> current streaming engine does query optimization/planning
for each micro
>>>>>>>>>> batch. With this abstraction, the operator pushdown
is only applied once
>>>>>>>>>> per-query. In my prototype, I do the physical planning
up front to get the
>>>>>>>>>> pushdown result, and
>>>>>>>>>> add a logical linking node that wraps the resulting
physical plan
>>>>>>>>>> node for the data source, and then swap that logical
linking node into the
>>>>>>>>>> logical plan for each batch. In the future we should
just let the streaming
>>>>>>>>>> engine do query optimization/planning only once.
>>>>>>>>>> About pushdown, I think we should do it at the table
level. The
>>>>>>>>>> table should create a new pushdow handler to apply
operator pushdowm for
>>>>>>>>>> each scan/stream, and create the scan/stream with
the pushdown result. The
>>>>>>>>>> rationale is, a table should have the same pushdown
behavior regardless the
>>>>>>>>>> scan node.
>>>>>>>>>> Thanks,
>>>>>>>>>> Wenchen
>>>>>>>>>> On Fri, Aug 31, 2018 at 2:00 PM Reynold Xin <>
>>>>>>>>>> wrote:
>>>>>>>>>>> I spent some time last week looking at the current
data source
>>>>>>>>>>> v2 apis, and I thought we should be a bit more
buttoned up in terms of the
>>>>>>>>>>> abstractions and the guarantees Spark provides.
In particular, I feel we
>>>>>>>>>>> need the following levels of "abstractions",
to fit the use cases in Spark,
>>>>>>>>>>> from batch, to streaming.
>>>>>>>>>>> Please don't focus on the naming at this stage.
When possible, I
>>>>>>>>>>> draw parallels to what similar levels are named
in the currently committed
>>>>>>>>>>> api:
>>>>>>>>>>> 0. Format: This represents a specific format,
e.g. Parquet, ORC.
>>>>>>>>>>> There is currently no explicit class at this
>>>>>>>>>>> 1. Table: This should represent a logical dataset
(with schema).
>>>>>>>>>>> This could be just a directory on the file system,
or a table in the
>>>>>>>>>>> catalog. Operations on tables can include batch
reads (Scan), streams,
>>>>>>>>>>> writes, and potentially other operations such
as deletes. The closest to
>>>>>>>>>>> the table level abstraction in the current code
base is the "Provider"
>>>>>>>>>>> class, although Provider isn't quite a Table.
This is similar to Ryan's
>>>>>>>>>>> proposed design.
>>>>>>>>>>> 2. Stream: Specific to streaming. A stream is
created out of a
>>>>>>>>>>> Table. This logically represents a an instance
of a StreamingQuery.
>>>>>>>>>>> Pushdowns and options are handled at this layer.
I.e. Spark guarnatees to
>>>>>>>>>>> data source implementation pushdowns and options
don't change within a
>>>>>>>>>>> Stream. Each Stream consists of a sequence of
scans. There is
>>>>>>>>>>> no equivalent concept in the current committed
>>>>>>>>>>> 3. Scan: A physical scan -- either as part of
a streaming query,
>>>>>>>>>>> or a batch query. This should contain sufficient
information and methods so
>>>>>>>>>>> we can run a Spark job over a defined subset
of the table. It's
>>>>>>>>>>> functionally equivalent to an RDD, except there's
no dependency on RDD so
>>>>>>>>>>> it is a smaller surface. In the current code,
the equivalent class would be
>>>>>>>>>>> the ScanConfig, which represents the information
needed, but in order to
>>>>>>>>>>> execute a job, ReadSupport is needed (various
methods in ReadSupport takes
>>>>>>>>>>> a ScanConfig).
>>>>>>>>>>> To illustrate with pseudocode what the different
levels mean, a
>>>>>>>>>>> batch query would look like the following:
>>>>>>>>>>> val provider = reflection[Format]("parquet")
>>>>>>>>>>> val table = provider.createTable(options)
>>>>>>>>>>> val scan = table.createScan(scanConfig) // scanConfig
>>>>>>>>>>> pushdown and options
>>>>>>>>>>> // run tasks on executors
>>>>>>>>>>> A streaming micro-batch scan would look like
the following:
>>>>>>>>>>> val provider = reflection[Format]("parquet")
>>>>>>>>>>> val table = provider.createTable(options)
>>>>>>>>>>> val stream = table.createStream(scanConfig)
>>>>>>>>>>> while(true) {
>>>>>>>>>>>   val scan = streamingScan.createScan(startOffset)
>>>>>>>>>>>   // run tasks on executors
>>>>>>>>>>> }
>>>>>>>>>>> Vs the current API, the above:
>>>>>>>>>>> 1. Creates an explicit Table abstraction, and
an explicit Scan
>>>>>>>>>>> abstraction.
>>>>>>>>>>> 2. Have an explicit Stream level and makes it
clear pushdowns
>>>>>>>>>>> and options are handled there, rather than at
the individual scan
>>>>>>>>>>> (ReadSupport) level. Data source implementations
don't need to worry about
>>>>>>>>>>> pushdowns or options changing mid-stream. For
batch, those happen when the
>>>>>>>>>>> scan object is created.
>>>>>>>>>>> This email is just a high level sketch. I've
asked Wenchen to
>>>>>>>>>>> prototype this, to see if it is actually feasible
and the degree of hacks
>>>>>>>>>>> it removes, or creates.
>>>>>>>>> --
>>>>>>>>> Ryan Blue
>>>>>>>>> Software Engineer
>>>>>>>>> Netflix
>>>>>>> --
>>>>>>> Ryan Blue
>>>>>>> Software Engineer
>>>>>>> Netflix
>>>>> --
>>>>> Ryan Blue
>>>>> Software Engineer
>>>>> Netflix
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix

Ryan Blue
Software Engineer

View raw message