flink-user mailing list archives

From Manish Bellani <manish.bell...@gmail.com>
Subject BucketingSink and S3RecoverableWriters
Date Thu, 16 May 2019 13:36:41 GMT
Hello,

I'm using BucketingSink (with S3AFileSystem) to write data to S3. From
following some of the related issues, my initial understanding was that the
*S3RecoverableWriter* code path is wired into the BucketingSink path.
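For context, the sink is created roughly like this (just a sketch; the
bucket, path and bucketer format are placeholders, and `stream` is the job's
DataStream<String>):

    import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
    import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

    // Sketch of the sink setup; bucket name and bucketer format are placeholders.
    BucketingSink<String> sink = new BucketingSink<>("s3a://my-bucket/flink-output");
    sink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HH"));
    sink.setBatchSize(1024 * 1024 * 64); // roll part files at ~64 MB
    stream.addSink(sink);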

Taking a closer look at the code (specifically the initialization part), it
seems like it would never pick FlinkS3FileSystem and hence never use the
*S3RecoverableWriter* code path. Does that sound correct?
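
As a quick sanity check of what is registered in my setup, I looked at which
FileSystem implementation Flink itself resolves for the S3 URI (a sketch; the
scheme and bucket are placeholders):

    import java.net.URI;
    import org.apache.flink.core.fs.FileSystem;

    // Print the concrete Flink FileSystem class registered for the s3 scheme.
    FileSystem fs = FileSystem.get(new URI("s3://my-bucket/"));
    System.out.println(fs.getClass().getName()); // FlinkS3FileSystem vs. a Hadoop wrapper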

I also don't see any files being written under the `/tmp` directory with a
pattern like ".tmp_UUID", which is what RefCountedTmpFileCreator creates
for staging writes to S3 (and which is wired in by
org.apache.flink.fs.s3.common.FlinkS3FileSystem):


    public RefCountedFile apply(File file) throws IOException {
        File directory = this.tempDirectories[this.nextIndex()];

        while (true) {
            try {
                if (file == null) {
                    // fresh staging file: ".tmp_" + random UUID under the temp directory
                    File newFile = new File(directory, ".tmp_" + UUID.randomUUID());
                    OutputStream out = Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW);
                    return RefCountedFile.newFile(newFile, out);
                }

                // restoring an existing staging file: reopen it for append
                OutputStream out = Files.newOutputStream(file.toPath(), StandardOpenOption.APPEND);
                return RefCountedFile.restoredFile(file, out, file.length());
            } catch (FileAlreadyExistsException ignored) {
                // name collision on the random file name; loop and retry with a new UUID
            }
        }
    }
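
For completeness, this is roughly how I checked for those staging files on
the task managers (a sketch; /tmp is just the default temp dir on my hosts):

    import java.io.File;

    // Look for RecoverableWriter staging files (".tmp_<UUID>") under /tmp.
    File[] staged = new File("/tmp").listFiles((dir, name) -> name.startsWith(".tmp_"));
    System.out.println(staged == null ? "no staging files" : staged.length + " staging file(s)");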

FWIW, I do supply extra configuration via the following core-site.xml
contents, but it doesn't appear that FlinkS3FileSystem would come into play
there.

<configuration>

    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>

    <property>
        <name>fs.s3a.fast.upload</name>
        <value>true</value>
        <description>
            Use the incremental block upload mechanism with the buffering
            mechanism set in fs.s3a.fast.upload.buffer. The number of threads
            performing uploads in the filesystem is defined by
            fs.s3a.threads.max; the queue of waiting uploads is limited by
            fs.s3a.max.total.tasks. The size of each buffer is set by
            fs.s3a.multipart.size.
        </description>
    </property>

    <property>
        <name>fs.s3a.fast.upload.buffer</name>
        <value>array</value>
        <description>
            The buffering mechanism to use when using S3A fast upload
            (fs.s3a.fast.upload=true). Values: disk, array, bytebuffer.
            This configuration option has no effect if fs.s3a.fast.upload
            is false.

            "disk" will use the directories listed in fs.s3a.buffer.dir as
            the location(s) to save data prior to being uploaded.

            "array" uses arrays in the JVM heap.

            "bytebuffer" uses off-heap memory within the JVM.

            Both "array" and "bytebuffer" will consume memory in a single
            stream up to the number of blocks set by
            fs.s3a.multipart.size * fs.s3a.fast.upload.active.blocks.
            If using either of these mechanisms, keep this value low.

            The total number of threads performing work across all threads
            is set by fs.s3a.threads.max, with fs.s3a.max.total.tasks
            values setting the number of queued work items.
        </description>
    </property>

    <property>
        <name>fs.s3a.multipart.size</name>
        <value>10M</value>
        <description>
            How big (in bytes) to split upload or copy operations up into.
            A suffix from the set {K,M,G,T,P} may be used to scale the
            numeric value.
        </description>
    </property>

    <property>
        <name>fs.s3a.fast.upload.active.blocks</name>
        <value>8</value>
        <description>
            Maximum number of blocks a single output stream can have
            active (uploading, or queued to the central FileSystem
            instance's pool of queued operations).

            This stops a single stream overloading the shared thread pool.
        </description>
    </property>

    <property>
        <name>fs.s3a.aws.credentials.provider</name>
        <value>com.amazonaws.auth.EnvironmentVariableCredentialsProvider</value>
    </property>

</configuration>

Is the S3RecoverableWriter path supported for BucketingSink?
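
If it isn't, my fallback would be to switch to StreamingFileSink, which (as
far as I can tell) is what the RecoverableWriter API was built for; roughly
something like this (a sketch; bucket and path are placeholders):

    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

    // Row-format StreamingFileSink; my understanding is that with flink-s3-fs-hadoop
    // on the classpath this goes through FlinkS3FileSystem and S3RecoverableWriter.
    StreamingFileSink<String> fileSink = StreamingFileSink
        .forRowFormat(new Path("s3://my-bucket/flink-output"),
                      new SimpleStringEncoder<String>("UTF-8"))
        .build();
    stream.addSink(fileSink);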

Manish
