beam-commits mailing list archives

From ieme...@apache.org
Subject [beam] 01/01: --wip-- [skip ci]
Date Tue, 02 Jul 2019 07:46:13 GMT
This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch BEAM-1861-orcio
in repository https://gitbox.apache.org/repos/asf/beam.git

commit ff724e8ff9f9e621f993d051db339747fdada5ac
Author: Ismaël Mejía <iemejia@gmail.com>
AuthorDate: Mon May 27 00:33:38 2019 +0200

    --wip-- [skip ci]
---
 sdks/java/io/orc/build.gradle                      |  37 +++
 .../java/org/apache/beam/sdk/io/orc/OrcIO.java     | 344 +++++++++++++++++++++
 .../org/apache/beam/sdk/io/orc/package-info.java   |  20 ++
 settings.gradle                                    |   1 +
 4 files changed, 402 insertions(+)

diff --git a/sdks/java/io/orc/build.gradle b/sdks/java/io/orc/build.gradle
new file mode 100644
index 0000000..c4b25fc
--- /dev/null
+++ b/sdks/java/io/orc/build.gradle
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+plugins { id 'org.apache.beam.module' }
+applyJavaNature()
+
+description = "Apache Beam :: SDKs :: Java :: IO :: ORC"
+ext.summary = "IO to read and write files in the ORC storage format."
+
+def orc_version = "1.5.5"
+
+dependencies {
+  shadow project(path: ":sdks:java:core", configuration: "shadow")
+  shadow library.java.vendored_guava_20_0
+  shadow library.java.slf4j_api
+  shadow "org.apache.orc:orc-core:$orc_version"
+  shadow "org.apache.orc:orc-mapreduce:$orc_version"
+  testCompile project(path: ":sdks:java:core", configuration: "shadowTest")
+  testCompile library.java.junit
+  testRuntimeOnly project(path: ":runners:direct-java")
+  testRuntimeOnly library.java.slf4j_jdk14
+}
diff --git a/sdks/java/io/orc/src/main/java/org/apache/beam/sdk/io/orc/OrcIO.java b/sdks/java/io/orc/src/main/java/org/apache/beam/sdk/io/orc/OrcIO.java
new file mode 100644
index 0000000..8239d08
--- /dev/null
+++ b/sdks/java/io/orc/src/main/java/org/apache/beam/sdk/io/orc/OrcIO.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.orc;
+
+import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
+
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import javax.annotation.Nullable;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.io.DelegatingSeekableInputStream;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.OutputFile;
+import org.apache.parquet.io.PositionOutputStream;
+import org.apache.parquet.io.SeekableInputStream;
+
+/**
+ * IO to read and write ORC files.
+ */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class OrcIO {
+
+  /**
+   * Reads {@link GenericRecord}s from an ORC file (or multiple ORC files matching the
+   * pattern).
+   */
+  public static Read read(Schema schema) {
+    return new AutoValue_OrcIO_Read.Builder().setSchema(schema).build();
+  }
+
+  /**
+   * Like {@link #read(Schema)}, but reads each file in a {@link PCollection} of {@link
+   * org.apache.beam.sdk.io.FileIO.ReadableFile}, which allows more flexible usage.
+   */
+  public static ReadFiles readFiles(Schema schema) {
+    return new AutoValue_OrcIO_ReadFiles.Builder().setSchema(schema).build();
+  }
+
+  /** Implementation of {@link #read(Schema)}. */
+  @AutoValue
+  public abstract static class Read extends PTransform<PBegin, PCollection<GenericRecord>> {
+
+    @Nullable
+    abstract ValueProvider<String> getFilepattern();
+
+    @Nullable
+    abstract Schema getSchema();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setFilepattern(ValueProvider<String> filepattern);
+
+      abstract Builder setSchema(Schema schema);
+
+      abstract Read build();
+    }
+
+    /** Reads from the given filename or filepattern. */
+    public Read from(ValueProvider<String> filepattern) {
+      return toBuilder().setFilepattern(filepattern).build();
+    }
+
+    /** Like {@link #from(ValueProvider)}. */
+    public Read from(String filepattern) {
+      return from(ValueProvider.StaticValueProvider.of(filepattern));
+    }
+
+    @Override
+    public PCollection<GenericRecord> expand(PBegin input) {
+      checkNotNull(getFilepattern(), "Filepattern cannot be null.");
+
+      return input
+          .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
+          .apply(FileIO.matchAll())
+          .apply(FileIO.readMatches())
+          .apply(readFiles(getSchema()));
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.add(
+          DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern"));
+    }
+  }
+
+  /** Implementation of {@link #readFiles(Schema)}. */
+  @AutoValue
+  public abstract static class ReadFiles
+      extends PTransform<PCollection<FileIO.ReadableFile>, PCollection<GenericRecord>> {
+
+    @Nullable
+    abstract Schema getSchema();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setSchema(Schema schema);
+
+      abstract ReadFiles build();
+    }
+
+    @Override
+    public PCollection<GenericRecord> expand(PCollection<FileIO.ReadableFile> input) {
+      checkNotNull(getSchema(), "Schema cannot be null");
+      return input.apply(ParDo.of(new OrcReadFn())).setCoder(AvroCoder.of(getSchema()));
+    }
+
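+    /** Reads the records of a {@link FileIO.ReadableFile} and emits them as {@link GenericRecord}s. */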
+    static class OrcReadFn extends DoFn<FileIO.ReadableFile, GenericRecord> {
+
+      @ProcessElement
+      public void processElement(ProcessContext c) throws Exception {
+        FileIO.ReadableFile file = c.element();
+
+        if (!file.getMetadata().isReadSeekEfficient()) {
+          ResourceId filename = file.getMetadata().resourceId();
+          throw new RuntimeException(String.format("File has to be seekable: %s", filename));
+        }
+
+        SeekableByteChannel seekableByteChannel = file.openSeekable();
+
+        // TODO(BEAM-1861): read through the ORC APIs instead of Parquet, e.g. via
+        // OrcFile.createReader(path, OrcFile.readerOptions(conf)) and Reader#rows().
+        try (ParquetReader<GenericRecord> reader =
+            AvroParquetReader.<GenericRecord>builder(new BeamParquetInputFile(seekableByteChannel))
+                .build()) {
+          GenericRecord read;
+          while ((read = reader.read()) != null) {
+            c.output(read);
+          }
+        }
+      }
+    }
+
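+    /** Exposes a {@link SeekableByteChannel} as a Parquet {@link InputFile}. */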
+    private static class BeamParquetInputFile implements InputFile {
+
+      private SeekableByteChannel seekableByteChannel;
+
+      BeamParquetInputFile(SeekableByteChannel seekableByteChannel) {
+        this.seekableByteChannel = seekableByteChannel;
+      }
+
+      @Override
+      public long getLength() throws IOException {
+        return seekableByteChannel.size();
+      }
+
+      @Override
+      public SeekableInputStream newStream() {
+        return new DelegatingSeekableInputStream(Channels.newInputStream(seekableByteChannel)) {
+
+          @Override
+          public long getPos() throws IOException {
+            return seekableByteChannel.position();
+          }
+
+          @Override
+          public void seek(long newPos) throws IOException {
+            seekableByteChannel.position(newPos);
+          }
+        };
+      }
+    }
+  }
+
+  /** Creates a {@link Sink} for use with {@link FileIO#write}. */
+  public static Sink sink(Schema schema) {
+    return new AutoValue_OrcIO_Sink.Builder()
+        .setJsonSchema(schema.toString())
+        .setCompressionCodec(CompressionCodecName.SNAPPY)
+        .build();
+  }
+
+  /** Implementation of {@link #sink}. */
+  @AutoValue
+  public abstract static class Sink implements FileIO.Sink<GenericRecord> {
+
+    @Nullable
+    abstract String getJsonSchema();
+
+    abstract CompressionCodecName getCompressionCodec();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setJsonSchema(String jsonSchema);
+
+      abstract Builder setCompressionCodec(CompressionCodecName compressionCodec);
+
+      abstract Sink build();
+    }
+
+    /** Specifies the compression codec. Defaults to {@link CompressionCodecName#SNAPPY}. */
+    public Sink withCompressionCodec(CompressionCodecName compressionCodecName) {
+      return toBuilder().setCompressionCodec(compressionCodecName).build();
+    }
+
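+    // Created lazily in open(); transient since FileIO.Sink instances are serialized.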
+    @Nullable private transient ParquetWriter<GenericRecord> writer;
+
+    @Override
+    public void open(WritableByteChannel channel) throws IOException {
+      checkNotNull(getJsonSchema(), "Schema cannot be null");
+
+      Schema schema = new Schema.Parser().parse(getJsonSchema());
+
+      BeamParquetOutputFile beamParquetOutputFile =
+          new BeamParquetOutputFile(Channels.newOutputStream(channel));
+
+      this.writer =
+          AvroParquetWriter.<GenericRecord>builder(beamParquetOutputFile)
+              .withSchema(schema)
+              .withCompressionCodec(getCompressionCodec())
+              .withWriteMode(OVERWRITE)
+              .build();
+    }
+
+    @Override
+    public void write(GenericRecord element) throws IOException {
+      checkNotNull(writer, "Writer cannot be null");
+      writer.write(element);
+    }
+
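+    // ParquetWriter cannot flush without closing, so flush() closes the writer.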
+    @Override
+    public void flush() throws IOException {
+      writer.close();
+    }
+
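+    /** Exposes an {@link OutputStream} as a Parquet {@link OutputFile}. */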
+    private static class BeamParquetOutputFile implements OutputFile {
+
+      private OutputStream outputStream;
+
+      BeamParquetOutputFile(OutputStream outputStream) {
+        this.outputStream = outputStream;
+      }
+
+      @Override
+      public PositionOutputStream create(long blockSizeHint) {
+        return new BeamOutputStream(outputStream);
+      }
+
+      @Override
+      public PositionOutputStream createOrOverwrite(long blockSizeHint) {
+        return new BeamOutputStream(outputStream);
+      }
+
+      @Override
+      public boolean supportsBlockSize() {
+        return false;
+      }
+
+      @Override
+      public long defaultBlockSize() {
+        return 0;
+      }
+    }
+
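+    /** An {@link OutputStream} wrapper that tracks the write position required by Parquet. */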
+    private static class BeamOutputStream extends PositionOutputStream {
+      private long position = 0;
+      private OutputStream outputStream;
+
+      private BeamOutputStream(OutputStream outputStream) {
+        this.outputStream = outputStream;
+      }
+
+      @Override
+      public long getPos() throws IOException {
+        return position;
+      }
+
+      @Override
+      public void write(int b) throws IOException {
+        position++;
+        outputStream.write(b);
+      }
+
+      @Override
+      public void write(byte[] b) throws IOException {
+        write(b, 0, b.length);
+      }
+
+      @Override
+      public void write(byte[] b, int off, int len) throws IOException {
+        outputStream.write(b, off, len);
+        position += len;
+      }
+
+      @Override
+      public void flush() throws IOException {
+        outputStream.flush();
+      }
+
+      @Override
+      public void close() throws IOException {
+        outputStream.close();
+      }
+    }
+  }
+
+  /** Disallow construction of utility class. */
+  private OrcIO() {}
+}
diff --git a/sdks/java/io/orc/src/main/java/org/apache/beam/sdk/io/orc/package-info.java b/sdks/java/io/orc/src/main/java/org/apache/beam/sdk/io/orc/package-info.java
new file mode 100644
index 0000000..5e86b2a
--- /dev/null
+++ b/sdks/java/io/orc/src/main/java/org/apache/beam/sdk/io/orc/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Transforms for reading and writing ORC files. */
+package org.apache.beam.sdk.io.orc;
diff --git a/settings.gradle b/settings.gradle
index 4220945..9870cc9 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -108,6 +108,7 @@ include ":sdks:java:io:kinesis"
 include ":sdks:java:io:kudu"
 include ":sdks:java:io:mongodb"
 include ":sdks:java:io:mqtt"
+include ":sdks:java:io:orc"
 include ":sdks:java:io:parquet"
 include ":sdks:java:io:rabbitmq"
 include ":sdks:java:io:redis"

