spark-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Joseph Torres <joseph.tor...@databricks.com>
Subject Re: Datasource API V2 and checkpointing
Date Mon, 30 Apr 2018 18:05:09 GMT
Offset is just a type alias for arbitrary JSON-serializable state. Most
implementations should (and do) just toss the blob at Spark and let Spark
handle recovery on its own.

In the case of file streams, the obstacle is that the conceptual offset is
very large: a list of every file which the stream has ever read. In order
to parse this efficiently, the stream connector needs detailed control over
how it's stored; the current implementation even has complex
compactification and retention logic.


On Mon, Apr 30, 2018 at 10:48 AM, Ryan Blue <rblue@netflix.com> wrote:

> Why don't we just have the source return a Serializable of state when it
> reports offsets? Then Spark could handle storing the source's state and the
> source wouldn't need to worry about file system paths. I think that would
> be easier for implementations and better for recovery because it wouldn't
> leave unknown state on a single machine's file system.
>
> rb
>
> On Fri, Apr 27, 2018 at 9:23 AM, Joseph Torres <
> joseph.torres@databricks.com> wrote:
>
>> The precise interactions with the DataSourceV2 API haven't yet been
>> hammered out in design. But much of this comes down to the core of
>> Structured Streaming rather than the API details.
>>
>> The execution engine handles checkpointing and recovery. It asks the
>> streaming data source for offsets, and then determines that batch N
>> contains the data between offset A and offset B. On recovery, if batch N
>> needs to be re-run, the execution engine just asks the source for the same
>> offset range again. Sources also get a handle to their own subfolder of the
>> checkpoint, which they can use as scratch space if they need. For example,
>> Spark's FileStreamReader keeps a log of all the files it's seen, so its
>> offsets can be simply indices into the log rather than huge strings
>> containing all the paths.
>>
>> SPARK-23323 is orthogonal. That commit coordinator is responsible for
>> ensuring that, within a single Spark job, two different tasks can't commit
>> the same partition.
>>
>> On Fri, Apr 27, 2018 at 8:53 AM, Thakrar, Jayesh <
>> jthakrar@conversantmedia.com> wrote:
>>
>>> Wondering if this issue is related to SPARK-23323?
>>>
>>>
>>>
>>> Any pointers will be greatly appreciated….
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Jayesh
>>>
>>>
>>>
>>> *From: *"Thakrar, Jayesh" <jthakrar@conversantmedia.com>
>>> *Date: *Monday, April 23, 2018 at 9:49 PM
>>> *To: *"dev@spark.apache.org" <dev@spark.apache.org>
>>> *Subject: *Datasource API V2 and checkpointing
>>>
>>>
>>>
>>> I was wondering when checkpointing is enabled, who does the actual work?
>>>
>>> The streaming datasource or the execution engine/driver?
>>>
>>>
>>>
>>> I have written a small/trivial datasource that just generates strings.
>>>
>>> After enabling checkpointing, I do see a folder being created under the
>>> checkpoint folder, but there's nothing else in there.
>>>
>>>
>>>
>>> Same question for write-ahead and recovery?
>>>
>>> And on a restart from a failed streaming session - who should set the
>>> offsets?
>>>
>>> The driver/Spark or the datasource?
>>>
>>>
>>>
>>> Any pointers to design docs would also be greatly appreciated.
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Jayesh
>>>
>>>
>>>
>>
>>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>

Mime
View raw message