hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1146992 - in /incubator/hama/trunk/src/java/org/apache/hama: bsp/ ipc/
Date Fri, 15 Jul 2011 07:38:06 GMT
Author: edwardyoon
Date: Fri Jul 15 07:38:05 2011
New Revision: 1146992

URL: http://svn.apache.org/viewvc?rev=1146992&view=rev
Log:
Refactor IPC package.

Added:
    incubator/hama/trunk/src/java/org/apache/hama/ipc/BSPPeerProtocol.java
    incubator/hama/trunk/src/java/org/apache/hama/ipc/GroomProtocol.java
Removed:
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerProtocol.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPRPCProtocolVersion.java
    incubator/hama/trunk/src/java/org/apache/hama/ipc/WorkerProtocol.java
Modified:
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerManager.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java?rev=1146992&r1=1146991&r2=1146992&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java Fri Jul 15 07:38:05 2011
@@ -49,7 +49,7 @@ import org.apache.hama.HamaConfiguration
 import org.apache.hama.http.HttpServer;
 import org.apache.hama.ipc.JobSubmissionProtocol;
 import org.apache.hama.ipc.MasterProtocol;
-import org.apache.hama.ipc.WorkerProtocol;
+import org.apache.hama.ipc.GroomProtocol;
 
 /**
  * BSPMaster is responsible to control all the groom servers and to manage bsp
@@ -113,7 +113,7 @@ public class BSPMaster implements JobSub
   private TaskScheduler taskScheduler;
 
   // GroomServers cache
-  protected ConcurrentMap<GroomServerStatus, WorkerProtocol> groomServers = new ConcurrentHashMap<GroomServerStatus,
WorkerProtocol>();
+  protected ConcurrentMap<GroomServerStatus, GroomProtocol> groomServers = new ConcurrentHashMap<GroomServerStatus,
GroomProtocol>();
 
   private Instructor instructor;
 
@@ -161,7 +161,7 @@ public class BSPMaster implements JobSub
             } else if (jip.getStatus().getRunState() == JobStatus.RUNNING) {
               jip.getStatus().setprogress(ts.getSuperstepCount());
             } else if (jip.getStatus().getRunState() == JobStatus.KILLED) {
-              WorkerProtocol worker = findGroomServer(tmpStatus);
+              GroomProtocol worker = findGroomServer(tmpStatus);
               Directive d1 = new DispatchTasksDirective(
                   currentGroomServerPeers(),
                   new GroomServerAction[] { new KillTaskAction(ts.getTaskId()) });
@@ -313,8 +313,8 @@ public class BSPMaster implements JobSub
     }
     Throwable e = null;
     try {
-      WorkerProtocol wc = (WorkerProtocol) RPC.waitForProxy(
-          WorkerProtocol.class, WorkerProtocol.versionID,
+      GroomProtocol wc = (GroomProtocol) RPC.waitForProxy(
+          GroomProtocol.class, GroomProtocol.versionID,
           resolveWorkerAddress(status.getRpcServer()), this.conf);
       if (null == wc) {
         LOG.warn("Fail to create Worker client at host " + status.getPeerName());
@@ -350,7 +350,7 @@ public class BSPMaster implements JobSub
   private void updateGroomServersKey(GroomServerStatus old,
       GroomServerStatus newKey) {
     synchronized (groomServers) {
-      WorkerProtocol worker = groomServers.remove(old);
+      GroomProtocol worker = groomServers.remove(old);
       groomServers.put(newKey, worker);
     }
   }
@@ -517,7 +517,7 @@ public class BSPMaster implements JobSub
     int numGroomServers = groomServers.size();
     if (detailed) {
       groomPeersMap = new HashMap<String, String>();
-      for (Map.Entry<GroomServerStatus, WorkerProtocol> entry : groomServers
+      for (Map.Entry<GroomServerStatus, GroomProtocol> entry : groomServers
           .entrySet()) {
         GroomServerStatus s = entry.getKey();
         groomPeersMap.put(s.getGroomName(), s.getPeerName());
@@ -537,12 +537,12 @@ public class BSPMaster implements JobSub
   }
 
   @Override
-  public WorkerProtocol findGroomServer(GroomServerStatus status) {
+  public GroomProtocol findGroomServer(GroomServerStatus status) {
     return groomServers.get(status);
   }
 
   @Override
-  public Collection<WorkerProtocol> findGroomServers() {
+  public Collection<GroomProtocol> findGroomServers() {
     return groomServers.values();
   }
 

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java?rev=1146992&r1=1146991&r2=1146992&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java Fri Jul 15 07:38:05 2011
@@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RPC.Server;
 import org.apache.hama.Constants;
+import org.apache.hama.ipc.BSPPeerProtocol;
 import org.apache.hama.util.Bytes;
 import org.apache.hama.zookeeper.QuorumPeer;
 import org.apache.zookeeper.CreateMode;

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java?rev=1146992&r1=1146991&r2=1146992&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java Fri Jul 15 07:38:05
2011
@@ -21,12 +21,13 @@ import java.io.Closeable;
 import java.io.IOException;
 
 import org.apache.hama.Constants;
+import org.apache.hama.ipc.HamaRPCProtocolVersion;
 import org.apache.zookeeper.KeeperException;
 
 /**
  * BSP communication interface.
  */
