cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject [1/2] git commit: adjust blockFor calculation to account for pending ranges due to node movement patch by jbellis and slebresne for CASSANDRA-833
Date Fri, 21 Sep 2012 15:13:21 GMT
Updated Branches:
  refs/heads/trunk f92fb2241 -> dc97a8ff5


adjust blockFor calculation to account for pending ranges due to node movement
patch by jbellis and slebresne for CASSANDRA-833


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dc97a8ff
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dc97a8ff
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dc97a8ff

Branch: refs/heads/trunk
Commit: dc97a8ff524ce9fa56e3dc0932d9074006495596
Parents: 84c0657
Author: Jonathan Ellis <jbellis@apache.org>
Authored: Thu Sep 20 17:02:52 2012 -0500
Committer: Jonathan Ellis <jbellis@apache.org>
Committed: Fri Sep 21 10:11:29 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    6 +-
 .../org/apache/cassandra/db/BatchlogManager.java   |    8 ++-
 .../org/apache/cassandra/db/CounterColumn.java     |   10 ++-
 .../locator/AbstractReplicationStrategy.java       |   14 ++--
 .../apache/cassandra/locator/TokenMetadata.java    |   29 +++----
 .../service/AbstractWriteResponseHandler.java      |   17 +++-
 .../DatacenterSyncWriteResponseHandler.java        |   14 ++--
 .../service/DatacenterWriteResponseHandler.java    |   14 ++--
 .../org/apache/cassandra/service/StorageProxy.java |   65 +++++++--------
 .../cassandra/service/WriteResponseHandler.java    |   28 +++----
 10 files changed, 108 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc97a8ff/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e213c7f..f77a889 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,7 @@
+1.2-beta2
+ * adjust blockFor calculation to account for pending ranges due to node 
+   movement (CASSANDRA-833)
+
 1.2-beta1
  * add atomic_batch_mutate (CASSANDRA-4542, -4635)
  * increase default max_hint_window_in_ms to 3h (CASSANDRA-4632)
@@ -114,6 +118,7 @@ Merged from 1.0:
  * Fix multiple values for CurrentLocal NodeID (CASSANDRA-4626)
 
 
+1.1.3
  * munmap commitlog segments before rename (CASSANDRA-4337)
  * (JMX) rename getRangeKeySample to sampleKeyRange to avoid returning
    multi-MB results as an attribute (CASSANDRA-4452)
@@ -1179,7 +1184,6 @@ Merged from 0.8:
  * workaround large resultsets causing large allocation retention
    by nio sockets (CASSANDRA-2654)
  * fix nodetool ring use with Ec2Snitch (CASSANDRA-2733)
- * fix inconsistency window during bootstrap (CASSANDRA-833)
  * fix removing columns and subcolumns that are supressed by a row or
    supercolumn tombstone during replica resolution (CASSANDRA-2590)
  * support sstable2json against snapshot sstables (CASSANDRA-2386)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc97a8ff/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index ded1ca4..f732ece 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -29,6 +29,7 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import com.google.common.collect.ImmutableSortedSet;
