hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1618812 - in /hama/trunk: conf/ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/bsp/message/ core/src/main/java/org/apache/hama/bsp/message/io/ core/src/main/java/org/apache/hama/bsp/message/queue/ core/src/test/...
Date Tue, 19 Aug 2014 06:38:49 GMT
Author: edwardyoon
Date: Tue Aug 19 06:38:49 2014
New Revision: 1618812

URL: http://svn.apache.org/r1618812
Log:
revert to 1617395

Added:
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/
      - copied from r1617395, hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/BSPMessageInterface.java
      - copied unchanged from r1617395, hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/BSPMessageInterface.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/ByteArrayMessageQueue.java
      - copied unchanged from r1617395, hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/ByteArrayMessageQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DefaultMessageQueue.java
      - copied unchanged from r1617395, hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DefaultMessageQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java
      - copied unchanged from r1617395, hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestMessageIO.java
      - copied unchanged from r1617395, hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestMessageIO.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestSpillingQueue.java
      - copied unchanged from r1617395, hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestSpillingQueue.java
Modified:
    hama/trunk/conf/hama-env.sh
    hama/trunk/conf/hama-site.xml
    hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java

Modified: hama/trunk/conf/hama-env.sh
URL: http://svn.apache.org/viewvc/hama/trunk/conf/hama-env.sh?rev=1618812&r1=1618811&r2=1618812&view=diff
==============================================================================
--- hama/trunk/conf/hama-env.sh (original)
+++ hama/trunk/conf/hama-env.sh Tue Aug 19 06:38:49 2014
@@ -22,10 +22,10 @@
 # Set environment variables here.
 
 # The java implementation to use.  Required.
-# export JAVA_HOME=/usr/lib/jvm/java-7-oracle
+export JAVA_HOME=/usr/lib/jvm/java-7-oracle
 
 # Where log files are stored.  $HAMA_HOME/logs by default.
-# export HAMA_LOG_DIR=${HAMA_HOME}/logs
+export HAMA_LOG_DIR=${HAMA_HOME}/logs
 
 # The maximum amount of heap to use, in MB. Default is 1000.
 # export HAMA_HEAPSIZE=1000

Modified: hama/trunk/conf/hama-site.xml
URL: http://svn.apache.org/viewvc/hama/trunk/conf/hama-site.xml?rev=1618812&r1=1618811&r2=1618812&view=diff
==============================================================================
--- hama/trunk/conf/hama-site.xml (original)
+++ hama/trunk/conf/hama-site.xml Tue Aug 19 06:38:49 2014
@@ -22,4 +22,34 @@
  */
 -->
 <configuration>
+
+  <property>
+    <name>bsp.master.address</name>
+    <value>edward-VirtualBox:40000</value>
+    <description>The address of the bsp master server. Either the
+    literal string "local" or a host:port for distributed mode
+    </description>
+  </property>
+
+  <property>
+    <name>fs.default.name</name>
+    <value>hdfs://edward-VirtualBox:9000/</value>
+    <description>
+      The name of the default file system. Either the literal string
+      "local" or a host:port for HDFS.
+    </description>
+  </property>
+
+  <property>
+    <name>hama.zookeeper.quorum</name>
+    <value>edward-VirtualBox</value>
+    <description>Comma separated list of servers in the ZooKeeper Quorum.
+    For example, "host1.mydomain.com,host2.mydomain.com,host3.mydomain.com".
+    By default this is set to localhost for local and pseudo-distributed modes
+    of operation. For a fully-distributed setup, this should be set to a full
+    list of ZooKeeper quorum servers. If HAMA_MANAGES_ZK is set in hama-env.sh
+    this is the list of servers which we will start/stop zookeeper on.
+    </description>
+  </property>
+
 </configuration>

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1618812&r1=1618811&r2=1618812&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Tue Aug 19 06:38:49
2014
@@ -20,6 +20,7 @@ package org.apache.hama.bsp;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.Iterator;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CyclicBarrier;
@@ -355,9 +356,12 @@ public class LocalBSPRunner implements J
       bundle.setCompressor(compressor,
           conf.getLong("hama.messenger.compression.threshold", 512));
 
