Repository: tez Updated Branches: refs/heads/branch-0.7 238c3ad8f -> 2cbf40248 TEZ-3036. Tez AM can hang on startup with no indication of error (jlowe) (cherry picked from commit 92def52ff8b02eab7aae38170ca6c9b0caf83ef7) Conflicts: CHANGES.txt Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/2cbf4024 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/2cbf4024 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/2cbf4024 Branch: refs/heads/branch-0.7 Commit: 2cbf40248f7abe49bab4ac78d3ecff1a040d52e1 Parents: 238c3ad Author: Jason Lowe Authored: Thu Jan 21 20:30:07 2016 +0000 Committer: Jason Lowe Committed: Thu Jan 21 20:30:07 2016 +0000 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/tez/dag/app/DAGAppMaster.java | 48 ++++++++++++++------ 2 files changed, 34 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/2cbf4024/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index bd3e64a..f40029a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES TEZ-2972. Avoid task rescheduling when a node turns unhealthy ALL CHANGES + TEZ-3036. Tez AM can hang on startup with no indication of error TEZ-3052. Task internal error due to Invalid event: T_ATTEMPT_FAILED at FAILED TEZ-3046. Compilation issue in tez-runtime-internals of branch-0.7 TEZ-2937. Can Processor.close() be called after closing inputs and outputs? 
http://git-wip-us.apache.org/repos/asf/tez/blob/2cbf4024/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 8e6c21a..c163b62 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -1592,7 +1592,18 @@ public class DAGAppMaster extends AbstractService { LOG.debug("Service dependency: " + dependency.getName() + " notify" + " for service: " + service.getName()); } - if (dependency.isInState(Service.STATE.STARTED)) { + Throwable dependencyError = dependency.getFailureCause(); + if (dependencyError != null) { + synchronized(this) { + dependenciesFailed = true; + if(LOG.isDebugEnabled()) { + LOG.debug("Service: " + service.getName() + " will fail to start" + + " as dependent service " + dependency.getName() + + " failed to start: " + dependencyError); + } + this.notifyAll(); + } + } else if (dependency.isInState(Service.STATE.STARTED)) { if(dependenciesStarted.incrementAndGet() == dependencies.size()) { synchronized(this) { if(LOG.isDebugEnabled()) { @@ -1602,17 +1613,6 @@ public class DAGAppMaster extends AbstractService { this.notifyAll(); } } - } else if (!service.isInState(Service.STATE.STARTED) - && dependency.getFailureState() != null) { - synchronized(this) { - dependenciesFailed = true; - if(LOG.isDebugEnabled()) { - LOG.debug("Service: " + service.getName() + " will fail to start" - + " as dependent service " + dependency.getName() - + " failed to start"); - } - this.notifyAll(); - } } } @@ -1646,9 +1646,12 @@ public class DAGAppMaster extends AbstractService { private static class ServiceThread extends Thread { final ServiceWithDependency serviceWithDependency; - Throwable error = null; - public ServiceThread(ServiceWithDependency 
serviceWithDependency) { + final Map<Service, ServiceWithDependency> services; + volatile Throwable error = null; + public ServiceThread(ServiceWithDependency serviceWithDependency, + Map<Service, ServiceWithDependency> services) { this.serviceWithDependency = serviceWithDependency; + this.services = services; this.setName("ServiceThread:" + serviceWithDependency.service.getName()); } @@ -1660,7 +1663,14 @@ public class DAGAppMaster extends AbstractService { try { serviceWithDependency.start(); } catch (Throwable t) { + // AbstractService does not notify listeners if something throws, so + // notify dependent services explicitly to prevent hanging. + // AbstractService only records fault causes for exceptions, not + // errors, so dependent services will proceed thinking startup + // succeeded if an error is thrown. The error will be noted when the + // main thread joins the ServiceThread. error = t; + notifyDependentServices(); } finally { if(LOG.isDebugEnabled()) { LOG.debug("Service: " + serviceWithDependency.service.getName() + @@ -1672,6 +1682,14 @@ public class DAGAppMaster extends AbstractService { + serviceWithDependency.service.getName()); } } + + private void notifyDependentServices() { + for (ServiceWithDependency otherSvc : services.values()) { + if (otherSvc.dependencies.contains(serviceWithDependency.service)) { + otherSvc.stateChanged(serviceWithDependency.service); + } + } + } } void startServices(){ @@ -1684,7 +1702,7 @@ public class DAGAppMaster extends AbstractService { for(ServiceWithDependency sd : services.values()) { // start the service. If this fails that service // will be stopped and an exception raised - ServiceThread st = new ServiceThread(sd); + ServiceThread st = new ServiceThread(sd, services); threads.add(st); }