beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From p..@apache.org
Subject [1/2] beam git commit: JStorm-runner: Implementation of processing timer
Date Thu, 07 Sep 2017 11:10:34 GMT
Repository: beam
Updated Branches:
  refs/heads/jstorm-runner 80bd7f8be -> d24d2831d


JStorm-runner: Implementation of processing timer


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6a0d3896
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6a0d3896
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6a0d3896

Branch: refs/heads/jstorm-runner
Commit: 6a0d389667369ec7d4f85469e6954d47097b7b68
Parents: 80bd7f8
Author: basti.lj <basti.lj@alibaba-inc.com>
Authored: Thu Sep 7 18:42:10 2017 +0800
Committer: Pei He <pei@apache.org>
Committed: Thu Sep 7 19:08:20 2017 +0800

----------------------------------------------------------------------
 .../jstorm/translation/ExecutorsBolt.java       | 39 +++++++++-----
 .../jstorm/translation/TimerService.java        |  3 +-
 .../jstorm/translation/TimerServiceImpl.java    | 54 +++++++++++---------
 3 files changed, 58 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6a0d3896/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
index 3d58a37..aca2ca4 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
@@ -19,12 +19,14 @@ package org.apache.beam.runners.jstorm.translation;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import backtype.storm.Config;
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.IRichBatchBolt;
 import backtype.storm.tuple.ITupleExt;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
+import backtype.storm.utils.TupleUtils;
 import com.alibaba.jstorm.cache.IKvStoreManager;
 import com.alibaba.jstorm.cache.KvStoreManagerFactory;
 import com.alibaba.jstorm.cluster.Common;
