beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [1/2] incubator-beam git commit: Improvements to ReduceFnRunner prefetching
Date Wed, 30 Nov 2016 22:23:02 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master b75a76459 -> 4a7da91f0


Improvements to ReduceFnRunner prefetching

- add prefetch* methods that prefetch the state read by the corresponding
  existing methods
- replace onTimer with a batched onTimers method so that state can be
  prefetched across timers
- prefetch trigger state in processElements


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4282c67c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4282c67c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4282c67c

Branch: refs/heads/master
Commit: 4282c67c5fa4dea2fe6c8695e0ea23f383c6457b
Parents: b75a764
Author: Sam Whittle <samuelw@google.com>
Authored: Thu Nov 10 12:59:49 2016 -0800
Committer: Kenneth Knowles <klk@google.com>
Committed: Wed Nov 30 13:32:05 2016 -0800

----------------------------------------------------------------------
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java |  11 +-
 .../beam/runners/core/PaneInfoTracker.java      |   4 +
 .../runners/core/ReduceFnContextFactory.java    |   9 +-
 .../beam/runners/core/ReduceFnRunner.java       | 488 ++++++++++++-------
 .../apache/beam/runners/core/WatermarkHold.java |   5 +
 .../triggers/TriggerStateMachineRunner.java     |  14 +-
 .../beam/runners/core/ReduceFnTester.java       |   4 +-
 .../GroupAlsoByWindowEvaluatorFactory.java      |   5 +-
 .../apache/beam/sdk/transforms/DoFnTester.java  |   6 +-
 .../sdk/util/state/InMemoryTimerInternals.java  |  22 +-
 .../beam/sdk/util/state/TimerCallback.java      |   9 +-
 .../util/state/InMemoryTimerInternalsTest.java  |  54 +-
 12 files changed, 407 insertions(+), 224 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4282c67c/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
index 8b10813..294f21d 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -27,7 +27,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.KeyedWorkItem;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.TimerInternals;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateInternalsFactory;
@@ -73,9 +72,9 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
 
   @Override
   public void processElement(ProcessContext c) throws Exception {
-    KeyedWorkItem<K, InputT> element = c.element();
+    KeyedWorkItem<K, InputT> keyedWorkItem = c.element();
 
-    K key = c.element().key();
+    K key = keyedWorkItem.key();
     TimerInternals timerInternals = c.windowingInternals().timerInternals();
     StateInternals<K> stateInternals = stateInternalsFactory.stateInternalsForKey(key);
 
@@ -93,10 +92,8 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
             reduceFn,
             c.getPipelineOptions());
 
-    reduceFnRunner.processElements(element.elementsIterable());
-    for (TimerData timer : element.timersIterable()) {
-      reduceFnRunner.onTimer(timer);
-    }
+    reduceFnRunner.processElements(keyedWorkItem.elementsIterable());
+    reduceFnRunner.onTimers(keyedWorkItem.timersIterable());
     reduceFnRunner.persist();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4282c67c/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
index 8140243..69a4cfd 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
@@ -54,6 +54,10 @@ public class PaneInfoTracker {
     state.access(PANE_INFO_TAG).clear();
   }
 
