beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [1/2] incubator-beam git commit: Add ViewFn and port SDK to use it
Date Mon, 08 Aug 2016 20:51:34 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master d60a0a0e4 -> 5049011a2


Add ViewFn and port SDK to use it

This is a preliminary step towards the architecture at
https://s.apache.org/beam-side-input-1-pager

This separates the ViewFn part of each PCollectionView
class/transform, toward eliminating extraneous public
subclasses of PCollectionView and PTransform.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c376b45c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c376b45c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c376b45c

Branch: refs/heads/master
Commit: c376b45cac8568d7242d29725f4a9a701673df75
Parents: 2b5c6bc
Author: Kenneth Knowles <klk@google.com>
Authored: Wed Jun 22 08:39:33 2016 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Mon Aug 8 12:31:11 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/SideInputContainer.java |   2 +-
 .../runners/direct/ViewEvaluatorFactory.java    |   5 +-
 .../functions/FlinkProcessContext.java          |   2 +-
 .../functions/SideInputInitializer.java         |   2 +-
 .../spark/translation/SparkProcessContext.java  |   2 +-
 .../apache/beam/sdk/transforms/DoFnTester.java  |   2 +-
 .../org/apache/beam/sdk/transforms/ViewFn.java  |  45 ++++
 .../beam/sdk/util/DirectSideInputReader.java    |   4 +-
 .../apache/beam/sdk/util/PCollectionViews.java  | 228 +++++++++++++------
 .../apache/beam/sdk/values/PCollectionView.java |  29 ++-
 .../sdk/testing/PCollectionViewTesting.java     |  35 +--
 11 files changed, 262 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c376b45c/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java
index 7a19ed9..6458215 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java
@@ -247,7 +247,7 @@ class SideInputContainer {
       @SuppressWarnings("unchecked") Iterable<WindowedValue<?>> values =
           (Iterable<WindowedValue<?>>) viewContents.getUnchecked(PCollectionViewWindow.of(view,
               window)).get();
-      return view.fromIterableInternal(values);
+      return view.getViewFn().apply(values);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c376b45c/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
index 7a0b0f7..362e903 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
@@ -133,8 +133,9 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory {
   /**
    * An in-process implementation of the {@link CreatePCollectionView} primitive.
    *
-   * This implementation requires the input {@link PCollection} to be an iterable, which
is provided
-   * to {@link PCollectionView#fromIterableInternal(Iterable)}.
+   * This implementation requires the input {@link PCollection} to be an iterable
+   * of {@code WindowedValue<ElemT>}, which is provided
+   * to {@link PCollectionView#getViewFn()} for conversion to {@code ViewT}.
    */
   public static final class WriteView<ElemT, ViewT>
       extends PTransform<PCollection<Iterable<ElemT>>, PCollectionView<ViewT>>
{

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c376b45c/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
index 3954d1f..64b93c8 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
@@ -249,7 +249,7 @@ class FlinkProcessContext<InputT, OutputT>
             view.getTagInternal().getId(), new SideInputInitializer<>(view));
     ViewT result = sideInputs.get(sideInputWindow);
     if (result == null) {
-      result = view.fromIterableInternal(Collections.<WindowedValue<?>>emptyList());
+      result = view.getViewFn().apply(Collections.<WindowedValue<?>>emptyList());
     }
     return result;
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c376b45c/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java
index 451b31b..a577b68 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java
@@ -67,7 +67,7 @@ public class SideInputInitializer<ElemT, ViewT, W extends BoundedWindow>
       Iterable<WindowedValue<?>> elementsIterable =
           (List<WindowedValue<?>>) (List<?>) elements.getValue();
 
-      resultMap.put(elements.getKey(), view.fromIterableInternal(elementsIterable));
+      resultMap.put(elements.getKey(), view.getViewFn().apply(elementsIterable));
     }
 
     return resultMap;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c376b45c/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
index 58ac03c..2f06a1c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
@@ -84,7 +84,7 @@ public abstract class SparkProcessContext<InputT, OutputT, ValueT>
     BroadcastHelper<Iterable<WindowedValue<?>>> broadcastHelper =
         (BroadcastHelper<Iterable<WindowedValue<?>>>) mSideInputs.get(view.getTagInternal());
     Iterable<WindowedValue<?>> contents = broadcastHelper.getValue();
-    return view.fromIterableInternal(contents);
+    return view.getViewFn().apply(contents);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c376b45c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 8de1066..e2764eb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -543,7 +543,7 @@ public class DoFnTester<InputT, OutputT> {
           return windowValue;
         }
       }