-public interface BSPPeerInterface extends BSPRPCProtocolVersion, Closeable,
+public interface BSPPeerInterface extends HamaRPCProtocolVersion, Closeable,
     Constants {
 
   /**

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java?rev=1146992&r1=1146991&r2=1146992&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java Fri Jul 15 07:38:05 2011
@@ -22,6 +22,7 @@ import java.io.IOException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.ipc.BSPPeerProtocol;
 import org.apache.zookeeper.KeeperException;
 
 /**

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=1146992&r1=1146991&r2=1146992&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Fri Jul 15 07:38:05
2011
@@ -54,8 +54,9 @@ import org.apache.hadoop.util.RunJar;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
+import org.apache.hama.ipc.BSPPeerProtocol;
 import org.apache.hama.ipc.MasterProtocol;
-import org.apache.hama.ipc.WorkerProtocol;
+import org.apache.hama.ipc.GroomProtocol;
 import org.apache.log4j.LogManager;
 
 /**
@@ -66,7 +67,7 @@ import org.apache.log4j.LogManager;
  * storages. Basically, a groom server and a data node should be run on one
  * physical node.
  */
-public class GroomServer implements Runnable, WorkerProtocol, BSPPeerProtocol {
+public class GroomServer implements Runnable, GroomProtocol, BSPPeerProtocol {
   public static final Log LOG = LogFactory.getLog(GroomServer.class);
   static final String SUBDIR = "groomServer";
 
@@ -798,8 +799,8 @@ public class GroomServer implements Runn
   @Override
   public long getProtocolVersion(String protocol, long clientVersion)
       throws IOException {
-    if (protocol.equals(WorkerProtocol.class.getName())) {
-      return WorkerProtocol.versionID;
+    if (protocol.equals(GroomProtocol.class.getName())) {
+      return GroomProtocol.versionID;
     } else if (protocol.equals(BSPPeerProtocol.class.getName())) {
       return BSPPeerProtocol.versionID;
     } else {

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerManager.java?rev=1146992&r1=1146991&r2=1146992&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerManager.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerManager.java Fri Jul 15 07:38:05
2011
@@ -20,7 +20,7 @@ package org.apache.hama.bsp;
 import java.util.Collection;
 import java.util.Map;
 
-import org.apache.hama.ipc.WorkerProtocol;
+import org.apache.hama.ipc.GroomProtocol;
 
 /**
  * Manages information about the {@link GroomServer}s in the cluster 
@@ -41,14 +41,14 @@ interface GroomServerManager {
    * @param groomId The identification value of GroomServer 
    * @return GroomServerStatus 
    */
-  WorkerProtocol findGroomServer(GroomServerStatus status);
+  GroomProtocol findGroomServer(GroomServerStatus status);
 
   /**
    * Find the collection of groom servers.
    * 
    * @return Collection of groom servers list.
    */
-  Collection<WorkerProtocol> findGroomServers();
+  Collection<GroomProtocol> findGroomServers();
 
   /**
    * Collection of GroomServerStatus as the key set.

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java?rev=1146992&r1=1146991&r2=1146992&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java Fri Jul 15
07:38:05 2011
@@ -26,7 +26,7 @@ import java.util.concurrent.ScheduledExe
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hama.ipc.WorkerProtocol;
+import org.apache.hama.ipc.GroomProtocol;
 
 /**
  * A simple task scheduler. 
@@ -134,7 +134,7 @@ class SimpleTaskScheduler extends TaskSc
       // assembly into actions
       // List<Task> tasks = new ArrayList<Task>();
       if (jip.getStatus().getRunState() == JobStatus.RUNNING) {
-        WorkerProtocol worker = groomServerManager.findGroomServer(this.stus);
+        GroomProtocol worker = groomServerManager.findGroomServer(this.stus);
         try {
           // dispatch() to the groom server
           Directive d1 = new DispatchTasksDirective(groomServerManager

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java?rev=1146992&r1=1146991&r2=1146992&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java Fri Jul 15 07:38:05 2011
@@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hama.ipc.BSPPeerProtocol;
 
 /**
  * Base class for tasks.

Added: incubator/hama/trunk/src/java/org/apache/hama/ipc/BSPPeerProtocol.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/ipc/BSPPeerProtocol.java?rev=1146992&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/ipc/BSPPeerProtocol.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/ipc/BSPPeerProtocol.java Fri Jul 15 07:38:05
2011
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ipc;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hama.Constants;
+import org.apache.hama.bsp.PeerNames;
+import org.apache.hama.bsp.Task;
+import org.apache.hama.bsp.TaskAttemptID;
+
+/**
+ * Protocol that task child process uses to contact its parent process.
+ */
+public interface BSPPeerProtocol extends HamaRPCProtocolVersion, Closeable,
+    Constants {
+
+  /** Called when a child task process starts, to get its task. */
+  Task getTask(TaskAttemptID taskid) throws IOException;
+
+  /**
+   * Periodically called by child to check if parent is still alive.
+   * 
+   * @return True if the task is known
+   */
+  boolean ping(TaskAttemptID taskid) throws IOException;
+
+  /**
+   * Report that the task is successfully completed. Failure is assumed if the
+   * task process exits without calling this.
+   * 
+   * @param taskid task's id
+   * @param shouldBePromoted whether to promote the task's output or not
+   */
+  void done(TaskAttemptID taskid, boolean shouldBePromoted) throws IOException;
+
+  /** Report that the task encounted a local filesystem error. */
+  void fsError(TaskAttemptID taskId, String message) throws IOException;
+
+  void incrementSuperstepCount(TaskAttemptID taskid) throws IOException;
+
+  /**
+   * @return the all BSPPeer names.
+   */
+  PeerNames getAllPeerNames();
+
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/ipc/GroomProtocol.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/ipc/GroomProtocol.java?rev=1146992&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/ipc/GroomProtocol.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/ipc/GroomProtocol.java Fri Jul 15 07:38:05
2011
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ipc;
+
+import java.io.IOException;
+
+import org.apache.hama.bsp.Directive;
+
+/**
+ * A protocol for BSPMaster talks to GroomServer. 
+ */
+public interface GroomProtocol extends HamaRPCProtocolVersion {
+
+  /**
+   * Instruct GroomServer performaning tasks.
+   * 
+   * @param directive instructs a GroomServer performing necessary
+   *        execution.
+   * @throws IOException
+   */
+  void dispatch(Directive directive) throws IOException;
+
+}



Mime
View raw message