hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1005315 - in /incubator/hama/trunk: ./ src/java/org/apache/hama/bsp/ src/java/org/apache/hama/ipc/
Date Thu, 07 Oct 2010 02:14:09 GMT
Author: edwardyoon
Date: Thu Oct  7 02:14:09 2010
New Revision: 1005315

URL: http://svn.apache.org/viewvc?rev=1005315&view=rev
Log:
Implement BSPMaster's all methods extended from JobSubmissionProtocol

Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
    incubator/hama/trunk/src/java/org/apache/hama/ipc/JobSubmissionProtocol.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1005315&r1=1005314&r2=1005315&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Thu Oct  7 02:14:09 2010
@@ -46,6 +46,8 @@ Trunk (unreleased changes)
 
   IMPROVEMENTS
     
+    HAMA-268: Implement BSPMaster's all methods extended 
+                from JobSubmissionProtocol (hyunsik via edwardyoon)
     HAMA-295: Removing HBase dependency (edwardyoon)
     HAMA-294: Change log level of "Got heartbeatResponse.." messages
                 to debug (edwardyoon)

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java?rev=1005315&r1=1005314&r2=1005315&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java Thu Oct  7 02:14:09 2010
@@ -100,7 +100,8 @@ public class BSPMaster implements JobSub
   /**
    * Start the BSPMaster process, listen on the indicated hostname/port
    */
-  public BSPMaster(HamaConfiguration conf) throws IOException, InterruptedException {
+  public BSPMaster(HamaConfiguration conf) throws IOException,
+      InterruptedException {
     this(conf, generateNewIdentifier());
   }
 
@@ -413,7 +414,7 @@ public class BSPMaster implements JobSub
           job.completedTask(tip, report);
         } else if (report.getRunState() == TaskStatus.State.FAILED) {
           // TODO Tell the job to fail the relevant task
-          
+
         } else {
           job.updateTaskStatus(tip, report);
         }
@@ -564,8 +565,34 @@ public class BSPMaster implements JobSub
   }
 
   @Override
+  public JobStatus[] jobsToComplete() throws IOException {
+    return getJobStatus(jobs.values(), true);
+  }
+
+  @Override
   public JobStatus[] getAllJobs() throws IOException {
-    return null;
+    return getJobStatus(jobs.values(), false);
+  }
+
+  private synchronized JobStatus[] getJobStatus(Collection<JobInProgress> jips,
+      boolean toComplete) {
+    if (jips == null) {
+      return new JobStatus[] {};
+    }
+    List<JobStatus> jobStatusList = new ArrayList<JobStatus>();
+    for (JobInProgress jip : jips) {
+      JobStatus status = jip.getStatus();
+      if (toComplete) {
+        if (status.getRunState() == JobStatus.RUNNING
+            || status.getRunState() == JobStatus.PREP) {
+          jobStatusList.add(status);
+        }
+      } else {
+        jobStatusList.add(status);
+      }
+    }
+
+    return jobStatusList.toArray(new JobStatus[jobStatusList.size()]);
   }
 
   @Override

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java?rev=1005315&r1=1005314&r2=1005315&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java Thu Oct  7 02:14:09 2010
@@ -30,7 +30,7 @@ public class LocalJobRunner implements J
     this.fs = FileSystem.get(conf);
     this.conf = conf;
   }
-
+  
   @Override
   public JobStatus[] getAllJobs() throws IOException {
     // TODO Auto-generated method stub
@@ -180,4 +180,10 @@ public class LocalJobRunner implements J
       // TODO Auto-generated method stub
     }
   }