+import com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,6 +45,7 @@ import org.apache.cassandra.db.marshal.UUIDType;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.util.FastByteArrayOutputStream;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageProxy;
@@ -221,7 +223,11 @@ public class BatchlogManager implements BatchlogManagerMBean
 
     private static void writeHintsForMutation(RowMutation mutation) throws IOException
     {
-        for (InetAddress target : StorageProxy.getWriteEndpoints(mutation.getTable(), mutation.key()))
+        String table = mutation.getTable();
+        Token tk = StorageService.getPartitioner().getToken(mutation.key());
+        List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(table, tk);
+        Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, table);
+        for (InetAddress target : Iterables.concat(naturalEndpoints, pendingEndpoints))
         {
             if (target.equals(FBUtilities.getBroadcastAddress()))
                 mutation.apply();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc97a8ff/src/java/org/apache/cassandra/db/CounterColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterColumn.java b/src/java/org/apache/cassandra/db/CounterColumn.java
index 33f391e..31e9c36 100644
--- a/src/java/org/apache/cassandra/db/CounterColumn.java
+++ b/src/java/org/apache/cassandra/db/CounterColumn.java
@@ -21,8 +21,10 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.net.InetAddress;
 import java.security.MessageDigest;
-import java.util.Collection;
+import java.util.Set;
 
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -366,14 +368,14 @@ public class CounterColumn extends Column
 
         StorageProxy.performWrite(rm, ConsistencyLevel.ANY, localDataCenter, new StorageProxy.WritePerformer()
         {
-            public void apply(IMutation mutation, Collection<InetAddress> targets, AbstractWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level)
+            public void apply(IMutation mutation, Iterable<InetAddress> targets, AbstractWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level)
             throws IOException, OverloadedException
             {
                 // We should only send to the remote replica, not the local one
-                targets.remove(local);
+                Set<InetAddress> remotes = Sets.difference(ImmutableSet.copyOf(targets), ImmutableSet.of(local));
                 // Fake local response to be a good lad but we won't wait on the responseHandler
                 responseHandler.response(null);
-                StorageProxy.sendToHintedEndpoints((RowMutation) mutation, targets, responseHandler, localDataCenter, consistency_level);
+                StorageProxy.sendToHintedEndpoints((RowMutation) mutation, remotes, responseHandler, localDataCenter, consistency_level);
             }
         }, null);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc97a8ff/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index 2a68207..630aac4 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@ -26,15 +26,15 @@ import com.google.common.collect.Multimap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.RingPosition;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.service.AbstractWriteResponseHandler;
 import org.apache.cassandra.service.DatacenterSyncWriteResponseHandler;
 import org.apache.cassandra.service.DatacenterWriteResponseHandler;
-import org.apache.cassandra.service.AbstractWriteResponseHandler;
 import org.apache.cassandra.service.WriteResponseHandler;
-import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.utils.FBUtilities;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
@@ -114,18 +114,18 @@ public abstract class AbstractReplicationStrategy
      */
     public abstract List<InetAddress> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata);
 
-    public AbstractWriteResponseHandler getWriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistency_level, Runnable callback)
+    public AbstractWriteResponseHandler getWriteResponseHandler(Collection<InetAddress> naturalEndpoints, Collection<InetAddress> pendingEndpoints, ConsistencyLevel consistency_level, Runnable callback)
     {
         if (consistency_level == ConsistencyLevel.LOCAL_QUORUM)
         {
             // block for in this context will be localnodes block.
-            return DatacenterWriteResponseHandler.create(writeEndpoints, consistency_level, table, callback);
+            return DatacenterWriteResponseHandler.create(naturalEndpoints, pendingEndpoints, consistency_level, table, callback);
         }
         else if (consistency_level == ConsistencyLevel.EACH_QUORUM)
         {
-            return DatacenterSyncWriteResponseHandler.create(writeEndpoints, consistency_level, table, callback);
+            return DatacenterSyncWriteResponseHandler.create(naturalEndpoints, pendingEndpoints, consistency_level, table, callback);
         }
-        return WriteResponseHandler.create(writeEndpoints, consistency_level, table, callback);
+        return WriteResponseHandler.create(naturalEndpoints, pendingEndpoints, consistency_level, table, callback);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc97a8ff/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index 5a38ef4..cee920c 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -917,35 +917,32 @@ public class TokenMetadata
         subscribers.remove(subscriber);
     }
 
-    /**
-     * write endpoints may be different from read endpoints, because read endpoints only need care about the
-     * "natural" nodes for a token, but write endpoints also need to account for nodes that are bootstrapping
-     * into the ring, and write data there too so that they stay up to date during the bootstrap process.
-     * Thus, this method may return more nodes than the Replication Factor.
-     *
-     * If possible, will return the same collection it was passed, for efficiency.
-     *
-     * Only ReplicationStrategy should care about this method (higher level users should only ask for Hinted).
-     */
-    public Collection<InetAddress> getWriteEndpoints(Token token, String table, Collection<InetAddress> naturalEndpoints)
+    public Collection<InetAddress> pendingEndpointsFor(Token token, String table)
     {
         Map<Range<Token>, Collection<InetAddress>> ranges = getPendingRanges(table);
         if (ranges.isEmpty())
-            return naturalEndpoints;
-
-        Set<InetAddress> endpoints = new HashSet<InetAddress>(naturalEndpoints);
+            return Collections.emptyList();
 
+        Set<InetAddress> endpoints = new HashSet<InetAddress>();
         for (Map.Entry<Range<Token>, Collection<InetAddress>> entry : ranges.entrySet())
         {
             if (entry.getKey().contains(token))
-            {
                 endpoints.addAll(entry.getValue());
-            }
         }
 
         return endpoints;
     }
 
