incubator-s4-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mmo...@apache.org
Subject [1/2] git commit: S4-7 fix a few issues with previous patch - synchro with topology - add error logging for emitter asynchronous tasks - add missing config for tcp queue size - TODO fix concurrency issue in TCPEmitter
Date Wed, 01 Feb 2012 18:31:19 GMT
Updated Branches:
  refs/heads/S4-7 [created] 0931075a9


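S4-7: fix a few issues with the previous patch
- synchronize emitters with the topology (do not fail on partitions not yet visible in it)
- add error logging for emitter asynchronous tasks
- add missing config for the TCP partition queue size (tcp.partition.queue_size)
- TODO: fix concurrency issue in TCPEmitter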


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/0931075a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/0931075a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/0931075a

Branch: refs/heads/S4-7
Commit: 0931075a914782fc6c7ab11cd6716c91ece95bb3
Parents: 8e2427b
Author: Matthieu Morel <mmorel@apache.org>
Authored: Wed Feb 1 19:25:45 2012 +0100
Committer: Matthieu Morel <mmorel@apache.org>
Committed: Wed Feb 1 19:25:45 2012 +0100

----------------------------------------------------------------------
 .../java/org/apache/s4/comm/tcp/TCPEmitter.java    |   51 ++++++++++-----
 .../java/org/apache/s4/comm/udp/UDPEmitter.java    |    8 ++-
 .../org/apache/s4/comm/util/PartitionInfo.java     |   25 +++++--
 .../src/main/java/org/apache/s4/core/Main.java     |   28 ++++----
 .../src/test/resources/default.s4.properties       |    1 +
 .../resources/org.apache.s4.deploy.s4.properties   |    3 +-
 6 files changed, 75 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0931075a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index ba48a28..9950506 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -1,5 +1,6 @@
 package org.apache.s4.comm.tcp;
 
+import java.lang.Thread.UncaughtExceptionHandler;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.util.Hashtable;
@@ -34,23 +35,29 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.HashBiMap;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.Inject;
 import com.google.inject.name.Named;
 
-/*
- * TCPEmitter - Uses TCP to send messages across to other partitions.
- *            - Message ordering between partitions is preserved.
- *            - For efficiency, NettyEmitter.send() queues the messages partition-wise, 
- *              a threadPool sends the messages asynchronously; message dequeued only on
success.
- *            - Tolerates topology changes, partition re-mapping, and network glitches.
+/**
+ * TCPEmitter - Uses TCP to send messages across to other partitions. - Message ordering
between partitions is
+ * preserved. - For efficiency, NettyEmitter.send() queues the messages partition-wise, a
threadPool sends the messages
+ * asynchronously; message dequeued only on success. - Tolerates topology changes, partition
re-mapping, and network
+ * glitches.
  */
 
 public class TCPEmitter implements Emitter, TopologyChangeListener {
     private static final Logger logger = LoggerFactory.getLogger(TCPEmitter.class);
+    // TODO must be configurable
     private final int numRetries = 10;
+    // TODO must be configurable
+    private final long retryDelayMs = 10;
     private final int bufferSize;
     private Topology topology;
     private final ClientBootstrap bootstrap;
+    // id for prefixing emitter threads names
+    private static int instanceId = 0;
+    private volatile long sentMsgCount = 0;
 
     /*
      * Channel used to send messages to each partition
@@ -70,7 +77,14 @@ public class TCPEmitter implements Emitter, TopologyChangeListener {
     /*
      * Thread pool to actually send messages
      */
-    private ExecutorService sendService = Executors.newCachedThreadPool();
+    private ExecutorService sendService = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
+            .setNameFormat("TCPEmitterSendServiceThread-#" + instanceId++ + "-%d")
+            .setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+                @Override
+                public void uncaughtException(Thread paramThread, Throwable paramThrowable)
{
+                    logger.error("Cannot send message", paramThrowable);
+                }
+            }).build());
 
     @Inject
     public TCPEmitter(Topology topology, @Named("tcp.partition.queue_size") int bufferSize)
throws InterruptedException {
@@ -107,7 +121,7 @@ public class TCPEmitter implements Emitter, TopologyChangeListener {
         bootstrap.setOption("connectTimeoutMillis", 100);
     }
 
-    private static class Message implements ChannelFutureListener {
+    private class Message implements ChannelFutureListener {
         private final SendQueue sendQ;
         private final byte[] message;
         private boolean sendInProcess;
@@ -129,8 +143,10 @@ public class TCPEmitter implements Emitter, TopologyChangeListener {
         @Override
         public synchronized void operationComplete(ChannelFuture future) throws Exception
{
             if (future.isSuccess()) {
+                sentMsgCount++;
                 synchronized (sendQ.messages) {
                     sendQ.messages.remove(this);
+                    logger.debug("sent messages {}, msg size {}", sentMsgCount, sendQ.messages.size());
                 }
                 return;
             }
@@ -146,7 +162,7 @@ public class TCPEmitter implements Emitter, TopologyChangeListener {
         }
     }
 
-    private static class SendQueue {
+    private class SendQueue {
         private final TCPEmitter emitter;
         private final int partitionId;
         private final int bufferSize;
@@ -168,7 +184,7 @@ public class TCPEmitter implements Emitter, TopologyChangeListener {
                     sendThreadRecheck = true;
                 } else {
                     sendThreadActive = true;
-                    emitter.sendService.execute(new SendThread(this));
+                    emitter.sendService.execute(new SendTask(this));
                 }
             }
         }
@@ -193,10 +209,10 @@ public class TCPEmitter implements Emitter, TopologyChangeListener {
         }
     }
 
-    private static class SendThread extends Thread {
+    private static class SendTask implements Runnable {
         private final SendQueue sendQ;
 
-        SendThread(SendQueue sendQ) {
+        SendTask(SendQueue sendQ) {
             this.sendQ = sendQ;
         }
 
@@ -220,8 +236,10 @@ public class TCPEmitter implements Emitter, TopologyChangeListener {
 
     private boolean connectTo(Integer partitionId) {
         ClusterNode clusterNode = partitionNodeMap.get(partitionId);
-
         if (clusterNode == null) {
+            if (topology.getTopology().getNodes().size() <= partitionId) {
+                return false;
+            }
             clusterNode = topology.getTopology().getNodes().get(partitionId);
             partitionNodeMap.forcePut(partitionId, clusterNode);
         }
@@ -240,7 +258,7 @@ public class TCPEmitter implements Emitter, TopologyChangeListener {
                 return true;
             }
             try {
-                Thread.sleep(10);
+                Thread.sleep(retryDelayMs);
             } catch (InterruptedException ie) {
                 logger.error(String.format("Interrupted while connecting to %s:%d", clusterNode.getMachineName(),
                         clusterNode.getPort()));
@@ -256,8 +274,9 @@ public class TCPEmitter implements Emitter, TopologyChangeListener {
 
         while (true) {
             if (!partitionChannelMap.containsKey(partitionId)) {
-                connectTo(partitionId);
-                continue;
+                if (!connectTo(partitionId)) {
+                    continue;
+                }
             }
 
             SendQueue sendQ = sendQueues.get(partitionId);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0931075a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
index 1e1f229..3d5a199 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
@@ -12,6 +12,7 @@ import org.apache.s4.base.Emitter;
 import org.apache.s4.comm.topology.ClusterNode;
 import org.apache.s4.comm.topology.Topology;
 import org.apache.s4.comm.topology.TopologyChangeListener;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.HashBiMap;
 import com.google.inject.Inject;
@@ -29,6 +30,8 @@ public class UDPEmitter implements Emitter, TopologyChangeListener {
 
     @Inject
     public UDPEmitter(Topology topology) {
+        LoggerFactory.getLogger(getClass()).debug("UDPEmitter with topology {}",
+                topology.getTopology().getPartitionCount());
         this.topology = topology;
         topology.addListener(this);
         nodes = HashBiMap.create(topology.getTopology().getNodes().size());
@@ -48,7 +51,10 @@ public class UDPEmitter implements Emitter, TopologyChangeListener {
         try {
             ClusterNode node = nodes.get(partitionId);
             if (node == null) {
-                throw new RuntimeException(String.format("Bad partition id %d", partitionId));
+                LoggerFactory.getLogger(getClass()).error(
+                        "Cannot send message to partition {} because this partition is not
visible to this emitter",
+                        partitionId);
+                return false;
             }
             byte[] byteBuffer = new byte[message.length];
             System.arraycopy(message, 0, byteBuffer, 0, message.length);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0931075a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/PartitionInfo.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/PartitionInfo.java
b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/PartitionInfo.java
index 47d1aa6..c8d2020 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/PartitionInfo.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/PartitionInfo.java
@@ -11,15 +11,15 @@ import org.slf4j.LoggerFactory;
 import com.google.inject.Inject;
 import com.google.inject.name.Named;
 
-/*
+/**
  * Test util for communication protocols.
- *
+ * 
  * <ul>
- * <li> The util defines Send and Receive Threads </li>
- * <li> SendThread sends out a pre-defined number of messages to all the partitions
</li>
- * <li> ReceiveThread receives all/most of these messages </li>
- * <li> To avoid the receiveThread waiting for ever, it spawns a TimerThread that would

- * interrupt after a pre-defined but long enough interval </li>
+ * <li>The util defines Send and Receive Threads</li>
+ * <li>SendThread sends out a pre-defined number of messages to all the partitions</li>
+ * <li>ReceiveThread receives all/most of these messages</li>
+ * <li>To avoid the receiveThread waiting for ever, it spawns a TimerThread that would
interrupt after a pre-defined but
+ * long enough interval</li>
  * </ul>
  * 
  */
@@ -53,15 +53,23 @@ public class PartitionInfo {
     }
 
     public class SendThread extends Thread {
+
+        public SendThread() {
+            super("SendThread");
+        }
+
         @Override
         public void run() {
             try {
                 for (int i = 0; i < numMessages; i++) {
                     for (int partition = 0; partition < emitter.getPartitionCount(); partition++)
{
                         byte[] message = new String(partitionId + " " + i).getBytes();
-                        while (!emitter.send(partition, message)) {
+                        if (!emitter.send(partition, message)) {
                             logger.debug("SendThread {}: Resending message to {}", partitionId,
partition);
                             Thread.sleep(interval);
+                            if (!emitter.send(partition, message)) {
+                                throw new RuntimeException("failed to send message");
+                            }
                         }
                     }
                 }
@@ -95,6 +103,7 @@ public class PartitionInfo {
         private Timer timer;
 
         ReceiveThread() {
+            super("ReceiveThread");
             receivedMessages = new int[emitter.getPartitionCount()];
             for (int i = 0; i < receivedMessages.length; i++)
                 receivedMessages[i] = -1;

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0931075a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
index ed4c0ce..39d0a49 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
@@ -1,13 +1,8 @@
 package org.apache.s4.core;
 
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.InputStream;
 import java.util.Arrays;
 
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.PropertiesConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -29,16 +24,19 @@ public class Main {
      * @param args
      */
     public static void main(String[] args) {
-
-        if (args.length == 0) {
-            logger.info("Starting S4 node with default configuration");
-            startDefaultS4Node();
-        } else if (args.length == 1) {
-            logger.info("Starting S4 node with custom configuration from file {}", args[0]);
-            startCustomS4Node(args[0]);
-        } else {
-            logger.info("Starting S4 node in development mode");
-            startDevelopmentMode(args);
+        try {
+            if (args.length == 0) {
+                logger.info("Starting S4 node with default configuration");
+                startDefaultS4Node();
+            } else if (args.length == 1) {
+                logger.info("Starting S4 node with custom configuration from file {}", args[0]);
+                startCustomS4Node(args[0]);
+            } else {
+                logger.info("Starting S4 node in development mode");
+                startDevelopmentMode(args);
+            }
+        } catch (Exception e) {
+            logger.error("S4 node failure", e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0931075a/subprojects/s4-core/src/test/resources/default.s4.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/resources/default.s4.properties b/subprojects/s4-core/src/test/resources/default.s4.properties
index 62fc7d5..d014a12 100644
--- a/subprojects/s4-core/src/test/resources/default.s4.properties
+++ b/subprojects/s4-core/src/test/resources/default.s4.properties
@@ -7,3 +7,4 @@ cluster.name = s4-test-cluster
 cluster.zk_address = localhost:21810
 cluster.zk_session_timeout = 10000
 cluster.zk_connection_timeout = 10000
+tcp.partition.queue_size=256

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0931075a/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties b/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties
index d74927f..1b03f23 100644
--- a/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties
+++ b/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties
@@ -8,4 +8,5 @@ cluster.zk_session_timeout = 10000
 cluster.zk_connection_timeout = 10000
 comm.module = org.apache.s4.deploy.TestModule
 s4.logger_level = TRACE
-appsDir=/tmp/deploy-test
\ No newline at end of file
+appsDir=/tmp/deploy-test
+tcp.partition.queue_size=256
\ No newline at end of file


Mime
View raw message