+
+  @Override
+  public JobStatus[] jobsToComplete() throws IOException {
+    // TODO Auto-generated method stub
+    return null;
+  }
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java?rev=1005315&r1=1005314&r2=1005315&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java Thu Oct  7 02:14:09 2010
@@ -90,6 +90,7 @@ class TaskInProgress {
   public Task getTaskToRun(GroomServerStatus status) throws IOException {
       Task t = null;
       
+      // TODO use the TaskID, instead of String. 
       String taskid = null;
       if (nextTaskId < (MAX_TASK_EXECS + maxTaskAttempts)) {
         taskid = new String("task_" + status.getGroomName() + "_" + nextTaskId);

Modified: incubator/hama/trunk/src/java/org/apache/hama/ipc/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/ipc/JobSubmissionProtocol.java?rev=1005315&r1=1005314&r2=1005315&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/ipc/JobSubmissionProtocol.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/ipc/JobSubmissionProtocol.java Thu Oct  7 02:14:09 2010
@@ -26,86 +26,98 @@ import org.apache.hama.bsp.JobStatus;
 import org.apache.hama.bsp.TaskAttemptID;
 
 /**
- * Protocol that a groom server and the central BSP Master use to communicate. This
- * interface will contains several methods: submitJob, killJob, and killTask.
+ * Protocol that a groom server and the central BSP Master use to communicate.
+ * This interface will contains several methods: submitJob, killJob, and
+ * killTask.
  */
 public interface JobSubmissionProtocol extends HamaRPCProtocolVersion {
-  
+
   /**
    * Allocate a new id for the job.
+   * 
    * @return job id
    * @throws IOException
    */
   public BSPJobID getNewJobId() throws IOException;
-  
-  
+
   /**
-   * Submit a Job for execution.  Returns the latest profile for
-   * that job. 
-   * The job files should be submitted in <b>system-dir</b>/<b>jobName</b>.
-   *
+   * Submit a Job for execution. Returns the latest profile for that job. The
+   * job files should be submitted in <b>system-dir</b>/<b>jobName</b>.
+   * 
    * @param jobID
    * @param jobFile
    * @return jobStatus
    * @throws IOException
    */
-  //public JobStatus submitJob(BSPJobID jobName) throws IOException;
+  // public JobStatus submitJob(BSPJobID jobName) throws IOException;
 
   public JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException;
-  
+
   /**
    * Get the current status of the cluster
+   * 
    * @param detailed if true then report groom names as well
    * @return summary of the state of the cluster
    */
   public ClusterStatus getClusterStatus(boolean detailed) throws IOException;
-  
+
   /**
    * Grab a handle to a job that is already known to the BSPMaster.
-   * @return Profile of the job, or null if not found. 
+   * 
+   * @return Profile of the job, or null if not found.
    */
   public JobProfile getJobProfile(BSPJobID jobid) throws IOException;
-  
+
   /**
    * Grab a handle to a job that is already known to the BSPMaster.
+   * 
    * @return Status of the job, or null if not found.
    */
   public JobStatus getJobStatus(BSPJobID jobid) throws IOException;
-  
+
   /**
-   * A BSP system always operates on a single filesystem.  This 
-   * function returns the fs name.  ('local' if the localfs; 'addr:port' 
-   * if dfs).  The client can then copy files into the right locations 
-   * prior to submitting the job.
+   * A BSP system always operates on a single filesystem. This function returns
+   * the fs name. ('local' if the localfs; 'addr:port' if dfs). The client can
+   * then copy files into the right locations prior to submitting the job.
    */
   public String getFilesystemName() throws IOException;
-  
-  /** 
-   * Get all the jobs submitted. 
+
+  /**
+   * Get the jobs that are not completed and not failed
+   * 
+   * @return array of JobStatus for the running/to-be-run jobs.
+   */
+  public JobStatus[] jobsToComplete() throws IOException;
+
+  /**
+   * Get all the jobs submitted.
+   * 
    * @return array of JobStatus for the submitted jobs
    */
   public JobStatus[] getAllJobs() throws IOException;
 
   /**
-   * Grab the bspmaster system directory path where job-specific files are to be placed.
+   * Grab the bspmaster system directory path where job-specific files are to be
+   * placed.
    * 
    * @return the system directory where job-specific files are to be placed.
    */
   public String getSystemDir();
-  
+
   /**
    * Kill the indicated job
    */
   public void killJob(BSPJobID jobid) throws IOException;
-  
-  
+
   /**
    * Kill indicated task attempt.
+   * 
    * @param taskId the id of the task to kill.
-   * @param shouldFail if true the task is failed and added to failed tasks list, otherwise
-   * it is just killed, w/o affecting job failure status.  
-   */ 
-  public boolean killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException;
+   * @param shouldFail if true the task is failed and added to failed tasks
+   *          list, otherwise it is just killed, w/o affecting job failure
+   *          status.
+   */
+  public boolean killTask(TaskAttemptID taskId, boolean shouldFail)
+      throws IOException;
 
-  
 }



Mime
View raw message