spark-dev mailing list archives

From Mridul Muralidharan <>
Subject Re: data source api v2 refactoring
Date Sun, 02 Sep 2018 04:30:57 GMT
Is it only me, or is everyone else getting Wenchen’s mails? (Obviously Ryan
did :-) )
I did not see it in the mail thread I received or in the archives... [1]
Wondering which other senders were getting dropped (if any).



On Sat, Sep 1, 2018 at 8:58 PM Ryan Blue <> wrote:

> Thanks for clarifying, Wenchen. I think that's what I expected.
> As for the abstraction, here's the way that I think about it: there are
> two important parts of a scan: the definition of what will be read, and
> task sets that actually perform the read. In batch, there's one definition
> of the scan and one task set so it makes sense that there's one scan object
> that encapsulates both of these concepts. For streaming, we need to
> separate the two into the definition of what will be read (the stream or
> streaming read) and the task sets that are run (scans). That way, the
> streaming read behaves like a factory for scans, producing scans that
> handle the data either in micro-batches or using continuous tasks.
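A minimal Scala sketch of the split Ryan describes, under stated assumptions: `OffsetRange`, `MicroBatchScan`, and `StreamingRead` are hypothetical names, not Spark's DataSourceV2 API, and "running tasks" is simulated by filtering a local range. The read is configured once (here, with a filter) and afterwards only acts as a factory for per-batch scans.

```scala
// Hypothetical types for illustration; not the Spark DataSourceV2 API.
final case class OffsetRange(start: Long, end: Long)

// One task set: reads a single offset range, applying the filter fixed by its read.
final class MicroBatchScan(val range: OffsetRange, filter: Long => Boolean) {
  // Stand-in for running tasks on executors: "reads" the offsets as records.
  def run(): Seq[Long] = (range.start until range.end).filter(filter)
}

// The definition of what will be read: configured once, then used as a scan factory.
final class StreamingRead(filter: Long => Boolean) {
  def createScan(range: OffsetRange): MicroBatchScan = new MicroBatchScan(range, filter)
}

val read = new StreamingRead(_ % 2 == 0)
val batches = Seq(OffsetRange(0, 4), OffsetRange(4, 8)).map(read.createScan)
val results = batches.map(_.run())
```

Each call to `createScan` yields a new task set, while the filter chosen for the read stays the same across batches.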
> To address Jungtaek's question, I think that this does work with
> continuous. In continuous mode, the query operators keep running and send
> data to one another directly. The API still needs a streaming read layer
> because it may still produce more than one continuous scan. That would
> happen when the underlying source changes and Spark needs to reconfigure. I
> think the example here is when partitioning in a Kafka topic changes and
> Spark needs to re-map Kafka partitions to continuous tasks.
> rb
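The reconfiguration case can be sketched the same way. Assuming hypothetical `ContinuousRead`/`ContinuousScan` types (again, not a real Spark interface) and a plain variable standing in for a Kafka topic's partition count, the read keeps returning the running scan until the partitioning changes, and only then produces a new one.

```scala
// Hypothetical sketch, not Spark's API: a continuous scan is bound to a fixed
// set of source partitions, with one long-running task per partition.
final class ContinuousScan(val partitions: Int) {
  def taskIds: Seq[Int] = 0 until partitions
}

// The streaming read outlives individual scans; it creates a new continuous
// scan only when the source's partitioning no longer matches the running one.
final class ContinuousRead(currentPartitions: () => Int) {
  private var active: Option[ContinuousScan] = None

  def scan(): ContinuousScan = {
    val n = currentPartitions()
    active match {
      case Some(s) if s.partitions == n => s // nothing changed: keep running
      case _ =>                               // first scan or repartitioned: reconfigure
        val s = new ContinuousScan(n)
        active = Some(s)
        s
    }
  }
}

var kafkaPartitions = 2 // stand-in for a Kafka topic's partition count
val continuousRead = new ContinuousRead(() => kafkaPartitions)
val first = continuousRead.scan()
val unchanged = continuousRead.scan()
kafkaPartitions = 3 // the topic gains a partition
val reconfigured = continuousRead.scan()
```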
> On Fri, Aug 31, 2018 at 5:12 PM Wenchen Fan <>
> wrote:
>> Hi Ryan,
>> Sorry, I may have used the wrong wording. The pushdown is done with ScanConfig,
>> which is not table/stream/scan, but something between them. The table
>> creates a ScanConfigBuilder, and then creates the stream/scan with the ScanConfig.
>> For streaming source, stream is the one to take care of the pushdown
>> result. For batch source, it's the scan.
>> It's a little tricky because stream is an abstraction for streaming
>> source only. Better ideas are welcome!
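One way to picture the ScanConfigBuilder flow Wenchen describes, as a sketch only: every class and method name below is hypothetical, and filters are plain strings. The builder accumulates pushdown decisions, `build()` freezes them into an immutable `ScanConfig`, and the table hands that config either to a batch scan or to a stream, which then owns it.

```scala
// Hypothetical names mirroring the thread's terms; not a real Spark signature.
final case class ScanConfig(requiredColumns: Seq[String], pushedFilters: Seq[String])

// Mutable while pushdown is negotiated; build() freezes the result.
final class ScanConfigBuilder {
  private var columns = Seq.empty[String]
  private var filters = Seq.empty[String]
  def pruneColumns(cols: Seq[String]): this.type = { columns = cols; this }
  def pushFilters(fs: Seq[String]): this.type = { filters = filters ++ fs; this }
  def build(): ScanConfig = ScanConfig(columns, filters)
}

final class BatchScan(val config: ScanConfig)        // batch: the scan owns the pushdown result
final class ConfiguredStream(val config: ScanConfig) // streaming: the stream owns it

final class Table(val name: String) {
  def newScanConfigBuilder(): ScanConfigBuilder = new ScanConfigBuilder
  def createScan(config: ScanConfig): BatchScan = new BatchScan(config)
  def createStream(config: ScanConfig): ConfiguredStream = new ConfiguredStream(config)
}

val table = new Table("events")
val config = table.newScanConfigBuilder()
  .pruneColumns(Seq("id", "ts"))
  .pushFilters(Seq("ts > 0"))
  .build()
val batchScan = table.createScan(config)
val stream = table.createStream(config)
```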
>> On Sat, Sep 1, 2018 at 7:26 AM Ryan Blue <> wrote:
>>> Thanks, Reynold!
>>> I think your API sketch looks great. I appreciate having the Table level
>>> in the abstraction to plug into as well. I think this makes it clear what
>>> everything does, particularly having the Stream level that represents a
>>> configured (by ScanConfig) streaming read and can act as a factory for
>>> individual batch scans or for continuous scans.
>>> Wenchen, I'm not sure what you mean by doing pushdown at the table
>>> level. It seems to me that pushdown is specific to a batch scan or
>>> streaming read, which seems to be what you're saying as well. Wouldn't the
>>> pushdown happen to create a ScanConfig, which is then used as Reynold
>>> suggests? Looking forward to seeing this PR when you get it posted. Thanks
>>> for all of your work on this!
>>> rb
>>> On Fri, Aug 31, 2018 at 3:52 PM Wenchen Fan <>
>>> wrote:
>>>> Thanks, Reynold, for writing this and starting the discussion!
>>>> Data source v2 was started with batch only, so we didn't pay much
>>>> attention to the abstraction and just followed the v1 API. Now that we are
>>>> designing the streaming API and catalog integration, the abstraction
>>>> becomes super important.
>>>> I like this proposed abstraction and have successfully prototyped it to
>>>> make sure it works.
>>>> During prototyping, I had to work around the issue that the current
>>>> streaming engine does query optimization/planning for each micro batch.
>>>> With this abstraction, the operator pushdown is only applied once
>>>> per query. In my prototype, I do the physical planning up front to get the
>>>> pushdown result, add a logical linking node that wraps the resulting
>>>> physical plan node for the data source, and then swap that logical linking
>>>> node into the logical plan for each batch. In the future we should just let
>>>> the streaming engine do query optimization/planning only once.
>>>> About pushdown, I think we should do it at the table level. The table
>>>> should create a new pushdown handler to apply operator pushdown for each
>>>> scan/stream, and create the scan/stream with the pushdown result. The
>>>> rationale is that a table should have the same pushdown behavior regardless
>>>> of the scan node.
>>>> Thanks,
>>>> Wenchen
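A sketch of the per-scan/stream pushdown handler idea, assuming a hypothetical `PushdownHandler` with mutable state and a simple string-prefix rule for what the source can evaluate (neither is a Spark interface). Because the table creates a fresh handler each time, no state leaks between scans, and a batch scan and a stream given the same filters get the same pushdown result.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch of the proposal, not a Spark interface: filters are plain
// strings, and the table's rule for what it can evaluate is a prefix check.
final case class PushdownResult(pushed: List[String], remaining: List[String])

final class PushdownHandler(canEvaluate: String => Boolean) {
  private val pushed = ArrayBuffer.empty[String]
  private val remaining = ArrayBuffer.empty[String]

  // Filters the source can evaluate are pushed; the rest stay with Spark.
  def push(filters: Seq[String]): Unit =
    filters.foreach(f => if (canEvaluate(f)) pushed += f else remaining += f)

  def result(): PushdownResult = PushdownResult(pushed.toList, remaining.toList)
}

final class PushdownTable {
  // One rule for every scan node, batch or streaming.
  private def canEvaluate(filter: String): Boolean = filter.startsWith("ts ")

  // A fresh handler per scan/stream, so no pushdown state is shared between them.
  def newPushdownHandler(): PushdownHandler = new PushdownHandler(canEvaluate)
}

val pushdownTable = new PushdownTable
val filters = Seq("ts > 10", "lower(name) = 'a'")
val forBatch = pushdownTable.newPushdownHandler()
forBatch.push(filters)
val forStream = pushdownTable.newPushdownHandler()
forStream.push(filters)
```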
>>>> On Fri, Aug 31, 2018 at 2:00 PM Reynold Xin <>
>>>> wrote:
>>>>> I spent some time last week looking at the current data source v2
>>>>> APIs, and I thought we should be a bit more buttoned up in terms of the
>>>>> abstractions and the guarantees Spark provides. In particular, I feel we
>>>>> need the following levels of "abstractions" to fit the use cases from
>>>>> batch to streaming.
>>>>> Please don't focus on the naming at this stage. When possible, I draw
>>>>> parallels to what similar levels are named in the currently committed API:
>>>>> 0. Format: This represents a specific format, e.g. Parquet, ORC. There
>>>>> is currently no explicit class at this level.
>>>>> 1. Table: This should represent a logical dataset (with schema). This
>>>>> could be just a directory on the file system, or a table in the catalog.
>>>>> Operations on tables can include batch reads (Scan), streams, writes,
>>>>> and potentially other operations such as deletes. The closest to the table
>>>>> level abstraction in the current code base is the "Provider" class,
>>>>> although Provider isn't quite a Table. This is similar to Ryan's proposed
>>>>> design.
>>>>> 2. Stream: Specific to streaming. A stream is created out of a Table.
>>>>> This logically represents an instance of a StreamingQuery. Pushdowns and
>>>>> options are handled at this layer, i.e. Spark guarantees to data source
>>>>> implementations that pushdowns and options don't change within a Stream.
>>>>> Each Stream consists of a sequence of scans. There is no equivalent
>>>>> concept in the current committed code.
>>>>> 3. Scan: A physical scan -- either as part of a streaming query, or a
>>>>> batch query. This should contain sufficient information and methods so
>>>>> that we can run a Spark job over a defined subset of the table. It's
>>>>> functionally equivalent to an RDD, except there's no dependency on RDD, so
>>>>> it is a smaller surface. In the current code, the equivalent class would be
>>>>> the ScanConfig, which represents the information needed, but in order to
>>>>> execute a job, ReadSupport is needed (various methods in ReadSupport take
>>>>> a ScanConfig).
>>>>> To illustrate with pseudocode what the different levels mean, a batch
>>>>> query would look like the following:
>>>>> val provider = reflection[Format]("parquet")
>>>>> val table = provider.createTable(options)
>>>>> val scan = table.createScan(scanConfig) // scanConfig includes pushdown and options
>>>>> // run tasks on executors
>>>>> A streaming micro-batch scan would look like the following:
>>>>> val provider = reflection[Format]("parquet")
>>>>> val table = provider.createTable(options)
>>>>> val stream = table.createStream(scanConfig)
>>>>> while (true) {
>>>>>   val scan = stream.createScan(startOffset)
>>>>>   // run tasks on executors
>>>>> }
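For readers who want to run the two pseudocode snippets above, here is a toy, in-memory rendering in Scala. Everything in it is hypothetical: a `Map` lookup replaces `reflection[Format](...)`, the "table" is just the integers `0 until size`, and the endless `while (true)` loop becomes a bounded iteration that stops once the stream has no more data.

```scala
// Hypothetical, in-memory stand-ins for the levels in the pseudocode above.
final case class ScanConfig(filter: Int => Boolean) // pushdown + options

trait Scan { def run(): Seq[Int] } // stand-in for running tasks on executors

// The "Stream" level; named TableStream here to avoid clashing with scala.Stream.
trait TableStream { def createScan(startOffset: Int): Option[Scan] }

trait Table {
  def createScan(config: ScanConfig): Scan
  def createStream(config: ScanConfig): TableStream
}

trait Format { def createTable(options: Map[String, String]): Table }

// A table whose rows are simply the integers 0 until size.
final class MemoryTable(size: Int, batchSize: Int) extends Table {
  private def rangeScan(from: Int, end: Int, config: ScanConfig): Scan =
    new Scan { def run(): Seq[Int] = (from until end).filter(config.filter) }

  // Batch: pushdown is fixed when the single scan is created.
  def createScan(config: ScanConfig): Scan = rangeScan(0, size, config)

  // Streaming: pushdown is fixed for the whole stream; each scan is one micro-batch.
  def createStream(config: ScanConfig): TableStream = new TableStream {
    def createScan(startOffset: Int): Option[Scan] =
      if (startOffset >= size) None
      else Some(rangeScan(startOffset, math.min(startOffset + batchSize, size), config))
  }
}

object MemoryFormat extends Format {
  def createTable(options: Map[String, String]): Table =
    new MemoryTable(options("size").toInt, options.getOrElse("batchSize", "2").toInt)
}

// A registry lookup stands in for reflection[Format]("parquet").
val formats: Map[String, Format] = Map("memory" -> MemoryFormat)
val batchSize = 2
val table = formats("memory").createTable(Map("size" -> "6", "batchSize" -> batchSize.toString))
val scanConfig = ScanConfig(_ % 2 == 0)

// Batch query: one scan.
val batchRows = table.createScan(scanConfig).run()

// Micro-batch query: advance the start offset until the stream has no more data.
val stream = table.createStream(scanConfig)
val microBatches = Iterator.iterate(0)(_ + batchSize)
  .map(stream.createScan)
  .takeWhile(_.isDefined)
  .map(_.get.run())
  .toList
```

The same `scanConfig` is passed once, either to the single batch scan or to the stream, so every micro-batch scan sees identical pushdown.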
>>>>> Compared with the current API, the above:
>>>>> 1. Creates an explicit Table abstraction, and an explicit Scan
>>>>> abstraction.
>>>>> 2. Has an explicit Stream level and makes it clear pushdowns and
>>>>> options are handled there, rather than at the individual scan (ReadSupport)
>>>>> level. Data source implementations don't need to worry about pushdowns or
>>>>> options changing mid-stream. For batch, those happen when the scan object
>>>>> is created.
>>>>> This email is just a high level sketch. I've asked Wenchen to
>>>>> prototype this, to see if it is actually feasible and the degree of hacks
>>>>> it removes, or creates.
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
> --
> Ryan Blue
> Software Engineer
> Netflix
