hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tjungb...@apache.org
Subject svn commit: r1293161 - in /incubator/hama/trunk: ./ core/src/main/java/org/apache/hama/bsp/
Date Fri, 24 Feb 2012 10:19:15 GMT
Author: tjungblut
Date: Fri Feb 24 10:19:15 2012
New Revision: 1293161

URL: http://svn.apache.org/viewvc?rev=1293161&view=rev
Log:
[HAMA-484]: Counters should be accessible in client (tjungblut)

Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Counters.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobStatus.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RunningJob.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskStatus.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1293161&r1=1293160&r2=1293161&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Fri Feb 24 10:19:15 2012
@@ -11,7 +11,8 @@ Release 0.5 - Unreleased
   BUG FIXES
 
   IMPROVEMENTS
-    
+
+    HAMA-484: Counters should be accessible in client (tjungblut)    
     HAMA-483: Remove old and deprecated BSP API (tjungblut)    
     HAMA-514: Add maven-gpg-plugin to parent POM file (edwardyoon)
     HAMA-510: Add sendMessageToNeighbors() to Vertex (tjungblut)

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1293161&r1=1293160&r2=1293161&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Fri Feb 24 10:19:15 2012
@@ -116,28 +116,16 @@ public class BSPJobClient extends Config
       this.statustime = System.currentTimeMillis();
     }
 
-    /*
-     * (non-Javadoc)
-     * @see org.apache.hama.bsp.RunningJob#getID()
-     */
     @Override
     public BSPJobID getID() {
       return profile.getJobID();
     }
 
-    /*
-     * (non-Javadoc)
-     * @see org.apache.hama.bsp.RunningJob#getJobName()
-     */
     @Override
     public String getJobName() {
       return profile.getJobName();
     }
 
-    /*
-     * (non-Javadoc)
-     * @see org.apache.hama.bsp.RunningJob#getJobFile()
-     */
     @Override
     public String getJobFile() {
       return profile.getJobFile();
@@ -191,6 +179,11 @@ public class BSPJobClient extends Config
       return status.getRunState();
     }
 
+    @Override
+    public JobStatus getStatus() {
+      return status;
+    }
+
     /**
      * Tells the service to terminate the current job.
      */
@@ -602,12 +595,13 @@ public class BSPJobClient extends Config
       }
     }
 
-    if(job.isSuccessful()) {
+    if (job.isSuccessful()) {
       LOG.info("The total number of supersteps: " + info.getSuperstepCount());
+      info.getStatus().getCounter().incrCounter(BSPPeerImpl.PeerCounter.SUPERSTEPS, info.getSuperstepCount());
+      info.getStatus().getCounter().log(LOG);
     } else {
       LOG.info("Job failed.");
     }
-    // TODO job.getCounters().log(LOG);
     return job.isSuccessful();
   }
 
@@ -653,6 +647,7 @@ public class BSPJobClient extends Config
     if (running.isSuccessful()) {
       LOG.info("Job complete: " + jobId);
       LOG.info("The total number of supersteps: " + running.getSuperstepCount());
+      running.getStatus().getCounter().log(LOG);
     } else {
       LOG.info("Job failed.");
     }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java?rev=1293161&r1=1293160&r2=1293161&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java Fri Feb 24 10:19:15 2012
