spark-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Hyukjin Kwon <gurwls...@gmail.com>
Subject Re: data source api v2 refactoring
Date Fri, 07 Sep 2018 07:02:07 GMT
BTW, do we hold Datasource V2 related PRs for now until we finish this
refactoring just for clarification?

2018년 9월 7일 (금) 오전 12:52, Ryan Blue <rblue@netflix.com.invalid>님이 작성:

> Wenchen,
>
> I'm not really sure what you're proposing here. What is a `LogicalWrite`?
> Is it something that mirrors the read side in your PR?
>
> I think that I agree that if we have a Write independent of the Table that
> carries the commit and abort methods, then we can create it directly
> without a WriteConfig. So I tentatively agree with what you propose,
> assuming that I understand it correctly.
>
> rb
>
> On Tue, Sep 4, 2018 at 8:42 PM Wenchen Fan <cloud0fan@gmail.com> wrote:
>
>> I'm switching to my another Gmail account, let's see if it still gets
>> dropped this time.
>>
>> Hi Ryan,
>>
>> I'm thinking about the write path and feel the abstraction should be the
>> same.
>>
>> We still have logical and physical writing. And the table can create
>> different logical writing based on how to write. e.g., append, delete,
>> replaceWhere, etc.
>>
>> One thing I'm not sure about is the WriteConfig. With the WriteConfig,
>> the API would look like
>> trait Table {
>>   WriteConfig newAppendWriteConfig();
>>
>>   WriteConfig newDeleteWriteConfig(deleteExprs);
>>
>>   LogicalWrite newLogicalWrite(writeConfig);
>> }
>>
>> Without WriteConfig, the API looks like
>> trait Table {
>>   LogicalWrite newAppendWrite();
>>
>>   LogicalWrite newDeleteWrite(deleteExprs);
>> }
>>
>>
>> It looks to me that the API is simpler without WriteConfig, what do you
>> think?
>>
>> Thanks,
>> Wenchen
>>
>> On Wed, Sep 5, 2018 at 4:24 AM Ryan Blue <rblue@netflix.com.invalid>
>> wrote:
>>
>>> Latest from Wenchen in case it was dropped.
>>>
>>> ---------- Forwarded message ---------
>>> From: Wenchen Fan <wenchen@databricks.com>
>>> Date: Mon, Sep 3, 2018 at 6:16 AM
>>> Subject: Re: data source api v2 refactoring
>>> To: <mridul@gmail.com>
>>> Cc: Ryan Blue <rblue@netflix.com>, Reynold Xin <rxin@databricks.com>,
<
>>> dev@spark.apache.org>
>>>
>>>
>>> Hi Mridul,
>>>
>>> I'm not sure what's going on, my email was CC'ed to the dev list.
>>>
>>>
>>> Hi Ryan,
>>>
>>> The logical and physical scan idea sounds good. To add more color
>>> to Jungtaek's question, both micro-batch and continuous mode have
>>> the logical and physical scan, but there is a difference: for micro-batch
>>> mode, a physical scan outputs data for one epoch, but it's not true for
>>> continuous mode.
>>>
>>> I'm not sure if it's necessary to include streaming epoch in the API
>>> abstraction, for features like metrics reporting.
>>>
>>> On Sun, Sep 2, 2018 at 12:31 PM Mridul Muralidharan <mridul@gmail.com>
>>> wrote:
>>>
>>>>
>>>> Is it only me or are all others getting Wenchen’s mails ? (Obviously
>>>> Ryan did :-) )
>>>> I did not see it in the mail thread I received or in archives ... [1]
>>>> Wondering which othersenderswere getting dropped (if yes).
>>>>
>>>> Regards
>>>> Mridul
>>>>
>>>> [1]
>>>> http://apache-spark-developers-list.1001551.n3.nabble.com/data-source-api-v2-refactoring-td24848.html
>>>>
>>>>
>>>> On Sat, Sep 1, 2018 at 8:58 PM Ryan Blue <rblue@netflix.com.invalid>
>>>> wrote:
>>>>
>>>>> Thanks for clarifying, Wenchen. I think that's what I expected.
>>>>>
>>>>> As for the abstraction, here's the way that I think about it: there
>>>>> are two important parts of a scan: the definition of what will be read,
and
>>>>> task sets that actually perform the read. In batch, there's one definition
>>>>> of the scan and one task set so it makes sense that there's one scan
object
>>>>> that encapsulates both of these concepts. For streaming, we need to
>>>>> separate the two into the definition of what will be read (the stream
or
>>>>> streaming read) and the task sets that are run (scans). That way, the
>>>>> streaming read behaves like a factory for scans, producing scans that
>>>>> handle the data either in micro-batches or using continuous tasks.
>>>>>
>>>>> To address Jungtaek's question, I think that this does work with
>>>>> continuous. In continuous mode, the query operators keep running and
send
>>>>> data to one another directly. The API still needs a streaming read layer
>>>>> because it may still produce more than one continuous scan. That would
>>>>> happen when the underlying source changes and Spark needs to reconfigure.
I
>>>>> think the example here is when partitioning in a Kafka topic changes
and
>>>>> Spark needs to re-map Kafka partitions to continuous tasks.
>>>>>
>>>>> rb
>>>>>
>>>>> On Fri, Aug 31, 2018 at 5:12 PM Wenchen Fan <wenchen@databricks.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Ryan,
>>>>>>
>>>>>> Sorry I may use a wrong wording. The pushdown is done with
>>>>>> ScanConfig, which is not table/stream/scan, but something between
them. The
>>>>>> table creates ScanConfigBuilder, and table creates stream/scan with
>>>>>> ScanConfig. For streaming source, stream is the one to take care
of the
>>>>>> pushdown result. For batch source, it's the scan.
>>>>>>
>>>>>> It's a little tricky because stream is an abstraction for streaming
>>>>>> source only. Better ideas are welcome!
>>>>>>
>>>>>
>>>>>> On Sat, Sep 1, 2018 at 7:26 AM Ryan Blue <rblue@netflix.com>
wrote:
>>>>>>
>>>>>>> Thanks, Reynold!
>>>>>>>
>>>>>>> I think your API sketch looks great. I appreciate having the
Table
>>>>>>> level in the abstraction to plug into as well. I think this makes
it clear
>>>>>>> what everything does, particularly having the Stream level that
represents
>>>>>>> a configured (by ScanConfig) streaming read and can act as a
factory for
>>>>>>> individual batch scans or for continuous scans.
>>>>>>>
>>>>>>> Wenchen, I'm not sure what you mean by doing pushdown at the
table
>>>>>>> level. It seems to mean that pushdown is specific to a batch
scan or
>>>>>>> streaming read, which seems to be what you're saying as well.
Wouldn't the
>>>>>>> pushdown happen to create a ScanConfig, which is then used as
Reynold
>>>>>>> suggests? Looking forward to seeing this PR when you get it posted.
Thanks
>>>>>>> for all of your work on this!
>>>>>>>
>>>>>>> rb
>>>>>>>
>>>>>>> On Fri, Aug 31, 2018 at 3:52 PM Wenchen Fan <wenchen@databricks.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thank Reynold for writing this and starting the discussion!
>>>>>>>>
>>>>>>>> Data source v2 was started with batch only, so we didn't
pay much
>>>>>>>> attention to the abstraction and just follow the v1 API.
Now we are
>>>>>>>> designing the streaming API and catalog integration, the
abstraction
>>>>>>>> becomes super important.
>>>>>>>>
>>>>>>>> I like this proposed abstraction and have successfully prototyped
>>>>>>>> it to make sure it works.
>>>>>>>>
>>>>>>>> During prototyping, I have to work around the issue that
the
>>>>>>>> current streaming engine does query optimization/planning
for each micro
>>>>>>>> batch. With this abstraction, the operator pushdown is only
applied once
>>>>>>>> per-query. In my prototype, I do the physical planning up
front to get the
>>>>>>>> pushdown result, and
>>>>>>>> add a logical linking node that wraps the resulting physical
plan
>>>>>>>> node for the data source, and then swap that logical linking
node into the
>>>>>>>> logical plan for each batch. In the future we should just
let the streaming
>>>>>>>> engine do query optimization/planning only once.
>>>>>>>>
>>>>>>>> About pushdown, I think we should do it at the table level.
The
>>>>>>>> table should create a new pushdow handler to apply operator
pushdowm for
>>>>>>>> each scan/stream, and create the scan/stream with the pushdown
result. The
>>>>>>>> rationale is, a table should have the same pushdown behavior
regardless the
>>>>>>>> scan node.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Wenchen
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Aug 31, 2018 at 2:00 PM Reynold Xin <rxin@databricks.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I spent some time last week looking at the current data
source v2
>>>>>>>>> apis, and I thought we should be a bit more buttoned
up in terms of the
>>>>>>>>> abstractions and the guarantees Spark provides. In particular,
I feel we
>>>>>>>>> need the following levels of "abstractions", to fit the
use cases in Spark,
>>>>>>>>> from batch, to streaming.
>>>>>>>>>
>>>>>>>>> Please don't focus on the naming at this stage. When
possible, I
>>>>>>>>> draw parallels to what similar levels are named in the
currently committed
>>>>>>>>> api:
>>>>>>>>>
>>>>>>>>> 0. Format: This represents a specific format, e.g. Parquet,
ORC.
>>>>>>>>> There is currently no explicit class at this level.
>>>>>>>>>
>>>>>>>>> 1. Table: This should represent a logical dataset (with
schema).
>>>>>>>>> This could be just a directory on the file system, or
a table in the
>>>>>>>>> catalog. Operations on tables can include batch reads
(Scan), streams,
>>>>>>>>> writes, and potentially other operations such as deletes.
The closest to
>>>>>>>>> the table level abstraction in the current code base
is the "Provider"
>>>>>>>>> class, although Provider isn't quite a Table. This is
similar to Ryan's
>>>>>>>>> proposed design.
>>>>>>>>>
>>>>>>>>> 2. Stream: Specific to streaming. A stream is created
out of a
>>>>>>>>> Table. This logically represents a an instance of a StreamingQuery.
>>>>>>>>> Pushdowns and options are handled at this layer. I.e.
Spark guarnatees to
>>>>>>>>> data source implementation pushdowns and options don't
change within a
>>>>>>>>> Stream. Each Stream consists of a sequence of scans.
There is no
>>>>>>>>> equivalent concept in the current committed code.
>>>>>>>>>
>>>>>>>>> 3. Scan: A physical scan -- either as part of a streaming
query,
>>>>>>>>> or a batch query. This should contain sufficient information
and methods so
>>>>>>>>> we can run a Spark job over a defined subset of the table.
It's
>>>>>>>>> functionally equivalent to an RDD, except there's no
dependency on RDD so
>>>>>>>>> it is a smaller surface. In the current code, the equivalent
class would be
>>>>>>>>> the ScanConfig, which represents the information needed,
but in order to
>>>>>>>>> execute a job, ReadSupport is needed (various methods
in ReadSupport takes
>>>>>>>>> a ScanConfig).
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> To illustrate with pseudocode what the different levels
mean, a
>>>>>>>>> batch query would look like the following:
>>>>>>>>>
>>>>>>>>> val provider = reflection[Format]("parquet")
>>>>>>>>> val table = provider.createTable(options)
>>>>>>>>> val scan = table.createScan(scanConfig) // scanConfig
includes
>>>>>>>>> pushdown and options
>>>>>>>>> // run tasks on executors
>>>>>>>>>
>>>>>>>>> A streaming micro-batch scan would look like the following:
>>>>>>>>>
>>>>>>>>> val provider = reflection[Format]("parquet")
>>>>>>>>> val table = provider.createTable(options)
>>>>>>>>> val stream = table.createStream(scanConfig)
>>>>>>>>>
>>>>>>>>> while(true) {
>>>>>>>>>   val scan = streamingScan.createScan(startOffset)
>>>>>>>>>   // run tasks on executors
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Vs the current API, the above:
>>>>>>>>>
>>>>>>>>> 1. Creates an explicit Table abstraction, and an explicit
Scan
>>>>>>>>> abstraction.
>>>>>>>>>
>>>>>>>>> 2. Have an explicit Stream level and makes it clear pushdowns
and
>>>>>>>>> options are handled there, rather than at the individual
scan (ReadSupport)
>>>>>>>>> level. Data source implementations don't need to worry
about pushdowns or
>>>>>>>>> options changing mid-stream. For batch, those happen
when the scan object
>>>>>>>>> is created.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> This email is just a high level sketch. I've asked Wenchen
to
>>>>>>>>> prototype this, to see if it is actually feasible and
the degree of hacks
>>>>>>>>> it removes, or creates.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Ryan Blue
>>>>>>> Software Engineer
>>>>>>> Netflix
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Ryan Blue
>>>>> Software Engineer
>>>>> Netflix
>>>>>
>>>>
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>

Mime
View raw message