cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r1153115 - in /cassandra/trunk: ./ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/service/
Date Tue, 02 Aug 2011 13:27:14 GMT
Author: jbellis
Date: Tue Aug  2 13:27:10 2011
New Revision: 1153115

URL: http://svn.apache.org/viewvc?rev=1153115&view=rev
Log:
fix "short reads" in [multi]get
patch by Byron Clark; reviewed by jbellis for CASSANDRA-2643

Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/db/AbstractColumnContainer.java
    cassandra/trunk/src/java/org/apache/cassandra/service/AbstractRowResolver.java
    cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java
    cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
    cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java
    cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1153115&r1=1153114&r2=1153115&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Tue Aug  2 13:27:10 2011
@@ -25,6 +25,7 @@
  * fix potential use of free'd native memory in SerializingCache 
    (CASSANDRA-1951)
  * add paging to get_count (CASSANDRA-2894)
+ * fix "short reads" in [multi]get (CASSANDRA-2643)
 
 
 0.8.3

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/AbstractColumnContainer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/AbstractColumnContainer.java?rev=1153115&r1=1153114&r2=1153115&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/AbstractColumnContainer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/AbstractColumnContainer.java Tue Aug  2 13:27:10 2011
@@ -225,6 +225,19 @@ public abstract class AbstractColumnCont
         return getColumnCount();
     }
 
