beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From reuven...@apache.org
Subject [beam] branch master updated: Merge pull request #8425: [BEAM-7174] Add schema modification transforms
Date Wed, 15 May 2019 01:17:23 GMT
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c630217  Merge pull request #8425: [BEAM-7174] Add schema modification transforms
c630217 is described below

commit c6302174ea5a60dbc09ae318ea2c5470d4060ef7
Author: reuvenlax <relax@google.com>
AuthorDate: Tue May 14 18:17:04 2019 -0700

    Merge pull request #8425: [BEAM-7174] Add schema modification transforms
---
 .../beam/sdk/schemas/FieldAccessDescriptor.java    |  56 ++-
 .../java/org/apache/beam/sdk/schemas/Schema.java   |   4 +
 .../beam/sdk/schemas/transforms/AddFields.java     | 434 +++++++++++++++++++++
 .../beam/sdk/schemas/transforms/DropFields.java    | 149 +++++++
 .../beam/sdk/schemas/transforms/RenameFields.java  | 187 +++++++++
 .../beam/sdk/schemas/transforms/AddFieldsTest.java | 356 +++++++++++++++++
 .../sdk/schemas/transforms/DropFieldsTest.java     | 161 ++++++++
 .../sdk/schemas/transforms/RenameFieldsTest.java   | 261 +++++++++++++
 8 files changed, 1607 insertions(+), 1 deletion(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldAccessDescriptor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldAccessDescriptor.java
index 060222c..55b9939 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldAccessDescriptor.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldAccessDescriptor.java
@@ -48,6 +48,7 @@ import org.apache.beam.sdk.schemas.parser.FieldAccessDescriptorParser;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ArrayListMultimap;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Maps;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Multimap;
@@ -229,7 +230,7 @@ public abstract class FieldAccessDescriptor implements Serializable {
     return builder().setFieldsAccessed(Lists.newArrayList(fields)).build();
   }
 
-  // Union a set of FieldAccessDescriptors. This function currenty only supports descriptors with
+  // Union a set of FieldAccessDescriptors. This function currently only supports descriptors with
   // containing named fields, not those containing ids.
   private static FieldAccessDescriptor union(
       Iterable<FieldAccessDescriptor> fieldAccessDescriptors) {
@@ -328,6 +329,15 @@ public abstract class FieldAccessDescriptor implements Serializable {
   }
 
   /**
+   * Return the field names accessed. Should not be called until after {@link #resolve} is called.
+   */
+  public Set<String> fieldNamesAccessed() {
+    return getFieldsAccessed().stream()
+        .map(FieldDescriptor::getFieldName)
+        .collect(Collectors.toSet());
+  }
+
+  /**
    * Return the nested fields keyed by field ids. Should not be called until after {@link #resolve}
    * is called.
    */
@@ -337,6 +347,32 @@ public abstract class FieldAccessDescriptor implements Serializable {
   }
 
   /**
+   * Return the nested fields keyed by field name. Should not be called until after {@link #resolve}
+   * is called.
+   */
+  public Map<String, FieldAccessDescriptor> nestedFieldsByName() {
+    return getNestedFieldsAccessed().entrySet().stream()
+        .collect(Collectors.toMap(f -> f.getKey().getFieldName(), f -> f.getValue()));
+  }
+
+  /** Returns true if this descriptor references only a single, non-wildcard field. */
+  public boolean referencesSingleField() {
+    if (getAllFields()) {
+      return false;
+    }
+
+    if (getFieldsAccessed().size() == 1 && getNestedFieldsAccessed().isEmpty()) {
+      return true;
+    }
+
+    if (getFieldsAccessed().isEmpty() && getNestedFieldsAccessed().size() == 1) {
+      return getNestedFieldsAccessed().values().iterator().next().referencesSingleField();
+    }
+
+    return false;
+  }
+
+  /**
    * Resolve the {@link FieldAccessDescriptor} against a schema.
    *
    * <p>Resolve will resolve all of the field names into field ids, validating that all field names
@@ -515,4 +551,22 @@ public abstract class FieldAccessDescriptor implements Serializable {
       }
     }
   }
+
+  @Override
+  public String toString() {
+    if (getAllFields()) {
+      return "*";
+    }
+
+    List<String> singleSelectors =
+        getFieldsAccessed().stream()
+            .map(FieldDescriptor::getFieldName)
+            .collect(Collectors.toList());
+    List<String> nestedSelectors =
+        getNestedFieldsAccessed().entrySet().stream()
+            .map(e -> e.getKey().getFieldName() + "." + e.getValue().toString())
+            .collect(Collectors.toList());
+    ;
+    return String.join(", ", Iterables.concat(singleSelectors, nestedSelectors));
+  }
 }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
index 21fd782..d659b25 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
@@ -196,6 +196,10 @@ public class Schema implements Serializable {
       return this;
     }
 
+    public int getLastFieldId() {
+      return fields.size() - 1;
+    }
+
     public Schema build() {
       return new Schema(fields);
     }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/AddFields.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/AddFields.java
new file mode 100644
index 0000000..1019d0e
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/AddFields.java
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.transforms;
+
+import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor.FieldDescriptor.Qualifier;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor.FieldDescriptor.Qualifier.Kind;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Multimap;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Multimaps;
+
+/**
+ * A transform to add new nullable fields to a PCollection's schema. Elements are extended to have
+ * the new schema. By default new fields are nullable, and input rows will be extended to the new
+ * schema by inserting null values. However explicit default values for new fields can be set using
+ * {@link Inner#field(String, Schema.FieldType, Object)}. Nested fields can be added as well.
+ *
+ * <p>Example use:
+ *
+ * <pre>{@code PCollection<Event> events = readEvents();
+ * PCollection<Row> augmentedEvents =
+ *   events.apply(AddFields.<Event>create()
+ *       .field("userId", FieldType.STRING)
+ *       .field("location.zipcode", FieldType.INT32)
+ *       .field("userDetails.isSpecialUser", FieldType.BOOLEAN, false));
+ * }</pre>
+ */
+@Experimental(Experimental.Kind.SCHEMAS)
+public class AddFields {
+  public static <T> Inner<T> create() {
+    return new Inner<>();
+  }
+
+  /** Inner PTransform for AddFields. */
+  public static class Inner<T> extends PTransform<PCollection<T>, PCollection<Row>> {
+    /** Internal object representing a new field added. */
+    @AutoValue
+    abstract static class NewField implements Serializable {
+      abstract String getName();
+
+      abstract FieldAccessDescriptor getDescriptor();
+
+      abstract Schema.FieldType getFieldType();
+
+      @Nullable
+      abstract Object getDefaultValue();
+
+      @AutoValue.Builder
+      abstract static class Builder {
+        abstract Builder setName(String name);
+
+        abstract Builder setDescriptor(FieldAccessDescriptor descriptor);
+
+        abstract Builder setFieldType(Schema.FieldType fieldType);
+
+        abstract Builder setDefaultValue(@Nullable Object defaultValue);
+
+        abstract NewField build();
+      }
+
+      abstract Builder toBuilder();
+
+      static NewField of(
+          FieldAccessDescriptor fieldAccessDescriptor,
+          Schema.FieldType fieldType,
+          Object defaultValue) {
+        return new AutoValue_AddFields_Inner_NewField.Builder()
+            .setName(getName(fieldAccessDescriptor))
+            .setDescriptor(fieldAccessDescriptor)
+            .setFieldType(fieldType)
+            .setDefaultValue(defaultValue)
+            .build();
+      }
+
+      // If this field represents a nested value, pop the FieldAccessDescriptor one level down.
+      NewField descend() {
+        FieldAccessDescriptor descriptor =
+            Iterables.getOnlyElement(getDescriptor().getNestedFieldsAccessed().values());
+        return toBuilder().setDescriptor(descriptor).setName(getName(descriptor)).build();
+      }
+
+      static String getName(FieldAccessDescriptor descriptor) {
+        if (!descriptor.getFieldsAccessed().isEmpty()) {
+          return Iterables.getOnlyElement(descriptor.fieldNamesAccessed());
+        } else {
+          return Iterables.getOnlyElement(descriptor.nestedFieldsByName().keySet());
+        }
+      }
+
+      FieldAccessDescriptor.FieldDescriptor getFieldDescriptor() {
+        if (!getDescriptor().getFieldsAccessed().isEmpty()) {
+          return Iterables.getOnlyElement(getDescriptor().getFieldsAccessed());
+        } else {
+          return Iterables.getOnlyElement(getDescriptor().getNestedFieldsAccessed().keySet());
+        }
+      }
+    }
+
+    /** This class encapsulates all data needed to add a new field to the schema. */
+    @AutoValue
+    abstract static class AddFieldsInformation implements Serializable {
+      // The new output fieldtype after adding the new field.
+      @Nullable
+      abstract Schema.FieldType getOutputFieldType();
+
+      // A list of default values corresponding to this level of the schema.
+      abstract List<Object> getDefaultValues();
+
+      // A list of nested values. This list corresponds to the output schema fields, and is
+      // populated for fields that
+      // have new nested values. For other fields, the list contains a null value.
+      abstract List<AddFieldsInformation> getNestedNewValues();
+
+      @AutoValue.Builder
+      abstract static class Builder {
+        abstract AddFieldsInformation.Builder setOutputFieldType(Schema.FieldType outputFieldType);
+
+        abstract AddFieldsInformation.Builder setDefaultValues(List<Object> defaultValues);
+
+        abstract AddFieldsInformation.Builder setNestedNewValues(
+            List<AddFieldsInformation> nestedNewValues);
+
+        abstract AddFieldsInformation build();
+      }
+
+      abstract AddFieldsInformation.Builder toBuilder();
+
+      static AddFieldsInformation of(
+          Schema.FieldType outputFieldType,
+          List<Object> defaultValues,
+          List<AddFieldsInformation> nestedNewValues) {
+        return new AutoValue_AddFields_Inner_AddFieldsInformation.Builder()
+            .setOutputFieldType(outputFieldType)
+            .setDefaultValues(defaultValues)
+            .setNestedNewValues(nestedNewValues)
+            .build();
+      }
+    }
+
+    private final List<NewField> newFields;
+
+    private Inner() {
+      this.newFields = Collections.emptyList();
+    }
+
+    private Inner(List<NewField> newFields) {
+      this.newFields = newFields;
+    }
+
+    /**
+     * Add a new field of the specified type. The new field will be nullable and will be filled in
+     * with null values.
+     */
+    public Inner<T> field(String fieldName, Schema.FieldType fieldType) {
+      return field(fieldName, fieldType.withNullable(true), null);
+    }
+
+    /**
+     * Add a new field of the specified type. The new field will be filled in with the specified
+     * value.
+     */
+    public Inner<T> field(String fieldName, Schema.FieldType fieldType, Object defaultValue) {
+      if (defaultValue == null) {
+        checkArgument(fieldType.getNullable());
+      }
+
+      FieldAccessDescriptor descriptor = FieldAccessDescriptor.withFieldNames(fieldName);
+      checkArgument(descriptor.referencesSingleField());
+      List<NewField> fields =
+          ImmutableList.<NewField>builder()
+              .addAll(newFields)
+              .add(NewField.of(descriptor, fieldType, defaultValue))
+              .build();
+      return new Inner<>(fields);
+    }
+
+    private static AddFieldsInformation getAddFieldsInformation(
+        Schema inputSchema, Collection<NewField> fieldsToAdd) {
+      List<NewField> newTopLevelFields =
+          fieldsToAdd.stream()
+              .filter(n -> !n.getDescriptor().getFieldsAccessed().isEmpty())
+              .collect(Collectors.toList());
+      List<NewField> newNestedFields =
+          fieldsToAdd.stream()
+              .filter(n -> !n.getDescriptor().getNestedFieldsAccessed().isEmpty())
+              .collect(Collectors.toList());
+      // Group all nested fields together by the field at the current level. For example, if adding
+      // a.b, a.c, a.d
+      // this map will contain a -> {a.b, a.c, a.d}.
+      Multimap<String, NewField> newNestedFieldsMap =
+          Multimaps.index(newNestedFields, NewField::getName);
+
+      Map<Integer, AddFieldsInformation> resolvedNestedNewValues = Maps.newHashMap();
+      Schema.Builder builder = Schema.builder();
+      for (int i = 0; i < inputSchema.getFieldCount(); ++i) {
+        Schema.Field field = inputSchema.getField(i);
+        Collection<NewField> nestedFields = newNestedFieldsMap.get(field.getName());
+
+        // If this field is a nested field and new subfields are added further down the tree, add
+        // those subfields before
+        // adding to the current schema. Otherwise we just add this field as is to the new schema.
+        if (!nestedFields.isEmpty()) {
+          nestedFields = nestedFields.stream().map(NewField::descend).collect(Collectors.toList());
+
+          AddFieldsInformation nestedInformation =
+              getAddFieldsInformation(field.getType(), nestedFields);
+          field = field.withType(nestedInformation.getOutputFieldType());
+          resolvedNestedNewValues.put(i, nestedInformation);
+        }
+        builder.addField(field);
+      }
+
+      // Add any new fields at this level.
+      List<Object> newValuesThisLevel = new ArrayList<>(newTopLevelFields.size());
+      for (NewField newField : newTopLevelFields) {
+        builder.addField(newField.getName(), newField.getFieldType());
+        newValuesThisLevel.add(newField.getDefaultValue());
+      }
+
+      // If there are any nested field additions left that are not already processed, that means
+      // that the root of the
+      // nested field doesn't exist in the schema. In this case we'll walk down the new nested
+      // fields and recursively create each nested level as necessary.
+      for (Map.Entry<String, Collection<NewField>> newNested :
+          newNestedFieldsMap.asMap().entrySet()) {
+        String fieldName = newNested.getKey();
+
+        // Specifying the same nested field in different ways (e.g. a[].x, a{}.x) is rejected here.
+        FieldAccessDescriptor.FieldDescriptor fieldDescriptor =
+            Iterables.getOnlyElement(
+                newNested.getValue().stream()
+                    .map(NewField::getFieldDescriptor)
+                    .distinct()
+                    .collect(Collectors.toList()));
+        FieldType fieldType = Schema.FieldType.row(Schema.of()).withNullable(true);
+        for (Qualifier qualifier : fieldDescriptor.getQualifiers()) {
+          // The problem with adding recursive map fields is that we don't know what the map key
+          // type should be.
+          // In a field descriptor of the form mapField{}.subField, the subField is assumed to be in
+          // the map value.
+          // Since in this code path the mapField field does not already exist this means we need to
+          // create the new
+          // map field, and we have no way of knowing what type the key should be.
+          // Alternatives would be to always create a default key type (e.g. FieldType.STRING) or
+          // extend our selector
+          // syntax to allow specifying key types.
+          checkArgument(!qualifier.getKind().equals(Kind.MAP), "Map qualifiers not supported here");
+          fieldType = FieldType.array(fieldType).withNullable(true);
+        }
+        if (!inputSchema.hasField(fieldName)) {
+          // This is a brand-new nested field with no matching field in the input schema. We will
+          // recursively create a nested schema to match it.
+          Collection<NewField> nestedNewFields =
+              newNested.getValue().stream().map(NewField::descend).collect(Collectors.toList());
+          AddFieldsInformation addFieldsInformation =
+              getAddFieldsInformation(fieldType, nestedNewFields);
+          builder.addField(fieldName, addFieldsInformation.getOutputFieldType());
+          resolvedNestedNewValues.put(builder.getLastFieldId(), addFieldsInformation);
+        }
+      }
+      Schema schema = builder.build();
+
+      List<AddFieldsInformation> nestedNewValueList =
+          new ArrayList<>(Collections.nCopies(schema.getFieldCount(), null));
+      for (Map.Entry<Integer, AddFieldsInformation> entry : resolvedNestedNewValues.entrySet()) {
+        nestedNewValueList.set(entry.getKey(), entry.getValue());
+      }
+      return AddFieldsInformation.of(
+          Schema.FieldType.row(schema), newValuesThisLevel, nestedNewValueList);
+    }
+
+    private static AddFieldsInformation getAddFieldsInformation(
+        Schema.FieldType inputFieldType, Collection<NewField> nestedFields) {
+      AddFieldsInformation addFieldsInformation;
+      Schema.FieldType fieldType;
+      switch (inputFieldType.getTypeName()) {
+        case ROW:
+          addFieldsInformation =
+              getAddFieldsInformation(inputFieldType.getRowSchema(), nestedFields);
+          fieldType = addFieldsInformation.getOutputFieldType();
+          break;
+
+        case ARRAY:
+          addFieldsInformation =
+              getAddFieldsInformation(inputFieldType.getCollectionElementType(), nestedFields);
+          fieldType = Schema.FieldType.array(addFieldsInformation.getOutputFieldType());
+          break;
+
+        case MAP:
+          addFieldsInformation =
+              getAddFieldsInformation(inputFieldType.getMapValueType(), nestedFields);
+          fieldType =
+              Schema.FieldType.map(
+                  inputFieldType.getMapKeyType(), addFieldsInformation.getOutputFieldType());
+          break;
+
+        default:
+          throw new RuntimeException("Cannot select a subfield of a non-composite type.");
+      }
+      fieldType = fieldType.withNullable(inputFieldType.getNullable());
+      return addFieldsInformation.toBuilder().setOutputFieldType(fieldType).build();
+    }
+
+    private static Row fillNewFields(Row row, AddFieldsInformation addFieldsInformation) {
+      Schema outputSchema = checkNotNull(addFieldsInformation.getOutputFieldType().getRowSchema());
+
+      List<Object> newValues = Lists.newArrayListWithCapacity(outputSchema.getFieldCount());
+      for (int i = 0; i < row.getFieldCount(); ++i) {
+        AddFieldsInformation nested = addFieldsInformation.getNestedNewValues().get(i);
+        if (nested != null) {
+          // New fields were added to nested subfields of this value. Recursively fill them out
+          // before adding to the new row.
+          Object newValue = fillNewFields(row.getValue(i), nested.getOutputFieldType(), nested);
+          newValues.add(newValue);
+        } else {
+          // Nothing changed. Just copy the old value into the new row.
+          newValues.add(row.getValue(i));
+        }
+      }
+      // If there are brand new simple (i.e. have no nested values) fields at this level, then add
+      // the default values for all of them.
+      newValues.addAll(addFieldsInformation.getDefaultValues());
+      // If we are creating new recursive fields, populate new values for them here.
+      for (int i = newValues.size(); i < addFieldsInformation.getNestedNewValues().size(); ++i) {
+        AddFieldsInformation newNestedField = addFieldsInformation.getNestedNewValues().get(i);
+        if (newNestedField != null) {
+          newValues.add(fillNewFields(null, newNestedField.getOutputFieldType(), newNestedField));
+        }
+      }
+
+      return Row.withSchema(outputSchema).attachValues(newValues).build();
+    }
+
+    private static Object fillNewFields(
+        Object original, Schema.FieldType fieldType, AddFieldsInformation addFieldsInformation) {
+      switch (fieldType.getTypeName()) {
+        case ROW:
+          if (original == null) {
+            original = Row.withSchema(fieldType.getRowSchema()).build();
+          }
+          return fillNewFields((Row) original, addFieldsInformation);
+
+        case ARRAY:
+          if (original == null) {
+            return Collections.emptyList();
+          }
+          List<Object> list = (List<Object>) original;
+          List<Object> filledList = new ArrayList<>(list.size());
+          Schema.FieldType elementType = fieldType.getCollectionElementType();
+          AddFieldsInformation elementAddFieldInformation =
+              addFieldsInformation.toBuilder().setOutputFieldType(elementType).build();
+          for (Object element : list) {
+            filledList.add(fillNewFields(element, elementType, elementAddFieldInformation));
+          }
+          return filledList;
+
+        case MAP:
+          if (original == null) {
+            return Collections.emptyMap();
+          }
+          Map<Object, Object> originalMap = (Map<Object, Object>) original;
+          Map<Object, Object> filledMap = Maps.newHashMapWithExpectedSize(originalMap.size());
+          Schema.FieldType mapValueType = fieldType.getMapValueType();
+          AddFieldsInformation mapValueAddFieldInformation =
+              addFieldsInformation.toBuilder().setOutputFieldType(mapValueType).build();
+          for (Map.Entry<Object, Object> entry : originalMap.entrySet()) {
+            filledMap.put(
+                entry.getKey(),
+                fillNewFields(entry.getValue(), mapValueType, mapValueAddFieldInformation));
+          }
+          return filledMap;
+
+        default:
+          throw new RuntimeException("Unexpected field type");
+      }
+    }
+
+    @Override
+    public PCollection<Row> expand(PCollection<T> input) {
+      final AddFieldsInformation addFieldsInformation =
+          getAddFieldsInformation(input.getSchema(), newFields);
+      Schema outputSchema = checkNotNull(addFieldsInformation.getOutputFieldType().getRowSchema());
+
+      return input
+          .apply(
+              ParDo.of(
+                  new DoFn<T, Row>() {
+                    @ProcessElement
+                    public void processElement(@Element Row row, OutputReceiver<Row> o) {
+                      o.output(fillNewFields(row, addFieldsInformation));
+                    }
+                  }))
+          .setRowSchema(outputSchema);
+    }
+  }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/DropFields.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/DropFields.java
new file mode 100644
index 0000000..d1c4d12
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/DropFields.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.transforms;
+
+import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets;
+
+/**
+ * A transform to drop fields from a schema.
+ *
+ * <p>This transform acts as the inverse of the {@link Select} transform. A list of fields to drop
+ * is specified, and all fields in the schema that are not specified are selected. For example:
+ *
+ * <pre>{@code @DefaultSchema(JavaFieldSchema.class)
+ * public class UserEvent {
+ *   public String userId;
+ *   public String eventId;
+ *   public int eventType;
+ *   public Location location;
+ * }}</pre>
+ *
+ * <pre>{@code @DefaultSchema(JavaFieldSchema.class)
+ * public class Location {
+ *   public double latitude;
+ *   public double longitude;
+ * }
+ *
+ * PCollection<UserEvent> events = readUserEvents();
+ * // Drop the location field.
+ * PCollection<Row> noLocation = events.apply(DropFields.fields("location"));
+ * // Drop the latitude field.
+ * PCollection<Row> noLatitude = events.apply(DropFields.fields("location.latitude"));
+ * }</pre>
+ */
+@Experimental(Kind.SCHEMAS)
+public class DropFields {
+  public static <T> Inner<T> fields(String... fields) {
+    return fields(FieldAccessDescriptor.withFieldNames(fields));
+  }
+
+  public static <T> Inner<T> fields(Integer... fieldIds) {
+    return fields(FieldAccessDescriptor.withFieldIds(fieldIds));
+  }
+
+  public static <T> Inner<T> fields(FieldAccessDescriptor fieldsToDrop) {
+    return new Inner<>(fieldsToDrop);
+  }
+
+  /** Implementation class for DropFields. */
+  public static class Inner<T> extends PTransform<PCollection<T>, PCollection<Row>> {
+    private final FieldAccessDescriptor fieldsToDrop;
+
+    private Inner(FieldAccessDescriptor fieldsToDrop) {
+      this.fieldsToDrop = fieldsToDrop;
+    }
+
+    FieldAccessDescriptor complement(Schema inputSchema, FieldAccessDescriptor input) {
+      // Create a FieldAccessDescriptor that selects all fields _not_ selected in the input
+      // descriptor. Maintain
+      // the original order of the schema.
+      Set<String> fieldNamesToSelect = Sets.newHashSet();
+      Map<FieldAccessDescriptor.FieldDescriptor, FieldAccessDescriptor> nestedFieldsToSelect =
+          Maps.newHashMap();
+      for (int i = 0; i < inputSchema.getFieldCount(); ++i) {
+        if (input.fieldIdsAccessed().contains(i)) {
+          // This field is selected, so exclude it from the complement.
+          continue;
+        }
+        Field field = inputSchema.getField(i);
+        Map<Integer, FieldAccessDescriptor.FieldDescriptor> nestedFields =
+            input.getNestedFieldsAccessed().keySet().stream()
+                .collect(Collectors.toMap(k -> k.getFieldId(), k -> k));
+
+        FieldAccessDescriptor.FieldDescriptor fieldDescriptor = nestedFields.get(i);
+        if (fieldDescriptor != null) {
+          // Some subfields are selected, so recursively calculate the complementary subfields to
+          // select.
+          FieldType fieldType = inputSchema.getField(i).getType();
+          for (FieldAccessDescriptor.FieldDescriptor.Qualifier qualifier :
+              fieldDescriptor.getQualifiers()) {
+            switch (qualifier.getKind()) {
+              case LIST:
+                fieldType = fieldType.getCollectionElementType();
+                break;
+              case MAP:
+                fieldType = fieldType.getMapValueType();
+                break;
+              default:
+                throw new RuntimeException("Unexpected field descriptor type.");
+            }
+          }
+          checkArgument(fieldType.getTypeName().isCompositeType());
+          FieldAccessDescriptor nestedDescriptor =
+              input.getNestedFieldsAccessed().get(fieldDescriptor);
+          nestedFieldsToSelect.put(
+              fieldDescriptor, complement(fieldType.getRowSchema(), nestedDescriptor));
+        } else {
+          // Neither the field nor the subfield is selected. This means we should select it.
+          fieldNamesToSelect.add(field.getName());
+        }
+      }
+
+      FieldAccessDescriptor fieldAccess = FieldAccessDescriptor.withFieldNames(fieldNamesToSelect);
+      for (Map.Entry<FieldAccessDescriptor.FieldDescriptor, FieldAccessDescriptor> entry :
+          nestedFieldsToSelect.entrySet()) {
+        fieldAccess = fieldAccess.withNestedField(entry.getKey(), entry.getValue());
+      }
+      return fieldAccess.resolve(inputSchema);
+    }
+
+    @Override
+    public PCollection<Row> expand(PCollection<T> input) {
+      Schema inputSchema = input.getSchema();
+      FieldAccessDescriptor selectDescriptor =
+          complement(inputSchema, fieldsToDrop.resolve(inputSchema));
+
+      return input.apply(Select.fieldAccess(selectDescriptor));
+    }
+  }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/RenameFields.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/RenameFields.java
new file mode 100644
index 0000000..07d8499
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/RenameFields.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.transforms;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ArrayListMultimap;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Multimap;
+import org.apache.commons.compress.utils.Lists;
+
+/**
+ * A transform for renaming fields inside an existing schema. Top level or nested fields can be
+ * renamed. When renaming a nested field, the nested prefix does not need to be specified again when
+ * specifying the new name.
+ *
+ * <p>Example use:
+ *
+ * <pre>{@code PCollection<Event> events = readEvents();
+ * PCollection<Row> renamedEvents =
+ *   events.apply(RenameFields.<Event>create()
+ *       .rename("userName", "userId")
+ *       .rename("location.country", "countryCode"));
+ * }</pre>
+ */
+@Experimental(Kind.SCHEMAS)
+public class RenameFields {
+  /** Create an instance of this transform. */
+  public static <T> Inner<T> create() {
+    return new Inner<>();
+  }
+
+  // Describes a single renameSchema rule.
+  private static class RenamePair implements Serializable {
+    // The FieldAccessDescriptor describing the field to renameSchema. Must reference a singleton
+    // field.
+    private final FieldAccessDescriptor fieldAccessDescriptor;
+    // The new name for the field.
+    private final String newName;
+
+    RenamePair(FieldAccessDescriptor fieldAccessDescriptor, String newName) {
+      this.fieldAccessDescriptor = fieldAccessDescriptor;
+      this.newName = newName;
+    }
+
+    RenamePair resolve(Schema schema) {
+      FieldAccessDescriptor resolved = fieldAccessDescriptor.resolve(schema);
+      if (!resolved.referencesSingleField()) {
+        throw new IllegalArgumentException(resolved + " references multiple fields.");
+      }
+      return new RenamePair(resolved, newName);
+    }
+  }
+
+  private static FieldType renameFieldType(FieldType inputType, Collection<RenamePair> renames) {
+    switch (inputType.getTypeName()) {
+      case ROW:
+        return FieldType.row(renameSchema(inputType.getRowSchema(), renames));
+      case ARRAY:
+        return FieldType.array(renameFieldType(inputType.getCollectionElementType(), renames));
+      case MAP:
+        return FieldType.map(
+            renameFieldType(inputType.getMapKeyType(), renames),
+            renameFieldType(inputType.getMapValueType(), renames));
+      default:
+        return inputType;
+    }
+  }
+
+  // Apply the user-specified renames to the input schema.
+  private static Schema renameSchema(Schema inputSchema, Collection<RenamePair> renames) {
+    // The mapping of renames to apply at this level of the schema.
+    Map<Integer, String> topLevelRenames = Maps.newHashMap();
+    // For nested schemas, collect all applicable renames here.
+    Multimap<Integer, RenamePair> nestedRenames = ArrayListMultimap.create();
+
+    for (RenamePair rename : renames) {
+      FieldAccessDescriptor access = rename.fieldAccessDescriptor;
+      if (!access.fieldIdsAccessed().isEmpty()) {
+        // This references a field at this level of the schema.
+        Integer fieldId = Iterables.getOnlyElement(access.fieldIdsAccessed());
+        topLevelRenames.put(fieldId, rename.newName);
+      } else {
+        // This references a nested field.
+        Map.Entry<Integer, FieldAccessDescriptor> nestedAccess =
+            Iterables.getOnlyElement(access.nestedFieldsById().entrySet());
+        nestedRenames.put(
+            nestedAccess.getKey(), new RenamePair(nestedAccess.getValue(), rename.newName));
+      }
+    }
+
+    Schema.Builder builder = Schema.builder();
+    for (int i = 0; i < inputSchema.getFieldCount(); ++i) {
+      Field field = inputSchema.getField(i);
+      FieldType fieldType = field.getType();
+      String newName = topLevelRenames.getOrDefault(i, field.getName());
+      Collection<RenamePair> nestedFieldRenames = nestedRenames.asMap().get(i);
+      if (nestedFieldRenames != null) {
+        // There are nested field renames. Recursively renameSchema the rest of the schema.
+        builder.addField(newName, renameFieldType(fieldType, nestedFieldRenames));
+      } else {
+        // No renameSchema for this field. Just add it back as is, potentially with a new name.
+        builder.addField(newName, fieldType);
+      }
+    }
+    return builder.build();
+  }
+
+  /** The class implementing the actual PTransform. */
+  public static class Inner<T> extends PTransform<PCollection<T>, PCollection<Row>> {
+    private List<RenamePair> renames;
+
+    private Inner() {
+      renames = Lists.newArrayList();
+    }
+
+    private Inner(List<RenamePair> renames) {
+      this.renames = renames;
+    }
+
+    /** Rename a specific field. */
+    public Inner<T> rename(String field, String newName) {
+      return rename(FieldAccessDescriptor.withFieldNames(field), newName);
+    }
+
+    /** Rename a specific field. */
+    public Inner<T> rename(FieldAccessDescriptor field, String newName) {
+      List<RenamePair> newList =
+          ImmutableList.<RenamePair>builder()
+              .addAll(renames)
+              .add(new RenamePair(field, newName))
+              .build();
+
+      return new Inner<>(newList);
+    }
+
+    @Override
+    public PCollection<Row> expand(PCollection<T> input) {
+      Schema inputSchema = input.getSchema();
+
+      List<RenamePair> pairs =
+          renames.stream().map(r -> r.resolve(inputSchema)).collect(Collectors.toList());
+      final Schema outputSchema = renameSchema(inputSchema, pairs);
+      return input
+          .apply(
+              ParDo.of(
+                  new DoFn<T, Row>() {
+                    @ProcessElement
+                    public void processElement(@Element Row row, OutputReceiver<Row> o) {
+                      o.output(Row.withSchema(outputSchema).attachValues(row.getValues()).build());
+                    }
+                  }))
+          .setRowSchema(outputSchema);
+    }
+  }
+}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/AddFieldsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/AddFieldsTest.java
new file mode 100644
index 0000000..af166e2
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/AddFieldsTest.java
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.transforms;
+
+import static junit.framework.TestCase.assertEquals;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+
+/** Tests for {@link AddFields}: top-level, nested-row, array-element and map-value additions. */
+public class AddFieldsTest {
+  // Type of every "field3" added by these tests.
+  private static final FieldType STRING_ARRAY = FieldType.array(FieldType.STRING);
+
+  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+  @Rule public transient ExpectedException thrown = ExpectedException.none();
+
+  /** Returns a schema holding the single non-nullable string field {@code field1}. */
+  private static Schema stringFieldSchema() {
+    return Schema.builder().addStringField("field1").build();
+  }
+
+  /** Returns {@code field1} followed by nullable {@code field2} (INT32) and {@code field3}. */
+  private static Schema extendedSchema() {
+    return Schema.builder()
+        .addStringField("field1")
+        .addNullableField("field2", FieldType.INT32)
+        .addNullableField("field3", STRING_ARRAY)
+        .build();
+  }
+
+  /** Returns a schema with one row field named {@code nested} of the given schema. */
+  private static Schema wrapInNested(Schema nestedSchema) {
+    return Schema.builder().addRowField("nested", nestedSchema).build();
+  }
+
+  /** Returns a row of {@code schema} whose only value is the string {@code "value"}. */
+  private static Row valueRow(Schema schema) {
+    return Row.withSchema(schema).addValue("value").build();
+  }
+
+  /** Returns a row of {@code schema} holding {@code "value"} followed by two nulls. */
+  private static Row valueRowWithNulls(Schema schema) {
+    return Row.withSchema(schema).addValues("value", null, null).build();
+  }
+
+  /** Creates a one-element input collection holding {@code row}, typed with {@code schema}. */
+  private PCollection<Row> inputOf(Schema schema, Row row) {
+    return pipeline.apply(Create.of(row).withRowSchema(schema));
+  }
+
+  /** Fields added without a default value become nullable fields holding null. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void addSimpleFields() {
+    Schema inputSchema = stringFieldSchema();
+    PCollection<Row> result =
+        inputOf(inputSchema, valueRow(inputSchema))
+            .apply(
+                AddFields.<Row>create()
+                    .field("field2", FieldType.INT32)
+                    .field("field3", STRING_ARRAY));
+
+    Schema outputSchema = extendedSchema();
+    assertEquals(outputSchema, result.getSchema());
+    Row expectedRow = valueRowWithNulls(outputSchema);
+    PAssert.that(result).containsInAnyOrder(expectedRow);
+    pipeline.run();
+  }
+
+  /** A field added with a default value is non-nullable and holds that default. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void addSimpleFieldsDefaultValue() {
+    Schema inputSchema = stringFieldSchema();
+    PCollection<Row> result =
+        inputOf(inputSchema, valueRow(inputSchema))
+            .apply(AddFields.<Row>create().field("field2", FieldType.INT32, 42));
+
+    Schema outputSchema =
+        Schema.builder().addStringField("field1").addField("field2", FieldType.INT32).build();
+    assertEquals(outputSchema, result.getSchema());
+    Row expectedRow = Row.withSchema(outputSchema).addValues("value", 42).build();
+    PAssert.that(result).containsInAnyOrder(expectedRow);
+    pipeline.run();
+  }
+
+  /** Fields can be added inside an existing nested row. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void addNestedField() {
+    Schema innerSchema = stringFieldSchema();
+    Schema inputSchema = wrapInNested(innerSchema);
+    Row inputRow = Row.withSchema(inputSchema).addValue(valueRow(innerSchema)).build();
+
+    PCollection<Row> result =
+        inputOf(inputSchema, inputRow)
+            .apply(
+                AddFields.<Row>create()
+                    .field("nested.field2", FieldType.INT32)
+                    .field("nested.field3", STRING_ARRAY));
+
+    Schema outputInnerSchema = extendedSchema();
+    Schema outputSchema = wrapInNested(outputInnerSchema);
+    assertEquals(outputSchema, result.getSchema());
+
+    Row expectedInner = valueRowWithNulls(outputInnerSchema);
+    Row expectedRow = Row.withSchema(outputSchema).addValue(expectedInner).build();
+    PAssert.that(result).containsInAnyOrder(expectedRow);
+    pipeline.run();
+  }
+
+  /** Nested fields added with default values are non-nullable and hold those defaults. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void addNestedFieldDefaultValue() {
+    Schema innerSchema = stringFieldSchema();
+    Schema inputSchema = wrapInNested(innerSchema);
+    Row inputRow = Row.withSchema(inputSchema).addValue(valueRow(innerSchema)).build();
+    List<String> defaultList = ImmutableList.of("one", "two", "three");
+
+    PCollection<Row> result =
+        inputOf(inputSchema, inputRow)
+            .apply(
+                AddFields.<Row>create()
+                    .field("nested.field2", FieldType.INT32, 42)
+                    .field("nested.field3", STRING_ARRAY, defaultList));
+
+    Schema outputInnerSchema =
+        Schema.builder()
+            .addStringField("field1")
+            .addField("field2", FieldType.INT32)
+            .addField("field3", STRING_ARRAY)
+            .build();
+    Schema outputSchema = wrapInNested(outputInnerSchema);
+    assertEquals(outputSchema, result.getSchema());
+
+    Row expectedInner =
+        Row.withSchema(outputInnerSchema).addValues("value", 42, defaultList).build();
+    Row expectedRow = Row.withSchema(outputSchema).addValue(expectedInner).build();
+    PAssert.that(result).containsInAnyOrder(expectedRow);
+    pipeline.run();
+  }
+
+  /** Top-level and nested additions can be combined in a single transform. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void addSimpleAndNestedField() {
+    Schema innerSchema = stringFieldSchema();
+    Schema inputSchema = wrapInNested(innerSchema);
+    Row inputRow = Row.withSchema(inputSchema).addValue(valueRow(innerSchema)).build();
+
+    PCollection<Row> result =
+        inputOf(inputSchema, inputRow)
+            .apply(
+                AddFields.<Row>create()
+                    .field("field2", FieldType.INT32)
+                    .field("nested.field2", FieldType.INT32)
+                    .field("nested.field3", STRING_ARRAY));
+
+    Schema outputInnerSchema = extendedSchema();
+    Schema outputSchema =
+        Schema.builder()
+            .addRowField("nested", outputInnerSchema)
+            .addNullableField("field2", FieldType.INT32)
+            .build();
+    assertEquals(outputSchema, result.getSchema());
+
+    Row expectedInner = valueRowWithNulls(outputInnerSchema);
+    Row expectedRow = Row.withSchema(outputSchema).addValues(expectedInner, null).build();
+    PAssert.that(result).containsInAnyOrder(expectedRow);
+    pipeline.run();
+  }
+
+  /** Adding fields under a row that does not exist yet creates that row as a nullable field. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void recursivelyAddNestedFields() {
+    Schema inputSchema = Schema.of();
+    Row inputRow = Row.withSchema(inputSchema).build();
+
+    PCollection<Row> result =
+        inputOf(inputSchema, inputRow)
+            .apply(
+                AddFields.<Row>create()
+                    .field("nested.field1", FieldType.STRING, "value")
+                    .field("nested.field2", FieldType.INT32)
+                    .field("nested.field3", STRING_ARRAY));
+
+    Schema outputInnerSchema = extendedSchema();
+    Schema outputSchema =
+        Schema.builder().addNullableField("nested", FieldType.row(outputInnerSchema)).build();
+    assertEquals(outputSchema, result.getSchema());
+
+    Row expectedInner = valueRowWithNulls(outputInnerSchema);
+    Row expectedRow = Row.withSchema(outputSchema).addValue(expectedInner).build();
+    PAssert.that(result).containsInAnyOrder(expectedRow);
+    pipeline.run();
+  }
+
+  /** Fields can be added to the rows held by an existing array field. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void addNestedArrayField() {
+    Schema elementSchema = stringFieldSchema();
+    Schema inputSchema =
+        Schema.builder().addArrayField("array", FieldType.row(elementSchema)).build();
+    Row element = valueRow(elementSchema);
+    Row inputRow = Row.withSchema(inputSchema).addArray(element, element).build();
+
+    PCollection<Row> result =
+        inputOf(inputSchema, inputRow)
+            .apply(
+                AddFields.<Row>create()
+                    .field("array.field2", FieldType.INT32)
+                    .field("array.field3", STRING_ARRAY));
+
+    Schema outputElementSchema = extendedSchema();
+    Schema outputSchema =
+        Schema.builder().addArrayField("array", FieldType.row(outputElementSchema)).build();
+    assertEquals(outputSchema, result.getSchema());
+
+    Row expectedElement = valueRowWithNulls(outputElementSchema);
+    Row expectedRow =
+        Row.withSchema(outputSchema).addArray(expectedElement, expectedElement).build();
+    PAssert.that(result).containsInAnyOrder(expectedRow);
+    pipeline.run();
+  }
+
+  /**
+   * Adding fields under an array that does not exist yet creates a nullable array of nullable
+   * rows, populated with an empty list.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void recursivelyAddNestedArrayField() {
+    Schema inputSchema = Schema.builder().build();
+    Row inputRow = Row.withSchema(inputSchema).build();
+
+    PCollection<Row> result =
+        inputOf(inputSchema, inputRow)
+            .apply(
+                AddFields.<Row>create()
+                    .field("array[].field1", FieldType.STRING)
+                    .field("array[].field2", FieldType.INT32)
+                    .field("array[].field3", STRING_ARRAY));
+
+    Schema outputElementSchema =
+        Schema.builder()
+            .addNullableField("field1", FieldType.STRING)
+            .addNullableField("field2", FieldType.INT32)
+            .addNullableField("field3", STRING_ARRAY)
+            .build();
+    FieldType nullableElementType = FieldType.row(outputElementSchema).withNullable(true);
+    Schema outputSchema =
+        Schema.builder().addNullableField("array", FieldType.array(nullableElementType)).build();
+    assertEquals(outputSchema, result.getSchema());
+
+    Row expectedRow = Row.withSchema(outputSchema).addValue(Collections.emptyList()).build();
+    PAssert.that(result).containsInAnyOrder(expectedRow);
+    pipeline.run();
+  }
+
+  /** Fields can be added to the rows held as values of an existing map field. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void addNestedMapField() {
+    Schema valueSchema = stringFieldSchema();
+    Schema inputSchema =
+        Schema.builder()
+            .addMapField("map", FieldType.STRING, FieldType.row(valueSchema))
+            .build();
+    Row inputRow =
+        Row.withSchema(inputSchema)
+            .addValue(ImmutableMap.of("key", valueRow(valueSchema)))
+            .build();
+
+    PCollection<Row> result =
+        inputOf(inputSchema, inputRow)
+            .apply(
+                AddFields.<Row>create()
+                    .field("map.field2", FieldType.INT32)
+                    .field("map.field3", STRING_ARRAY));
+
+    Schema outputValueSchema = extendedSchema();
+    Schema outputSchema =
+        Schema.builder()
+            .addMapField("map", FieldType.STRING, FieldType.row(outputValueSchema))
+            .build();
+    assertEquals(outputSchema, result.getSchema());
+
+    Row expectedValue = valueRowWithNulls(outputValueSchema);
+    Row expectedRow =
+        Row.withSchema(outputSchema).addValue(ImmutableMap.of("key", expectedValue)).build();
+    PAssert.that(result).containsInAnyOrder(expectedRow);
+    pipeline.run();
+  }
+
+  /** Adding a field whose name already exists is expected to be rejected. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void addDuplicateField() {
+    Schema inputSchema = stringFieldSchema();
+    thrown.expect(IllegalArgumentException.class);
+    inputOf(inputSchema, valueRow(inputSchema))
+        .apply(AddFields.<Row>create().field("field1", FieldType.INT32));
+    pipeline.run();
+  }
+
+  /** Passing null as the explicit default value of a field is expected to be rejected. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void addNonNullableField() {
+    Schema inputSchema = stringFieldSchema();
+    thrown.expect(IllegalArgumentException.class);
+    inputOf(inputSchema, valueRow(inputSchema))
+        .apply(AddFields.<Row>create().field("field2", FieldType.INT32, null));
+    pipeline.run();
+  }
+}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/DropFieldsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/DropFieldsTest.java
new file mode 100644
index 0000000..52c67a4
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/DropFieldsTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.transforms;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/** Tests for {@link DropFields}: dropping top-level fields and fields nested in rows or arrays. */
+public class DropFieldsTest {
+  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+
+  // Flat schema: an INT32 "field1" and a string "field2".
+  private static final Schema SIMPLE_SCHEMA =
+      Schema.builder().addInt32Field("field1").addStringField("field2").build();
+
+  // A row field "nested" of SIMPLE_SCHEMA next to a top-level string field.
+  private static final Schema NESTED_SCHEMA =
+      Schema.builder().addRowField("nested", SIMPLE_SCHEMA).addStringField("string").build();
+
+  // A single array field whose elements are rows of SIMPLE_SCHEMA.
+  private static final Schema NESTED_ARRAY_SCHEMA =
+      Schema.builder().addArrayField("array", FieldType.row(SIMPLE_SCHEMA)).build();
+
+  private static Row simpleRow(int field1, String field2) {
+    return Row.withSchema(SIMPLE_SCHEMA).addValues(field1, field2).build();
+  }
+
+  private static Row nestedRow(Row nested) {
+    return Row.withSchema(NESTED_SCHEMA).addValues(nested, "foo").build();
+  }
+
+  private static Row nestedArray(Row... elements) {
+    return Row.withSchema(NESTED_ARRAY_SCHEMA).addArray((Object[]) elements).build();
+  }
+
+  /** Builds one single-field row of {@code schema} per entry of {@code values}, in order. */
+  private static List<Row> singleValueRows(Schema schema, String... values) {
+    List<Row> rows = Lists.newArrayList();
+    for (String value : values) {
+      rows.add(Row.withSchema(schema).addValue(value).build());
+    }
+    return rows;
+  }
+
+  /** Creates the three-row NESTED_SCHEMA input shared by the nested-row tests. */
+  private PCollection<Row> nestedInput() {
+    return pipeline.apply(
+        Create.of(
+                nestedRow(simpleRow(1, "one")),
+                nestedRow(simpleRow(2, "two")),
+                nestedRow(simpleRow(3, "three")))
+            .withRowSchema(NESTED_SCHEMA));
+  }
+
+  /** Dropping a top-level field leaves only the remaining top-level field. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testDropTopLevelField() {
+    PCollection<Row> input =
+        pipeline.apply(
+            Create.of(simpleRow(1, "one"), simpleRow(2, "two"), simpleRow(3, "three"))
+                .withRowSchema(SIMPLE_SCHEMA));
+    PCollection<Row> dropped = input.apply(DropFields.fields("field1"));
+
+    Schema remainingSchema = Schema.builder().addStringField("field2").build();
+    assertEquals(remainingSchema, dropped.getSchema());
+
+    PAssert.that(dropped)
+        .containsInAnyOrder(singleValueRows(remainingSchema, "one", "two", "three"));
+    pipeline.run();
+  }
+
+  /**
+   * Dropping a field of a nested row: the expected output holds the top-level string field
+   * followed by the surviving nested field.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testDropNestedField() {
+    PCollection<Row> dropped = nestedInput().apply(DropFields.fields("nested.field1"));
+
+    Schema remainingSchema =
+        Schema.builder().addStringField("string").addStringField("field2").build();
+    assertEquals(remainingSchema, dropped.getSchema());
+
+    List<Row> remainingRows =
+        Lists.newArrayList(
+            Row.withSchema(remainingSchema).addValues("foo", "one").build(),
+            Row.withSchema(remainingSchema).addValues("foo", "two").build(),
+            Row.withSchema(remainingSchema).addValues("foo", "three").build());
+    PAssert.that(dropped).containsInAnyOrder(remainingRows);
+    pipeline.run();
+  }
+
+  /** Dropping the top-level string and one nested field leaves only the other nested field. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testDropNestedFieldKeepingOnlyNested() {
+    PCollection<Row> dropped =
+        nestedInput().apply(DropFields.fields("string", "nested.field1"));
+
+    Schema remainingSchema = Schema.builder().addStringField("field2").build();
+    assertEquals(remainingSchema, dropped.getSchema());
+
+    PAssert.that(dropped)
+        .containsInAnyOrder(singleValueRows(remainingSchema, "one", "two", "three"));
+    pipeline.run();
+  }
+
+  // drop making sure a nested field remains.
+
+  /**
+   * Dropping a field of the rows inside an array: the expected output is an array of the
+   * surviving field's values.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testDropNestedArrayField() {
+    PCollection<Row> input =
+        pipeline.apply(
+            Create.of(
+                    nestedArray(simpleRow(1, "one1"), simpleRow(1, "one2")),
+                    nestedArray(simpleRow(2, "two1"), simpleRow(2, "two2")),
+                    nestedArray(simpleRow(3, "three1"), simpleRow(3, "three2")))
+                .withRowSchema(NESTED_ARRAY_SCHEMA));
+    PCollection<Row> dropped = input.apply(DropFields.fields("array[].field1"));
+
+    Schema remainingSchema = Schema.builder().addArrayField("field2", FieldType.STRING).build();
+    assertEquals(remainingSchema, dropped.getSchema());
+
+    List<Row> remainingRows =
+        Lists.newArrayList(
+            Row.withSchema(remainingSchema).addArray("one1", "one2").build(),
+            Row.withSchema(remainingSchema).addArray("two1", "two2").build(),
+            Row.withSchema(remainingSchema).addArray("three1", "three2").build());
+    PAssert.that(dropped).containsInAnyOrder(remainingRows);
+    pipeline.run();
+  }
+}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/RenameFieldsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/RenameFieldsTest.java
new file mode 100644
index 0000000..b940e78
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/RenameFieldsTest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.transforms;
+
+import static junit.framework.TestCase.assertEquals;
+
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/** Tests for {@link RenameFields}. */
+public class RenameFieldsTest {
+  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void renameTopLevelFields() {
+    Schema schema = Schema.builder().addStringField("field1").addInt32Field("field2").build();
+    PCollection<Row> renamed =
+        pipeline
+            .apply(
+                Create.of(
+                        Row.withSchema(schema).addValues("one", 1).build(),
+                        Row.withSchema(schema).addValues("two", 2).build())
+                    .withRowSchema(schema))
+            .apply(RenameFields.<Row>create().rename("field1", "new1").rename("field2", "new2"));
+    Schema expectedSchema = Schema.builder().addStringField("new1").addInt32Field("new2").build();
+    assertEquals(expectedSchema, renamed.getSchema()); // names change; types and order do not
+    List<Row> expectedRows =
+        ImmutableList.of(
+            Row.withSchema(expectedSchema).addValues("one", 1).build(),
+            Row.withSchema(expectedSchema).addValues("two", 2).build());
+    PAssert.that(renamed).containsInAnyOrder(expectedRows); // row values must be untouched
+    pipeline.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void renameNestedFields() {
+    Schema nestedSchema = Schema.builder().addStringField("field1").addInt32Field("field2").build();
+    Schema schema =
+        Schema.builder().addStringField("field1").addRowField("nested", nestedSchema).build();
+    // Two input rows; each carries a top-level string plus one nested row.
+    PCollection<Row> renamed =
+        pipeline
+            .apply(
+                Create.of(
+                        Row.withSchema(schema)
+                            .addValues(
+                                "one", Row.withSchema(nestedSchema).addValues("one", 1).build())
+                            .build(),
+                        Row.withSchema(schema)
+                            .addValues(
+                                "two", Row.withSchema(nestedSchema).addValues("two", 1).build())
+                            .build())
+                    .withRowSchema(schema))
+            .apply(
+                RenameFields.<Row>create()
+                    .rename("nested.field1", "new1")
+                    .rename("nested.field2", "new2"));
+    // Only the nested row's field names change; the top-level names stay as they were.
+    Schema expectedNestedSchema =
+        Schema.builder().addStringField("new1").addInt32Field("new2").build();
+    Schema expectedSchema =
+        Schema.builder()
+            .addStringField("field1")
+            .addRowField("nested", expectedNestedSchema)
+            .build();
+    assertEquals(expectedSchema, renamed.getSchema());
+    // Expected rows carry the same values as the input, rebuilt against the renamed schemas.
+    List<Row> expectedRows =
+        ImmutableList.of(
+            Row.withSchema(expectedSchema)
+                .addValues("one", Row.withSchema(expectedNestedSchema).addValues("one", 1).build())
+                .build(),
+            Row.withSchema(expectedSchema)
+                .addValues("two", Row.withSchema(expectedNestedSchema).addValues("two", 1).build())
+                .build());
+
+    PAssert.that(renamed).containsInAnyOrder(expectedRows);
+    pipeline.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void renameTopLevelAndNestedFields() {
+    Schema nestedSchema = Schema.builder().addStringField("field1").addInt32Field("field2").build();
+    Schema schema =
+        Schema.builder().addStringField("field1").addRowField("nested", nestedSchema).build();
+    // Two input rows; each carries a top-level string plus one nested row.
+    PCollection<Row> renamed =
+        pipeline
+            .apply(
+                Create.of(
+                        Row.withSchema(schema)
+                            .addValues(
+                                "one", Row.withSchema(nestedSchema).addValues("one", 1).build())
+                            .build(),
+                        Row.withSchema(schema)
+                            .addValues(
+                                "two", Row.withSchema(nestedSchema).addValues("two", 1).build())
+                            .build())
+                    .withRowSchema(schema))
+            .apply(
+                RenameFields.<Row>create()
+                    .rename("field1", "top1")
+                    .rename("nested", "newnested")
+                    .rename("nested.field1", "new1")
+                    .rename("nested.field2", "new2"));
+    // The row field itself is renamed too; its sub-fields are still addressed as "nested.*".
+    Schema expectedNestedSchema =
+        Schema.builder().addStringField("new1").addInt32Field("new2").build();
+    Schema expectedSchema =
+        Schema.builder()
+            .addStringField("top1")
+            .addRowField("newnested", expectedNestedSchema)
+            .build();
+    assertEquals(expectedSchema, renamed.getSchema());
+    // Values are unchanged; only the schemas the expected rows are built against differ.
+    List<Row> expectedRows =
+        ImmutableList.of(
+            Row.withSchema(expectedSchema)
+                .addValues("one", Row.withSchema(expectedNestedSchema).addValues("one", 1).build())
+                .build(),
+            Row.withSchema(expectedSchema)
+                .addValues("two", Row.withSchema(expectedNestedSchema).addValues("two", 1).build())
+                .build());
+
+    PAssert.that(renamed).containsInAnyOrder(expectedRows);
+    pipeline.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void renameNestedInArrayFields() {
+    Schema nestedSchema = Schema.builder().addStringField("field1").addInt32Field("field2").build();
+    Schema schema =
+        Schema.builder().addArrayField("array", Schema.FieldType.row(nestedSchema)).build();
+    // Two input rows; each holds a one-element array of nested rows.
+    PCollection<Row> renamed =
+        pipeline
+            .apply(
+                Create.of(
+                        Row.withSchema(schema)
+                            .addValue(
+                                ImmutableList.of(
+                                    Row.withSchema(nestedSchema).addValues("one", 1).build()))
+                            .build(),
+                        Row.withSchema(schema)
+                            .addValue(
+                                ImmutableList.of(
+                                    Row.withSchema(nestedSchema).addValues("two", 1).build()))
+                            .build())
+                    .withRowSchema(schema))
+            .apply(
+                RenameFields.<Row>create()
+                    .rename("array.field1", "new1")
+                    .rename("array.field2", "new2"));
+    // The array field keeps its name; only the element row's field names change.
+    Schema expectedNestedSchema =
+        Schema.builder().addStringField("new1").addInt32Field("new2").build();
+    Schema expectedSchema =
+        Schema.builder().addArrayField("array", Schema.FieldType.row(expectedNestedSchema)).build();
+    assertEquals(expectedSchema, renamed.getSchema());
+    // Expected rows mirror the input values, rebuilt against the renamed element schema.
+    List<Row> expectedRows =
+        ImmutableList.of(
+            Row.withSchema(expectedSchema)
+                .addValue(
+                    ImmutableList.of(
+                        Row.withSchema(expectedNestedSchema).addValues("one", 1).build()))
+                .build(),
+            Row.withSchema(expectedSchema)
+                .addValue(
+                    ImmutableList.of(
+                        Row.withSchema(expectedNestedSchema).addValues("two", 1).build()))
+                .build());
+
+    PAssert.that(renamed).containsInAnyOrder(expectedRows);
+    pipeline.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void renameNestedInMapFields() {
+    Schema nestedSchema = Schema.builder().addStringField("field1").addInt32Field("field2").build();
+    Schema schema =
+        Schema.builder()
+            .addMapField("map", Schema.FieldType.STRING, Schema.FieldType.row(nestedSchema))
+            .build();
+    // Two input rows; each holds a single-entry map from a string key to a nested row.
+    PCollection<Row> renamed =
+        pipeline
+            .apply(
+                Create.of(
+                        Row.withSchema(schema)
+                            .addValue(
+                                ImmutableMap.of(
+                                    "k1", Row.withSchema(nestedSchema).addValues("one", 1).build()))
+                            .build(),
+                        Row.withSchema(schema)
+                            .addValue(
+                                ImmutableMap.of(
+                                    "k2", Row.withSchema(nestedSchema).addValues("two", 1).build()))
+                            .build())
+                    .withRowSchema(schema))
+            .apply(
+                RenameFields.<Row>create()
+                    .rename("map.field1", "new1")
+                    .rename("map.field2", "new2"));
+    // The map field and its keys are untouched; only the value row's field names change.
+    Schema expectedNestedSchema =
+        Schema.builder().addStringField("new1").addInt32Field("new2").build();
+    Schema expectedSchema =
+        Schema.builder()
+            .addMapField("map", Schema.FieldType.STRING, Schema.FieldType.row(expectedNestedSchema))
+            .build();
+    assertEquals(expectedSchema, renamed.getSchema());
+    // Expected rows mirror the input entries, rebuilt against the renamed value schema.
+    List<Row> expectedRows =
+        ImmutableList.of(
+            Row.withSchema(expectedSchema)
+                .addValue(
+                    ImmutableMap.of(
+                        "k1", Row.withSchema(expectedNestedSchema).addValues("one", 1).build()))
+                .build(),
+            Row.withSchema(expectedSchema)
+                .addValue(
+                    ImmutableMap.of(
+                        "k2", Row.withSchema(expectedNestedSchema).addValues("two", 1).build()))
+                .build());
+
+    PAssert.that(renamed).containsInAnyOrder(expectedRows);
+    pipeline.run();
+  }
+}


Mime
View raw message