hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1062420 - in /incubator/hama/trunk/src/java/org/apache/hama/bsp: BSPMaster.java GroomServer.java
Date Sun, 23 Jan 2011 12:59:57 GMT
Author: edwardyoon
Date: Sun Jan 23 12:59:57 2011
New Revision: 1062420

URL: http://svn.apache.org/viewvc?rev=1062420&view=rev
Log:
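Fix an issue where a TaskStatus reported back from the GroomServer could not be associated
with any JobInProgress in BSPMaster, causing whichJob() to return null and the client's job
submission to fail. The GroomServer now reports only the status of the task that finished,
moving it from runningTasks to finishedTasks, instead of marking every running task as succeeded.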

Modified:
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java?rev=1062420&r1=1062419&r2=1062420&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java Sun Jan 23 12:59:57 2011
@@ -251,27 +251,21 @@ public class BSPMaster implements JobSub
         List<TaskStatus> tlist = ustus.getTaskReports();
         for (TaskStatus ts : tlist) {
           JobInProgress jip = whichJob(ts.getJobId());
-          // TODO: need for each tip execute completed?
-          // each tip already maintain a data structure, checking
-          // if task status is completed
-
-          if (jip != null) { // passes if jip is null
-            TaskInProgress tip = jip.findTaskInProgress(((TaskAttemptID) ts
-                .getTaskId()).getTaskID());
-            jip.completedTask(tip, ts);
-            LOG.info("JobInProgress id:" + jip.getJobID() + " status:"
-                + jip.getStatus());
-            if (jip.getStatus().getRunState() == JobStatus.SUCCEEDED) {
-              for (JobInProgressListener listener : jobInProgressListeners) {
-                try {
-                  listener.jobRemoved(jip);
-                } catch (IOException ioe) {
-                  LOG.error("Fail to alter scheduler a job is moved.", ioe);
-                }
+
+          TaskInProgress tip = jip.findTaskInProgress(((TaskAttemptID) ts
+              .getTaskId()).getTaskID());
+          jip.completedTask(tip, ts);
+          LOG.info("JobInProgress id:" + jip.getJobID() + " status:"
+              + jip.getStatus());
+          if (jip.getStatus().getRunState() == JobStatus.SUCCEEDED) {
+            for (JobInProgressListener listener : jobInProgressListeners) {
+              try {
+                listener.jobRemoved(jip);
+              } catch (IOException ioe) {
+                LOG.error("Fail to alter scheduler a job is moved.", ioe);
               }
             }
           }
-
         }
       } else {
         throw new RuntimeException("BSPMaster contains GroomServerSatus, "

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=1062420&r1=1062419&r2=1062420&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Sun Jan 23 12:59:57 2011
@@ -98,11 +98,12 @@ public class GroomServer implements Runn
   Map<TaskAttemptID, TaskInProgress> tasks = new HashMap<TaskAttemptID, TaskInProgress>();
   /** Map from taskId -> TaskInProgress. */
   Map<TaskAttemptID, TaskInProgress> runningTasks = null;
+  Map<TaskAttemptID, TaskInProgress> finishedTasks = null;
   Map<BSPJobID, RunningJob> runningJobs = null;
 
   // new nexus between GroomServer and BSPMaster
   // holds/ manage all tasks
-  List<TaskInProgress> tasksList = new CopyOnWriteArrayList<TaskInProgress>();
+  //List<TaskInProgress> tasksList = new CopyOnWriteArrayList<TaskInProgress>();
 
   private String rpcServer;
   private Server workerServer;
@@ -143,6 +144,7 @@ public class GroomServer implements Runn
     this.tasks.clear();
     this.runningJobs = new TreeMap<BSPJobID, RunningJob>();
     this.runningTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();
+    this.finishedTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();
     this.conf.set(Constants.PEER_HOST, localHostname);
     this.conf.set(Constants.GROOM_RPC_HOST, localHostname);
     bspPeer = new BSPPeer(conf);
@@ -606,7 +608,7 @@ public class GroomServer implements Runn
         if (bspPeer.getLocalQueueSize() == 0
             && bspPeer.getOutgoingQueueSize() == 0 && !runner.isAlive())
{
           taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
-          doReport();
+          doReport(this.taskStatus);
           break;
         }
       }
@@ -616,9 +618,9 @@ public class GroomServer implements Runn
     /**
      * Update and report refresh status back to BSPMaster.
      */
-    private void doReport() {
+    private void doReport(TaskStatus taskStatus) {
       GroomServerStatus gss = new GroomServerStatus(groomServerName, bspPeer
-          .getPeerName(), updateTaskStatus(), failures, maxCurrentTasks,
+          .getPeerName(), updateTaskStatus(taskStatus), failures, maxCurrentTasks,
           rpcServer);
       try {
         boolean ret = masterClient.report(new Directive(gss));
@@ -632,14 +634,17 @@ public class GroomServer implements Runn
       }
     }
 
-    private List<TaskStatus> updateTaskStatus() {
+    private List<TaskStatus> updateTaskStatus(TaskStatus taskStatus) {
       List<TaskStatus> tlist = new ArrayList<TaskStatus>();
-      for (TaskInProgress tip : runningTasks.values()) {
-        TaskStatus stus = tip.getStatus();
-        stus.setProgress(1f);
-        stus.setRunState(TaskStatus.State.SUCCEEDED);
-        stus.setPhase(TaskStatus.Phase.CLEANUP);
-        tlist.add((TaskStatus) stus.clone());
+      synchronized(runningTasks){
+        synchronized(finishedTasks){
+          TaskInProgress tip = runningTasks.remove(taskStatus.getTaskId());
+          taskStatus.setProgress(1f);
+          taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
+          taskStatus.setPhase(TaskStatus.Phase.CLEANUP);
+          tlist.add((TaskStatus) taskStatus.clone());
+          finishedTasks.put(taskStatus.getTaskId(), tip);
+        }
       }
       return tlist;
     }



Mime
View raw message