+    public int getLiveColumnCount()
+    {
+        int count = 0;
+
+        for (IColumn column : columns.values())
+        {
+            if (column.isLive())
+                count++;
+        }
+
+        return count;
+    }
+
     public Iterator<IColumn> iterator()
     {
         return columns.values().iterator();

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AbstractRowResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AbstractRowResolver.java?rev=1153115&r1=1153114&r2=1153115&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AbstractRowResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AbstractRowResolver.java Tue Aug  2 13:27:10 2011
@@ -83,4 +83,9 @@ public abstract class AbstractRowResolve
     {
         return replies.keySet();
     }
+
+    public int getMaxLiveColumns()
+    {
+        throw new UnsupportedOperationException();
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java?rev=1153115&r1=1153114&r2=1153115&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java Tue Aug  2 13:27:10 2011
@@ -43,4 +43,6 @@ public interface IResponseResolver<T> {
 
     public void preprocess(Message message);
     public Iterable<Message> getMessages();
+
+    public int getMaxLiveColumns();
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=1153115&r1=1153114&r2=1153115&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java Tue Aug  2 13:27:10 2011
@@ -158,4 +158,9 @@ public class RangeSliceResponseResolver 
     {
         return responses;
     }
+
+    public int getMaxLiveColumns()
+    {
+        throw new UnsupportedOperationException();
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java?rev=1153115&r1=1153114&r2=1153115&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java Tue Aug  2 13:27:10 2011
@@ -82,4 +82,9 @@ public class RepairCallback<T> implement
     {
         return true;
     }
+
+    public int getMaxLiveColumns()
+    {
+        return resolver.getMaxLiveColumns();
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java?rev=1153115&r1=1153114&r2=1153115&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java Tue Aug  2 13:27:10 2011
@@ -37,6 +37,8 @@ import org.apache.cassandra.utils.*;
 
 public class RowRepairResolver extends AbstractRowResolver
 {
+    protected int maxLiveColumns = 0;
+
     public RowRepairResolver(String table, ByteBuffer key)
     {
         super(key, table);
@@ -76,6 +78,12 @@ public class RowRepairResolver extends A
         ColumnFamily resolved;
         if (versions.size() > 1)
         {
+            for (ColumnFamily cf : versions)
+            {
+                int liveColumns = cf.getLiveColumnCount();
+                if (liveColumns > maxLiveColumns)
+                    maxLiveColumns = liveColumns;
+            }
             resolved = resolveSuperset(versions);
             if (logger.isDebugEnabled())
                 logger.debug("versions merged");
@@ -90,8 +98,9 @@ public class RowRepairResolver extends A
 
         if (logger.isDebugEnabled())
             logger.debug("resolve: " + (System.currentTimeMillis() - startTime) + " ms.");
-		return new Row(key, resolved);
-	}
+
+        return new Row(key, resolved);
+    }
 
     /**
      * For each row version, compare with resolved (the superset of all row versions);
@@ -163,4 +172,9 @@ public class RowRepairResolver extends A
 	{
         throw new UnsupportedOperationException();
     }
+
+    public int getMaxLiveColumns()
+    {
+        return maxLiveColumns;
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1153115&r1=1153114&r2=1153115&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Tue Aug  2 13:27:10 2011
@@ -501,117 +501,172 @@ public class StorageProxy implements Sto
     {
         List<ReadCallback<Row>> readCallbacks = new ArrayList<ReadCallback<Row>>();
         List<Row> rows = new ArrayList<Row>();
+        List<ReadCommand> commandsToRetry = Collections.emptyList();
+        List<ReadCommand> repairCommands = Collections.emptyList();
 
-        // send out read requests
-        for (ReadCommand command: commands)
+        do
         {
-            assert !command.isDigestQuery();
-            logger.debug("Command/ConsistencyLevel is {}/{}", command, consistency_level);
+            List<ReadCommand> commandsToSend = commandsToRetry.isEmpty() ? commands
: commandsToRetry;
 
-            List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.table,
command.key);
-            DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(),
endpoints);
+            if (!commandsToRetry.isEmpty())
+                logger.debug("Retrying {} commands", commandsToRetry.size());
 
-            RowDigestResolver resolver = new RowDigestResolver(command.table, command.key);
-            ReadCallback<Row> handler = getReadCallback(resolver, command, consistency_level,
endpoints);
-            handler.assureSufficientLiveNodes();
-            assert !handler.endpoints.isEmpty();
-
-            // The data-request message is sent to dataPoint, the node that will actually
get
-            // the data for us. The other replicas are only sent a digest query.
-            ReadCommand digestCommand = null;
-            if (handler.endpoints.size() > 1)
+            // send out read requests
+            for (ReadCommand command : commandsToSend)
             {
-                digestCommand = command.copy();
-                digestCommand.setDigestQuery(true);
-            }
+                assert !command.isDigestQuery();
+                logger.debug("Command/ConsistencyLevel is {}/{}", command, consistency_level);
 
-            InetAddress dataPoint = handler.endpoints.get(0);
-            if (dataPoint.equals(FBUtilities.getBroadcastAddress()))
-            {
-                logger.debug("reading data locally");
-                StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command,
handler));
-            }
-            else
-            {
-                logger.debug("reading data from {}", dataPoint);
-                MessagingService.instance().sendRR(command, dataPoint, handler);
-            }
+                List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.table,
+                                                                                        
     command.key);
+                DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(),
endpoints);
 
-            // We lazy-construct the digest Message object since it may not be necessary
if we
-            // are doing a local digest read, or no digest reads at all.
-            MessageProducer producer = new CachingMessageProducer(digestCommand);
-            for (InetAddress digestPoint : handler.endpoints.subList(1, handler.endpoints.size()))
-            {
-                if (digestPoint.equals(FBUtilities.getBroadcastAddress()))
+                RowDigestResolver resolver = new RowDigestResolver(command.table, command.key);
+                ReadCallback<Row> handler = getReadCallback(resolver, command, consistency_level,
endpoints);
+                handler.assureSufficientLiveNodes();
+                assert !handler.endpoints.isEmpty();
+
+                // The data-request message is sent to dataPoint, the node that will actually
get
+                // the data for us. The other replicas are only sent a digest query.
+                ReadCommand digestCommand = null;
+                if (handler.endpoints.size() > 1)
+                {
+                    digestCommand = command.copy();
+                    digestCommand.setDigestQuery(true);
+                }
+
+                InetAddress dataPoint = handler.endpoints.get(0);
+                if (dataPoint.equals(FBUtilities.getBroadcastAddress()))
                 {
-                    logger.debug("reading digest locally");
-                    StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand,
handler));
+                    logger.debug("reading data locally");
+                    StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command,
handler));
                 }
                 else
                 {
-                    logger.debug("reading digest from {}", digestPoint);
-                    MessagingService.instance().sendRR(producer, digestPoint, handler);
+                    logger.debug("reading data from {}", dataPoint);
+                    MessagingService.instance().sendRR(command, dataPoint, handler);
                 }
-            }
-
-            readCallbacks.add(handler);
-        }
 
-        // read results and make a second pass for any digest mismatches
-        List<RepairCallback<Row>> repairResponseHandlers = null;
-        for (int i = 0; i < commands.size(); i++)
-        {
-            ReadCallback<Row> handler = readCallbacks.get(i);
-            Row row;
-            ReadCommand command = commands.get(i);
-            try
-            {
-                long startTime2 = System.currentTimeMillis();
-                row = handler.get(); // CL.ONE is special cased here to ignore digests even
if some have arrived
-                if (row != null)
-                    rows.add(row);
+                // We lazy-construct the digest Message object since it may not be necessary
if we
+                // are doing a local digest read, or no digest reads at all.
+                MessageProducer producer = new CachingMessageProducer(digestCommand);
+                for (InetAddress digestPoint : handler.endpoints.subList(1, handler.endpoints.size()))
+                {
+                    if (digestPoint.equals(FBUtilities.getBroadcastAddress()))
+                    {
+                        logger.debug("reading digest locally");
+                        StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand,
handler));
+                    }
+                    else
+                    {
+                        logger.debug("reading digest from {}", digestPoint);
+                        MessagingService.instance().sendRR(producer, digestPoint, handler);
+                    }
+                }
 
-                if (logger.isDebugEnabled())
-                    logger.debug("Read: " + (System.currentTimeMillis() - startTime2) + "
ms.");
-            }
-            catch (TimeoutException ex)
-            {
-                if (logger.isDebugEnabled())
-                    logger.debug("Read timeout: {}", ex.toString());
-                throw ex;
+                readCallbacks.add(handler);
             }
-            catch (DigestMismatchException ex)
-            {
-                if (logger.isDebugEnabled())
-                    logger.debug("Digest mismatch: {}", ex.toString());
-                RowRepairResolver resolver = new RowRepairResolver(command.table, command.key);
-                RepairCallback<Row> repairHandler = new RepairCallback<Row>(resolver,
handler.endpoints);
-                for (InetAddress endpoint : handler.endpoints)
-                    MessagingService.instance().sendRR(command, endpoint, repairHandler);
 
-                if (repairResponseHandlers == null)
-                    repairResponseHandlers = new ArrayList<RepairCallback<Row>>();
-                repairResponseHandlers.add(repairHandler);
-            }
-        }
+            if (repairCommands != Collections.EMPTY_LIST)
+                repairCommands.clear();
 
-        // read the results for the digest mismatch retries
-        if (repairResponseHandlers != null)
-        {
-            for (RepairCallback<Row> handler : repairResponseHandlers)
+            // read results and make a second pass for any digest mismatches
+            List<RepairCallback<Row>> repairResponseHandlers = null;
+            for (int i = 0; i < commandsToSend.size(); i++)
             {
+                ReadCallback<Row> handler = readCallbacks.get(i);
+                Row row;
+                ReadCommand command = commands.get(i);
                 try
                 {
-                    Row row = handler.get();
+                    long startTime2 = System.currentTimeMillis();
+                    row = handler.get(); // CL.ONE is special cased here to ignore digests
even if some have arrived
                     if (row != null)
                         rows.add(row);
+
+                    if (logger.isDebugEnabled())
+                        logger.debug("Read: " + (System.currentTimeMillis() - startTime2)
+ " ms.");
                 }
-                catch (DigestMismatchException e)
+                catch (TimeoutException ex)
                 {
-                    throw new AssertionError(e); // full data requested from each node here,
no digests should be sent
+                    if (logger.isDebugEnabled())
+                        logger.debug("Read timeout: {}", ex.toString());
+                    throw ex;
+                }
+                catch (DigestMismatchException ex)
+                {
+                    if (logger.isDebugEnabled())
+                        logger.debug("Digest mismatch: {}", ex.toString());
+                    RowRepairResolver resolver = new RowRepairResolver(command.table, command.key);
+                    RepairCallback<Row> repairHandler = new RepairCallback<Row>(resolver,
handler.endpoints);
+
+                    if (repairCommands == Collections.EMPTY_LIST)
+                        repairCommands = new ArrayList<ReadCommand>();
+                    repairCommands.add(command);
+
+                    for (InetAddress endpoint : handler.endpoints)
+                        MessagingService.instance().sendRR(command, endpoint, repairHandler);
+
+                    if (repairResponseHandlers == null)
+                        repairResponseHandlers = new ArrayList<RepairCallback<Row>>();
+                    repairResponseHandlers.add(repairHandler);
                 }
             }
-        }
+
+            if (commandsToRetry != Collections.EMPTY_LIST)
+                commandsToRetry.clear();
+
+            // read the results for the digest mismatch retries
+            if (repairResponseHandlers != null)
+            {
+                for (int i = 0; i < repairCommands.size(); i++)
+                {
+                    ReadCommand command = repairCommands.get(i);
+                    RepairCallback<Row> handler = repairResponseHandlers.get(i);
+
+                    try
+                    {
+                        Row row = handler.get();
+
+                        if (command instanceof SliceFromReadCommand)
+                        {
+                            // short reads are only possible on SliceFromReadCommand
+                            SliceFromReadCommand sliceCommand = (SliceFromReadCommand)command;
+                            int maxLiveColumns = handler.getMaxLiveColumns();
+                            int liveColumnsInRow = row != null ? row.cf.getLiveColumnCount()
: 0;
+
+                            assert maxLiveColumns <= sliceCommand.count;
+                            if ((maxLiveColumns == sliceCommand.count) && (liveColumnsInRow
< sliceCommand.count))
+                            {
+                                if (logger.isDebugEnabled())
+                                    logger.debug("detected short read: expected {} columns,
but only resolved {} columns",
+                                                 sliceCommand.count, liveColumnsInRow);
+
+                                int retryCount = sliceCommand.count + sliceCommand.count
- liveColumnsInRow;
+                                SliceFromReadCommand retryCommand = new SliceFromReadCommand(command.table,
+                                                                                        
    command.key,
+                                                                                        
    command.queryPath,
+                                                                                        
    sliceCommand.start,
+                                                                                        
    sliceCommand.finish,
+                                                                                        
    sliceCommand.reversed,
+                                                                                        
    retryCount);
+                                if (commandsToRetry == Collections.EMPTY_LIST)
+                                    commandsToRetry = new ArrayList<ReadCommand>();
+                                commandsToRetry.add(retryCommand);
+                            }
+                            else if (row != null)
+                                rows.add(row);
+                        }
+                        else if (row != null)
+                            rows.add(row);
+                    }
+                    catch (DigestMismatchException e)
+                    {
+                        throw new AssertionError(e); // full data requested from each node
here, no digests should be sent
+                    }
+                }
+            }
+        } while (!commandsToRetry.isEmpty());
 
         return rows;
     }



Mime
View raw message