cassandra-commits mailing list archives

From gdusba...@apache.org
Subject svn commit: r931198 [1/2] - in /cassandra/trunk: src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/migration/ src/java/org/apache/cassandra/service/ src/java/org/apache/cassandra/utils/ test/unit/o...
Date Tue, 06 Apr 2010 16:01:32 GMT
Author: gdusbabek
Date: Tue Apr  6 16:01:31 2010
New Revision: 931198

URL: http://svn.apache.org/viewvc?rev=931198&view=rev
Log:
Refactor so meta mutations can be serialized and moved around.  Patch by Gary Dusbabek, reviewed by Jonathan Ellis. CASSANDRA-827
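
For orientation, a minimal sketch of how the migration classes added below are meant to be driven: a schema change becomes a Migration object whose constructor validates the request and builds the definition mutation, apply() persists and applies it locally, and announce() gossips the new schema version to the rest of the cluster. The wrapper class and method here are hypothetical; only the Migration API itself comes from this patch.

    import org.apache.cassandra.config.CFMetaData;
    import org.apache.cassandra.db.migration.AddColumnFamily;
    import org.apache.cassandra.db.migration.Migration;

    public class SchemaUpdateSketch
    {
        // Hypothetical call site, for illustration only.
        public static void addColumnFamily(CFMetaData cfm) throws Exception
        {
            // the constructor verifies the keyspace exists and the CF does not,
            // and builds the RowMutation against the definitions table
            Migration migration = new AddColumnFamily(cfm);
            migration.apply();    // write schema + migration rows, flush, then update the in-memory models
            migration.announce(); // notify live nodes (and, via gossip state, later arrivals) of the new version
        }
    }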

Added:
    cassandra/trunk/src/java/org/apache/cassandra/db/migration/
    cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java
    cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddKeyspace.java
    cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java
    cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropKeyspace.java
    cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java
    cassandra/trunk/src/java/org/apache/cassandra/db/migration/RenameColumnFamily.java
    cassandra/trunk/src/java/org/apache/cassandra/db/migration/RenameKeyspace.java
    cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java
Modified:
    cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
    cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    cassandra/trunk/src/java/org/apache/cassandra/config/KSMetaData.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Column.java
    cassandra/trunk/src/java/org/apache/cassandra/db/DefsTable.java
    cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
    cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/UUIDGen.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=931198&r1=931197&r2=931198&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Tue Apr  6 16:01:31 2010
@@ -115,7 +115,6 @@ public final class CFMetaData
     /** clones existing CFMetaData. keeps the id but changes the table name.*/
     public static CFMetaData renameTable(CFMetaData cfm, String tableName)
     {
-        purge(cfm);
         return new CFMetaData(tableName, cfm.cfName, cfm.columnType, cfm.comparator, cfm.subcolumnComparator, cfm.comment, cfm.rowCacheSize, cfm.keyCacheSize, cfm.readRepairChance, cfm.cfId);
     }
     

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=931198&r1=931197&r2=931198&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Tue Apr  6 16:01:31 2010
@@ -26,6 +26,7 @@ import org.apache.cassandra.db.marshal.A
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.marshal.TimeUUIDType;
 import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.migration.Migration;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.locator.IEndPointSnitch;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
@@ -487,8 +488,8 @@ public class DatabaseDescriptor
             // todo: fill in repStrat and epSnitch when this table is set to replicate.
             CFMetaData[] definitionCfDefs = new CFMetaData[]
             {
-                new CFMetaData(Table.DEFINITIONS, DefsTable.MIGRATIONS_CF, "Standard", new TimeUUIDType(), null, "individual schema mutations", 0, 0),
-                new CFMetaData(Table.DEFINITIONS, DefsTable.SCHEMA_CF, "Standard", new UTF8Type(), null, "current state of the schema", 0, 0)
+                new CFMetaData(Table.DEFINITIONS, Migration.MIGRATIONS_CF, "Standard", new TimeUUIDType(), null, "individual schema mutations", 0, 0),
+                new CFMetaData(Table.DEFINITIONS, Migration.SCHEMA_CF, "Standard", new UTF8Type(), null, "current state of the schema", 0, 0)
             };
             KSMetaData ksDefs = new KSMetaData(Table.DEFINITIONS, null, -1, null, definitionCfDefs);
             tables.put(Table.DEFINITIONS, ksDefs);
@@ -1125,8 +1126,14 @@ public class DatabaseDescriptor
     public static void clearTableDefinition(KSMetaData ksm, UUID newVersion)
     {
         tables.remove(ksm.name);
+        StorageService.instance.clearReplicationStrategy(ksm.name);
         DatabaseDescriptor.defsVersion = newVersion;
     }
+    
+    public static UUID getDefsVersion()
+    {
+        return defsVersion;
+    }
 
     public static class ConfigurationException extends Exception
     {

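The newly exposed getDefsVersion() is what lets migrations apply strictly in order: each Migration (added later in this patch) captures the current definitions version as its lastVersion at construction time, and Migration.apply() refuses to run once the live version has moved on. A condensed sketch of that guard, using only calls that appear in this diff; the wrapper class is hypothetical.

    import java.io.IOException;
    import java.util.UUID;
    import org.apache.cassandra.config.DatabaseDescriptor;

    public class VersionGuardSketch
    {
        // Mirrors the check at the top of Migration.apply(); illustration only.
        public static void assertStillCurrent(UUID lastVersion) throws IOException
        {
            if (!DatabaseDescriptor.getDefsVersion().equals(lastVersion))
                throw new IOException("Previous version mismatch. cannot apply.");
        }
    }
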
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/KSMetaData.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/KSMetaData.java?rev=931198&r1=931197&r2=931198&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/KSMetaData.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/KSMetaData.java Tue Apr  6 16:01:31 2010
@@ -54,13 +54,17 @@ public final class KSMetaData
         this.cfMetaData = Collections.<String, CFMetaData>unmodifiableMap(cfmap);
     }
     
-    public static KSMetaData rename(KSMetaData ksm, String newName)
+    public static KSMetaData rename(KSMetaData ksm, String newName, boolean purgeOldCfs)
     {
         // cfs will need to have their tablenames reset. CFMetaData are immutable, so new ones get created with the
         // same ids.
         List<CFMetaData> newCfs = new ArrayList<CFMetaData>(ksm.cfMetaData().size());
         for (CFMetaData oldCf : ksm.cfMetaData().values())
+        {
+            if (purgeOldCfs)
+                CFMetaData.purge(oldCf);
             newCfs.add(CFMetaData.renameTable(oldCf, newName));
+        }
         return new KSMetaData(newName, ksm.strategyClass, ksm.replicationFactor, ksm.snitch, newCfs.toArray(new CFMetaData[newCfs.size()]));
     }
     

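Since CFMetaData.renameTable() no longer purges the old definition itself (see the CFMetaData hunk above), the caller now decides via the new purgeOldCfs flag. A hedged illustration of the two call shapes; the wrapper class and keyspace name are placeholders, and the real call sites live elsewhere in this commit.

    import org.apache.cassandra.config.KSMetaData;

    public class KeyspaceRenameSketch
    {
        public static KSMetaData renameAndPurge(KSMetaData oldKsm)
        {
            return KSMetaData.rename(oldKsm, "NewName", true);   // true: purge the old CFMetaData entries as part of the rename
        }

        public static KSMetaData renameOnly(KSMetaData oldKsm)
        {
            return KSMetaData.rename(oldKsm, "NewName", false);  // false: leave the old entries registered
        }
    }
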
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Column.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java?rev=931198&r1=931197&r2=931198&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Column.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Column.java Tue Apr  6 16:01:31 2010
@@ -58,7 +58,7 @@ public class Column implements IColumn
         this(name, ArrayUtils.EMPTY_BYTE_ARRAY);
     }
 
-    Column(byte[] name, byte[] value)
+    public Column(byte[] name, byte[] value)
     {
         this(name, value, 0);
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/DefsTable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DefsTable.java?rev=931198&r1=931197&r2=931198&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/DefsTable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/DefsTable.java Tue Apr  6 16:01:31 2010
@@ -18,20 +18,13 @@
 
 package org.apache.cassandra.db;
 
-import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.filter.SliceQueryFilter;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.utils.UUIDGen;
-import org.apache.cassandra.utils.WrappedRunnable;
-
-import static org.apache.cassandra.config.DatabaseDescriptor.ConfigurationException;
+import org.apache.cassandra.db.migration.Migration;
 
 import java.io.ByteArrayInputStream;
 import java.io.File;
@@ -41,230 +34,10 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
-import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 
 public class DefsTable
 {
-    private static final ExecutorService executor = new JMXEnabledThreadPoolExecutor("DEFINITIONS-UPDATER");
-    
-    public static final String MIGRATIONS_CF = "Migrations";
-    public static final String SCHEMA_CF = "Schema";
-
-    /** add a column family. */
-    public static Future add(final CFMetaData cfm)
-    {
-        return executor.submit(new WrappedRunnable() 
-        {
-            protected void runMayThrow() throws Exception
-            {
-                // make sure the ks is real and the cf doesn't already exist.
-                KSMetaData ksm = DatabaseDescriptor.getTableDefinition(cfm.tableName);
-                if (ksm == null)
-                    throw new ConfigurationException("Keyspace does not already exist.");
-                else if (ksm.cfMetaData().containsKey(cfm.cfName))
-                    throw new ConfigurationException("CF is already defined in that keyspace.");
-        
-                // clone ksm but include the new cf def.
-                List<CFMetaData> newCfs = new ArrayList<CFMetaData>(ksm.cfMetaData().values());
-                newCfs.add(cfm);
-                ksm = new KSMetaData(ksm.name, ksm.strategyClass, ksm.replicationFactor, ksm.snitch, newCfs.toArray(new CFMetaData[newCfs.size()]));
-        
-                // store it.
-                
-                UUID newVersion = saveKeyspaceDefs(ksm, null);
-        
-                // reinitialize the table.
-                Table.open(ksm.name).addCf(cfm.cfName);
-                DatabaseDescriptor.setTableDefinition(ksm, newVersion);
-                
-                // force creation of a new commit log segment.
-                CommitLog.instance().forceNewSegment();    
-            }
-        });     
-    }
-
-    /**
-     * drop a column family. blockOnDeletion was added to make testing simpler.
-     */
-    public static Future drop(final CFMetaData cfm, final boolean blockOnFileDeletion)
-    {
-        return executor.submit(new WrappedRunnable() 
-        {
-            protected void runMayThrow() throws Exception
-            {
-                KSMetaData ksm = DatabaseDescriptor.getTableDefinition(cfm.tableName);
-                if (ksm == null)
-                    throw new ConfigurationException("Keyspace does not already exist.");
-                else if (!ksm.cfMetaData().containsKey(cfm.cfName))
-                    throw new ConfigurationException("CF is not defined in that keyspace.");
-                
-                // clone ksm but do not include the new def
-                List<CFMetaData> newCfs = new ArrayList<CFMetaData>(ksm.cfMetaData().values());
-                newCfs.remove(cfm);
-                assert newCfs.size() == ksm.cfMetaData().size() - 1;
-                ksm = new KSMetaData(ksm.name, ksm.strategyClass, ksm.replicationFactor, ksm.snitch, newCfs.toArray(new CFMetaData[newCfs.size()]));
-                
-                // store it.
-                UUID newVersion = saveKeyspaceDefs(ksm, null);
-                
-                // reinitialize the table.
-                CFMetaData.purge(cfm);
-                DatabaseDescriptor.setTableDefinition(ksm, newVersion);
-                Table.open(ksm.name).dropCf(cfm.cfName);
-                
-                // indicate that some files need to be deleted (eventually)
-                SystemTable.markForRemoval(cfm);
-                
-                // we don't really need a new segment, but let's force it to be consistent with other operations.
-                CommitLog.instance().forceNewSegment();
-        
-                cleanupDeadFiles(blockOnFileDeletion);    
-            }
-        });
-    }
-    
-    /** rename a column family */
-    public static Future rename(final CFMetaData oldCfm, final String newName)
-    {
-        return executor.submit(new WrappedRunnable()
-        {
-            protected void runMayThrow() throws Exception
-            {
-                KSMetaData ksm = DatabaseDescriptor.getTableDefinition(oldCfm.tableName);
-                if (ksm == null)
-                    throw new ConfigurationException("Keyspace does not already exist.");
-                if (!ksm.cfMetaData().containsKey(oldCfm.cfName))
-                    throw new ConfigurationException("CF is not defined in that keyspace.");
-                if (ksm.cfMetaData().containsKey(newName))
-                    throw new ConfigurationException("CF is already defined in that keyspace.");
-                
-                // clone the ksm, replacing cfm with the new one.
-                List<CFMetaData> newCfs = new ArrayList<CFMetaData>(ksm.cfMetaData().values());
-                newCfs.remove(oldCfm);
-                assert newCfs.size() == ksm.cfMetaData().size() - 1;
-                CFMetaData newCfm = CFMetaData.rename(oldCfm, newName);
-                newCfs.add(newCfm);
-                ksm = new KSMetaData(ksm.name, ksm.strategyClass, ksm.replicationFactor, ksm.snitch, newCfs.toArray(new CFMetaData[newCfs.size()]));
-                
-                // store it
-                UUID newVersion = saveKeyspaceDefs(ksm, null);
-                
-                // leave it up to operators to ensure there are no writes going on durng the file rename. Just know that
-                // attempting row mutations on oldcfName right now would be really bad.
-                try
-                {
-                    renameCfStorageFiles(ksm.name, oldCfm.cfName, newCfm.cfName);
-                }
-                catch (IOException e)
-                {
-                    // todo: is this a big enough problem to bring the entire node down?  For sure, it is something that needs to be addressed immediately.
-                    ConfigurationException cex = new ConfigurationException("Critical: encountered IOException while attempting to rename CF storage files for " + oldCfm.cfName);
-                    cex.initCause(e);
-                    throw cex;
-                }
-                // reset defs.
-                DatabaseDescriptor.setTableDefinition(ksm, newVersion);
-                Table.open(ksm.name).renameCf(oldCfm.cfName, newName);
-                
-                CommitLog.instance().forceNewSegment();
-            }
-        });
-    }
-
-    /** adds a keyspace */
-    public static Future add(final KSMetaData ksm)
-    {
-        return executor.submit(new WrappedRunnable()
-        {
-            protected void runMayThrow() throws Exception
-            {
-                if (DatabaseDescriptor.getTableDefinition(ksm.name) != null)
-                    throw new ConfigurationException("Keyspace already exists.");
-                
-                UUID versionId = saveKeyspaceDefs(ksm, null);
-                DatabaseDescriptor.setTableDefinition(ksm, versionId);
-                Table.open(ksm.name);
-                CommitLog.instance().forceNewSegment();
-            }
-        });
-    }
-    
-    /** drop a keyspace. */
-    public static Future drop(final KSMetaData ksm, final boolean blockOnFileDeletion)
-    {
-        return executor.submit(new WrappedRunnable()
-        {
-            protected void runMayThrow() throws Exception
-            {
-                if (DatabaseDescriptor.getTableDefinition(ksm.name) != ksm)
-                    throw new ConfigurationException("Either keyspace doesn't exist or the name of the one you supplied doesn't match with the one being used.");
-                
-                // remove the table from the static instances.
-                Table table = Table.clear(ksm.name);
-                if (table == null)
-                    throw new ConfigurationException("Table is not active. " + ksm.name);
-                
-                // remove all cfs from the table instance.
-                for (CFMetaData cfm : ksm.cfMetaData().values())
-                {
-                    CFMetaData.purge(cfm);
-                    table.dropCf(cfm.cfName);
-                    SystemTable.markForRemoval(cfm);
-                }
-                                
-                // update internal table.
-                UUID versionId = saveKeyspaceDefs(null, ksm);
-                
-                // reset defs.
-                DatabaseDescriptor.clearTableDefinition(ksm, versionId);
-                
-                CommitLog.instance().forceNewSegment();
-                cleanupDeadFiles(blockOnFileDeletion);
-                
-            }
-        });
-    }
-    
-    public static Future rename(final KSMetaData oldKsm, final String newName)
-    {
-        return executor.submit(new WrappedRunnable()
-        {
-            protected void runMayThrow() throws Exception
-            {
-                if (oldKsm == null || DatabaseDescriptor.getTableDefinition(oldKsm.name) != oldKsm)
-                    throw new ConfigurationException("Keyspace either does not exist or does not match the one currently defined.");
-                if (DatabaseDescriptor.getTableDefinition(newName) != null)
-                    throw new ConfigurationException("Keyspace already exists.");
-                
-                // clone the ksm, replacing thename.
-                KSMetaData newKsm = KSMetaData.rename(oldKsm, newName);
-                // at this point, the static methods in CFMetaData will start returning references to the new table, so
-                // it helps if the node is reasonably quiescent with respect to this ks. 
-                UUID newVersion = saveKeyspaceDefs(newKsm, oldKsm);
-                try
-                {
-                    renameKsStorageFiles(oldKsm.name, newName);
-                }
-                catch (IOException e)
-                {
-                    ConfigurationException cex = new ConfigurationException("Critical: encountered IOException while attempting to rename KS storage files from " + oldKsm.name + " to " + newName);
-                    throw cex;
-                }
-                
-                DatabaseDescriptor.clearTableDefinition(oldKsm, newVersion);
-                DatabaseDescriptor.setTableDefinition(newKsm, newVersion);
-                Table.clear(oldKsm.name);
-                Table.open(newName);
-                // this isn't strictly necessary since the set of all cfs was not modified.
-                CommitLog.instance().forceNewSegment(); 
-            }
-        });
-    }
-    
     /** dumps current keyspace definitions to storage */
     public static synchronized void dumpToStorage(UUID version) throws IOException
     {
@@ -274,7 +47,7 @@ public class DefsTable
         for (String tableName : DatabaseDescriptor.getNonSystemTables())
         {
             KSMetaData ks = DatabaseDescriptor.getTableDefinition(tableName);
-            rm.add(new QueryPath(SCHEMA_CF, null, ks.name.getBytes()), KSMetaData.serialize(ks), now);
+            rm.add(new QueryPath(Migration.SCHEMA_CF, null, ks.name.getBytes()), KSMetaData.serialize(ks), now);
         }
         rm.apply();
     }
@@ -283,8 +56,8 @@ public class DefsTable
     public static synchronized Collection<KSMetaData> loadFromStorage(UUID version) throws IOException
     {
         Table defs = Table.open(Table.DEFINITIONS);
-        ColumnFamilyStore cfStore = defs.getColumnFamilyStore(SCHEMA_CF);
-        QueryFilter filter = QueryFilter.getSliceFilter(version.toString(), new QueryPath(SCHEMA_CF), "".getBytes(), "".getBytes(), null, false, 1024);
+        ColumnFamilyStore cfStore = defs.getColumnFamilyStore(Migration.SCHEMA_CF);
+        QueryFilter filter = QueryFilter.getSliceFilter(version.toString(), new QueryPath(Migration.SCHEMA_CF), "".getBytes(), "".getBytes(), null, false, 1024);
         ColumnFamily cf = cfStore.getColumnFamily(filter);
         Collection<KSMetaData> tables = new ArrayList<KSMetaData>();
         for (IColumn col : cf.getSortedColumns())
@@ -296,7 +69,7 @@ public class DefsTable
     }
     
     /** gets all the files that belong to a given column family. */
-    static Collection<File> getFiles(String table, final String cf)
+    public static Collection<File> getFiles(String table, final String cf)
     {
         List<File> found = new ArrayList<File>();
         for (String path : DatabaseDescriptor.getAllDataFileLocationsForTable(table))
@@ -313,82 +86,4 @@ public class DefsTable
         return found;
     }
     
-    private static void cleanupDeadFiles(boolean wait)
-    {
-        Future cleanup = CompactionManager.instance.submitGraveyardCleanup();
-        if (wait)
-        {
-            // notify the compaction manager that it needs to clean up the dropped cf files.
-            try
-            {
-                cleanup.get();
-            }
-            catch (InterruptedException e)
-            {
-                throw new RuntimeException(e);
-            }
-            catch (ExecutionException e)
-            {
-                throw new RuntimeException(e);
-            }
-        } 
-    }
-    
-    private static void renameKsStorageFiles(String oldKs, String newKs) throws IOException
-    {
-        IOException mostRecentProblem = null;
-        Set<String> cfNames = DatabaseDescriptor.getTableDefinition(oldKs).cfMetaData().keySet();
-        for (String cfName : cfNames)
-        {
-            for (File existing : getFiles(oldKs, cfName))
-            {
-                try
-                {
-                    File newParent = new File(existing.getParentFile().getParent(), newKs);
-                    newParent.mkdirs();
-                    FileUtils.renameWithConfirm(existing, new File(newParent, existing.getName()));
-                }
-                catch (IOException ex)
-                {
-                    mostRecentProblem = ex;
-                }
-            }
-        }
-        if (mostRecentProblem != null)
-            throw new IOException("One or more IOExceptions encountered while renaming files. Most recent problem is included.", mostRecentProblem);
-    }
-    
-    // if this errors out, we are in a world of hurt.
-    private static void renameCfStorageFiles(String table, String oldCfName, String newCfName) throws IOException
-    {
-        // complete as much of the job as possible.  Don't let errors long the way prevent as much renaming as possible
-        // from happening.
-        IOException mostRecentProblem = null;
-        for (File existing : getFiles(table, oldCfName))
-        {
-            try
-            {
-                String newFileName = existing.getName().replaceFirst("\\w+-", newCfName + "-");
-                FileUtils.renameWithConfirm(existing, new File(existing.getParent(), newFileName));
-            }
-            catch (IOException ex)
-            {
-                mostRecentProblem = ex;
-            }
-        }
-        if (mostRecentProblem != null)
-            throw new IOException("One or more IOExceptions encountered while renaming files. Most recent problem is included.", mostRecentProblem);
-    }
-    
-    private static UUID saveKeyspaceDefs(KSMetaData add, KSMetaData remove) throws IOException
-    {
-        UUID versionId = UUIDGen.makeType1UUIDFromHost(FBUtilities.getLocalAddress());
-        RowMutation rm = new RowMutation(Table.DEFINITIONS, versionId.toString());
-        if (remove != null)
-            rm.delete(new QueryPath(SCHEMA_CF, null, remove.name.getBytes()), System.currentTimeMillis());
-        if (add != null)
-            rm.add(new QueryPath(SCHEMA_CF, null, add.name.getBytes()), KSMetaData.serialize(add), System.currentTimeMillis());
-        rm.apply();
-        return versionId;
-    }
 }

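The net effect of this hunk: DefsTable no longer executes schema changes itself; the executor-based add/drop/rename methods are gone, that logic now lives in the AddColumnFamily, DropColumnFamily, RenameColumnFamily, AddKeyspace, DropKeyspace and RenameKeyspace migrations added in this commit, and DefsTable keeps only the dump/load/getFiles helpers. A rough sketch of what one of the removed operations looks like with the new API; the keyspace and CF names are placeholders and the real call sites are not shown in this part of the patch.

    import org.apache.cassandra.db.migration.DropColumnFamily;
    import org.apache.cassandra.db.migration.Migration;

    public class DropCfSketch
    {
        // Roughly what DefsTable.drop(cfm, blockOnFileDeletion) used to do, expressed as a migration.
        public static void dropColumnFamily() throws Exception
        {
            Migration drop = new DropColumnFamily("Keyspace1", "Standard1", true); // true: block until dead files are cleaned up
            drop.apply();
            drop.announce();
        }
    }
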
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=931198&r1=931197&r2=931198&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Tue Apr  6 16:01:31 2010
@@ -265,6 +265,36 @@ public class HintedHandOffManager
           logger_.debug("Finished hinted handoff for endpoint " + endPoint);
     }
 
+    /** called when a keyspace is dropped or renamed. newTable==null in the case of a drop. */
+    public static void renameHints(String oldTable, String newTable) throws IOException
+    {
+        // we're basically going to fetch, drop and add the scf for the old and new table. we need to do it piecemeal 
+        // though since there could be GB of data.
+        ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HINTS_CF);
+        byte[] startCol = ArrayUtils.EMPTY_BYTE_ARRAY;
+        long now = System.currentTimeMillis();
+        while (true)
+        {
+            QueryFilter filter = QueryFilter.getSliceFilter(oldTable, new QueryPath(HINTS_CF), startCol, ArrayUtils.EMPTY_BYTE_ARRAY, null, false, PAGE_SIZE);
+            ColumnFamily cf = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter), Integer.MAX_VALUE);
+            if (pagingFinished(cf, startCol))
+                break;
+            if (newTable != null)
+            {
+                RowMutation insert = new RowMutation(Table.SYSTEM_TABLE, newTable);
+                insert.add(cf);
+                insert.apply();
+            }
+            RowMutation drop = new RowMutation(Table.SYSTEM_TABLE, oldTable);
+            for (byte[] key : cf.getColumnNames())
+            {
+                drop.delete(new QueryPath(HINTS_CF, key), now);
+                startCol = key;
+            }
+            drop.apply();
+        }
+    }
+
     /*
      * This method is used to deliver hints to a particular endpoint.
      * When we learn that some endpoint is back up we deliver the data

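renameHints() also doubles as the hint-cleanup path for keyspace drops: DropKeyspace (added below) calls it with newTable == null, so the paging loop deletes the old hint rows without re-inserting them anywhere. A small sketch of both uses; the keyspace names are placeholders, and the rename-side caller is presumably RenameKeyspace in part two of this commit.

    import java.io.IOException;
    import org.apache.cassandra.db.HintedHandOffManager;

    public class HintMaintenanceSketch
    {
        public static void onKeyspaceRename() throws IOException
        {
            HintedHandOffManager.renameHints("OldKS", "NewKS"); // page hints from the old row into the new one, deleting as it goes
        }

        public static void onKeyspaceDrop() throws IOException
        {
            HintedHandOffManager.renameHints("OldKS", null);    // null newTable: just delete the old hints, page by page
        }
    }
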
Added: cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java?rev=931198&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java Tue Apr  6 16:01:31 2010
@@ -0,0 +1,111 @@
+package org.apache.cassandra.db.migration;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.DatabaseDescriptor.ConfigurationException;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.db.DefsTable;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+public class AddColumnFamily extends Migration
+{
+    private static final Serializer serializer = new Serializer();
+    private CFMetaData cfm;
+    
+    private AddColumnFamily(DataInputStream din) throws IOException
+    {
+        super(UUIDGen.makeType1UUID(din), UUIDGen.makeType1UUID(din));
+        rm = RowMutation.serializer().deserialize(din);
+        cfm = CFMetaData.deserialize(din);
+    }
+    
+    public AddColumnFamily(CFMetaData cfm) throws ConfigurationException, IOException
+    {
+        super(UUIDGen.makeType1UUIDFromHost(FBUtilities.getLocalAddress()), DatabaseDescriptor.getDefsVersion());
+        this.cfm = cfm;
+        KSMetaData ksm = DatabaseDescriptor.getTableDefinition(cfm.tableName);
+        
+        if (ksm == null)
+            throw new DatabaseDescriptor.ConfigurationException("Keyspace does not already exist.");
+        else if (ksm.cfMetaData().containsKey(cfm.cfName))
+            throw new DatabaseDescriptor.ConfigurationException("CF is already defined in that keyspace.");
+        
+        // clone ksm but include the new cf def.
+        KSMetaData newKsm = makeNewKeyspaceDefinition(ksm);
+        
+        rm = Migration.makeDefinitionMutation(newKsm, null, newVersion);
+    }
+    
+    private KSMetaData makeNewKeyspaceDefinition(KSMetaData ksm)
+    {
+        List<CFMetaData> newCfs = new ArrayList<CFMetaData>(ksm.cfMetaData().values());
+        newCfs.add(cfm);
+        return new KSMetaData(ksm.name, ksm.strategyClass, ksm.replicationFactor, ksm.snitch, newCfs.toArray(new CFMetaData[newCfs.size()]));
+    }
+    
+    public void applyModels()
+    {
+        // reinitialize the table.
+        KSMetaData ksm = DatabaseDescriptor.getTableDefinition(cfm.tableName);
+        ksm = makeNewKeyspaceDefinition(ksm);
+        Table.open(ksm.name).addCf(cfm.cfName);
+        DatabaseDescriptor.setTableDefinition(ksm, newVersion);
+        
+        // force creation of a new commit log segment.
+        CommitLog.instance().forceNewSegment();    
+    }
+
+    @Override
+    public ICompactSerializer getSerializer()
+    {
+        return serializer;
+    }
+
+    private static final class Serializer implements ICompactSerializer<AddColumnFamily>
+    {
+        public void serialize(AddColumnFamily addColumnFamily, DataOutputStream dos) throws IOException
+        {
+            dos.write(UUIDGen.decompose(addColumnFamily.newVersion));
+            dos.write(UUIDGen.decompose(addColumnFamily.lastVersion));
+            RowMutation.serializer().serialize(addColumnFamily.rm, dos);
+            dos.write(CFMetaData.serialize(addColumnFamily.cfm));
+        }
+
+        public AddColumnFamily deserialize(DataInputStream dis) throws IOException
+        { 
+            return new AddColumnFamily(dis);
+        }
+    }
+}

Added: cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddKeyspace.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddKeyspace.java?rev=931198&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddKeyspace.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddKeyspace.java Tue Apr  6 16:01:31 2010
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.migration;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.DatabaseDescriptor.ConfigurationException;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+public class AddKeyspace extends Migration
+{
+    private static final Serializer serializer = new Serializer();
+    
+    private KSMetaData ksm;
+    
+    private AddKeyspace(DataInputStream din) throws IOException
+    {
+        super(UUIDGen.makeType1UUID(din), UUIDGen.makeType1UUID(din));
+        rm = RowMutation.serializer().deserialize(din);
+        ksm = KSMetaData.deserialize(din);
+    }
+    
+    public AddKeyspace(KSMetaData ksm) throws ConfigurationException, IOException
+    {
+        super(UUIDGen.makeType1UUIDFromHost(FBUtilities.getLocalAddress()), DatabaseDescriptor.getDefsVersion());
+        
+        if (DatabaseDescriptor.getTableDefinition(ksm.name) != null)
+            throw new ConfigurationException("Keyspace already exists.");
+        
+        this.ksm = ksm;
+        rm = makeDefinitionMutation(ksm, null, newVersion);
+    }
+
+    @Override
+    public ICompactSerializer getSerializer()
+    {
+        return serializer;
+    }
+
+    @Override
+    public void applyModels() throws IOException
+    {
+        DatabaseDescriptor.setTableDefinition(ksm, newVersion);
+        // these definitions could have come from somewhere else.
+        CFMetaData.fixMaxId();
+        Table.open(ksm.name);
+        CommitLog.instance().forceNewSegment();
+    }
+    
+    private static final class Serializer implements ICompactSerializer<AddKeyspace>
+    {
+        public void serialize(AddKeyspace addKeyspace, DataOutputStream dos) throws IOException
+        {
+            dos.write(UUIDGen.decompose(addKeyspace.newVersion));
+            dos.write(UUIDGen.decompose(addKeyspace.lastVersion));
+            RowMutation.serializer().serialize(addKeyspace.rm, dos);
+            dos.write(KSMetaData.serialize(addKeyspace.ksm));
+        }
+
+        public AddKeyspace deserialize(DataInputStream dis) throws IOException
+        {
+            return new AddKeyspace(dis);
+        }
+    }
+}

Added: cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java?rev=931198&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java Tue Apr  6 16:01:31 2010
@@ -0,0 +1,129 @@
+package org.apache.cassandra.db.migration;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.DatabaseDescriptor.ConfigurationException;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.db.DefsTable;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.SystemTable;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+public class DropColumnFamily extends Migration
+{
+    private static final Serializer serializer = new Serializer();
+    
+    private String tableName;
+    private String cfName;
+    private boolean blockOnFileDeletion;
+    
+    private DropColumnFamily(DataInputStream din) throws IOException
+    {
+        super(UUIDGen.makeType1UUID(din), UUIDGen.makeType1UUID(din));
+        rm = RowMutation.serializer().deserialize(din);
+        tableName = din.readUTF();
+        cfName = din.readUTF();
+        blockOnFileDeletion = din.readBoolean();
+    }
+    
+    public DropColumnFamily(String tableName, String cfName, boolean blockOnFileDeletion) throws ConfigurationException, IOException
+    {
+        super(UUIDGen.makeType1UUIDFromHost(FBUtilities.getLocalAddress()), DatabaseDescriptor.getDefsVersion());
+        this.tableName = tableName;
+        this.cfName = cfName;
+        this.blockOnFileDeletion = blockOnFileDeletion;
+        
+        KSMetaData ksm = DatabaseDescriptor.getTableDefinition(tableName);
+        if (ksm == null)
+            throw new ConfigurationException("Keyspace does not already exist.");
+        else if (!ksm.cfMetaData().containsKey(cfName))
+            throw new ConfigurationException("CF is not defined in that keyspace.");
+        
+        KSMetaData newKsm = makeNewKeyspaceDefinition(ksm);
+        rm = Migration.makeDefinitionMutation(newKsm, null, newVersion);
+    }
+    
+    private KSMetaData makeNewKeyspaceDefinition(KSMetaData ksm)
+    {
+        // clone ksm but do not include the new def
+        CFMetaData cfm = ksm.cfMetaData().get(cfName);
+        List<CFMetaData> newCfs = new ArrayList<CFMetaData>(ksm.cfMetaData().values());
+        newCfs.remove(cfm);
+        assert newCfs.size() == ksm.cfMetaData().size() - 1;
+        return new KSMetaData(ksm.name, ksm.strategyClass, ksm.replicationFactor, ksm.snitch, newCfs.toArray(new CFMetaData[newCfs.size()]));
+    }
+
+    @Override
+    public ICompactSerializer getSerializer()
+    {
+        return serializer;
+    }
+
+    @Override
+    public void applyModels() throws IOException
+    {
+        // reinitialize the table.
+        KSMetaData existing = DatabaseDescriptor.getTableDefinition(tableName);
+        CFMetaData cfm = existing.cfMetaData().get(cfName);
+        KSMetaData ksm = makeNewKeyspaceDefinition(existing);
+        CFMetaData.purge(cfm);
+        DatabaseDescriptor.setTableDefinition(ksm, newVersion);
+        Table.open(ksm.name).dropCf(cfm.cfName);
+        
+        // indicate that some files need to be deleted (eventually)
+        SystemTable.markForRemoval(cfm);
+        
+        // we don't really need a new segment, but let's force it to be consistent with other operations.
+        CommitLog.instance().forceNewSegment();
+
+        Migration.cleanupDeadFiles(blockOnFileDeletion);   
+    }
+    
+    private static final class Serializer implements ICompactSerializer<DropColumnFamily>
+    {
+        public void serialize(DropColumnFamily dropColumnFamily, DataOutputStream dos) throws IOException
+        {
+            dos.write(UUIDGen.decompose(dropColumnFamily.newVersion));
+            dos.write(UUIDGen.decompose(dropColumnFamily.lastVersion));
+            RowMutation.serializer().serialize(dropColumnFamily.rm, dos);
+            dos.writeUTF(dropColumnFamily.tableName);
+            dos.writeUTF(dropColumnFamily.cfName);
+            dos.writeBoolean(dropColumnFamily.blockOnFileDeletion);       
+        }
+
+        public DropColumnFamily deserialize(DataInputStream dis) throws IOException
+        {
+            return new DropColumnFamily(dis);
+        }
+    }
+}

Added: cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropKeyspace.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropKeyspace.java?rev=931198&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropKeyspace.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropKeyspace.java Tue Apr  6 16:01:31 2010
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.migration;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.DatabaseDescriptor.ConfigurationException;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.db.HintedHandOffManager;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.SystemTable;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+public class DropKeyspace extends Migration
+{
+    private static final Serializer serializer = new Serializer();
+    
+    private String name;
+    private boolean blockOnFileDeletion;
+    
+    private DropKeyspace(DataInputStream din) throws IOException
+    {
+        super(UUIDGen.makeType1UUID(din), UUIDGen.makeType1UUID(din));
+        rm = RowMutation.serializer().deserialize(din);
+        name = din.readUTF();
+    }
+    
+    public DropKeyspace(String name, boolean blockOnFileDeletion) throws ConfigurationException, IOException
+    {
+        super(UUIDGen.makeType1UUIDFromHost(FBUtilities.getLocalAddress()), DatabaseDescriptor.getDefsVersion());
+        this.name = name;
+        this.blockOnFileDeletion = blockOnFileDeletion;
+        KSMetaData ksm = DatabaseDescriptor.getTableDefinition(name);
+        if (ksm == null)
+            throw new ConfigurationException("Keyspace does not exist.");
+        rm = makeDefinitionMutation(null, ksm, newVersion);
+    }
+
+    @Override
+    public ICompactSerializer getSerializer()
+    {
+        return serializer;
+    }
+
+    @Override
+    public void applyModels() throws IOException
+    {
+        KSMetaData ksm = DatabaseDescriptor.getTableDefinition(name);
+        // remove the table from the static instances.
+        Table table = Table.clear(ksm.name);
+        if (table == null)
+            throw new IOException("Table is not active. " + ksm.name);
+        
+        // remove all cfs from the table instance.
+        for (CFMetaData cfm : ksm.cfMetaData().values())
+        {
+            CFMetaData.purge(cfm);
+            table.dropCf(cfm.cfName);
+            SystemTable.markForRemoval(cfm);
+        }
+                        
+        // reset defs.
+        DatabaseDescriptor.clearTableDefinition(ksm, newVersion);
+        CommitLog.instance().forceNewSegment();
+        Migration.cleanupDeadFiles(blockOnFileDeletion);
+        
+        // clear up any local hinted data for this keyspace.
+        HintedHandOffManager.renameHints(name, null);
+    }
+    
+    private static final class Serializer implements ICompactSerializer<DropKeyspace>
+    {
+        public void serialize(DropKeyspace dropKeyspace, DataOutputStream dos) throws IOException
+        {
+            dos.write(UUIDGen.decompose(dropKeyspace.newVersion));
+            dos.write(UUIDGen.decompose(dropKeyspace.lastVersion));
+            RowMutation.serializer().serialize(dropKeyspace.rm, dos);
+            
+            dos.writeUTF(dropKeyspace.name);
+        }
+
+        public DropKeyspace deserialize(DataInputStream dis) throws IOException
+        {
+            return new DropKeyspace(dis);
+        }
+    }
+}

Added: cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java?rev=931198&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java Tue Apr  6 16:01:31 2010
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.migration;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.CompactionManager;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.filter.NamesQueryFilter;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.db.filter.SliceQueryFilter;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.utils.UUIDGen;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+/**
+ * A migration represents a single metadata mutation (cf dropped, added, etc.).  Migrations can be applied locally, or
+ * serialized and sent to another machine where it can be applied there. Each migration has a version represented by
+ * a TimeUUID that can be used to look up both the Migration itself (see getLocalMigrations) as well as a serialization
+ * of the Keyspace definition that was modified.
+ * 
+ * There are three parts to a migration (think of it as a schema update):
+ * 1. data is written to the schema cf.
+ * 2. the migration is serialized to the migrations cf.
+ * 3. updated models are applied to the cassandra instance.
+ * 
+ * Since steps 1, 2 and 3 are not committed atomically, care should be taken to ensure that a node/cluster is reasonably
+ * quiescent with regard to the keyspace or columnfamily whose schema is being modified.
+ * 
+ * Each class that extends Migration is required to implement a constructor that takes a DataInputStream as its only
+ * argument.  Also, each implementation must take care to ensure that its serialization can be deserialized.  For 
+ * example, it is required that the class name be serialized first always.
+ */
+public abstract class Migration
+{
+    private static final Logger logger = LoggerFactory.getLogger(Migration.class);
+    
+    public static final String MIGRATIONS_CF = "Migrations";
+    public static final String SCHEMA_CF = "Schema";
+    public static final String MIGRATIONS_KEY = "Migrations Key";
+    public static final String LAST_MIGRATION_KEY = "Last Migration";
+    
+    protected RowMutation rm;
+    protected final UUID newVersion;
+    protected UUID lastVersion;
+    
+    Migration(UUID newVersion, UUID lastVersion)
+    {
+        this.newVersion = newVersion;
+        this.lastVersion = lastVersion;
+    }
+    
+    /** apply changes */
+    public final void apply() throws IOException
+    {
+        // ensure migration is serial. don't apply unless the previous version matches.
+        if (!DatabaseDescriptor.getDefsVersion().equals(lastVersion))
+            throw new IOException("Previous version mismatch. cannot apply.");
+        // write to schema
+        assert rm != null;
+        rm.apply();
+        
+        // write migration.
+        long now = System.currentTimeMillis();
+        byte[] buf = getBytes();
+        RowMutation migration = new RowMutation(Table.DEFINITIONS, MIGRATIONS_KEY);
+        migration.add(new QueryPath(MIGRATIONS_CF, null, UUIDGen.decompose(newVersion)), buf, now);
+        migration.apply();
+        
+        // note that we are storing this in the system table, which is not replicated, instead of the definitions table, which is.
+        logger.debug("Applying migration " + newVersion.toString());
+        migration = new RowMutation(Table.DEFINITIONS, LAST_MIGRATION_KEY);
+        migration.add(new QueryPath(SCHEMA_CF, null, LAST_MIGRATION_KEY.getBytes()), UUIDGen.decompose(newVersion), now);
+        migration.apply();
+        
+        // flush changes out of memtables so we don't need to rely on the commit log.
+        for (Future f : Table.open(Table.DEFINITIONS).flush())
+        {
+            try
+            {
+                f.get();
+            }
+            catch (InterruptedException e)
+            {
+                throw new IOException(e);
+            }
+            catch (ExecutionException e)
+            {
+                throw new IOException(e);
+            }
+        }
+        
+        applyModels(); 
+    }
+    
+    public final void announce()
+    {
+        // immediate notification for existing nodes.
+        MigrationManager.announce(newVersion, Gossiper.instance.getLiveMembers());
+        // this is for notifying nodes as they arrive in the cluster.
+        Gossiper.instance.addLocalApplicationState(MigrationManager.MIGRATION_STATE, new ApplicationState(newVersion.toString()));
+    }
+    
+    public static UUID getLastMigrationId()
+    {
+        Table defs = Table.open(Table.DEFINITIONS);
+        ColumnFamilyStore cfStore = defs.getColumnFamilyStore(SCHEMA_CF);
+        QueryFilter filter = QueryFilter.getNamesFilter(LAST_MIGRATION_KEY, new QueryPath(SCHEMA_CF), LAST_MIGRATION_KEY.getBytes());
+        ColumnFamily cf = cfStore.getColumnFamily(filter);
+        if (cf.getColumnNames().size() == 0)
+            return null;
+        else
+            return UUIDGen.makeType1UUID(cf.getColumn(LAST_MIGRATION_KEY.getBytes()).value());
+    }
+    
+    /** keep in mind that applyModels might happen on another machine */
+    abstract void applyModels() throws IOException;
+    
+    /** serialize migration */
+    public abstract ICompactSerializer getSerializer();
+    
+    private byte[] getBytes() throws IOException
+    {
+        ByteArrayOutputStream bout = new ByteArrayOutputStream();
+        DataOutputStream dout = new DataOutputStream(bout);
+        dout.writeUTF(getClass().getName());
+        getSerializer().serialize(this, dout);
+        dout.close();
+        return bout.toByteArray();
+    }
+    
+    public UUID getVersion()
+    {
+        return newVersion;
+    }
+    
+    static RowMutation makeDefinitionMutation(KSMetaData add, KSMetaData remove, UUID versionId) throws IOException
+    {
+        final long now = System.currentTimeMillis();
+        RowMutation rm = new RowMutation(Table.DEFINITIONS, versionId.toString());
+        if (remove != null)
+            rm.delete(new QueryPath(SCHEMA_CF, null, remove.name.getBytes()), System.currentTimeMillis());
+        if (add != null)
+            rm.add(new QueryPath(SCHEMA_CF, null, add.name.getBytes()), KSMetaData.serialize(add), now);
+        return rm;
+    }
+    
+    static void cleanupDeadFiles(boolean wait)
+    {
+        Future cleanup = CompactionManager.instance.submitGraveyardCleanup();
+        if (wait)
+        {
+            // notify the compaction manager that it needs to clean up the dropped cf files.
+            try
+            {
+                cleanup.get();
+            }
+            catch (InterruptedException e)
+            {
+                throw new RuntimeException(e);
+            }
+            catch (ExecutionException e)
+            {
+                throw new RuntimeException(e);
+            }
+        } 
+    }
+    
+    /** deserialize any Migration. */
+    public static Migration deserialize(InputStream in) throws IOException
+    {
+        DataInputStream din = new DataInputStream(in);
+        String className = din.readUTF();
+        try
+        {
+            Class migrationClass = Class.forName(className);
+            Field serializerField = migrationClass.getDeclaredField("serializer");
+            serializerField.setAccessible(true);
+            ICompactSerializer serializer = (ICompactSerializer)serializerField.get(migrationClass);
+            return (Migration)serializer.deserialize(din);
+        }
+        catch (Exception e)
+        {
+            throw new IOException(e);
+        }
+    }
+    
+    /** load serialized migrations. */
+    public static Collection<IColumn> getLocalMigrations(UUID start, UUID end)
+    {
+        Table defs = Table.open(Table.DEFINITIONS);
+        ColumnFamilyStore cfStore = defs.getColumnFamilyStore(Migration.MIGRATIONS_CF);
+        QueryFilter filter = QueryFilter.getSliceFilter(Migration.MIGRATIONS_KEY, new QueryPath(MIGRATIONS_CF), UUIDGen.decompose(start), UUIDGen.decompose(end), null, false, 1000);
+        ColumnFamily cf = cfStore.getColumnFamily(filter);
+        return cf.getSortedColumns();
+    }
+    
+}
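
For reference, a minimal standalone sketch of the dispatch pattern that deserialize() relies on: the concrete class name travels as a UTF header, and the class is expected to expose a static "serializer" field that is looked up reflectively. FakeMigration/FakeSerializer and the payload string are hypothetical stand-ins for illustration, not types from this patch.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.reflect.Field;

public class SerializerLookupSketch
{
    public static class FakeSerializer
    {
        public void serialize(FakeMigration m, DataOutputStream dos) throws IOException
        {
            dos.writeUTF(m.payload);
        }

        public FakeMigration deserialize(DataInputStream dis) throws IOException
        {
            return new FakeMigration(dis.readUTF());
        }
    }

    public static class FakeMigration
    {
        static final FakeSerializer serializer = new FakeSerializer();
        final String payload;
        FakeMigration(String payload) { this.payload = payload; }
    }

    public static void main(String[] args) throws Exception
    {
        // write: class-name header, then whatever the class's own serializer emits
        ByteArrayOutputStream bout = new ByteArrayOutputStream();
        DataOutputStream dout = new DataOutputStream(bout);
        dout.writeUTF(FakeMigration.class.getName());
        FakeMigration.serializer.serialize(new FakeMigration("add keyspace Foo"), dout);
        dout.close();

        // read: recover the class, pull its static serializer field reflectively, delegate the rest
        DataInputStream din = new DataInputStream(new ByteArrayInputStream(bout.toByteArray()));
        Class<?> clazz = Class.forName(din.readUTF());
        Field serializerField = clazz.getDeclaredField("serializer");
        serializerField.setAccessible(true);
        FakeSerializer serializer = (FakeSerializer) serializerField.get(null);
        System.out.println(serializer.deserialize(din).payload); // prints: add keyspace Foo
    }
}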

Added: cassandra/trunk/src/java/org/apache/cassandra/db/migration/RenameColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/RenameColumnFamily.java?rev=931198&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/RenameColumnFamily.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/RenameColumnFamily.java Tue Apr  6 16:01:31 2010
@@ -0,0 +1,152 @@
+package org.apache.cassandra.db.migration;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.DatabaseDescriptor.ConfigurationException;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.db.DefsTable;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+public class RenameColumnFamily extends Migration
+{
+    private static final Serializer serializer = new Serializer();
+    
+    private String tableName;
+    private String oldName;
+    private String newName;
+    
+    RenameColumnFamily(DataInputStream din) throws IOException
+    {
+        super(UUIDGen.makeType1UUID(din), UUIDGen.makeType1UUID(din));
+        rm = RowMutation.serializer().deserialize(din);
+        tableName = din.readUTF();
+        oldName = din.readUTF();
+        newName = din.readUTF();
+    }
+    
+    public RenameColumnFamily(String tableName, String oldName, String newName) throws ConfigurationException, IOException
+    {
+        super(UUIDGen.makeType1UUIDFromHost(FBUtilities.getLocalAddress()), DatabaseDescriptor.getDefsVersion());
+        this.tableName = tableName;
+        this.oldName = oldName;
+        this.newName = newName;
+        
+        KSMetaData ksm = DatabaseDescriptor.getTableDefinition(tableName);
+        if (ksm == null)
+            throw new DatabaseDescriptor.ConfigurationException("Keyspace does not exist.");
+        if (!ksm.cfMetaData().containsKey(oldName))
+            throw new DatabaseDescriptor.ConfigurationException("CF is not defined in that keyspace.");
+        if (ksm.cfMetaData().containsKey(newName))
+            throw new DatabaseDescriptor.ConfigurationException("CF is already defined in that keyspace.");
+        
+        // clone the ksm, replacing cfm with the new one.
+        KSMetaData newKsm = makeNewKeyspaceDefinition(ksm);
+        rm = Migration.makeDefinitionMutation(newKsm, null, newVersion);
+    }
+    
+    private KSMetaData makeNewKeyspaceDefinition(KSMetaData ksm)
+    {
+        CFMetaData oldCfm = ksm.cfMetaData().get(oldName);
+        List<CFMetaData> newCfs = new ArrayList<CFMetaData>(ksm.cfMetaData().values());
+        newCfs.remove(oldCfm);
+        assert newCfs.size() == ksm.cfMetaData().size() - 1;
+        CFMetaData newCfm = CFMetaData.rename(oldCfm, newName);
+        newCfs.add(newCfm);
+        return new KSMetaData(ksm.name, ksm.strategyClass, ksm.replicationFactor, ksm.snitch, newCfs.toArray(new CFMetaData[newCfs.size()]));
+    }
+
+    @Override
+    public ICompactSerializer getSerializer()
+    {
+        return serializer;
+    }
+
+    @Override
+    public void applyModels() throws IOException
+    {
+        // leave it up to operators to ensure there are no writes going on during the file rename. Just know that
+        // attempting row mutations against the old cf name right now would be really bad.
+        renameCfStorageFiles(tableName, oldName, newName);
+        
+        // reset defs.
+        KSMetaData ksm = makeNewKeyspaceDefinition(DatabaseDescriptor.getTableDefinition(tableName));
+        DatabaseDescriptor.setTableDefinition(ksm, newVersion);
+        Table.open(ksm.name).renameCf(oldName, newName);
+        
+        CommitLog.instance().forceNewSegment();
+    }
+    
+    // if this errors out, we are in a world of hurt.
+    private static void renameCfStorageFiles(String table, String oldCfName, String newCfName) throws IOException
+    {
+        // complete as much of the job as possible.  Don't let errors along the way prevent as much renaming as
+        // possible from happening.
+        IOException mostRecentProblem = null;
+        for (File existing : DefsTable.getFiles(table, oldCfName))
+        {
+            try
+            {
+                String newFileName = existing.getName().replaceFirst("\\w+-", newCfName + "-");
+                FileUtils.renameWithConfirm(existing, new File(existing.getParent(), newFileName));
+            }
+            catch (IOException ex)
+            {
+                mostRecentProblem = ex;
+            }
+        }
+        if (mostRecentProblem != null)
+            throw new IOException("One or more IOExceptions encountered while renaming files. Most recent problem is included.", mostRecentProblem);
+    }
+    
+    private static final class Serializer implements ICompactSerializer<RenameColumnFamily>
+    {
+        public void serialize(RenameColumnFamily renameColumnFamily, DataOutputStream dos) throws IOException
+        {
+            dos.write(UUIDGen.decompose(renameColumnFamily.newVersion));
+            dos.write(UUIDGen.decompose(renameColumnFamily.lastVersion));
+            RowMutation.serializer().serialize(renameColumnFamily.rm, dos);
+            
+            dos.writeUTF(renameColumnFamily.tableName);
+            dos.writeUTF(renameColumnFamily.oldName);
+            dos.writeUTF(renameColumnFamily.newName);
+        }
+
+        public RenameColumnFamily deserialize(DataInputStream dis) throws IOException
+        {
+            return new RenameColumnFamily(dis);
+        }
+    }
+}
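
For illustration, the replaceFirst("\w+-", ...) rewrite above only touches the leading cf-name segment of a data file. The file name below follows the assumed <cfName>-<generation>-<component> pattern and is purely hypothetical:

public class CfFileRenameSketch
{
    public static void main(String[] args)
    {
        String existingName = "Standard1-5-Data.db"; // hypothetical SSTable component file
        String newCfName = "Renamed1";
        // \w+ cannot cross the '-', so only the first segment (the old cf name) is replaced
        String newName = existingName.replaceFirst("\\w+-", newCfName + "-");
        System.out.println(newName); // Renamed1-5-Data.db
    }
}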

Added: cassandra/trunk/src/java/org/apache/cassandra/db/migration/RenameKeyspace.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/RenameKeyspace.java?rev=931198&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/RenameKeyspace.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/RenameKeyspace.java Tue Apr  6 16:01:31 2010
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.migration;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.DatabaseDescriptor.ConfigurationException;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.db.DefsTable;
+import org.apache.cassandra.db.HintedHandOffManager;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+
+public class RenameKeyspace extends Migration
+{
+    private static final Serializer serializer = new Serializer();
+    
+    private String oldName;
+    private String newName;
+    
+    RenameKeyspace(DataInputStream din) throws IOException
+    {
+        super(UUIDGen.makeType1UUID(din), UUIDGen.makeType1UUID(din));
+        rm = RowMutation.serializer().deserialize(din);
+        oldName = din.readUTF();
+        newName = din.readUTF();
+    }
+    
+    public RenameKeyspace(String oldName, String newName) throws ConfigurationException, IOException
+    {
+        super(UUIDGen.makeType1UUIDFromHost(FBUtilities.getLocalAddress()), DatabaseDescriptor.getDefsVersion());
+        this.oldName = oldName;
+        this.newName = newName;
+        
+        KSMetaData oldKsm = DatabaseDescriptor.getTableDefinition(oldName);
+        if (oldKsm == null)
+            throw new ConfigurationException("Keyspace either does not exist or does not match the one currently defined.");
+        if (DatabaseDescriptor.getTableDefinition(newName) != null)
+            throw new ConfigurationException("Keyspace already exists.");
+        
+        // clone the ksm, replacing the name.
+        KSMetaData newKsm = KSMetaData.rename(oldKsm, newName, false); 
+        
+        rm = makeDefinitionMutation(newKsm, oldKsm, newVersion);
+    }
+
+    @Override
+    public ICompactSerializer getSerializer()
+    {
+        return serializer;
+    }
+
+    @Override
+    public void applyModels() throws IOException
+    {
+        renameKsStorageFiles(oldName, newName);
+        
+        KSMetaData oldKsm = DatabaseDescriptor.getTableDefinition(oldName);
+        KSMetaData newKsm = KSMetaData.rename(oldKsm, newName, true);
+        // ^^ at this point, the static methods in CFMetaData will start returning references to the new table, so
+        // it helps if the node is reasonably quiescent with respect to this ks.
+        DatabaseDescriptor.clearTableDefinition(oldKsm, newVersion);
+        DatabaseDescriptor.setTableDefinition(newKsm, newVersion);
+        Table.clear(oldKsm.name);
+        Table.open(newName);
+        // this isn't strictly necessary since the set of all cfs was not modified.
+        CommitLog.instance().forceNewSegment();
+
+        HintedHandOffManager.renameHints(oldName, newName);
+    }
+    
+    private static void renameKsStorageFiles(String oldKs, String newKs) throws IOException
+    {
+        IOException mostRecentProblem = null;
+        Set<String> cfNames = DatabaseDescriptor.getTableDefinition(oldKs).cfMetaData().keySet();
+        for (String cfName : cfNames)
+        {
+            for (File existing : DefsTable.getFiles(oldKs, cfName))
+            {
+                try
+                {
+                    File newParent = new File(existing.getParentFile().getParent(), newKs);
+                    newParent.mkdirs();
+                    FileUtils.renameWithConfirm(existing, new File(newParent, existing.getName()));
+                }
+                catch (IOException ex)
+                {
+                    mostRecentProblem = ex;
+                }
+            }
+        }
+        if (mostRecentProblem != null)
+            throw new IOException("One or more IOExceptions encountered while renaming files. Most recent problem is included.", mostRecentProblem);
+    }
+    
+    private static final class Serializer implements ICompactSerializer<RenameKeyspace>
+    {
+        public void serialize(RenameKeyspace renameKeyspace, DataOutputStream dout) throws IOException
+        {
+            dout.write(UUIDGen.decompose(renameKeyspace.newVersion));
+            dout.write(UUIDGen.decompose(renameKeyspace.lastVersion));
+            RowMutation.serializer().serialize(renameKeyspace.rm, dout);
+            
+            dout.writeUTF(renameKeyspace.oldName);
+            dout.writeUTF(renameKeyspace.newName);
+        }
+
+        public RenameKeyspace deserialize(DataInputStream dis) throws IOException
+        {
+            return new RenameKeyspace(dis);
+        }
+    }
+}
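
The target path computed in renameKsStorageFiles() is simply the sibling directory named after the new keyspace. A small sketch of that path arithmetic; the data-directory location and file name are assumptions for illustration only:

import java.io.File;

public class KsRenamePathSketch
{
    public static void main(String[] args)
    {
        File existing = new File("/var/lib/cassandra/data/OldKs/Standard1-5-Data.db"); // hypothetical
        String newKs = "NewKs";
        // the file's parent is the old keyspace dir, its parent is the data dir; descend into the new ks dir
        File newParent = new File(existing.getParentFile().getParent(), newKs);
        File target = new File(newParent, existing.getName());
        System.out.println(target); // /var/lib/cassandra/data/NewKs/Standard1-5-Data.db
    }
}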

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=931198&r1=931197&r2=931198&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Tue Apr  6 16:01:31 2010
@@ -293,7 +293,7 @@ public class AntiEntropyService
      */
     public IValidator getValidator(String table, String cf, InetAddress initiator, boolean major)
     {
-        if (!major || table.equals(Table.SYSTEM_TABLE))
+        if (!major || table.equals(Table.SYSTEM_TABLE) || table.equals(Table.DEFINITIONS))
             return new NoopValidator();
         if (StorageService.instance.getTokenMetadata().sortedTokens().size()  < 1)
             // gossiper isn't started

Added: cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java?rev=931198&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java Tue Apr  6 16:01:31 2010
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Column;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.migration.Migration;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndPointState;
+import org.apache.cassandra.gms.IEndPointStateChangeSubscriber;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOError;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import java.util.UUID;
+
+public class MigrationManager implements IEndPointStateChangeSubscriber
+{
+    public static final String MIGRATION_STATE = "MIGRATION";
+    private static final Logger logger = LoggerFactory.getLogger(MigrationManager.class);
+    
+    /** I'm not going to act here. */
+    public void onJoin(InetAddress endpoint, EndPointState epState) { }
+
+    public void onChange(InetAddress endpoint, String stateName, ApplicationState state)
+    {
+        if (!MIGRATION_STATE.equals(stateName))
+            return;
+        UUID theirVersion = UUID.fromString(state.getValue());
+        rectify(theirVersion, endpoint);
+    }
+
+    /** gets called after this node joins a cluster */
+    public void onAlive(InetAddress endpoint, EndPointState state) 
+    { 
+        ApplicationState appState = state.getApplicationState(MIGRATION_STATE);
+        if (appState != null)
+        {
+            UUID theirVersion = UUID.fromString(appState.getValue());
+            rectify(theirVersion, endpoint);
+        }
+    }
+
+    public void onDead(InetAddress endpoint, EndPointState state) { }
+    
+    /** will either push or pull an update depending on who is behind. */
+    public static void rectify(UUID theirVersion, InetAddress endpoint)
+    {
+        UUID myVersion = DatabaseDescriptor.getDefsVersion();
+        if (theirVersion.timestamp() == myVersion.timestamp())
+            return;
+        else if (theirVersion.timestamp() > myVersion.timestamp())
+        {
+            logger.debug("My data definitions are old. Asking for updates since {}", myVersion.toString());
+            announce(myVersion, Collections.singleton(endpoint));
+        }
+        else
+        {
+            logger.debug("Their data definitions are old. Sending updates since {}", theirVersion.toString());
+            pushMigrations(theirVersion, myVersion, endpoint);
+        }
+    }
+
+    /** announce my version to a set of hosts.  This may culminate in them sending me migrations. */
+    public static void announce(UUID version, Set<InetAddress> hosts)
+    {
+        Message msg = makeVersionMessage(version);
+        for (InetAddress host : hosts)
+            MessagingService.instance.sendOneWay(msg, host);
+    }
+    
+    /** pushes migrations from this host to another host */
+    public static void pushMigrations(UUID from, UUID to, InetAddress host)
+    {
+        // I want all the rows from theirVersion through myVersion.
+        Collection<IColumn> migrations = Migration.getLocalMigrations(from, to);
+        try
+        {
+            Message msg = makeMigrationMessage(migrations);
+            MessagingService.instance.sendOneWay(msg, host);
+        }
+        catch (IOException ex)
+        {
+            throw new IOError(ex);
+        }
+    }
+    
+    private static Message makeVersionMessage(UUID version)
+    {
+        byte[] body = version.toString().getBytes();
+        return new Message(FBUtilities.getLocalAddress(), StageManager.READ_STAGE, StorageService.Verb.DEFINITIONS_ANNOUNCE, body);
+    }
+    
+    // other half of transformation is in DefinitionsUpdateResponseVerbHandler.
+    private static Message makeMigrationMessage(Collection<IColumn> migrations) throws IOException
+    {
+        ByteArrayOutputStream bout = new ByteArrayOutputStream();
+        DataOutputStream dout = new DataOutputStream(bout);
+        dout.writeInt(migrations.size());
+        for (IColumn col : migrations)
+        {
+            assert col instanceof Column;
+            dout.writeInt(col.name().length);
+            dout.write(col.name());
+            dout.writeInt(col.value().length);
+            dout.write(col.value());
+        }
+        dout.close();
+        byte[] body = bout.toByteArray();
+        return new Message(FBUtilities.getLocalAddress(), StageManager.MUTATION_STAGE, StorageService.Verb.DEFINITIONS_UPDATE_RESPONSE, body);
+    }
+    
+    // other half of this transformation is makeMigrationMessage(), above.
+    public static Collection<Column> makeColumns(Message msg) throws IOException
+    {
+        Collection<Column> cols = new ArrayList<Column>();
+        DataInputStream in = new DataInputStream(new ByteArrayInputStream(msg.getMessageBody()));
+        int count = in.readInt();
+        for (int i = 0; i < count; i++)
+        {
+            byte[] name = new byte[in.readInt()];
+            in.readFully(name);
+            byte[] value = new byte[in.readInt()];
+            in.readFully(value);
+            cols.add(new Column(name, value));
+        }
+        in.close();
+        return cols;
+    }
+}
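
The body built by makeMigrationMessage() and unpacked by makeColumns() is just a count followed by length-prefixed name/value pairs. A self-contained round trip of that framing; the column names and payloads here are arbitrary placeholders:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class MigrationFramingSketch
{
    public static void main(String[] args) throws IOException
    {
        String[][] columns = {
            { "uuid-1", "serialized migration #1" }, // placeholder name/value pairs
            { "uuid-2", "serialized migration #2" }
        };

        // encode: column count, then <name length, name, value length, value> per column
        ByteArrayOutputStream bout = new ByteArrayOutputStream();
        DataOutputStream dout = new DataOutputStream(bout);
        dout.writeInt(columns.length);
        for (String[] col : columns)
        {
            byte[] name = col[0].getBytes();
            byte[] value = col[1].getBytes();
            dout.writeInt(name.length);
            dout.write(name);
            dout.writeInt(value.length);
            dout.write(value);
        }
        dout.close();

        // decode: the mirror image of the loop above, as in makeColumns()
        DataInputStream din = new DataInputStream(new ByteArrayInputStream(bout.toByteArray()));
        int count = din.readInt();
        for (int i = 0; i < count; i++)
        {
            byte[] name = new byte[din.readInt()];
            din.readFully(name);
            byte[] value = new byte[din.readInt()];
            din.readFully(value);
            System.out.println(new String(name) + " -> " + new String(value));
        }
    }
}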

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=931198&r1=931197&r2=931198&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Tue Apr  6 16:01:31 2010
@@ -248,6 +248,17 @@ public class StorageService implements I
         else
             return ars;
     }
+    
+    public void initReplicationStrategy(String table)
+    {
+        AbstractReplicationStrategy strat = getReplicationStrategy(tokenMetadata_, table);
+        replicationStrategies.put(table, strat);
+    }
+    
+    public void clearReplicationStrategy(String table)
+    {
+        replicationStrategies.remove(table);
+    }
 
     public static AbstractReplicationStrategy getReplicationStrategy(TokenMetadata tokenMetadata, String table)
     {

Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/UUIDGen.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/UUIDGen.java?rev=931198&r1=931197&r2=931198&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/UUIDGen.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/UUIDGen.java Tue Apr  6 16:01:31 2010
@@ -24,11 +24,12 @@ package org.apache.cassandra.utils;
 import org.safehaus.uuid.EthernetAddress;
 import org.safehaus.uuid.UUIDGenerator;
 
-import java.math.BigInteger;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
 import java.net.InetAddress;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
-import java.util.Random;
 import java.util.UUID;
 
 /**
@@ -53,9 +54,16 @@ public class UUIDGen
             throw new RuntimeException("Your platform has no support for generating MD5 sums");
         }
     }
+    
+    public static UUID makeType1UUID(DataInputStream in) throws IOException
+    {
+        byte[] b = new byte[16];
+        in.readFully(b);
+        return makeType1UUID(b);
+    }
 
     /** creates a type 1 uuid from raw bytes. */
-    static UUID makeType1UUID(byte[] raw)
+    public static UUID makeType1UUID(byte[] raw)
     {
         long most = 0;
         long least = 0;
@@ -68,7 +76,7 @@ public class UUIDGen
     }
 
     /** decomposes a uuid into raw bytes. */
-    static byte[] decompose(UUID uuid)
+    public static byte[] decompose(UUID uuid)
     {
         long most = uuid.getMostSignificantBits();
         long least = uuid.getLeastSignificantBits();
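
The round trip between decompose() and makeType1UUID() amounts to packing the UUID's two longs big-endian into 16 bytes and reading them back in the same order. A sketch of the equivalent packing (ByteBuffer here is only an illustration of the layout, not the implementation in the patch; makeType1UUID itself is intended for time-based UUIDs):

import java.nio.ByteBuffer;
import java.util.UUID;

public class UuidBytesSketch
{
    public static void main(String[] args)
    {
        UUID original = UUID.randomUUID(); // any UUID demonstrates the byte round trip

        // pack most-significant then least-significant long, big-endian, into 16 bytes
        byte[] raw = ByteBuffer.allocate(16)
                               .putLong(original.getMostSignificantBits())
                               .putLong(original.getLeastSignificantBits())
                               .array();

        // unpack in the same order
        ByteBuffer buf = ByteBuffer.wrap(raw);
        UUID rebuilt = new UUID(buf.getLong(), buf.getLong());

        System.out.println(original.equals(rebuilt)); // true
    }
}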


