incubator-s4-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mmo...@apache.org
Subject [40/50] [abbrv] git commit: Modified comm tests to conform to the testing norms
Date Tue, 03 Jan 2012 14:03:28 GMT
Modified comm tests to conform to the testing norms


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/92f3252f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/92f3252f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/92f3252f

Branch: refs/heads/piper
Commit: 92f3252f47e72325af47b50173006ddbe76d428b
Parents: 62d5b50
Author: Karthik Kambatla <kkambatl@cs.purdue.edu>
Authored: Fri Oct 21 00:26:52 2011 -0400
Committer: Karthik Kambatla <kkambatl@cs.purdue.edu>
Committed: Sat Oct 22 23:51:25 2011 -0400

----------------------------------------------------------------------
 .../java/org/apache/s4/comm/NettyTestModule.java   |   96 ++++++
 .../org/apache/s4/comm/SimpleDeliveryTest.java     |  256 +++++++++------
 .../test/java/org/apache/s4/comm/TestModule.java   |   96 ------
 .../java/org/apache/s4/comm/UDPTestModule.java     |   96 ++++++
 4 files changed, 347 insertions(+), 197 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/92f3252f/subprojects/s4-comm/src/test/java/org/apache/s4/comm/NettyTestModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/NettyTestModule.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/NettyTestModule.java
