flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: sink with BucketingSink to S3 files override
Date Tue, 20 Feb 2018 10:36:43 GMT
Hi,

I'm afraid the BucketingSink does not work well with S3 because of the eventually-consistent
nature of S3. As you noticed in the code snippet you sent, the sink relies on directory
listings being accurate, which is not the case with S3.

The Flink community is aware of this problem and it's one of the top priorities for the next
release after Flink 1.5.
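
To make the failure mode concrete, here is a minimal, self-contained simulation (plain Java, not actual Flink code; the class and method names are invented for illustration). It reduces the part-counter loop to its essence, probing paths with exists(), and pairs it with a toy file system whose listings lag behind writes, as S3's can. Under those assumptions the loop hands out the same part path twice:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of why an exists()-based part counter breaks
// when the file system's listings are only eventually consistent.
public class PartCounterCollision {

    // Toy "file system": create() records a write immediately, but
    // exists() only sees writes after sync() (mimicking laggy S3 listings).
    static class LaggyFs {
        final Set<String> created = new HashSet<>();   // actually written
        final Set<String> visible = new HashSet<>();   // what exists() can see

        void create(String path) { created.add(path); }
        boolean exists(String path) { return visible.contains(path); }
        void sync() { visible.addAll(created); }       // listings catch up
    }

    // The counter loop reduced to its essence: try ascending counters
    // until a path "does not exist" yet.
    static String nextPartPath(LaggyFs fs, String prefix, int subtask) {
        int counter = 0;
        String path = prefix + "-" + subtask + "-" + counter;
        while (fs.exists(path)) {
            counter++;
            path = prefix + "-" + subtask + "-" + counter;
        }
        return path;
    }

    public static void main(String[] args) {
        LaggyFs fs = new LaggyFs();

        String first = nextPartPath(fs, "part", 0);
        fs.create(first);                       // written, but not yet visible

        String second = nextPartPath(fs, "part", 0);
        fs.create(second);                      // same path again: first file overwritten

        System.out.println(first.equals(second));        // collision
        fs.sync();                                       // once listings catch up...
        System.out.println(nextPartPath(fs, "part", 0)); // ...the counter advances
    }
}
```

On a strongly consistent file system the second probe would see the first file and advance the counter, which is why the problem only shows up on S3.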

Best,
Aljoscha

> On 19. Feb 2018, at 17:39, galantaa <alongalant@gmail.com> wrote:
> 
> Hey,
> I have some kind of a concurrency problem with the BucketingSink when I write
> to S3.
> I use the AvroKeyValueSinkWriter.
> The problem is that when I send events that are supposed to be written to the same
> directory, but to different part files (because of different event types),
> the files override each other.
> The problem occurs only when I sink the files to S3.
> When I write the files to local storage it does not happen, but I think
> that is only because of this loop in openNewPartFile:
> 
> // The following loop tries different partCounter values in ascending order until it
> // reaches the minimum that is not yet used. This works since there is only one
> // parallel subtask that tries names with this subtask id. Otherwise we would run
> // into concurrency issues here. This is aligned with the way we now clean the
> // base directory in case of rescaling.
> int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
> Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
> while (fs.exists(partPath) ||
> 		fs.exists(getPendingPathFor(partPath)) ||
> 		fs.exists(getInProgressPathFor(partPath))) {
> 	bucketState.partCounter++;
> 	partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
> }
> That makes sense. But on S3 the files do not exist until checkpointing, so
> the loop won't find them.
> 
> After debugging, I've noticed that in the invoke method, in
> state.getBucketState(), the first time I try to write an event to the bucket
> it creates a new bucketState in the HashMap, but the second time I try to write
> to the same bucket (with the different event type), it does find this new
> bucketState.
> 
> Thanks for the help! 
> 
> 
> 
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

