apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t..@apache.org
Subject [1/2] incubator-apex-core git commit: SPOI-5053 APEX-56 #resolve #comment Fixing removal of terminated operators from physical plan when downstream operators have also completed up to the shutdown window ID Also fixed containers to be removed only when opera
Date Sun, 18 Oct 2015 06:36:51 GMT
Repository: incubator-apex-core
Updated Branches:
  refs/heads/devel-3 dac8fa8f7 -> d7afe7280


SPOI-5053 APEX-56 #resolve #comment
Fixing removal of terminated operators from the physical plan when downstream operators have
also completed up to the shutdown window ID.
Also fixed containers so that they are removed only after their operators are removed from the physical plan.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/fb33c23e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/fb33c23e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/fb33c23e

Branch: refs/heads/devel-3
Commit: fb33c23e141c422c85eefc06e680e9ecbf459936
Parents: 809e6f6
Author: ishark <isha@datatorrent.com>
Authored: Mon Oct 12 16:05:32 2015 -0700
Committer: ishark <isha@datatorrent.com>
Committed: Tue Oct 13 18:17:03 2015 -0700

----------------------------------------------------------------------
 .../stram/StreamingContainerManager.java        | 35 +++++++++++---------
 .../stram/StreamingContainerManagerTest.java    | 11 ++----
 2 files changed, 22 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/fb33c23e/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index ed366db..3931fad 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -1040,10 +1040,7 @@ public class StreamingContainerManager implements PlanContext
         Iterator<Map.Entry<Long, Set<PTOperator>>> it = shutdownOperators.entrySet().iterator();
         while (it.hasNext()) {
           Map.Entry<Long, Set<PTOperator>> windowAndOpers = it.next();
-          if (windowAndOpers.getKey().longValue() > this.committedWindowId) {
-            // wait until window is committed
-            continue;
-          } else {
+          if (windowAndOpers.getKey().longValue() <= this.committedWindowId || checkDownStreamOperators(windowAndOpers))
{
             LOG.info("Removing inactive operators at window {} {}", Codec.getStringWindowId(windowAndOpers.getKey()),
windowAndOpers.getValue());
             for (PTOperator oper : windowAndOpers.getValue()) {
               plan.removeTerminatedPartition(oper);
@@ -1070,8 +1067,7 @@ public class StreamingContainerManager implements PlanContext
       try {
         command.run();
         count++;
-      }
-      catch (Exception e) {
+      } catch (Exception e) {
         // TODO: handle error
         LOG.error("Failed to execute {}", command, e);
       }
@@ -1081,8 +1077,7 @@ public class StreamingContainerManager implements PlanContext
     if (count > 0) {
       try {
         checkpoint();
-      }
-      catch (Exception e) {
+      } catch (Exception e) {
         throw new RuntimeException("Failed to checkpoint state.", e);
       }
     }
@@ -1090,6 +1085,19 @@ public class StreamingContainerManager implements PlanContext
     return count;
   }
 
+  private boolean checkDownStreamOperators(Map.Entry<Long, Set<PTOperator>> windowAndOpers)
+  {
+    // Check if all downStream operators are at higher window Ids, then operator can be removed
from dag
+    Set<PTOperator> downStreamOperators = getPhysicalPlan().getDependents(windowAndOpers.getValue());
+    for (PTOperator oper : downStreamOperators) {
+      long windowId = oper.stats.currentWindowId.get();
+      if (windowId < windowAndOpers.getKey().longValue()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
   /**
    * Schedule container restart. Called by Stram after a container was terminated
    * and requires recovery (killed externally, or after heartbeat timeout). <br>
@@ -1495,8 +1503,6 @@ public class StreamingContainerManager implements PlanContext
     }
     Set<Integer> reportedOperators = Sets.newHashSetWithExpectedSize(sca.container.getOperators().size());
 
-    boolean containerIdle = true;
-
     for (OperatorHeartbeat shb : heartbeat.getContainerStats().operators) {
 
       long maxEndWindowTimestamp = 0;
@@ -1533,9 +1539,7 @@ public class StreamingContainerManager implements PlanContext
 
       oper.stats.lastHeartbeat = shb;
       List<ContainerStats.OperatorStats> statsList = shb.getOperatorStatsContainer();
-      if (!oper.stats.isIdle()) {
-        containerIdle = false;
-      }
+
       if (!statsList.isEmpty()) {
         long tuplesProcessed = 0;
         long tuplesEmitted = 0;
@@ -1743,11 +1747,10 @@ public class StreamingContainerManager implements PlanContext
 
     ContainerHeartbeatResponse rsp = getHeartbeatResponse(sca);
 
-    if (containerIdle && isApplicationIdle()) {
+    if (heartbeat.getContainerStats().operators.isEmpty() && isApplicationIdle())
{
       LOG.info("requesting idle shutdown for container {}", heartbeat.getContainerId());
       rsp.shutdown = true;
-    }
-    else {
+    } else {
       if (sca.shutdownRequested) {
         LOG.info("requesting shutdown for container {}", heartbeat.getContainerId());
         rsp.shutdown = true;

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/fb33c23e/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
index 710440d..2884323 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
@@ -762,18 +762,13 @@ public class StreamingContainerManagerTest
     mc1.sendHeartbeat();
     scm.monitorHeartbeat();
 
+    Assert.assertEquals("committedWindowId", 1, scm.getCommittedWindowId());
+
     o2p1mos.currentWindowId(2).checkpointWindowId(2);
     mc2.sendHeartbeat();
     scm.monitorHeartbeat();
-    Assert.assertEquals("committedWindowId", 1, scm.getCommittedWindowId());
-    scm.monitorHeartbeat(); // committedWindowId updated in next cycle
-    Assert.assertEquals("committedWindowId", 2, scm.getCommittedWindowId());
-    Assert.assertEquals(1, o1p1.getContainer().getOperators().size());
-    Assert.assertEquals(1, o2p1.getContainer().getOperators().size());
-    Assert.assertEquals(2, physicalPlan.getContainers().size());
 
-    // call again as events are processed after committed window was updated
-    scm.processEvents();
+    // Operators are shutdown when both operators reach window Id 2
     Assert.assertEquals(0, o1p1.getContainer().getOperators().size());
     Assert.assertEquals(0, o2p1.getContainer().getOperators().size());
     Assert.assertEquals(0, physicalPlan.getContainers().size());


Mime
View raw message