-      MANAGER_MAP.get(addr).localQueueForNextIteration.add(bundle);
-      peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED,
-          bundle.size());
+      Iterator<M> it = bundle.iterator();
+      while (it.hasNext()) {
+        MANAGER_MAP.get(addr).localQueueForNextIteration.add(it.next());
+        peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED,
+            1L);
+      }
     }
 
     @Override

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1618812&r1=1618811&r2=1618812&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
Tue Aug 19 06:38:49 2014
@@ -232,12 +232,7 @@ public abstract class AbstractMessageMan
   }
 
   protected SynchronizedQueue<M> getSynchronizedReceiverQueue() {
-    MessageQueue<M> queue = getReceiverQueue();
-    if (queue.isMemoryBasedQueue()) {
-      return (SynchronizedQueue<M>) queue;
-    }
-
-    return new SingleLockQueue<M>(queue);
+    return SingleLockQueue.synchronize(getReceiverQueue());
   }
 
   @Override
@@ -286,9 +281,11 @@ public abstract class AbstractMessageMan
   public void loopBackBundle(BSPMessageBundle<M> bundle) throws IOException {
     bundle.setCompressor(compressor,
         conf.getLong("hama.messenger.compression.threshold", 128));
-    this.localQueueForNextIteration.add(bundle);
-    peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED,
-        bundle.size());
+
+    Iterator<? extends Writable> it = bundle.iterator();
+    while (it.hasNext()) {
+      loopBackMessage(it.next());
+    }
   }
 
   @SuppressWarnings("unchecked")

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java?rev=1618812&r1=1618811&r2=1618812&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java Tue Aug
19 06:38:49 2014
@@ -29,7 +29,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.ObjectWritable;
 import org.apache.hadoop.io.Writable;
-import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.TaskAttemptID;
 
 /**
@@ -46,7 +45,7 @@ import org.apache.hama.bsp.TaskAttemptID
  * configuration. <br/>
  * <b>It is experimental to use.</b>
  */
