spark-dev mailing list archives

From Ryan Blue <>
Subject Re: data source api v2 refactoring
Date Thu, 06 Sep 2018 16:51:41 GMT

I'm not really sure what you're proposing here. What is a `LogicalWrite`?
Is it something that mirrors the read side in your PR?

I think that I agree that if we have a Write independent of the Table that
carries the commit and abort methods, then we can create it directly
without a WriteConfig. So I tentatively agree with what you propose,
assuming that I understand it correctly.
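For concreteness, a Write that carries commit/abort itself (so no WriteConfig is needed) could be sketched as below. All of the names here (LogicalWrite, WriterCommitMessage, InMemoryTable, the factory signature) are illustrative assumptions for this thread, not the committed DSv2 API:

```scala
// Sketch only: a LogicalWrite carries commit/abort itself, so the Table
// can create one directly per kind of write, with no WriteConfig step.
trait WriterCommitMessage

trait LogicalWrite {
  // Stand-in for a real writer factory: maps input rows to a commit message.
  def createWriterFactory(): String => WriterCommitMessage
  def commit(messages: Seq[WriterCommitMessage]): Unit
  def abort(messages: Seq[WriterCommitMessage]): Unit
}

trait Table {
  def newAppendWrite(): LogicalWrite
  def newDeleteWrite(deleteExprs: Seq[String]): LogicalWrite
}

// Toy in-memory table to show the flow end to end.
class InMemoryTable extends Table {
  val committed = scala.collection.mutable.Buffer.empty[String]
  private case class Msg(rows: String) extends WriterCommitMessage

  def newAppendWrite(): LogicalWrite = new LogicalWrite {
    def createWriterFactory(): String => WriterCommitMessage = rows => Msg(rows)
    def commit(messages: Seq[WriterCommitMessage]): Unit =
      messages.foreach { case Msg(r) => committed += r }
    def abort(messages: Seq[WriterCommitMessage]): Unit = () // discard everything
  }
  // For this sketch, delete just reuses the append machinery.
  def newDeleteWrite(deleteExprs: Seq[String]): LogicalWrite = newAppendWrite()
}
```

Under this shape, the driver collects the commit messages from the tasks and calls commit/abort on the LogicalWrite itself, which mirrors the read side.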


On Tue, Sep 4, 2018 at 8:42 PM Wenchen Fan <> wrote:

> I'm switching to another Gmail account; let's see if it still gets
> dropped this time.
> Hi Ryan,
> I'm thinking about the write path and feel the abstraction should be the
> same.
> We still have logical and physical writes. And the table can create
> different logical writes based on how we write, e.g., append, delete,
> replaceWhere, etc.
> One thing I'm not sure about is the WriteConfig. With the WriteConfig, the
> API would look like
> trait Table {
>   WriteConfig newAppendWriteConfig();
>   WriteConfig newDeleteWriteConfig(deleteExprs);
>   LogicalWrite newLogicalWrite(writeConfig);
> }
> Without WriteConfig, the API looks like
> trait Table {
>   LogicalWrite newAppendWrite();
>   LogicalWrite newDeleteWrite(deleteExprs);
> }
> It looks to me that the API is simpler without WriteConfig, what do you
> think?
> Thanks,
> Wenchen
> On Wed, Sep 5, 2018 at 4:24 AM Ryan Blue <>
> wrote:
>> Latest from Wenchen in case it was dropped.
>> ---------- Forwarded message ---------
>> From: Wenchen Fan <>
>> Date: Mon, Sep 3, 2018 at 6:16 AM
>> Subject: Re: data source api v2 refactoring
>> To: <>
>> Cc: Ryan Blue <>, Reynold Xin <>,
>> Hi Mridul,
>> I'm not sure what's going on, my email was CC'ed to the dev list.
>> Hi Ryan,
>> The logical and physical scan idea sounds good. To add more color
>> to Jungtaek's question: both micro-batch and continuous mode have
>> logical and physical scans, but there is a difference: in micro-batch
>> mode, a physical scan outputs data for one epoch, but that's not true
>> in continuous mode.
>> I'm not sure if it's necessary to include the streaming epoch in the API
>> abstraction, for features like metrics reporting.
>> On Sun, Sep 2, 2018 at 12:31 PM Mridul Muralidharan <>
>> wrote:
>>> Is it only me, or are all others getting Wenchen’s mails? (Obviously
>>> Ryan did :-) )
>>> I did not see it in the mail thread I received or in the archives ... [1]
>>> Wondering which other senders were getting dropped (if any).
>>> Regards
>>> Mridul
>>> [1]
>>> On Sat, Sep 1, 2018 at 8:58 PM Ryan Blue <>
>>> wrote:
>>>> Thanks for clarifying, Wenchen. I think that's what I expected.
>>>> As for the abstraction, here's the way that I think about it: there are
>>>> two important parts of a scan: the definition of what will be read, and
>>>> task sets that actually perform the read. In batch, there's one definition
>>>> of the scan and one task set so it makes sense that there's one scan object
>>>> that encapsulates both of these concepts. For streaming, we need to
>>>> separate the two into the definition of what will be read (the stream or
>>>> streaming read) and the task sets that are run (scans). That way, the
>>>> streaming read behaves like a factory for scans, producing scans that
>>>> handle the data either in micro-batches or using continuous tasks.
>>>> To address Jungtaek's question, I think that this does work with
>>>> continuous. In continuous mode, the query operators keep running and send
>>>> data to one another directly. The API still needs a streaming read layer
>>>> because it may still produce more than one continuous scan. That would
>>>> happen when the underlying source changes and Spark needs to reconfigure.
>>>> I think the example here is when the partitioning of a Kafka topic changes and
>>>> Spark needs to re-map Kafka partitions to continuous tasks.
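The "streaming read as a factory for scans" idea above could be sketched roughly as follows. The names (Scan, Stream, KafkaLikeStream) and the string-based tasks are illustrative assumptions, not the proposed interfaces:

```scala
// Sketch only: the streaming read is configured once; afterwards it only
// manufactures scans -- one per micro-batch epoch, or a new one whenever
// the source must be reconfigured in continuous mode.
trait Scan {
  def tasks: Seq[String] // stand-in for the read tasks of one scan
}

trait Stream {
  def createScan(startOffset: Long): Scan
}

// A Kafka-like source whose partition count can change between scans:
// exactly the case where a fresh scan / task re-mapping is needed.
class KafkaLikeStream(initialPartitions: Int) extends Stream {
  var partitions: Int = initialPartitions
  def createScan(startOffset: Long): Scan = new Scan {
    val tasks: Seq[String] =
      (0 until partitions).map(p => s"partition-$p@$startOffset")
  }
}
```

When the topic gains a partition, the next createScan call simply yields a scan with more tasks; nothing about the configured Stream itself changes.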
>>>> rb
>>>> On Fri, Aug 31, 2018 at 5:12 PM Wenchen Fan <>
>>>> wrote:
>>>>> Hi Ryan,
>>>>> Sorry, I may have used the wrong wording. The pushdown is done with the
>>>>> ScanConfig, which is not the table/stream/scan, but something between them.
>>>>> The table creates a ScanConfigBuilder, and then creates the stream/scan
>>>>> with the resulting ScanConfig.
>>>>> For a streaming source, the stream is the one that takes care of the
>>>>> pushdown result. For a batch source, it's the scan.
>>>>> It's a little tricky because the stream is an abstraction for streaming
>>>>> sources only. Better ideas are welcome!
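The flow described above (builder negotiates pushdown, table creates the stream/scan from the frozen config) might look roughly like this. The real ScanConfig/ScanConfigBuilder interfaces differ; every signature here is an assumption made to show the shape:

```scala
// Sketch only: pushdown is negotiated on a mutable builder, then frozen
// into an immutable ScanConfig that the table uses to build either a
// batch scan or a streaming stream.
case class ScanConfig(pushedFilters: Seq[String], requiredColumns: Seq[String])

class ScanConfigBuilder {
  private var filters: Seq[String] = Nil
  private var columns: Seq[String] = Nil
  def pushFilters(fs: Seq[String]): ScanConfigBuilder = { filters = fs; this }
  def pruneColumns(cs: Seq[String]): ScanConfigBuilder = { columns = cs; this }
  def build(): ScanConfig = ScanConfig(filters, columns)
}

trait Table {
  def newScanConfigBuilder(): ScanConfigBuilder = new ScanConfigBuilder
  def createScan(config: ScanConfig): String   // batch: the scan honors pushdown
  def createStream(config: ScanConfig): String // streaming: the stream honors it
}

// Toy implementation that just records which filters reached it.
class ExampleTable extends Table {
  def createScan(config: ScanConfig): String =
    s"scan[${config.pushedFilters.mkString(";")}]"
  def createStream(config: ScanConfig): String =
    s"stream[${config.pushedFilters.mkString(";")}]"
}
```

The point of the intermediate config is that the same pushdown result can feed either the batch scan or the stream, so neither has to own the negotiation.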
>>>>> On Sat, Sep 1, 2018 at 7:26 AM Ryan Blue <> wrote:
>>>>>> Thanks, Reynold!
>>>>>> I think your API sketch looks great. I appreciate having the Table
>>>>>> level in the abstraction to plug into as well. I think this makes it
>>>>>> clear what everything does, particularly having the Stream level that
>>>>>> represents a configured (by ScanConfig) streaming read and can act as
>>>>>> a factory for individual batch scans or for continuous scans.
>>>>>> Wenchen, I'm not sure what you mean by doing pushdown at the table
>>>>>> level. It seems to mean that pushdown is specific to a batch scan or
>>>>>> streaming read, which seems to be what you're saying as well. Wouldn't
>>>>>> pushdown happen to create a ScanConfig, which is then used as Reynold
>>>>>> suggests? Looking forward to seeing this PR when you get it posted.
>>>>>> Thanks for all of your work on this!
>>>>>> rb
>>>>>> On Fri, Aug 31, 2018 at 3:52 PM Wenchen Fan <>
>>>>>> wrote:
>>>>>>> Thanks, Reynold, for writing this and starting the discussion!
>>>>>>> Data source v2 was started with batch only, so we didn't pay much
>>>>>>> attention to the abstraction and just followed the v1 API. Now that
>>>>>>> we are designing the streaming API and catalog integration, the
>>>>>>> abstraction becomes super important.
>>>>>>> I like this proposed abstraction and have successfully prototyped it
>>>>>>> to make sure it works.
>>>>>>> During prototyping, I had to work around the issue that the
>>>>>>> streaming engine does query optimization/planning for each micro-batch.
>>>>>>> With this abstraction, the operator pushdown is only applied
>>>>>>> per-query. In my prototype, I do the physical planning up front to
>>>>>>> get the pushdown result, add a logical linking node that wraps the
>>>>>>> resulting physical node for the data source, and then swap that
>>>>>>> logical linking node into the logical plan for each batch. In the
>>>>>>> future we should just let the streaming engine do query
>>>>>>> optimization/planning only once.
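The "logical linking node" workaround can be pictured with stand-ins for Spark's real plan node classes (everything below is illustrative; the actual prototype operates on LogicalPlan/SparkPlan):

```scala
// Sketch only: plan physically once (which runs pushdown once), capture
// the result in a physical node, wrap it in a logical linking node, and
// reuse that wrapper in the logical plan of every micro-batch.
case class PhysicalScan(pushedFilters: Seq[String]) // result of pushdown

// Logical node that merely links to an already-planned physical scan.
case class LogicalLink(physical: PhysicalScan)

// Physical planning (and thus pushdown) happens here, up front, once.
def planOnce(filters: Seq[String]): LogicalLink =
  LogicalLink(PhysicalScan(filters))

// Each micro-batch swaps the same linking node into its plan; the batchId
// changes per batch but the physical scan (and pushdown result) does not.
def planMicroBatch(link: LogicalLink, batchId: Long): PhysicalScan =
  link.physical
```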
>>>>>>> About pushdown, I think we should do it at the table level. The
>>>>>>> table should create a new pushdown handler to apply operator pushdown
>>>>>>> for each scan/stream, and create the scan/stream with the pushdown
>>>>>>> result. The rationale is that a table should have the same pushdown
>>>>>>> behavior regardless of the scan node.
>>>>>>> Thanks,
>>>>>>> Wenchen
>>>>>>> On Fri, Aug 31, 2018 at 2:00 PM Reynold Xin <>
>>>>>>> wrote:
>>>>>>>> I spent some time last week looking at the current data source
>>>>>>>> apis, and I thought we should be a bit more buttoned up in terms of
>>>>>>>> the abstractions and the guarantees Spark provides. In particular,
>>>>>>>> I feel we need the following levels of "abstractions", to fit the
>>>>>>>> use cases in Spark, from batch to streaming.
>>>>>>>> Please don't focus on the naming at this stage. When possible, I
>>>>>>>> draw parallels to what similar levels are named in the current
>>>>>>>> api:
>>>>>>>> 0. Format: This represents a specific format, e.g. Parquet.
>>>>>>>> There is currently no explicit class at this level.
>>>>>>>> 1. Table: This should represent a logical dataset (with a schema).
>>>>>>>> This could be just a directory on the file system, or a table in
>>>>>>>> the catalog. Operations on tables can include batch reads (Scan),
>>>>>>>> writes, and potentially other operations such as deletes. The
>>>>>>>> closest to the table level abstraction in the current code base is
>>>>>>>> the Provider class, although Provider isn't quite a Table. This is
>>>>>>>> similar to Ryan's proposed design.
>>>>>>>> 2. Stream: Specific to streaming. A stream is created out of a
>>>>>>>> Table. This logically represents an instance of a StreamingQuery.
>>>>>>>> Pushdowns and options are handled at this layer. I.e. Spark
>>>>>>>> guarantees to the data source implementation that pushdowns and
>>>>>>>> options don't change within a Stream. Each Stream consists of a
>>>>>>>> sequence of scans. There is no equivalent concept in the current
>>>>>>>> committed code.
>>>>>>>> 3. Scan: A physical scan -- either as part of a streaming query,
>>>>>>>> or a batch query. This should contain sufficient information and
>>>>>>>> methods so we can run a Spark job over a defined subset of the
>>>>>>>> table. It's equivalent to an RDD, except there's no dependency on
>>>>>>>> RDD so it is a smaller surface. In the current code, the equivalent
>>>>>>>> class would be the ScanConfig, which represents the information
>>>>>>>> needed, but in order to execute a job, a ReadSupport is needed
>>>>>>>> (various methods in ReadSupport take a ScanConfig).
>>>>>>>> To illustrate with pseudocode what the different levels mean, a
>>>>>>>> batch query would look like the following:
>>>>>>>> val provider = reflection[Format]("parquet")
>>>>>>>> val table = provider.createTable(options)
>>>>>>>> val scan = table.createScan(scanConfig) // scanConfig includes
>>>>>>>> pushdown and options
>>>>>>>> // run tasks on executors
>>>>>>>> A streaming micro-batch scan would look like the following:
>>>>>>>> val provider = reflection[Format]("parquet")
>>>>>>>> val table = provider.createTable(options)
>>>>>>>> val stream = table.createStream(scanConfig)
>>>>>>>> while(true) {
>>>>>>>>   val scan = stream.createScan(startOffset)
>>>>>>>>   // run tasks on executors
>>>>>>>> }
>>>>>>>> Vs the current API, the above:
>>>>>>>> 1. Creates an explicit Table abstraction, and an explicit Scan
>>>>>>>> abstraction.
>>>>>>>> 2. Has an explicit Stream level and makes it clear that pushdowns
>>>>>>>> and options are handled there, rather than at the individual scan
>>>>>>>> (ReadSupport) level. Data source implementations don't need to
>>>>>>>> worry about pushdowns or options changing mid-stream. For batch,
>>>>>>>> those happen when the scan object is created.
>>>>>>>> This email is just a high level sketch. I've asked Wenchen to
>>>>>>>> prototype this, to see if it is actually feasible and the degree of
>>>>>>>> hacks it removes, or creates.
>>>>>> --
>>>>>> Ryan Blue
>>>>>> Software Engineer
>>>>>> Netflix

Ryan Blue
Software Engineer
