cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r1104598 - in /cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra: net/MessagingService.java utils/ExpiringMap.java
Date Tue, 17 May 2011 22:14:35 GMT
Author: jbellis
Date: Tue May 17 22:14:35 2011
New Revision: 1104598

URL: http://svn.apache.org/viewvc?rev=1104598&view=rev
Log:
add per-callback timeouts to ExpiringMap

Modified:
    cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/utils/ExpiringMap.java

Modified: cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/net/MessagingService.java?rev=1104598&r1=1104597&r2=1104598&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/net/MessagingService.java Tue May 17 22:14:35 2011
@@ -83,6 +83,7 @@ public final class MessagingService impl
     private final SimpleCondition listenGate;
     private final Map<StorageService.Verb, AtomicInteger> droppedMessages = new EnumMap<StorageService.Verb, AtomicInteger>(StorageService.Verb.class);
     private final List<ILatencySubscriber> subscribers = new ArrayList<ILatencySubscriber>();
+    private static final long DEFAULT_CALLBACK_TIMEOUT = (long) (1.1 * DatabaseDescriptor.getRpcTimeout());
 
     {
         for (StorageService.Verb verb : StorageService.Verb.values())
@@ -121,7 +122,7 @@ public final class MessagingService impl
                 return null;
             }
         };
-        callbacks = new ExpiringMap<String, Pair<InetAddress, IMessageCallback>>((long) (1.1 * DatabaseDescriptor.getRpcTimeout()), timeoutReporter);
+        callbacks = new ExpiringMap<String, Pair<InetAddress, IMessageCallback>>(DEFAULT_CALLBACK_TIMEOUT, timeoutReporter);
 
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         try
@@ -256,7 +257,12 @@ public final class MessagingService impl
 
     private void addCallback(IMessageCallback cb, String messageId, InetAddress to)
     {
-        Pair<InetAddress, IMessageCallback> previous = callbacks.put(messageId, new Pair<InetAddress, IMessageCallback>(to, cb));
+        addCallback(cb, messageId, to, DEFAULT_CALLBACK_TIMEOUT);
+    }
+
+    private void addCallback(IMessageCallback cb, String messageId, InetAddress to, long timeout)
+    {
+        Pair<InetAddress, IMessageCallback> previous = callbacks.put(messageId, new Pair<InetAddress, IMessageCallback>(to, cb), timeout);
         assert previous == null;
     }
     
@@ -267,6 +273,14 @@ public final class MessagingService impl
         return Integer.toString(idGen.incrementAndGet());
     }
 
+    /*
+     * @see #sendRR(Message message, InetAddress to, IMessageCallback cb, long timeout)
+     */
+    public String sendRR(Message message, InetAddress to, IMessageCallback cb)
+    {
+        return sendRR(message, to, cb, DEFAULT_CALLBACK_TIMEOUT);
+    }
+
     /**
      * Send a message to a given endpoint. This method specifies a callback
      * which is invoked with the actual response.
@@ -275,12 +289,13 @@ public final class MessagingService impl
      * @param cb callback interface which is used to pass the responses or
      *           suggest that a timeout occurred to the invoker of the send().
      *           suggest that a timeout occurred to the invoker of the send().
+     * @param timeout the timeout used for expiration
      * @return an reference to message id used to match with the result
      */
-    public String sendRR(Message message, InetAddress to, IMessageCallback cb)
+    public String sendRR(Message message, InetAddress to, IMessageCallback cb, long timeout)
     {
         String id = nextId();
-        addCallback(cb, id, to);
+        addCallback(cb, id, to, timeout);
         sendOneWay(message, id, to);
         return id;
     }
@@ -624,4 +639,9 @@ public final class MessagingService impl
             completedTasks.put(entry.getKey().getHostAddress(), entry.getValue().ackCon.getCompletedMesssages());
         return completedTasks;
     }
+
+    public static long getDefaultCallbackTimeout()
+    {
+        return DEFAULT_CALLBACK_TIMEOUT;
+    }
 }

Modified: cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/utils/ExpiringMap.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/utils/ExpiringMap.java?rev=1104598&r1=1104597&r2=1104598&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/utils/ExpiringMap.java (original)
+++ cassandra/branches/cassandra-0.8.1/src/java/org/apache/cassandra/utils/ExpiringMap.java Tue May 17 22:14:35 2011
@@ -32,11 +32,13 @@ public class ExpiringMap<K, V>
     {
         private final T value;
         private final long age;
+        private final long expiration;
 
-        CacheableObject(T o)
+        CacheableObject(T o, long e)
         {
             assert o != null;
             value = o;
+            expiration = e;
             age = System.currentTimeMillis();
         }
 
@@ -45,26 +47,21 @@ public class ExpiringMap<K, V>
             return value;
         }
 
-        boolean isReadyToDie(long expiration)
+        boolean isReadyToDie(long start)
         {
-            return ((System.currentTimeMillis() - age) > expiration);
+            return ((start - age) > expiration);
         }
     }
 
     private class CacheMonitor extends TimerTask
     {
-        private final long expiration;
-
-        CacheMonitor(long expiration)
-        {
-            this.expiration = expiration;
-        }
 
         public void run()
         {
+            long start = System.currentTimeMillis();
             for (Map.Entry<K, CacheableObject<V>> entry : cache.entrySet())
             {
-                if (entry.getValue().isReadyToDie(expiration))
+                if (entry.getValue().isReadyToDie(start))
                 {
                     cache.remove(entry.getKey());
                     if (postExpireHook != null)
@@ -77,6 +74,7 @@ public class ExpiringMap<K, V>
     private final NonBlockingHashMap<K, CacheableObject<V>> cache = new NonBlockingHashMap<K, CacheableObject<V>>();
     private final Timer timer;
     private static int counter = 0;
+    private final long expiration;
 
     public ExpiringMap(long expiration)
     {
@@ -90,13 +88,15 @@ public class ExpiringMap<K, V>
     public ExpiringMap(long expiration, Function<Pair<K,V>, ?> postExpireHook)
     {
         this.postExpireHook = postExpireHook;
+        this.expiration = expiration;
+
         if (expiration <= 0)
         {
             throw new IllegalArgumentException("Argument specified must be a positive number");
         }
 
         timer = new Timer("EXPIRING-MAP-TIMER-" + (++counter), true);
-        timer.schedule(new CacheMonitor(expiration), expiration / 2, expiration / 2);
+        timer.schedule(new CacheMonitor(), expiration / 2, expiration / 2);
     }
 
     public void shutdown()
@@ -106,7 +106,12 @@ public class ExpiringMap<K, V>
 
     public V put(K key, V value)
     {
-        CacheableObject<V> previous = cache.put(key, new CacheableObject<V>(value));
+        return put(key, value, this.expiration);
+    }
+
+    public V put(K key, V value, long timeout)
+    {
+        CacheableObject<V> previous = cache.put(key, new CacheableObject<V>(value, timeout));
         return (previous == null) ? null : previous.getValue();
     }
 



Mime
View raw message