cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r1199010 - in /cassandra/trunk: ./ contrib/ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/net/
Date Mon, 07 Nov 2011 23:35:04 GMT
Author: jbellis
Date: Mon Nov  7 23:35:04 2011
New Revision: 1199010

URL: http://svn.apache.org/viewvc?rev=1199010&view=rev
Log:
add message expiration logic to OutboundTcpConnection
patch by Melvin Wang and jbellis for CASSANDRA-3005

Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/contrib/   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/trunk/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
    cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Nov  7 23:35:04 2011
@@ -4,7 +4,7 @@
 /cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1196961,1197786
 /cassandra/branches/cassandra-0.8.0:1125021-1130369
 /cassandra/branches/cassandra-0.8.1:1101014-1125018
-/cassandra/branches/cassandra-1.0:1167085-1198999
+/cassandra/branches/cassandra-1.0:1167085-1199009
 /cassandra/branches/cassandra-1.0.0:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1:1102511-1125020

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1199010&r1=1199009&r2=1199010&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Mon Nov  7 23:35:04 2011
@@ -1,4 +1,5 @@
 1.1-dev
+ * add message expiration logic to OutboundTcpConnection (CASSANDRA-3005)
  * off-heap cache to use sun.misc.Unsafe instead of JNA (CASSANDRA-3271)
  * EACH_QUORUM is only supported for writes (CASSANDRA-3272)
  * replace compactionlock use in schema migration by checking CFS.isInvalidD

Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Nov  7 23:35:04 2011
@@ -4,7 +4,7 @@
 /cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1196961,1197786
 /cassandra/branches/cassandra-0.8.0/contrib:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/contrib:1101014-1125018
-/cassandra/branches/cassandra-1.0/contrib:1167085-1198999
+/cassandra/branches/cassandra-1.0/contrib:1167085-1199009
 /cassandra/branches/cassandra-1.0.0/contrib:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
 /cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/contrib:1102511-1125020

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Nov  7 23:35:04 2011
@@ -4,7 +4,7 @@
 /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1196961,1197786
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167085-1198999
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167085-1199009
 /cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1102511-1125020

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Nov  7 23:35:04 2011
@@ -4,7 +4,7 @@
 /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1196961,1197786
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167085-1198999
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167085-1199009
 /cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1102511-1125020

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Nov  7 23:35:04 2011
@@ -4,7 +4,7 @@
 /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1196961,1197786
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167085-1198999
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167085-1199009
 /cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1102511-1125020

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Nov  7 23:35:04 2011
@@ -4,7 +4,7 @@
 /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1196961,1197786
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167085-1198999
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167085-1199009
 /cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1102511-1125020

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Nov  7 23:35:04 2011
@@ -4,7 +4,7 @@
 /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1196961,1197786
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167085-1198999
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167085-1199009
 /cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1102511-1125020

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1199010&r1=1199009&r2=1199010&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Mon Nov  7 23:35:04 2011
@@ -674,6 +674,14 @@ public final class MessagingService impl
         return completedTasks;
     }
 
