spark-dev mailing list archives

From Ryan Blue <rb...@netflix.com.INVALID>
Subject Re: Datasource API V2 and checkpointing
Date Mon, 30 Apr 2018 18:59:21 GMT
Should we really plan the API for a source with state that grows
indefinitely? It sounds like we're letting a bad case influence the design,
when we probably shouldn't.

On Mon, Apr 30, 2018 at 11:05 AM, Joseph Torres <
joseph.torres@databricks.com> wrote:

> Offset is just a type alias for arbitrary JSON-serializable state. Most
> implementations should (and do) just toss the blob at Spark and let Spark
> handle recovery on its own.
>
> In the case of file streams, the obstacle is that the conceptual offset is
> very large: a list of every file that the stream has ever read. In order
> to parse this efficiently, the stream connector needs detailed control over
> how it's stored; the current implementation even has complex
> compaction and retention logic.
>
>
> On Mon, Apr 30, 2018 at 10:48 AM, Ryan Blue <rblue@netflix.com> wrote:
>
>> Why don't we just have the source return its state as a Serializable when it
>> reports offsets? Then Spark could handle storing the source's state and the
>> source wouldn't need to worry about file system paths. I think that would
>> be easier for implementations and better for recovery because it wouldn't
>> leave unknown state on a single machine's file system.
>>
>> rb
>>
>> On Fri, Apr 27, 2018 at 9:23 AM, Joseph Torres <
>> joseph.torres@databricks.com> wrote:
>>
>>> The precise interactions with the DataSourceV2 API haven't yet been
>>> hammered out in design. But much of this comes down to the core of
>>> Structured Streaming rather than the API details.
>>>
>>> The execution engine handles checkpointing and recovery. It asks the
>>> streaming data source for offsets, and then determines that batch N
>>> contains the data between offset A and offset B. On recovery, if batch N
>>> needs to be re-run, the execution engine just asks the source for the same
>>> offset range again. Sources also get a handle to their own subfolder of the
>>> checkpoint, which they can use as scratch space if they need it. For example,
>>> Spark's FileStreamSource keeps a log of all the files it has seen, so its
>>> offsets can simply be indices into the log rather than huge strings
>>> containing all the paths.
>>>
>>> SPARK-23323 is orthogonal. That commit coordinator is responsible for
>>> ensuring that, within a single Spark job, two different tasks can't commit
>>> the same partition.
>>>
>>> On Fri, Apr 27, 2018 at 8:53 AM, Thakrar, Jayesh <
>>> jthakrar@conversantmedia.com> wrote:
>>>
>>>> Wondering if this issue is related to SPARK-23323?
>>>>
>>>> Any pointers will be greatly appreciated.
>>>>
>>>> Thanks,
>>>> Jayesh
>>>>
>>>> From: "Thakrar, Jayesh" <jthakrar@conversantmedia.com>
>>>> Date: Monday, April 23, 2018 at 9:49 PM
>>>> To: "dev@spark.apache.org" <dev@spark.apache.org>
>>>> Subject: Datasource API V2 and checkpointing
>>>>
>>>> When checkpointing is enabled, which component does the actual work:
>>>> the streaming datasource or the execution engine/driver?
>>>>
>>>> I have written a small, trivial datasource that just generates strings.
>>>> After enabling checkpointing, I do see a folder being created under the
>>>> checkpoint folder, but there's nothing else in there.
>>>>
>>>> The same question applies to write-ahead logging and recovery. On a
>>>> restart from a failed streaming session, who should set the offsets:
>>>> the driver/Spark or the datasource?
>>>>
>>>> Any pointers to design docs would also be greatly appreciated.
>>>>
>>>> Thanks,
>>>> Jayesh
>>>>
>>>
>>>
>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>
>


-- 
Ryan Blue
Software Engineer
Netflix
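
[Editor's illustration, not part of the original message]

The division of labor described in the thread (the engine records the offset
range of each batch and replays it on recovery, while a file-style source keeps
its own log in a scratch subfolder so that its offsets are small indices) can
be sketched roughly as below. This is a toy model in Python, not Spark's API:
the class names Engine and FileListSource, the method names, the "logIndex"
field, and the directory layout are all hypothetical and chosen only for
illustration.

```python
import json
import os
import tempfile


class FileListSource:
    """Hypothetical source: logs every file it has seen in its scratch
    folder, so an offset is just an index into that log."""

    def __init__(self, scratch_dir):
        self.log_path = os.path.join(scratch_dir, "seen_files.log")
        self.seen = []
        if os.path.exists(self.log_path):
            # Reload the file log written before a restart.
            with open(self.log_path) as f:
                self.seen = [line.rstrip("\n") for line in f]

    def discover(self, paths):
        # Append newly seen files to the on-disk log.
        with open(self.log_path, "a") as f:
            for p in paths:
                if p not in self.seen:
                    self.seen.append(p)
                    f.write(p + "\n")

    def latest_offset(self):
        # Small JSON-serializable state instead of a list of all paths.
        return {"logIndex": len(self.seen)}

    def get_batch(self, start, end):
        return self.seen[start["logIndex"]:end["logIndex"]]


class Engine:
    """Hypothetical engine: persists each batch's offset range before
    running it, and reuses the recorded range when a batch is re-run."""

    def __init__(self, checkpoint_dir, source_factory):
        self.offsets_dir = os.path.join(checkpoint_dir, "offsets")
        source_dir = os.path.join(checkpoint_dir, "sources", "0")
        os.makedirs(self.offsets_dir, exist_ok=True)
        os.makedirs(source_dir, exist_ok=True)
        # The source gets a handle to its own subfolder of the checkpoint.
        self.source = source_factory(source_dir)

    def _read_range(self, batch_id):
        path = os.path.join(self.offsets_dir, str(batch_id))
        if not os.path.exists(path):
            return None
        with open(path) as f:
            return json.load(f)

    def run_batch(self, batch_id):
        recorded = self._read_range(batch_id)
        if recorded is None:
            # New batch: starts where the previous batch ended.
            prev = self._read_range(batch_id - 1)
            start = prev["end"] if prev else {"logIndex": 0}
            recorded = {"start": start, "end": self.source.latest_offset()}
            with open(os.path.join(self.offsets_dir, str(batch_id)), "w") as f:
                json.dump(recorded, f)
        # Either way, the source is asked for exactly the recorded range.
        return self.source.get_batch(recorded["start"], recorded["end"])


checkpoint = tempfile.mkdtemp()
engine = Engine(checkpoint, FileListSource)
engine.source.discover(["a.json", "b.json"])
first_run = engine.run_batch(0)

# Simulate a restart: fresh objects over the same checkpoint folder.
restarted = Engine(checkpoint, FileListSource)
restarted.source.discover(["c.json"])
replayed = restarted.run_batch(0)    # re-run of batch 0 uses the stored range
next_batch = restarted.run_batch(1)  # picks up only the newly seen file
```

In this sketch the source never decides which offsets a batch covers; it only
answers "what is the latest offset?" and "give me the data in this range",
which mirrors the split between engine and source described in the quoted
replies. The unbounded growth of seen_files.log is the case the top-level
reply questions; no compaction or retention is modeled here.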
