hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1242637 [1/2] - in /incubator/hama/trunk: ./ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/bsp/message/ core/src/test/java/org/apache/hama/bsp/ core/src/test/java/org/apache/hama/bsp/message/ core/src/test/java...
Date Fri, 10 Feb 2012 02:06:11 GMT
Author: edwardyoon
Date: Fri Feb 10 02:06:10 2012
New Revision: 1242637

URL: http://svn.apache.org/viewvc?rev=1242637&view=rev
Log:
Message API improvements

Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPInterface.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Combiner.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroBSPMessageBundle.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManager.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/Sender.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/examples/ClassSerializePrinting.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
    incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java
    incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1242637&r1=1242636&r2=1242637&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Fri Feb 10 02:06:10 2012
@@ -11,6 +11,7 @@ Release 0.5 - Unreleased
 
   IMPROVEMENTS
 
+    HAMA-502: Message API Improvement (edwardyoon)
   
 Release 0.4 - February 5, 2012
 

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java?rev=1242637&r1=1242636&r2=1242637&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java Fri Feb 10 02:06:10 2012
@@ -20,13 +20,14 @@ package org.apache.hama.bsp;
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
 import org.apache.hama.bsp.sync.SyncException;
 
 /**
  * This class provides an abstract implementation of the BSP interface.
  */
