beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [1/2] incubator-beam git commit: Connect generated DoFnInvoker.invokerOnTimer to OnTimerInvoker
Date Wed, 16 Nov 2016 05:18:52 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master dbbd5e448 -> dc94dbdd7


Connect generated DoFnInvoker.invokerOnTimer to OnTimerInvoker


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a945a025
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a945a025
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a945a025

Branch: refs/heads/master
Commit: a945a025301ca09a4cfc160302ef3914429dc15e
Parents: dbbd5e4
Author: Kenneth Knowles <klk@google.com>
Authored: Tue Nov 1 21:23:48 2016 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Tue Nov 15 20:08:41 2016 -0800

----------------------------------------------------------------------
 .../reflect/ByteBuddyDoFnInvokerFactory.java    | 152 ++++++++++++++-----
 .../reflect/ByteBuddyOnTimerInvokerFactory.java |  24 ++-
 .../sdk/transforms/reflect/DoFnInvoker.java     |   4 +
 .../sdk/transforms/reflect/DoFnInvokers.java    |   6 +
 .../sdk/transforms/reflect/OnTimerInvoker.java  |   2 +-
 .../transforms/reflect/DoFnInvokersTest.java    | 137 ++++++++++++++++-
 .../testhelper/DoFnInvokersTestHelper.java      | 137 +++++++++++++++++
 7 files changed, 415 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a945a025/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
index c137255..bc6d8c9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
@@ -23,6 +23,7 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -31,7 +32,6 @@ import net.bytebuddy.ByteBuddy;
 import net.bytebuddy.NamingStrategy;
 import net.bytebuddy.description.field.FieldDescription;
 import net.bytebuddy.description.method.MethodDescription;
-import net.bytebuddy.description.modifier.FieldManifestation;
 import net.bytebuddy.description.modifier.Visibility;
 import net.bytebuddy.description.type.TypeDescription;
 import net.bytebuddy.description.type.TypeList;
@@ -65,6 +65,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.OnTimerMethod;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.Cases;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.ContextParameter;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.InputProviderParameter;
@@ -128,6 +129,54 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory
{
     return newByteBuddyInvoker(DoFnSignatures.getSignature((Class) fn.getClass()), fn);
   }
 
