spark-dev mailing list archives

From Wenchen Fan <>
Subject Re: data source api v2 refactoring
Date Fri, 07 Sep 2018 07:26:52 GMT
Hi Ryan,

You are right that the `LogicalWrite` mirrors the read side API. I just
don't have a good name for it yet, and the write side changes will be in a
different PR.

Hi Hyukjin,

That's my expectation; otherwise we'd keep rebasing the refactor PR and
never get it done.

On Fri, Sep 7, 2018 at 3:02 PM Hyukjin Kwon <> wrote:

> BTW, just for clarification: do we hold DataSource V2 related PRs for now
> until we finish this refactoring?
> On Fri, Sep 7, 2018 at 12:52 AM, Ryan Blue <> wrote:
>> Wenchen,
>> I'm not really sure what you're proposing here. What is a `LogicalWrite`?
>> Is it something that mirrors the read side in your PR?
>> I think that I agree that if we have a Write independent of the Table
>> that carries the commit and abort methods, then we can create it directly
>> without a WriteConfig. So I tentatively agree with what you propose,
>> assuming that I understand it correctly.
>> rb
>> On Tue, Sep 4, 2018 at 8:42 PM Wenchen Fan <> wrote:
>>> I'm switching to another Gmail account of mine; let's see if it still
>>> gets dropped this time.
>>> Hi Ryan,
>>> I'm thinking about the write path and feel the abstraction should be the
>>> same. We still have logical and physical writes, and the table can create
>>> different logical writes based on how the data is written, e.g., append,
>>> delete, replaceWhere, etc.
>>> One thing I'm not sure about is the WriteConfig. With the WriteConfig,
>>> the API would look like
>>> trait Table {
>>>   def newAppendWriteConfig(): WriteConfig
>>>   def newDeleteWriteConfig(deleteExprs): WriteConfig
>>>   def newLogicalWrite(writeConfig): LogicalWrite
>>> }
>>> Without WriteConfig, the API looks like
>>> trait Table {
>>>   def newAppendWrite(): LogicalWrite
>>>   def newDeleteWrite(deleteExprs): LogicalWrite
>>> }
>>> It looks to me like the API is simpler without WriteConfig. What do you
>>> think?
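To make the comparison concrete, here is a minimal sketch of how the no-WriteConfig variant might be driven. This is hypothetical Java with invented names (`LogicalWrite`, `Table`, `InMemoryTable` are not the real Spark interfaces); it only illustrates the shape being proposed: the table hands back a `LogicalWrite` that already knows what kind of write it is and carries commit/abort.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the "no WriteConfig" variant discussed above.
interface LogicalWrite {
    void commit();                 // commit the whole write
    void abort();                  // clean up on failure
    String description();
}

interface Table {
    LogicalWrite newAppendWrite();
    LogicalWrite newDeleteWrite(String deleteExpr);
}

// A toy in-memory table, just to exercise the shape of the API.
class InMemoryTable implements Table {
    final List<String> rows = new ArrayList<>();

    public LogicalWrite newAppendWrite() {
        return new LogicalWrite() {
            public void commit() { rows.add("appended-row"); }
            public void abort() { /* nothing staged in this toy */ }
            public String description() { return "append"; }
        };
    }

    public LogicalWrite newDeleteWrite(String deleteExpr) {
        return new LogicalWrite() {
            public void commit() { rows.removeIf(r -> r.contains(deleteExpr)); }
            public void abort() { }
            public String description() { return "delete where " + deleteExpr; }
        };
    }
}
```

Since each `LogicalWrite` is created already specialized (append vs. delete), there is no separate config object to thread through, which is the simplification Wenchen is pointing at.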
>>> Thanks,
>>> Wenchen
>>> On Wed, Sep 5, 2018 at 4:24 AM Ryan Blue <>
>>> wrote:
>>>> Latest from Wenchen in case it was dropped.
>>>> ---------- Forwarded message ---------
>>>> From: Wenchen Fan <>
>>>> Date: Mon, Sep 3, 2018 at 6:16 AM
>>>> Subject: Re: data source api v2 refactoring
>>>> To: <>
>>>> Cc: Ryan Blue <>, Reynold Xin <>,
>>>> Hi Mridul,
>>>> I'm not sure what's going on, my email was CC'ed to the dev list.
>>>> Hi Ryan,
>>>> The logical and physical scan idea sounds good. To add more color
>>>> to Jungtaek's question, both micro-batch and continuous mode have
>>>> the logical and physical scan, but there is a difference: in micro-batch
>>>> mode, a physical scan outputs data for one epoch, but that's not true
>>>> for continuous mode.
>>>> I'm also not sure whether it's necessary to include the streaming epoch
>>>> in the API abstraction, e.g., for features like metrics reporting.
>>>> On Sun, Sep 2, 2018 at 12:31 PM Mridul Muralidharan <>
>>>> wrote:
>>>>> Is it only me, or are all the others getting Wenchen’s mails? (Obviously
>>>>> Ryan did :-) )
>>>>> I did not see it in the mail thread I received or in the archives ... [1]
>>>>> Wondering which other senders were getting dropped (if so).
>>>>> Regards
>>>>> Mridul
>>>>> [1]
>>>>> On Sat, Sep 1, 2018 at 8:58 PM Ryan Blue <>
>>>>> wrote:
>>>>>> Thanks for clarifying, Wenchen. I think that's what I expected.
>>>>>> As for the abstraction, here's the way that I think about it: there
>>>>>> are two important parts of a scan: the definition of what will be read,
>>>>>> and the task sets that actually perform the read. In batch, there's one
>>>>>> definition of the scan and one task set, so it makes sense to have one
>>>>>> scan object that encapsulates both of these concepts. For streaming, we
>>>>>> need to separate the two into the definition of what will be read (the
>>>>>> streaming read) and the task sets that are run (scans). That way, the
>>>>>> streaming read behaves like a factory for scans, producing scans that
>>>>>> handle the data either in micro-batches or using continuous tasks.
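That factory relationship can be sketched roughly as follows. These are hypothetical Java interfaces (`StreamingRead`, `Scan` with these signatures do not exist in Spark as written); the point is only that one long-lived streaming read produces many short-lived scans.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: the streaming read acts as a factory for scans.
interface Scan {
    List<String> planTasks();           // the tasks Spark would run for this scan
}

interface StreamingRead {
    Scan createScan(long startOffset);  // one scan per micro-batch (or per reconfiguration)
}

class ExampleStreamingRead implements StreamingRead {
    public Scan createScan(long startOffset) {
        // Each scan covers a small fixed range starting at the given offset.
        return () -> Arrays.asList("task@" + startOffset, "task@" + (startOffset + 1));
    }
}
```

A micro-batch driver would call `createScan` once per batch with the next offset, while a continuous driver would call it once and keep the resulting tasks running.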
>>>>>> To address Jungtaek's question, I think that this does work with
>>>>>> continuous mode. In continuous mode, the query operators keep running
>>>>>> and send data to one another directly. The API still needs a streaming
>>>>>> read level because it may still produce more than one continuous scan.
>>>>>> That can happen when the underlying source changes and Spark needs to
>>>>>> reconfigure. I think the example here is when the partitioning of a
>>>>>> Kafka topic changes and Spark needs to re-map Kafka partitions to
>>>>>> continuous tasks.
>>>>>> rb
>>>>>> On Fri, Aug 31, 2018 at 5:12 PM Wenchen Fan <>
>>>>>> wrote:
>>>>>>> Hi Ryan,
>>>>>>> Sorry, I may have used the wrong wording. The pushdown is done with
>>>>>>> the ScanConfig, which is not the table/stream/scan, but something in
>>>>>>> between them. The table creates the ScanConfigBuilder, and the table
>>>>>>> creates the stream/scan with the ScanConfig. For a streaming source,
>>>>>>> the stream is the one that takes care of the pushdown result. For a
>>>>>>> batch source, it's the scan.
>>>>>>> It's a little tricky because stream is an abstraction for streaming
>>>>>>> sources only. Better ideas are welcome!
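The flow Wenchen describes might look roughly like this. The names and signatures are assumptions for illustration (the actual `ScanConfig`/`ScanConfigBuilder` in the DSv2 branch differ); the point is that pushdown is accumulated in a builder and frozen into a config before the scan is created.

```java
// Hypothetical sketch of pushdown flowing through a builder into a ScanConfig.
final class ScanConfig {
    final String pushedFilter;      // pushdown result, fixed once built
    ScanConfig(String pushedFilter) { this.pushedFilter = pushedFilter; }
}

final class ScanConfigBuilder {
    private String filter = "true"; // nothing pushed yet

    ScanConfigBuilder pushFilter(String filter) {
        this.filter = filter;       // Spark pushes operators here during planning
        return this;
    }

    ScanConfig build() { return new ScanConfig(filter); }
}

final class ExampleTable {
    ScanConfigBuilder newScanConfigBuilder() { return new ScanConfigBuilder(); }

    // Batch case: the scan is the consumer of the pushdown result.
    String createScan(ScanConfig config) { return "scan[" + config.pushedFilter + "]"; }
}
```

In the streaming case the same `ScanConfig` would instead be handed to the stream, which is the asymmetry Wenchen calls tricky.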
>>>>>>> On Sat, Sep 1, 2018 at 7:26 AM Ryan Blue <>
>>>>>>>> Thanks, Reynold!
>>>>>>>> I think your API sketch looks great. I appreciate having the Table
>>>>>>>> level in the abstraction to plug into as well. I think this makes it
>>>>>>>> clear what everything does, particularly having the Stream level that
>>>>>>>> represents a configured (by ScanConfig) streaming read and can act as
>>>>>>>> a factory for individual batch scans or for continuous scans.
>>>>>>>> Wenchen, I'm not sure what you mean by doing pushdown at the table
>>>>>>>> level. It seems to mean that pushdown is specific to a batch scan or
>>>>>>>> streaming read, which seems to be what you're saying as well. Wouldn't
>>>>>>>> the pushdown happen to create a ScanConfig, which is then used as
>>>>>>>> Reynold suggests? Looking forward to seeing this PR when you get it
>>>>>>>> posted. Thanks for all of your work on this!
>>>>>>>> rb
>>>>>>>> On Fri, Aug 31, 2018 at 3:52 PM Wenchen Fan <>
>>>>>>>> wrote:
>>>>>>>>> Thanks, Reynold, for writing this up and starting the discussion!
>>>>>>>>> Data source v2 was started with batch only, so we didn't pay much
>>>>>>>>> attention to the abstraction and just followed the v1 API. Now that
>>>>>>>>> we are designing the streaming API and catalog integration, the
>>>>>>>>> abstraction becomes super important.
>>>>>>>>> I like this proposed abstraction and have successfully prototyped
>>>>>>>>> it to make sure it works.
>>>>>>>>> During prototyping, I had to work around the issue that the
>>>>>>>>> current streaming engine does query optimization/planning for each
>>>>>>>>> micro-batch. With this abstraction, the operator pushdown is only
>>>>>>>>> applied once per query. In my prototype, I do the physical planning
>>>>>>>>> up front to get the pushdown result, and add a logical linking node
>>>>>>>>> that wraps the resulting physical node for the data source, and then
>>>>>>>>> swap that logical linking node into the logical plan for each batch.
>>>>>>>>> In the future we should just let the streaming engine do query
>>>>>>>>> optimization/planning only once.
>>>>>>>>> About pushdown, I think we should do it at the table level. The
>>>>>>>>> table should create a new pushdown handler to apply operator
>>>>>>>>> pushdown for each scan/stream, and create the scan/stream with the
>>>>>>>>> pushdown result. The rationale is that a table should have the same
>>>>>>>>> pushdown behavior regardless of the scan node.
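A minimal sketch of "same pushdown behavior regardless of the scan node", under assumed names (`PushdownHandler` and these classes are invented for illustration): the table owns one pushdown definition, and every scan or stream created from it goes through that same handler.

```java
// Hypothetical sketch: the table owns pushdown, so every scan/stream
// created from it applies identical pushdown logic.
interface PushdownHandler {
    String push(String filter);     // returns the filter actually pushed down
}

final class TableWithPushdown {
    // One handler definition for the whole table.
    PushdownHandler newPushdownHandler() {
        // Toy rule: this table only supports equality filters; others fall back to "true".
        return filter -> filter.contains("=") ? filter : "true";
    }

    String newScan(String filter) {
        return "scan[" + newPushdownHandler().push(filter) + "]";
    }

    String newStream(String filter) {
        return "stream[" + newPushdownHandler().push(filter) + "]";
    }
}
```

Because both `newScan` and `newStream` route through the same handler, a batch scan and a stream over this table can never disagree about what gets pushed, which is the invariant Wenchen argues for.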
>>>>>>>>> Thanks,
>>>>>>>>> Wenchen
>>>>>>>>> On Fri, Aug 31, 2018 at 2:00 PM Reynold Xin <>
>>>>>>>>> wrote:
>>>>>>>>>> I spent some time last week looking at the current data source v2
>>>>>>>>>> APIs, and I thought we should be a bit more buttoned up in terms of
>>>>>>>>>> the abstractions and the guarantees Spark provides. In particular,
>>>>>>>>>> I feel we need the following levels of "abstractions" to fit the
>>>>>>>>>> use cases in Spark, from batch to streaming.
>>>>>>>>>> Please don't focus on the naming at this stage. When possible, I
>>>>>>>>>> draw parallels to what similar levels are named in the currently
>>>>>>>>>> committed API:
>>>>>>>>>> 0. Format: This represents a specific format, e.g. Parquet, ORC.
>>>>>>>>>> There is currently no explicit class at this level.
>>>>>>>>>> 1. Table: This should represent a logical dataset (with schema).
>>>>>>>>>> This could be just a directory on the file system, or a table in
>>>>>>>>>> the catalog. Operations on tables can include batch reads (Scan),
>>>>>>>>>> streams, writes, and potentially other operations such as deletes.
>>>>>>>>>> The closest to the table level abstraction in the current code base
>>>>>>>>>> is the "Provider" class, although Provider isn't quite a Table.
>>>>>>>>>> This is similar to Ryan's proposed design.
>>>>>>>>>> 2. Stream: Specific to streaming. A stream is created out of a
>>>>>>>>>> Table. This logically represents an instance of a StreamingQuery.
>>>>>>>>>> Pushdowns and options are handled at this layer. I.e. Spark
>>>>>>>>>> guarantees to the data source implementation that pushdowns and
>>>>>>>>>> options don't change within a Stream. Each Stream consists of a
>>>>>>>>>> sequence of scans. There is no equivalent concept in the current
>>>>>>>>>> committed code.
>>>>>>>>>> 3. Scan: A physical scan -- either as part of a streaming query
>>>>>>>>>> or a batch query. This should contain sufficient information and
>>>>>>>>>> methods so we can run a Spark job over a defined subset of the
>>>>>>>>>> table. It's functionally equivalent to an RDD, except there's no
>>>>>>>>>> dependency on RDD so it is a smaller surface. In the current code,
>>>>>>>>>> the equivalent class would be the ScanConfig, which represents the
>>>>>>>>>> information needed, but in order to execute a job, ReadSupport is
>>>>>>>>>> needed (various methods in ReadSupport take a ScanConfig).
>>>>>>>>>> To illustrate with pseudocode what the different levels mean, a
>>>>>>>>>> batch query would look like the following:
>>>>>>>>>> val provider = reflection[Format]("parquet")
>>>>>>>>>> val table = provider.createTable(options)
>>>>>>>>>> val scan = table.createScan(scanConfig) // scanConfig includes pushdown and options
>>>>>>>>>> // run tasks on executors
>>>>>>>>>> A streaming micro-batch scan would look like the following:
>>>>>>>>>> val provider = reflection[Format]("parquet")
>>>>>>>>>> val table = provider.createTable(options)
>>>>>>>>>> val stream = table.createStream(scanConfig)
>>>>>>>>>> while(true) {
>>>>>>>>>>   val scan = stream.createScan(startOffset)
>>>>>>>>>>   // run tasks on executors
>>>>>>>>>> }
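The Format -> Table -> Stream -> Scan layering in the pseudocode can be mocked end-to-end to check that the levels compose. This is a toy in Java with invented class names (`MockFormat` etc. are not the real API) and a fake "run tasks" step; it only demonstrates the nesting of the four levels.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mock of the four levels: Format -> Table -> Stream -> Scan.
final class MockScan {
    final long startOffset;
    MockScan(long startOffset) { this.startOffset = startOffset; }

    List<String> runTasks() {                 // stands in for "run tasks on executors"
        List<String> out = new ArrayList<>();
        out.add("rows[" + startOffset + ", " + (startOffset + 10) + ")");
        return out;
    }
}

final class MockStream {                      // pushdown/options fixed for the stream's lifetime
    final String scanConfig;
    MockStream(String scanConfig) { this.scanConfig = scanConfig; }
    MockScan createScan(long startOffset) { return new MockScan(startOffset); }
}

final class MockTable {
    MockStream createStream(String scanConfig) { return new MockStream(scanConfig); }
    MockScan createScan(String scanConfig) { return new MockScan(0); }  // batch: one scan
}

final class MockFormat {                      // level 0: e.g. "parquet"
    MockTable createTable(String options) { return new MockTable(); }
}
```

A micro-batch driver would then loop `stream.createScan(offset).runTasks()`, advancing the offset each iteration, mirroring the `while(true)` sketch above; a batch query calls `table.createScan` exactly once.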
>>>>>>>>>> Vs the current API, the above:
>>>>>>>>>> 1. Creates an explicit Table abstraction, and an explicit Scan
>>>>>>>>>> abstraction.
>>>>>>>>>> 2. Has an explicit Stream level and makes it clear that pushdowns
>>>>>>>>>> and options are handled there, rather than at the individual scan
>>>>>>>>>> (ReadSupport) level. Data source implementations don't need to
>>>>>>>>>> worry about pushdowns or options changing mid-stream. For batch,
>>>>>>>>>> those happen when the scan object is created.
>>>>>>>>>> This email is just a high level sketch. I've asked Wenchen to
>>>>>>>>>> prototype this, to see if it is actually feasible and the degree
>>>>>>>>>> of hacks it removes, or creates.
>>>>>>>> --
>>>>>>>> Ryan Blue
>>>>>>>> Software Engineer
>>>>>>>> Netflix
>>>>>> --
>>>>>> Ryan Blue
>>>>>> Software Engineer
>>>>>> Netflix
>>>> --
>>>> Ryan Blue
>>>> Software Engineer
>>>> Netflix
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