+    /**
+     * @Deprecated; retained for benefit of old tests
+     */
+    public Collection<InetAddress> getWriteEndpoints(Token token, String table, Collection<InetAddress> naturalEndpoints)
+    {
+        ArrayList<InetAddress> endpoints = new ArrayList<InetAddress>();
+        Iterables.addAll(endpoints, Iterables.concat(naturalEndpoints, pendingEndpointsFor(token, table)));
+        return endpoints;
+    }
+
     /** @return an endpoint to token multimap representation of tokenToEndpointMap (a copy) */
     public Multimap<InetAddress, Token> getEndpointToTokenMapForReading()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc97a8ff/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index 3fc5b7b..76eeb0d 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -33,19 +33,21 @@ public abstract class AbstractWriteResponseHandler implements IAsyncCallback
 {
     private final SimpleCondition condition = new SimpleCondition();
     protected final long startTime;
-    protected final Collection<InetAddress> writeEndpoints;
+    protected final Collection<InetAddress> naturalEndpoints;
     protected final ConsistencyLevel consistencyLevel;
     protected final Runnable callback;
+    protected final Collection<InetAddress> pendingEndpoints;
 
     /**
+     * @param pendingEndpoints
      * @param callback A callback to be called when the write is successful.
-     * Note that this callback will *not* be called in case of an exception (timeout or unavailable).
      */
-    protected AbstractWriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, Runnable callback)
+    protected AbstractWriteResponseHandler(Collection<InetAddress> naturalEndpoints, Collection<InetAddress> pendingEndpoints, ConsistencyLevel consistencyLevel, Runnable callback)
     {
+        this.pendingEndpoints = pendingEndpoints;
         startTime = System.currentTimeMillis();
         this.consistencyLevel = consistencyLevel;
-        this.writeEndpoints = writeEndpoints;
+        this.naturalEndpoints = naturalEndpoints;
         this.callback = callback;
     }
 
@@ -69,7 +71,12 @@ public abstract class AbstractWriteResponseHandler implements IAsyncCallback
 
     protected abstract int ackCount();
 
-    protected abstract int blockFor();
+    protected int blockFor()
+    {
+        return blockForCL() + pendingEndpoints.size();
+    }
+
+    protected abstract int blockForCL();
 
     /** null message means "response from local write" */
     public abstract void response(MessageIn msg);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc97a8ff/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
index 08b9c48..ffee975 100644
--- a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
@@ -23,6 +23,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.collect.Iterables;
+
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.exceptions.UnavailableException;
@@ -50,10 +52,10 @@ public class DatacenterSyncWriteResponseHandler extends AbstractWriteResponseHan
     private final NetworkTopologyStrategy strategy;
     private final HashMap<String, AtomicInteger> responses = new HashMap<String, AtomicInteger>();
 
-    protected DatacenterSyncWriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table, Runnable callback)
+    protected DatacenterSyncWriteResponseHandler(Collection<InetAddress> naturalEndpoints, Collection<InetAddress> pendingEndpoints, ConsistencyLevel consistencyLevel, String table, Runnable callback)
     {
         // Response is been managed by the map so make it 1 for the superclass.
-        super(writeEndpoints, consistencyLevel, callback);
+        super(naturalEndpoints, pendingEndpoints, consistencyLevel, callback);
         assert consistencyLevel == ConsistencyLevel.EACH_QUORUM;
 
         this.table = table;
@@ -66,9 +68,9 @@ public class DatacenterSyncWriteResponseHandler extends AbstractWriteResponseHan
         }
     }
 
-    public static AbstractWriteResponseHandler create(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table, Runnable callback)
+    public static AbstractWriteResponseHandler create(Collection<InetAddress> naturalEndpoints, Collection<InetAddress> pendingEndpoints, ConsistencyLevel consistencyLevel, String table, Runnable callback)
     {
-        return new DatacenterSyncWriteResponseHandler(writeEndpoints, consistencyLevel, table, callback);
+        return new DatacenterSyncWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistencyLevel, table, callback);
     }
 
     public void response(MessageIn message)
@@ -89,7 +91,7 @@ public class DatacenterSyncWriteResponseHandler extends AbstractWriteResponseHan
         signal();
     }
 
-    protected int blockFor()
+    protected int blockForCL()
     {
         return consistencyLevel.blockFor(table);
     }
