beam-commits mailing list archives

From "ASF GitHub Bot (JIRA)" <>
Subject [jira] [Commented] (BEAM-2750) Read whole files as one PCollection element each
Date Fri, 11 Aug 2017 03:13:00 GMT


ASF GitHub Bot commented on BEAM-2750:

GitHub user cphbrt opened a pull request:

    [BEAM-2750][BEAM-2751] Implement WholeFileIO

    Follow this checklist to help us incorporate your contribution quickly and easily:
     - [x] Make sure there is a JIRA issue
filed for the change (usually before you start working on it).  Trivial changes like typos
do not require a JIRA issue.  Your pull request should address just this issue, without pulling
in other changes.
     - [x] Each commit in the pull request should have a meaningful subject line and body.
     - [x] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`,
where you replace `BEAM-XXX` with the appropriate JIRA issue.
     - [x] Write a pull request description that is detailed enough to understand what the
pull request does, how, and why.
     - [ ] Run `mvn clean verify` to make sure basic checks pass. A more thorough check will
be performed on your pull request automatically.
     - [ ] If this contribution is large, please file an Apache [Individual Contributor License
    ## Narrative
    WholeFileIO fulfills the requests of [BEAM-2750] and [BEAM-2751] for a way to read and
write individual files as individual elements of a PCollection to and from specific filenames.
    ## Description
    `WholeFileIO.Read` receives a file pattern (glob) of input files. The file pattern is
expanded into a `PCollection` of `ResourceId`s, each pointing to a single file. The bytes
at the file location specified by the `ResourceId`s are read in and attached to their originating
filename in a `KV`.
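The read semantics described above (a glob expands to files, and each file becomes one filename/bytes pair) can be illustrated outside Beam with plain `java.nio`. This is only a sketch of the behavior, not the pull request's implementation; the class `WholeFileReadSketch` and the method `readWholeFiles` are hypothetical names, and a `Map` stands in for the `PCollection` of `KV`s.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration class; uses java.nio only, not the Beam SDK.
final class WholeFileReadSketch {
    /** Expands a glob within one directory and reads each matching file whole. */
    static Map<String, byte[]> readWholeFiles(Path dir, String glob) {
        Map<String, byte[]> filesByName = new TreeMap<>();
        try (DirectoryStream<Path> matches = Files.newDirectoryStream(dir, glob)) {
            for (Path file : matches) {
                if (Files.isRegularFile(file)) {
                    // One entry per file: the full path as key, all of its bytes as value.
                    filesByName.put(file.toString(), Files.readAllBytes(file));
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return filesByName;
    }
}
```

Unlike a line-oriented reader, a multi-line file produces a single entry here, which is the property the pull request is after.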
    `WholeFileIO.Write` receives a `PCollection` of `KV`s containing byte arrays and their
corresponding filenames. The byte arrays are written to the output directory with their corresponding
filenames.
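A plain `java.nio` sketch of the write side, under the assumption that each key is a bare filename with no directory part. The class `WholeFileWriteSketch` and the method `writeWholeFiles` are hypothetical and are not taken from the pull request; a `Map` again stands in for the `PCollection` of `KV`s.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;

// Hypothetical illustration class; uses java.nio only, not the Beam SDK.
final class WholeFileWriteSketch {
    /** Writes each (filename, bytes) pair as one file directly under outputDir. */
    static void writeWholeFiles(Map<String, byte[]> filesByName, Path outputDir) {
        try {
            // Create the output directory, including any missing parent directories.
            Files.createDirectories(outputDir);
            for (Map.Entry<String, byte[]> entry : filesByName.entrySet()) {
                // Assumes the key is a bare filename, so the target sits directly in outputDir.
                Path target = outputDir.resolve(entry.getKey());
                Files.write(target, entry.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```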
    ## Example Usage
    This example pipeline will read in files according to a given file glob and write them
to the specified output directory unmodified other than "-copy" appended to their filenames.
If the input file glob specifies files spread through a directory hierarchy, they will still
be written out all into the same flat output directory.
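The flattening and renaming described above can be sketched as two small path helpers. This assumes that only the last path segment of each input file is kept, which the description implies but does not spell out; `FlatOutputSketch`, `copyName`, and `flatTarget` are hypothetical names.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical illustration class; uses java.nio only, not the Beam SDK.
final class FlatOutputSketch {
    /** Keeps only the last path segment and appends "-copy", as in the example pipeline. */
    static String copyName(String inputPath) {
        return Paths.get(inputPath).getFileName().toString() + "-copy";
    }

    /** Resolves the renamed file directly under outputDir, discarding input subdirectories. */
    static Path flatTarget(String outputDir, String inputPath) {
        return Paths.get(outputDir).resolve(copyName(inputPath));
    }
}
```

In this sketch, two inputs with the same filename in different subdirectories resolve to the same target path, so a flat output layout can overwrite one of them.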
    Example pipeline:
    public class WholeFileIOPipeline {
        public interface FileIOOptions extends PipelineOptions {
            @Description("File glob of the files to read from")
            String getInputFiles();
            void setInputFiles(String value);
            @Description("Path of the directory to write files to")
            String getOutputDir();
            void setOutputDir(String value);
        }

        public static void main(String[] args) {
            final FileIOOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                    .as(FileIOOptions.class);
            Pipeline p = Pipeline.create(options);
            // Assumption: read().from(...) and write().to(...) follow Beam's usual IO naming;
            // the transform arguments are missing from this copy of the example.
            PCollection<KV<String, byte[]>> files = p.apply(
                "Read Bytes and filenames of input files",
                WholeFileIO.read().from(options.getInputFiles()));
            PCollection<KV<String, byte[]>> renamedFiles = files.apply(
                    ParDo.of(
                            new DoFn<KV<String, byte[]>, KV<String, byte[]>>() {
                                @ProcessElement
                                public void processElement(ProcessContext c) {
                                    KV<String, byte[]> file = c.element();
                                    c.output(KV.of(file.getKey() + "-copy", file.getValue()));
                                }
                            }));
            renamedFiles.apply(
                    "Write Bytes to filenames in Output Directory",
                    WholeFileIO.write().to(options.getOutputDir()));
            p.run();
        }
    }
    Example command to run example pipeline:
    mvn clean compile exec:java -Dexec.mainClass=com.example.WholeFileIOPipeline \
      -Dexec.args=" \
        --inputFiles=/path/to/input/files/** \
        --outputDir=/path/to/output/directory/ \
        "
    ## ToDo
    - [ ] Add comments
    - [ ] Add unit tests
    - [ ] Scale test for performance
    - [ ] Find out if `FileSystems.resolve()` will resolve multiple intermediary directories
if a user provides a path that doesn't fully exist yet. (`WholeFileIO -> Write -> expand()
-> ParDo -> ResourceId logic`)
    - [ ] Make sure that the `OutputStream` automatically closes when an `Exception` occurs
in `WholeFileIO -> Write -> expand() -> ParDo -> try/catch`. If not, close it
in a `finally` statement.
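On the last ToDo item: Java does not close a stream on its own when an exception escapes, but a try-with-resources block (or an explicit `finally`) does. The following is a minimal sketch of that guarantee, not the code in this pull request; `CloseOnErrorSketch` and `FailingStream` are hypothetical stand-ins for the stream opened in the write `ParDo`.

```java
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical illustration class; plain Java, not the Beam SDK.
final class CloseOnErrorSketch {
    /** Stand-in stream that records close() and fails on every write. */
    static final class FailingStream extends OutputStream {
        boolean closed = false;

        @Override
        public void write(int b) throws IOException {
            throw new IOException("simulated write failure");
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Returns true if the stream was closed even though the write threw. */
    static boolean closedAfterFailure() {
        FailingStream stream = new FailingStream();
        try (OutputStream out = stream) {
            out.write(new byte[] {1, 2, 3});
        } catch (IOException e) {
            // The resource has already been closed by the time this handler runs.
        }
        return stream.closed;
    }
}
```

With try-with-resources the separate `finally` block becomes unnecessary, since `close()` runs whether or not the body throws.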

You can merge this pull request into a Git repository by running:

    $ git pull master+WholeFileIO

Alternatively you can review and apply these changes as the patch at:

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3717

commit 42f8b991512aa01019013d64e167b5e5782f87bf
Author: Chris Hebert <>
Date:   2017-08-10T19:56:32Z

    [BEAM-2750][BEAM-2751] Implement WholeFileIO


> Read whole files as one PCollection element each
> ------------------------------------------------
>                 Key: BEAM-2750
>                 URL:
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-java-core
>            Reporter: Christopher Hebert
>            Assignee: Davor Bonaci
> I'd like to read whole files as one element each.
> If my input files are hi.txt, what.txt, and yes.txt, then the whole contents of hi.txt
are an element of the returned PCollection, the whole contents of what.txt are the next element,
etc., giving me a PCollection with three elements.
> This contrasts with TextIO, which reads a new element for every line of text in the input files.
> This read (I'll call it WholeFileIO for now) would work like so:
> {code:java}
> // Assumed call shape: WholeFileIO.read().from(...)
> PCollection<KV<String, Byte[]>> fileNamesAndBytes = p.apply("Read", WholeFileIO.read().from("/path/to/input/dir/*"));
> {code}
> The above example passes the raw file contents and the filename.
> Alternatively, we could pass a PCollection of some sort of FileWrapper around an InputStream
to support lazy loading.
> This ticket complements [BEAM-2751].
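The lazy-loading alternative mentioned in the issue could look roughly like the wrapper below. The issue only names "some sort of FileWrapper", so the class `LazyFileSketch` and its methods are hypothetical; the point is that the filename is available immediately while the content is opened only on demand.

```java
import java.io.InputStream;
import java.util.function.Supplier;

// Hypothetical illustration class; plain Java, not the Beam SDK.
final class LazyFileSketch {
    private final String filename;
    private final Supplier<InputStream> opener;

    LazyFileSketch(String filename, Supplier<InputStream> opener) {
        this.filename = filename;
        this.opener = opener;
    }

    /** The filename is known without touching the file content. */
    String filename() {
        return filename;
    }

    /** Opens the content only when a consumer asks for it. */
    InputStream open() {
        return opener.get();
    }
}
```

A real element type in a pipeline would also need to be encodable by a coder, which this sketch does not address.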

This message was sent by Atlassian JIRA
