cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r929768 - in /cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra: db/ReadVerbHandler.java service/ConsistencyChecker.java service/StorageProxy.java
Date Thu, 01 Apr 2010 01:39:01 GMT
Author: jbellis
Date: Thu Apr  1 01:39:01 2010
New Revision: 929768

URL: http://svn.apache.org/viewvc?rev=929768&view=rev
Log:
keep the replica set constant throughout the read repair process.  patch by jbellis; reviewed
by gdusbabek for CASSANDRA-937

Modified:
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ReadVerbHandler.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=929768&r1=929767&r2=929768&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ReadVerbHandler.java
(original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ReadVerbHandler.java
Thu Apr  1 01:39:01 2010
@@ -97,9 +97,7 @@ public class ReadVerbHandler implements 
             if (message.getHeader(ReadCommand.DO_REPAIR) != null)
             {
                 List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.table,
command.key);
-                /* Remove the local storage endpoint from the list. */
-                endpoints.remove(FBUtilities.getLocalAddress());
-                if (endpoints.size() > 0)
+                if (endpoints.size() > 1)
                     StorageService.instance.doConsistencyCheck(row, endpoints, command);
             }
         }

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java?rev=929768&r1=929767&r2=929768&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java
(original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java
Thu Apr  1 01:39:01 2010
@@ -31,10 +31,13 @@ import org.apache.log4j.Logger;
 import org.apache.commons.lang.StringUtils;
 
 import org.apache.cassandra.cache.ICacheExpungeHook;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.ReadResponse;
 import org.apache.cassandra.db.Row;
+import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
@@ -45,19 +48,18 @@ import org.apache.cassandra.utils.FBUtil
 class ConsistencyChecker implements Runnable
 {
 	private static Logger logger_ = Logger.getLogger(ConsistencyChecker.class);
-    private static long scheduledTimeMillis_ = 600;
-    private static ExpiringMap<String, String> readRepairTable_ = new ExpiringMap<String,
String>(scheduledTimeMillis_);
+    private static ExpiringMap<String, String> readRepairTable_ = new ExpiringMap<String,
String>(DatabaseDescriptor.getRpcTimeout());
 
     private final String table_;
     private final Row row_;
     protected final List<InetAddress> replicas_;
     private final ReadCommand readCommand_;
 
-    public ConsistencyChecker(String table, Row row, List<InetAddress> replicas, ReadCommand
readCommand)
+    public ConsistencyChecker(String table, Row row, List<InetAddress> endpoints, ReadCommand
readCommand)
     {
         table_ = table;
         row_ = row;
-        replicas_ = replicas;
+        replicas_ = endpoints;
         readCommand_ = readCommand;
     }
 
@@ -69,7 +71,13 @@ class ConsistencyChecker implements Runn
 			Message message = readCommandDigestOnly.makeReadMessage();
             if (logger_.isDebugEnabled())
               logger_.debug("Reading consistency digest for " + readCommand_.key + " from " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + "]");
-            MessagingService.instance.sendRR(message, replicas_.toArray(new InetAddress[replicas_.size()]),
new DigestResponseHandler());
+
+            MessagingService.instance.addCallback(new DigestResponseHandler(), message.getMessageId());
+            for (InetAddress endpoint : replicas_)
+            {
+                if (!endpoint.equals(FBUtilities.getLocalAddress()))
+                    MessagingService.instance.sendOneWay(message, endpoint);
+            }
 		}
 		catch (IOException ex)
 		{
@@ -86,48 +94,49 @@ class ConsistencyChecker implements Runn
 
     class DigestResponseHandler implements IAsyncCallback
 	{
-		Collection<Message> responses_ = new LinkedBlockingQueue<Message>();
+        private boolean repairInvoked;
 
-        // syncronized so "size() == " works
-		public synchronized void response(Message msg)
+		public synchronized void response(Message response)
 		{
-			responses_.add(msg);
-            if (responses_.size() != ConsistencyChecker.this.replicas_.size())
+            if (repairInvoked)
                 return;
 
-            for (Message response : responses_)
+            try
             {
-                try
+                byte[] body = response.getMessageBody();
+                ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+                ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
+                byte[] digest = result.digest();
+
+                if (!Arrays.equals(ColumnFamily.digest(row_.cf), digest))
                 {
-                    byte[] body = response.getMessageBody();
-                    ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
-                    ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
-                    byte[] digest = result.digest();
-                    if (!Arrays.equals(ColumnFamily.digest(row_.cf), digest))
+                    IResponseResolver<Row> readResponseResolver = new ReadResponseResolver(table_,
replicas_.size());
+                    IAsyncCallback responseHandler;
+                    if (replicas_.contains(FBUtilities.getLocalAddress()))
+                        responseHandler = new DataRepairHandler(row_, replicas_.size(), readResponseResolver);
+                    else
+                        responseHandler = new DataRepairHandler(replicas_.size(), readResponseResolver);
+
+                    ReadCommand readCommand = constructReadMessage(false);
+                    Message message = readCommand.makeReadMessage();
+                    if (logger_.isDebugEnabled())
+                      logger_.debug("Performing read repair for " + readCommand_.key + " to " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + "]");
+                    MessagingService.instance.addCallback(responseHandler, message.getMessageId());
+                    for (InetAddress endpoint : replicas_)
                     {
-                        doReadRepair();
-                        break;
+                        if (!endpoint.equals(FBUtilities.getLocalAddress()))
+                            MessagingService.instance.sendOneWay(message, endpoint);
                     }
+
+                    repairInvoked = true;
                 }
-                catch (Exception e)
-                {
-                    throw new RuntimeException("Error handling responses for " + row_, e);
-                }
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException("Error handling responses for " + row_, e);
             }
         }
-
-        private void doReadRepair() throws IOException
-		{
-            replicas_.add(FBUtilities.getLocalAddress());
-            IResponseResolver<Row> readResponseResolver = new ReadResponseResolver(table_,
replicas_.size());
-            IAsyncCallback responseHandler = new DataRepairHandler(replicas_.size(), readResponseResolver);
-            ReadCommand readCommand = constructReadMessage(false);
-            Message message = readCommand.makeReadMessage();
-            if (logger_.isDebugEnabled())
-              logger_.debug("Performing read repair for " + readCommand_.key + " to " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + "]");
-            MessagingService.instance.sendRR(message, replicas_.toArray(new InetAddress[replicas_.size()]),
responseHandler);
-		}
-	}
+    }
 
 	static class DataRepairHandler implements IAsyncCallback, ICacheExpungeHook<String, String>
 	{
@@ -141,6 +150,18 @@ class ConsistencyChecker implements Runn
 			majority_ = (responseCount / 2) + 1;  
 		}
 
+        public DataRepairHandler(Row localRow, int responseCount, IResponseResolver<Row>
readResponseResolver) throws IOException
+        {
+            this(responseCount, readResponseResolver);
+            // wrap localRow in a response Message so it doesn't need to be special-cased in the resolver
+            ReadResponse readResponse = new ReadResponse(localRow);
+            DataOutputBuffer out = new DataOutputBuffer();
+            ReadResponse.serializer().serialize(readResponse, out);
+            byte[] bytes = new byte[out.getLength()];
+            System.arraycopy(out.getData(), 0, bytes, 0, bytes.length);
+            responses_.add(new Message(FBUtilities.getLocalAddress(), StageManager.RESPONSE_STAGE,
StorageService.Verb.READ_RESPONSE, bytes));
+        }
+
         // synchronized so the " == majority" is safe
 		public synchronized void response(Message message)
 		{

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java?rev=929768&r1=929767&r2=929768&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
Thu Apr  1 01:39:01 2010
@@ -763,9 +763,7 @@ public class StorageProxy implements Sto
             if (DatabaseDescriptor.getConsistencyCheck())
             {
                 List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.table,
command.key);
-                /* Remove the local storage endpoint from the list. */
-                endpoints.remove(FBUtilities.getLocalAddress());
-                if (endpoints.size() > 0)
+                if (endpoints.size() > 1)
                     StorageService.instance.doConsistencyCheck(row, endpoints, command);
             }
 



Mime
View raw message