beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aromane...@apache.org
Subject [beam] branch master updated: [BEAM-6480] Adds AvroIO sink for generic records. (#9005)
Date Tue, 09 Jul 2019 13:00:17 GMT
This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 142b581  [BEAM-6480] Adds AvroIO sink for generic records. (#9005)
142b581 is described below

commit 142b581c2cb91213538bfe6dd51501d918f5ffdd
Author: RyanSkraba <rskraba@talend.com>
AuthorDate: Tue Jul 9 15:00:01 2019 +0200

    [BEAM-6480] Adds AvroIO sink for generic records. (#9005)
---
 .../main/java/org/apache/beam/sdk/io/AvroIO.java   |  23 +++
 .../java/org/apache/beam/sdk/io/AvroIOTest.java    | 159 +++++++++++++++++++--
 2 files changed, 173 insertions(+), 9 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 4706a7d..841ec81 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -33,6 +33,7 @@ import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.beam.sdk.annotations.Experimental;
@@ -1705,6 +1706,28 @@ public class AvroIO {
 
   /**
    * A {@link Sink} for use with {@link FileIO#write} and {@link FileIO#writeDynamic}, writing
+   * elements with a given (common) schema, like {@link #writeGenericRecords(Schema)}.
+   */
+  @Experimental
+  public static <ElementT extends IndexedRecord> Sink<ElementT> sink(Schema schema) {
+    return sink(schema.toString());
+  }
+
+  /**
+   * A {@link Sink} for use with {@link FileIO#write} and {@link FileIO#writeDynamic}, writing
+   * elements with a given (common) schema, like {@link #writeGenericRecords(String)}.
+   */
+  @Experimental
+  public static <ElementT extends IndexedRecord> Sink<ElementT> sink(String jsonSchema) {
+    return new AutoValue_AvroIO_Sink.Builder<ElementT>()
+        .setJsonSchema(jsonSchema)
+        .setMetadata(ImmutableMap.of())
+        .setCodec(TypedWrite.DEFAULT_SERIALIZABLE_CODEC)
+        .build();
+  }
+
+  /**
+   * A {@link Sink} for use with {@link FileIO#write} and {@link FileIO#writeDynamic}, writing
    * elements by converting each one to a {@link GenericRecord} with a given (common) schema, like
    * {@link #writeCustomTypeToGenericRecords()}.
    *
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 012d0ae..7e3ea0d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -34,12 +34,15 @@ import static org.junit.Assert.assertTrue;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.io.Serializable;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -56,6 +59,8 @@ import org.apache.avro.reflect.Nullable;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.DefaultCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
@@ -261,7 +266,11 @@ public class AvroIOTest implements Serializable {
 
     private enum WriteMethod {
       AVROIO_WRITE,
-      AVROIO_SINK
+      AVROIO_SINK_WITH_CLASS,
+      AVROIO_SINK_WITH_SCHEMA,
+      /** @deprecated Test code for the deprecated {AvroIO.RecordFormatter}. */
+      @Deprecated
+      AVROIO_SINK_WITH_FORMATTER
     }
 
     private static final String SCHEMA_STRING =
@@ -870,7 +879,7 @@ public class AvroIOTest implements Serializable {
     @Test
     @Category({NeedsRunner.class, UsesTestStream.class})
     public void testWindowedAvroIOWriteViaSink() throws Throwable {
-      testWindowedAvroIOWriteUsingMethod(WriteMethod.AVROIO_SINK);
+      testWindowedAvroIOWriteUsingMethod(WriteMethod.AVROIO_SINK_WITH_CLASS);
     }
 
     void testWindowedAvroIOWriteUsingMethod(WriteMethod method) throws IOException {
@@ -943,7 +952,7 @@ public class AvroIOTest implements Serializable {
             break;
           }
 
-        case AVROIO_SINK:
+        case AVROIO_SINK_WITH_CLASS:
           {
             write =
                 FileIO.<GenericClass>write()
@@ -976,9 +985,11 @@ public class AvroIOTest implements Serializable {
             case AVROIO_WRITE:
               expectedFiles.add(new File(baseAndWindow + "-" + shard + "-of-2-pane-0-last.avro"));
               break;
-            case AVROIO_SINK:
+            case AVROIO_SINK_WITH_CLASS:
               expectedFiles.add(new File(baseAndWindow + "-0000" + shard + "-of-00002.avro"));
               break;
+            default:
+              throw new UnsupportedOperationException("Unknown write method " + method);
           }
         }
       }
@@ -1000,7 +1011,7 @@ public class AvroIOTest implements Serializable {
     private static final String SCHEMA_TEMPLATE_STRING =
         "{\"namespace\": \"example.avro\",\n"
             + " \"type\": \"record\",\n"
-            + " \"name\": \"TestTemplateSchema$$\",\n"
+            + " \"name\": \"$$TestTemplateSchema\",\n"
             + " \"fields\": [\n"
             + "     {\"name\": \"$$full\", \"type\": \"string\"},\n"
             + "     {\"name\": \"$$suffix\", \"type\": [\"string\", \"null\"]}\n"
@@ -1067,6 +1078,50 @@ public class AvroIOTest implements Serializable {
       }
     }
 
+    /**
+     * Example of a {@link Coder} for a collection of Avro records with different schemas.
+     *
+     * <p>All the schemas are known at pipeline construction, and are keyed internally on the prefix
+     * character (lower byte only for UTF-8 data).
+     */
+    private static class AvroMultiplexCoder extends Coder<GenericRecord> {
+
+      /** Lookup table for the possible schemas, keyed on the prefix character. */
+      private final Map<Character, AvroCoder<GenericRecord>> coderMap = Maps.newHashMap();
+
+      protected AvroMultiplexCoder(Map<String, String> schemaMap) {
+        for (Map.Entry<String, String> entry : schemaMap.entrySet()) {
+          coderMap.put(
+              entry.getKey().charAt(0), AvroCoder.of(new Schema.Parser().parse(entry.getValue())));
+        }
+      }
+
+      @Override
+      public void encode(GenericRecord value, OutputStream outStream) throws IOException {
+        char prefix = value.getSchema().getName().charAt(0);
+        outStream.write(prefix); // Only reads and writes the low byte.
+        coderMap.get(prefix).encode(value, outStream);
+      }
+
+      @Override
+      public GenericRecord decode(InputStream inStream) throws CoderException, IOException {
+        char prefix = (char) inStream.read();
+        return coderMap.get(prefix).decode(inStream);
+      }
+
+      @Override
+      public List<? extends Coder<?>> getCoderArguments() {
+        return Collections.emptyList();
+      }
+
+      @Override
+      public void verifyDeterministic() throws NonDeterministicException {
+        for (AvroCoder<GenericRecord> internalCoder : coderMap.values()) {
+          internalCoder.verifyDeterministic();
+        }
+      }
+    }
+
     private void testDynamicDestinationsUnwindowedWithSharding(
         WriteMethod writeMethod, Sharding sharding) throws Exception {
       final ResourceId baseDir =
@@ -1116,7 +1171,69 @@ public class AvroIOTest implements Serializable {
             break;
           }
 
-        case AVROIO_SINK:
+        case AVROIO_SINK_WITH_SCHEMA:
+          {
+            FileIO.Write<String, GenericRecord> write =
+                FileIO.<String, GenericRecord>writeDynamic()
+                    .by(
+                        fn(
+                            (element, c) -> {
+                              c.sideInput(schemaView); // Ignore result
+                              return element.getSchema().getName().substring(0, 1);
+                            },
+                            requiresSideInputs(schemaView)))
+                    .via(
+                        fn(
+                            (dest, c) -> {
+                              Schema schema =
+                                  new Schema.Parser().parse(c.sideInput(schemaView).get(dest));
+                              return AvroIO.sink(schema);
+                            },
+                            requiresSideInputs(schemaView)))
+                    .to(baseDir.toString())
+                    .withNaming(
+                        fn(
+                            (dest, c) -> {
+                              c.sideInput(schemaView); // Ignore result
+                              return FileIO.Write.defaultNaming("file_" + dest, ".avro");
+                            },
+                            requiresSideInputs(schemaView)))
+                    .withTempDirectory(baseDir.toString())
+                    .withDestinationCoder(StringUtf8Coder.of())
+                    .withIgnoreWindowing();
+            switch (sharding) {
+              case RUNNER_DETERMINED:
+                break;
+              case WITHOUT_SHARDING:
+                write = write.withNumShards(1);
+                break;
+              case FIXED_3_SHARDS:
+                write = write.withNumShards(3);
+                break;
+              default:
+                throw new IllegalArgumentException("Unknown sharding " + sharding);
+            }
+
+            MapElements<String, GenericRecord> toRecord =
+                MapElements.via(
+                    new SimpleFunction<String, GenericRecord>() {
+                      @Override
+                      public GenericRecord apply(String element) {
+                        String prefix = element.substring(0, 1);
+                        GenericRecord record =
+                            new GenericData.Record(
+                                new Schema.Parser().parse(schemaFromPrefix(prefix)));
+                        record.put(prefix + "full", element);
+                        record.put(prefix + "suffix", element.substring(1));
+                        return record;
+                      }
+                    });
+
+            input.apply(toRecord).setCoder(new AvroMultiplexCoder(schemaMap)).apply(write);
+            break;
+          }
+
+        case AVROIO_SINK_WITH_FORMATTER:
           {
             final AvroIO.RecordFormatter<String> formatter =
                 (element, schema) -> {
@@ -1170,6 +1287,8 @@ public class AvroIOTest implements Serializable {
             input.apply(write);
             break;
           }
+        default:
+          throw new UnsupportedOperationException("Unknown write method " + writeMethod);
       }
 
       writePipeline.run();
@@ -1230,21 +1349,43 @@ public class AvroIOTest implements Serializable {
     @Category(NeedsRunner.class)
     public void testDynamicDestinationsViaSinkRunnerDeterminedSharding() throws Exception {
       testDynamicDestinationsUnwindowedWithSharding(
-          WriteMethod.AVROIO_SINK, Sharding.RUNNER_DETERMINED);
+          WriteMethod.AVROIO_SINK_WITH_SCHEMA, Sharding.RUNNER_DETERMINED);
     }
 
     @Test
     @Category(NeedsRunner.class)
     public void testDynamicDestinationsViaSinkWithoutSharding() throws Exception {
       testDynamicDestinationsUnwindowedWithSharding(
-          WriteMethod.AVROIO_SINK, Sharding.WITHOUT_SHARDING);
+          WriteMethod.AVROIO_SINK_WITH_SCHEMA, Sharding.WITHOUT_SHARDING);
     }
 
     @Test
     @Category(NeedsRunner.class)
     public void testDynamicDestinationsViaSinkWithNumShards() throws Exception {
       testDynamicDestinationsUnwindowedWithSharding(
-          WriteMethod.AVROIO_SINK, Sharding.FIXED_3_SHARDS);
+          WriteMethod.AVROIO_SINK_WITH_SCHEMA, Sharding.FIXED_3_SHARDS);
+    }
+
+    @Test
+    @Category(NeedsRunner.class)
+    public void testDynamicDestinationsViaSinkWithFormatterRunnerDeterminedSharding()
+        throws Exception {
+      testDynamicDestinationsUnwindowedWithSharding(
+          WriteMethod.AVROIO_SINK_WITH_FORMATTER, Sharding.RUNNER_DETERMINED);
+    }
+
+    @Test
+    @Category(NeedsRunner.class)
+    public void testDynamicDestinationsViaSinkWithFormatterWithoutSharding() throws Exception {
+      testDynamicDestinationsUnwindowedWithSharding(
+          WriteMethod.AVROIO_SINK_WITH_FORMATTER, Sharding.WITHOUT_SHARDING);
+    }
+
+    @Test
+    @Category(NeedsRunner.class)
+    public void testDynamicDestinationsViaSinkWithFormatterWithNumShards() throws Exception {
+      testDynamicDestinationsUnwindowedWithSharding(
+          WriteMethod.AVROIO_SINK_WITH_FORMATTER, Sharding.FIXED_3_SHARDS);
     }
 
     @Test


Mime
View raw message