spark-user mailing list archives

From Jungtaek Lim <kabhwan.opensou...@gmail.com>
Subject Re: Structured Streaming Checkpoint Error
Date Wed, 02 Dec 2020 23:23:08 GMT
In theory it would work, but checkpointing would be very inefficient. If I
understand correctly, it writes the content to a temp file on S3, and the
rename then actually reads the temp file back from S3 and writes its content
to the final path on S3. Compared to checkpointing on HDFS, that is one
unnecessary write and one unnecessary read. It probably warrants a custom
checkpoint manager implementation for S3.
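
To make the cost comparison concrete, here is a minimal Java sketch of the write-then-rename commit. All names (`CheckpointIoModel`, `Store`, `renameIsCopy`, `commit`) are hypothetical and not part of Spark or Hadoop. The model simply assumes, following the description above, that a rename on S3 amounts to reading the temp object and writing it again, while an HDFS rename only touches metadata.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of a "write temp file, then rename" checkpoint commit; counts storage I/O only. */
class CheckpointIoModel {
    /** Bytes read and written at the storage layer. */
    static final class IoStats {
        long bytesWritten;
        long bytesRead;
    }

    /** Toy store: renameIsCopy = true stands in for an object store without a native rename. */
    static final class Store {
        final boolean renameIsCopy;
        final Map<String, byte[]> objects = new HashMap<>();
        final IoStats stats = new IoStats();

        Store(boolean renameIsCopy) {
            this.renameIsCopy = renameIsCopy;
        }

        void write(String path, byte[] data) {
            objects.put(path, data.clone());
            stats.bytesWritten += data.length;
        }

        void rename(String src, String dst) {
            byte[] data = objects.remove(src);
            if (data == null) {
                throw new IllegalStateException("No such file: " + src);
            }
            if (renameIsCopy) {
                // Copy-based rename (assumed for S3): the temp object's bytes are read and written again.
                stats.bytesRead += data.length;
                stats.bytesWritten += data.length;
            }
            // A file-system rename only updates metadata, so no data I/O is counted for it.
            objects.put(dst, data);
        }
    }

    /** Commits one checkpoint file: write to a temp path, then rename to the final path. */
    static void commit(Store store, String dir, long batchId, byte[] content) {
        String tmp = dir + "/." + batchId + ".tmp"; // simplified, hypothetical temp-file name
        store.write(tmp, content);
        store.rename(tmp, dir + "/" + batchId);
    }
}
```

Committing a 100-byte file, the copy-based store ends up with 200 bytes written and 100 read, against 100 written and 0 read for the metadata-only rename: the one extra write and one extra read mentioned above.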

Also, atomic rename still doesn't work on S3, and S3 doesn't support writes
with overwrite=false. That means nothing prevents concurrent streaming
queries from accessing the same checkpoint and messing it up. With the
checkpoint on HDFS, the rename is atomic, so only one succeeds even when they
run in parallel, and the query that loses the race to write the checkpoint
file simply fails. That's a caveat you may want to keep in mind.
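
The concurrency caveat can be illustrated with a small Java model. `CheckpointCommitRace`, `commit` and `race` are hypothetical names; `ConcurrentHashMap.putIfAbsent` stands in for an atomic rename that refuses to overwrite an existing destination (the HDFS case), and a plain `put` stands in for an S3 write with no such guard. This is a sketch of the semantics only, not how Spark's checkpoint manager is implemented.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model of two queries committing the same batch file into one shared checkpoint. */
class CheckpointCommitRace {
    private final ConcurrentMap<String, String> files = new ConcurrentHashMap<>();
    private final boolean atomicNoOverwrite;

    CheckpointCommitRace(boolean atomicNoOverwrite) {
        this.atomicNoOverwrite = atomicNoOverwrite;
    }

    /** Returns true if the writer believes its commit succeeded. */
    boolean commit(String path, String writer) {
        if (atomicNoOverwrite) {
            // HDFS-like: "create only if absent" is one atomic step, so exactly one writer wins.
            return files.putIfAbsent(path, writer) == null;
        }
        // S3-like, as described above: no atomic rename and no overwrite=false,
        // so every writer succeeds and the last one silently replaces the others.
        files.put(path, writer);
        return true;
    }

    String contentOf(String path) {
        return files.get(path);
    }

    /** Starts two writers at the same moment against one batch file; returns how many report success. */
    static int race(boolean atomicNoOverwrite) {
        CheckpointCommitRace store = new CheckpointCommitRace(atomicNoOverwrite);
        AtomicInteger successes = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        Thread[] writers = new Thread[2];
        for (int i = 0; i < writers.length; i++) {
            String name = "query-" + i;
            writers[i] = new Thread(() -> {
                try {
                    start.await(); // release both writers together
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (store.commit("offsets/140", name)) {
                    successes.incrementAndGet();
                }
            });
            writers[i].start();
        }
        start.countDown();
        for (Thread t : writers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return successes.get();
    }
}
```

With the guarded commit, `race(true)` always reports exactly one success, so the other query can fail loudly; with the unguarded one, `race(false)` reports two successes and the file silently holds whichever write landed last.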

On Wed, Dec 2, 2020 at 11:35 PM German Schiavon <gschiavonspark@gmail.com>
wrote:

> Hello!
>
> @Gabor Somogyi <gabor.g.somogyi@gmail.com> I wonder whether, now that S3 is
> *strongly consistent*, it would work fine.
>
>
> Regards!
>
> https://aws.amazon.com/blogs/aws/amazon-s3-update-strong-read-after-write-consistency/
>
> On Thu, 17 Sep 2020 at 11:55, German Schiavon <gschiavonspark@gmail.com>
> wrote:
>
>> Hi Gabor,
>>
>> Makes sense, thanks a lot!
>>
>> On Thu, 17 Sep 2020 at 11:51, Gabor Somogyi <gabor.g.somogyi@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Structured Streaming simply doesn't work when the checkpoint location is
>>> on S3, due to its limited read-after-write consistency.
>>> Please choose an HDFS-compliant file system and it will work like a charm.
>>>
>>> BR,
>>> G
>>>
>>>
>>> On Wed, Sep 16, 2020 at 4:12 PM German Schiavon <
>>> gschiavonspark@gmail.com> wrote:
>>>
>>>> Hi!
>>>>
>>>> I have a Structured Streaming application that reads from Kafka,
>>>> performs some aggregations and writes to S3 in Parquet format.
>>>>
>>>> Everything seems to work great, except that from time to time I get a
>>>> checkpoint error. At first I thought it was a random error, but it has
>>>> already happened more than 3 times in a few days.
>>>>
>>>> Caused by: java.io.FileNotFoundException: No such file or directory:
>>>> s3a://xxx/xxx/validation-checkpoint/offsets/.140.11adef9a-7636-4752-9e6c-48d605a9cca5.tmp
>>>>
>>>>
>>>> Does this happen to anyone else?
>>>>
>>>> Thanks in advance.
>>>>
>>>> *This is the full error:*
>>>>
>>>> ERROR streaming.MicroBatchExecution: Query segmentValidation [id = 14edaddf-25bb-4259-b7a2-6107907f962f, runId = 0a757476-94ec-4a53-960a-91f54ce47110] terminated with error
>>>>
>>>> java.io.FileNotFoundException: No such file or directory: s3a://xxx/xxx//validation-checkpoint/offsets/.140.11adef9a-7636-4752-9e6c-48d605a9cca5.tmp
>>>>     at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2310)
>>>>     at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2204)
>>>>     at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2143)
>>>>     at org.apache.hadoop.fs.FileSystem.getFileLinkStatus(FileSystem.java:2664)
>>>>     at org.apache.hadoop.fs.DelegateToFileSystem.getFileLinkStatus(DelegateToFileSystem.java:131)
>>>>     at org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:726)
>>>>     at org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:699)
>>>>     at org.apache.hadoop.fs.FileContext.rename(FileContext.java:1032)
>>>>     at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:329)
>>>>     at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:147)
>>>>     at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.writeBatchToFile(HDFSMetadataLog.scala:134)
>>>>     at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.$anonfun$add$3(HDFSMetadataLog.scala:120)
>>>>     at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
>>>>     at scala.Option.getOrElse(Option.scala:189)
>>>>
>>>
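
Reading the trace from the bottom up, the failure happens while closing the offsets log output stream: `RenameBasedFSDataOutputStream.close` calls `renameTempFile`, the rename first looks up the status of the temp file, and S3A reports the `.tmp` object as missing. The Java sketch below reproduces only that sequence. `LaggingStore` and `commitBatch` are hypothetical, and the "lag" (new objects stay invisible for a configurable number of status lookups) is a deliberately crude stand-in for the consistency issue discussed in this thread, not a model of how S3 actually behaved.

```java
import java.io.FileNotFoundException;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical toy store: a newly written object stays invisible to the next
 * {@code lag} status lookups. A crude stand-in for a consistency gap, not the S3A client.
 */
class LaggingStore {
    private final Map<String, byte[]> objects = new HashMap<>();
    private final Map<String, Integer> hiddenFor = new HashMap<>();
    private final int lag;

    LaggingStore(int lag) {
        this.lag = lag;
    }

    void write(String path, byte[] data) {
        objects.put(path, data.clone());
        hiddenFor.put(path, lag); // hide the new object for the next `lag` lookups
    }

    /** Stand-in for a status lookup: fails while the object is still hidden or truly absent. */
    long getFileStatus(String path) throws FileNotFoundException {
        int remaining = hiddenFor.getOrDefault(path, 0);
        if (remaining > 0) {
            hiddenFor.put(path, remaining - 1);
            throw new FileNotFoundException("No such file or directory: " + path);
        }
        byte[] data = objects.get(path);
        if (data == null) {
            throw new FileNotFoundException("No such file or directory: " + path);
        }
        return data.length;
    }

    /** Like the rename in the trace, checks the source's status before moving it. */
    void rename(String src, String dst) throws FileNotFoundException {
        getFileStatus(src);
        objects.put(dst, objects.remove(src));
        hiddenFor.remove(src);
    }

    /** Writes a temp offsets file, then renames it; returns "committed" or the error message. */
    static String commitBatch(LaggingStore store, long batchId) {
        String tmp = "offsets/." + batchId + ".tmp"; // simplified temp-file name
        store.write(tmp, new byte[] {1, 2, 3});
        try {
            store.rename(tmp, "offsets/" + batchId);
            return "committed";
        } catch (FileNotFoundException e) {
            return e.getMessage();
        }
    }
}
```

With no lag the commit goes through; with a lag of one lookup, `commitBatch` returns the same kind of "No such file or directory: ….tmp" message seen in the trace, because the rename's status check runs before the temp file is visible.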