+  public void prefetchPaneInfo(ReduceFn<?, ?, ?, ?>.Context context) {
+    context.state().access(PaneInfoTracker.PANE_INFO_TAG).readLater();
+  }
+
   /**
    * Return a ({@link ReadableState} for) the pane info appropriate for {@code context}. The pane
    * info includes the timing for the pane, who's calculation is quite subtle.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4282c67c/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
index 539126a..c5bda9b 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
@@ -37,7 +37,6 @@ import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.Timers;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.MergingStateAccessor;
-import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.State;
 import org.apache.beam.sdk.util.state.StateAccessor;
 import org.apache.beam.sdk.util.state.StateContext;
@@ -117,7 +116,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
   }
 
   public ReduceFn<K, InputT, OutputT, W>.OnTriggerContext forTrigger(W window,
-      ReadableState<PaneInfo> pane, StateStyle style, OnTriggerCallbacks<OutputT> callbacks) {
+      PaneInfo pane, StateStyle style, OnTriggerCallbacks<OutputT> callbacks) {
     return new OnTriggerContextImpl(stateAccessor(window, style), pane, callbacks);
   }
 
@@ -389,11 +388,11 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
 
   private class OnTriggerContextImpl extends ReduceFn<K, InputT, OutputT, W>.OnTriggerContext {
     private final StateAccessorImpl<K, W> state;
-    private final ReadableState<PaneInfo> pane;
+    private final PaneInfo pane;
     private final OnTriggerCallbacks<OutputT> callbacks;
     private final TimersImpl timers;
 
-    private OnTriggerContextImpl(StateAccessorImpl<K, W> state, ReadableState<PaneInfo> pane,
+    private OnTriggerContextImpl(StateAccessorImpl<K, W> state, PaneInfo pane,
         OnTriggerCallbacks<OutputT> callbacks) {
       reduceFn.super();
       this.state = state;
@@ -424,7 +423,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
 
     @Override
     public PaneInfo paneInfo() {
-      return pane.read();
+      return pane;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4282c67c/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index a686f46..3a82be9 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -21,12 +21,15 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableMap;
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableSet;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -58,7 +61,6 @@ import org.apache.beam.sdk.util.WindowTracing;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
-import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace;
 import org.apache.beam.sdk.util.state.TimerCallback;
@@ -268,6 +270,32 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
     return activeWindows.getActiveAndNewWindows().isEmpty();
   }
 
+  private Set<W> openWindows(Collection<W> windows) {
+    Set<W> result = new HashSet<>();
+    for (W window : windows) {
+      ReduceFn<K, InputT, OutputT, W>.Context directContext = contextFactory.base(
+          window, StateStyle.DIRECT);
+      if (!triggerRunner.isClosed(directContext.state())) {
+        result.add(window);
+      }
+    }
+    return result;
+  }
+
+  private Collection<W> windowsThatShouldFire(Set<W> windows) throws Exception {
+    Collection<W> result = new ArrayList<>();
+    // Filter out timers that didn't trigger.
+    for (W window : windows) {
+      ReduceFn<K, InputT, OutputT, W>.Context directContext =
+          contextFactory.base(window, StateStyle.DIRECT);
+      if (triggerRunner.shouldFire(
+          directContext.window(), directContext.timers(), directContext.state())) {
+        result.add(window);
+      }
+    }
+    return result;
+  }
+
   /**
    * Incorporate {@code values} into the underlying reduce function, and manage holds, timers,
    * triggers, and window merging.
@@ -293,25 +321,54 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
    * </ol>
    */
   public void processElements(Iterable<WindowedValue<InputT>> values) throws Exception {
+    if (!values.iterator().hasNext()) {
+      return;
+    }
+
+    // Determine all the windows for elements.
+    Set<W> windows = collectWindows(values);
     // If an incoming element introduces a new window, attempt to merge it into an existing
     // window eagerly.
-    Map<W, W> windowToMergeResult = collectAndMergeWindows(values);
+    Map<W, W> windowToMergeResult = mergeWindows(windows);
+    if (!windowToMergeResult.isEmpty()) {
+      // Update windows by removing all windows that were merged away and adding
+      // the windows they were merged to. We add after completing all the
+      // removals to avoid removing a window that was also added.
+      List<W> addedWindows = new ArrayList<>(windowToMergeResult.size());
+      for (Map.Entry<W, W> entry : windowToMergeResult.entrySet()) {
+        windows.remove(entry.getKey());
+        addedWindows.add(entry.getValue());
+      }
+      windows.addAll(addedWindows);
+    }
 
-    Set<W> windowsToConsider = new HashSet<>();
+    prefetchWindowsForValues(windows);
+
+    // All windows that are open before element processing may need to fire.
+    Set<W> windowsToConsider = openWindows(windows);
 
     // Process each element, using the updated activeWindows determined by collectAndMergeWindows.
     for (WindowedValue<InputT> value : values) {
-      windowsToConsider.addAll(processElement(windowToMergeResult, value));
+      processElement(windowToMergeResult, value);
     }
 
-    // Trigger output from any window for which the trigger is ready
+    // Now that we've processed the elements, see if any of the windows need to fire.
+    // Prefetch state necessary to determine if the triggers should fire.
     for (W mergedWindow : windowsToConsider) {
-      ReduceFn<K, InputT, OutputT, W>.Context directContext =
-          contextFactory.base(mergedWindow, StateStyle.DIRECT);
-      ReduceFn<K, InputT, OutputT, W>.Context renamedContext =
-          contextFactory.base(mergedWindow, StateStyle.RENAMED);
-      triggerRunner.prefetchShouldFire(mergedWindow, directContext.state());
-      emitIfAppropriate(directContext, renamedContext);
+      triggerRunner.prefetchShouldFire(
+          mergedWindow, contextFactory.base(mergedWindow, StateStyle.DIRECT).state());
+    }
+    // Filter to windows that are firing.
+    Collection<W> windowsToFire = windowsThatShouldFire(windowsToConsider);
+    // Prefetch windows that are firing.
+    for (W window : windowsToFire) {
+      prefetchEmit(contextFactory.base(window, StateStyle.DIRECT),
+          contextFactory.base(window, StateStyle.RENAMED));
+    }
+    // Trigger output from firing windows.
+    for (W window : windowsToFire) {
+      emit(contextFactory.base(window, StateStyle.DIRECT),
+          contextFactory.base(window, StateStyle.RENAMED));
     }
 
     // We're all done with merging and emitting elements so can compress the activeWindow state.
@@ -325,52 +382,61 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
   }
 
   /**
-   * Extract the windows associated with the values, and invoke merge. Return a map
-   * from windows to the merge result window. If a window is not in the domain of
-   * the result map then it did not get merged into a different window.
+   * Extract the windows associated with the values.
    */