@@ -44,6 +46,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
@@ -230,20 +233,25 @@ public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt
{
 
   @Override
   public void execute(Tuple input) {
-    // process a batch
-    String streamId = input.getSourceStreamId();
-    ITupleExt tuple = (ITupleExt) input;
-    Iterator<List<Object>> valueIterator = tuple.batchValues().iterator();
-    if (CommonInstance.BEAM_WATERMARK_STREAM_ID.equals(streamId)) {
-      while (valueIterator.hasNext()) {
-        processWatermark((Long) valueIterator.next().get(0), input.getSourceTask());
-      }
+    if (TupleUtils.isTick(input)) {
+      // tick to trigger processing timer
+      timerService.fireTimers(Instant.now().getMillis(), TimeDomain.PROCESSING_TIME);
     } else {
-      doFnStartBundle();
-      while (valueIterator.hasNext()) {
-        processElement(valueIterator.next(), streamId);
+      // process a batch
+      String streamId = input.getSourceStreamId();
+      ITupleExt tuple = (ITupleExt) input;
+      Iterator<List<Object>> valueIterator = tuple.batchValues().iterator();
+      if (CommonInstance.BEAM_WATERMARK_STREAM_ID.equals(streamId)) {
+        while (valueIterator.hasNext()) {
+          processWatermark((Long) valueIterator.next().get(0), input.getSourceTask());
+        }
+      } else {
+        doFnStartBundle();
+        while (valueIterator.hasNext()) {
+          processElement(valueIterator.next(), streamId);
+        }
+        doFnFinishBundle();
       }
-      doFnFinishBundle();
     }
   }
 
@@ -256,7 +264,7 @@ public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt
{
     if (newWaterMark != 0) {
       // Some buffer windows are going to be triggered.
       doFnStartBundle();
-      timerService.fireTimers(newWaterMark);
+      timerService.fireTimers(newWaterMark, TimeDomain.EVENT_TIME);
 
       // SideInput: If receiving water mark with max timestamp, It means no more data is
supposed
       // to be received from now on. So we are going to process all push back data.
@@ -310,7 +318,10 @@ public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt
{
 
   @Override
   public Map<String, Object> getComponentConfiguration() {
-    return null;
+    Map conf = Maps.newHashMap();
+    // Add tick tuple for triggering processing timer
+    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 1);
+    return conf;
   }
 
   public TimerService timerService() {

http://git-wip-us.apache.org/repos/asf/beam/blob/6a0d3896/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java
index 159fe70..1265143 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.jstorm.translation;
 import java.io.Serializable;
 import java.util.List;
 import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.sdk.state.TimeDomain;
 import org.joda.time.Instant;
 
 /**
@@ -47,7 +48,7 @@ interface TimerService extends Serializable {
 
   void setTimer(Object key, TimerInternals.TimerData timerData, DoFnExecutor doFnExecutor);
 
-  void fireTimers(long newWatermark);
+  void fireTimers(long currentTime, TimeDomain timeDomain);
 
   void deleteTimer(TimerInternals.TimerData timerData);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/6a0d3896/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java
index 027fc14..ea4b1bb 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.runners.jstorm.translation;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 
 import com.alibaba.jstorm.utils.Pair;
@@ -49,6 +48,8 @@ class TimerServiceImpl implements TimerService {
   private final Map<String, Instant> namespaceToWatermarkHold = new HashMap<>();
   private final PriorityQueue<TimerInternals.TimerData> eventTimeTimersQueue =
       new PriorityQueue<>();
+  private final PriorityQueue<TimerInternals.TimerData> processTimeTimersQueue =
+      new PriorityQueue<>();
   private final Map<TimerInternals.TimerData, Set<Pair<Integer, Object>>>
       timerDataToKeyedExecutors = Maps.newHashMap();
 
@@ -90,20 +91,6 @@ class TimerServiceImpl implements TimerService {
   }
 
   @Override
-  public void fireTimers(long newWatermark) {
-    TimerInternals.TimerData timerData;
-    while ((timerData = eventTimeTimersQueue.peek()) != null
-        && timerData.getTimestamp().getMillis() <= newWatermark) {
-      for (Pair<Integer, Object> keyedExecutor : timerDataToKeyedExecutors.get(timerData))
{
-        DoFnExecutor executor = idToDoFnExecutor.get(keyedExecutor.getFirst());
-        executor.onTimer(keyedExecutor.getSecond(), timerData);
-      }
-      eventTimeTimersQueue.remove();
-      timerDataToKeyedExecutors.remove(timerData);
-    }
-  }
-
-  @Override
   public long currentInputWatermark() {
     return initialized ? inputWatermarks.peek() : BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
   }
@@ -141,24 +128,45 @@ class TimerServiceImpl implements TimerService {
 
   @Override
   public void setTimer(Object key, TimerInternals.TimerData timerData, DoFnExecutor doFnExecutor)
{
-    checkArgument(
-        TimeDomain.EVENT_TIME.equals(timerData.getDomain()),
-        String.format("Does not support domain: %s.", timerData.getDomain()));
     Set<Pair<Integer, Object>> keyedExecutors = timerDataToKeyedExecutors.get(timerData);
     if (keyedExecutors == null) {
       keyedExecutors = Sets.newHashSet();
-      eventTimeTimersQueue.add(timerData);
+      getTimerQueue(timerData.getDomain()).add(timerData);
     }
     keyedExecutors.add(new Pair<>(doFnExecutor.getInternalDoFnExecutorId(), key));
     timerDataToKeyedExecutors.put(timerData, keyedExecutors);
   }
 
   @Override
+  public void fireTimers(long currentTime, TimeDomain timeDomain) {
+    TimerInternals.TimerData timerData;
+    PriorityQueue<TimerInternals.TimerData> timerQueue = getTimerQueue(timeDomain);
+    while ((timerData = timerQueue.peek()) != null
+        && timerData.getTimestamp().getMillis() <= currentTime) {
+      for (Pair<Integer, Object> keyedExecutor : timerDataToKeyedExecutors.get(timerData))
{
+        DoFnExecutor executor = idToDoFnExecutor.get(keyedExecutor.getFirst());
+        executor.onTimer(keyedExecutor.getSecond(), timerData);
+      }
+      timerQueue.remove();
+      timerDataToKeyedExecutors.remove(timerData);
+    }
+  }
+
+  @Override
   public void deleteTimer(TimerInternals.TimerData timerData) {
-    checkArgument(
-        TimeDomain.EVENT_TIME.equals(timerData.getDomain()),
-        String.format("Does not support domain: %s.", timerData.getDomain()));
-    eventTimeTimersQueue.remove(timerData);
+    getTimerQueue(timerData.getDomain()).remove(timerData);
     timerDataToKeyedExecutors.remove(timerData);
   }
+
+  private PriorityQueue<TimerInternals.TimerData> getTimerQueue(TimeDomain timeDomain)
{
+    switch (timeDomain) {
+      case EVENT_TIME :
+        return eventTimeTimersQueue;
+      case PROCESSING_TIME:
+        return processTimeTimersQueue;
+      default:
+        throw new IllegalArgumentException(
+            String.format("Does not support domain: %s.", timeDomain));
+    }
+  }
 }


Mime
View raw message