new file mode 100644
index 0000000..65c3dee
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/NettyTestModule.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm;
+
+import java.io.InputStream;
+
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.ConfigurationUtils;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.Listener;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.DefaultHasher;
+import org.apache.s4.comm.netty.NettyEmitter;
+import org.apache.s4.comm.netty.NettyListener;
+import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.AssignmentFromFile;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.Topology;
+import org.apache.s4.comm.topology.TopologyFromFile;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.name.Names;
+
+/*
+ * Module for s4-comm/tests
+ */
+public class NettyTestModule extends AbstractModule {
+
+	protected PropertiesConfiguration config = null;
+
+	private void loadProperties(Binder binder) {
+
+		try {
+			InputStream is = this.getClass().getResourceAsStream(
+					"/s4-comm-test.properties");
+			config = new PropertiesConfiguration();
+			config.load(is);
+
+			System.out.println(ConfigurationUtils.toString(config));
+			Names.bindProperties(binder,
+					ConfigurationConverter.getProperties(config));
+		} catch (ConfigurationException e) {
+			binder.addError(e);
+			e.printStackTrace();
+		}
+	}
+
+	@Override
+	protected void configure() {
+		if (config == null)
+			loadProperties(binder());
+
+		int numHosts = config.getList("cluster.hosts").size();
+		boolean isCluster = numHosts > 1 ? true : false;
+		bind(Boolean.class).annotatedWith(Names.named("isCluster")).toInstance(
+				Boolean.valueOf(isCluster));
+
+		bind(Cluster.class);
+
+		bind(Assignment.class).to(AssignmentFromFile.class);
+
+		bind(Topology.class).to(TopologyFromFile.class);
+
+		/* Use a simple UDP comm layer implementation. */
+		bind(Listener.class).to(NettyListener.class);
+		bind(Emitter.class).to(NettyEmitter.class);
+
+		/* The hashing function to map keys top partitions. */
+		bind(Hasher.class).to(DefaultHasher.class);
+
+		/* Use Kryo to serialize events. */
+		bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+		
+        bind(Integer.class).annotatedWith(Names.named("emitter.send.interval"))
+        .toInstance(config.getInt("emitter.send.interval"));
+
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/92f3252f/subprojects/s4-comm/src/test/java/org/apache/s4/comm/SimpleDeliveryTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/SimpleDeliveryTest.java
b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/SimpleDeliveryTest.java
index 3216aa5..ba85899 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/SimpleDeliveryTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/SimpleDeliveryTest.java
@@ -8,6 +8,9 @@ import com.google.inject.Inject;
 import com.google.inject.Injector;
 import com.google.inject.name.Named;
 
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
 /*
  * Test class to test communication protocols.
  * 
@@ -21,135 +24,186 @@ import com.google.inject.name.Named;
  * </ul>
  * 
  */
-public class SimpleDeliveryTest {
-	final private static int messageCount = 200;
-	final private static int timerThreadCount = 100;
-
-	final private Emitter emitter;
-	final private Listener listener;
-	final private int interval;
-	
-	public Thread sendThread, receiveThread;
-	private int totalMessagesReceived;
-
-	@Inject
-	public SimpleDeliveryTest(@Named("emitter.send.interval") int interval,
-			Emitter emitter, Listener listener) {
-		this.emitter = emitter;
-		this.listener = listener;
-		this.interval = interval;
-		this.totalMessagesReceived = messageCount
-				* this.emitter.getPartitionCount();
-
-		this.sendThread = new SendThread();
-		this.receiveThread = new ReceiveThread();
-	}
+public class SimpleDeliveryTest extends TestCase {
+
+	class CommWrapper {
+		final private static int messageCount = 200;
+		final private static int timerThreadCount = 100;
+
+		final private Emitter emitter;
+		final private Listener listener;
+		final private int interval;
+
+		public Thread sendThread, receiveThread;
+		private int messagesExpected;
+		private int messagesReceived = 0;
+
+		@Inject
+		public CommWrapper(@Named("emitter.send.interval") int interval,
+				Emitter emitter, Listener listener) {
+			this.emitter = emitter;
+			this.listener = listener;
+			this.interval = interval;
+			this.messagesExpected = messageCount
+					* this.emitter.getPartitionCount();
+
+			this.sendThread = new SendThread();
+			this.receiveThread = new ReceiveThread();
+		}
 
-	class SendThread extends Thread {
-		@Override
-		public void run() {
-			try {
-				for (int partition = 0; partition < emitter.getPartitionCount(); partition++) {
-					for (int i = 0; i < messageCount; i++) {
-						byte[] message = (new String("message-" + i))
-								.getBytes();
-						emitter.send(partition, message);
-						Thread.sleep(interval);
+		public boolean moreMessages() {
+			return ((messagesExpected - messagesReceived) > 0);
+		}
+
+		class SendThread extends Thread {
+			@Override
+			public void run() {
+				try {
+					for (int partition = 0; partition < emitter
+							.getPartitionCount(); partition++) {
+						for (int i = 0; i < messageCount; i++) {
+							byte[] message = (new String("message-" + i))
+									.getBytes();
+							emitter.send(partition, message);
+							Thread.sleep(interval);
+						}
 					}
+				} catch (Exception e) {
+					e.printStackTrace();
+					return;
 				}
-			} catch (Exception e) {
-				e.printStackTrace();
-				return;
 			}
 		}
-	}
 
-	/*
-	 * TimerThread - interrupts the passed thread, after specified
-	 * time-interval.
-	 */
-	class TimerThread extends Thread {
-		private Thread watchThread;
-		private Integer sleepCounter;
+		/*
+		 * TimerThread - interrupts the passed thread, after specified
+		 * time-interval.
+		 */
+		class TimerThread extends Thread {
+			private Thread watchThread;
+			private Integer sleepCounter;
+
+			TimerThread(Thread watchThread) {
+				this.watchThread = watchThread;
+				this.sleepCounter = new Integer(timerThreadCount);
+			}
 
-		TimerThread(Thread watchThread) {
-			this.watchThread = watchThread;
-			this.sleepCounter = new Integer(timerThreadCount);
-		}
+			public void resetSleepCounter() {
+				synchronized (this.sleepCounter) {
+					this.sleepCounter = timerThreadCount;
+				}
+			}
 
-		public void resetSleepCounter() {
-			synchronized (this.sleepCounter) {
-				this.sleepCounter = timerThreadCount;
+			public void clearSleepCounter() {
+				synchronized (this.sleepCounter) {
+					this.sleepCounter = 0;
+				}
 			}
-		}
-		
-		public void clearSleepCounter() {
-			synchronized (this.sleepCounter) {
-				this.sleepCounter = 0;
+
+			private int getCounter() {
+				synchronized (this.sleepCounter) {
+					return this.sleepCounter--;
+				}
 			}
-		}
 
-		private int getCounter() {
-			synchronized (this.sleepCounter) {
-				return this.sleepCounter--;
+			@Override
+			public void run() {
+				try {
+					while (getCounter() > 0) {
+						Thread.sleep(interval);
+					}
+					watchThread.interrupt();
+				} catch (Exception e) {
+					e.printStackTrace();
+				}
 			}
 		}
 
-		@Override
-		public void run() {
-			try {
-				while (getCounter() > 0) {
-					Thread.sleep(interval);
+		class ReceiveThread extends Thread {
+			@Override
+			public void run() {
+
+				// start the timer thread to interrupt if blocked for too long
+				TimerThread timer = new TimerThread(this);
+				timer.start();
+				while (messagesReceived < messagesExpected) {
+					byte[] message = listener.recv();
+					timer.resetSleepCounter();
+					if (message != null)
+						messagesReceived++;
+					else
+						break;
 				}
-				watchThread.interrupt();
-			} catch (Exception e) {
-				e.printStackTrace();
+				timer.clearSleepCounter();
 			}
 		}
 	}
 
-	class ReceiveThread extends Thread {
-		@Override
-		public void run() {
-			int count = 0;
-
-			// start the timer thread to interrupt if blocked for too long
-			TimerThread timer = new TimerThread(this);
-			timer.start();
-			while (count < totalMessagesReceived) {
-				byte[] message = listener.recv();
-				timer.resetSleepCounter();
-				if (message != null)
-					count++;
-				else
-					break;
-			}
-			timer.clearSleepCounter();
-			System.out.println("# Messages received = " + count
-					+ ";\t # Messages lost = "
-					+ (totalMessagesReceived - count));
+	/**
+	 * test1() tests the UDP protocol. If all components function without
+	 * throwing exceptions, the test passes. As UDP doesn't guarantee message
+	 * delivery, the number of messages received doesn't come into play to
+	 * determine if it passes the test.
+	 * 
+	 * 
+	 * @throws InterruptedException
+	 */
+	public void test1() throws InterruptedException {
+		System.out.println("Testing UDP");
+
+		Injector injector = Guice.createInjector(new UDPTestModule());
+		try {
+			CommWrapper sdt = injector.getInstance(CommWrapper.class);
+
+			// start send and receive threads
+			sdt.sendThread.start();
+			sdt.receiveThread.start();
+
+			// wait for them to finish
+			sdt.sendThread.join();
+			sdt.receiveThread.join();
+
+			// exit - system.exit is called here to revoke the lock file and
+			// listener
+			// sockets
+		} catch (Exception e) {
+			Assert.fail("UDP test has failed");
 		}
+		Assert.assertTrue("UDP test PASSED. Seems to work fine", true);
+
+		System.out.println("Done");
 	}
 
 	/**
-	 * @param args
+	 * test2() tests the Netty TCP protocol. If all components function without
+	 * throwing exceptions, the test passes partially. As TCP guarantees message
+	 * delivery, the test checks for that too.
+	 * 
 	 * @throws InterruptedException
 	 */
-	public static void main(String[] args) throws InterruptedException {
-		Injector injector = Guice.createInjector(new TestModule());
-		SimpleDeliveryTest sdt = injector.getInstance(SimpleDeliveryTest.class);
+	public void test2() throws InterruptedException {
+		System.out.println("Testing Netty TCP");
+
+		Injector injector = Guice.createInjector(new NettyTestModule());
+		try {
+			CommWrapper sdt = injector.getInstance(CommWrapper.class);
 
-		// start send and receive threads
-		sdt.sendThread.start();
-		sdt.receiveThread.start();
+			// start send and receive threads
+			sdt.sendThread.start();
+			sdt.receiveThread.start();
+
+			// wait for them to finish
+			sdt.sendThread.join();
+			sdt.receiveThread.join();
+
+			Assert.assertTrue("Guaranteed message delivery",
+					!sdt.moreMessages());
+		} catch (Exception e) {
+			Assert.fail("Netty test has failed basic functionality test");
+		}
 
-		// wait for them to finish
-		sdt.sendThread.join();
-		sdt.receiveThread.join();
+		Assert.assertTrue("Netty seems to be working crash-free", true);
 
-		// exit - system.exit is called here to revoke the lock file and listner
-		// sockets
 		System.out.println("Done");
-		System.exit(0);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/92f3252f/subprojects/s4-comm/src/test/java/org/apache/s4/comm/TestModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/TestModule.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/TestModule.java
deleted file mode 100644
index d915daf..0000000
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/TestModule.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *          http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package org.apache.s4.comm;
-
-import java.io.InputStream;
-
-import org.apache.commons.configuration.ConfigurationConverter;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.ConfigurationUtils;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.Listener;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.DefaultHasher;
-import org.apache.s4.comm.netty.NettyEmitter;
-import org.apache.s4.comm.netty.NettyListener;
-import org.apache.s4.comm.serialize.KryoSerDeser;
-import org.apache.s4.comm.topology.Assignment;
-import org.apache.s4.comm.topology.AssignmentFromFile;
-import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.Topology;
-import org.apache.s4.comm.topology.TopologyFromFile;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
-import com.google.inject.name.Names;
-
-/*
- * Module for s4-comm/tests
- */
-public class TestModule extends AbstractModule {
-
-	protected PropertiesConfiguration config = null;
-
-	private void loadProperties(Binder binder) {
-
-		try {
-			InputStream is = this.getClass().getResourceAsStream(
-					"/s4-comm-test.properties");
-			config = new PropertiesConfiguration();
-			config.load(is);
-
-			System.out.println(ConfigurationUtils.toString(config));
-			Names.bindProperties(binder,
-					ConfigurationConverter.getProperties(config));
-		} catch (ConfigurationException e) {
-			binder.addError(e);
-			e.printStackTrace();
-		}
-	}
-
-	@Override
-	protected void configure() {
-		if (config == null)
-			loadProperties(binder());
-
-		int numHosts = config.getList("cluster.hosts").size();
-		boolean isCluster = numHosts > 1 ? true : false;
-		bind(Boolean.class).annotatedWith(Names.named("isCluster")).toInstance(
-				Boolean.valueOf(isCluster));
-
-		bind(Cluster.class);
-
-		bind(Assignment.class).to(AssignmentFromFile.class);
-
-		bind(Topology.class).to(TopologyFromFile.class);
-
-		/* Use a simple UDP comm layer implementation. */
-		bind(Listener.class).to(NettyListener.class);
-		bind(Emitter.class).to(NettyEmitter.class);
-
-		/* The hashing function to map keys top partitions. */
-		bind(Hasher.class).to(DefaultHasher.class);
-
-		/* Use Kryo to serialize events. */
-		bind(SerializerDeserializer.class).to(KryoSerDeser.class);
-		
-        bind(Integer.class).annotatedWith(Names.named("emitter.send.interval"))
-        .toInstance(config.getInt("emitter.send.interval"));
-
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/92f3252f/subprojects/s4-comm/src/test/java/org/apache/s4/comm/UDPTestModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/UDPTestModule.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/UDPTestModule.java
new file mode 100644
index 0000000..b052e70
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/UDPTestModule.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm;
+
+import java.io.InputStream;
+
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.ConfigurationUtils;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.Listener;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.DefaultHasher;
+import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.AssignmentFromFile;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.Topology;
+import org.apache.s4.comm.topology.TopologyFromFile;
+import org.apache.s4.comm.udp.UDPEmitter;
+import org.apache.s4.comm.udp.UDPListener;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.name.Names;
+
+/*
+ * Module for s4-comm/tests
+ */
+public class UDPTestModule extends AbstractModule {
+
+	protected PropertiesConfiguration config = null;
+
+	private void loadProperties(Binder binder) {
+
+		try {
+			InputStream is = this.getClass().getResourceAsStream(
+					"/s4-comm-test.properties");
+			config = new PropertiesConfiguration();
+			config.load(is);
+
+			System.out.println(ConfigurationUtils.toString(config));
+			Names.bindProperties(binder,
+					ConfigurationConverter.getProperties(config));
+		} catch (ConfigurationException e) {
+			binder.addError(e);
+			e.printStackTrace();
+		}
+	}
+
+	@Override
+	protected void configure() {
+		if (config == null)
+			loadProperties(binder());
+
+		int numHosts = config.getList("cluster.hosts").size();
+		boolean isCluster = numHosts > 1 ? true : false;
+		bind(Boolean.class).annotatedWith(Names.named("isCluster")).toInstance(
+				Boolean.valueOf(isCluster));
+
+		bind(Cluster.class);
+
+		bind(Assignment.class).to(AssignmentFromFile.class);
+
+		bind(Topology.class).to(TopologyFromFile.class);
+
+		/* Use a simple UDP comm layer implementation. */
+		bind(Listener.class).to(UDPListener.class);
+		bind(Emitter.class).to(UDPEmitter.class);
+
+		/* The hashing function to map keys top partitions. */
+		bind(Hasher.class).to(DefaultHasher.class);
+
+		/* Use Kryo to serialize events. */
+		bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+
+		bind(Integer.class).annotatedWith(Names.named("emitter.send.interval"))
+				.toInstance(config.getInt("emitter.send.interval"));
+
+	}
+}
\ No newline at end of file


Mime
View raw message