tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject tez git commit: TEZ-3053. Containers timeout if they do not receive a task within the container timeout interval. (sseth)
Date Wed, 20 Jan 2016 18:30:57 GMT
Repository: tez
Updated Branches:
  refs/heads/master cc0640013 -> e171fddce


TEZ-3053. Containers timeout if they do not receive a task within the
container timeout interval. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e171fddc
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e171fddc
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e171fddc

Branch: refs/heads/master
Commit: e171fddce3ae657dcc2baaf6b50913ef06a1d70c
Parents: cc06400
Author: Siddharth Seth <sseth@apache.org>
Authored: Wed Jan 20 10:30:41 2016 -0800
Committer: Siddharth Seth <sseth@apache.org>
Committed: Wed Jan 20 10:30:41 2016 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../tez/dag/app/TezTaskCommunicatorImpl.java    |  3 +-
 .../dag/app/TestTezTaskCommunicatorManager.java | 72 ++++++++++++++++++++
 3 files changed, 75 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/e171fddc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f1cc292..3b8b016 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.8.3: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-3053. Containers timeout if they do not receive a task within the container timeout interval.
   TEZ-2898. tez tools : swimlanes.py is broken.
   TEZ-2937. Can Processor.close() be called after closing inputs and outputs?
   TEZ-3037. History URL should be set regardless of which history logging service is enabled.

http://git-wip-us.apache.org/repos/asf/tez/blob/e171fddc/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
index 0bbe97a..b879f07 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
@@ -410,7 +410,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
 
   private ContainerTask getContainerTask(ContainerId containerId) throws IOException {
     ContainerInfo containerInfo = registeredContainers.get(containerId);
-    ContainerTask task = null;
+    ContainerTask task;
     if (containerInfo == null) {
       if (getContext().isKnownContainer(containerId)) {
         LOG.info("Container with id: " + containerId
@@ -422,6 +422,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
       task = TASK_FOR_INVALID_JVM;
     } else {
       synchronized (containerInfo) {
+        getContext().containerAlive(containerId);
         if (containerInfo.taskSpec != null) {
           if (!containerInfo.taskPulled) {
             containerInfo.taskPulled = true;

http://git-wip-us.apache.org/repos/asf/tez/blob/e171fddc/tez-dag/src/test/java/org/apache/tez/dag/app/dag/app/TestTezTaskCommunicatorManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/app/TestTezTaskCommunicatorManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/app/TestTezTaskCommunicatorManager.java
new file mode 100644
index 0000000..65f43a8
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/app/TestTezTaskCommunicatorManager.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.app;
+
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.common.ContainerContext;
+import org.apache.tez.common.ContainerTask;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.app.TezTaskCommunicatorImpl;
+import org.apache.tez.serviceplugins.api.TaskCommunicatorContext;
+import org.junit.Test;
+
+public class TestTezTaskCommunicatorManager {
+
+  @Test (timeout = 5000)
+  public void testContainerAliveOnGetTask() throws IOException {
+
+    TaskCommunicatorContext context = mock(TaskCommunicatorContext.class);
+    Configuration conf = new Configuration(false);
+    UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf);
+
+
+
+
+    ApplicationId appId = ApplicationId.newInstance(1000, 1);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+    ContainerId containerId = createContainerId(appId, 1);
+
+    doReturn(appAttemptId).when(context).getApplicationAttemptId();
+    doReturn(userPayload).when(context).getInitialUserPayload();
+    doReturn(new Credentials()).when(context).getCredentials();
+
+    TezTaskCommunicatorImpl taskComm = new TezTaskCommunicatorImpl(context);
+
+    ContainerContext containerContext = new ContainerContext(containerId.toString());
+    taskComm.registerRunningContainer(containerId, "fakehost", 0);
+    ContainerTask containerTask = taskComm.getUmbilical().getTask(containerContext);
+    assertNull(containerTask);
+
+    verify(context).containerAlive(containerId);
+  }
+
+  @SuppressWarnings("deprecation")
+  private ContainerId createContainerId(ApplicationId applicationId, int containerIdx) {
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(applicationId, 1);
+    return ContainerId.newInstance(appAttemptId, containerIdx);
+  }
+}


Mime
View raw message