beam-commits mailing list archives

From Jean-Baptiste Onofré (JIRA) <j...@apache.org>
Subject [jira] [Commented] (BEAM-638) Add sink transform to write bounded data per window, pane, [and key] even when PCollection is unbounded
Date Sun, 07 May 2017 15:33:04 GMT

    [ https://issues.apache.org/jira/browse/BEAM-638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15999903#comment-15999903 ]

Jean-Baptiste Onofré commented on BEAM-638:
-------------------------------------------

Agreed, it's resolved. This is what I tested:

{code}
        .apply(TextIO.write()
                .to("hdfs://localhost/uc2")
                .withFilenamePolicy(new PerWindowFiles("uc2"))
                .withWindowedWrites()
                .withNumShards(1));
{code}

with: 

{code}
    public static class PerWindowFiles extends FileBasedSink.FilenamePolicy {

        private final String prefix;

        public PerWindowFiles(String prefix) {
            this.prefix = prefix;
        }

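        // Builds "<prefix>-<window start>-<window end>".
        // Note: FORMATTER is not defined in this snippet; it is assumed to be a
        // Joda-Time DateTimeFormatter constant declared elsewhere in the class.
        public String filenamePrefixForWindow(IntervalWindow window) {
            return String.format("%s-%s-%s",
                    prefix, FORMATTER.print(window.start()), FORMATTER.print(window.end()));
        }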

        @Override
        public ResourceId windowedFilename(
                ResourceId outputDirectory,
                FileBasedSink.FilenamePolicy.WindowedContext context,
                String extension) {
            IntervalWindow window = (IntervalWindow) context.getWindow();
            String filename = String.format(
                    "%s-%s-of-%s%s",
                    filenamePrefixForWindow(window), context.getShardNumber(), context.getNumShards(),
                    extension);
            return outputDirectory.resolve(filename, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
        }

        @Nullable
        @Override
        public ResourceId unwindowedFilename(ResourceId resourceId, Context context, String s) {
            throw new UnsupportedOperationException("Unsupported.");
        }
    }
{code}
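
To make the resulting file names concrete, here is a minimal standalone sketch of the same two String.format calls, using only the JDK. It does not use Beam. Since FORMATTER is not shown above, an hour:minute pattern in UTC is assumed here purely for illustration; the class name WindowedNameSketch, the method names, the sample timestamps, and the ".txt" extension are also hypothetical.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class WindowedNameSketch {

    // Assumption: stands in for the FORMATTER constant that the snippet above does not show.
    private static final DateTimeFormatter FORMATTER =
            DateTimeFormatter.ofPattern("HH:mm").withZone(ZoneOffset.UTC);

    // Mirrors filenamePrefixForWindow: "<prefix>-<window start>-<window end>".
    public static String prefixForWindow(String prefix, Instant start, Instant end) {
        return String.format("%s-%s-%s",
                prefix, FORMATTER.format(start), FORMATTER.format(end));
    }

    // Mirrors windowedFilename: "<window prefix>-<shard>-of-<numShards><extension>".
    public static String windowedName(
            String prefix, Instant start, Instant end,
            int shard, int numShards, String extension) {
        return String.format("%s-%s-of-%s%s",
                prefixForWindow(prefix, start, end), shard, numShards, extension);
    }

    public static void main(String[] args) {
        // A hypothetical one-minute window.
        Instant start = Instant.parse("2017-05-07T15:00:00Z");
        Instant end = start.plusSeconds(60);
        // prints uc2-15:00-15:01-0-of-1.txt
        System.out.println(windowedName("uc2", start, end, 0, 1, ".txt"));
    }
}
```

With withNumShards(1) as in the pipeline fragment above, each window would yield a single file whose name ends in "-0-of-1" followed by the extension, provided shard numbers start at zero as in this sketch.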

> Add sink transform to write bounded data per window, pane, [and key] even when PCollection is unbounded
> -------------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-638
>                 URL: https://issues.apache.org/jira/browse/BEAM-638
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-java-core
>            Reporter: Jean-Baptiste Onofré
>            Assignee: Davor Bonaci
>             Fix For: 2.0.0
>
>
> Today, if the pipeline source is unbounded and the sink expects a bounded collection, there is no way to use a single pipeline. A window does split the unbounded PCollection into chunks, but each per-window "sub" PCollection is still unbounded.
> It would be helpful for users to have a Window function that creates a bounded PCollection (per window) from an unbounded PCollection coming from the source.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