+  /**
+   * Internal base class for generated {@link DoFnInvoker} instances.
+   *
+   * <p>This class should <i>not</i> be extended directly, or by Beam users.
It must be public for
+   * generated instances to have adequate access, as they are generated "inside" the invoked
{@link
+   * DoFn} class.
+   */
+  public abstract static class DoFnInvokerBase<InputT, OutputT, DoFnT extends DoFn<InputT,
OutputT>>
+      implements DoFnInvoker<InputT, OutputT> {
+    protected DoFnT delegate;
+
+    private Map<String, OnTimerInvoker> onTimerInvokers = new HashMap<>();
+
+    public DoFnInvokerBase(DoFnT delegate) {
+      this.delegate = delegate;
+    }
+
+    /**
+     * Associates the given timer ID with the given {@link OnTimerInvoker}.
+     *
+     * <p>ByteBuddy does not like to generate conditional code, so we use a map + lookup
+     * of the timer ID rather than a generated conditional branch to choose which
+     * OnTimerInvoker to invoke.
+     *
+     * <p>This method has package level access as it is intended only for assembly
of the
+     * {@link DoFnInvokerBase} not by any subclass.
+     */
+    void addOnTimerInvoker(String timerId, OnTimerInvoker onTimerInvoker) {
+      this.onTimerInvokers.put(timerId, onTimerInvoker);
+    }
+
+    @Override
+    public void invokeOnTimer(String timerId, DoFn.ArgumentProvider<InputT, OutputT>
arguments) {
+      @Nullable OnTimerInvoker onTimerInvoker = onTimerInvokers.get(timerId);
+
+      if (onTimerInvoker != null) {
+        onTimerInvoker.invokeOnTimer(arguments);
+      } else {
+        throw new IllegalArgumentException(
+            String.format(
+                "Attempted to invoke timer %s on %s, but that timer is not registered."
+                    + " This is the responsibility of the runner, which must only deliver"
+                    + " registered timers.",
+                timerId, delegate.getClass().getName()));
+      }
+    }
+  }
+
   /** @return the {@link DoFnInvoker} for the given {@link DoFn}. */
   public <InputT, OutputT> DoFnInvoker<InputT, OutputT> newByteBuddyInvoker(
       DoFnSignature signature, DoFn<InputT, OutputT> fn) {
@@ -136,10 +185,18 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory
{
         "Signature is for class %s, but fn is of class %s",
         signature.fnClass(),
         fn.getClass());
+
     try {
       @SuppressWarnings("unchecked")
-      DoFnInvoker<InputT, OutputT> invoker =
-          (DoFnInvoker<InputT, OutputT>) getByteBuddyInvokerConstructor(signature).newInstance(fn);
+      DoFnInvokerBase<InputT, OutputT, DoFn<InputT, OutputT>> invoker =
+          (DoFnInvokerBase<InputT, OutputT, DoFn<InputT, OutputT>>)
+              getByteBuddyInvokerConstructor(signature).newInstance(fn);
+
+      for (OnTimerMethod onTimerMethod : signature.onTimerMethods().values()) {
+        invoker.addOnTimerInvoker(onTimerMethod.id(),
+            OnTimerInvokers.forTimer(fn, onTimerMethod.id()));
+      }
+
       return invoker;
     } catch (InstantiationException
         | IllegalAccessException
@@ -214,31 +271,39 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory
{
                     return super.name(clazzDescription);
                   }
                 })
-            // Create a subclass of DoFnInvoker
-            .subclass(DoFnInvoker.class, ConstructorStrategy.Default.NO_CONSTRUCTORS)
-            .defineField(
-                FN_DELEGATE_FIELD_NAME, fnClass, Visibility.PRIVATE, FieldManifestation.FINAL)
+            // class <invoker class> extends DoFnInvokerBase {
+            .subclass(DoFnInvokerBase.class, ConstructorStrategy.Default.NO_CONSTRUCTORS)
+
+            //   public <invoker class>(<fn class> delegate) { this.delegate
= delegate; }
             .defineConstructor(Visibility.PUBLIC)
             .withParameter(fnClass)
             .intercept(new InvokerConstructor())
+
+            //   public invokeProcessElement(ProcessContext, ExtraContextFactory) {
+            //     delegate.<@ProcessElement>(... pass just the right args ...);
+            //   }
             .method(ElementMatchers.named("invokeProcessElement"))
-            .intercept(new ProcessElementDelegation(signature.processElement()))
+            .intercept(new ProcessElementDelegation(clazzDescription, signature.processElement()))
+
+            //   public invokeStartBundle(Context c) { delegate.<@StartBundle>(c);
}
+            //   ... etc ...
             .method(ElementMatchers.named("invokeStartBundle"))
-            .intercept(delegateOrNoop(signature.startBundle()))
+            .intercept(delegateOrNoop(clazzDescription, signature.startBundle()))
             .method(ElementMatchers.named("invokeFinishBundle"))
-            .intercept(delegateOrNoop(signature.finishBundle()))
+            .intercept(delegateOrNoop(clazzDescription, signature.finishBundle()))
             .method(ElementMatchers.named("invokeSetup"))
-            .intercept(delegateOrNoop(signature.setup()))
+            .intercept(delegateOrNoop(clazzDescription, signature.setup()))
             .method(ElementMatchers.named("invokeTeardown"))
-            .intercept(delegateOrNoop(signature.teardown()))
+            .intercept(delegateOrNoop(clazzDescription, signature.teardown()))
             .method(ElementMatchers.named("invokeGetInitialRestriction"))