-  private Map<W, W> collectAndMergeWindows(Iterable<WindowedValue<InputT>> values)
-      throws Exception {
-    // No-op if no merging can take place
+  private Set<W> collectWindows(Iterable<WindowedValue<InputT>> values) throws Exception {
+    Set<W> windows = new HashSet<>();
+    for (WindowedValue<?> value : values) {
+      for (BoundedWindow untypedWindow : value.getWindows()) {
+        @SuppressWarnings("unchecked")
+          W window = (W) untypedWindow;
+        windows.add(window);
+      }
+    }
+    return windows;
+  }
+
+  /**
+   * Invoke merge for the given windows and return a map from windows to the
+   * merge result window. Windows that were not merged are not present in the
+   * map.
+   */
+  private Map<W, W> mergeWindows(Set<W> windows) throws Exception {
     if (windowingStrategy.getWindowFn().isNonMerging()) {
-      return ImmutableMap.of();
+      // Return an empty map, indicating that every window is not merged.
+      return Collections.emptyMap();
     }
 
+    Map<W, W> windowToMergeResult = new HashMap<>();
     // Collect the windows from all elements (except those which are too late) and
     // make sure they are already in the active window set or are added as NEW windows.
-    for (WindowedValue<?> value : values) {
-      for (BoundedWindow untypedWindow : value.getWindows()) {
-        @SuppressWarnings("unchecked")
-        W window = (W) untypedWindow;
-
-        // For backwards compat with pre 1.4 only.
-        // We may still have ACTIVE windows with multiple state addresses, representing
-        // a window who's state has not yet been eagerly merged.
-        // We'll go ahead and merge that state now so that we don't have to worry about
-        // this legacy case anywhere else.
-        if (activeWindows.isActive(window)) {
-          Set<W> stateAddressWindows = activeWindows.readStateAddresses(window);
-          if (stateAddressWindows.size() > 1) {
-            // This is a legacy window who's state has not been eagerly merged.
-            // Do that now.
-            ReduceFn<K, InputT, OutputT, W>.OnMergeContext premergeContext =
-                contextFactory.forPremerge(window);
-            reduceFn.onMerge(premergeContext);
-            watermarkHold.onMerge(premergeContext);
-            activeWindows.merged(window);
-          }
+    for (W window : windows) {
+      // For backwards compat with pre 1.4 only.
+      // We may still have ACTIVE windows with multiple state addresses, representing
+      // a window who's state has not yet been eagerly merged.
+      // We'll go ahead and merge that state now so that we don't have to worry about
+      // this legacy case anywhere else.
+      if (activeWindows.isActive(window)) {
+        Set<W> stateAddressWindows = activeWindows.readStateAddresses(window);
+        if (stateAddressWindows.size() > 1) {
+          // This is a legacy window who's state has not been eagerly merged.
+          // Do that now.
+          ReduceFn<K, InputT, OutputT, W>.OnMergeContext premergeContext =
+              contextFactory.forPremerge(window);
+          reduceFn.onMerge(premergeContext);
+          watermarkHold.onMerge(premergeContext);
+          activeWindows.merged(window);
         }
-
-        // Add this window as NEW if it is not currently ACTIVE.
-        // If we had already seen this window and closed its trigger, then the
-        // window will not be currently ACTIVE. It will then be added as NEW here,
-        // and fall into the merging logic as usual.
-        activeWindows.ensureWindowExists(window);
       }
+
+      // Add this window as NEW if it is not currently ACTIVE.
+      // If we had already seen this window and closed its trigger, then the
+      // window will not be currently ACTIVE. It will then be added as NEW here,
+      // and fall into the merging logic as usual.
+      activeWindows.ensureWindowExists(window);
     }
 
     // Merge all of the active windows and retain a mapping from source windows to result windows.
-    Map<W, W> windowToMergeResult = new HashMap<>();
     activeWindows.merge(new OnMergeCallback(windowToMergeResult));
     return windowToMergeResult;
   }
@@ -472,38 +538,50 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
   }
 
   /**
-   * Process an element.
-   *
-   * @param value the value being processed
-   * @return the set of windows in which the element was actually processed
+   * Redirect element windows to the ACTIVE windows they have been merged into.
+   * The compressed representation (value, {window1, window2, ...}) actually represents
+   * distinct elements (value, window1), (value, window2), ...
+   * so if window1 and window2 merge, the resulting window will contain both copies
+   * of the value.
    */
-  private Collection<W> processElement(Map<W, W> windowToMergeResult, WindowedValue<InputT> value)
-      throws Exception {
-    // Redirect element windows to the ACTIVE windows they have been merged into.
-    // The compressed representation (value, {window1, window2, ...}) actually represents
-    // distinct elements (value, window1), (value, window2), ...
-    // so if window1 and window2 merge, the resulting window will contain both copies
-    // of the value.
-    Collection<W> windows = new ArrayList<>();
-    for (BoundedWindow untypedWindow : value.getWindows()) {
-      @SuppressWarnings("unchecked")
-      W window = (W) untypedWindow;
-      W mergeResult = windowToMergeResult.get(window);
-      if (mergeResult == null) {
-        mergeResult = window;
-      }
-      windows.add(mergeResult);
-    }
+  private ImmutableSet<W> toMergedWindows(final Map<W, W> windowToMergeResult,
+      final Collection<? extends BoundedWindow> windows) {
+    return ImmutableSet.copyOf(
+        FluentIterable.from(windows).transform(
+            new Function<BoundedWindow, W>() {
+              @Override
+              public W apply(BoundedWindow untypedWindow) {
+                @SuppressWarnings("unchecked")
+                W window = (W) untypedWindow;
+                W mergedWindow = windowToMergeResult.get(window);
+                // If the element is not present in the map, the window is unmerged.
+                return (mergedWindow == null) ? window : mergedWindow;
+              }
+            }
+        ));
+  }
 
+  private void prefetchWindowsForValues(Collection<W> windows) {
     // Prefetch in each of the windows if we're going to need to process triggers
     for (W window : windows) {
-      ReduceFn<K, InputT, OutputT, W>.ProcessValueContext directContext = contextFactory.forValue(
-          window, value.getValue(), value.getTimestamp(), StateStyle.DIRECT);
+      ReduceFn<K, InputT, OutputT, W>.Context directContext = contextFactory.base(
+          window, StateStyle.DIRECT);
       triggerRunner.prefetchForValue(window, directContext.state());
     }
+  }
+
+  /**
+   * Process an element.
+   *
+   * @param windowToMergeResult map of windows to merged windows. If a window is
+   * not present it is unmerged.
+   * @param value the value being processed
+   */
+  private void processElement(Map<W, W> windowToMergeResult, WindowedValue<InputT> value)
+      throws Exception {
+    ImmutableSet<W> windows = toMergedWindows(windowToMergeResult, value.getWindows());
 
     // Process the element for each (mergeResultWindow, not closed) window it belongs to.
-    List<W> triggerableWindows = new ArrayList<>(windows.size());
     for (W window : windows) {
       ReduceFn<K, InputT, OutputT, W>.ProcessValueContext directContext = contextFactory.forValue(
           window, value.getValue(), value.getTimestamp(), StateStyle.DIRECT);
@@ -518,7 +596,6 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
         continue;
       }
 
-      triggerableWindows.add(window);
       activeWindows.ensureWindowIsActive(window);
       ReduceFn<K, InputT, OutputT, W>.ProcessValueContext renamedContext = contextFactory.forValue(
           window, value.getValue(), value.getTimestamp(), StateStyle.RENAMED);
@@ -562,102 +639,152 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
       // cannot take a trigger state from firing to non-firing.
       // (We don't actually assert this since it is too slow.)
     }
-
-    return triggerableWindows;
   }
 
   /**
-   * Called when an end-of-window, garbage collection, or trigger-specific timer fires.
+   * Enriches TimerData with state necessary for processing a timer as well as
+   * common queries about a timer.
    */
-  public void onTimer(TimerData timer) throws Exception {
-    // Which window is the timer for?
-    checkArgument(timer.getNamespace() instanceof WindowNamespace,
-        "Expected timer to be in WindowNamespace, but was in %s", timer.getNamespace());
-    @SuppressWarnings("unchecked")
-    WindowNamespace<W> windowNamespace = (WindowNamespace<W>) timer.getNamespace();
-    W window = windowNamespace.getWindow();
-    ReduceFn<K, InputT, OutputT, W>.Context directContext =
-        contextFactory.base(window, StateStyle.DIRECT);
-    ReduceFn<K, InputT, OutputT, W>.Context renamedContext =
-        contextFactory.base(window, StateStyle.RENAMED);
+  private class EnrichedTimerData {
+    public final Instant timestamp;
+    public final ReduceFn<K, InputT, OutputT, W>.Context directContext;
+    public final ReduceFn<K, InputT, OutputT, W>.Context renamedContext;
+    // If this is an end-of-window timer then we may need to set a garbage collection timer
+    // if allowed lateness is non-zero.
+    public final boolean isEndOfWindow;
+    // If this is a garbage collection timer then we should trigger and
+    // garbage collect the window.  We'll consider any timer at or after the
+    // end-of-window time to be a signal to garbage collect.
+    public final boolean isGarbageCollection;
+
+    EnrichedTimerData(
+        TimerData timer,
+        ReduceFn<K, InputT, OutputT, W>.Context directContext,
+        ReduceFn<K, InputT, OutputT, W>.Context renamedContext) {
+      this.timestamp = timer.getTimestamp();
+      this.directContext = directContext;
+      this.renamedContext = renamedContext;
+      W window = directContext.window();
+      this.isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain()
+          && timer.getTimestamp().equals(window.maxTimestamp());
+      Instant cleanupTime = garbageCollectionTime(window);
+      this.isGarbageCollection = !timer.getTimestamp().isBefore(cleanupTime);
+    }
 
     // Has this window had its trigger finish?
     // - The trigger may implement isClosed as constant false.
     // - If the window function does not support windowing then all windows will be considered
     // active.
     // So we must take conjunction of activeWindows and triggerRunner state.
-    boolean windowIsActiveAndOpen =
-        activeWindows.isActive(window) && !triggerRunner.isClosed(directContext.state());
+    public boolean windowIsActiveAndOpen() {
+      return activeWindows.isActive(directContext.window())
+          && !triggerRunner.isClosed(directContext.state());
+    }
+  }
 
-    if (!windowIsActiveAndOpen) {
-      WindowTracing.debug(
-          "ReduceFnRunner.onTimer: Note that timer {} is for non-ACTIVE window {}", timer, window);
+  public void onTimers(Iterable<TimerData> timers) throws Exception {
+    if (!timers.iterator().hasNext()) {
+      return;
     }
 
-    // If this is an end-of-window timer then we may need to set a garbage collection timer
-    // if allowed lateness is non-zero.
-    boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain()
-        && timer.getTimestamp().equals(window.maxTimestamp());
-
-    // If this is a garbage collection timer then we should trigger and garbage collect the window.
-    // We'll consider any timer at or after the end-of-window time to be a signal to garbage
-    // collect.
-    Instant cleanupTime = garbageCollectionTime(window);
-    boolean isGarbageCollection = TimeDomain.EVENT_TIME == timer.getDomain()
-        && !timer.getTimestamp().isBefore(cleanupTime);
-
-    if (isGarbageCollection) {
-      WindowTracing.debug(
-          "ReduceFnRunner.onTimer: Cleaning up for key:{}; window:{} at {} with "
-          + "inputWatermark:{}; outputWatermark:{}",
-          key, window, timer.getTimestamp(), timerInternals.currentInputWatermarkTime(),
-          timerInternals.currentOutputWatermarkTime());
-
-      if (windowIsActiveAndOpen) {
-        // We need to call onTrigger to emit the final pane if required.
-        // The final pane *may* be ON_TIME if no prior ON_TIME pane has been emitted,
-        // and the watermark has passed the end of the window.
-        @Nullable Instant newHold =
-            onTrigger(directContext, renamedContext, true/* isFinished */, isEndOfWindow);
-        checkState(newHold == null,
-            "Hold placed at %s despite isFinished being true.", newHold);
+    // Create a reusable context for each timer and begin prefetching necessary
+    // state.
+    List<EnrichedTimerData> enrichedTimers = new LinkedList();
+    for (TimerData timer : timers) {
+      checkArgument(timer.getNamespace() instanceof WindowNamespace,
+          "Expected timer to be in WindowNamespace, but was in %s", timer.getNamespace());
+      @SuppressWarnings("unchecked")
+        WindowNamespace<W> windowNamespace = (WindowNamespace<W>) timer.getNamespace();
+      W window = windowNamespace.getWindow();
+      ReduceFn<K, InputT, OutputT, W>.Context directContext =
+          contextFactory.base(window, StateStyle.DIRECT);
+      ReduceFn<K, InputT, OutputT, W>.Context renamedContext =
+          contextFactory.base(window, StateStyle.RENAMED);
+      EnrichedTimerData enrichedTimer = new EnrichedTimerData(timer, directContext, renamedContext);
+      enrichedTimers.add(enrichedTimer);
+
+      // Perform prefetching of state to determine if the trigger should fire.
+      if (enrichedTimer.isGarbageCollection) {
+        triggerRunner.prefetchIsClosed(directContext.state());
+      } else {
+        triggerRunner.prefetchShouldFire(directContext.window(), directContext.state());
       }
+    }
 
-      // Cleanup flavor B: Clear all the remaining state for this window since we'll never
-      // see elements for it again.
-      clearAllState(directContext, renamedContext, windowIsActiveAndOpen);
-    } else {
-      WindowTracing.debug(
-          "ReduceFnRunner.onTimer: Triggering for key:{}; window:{} at {} with "
-          + "inputWatermark:{}; outputWatermark:{}",
-          key, window, timer.getTimestamp(), timerInternals.currentInputWatermarkTime(),
-          timerInternals.currentOutputWatermarkTime());
-      if (windowIsActiveAndOpen) {
-        emitIfAppropriate(directContext, renamedContext);
+    // For those windows that are active and open, prefetch the triggering or emitting state.
+    for (EnrichedTimerData timer : enrichedTimers) {
+      if (timer.windowIsActiveAndOpen()) {
+        ReduceFn<K, InputT, OutputT, W>.Context directContext = timer.directContext;
+        if (timer.isGarbageCollection) {
+          prefetchOnTrigger(directContext, timer.renamedContext);
+        } else if (triggerRunner.shouldFire(
+            directContext.window(), directContext.timers(), directContext.state())) {
+          prefetchEmit(directContext, timer.renamedContext);
+        }
       }
+    }
 
-      if (isEndOfWindow) {
-        // If the window strategy trigger includes a watermark trigger then at this point
-        // there should be no data holds, either because we'd already cleared them on an
-        // earlier onTrigger, or because we just cleared them on the above emitIfAppropriate.
-        // We could assert this but it is very expensive.
-
-        // Since we are processing an on-time firing we should schedule the garbage collection
-        // timer. (If getAllowedLateness is zero then the timer event will be considered a
-        // cleanup event and handled by the above).
-        // Note we must do this even if the trigger is finished so that we are sure to cleanup
-        // any final trigger finished bits.
-        checkState(
-            windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO),
-            "Unexpected zero getAllowedLateness");
-        WindowTracing.debug(
-            "ReduceFnRunner.onTimer: Scheduling cleanup timer for key:{}; window:{} at {} with "
-            + "inputWatermark:{}; outputWatermark:{}",
-            key, directContext.window(), cleanupTime, timerInternals.currentInputWatermarkTime(),
+    // Perform processing now that everything is prefetched.
+    for (EnrichedTimerData timer : enrichedTimers) {
+      ReduceFn<K, InputT, OutputT, W>.Context directContext = timer.directContext;
+      ReduceFn<K, InputT, OutputT, W>.Context renamedContext = timer.renamedContext;
+
+      if (timer.isGarbageCollection) {
+        WindowTracing.debug("ReduceFnRunner.onTimer: Cleaning up for key:{}; window:{} at {} with "
+                + "inputWatermark:{}; outputWatermark:{}",
+            key, directContext.window(), timer.timestamp,
+            timerInternals.currentInputWatermarkTime(),
             timerInternals.currentOutputWatermarkTime());
-        checkState(!cleanupTime.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
-                                 "Cleanup time %s is beyond end-of-time", cleanupTime);
-        directContext.timers().setTimer(cleanupTime, TimeDomain.EVENT_TIME);
+
+        boolean windowIsActiveAndOpen = timer.windowIsActiveAndOpen();
+        if (windowIsActiveAndOpen) {
+          // We need to call onTrigger to emit the final pane if required.
+          // The final pane *may* be ON_TIME if no prior ON_TIME pane has been emitted,
+          // and the watermark has passed the end of the window.
+          @Nullable
+          Instant newHold = onTrigger(
+              directContext, renamedContext, true /* isFinished */, timer.isEndOfWindow);
+          checkState(newHold == null, "Hold placed at %s despite isFinished being true.", newHold);
+        }
+
+        // Cleanup flavor B: Clear all the remaining state for this window since we'll never
+        // see elements for it again.
+        clearAllState(directContext, renamedContext, windowIsActiveAndOpen);
+      } else {
+        WindowTracing.debug("ReduceFnRunner.onTimer: Triggering for key:{}; window:{} at {} with "
+                + "inputWatermark:{}; outputWatermark:{}",
+            key, directContext.window(), timer.timestamp,
+            timerInternals.currentInputWatermarkTime(),
+            timerInternals.currentOutputWatermarkTime());
+        if (timer.windowIsActiveAndOpen()
+            && triggerRunner.shouldFire(
+                   directContext.window(), directContext.timers(), directContext.state())) {
+          emit(directContext, renamedContext);
+        }
+
+        if (timer.isEndOfWindow) {
+          // If the window strategy trigger includes a watermark trigger then at this point
+          // there should be no data holds, either because we'd already cleared them on an
+          // earlier onTrigger, or because we just cleared them on the above emit.
+          // We could assert this but it is very expensive.
+
+          // Since we are processing an on-time firing we should schedule the garbage collection
+          // timer. (If getAllowedLateness is zero then the timer event will be considered a
+          // cleanup event and handled by the above).
+          // Note we must do this even if the trigger is finished so that we are sure to cleanup
+          // any final trigger finished bits.
+          checkState(windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO),
+              "Unexpected zero getAllowedLateness");
+          Instant cleanupTime = garbageCollectionTime(directContext.window());
+          WindowTracing.debug(
+              "ReduceFnRunner.onTimer: Scheduling cleanup timer for key:{}; window:{} at {} with "
+                  + "inputWatermark:{}; outputWatermark:{}",
+              key, directContext.window(), cleanupTime, timerInternals.currentInputWatermarkTime(),
+              timerInternals.currentOutputWatermarkTime());
+          checkState(!cleanupTime.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
+              "Cleanup time %s is beyond end-of-time", cleanupTime);
+          directContext.timers().setTimer(cleanupTime, TimeDomain.EVENT_TIME);
+        }
       }
     }
   }
@@ -666,7 +793,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
    * Clear all the state associated with {@code context}'s window.
    * Should only be invoked if we know all future elements for this window will be considered
    * beyond allowed lateness.
-   * This is a superset of the clearing done by {@link #emitIfAppropriate} below since:
+   * This is a superset of the clearing done by {@link #emitPane} below since:
    * <ol>
    * <li>We can clear the trigger finished bits since we'll never need to ask if the trigger is
    * closed again.
@@ -692,10 +819,10 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
     } else {
       // If !windowIsActiveAndOpen then !activeWindows.isActive (1) or triggerRunner.isClosed (2).
       // For (1), if !activeWindows.isActive then the window must be merging and has been
-      // explicitly removed by emitIfAppropriate. But in that case the trigger must have fired
+      // explicitly removed by emit. But in that case the trigger must have fired
       // and been closed, so this case reduces to (2).
       // For (2), if triggerRunner.isClosed then the trigger was fired and entered the
-      // closed state. In that case emitIfAppropriate will have cleared all state in
+      // closed state. In that case emit will have cleared all state in
       // reduceFn, triggerRunner (except for finished bits), paneInfoTracker and activeWindows.
       // We also know nonEmptyPanes must have been unconditionally cleared by the trigger.
       // Since the trigger fired the existing watermark holds must have been cleared, and since
@@ -737,17 +864,23 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
     return false;
   }
 
+  private void prefetchEmit(ReduceFn<K, InputT, OutputT, W>.Context directContext,
+                                ReduceFn<K, InputT, OutputT, W>.Context renamedContext) {
+    triggerRunner.prefetchShouldFire(directContext.window(), directContext.state());
+    triggerRunner.prefetchOnFire(directContext.window(), directContext.state());
+    triggerRunner.prefetchIsClosed(directContext.state());
+    prefetchOnTrigger(directContext, renamedContext);
+  }
+
   /**
-   * Possibly emit a pane if a trigger is ready to fire or timers require it, and cleanup state.
+   * Emit if a trigger is ready to fire or timers require it, and clean up state.
    */
-  private void emitIfAppropriate(ReduceFn<K, InputT, OutputT, W>.Context directContext,
+  private void emit(
+      ReduceFn<K, InputT, OutputT, W>.Context directContext,
       ReduceFn<K, InputT, OutputT, W>.Context renamedContext)
       throws Exception {
-    if (!triggerRunner.shouldFire(
-        directContext.window(), directContext.timers(), directContext.state())) {
-      // Ignore unless trigger is ready to fire
-      return;
-    }
+    checkState(triggerRunner.shouldFire(
+        directContext.window(), directContext.timers(), directContext.state()));
 
     // Inform the trigger of the transition to see if it is finished
     triggerRunner.onFire(directContext.window(), directContext.timers(), directContext.state());
@@ -782,7 +915,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
   }
 
   /**
-   * Do we need to emit a pane?
+   * Do we need to emit?
    */
   private boolean needToEmit(boolean isEmpty, boolean isFinished, PaneInfo.Timing timing) {
     if (!isEmpty) {
@@ -800,6 +933,15 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
     return false;
   }
 
+  private void prefetchOnTrigger(
+      final ReduceFn<K, InputT, OutputT, W>.Context directContext,
+      ReduceFn<K, InputT, OutputT, W>.Context renamedContext) {
+    paneInfoTracker.prefetchPaneInfo(directContext);
+    watermarkHold.prefetchExtract(renamedContext);
+    nonEmptyPanes.isEmpty(renamedContext.state()).readLater();
+    reduceFn.prefetchOnTrigger(directContext.state());
+  }
+
   /**
    * Run the {@link ReduceFn#onTrigger} method and produce any necessary output.
    *
@@ -813,25 +955,17 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
           throws Exception {
     Instant inputWM = timerInternals.currentInputWatermarkTime();
 
-    // Prefetch necessary states
-    ReadableState<WatermarkHold.OldAndNewHolds> outputTimestampFuture =
-        watermarkHold.extractAndRelease(renamedContext, isFinished).readLater();
-    ReadableState<PaneInfo> paneFuture =
-        paneInfoTracker.getNextPaneInfo(directContext, isFinished).readLater();
-    ReadableState<Boolean> isEmptyFuture =
-        nonEmptyPanes.isEmpty(renamedContext.state()).readLater();
-
-    reduceFn.prefetchOnTrigger(directContext.state());
-    triggerRunner.prefetchOnFire(directContext.window(), directContext.state());
-
     // Calculate the pane info.
-    final PaneInfo pane = paneFuture.read();
-    // Extract the window hold, and as a side effect clear it.
+    final PaneInfo pane = paneInfoTracker.getNextPaneInfo(directContext, isFinished).read();
 
-    WatermarkHold.OldAndNewHolds pair = outputTimestampFuture.read();
+    // Extract the window hold, and as a side effect clear it.
+    final WatermarkHold.OldAndNewHolds pair =
+        watermarkHold.extractAndRelease(renamedContext, isFinished).read();
     final Instant outputTimestamp = pair.oldHold;
     @Nullable Instant newHold = pair.newHold;
 
+    final boolean isEmpty = nonEmptyPanes.isEmpty(renamedContext.state()).read();
+
     if (newHold != null) {
       // We can't be finished yet.
       checkState(
@@ -863,11 +997,11 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
     }
 
     // Only emit a pane if it has data or empty panes are observable.
-    if (needToEmit(isEmptyFuture.read(), isFinished, pane.getTiming())) {
+    if (needToEmit(isEmpty, isFinished, pane.getTiming())) {
       // Run reduceFn.onTrigger method.
       final List<W> windows = Collections.singletonList(directContext.window());
       ReduceFn<K, InputT, OutputT, W>.OnTriggerContext renamedTriggerContext =
-          contextFactory.forTrigger(directContext.window(), paneFuture, StateStyle.RENAMED,
+          contextFactory.forTrigger(directContext.window(), pane, StateStyle.RENAMED,
               new OnTriggerCallbacks<OutputT>() {
                 @Override
                 public void output(OutputT toOutput) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4282c67c/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
index 3c04571..7f1afcc 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
@@ -444,6 +444,11 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
     }
   }
 
+  public void prefetchExtract(final ReduceFn<?, ?, ?, W>.Context context) {
+    context.state().access(elementHoldTag).readLater();
+    context.state().access(EXTRA_HOLD_TAG).readLater();
+  }
+
   /**
    * Return (a future for) the earliest hold for {@code context}. Clear all the holds after
    * reading, but add/restore an end-of-window or garbage collection hold if required.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4282c67c/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
index 9f03216..2f277eb 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
@@ -99,25 +99,25 @@ public class TriggerStateMachineRunner<W extends BoundedWindow> {
     return readFinishedBits(state.access(FINISHED_BITS_TAG)).isFinished(rootTrigger);
   }
 
-  public void prefetchForValue(W window, StateAccessor<?> state) {
+  public void prefetchIsClosed(StateAccessor<?> state) {
     if (isFinishedSetNeeded()) {
       state.access(FINISHED_BITS_TAG).readLater();
     }
+  }
+
+  public void prefetchForValue(W window, StateAccessor<?> state) {
+    prefetchIsClosed(state);
     rootTrigger.getSpec().prefetchOnElement(
         contextFactory.createStateAccessor(window, rootTrigger));
   }
 
   public void prefetchOnFire(W window, StateAccessor<?> state) {
-    if (isFinishedSetNeeded()) {
-      state.access(FINISHED_BITS_TAG).readLater();
-    }
+    prefetchIsClosed(state);
     rootTrigger.getSpec().prefetchOnFire(contextFactory.createStateAccessor(window, rootTrigger));
   }
 
   public void prefetchShouldFire(W window, StateAccessor<?> state) {
-    if (isFinishedSetNeeded()) {
-      state.access(FINISHED_BITS_TAG).readLater();
-    }
+    prefetchIsClosed(state);
     rootTrigger.getSpec().prefetchShouldFire(
         contextFactory.createStateAccessor(window, rootTrigger));
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4282c67c/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index 337be23..8be8ae5 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -509,8 +509,10 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
 
   public void fireTimer(W window, Instant timestamp, TimeDomain domain) throws Exception {
     ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner();
-    runner.onTimer(
+    ArrayList<TimerData> timers = new ArrayList<TimerData>(1);  // single-timer batch
+    timers.add(
         TimerData.of(StateNamespaces.window(windowFn.windowCoder(), window), timestamp, domain));
+    runner.onTimers(timers);
     runner.persist();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4282c67c/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index 9d25bc6..f70fb94 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -47,7 +47,6 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.KeyedWorkItem;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.TimerInternals;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.WindowTracing;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -201,9 +200,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
       // Drop any elements within expired windows
       reduceFnRunner.processElements(
           dropExpiredWindows(key, workItem.elementsIterable(), timerInternals));
-      for (TimerData timer : workItem.timersIterable()) {
-        reduceFnRunner.onTimer(timer);
-      }
+      reduceFnRunner.onTimers(workItem.timersIterable());
       reduceFnRunner.persist();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4282c67c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index daa8a06..f8b1222 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -466,8 +466,10 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
   private static TimerCallback collectInto(final List<TimerInternals.TimerData> firedTimers) {
     return new TimerCallback() {
       @Override
-      public void onTimer(TimerInternals.TimerData timer) throws Exception {
-        firedTimers.add(timer);
+      public void onTimers(Iterable<TimerInternals.TimerData> timers) throws Exception {
+        for (TimerInternals.TimerData timer : timers) {
+          firedTimers.add(timer);
+        }
       }
     };
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4282c67c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
index a3bb45a..f1ddaac 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.base.MoreObjects;
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.PriorityQueue;
 import java.util.Set;
@@ -235,13 +236,20 @@ public class InMemoryTimerInternals implements TimerInternals {
       throws Exception {
     checkNotNull(timerCallback);
     PriorityQueue<TimerData> queue = queue(domain);
-    while (!queue.isEmpty() && currentTime.isAfter(queue.peek().getTimestamp())) {
-      // Remove before firing, so that if the callback adds another identical
-      // timer we don't remove it.
-      TimerData timer = queue.remove();
-      WindowTracing.trace(
-          "InMemoryTimerInternals.advanceAndFire: firing {} at {}", timer, currentTime);
-      timerCallback.onTimer(timer);
+    while (true) {
+      ArrayList<TimerData> firedTimers = new ArrayList();
+      while (!queue.isEmpty() && currentTime.isAfter(queue.peek().getTimestamp())) {
+        // Remove before firing, so that if the callback adds another identical
+        // timer we don't remove it.
+        TimerData timer = queue.remove();
+        firedTimers.add(timer);
+        WindowTracing.trace(
+            "InMemoryTimerInternals.advanceAndFire: firing {} at {}", timer, currentTime);
+      }
+      if (firedTimers.isEmpty()) {
+        break;
+      }
+      timerCallback.onTimers(firedTimers);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4282c67c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerCallback.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerCallback.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerCallback.java
index 6598e30..dfdfd5b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerCallback.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerCallback.java
@@ -19,16 +19,17 @@ package org.apache.beam.sdk.util.state;
 
 import org.apache.beam.sdk.util.TimerInternals;
 
+
 /**
- * A callback that processes a {@link TimerInternals.TimerData TimerData}.
+ * A callback that processes an Iterable of {@link TimerInternals.TimerData TimerData}.
  */
 public interface TimerCallback {
-  /** Processes the {@link TimerInternals.TimerData TimerData}. */
-  void onTimer(TimerInternals.TimerData timer) throws Exception;
+  /** Processes an Iterable of {@link TimerInternals.TimerData TimerData}. */
+  void onTimers(Iterable<TimerInternals.TimerData> timers) throws Exception;
 
   TimerCallback NO_OP = new TimerCallback() {
     @Override
-    public void onTimer(TimerInternals.TimerData timer) throws Exception {
+    public void onTimers(Iterable<TimerInternals.TimerData> timers) throws Exception {
       // Nothing
     }
   };

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4282c67c/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java
index 951803a..a3a7749 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java
@@ -17,6 +17,11 @@
  */
 package org.apache.beam.sdk.util.state;
 
+import static org.mockito.Matchers.argThat;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.joda.time.Instant;
@@ -24,6 +29,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.mockito.ArgumentMatcher;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
@@ -44,6 +50,37 @@ public class InMemoryTimerInternalsTest {
     MockitoAnnotations.initMocks(this);
   }
 
+  /** Matches an {@code Iterable<TimerData>} whose elements equal the expected list, in order. */
+  private static class TimersAre extends ArgumentMatcher<Iterable<TimerData>> {
+    final List<TimerData> expectedTimers;
+    TimersAre(List<TimerData> timers) {
+      expectedTimers = timers;
+    }
+
+    @Override
+    public boolean matches(Object actual) {
+      if (!(actual instanceof Iterable)) {  // instanceof is false for null, too
+        return false;
+      }
+      @SuppressWarnings("unchecked")
+      Iterable<TimerData> timers = (Iterable<TimerData>) actual;
+      List<TimerData> actualTimers = new ArrayList<TimerData>();
+      for (TimerData timer : timers) {
+        actualTimers.add(timer);
+      }
+      return expectedTimers.equals(actualTimers);
+    }
+
+    @Override
+    public String toString() {
+      return "ordered timers " + expectedTimers.toString();
+    }
+  }
+
+  private static TimersAre timersAre(TimerData... timers) {
+    return new TimersAre(Arrays.asList(timers));
+  }
+
   @Test
   public void testFiringTimers() throws Exception {
     InMemoryTimerInternals underTest = new InMemoryTimerInternals();
@@ -54,7 +91,7 @@ public class InMemoryTimerInternalsTest {
     underTest.setTimer(processingTime2);
 
     underTest.advanceProcessingTime(timerCallback, new Instant(20));
-    Mockito.verify(timerCallback).onTimer(processingTime1);
+    Mockito.verify(timerCallback).onTimers(argThat(timersAre(processingTime1)));
     Mockito.verifyNoMoreInteractions(timerCallback);
 
     // Advancing just a little shouldn't refire
@@ -63,13 +100,13 @@ public class InMemoryTimerInternalsTest {
 
     // Adding the timer and advancing a little should refire
     underTest.setTimer(processingTime1);
-    Mockito.verify(timerCallback).onTimer(processingTime1);
+    Mockito.verify(timerCallback).onTimers(argThat(timersAre(processingTime1)));
     underTest.advanceProcessingTime(timerCallback, new Instant(21));
     Mockito.verifyNoMoreInteractions(timerCallback);
 
     // And advancing the rest of the way should still have the other timer
     underTest.advanceProcessingTime(timerCallback, new Instant(30));
-    Mockito.verify(timerCallback).onTimer(processingTime2);
+    Mockito.verify(timerCallback).onTimers(argThat(timersAre(processingTime2)));
     Mockito.verifyNoMoreInteractions(timerCallback);
   }
 
@@ -87,13 +124,11 @@ public class InMemoryTimerInternalsTest {
     underTest.setTimer(watermarkTime2);
 
     underTest.advanceInputWatermark(timerCallback, new Instant(30));
-    Mockito.verify(timerCallback).onTimer(watermarkTime1);
-    Mockito.verify(timerCallback).onTimer(watermarkTime2);
+    Mockito.verify(timerCallback).onTimers(argThat(timersAre(watermarkTime1, watermarkTime2)));
     Mockito.verifyNoMoreInteractions(timerCallback);
 
     underTest.advanceProcessingTime(timerCallback, new Instant(30));
-    Mockito.verify(timerCallback).onTimer(processingTime1);
-    Mockito.verify(timerCallback).onTimer(processingTime2);
+    Mockito.verify(timerCallback).onTimers(argThat(timersAre(processingTime1, processingTime2)));
     Mockito.verifyNoMoreInteractions(timerCallback);
   }
 
@@ -107,10 +142,9 @@ public class InMemoryTimerInternalsTest {
     underTest.setTimer(processingTime);
     underTest.setTimer(processingTime);
     underTest.advanceProcessingTime(timerCallback, new Instant(20));
+    Mockito.verify(timerCallback).onTimers(argThat(timersAre(processingTime)));
     underTest.advanceInputWatermark(timerCallback, new Instant(20));
-
-    Mockito.verify(timerCallback).onTimer(processingTime);
-    Mockito.verify(timerCallback).onTimer(watermarkTime);
+    Mockito.verify(timerCallback).onTimers(argThat(timersAre(watermarkTime)));
     Mockito.verifyNoMoreInteractions(timerCallback);
   }
 }


Mime
View raw message