-public abstract class BSP<K1, V1, K2, V2> implements
-    BSPInterface<K1, V1, K2, V2> {
+public abstract class BSP<K1, V1, K2, V2, M extends Writable> implements
+    BSPInterface<K1, V1, K2, V2, M> {
 
   protected Configuration conf;
 
@@ -39,7 +40,7 @@ public abstract class BSP<K1, V1, K2, V2
    * @throws SyncException
    * @throws InterruptedException
    */
-  public abstract void bsp(BSPPeer<K1, V1, K2, V2> peer) throws IOException,
+  public abstract void bsp(BSPPeer<K1, V1, K2, V2, M> peer) throws IOException,
       SyncException, InterruptedException;
 
   /**
@@ -49,7 +50,7 @@ public abstract class BSP<K1, V1, K2, V2
    * @param peer Your BSPPeer instance.
    * @throws IOException
    */
-  public void setup(BSPPeer<K1, V1, K2, V2> peer) throws IOException,
+  public void setup(BSPPeer<K1, V1, K2, V2, M> peer) throws IOException,
       SyncException, InterruptedException {
 
   }
@@ -62,7 +63,7 @@ public abstract class BSP<K1, V1, K2, V2
    * @param peer Your BSPPeer instance.
    * @throws IOException
    */
-  public void cleanup(BSPPeer<K1, V1, K2, V2> peer) throws IOException {
+  public void cleanup(BSPPeer<K1, V1, K2, V2, M> peer) throws IOException {
 
   }
 

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPInterface.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPInterface.java?rev=1242637&r1=1242636&r2=1242637&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPInterface.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPInterface.java Fri Feb 10 02:06:10 2012
@@ -23,6 +23,6 @@ import org.apache.hadoop.conf.Configurab
  * Interface BSP defines the basic operations needed to implement the BSP
  * algorithm.
  */
-public interface BSPInterface<K1, V1, K2, V2> extends Configurable {
+public interface BSPInterface<K1, V1, K2, V2, M> extends Configurable {
 
 }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java?rev=1242637&r1=1242636&r2=1242637&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java Fri Feb 10 02:06:10 2012
@@ -25,6 +25,7 @@ import java.util.Enumeration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hama.HamaConfiguration;
 
@@ -110,15 +111,15 @@ public class BSPJob extends BSPJobContex
     return (Class<? extends BSP>) conf.getClass(WORK_CLASS_ATTR, BSP.class);
   }
 
-  public void setCombinerClass(Class<? extends Combiner> cls) {
+  public void setCombinerClass(Class<? extends Combiner<? extends Writable>> cls) {
     ensureState(JobState.DEFINE);
     conf.setClass(COMBINER_CLASS_ATTR, cls, Combiner.class);
   }
 
   @SuppressWarnings("unchecked")
-  public Class<? extends Combiner> getCombinerClass() {
-    return (Class<? extends Combiner>) conf.getClass(COMBINER_CLASS_ATTR,
-        Combiner.class);
+  public Class<? extends Combiner<? extends Writable>> getCombinerClass() {
+    return (Class<? extends Combiner<? extends Writable>>) conf.getClass(
+        COMBINER_CLASS_ATTR, Combiner.class);
   }
 
   public void setJar(String jar) {
@@ -230,7 +231,7 @@ public class BSPJob extends BSPJobContex
   }
 
   public int getNumBspTask() {
-  // default is 1, because with zero, we will hang in infinity
+    // default is 1, because with zero, we will hang in infinity
     return conf.getInt("bsp.peers.num", 1);
   }
 
@@ -283,7 +284,6 @@ public class BSPJob extends BSPJobContex
     conf.setClass("bsp.input.value.class", theClass, Object.class);
   }
 
-  
   /**
    * Get the key class for the job output data.
    * 
@@ -368,4 +368,7 @@ public class BSPJob extends BSPJobContex
         conf);
   }
 
+  public void setMaxIteration(int maxIteration) {
+    conf.setInt("hama.graph.max.iteration", maxIteration);
+  }
 }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java?rev=1242637&r1=1242636&r2=1242637&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java Fri Feb 10 02:06:10 2012
@@ -36,12 +36,12 @@ import org.apache.hadoop.util.Reflection
  * batch rather than individually.
  * 
  */
-public class BSPMessageBundle implements Writable {
+public class BSPMessageBundle<M extends Writable> implements Writable {
   
   public static final Log LOG = LogFactory.getLog(BSPMessageBundle.class);
 
-  private HashMap<String, LinkedList<BSPMessage>> messages = new HashMap<String, LinkedList<BSPMessage>>();
-  private HashMap<String, Class<? extends BSPMessage>> classCache = new HashMap<String, Class<? extends BSPMessage>>();
+  private HashMap<String, LinkedList<M>> messages = new HashMap<String, LinkedList<M>>();
+  private HashMap<String, Class<M>> classCache = new HashMap<String, Class<M>>();
 
   public BSPMessageBundle() {
   }
@@ -51,11 +51,11 @@ public class BSPMessageBundle implements
    * 
    * @param message BSPMessage to add.
    */
-  public void addMessage(BSPMessage message) {
+  public void addMessage(M message) {
     String className = message.getClass().getName();
     if (!messages.containsKey(className)) {
       // use linked list because we're just iterating over them
-      LinkedList<BSPMessage> list = new LinkedList<BSPMessage>();
+      LinkedList<M> list = new LinkedList<M>();
       list.add(message);
       messages.put(className, list);
     } else {
@@ -63,11 +63,11 @@ public class BSPMessageBundle implements
     }
   }
 
-  public List<BSPMessage> getMessages() {
+  public List<M> getMessages() {
     // here we use an arraylist, because we know the size and outside may need
     // random access
-    List<BSPMessage> mergeList = new ArrayList<BSPMessage>(messages.size());
-    for (LinkedList<BSPMessage> c : messages.values()) {
+    List<M> mergeList = new ArrayList<M>(messages.size());
+    for (LinkedList<M> c : messages.values()) {
       mergeList.addAll(c);
     }
     return mergeList;
@@ -77,11 +77,11 @@ public class BSPMessageBundle implements
     // writes the k/v mapping size
     out.writeInt(messages.size());
     if (messages.size() > 0) {
-      for (Entry<String, LinkedList<BSPMessage>> entry : messages.entrySet()) {
+      for (Entry<String, LinkedList<M>> entry : messages.entrySet()) {
         out.writeUTF(entry.getKey());
-        LinkedList<BSPMessage> messageList = entry.getValue();
+        LinkedList<M> messageList = entry.getValue();
         out.writeInt(messageList.size());
-        for (BSPMessage msg : messageList) {
+        for (M msg : messageList) {
           msg.write(out);
         }
       }
@@ -91,20 +91,20 @@ public class BSPMessageBundle implements
   @SuppressWarnings("unchecked")
   public void readFields(DataInput in) throws IOException {
     if (messages == null) {
-      messages = new HashMap<String, LinkedList<BSPMessage>>();
+      messages = new HashMap<String, LinkedList<M>>();
     }
     int numMessages = in.readInt();
     if (numMessages > 0) {
       for (int entries = 0; entries < numMessages; entries++) {
         String className = in.readUTF();
         int size = in.readInt();
-        LinkedList<BSPMessage> msgList = new LinkedList<BSPMessage>();
+        LinkedList<M> msgList = new LinkedList<M>();
         messages.put(className, msgList);
 
-        Class<? extends BSPMessage> clazz = null;
+        Class<M> clazz = null;
         if ((clazz = classCache.get(className)) == null) {
           try {
-            clazz = (Class<? extends BSPMessage>) Class.forName(className);
+            clazz = (Class<M>) Class.forName(className);
             classCache.put(className, clazz);
           } catch (ClassNotFoundException e) {
             LOG.error("Class was not found.",e);
@@ -112,7 +112,7 @@ public class BSPMessageBundle implements
         }
 
         for (int i = 0; i < size; i++) {
-          BSPMessage msg = ReflectionUtils.newInstance(clazz, null);
+          M msg = ReflectionUtils.newInstance(clazz, null);
           msg.readFields(in);
           msgList.add(msg);
         }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java?rev=1242637&r1=1242636&r2=1242637&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java Fri Feb 10 02:06:10 2012
@@ -20,6 +20,7 @@ package org.apache.hama.bsp;
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
 import org.apache.hama.Constants;
 import org.apache.hama.bsp.Counters.Counter;
 import org.apache.hama.bsp.sync.SyncException;
@@ -28,7 +29,7 @@ import org.apache.hama.util.KeyValuePair
 /**
  * BSP communication interface.
  */
-public interface BSPPeer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Constants {
+public interface BSPPeer<K1, V1, K2, V2, M extends Writable> extends Constants {
 
   /**
    * Send a data with a tag to another BSPSlave corresponding to hostname.
@@ -39,13 +40,13 @@ public interface BSPPeer<KEYIN, VALUEIN,
    * @param msg
    * @throws IOException
    */
-  public void send(String peerName, BSPMessage msg) throws IOException;
+  public void send(String peerName, M msg) throws IOException;
 
   /**
    * @return A message from the peer's received messages queue (a FIFO).
    * @throws IOException
    */
-  public BSPMessage getCurrentMessage() throws IOException;
+  public M getCurrentMessage() throws IOException;
 
   /**
    * @return The number of messages in the peer's received messages queue.
@@ -103,7 +104,7 @@ public interface BSPPeer<KEYIN, VALUEIN,
    * @param value your value object
    * @throws IOException
    */
-  public void write(KEYOUT key, VALUEOUT value) throws IOException;
+  public void write(K2 key, V2 value) throws IOException;
 
   /**
    * Deserializes the next input key value into the given objects.
@@ -113,7 +114,7 @@ public interface BSPPeer<KEYIN, VALUEIN,
    * @return false if there are no records to read anymore
    * @throws IOException
    */
-  public boolean readNext(KEYIN key, VALUEIN value) throws IOException;
+  public boolean readNext(K1 key, V1 value) throws IOException;
 
   /**
    * Reads the next key value pair and returns it as a pair.
@@ -121,7 +122,7 @@ public interface BSPPeer<KEYIN, VALUEIN,
    * @return null if there are no records left.
    * @throws IOException
    */
-  public KeyValuePair<KEYIN, VALUEIN> readNext() throws IOException;
+  public KeyValuePair<K1, V1> readNext() throws IOException;
 
   /**
    * Closes the input and opens it right away, so that the file pointer is at

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1242637&r1=1242636&r2=1242637&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Fri Feb 10 02:06:10 2012
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hama.Constants;
 import org.apache.hama.bsp.Counters.Counter;
@@ -45,8 +46,8 @@ import org.apache.hama.util.KeyValuePair
 /**
  * This class represents a BSP peer.
  */
-public final class BSPPeerImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT> implements
-    BSPPeer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+public final class BSPPeerImpl<K1, V1, K2, V2, M extends Writable> implements
+    BSPPeer<K1, V1, K2, V2, M> {
 
   private static final Log LOG = LogFactory.getLog(BSPPeerImpl.class);
 
@@ -67,15 +68,15 @@ public final class BSPPeerImpl<KEYIN, VA
 
   // SYNC
   private SyncClient syncClient;
-  private MessageManager messenger;
+  private MessageManager<M> messenger;
 
   // IO
   private int partition;
   private String splitClass;
   private BytesWritable split;
-  private OutputCollector<KEYOUT, VALUEOUT> collector;
-  private RecordReader<KEYIN, VALUEIN> in;
-  private RecordWriter<KEYOUT, VALUEOUT> outWriter;
+  private OutputCollector<K2, V2> collector;
+  private RecordReader<K1, V1> in;
+  private RecordWriter<K2, V2> outWriter;
 
   private InetSocketAddress peerAddress;
   private Counters counters;
@@ -140,7 +141,7 @@ public final class BSPPeerImpl<KEYIN, VA
         TaskStatus.State.RUNNING, "running", peerAddress.getHostName(),
         TaskStatus.Phase.STARTING, counters));
 
-    messenger = MessageManagerFactory.getMessageManager(conf);
+    messenger = new MessageManagerFactory<M>().getMessageManager(conf);
     messenger.init(conf, peerAddress);
 
   }
@@ -158,10 +159,10 @@ public final class BSPPeerImpl<KEYIN, VA
           Task.getOutputName(partition));
       outWriter = bspJob.getOutputFormat().getRecordWriter(fs, bspJob,
           outdir.makeQualified(fs).toString());
-      final RecordWriter<KEYOUT, VALUEOUT> finalOut = outWriter;
+      final RecordWriter<K2, V2> finalOut = outWriter;
 
-      collector = new OutputCollector<KEYOUT, VALUEOUT>() {
-        public void collect(KEYOUT key, VALUEOUT value) throws IOException {
+      collector = new OutputCollector<K2, V2>() {
+        public void collect(K2 key, V2 value) throws IOException {
           finalOut.write(key, value);
         }
       };
@@ -196,12 +197,12 @@ public final class BSPPeerImpl<KEYIN, VA
   }
 
   @Override
-  public final BSPMessage getCurrentMessage() throws IOException {
+  public final M getCurrentMessage() throws IOException {
     return messenger.getCurrentMessage();
   }
 
   @Override
-  public final void send(String peerName, BSPMessage msg) throws IOException {
+  public final void send(String peerName, M msg) throws IOException {
     messenger.send(peerName, msg);
   }
 
@@ -214,7 +215,7 @@ public final class BSPPeerImpl<KEYIN, VA
     return ckptPath;
   }
 
-  final void checkpoint(String checkpointedPath, BSPMessageBundle bundle) {
+  final void checkpoint(String checkpointedPath, BSPMessageBundle<M> bundle) {
     FSDataOutputStream out = null;
     try {
       out = this.fs.create(new Path(checkpointedPath));
@@ -239,15 +240,15 @@ public final class BSPPeerImpl<KEYIN, VA
   public final void sync() throws IOException, SyncException,
       InterruptedException {
     enterBarrier();
-    Iterator<Entry<InetSocketAddress, LinkedList<BSPMessage>>> it = messenger
+    Iterator<Entry<InetSocketAddress, LinkedList<M>>> it = messenger
         .getMessageIterator();
 
     while (it.hasNext()) {
-      Entry<InetSocketAddress, LinkedList<BSPMessage>> entry = it.next();
+      Entry<InetSocketAddress, LinkedList<M>> entry = it.next();
       final InetSocketAddress addr = entry.getKey();
-      final Iterable<BSPMessage> messages = entry.getValue();
+      final Iterable<M> messages = entry.getValue();
 
-      final BSPMessageBundle bundle = combineMessages(messages);
+      final BSPMessageBundle<M> bundle = combineMessages(messages);
 
       if (conf.getBoolean("bsp.checkpoint.enabled", false)) {
         checkpoint(checkpointedPath(), bundle);
@@ -269,16 +270,16 @@ public final class BSPPeerImpl<KEYIN, VA
     messenger.clearOutgoingQueues();
   }
 
-  private final BSPMessageBundle combineMessages(Iterable<BSPMessage> messages) {
+  private final BSPMessageBundle<M> combineMessages(Iterable<M> messages) {
     if (!conf.getClass("bsp.combiner.class", Combiner.class).equals(
         Combiner.class)) {
-      Combiner combiner = (Combiner) ReflectionUtils.newInstance(
+      Combiner<M> combiner = (Combiner<M>) ReflectionUtils.newInstance(
           conf.getClass("bsp.combiner.class", Combiner.class), conf);
 
       return combiner.combine(messages);
     } else {
-      BSPMessageBundle bundle = new BSPMessageBundle();
-      for (BSPMessage message : messages) {
+      BSPMessageBundle<M> bundle = new BSPMessageBundle<M>();
+      for (M message : messages) {
         bundle.addMessage(message);
       }
       return bundle;
@@ -383,21 +384,21 @@ public final class BSPPeerImpl<KEYIN, VA
    */
 
   @Override
-  public final void write(KEYOUT key, VALUEOUT value) throws IOException {
+  public final void write(K2 key, V2 value) throws IOException {
     collector.collect(key, value);
   }
 
   @Override
-  public final boolean readNext(KEYIN key, VALUEIN value) throws IOException {
+  public final boolean readNext(K1 key, V1 value) throws IOException {
     return in.next(key, value);
   }
 
   @Override
-  public final KeyValuePair<KEYIN, VALUEIN> readNext() throws IOException {
-    KEYIN k = in.createKey();
-    VALUEIN v = in.createValue();
+  public final KeyValuePair<K1, V1> readNext() throws IOException {
+    K1 k = in.createKey();
+    V1 v = in.createValue();
     if (in.next(k, v)) {
-      return new KeyValuePair<KEYIN, VALUEIN>(k, v);
+      return new KeyValuePair<K1, V1>(k, v);
     } else {
       return null;
     }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java?rev=1242637&r1=1242636&r2=1242637&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java Fri Feb 10 02:06:10 2012
@@ -25,6 +25,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hama.bsp.sync.SyncException;
 import org.apache.hama.ipc.BSPPeerProtocol;
@@ -60,7 +61,7 @@ public final class BSPTask extends Task 
   }
 
   @Override
-  public final void run(BSPJob job, BSPPeerImpl<?, ?, ?, ?> bspPeer,
+  public final void run(BSPJob job, BSPPeerImpl<?, ?, ?, ?, ?> bspPeer,
       BSPPeerProtocol umbilical) throws IOException, SyncException,
       ClassNotFoundException, InterruptedException {
     runBSP(job, bspPeer, split, umbilical);
@@ -69,13 +70,13 @@ public final class BSPTask extends Task 
   }
 
   @SuppressWarnings("unchecked")
-  private final <KEYIN, VALUEIN, KEYOUT, VALUEOUT> void runBSP(
-      final BSPJob job, BSPPeerImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT> bspPeer,
+  private final <KEYIN, VALUEIN, KEYOUT, VALUEOUT, M extends Writable> void runBSP(
+      final BSPJob job, BSPPeerImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M> bspPeer,
       final BytesWritable rawSplit, final BSPPeerProtocol umbilical)
       throws IOException, SyncException, ClassNotFoundException,
       InterruptedException {
 
-    BSP<KEYIN, VALUEIN, KEYOUT, VALUEOUT> bsp = (BSP<KEYIN, VALUEIN, KEYOUT, VALUEOUT>) ReflectionUtils
+    BSP<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M> bsp = (BSP<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M>) ReflectionUtils
         .newInstance(job.getConf().getClass("bsp.work.class", BSP.class),
             job.getConf());
 

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Combiner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Combiner.java?rev=1242637&r1=1242636&r2=1242637&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Combiner.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Combiner.java Fri Feb 10 02:06:10 2012
@@ -17,7 +17,9 @@
  */
 package org.apache.hama.bsp;
 
-public abstract class Combiner {
+import org.apache.hadoop.io.Writable;
+
+public abstract class Combiner<M extends Writable> {
 
   /**
    * Combines messages
@@ -25,6 +27,6 @@ public abstract class Combiner {
    * @param messages
    * @return the combined message
    */
-  public abstract BSPMessageBundle combine(Iterable<BSPMessage> messages);
+  public abstract BSPMessageBundle<M> combine(Iterable<M> messages);
   
 }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1242637&r1=1242636&r2=1242637&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java Fri Feb 10 02:06:10 2012
@@ -966,7 +966,7 @@ public class GroomServer implements Runn
 
         // instantiate and init our peer
         @SuppressWarnings("rawtypes")
-        final BSPPeerImpl<?, ?, ?, ?> bspPeer = new BSPPeerImpl(job,
+        final BSPPeerImpl<?, ?, ?, ?, ?> bspPeer = new BSPPeerImpl(job,
             defaultConf, taskid, umbilical, task.partition, task.splitClass,
             task.split, task.getCounters());
 

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1242637&r1=1242636&r2=1242637&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Fri Feb 10 02:06:10 2012
@@ -39,6 +39,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
@@ -301,13 +302,13 @@ public class LocalBSPRunner implements J
 
   }
 
-  public static class LocalMessageManager implements MessageManager {
+  public static class LocalMessageManager<M extends Writable> implements MessageManager<M> {
 
     private static final ConcurrentHashMap<InetSocketAddress, LocalMessageManager> managerMap = new ConcurrentHashMap<InetSocketAddress, LocalBSPRunner.LocalMessageManager>();
 
-    private final HashMap<InetSocketAddress, LinkedList<BSPMessage>> localOutgoingMessages = new HashMap<InetSocketAddress, LinkedList<BSPMessage>>();
+    private final HashMap<InetSocketAddress, LinkedList<M>> localOutgoingMessages = new HashMap<InetSocketAddress, LinkedList<M>>();
     private static final ConcurrentHashMap<String, InetSocketAddress> socketCache = new ConcurrentHashMap<String, InetSocketAddress>();
-    private final LinkedBlockingDeque<BSPMessage> localIncomingMessages = new LinkedBlockingDeque<BSPMessage>();
+    private final LinkedBlockingDeque<M> localIncomingMessages = new LinkedBlockingDeque<M>();
 
     @Override
     public void init(Configuration conf, InetSocketAddress peerAddress) {
@@ -320,7 +321,7 @@ public class LocalBSPRunner implements J
     }
 
     @Override
-    public BSPMessage getCurrentMessage() throws IOException {
+    public M getCurrentMessage() throws IOException {
       if (localIncomingMessages.isEmpty()) {
         return null;
       } else {
@@ -329,16 +330,16 @@ public class LocalBSPRunner implements J
     }
 
     @Override
-    public void send(String peerName, BSPMessage msg) throws IOException {
+    public void send(String peerName, M msg) throws IOException {
       InetSocketAddress inetSocketAddress = socketCache.get(peerName);
       if (inetSocketAddress == null) {
         inetSocketAddress = BSPNetUtils.getAddress(peerName);
         socketCache.put(peerName, inetSocketAddress);
       }
-      LinkedList<BSPMessage> msgs = localOutgoingMessages
+      LinkedList<M> msgs = localOutgoingMessages
           .get(inetSocketAddress);
       if (msgs == null) {
-        msgs = new LinkedList<BSPMessage>();
+        msgs = new LinkedList<M>();
       }
       msgs.add(msg);
 
@@ -346,19 +347,19 @@ public class LocalBSPRunner implements J
     }
 
     @Override
-    public Iterator<Entry<InetSocketAddress, LinkedList<BSPMessage>>> getMessageIterator() {
-      return localOutgoingMessages.entrySet().iterator();
-    }
-
-    @Override
-    public void transfer(InetSocketAddress addr, BSPMessageBundle bundle)
+    public void transfer(InetSocketAddress addr, BSPMessageBundle<M> bundle)
         throws IOException {
-      for (BSPMessage value : bundle.getMessages()) {
+      for (M value : bundle.getMessages()) {
         managerMap.get(addr).localIncomingMessages.add(value);
       }
     }
 
     @Override
+    public Iterator<Entry<InetSocketAddress, LinkedList<M>>> getMessageIterator() {
+      return localOutgoingMessages.entrySet().iterator();
+    }
+    
+    @Override
     public void clearOutgoingQueues() {
       localOutgoingMessages.clear();
     }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java?rev=1242637&r1=1242636&r2=1242637&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java Fri Feb 10 02:06:10 2012
@@ -142,7 +142,7 @@ public abstract class Task implements Wr
    * @param bspPeer for communications
    * @param umbilical for communications with GroomServer
    */
-  public abstract void run(BSPJob job, BSPPeerImpl<?,?,?,?> bspPeer, BSPPeerProtocol umbilical)
+  public abstract void run(BSPJob job, BSPPeerImpl<?,?,?,?,?> bspPeer, BSPPeerProtocol umbilical)
       throws IOException, SyncException, ClassNotFoundException, InterruptedException;
 
   public abstract BSPTaskRunner createRunner(GroomServer groom);

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroBSPMessageBundle.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroBSPMessageBundle.java?rev=1242637&r1=1242636&r2=1242637&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroBSPMessageBundle.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroBSPMessageBundle.java Fri Feb 10 02:06:10 2012
@@ -21,8 +21,9 @@ import java.nio.ByteBuffer;
 
 import org.apache.avro.specific.SpecificRecord;
 import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.hadoop.io.Writable;
 
-public final class AvroBSPMessageBundle extends SpecificRecordBase implements
+public final class AvroBSPMessageBundle<M extends Writable> extends SpecificRecordBase implements
     SpecificRecord {
   public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema
       .parse("{\"type\":\"record\",\"name\":\"AvroBSPMessage\",\"namespace\":\"de.jungblut.avro\",\"fields\":[{\"name\":\"data\",\"type\":\"bytes\"}]}");

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java?rev=1242637&r1=1242636&r2=1242637&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java Fri Feb 10 02:06:10 2012
@@ -39,25 +39,24 @@ import org.apache.avro.ipc.specific.Spec
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hama.bsp.BSPMessage;
+import org.apache.hadoop.io.Writable;
 import org.apache.hama.bsp.BSPMessageBundle;
-import org.apache.hama.bsp.message.MessageManager;
 import org.apache.hama.util.BSPNetUtils;
 
-public class AvroMessageManagerImpl implements MessageManager, Sender {
+public class AvroMessageManagerImpl<M extends Writable> implements MessageManager<M>, Sender<M> {
 
   private static final Log LOG = LogFactory
       .getLog(AvroMessageManagerImpl.class);
 
   private NettyServer server = null;
 
-  private final HashMap<InetSocketAddress, Sender> peers = new HashMap<InetSocketAddress, Sender>();
+  private final HashMap<InetSocketAddress, Sender<M>> peers = new HashMap<InetSocketAddress, Sender<M>>();
   private final HashMap<String, InetSocketAddress> peerSocketCache = new HashMap<String, InetSocketAddress>();
 
-  private final HashMap<InetSocketAddress, LinkedList<BSPMessage>> outgoingQueues = new HashMap<InetSocketAddress, LinkedList<BSPMessage>>();
-  private Deque<BSPMessage> localQueue = new LinkedList<BSPMessage>();
+  private final HashMap<InetSocketAddress, LinkedList<M>> outgoingQueues = new HashMap<InetSocketAddress, LinkedList<M>>();
+  private Deque<M> localQueue = new LinkedList<M>();
   // this must be a synchronized implementation: this is accessed per RPC
-  private final ConcurrentLinkedQueue<BSPMessage> localQueueForNextIteration = new ConcurrentLinkedQueue<BSPMessage>();
+  private final ConcurrentLinkedQueue<M> localQueueForNextIteration = new ConcurrentLinkedQueue<M>();
 
   @Override
   public void init(Configuration conf, InetSocketAddress addr) {
@@ -76,8 +75,8 @@ public class AvroMessageManagerImpl impl
     localQueueForNextIteration.clear();
   }
 
-  public void put(BSPMessageBundle messages) {
-    for (BSPMessage message : messages.getMessages()) {
+  public void put(BSPMessageBundle<M> messages) {
+    for (M message : messages.getMessages()) {
       this.localQueueForNextIteration.add(message);
     }
   }
@@ -88,15 +87,15 @@ public class AvroMessageManagerImpl impl
   }
 
   @Override
-  public void transfer(InetSocketAddress addr, BSPMessageBundle bundle)
+  public void transfer(InetSocketAddress addr, BSPMessageBundle<M> bundle)
       throws IOException {
-    AvroBSPMessageBundle msg = new AvroBSPMessageBundle();
+    AvroBSPMessageBundle<M> msg = new AvroBSPMessageBundle<M>();
     msg.setData(serializeMessage(bundle));
-    Sender sender = peers.get(addr);
+    Sender<M> sender = peers.get(addr);
 
     if (sender == null) {
       NettyTransceiver client = new NettyTransceiver(addr);
-      sender = (Sender) SpecificRequestor.getClient(Sender.class, client);
+      sender = (Sender<M>) SpecificRequestor.getClient(Sender.class, client);
       peers.put(addr, sender);
     }
     
@@ -104,10 +103,10 @@ public class AvroMessageManagerImpl impl
   }
 
   @Override
-  public Void transfer(AvroBSPMessageBundle messagebundle)
+  public Void transfer(AvroBSPMessageBundle<M> messagebundle)
       throws AvroRemoteException {
     try {
-      BSPMessageBundle deserializeMessage = deserializeMessage(messagebundle
+      BSPMessageBundle<M> deserializeMessage = deserializeMessage(messagebundle
           .getData());
       this.put(deserializeMessage);
     } catch (IOException e) {
@@ -117,12 +116,12 @@ public class AvroMessageManagerImpl impl
   }
 
   @Override
-  public BSPMessage getCurrentMessage() throws IOException {
+  public M getCurrentMessage() throws IOException {
     return localQueue.poll();
   }
 
   @Override
-  public void send(String peerName, BSPMessage msg) throws IOException {
+  public void send(String peerName, M msg) throws IOException {
     LOG.debug("Send message (" + msg.toString() + ") to " + peerName);
     InetSocketAddress targetPeerAddress = null;
     // Get socket for target peer.
@@ -132,17 +131,17 @@ public class AvroMessageManagerImpl impl
       targetPeerAddress = BSPNetUtils.getAddress(peerName);
       peerSocketCache.put(peerName, targetPeerAddress);
     }
-    LinkedList<BSPMessage> queue = outgoingQueues.get(targetPeerAddress);
+    LinkedList<M> queue = outgoingQueues.get(targetPeerAddress);
     if (queue == null) {
-      queue = new LinkedList<BSPMessage>();
+      queue = new LinkedList<M>();
     }
     queue.add(msg);
     outgoingQueues.put(targetPeerAddress, queue);
   }
 
-  private static final BSPMessageBundle deserializeMessage(ByteBuffer buffer)
+  private final BSPMessageBundle<M> deserializeMessage(ByteBuffer buffer)
       throws IOException {
-    BSPMessageBundle msg = new BSPMessageBundle();
+    BSPMessageBundle<M> msg = new BSPMessageBundle<M>();
 
     ByteArrayInputStream inArray = new ByteArrayInputStream(buffer.array());
     DataInputStream in = new DataInputStream(inArray);
@@ -151,7 +150,7 @@ public class AvroMessageManagerImpl impl
     return msg;
   }
 
-  private static final ByteBuffer serializeMessage(BSPMessageBundle msg)
+  private final ByteBuffer serializeMessage(BSPMessageBundle<M> msg)
       throws IOException {
     ByteArrayOutputStream outArray = new ByteArrayOutputStream();
     DataOutputStream out = new DataOutputStream(outArray);
@@ -162,7 +161,7 @@ public class AvroMessageManagerImpl impl
   }
 
   @Override
-  public Iterator<Entry<InetSocketAddress, LinkedList<BSPMessage>>> getMessageIterator() {
+  public Iterator<Entry<InetSocketAddress, LinkedList<M>>> getMessageIterator() {
     return this.outgoingQueues.entrySet().iterator();
   }
 }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManager.java?rev=1242637&r1=1242636&r2=1242637&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManager.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManager.java Fri Feb 10 02:06:10 2012
@@ -17,7 +17,7 @@
  */
 package org.apache.hama.bsp.message;
 
-import org.apache.hama.bsp.BSPMessage;
+import org.apache.hadoop.io.Writable;
 import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.ipc.HamaRPCProtocolVersion;
 
@@ -25,7 +25,7 @@ import org.apache.hama.ipc.HamaRPCProtoc
  * Hadoop RPC Interface for messaging.
  * 
  */
-public interface HadoopMessageManager extends HamaRPCProtocolVersion {
+public interface HadoopMessageManager<M extends Writable> extends HamaRPCProtocolVersion {
 
   /**
    * This method puts a message for the next iteration. Accessed concurrently
@@ -33,7 +33,7 @@ public interface HadoopMessageManager ex
    * 
    * @param msg
    */
-  public void put(BSPMessage msg);
+  public void put(M msg);
 
   /**
    * This method puts a messagebundle for the next iteration. Accessed
@@ -41,6 +41,6 @@ public interface HadoopMessageManager ex
    * 
    * @param messages
    */
-  public void put(BSPMessageBundle messages);
+  public void put(BSPMessageBundle<M> messages);
 
 }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java?rev=1242637&r1=1242636&r2=1242637&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java Fri Feb 10 02:06:10 2012
@@ -29,9 +29,9 @@ import java.util.concurrent.ConcurrentLi
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RPC.Server;
-import org.apache.hama.bsp.BSPMessage;
 import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.util.BSPNetUtils;
 
@@ -39,8 +39,8 @@ import org.apache.hama.util.BSPNetUtils;
  * Implementation of the {@link HadoopMessageManager}.
  * 
  */
-public final class HadoopMessageManagerImpl implements MessageManager,
-    HadoopMessageManager {
+public final class HadoopMessageManagerImpl<M extends Writable> implements MessageManager<M>,
+    HadoopMessageManager<M> {
 
   private static final Log LOG = LogFactory
       .getLog(HadoopMessageManagerImpl.class);
@@ -51,10 +51,10 @@ public final class HadoopMessageManagerI
   private final HashMap<InetSocketAddress, HadoopMessageManager> peers = new HashMap<InetSocketAddress, HadoopMessageManager>();
   private final HashMap<String, InetSocketAddress> peerSocketCache = new HashMap<String, InetSocketAddress>();
 
-  private final HashMap<InetSocketAddress, LinkedList<BSPMessage>> outgoingQueues = new HashMap<InetSocketAddress, LinkedList<BSPMessage>>();
-  private Deque<BSPMessage> localQueue = new LinkedList<BSPMessage>();
+  private final HashMap<InetSocketAddress, LinkedList<M>> outgoingQueues = new HashMap<InetSocketAddress, LinkedList<M>>();
+  private Deque<M> localQueue = new LinkedList<M>();
   // this must be a synchronized implementation: this is accessed per RPC
-  private final ConcurrentLinkedQueue<BSPMessage> localQueueForNextIteration = new ConcurrentLinkedQueue<BSPMessage>();
+  private final ConcurrentLinkedQueue<M> localQueueForNextIteration = new ConcurrentLinkedQueue<M>();
 
   @Override
   public final void init(Configuration conf, InetSocketAddress peerAddress) {
@@ -84,12 +84,12 @@ public final class HadoopMessageManagerI
   }
 
   @Override
-  public final BSPMessage getCurrentMessage() throws IOException {
+  public final M getCurrentMessage() throws IOException {
     return localQueue.poll();
   }
 
   @Override
-  public final void send(String peerName, BSPMessage msg) throws IOException {
+  public final void send(String peerName, M msg) throws IOException {
     LOG.debug("Send message (" + msg.toString() + ") to " + peerName);
     InetSocketAddress targetPeerAddress = null;
     // Get socket for target peer.
@@ -99,16 +99,16 @@ public final class HadoopMessageManagerI
       targetPeerAddress = BSPNetUtils.getAddress(peerName);
       peerSocketCache.put(peerName, targetPeerAddress);
     }
-    LinkedList<BSPMessage> queue = outgoingQueues.get(targetPeerAddress);
+    LinkedList<M> queue = outgoingQueues.get(targetPeerAddress);
     if (queue == null) {
-      queue = new LinkedList<BSPMessage>();
+      queue = new LinkedList<M>();
     }
     queue.add(msg);
     outgoingQueues.put(targetPeerAddress, queue);
   }
 
   @Override
-  public final Iterator<Entry<InetSocketAddress, LinkedList<BSPMessage>>> getMessageIterator() {
+  public final Iterator<Entry<InetSocketAddress, LinkedList<M>>> getMessageIterator() {
     return this.outgoingQueues.entrySet().iterator();
   }
 
@@ -124,7 +124,7 @@ public final class HadoopMessageManagerI
   }
 
   @Override
-  public final void transfer(InetSocketAddress addr, BSPMessageBundle bundle)
+  public final void transfer(InetSocketAddress addr, BSPMessageBundle<M> bundle)
       throws IOException {
 
     HadoopMessageManager bspPeerConnection = this.getBSPPeerConnection(addr);
@@ -145,13 +145,13 @@ public final class HadoopMessageManagerI
   }
 
   @Override
-  public final void put(BSPMessage msg) {
+  public final void put(M msg) {
     this.localQueueForNextIteration.add(msg);
   }
 
   @Override
-  public final void put(BSPMessageBundle messages) {
-    for (BSPMessage message : messages.getMessages()) {
+  public final void put(BSPMessageBundle<M> messages) {
+    for (M message : messages.getMessages()) {
       this.localQueueForNextIteration.add(message);
     }
   }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java?rev=1242637&r1=1242636&r2=1242637&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java Fri Feb 10 02:06:10 2012
@@ -24,7 +24,7 @@ import java.util.LinkedList;
 import java.util.Map.Entry;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hama.bsp.BSPMessage;
+import org.apache.hadoop.io.Writable;
 import org.apache.hama.bsp.BSPMessageBundle;
 
 /**
@@ -32,7 +32,7 @@ import org.apache.hama.bsp.BSPMessageBun
  * server if needed and deal with incoming data.
  * 
  */
-public interface MessageManager {
+public interface MessageManager<M extends Writable> {
 
   /**
    * Init can be used to start servers and initialize internal state.
@@ -54,7 +54,7 @@ public interface MessageManager {
    * @return
    * @throws IOException
    */
-  public BSPMessage getCurrentMessage() throws IOException;
+  public M getCurrentMessage() throws IOException;
 
   /**
    * Send a message to the peer.
@@ -63,14 +63,14 @@ public interface MessageManager {
    * @param msg
    * @throws IOException
    */
-  public void send(String peerName, BSPMessage msg) throws IOException;
+  public void send(String peerName, M msg) throws IOException;
 
   /**
    * Returns an iterator of messages grouped by peer.
    * 
    * @return
    */
-  public Iterator<Entry<InetSocketAddress, LinkedList<BSPMessage>>> getMessageIterator();
+  public Iterator<Entry<InetSocketAddress, LinkedList<M>>> getMessageIterator();
 
   /**
    * This is the real transferring to a host with a bundle.
@@ -79,7 +79,7 @@ public interface MessageManager {
    * @param bundle
    * @throws IOException
    */
-  public void transfer(InetSocketAddress addr, BSPMessageBundle bundle)
+  public void transfer(InetSocketAddress addr, BSPMessageBundle<M> bundle)
       throws IOException;
 
   /**

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java?rev=1242637&r1=1242636&r2=1242637&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java Fri Feb 10 02:06:10 2012
@@ -18,10 +18,10 @@
 package org.apache.hama.bsp.message;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
 
-public class MessageManagerFactory {
-
+public class MessageManagerFactory<M extends Writable> {
   public static final String MESSAGE_MANAGER_CLASS = "hama.messanger.class";
 
   /**
@@ -30,9 +30,9 @@ public class MessageManagerFactory {
    * @param conf
    * @return
    */
-  public static MessageManager getMessageManager(Configuration conf)
+  public MessageManager<M> getMessageManager(Configuration conf)
       throws ClassNotFoundException {
-    return (MessageManager) ReflectionUtils.newInstance(conf
+    return (MessageManager<M>) ReflectionUtils.newInstance(conf
         .getClassByName(conf.get(MESSAGE_MANAGER_CLASS,
             org.apache.hama.bsp.message.AvroMessageManagerImpl.class
                 .getCanonicalName())), conf);

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/Sender.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/Sender.java?rev=1242637&r1=1242636&r2=1242637&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/Sender.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/Sender.java Fri Feb 10 02:06:10 2012
@@ -17,18 +17,21 @@
  */
 package org.apache.hama.bsp.message;
 
-public interface Sender {
+import org.apache.hadoop.io.Writable;
+
+public interface Sender<M extends Writable> {
+  
   public static final org.apache.avro.Protocol PROTOCOL = org.apache.avro.Protocol
       .parse("{\"protocol\":\"Sender\",\"namespace\":\"de.jungblut.avro\",\"types\":[{\"type\":\"record\",\"name\":\"AvroBSPMessageBundle\",\"fields\":[{\"name\":\"data\",\"type\":\"bytes\"}]}],\"messages\":{\"transfer\":{\"request\":[{\"name\":\"messagebundle\",\"type\":\"AvroBSPMessageBundle\"}],\"response\":\"null\"}}}");
 
-  java.lang.Void transfer(AvroBSPMessageBundle messagebundle)
+  java.lang.Void transfer(AvroBSPMessageBundle<M> messagebundle)
       throws org.apache.avro.AvroRemoteException;
 
   @SuppressWarnings("all")
   public interface Callback extends Sender {
     public static final org.apache.avro.Protocol PROTOCOL = Sender.PROTOCOL;
 
-    void transfer(AvroBSPMessageBundle messagebundle,
+    public void transfer(AvroBSPMessageBundle messagebundle,
         org.apache.avro.ipc.Callback<java.lang.Void> callback)
         throws java.io.IOException;
   }

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java?rev=1242637&r1=1242636&r2=1242637&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java Fri Feb 10 02:06:10 2012
@@ -25,26 +25,29 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.Arrays;
+
 import junit.framework.TestCase;
 
+import org.apache.hadoop.io.BytesWritable;;
+
 public class TestBSPMessageBundle extends TestCase {
 
   public void testEmpty() throws IOException {
-    BSPMessageBundle bundle = new BSPMessageBundle();
+    BSPMessageBundle<BytesWritable> bundle = new BSPMessageBundle<BytesWritable>();
     // Serialize it.
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     bundle.write(new DataOutputStream(baos));
     baos.close();
     // Deserialize it.
-    BSPMessageBundle readBundle = new BSPMessageBundle();
+    BSPMessageBundle<BytesWritable> readBundle = new BSPMessageBundle<BytesWritable>();
     readBundle.readFields(new DataInputStream(new ByteArrayInputStream(baos
         .toByteArray())));
     assertEquals(0, readBundle.getMessages().size());
   }
 
   public void testSerializationDeserialization() throws IOException {
-    BSPMessageBundle bundle = new BSPMessageBundle();
-    ByteMessage[] testMessages = new ByteMessage[16];
+    BSPMessageBundle<BytesWritable> bundle = new BSPMessageBundle<BytesWritable>();
+    BytesWritable[] testMessages = new BytesWritable[16];
     for (int i = 0; i < testMessages.length; ++i) {
       // Create a one byte tag containing the number of the message.
       byte[] tag = new byte[1];
@@ -55,7 +58,9 @@ public class TestBSPMessageBundle extend
       baos.write(i);
       baos.close();
       byte[] data = baos.toByteArray();
-      testMessages[i] = new ByteMessage(tag, data);
+      BytesWritable msg = new BytesWritable();
+      msg.set(data, 0, data.length);
+      testMessages[i] = msg;
       bundle.addMessage(testMessages[i]);
     }
     // Serialize it.
@@ -63,17 +68,16 @@ public class TestBSPMessageBundle extend
     bundle.write(new DataOutputStream(baos));
     baos.close();
     // Deserialize it.
-    BSPMessageBundle readBundle = new BSPMessageBundle();
+    BSPMessageBundle<BytesWritable> readBundle = new BSPMessageBundle<BytesWritable>();
     readBundle.readFields(new DataInputStream(new ByteArrayInputStream(baos
         .toByteArray())));
     // Check contents.
     int messageNumber = 0;
-    for (BSPMessage message : readBundle.getMessages()) {
-      ByteMessage byteMessage = (ByteMessage) message;
-      assertTrue(Arrays.equals(testMessages[messageNumber].getTag(),
-          byteMessage.getTag()));
-      assertTrue(Arrays.equals(testMessages[messageNumber].getData(),
-          byteMessage.getData()));
+    for (BytesWritable message : readBundle.getMessages()) {
+      BytesWritable byteMessage = message;
+
+      assertTrue(Arrays.equals(testMessages[messageNumber].getBytes(),
+          byteMessage.getBytes()));
       ++messageNumber;
     }
     assertEquals(testMessages.length, messageNumber);

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java?rev=1242637&r1=1242636&r2=1242637&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java Fri Feb 10 02:06:10 2012
@@ -55,10 +55,10 @@ public class TestPartitioning extends Te
   }
 
   public static class PartionedBSP extends
-      BSP<LongWritable, Text, NullWritable, NullWritable> {
+      BSP<LongWritable, Text, NullWritable, NullWritable, NullWritable> {
 
     @Override
-    public void bsp(BSPPeer<LongWritable, Text, NullWritable, NullWritable> peer)
+    public void bsp(BSPPeer<LongWritable, Text, NullWritable, NullWritable, NullWritable> peer)
         throws IOException, SyncException, InterruptedException {
       long numOfPairs = 0;
       KeyValuePair<LongWritable, Text> readNext = null;

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java?rev=1242637&r1=1242636&r2=1242637&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java Fri Feb 10 02:06:10 2012
@@ -25,9 +25,8 @@ import java.util.Map.Entry;
 import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hama.bsp.BSPMessage;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hama.bsp.BSPMessageBundle;
-import org.apache.hama.bsp.IntegerMessage;
 import org.apache.hama.util.BSPNetUtils;
 
 public class TestHadoopMessageManager extends TestCase {
@@ -35,7 +34,7 @@ public class TestHadoopMessageManager ex
   public void testMessaging() throws Exception {
     Configuration conf = new Configuration();
     conf.set(MessageManagerFactory.MESSAGE_MANAGER_CLASS, "org.apache.hama.bsp.message.HadoopMessageManagerImpl");
-    MessageManager messageManager = MessageManagerFactory
+    MessageManager<IntWritable> messageManager = new MessageManagerFactory<IntWritable>()
         .getMessageManager(conf);
 
     assertTrue(messageManager instanceof HadoopMessageManagerImpl);
@@ -45,20 +44,20 @@ public class TestHadoopMessageManager ex
     messageManager.init(conf, peer);
     String peerName = peer.getHostName() + ":" + peer.getPort();
 
-    messageManager.send(peerName, new IntegerMessage("test", 1337));
+    messageManager.send(peerName, new IntWritable(1337));
 
-    Iterator<Entry<InetSocketAddress, LinkedList<BSPMessage>>> messageIterator = messageManager
+    Iterator<Entry<InetSocketAddress, LinkedList<IntWritable>>> messageIterator = messageManager
         .getMessageIterator();
 
-    Entry<InetSocketAddress, LinkedList<BSPMessage>> entry = messageIterator
+    Entry<InetSocketAddress, LinkedList<IntWritable>> entry = messageIterator
         .next();
 
     assertEquals(entry.getKey(), peer);
 
     assertTrue(entry.getValue().size() == 1);
 
-    BSPMessageBundle bundle = new BSPMessageBundle();
-    for (BSPMessage msg : entry.getValue()) {
+    BSPMessageBundle<IntWritable> bundle = new BSPMessageBundle<IntWritable>();
+    for (IntWritable msg : entry.getValue()) {
       bundle.addMessage(msg);
     }
 
@@ -67,14 +66,8 @@ public class TestHadoopMessageManager ex
     messageManager.clearOutgoingQueues();
 
     assertTrue(messageManager.getNumCurrentMessages() == 1);
-    BSPMessage currentMessage = messageManager.getCurrentMessage();
-
-    assertTrue(currentMessage instanceof IntegerMessage);
-
-    IntegerMessage rec = (IntegerMessage) currentMessage;
-
-    assertEquals(rec.getTag(), "test");
-    assertEquals(rec.getData(), Integer.valueOf(1337));
+    IntWritable currentMessage = messageManager.getCurrentMessage();
 
+    assertEquals(currentMessage.get(), 1337);
   }
 }

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/examples/ClassSerializePrinting.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/examples/ClassSerializePrinting.java?rev=1242637&r1=1242636&r2=1242637&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/examples/ClassSerializePrinting.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/examples/ClassSerializePrinting.java Fri Feb 10 02:06:10 2012
@@ -18,32 +18,41 @@
 package org.apache.hama.examples;
 
 import java.io.IOException;
+import java.util.Map.Entry;
 
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hama.bsp.BSP;
 import org.apache.hama.bsp.BSPPeer;
-import org.apache.hama.bsp.IntegerMessage;
 import org.apache.hama.bsp.sync.SyncException;
 
 public class ClassSerializePrinting extends
-    BSP<NullWritable, NullWritable, IntWritable, Text> {
+    BSP<NullWritable, NullWritable, IntWritable, Text, MapWritable> {
 
   public static final int NUM_SUPERSTEPS = 15;
 
   @Override
-  public void bsp(BSPPeer<NullWritable, NullWritable, IntWritable, Text> bspPeer)
+  public void bsp(
+      BSPPeer<NullWritable, NullWritable, IntWritable, Text, MapWritable> bspPeer)
       throws IOException, SyncException, InterruptedException {
 
     for (int i = 0; i < NUM_SUPERSTEPS; i++) {
       for (String otherPeer : bspPeer.getAllPeerNames()) {
-        bspPeer.send(otherPeer, new IntegerMessage(bspPeer.getPeerName(), i));
+        MapWritable map = new MapWritable();
+        map.put(new Text(bspPeer.getPeerName()), new IntWritable(i));
+
+        bspPeer.send(otherPeer, map);
       }
       bspPeer.sync();
-      IntegerMessage msg = null;
-      while ((msg = (IntegerMessage) bspPeer.getCurrentMessage()) != null) {
-        bspPeer.write(new IntWritable(msg.getData()), new Text(msg.getTag()));
+
+      MapWritable msg = null;
+      while ((msg = bspPeer.getCurrentMessage()) != null) {
+        for (Entry<Writable, Writable> e : msg.entrySet()) {
+          bspPeer.write((IntWritable) e.getValue(), (Text) e.getKey());
+        }
       }
     }
   }

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java?rev=1242637&r1=1242636&r2=1242637&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java Fri Feb 10 02:06:10 2012
@@ -33,55 +33,54 @@ import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSP;
 import org.apache.hama.bsp.BSPJob;
-import org.apache.hama.bsp.BSPMessage;
 import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.Combiner;
 import org.apache.hama.bsp.FileOutputFormat;
-import org.apache.hama.bsp.IntegerMessage;
 import org.apache.hama.bsp.NullInputFormat;
 import org.apache.hama.bsp.TextOutputFormat;
 import org.apache.hama.bsp.sync.SyncException;
 
 public class CombineExample {
-  private static Path TMP_OUTPUT = new Path("/tmp/combine-" + System.currentTimeMillis());
-  
+  private static Path TMP_OUTPUT = new Path("/tmp/combine-"
+      + System.currentTimeMillis());
+
   public static class MyBSP extends
-      BSP<NullWritable, NullWritable, Text, IntWritable> {
+      BSP<NullWritable, NullWritable, Text, IntWritable, IntWritable> {
     public static final Log LOG = LogFactory.getLog(MyBSP.class);
 
     @Override
-    public void bsp(BSPPeer<NullWritable, NullWritable, Text, IntWritable> peer)
+    public void bsp(
+        BSPPeer<NullWritable, NullWritable, Text, IntWritable, IntWritable> peer)
         throws IOException, SyncException, InterruptedException {
       for (String peerName : peer.getAllPeerNames()) {
-        peer.send(peerName, new IntegerMessage(peer.getPeerName(), 1));
-        peer.send(peerName, new IntegerMessage(peer.getPeerName(), 2));
-        peer.send(peerName, new IntegerMessage(peer.getPeerName(), 3));
+        peer.send(peerName, new IntWritable(1));
+        peer.send(peerName, new IntWritable(2));
+        peer.send(peerName, new IntWritable(3));
       }
       peer.sync();
 
-      IntegerMessage received;
-      while ((received = (IntegerMessage) peer.getCurrentMessage()) != null) {
-        peer.write(new Text(received.getTag()),
-            new IntWritable(received.getData()));
+      IntWritable received;
+      while ((received = peer.getCurrentMessage()) != null) {
+        peer.write(new Text("Sum = "), received);
       }
     }
 
   }
 
-  public static class SumCombiner extends Combiner {
+  public static class SumCombiner extends Combiner<IntWritable> {
 
     @Override
-    public BSPMessageBundle combine(Iterable<BSPMessage> messages) {
-      BSPMessageBundle bundle = new BSPMessageBundle();
+    public BSPMessageBundle<IntWritable> combine(Iterable<IntWritable> messages) {
+      BSPMessageBundle<IntWritable> bundle = new BSPMessageBundle<IntWritable>();
       int sum = 0;
 
-      Iterator<BSPMessage> it = messages.iterator();
+      Iterator<IntWritable> it = messages.iterator();
       while (it.hasNext()) {
-        sum += ((IntegerMessage) it.next()).getData();
+        sum += it.next().get();
       }
 
-      bundle.addMessage(new IntegerMessage("Sum = ", sum));
+      bundle.addMessage(new IntWritable(sum));
       return bundle;
     }
   }

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java?rev=1242637&r1=1242636&r2=1242637&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java Fri Feb 10 02:06:10 2012
@@ -24,7 +24,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.HashPartitioner;
-import org.apache.hama.bsp.IntegerMessage;
 import org.apache.hama.bsp.SequenceFileInputFormat;
 import org.apache.hama.bsp.SequenceFileOutputFormat;
 import org.apache.hama.graph.Edge;
@@ -33,33 +32,23 @@ import org.apache.hama.graph.Vertex;
 import org.apache.hama.graph.VertexArrayWritable;
 import org.apache.hama.graph.VertexWritable;
 
-public class InlinkCount extends Vertex<IntegerMessage> {
-  int inlinkCount;
-
-  public InlinkCount() {
-    super(IntegerMessage.class);
-  }
+public class InlinkCount extends Vertex<IntWritable> {
 
   @Override
-  public void compute(Iterator<IntegerMessage> messages) throws IOException {
+  public void compute(Iterator<IntWritable> messages) throws IOException {
 
     if (getSuperstepCount() == 0L) {
       for (Edge e : getOutEdges()) {
-        sendMessage(e.getTarget(), new IntegerMessage(e.getName(), 1));
+        sendMessage(e, new IntWritable(1));
       }
     } else {
       while (messages.hasNext()) {
-        IntegerMessage msg = messages.next();
-        inlinkCount += msg.getData();
+        IntWritable msg = messages.next();
+        this.setValue(new IntWritable(this.getValue().get() + msg.get()));
       }
     }
   }
 
-  @Override
-  public Object getValue() {
-    return inlinkCount;
-  }
-
   public static void main(String[] args) throws IOException,
       InterruptedException, ClassNotFoundException {
     // Graph job configuration

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java?rev=1242637&r1=1242636&r2=1242637&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java Fri Feb 10 02:06:10 2012
@@ -25,7 +25,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.DoubleMessage;
 import org.apache.hama.bsp.HashPartitioner;
 import org.apache.hama.bsp.SequenceFileInputFormat;
 import org.apache.hama.bsp.SequenceFileOutputFormat;
@@ -35,34 +34,30 @@ import org.apache.hama.graph.Vertex;
 
 public class PageRank {
 
-  public static class PageRankVertex extends Vertex<DoubleMessage> {
-    public PageRankVertex() {
-      super(DoubleMessage.class);
-    }
+  public static class PageRankVertex extends Vertex<DoubleWritable> {
 
     @Override
-    public void compute(Iterator<DoubleMessage> messages)
-        throws IOException {
-      if(this.getSuperstepCount() == 0) {
-         this.setValue(1.0 / (double) this.getNumVertices());
+    public void compute(Iterator<DoubleWritable> messages) throws IOException {
+      if (this.getSuperstepCount() == 0) {
+        this.setValue(new DoubleWritable(1.0 / (double) this.getNumVertices()));
       }
-      
+
       if (this.getSuperstepCount() >= 1) {
         double sum = 0;
-        while(messages.hasNext()) {
-          DoubleMessage msg = messages.next();
-          sum += msg.getData();
+        while (messages.hasNext()) {
+          DoubleWritable msg = messages.next();
+          sum += msg.get();
         }
 
         double ALPHA = (1 - 0.85) / (double) this.getNumVertices();
-        this.setValue(ALPHA + (0.85 * sum));
+        this.setValue(new DoubleWritable(ALPHA + (0.85 * sum)));
       }
 
       if (this.getSuperstepCount() < this.getMaxIteration()) {
         int numEdges = this.getOutEdges().size();
         for (Edge e : this.getOutEdges()) {
-          this.sendMessage(e.getTarget(), new DoubleMessage(e.getName(),
-              (Double) this.getValue() / numEdges));
+          this.sendMessage(e, new DoubleWritable(this.getValue().get()
+              / numEdges));
         }
       }
     }
@@ -72,12 +67,12 @@ public class PageRank {
     System.out.println("Usage: <input> <output> [tasks]");
     System.exit(-1);
   }
-  
+
   public static void main(String[] args) throws IOException,
       InterruptedException, ClassNotFoundException {
     if (args.length < 2)
       printUsage();
-    
+
     HamaConfiguration conf = new HamaConfiguration(new Configuration());
     GraphJob pageJob = new GraphJob(conf);
     pageJob.setJobName("Pagerank");

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java?rev=1242637&r1=1242636&r2=1242637&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java Fri Feb 10 02:06:10 2012
@@ -35,7 +35,6 @@ import org.apache.hama.bsp.BSPJob;
 import org.apache.hama.bsp.BSPJobClient;
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.ClusterStatus;
-import org.apache.hama.bsp.DoubleMessage;
 import org.apache.hama.bsp.FileOutputFormat;
 import org.apache.hama.bsp.NullInputFormat;
 import org.apache.hama.bsp.TextOutputFormat;
@@ -45,14 +44,14 @@ public class PiEstimator {
   private static Path TMP_OUTPUT = new Path("/tmp/pi-" + System.currentTimeMillis());
 
   public static class MyEstimator extends
-      BSP<NullWritable, NullWritable, Text, DoubleWritable> {
+      BSP<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> {
     public static final Log LOG = LogFactory.getLog(MyEstimator.class);
     private String masterTask;
     private static final int iterations = 10000;
 
     @Override
     public void bsp(
-        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable> peer)
+        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
         throws IOException, SyncException, InterruptedException {
 
       int in = 0, out = 0;
@@ -66,34 +65,33 @@ public class PiEstimator {
       }
 
       double data = 4.0 * (double) in / (double) iterations;
-      DoubleMessage estimate = new DoubleMessage(peer.getPeerName(), data);
 
-      peer.send(masterTask, estimate);
+      peer.send(masterTask, new DoubleWritable(data));
       peer.sync();
     }
 
     @Override
     public void setup(
-        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable> peer)
+        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
         throws IOException {
       // Choose one as a master
       this.masterTask = peer.getPeerName(peer.getNumPeers() / 2);
     }
 
     public void cleanup(
-        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable> peer)
+        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
         throws IOException {
       if (peer.getPeerName().equals(masterTask)) {
         double pi = 0.0;
         int numPeers = peer.getNumCurrentMessages();
-        DoubleMessage received;
-        while ((received = (DoubleMessage) peer.getCurrentMessage()) != null) {
-          pi += received.getData();
+        DoubleWritable received;
+        while ((received = peer.getCurrentMessage()) != null) {
+          pi += received.get();
         }
 
         pi = pi / numPeers;
         peer
-            .write(new Text("Estimated value of PI is"), new DoubleWritable(pi));
+.write(new Text("Estimated value of PI is"), new DoubleWritable(pi));
       }
     }
   }

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java?rev=1242637&r1=1242636&r2=1242637&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java Fri Feb 10 02:06:10 2012
@@ -22,27 +22,26 @@ import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSP;
 import org.apache.hama.bsp.BSPJob;
 import org.apache.hama.bsp.BSPJobClient;
-import org.apache.hama.bsp.BSPMessage;
 import org.apache.hama.bsp.BSPPeer;
-import org.apache.hama.bsp.ByteMessage;
 import org.apache.hama.bsp.ClusterStatus;
 import org.apache.hama.bsp.NullInputFormat;
 import org.apache.hama.bsp.NullOutputFormat;
 import org.apache.hama.bsp.sync.SyncException;
-import org.apache.hama.util.Bytes;
 
 public class RandBench {
   private static final String SIZEOFMSG = "msg.size";
   private static final String N_COMMUNICATIONS = "communications.num";
   private static final String N_SUPERSTEPS = "supersteps.num";
 
-  public static class RandBSP extends
-      BSP<NullWritable, NullWritable, NullWritable, NullWritable> {
+  public static class RandBSP
+      extends
+      BSP<NullWritable, NullWritable, NullWritable, NullWritable, BytesWritable> {
     public static final Log LOG = LogFactory.getLog(RandBSP.class);
     private Random r = new Random();
     private int sizeOfMsg;
@@ -51,28 +50,25 @@ public class RandBench {
 
     @Override
     public void bsp(
-        BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable> peer)
+        BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable, BytesWritable> peer)
         throws IOException, SyncException, InterruptedException {
       byte[] dummyData = new byte[sizeOfMsg];
-      BSPMessage msg = null;
       String[] peers = peer.getAllPeerNames();
-      String peerName = peer.getPeerName();
 
       for (int i = 0; i < nSupersteps; i++) {
 
         for (int j = 0; j < nCommunications; j++) {
           String tPeer = peers[r.nextInt(peers.length)];
-          String tag = peerName + " to " + tPeer;
-          msg = new ByteMessage(Bytes.toBytes(tag), dummyData);
-          peer.send(tPeer, msg);
+          BytesWritable data = new BytesWritable();
+          data.set(dummyData, 0, dummyData.length);
+          peer.send(tPeer, data);
         }
 
         peer.sync();
 
-        ByteMessage received;
-        while ((received = (ByteMessage) peer.getCurrentMessage()) != null) {
-          LOG.info(Bytes.toString(received.getTag()) + " : "
-              + received.getData().length);
+        BytesWritable received;
+        while ((received = peer.getCurrentMessage()) != null) {
+          LOG.info(received.getBytes().length);
         }
 
       }
@@ -80,7 +76,7 @@ public class RandBench {
 
     @Override
     public void setup(
-        BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable> peer) {
+        BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable, BytesWritable> peer) {
       this.sizeOfMsg = conf.getInt(SIZEOFMSG, 1);
       this.nCommunications = conf.getInt(N_COMMUNICATIONS, 1);
       this.nSupersteps = conf.getInt(N_SUPERSTEPS, 1);

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java?rev=1242637&r1=1242636&r2=1242637&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java Fri Feb 10 02:06:10 2012
@@ -25,7 +25,6 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.HashPartitioner;
-import org.apache.hama.bsp.IntegerMessage;
 import org.apache.hama.bsp.SequenceFileInputFormat;
 import org.apache.hama.bsp.SequenceFileOutputFormat;
 import org.apache.hama.graph.Edge;
@@ -37,10 +36,10 @@ import org.apache.hama.graph.VertexWrita
 public class SSSP {
   public static final String START_VERTEX = "shortest.paths.start.vertex.name";
 
-  public static class ShortestPathVertex extends Vertex<IntegerMessage> {
+  public static class ShortestPathVertex extends Vertex<IntWritable> {
+
     public ShortestPathVertex() {
-      super(IntegerMessage.class);
-      this.setValue(Integer.MAX_VALUE);
+      this.setValue(new IntWritable(Integer.MAX_VALUE));
     }
 
     public boolean isStartVertex() {
@@ -49,21 +48,20 @@ public class SSSP {
     }
 
     @Override
-    public void compute(Iterator<IntegerMessage> messages) throws IOException {
+    public void compute(Iterator<IntWritable> messages) throws IOException {
       int minDist = isStartVertex() ? 0 : Integer.MAX_VALUE;
 
       while (messages.hasNext()) {
-        IntegerMessage msg = messages.next();
-        if (msg.getData() < minDist) {
-          minDist = msg.getData();
+        IntWritable msg = messages.next();
+        if (msg.get() < minDist) {
+          minDist = msg.get();
         }
       }
 
-      if (minDist < (Integer) this.getValue()) {
-        this.setValue(minDist);
+      if (minDist < this.getValue().get()) {
+        this.setValue(new IntWritable(minDist));
         for (Edge e : this.getOutEdges()) {
-          sendMessage(e.getTarget(), new IntegerMessage(e.getName(), minDist
-              + e.getCost()));
+          sendMessage(e, new IntWritable(minDist + e.getCost()));
         }
       }
     }

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java?rev=1242637&r1=1242636&r2=1242637&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java Fri Feb 10 02:06:10 2012
@@ -19,9 +19,9 @@ package org.apache.hama.graph;
 
 import java.io.IOException;
 
+import org.apache.hadoop.io.Writable;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSPJob;
-import org.apache.hama.bsp.BSPMessage;
 
 public class GraphJob extends BSPJob {
   public final static String VERTEX_CLASS_ATTR = "hama.graph.vertex.class";
@@ -37,20 +37,15 @@ public class GraphJob extends BSPJob {
    * @param cls
    * @throws IllegalStateException
    */
-  public void setVertexClass(Class<? extends Vertex<? extends BSPMessage>> cls)
+  public void setVertexClass(Class<? extends Vertex<? extends Writable>> cls)
       throws IllegalStateException {
     ensureState(JobState.DEFINE);
     conf.setClass(VERTEX_CLASS_ATTR, cls, Vertex.class);
   }
 
   @SuppressWarnings("unchecked")
-  public Class<? extends Vertex<? extends BSPMessage>> getVertexClass() {
-    return (Class<? extends Vertex<? extends BSPMessage>>) conf.getClass(
+  public Class<? extends Vertex<? extends Writable>> getVertexClass() {
+    return (Class<? extends Vertex<? extends Writable>>) conf.getClass(
         VERTEX_CLASS_ATTR, Vertex.class);
   }
-
-  // TODO this method should be moved into BSPJob
-  public void setMaxIteration(int maxIteration) {
-    conf.setInt("hama.graph.max.iteration", maxIteration);
-  }
 }

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1242637&r1=1242636&r2=1242637&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Fri Feb 10 02:06:10 2012
@@ -23,10 +23,13 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hama.bsp.BSP;
-import org.apache.hama.bsp.BSPMessage;
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.sync.SyncException;
 import org.apache.hama.util.KeyValuePair;
@@ -47,18 +50,24 @@ public class GraphJobRunner extends BSP 
     while (updated && iteration < maxIteration) {
       peer.sync();
 
-      BSPMessage msg = null;
-      Map<String, LinkedList<BSPMessage>> msgMap = new HashMap<String, LinkedList<BSPMessage>>();
-      while ((msg = peer.getCurrentMessage()) != null) {
-
-        if (msgMap.containsKey(msg.getTag())) {
-          LinkedList<BSPMessage> msgs = msgMap.get(msg.getTag());
-          msgs.add(msg);
-          msgMap.put((String) msg.getTag(), msgs);
-        } else {
-          LinkedList<BSPMessage> msgs = new LinkedList<BSPMessage>();
-          msgs.add(msg);
-          msgMap.put((String) msg.getTag(), msgs);
+      MapWritable msg = null;
+      Map<String, LinkedList<Writable>> msgMap = new HashMap<String, LinkedList<Writable>>();
+      while ((msg = (MapWritable) peer.getCurrentMessage()) != null) {
+
+        for (Entry<Writable, Writable> e : msg.entrySet()) {
+          String vertexID = ((Text) e.getKey()).toString();
+          Writable value = e.getValue();
+
+          if (msgMap.containsKey(vertexID)) {
+            LinkedList<Writable> msgs = msgMap.get(vertexID);
+            msgs.add(value);
+            msgMap.put(vertexID, msgs);
+          } else {
+            LinkedList<Writable> msgs = new LinkedList<Writable>();
+            msgs.add(value);
+            msgMap.put(vertexID, msgs);
+          }
+
         }
       }
 
@@ -66,7 +75,7 @@ public class GraphJobRunner extends BSP 
         updated = false;
       }
 
-      for (Map.Entry<String, LinkedList<BSPMessage>> e : msgMap.entrySet()) {
+      for (Map.Entry<String, LinkedList<Writable>> e : msgMap.entrySet()) {
         vertices.get(e.getKey()).compute(e.getValue().iterator());
       }
       iteration++;
@@ -99,19 +108,10 @@ public class GraphJobRunner extends BSP 
     long numberVertices = vertices.size() * peer.getNumPeers();
 
     for (Map.Entry<String, Vertex> e : vertices.entrySet()) {
-      LinkedList<BSPMessage> msgIterator = new LinkedList<BSPMessage>();
-
-      try {
-        BSPMessage msg = (BSPMessage) e.getValue().messageClass.newInstance();
-        msg.setTag(e.getValue().getVertexID());
-        msg.setData(e.getValue().getValue());
-        msgIterator.add(msg);
-      } catch (Exception e1) {
-        // TODO init failed.
-        e1.printStackTrace();
-      }
-
       e.getValue().setNumVertices(numberVertices);
+
+      LinkedList<Writable> msgIterator = new LinkedList<Writable>();
+      msgIterator.add(e.getValue().getValue());
       e.getValue().compute(msgIterator.iterator());
     }
   }

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1242637&r1=1242636&r2=1242637&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Fri Feb 10 02:06:10 2012
@@ -21,33 +21,19 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hama.bsp.BSPMessage;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hama.bsp.BSPPeer;
 
-public abstract class Vertex<MSGTYPE extends BSPMessage> implements
-    VertexInterface<MSGTYPE> {
-  protected Class<MSGTYPE> messageClass;
-  private MSGTYPE value;
+public abstract class Vertex<M extends Writable> implements
+    VertexInterface<M> {
+  private M value;
   private String vertexID;
-  protected BSPPeer<?, ?, ?, ?> peer;
+  protected BSPPeer<?, ?, ?, ?, MapWritable> peer;
   public List<Edge> edges;
   private long numVertices;
 
-  // FIXME find another way to handles vertex value.
-  // See also HAMA-502
-  public Vertex(Class<MSGTYPE> messageClass) {
-    this.messageClass = messageClass;
-    try {
-      this.value = (MSGTYPE) messageClass.newInstance();
-    } catch (InstantiationException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    } catch (IllegalAccessException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    }
-  }
-
   public Configuration getConf() {
     return peer.getConfiguration();
   }
@@ -58,8 +44,11 @@ public abstract class Vertex<MSGTYPE ext
   }
 
   @Override
-  public void sendMessage(String target, MSGTYPE msg) throws IOException {
-    peer.send(target, msg);
+  public void sendMessage(Edge e, M msg) throws IOException {
+    MapWritable message = new MapWritable();
+    message.put(new Text(e.getName()), msg);
+
+    peer.send(e.getTarget(), message);
   }
 
   @Override
@@ -73,13 +62,13 @@ public abstract class Vertex<MSGTYPE ext
   }
 
   @Override
-  public Object getValue() {
-    return value.getData();
+  public M getValue() {
+    return value;
   }
 
   @Override
-  public void setValue(Object value) {
-    this.value.setData(value);
+  public void setValue(M value) {
+    this.value = value;
   }
 
   public void setVertexID(String vertexID) {

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java?rev=1242637&r1=1242636&r2=1242637&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java Fri Feb 10 02:06:10 2012
@@ -32,12 +32,12 @@ public interface VertexInterface<MSGTYPE
 
   public List<Edge> getOutEdges();
 
-  public void sendMessage(String target, MSGTYPE msg) throws IOException;
+  public void sendMessage(Edge e, MSGTYPE msg) throws IOException;
 
   public long getSuperstepCount();
 
-  public void setValue(Object value);
+  public void setValue(MSGTYPE value);
 
-  public Object getValue();
+  public MSGTYPE getValue();
 
 }

Modified: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java?rev=1242637&r1=1242636&r2=1242637&view=diff
==============================================================================
--- incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java (original)
+++ incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java Fri Feb 10 02:06:10 2012
@@ -24,6 +24,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hama.Constants;
@@ -37,7 +38,7 @@ public class BSPRunner {
 
   private Configuration conf;
   private TaskAttemptID id;
-  private BSPPeerImpl<?, ?, ?, ?> peer;
+  private BSPPeerImpl<?, ?, ?, ?, ? extends Writable> peer;
   private Counters counters = new Counters();
   
   @SuppressWarnings("rawtypes")



Mime
View raw message