@@ -161,9 +161,10 @@ public class BSPMaster implements JobSub
 
             if (ts.getRunState() == TaskStatus.State.SUCCEEDED) {
               jip.completedTask(tip, ts);
+              // increment counters only if successful
+              jip.getCounters().incrAllCounters(ts.getCounters());
             } else if (ts.getRunState() == TaskStatus.State.RUNNING) {
-              // TODO add progress counter
-              jip.getStatus().setprogress(ts.getSuperstepCount());
+              jip.getStatus().setProgress(ts.getSuperstepCount());
               jip.getStatus().setSuperstepCount(ts.getSuperstepCount());
             } else if (ts.getRunState() == TaskStatus.State.FAILED) {
               jip.status.setRunState(JobStatus.FAILED);
@@ -178,7 +179,7 @@ public class BSPMaster implements JobSub
                 }
               }
             } else if (jip.getStatus().getRunState() == JobStatus.RUNNING) {
-              jip.getStatus().setprogress(ts.getSuperstepCount());
+              jip.getStatus().setProgress(ts.getSuperstepCount());
               jip.getStatus().setSuperstepCount(ts.getSuperstepCount());
             } else if (jip.getStatus().getRunState() == JobStatus.KILLED) {
               GroomProtocol worker = findGroomServer(tmpStatus);

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1293161&r1=1293160&r2=1293161&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Fri Feb 24 10:19:15 2012
@@ -51,8 +51,8 @@ public final class BSPPeerImpl<K1, V1, K
 
   private static final Log LOG = LogFactory.getLog(BSPPeerImpl.class);
 
-  protected static enum PeerCounter {
-    SUPERSTEPS
+  public static enum PeerCounter {
+    SUPERSTEP_SUM, SUPERSTEPS
   }
 
   private final Configuration conf;
@@ -137,7 +137,7 @@ public final class BSPPeerImpl<K1, V1, K
     // consistent peernames.
     syncClient.enterBarrier(taskId.getJobID(), taskId, -1);
     syncClient.leaveBarrier(taskId.getJobID(), taskId, -1);
-    setCurrentTaskStatus(new TaskStatus(taskId.getJobID(), taskId, 0,
+    setCurrentTaskStatus(new TaskStatus(taskId.getJobID(), taskId, 1.0f,
         TaskStatus.State.RUNNING, "running", peerAddress.getHostName(),
         TaskStatus.Phase.STARTING, counters));
 
@@ -262,7 +262,8 @@ public final class BSPPeerImpl<K1, V1, K
 
     leaveBarrier();
 
-    incrCounter(PeerCounter.SUPERSTEPS, 1);
+    incrCounter(PeerCounter.SUPERSTEP_SUM, 1L);
+
     currentTaskStatus.setCounters(counters);
 
     umbilical.statusUpdate(taskId, currentTaskStatus);
@@ -273,6 +274,7 @@ public final class BSPPeerImpl<K1, V1, K
   private final BSPMessageBundle<M> combineMessages(Iterable<M> messages) {
     if (!conf.getClass("bsp.combiner.class", Combiner.class).equals(
         Combiner.class)) {
+      @SuppressWarnings("unchecked")
       Combiner<M> combiner = (Combiner<M>) ReflectionUtils.newInstance(
           conf.getClass("bsp.combiner.class", Combiner.class), conf);
 
@@ -436,4 +438,8 @@ public final class BSPPeerImpl<K1, V1, K
       counters.incrCounter(group, counter, amount);
     }
   }
+
+  public Counters getCounters() {
+    return counters;
+  }
 }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Counters.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Counters.java?rev=1293161&r1=1293160&r2=1293161&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Counters.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Counters.java Fri Feb 24 10:19:15 2012
@@ -410,7 +410,7 @@ public class Counters implements Writabl
    * A cache from enum values to the associated counter. Dramatically speeds up
    * typical usage.
    */
-  private Map<Enum, Counter> cache = new IdentityHashMap<Enum, Counter>();
+  private Map<Enum<?>, Counter> cache = new IdentityHashMap<Enum<?>, Counter>();
 
   /**
    * Returns the names of all counter classes.
@@ -445,7 +445,7 @@ public class Counters implements Writabl
    * @param key the counter key
    * @return the matching counter object
    */
-  public synchronized Counter findCounter(Enum key) {
+  public synchronized Counter findCounter(Enum<?> key) {
     Counter counter = cache.get(key);
     if (counter == null) {
       Group group = getGroup(key.getDeclaringClass().getName());
@@ -487,7 +487,7 @@ public class Counters implements Writabl
    * @param key identifies a counter
    * @param amount amount by which counter is to be incremented
    */
-  public synchronized void incrCounter(Enum key, long amount) {
+  public synchronized void incrCounter(Enum<?> key, long amount) {
     findCounter(key).increment(amount);
   }
 
@@ -507,7 +507,7 @@ public class Counters implements Writabl
    * Returns current value of the specified counter, or 0 if the counter does
    * not exist.
    */
-  public synchronized long getCounter(Enum key) {
+  public synchronized long getCounter(Enum<?> key) {
     return findCounter(key).getValue();
   }
 

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java?rev=1293161&r1=1293160&r2=1293161&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java Fri Feb 24 10:19:15 2012
@@ -45,6 +45,10 @@ class JobInProgress {
     }
   }
 
+  public static enum JobCounter {
+    LAUNCHED_TASKS
+  }
+
   static final Log LOG = LogFactory.getLog(JobInProgress.class);
   boolean tasksInited = false;
   boolean jobInited = false;
@@ -71,6 +75,8 @@ class JobInProgress {
   TaskInProgress tasks[] = new TaskInProgress[0];
   private long superstepCounter;
 
+  private final Counters counters = new Counters();
+
   int numBSPTasks = 0;
   int clusterSize;
   String jobSplit;
@@ -83,8 +89,8 @@ class JobInProgress {
     this.jobFile = jobFile;
     this.master = master;
 
-    this.status = new JobStatus(jobId, null, 0L, 0L, JobStatus.State.PREP
-        .value());
+    this.status = new JobStatus(jobId, null, 0L, 0L,
+        JobStatus.State.PREP.value(), counters);
     this.startTime = System.currentTimeMillis();
     this.superstepCounter = 0;
     this.restartCount = 0;
@@ -102,8 +108,8 @@ class JobInProgress {
 
     this.numBSPTasks = job.getNumBspTask();
 
-    this.profile = new JobProfile(job.getUser(), jobId, jobFile.toString(), job
-        .getJobName());
+    this.profile = new JobProfile(job.getUser(), jobId, jobFile.toString(),
+        job.getJobName());
 
     this.setJobName(job.getJobName());
 
@@ -215,7 +221,7 @@ class JobInProgress {
 
     // Update job status
     this.status = new JobStatus(this.status.getJobID(), this.profile.getUser(),
-        0L, 0L, JobStatus.RUNNING);
+        0L, 0L, JobStatus.RUNNING, counters);
 
     // delete all nodes before start
     master.clearZKNodes();
@@ -246,6 +252,7 @@ class JobInProgress {
     } catch (IOException e) {
       LOG.error("Exception while obtaining new task!", e);
     }
+    counters.incrCounter(JobCounter.LAUNCHED_TASKS, 1L);
     return result;
   }
 
@@ -268,9 +275,9 @@ class JobInProgress {
     }
 
     if (allDone) {
-      this.status = new JobStatus(this.status.getJobID(), this.profile
-          .getUser(), superstepCounter, superstepCounter, superstepCounter,
-          JobStatus.SUCCEEDED, superstepCounter);
+      this.status = new JobStatus(this.status.getJobID(),
+          this.profile.getUser(), superstepCounter, superstepCounter,
+          superstepCounter, JobStatus.SUCCEEDED, superstepCounter, counters);
       this.finishTime = System.currentTimeMillis();
       this.status.setFinishTime(this.finishTime);
 
@@ -297,15 +304,16 @@ class JobInProgress {
         break;
       }
     }
-    
-    // TODO 
-    
+
+    // TODO
+
     if (!allDone) {
       // Kill job
       this.kill();
       // Send KillTaskAction to GroomServer
-      this.status = new JobStatus(this.status.getJobID(), this.profile
-          .getUser(), 0L, 0L, 0L, JobStatus.KILLED, superstepCounter);
+      this.status = new JobStatus(this.status.getJobID(),
+          this.profile.getUser(), 0L, 0L, 0L, JobStatus.KILLED,
+          superstepCounter, counters);
       this.finishTime = System.currentTimeMillis();
       this.status.setFinishTime(this.finishTime);
 
@@ -329,7 +337,7 @@ class JobInProgress {
   public synchronized void kill() {
     if (status.getRunState() != JobStatus.KILLED) {
       this.status = new JobStatus(status.getJobID(), this.profile.getUser(),
-          0L, 0L, 0L, JobStatus.KILLED);
+          0L, 0L, 0L, JobStatus.KILLED, counters);
       this.finishTime = System.currentTimeMillis();
       this.status.setFinishTime(this.finishTime);
       //
@@ -391,4 +399,8 @@ class JobInProgress {
     return jobName;
   }
 
+  public Counters getCounters() {
+    return counters;
+  }
+
 }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobStatus.java?rev=1293161&r1=1293160&r2=1293161&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobStatus.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobStatus.java Fri Feb 24 10:19:15 2012
@@ -100,31 +100,33 @@ public class JobStatus implements Writab
   private int tasks;
   
   private long finishTime;
+  private Counters counter;
 
   public JobStatus() {
   }
 
-  public JobStatus(BSPJobID jobid, String user, long progress, int runState) {
-    this(jobid, user, progress, 0, runState);
+  public JobStatus(BSPJobID jobid, String user, long progress, int runState, Counters counter) {
+    this(jobid, user, progress, 0, runState, counter);
   }
 
   public JobStatus(BSPJobID jobid, String user, long progress,
-      long cleanupProgress, int runState) {
-    this(jobid, user, 0, progress, cleanupProgress, runState);
+      long cleanupProgress, int runState, Counters counter) {
+    this(jobid, user, 0, progress, cleanupProgress, runState, counter);
   }
 
   public JobStatus(BSPJobID jobid, String user, long setupProgress,
-      long progress, long cleanupProgress, int runState) {
-    this(jobid, user, 0, progress, cleanupProgress, runState, 0);
+      long progress, long cleanupProgress, int runState, Counters counter) {
+    this(jobid, user, 0, progress, cleanupProgress, runState, 0, counter);
   }
 
   public JobStatus(BSPJobID jobid, String user, long setupProgress,
-      long progress, long cleanupProgress, int runState, long superstepCount) {
+      long progress, long cleanupProgress, int runState, long superstepCount, Counters counter) {
     this.jobid = jobid;
     this.setupProgress = setupProgress;
     this.progress = progress;
     this.cleanupProgress = cleanupProgress;
     this.runState = runState;
+    this.counter = counter;
     this.state = State.values()[runState - 1];
     this.superstepCount = superstepCount;
     this.user = user;
@@ -138,7 +140,7 @@ public class JobStatus implements Writab
     return progress;
   }
 
-  synchronized void setprogress(long p) {
+  synchronized void setProgress(long p) {
     this.progress = p;
   }
 
@@ -255,6 +257,7 @@ public class JobStatus implements Writab
     Text.writeString(out, user);
     Text.writeString(out, schedulingInfo);
     out.writeLong(superstepCount);
+    counter.write(out);
   }
 
   public synchronized void readFields(DataInput in) throws IOException {
@@ -269,6 +272,8 @@ public class JobStatus implements Writab
     this.user = Text.readString(in);
     this.schedulingInfo = Text.readString(in);
     this.superstepCount = in.readLong();
+    counter = new Counters();
+    counter.readFields(in);
   }
 
   /**
@@ -285,4 +290,8 @@ public class JobStatus implements Writab
     return name;
   }
 
+  public Counters getCounter() {
+    return counter;
+  }
+
 }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1293161&r1=1293160&r2=1293161&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Fri Feb 24 10:19:15 2012
@@ -66,7 +66,7 @@ public class LocalBSPRunner implements J
   private volatile ThreadPoolExecutor threadPool;
 
   @SuppressWarnings("rawtypes")
-  protected static final LinkedList<Future<BSP>> futureList = new LinkedList<Future<BSP>>();
+  protected static final LinkedList<Future<BSPPeerImpl>> futureList = new LinkedList<Future<BSPPeerImpl>>();
 
   protected String jobFile;
   protected String jobName;
@@ -78,6 +78,7 @@ public class LocalBSPRunner implements J
 
   private static volatile long superStepCount = 0L;
   private static String[] peerNames;
+  private final Counters globalCounters = new Counters();
 
   // this is used for not-input driven job
   private int maxTasks;
@@ -126,7 +127,7 @@ public class LocalBSPRunner implements J
 
     BSPJob job = new BSPJob(new HamaConfiguration(conf), jobID);
     currentJobStatus = new JobStatus(jobID, System.getProperty("user.name"),
-        0L, JobStatus.RUNNING);
+        0L, JobStatus.RUNNING, globalCounters);
 
     int numBspTask = job.getNumBspTask();
 
@@ -151,6 +152,7 @@ public class LocalBSPRunner implements J
       peerNames[i] = "local:" + i;
       futureList.add(threadPool.submit(new BSPRunner(new Configuration(conf),
           job, i, splits)));
+      globalCounters.incrCounter(JobInProgress.JobCounter.LAUNCHED_TASKS, 1L);
     }
 
     new Thread(new ThreadObserver(currentJobStatus)).start();
@@ -171,7 +173,7 @@ public class LocalBSPRunner implements J
   @Override
   public JobStatus getJobStatus(BSPJobID jobid) throws IOException {
     currentJobStatus.setSuperstepCount(superStepCount);
-    currentJobStatus.setprogress(superStepCount);
+    currentJobStatus.setProgress(superStepCount);
     return currentJobStatus;
   }
 
@@ -207,14 +209,15 @@ public class LocalBSPRunner implements J
   }
 
   // this class will spawn a new thread and executes the BSP
-  @SuppressWarnings({ "deprecation", "rawtypes" })
-  static class BSPRunner implements Callable<BSP> {
+  @SuppressWarnings({ "rawtypes" })
+  static class BSPRunner implements Callable<BSPPeerImpl> {
 
     private Configuration conf;
     private BSPJob job;
     private int id;
     private BSP bsp;
     private RawSplit[] splits;
+    private BSPPeerImpl peer;
 
     public BSPRunner(Configuration conf, BSPJob job, int id, RawSplit[] splits) {
       super();
@@ -243,7 +246,7 @@ public class LocalBSPRunner implements J
         realBytes = splits[id].getBytes();
       }
 
-      BSPPeerImpl peer = new BSPPeerImpl(job, conf, new TaskAttemptID(
+      peer = new BSPPeerImpl(job, conf, new TaskAttemptID(
           new TaskID(job.getJobID(), id), id), new LocalUmbilical(), id,
           splitname, realBytes, new Counters());
       try {
@@ -258,9 +261,9 @@ public class LocalBSPRunner implements J
     }
 
     @Override
-    public BSP call() throws Exception {
+    public BSPPeerImpl call() throws Exception {
       run();
-      return bsp;
+      return peer;
     }
   }
 
@@ -273,13 +276,15 @@ public class LocalBSPRunner implements J
       this.status = currentJobStatus;
     }
 
+    @SuppressWarnings("rawtypes")
     @Override
     public void run() {
       boolean success = true;
-      for (@SuppressWarnings("rawtypes")
-      Future<BSP> future : futureList) {
+      for (Future<BSPPeerImpl> future : futureList) {
         try {
-          future.get();
+          BSPPeerImpl bspPeerImpl = future.get();
+          currentJobStatus.getCounter().incrAllCounters(
+              bspPeerImpl.getCounters());
         } catch (InterruptedException e) {
           LOG.error("Exception during BSP execution!", e);
           success = false;
@@ -303,6 +308,7 @@ public class LocalBSPRunner implements J
   public static class LocalMessageManager<M extends Writable> implements
       MessageManager<M> {
 
+    @SuppressWarnings("rawtypes")
     private static final ConcurrentHashMap<InetSocketAddress, LocalMessageManager> managerMap = new ConcurrentHashMap<InetSocketAddress, LocalBSPRunner.LocalMessageManager>();
 
     private final HashMap<InetSocketAddress, LinkedList<M>> localOutgoingMessages = new HashMap<InetSocketAddress, LinkedList<M>>();
@@ -344,6 +350,7 @@ public class LocalBSPRunner implements J
       localOutgoingMessages.put(inetSocketAddress, msgs);
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public void transfer(InetSocketAddress addr, BSPMessageBundle<M> bundle)
         throws IOException {

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RunningJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RunningJob.java?rev=1293161&r1=1293160&r2=1293161&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RunningJob.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RunningJob.java Fri Feb 24 10:19:15 2012
@@ -113,4 +113,9 @@ public interface RunningJob {
       throws IOException;
 
   public long getSuperstepCount() throws IOException;
+
+  /**
+   * @return the latest status of the job.
+   */
+  public JobStatus getStatus();
 }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskStatus.java?rev=1293161&r1=1293160&r2=1293161&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskStatus.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskStatus.java Fri Feb 24 10:19:15 2012
@@ -245,7 +245,7 @@ public class TaskStatus implements Writa
    * @return The number of BSP super steps executed by the task.
    */
   public long getSuperstepCount() {
-    return counters.getCounter(PeerCounter.SUPERSTEPS);
+    return counters.getCounter(PeerCounter.SUPERSTEP_SUM);
   }
 
   @Override



Mime
View raw message