spark-dev mailing list archives

From Wenchen Fan <>
Subject Re: data source api v2 refactoring
Date Wed, 05 Sep 2018 03:42:22 GMT
I'm switching to another of my Gmail accounts; let's see if it still gets
dropped this time.

Hi Ryan,

I'm thinking about the write path and feel the abstraction should be the same.

We still have logical and physical writes, and the table can create
different logical writes depending on how to write, e.g. append, delete,
replaceWhere, etc.

One thing I'm not sure about is the WriteConfig. With the WriteConfig, the
API would look like
trait Table {
  WriteConfig newAppendWriteConfig();

  WriteConfig newDeleteWriteConfig(deleteExprs);

  LogicalWrite newLogicalWrite(writeConfig);
}

Without WriteConfig, the API looks like
trait Table {
  LogicalWrite newAppendWrite();

  LogicalWrite newDeleteWrite(deleteExprs);
}

It looks to me like the API is simpler without WriteConfig. What do you think?
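
To make the comparison concrete, here is a minimal Scala sketch of both shapes side by side. It is only an illustration: Expr, AppendConfig, DeleteConfig, ConfigTable and DirectTable are hypothetical names, and the parameter type of deleteExprs is an assumption, since the signatures above leave it out. None of this is Spark's actual API.

```scala
// Hypothetical sketch; these are not Spark's real classes or signatures.
object WriteApiSketch {
  // Stand-in for a filter expression such as "day = '2018-09-05'".
  final case class Expr(sql: String)

  // In this sketch a logical write only describes the operation.
  final case class LogicalWrite(description: String)

  // Shape 1: a WriteConfig captures the operation first.
  sealed trait WriteConfig
  case object AppendConfig extends WriteConfig
  final case class DeleteConfig(deleteExprs: Seq[Expr]) extends WriteConfig

  trait TableWithConfig {
    def newAppendWriteConfig(): WriteConfig
    def newDeleteWriteConfig(deleteExprs: Seq[Expr]): WriteConfig
    def newLogicalWrite(writeConfig: WriteConfig): LogicalWrite
  }

  // Shape 2: the table returns the LogicalWrite directly.
  trait TableWithoutConfig {
    def newAppendWrite(): LogicalWrite
    def newDeleteWrite(deleteExprs: Seq[Expr]): LogicalWrite
  }

  private def describeDelete(exprs: Seq[Expr]): String =
    "delete where " + exprs.map(_.sql).mkString(" AND ")

  object ConfigTable extends TableWithConfig {
    def newAppendWriteConfig(): WriteConfig = AppendConfig
    def newDeleteWriteConfig(deleteExprs: Seq[Expr]): WriteConfig =
      DeleteConfig(deleteExprs)
    def newLogicalWrite(writeConfig: WriteConfig): LogicalWrite =
      writeConfig match {
        case AppendConfig        => LogicalWrite("append")
        case DeleteConfig(exprs) => LogicalWrite(describeDelete(exprs))
      }
  }

  object DirectTable extends TableWithoutConfig {
    def newAppendWrite(): LogicalWrite = LogicalWrite("append")
    def newDeleteWrite(deleteExprs: Seq[Expr]): LogicalWrite =
      LogicalWrite(describeDelete(deleteExprs))
  }

  def main(args: Array[String]): Unit = {
    val exprs = Seq(Expr("day = '2018-09-05'"))
    // With WriteConfig: two calls to reach a LogicalWrite.
    val viaConfig =
      ConfigTable.newLogicalWrite(ConfigTable.newDeleteWriteConfig(exprs))
    // Without WriteConfig: one call.
    val direct = DirectTable.newDeleteWrite(exprs)
    println(viaConfig)
    println(direct)
  }
}
```

In this sketch both shapes end up with the same LogicalWrite. The WriteConfig variant adds one extra call and one extra type for the caller to handle.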


On Wed, Sep 5, 2018 at 4:24 AM Ryan Blue <> wrote:

> Latest from Wenchen in case it was dropped.
> ---------- Forwarded message ---------
> From: Wenchen Fan <>
> Date: Mon, Sep 3, 2018 at 6:16 AM
> Subject: Re: data source api v2 refactoring
> To: <>
> Cc: Ryan Blue <>, Reynold Xin <>, <
> Hi Mridul,
> I'm not sure what's going on, my email was CC'ed to the dev list.
> Hi Ryan,
> The logical and physical scan idea sounds good. To add more color
> to Jungtaek's question, both micro-batch and continuous mode have
> the logical and physical scan, but there is a difference: for micro-batch
> mode, a physical scan outputs data for one epoch, but it's not true for
> continuous mode.
> I'm not sure if it's necessary to include streaming epoch in the API
> abstraction, for features like metrics reporting.
> On Sun, Sep 2, 2018 at 12:31 PM Mridul Muralidharan <>
> wrote:
>> Is it only me, or are all others getting Wenchen’s mails? (Obviously Ryan
>> did :-) )
>> I did not see it in the mail thread I received or in archives ... [1]
>> Wondering which other senders were getting dropped (if any).
>> Regards
>> Mridul
>> [1]
>> On Sat, Sep 1, 2018 at 8:58 PM Ryan Blue <>
>> wrote:
>>> Thanks for clarifying, Wenchen. I think that's what I expected.
>>> As for the abstraction, here's the way that I think about it: there are
>>> two important parts of a scan: the definition of what will be read, and
>>> task sets that actually perform the read. In batch, there's one definition
>>> of the scan and one task set so it makes sense that there's one scan object
>>> that encapsulates both of these concepts. For streaming, we need to
>>> separate the two into the definition of what will be read (the stream or
>>> streaming read) and the task sets that are run (scans). That way, the
>>> streaming read behaves like a factory for scans, producing scans that
>>> handle the data either in micro-batches or using continuous tasks.
>>> To address Jungtaek's question, I think that this does work with
>>> continuous. In continuous mode, the query operators keep running and send
>>> data to one another directly. The API still needs a streaming read layer
>>> because it may still produce more than one continuous scan. That would
>>> happen when the underlying source changes and Spark needs to reconfigure. I
>>> think the example here is when partitioning in a Kafka topic changes and
>>> Spark needs to re-map Kafka partitions to continuous tasks.
>>> rb
>>> On Fri, Aug 31, 2018 at 5:12 PM Wenchen Fan <>
>>> wrote:
>>>> Hi Ryan,
>>>> Sorry, I may have used the wrong wording. The pushdown is done with ScanConfig,
>>>> which is not table/stream/scan, but something between them. The table
>>>> creates a ScanConfigBuilder, and the table creates the stream/scan with the ScanConfig.
>>>> For streaming source, stream is the one to take care of the pushdown
>>>> result. For batch source, it's the scan.
>>>> It's a little tricky because stream is an abstraction for streaming
>>>> source only. Better ideas are welcome!
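
The flow described in the message above (the table creates a ScanConfigBuilder, and the resulting ScanConfig is used to create either a scan or a stream) could look roughly like the Scala sketch below. ScanConfig and ScanConfigBuilder are names from the thread, but every method name and signature here (newScanConfigBuilder, pushFilters, build) is an assumption made for illustration. Filters are plain strings to keep the sketch self-contained.

```scala
// Hypothetical sketch; these are not Spark's real classes or signatures.
object PushdownSketch {
  // Immutable result of pushdown, shared by batch scans and streams.
  final case class ScanConfig(pushedFilters: Seq[String])

  // Created by the table; records which filters the source accepts.
  final class ScanConfigBuilder(supported: Set[String]) {
    private var pushed = Vector.empty[String]

    // Returns the filters the source cannot handle, so the engine keeps them.
    def pushFilters(filters: Seq[String]): Seq[String] = {
      val (accepted, rejected) = filters.partition(supported.contains)
      pushed = pushed ++ accepted
      rejected
    }

    def build(): ScanConfig = ScanConfig(pushed)
  }

  final case class Scan(config: ScanConfig)
  final case class Stream(config: ScanConfig)

  final class Table(supported: Set[String]) {
    def newScanConfigBuilder(): ScanConfigBuilder =
      new ScanConfigBuilder(supported)
    // Batch: the scan takes care of the pushdown result.
    def createScan(config: ScanConfig): Scan = Scan(config)
    // Streaming: the stream takes care of the pushdown result.
    def createStream(config: ScanConfig): Stream = Stream(config)
  }

  def main(args: Array[String]): Unit = {
    val table = new Table(Set("id > 10"))
    val builder = table.newScanConfigBuilder()
    val residual = builder.pushFilters(Seq("id > 10", "name = 'a'"))
    val config = builder.build()
    println(s"pushed=${config.pushedFilters}, residual=$residual")
    println(table.createScan(config))
    println(table.createStream(config))
  }
}
```

Because the same builder is used for both paths, the sketch gives a table the same pushdown behavior whether the ScanConfig ends up in a scan or in a stream.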
>>>> On Sat, Sep 1, 2018 at 7:26 AM Ryan Blue <> wrote:
>>>>> Thanks, Reynold!
>>>>> I think your API sketch looks great. I appreciate having the Table
>>>>> level in the abstraction to plug into as well. I think this makes it clear
>>>>> what everything does, particularly having the Stream level that represents
>>>>> a configured (by ScanConfig) streaming read and can act as a factory for
>>>>> individual batch scans or for continuous scans.
>>>>> Wenchen, I'm not sure what you mean by doing pushdown at the table
>>>>> level. It seems to mean that pushdown is specific to a batch scan or
>>>>> streaming read, which seems to be what you're saying as well. Wouldn't
>>>>> pushdown happen to create a ScanConfig, which is then used as Reynold
>>>>> suggests? Looking forward to seeing this PR when you get it posted. Thanks
>>>>> for all of your work on this!
>>>>> rb
>>>>> On Fri, Aug 31, 2018 at 3:52 PM Wenchen Fan <>
>>>>> wrote:
>>>>>> Thanks, Reynold, for writing this and starting the discussion!
>>>>>> Data source v2 was started with batch only, so we didn't pay much
>>>>>> attention to the abstraction and just followed the v1 API. Now that we are
>>>>>> designing the streaming API and catalog integration, the abstraction
>>>>>> becomes super important.
>>>>>> I like this proposed abstraction and have successfully prototyped it
>>>>>> to make sure it works.
>>>>>> During prototyping, I had to work around the issue that the current
>>>>>> streaming engine does query optimization/planning for each micro-batch.
>>>>>> With this abstraction, the operator pushdown is only applied once
>>>>>> per query. In my prototype, I do the physical planning up front to get the
>>>>>> pushdown result, add a logical linking node that wraps the resulting
>>>>>> physical plan node for the data source, and then swap that logical linking
>>>>>> node into the logical plan for each batch. In the future we should just let
>>>>>> the engine do query optimization/planning only once.
>>>>>> About pushdown, I think we should do it at the table level. The table
>>>>>> should create a new pushdown handler to apply operator pushdown for the
>>>>>> scan/stream, and create the scan/stream with the pushdown result. The
>>>>>> rationale is that a table should have the same pushdown behavior regardless
>>>>>> of the scan node.
>>>>>> Thanks,
>>>>>> Wenchen
>>>>>> On Fri, Aug 31, 2018 at 2:00 PM Reynold Xin <>
>>>>>> wrote:
>>>>>>> I spent some time last week looking at the current data source
>>>>>>> apis, and I thought we should be a bit more buttoned up in terms of the
>>>>>>> abstractions and the guarantees Spark provides. In particular, I feel we
>>>>>>> need the following levels of "abstractions" to fit the use cases in Spark,
>>>>>>> from batch to streaming.
>>>>>>> Please don't focus on the naming at this stage. When possible, I
>>>>>>> draw parallels to what similar levels are named in the currently
>>>>>>> committed api:
>>>>>>> 0. Format: This represents a specific format, e.g. Parquet, ORC.
>>>>>>> There is currently no explicit class at this level.
>>>>>>> 1. Table: This should represent a logical dataset (with schema).
>>>>>>> This could be just a directory on the file system, or a table in the
>>>>>>> catalog. Operations on tables can include batch reads (Scan),
>>>>>>> writes, and potentially other operations such as deletes. The closest to
>>>>>>> the table level abstraction in the current code base is the "Provider"
>>>>>>> class, although Provider isn't quite a Table. This is similar to Ryan's
>>>>>>> proposed design.
>>>>>>> 2. Stream: Specific to streaming. A stream is created out of a
>>>>>>> Table. This logically represents an instance of a StreamingQuery.
>>>>>>> Pushdowns and options are handled at this layer, i.e. Spark guarantees to
>>>>>>> data source implementations that pushdowns and options don't change within a
>>>>>>> Stream. Each Stream consists of a sequence of scans. There is no
>>>>>>> equivalent concept in the current committed code.
>>>>>>> 3. Scan: A physical scan -- either as part of a streaming query, or
>>>>>>> a batch query. This should contain sufficient information and methods so we
>>>>>>> can run a Spark job over a defined subset of the table. It's
>>>>>>> equivalent to an RDD, except there's no dependency on RDD so it is a
>>>>>>> smaller surface. In the current code, the equivalent class would be the
>>>>>>> ScanConfig, which represents the information needed, but in order to
>>>>>>> execute a job, ReadSupport is needed (various methods in ReadSupport take
>>>>>>> a ScanConfig).
>>>>>>> To illustrate with pseudocode what the different levels mean, a
>>>>>>> batch query would look like the following:
>>>>>>> val provider = reflection[Format]("parquet")
>>>>>>> val table = provider.createTable(options)
>>>>>>> // scanConfig includes pushdown and options
>>>>>>> val scan = table.createScan(scanConfig)
>>>>>>> // run tasks on executors
>>>>>>> A streaming micro-batch scan would look like the following:
>>>>>>> val provider = reflection[Format]("parquet")
>>>>>>> val table = provider.createTable(options)
>>>>>>> val stream = table.createStream(scanConfig)
>>>>>>> while(true) {
>>>>>>>   val scan = stream.createScan(startOffset)
>>>>>>>   // run tasks on executors
>>>>>>> }
>>>>>>> Vs the current API, the above:
>>>>>>> 1. Creates an explicit Table abstraction, and an explicit Scan
>>>>>>> abstraction.
>>>>>>> 2. Has an explicit Stream level and makes it clear that pushdowns and
>>>>>>> options are handled there, rather than at the individual scan
>>>>>>> level. Data source implementations don't need to worry about pushdowns or
>>>>>>> options changing mid-stream. For batch, those happen when the scan object
>>>>>>> is created.
>>>>>>> This email is just a high level sketch. I've asked Wenchen to
>>>>>>> prototype this, to see if it is actually feasible and the degree of hacks
>>>>>>> it removes, or creates.
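
As a rough illustration of the Format / Table / Stream / Scan levels sketched in the message above, the following self-contained Scala code turns the pseudocode into something that compiles. It is a toy model under stated assumptions: the types only carry configuration, Format is a plain object rather than something loaded by reflection, the "path" option is made up, and the endless micro-batch loop is replaced by three fixed offsets.

```scala
// Hypothetical sketch; these are not Spark's real classes or signatures.
object ReadLevelsSketch {
  // Pushdown result and options, fixed before any scan is created.
  final case class ScanConfig(pushedFilters: Seq[String],
                              options: Map[String, String])

  // Level 3: a physical scan over a defined subset of the table.
  final case class Scan(config: ScanConfig, startOffset: Option[Long])

  // Level 2: streaming only; acts as a factory for scans that share one config.
  final class Stream(val config: ScanConfig) {
    def createScan(startOffset: Long): Scan = Scan(config, Some(startOffset))
  }

  // Level 1: a logical dataset that supports batch and streaming reads.
  final class Table(val path: String) {
    def createScan(config: ScanConfig): Scan = Scan(config, None)
    def createStream(config: ScanConfig): Stream = new Stream(config)
  }

  // Level 0: a specific format; here it only knows how to create tables.
  object Format {
    def createTable(options: Map[String, String]): Table =
      new Table(options.getOrElse("path", "unknown"))
  }

  def main(args: Array[String]): Unit = {
    val table = Format.createTable(Map("path" -> "/tmp/events"))
    val config = ScanConfig(Seq("id > 10"), Map.empty)

    // Batch: one scan, pushdown applied when the scan is created.
    val batchScan = table.createScan(config)
    println(batchScan)

    // Streaming: one stream, then one scan per micro-batch.
    val stream = table.createStream(config)
    val scans = (0L until 3L).map(offset => stream.createScan(offset))
    scans.foreach(println)
  }
}
```

Every scan produced by the stream carries the same ScanConfig. This is how the sketch models the guarantee that pushdowns and options do not change mid-stream.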
>>>>> --
>>>>> Ryan Blue
>>>>> Software Engineer
>>>>> Netflix
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
> --
> Ryan Blue
> Software Engineer
> Netflix