-            .intercept(delegateWithDowncastOrThrow(signature.getInitialRestriction()))
+            .intercept(
+                delegateWithDowncastOrThrow(clazzDescription, signature.getInitialRestriction()))
             .method(ElementMatchers.named("invokeSplitRestriction"))
-            .intercept(splitRestrictionDelegation(signature))
+            .intercept(splitRestrictionDelegation(clazzDescription, signature))
             .method(ElementMatchers.named("invokeGetRestrictionCoder"))
-            .intercept(getRestrictionCoderDelegation(signature))
+            .intercept(getRestrictionCoderDelegation(clazzDescription, signature))
             .method(ElementMatchers.named("invokeNewTracker"))
-            .intercept(delegateWithDowncastOrThrow(signature.newTracker()));
+            .intercept(delegateWithDowncastOrThrow(clazzDescription, signature.newTracker()));
 
     DynamicType.Unloaded<?> unloaded = builder.make();
 
@@ -253,13 +318,15 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory
{
     return res;
   }
 
-  private static Implementation getRestrictionCoderDelegation(DoFnSignature signature) {
+  private static Implementation getRestrictionCoderDelegation(
+      TypeDescription doFnType, DoFnSignature signature) {
     if (signature.processElement().isSplittable()) {
       if (signature.getRestrictionCoder() == null) {
         return MethodDelegation.to(
             new DefaultRestrictionCoder(signature.getInitialRestriction().restrictionT()));
       } else {
         return new DowncastingParametersMethodDelegation(
+            doFnType,
             signature.getRestrictionCoder().targetMethod());
       }
     } else {
@@ -267,26 +334,30 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory
{
     }
   }
 
-  private static Implementation splitRestrictionDelegation(DoFnSignature signature) {
+  private static Implementation splitRestrictionDelegation(
+      TypeDescription doFnType, DoFnSignature signature) {
     if (signature.splitRestriction() == null) {
       return MethodDelegation.to(DefaultSplitRestriction.class);
     } else {
-      return new DowncastingParametersMethodDelegation(signature.splitRestriction().targetMethod());
+      return new DowncastingParametersMethodDelegation(
+          doFnType, signature.splitRestriction().targetMethod());
     }
   }
 
   /** Delegates to the given method if available, or does nothing. */
-  private static Implementation delegateOrNoop(DoFnSignature.DoFnMethod method) {
+  private static Implementation delegateOrNoop(TypeDescription doFnType, DoFnSignature.DoFnMethod
+      method) {
     return (method == null)
         ? FixedValue.originType()
-        : new DoFnMethodDelegation(method.targetMethod());
+        : new DoFnMethodDelegation(doFnType, method.targetMethod());
   }
 
   /** Delegates to the given method if available, or throws UnsupportedOperationException.
*/
-  private static Implementation delegateWithDowncastOrThrow(DoFnSignature.DoFnMethod method)
{
+  private static Implementation delegateWithDowncastOrThrow(
+      TypeDescription doFnType, DoFnSignature.DoFnMethod method) {
     return (method == null)
         ? ExceptionMethod.throwing(UnsupportedOperationException.class)
-        : new DowncastingParametersMethodDelegation(method.targetMethod());
+        : new DowncastingParametersMethodDelegation(doFnType, method.targetMethod());
   }
 
   /**
@@ -301,7 +372,10 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory
{
 
     protected FieldDescription delegateField;
 
-    public DoFnMethodDelegation(Method targetMethod) {
+    private final TypeDescription doFnType;
+
+    public DoFnMethodDelegation(TypeDescription doFnType, Method targetMethod) {
+      this.doFnType = doFnType;
       this.targetMethod = new MethodDescription.ForLoadedMethod(targetMethod);
       targetHasReturn = !TypeDescription.VOID.equals(this.targetMethod.getReturnType().asErasure());
     }
@@ -311,6 +385,7 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory
{
       // Remember the field description of the instrumented type.
       delegateField =
           instrumentedType
+              .getSuperClass() // always DoFnInvokerBase
               .getDeclaredFields()
               .filter(ElementMatchers.named(FN_DELEGATE_FIELD_NAME))
               .getOnly();
@@ -349,6 +424,8 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory
{
                   MethodVariableAccess.REFERENCE.loadOffset(0),
                   // Access this.delegate (DoFn on top of the stack)
                   FieldAccess.forField(delegateField).getter(),
+                  // Cast it to the more precise type
+                  TypeCasting.to(doFnType),
                   // Run the beforeDelegation manipulations.
                   // The arguments necessary to invoke the target are on top of the stack.
                   beforeDelegation(instrumentedMethod),
@@ -400,8 +477,8 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory
{
    * to its expected type.
    */
   private static class DowncastingParametersMethodDelegation extends DoFnMethodDelegation
{
-    DowncastingParametersMethodDelegation(Method method) {
-      super(method);
+    DowncastingParametersMethodDelegation(TypeDescription doFnType, Method method) {
+      super(doFnType, method);
     }
 
     @Override
@@ -536,8 +613,9 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory
{
     private final DoFnSignature.ProcessElementMethod signature;
 
     /** Implementation of {@link MethodDelegation} for the {@link ProcessElement} method.
*/
-    private ProcessElementDelegation(DoFnSignature.ProcessElementMethod signature) {
-      super(signature.targetMethod());
+    private ProcessElementDelegation(TypeDescription doFnType, DoFnSignature.ProcessElementMethod
+        signature) {
+      super(doFnType, signature.targetMethod());
       this.signature = signature;
     }
 
@@ -739,26 +817,16 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory
{
               new StackManipulation.Compound(
                       // Load the this reference
                       MethodVariableAccess.REFERENCE.loadOffset(0),
+                      // Load the delegate argument
+                      MethodVariableAccess.REFERENCE.loadOffset(1),
                       // Invoke the super constructor (default constructor of Object)
                       MethodInvocation.invoke(
-                          new TypeDescription.ForLoadedType(Object.class)
+                          new TypeDescription.ForLoadedType(DoFnInvokerBase.class)
                               .getDeclaredMethods()
                               .filter(
                                   ElementMatchers.isConstructor()
-                                      .and(ElementMatchers.takesArguments(0)))
+                                      .and(ElementMatchers.takesArguments(DoFn.class)))
                               .getOnly()),
-                      // Load the this reference
-                      MethodVariableAccess.REFERENCE.loadOffset(0),
-                      // Load the delegate argument
-                      MethodVariableAccess.REFERENCE.loadOffset(1),
-                      // Assign the delegate argument to the delegate field
-                      FieldAccess.forField(
-                              implementationTarget
-                                  .getInstrumentedType()
-                                  .getDeclaredFields()
-                                  .filter(ElementMatchers.named(FN_DELEGATE_FIELD_NAME))
-                                  .getOnly())
-                          .putter(),
                       // Return void.
                       MethodReturn.VOID)
                   .apply(methodVisitor, implementationContext);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a945a025/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java
index 4e53757..7a39ed1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java
@@ -180,7 +180,9 @@ class ByteBuddyOnTimerInvokerFactory implements OnTimerInvokerFactory
{
             //     this.delegate.<@OnTimer method>(... pass the right args ...)
             //   }
             .method(ElementMatchers.named("invokeOnTimer"))
-            .intercept(new InvokeOnTimerDelegation(signature.onTimerMethods().get(timerId)));
+            .intercept(
+                new InvokeOnTimerDelegation(
+                    clazzDescription, signature.onTimerMethods().get(timerId)));
 
     DynamicType.Unloaded<?> unloaded = builder.make();
 
@@ -203,12 +205,28 @@ class ByteBuddyOnTimerInvokerFactory implements OnTimerInvokerFactory
{
 
     private final DoFnSignature.OnTimerMethod signature;
 
-    public InvokeOnTimerDelegation(DoFnSignature.OnTimerMethod signature) {
-      super(signature.targetMethod());
+    public InvokeOnTimerDelegation(
+        TypeDescription clazzDescription, DoFnSignature.OnTimerMethod signature) {
+      super(clazzDescription, signature.targetMethod());
       this.signature = signature;
     }
 
     @Override
+    public InstrumentedType prepare(InstrumentedType instrumentedType) {
+      // Remember the field description of the instrumented type.
+      // Kind of a hack to set the protected value, because the instrumentedType
+      // is only available to prepare, while we need this information in
+      // beforeDelegation
+      delegateField =
+          instrumentedType
+              .getDeclaredFields() // the delegate is declared on the OnTimerInvoker
+              .filter(ElementMatchers.named(FN_DELEGATE_FIELD_NAME))
+              .getOnly();
+      // Delegating the method call doesn't require any changes to the instrumented type.
+      return instrumentedType;
+    }
+
+    @Override
     protected StackManipulation beforeDelegation(MethodDescription instrumentedMethod) {
       // Parameters of the wrapper invoker method:
       //   DoFn.ArgumentProvider

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a945a025/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
index ce68d0b..2ae7920 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
@@ -50,6 +50,10 @@ public interface DoFnInvoker<InputT, OutputT> {
    */
   DoFn.ProcessContinuation invokeProcessElement(DoFn.ArgumentProvider<InputT, OutputT>
extra);
 
+  /** Invoke the appropriate {@link DoFn.OnTimer} method on the bound {@link DoFn}. */
+  void invokeOnTimer(
+      String timerId, DoFn.ArgumentProvider<InputT, OutputT> arguments);
+
   /** Invoke the {@link DoFn.GetInitialRestriction} method on the bound {@link DoFn}. */
   <RestrictionT> RestrictionT invokeGetInitialRestriction(InputT element);
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a945a025/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
index 9a96985..7eccaab 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
@@ -115,6 +115,12 @@ public class DoFnInvokers {
     }
 
     @Override
+    public void invokeOnTimer(String timerId, DoFn.ArgumentProvider<InputT, OutputT>
arguments) {
+      throw new UnsupportedOperationException(
+          String.format("Timers are not supported for %s", OldDoFn.class.getSimpleName()));
+    }
+
+    @Override
     public void invokeStartBundle(DoFn.Context c) {
       OldDoFn<InputT, OutputT>.Context oldCtx = DoFnAdapters.adaptContext(fn, c);
       try {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a945a025/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java
index f87fa74..bfcafd0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java
@@ -20,7 +20,7 @@ package org.apache.beam.sdk.transforms.reflect;
 import org.apache.beam.sdk.transforms.DoFn;
 
 /** Interface for invoking the {@link DoFn.OnTimer} method for a particular timer. */
-interface OnTimerInvoker<InputT, OutputT> {
+public interface OnTimerInvoker<InputT, OutputT> {
 
   /** Invoke the {@link DoFn.OnTimer} method in the provided context. */
   void invokeOnTimer(DoFn.ArgumentProvider<InputT, OutputT> extra);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a945a025/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index c7b71ff..3d9e3ec 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.transforms.reflect;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertSame;
@@ -45,6 +46,7 @@ import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.Timer;
@@ -55,6 +57,7 @@ import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.state.StateSpec;
 import org.apache.beam.sdk.util.state.StateSpecs;
 import org.apache.beam.sdk.util.state.ValueState;
+import org.joda.time.Instant;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -95,6 +98,10 @@ public class DoFnInvokersTest {
     return DoFnInvokers.invokerFor(fn).invokeProcessElement(mockArgumentProvider);
   }
 
+  private void invokeOnTimer(String timerId, DoFn<String, String> fn) {
+    DoFnInvokers.invokerFor(fn).invokeOnTimer(timerId, mockArgumentProvider);
+  }
+
   @Test
   public void testDoFnInvokersReused() throws Exception {
     // Ensures that we don't create a new Invoker class for every instance of the DoFn.
@@ -460,7 +467,79 @@ public class DoFnInvokersTest {
   }
 
   // ---------------------------------------------------------------------------------------
-  // Tests for ability to invoke private, inner and anonymous classes.
+  // Tests for ability to invoke @OnTimer for private, inner and anonymous classes.
+  // ---------------------------------------------------------------------------------------
+
+  private static final String TIMER_ID = "test-timer-id";
+
+  private static class PrivateDoFnWithTimers extends DoFn<String, String> {
+    @ProcessElement
+    public void processThis(ProcessContext c) {}
+
+    @TimerId(TIMER_ID)
+    private final TimerSpec myTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+    @OnTimer(TIMER_ID)
+    public void onTimer(BoundedWindow w) {}
+  }
+
+  @Test
+  public void testLocalPrivateDoFnWithTimers() throws Exception {
+    PrivateDoFnWithTimers fn = mock(PrivateDoFnWithTimers.class);
+    invokeOnTimer(TIMER_ID, fn);
+    verify(fn).onTimer(mockWindow);
+  }
+
+  @Test
+  public void testStaticPackagePrivateDoFnWithTimers() throws Exception {
+    DoFn<String, String> fn =
+        mock(DoFnInvokersTestHelper.newStaticPackagePrivateDoFnWithTimers().getClass());
+    invokeOnTimer(TIMER_ID, fn);
+    DoFnInvokersTestHelper.verifyStaticPackagePrivateDoFnWithTimers(fn, mockWindow);
+  }
+
+  @Test
+  public void testInnerPackagePrivateDoFnWithTimers() throws Exception {
+    DoFn<String, String> fn =
+        mock(new DoFnInvokersTestHelper().newInnerPackagePrivateDoFnWithTimers().getClass());
+    invokeOnTimer(TIMER_ID, fn);
+    DoFnInvokersTestHelper.verifyInnerPackagePrivateDoFnWithTimers(fn, mockWindow);
+  }
+
+  @Test
+  public void testStaticPrivateDoFnWithTimers() throws Exception {
+    DoFn<String, String> fn =
+        mock(DoFnInvokersTestHelper.newStaticPrivateDoFnWithTimers().getClass());
+    invokeOnTimer(TIMER_ID, fn);
+    DoFnInvokersTestHelper.verifyStaticPrivateDoFnWithTimers(fn, mockWindow);
+  }
+
+  @Test
+  public void testInnerPrivateDoFnWithTimers() throws Exception {
+    DoFn<String, String> fn =
+        mock(new DoFnInvokersTestHelper().newInnerPrivateDoFnWithTimers().getClass());
+    invokeOnTimer(TIMER_ID, fn);
+    DoFnInvokersTestHelper.verifyInnerPrivateDoFnWithTimers(fn, mockWindow);
+  }
+
+  @Test
+  public void testAnonymousInnerDoFnWithTimers() throws Exception {
+    DoFn<String, String> fn =
+        mock(new DoFnInvokersTestHelper().newInnerAnonymousDoFnWithTimers().getClass());
+    invokeOnTimer(TIMER_ID, fn);
+    DoFnInvokersTestHelper.verifyInnerAnonymousDoFnWithTimers(fn, mockWindow);
+  }
+
+  @Test
+  public void testStaticAnonymousDoFnWithTimersInOtherPackage() throws Exception {
+    // Can't use mockito for this one - the anonymous class is final and can't be mocked.
+    DoFn<String, String> fn = DoFnInvokersTestHelper.newStaticAnonymousDoFnWithTimers();
+    invokeOnTimer(TIMER_ID, fn);
+    DoFnInvokersTestHelper.verifyStaticAnonymousDoFnWithTimersInvoked(fn, mockWindow);
+  }
+
+  // ---------------------------------------------------------------------------------------
+  // Tests for ability to invoke @ProcessElement for private, inner and anonymous classes.
   // ---------------------------------------------------------------------------------------
 
   private static class PrivateDoFnClass extends DoFn<String, String> {
@@ -605,6 +684,62 @@ public class DoFnInvokersTest {
     invoker.invokeFinishBundle(null);
   }
 
+  @Test
+  public void testOnTimerHelloWord() throws Exception {
+    final String timerId = "my-timer-id";
+
+    class SimpleTimerDoFn extends DoFn<String, String> {
+
+      public String status = "not yet";
+
+      @TimerId(timerId)
+      private final TimerSpec myTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+      @ProcessElement
+      public void process(ProcessContext c) {}
+
+      @OnTimer(timerId)
+      public void onMyTimer() {
+        status = "OK now";
+      }
+    }
+
+    SimpleTimerDoFn fn = new SimpleTimerDoFn();
+
+    DoFnInvoker<String, String> invoker = DoFnInvokers.invokerFor(fn);
+    invoker.invokeOnTimer(timerId, mockArgumentProvider);
+    assertThat(fn.status, equalTo("OK now"));
+  }
+
+  @Test
+  public void testOnTimerWithWindow() throws Exception {
+    final String timerId = "my-timer-id";
+    final IntervalWindow testWindow = new IntervalWindow(new Instant(0), new Instant(15));
+    when(mockArgumentProvider.window()).thenReturn(testWindow);
+
+    class SimpleTimerDoFn extends DoFn<String, String> {
+
+      public IntervalWindow window = null;
+
+      @TimerId(timerId)
+      private final TimerSpec myTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+      @ProcessElement
+      public void process(ProcessContext c) {}
+
+      @OnTimer(timerId)
+      public void onMyTimer(IntervalWindow w) {
+        window = w;
+      }
+    }
+
+    SimpleTimerDoFn fn = new SimpleTimerDoFn();
+
+    DoFnInvoker<String, String> invoker = DoFnInvokers.invokerFor(fn);
+    invoker.invokeOnTimer(timerId, mockArgumentProvider);
+    assertThat(fn.window, equalTo(testWindow));
+  }
+
   private class OldDoFnIdentity extends OldDoFn<String, String> {
     public void processElement(ProcessContext c) {}
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a945a025/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/testhelper/DoFnInvokersTestHelper.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/testhelper/DoFnInvokersTestHelper.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/testhelper/DoFnInvokersTestHelper.java
index c20a788..95e7c49 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/testhelper/DoFnInvokersTestHelper.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/testhelper/DoFnInvokersTestHelper.java
@@ -23,6 +23,10 @@ import static org.mockito.Mockito.verify;
 
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokersTest;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerSpec;
+import org.apache.beam.sdk.util.TimerSpecs;
 
 /**
  * Test helper for {@link DoFnInvokersTest}, which needs to test package-private access to
DoFns in
@@ -121,4 +125,137 @@ public class DoFnInvokersTestHelper {
 
     fn.getClass().getMethod("verify", DoFn.ProcessContext.class).invoke(fn, context);
   }
+
+  //
+  // Classes for testing OnTimer methods when the DoFn does not live in the same package
+  //
+
+  private static final String TIMER_ID = "test-timer-id";
+
+  private static class StaticPrivateDoFnWithTimers extends DoFn<String, String> {
+    @TimerId(TIMER_ID)
+    private final TimerSpec myTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+    @OnTimer(TIMER_ID)
+    public void onTimer(BoundedWindow w) {}
+
+    @ProcessElement
+    public void process(ProcessContext c) {}
+  }
+
+  private class InnerPrivateDoFnWithTimers extends DoFn<String, String> {
+    @TimerId(TIMER_ID)
+    private final TimerSpec myTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+    @OnTimer(TIMER_ID)
+    public void onTimer(BoundedWindow w) {}
+
+    @ProcessElement
+    public void process(ProcessContext c) {}
+  }
+
+  static class StaticPackagePrivateDoFnWithTimers extends DoFn<String, String> {
+    @TimerId(TIMER_ID)
+    private final TimerSpec myTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+    @OnTimer(TIMER_ID)
+    public void onTimer(BoundedWindow w) {}
+
+    @ProcessElement
+    public void process(ProcessContext c) {}
+  }
+
+  class InnerPackagePrivateDoFnWithTimers extends DoFn<String, String> {
+    @TimerId(TIMER_ID)
+    private final TimerSpec myTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+    @OnTimer(TIMER_ID)
+    public void onTimer(BoundedWindow w) {}
+
+    @ProcessElement
+    public void process(ProcessContext c) {}
+  }
+
+  public static DoFn<String, String> newStaticPackagePrivateDoFnWithTimers() {
+    return new StaticPackagePrivateDoFnWithTimers();
+  }
+
+  public static void verifyStaticPackagePrivateDoFnWithTimers(
+      DoFn<String, String> fn, BoundedWindow window) {
+    verify((StaticPackagePrivateDoFnWithTimers) fn).onTimer(window);
+  }
+
+  public DoFn<String, String> newInnerPackagePrivateDoFnWithTimers() {
+    return new InnerPackagePrivateDoFnWithTimers();
+  }
+
+  public static void verifyInnerPackagePrivateDoFnWithTimers(
+      DoFn<String, String> fn, BoundedWindow window) {
+    verify((InnerPackagePrivateDoFnWithTimers) fn).onTimer(window);
+  }
+
+  public static DoFn<String, String> newStaticPrivateDoFnWithTimers() {
+    return new StaticPrivateDoFnWithTimers();
+  }
+
+  public static void verifyStaticPrivateDoFnWithTimers(
+      DoFn<String, String> fn, BoundedWindow window) {
+    verify((StaticPrivateDoFnWithTimers) fn).onTimer(window);
+  }
+
+  public DoFn<String, String> newInnerPrivateDoFnWithTimers() {
+    return new InnerPrivateDoFnWithTimers();
+  }
+
+  public static void verifyInnerPrivateDoFnWithTimers(
+      DoFn<String, String> fn, BoundedWindow window) {
+    verify((InnerPrivateDoFnWithTimers) fn).onTimer(window);
+  }
+
+  public DoFn<String, String> newInnerAnonymousDoFnWithTimers() {
+    return new DoFn<String, String>() {
+      @TimerId(TIMER_ID)
+      private final TimerSpec myTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+      @OnTimer(TIMER_ID)
+      public void onTimer(BoundedWindow w) {}
+
+      @ProcessElement
+      public void process(ProcessContext c) {}
+    };
+  }
+
+  public static void verifyInnerAnonymousDoFnWithTimers(
+      DoFn<String, String> fn, BoundedWindow window) throws Exception {
+    DoFn<String, String> verifier = verify(fn);
+    verifier.getClass().getMethod("onTimer", BoundedWindow.class).invoke(verifier, window);
+  }
+
+  public static DoFn<String, String> newStaticAnonymousDoFnWithTimers() {
+    return new DoFn<String, String>() {
+      private BoundedWindow invokedWindow;
+
+      @ProcessElement
+      public void process(ProcessContext c) {}
+
+      @TimerId(TIMER_ID)
+      private final TimerSpec myTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+      @OnTimer(TIMER_ID)
+      public void onTimer(BoundedWindow window) {
+        assertNull("Should have been invoked just once", invokedWindow);
+        invokedWindow = window;
+      }
+
+      @SuppressWarnings("unused")
+      public void verify(BoundedWindow window) {
+        assertEquals(window, invokedWindow);
+      }
+    };
+  }
+
+  public static void verifyStaticAnonymousDoFnWithTimersInvoked(
+      DoFn<String, String> fn, BoundedWindow window) throws Exception {
+    fn.getClass().getMethod("verify", BoundedWindow.class).invoke(fn, window);
+  }
 }


Mime
View raw message