@@ -112,7 +114,7 @@ public class DatacenterSyncWriteResponseHandler extends AbstractWriteResponseHan
         for (String dc: strategy.getDatacenters())
             dcEndpoints.put(dc, new AtomicInteger());
 
-        for (InetAddress destination : writeEndpoints)
+        for (InetAddress destination : Iterables.concat(naturalEndpoints, pendingEndpoints))
         {
             if (FailureDetector.instance.isAlive(destination))
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc97a8ff/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
index ad984a1..14e2b5f 100644
--- a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
@@ -20,12 +20,12 @@ package org.apache.cassandra.service;
 import java.net.InetAddress;
 import java.util.Collection;
 
+import com.google.common.collect.Iterables;
+
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.Table;
 import org.apache.cassandra.exceptions.UnavailableException;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.locator.IEndpointSnitch;
-import org.apache.cassandra.locator.NetworkTopologyStrategy;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.utils.FBUtilities;
@@ -43,15 +43,15 @@ public class DatacenterWriteResponseHandler extends WriteResponseHandler
         localdc = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
     }
 
-    protected DatacenterWriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table, Runnable callback)
+    protected DatacenterWriteResponseHandler(Collection<InetAddress> naturalEndpoints, Collection<InetAddress> pendingEndpoints, ConsistencyLevel consistencyLevel, String table, Runnable callback)
     {
-        super(writeEndpoints, consistencyLevel, table, callback);
+        super(naturalEndpoints, pendingEndpoints, consistencyLevel, table, callback);
         assert consistencyLevel == ConsistencyLevel.LOCAL_QUORUM;
     }
 
-    public static AbstractWriteResponseHandler create(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table, Runnable callback)
+    public static AbstractWriteResponseHandler create(Collection<InetAddress> writeEndpoints, Collection<InetAddress> pendingEndpoints, ConsistencyLevel consistencyLevel, String table, Runnable callback)
     {
-        return new DatacenterWriteResponseHandler(writeEndpoints, consistencyLevel, table, callback);
+        return new DatacenterWriteResponseHandler(writeEndpoints, pendingEndpoints, consistencyLevel, table, callback);
     }
 
     @Override
