cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r1128074 - in /cassandra/branches/cassandra-0.7: CHANGES.txt src/java/org/apache/cassandra/db/migration/Migration.java src/java/org/apache/cassandra/service/MigrationManager.java
Date Thu, 26 May 2011 20:47:16 GMT
Author: jbellis
Date: Thu May 26 20:47:16 2011
New Revision: 1128074

URL: http://svn.apache.org/viewvc?rev=1128074&view=rev
Log:
throttle migration replay
patch by jbellis; reviewed by gdusbabek for CASSANDRA-2714

Modified:
    cassandra/branches/cassandra-0.7/CHANGES.txt
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/MigrationManager.java

Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1128074&r1=1128073&r2=1128074&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Thu May 26 20:47:16 2011
@@ -10,6 +10,7 @@
  * remove no-op HHOM.renameHints (CASSANDRA-2693)
  * clone super columns to avoid modifying them during flush (CASSANDRA-2675)
  * close scrub file handles (CASSANDRA-2669)
+ * throttle migration replay (CASSANDRA-2714)
 
 
 0.7.6

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java?rev=1128074&r1=1128073&r2=1128074&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java Thu May 26 20:47:16 2011
@@ -298,7 +298,12 @@ public abstract class Migration
         DecoratedKey dkey = StorageService.getPartitioner().decorateKey(MIGRATIONS_KEY);
         Table defs = Table.open(Table.SYSTEM_TABLE);
         ColumnFamilyStore cfStore = defs.getColumnFamilyStore(Migration.MIGRATIONS_CF);
-        QueryFilter filter = QueryFilter.getSliceFilter(dkey, new QueryPath(MIGRATIONS_CF), ByteBuffer.wrap(UUIDGen.decompose(start)), ByteBuffer.wrap(UUIDGen.decompose(end)), false, 1000);   
+        QueryFilter filter = QueryFilter.getSliceFilter(dkey,
+                                                        new QueryPath(MIGRATIONS_CF),
+                                                        ByteBuffer.wrap(UUIDGen.decompose(start)),
+                                                        ByteBuffer.wrap(UUIDGen.decompose(end)),
+                                                        false,
+                                                        100);
         ColumnFamily cf = cfStore.getColumnFamily(filter);
         return cf.getSortedColumns();
     }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/MigrationManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/MigrationManager.java?rev=1128074&r1=1128073&r2=1128074&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/MigrationManager.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/MigrationManager.java Thu May 26 20:47:16 2011
@@ -24,8 +24,10 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
-import org.apache.cassandra.utils.ByteBufferUtil;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.MapMaker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,16 +37,21 @@ import org.apache.cassandra.config.Confi
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Column;
 import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.marshal.TimeUUIDType;
 import org.apache.cassandra.db.migration.Migration;
 import org.apache.cassandra.gms.*;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
 public class MigrationManager implements IEndpointStateChangeSubscriber
 {
     private static final Logger logger = LoggerFactory.getLogger(MigrationManager.class);
-    
+
+    // avoids re-pushing migrations that we're waiting on target to apply already
+    private static Map<InetAddress,UUID> lastPushed = new MapMaker().expiration(1, TimeUnit.MINUTES).makeMap();
+
     /** I'm not going to act here. */
     public void onJoin(InetAddress endpoint, EndpointState epState) { }
 
@@ -87,8 +94,16 @@ public class MigrationManager implements
         }
         else if (!StorageService.instance.isClientMode())
         {
-            logger.debug("Their data definitions are old. Sending updates since {}", theirVersion.toString());
-            pushMigrations(theirVersion, myVersion, endpoint);
+            if (lastPushed.get(endpoint) == null || theirVersion.timestamp() >= lastPushed.get(endpoint).timestamp())
+            {
+                logger.debug("Schema on {} is old. Sending updates since {}", endpoint, theirVersion);
+                pushMigrations(theirVersion, myVersion, endpoint);
+            }
+            else
+            {
+                logger.debug("Waiting for {} to process migrations up to {} before sending more",
+                             endpoint, lastPushed.get(endpoint));
+            }
         }
     }
 
@@ -172,6 +187,7 @@ public class MigrationManager implements
         {
             Message msg = makeMigrationMessage(migrations);
             MessagingService.instance().sendOneWay(msg, host);
+            lastPushed.put(host, TimeUUIDType.instance.compose(Iterables.getLast(migrations).name()));
         }
         catch (IOException ex)
         {



Mime
View raw message