-public final class DiskQueue<M extends Writable> implements MessageQueue<M> {
+public final class DiskQueue<M extends Writable> extends DefaultMessageQueue<M>
{
 
   public static final String DISK_QUEUE_PATH_KEY = "bsp.disk.queue.dir";
 
@@ -171,14 +170,7 @@ public final class DiskQueue<M extends W
   }
 
   @Override
-  public void add(BSPMessageBundle<M> bundle){
-    addAll(bundle);
-  }
-
-  @Override
   public final void addAll(Iterable<M> col) {
-    // TODO Write bundle object directly
-
     for (M item : col) {
       add(item);
     }
@@ -321,5 +313,4 @@ public final class DiskQueue<M extends W
   public boolean isMemoryBasedQueue() {
     return false;
   }
-
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java?rev=1618812&r1=1618811&r2=1618812&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java Tue Aug
19 06:38:49 2014
@@ -22,104 +22,54 @@ import java.util.concurrent.ConcurrentLi
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
-import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.TaskAttemptID;
 
 /**
  * LinkedList backed queue structure for bookkeeping messages.
  */
 public final class MemoryQueue<M extends Writable> implements
-    SynchronizedQueue<M>, MessageQueue<M> {
-
-  private final ConcurrentLinkedQueue<M> messages = new ConcurrentLinkedQueue<M>();
-  private final ConcurrentLinkedQueue<BSPMessageBundle<M>> bundles = new ConcurrentLinkedQueue<BSPMessageBundle<M>>();
-  private Iterator<M> bundleIterator;
-
-  int bundledMessageSize = 0;
+    SynchronizedQueue<M> {
 
+  private final ConcurrentLinkedQueue<M> deque = new ConcurrentLinkedQueue<M>();
   private Configuration conf;
 
   @Override
   public final void addAll(Iterable<M> col) {
     for (M m : col)
-      messages.add(m);
+      deque.add(m);
   }
 
   @Override
   public void addAll(MessageQueue<M> otherqueue) {
     M poll = null;
     while ((poll = otherqueue.poll()) != null) {
-      messages.add(poll);
+      deque.add(poll);
     }
   }
 
   @Override
   public final void add(M item) {
-    messages.add(item);
-  }
-
-  @Override
-  public void add(BSPMessageBundle<M> bundle) {
-    bundledMessageSize += bundle.size();
-    bundles.add(bundle);
+    deque.add(item);
   }
 
   @Override
   public final void clear() {
-    messages.clear();
-    bundles.clear();
-    bundleIterator = null;
+    deque.clear();
   }
 
   @Override
   public final M poll() {
-    if (messages.size() > 0) {
-      return messages.poll();
-    } else {
-      if (bundles.size() > 0) {
-        if (bundleIterator == null) {
-          bundleIterator = bundles.poll().iterator();
-        } else {
-          if (!bundleIterator.hasNext()) {
-            bundleIterator = bundles.poll().iterator();
-          }
-        }
-
-        bundledMessageSize--;
-        return bundleIterator.next();
-      }
-    }
-
-    return null;
+    return deque.poll();
   }
 
   @Override
   public final int size() {
-    return messages.size() + bundledMessageSize;
+    return deque.size();
   }
 
   @Override
   public final Iterator<M> iterator() {
-    Iterator<M> it = new Iterator<M>() {
-
-      @Override
-      public boolean hasNext() {
-        if (size() > 0)
-          return true;
-        else
-          return false;
-      }
-
-      @Override
-      public M next() {
-        return poll();
-      }
-
-      @Override
-      public void remove() {
-      }
-    };
-    return it;
+    return deque.iterator();
   }
 
   @Override
@@ -167,5 +117,4 @@ public final class MemoryQueue<M extends
   public MessageQueue<M> getMessageQueue() {
     return this;
   }
-
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java?rev=1618812&r1=1618811&r2=1618812&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java Tue
Aug 19 06:38:49 2014
@@ -19,23 +19,17 @@ package org.apache.hama.bsp.message.queu
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Writable;
-import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.TaskAttemptID;
 
 /**
  * Simple queue interface.
  */
-public interface MessageQueue<M extends Writable> extends Iterable<M>,
-    Configurable {
+public interface MessageQueue<M> extends Iterable<M>, Configurable {
 
   public static final String PERSISTENT_QUEUE = "hama.queue.behaviour.persistent";
 
   /**
    * Used to initialize the queue.
-   * 
-   * @param conf
-   * @param id
    */
   public void init(Configuration conf, TaskAttemptID id);
 
@@ -56,33 +50,20 @@ public interface MessageQueue<M extends 
 
   /**
    * Adds a whole Java Collection to the implementing queue.
-   * 
-   * @param col
    */
   public void addAll(Iterable<M> col);
 
   /**
    * Adds the other queue to this queue.
-   * 
-   * @param otherqueue
    */
   public void addAll(MessageQueue<M> otherqueue);
 
   /**
    * Adds a single item to the implementing queue.
-   * 
-   * @param item
    */
   public void add(M item);
 
   /**
-   * Adds a bundle to the queue.
-   * 
-   * @param bundle
-   */
-  public void add(BSPMessageBundle<M> bundle);
-
-  /**
    * Clears all entries in the given queue.
    */
   public void clear();
@@ -104,10 +85,7 @@ public interface MessageQueue<M extends 
    * @return true if the messages in the queue are serialized to byte buffers.
    */
   public boolean isMessageSerialized();
-
-  /**
-   * @return true if the queue is memory resident.
-   */
+  
   public boolean isMemoryBasedQueue();
 
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java?rev=1618812&r1=1618811&r2=1618812&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java Tue
Aug 19 06:38:49 2014
@@ -20,24 +20,22 @@ package org.apache.hama.bsp.message.queu
 import java.util.Iterator;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Writable;
-import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.TaskAttemptID;
 
 /**
  * A global mutex based synchronized queue.
  */
-public final class SingleLockQueue<T extends Writable> implements SynchronizedQueue<T>
{
+public final class SingleLockQueue<T> implements SynchronizedQueue<T> {
 
   private final MessageQueue<T> queue;
   private final Object mutex;
 
-  public SingleLockQueue(MessageQueue<T> queue) {
+  private SingleLockQueue(MessageQueue<T> queue) {
     this.queue = queue;
     this.mutex = new Object();
   }
 
-  public SingleLockQueue(MessageQueue<T> queue, Object mutex) {
+  private SingleLockQueue(MessageQueue<T> queue, Object mutex) {
     this.queue = queue;
     this.mutex = mutex;
   }
@@ -180,6 +178,22 @@ public final class SingleLockQueue<T ext
     }
   }
 
+  /*
+   * static constructor methods to be type safe
+   */
+  public static <T> SynchronizedQueue<T> synchronize(MessageQueue<T> queue)
{
+    if(queue.isMemoryBasedQueue()) {
+      return (SynchronizedQueue<T>) queue;
+    }
+    
+    return new SingleLockQueue<T>(queue);
+  }
+
+  public static <T> SynchronizedQueue<T> synchronize(MessageQueue<T> queue,
+      Object mutex) {
+    return new SingleLockQueue<T>(queue, mutex);
+  }
+
   @Override
   public void prepareWrite() {
     synchronized (mutex) {
@@ -205,9 +219,4 @@ public final class SingleLockQueue<T ext
   public boolean isMemoryBasedQueue() {
     return true;
   }
-
-  @Override
-  public void add(BSPMessageBundle<T> bundle) {
-    queue.add(bundle);
-  }
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java?rev=1618812&r1=1618811&r2=1618812&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java
Tue Aug 19 06:38:49 2014
@@ -31,7 +31,7 @@ import org.apache.hama.bsp.TaskAttemptID
  * sorted receive and send.
  */
 public final class SortedMemoryQueue<M extends WritableComparable<M>>
-    implements SynchronizedQueue<M>, MessageQueue<M> {
+    implements SynchronizedQueue<M>, BSPMessageInterface<M> {
 
   private final BlockingQueue<M> queue = new PriorityBlockingQueue<M>();
   private Configuration conf;

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java?rev=1618812&r1=1618811&r2=1618812&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java
Tue Aug 19 06:38:49 2014
@@ -17,13 +17,11 @@
  */
 package org.apache.hama.bsp.message.queue;
 
-import org.apache.hadoop.io.Writable;
-
 /**
  * Synchronized Queue interface. Can be used to implement better synchronized
  * datastructures.
  */
-public interface SynchronizedQueue<T extends Writable> extends MessageQueue<T>
{
+public interface SynchronizedQueue<T> extends MessageQueue<T> {
 
   public abstract MessageQueue<T> getMessageQueue();
 

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java?rev=1618812&r1=1618811&r2=1618812&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java Tue Aug 19 06:38:49
2014
@@ -37,6 +37,14 @@ public class TestPersistQueue extends Te
 
   public static final Log LOG = LogFactory.getLog(TestPartitioning.class);
 
+  public void testDiskQueue() throws Exception {
+    BSPJob bsp = getNewJobConf();
+    bsp.set(MessageManager.RECEIVE_QUEUE_TYPE_CLASS,
+        "org.apache.hama.bsp.message.queue.DiskQueue");
+
+    assertTrue(bsp.waitForCompletion(true));
+  }
+
   public void testMemoryQueue() throws Exception {
     BSPJob bsp = getNewJobConf();
     bsp.set(MessageManager.RECEIVE_QUEUE_TYPE_CLASS,
@@ -53,10 +61,10 @@ public class TestPersistQueue extends Te
     assertTrue(bsp.waitForCompletion(true));
   }
 
-  public void testDiskQueue() throws Exception {
+  public void testSpillingQueue() throws Exception {
     BSPJob bsp = getNewJobConf();
     bsp.set(MessageManager.RECEIVE_QUEUE_TYPE_CLASS,
-        "org.apache.hama.bsp.message.queue.DiskQueue");
+        "org.apache.hama.bsp.message.queue.SpillingQueue");
 
     assertTrue(bsp.waitForCompletion(true));
   }

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java?rev=1618812&r1=1618811&r2=1618812&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java
(original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java
Tue Aug 19 06:38:49 2014
@@ -49,6 +49,7 @@ public class TestHamaMessageManager exte
     HamaConfiguration conf = new HamaConfiguration();
     conf.setClass(MessageManager.RECEIVE_QUEUE_TYPE_CLASS, MemoryQueue.class,
         MessageQueue.class);
+    conf.set(DiskQueue.DISK_QUEUE_PATH_KEY, TMP_OUTPUT_PATH);
     messagingInternal(conf);
   }
 
@@ -59,7 +60,7 @@ public class TestHamaMessageManager exte
         MessageQueue.class);
     messagingInternal(conf);
   }
-  
+
   private static void messagingInternal(HamaConfiguration conf)
       throws Exception {
     conf.set(MessageManagerFactory.MESSAGE_MANAGER_CLASS,
@@ -101,10 +102,10 @@ public class TestHamaMessageManager exte
     }
 
     messageManager.transfer(peer, bundle);
-    
+
     messageManager.clearOutgoingMessages();
 
-    assertEquals(messageManager.getNumCurrentMessages(), 1);
+    assertTrue(messageManager.getNumCurrentMessages() == 1);
     IntWritable currentMessage = messageManager.getCurrentMessage();
 
     assertEquals(currentMessage.get(), 1337);



Mime
View raw message