-      return view.fromIterableInternal(Collections.<WindowedValue<?>>emptyList());
+      return view.getViewFn().apply(Collections.<WindowedValue<?>>emptyList());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c376b45c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ViewFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ViewFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ViewFn.java
new file mode 100644
index 0000000..aa3cb0d
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ViewFn.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+import java.io.Serializable;
+
+/**
+ * A function to adapt a primitive "view" of a {@link PCollection} - some materialization
+ * specified in the Beam model and implemented by the runner - to a user-facing view type
+ * for side input.
+ *
+ * <p>Both the underlying primitive view and the user-facing view are immutable.
+ *
+ * <p>The most common case is using the {@link View} transforms to prepare a {@link PCollection}
+ * for use as a side input to {@link ParDo}. See {@link View#asSingleton()},
+ * {@link View#asIterable()}, and {@link View#asMap()} for more detail on specific views
+ * available in the SDK.
+ *
+ * @param <PrimitiveViewT> the type of the underlying primitive view, provided by the runner
+ * @param <ViewT> the type of the value(s) accessible via this {@link PCollectionView}
+ */
+public abstract class ViewFn<PrimitiveViewT, ViewT> implements Serializable {
+  /**
+   * A function to adapt a primitive view type to a desired view type.
+   */
+  public abstract ViewT apply(PrimitiveViewT contents);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c376b45c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DirectSideInputReader.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DirectSideInputReader.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DirectSideInputReader.java
index f44c06e..c8d360c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DirectSideInputReader.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DirectSideInputReader.java
@@ -60,9 +60,9 @@ public class DirectSideInputReader implements SideInputReader {
     }
 
     if (view.getWindowingStrategyInternal().getWindowFn() instanceof GlobalWindows) {
-      return view.fromIterableInternal(sideInputValues.get(tag));
+      return view.getViewFn().apply(sideInputValues.get(tag));
     } else {
-      return view.fromIterableInternal(
+      return view.getViewFn().apply(
           Iterables.filter(sideInputValues.get(tag),
               new Predicate<WindowedValue<?>>() {
                   @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c376b45c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java
index d63fb96..581a98a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.util;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
 import org.apache.beam.sdk.values.KV;
@@ -63,8 +64,14 @@ public class PCollectionViews {
       Pipeline pipeline,
       WindowingStrategy<?, W> windowingStrategy,
       boolean hasDefault,
-      T defaultValue,
+      @Nullable T defaultValue,
       Coder<T> valueCoder) {
+    // TODO: as soon as runners are ported off the indicator classes,
+    // return new SimplePCollectionView<>(
+    //    pipeline,
+    //    new SingletonViewFn<T>(hasDefault, defaultValue, valueCoder),
+    //    windowingStrategy,
+    //    valueCoder);
     return new SingletonPCollectionView<>(
         pipeline, windowingStrategy, hasDefault, defaultValue, valueCoder);
   }
@@ -77,6 +84,9 @@ public class PCollectionViews {
       Pipeline pipeline,
       WindowingStrategy<?, W> windowingStrategy,
       Coder<T> valueCoder) {
+    // TODO: as soon as runners are ported off the indicator classes,
+    // return new SimplePCollectionView<>(
+    //    pipeline, new IterableViewFn<T>(), windowingStrategy, valueCoder);
     return new IterablePCollectionView<>(pipeline, windowingStrategy, valueCoder);
   }
 
@@ -88,6 +98,9 @@ public class PCollectionViews {
       Pipeline pipeline,
       WindowingStrategy<?, W> windowingStrategy,
       Coder<T> valueCoder) {
+    // TODO: as soon as runners are ported off the indicator classes,
+    // return new SimplePCollectionView<>(
+    //    pipeline, new ListViewFn<T>(), windowingStrategy, valueCoder);
     return new ListPCollectionView<>(pipeline, windowingStrategy, valueCoder);
   }
 
@@ -99,8 +112,10 @@ public class PCollectionViews {
       Pipeline pipeline,
       WindowingStrategy<?, W> windowingStrategy,
       Coder<KV<K, V>> valueCoder) {
-
-    return new MapPCollectionView<K, V, W>(pipeline, windowingStrategy, valueCoder);
+    // TODO: as soon as runners are ported off the indicator classes,
+    // return new SimplePCollectionView<>(
+    //    pipeline, new MapViewFn<K, V>(), windowingStrategy, valueCoder);
+    return new MapPCollectionView<>(pipeline, windowingStrategy, valueCoder);
   }
 
   /**
@@ -111,27 +126,119 @@ public class PCollectionViews {
       Pipeline pipeline,
       WindowingStrategy<?, W> windowingStrategy,
       Coder<KV<K, V>> valueCoder) {
-    return new MultimapPCollectionView<K, V, W>(pipeline, windowingStrategy, valueCoder);
+    // TODO: as soon as runners are ported off the indicator classes,
+    // return new SimplePCollectionView<>(
+    //    pipeline, new MultimapViewFn<K, V>(), windowingStrategy, valueCoder);
+    return new MultimapPCollectionView<>(pipeline, windowingStrategy, valueCoder);
+  }
+
+  /**
+   * A public indicator class that this view is a singleton view.
+   *
+   * @deprecated Runners should not inspect the {@link PCollectionView} subclass, as it is
an
+   * implementation detail. To specialize a side input, a runner should inspect the
+   * language-independent metadata of the {@link ViewFn}.
+   */
+  @Deprecated
+  public static class SingletonPCollectionView<T, W extends BoundedWindow>
+      extends SimplePCollectionView<T, T, W> {
+    public SingletonPCollectionView(
+        Pipeline pipeline,
+        WindowingStrategy<?, W> windowingStrategy,
+        boolean hasDefault,
+        T defaultValue,
+        Coder<T> valueCoder) {
+      super(
+          pipeline,
+          new SingletonViewFn<>(hasDefault, defaultValue, valueCoder),
+          windowingStrategy,
+          valueCoder);
+    }
+
+    public T getDefaultValue() {
+      return ((SingletonViewFn<T>) viewFn).getDefaultValue();
+    }
+  }
+
+  /**
+   * A public indicator class that this view is an iterable view.
+   *
+   * @deprecated Runners should not inspect the {@link PCollectionView} subclass, as it is
an
+   * implementation detail. To specialize a side input, a runner should inspect the
+   * language-independent metadata of the {@link ViewFn}.
+   */
+  @Deprecated
+  public static class IterablePCollectionView<ElemT, W extends BoundedWindow>
+      extends SimplePCollectionView<ElemT, Iterable<ElemT>, W> {
+    public IterablePCollectionView(
+        Pipeline pipeline, WindowingStrategy<?, W> windowingStrategy, Coder<ElemT>
valueCoder) {
+      super(pipeline, new IterableViewFn<ElemT>(), windowingStrategy, valueCoder);
+    }
+  }
+
+  /**
+   * A public indicator class that this view is a list view.
+   *
+   * @deprecated Runners should not inspect the {@link PCollectionView} subclass, as it is
an
+   * implementation detail. To specialize a side input, a runner should inspect the
+   * language-independent metadata of the {@link ViewFn}.
+   */
+  @Deprecated
+  public static class ListPCollectionView<ElemT, W extends BoundedWindow>
+      extends SimplePCollectionView<ElemT, List<ElemT>, W> {
+    public ListPCollectionView(
+        Pipeline pipeline, WindowingStrategy<?, W> windowingStrategy, Coder<ElemT>
valueCoder) {
+      super(pipeline, new ListViewFn<ElemT>(), windowingStrategy, valueCoder);
+    }
+  }
+
+  /**
+   * A public indicator class that this view is a map view.
+   *
+   * @deprecated Runners should not inspect the {@link PCollectionView} subclass, as it is
an
+   * implementation detail. To specialize a side input, a runner should inspect the
+   * language-independent metadata of the {@link ViewFn}.
+   */
+  @Deprecated
+  public static class MapPCollectionView<K, V, W extends BoundedWindow>
+      extends SimplePCollectionView<KV<K, V>, Map<K, V>, W> {
+    public MapPCollectionView(
+        Pipeline pipeline, WindowingStrategy<?, W> windowingStrategy, Coder<KV<K,
V>> valueCoder) {
+      super(pipeline, new MapViewFn<K, V>(), windowingStrategy, valueCoder);
+    }
   }
 
   /**
+   * A public indicator class that this view is a multimap view.
+   *
+   * @deprecated Runners should not inspect the {@link PCollectionView} subclass, as it is
an
+   * implementation detail. To specialize a side input, a runner should inspect the
+   * language-independent metadata of the {@link ViewFn}.
+   */
+  @Deprecated
+  public static class MultimapPCollectionView<K, V, W extends BoundedWindow>
+      extends SimplePCollectionView<KV<K, V>, Map<K, Iterable<V>>, W>
{
+    public MultimapPCollectionView(
+        Pipeline pipeline, WindowingStrategy<?, W> windowingStrategy, Coder<KV<K,
V>> valueCoder) {
+      super(pipeline, new MultimapViewFn<K, V>(), windowingStrategy, valueCoder);
+    }
+  }
+
+
+  /**
    * Implementation of conversion of singleton {@code Iterable<WindowedValue<T>>}
to {@code T}.
    *
    * <p>For internal use only.
    *
    * <p>Instantiate via {@link PCollectionViews#singletonView}.
    */
-  public static class SingletonPCollectionView<T, W extends BoundedWindow>
-     extends PCollectionViewBase<T, T, W> {
+  private static class SingletonViewFn<T> extends ViewFn<Iterable<WindowedValue<T>>,
T> {
     @Nullable private byte[] encodedDefaultValue;
     @Nullable private transient T defaultValue;
     @Nullable private Coder<T> valueCoder;
     private boolean hasDefault;
 
-    private SingletonPCollectionView(
-        Pipeline pipeline, WindowingStrategy<?, W> windowingStrategy,
-        boolean hasDefault, T defaultValue, Coder<T> valueCoder) {
-      super(pipeline, windowingStrategy, valueCoder);
+    private SingletonViewFn(boolean hasDefault, T defaultValue, Coder<T> valueCoder)
{
       this.hasDefault = hasDefault;
       this.defaultValue = defaultValue;
       this.valueCoder = valueCoder;
@@ -170,7 +277,7 @@ public class PCollectionViews {
     }
 
     @Override
-    protected T fromElements(Iterable<WindowedValue<T>> contents) {
+    public T apply(Iterable<WindowedValue<T>> contents) {
       try {
         return Iterables.getOnlyElement(contents).getValue();
       } catch (NoSuchElementException exc) {
@@ -178,7 +285,7 @@ public class PCollectionViews {
       } catch (IllegalArgumentException exc) {
         throw new IllegalArgumentException(
             "PCollection with more than one element "
-            + "accessed as a singleton view.");
+                + "accessed as a singleton view.");
       }
     }
   }
@@ -190,15 +297,11 @@ public class PCollectionViews {
    *
    * <p>Instantiate via {@link PCollectionViews#iterableView}.
    */
-  public static class IterablePCollectionView<T, W extends BoundedWindow>
-      extends PCollectionViewBase<T, Iterable<T>, W> {
-    private IterablePCollectionView(
-        Pipeline pipeline, WindowingStrategy<?, W> windowingStrategy, Coder<T>
valueCoder) {
-      super(pipeline, windowingStrategy, valueCoder);
-    }
+  private static class IterableViewFn<T>
+      extends ViewFn<Iterable<WindowedValue<T>>, Iterable<T>> {
 
     @Override
-    protected Iterable<T> fromElements(Iterable<WindowedValue<T>> contents)
{
+    public Iterable<T> apply(Iterable<WindowedValue<T>> contents) {
       return Iterables.unmodifiableIterable(
           Iterables.transform(contents, new Function<WindowedValue<T>, T>() {
         @SuppressWarnings("unchecked")
@@ -217,15 +320,9 @@ public class PCollectionViews {
    *
    * <p>Instantiate via {@link PCollectionViews#listView}.
    */
-  public static class ListPCollectionView<T, W extends BoundedWindow>
-      extends PCollectionViewBase<T, List<T>, W> {
-    private ListPCollectionView(
-        Pipeline pipeline, WindowingStrategy<?, W> windowingStrategy, Coder<T>
valueCoder) {
-      super(pipeline, windowingStrategy, valueCoder);
-    }
-
+  private static class ListViewFn<T> extends ViewFn<Iterable<WindowedValue<T>>,
List<T>> {
     @Override
-    protected List<T> fromElements(Iterable<WindowedValue<T>> contents)
{
+    public List<T> apply(Iterable<WindowedValue<T>> contents) {
       return ImmutableList.copyOf(
           Iterables.transform(contents, new Function<WindowedValue<T>, T>() {
             @SuppressWarnings("unchecked")
@@ -240,20 +337,12 @@ public class PCollectionViews {
   /**
    * Implementation of conversion {@code Iterable<WindowedValue<KV<K, V>>>}
    * to {@code Map<K, Iterable<V>>}.
-   *
-   * <p>For internal use only.
    */
-  public static class MultimapPCollectionView<K, V, W extends BoundedWindow>
-      extends PCollectionViewBase<KV<K, V>, Map<K, Iterable<V>>, W>
{
-    private MultimapPCollectionView(
-        Pipeline pipeline,
-        WindowingStrategy<?, W> windowingStrategy,
-        Coder<KV<K, V>> valueCoder) {
-      super(pipeline, windowingStrategy, valueCoder);
-    }
+  private static class MultimapViewFn<K, V>
+      extends ViewFn<Iterable<WindowedValue<KV<K, V>>>, Map<K, Iterable<V>>>
{
 
     @Override
-    protected Map<K, Iterable<V>> fromElements(Iterable<WindowedValue<KV<K,
V>>> elements) {
+    public Map<K, Iterable<V>> apply(Iterable<WindowedValue<KV<K, V>>>
elements) {
       Multimap<K, V> multimap = HashMultimap.create();
       for (WindowedValue<KV<K, V>> elem : elements) {
         KV<K, V> kv = elem.getValue();
@@ -267,25 +356,16 @@ public class PCollectionViews {
   }
 
   /**
-   * Implementation of conversion {@code Iterable<WindowedValue<KV<K, V>>}
with
-   * one value per key to {@code Map<K, V>}.
-   *
-   * <p>For internal use only.
+   * Implementation of conversion {@code Iterable<WindowedValue<KV<K, V>>>} with one value per
+   * key to {@code Map<K, V>}.
    */
-  public static class MapPCollectionView<K, V, W extends BoundedWindow>
-      extends PCollectionViewBase<KV<K, V>, Map<K, V>, W> {
-    private MapPCollectionView(
-        Pipeline pipeline,
-        WindowingStrategy<?, W> windowingStrategy,
-        Coder<KV<K, V>> valueCoder) {
-      super(pipeline, windowingStrategy, valueCoder);
-    }
-
+  private static class MapViewFn<K, V>
+      extends ViewFn<Iterable<WindowedValue<KV<K, V>>>, Map<K, V>>
{
     /**
      * Input iterable must actually be {@code Iterable<WindowedValue<KV<K, V>>>}.
      */
     @Override
-    protected Map<K, V> fromElements(Iterable<WindowedValue<KV<K, V>>>
elements) {
+    public Map<K, V> apply(Iterable<WindowedValue<KV<K, V>>> elements)
{
       Map<K, V> map = new HashMap<>();
       for (WindowedValue<KV<K, V>> elem : elements) {
         KV<K, V> kv = elem.getValue();
@@ -302,7 +382,7 @@ public class PCollectionViews {
    * A base class for {@link PCollectionView} implementations, with additional type parameters
    * that are not visible at pipeline assembly time when the view is used as a side input.
    */
-  private abstract static class PCollectionViewBase<ElemT, ViewT, W extends BoundedWindow>
+  private static class SimplePCollectionView<ElemT, ViewT, W extends BoundedWindow>
       extends PValueBase
       implements PCollectionView<ViewT> {
     /** A unique tag for the view, typed according to the elements underlying the view. */
@@ -315,18 +395,23 @@ public class PCollectionViews {
     private Coder<Iterable<WindowedValue<ElemT>>> coder;
 
     /**
-     * Implement this to complete the implementation. It is a conversion function from
-     * all of the elements of the underlying {@link PCollection} to the value of the view.
+     * The typed {@link ViewFn} for this view.
+     *
+     * @deprecated Access to this variable from subclasses is temporary, for migrating away
+     * from language-specific inspections.
      */
-    protected abstract ViewT fromElements(Iterable<WindowedValue<ElemT>> elements);
+    @Deprecated
+    protected ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn;
 
     /**
      * Call this constructor to initialize the fields for which this base class provides
      * boilerplate accessors.
      */
-    protected PCollectionViewBase(
+    // TODO: make private as soon as runners are ported off indicator subclasses
+    protected SimplePCollectionView(
         Pipeline pipeline,
         TupleTag<Iterable<WindowedValue<ElemT>>> tag,
+        ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
         WindowingStrategy<?, W> windowingStrategy,
         Coder<ElemT> valueCoder) {
       super(pipeline);
@@ -335,6 +420,7 @@ public class PCollectionViews {
       }
       this.tag = tag;
       this.windowingStrategy = windowingStrategy;
+      this.viewFn = viewFn;
       this.coder =
           IterableCoder.of(WindowedValue.getFullCoder(
               valueCoder, windowingStrategy.getWindowFn().windowCoder()));
@@ -344,30 +430,42 @@ public class PCollectionViews {
      * Call this constructor to initialize the fields for which this base class provides
      * boilerplate accessors, with an auto-generated tag.
      */
-    protected PCollectionViewBase(
+    // TODO: make private as soon as runners are ported off indicator subclasses
+    protected SimplePCollectionView(
         Pipeline pipeline,
+        ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
         WindowingStrategy<?, W> windowingStrategy,
         Coder<ElemT> valueCoder) {
-      this(pipeline, new TupleTag<Iterable<WindowedValue<ElemT>>>(), windowingStrategy,
valueCoder);
+      this(
+          pipeline,
+          new TupleTag<Iterable<WindowedValue<ElemT>>>(),
+          viewFn,
+          windowingStrategy,
+          valueCoder);
     }
 
     /**
-     * For serialization only. Do not use directly. Subclasses should call from their own
-     * protected no-argument constructor.
+     * For serialization only. Do not use directly.
      */
     @SuppressWarnings("unused")  // used for serialization
-    protected PCollectionViewBase() {
+    protected SimplePCollectionView() {
       super();
     }
 
     @Override
-    public ViewT fromIterableInternal(Iterable<WindowedValue<?>> elements) {
+    public ViewFn<Iterable<WindowedValue<?>>, ViewT> getViewFn() {
       // Safe cast: it is required that the rest of the SDK maintain the invariant
       // that a PCollectionView is only provided an iterable for the elements of an
       // appropriately typed PCollection.
       @SuppressWarnings({"rawtypes", "unchecked"})
-      Iterable<WindowedValue<ElemT>> typedElements = (Iterable) elements;
-      return fromElements(typedElements);
+      ViewFn<Iterable<WindowedValue<?>>, ViewT> untypedViewFn = (ViewFn)
viewFn;
+      return untypedViewFn;
+    }
+
+    @Override
+    @Deprecated
+    public ViewT fromIterableInternal(Iterable<WindowedValue<?>> elements) {
+      return getViewFn().apply(elements);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c376b45c/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java
index fb3bfab..20f1071 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.values;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 
@@ -44,22 +45,42 @@ import java.io.Serializable;
  */
 public interface PCollectionView<T> extends PValue, Serializable {
   /**
-   * A unique identifier, for internal use.
+   * @deprecated this method will be removed entirely. The {@link PCollection} underlying a side
+   *     input is part of the side input's specification with a {@link ParDo} transform, which will
+   *     obtain that information via a package-private channel.
    */
+  @Deprecated
   public TupleTag<Iterable<WindowedValue<?>>> getTagInternal();
 
   /**
-   * For internal use only.
+   * @deprecated use {@link #getViewFn()} for now, but eventually get the needed information
via the
+   *     side input specification on the {@link ParDo} transform.
    */
+  @Deprecated
   public T fromIterableInternal(Iterable<WindowedValue<?>> contents);
 
   /**
-   * For internal use only.
+   * @deprecated this method will be removed entirely. The {@link ViewFn} for a side input
is an
+   *     attribute of the side input's specification with a {@link ParDo} transform, which
will
+   *     obtain this specification via a package-private channel.
    */
+  @Deprecated
+  public ViewFn<Iterable<WindowedValue<?>>, T> getViewFn();
+
+  /**
+   * @deprecated this method will be removed entirely. The {@link PCollection} underlying
a side
+   *     input, including its {@link WindowingStrategy}, is part of the side input's specification
+   *     with a {@link ParDo} transform, which will obtain that information via a package-private
+   *     channel.
+   */
+  @Deprecated
   public WindowingStrategy<?, ?> getWindowingStrategyInternal();
 
   /**
-   * For internal use only.
+   * @deprecated this method will be removed entirely. The {@link PCollection} underlying
a side
+   *     input, including its {@link Coder}, is part of the side input's specification with
a {@link
+   *     ParDo} transform, which will obtain that information via a package-private channel.
    */
+  @Deprecated
   public Coder<Iterable<WindowedValue<?>>> getCoderInternal();
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c376b45c/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
index 427f2da..517ed68 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
@@ -19,7 +19,7 @@ package org.apache.beam.sdk.testing;
 
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -83,16 +83,9 @@ public final class PCollectionViewTesting {
       DEFAULT_NONEMPTY_WINDOW.maxTimestamp().plus(DEFAULT_WINDOW_MSECS));
 
   /**
-   * A specialization of {@link SerializableFunction} just for putting together
-   * {@link PCollectionView} instances.
-   */
-  public static interface ViewFn<ElemT, ViewT>
-      extends SerializableFunction<Iterable<WindowedValue<ElemT>>, ViewT> { }
-
-  /**
    * A {@link ViewFn} that returns the provided contents as a fully lazy iterable.
    */
-  public static class IdentityViewFn<T> implements ViewFn<T, Iterable<T>> {
+  public static class IdentityViewFn<T> extends ViewFn<Iterable<WindowedValue<T>>, Iterable<T>> {
     @Override
     public Iterable<T> apply(Iterable<WindowedValue<T>> contents) {
       return Iterables.transform(contents, new Function<WindowedValue<T>, T>() {
@@ -110,7 +103,7 @@ public final class PCollectionViewTesting {
    * <p>Only for use in testing scenarios with small collections. If there are more elements
    * provided than {@code Integer.MAX_VALUE} then behavior is unpredictable.
    */
-  public static class LengthViewFn<T> implements ViewFn<T, Long> {
+  public static class LengthViewFn<T> extends ViewFn<Iterable<WindowedValue<T>>, Long> {
     @Override
     public Long apply(Iterable<WindowedValue<T>> contents) {
       return (long) Iterables.size(contents);
@@ -120,7 +113,8 @@ public final class PCollectionViewTesting {
   /**
    * A {@link ViewFn} that always returns the value with which it is instantiated.
    */
-  public static class ConstantViewFn<ElemT, ViewT> implements ViewFn<ElemT, ViewT> {
+  public static class ConstantViewFn<ElemT, ViewT>
+      extends ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> {
     private ViewT value;
 
     public ConstantViewFn(ViewT value) {
@@ -148,7 +142,7 @@ public final class PCollectionViewTesting {
    */
   public static <ElemT, ViewT> PCollectionView<ViewT> testingView(
       TupleTag<Iterable<WindowedValue<ElemT>>> tag,
-      ViewFn<ElemT, ViewT> viewFn,
+      ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
       Coder<ElemT> elemCoder) {
     return testingView(
         tag,
@@ -178,7 +172,7 @@ public final class PCollectionViewTesting {
    */
   public static <ElemT, ViewT> PCollectionView<ViewT> testingView(
       TupleTag<Iterable<WindowedValue<ElemT>>> tag,
-      ViewFn<ElemT, ViewT> viewFn,
+      ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
       Coder<ElemT> elemCoder,
       WindowingStrategy<?, ?> windowingStrategy) {
     return new PCollectionViewFromParts<>(
@@ -233,13 +227,13 @@ public final class PCollectionViewTesting {
       extends PValueBase
       implements PCollectionView<ViewT> {
     private TupleTag<Iterable<WindowedValue<ElemT>>> tag;
-    private ViewFn<ElemT, ViewT> viewFn;
+    private ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn;
     private WindowingStrategy<?, ?> windowingStrategy;
     private Coder<Iterable<WindowedValue<ElemT>>> coder;
 
     public PCollectionViewFromParts(
         TupleTag<Iterable<WindowedValue<ElemT>>> tag,
-        ViewFn<ElemT, ViewT> viewFn,
+        ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
         WindowingStrategy<?, ?> windowingStrategy,
         Coder<Iterable<WindowedValue<ElemT>>> coder) {
       this.tag = tag;
@@ -256,8 +250,17 @@ public final class PCollectionViewTesting {
 
     @SuppressWarnings({"unchecked", "rawtypes"})
     @Override
+    @Deprecated
     public ViewT fromIterableInternal(Iterable<WindowedValue<?>> contents) {
-      return (ViewT) viewFn.apply((Iterable) contents);
+      return getViewFn().apply(contents);
+    }
+
+    @Override
+    public ViewFn<Iterable<WindowedValue<?>>, ViewT> getViewFn() {
+      // Safe cast; runners must maintain type safety
+      @SuppressWarnings({"unchecked", "rawtypes"})
+      ViewFn<Iterable<WindowedValue<?>>, ViewT> untypedViewFn = (ViewFn) viewFn;
+      return untypedViewFn;
     }
 
     @Override


Mime
View raw message