Author: edwardyoon
Date: Mon Sep 26 01:06:12 2011
New Revision: 1175640
URL: http://svn.apache.org/viewvc?rev=1175640&view=rev
Log:
Refactor BSPPeer
Modified:
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java?rev=1175640&r1=1175639&r2=1175640&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java Mon Sep 26 01:06:12 2011
@@ -485,6 +485,9 @@ public class BSPMaster implements JobSub
for (String node : zk.getChildren(bspRoot, this)) {
for (String subnode : zk.getChildren(bspRoot + "/" + node, this)) {
for (String subnode2 : zk.getChildren(bspRoot + "/" + node, this)) {
+ for (String subnode3 : zk.getChildren(bspRoot + "/" + node + "/" + subnode2, this)) {
+ zk.delete(bspRoot + "/" + node + "/" + subnode + "/" + subnode2 + "/" + subnode3, 0);
+ }
zk.delete(bspRoot + "/" + node + "/" + subnode + "/" + subnode2, 0);
}
zk.delete(bspRoot + "/" + node + "/" + subnode, 0);
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java?rev=1175640&r1=1175639&r2=1175640&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java Mon Sep 26 01:06:12 2011
@@ -227,8 +227,8 @@ public class BSPPeer implements Watcher,
BSPMessageSerializer msgSerializer = null;
if (this.conf.getBoolean("bsp.checkpoint.enabled", false)) {
msgSerializer = new BSPMessageSerializer(conf.getInt(
- "bsp.checkpoint.port", Integer
- .parseInt(CheckpointRunner.DEFAULT_PORT)));
+ "bsp.checkpoint.port",
+ Integer.parseInt(CheckpointRunner.DEFAULT_PORT)));
}
this.messageSerializer = msgSerializer;
}
@@ -237,8 +237,8 @@ public class BSPPeer implements Watcher,
try {
if (LOG.isDebugEnabled())
LOG.debug("reinitialize(): " + getPeerName());
- this.server = RPC.getServer(this, peerAddress.getHostName(), peerAddress
- .getPort(), conf);
+ this.server = RPC.getServer(this, peerAddress.getHostName(),
+ peerAddress.getPort(), conf);
server.start();
LOG.info(" BSPPeer address:" + peerAddress.getHostName() + " port:"
+ peerAddress.getPort());
@@ -365,12 +365,40 @@ public class BSPPeer implements Watcher,
private void createZnode(final String path, final CreateMode mode)
throws KeeperException, InterruptedException {
- Stat s = zk.exists(path, false);
- if (null == s) {
- try {
- zk.create(path, null, Ids.OPEN_ACL_UNSAFE, mode);
- } catch (KeeperException.NodeExistsException nee) {
- LOG.warn("Ignore because znode may be already created at " + path, nee);
+ synchronized (zk) {
+ Stat s = zk.exists(path, false);
+ if (null == s) {
+ try {
+ zk.create(path, null, Ids.OPEN_ACL_UNSAFE, mode);
+ } catch (KeeperException.NodeExistsException nee) {
+ LOG.warn("Ignore because znode may be already created at " + path,
+ nee);
+ }
+ }
+ }
+ }
+
+ private class BarrierWatcher implements Watcher {
+ private boolean complete = false;
+
+ boolean isComplete() {
+ return this.complete;
+ }
+
+ @Override
+ public void process(WatchedEvent event) {
+ this.complete = true;
+ synchronized (mutex) {
+ LOG.info(">>>>>>>>>>>>>>> at superstep " + getSuperstepCount()
+ + " taskid:" + taskid.toString() + " is notified.");
+ /*
+ * try { Stat s = zk.exists(pathToSuperstepZnode+"/ready", false);
+ * if(null != s) { zk.delete(pathToSuperstepZnode+"/ready", 0); } }
+ * catch(KeeperException.NoNodeException nne) {
+ * LOG.warn("Ignore because znode may be deleted.", nne); }
+ * catch(Exception e) { throw new RuntimeException(e); }
+ */
+ mutex.notifyAll();
}
}
}
@@ -381,44 +409,50 @@ public class BSPPeer implements Watcher,
+ this.getSuperstepCount());
}
- createZnode(bspRoot);
+ synchronized (zk) {
+ createZnode(bspRoot);
+ final String pathToJobIdZnode = bspRoot + "/"
+ + taskid.getJobID().toString();
+ createZnode(pathToJobIdZnode);
+ final String pathToSuperstepZnode = pathToJobIdZnode + "/"
+ + getSuperstepCount();
+ createZnode(pathToSuperstepZnode);
+ BarrierWatcher barrierWatcher = new BarrierWatcher();
+ Stat readyStat = zk.exists(pathToSuperstepZnode + "/ready",
+ barrierWatcher);
+ zk.create(getNodeName(), null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
- final String pathToJobIdZnode = bspRoot + "/"
- + taskid.getJobID().toString();
- createZnode(pathToJobIdZnode);
-
- final String pathToSuperstepZnode = pathToJobIdZnode + "/"
- + getSuperstepCount();
- createZnode(pathToSuperstepZnode);
-
- zk.exists(pathToSuperstepZnode + "/ready", new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- synchronized (mutex) {
- try {
- Stat s = zk.exists(pathToSuperstepZnode + "/ready", false);
- if (null != s) {
- zk.delete(pathToSuperstepZnode + "/ready", 0);
- }
- } catch (KeeperException.NoNodeException nne) {
- LOG.warn("Ignore because znode may be deleted.", nne);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- mutex.notifyAll();
- }
+ List<String> znodes = zk.getChildren(pathToSuperstepZnode, false);
+ int size = znodes.size(); // may contains ready
+ boolean hasReady = znodes.contains("ready");
+ if (hasReady) {
+ size--;
}
- });
- zk.create(getNodeName(), null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
- synchronized (mutex) {
- List<String> znodes = zk.getChildren(pathToSuperstepZnode, false);
+ LOG.debug("===> at superstep :" + getSuperstepCount()
+ + " current znode size: " + znodes.size() + " current znodes:"
+ + znodes);
+
if (LOG.isDebugEnabled())
LOG.debug("enterBarrier() znode size within " + pathToSuperstepZnode
+ " is " + znodes.size() + ". Znodes include " + znodes);
- if (znodes.size() < jobConf.getNumBspTask()) {
- mutex.wait();
+
+ if (size < jobConf.getNumBspTask()) {
+ LOG.info("xxxx 1. At superstep: " + getSuperstepCount()
+ + " which task is waiting? " + taskid.toString()
+ + " stat is null? " + readyStat);
+ while (!barrierWatcher.isComplete()) {
+ if (!hasReady) {
+ synchronized (mutex) {
+ mutex.wait(1000);
+ }
+ }
+ }
+ LOG.debug("xxxx 2. at superstep: " + getSuperstepCount()
+ + " after waiting ..." + taskid.toString());
} else {
+ LOG.debug("---> at superstep: " + getSuperstepCount()
+ + " task that is creating /ready znode:" + taskid.toString());
createEphemeralZnode(pathToSuperstepZnode + "/ready");
}
}
@@ -429,45 +463,86 @@ public class BSPPeer implements Watcher,
final String pathToSuperstepZnode = bspRoot + "/"
+ taskid.getJobID().toString() + "/" + getSuperstepCount();
while (true) {
- synchronized (mutex) {
- final List<String> znodes = zk.getChildren(pathToSuperstepZnode, false);
- final int size = znodes.size();
- if (null == znodes || znodes.isEmpty())
- return true;
- if (1 == size) {
+ List<String> znodes = zk.getChildren(pathToSuperstepZnode, false);
+ LOG.info("leaveBarrier() !!! checking znodes cotnains /ready node or not: at superstep:"
+ + getSuperstepCount() + " znode:" + znodes);
+ if (znodes.contains("ready")) {
+ znodes.remove("ready");
+ }
+ final int size = znodes.size();
+ LOG.info("leaveBarrier() at superstep:" + getSuperstepCount()
+ + " znode size: (" + size + ") znodes:" + znodes);
+ if (null == znodes || znodes.isEmpty())
+ return true;
+ if (1 == size) {
+ try {
zk.delete(getNodeName(), 0);
- return true;
+ } catch (KeeperException.NoNodeException nne) {
+ LOG.warn(
+ "+++ (znode size is 1). Ignore because znode may disconnect.",
+ nne);
}
- Collections.sort(znodes);
- final String lowest = znodes.get(0);
- final String highest = znodes.get(size - 1);
+ return true;
+ }
+ Collections.sort(znodes);
+
+ final String lowest = znodes.get(0);
+ final String highest = znodes.get(size - 1);
+
+ LOG.info("leaveBarrier() at superstep: " + getSuperstepCount()
+ + " taskid:" + taskid.toString() + " lowest: " + lowest + " highest:"
+ + highest);
+ synchronized (mutex) {
+
if (getNodeName().equals(pathToSuperstepZnode + "/" + lowest)) {
Stat s = zk.exists(pathToSuperstepZnode + "/" + highest,
new Watcher() {
@Override
public void process(WatchedEvent event) {
synchronized (mutex) {
+ LOG.debug("leaveBarrier() at superstep: "
+ + getSuperstepCount() + " taskid:" + taskid.toString()
+ + " highest notify lowest.");
mutex.notifyAll();
}
}
});
- if (null != s)
+
+ if (null != s) {
+ LOG.debug("leaveBarrier(): superstep:" + getSuperstepCount()
+ + " taskid:" + taskid.toString() + " wait for higest notify.");
mutex.wait();
+ }
} else {
Stat s1 = zk.exists(getNodeName(), false);
- if (null != s1)
- zk.delete(getNodeName(), 0);
+
+ if (null != s1) {
+ LOG.info("leaveBarrier() znode at superstep:" + getSuperstepCount()
+ + " taskid:" + taskid.toString() + " exists, so delete it.");
+ try {
+ zk.delete(getNodeName(), 0);
+ } catch (KeeperException.NoNodeException nne) {
+ LOG.warn("++++ Ignore because node may be dleted.", nne);
+ }
+ }
+
Stat s2 = zk.exists(pathToSuperstepZnode + "/" + lowest,
new Watcher() {
@Override
public void process(WatchedEvent event) {
synchronized (mutex) {
+ LOG.debug("leaveBarrier() at superstep: "
+ + getSuperstepCount() + " taskid:" + taskid.toString()
+ + " lowest notify other nodes.");
mutex.notifyAll();
}
}
});
- if (null != s2)
+ if (null != s2) {
+ LOG.debug("leaveBarrier(): superstep:" + getSuperstepCount()
+ + " taskid:" + taskid.toString() + " wait for lowest notify.");
mutex.wait();
+ }
}
}
}
@@ -555,8 +630,8 @@ public class BSPPeer implements Watcher,
"Peername must consist of exactly ONE \":\"! Given peername was: "
+ peerName);
}
- return new InetSocketAddress(peerAddrParts[0], Integer
- .parseInt(peerAddrParts[1]));
+ return new InetSocketAddress(peerAddrParts[0],
+ Integer.parseInt(peerAddrParts[1]));
}
@Override
|