@@ -68,7 +68,7 @@ public class DatacenterWriteResponseHandler extends WriteResponseHandler
     public void assureSufficientLiveNodes() throws UnavailableException
     {
         int liveNodes = 0;
-        for (InetAddress destination : writeEndpoints)
+        for (InetAddress destination : Iterables.concat(naturalEndpoints, pendingEndpoints))
         {
             if (localdc.equals(snitch.getDatacenter(destination)) && FailureDetector.instance.isAlive(destination))
                 liveNodes++;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc97a8ff/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 8f048ee..6834987 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -106,7 +106,7 @@ public class StorageProxy implements StorageProxyMBean
         standardWritePerformer = new WritePerformer()
         {
             public void apply(IMutation mutation,
-                              Collection<InetAddress> targets,
+                              Iterable<InetAddress> targets,
                               AbstractWriteResponseHandler responseHandler,
                               String localDataCenter,
                               ConsistencyLevel consistency_level)
@@ -126,7 +126,7 @@ public class StorageProxy implements StorageProxyMBean
         counterWritePerformer = new WritePerformer()
         {
             public void apply(IMutation mutation,
-                              Collection<InetAddress> targets,
+                              Iterable<InetAddress> targets,
                               AbstractWriteResponseHandler responseHandler,
                               String localDataCenter,
                               ConsistencyLevel consistency_level)
@@ -143,7 +143,7 @@ public class StorageProxy implements StorageProxyMBean
         counterWriteOnCoordinatorPerformer = new WritePerformer()
         {
             public void apply(IMutation mutation,
-                              Collection<InetAddress> targets,
+                              Iterable<InetAddress> targets,
                               AbstractWriteResponseHandler responseHandler,
                               String localDataCenter,
                               ConsistencyLevel consistency_level)
@@ -268,7 +268,7 @@ public class StorageProxy implements StorageProxyMBean
             // write to the batchlog
             Collection<InetAddress> batchlogEndpoints = getBatchlogEndpoints(localDataCenter);
             UUID batchUUID = UUID.randomUUID();
-            syncWriteToBatchlog(mutations, localDataCenter, batchlogEndpoints, batchUUID);
+            syncWriteToBatchlog(mutations, batchlogEndpoints, batchUUID);
 
             // now actually perform the writes and wait for them to complete
             syncWriteBatchedMutations(wrappers, localDataCenter, consistency_level);
@@ -295,13 +295,12 @@ public class StorageProxy implements StorageProxyMBean
     }
 
     private static void syncWriteToBatchlog(List<RowMutation> mutations,
-                                            String localDataCenter,
                                             Collection<InetAddress> endpoints,
                                             UUID uuid)
     throws WriteTimeoutException
     {
         RowMutation rm = BatchlogManager.getBatchlogMutationFor(mutations, uuid);
-        AbstractWriteResponseHandler handler = WriteResponseHandler.create(endpoints, ConsistencyLevel.ONE, Table.SYSTEM_KS, null);
+        AbstractWriteResponseHandler handler = WriteResponseHandler.create(endpoints, Collections.<InetAddress>emptyList(), ConsistencyLevel.ONE, Table.SYSTEM_KS, null);
 
         try
         {
@@ -326,7 +325,7 @@ public class StorageProxy implements StorageProxyMBean
     {
         RowMutation rm = new RowMutation(Table.SYSTEM_KS, UUIDType.instance.decompose(uuid));
         rm.delete(new QueryPath(SystemTable.BATCHLOG_CF), FBUtilities.timestampMicros());
-        AbstractWriteResponseHandler handler = WriteResponseHandler.create(endpoints, ConsistencyLevel.ANY, Table.SYSTEM_KS, null);
+        AbstractWriteResponseHandler handler = WriteResponseHandler.create(endpoints, Collections.<InetAddress>emptyList(), ConsistencyLevel.ANY, Table.SYSTEM_KS, null);
 
         try
         {
@@ -347,7 +346,8 @@ public class StorageProxy implements StorageProxyMBean
         {
             try
             {
-                sendToHintedEndpoints(wrapper.mutation, wrapper.endpoints, wrapper.handler, localDataCenter, consistencyLevel);
+                Iterable<InetAddress> endpoints = Iterables.concat(wrapper.handler.naturalEndpoints, wrapper.handler.pendingEndpoints);
+                sendToHintedEndpoints(wrapper.mutation, endpoints, wrapper.handler, localDataCenter, consistencyLevel);
             }
             catch (IOException e)
             {
@@ -399,14 +399,16 @@ public class StorageProxy implements StorageProxyMBean
         String table = mutation.getTable();
         AbstractReplicationStrategy rs = Table.open(table).getReplicationStrategy();
 
-        Collection<InetAddress> writeEndpoints = getWriteEndpoints(table, mutation.key());
+        Token tk = StorageService.getPartitioner().getToken(mutation.key());
+        List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(table, tk);
+        Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, table);
 
-        AbstractWriteResponseHandler responseHandler = rs.getWriteResponseHandler(writeEndpoints, consistency_level, callback);
+        AbstractWriteResponseHandler responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, callback);
 
         // exit early if we can't fulfill the CL at this time
         responseHandler.assureSufficientLiveNodes();
 
-        performer.apply(mutation, writeEndpoints, responseHandler, localDataCenter, consistency_level);
+        performer.apply(mutation, Iterables.concat(naturalEndpoints, pendingEndpoints), responseHandler, localDataCenter, consistency_level);
         return responseHandler;
     }
 
@@ -414,9 +416,12 @@ public class StorageProxy implements StorageProxyMBean
     private static WriteResponseHandlerWrapper wrapResponseHandler(RowMutation mutation, ConsistencyLevel consistency_level)
     {
         AbstractReplicationStrategy rs = Table.open(mutation.getTable()).getReplicationStrategy();
-        Collection<InetAddress> writeEndpoints = getWriteEndpoints(mutation.getTable(), mutation.key());
-        AbstractWriteResponseHandler responseHandler = rs.getWriteResponseHandler(writeEndpoints, consistency_level, null);
-        return new WriteResponseHandlerWrapper(responseHandler, mutation, writeEndpoints);
+        String table = mutation.getTable();
+        Token tk = StorageService.getPartitioner().getToken(mutation.key());
+        List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(table, tk);
+        Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, table);
+        AbstractWriteResponseHandler responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, null);
+        return new WriteResponseHandlerWrapper(responseHandler, mutation);
     }
 
     // used by atomic_batch_mutate to decouple availability check from the write itself, caches consistency level and endpoints.
@@ -424,24 +429,14 @@ public class StorageProxy implements StorageProxyMBean
     {
         final AbstractWriteResponseHandler handler;
         final RowMutation mutation;
-        final Collection<InetAddress> endpoints;
 
-        WriteResponseHandlerWrapper(AbstractWriteResponseHandler handler, RowMutation mutation, Collection<InetAddress> endpoints)
+        WriteResponseHandlerWrapper(AbstractWriteResponseHandler handler, RowMutation mutation)
         {
             this.handler = handler;
             this.mutation = mutation;
-            this.endpoints = endpoints;
         }
     }
 
-    public static Collection<InetAddress> getWriteEndpoints(String table, ByteBuffer key)
-    {
-        StorageService ss = StorageService.instance;
-        Token tk = StorageService.getPartitioner().getToken(key);
-        List<InetAddress> naturalEndpoints = ss.getNaturalEndpoints(table, tk);
-        return ss.getTokenMetadata().getWriteEndpoints(tk, table, naturalEndpoints);
-    }
-
     /*
      * Replicas are picked manually:
      * - replicas should be alive according to the failure detector
@@ -498,14 +493,14 @@ public class StorageProxy implements StorageProxyMBean
      * @throws TimeoutException if the hints cannot be written/enqueued
      */
     public static void sendToHintedEndpoints(final RowMutation rm,
-                                             Collection<InetAddress> targets,
+                                             Iterable<InetAddress> targets,
                                              AbstractWriteResponseHandler responseHandler,
                                              String localDataCenter,
                                              ConsistencyLevel consistency_level)
     throws IOException, OverloadedException
     {
         // Multimap that holds onto all the messages and addresses meant for a specific datacenter
-        Map<String, Multimap<MessageOut, InetAddress>> dcMessages = new HashMap<String, Multimap<MessageOut, InetAddress>>(targets.size());
+        Map<String, Multimap<MessageOut, InetAddress>> dcMessages = new HashMap<String, Multimap<MessageOut, InetAddress>>();
 
         for (InetAddress destination : targets)
         {
@@ -709,9 +704,11 @@ public class StorageProxy implements StorageProxyMBean
             // Exit now if we can't fulfill the CL here instead of forwarding to the leader replica
             String table = cm.getTable();
             AbstractReplicationStrategy rs = Table.open(table).getReplicationStrategy();
-            Collection<InetAddress> writeEndpoints = getWriteEndpoints(table, cm.key());
+            Token tk = StorageService.getPartitioner().getToken(cm.key());
+            List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(table, tk);
+            Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, table);
 
-            rs.getWriteResponseHandler(writeEndpoints, cm.consistency(), null).assureSufficientLiveNodes();
+            rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, cm.consistency(), null).assureSufficientLiveNodes();
 
             // Forward the actual update to the chosen leader replica
             AbstractWriteResponseHandler responseHandler = WriteResponseHandler.create(endpoint);
@@ -776,7 +773,7 @@ public class StorageProxy implements StorageProxyMBean
     }
 
     private static Runnable counterWriteTask(final IMutation mutation,
-                                             final Collection<InetAddress> targets,
+                                             final Iterable<InetAddress> targets,
                                              final AbstractWriteResponseHandler responseHandler,
                                              final String localDataCenter,
                                              final ConsistencyLevel consistency_level)
@@ -793,8 +790,8 @@ public class StorageProxy implements StorageProxyMBean
                 responseHandler.response(null);
 
                 // then send to replicas, if any
-                targets.remove(FBUtilities.getBroadcastAddress());
-                if (cm.shouldReplicateOnWrite() && !targets.isEmpty())
+                Set<InetAddress> remotes = Sets.difference(ImmutableSet.copyOf(targets),
ImmutableSet.of(FBUtilities.getBroadcastAddress()));
+                if (cm.shouldReplicateOnWrite() && !remotes.isEmpty())
                 {
                     // We do the replication on another stage because it involves a read
(see CM.makeReplicationMutation)
                     // and we want to avoid blocking too much the MUTATION stage
@@ -803,7 +800,7 @@ public class StorageProxy implements StorageProxyMBean
                         public void runMayThrow() throws IOException, OverloadedException
                         {
                             // send mutation to other replica
-                            sendToHintedEndpoints(cm.makeReplicationMutation(), targets,
responseHandler, localDataCenter, consistency_level);
+                            sendToHintedEndpoints(cm.makeReplicationMutation(), remotes,
responseHandler, localDataCenter, consistency_level);
                         }
                     });
                 }
@@ -1478,7 +1475,7 @@ public class StorageProxy implements StorageProxyMBean
 
     public interface WritePerformer
     {
-        public void apply(IMutation mutation, Collection<InetAddress> targets, AbstractWriteResponseHandler
responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws IOException,
OverloadedException;
+        public void apply(IMutation mutation, Iterable<InetAddress> targets, AbstractWriteResponseHandler
responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws IOException,
OverloadedException;
     }
 
     private static abstract class DroppableRunnable implements Runnable

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc97a8ff/src/java/org/apache/cassandra/service/WriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/WriteResponseHandler.java b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
index 0d9e116..e61a25f 100644
--- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
@@ -20,17 +20,17 @@ package org.apache.cassandra.service;
 import java.net.InetAddress;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.db.Table;
 import org.apache.cassandra.exceptions.UnavailableException;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.utils.FBUtilities;
 
 /**
  * Handles blocking writes for ONE, ANY, TWO, THREE, QUORUM, and ALL consistency levels.
@@ -42,23 +42,23 @@ public class WriteResponseHandler extends AbstractWriteResponseHandler
     protected final AtomicInteger responses;
     private final int blockFor;
 
-    protected WriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel
consistencyLevel, String table, Runnable callback)
+    protected WriteResponseHandler(Collection<InetAddress> writeEndpoints, Collection<InetAddress>
pendingEndpoints, ConsistencyLevel consistencyLevel, String table, Runnable callback)
     {
-        super(writeEndpoints, consistencyLevel, callback);
+        super(writeEndpoints, pendingEndpoints, consistencyLevel, callback);
         blockFor = consistencyLevel.blockFor(table);
         responses = new AtomicInteger(blockFor);
     }
 
     protected WriteResponseHandler(InetAddress endpoint)
     {
-        super(Arrays.asList(endpoint), ConsistencyLevel.ALL, null);
+        super(Arrays.asList(endpoint), Collections.<InetAddress>emptyList(), ConsistencyLevel.ALL,
null);
         blockFor = 1;
         responses = new AtomicInteger(1);
     }
 
-    public static AbstractWriteResponseHandler create(Collection<InetAddress> writeEndpoints,
ConsistencyLevel consistencyLevel, String table, Runnable callback)
+    public static AbstractWriteResponseHandler create(Collection<InetAddress> writeEndpoints,
Collection<InetAddress> pendingEndpoints, ConsistencyLevel consistencyLevel, String
table, Runnable callback)
     {
-        return new WriteResponseHandler(writeEndpoints, consistencyLevel, table, callback);
+        return new WriteResponseHandler(writeEndpoints, pendingEndpoints, consistencyLevel,
table, callback);
     }
 
     public static AbstractWriteResponseHandler create(InetAddress endpoint)
@@ -77,7 +77,7 @@ public class WriteResponseHandler extends AbstractWriteResponseHandler
         return blockFor - responses.get();
     }
 
-    protected int blockFor()
+    protected int blockForCL()
     {
         return blockFor;
     }
@@ -86,24 +86,20 @@ public class WriteResponseHandler extends AbstractWriteResponseHandler
     {
         if (consistencyLevel == ConsistencyLevel.ANY)
         {
-            // Ensure there are blockFor distinct living nodes (hints (local) are ok).
-            // Thus we include the local node (coordinator) as a valid replica if it is there
already.
-            int effectiveEndpoints = writeEndpoints.contains(FBUtilities.getBroadcastAddress())
? writeEndpoints.size() : writeEndpoints.size() + 1;
-            if (effectiveEndpoints < responses.get())
-                throw new UnavailableException(consistencyLevel, responses.get(), effectiveEndpoints);
+            // local hint is acceptable, and local node is always live
             return;
         }
 
         // count destinations that are part of the desired target set
         int liveNodes = 0;
-        for (InetAddress destination : writeEndpoints)
+        for (InetAddress destination : Iterables.concat(naturalEndpoints, pendingEndpoints))
         {
             if (FailureDetector.instance.isAlive(destination))
                 liveNodes++;
         }
-        if (liveNodes < responses.get())
+        if (liveNodes < blockFor)
         {
-            throw new UnavailableException(consistencyLevel, responses.get(), liveNodes);
+            throw new UnavailableException(consistencyLevel, blockFor, liveNodes);
         }
     }
 


Mime
View raw message