beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [04/17] incubator-beam git commit: Rename runners-core Trigger to TriggerStateMachine
Date Thu, 13 Oct 2016 22:22:38 GMT
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterProcessingTimeStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterProcessingTimeStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterProcessingTimeStateMachine.java
new file mode 100644
index 0000000..2490463
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterProcessingTimeStateMachine.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import java.util.List;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.joda.time.Instant;
+
+/**
+ * {@code AfterProcessingTime} triggers fire based on the current processing time. They operate in
+ * the real-time domain.
+ *
+ * <p>The time at which to fire the timer can be adjusted via the methods in {@link
+ * AfterDelayFromFirstElementStateMachine}, such as {@link
+ * AfterDelayFromFirstElementStateMachine#plusDelayOf} or {@link
+ * AfterDelayFromFirstElementStateMachine#alignedTo}.
+ */
+@Experimental(Experimental.Kind.TRIGGER)
+public class AfterProcessingTimeStateMachine extends AfterDelayFromFirstElementStateMachine {
+
+  @Override
+  @Nullable
+  public Instant getCurrentTime(TriggerStateMachine.TriggerContext context) {
+    return context.currentProcessingTime();
+  }
+
+  private AfterProcessingTimeStateMachine(List<SerializableFunction<Instant, Instant>> transforms) {
+    super(TimeDomain.PROCESSING_TIME, transforms);
+  }
+
+  /**
+   * Creates a trigger that fires when the current processing time passes the processing time
+   * at which this trigger saw the first element in a pane.
+   */
+  public static AfterProcessingTimeStateMachine pastFirstElementInPane() {
+    return new AfterProcessingTimeStateMachine(IDENTITY);
+  }
+
+  @Override
+  protected AfterProcessingTimeStateMachine newWith(
+      List<SerializableFunction<Instant, Instant>> transforms) {
+    return new AfterProcessingTimeStateMachine(transforms);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder("AfterProcessingTime.pastFirstElementInPane()");
+    for (SerializableFunction<Instant, Instant> delayFn : timestampMappers) {
+      builder
+          .append(".plusDelayOf(")
+          .append(delayFn)
+          .append(")");
+    }
+
+    return builder.toString();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof AfterProcessingTimeStateMachine)) {
+      return false;
+    }
+    AfterProcessingTimeStateMachine that = (AfterProcessingTimeStateMachine) obj;
+    return Objects.equals(this.timestampMappers, that.timestampMappers);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(getClass(), this.timestampMappers);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachine.java
new file mode 100644
index 0000000..000f6e7
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachine.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import com.google.common.base.Objects;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.joda.time.Instant;
+
+class AfterSynchronizedProcessingTimeStateMachine extends AfterDelayFromFirstElementStateMachine {
+
+  @Override
+  @Nullable
+  public Instant getCurrentTime(TriggerStateMachine.TriggerContext context) {
+    return context.currentSynchronizedProcessingTime();
+  }
+
+  public AfterSynchronizedProcessingTimeStateMachine() {
+    super(TimeDomain.SYNCHRONIZED_PROCESSING_TIME,
+        Collections.<SerializableFunction<Instant, Instant>>emptyList());
+  }
+
+  @Override
+  public String toString() {
+    return "AfterSynchronizedProcessingTime.pastFirstElementInPane()";
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    return this == obj || obj instanceof AfterSynchronizedProcessingTimeStateMachine;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(AfterSynchronizedProcessingTimeStateMachine.class);
+  }
+
+  @Override
+  protected AfterSynchronizedProcessingTimeStateMachine
+      newWith(List<SerializableFunction<Instant, Instant>> transforms) {
+    // ignore transforms
+    return this;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java
new file mode 100644
index 0000000..5ad6214
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.ImmutableList;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.util.TimeDomain;
+
+/**
+ * {@code AfterWatermark} triggers fire based on progress of the system watermark. This time is a
+ * lower-bound, sometimes heuristically established, on event times that have been fully processed
+ * by the pipeline.
+ *
+ * <p>For sources that provide non-heuristic watermarks (e.g.
+ * {@link org.apache.beam.sdk.io.PubsubIO} when using arrival times as event times), the
+ * watermark is a strict guarantee that no data with an event time earlier than
+ * that watermark will ever be observed in the pipeline. In this case, it's safe to assume that any
+ * pane triggered by an {@code AfterWatermark} trigger with a reference point at or beyond the end
+ * of the window will be the last pane ever for that window.
+ *
+ * <p>For sources that provide heuristic watermarks (e.g.
+ * {@link org.apache.beam.sdk.io.PubsubIO} when using user-supplied event times), the
+ * watermark itself becomes an <i>estimate</i> that no data with an event time earlier than that
+ * watermark (i.e. "late data") will ever be observed in the pipeline. These heuristics can
+ * often be quite accurate, but the chance of seeing late data for any given window is non-zero.
+ * Thus, if absolute correctness over time is important to your use case, you may want to consider
+ * using a trigger that accounts for late data. The default trigger,
+ * {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}, which fires
+ * once when the watermark passes the end of the window and then immediately therafter when any
+ * late data arrives, is one such example.
+ *
+ * <p>The watermark is the clock that defines {@link TimeDomain#EVENT_TIME}.
+ *
+ * <p>Additionaly firings before or after the watermark can be requested by calling
+ * {@code AfterWatermark.pastEndOfWindow.withEarlyFirings(OnceTrigger)} or
+ * {@code AfterWatermark.pastEndOfWindow.withEarlyFirings(OnceTrigger)}.
+ */
+@Experimental(Experimental.Kind.TRIGGER)
+public class AfterWatermarkStateMachine {
+
+  private static final String TO_STRING = "AfterWatermark.pastEndOfWindow()";
+
+  // Static factory class.
+  private AfterWatermarkStateMachine() {}
+
+  /**
+   * Creates a trigger that fires when the watermark passes the end of the window.
+   */
+  public static FromEndOfWindow pastEndOfWindow() {
+    return new FromEndOfWindow();
+  }
+
+  /**
+   * @see AfterWatermarkStateMachine
+   */
+  public static class AfterWatermarkEarlyAndLate extends TriggerStateMachine {
+
+    private static final int EARLY_INDEX = 0;
+    private static final int LATE_INDEX = 1;
+
+    private final OnceTriggerStateMachine earlyTrigger;
+    @Nullable
+    private final OnceTriggerStateMachine lateTrigger;
+
+    @SuppressWarnings("unchecked")
+    private AfterWatermarkEarlyAndLate(
+        OnceTriggerStateMachine earlyTrigger, OnceTriggerStateMachine lateTrigger) {
+      super(
+          lateTrigger == null
+              ? ImmutableList.<TriggerStateMachine>of(earlyTrigger)
+              : ImmutableList.<TriggerStateMachine>of(earlyTrigger, lateTrigger));
+      this.earlyTrigger = checkNotNull(earlyTrigger, "earlyTrigger should not be null");
+      this.lateTrigger = lateTrigger;
+    }
+
+    public TriggerStateMachine withEarlyFirings(OnceTriggerStateMachine earlyTrigger) {
+      return new AfterWatermarkEarlyAndLate(earlyTrigger, lateTrigger);
+    }
+
+    public TriggerStateMachine withLateFirings(OnceTriggerStateMachine lateTrigger) {
+      return new AfterWatermarkEarlyAndLate(earlyTrigger, lateTrigger);
+    }
+
+    @Override
+    public void onElement(OnElementContext c) throws Exception {
+      if (!c.trigger().isMerging()) {
+        // If merges can never happen, we just run the unfinished subtrigger
+        c.trigger().firstUnfinishedSubTrigger().invokeOnElement(c);
+      } else {
+        // If merges can happen, we run for all subtriggers because they might be
+        // de-activated or re-activated
+        for (ExecutableTriggerStateMachine subTrigger : c.trigger().subTriggers()) {
+          subTrigger.invokeOnElement(c);
+        }
+      }
+    }
+
+    @Override
+    public void onMerge(OnMergeContext c) throws Exception {
+      // NOTE that the ReduceFnRunner will delete all end-of-window timers for the
+      // merged-away windows.
+
+      ExecutableTriggerStateMachine earlySubtrigger = c.trigger().subTrigger(EARLY_INDEX);
+      // We check the early trigger to determine if we are still processing it or
+      // if the end of window has transitioned us to the late trigger
+      OnMergeContext earlyContext = c.forTrigger(earlySubtrigger);
+
+      // If the early trigger is still active in any merging window then it is still active in
+      // the new merged window, because even if the merged window is "done" some pending elements
+      // haven't had a chance to fire.
+      if (!earlyContext.trigger().finishedInAllMergingWindows() || !endOfWindowReached(c)) {
+        earlyContext.trigger().setFinished(false);
+        if (lateTrigger != null) {
+          ExecutableTriggerStateMachine lateSubtrigger = c.trigger().subTrigger(LATE_INDEX);
+          OnMergeContext lateContext = c.forTrigger(lateSubtrigger);
+          lateContext.trigger().setFinished(false);
+          lateSubtrigger.invokeClear(lateContext);
+        }
+      } else {
+        // Otherwise the early trigger and end-of-window bit is done for good.
+        earlyContext.trigger().setFinished(true);
+        if (lateTrigger != null) {
+          c.trigger().subTrigger(LATE_INDEX).invokeOnMerge(c);
+        }
+      }
+    }
+
+    private boolean endOfWindowReached(TriggerStateMachine.TriggerContext context) {
+      return context.currentEventTime() != null
+          && context.currentEventTime().isAfter(context.window().maxTimestamp());
+    }
+
+    @Override
+    public boolean shouldFire(TriggerStateMachine.TriggerContext context) throws Exception {
+      if (!context.trigger().isFinished(EARLY_INDEX)) {
+        // We have not yet transitioned to late firings.
+        // We should fire if either the trigger is ready or we reach the end of the window.
+        return context.trigger().subTrigger(EARLY_INDEX).invokeShouldFire(context)
+            || endOfWindowReached(context);
+      } else if (lateTrigger == null) {
+        return false;
+      } else {
+        // We are running the late trigger
+        return context.trigger().subTrigger(LATE_INDEX).invokeShouldFire(context);
+      }
+    }
+
+    @Override
+    public void onFire(TriggerStateMachine.TriggerContext context) throws Exception {
+      if (!context.forTrigger(context.trigger().subTrigger(EARLY_INDEX)).trigger().isFinished()) {
+        onNonLateFiring(context);
+      } else if (lateTrigger != null) {
+        onLateFiring(context);
+      } else {
+        // all done
+        context.trigger().setFinished(true);
+      }
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder builder = new StringBuilder(TO_STRING);
+
+      if (!(earlyTrigger instanceof NeverStateMachine)) {
+        builder
+            .append(".withEarlyFirings(")
+            .append(earlyTrigger)
+            .append(")");
+      }
+
+      if (lateTrigger != null && !(lateTrigger instanceof NeverStateMachine)) {
+        builder
+            .append(".withLateFirings(")
+            .append(lateTrigger)
+            .append(")");
+      }
+
+      return builder.toString();
+    }
+
+    private void onNonLateFiring(TriggerStateMachine.TriggerContext context) throws Exception {
+      // We have not yet transitioned to late firings.
+      ExecutableTriggerStateMachine earlySubtrigger = context.trigger().subTrigger(EARLY_INDEX);
+      TriggerStateMachine.TriggerContext earlyContext = context.forTrigger(earlySubtrigger);
+
+      if (!endOfWindowReached(context)) {
+        // This is an early firing, since we have not arrived at the end of the window
+        // Implicitly repeats
+        earlySubtrigger.invokeOnFire(context);
+        earlySubtrigger.invokeClear(context);
+        earlyContext.trigger().setFinished(false);
+      } else {
+        // We have arrived at the end of the window; terminate the early trigger
+        // and clear out the late trigger's state
+        if (earlySubtrigger.invokeShouldFire(context)) {
+          earlySubtrigger.invokeOnFire(context);
+        }
+        earlyContext.trigger().setFinished(true);
+        earlySubtrigger.invokeClear(context);
+
+        if (lateTrigger == null) {
+          // Done if there is no late trigger.
+          context.trigger().setFinished(true);
+        } else {
+          // If there is a late trigger, we transition to it, and need to clear its state
+          // because it was run in parallel.
+          context.trigger().subTrigger(LATE_INDEX).invokeClear(context);
+        }
+      }
+
+    }
+
+    private void onLateFiring(TriggerStateMachine.TriggerContext context) throws Exception {
+      // We are firing the late trigger, with implicit repeat
+      ExecutableTriggerStateMachine lateSubtrigger = context.trigger().subTrigger(LATE_INDEX);
+      lateSubtrigger.invokeOnFire(context);
+      // It is a OnceTrigger, so it must have finished; unfinished it and clear it
+      lateSubtrigger.invokeClear(context);
+      context.forTrigger(lateSubtrigger).trigger().setFinished(false);
+    }
+  }
+
+  /**
+   * A watermark trigger targeted relative to the end of the window.
+   */
+  public static class FromEndOfWindow extends OnceTriggerStateMachine {
+
+    private FromEndOfWindow() {
+      super(null);
+    }
+
+    /**
+     * Creates a new {@code Trigger} like the this, except that it fires repeatedly whenever
+     * the given {@code Trigger} fires before the watermark has passed the end of the window.
+     */
+    public AfterWatermarkEarlyAndLate withEarlyFirings(OnceTriggerStateMachine earlyFirings) {
+      checkNotNull(earlyFirings, "Must specify the trigger to use for early firings");
+      return new AfterWatermarkEarlyAndLate(earlyFirings, null);
+    }
+
+    /**
+     * Creates a new {@code Trigger} like the this, except that it fires repeatedly whenever
+     * the given {@code Trigger} fires after the watermark has passed the end of the window.
+     */
+    public AfterWatermarkEarlyAndLate withLateFirings(OnceTriggerStateMachine lateFirings) {
+      checkNotNull(lateFirings, "Must specify the trigger to use for late firings");
+      return new AfterWatermarkEarlyAndLate(NeverStateMachine.ever(), lateFirings);
+    }
+
+    @Override
+    public void onElement(OnElementContext c) throws Exception {
+      // We're interested in knowing when the input watermark passes the end of the window.
+      // (It is possible this has already happened, in which case the timer will be fired
+      // almost immediately).
+      c.setTimer(c.window().maxTimestamp(), TimeDomain.EVENT_TIME);
+    }
+
+    @Override
+    public void onMerge(OnMergeContext c) throws Exception {
+      // NOTE that the ReduceFnRunner will delete all end-of-window timers for the
+      // merged-away windows.
+
+      if (!c.trigger().finishedInAllMergingWindows()) {
+        // If the trigger is still active in any merging window then it is still active in the new
+        // merged window, because even if the merged window is "done" some pending elements haven't
+        // had a chance to fire
+        c.trigger().setFinished(false);
+      } else if (!endOfWindowReached(c)) {
+        // If the end of the new window has not been reached, then the trigger is active again.
+        c.trigger().setFinished(false);
+      } else {
+        // Otherwise it is done for good
+        c.trigger().setFinished(true);
+      }
+    }
+
+    @Override
+    public String toString() {
+      return TO_STRING;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      return obj instanceof FromEndOfWindow;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(getClass());
+    }
+
+    @Override
+    public boolean shouldFire(TriggerStateMachine.TriggerContext context) throws Exception {
+      return endOfWindowReached(context);
+    }
+
+    private boolean endOfWindowReached(TriggerStateMachine.TriggerContext context) {
+      return context.currentEventTime() != null
+          && context.currentEventTime().isAfter(context.window().maxTimestamp());
+    }
+
+    @Override
+    protected void onOnlyFiring(TriggerStateMachine.TriggerContext context) throws Exception { }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/DefaultTriggerStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/DefaultTriggerStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/DefaultTriggerStateMachine.java
new file mode 100644
index 0000000..be4dd68
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/DefaultTriggerStateMachine.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.util.TimeDomain;
+
+/**
+ * A trigger that is equivalent to {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}. See
+ * {@link RepeatedlyStateMachine#forever} and {@link AfterWatermarkStateMachine#pastEndOfWindow} for
+ * more details.
+ */
+@Experimental(Experimental.Kind.TRIGGER)
+public class DefaultTriggerStateMachine extends TriggerStateMachine {
+
+  private DefaultTriggerStateMachine() {
+    super(null);
+  }
+
+  /**
+   * Returns the default trigger.
+   */
+  public static DefaultTriggerStateMachine of() {
+    return new DefaultTriggerStateMachine();
+  }
+
+  @Override
+  public void onElement(OnElementContext c) throws Exception {
+    // If the end of the window has already been reached, then we are already ready to fire
+    // and do not need to set a wake-up timer.
+    if (!endOfWindowReached(c)) {
+      c.setTimer(c.window().maxTimestamp(), TimeDomain.EVENT_TIME);
+    }
+  }
+
+  @Override
+  public void onMerge(OnMergeContext c) throws Exception {
+    // If the end of the window has already been reached, then we are already ready to fire
+    // and do not need to set a wake-up timer.
+    if (!endOfWindowReached(c)) {
+      c.setTimer(c.window().maxTimestamp(), TimeDomain.EVENT_TIME);
+    }
+  }
+
+  @Override
+  public void clear(TriggerContext c) throws Exception { }
+
+  @Override
+  public boolean isCompatible(TriggerStateMachine other) {
+    // Semantically, all default triggers are identical
+    return other instanceof DefaultTriggerStateMachine;
+  }
+
+  @Override
+  public boolean shouldFire(TriggerStateMachine.TriggerContext context) throws Exception {
+    return endOfWindowReached(context);
+  }
+
+  private boolean endOfWindowReached(TriggerStateMachine.TriggerContext context) {
+    return context.currentEventTime() != null
+        && context.currentEventTime().isAfter(context.window().maxTimestamp());
+  }
+
+  @Override
+  public void onFire(TriggerStateMachine.TriggerContext context) throws Exception { }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/ExecutableTriggerStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/ExecutableTriggerStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/ExecutableTriggerStateMachine.java
new file mode 100644
index 0000000..c4d89c2
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/ExecutableTriggerStateMachine.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+
+/**
+ * A wrapper around a trigger used during execution. While an actual trigger may appear multiple
+ * times (both in the same trigger expression and in other trigger expressions), the
+ * {@code ExecutableTrigger} wrapped around them forms a tree (only one occurrence).
+ */
+public class ExecutableTriggerStateMachine implements Serializable {
+
+  /** Store the index assigned to this trigger. */
+  private final int triggerIndex;
+  private final int firstIndexAfterSubtree;
+  private final List<ExecutableTriggerStateMachine> subTriggers = new ArrayList<>();
+  private final TriggerStateMachine trigger;
+
+  public static <W extends BoundedWindow> ExecutableTriggerStateMachine create(
+      TriggerStateMachine trigger) {
+    return create(trigger, 0);
+  }
+
+  private static <W extends BoundedWindow> ExecutableTriggerStateMachine create(
+      TriggerStateMachine trigger, int nextUnusedIndex) {
+    if (trigger instanceof OnceTriggerStateMachine) {
+      return new ExecutableOnceTriggerStateMachine(
+          (OnceTriggerStateMachine) trigger, nextUnusedIndex);
+    } else {
+      return new ExecutableTriggerStateMachine(trigger, nextUnusedIndex);
+    }
+  }
+
+  public static <W extends BoundedWindow> ExecutableTriggerStateMachine createForOnceTrigger(
+      OnceTriggerStateMachine trigger, int nextUnusedIndex) {
+    return new ExecutableOnceTriggerStateMachine(trigger, nextUnusedIndex);
+  }
+
+  private ExecutableTriggerStateMachine(TriggerStateMachine trigger, int nextUnusedIndex) {
+    this.trigger = checkNotNull(trigger, "trigger must not be null");
+    this.triggerIndex = nextUnusedIndex++;
+
+    if (trigger.subTriggers() != null) {
+      for (TriggerStateMachine subTrigger : trigger.subTriggers()) {
+        ExecutableTriggerStateMachine subExecutable = create(subTrigger, nextUnusedIndex);
+        subTriggers.add(subExecutable);
+        nextUnusedIndex = subExecutable.firstIndexAfterSubtree;
+      }
+    }
+    firstIndexAfterSubtree = nextUnusedIndex;
+  }
+
+  public List<ExecutableTriggerStateMachine> subTriggers() {
+    return subTriggers;
+  }
+
+  @Override
+  public String toString() {
+    return trigger.toString();
+  }
+
+  /**
+   * Return the underlying trigger specification corresponding to this {@code ExecutableTrigger}.
+   */
+  public TriggerStateMachine getSpec() {
+    return trigger;
+  }
+
+  public int getTriggerIndex() {
+    return triggerIndex;
+  }
+
+  public final int getFirstIndexAfterSubtree() {
+    return firstIndexAfterSubtree;
+  }
+
+  public boolean isCompatible(ExecutableTriggerStateMachine other) {
+    return trigger.isCompatible(other.trigger);
+  }
+
+  public ExecutableTriggerStateMachine getSubTriggerContaining(int index) {
+    checkNotNull(subTriggers);
+    checkState(index > triggerIndex && index < firstIndexAfterSubtree,
+        "Cannot find sub-trigger containing index not in this tree.");
+    ExecutableTriggerStateMachine previous = null;
+    for (ExecutableTriggerStateMachine subTrigger : subTriggers) {
+      if (index < subTrigger.triggerIndex) {
+        return previous;
+      }
+      previous = subTrigger;
+    }
+    return previous;
+  }
+
+  /**
+   * Invoke the {@link TriggerStateMachine#onElement} method for this trigger, ensuring that the
+   * bits are properly updated if the trigger finishes.
+   */
+  public void invokeOnElement(TriggerStateMachine.OnElementContext c) throws Exception {
+    trigger.onElement(c.forTrigger(this));
+  }
+
+  /**
+   * Invoke the {@link TriggerStateMachine#onMerge} method for this trigger, ensuring that the bits
+   * are properly updated.
+   */
+  public void invokeOnMerge(TriggerStateMachine.OnMergeContext c) throws Exception {
+    TriggerStateMachine.OnMergeContext subContext = c.forTrigger(this);
+    trigger.onMerge(subContext);
+  }
+
+  public boolean invokeShouldFire(TriggerStateMachine.TriggerContext c) throws Exception {
+    return trigger.shouldFire(c.forTrigger(this));
+  }
+
+  public void invokeOnFire(TriggerStateMachine.TriggerContext c) throws Exception {
+    trigger.onFire(c.forTrigger(this));
+  }
+
+  /**
+   * Invoke clear for the current this trigger.
+   */
+  public void invokeClear(TriggerStateMachine.TriggerContext c) throws Exception {
+    trigger.clear(c.forTrigger(this));
+  }
+
+  /**
+   * {@link ExecutableTriggerStateMachine} that enforces the fact that the trigger should always
+   * FIRE_AND_FINISH and never just FIRE.
+   */
+  private static class ExecutableOnceTriggerStateMachine extends ExecutableTriggerStateMachine {
+
+    public ExecutableOnceTriggerStateMachine(OnceTriggerStateMachine trigger, int nextUnusedIndex) {
+      super(trigger, nextUnusedIndex);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/FinishedTriggers.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/FinishedTriggers.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/FinishedTriggers.java
new file mode 100644
index 0000000..1098716
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/FinishedTriggers.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+/**
+ * A mutable set which tracks whether any particular {@link ExecutableTriggerStateMachine} is
+ * finished.
+ */
+public interface FinishedTriggers {
+  /**
+   * Returns {@code true} if the trigger is finished.
+   */
+  public boolean isFinished(ExecutableTriggerStateMachine trigger);
+
+  /**
+   * Sets the fact that the trigger is finished.
+   */
+  public void setFinished(ExecutableTriggerStateMachine trigger, boolean value);
+
+  /**
+   * Sets the trigger and all of its subtriggers to unfinished.
+   */
+  public void clearRecursively(ExecutableTriggerStateMachine trigger);
+
+  /**
+   * Create an independent copy of this mutable {@link FinishedTriggers}.
+   */
+  public FinishedTriggers copy();
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/FinishedTriggersBitSet.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/FinishedTriggersBitSet.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/FinishedTriggersBitSet.java
new file mode 100644
index 0000000..0f74969
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/FinishedTriggersBitSet.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import java.util.BitSet;
+
+/**
+ * A {@link FinishedTriggers} implementation based on an underlying {@link BitSet}.
+ */
+public class FinishedTriggersBitSet implements FinishedTriggers {
+
+  private final BitSet bitSet;
+
+  private FinishedTriggersBitSet(BitSet bitSet) {
+    this.bitSet = bitSet;
+  }
+
+  public static FinishedTriggersBitSet emptyWithCapacity(int capacity) {
+    return new FinishedTriggersBitSet(new BitSet(capacity));
+  }
+
+  public static FinishedTriggersBitSet fromBitSet(BitSet bitSet) {
+    return new FinishedTriggersBitSet(bitSet);
+  }
+
+  /**
+   * Returns the underlying {@link BitSet} for this {@link FinishedTriggersBitSet}.
+   */
+  public BitSet getBitSet() {
+    return bitSet;
+  }
+
+  @Override
+  public boolean isFinished(ExecutableTriggerStateMachine trigger) {
+    return bitSet.get(trigger.getTriggerIndex());
+  }
+
+  @Override
+  public void setFinished(ExecutableTriggerStateMachine trigger, boolean value) {
+    bitSet.set(trigger.getTriggerIndex(), value);
+  }
+
+  @Override
+  public void clearRecursively(ExecutableTriggerStateMachine trigger) {
+    bitSet.clear(trigger.getTriggerIndex(), trigger.getFirstIndexAfterSubtree());
+  }
+
+  @Override
+  public FinishedTriggersBitSet copy() {
+    return new FinishedTriggersBitSet((BitSet) bitSet.clone());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/FinishedTriggersSet.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/FinishedTriggersSet.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/FinishedTriggersSet.java
new file mode 100644
index 0000000..95c35f2
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/FinishedTriggersSet.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import com.google.common.collect.Sets;
+import java.util.Set;
+
+/**
+ * An implementation of {@link FinishedTriggers} atop a user-provided mutable {@link Set}.
+ */
+public class FinishedTriggersSet implements FinishedTriggers {
+
+  private final Set<ExecutableTriggerStateMachine> finishedTriggers;
+
+  private FinishedTriggersSet(Set<ExecutableTriggerStateMachine> finishedTriggers) {
+    this.finishedTriggers = finishedTriggers;
+  }
+
+  public static FinishedTriggersSet fromSet(Set<ExecutableTriggerStateMachine> finishedTriggers) {
+    return new FinishedTriggersSet(finishedTriggers);
+  }
+
+  /**
+   * Returns a mutable {@link Set} of the underlying triggers that are finished.
+   */
+  public Set<ExecutableTriggerStateMachine> getFinishedTriggers() {
+    return finishedTriggers;
+  }
+
+  @Override
+  public boolean isFinished(ExecutableTriggerStateMachine trigger) {
+    return finishedTriggers.contains(trigger);
+  }
+
+  @Override
+  public void setFinished(ExecutableTriggerStateMachine trigger, boolean value) {
+    if (value) {
+      finishedTriggers.add(trigger);
+    } else {
+      finishedTriggers.remove(trigger);
+    }
+  }
+
+  @Override
+  public void clearRecursively(ExecutableTriggerStateMachine trigger) {
+    finishedTriggers.remove(trigger);
+    for (ExecutableTriggerStateMachine subTrigger : trigger.subTriggers()) {
+      clearRecursively(subTrigger);
+    }
+  }
+
+  @Override
+  public FinishedTriggersSet copy() {
+    return fromSet(Sets.newHashSet(finishedTriggers));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/NeverStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/NeverStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/NeverStateMachine.java
new file mode 100644
index 0000000..f32c7a8
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/NeverStateMachine.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+
+/**
+ * A {@link TriggerStateMachine} which never fires.
+ *
+ * <p>Using this trigger will only produce output when the watermark passes the end of the
+ * {@link BoundedWindow window} plus the allowed lateness.
+ */
+public final class NeverStateMachine extends OnceTriggerStateMachine {
+  /**
+   * Returns a trigger which never fires. Output will be produced from the using {@link GroupByKey}
+   * when the {@link BoundedWindow} closes.
+   */
+  public static NeverStateMachine ever() {
+    // NeverTrigger ignores all inputs and is Window-type independent.
+    return new NeverStateMachine();
+  }
+
+  private NeverStateMachine() {
+    super(null);
+  }
+
+  @Override
+  public void onElement(OnElementContext c) {}
+
+  @Override
+  public void onMerge(OnMergeContext c) {}
+
+  @Override
+  public boolean shouldFire(TriggerStateMachine.TriggerContext context) {
+    return false;
+  }
+
+  @Override
+  protected void onOnlyFiring(TriggerStateMachine.TriggerContext context) {
+    throw new UnsupportedOperationException(
+        String.format("%s should never fire", getClass().getSimpleName()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/OrFinallyStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/OrFinallyStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/OrFinallyStateMachine.java
new file mode 100644
index 0000000..f9ec5e7
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/OrFinallyStateMachine.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Arrays;
+
+/**
+ * Executes the {@code actual} trigger until it finishes or until the {@code until} trigger fires.
+ */
+class OrFinallyStateMachine extends TriggerStateMachine {
+
+  private static final int ACTUAL = 0;
+  private static final int UNTIL = 1;
+
+  @VisibleForTesting
+  OrFinallyStateMachine(TriggerStateMachine actual, OnceTriggerStateMachine until) {
+    super(Arrays.asList(actual, until));
+  }
+
+  @Override
+  public void onElement(OnElementContext c) throws Exception {
+    c.trigger().subTrigger(ACTUAL).invokeOnElement(c);
+    c.trigger().subTrigger(UNTIL).invokeOnElement(c);
+  }
+
+  @Override
+  public void onMerge(OnMergeContext c) throws Exception {
+    for (ExecutableTriggerStateMachine subTrigger : c.trigger().subTriggers()) {
+      subTrigger.invokeOnMerge(c);
+    }
+    updateFinishedState(c);
+  }
+
+  @Override
+  public boolean shouldFire(TriggerStateMachine.TriggerContext context) throws Exception {
+    return context.trigger().subTrigger(ACTUAL).invokeShouldFire(context)
+        || context.trigger().subTrigger(UNTIL).invokeShouldFire(context);
+  }
+
+  @Override
+  public void onFire(TriggerStateMachine.TriggerContext context) throws Exception {
+    ExecutableTriggerStateMachine actualSubtrigger = context.trigger().subTrigger(ACTUAL);
+    ExecutableTriggerStateMachine untilSubtrigger = context.trigger().subTrigger(UNTIL);
+
+    if (untilSubtrigger.invokeShouldFire(context)) {
+      untilSubtrigger.invokeOnFire(context);
+      actualSubtrigger.invokeClear(context);
+    } else {
+      // If until didn't fire, then the actual must have (or it is forbidden to call
+      // onFire) so we are done only if actual is done.
+      actualSubtrigger.invokeOnFire(context);
+      // Do not clear the until trigger, because it tracks data cross firings.
+    }
+    updateFinishedState(context);
+  }
+
+  @Override
+  public String toString() {
+    return String.format("%s.orFinally(%s)", subTriggers.get(ACTUAL), subTriggers.get(UNTIL));
+  }
+
+  private void updateFinishedState(TriggerContext c) throws Exception {
+    boolean anyStillFinished = false;
+    for (ExecutableTriggerStateMachine subTrigger : c.trigger().subTriggers()) {
+      anyStillFinished |= c.forTrigger(subTrigger).trigger().isFinished();
+    }
+    c.trigger().setFinished(anyStillFinished);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/RepeatedlyStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/RepeatedlyStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/RepeatedlyStateMachine.java
new file mode 100644
index 0000000..35b701c
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/RepeatedlyStateMachine.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import java.util.Arrays;
+
+/**
+ * Repeat a trigger, either until some condition is met or forever.
+ *
+ * <p>For example, to fire after the end of the window, and every time late data arrives:
+ * <pre> {@code
+ *     Repeatedly.forever(AfterWatermark.isPastEndOfWindow());
+ * } </pre>
+ *
+ * <p>{@code Repeatedly.forever(someTrigger)} behaves like an infinite
+ * {@code AfterEach.inOrder(someTrigger, someTrigger, someTrigger, ...)}.
+ */
+public class RepeatedlyStateMachine extends TriggerStateMachine {
+
+  private static final int REPEATED = 0;
+
+  /**
+   * Create a composite trigger that repeatedly executes the trigger {@code repeated}, firing each
+   * time it fires and ignoring any indications to finish.
+   *
+   * <p>Unless used with {@link TriggerStateMachine#orFinally} the composite trigger will never
+   * finish.
+   *
+   * @param repeated the trigger to execute repeatedly.
+   */
+  public static RepeatedlyStateMachine forever(TriggerStateMachine repeated) {
+    return new RepeatedlyStateMachine(repeated);
+  }
+
+  private RepeatedlyStateMachine(TriggerStateMachine repeated) {
+    super(Arrays.asList(repeated));
+  }
+
+
+  @Override
+  public void onElement(OnElementContext c) throws Exception {
+    getRepeated(c).invokeOnElement(c);
+  }
+
+  @Override
+  public void onMerge(OnMergeContext c) throws Exception {
+    getRepeated(c).invokeOnMerge(c);
+  }
+
+  @Override
+  public boolean shouldFire(TriggerStateMachine.TriggerContext context) throws Exception {
+    return getRepeated(context).invokeShouldFire(context);
+  }
+
+  @Override
+  public void onFire(TriggerContext context) throws Exception {
+    getRepeated(context).invokeOnFire(context);
+
+    if (context.trigger().isFinished(REPEATED)) {
+      // Reset tree will recursively clear the finished bits, and invoke clear.
+      context.forTrigger(getRepeated(context)).trigger().resetTree();
+    }
+  }
+
+  @Override
+  public String toString() {
+    return String.format("Repeatedly.forever(%s)", subTriggers.get(REPEATED));
+  }
+
+  private ExecutableTriggerStateMachine getRepeated(TriggerContext context) {
+    return context.trigger().subTrigger(REPEATED);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/ReshuffleTriggerStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/ReshuffleTriggerStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/ReshuffleTriggerStateMachine.java
new file mode 100644
index 0000000..fc9f203
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/ReshuffleTriggerStateMachine.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import org.apache.beam.sdk.util.Reshuffle;
+
+/**
+ * The trigger used with {@link Reshuffle} which triggers on every element
+ * and never buffers state.
+ */
+public class ReshuffleTriggerStateMachine extends TriggerStateMachine {
+
+  public ReshuffleTriggerStateMachine() {
+    super(null);
+  }
+
+  @Override
+  public void onElement(TriggerStateMachine.OnElementContext c) { }
+
+  @Override
+  public void onMerge(TriggerStateMachine.OnMergeContext c) { }
+
+  @Override
+  public boolean shouldFire(TriggerStateMachine.TriggerContext context) throws Exception {
+    return true;
+  }
+
+  @Override
+  public void onFire(TriggerStateMachine.TriggerContext context) throws Exception { }
+
+  @Override
+  public String toString() {
+    return "ReshuffleTriggerStateMachine()";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachine.java
new file mode 100644
index 0000000..f8def20
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachine.java
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import com.google.common.base.Joiner;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.state.MergingStateAccessor;
+import org.apache.beam.sdk.util.state.StateAccessor;
+import org.joda.time.Instant;
+
+/**
+ * {@code Trigger}s control when the elements for a specific key and window are output. As elements
+ * arrive, they are put into one or more windows by a {@link Window} transform and its associated
+ * {@link WindowFn}, and then passed to the associated {@code Trigger} to determine if the {@code
+ * Window}s contents should be output.
+ *
+ * <p>See {@link org.apache.beam.sdk.transforms.GroupByKey} and {@link Window} for more information
+ * about how grouping with windows works.
+ *
+ * <p>The elements that are assigned to a window since the last time it was fired (or since the
+ * window was created) are placed into the current window pane. Triggers are evaluated against the
+ * elements as they are added. When the root trigger fires, the elements in the current pane will be
+ * output. When the root trigger finishes (indicating it will never fire again), the window is
+ * closed and any new elements assigned to that window are discarded.
+ *
+ * <p>Several predefined {@code Trigger}s are provided:
+ *
+ * <ul>
+ * <li> {@link AfterWatermarkStateMachine} for firing when the watermark passes a timestamp
+ *     determined from either the end of the window or the arrival of the first element in a pane.
+ * <li> {@link AfterProcessingTimeStateMachine} for firing after some amount of processing time has
+ *     elapsed (typically since the first element in a pane).
+ * <li> {@link AfterPaneStateMachine} for firing off a property of the elements in the current pane,
+ *     such as the number of elements that have been assigned to the current pane.
+ * </ul>
+ *
+ * <p>In addition, {@code Trigger}s can be combined in a variety of ways:
+ *
+ * <ul>
+ * <li> {@link RepeatedlyStateMachine#forever} to create a trigger that executes forever. Any time
+ *     its argument finishes it gets reset and starts over. Can be combined with {@link
+ *     TriggerStateMachine#orFinally} to specify a condition that causes the repetition to stop.
+ * <li> {@link AfterEachStateMachine#inOrder} to execute each trigger in sequence, firing each (and
+ *     every) time that a trigger fires, and advancing to the next trigger in the sequence when it
+ *     finishes.
+ * <li> {@link AfterFirstStateMachine#of} to create a trigger that fires after at least one of its
+ *     arguments fires. An {@link AfterFirstStateMachine} trigger finishes after it fires once.
+ * <li> {@link AfterAllStateMachine#of} to create a trigger that fires after all least one of its
+ *     arguments have fired at least once. An {@link AfterAllStateMachine} trigger finishes after it
+ *     fires once.
+ * </ul>
+ *
+ * <p>Each trigger tree is instantiated per-key and per-window. Every trigger in the tree is in one
+ * of the following states:
+ *
+ * <ul>
+ * <li> Never Existed - before the trigger has started executing, there is no state associated with
+ *     it anywhere in the system. A trigger moves to the executing state as soon as it processes in
+ *     the current pane.
+ * <li> Executing - while the trigger is receiving items and may fire. While it is in this state, it
+ *     may persist book-keeping information to persisted state, set timers, etc.
+ * <li> Finished - after a trigger finishes, all of its book-keeping data is cleaned up, and the
+ *     system remembers only that it is finished. Entering this state causes us to discard any
+ *     elements in the buffer for that window, as well.
+ * </ul>
+ *
+ * <p>Once finished, a trigger cannot return itself back to an earlier state, however a composite
+ * trigger could reset its sub-triggers.
+ *
+ * <p>Triggers should not build up any state internally since they may be recreated between
+ * invocations of the callbacks. All important values should be persisted using state before the
+ * callback returns.
+ */
+public abstract class TriggerStateMachine implements Serializable {
+
+  /**
+   * Interface for accessing information about the trigger being executed and other triggers in the
+   * same tree.
+   */
+  public interface TriggerInfo {
+
+    /**
+     * Returns true if the windowing strategy of the current {@code PCollection} is a merging {@link
+     * WindowFn}. If true, the trigger execution needs to keep enough information to support the
+     * possibility of {@link TriggerStateMachine#onMerge} being called. If false, {@link
+     * TriggerStateMachine#onMerge} will never be called.
+     */
+    boolean isMerging();
+
+    /**
+     * Access the executable versions of the sub-triggers of the current trigger.
+     */
+    Iterable<ExecutableTriggerStateMachine> subTriggers();
+
+    /**
+     * Access the executable version of the specified sub-trigger.
+     */
+    ExecutableTriggerStateMachine subTrigger(int subtriggerIndex);
+
+    /**
+     * Returns true if the current trigger is marked finished.
+     */
+    boolean isFinished();
+
+    /**
+     * Return true if the given subtrigger is marked finished.
+     */
+    boolean isFinished(int subtriggerIndex);
+
+    /**
+     * Returns true if all the sub-triggers of the current trigger are marked finished.
+     */
+    boolean areAllSubtriggersFinished();
+
+    /**
+     * Returns an iterable over the unfinished sub-triggers of the current trigger.
+     */
+    Iterable<ExecutableTriggerStateMachine> unfinishedSubTriggers();
+
+    /**
+     * Returns the first unfinished sub-trigger.
+     */
+    ExecutableTriggerStateMachine firstUnfinishedSubTrigger();
+
+    /**
+     * Clears all keyed state for triggers in the current sub-tree and unsets all the associated
+     * finished bits.
+     */
+    void resetTree() throws Exception;
+
+    /**
+     * Sets the finished bit for the current trigger.
+     */
+    void setFinished(boolean finished);
+
+    /**
+     * Sets the finished bit for the given sub-trigger.
+     */
+    void setFinished(boolean finished, int subTriggerIndex);
+  }
+
+  /**
+   * Interact with properties of the trigger being executed, with extensions to deal with the
+   * merging windows.
+   */
+  public interface MergingTriggerInfo extends TriggerInfo {
+
+    /** Return true if the trigger is finished in any window being merged. */
+    public abstract boolean finishedInAnyMergingWindow();
+
+    /** Return true if the trigger is finished in all windows being merged. */
+    public abstract boolean finishedInAllMergingWindows();
+  }
+
+  /**
+   * Information accessible to all operational hooks in this {@code Trigger}.
+   *
+   * <p>Used directly in {@link TriggerStateMachine#shouldFire} and {@link
+   * TriggerStateMachine#clear}, and extended with additional information in other methods.
+   */
+  public abstract class TriggerContext {
+
+    /** Returns the interface for accessing trigger info. */
+    public abstract TriggerInfo trigger();
+
+    /** Returns the interface for accessing persistent state. */
+    public abstract StateAccessor<?> state();
+
+    /** The window that the current context is executing in. */
+    public abstract BoundedWindow window();
+
+    /** Create a sub-context for the given sub-trigger. */
+    public abstract TriggerContext forTrigger(ExecutableTriggerStateMachine trigger);
+
+    /**
+     * Removes the timer set in this trigger context for the given {@link Instant}
+     * and {@link TimeDomain}.
+     */
+    public abstract void deleteTimer(Instant timestamp, TimeDomain domain);
+
+    /** The current processing time. */
+    public abstract Instant currentProcessingTime();
+
+    /** The current synchronized upstream processing time or {@code null} if unknown. */
+    @Nullable
+    public abstract Instant currentSynchronizedProcessingTime();
+
+    /** The current event time for the input or {@code null} if unknown. */
+    @Nullable
+    public abstract Instant currentEventTime();
+  }
+
+  /**
+   * Extended {@link TriggerContext} containing information accessible to the {@link #onElement}
+   * operational hook.
+   */
+  public abstract class OnElementContext extends TriggerContext {
+    /** The event timestamp of the element currently being processed. */
+    public abstract Instant eventTimestamp();
+
+    /**
+     * Sets a timer to fire when the watermark or processing time is beyond the given timestamp.
+     * Timers are not guaranteed to fire immediately, but will be delivered at some time afterwards.
+     *
+     * <p>As with {@link #state}, timers are implicitly scoped to the current window. All
+     * timer firings for a window will be received, but the implementation should choose to ignore
+     * those that are not applicable.
+     *
+     * @param timestamp the time at which the trigger should be re-evaluated
+     * @param domain the domain that the {@code timestamp} applies to
+     */
+    public abstract void setTimer(Instant timestamp, TimeDomain domain);
+
+    /** Create an {@code OnElementContext} for executing the given trigger. */
+    @Override
+    public abstract OnElementContext forTrigger(ExecutableTriggerStateMachine trigger);
+  }
+
+  /**
+   * Extended {@link TriggerContext} containing information accessible to the {@link #onMerge}
+   * operational hook.
+   */
+  public abstract class OnMergeContext extends TriggerContext {
+    /**
+     * Sets a timer to fire when the watermark or processing time is beyond the given timestamp.
+     * Timers are not guaranteed to fire immediately, but will be delivered at some time afterwards.
+     *
+     * <p>As with {@link #state}, timers are implicitly scoped to the current window. All
+     * timer firings for a window will be received, but the implementation should choose to ignore
+     * those that are not applicable.
+     *
+     * @param timestamp the time at which the trigger should be re-evaluated
+     * @param domain the domain that the {@code timestamp} applies to
+     */
+    public abstract void setTimer(Instant timestamp, TimeDomain domain);
+
+    /** Create an {@code OnMergeContext} for executing the given trigger. */
+    @Override
+    public abstract OnMergeContext forTrigger(ExecutableTriggerStateMachine trigger);
+
+    @Override
+    public abstract MergingStateAccessor<?, ?> state();
+
+    @Override
+    public abstract MergingTriggerInfo trigger();
+  }
+
+  @Nullable
+  protected final List<TriggerStateMachine> subTriggers;
+
+  protected TriggerStateMachine(@Nullable List<TriggerStateMachine> subTriggers) {
+    this.subTriggers = subTriggers;
+  }
+
+
+  /**
+   * Called every time an element is incorporated into a window.
+   */
+  public abstract void onElement(OnElementContext c) throws Exception;
+
+  /**
+   * Called immediately after windows have been merged.
+   *
+   * <p>Leaf triggers should update their state by inspecting their status and any state in the
+   * merging windows. Composite triggers should update their state by calling {@link
+   * ExecutableTriggerStateMachine#invokeOnMerge} on their sub-triggers, and applying appropriate
+   * logic.
+   *
+   * <p>A trigger such as {@link AfterWatermarkStateMachine#pastEndOfWindow} may no longer be
+   * finished; it is the responsibility of the trigger itself to record this fact. It is forbidden
+   * for a trigger to become finished due to {@link #onMerge}, as it has not yet fired the pending
+   * elements that led to it being ready to fire.
+   *
+   * <p>The implementation does not need to clear out any state associated with the old windows.
+   */
+  public abstract void onMerge(OnMergeContext c) throws Exception;
+
+  /**
+   * Returns {@code true} if the current state of the trigger indicates that its condition
+   * is satisfied and it is ready to fire.
+   */
+  public abstract boolean shouldFire(TriggerContext context) throws Exception;
+
+  /**
+   * Adjusts the state of the trigger to be ready for the next pane. For example, a
+   * {@link RepeatedlyStateMachine} trigger will reset its inner trigger, since it has fired.
+   *
+   * <p>If the trigger is finished, it is the responsibility of the trigger itself to
+   * record that fact via the {@code context}.
+   */
+  public abstract void onFire(TriggerContext context) throws Exception;
+
+  /**
+   * Called to allow the trigger to prefetch any state it will likely need to read from during
+   * an {@link #onElement} call.
+   */
+  public void prefetchOnElement(StateAccessor<?> state) {
+    if (subTriggers != null) {
+      for (TriggerStateMachine trigger : subTriggers) {
+        trigger.prefetchOnElement(state);
+      }
+    }
+  }
+
+  /**
+   * Called to allow the trigger to prefetch any state it will likely need to read from during
+   * an {@link #onMerge} call.
+   */
+  public void prefetchOnMerge(MergingStateAccessor<?, ?> state) {
+    if (subTriggers != null) {
+      for (TriggerStateMachine trigger : subTriggers) {
+        trigger.prefetchOnMerge(state);
+      }
+    }
+  }
+
+  /**
+   * Called to allow the trigger to prefetch any state it will likely need to read from during
+   * an {@link #shouldFire} call.
+   */
+  public void prefetchShouldFire(StateAccessor<?> state) {
+    if (subTriggers != null) {
+      for (TriggerStateMachine trigger : subTriggers) {
+        trigger.prefetchShouldFire(state);
+      }
+    }
+  }
+
+  /**
+   * Called to allow the trigger to prefetch any state it will likely need to read from during
+   * an {@link #onFire} call.
+   */
+  public void prefetchOnFire(StateAccessor<?> state) {
+    if (subTriggers != null) {
+      for (TriggerStateMachine trigger : subTriggers) {
+        trigger.prefetchOnFire(state);
+      }
+    }
+  }
+
+  /**
+   * Clear any state associated with this trigger in the given window.
+   *
+   * <p>This is called after a trigger has indicated it will never fire again. The trigger system
+   * keeps enough information to know that the trigger is finished, so this trigger should clear all
+   * of its state.
+   */
+  public void clear(TriggerContext c) throws Exception {
+    if (subTriggers != null) {
+      for (ExecutableTriggerStateMachine trigger : c.trigger().subTriggers()) {
+        trigger.invokeClear(c);
+      }
+    }
+  }
+
+  public Iterable<TriggerStateMachine> subTriggers() {
+    return subTriggers;
+  }
+
+  /**
+   * Returns whether this performs the same triggering as the given {@code Trigger}.
+   */
+  public boolean isCompatible(TriggerStateMachine other) {
+    if (!getClass().equals(other.getClass())) {
+      return false;
+    }
+
+    if (subTriggers == null) {
+      return other.subTriggers == null;
+    } else if (other.subTriggers == null) {
+      return false;
+    } else if (subTriggers.size() != other.subTriggers.size()) {
+      return false;
+    }
+
+    for (int i = 0; i < subTriggers.size(); i++) {
+      if (!subTriggers.get(i).isCompatible(other.subTriggers.get(i))) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  @Override
+  public String toString() {
+    String simpleName = getClass().getSimpleName();
+    if (getClass().getEnclosingClass() != null) {
+      simpleName = getClass().getEnclosingClass().getSimpleName() + "." + simpleName;
+    }
+    if (subTriggers == null || subTriggers.size() == 0) {
+      return simpleName;
+    } else {
+      return simpleName + "(" + Joiner.on(", ").join(subTriggers) + ")";
+    }
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof TriggerStateMachine)) {
+      return false;
+    }
+    TriggerStateMachine that = (TriggerStateMachine) obj;
+    return Objects.equals(getClass(), that.getClass())
+        && Objects.equals(subTriggers, that.subTriggers);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(getClass(), subTriggers);
+  }
+
+  /**
+   * Specify an ending condition for this trigger. If the {@code until} fires then the combination
+   * fires.
+   *
+   * <p>The expression {@code t1.orFinally(t2)} fires every time {@code t1} fires, and finishes as
+   * soon as either {@code t1} finishes or {@code t2} fires, in which case it fires one last time
+   * for {@code t2}. Both {@code t1} and {@code t2} are executed in parallel. This means that {@code
+   * t1} may have fired since {@code t2} started, so not all of the elements that {@code t2} has
+   * seen are necessarily in the current pane.
+   *
+   * <p>For example the final firing of the following trigger may only have 1 element:
+   *
+   * <pre>{@code
+   * Repeatedly.forever(AfterPane.elementCountAtLeast(2))
+   *     .orFinally(AfterPane.elementCountAtLeast(5))
+   * }
+   * </pre>
+   *
+   * <p>Note that if {@code t1} is {@link OnceTriggerStateMachine}, then {@code t1.orFinally(t2)} is
+   * the same as {@code AfterFirst.of(t1, t2)}.
+   */
+  public TriggerStateMachine orFinally(OnceTriggerStateMachine until) {
+    return new OrFinallyStateMachine(this, until);
+  }
+
+  /**
+   * {@link TriggerStateMachine}s that are guaranteed to fire at most once should extend from this,
+   * rather than the general {@link TriggerStateMachine} class to indicate that behavior.
+   */
+  public abstract static class OnceTriggerStateMachine extends TriggerStateMachine {
+    protected OnceTriggerStateMachine(List<TriggerStateMachine> subTriggers) {
+      super(subTriggers);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public final void onFire(TriggerContext context) throws Exception {
+      onOnlyFiring(context);
+      context.trigger().setFinished(true);
+    }
+
+    /**
+     * Called exactly once by {@link #onFire} when the trigger is fired. By default,
+     * invokes {@link #onFire} on all subtriggers for which {@link #shouldFire} is {@code true}.
+     */
+    protected abstract void onOnlyFiring(TriggerContext context) throws Exception;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java
new file mode 100644
index 0000000..1c06e8d
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java
@@ -0,0 +1,509 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import java.util.Collection;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.triggers.TriggerStateMachine.MergingTriggerInfo;
+import org.apache.beam.runners.core.triggers.TriggerStateMachine.TriggerInfo;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.ActiveWindowSet;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.Timers;
+import org.apache.beam.sdk.util.state.MergingStateAccessor;
+import org.apache.beam.sdk.util.state.State;
+import org.apache.beam.sdk.util.state.StateAccessor;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateNamespace;
+import org.apache.beam.sdk.util.state.StateNamespaces;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.joda.time.Instant;
+
+/**
+ * Factory for creating instances of the various {@link TriggerStateMachine} contexts.
+ *
+ * <p>These contexts are highly interdependent and share many fields; it is inadvisable
+ * to create them via any means other than this factory class.
+ */
+public class TriggerStateMachineContextFactory<W extends BoundedWindow> {
+
+  private final WindowFn<?, W> windowFn;
+  private StateInternals<?> stateInternals;
+  private final Coder<W> windowCoder;
+
+  public TriggerStateMachineContextFactory(WindowFn<?, W> windowFn,
+      StateInternals<?> stateInternals, ActiveWindowSet<W> activeWindows) {
+    // Future triggers may be able to exploit the active window to state address window mapping.
+    this.windowFn = windowFn;
+    this.stateInternals = stateInternals;
+    this.windowCoder = windowFn.windowCoder();
+  }
+
+  public TriggerStateMachine.TriggerContext base(W window, Timers timers,
+      ExecutableTriggerStateMachine rootTrigger, FinishedTriggers finishedSet) {
+    return new TriggerContextImpl(window, timers, rootTrigger, finishedSet);
+  }
+
+  public TriggerStateMachine.OnElementContext createOnElementContext(
+      W window, Timers timers, Instant elementTimestamp,
+      ExecutableTriggerStateMachine rootTrigger, FinishedTriggers finishedSet) {
+    return new OnElementContextImpl(window, timers, rootTrigger, finishedSet, elementTimestamp);
+  }
+
+  public TriggerStateMachine.OnMergeContext createOnMergeContext(W window, Timers timers,
+      ExecutableTriggerStateMachine rootTrigger, FinishedTriggers finishedSet,
+      Map<W, FinishedTriggers> finishedSets) {
+    return new OnMergeContextImpl(window, timers, rootTrigger, finishedSet, finishedSets);
+  }
+
+  public StateAccessor<?> createStateAccessor(W window, ExecutableTriggerStateMachine trigger) {
+    return new StateAccessorImpl(window, trigger);
+  }
+
+  public MergingStateAccessor<?, W> createMergingStateAccessor(
+      W mergeResult, Collection<W> mergingWindows, ExecutableTriggerStateMachine trigger) {
+    return new MergingStateAccessorImpl(trigger, mergingWindows, mergeResult);
+  }
+
+  private class TriggerInfoImpl implements TriggerStateMachine.TriggerInfo {
+
+    protected final ExecutableTriggerStateMachine trigger;
+    protected final FinishedTriggers finishedSet;
+    private final TriggerStateMachine.TriggerContext context;
+
+    public TriggerInfoImpl(ExecutableTriggerStateMachine trigger, FinishedTriggers finishedSet,
+        TriggerStateMachine.TriggerContext context) {
+      this.trigger = trigger;
+      this.finishedSet = finishedSet;
+      this.context = context;
+    }
+
+    @Override
+    public boolean isMerging() {
+      return !windowFn.isNonMerging();
+    }
+
+    @Override
+    public Iterable<ExecutableTriggerStateMachine> subTriggers() {
+      return trigger.subTriggers();
+    }
+
+    @Override
+    public ExecutableTriggerStateMachine subTrigger(int subtriggerIndex) {
+      return trigger.subTriggers().get(subtriggerIndex);
+    }
+
+    @Override
+    public boolean isFinished() {
+      return finishedSet.isFinished(trigger);
+    }
+
+    @Override
+    public boolean isFinished(int subtriggerIndex) {
+      return finishedSet.isFinished(subTrigger(subtriggerIndex));
+    }
+
+    @Override
+    public boolean areAllSubtriggersFinished() {
+      return Iterables.isEmpty(unfinishedSubTriggers());
+    }
+
+    @Override
+    public Iterable<ExecutableTriggerStateMachine> unfinishedSubTriggers() {
+      return FluentIterable
+          .from(trigger.subTriggers())
+          .filter(new Predicate<ExecutableTriggerStateMachine>() {
+            @Override
+            public boolean apply(ExecutableTriggerStateMachine trigger) {
+              return !finishedSet.isFinished(trigger);
+            }
+          });
+    }
+
+    @Override
+    public ExecutableTriggerStateMachine firstUnfinishedSubTrigger() {
+      for (ExecutableTriggerStateMachine subTrigger : trigger.subTriggers()) {
+        if (!finishedSet.isFinished(subTrigger)) {
+          return subTrigger;
+        }
+      }
+      return null;
+    }
+
+    @Override
+    public void resetTree() throws Exception {
+      finishedSet.clearRecursively(trigger);
+      trigger.invokeClear(context);
+    }
+
+    @Override
+    public void setFinished(boolean finished) {
+      finishedSet.setFinished(trigger, finished);
+    }
+
+    @Override
+    public void setFinished(boolean finished, int subTriggerIndex) {
+      finishedSet.setFinished(subTrigger(subTriggerIndex), finished);
+    }
+  }
+
+  private class TriggerTimers implements Timers {
+
+    private final Timers timers;
+    private final W window;
+
+    public TriggerTimers(W window, Timers timers) {
+      this.timers = timers;
+      this.window = window;
+    }
+
+    @Override
+    public void setTimer(Instant timestamp, TimeDomain timeDomain) {
+      timers.setTimer(timestamp, timeDomain);
+    }
+
+    @Override
+    public void deleteTimer(Instant timestamp, TimeDomain timeDomain) {
+      if (timeDomain == TimeDomain.EVENT_TIME
+          && timestamp.equals(window.maxTimestamp())) {
+        // Don't allow triggers to unset the at-max-timestamp timer. This is necessary for on-time
+        // state transitions.
+        return;
+      }
+      timers.deleteTimer(timestamp, timeDomain);
+    }
+
+    @Override
+    public Instant currentProcessingTime() {
+      return timers.currentProcessingTime();
+    }
+
+    @Override
+    @Nullable
+    public Instant currentSynchronizedProcessingTime() {
+      return timers.currentSynchronizedProcessingTime();
+    }
+
+    @Override
+    public Instant currentEventTime() {
+      return timers.currentEventTime();
+    }
+  }
+
+  private class MergingTriggerInfoImpl
+      extends TriggerInfoImpl implements TriggerStateMachine.MergingTriggerInfo {
+
+    private final Map<W, FinishedTriggers> finishedSets;
+
+    public MergingTriggerInfoImpl(
+        ExecutableTriggerStateMachine trigger,
+        FinishedTriggers finishedSet,
+        TriggerStateMachine.TriggerContext context,
+        Map<W, FinishedTriggers> finishedSets) {
+      super(trigger, finishedSet, context);
+      this.finishedSets = finishedSets;
+    }
+
+    @Override
+    public boolean finishedInAnyMergingWindow() {
+      for (FinishedTriggers finishedSet : finishedSets.values()) {
+        if (finishedSet.isFinished(trigger)) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    @Override
+    public boolean finishedInAllMergingWindows() {
+      for (FinishedTriggers finishedSet : finishedSets.values()) {
+        if (!finishedSet.isFinished(trigger)) {
+          return false;
+        }
+      }
+      return true;
+    }
+  }
+
+  private class StateAccessorImpl implements StateAccessor<Object> {
+    protected final int triggerIndex;
+    protected final StateNamespace windowNamespace;
+
+    public StateAccessorImpl(
+        W window,
+        ExecutableTriggerStateMachine trigger) {
+      this.triggerIndex = trigger.getTriggerIndex();
+      this.windowNamespace = namespaceFor(window);
+    }
+
+    protected StateNamespace namespaceFor(W window) {
+      return StateNamespaces.windowAndTrigger(windowCoder, window, triggerIndex);
+    }
+
+    @Override
+    public <StateT extends State> StateT access(StateTag<? super Object, StateT> address) {
+      return stateInternals.state(windowNamespace, address);
+    }
+  }
+
+  private class MergingStateAccessorImpl extends StateAccessorImpl
+  implements MergingStateAccessor<Object, W> {
+    private final Collection<W> activeToBeMerged;
+
+    public MergingStateAccessorImpl(
+        ExecutableTriggerStateMachine trigger, Collection<W> activeToBeMerged, W mergeResult) {
+      super(mergeResult, trigger);
+      this.activeToBeMerged = activeToBeMerged;
+    }
+
+    @Override
+    public <StateT extends State> StateT access(
+        StateTag<? super Object, StateT> address) {
+      return stateInternals.state(windowNamespace, address);
+    }
+
+    @Override
+    public <StateT extends State> Map<W, StateT> accessInEachMergingWindow(
+        StateTag<? super Object, StateT> address) {
+      ImmutableMap.Builder<W, StateT> builder = ImmutableMap.builder();
+      for (W mergingWindow : activeToBeMerged) {
+        StateT stateForWindow = stateInternals.state(namespaceFor(mergingWindow), address);
+        builder.put(mergingWindow, stateForWindow);
+      }
+      return builder.build();
+    }
+  }
+
+  private class TriggerContextImpl extends TriggerStateMachine.TriggerContext {
+
+    private final W window;
+    private final StateAccessorImpl state;
+    private final Timers timers;
+    private final TriggerInfoImpl triggerInfo;
+
+    private TriggerContextImpl(
+        W window,
+        Timers timers,
+        ExecutableTriggerStateMachine trigger,
+        FinishedTriggers finishedSet) {
+      trigger.getSpec().super();
+      this.window = window;
+      this.state = new StateAccessorImpl(window, trigger);
+      this.timers = new TriggerTimers(window, timers);
+      this.triggerInfo = new TriggerInfoImpl(trigger, finishedSet, this);
+    }
+
+    @Override
+    public TriggerStateMachine.TriggerContext forTrigger(ExecutableTriggerStateMachine trigger) {
+      return new TriggerContextImpl(window, timers, trigger, triggerInfo.finishedSet);
+    }
+
+    @Override
+    public TriggerInfo trigger() {
+      return triggerInfo;
+    }
+
+    @Override
+    public StateAccessor<?> state() {
+      return state;
+    }
+
+    @Override
+    public W window() {
+      return window;
+    }
+
+    @Override
+    public void deleteTimer(Instant timestamp, TimeDomain domain) {
+      timers.deleteTimer(timestamp, domain);
+    }
+
+    @Override
+    public Instant currentProcessingTime() {
+      return timers.currentProcessingTime();
+    }
+
+    @Override
+    @Nullable
+    public Instant currentSynchronizedProcessingTime() {
+      return timers.currentSynchronizedProcessingTime();
+    }
+
+    @Override
+    @Nullable
+    public Instant currentEventTime() {
+      return timers.currentEventTime();
+    }
+  }
+
+  private class OnElementContextImpl extends TriggerStateMachine.OnElementContext {
+
+    private final W window;
+    private final StateAccessorImpl state;
+    private final Timers timers;
+    private final TriggerInfoImpl triggerInfo;
+    private final Instant eventTimestamp;
+
+    private OnElementContextImpl(
+        W window,
+        Timers timers,
+        ExecutableTriggerStateMachine trigger,
+        FinishedTriggers finishedSet,
+        Instant eventTimestamp) {
+      trigger.getSpec().super();
+      this.window = window;
+      this.state = new StateAccessorImpl(window, trigger);
+      this.timers = new TriggerTimers(window, timers);
+      this.triggerInfo = new TriggerInfoImpl(trigger, finishedSet, this);
+      this.eventTimestamp = eventTimestamp;
+    }
+
+
+    @Override
+    public Instant eventTimestamp() {
+      return eventTimestamp;
+    }
+
+    @Override
+    public TriggerStateMachine.OnElementContext forTrigger(ExecutableTriggerStateMachine trigger) {
+      return new OnElementContextImpl(
+          window, timers, trigger, triggerInfo.finishedSet, eventTimestamp);
+    }
+
+    @Override
+    public TriggerInfo trigger() {
+      return triggerInfo;
+    }
+
+    @Override
+    public StateAccessor<?> state() {
+      return state;
+    }
+
+    @Override
+    public W window() {
+      return window;
+    }
+
+    @Override
+    public void setTimer(Instant timestamp, TimeDomain domain) {
+      timers.setTimer(timestamp, domain);
+    }
+
+
+    @Override
+    public void deleteTimer(Instant timestamp, TimeDomain domain) {
+      timers.deleteTimer(timestamp, domain);
+    }
+
+    @Override
+    public Instant currentProcessingTime() {
+      return timers.currentProcessingTime();
+    }
+
+    @Override
+    @Nullable
+    public Instant currentSynchronizedProcessingTime() {
+      return timers.currentSynchronizedProcessingTime();
+    }
+
+    @Override
+    @Nullable
+    public Instant currentEventTime() {
+      return timers.currentEventTime();
+    }
+  }
+
+  private class OnMergeContextImpl extends TriggerStateMachine.OnMergeContext {
+    private final MergingStateAccessor<?, W> state;
+    private final W window;
+    private final Collection<W> mergingWindows;
+    private final Timers timers;
+    private final MergingTriggerInfoImpl triggerInfo;
+
+    private OnMergeContextImpl(
+        W window,
+        Timers timers,
+        ExecutableTriggerStateMachine trigger,
+        FinishedTriggers finishedSet,
+        Map<W, FinishedTriggers> finishedSets) {
+      trigger.getSpec().super();
+      this.mergingWindows = finishedSets.keySet();
+      this.window = window;
+      this.state = new MergingStateAccessorImpl(trigger, mergingWindows, window);
+      this.timers = new TriggerTimers(window, timers);
+      this.triggerInfo = new MergingTriggerInfoImpl(trigger, finishedSet, this, finishedSets);
+    }
+
+    @Override
+    public TriggerStateMachine.OnMergeContext forTrigger(ExecutableTriggerStateMachine trigger) {
+      return new OnMergeContextImpl(
+          window, timers, trigger, triggerInfo.finishedSet, triggerInfo.finishedSets);
+    }
+
+    @Override
+    public MergingStateAccessor<?, W> state() {
+      return state;
+    }
+
+    @Override
+    public MergingTriggerInfo trigger() {
+      return triggerInfo;
+    }
+
+    @Override
+    public W window() {
+      return window;
+    }
+
+    @Override
+    public void setTimer(Instant timestamp, TimeDomain domain) {
+      timers.setTimer(timestamp, domain);
+    }
+
+    @Override
+    public void deleteTimer(Instant timestamp, TimeDomain domain) {
+      timers.setTimer(timestamp, domain);
+
+    }
+
+    @Override
+    public Instant currentProcessingTime() {
+      return timers.currentProcessingTime();
+    }
+
+    @Override
+    @Nullable
+    public Instant currentSynchronizedProcessingTime() {
+      return timers.currentSynchronizedProcessingTime();
+    }
+
+    @Override
+    @Nullable
+    public Instant currentEventTime() {
+      return timers.currentEventTime();
+    }
+  }
+}



Mime
View raw message