beam-commits mailing list archives

From Jean-Baptiste Onofré (JIRA) <>
Subject [jira] [Commented] (BEAM-638) Add sink transform to write bounded data per window, pane, [and key] even when PCollection is unbounded
Date Sun, 07 May 2017 15:33:04 GMT


Jean-Baptiste Onofré commented on BEAM-638:

Agreed, it's resolved. This is what I tested:

                .withFilenamePolicy(new PerWindowFiles("uc2"))


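    import org.apache.beam.sdk.io.FileBasedSink;
    import org.apache.beam.sdk.io.fs.ResolveOptions;
    import org.apache.beam.sdk.io.fs.ResourceId;
    import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
    import org.joda.time.format.DateTimeFormatter;
    import org.joda.time.format.ISODateTimeFormat;

    // Nested inside the enclosing test class.
    public static class PerWindowFiles extends FileBasedSink.FilenamePolicy {

        // Assumed definition: FORMATTER is used but not shown in the original snippet.
        private static final DateTimeFormatter FORMATTER = ISODateTimeFormat.hourMinute();
        private final String prefix;

        public PerWindowFiles(String prefix) {
            this.prefix = prefix;
        }

        // Builds "<prefix>-<windowStart>-<windowEnd>" for one window.
        public String filenamePrefixForWindow(IntervalWindow window) {
            return String.format("%s-%s-%s",
                    prefix, FORMATTER.print(window.start()), FORMATTER.print(window.end()));
        }

        // One file per window and shard: "<windowPrefix>-<shard>-of-<numShards><extension>".
        @Override
        public ResourceId windowedFilename(
                ResourceId outputDirectory, FileBasedSink.FilenamePolicy.WindowedContext context,
                String extension) {
            IntervalWindow window = (IntervalWindow) context.getWindow();
            String filename = String.format("%s-%s-of-%s%s",
                    filenamePrefixForWindow(window), context.getShardNumber(), context.getNumShards(),
                    extension);
            return outputDirectory.resolve(filename, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
        }

        // This policy only supports windowed writes.
        @Override
        public ResourceId unwindowedFilename(
                ResourceId outputDirectory, Context context, String extension) {
            throw new UnsupportedOperationException("Unsupported.");
        }
    }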
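To check the names such a policy produces without running a pipeline, the string formatting can be reproduced in plain Java. This is only a sketch: `PerWindowNaming` is a hypothetical helper, `java.time` stands in for Joda-Time, and both the UTC hour:minute formatter and the `%s-%s-of-%s%s` shard pattern are assumptions, not details confirmed by the tested code.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical plain-Java mirror of the PerWindowFiles naming logic (no Beam classes). */
final class PerWindowNaming {
    // Assumed stand-in for FORMATTER: hour and minute, printed in UTC.
    private static final DateTimeFormatter FORMATTER =
            DateTimeFormatter.ofPattern("HH:mm").withZone(ZoneOffset.UTC);

    private PerWindowNaming() {}

    // Same shape as filenamePrefixForWindow: "<prefix>-<start>-<end>".
    static String prefixForWindow(String prefix, Instant start, Instant end) {
        return String.format("%s-%s-%s", prefix, FORMATTER.format(start), FORMATTER.format(end));
    }

    // Assumed shard pattern: "<windowPrefix>-<shard>-of-<numShards><extension>".
    static String windowedFilename(String prefix, Instant start, Instant end,
                                   int shardNumber, int numShards, String extension) {
        return String.format("%s-%s-of-%s%s",
                prefixForWindow(prefix, start, end), shardNumber, numShards, extension);
    }
}
```

For example, shard 0 of 3 for a window from 15:00 to 15:10 UTC with prefix "uc2" and extension ".txt" yields `uc2-15:00-15:10-0-of-3.txt`, so each window and shard maps to a distinct file.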

> Add sink transform to write bounded data per window, pane, [and key] even when PCollection is unbounded
> -------------------------------------------------------------------------------------------------------
>                 Key: BEAM-638
>                 URL:
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-java-core
>            Reporter: Jean-Baptiste Onofré
>            Assignee: Davor Bonaci
>             Fix For: 2.0.0
> Today, if the pipeline source is unbounded and the sink expects a bounded collection, there's no way to use a single pipeline. Even when a window splits the unbounded PCollection into chunks, each "sub" PCollection is still unbounded.
> It would be helpful for users to have a Window function that creates a bounded PCollection (per window) from the unbounded PCollection coming from the source.
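The idea behind the request, turning an unbounded stream into finite per-window chunks, can be illustrated with fixed windows: each element's timestamp is rounded down to the start of its window, and all elements sharing a start form one bounded batch that could be written as its own file. This is a hypothetical stand-alone sketch (`FixedWindowBatcher` is not a Beam class); it ignores watermarks, triggers, and panes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: bucket timestamped elements into fixed, non-overlapping windows. */
final class FixedWindowBatcher {
    private FixedWindowBatcher() {}

    // Start of the fixed window of length sizeMillis that contains timestampMillis.
    // floorMod keeps negative timestamps in the correct window as well.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // Groups values by the start of their window; the TreeMap orders batches by window.
    static Map<Long, List<String>> batchByWindow(long[] timestamps, String[] values, long sizeMillis) {
        if (timestamps.length != values.length) {
            throw new IllegalArgumentException("timestamps and values must have the same length");
        }
        Map<Long, List<String>> batches = new TreeMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            long start = windowStart(timestamps[i], sizeMillis);
            // Each list is a finite batch: one candidate output file per window.
            batches.computeIfAbsent(start, k -> new ArrayList<>()).add(values[i]);
        }
        return batches;
    }
}
```

With one-minute windows, elements at 0 ms and 59,999 ms land in the window starting at 0, while 60,000 ms and 125,000 ms start new windows at 60,000 and 120,000. In a real streaming pipeline a window's batch is only complete once the watermark passes the window's end; the sketch sidesteps that by taking a finite array as input.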

This message was sent by Atlassian JIRA
