spark-dev mailing list archives

From Hyukjin Kwon <>
Subject Re: data source api v2 refactoring
Date Fri, 07 Sep 2018 07:02:07 GMT
BTW, just for clarification: do we hold Datasource V2 related PRs for now until we
finish this refactoring?

On Fri, Sep 7, 2018 at 12:52 AM, Ryan Blue <> wrote:

> Wenchen,
> I'm not really sure what you're proposing here. What is a `LogicalWrite`?
> Is it something that mirrors the read side in your PR?
> I think that I agree that if we have a Write independent of the Table that
> carries the commit and abort methods, then we can create it directly
> without a WriteConfig. So I tentatively agree with what you propose,
> assuming that I understand it correctly.
> rb
> On Tue, Sep 4, 2018 at 8:42 PM Wenchen Fan <> wrote:
>> I'm switching to another Gmail account; let's see if it still gets
>> dropped this time.
>> Hi Ryan,
>> I'm thinking about the write path and feel the abstraction should be the
>> same.
>> We still have logical and physical writes, and the table can create
>> different logical writes based on how the data is written, e.g., append,
>> delete, replaceWhere, etc.
>> One thing I'm not sure about is the WriteConfig. With the WriteConfig,
>> the API would look like
>> trait Table {
>>   def newAppendWriteConfig(): WriteConfig
>>   def newDeleteWriteConfig(deleteExprs: Seq[Expression]): WriteConfig
>>   def newLogicalWrite(writeConfig: WriteConfig): LogicalWrite
>> }
>> Without WriteConfig, the API looks like
>> trait Table {
>>   def newAppendWrite(): LogicalWrite
>>   def newDeleteWrite(deleteExprs: Seq[Expression]): LogicalWrite
>> }
>> It looks to me that the API is simpler without WriteConfig. What do you
>> think?
>> Thanks,
>> Wenchen
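To make the WriteConfig-free shape above concrete, here is a minimal runnable Scala sketch. `Expression`, `WriterCommitMessage`, and `InMemoryTable` are illustrative stand-ins, not the real Spark classes; the point is only that commit/abort can live on the LogicalWrite itself, so no separate WriteConfig object is needed.

```scala
// Illustrative stand-ins for the real Spark types.
case class Expression(sql: String)
case class WriterCommitMessage(taskId: Int)

// A LogicalWrite carries commit/abort itself, so no separate WriteConfig is needed.
trait LogicalWrite {
  def commit(messages: Seq[WriterCommitMessage]): Unit
  def abort(messages: Seq[WriterCommitMessage]): Unit
}

trait Table {
  def newAppendWrite(): LogicalWrite
  def newDeleteWrite(deleteExprs: Seq[Expression]): LogicalWrite
}

// A toy table that just records which writes were committed or aborted.
class InMemoryTable extends Table {
  var log = Vector.empty[String]
  private def write(kind: String): LogicalWrite = new LogicalWrite {
    def commit(messages: Seq[WriterCommitMessage]): Unit =
      log :+= s"$kind committed ${messages.size} task(s)"
    def abort(messages: Seq[WriterCommitMessage]): Unit =
      log :+= s"$kind aborted"
  }
  def newAppendWrite(): LogicalWrite = write("append")
  def newDeleteWrite(deleteExprs: Seq[Expression]): LogicalWrite =
    write(s"delete where ${deleteExprs.map(_.sql).mkString(" and ")}")
}
```

Each call like `newAppendWrite()` or `newDeleteWrite(exprs)` returns a write that already knows how to commit or abort, which is the simpler two-method shape proposed above.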
>> On Wed, Sep 5, 2018 at 4:24 AM Ryan Blue <>
>> wrote:
>>> Latest from Wenchen in case it was dropped.
>>> ---------- Forwarded message ---------
>>> From: Wenchen Fan <>
>>> Date: Mon, Sep 3, 2018 at 6:16 AM
>>> Subject: Re: data source api v2 refactoring
>>> To: <>
>>> Cc: Ryan Blue <>, Reynold Xin <>,
>>> Hi Mridul,
>>> I'm not sure what's going on; my email was CC'ed to the dev list.
>>> Hi Ryan,
>>> The logical and physical scan idea sounds good. To add more color
>>> to Jungtaek's question: both micro-batch and continuous mode have
>>> logical and physical scans, but there is a difference: for micro-batch
>>> mode, a physical scan outputs data for one epoch, but that's not true for
>>> continuous mode.
>>> I'm not sure if it's necessary to include the streaming epoch in the API
>>> abstraction, for features like metrics reporting.
>>> On Sun, Sep 2, 2018 at 12:31 PM Mridul Muralidharan <>
>>> wrote:
>>>> Is it only me, or are all others getting Wenchen's mails? (Obviously
>>>> Ryan did :-) )
>>>> I did not see it in the mail thread I received or in the archives ... [1]
>>>> Wondering which other senders were getting dropped (if yes).
>>>> Regards
>>>> Mridul
>>>> [1]
>>>> On Sat, Sep 1, 2018 at 8:58 PM Ryan Blue <>
>>>> wrote:
>>>>> Thanks for clarifying, Wenchen. I think that's what I expected.
>>>>> As for the abstraction, here's the way that I think about it: there
>>>>> are two important parts of a scan: the definition of what will be read,
>>>>> and the task sets that actually perform the read. In batch, there's one
>>>>> definition of the scan and one task set, so it makes sense that there's
>>>>> one scan that encapsulates both of these concepts. For streaming, we need
>>>>> to separate the two into the definition of what will be read (the
>>>>> streaming read) and the task sets that are run (scans). That way, the
>>>>> streaming read behaves like a factory for scans, producing scans that
>>>>> handle the data either in micro-batches or using continuous tasks.
>>>>> To address Jungtaek's question, I think that this does work with
>>>>> continuous. In continuous mode, the query operators keep running and pass
>>>>> data to one another directly. The API still needs a streaming read layer
>>>>> because it may still produce more than one continuous scan. That would
>>>>> happen when the underlying source changes and Spark needs to reconfigure.
>>>>> I think the example here is when the partitioning of a Kafka topic
>>>>> changes and Spark needs to re-map Kafka partitions to continuous tasks.
>>>>> rb
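The factory relationship described above can be sketched in a few lines of Scala. All of the names here (`StreamingRead`, `Scan`, `reconfigure`) are invented for illustration and are not the proposed interfaces; the sketch only shows a streaming read producing a fresh scan after the source, Kafka-like, changes its partitioning.

```scala
// Invented names for illustration only.
trait Scan { def partitions: Seq[Int] }

// The streaming read is a factory: each (re)configuration yields new scans.
class StreamingRead(initialPartitions: Int) {
  private var numPartitions = initialPartitions

  // Called when the underlying source changes, e.g. a Kafka topic is repartitioned.
  def reconfigure(newPartitions: Int): Unit = numPartitions = newPartitions

  // Each scan maps the current source partitions to tasks.
  def createScan(): Scan = new Scan {
    val partitions: Seq[Int] = 0 until numPartitions
  }
}
```

In continuous mode the first scan might run indefinitely, but a second scan is still needed when Spark has to re-map source partitions to continuous tasks.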
>>>>> On Fri, Aug 31, 2018 at 5:12 PM Wenchen Fan <>
>>>>> wrote:
>>>>>> Hi Ryan,
>>>>>> Sorry, I may have used the wrong wording. The pushdown is done with
>>>>>> ScanConfig, which is not table/stream/scan, but something between them.
>>>>>> The table creates a ScanConfigBuilder, and the table creates the
>>>>>> stream/scan with a ScanConfig. For a streaming source, the stream is the
>>>>>> one to take care of the pushdown result. For a batch source, it's the scan.
>>>>>> It's a little tricky because stream is an abstraction for streaming
>>>>>> sources only. Better ideas are welcome!
>>>>>> On Sat, Sep 1, 2018 at 7:26 AM Ryan Blue <> wrote:
>>>>>>> Thanks, Reynold!
>>>>>>> I think your API sketch looks great. I appreciate having the levels
>>>>>>> in the abstraction to plug into as well. I think this makes it clear
>>>>>>> what everything does, particularly having the Stream level that is
>>>>>>> a configured (by ScanConfig) streaming read and can act as a factory for
>>>>>>> individual batch scans or for continuous scans.
>>>>>>> Wenchen, I'm not sure what you mean by doing pushdown at the table
>>>>>>> level. It seems to mean that pushdown is specific to a batch scan or
>>>>>>> streaming read, which seems to be what you're saying as well. Wouldn't
>>>>>>> the pushdown happen to create a ScanConfig, which is then used as Reynold
>>>>>>> suggests? Looking forward to seeing this PR when you get it posted.
>>>>>>> Thanks for all of your work on this!
>>>>>>> rb
>>>>>>> On Fri, Aug 31, 2018 at 3:52 PM Wenchen Fan <>
>>>>>>> wrote:
>>>>>>>> Thanks, Reynold, for writing this and starting the discussion!
>>>>>>>> Data source v2 was started with batch only, so we didn't pay much
>>>>>>>> attention to the abstraction and just followed the v1 API. Now that we
>>>>>>>> are designing the streaming API and catalog integration, the abstraction
>>>>>>>> becomes super important.
>>>>>>>> I like this proposed abstraction and have successfully prototyped
>>>>>>>> it to make sure it works.
>>>>>>>> During prototyping, I had to work around the issue that the
>>>>>>>> current streaming engine does query optimization/planning for each
>>>>>>>> micro batch. With this abstraction, the operator pushdown is only
>>>>>>>> applied once per query. In my prototype, I do the physical planning up
>>>>>>>> front to get the pushdown result, add a logical linking node that wraps
>>>>>>>> the resulting physical node for the data source, and then swap that
>>>>>>>> logical linking node into the logical plan for each batch. In the future
>>>>>>>> we should just let the streaming engine do query optimization/planning
>>>>>>>> only once.
>>>>>>>> About pushdown, I think we should do it at the table level. The
>>>>>>>> table should create a new pushdown handler to apply operator pushdown
>>>>>>>> for each scan/stream, and create the scan/stream with the pushdown
>>>>>>>> result. The rationale is that a table should have the same pushdown
>>>>>>>> behavior regardless of the scan node.
>>>>>>>> Thanks,
>>>>>>>> Wenchen
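One possible Scala sketch of that table-level arrangement, with hypothetical names (`PushdownHandler`, `Filter`, `pushFilters`) that are not part of any committed API: the table owns the pushdown handler, so every scan or stream built from it sees the same pushdown behavior.

```scala
// Hypothetical names for illustration; not the proposed interfaces.
case class Filter(sql: String)

// The table owns pushdown behavior, so every scan/stream built from it behaves the same.
class PushdownHandler(supportedColumns: Set[String]) {
  // Split requested filters into those the source will apply (pushed)
  // and those Spark must still evaluate (remaining). Here a filter is
  // "pushable" if its leading column name is supported by the source.
  def pushFilters(filters: Seq[Filter]): (Seq[Filter], Seq[Filter]) =
    filters.partition(f => supportedColumns.contains(f.sql.takeWhile(_ != ' ')))
}

class Table(supportedColumns: Set[String]) {
  // A fresh handler per scan/stream, but always with the table's behavior.
  def newPushdownHandler(): PushdownHandler = new PushdownHandler(supportedColumns)
  def createScan(pushed: Seq[Filter]): String =
    s"scan with pushed: ${pushed.map(_.sql).mkString(", ")}"
}
```

The scan/stream is then created from the pushdown result, rather than performing pushdown itself, which matches the rationale that pushdown behavior should not depend on the scan node.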
>>>>>>>> On Fri, Aug 31, 2018 at 2:00 PM Reynold Xin <>
>>>>>>>> wrote:
>>>>>>>>> I spent some time last week looking at the current data source v2
>>>>>>>>> apis, and I thought we should be a bit more buttoned up in terms of
>>>>>>>>> the abstractions and the guarantees Spark provides. In particular, I
>>>>>>>>> feel we need the following levels of "abstractions" to fit the use
>>>>>>>>> cases in Spark, from batch to streaming.
>>>>>>>>> Please don't focus on the naming at this stage. When possible, I
>>>>>>>>> draw parallels to what similar levels are named in the currently
>>>>>>>>> committed api:
>>>>>>>>> 0. Format: This represents a specific format, e.g. Parquet.
>>>>>>>>> There is currently no explicit class at this level.
>>>>>>>>> 1. Table: This should represent a logical dataset (with a schema).
>>>>>>>>> This could be just a directory on the file system, or a table in the
>>>>>>>>> catalog. Operations on tables can include batch reads (Scan), streams,
>>>>>>>>> writes, and potentially other operations such as deletes. The closest
>>>>>>>>> to the table level abstraction in the current code base is the
>>>>>>>>> "Provider" class, although Provider isn't quite a Table. This is
>>>>>>>>> similar to Ryan's proposed design.
>>>>>>>>> 2. Stream: Specific to streaming. A stream is created out of a
>>>>>>>>> Table. This logically represents an instance of a StreamingQuery.
>>>>>>>>> Pushdowns and options are handled at this layer, i.e. Spark guarantees
>>>>>>>>> to the data source implementation that pushdowns and options don't
>>>>>>>>> change within a Stream. Each Stream consists of a sequence of scans.
>>>>>>>>> There is no equivalent concept in the current committed code.
>>>>>>>>> 3. Scan: A physical scan -- either as part of a streaming query
>>>>>>>>> or a batch query. This should contain sufficient information and
>>>>>>>>> methods so we can run a Spark job over a defined subset of the table.
>>>>>>>>> It is functionally equivalent to an RDD, except there's no dependency
>>>>>>>>> on RDD, so it is a smaller surface. In the current code, the equivalent
>>>>>>>>> class would be the ScanConfig, which represents the information needed;
>>>>>>>>> but in order to execute a job, ReadSupport is needed (various methods
>>>>>>>>> in ReadSupport take a ScanConfig).
>>>>>>>>> To illustrate with pseudocode what the different levels mean, a
>>>>>>>>> batch query would look like the following:
>>>>>>>>> val provider = reflection[Format]("parquet")
>>>>>>>>> val table = provider.createTable(options)
>>>>>>>>> val scan = table.createScan(scanConfig) // scanConfig includes the
>>>>>>>>> pushdown and options
>>>>>>>>> // run tasks on executors
>>>>>>>>> A streaming micro-batch scan would look like the following:
>>>>>>>>> val provider = reflection[Format]("parquet")
>>>>>>>>> val table = provider.createTable(options)
>>>>>>>>> val stream = table.createStream(scanConfig)
>>>>>>>>> while (true) {
>>>>>>>>>   val scan = stream.createScan(startOffset)
>>>>>>>>>   // run tasks on executors
>>>>>>>>> }
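The pseudocode above can be wired together as a small runnable Scala toy. Every class here is a stand-in sketch of the four levels (Format -> Table -> Stream -> Scan), not the proposed interfaces; the offset arithmetic and `run()` method are invented so the flow can be exercised end to end.

```scala
// Toy stand-ins for the four levels: Format -> Table -> Stream -> Scan.
case class ScanConfig(pushedFilters: Seq[String], options: Map[String, String])

class Scan(startOffset: Long, batchSize: Long) {
  // In Spark this would run tasks on executors; here we just return the
  // offset range the scan would cover.
  def run(): (Long, Long) = (startOffset, startOffset + batchSize)
}

class Stream(config: ScanConfig) {
  // Pushdown and options are fixed for the lifetime of the stream;
  // each epoch gets its own scan starting at the next offset.
  def createScan(startOffset: Long): Scan = new Scan(startOffset, batchSize = 10)
}

class Table(options: Map[String, String]) {
  def createScan(config: ScanConfig): Scan = new Scan(0, 10) // batch: one scan
  def createStream(config: ScanConfig): Stream = new Stream(config)
}

class Format {
  def createTable(options: Map[String, String]): Table = new Table(options)
}
```

The batch path goes Format -> Table -> Scan directly, while the streaming path inserts the Stream level, which is exactly where the sketch pins pushdowns and options.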
>>>>>>>>> Vs the current API, the above:
>>>>>>>>> 1. Creates an explicit Table abstraction and an explicit Scan
>>>>>>>>> abstraction.
>>>>>>>>> 2. Has an explicit Stream level and makes it clear that pushdowns
>>>>>>>>> and options are handled there, rather than at the individual scan
>>>>>>>>> (ReadSupport) level. Data source implementations don't need to worry
>>>>>>>>> about pushdowns or options changing mid-stream. For batch, those
>>>>>>>>> happen when the scan object is created.
>>>>>>>>> This email is just a high level sketch. I've asked Wenchen to
>>>>>>>>> prototype this, to see if it is actually feasible and the degree of
>>>>>>>>> hacks it removes, or creates.
>>>>>>> --
>>>>>>> Ryan Blue
>>>>>>> Software Engineer
>>>>>>> Netflix
>>>>> --
>>>>> Ryan Blue
>>>>> Software Engineer
>>>>> Netflix
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
> --
> Ryan Blue
> Software Engineer
> Netflix