+    public Map<String, Long> getCommandDroppedTasks()
+    {
+        Map<String, Long> droppedTasks = new HashMap<String, Long>();
+        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers_.entrySet())
+            droppedTasks.put(entry.getKey().getHostAddress(), entry.getValue().cmdCon.getDroppedMessages());
+        return droppedTasks;
+    }
+
     public Map<String, Integer> getResponsePendingTasks()
     {
         Map<String, Integer> pendingTasks = new HashMap<String, Integer>();

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingServiceMBean.java?rev=1199010&r1=1199009&r2=1199010&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingServiceMBean.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingServiceMBean.java Mon Nov  7 23:35:04 2011
@@ -41,6 +41,11 @@ public interface MessagingServiceMBean
     public Map<String, Long> getCommandCompletedTasks();
 
     /**
+     * Dropped tasks for Command(Mutations, Read etc) TCP Connections
+     */
+    public Map<String, Long> getCommandDroppedTasks();
+
+    /**
      * Pending tasks for Response(GOSSIP & RESPONSE) TCP Connections
      */
     public Map<String, Integer> getResponsePendingTasks();

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java?rev=1199010&r1=1199009&r2=1199010&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java Mon Nov  7 23:35:04 2011
@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.net.Socket;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.lang.ArrayUtils;
 import org.slf4j.Logger;
@@ -35,7 +36,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
 
 public class OutboundTcpConnection extends Thread
 {
@@ -47,12 +47,18 @@ public class OutboundTcpConnection exten
                                                               MessagingService.version_);
 
     private static final int OPEN_RETRY_DELAY = 100; // ms between retries
-    private final BlockingQueue<Pair<Message, String>> queue = new LinkedBlockingQueue<Pair<Message, String>>();
+
+    // sending thread reads from "active" (one of queue1, queue2) until it is empty.
+    // then it swaps it with "backlog."
+    private volatile BlockingQueue<Entry> backlog = new LinkedBlockingQueue<Entry>();
+    private volatile BlockingQueue<Entry> active = new LinkedBlockingQueue<Entry>();
+
     private final OutboundTcpConnectionPool poolReference;    
 
     private DataOutputStream out;
     private Socket socket;
-    private long completedCount;
+    private volatile long completed;
+    private final AtomicLong dropped = new AtomicLong();
 
     public OutboundTcpConnection(OutboundTcpConnectionPool pool)
     {
@@ -62,9 +68,10 @@ public class OutboundTcpConnection exten
 
     public void enqueue(Message message, String id)
     {
+        expireMessages();
         try
         {
-            queue.put(Pair.create(message, id));
+            backlog.put(new Entry(message, id, System.currentTimeMillis()));
         }
         catch (InterruptedException e)
         {
@@ -74,7 +81,8 @@ public class OutboundTcpConnection exten
 
     void closeSocket()
     {
-        queue.clear();
+        active.clear();
+        backlog.clear();
         enqueue(CLOSE_SENTINEL, null);
     }
 
@@ -82,30 +90,54 @@ public class OutboundTcpConnection exten
     {
         while (true)
         {
-            Pair<Message, String> pair = take();
-            Message m = pair.left;
-            String id = pair.right;
+            Entry entry = active.poll();
+            if (entry == null)
+            {
+                // exhausted the active queue.  switch to backlog, once there's something to process there
+                try
+                {
+                    entry = backlog.take();
+                }
+                catch (InterruptedException e)
+                {
+                    throw new AssertionError(e);
+                }
+
+                BlockingQueue<Entry> tmp = backlog;
+                backlog = active;
+                active = tmp;
+            }
+
+            Message m = entry.message;
+            String id = entry.id;
             if (m == CLOSE_SENTINEL)
             {
                 disconnect();
                 continue;
             }
-            if (socket != null || connect())
+            if (entry.timestamp < System.currentTimeMillis() - DatabaseDescriptor.getRpcTimeout())
+                dropped.incrementAndGet();
+            else if (socket != null || connect())
                 writeConnected(m, id);
             else
                 // clear out the queue, else gossip messages back up.
-                queue.clear();            
+                active.clear();
         }
     }
 
     public int getPendingMessages()
     {
-        return queue.size();
+        return active.size() + backlog.size();
     }
 
     public long getCompletedMesssages()
     {
-        return completedCount;
+        return completed;
+    }
+
+    public long getDroppedMessages()
+    {
+        return dropped.get();
     }
 
     private void writeConnected(Message message, String id)
@@ -113,7 +145,8 @@ public class OutboundTcpConnection exten
         try
         {
             write(message, id, out);
-            if (queue.peek() == null)
+            completed++;
+            if (active.peek() == null)
             {
                 out.flush();
             }
@@ -182,21 +215,6 @@ public class OutboundTcpConnection exten
         }
     }
 
-    private Pair<Message, String> take()
-    {
-        Pair<Message, String> pair;
-        try
-        {
-            pair = queue.take();
-            completedCount++;
-        }
-        catch (InterruptedException e)
-        {
-            throw new AssertionError(e);
-        }
-        return pair;
-    }
-
     private boolean connect()
     {
         if (logger.isDebugEnabled())
@@ -229,4 +247,41 @@ public class OutboundTcpConnection exten
         }
         return false;
     }
+
+    private void expireMessages()
+    {
+        while (true)
+        {
+            Entry entry = backlog.peek();
+            if (entry == null || entry.timestamp >= System.currentTimeMillis() - DatabaseDescriptor.getRpcTimeout())
+                break;
+
+            Entry entry2 = backlog.poll();
+            if (entry2 != entry)
+            {
+                // sending thread switched queues.  add this entry (from the "new" backlog)
+                // at the end of the active queue, which keeps it in the same position relative to the other entries
+                // without having to contend with other clients for the head-of-backlog lock.
+                if (entry2 != null)
+                    active.add(entry2);
+                break;
+            }
+
+            dropped.incrementAndGet();
+        }
+    }
+
+    private static class Entry
+    {
+        final Message message;
+        final String id;
+        final long timestamp;
+
+        Entry(Message message, String id, long timestamp)
+        {
+            this.message = message;
+            this.id = id;
+            this.timestamp = timestamp;
+        }
+    }
 }



Mime
View raw message