cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject [8/9] Replace supercolumns internally by composites
Date Wed, 02 Jan 2013 12:41:16 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
index 8d813a3..863e8f5 100644
--- a/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
@@ -40,7 +40,7 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
 {
     private final AbstractType<?> comparator;
     private final boolean reversed;
-    private final ArrayList<IColumn> columns;
+    private final ArrayList<Column> columns;
 
     public static final ISortedColumns.Factory factory = new Factory()
     {
@@ -49,7 +49,7 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
             return new ArrayBackedSortedColumns(comparator, insertReversed);
         }
 
-        public ISortedColumns fromSorted(SortedMap<ByteBuffer, IColumn> sortedMap, boolean insertReversed)
+        public ISortedColumns fromSorted(SortedMap<ByteBuffer, Column> sortedMap, boolean insertReversed)
         {
             return new ArrayBackedSortedColumns(sortedMap.values(), (AbstractType<?>)sortedMap.comparator(), insertReversed);
         }
@@ -65,14 +65,14 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
         super();
         this.comparator = comparator;
         this.reversed = reversed;
-        this.columns = new ArrayList<IColumn>();
+        this.columns = new ArrayList<Column>();
     }
 
-    private ArrayBackedSortedColumns(Collection<IColumn> columns, AbstractType<?> comparator, boolean reversed)
+    private ArrayBackedSortedColumns(Collection<Column> columns, AbstractType<?> comparator, boolean reversed)
     {
         this.comparator = comparator;
         this.reversed = reversed;
-        this.columns = new ArrayList<IColumn>(columns);
+        this.columns = new ArrayList<Column>(columns);
     }
 
     public ISortedColumns.Factory getFactory()
@@ -100,7 +100,7 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
         return reversed ? comparator.reverseComparator : comparator;
     }
 
-    public IColumn getColumn(ByteBuffer name)
+    public Column getColumn(ByteBuffer name)
     {
         int pos = binarySearch(name);
         return pos >= 0 ? columns.get(pos) : null;
@@ -116,7 +116,7 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
      * without knowing about (we can revisit that decision later if we have
      * use cases where most insert are in sorted order but a few are not).
      */
-    public void addColumn(IColumn column, Allocator allocator)
+    public void addColumn(Column column, Allocator allocator)
     {
         if (columns.isEmpty())
         {
@@ -154,21 +154,13 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
      * Resolve against element at position i.
      * Assume that i is a valid position.
      */
-    private void resolveAgainst(int i, IColumn column, Allocator allocator)
+    private void resolveAgainst(int i, Column column, Allocator allocator)
     {
-        IColumn oldColumn = columns.get(i);
-        if (oldColumn instanceof SuperColumn)
-        {
-            // Delegated to SuperColumn
-            assert column instanceof SuperColumn;
-            ((SuperColumn) oldColumn).putColumn((SuperColumn)column, allocator);
-        }
-        else
-        {
-            // calculate reconciled col from old (existing) col and new col
-            IColumn reconciledColumn = column.reconcile(oldColumn, allocator);
-            columns.set(i, reconciledColumn);
-        }
+        Column oldColumn = columns.get(i);
+
+        // calculate reconciled col from old (existing) col and new col
+        Column reconciledColumn = column.reconcile(oldColumn, allocator);
+        columns.set(i, reconciledColumn);
     }
 
     private int binarySearch(ByteBuffer name)
@@ -180,9 +172,9 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
      * Simple binary search for a given column name.
      * The return value has the exact same meaning that the one of Collections.binarySearch().
      * (We don't use Collections.binarySearch() directly because it would require us to create
-     * a fake IColumn (as well as an IColumn comparator) to do the search, which is ugly.
+     * a fake Column (as well as a Column comparator) to do the search, which is ugly.
      */
-    private static int binarySearch(List<IColumn> columns, Comparator<ByteBuffer> comparator, ByteBuffer name, int start)
+    private static int binarySearch(List<Column> columns, Comparator<ByteBuffer> comparator, ByteBuffer name, int start)
     {
         int low = start;
         int mid = columns.size();
@@ -207,21 +199,21 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
         return -mid - (result < 0 ? 1 : 2);
     }
 
-    public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation, SecondaryIndexManager.Updater indexer)
+    public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function<Column, Column> transformation, SecondaryIndexManager.Updater indexer)
     {
         throw new UnsupportedOperationException();
     }
 
-    public void addAll(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation)
+    public void addAll(ISortedColumns cm, Allocator allocator, Function<Column, Column> transformation)
     {
         delete(cm.getDeletionInfo());
         if (cm.isEmpty())
             return;
 
-        IColumn[] copy = columns.toArray(new IColumn[size()]);
+        Column[] copy = columns.toArray(new Column[size()]);
         int idx = 0;
-        Iterator<IColumn> other = reversed ? cm.reverseIterator(ColumnSlice.ALL_COLUMNS_ARRAY) : cm.iterator();
-        IColumn otherColumn = other.next();
+        Iterator<Column> other = reversed ? cm.reverseIterator(ColumnSlice.ALL_COLUMNS_ARRAY) : cm.iterator();
+        Column otherColumn = other.next();
 
         columns.clear();
 
@@ -257,7 +249,7 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
         }
     }
 
-    public boolean replace(IColumn oldColumn, IColumn newColumn)
+    public boolean replace(Column oldColumn, Column newColumn)
     {
         if (!oldColumn.name().equals(newColumn.name()))
             throw new IllegalArgumentException();
@@ -271,12 +263,12 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
         return pos >= 0;
     }
 
-    public Collection<IColumn> getSortedColumns()
+    public Collection<Column> getSortedColumns()
     {
         return reversed ? new ReverseSortedCollection() : columns;
     }
 
-    public Collection<IColumn> getReverseSortedColumns()
+    public Collection<Column> getReverseSortedColumns()
     {
         // If reversed, the element are sorted reversely, so we could expect
         // to return *this*, but *this* redefine the iterator to be in sorted
@@ -307,39 +299,39 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
         return new ColumnNamesSet();
     }
 
-    public Iterator<IColumn> iterator()
+    public Iterator<Column> iterator()
     {
         return reversed ? Lists.reverse(columns).iterator() : columns.iterator();
     }
 
-    public Iterator<IColumn> iterator(ColumnSlice[] slices)
+    public Iterator<Column> iterator(ColumnSlice[] slices)
     {
         return new SlicesIterator(columns, comparator, slices, reversed);
     }
 
-    public Iterator<IColumn> reverseIterator(ColumnSlice[] slices)
+    public Iterator<Column> reverseIterator(ColumnSlice[] slices)
     {
         return new SlicesIterator(columns, comparator, slices, !reversed);
     }
 
-    private static class SlicesIterator extends AbstractIterator<IColumn>
+    private static class SlicesIterator extends AbstractIterator<Column>
     {
-        private final List<IColumn> list;
+        private final List<Column> list;
         private final ColumnSlice[] slices;
         private final Comparator<ByteBuffer> comparator;
 
         private int idx = 0;
         private int previousSliceEnd = 0;
-        private Iterator<IColumn> currentSlice;
+        private Iterator<Column> currentSlice;
 
-        public SlicesIterator(List<IColumn> list, AbstractType<?> comparator, ColumnSlice[] slices, boolean reversed)
+        public SlicesIterator(List<Column> list, AbstractType<?> comparator, ColumnSlice[] slices, boolean reversed)
         {
             this.list = reversed ? Lists.reverse(list) : list;
             this.slices = slices;
             this.comparator = reversed ? comparator.reverseComparator : comparator;
         }
 
-        protected IColumn computeNext()
+        protected Column computeNext()
         {
             if (currentSlice == null)
             {
@@ -375,16 +367,16 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
         }
     }
 
-    private class ReverseSortedCollection extends AbstractCollection<IColumn>
+    private class ReverseSortedCollection extends AbstractCollection<Column>
     {
         public int size()
         {
             return columns.size();
         }
 
-        public Iterator<IColumn> iterator()
+        public Iterator<Column> iterator()
         {
-            return new Iterator<IColumn>()
+            return new Iterator<Column>()
             {
                 int idx = size() - 1;
 
@@ -393,7 +385,7 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
                     return idx >= 0;
                 }
 
-                public IColumn next()
+                public Column next()
                 {
                     return columns.get(idx--);
                 }
@@ -406,14 +398,14 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
         }
     }
 
-    private class ForwardSortedCollection extends AbstractCollection<IColumn>
+    private class ForwardSortedCollection extends AbstractCollection<Column>
     {
         public int size()
         {
             return columns.size();
         }
 
-        public Iterator<IColumn> iterator()
+        public Iterator<Column> iterator()
         {
             return columns.iterator();
         }
@@ -428,7 +420,7 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
 
         public Iterator<ByteBuffer> iterator()
         {
-            final Iterator<IColumn> outerIterator = ArrayBackedSortedColumns.this.iterator(); // handles reversed
+            final Iterator<Column> outerIterator = ArrayBackedSortedColumns.this.iterator(); // handles reversed
             return new Iterator<ByteBuffer>()
             {
                 public boolean hasNext()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
index 83aabea..fbfcd75 100644
--- a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
@@ -58,7 +58,7 @@ public class AtomicSortedColumns implements ISortedColumns
             return new AtomicSortedColumns(comparator);
         }
 
-        public ISortedColumns fromSorted(SortedMap<ByteBuffer, IColumn> sortedMap, boolean insertReversed)
+        public ISortedColumns fromSorted(SortedMap<ByteBuffer, Column> sortedMap, boolean insertReversed)
         {
             return new AtomicSortedColumns(sortedMap);
         }
@@ -74,7 +74,7 @@ public class AtomicSortedColumns implements ISortedColumns
         this(new Holder(comparator));
     }
 
-    private AtomicSortedColumns(SortedMap<ByteBuffer, IColumn> columns)
+    private AtomicSortedColumns(SortedMap<ByteBuffer, Column> columns)
     {
         this(new Holder(columns));
     }
@@ -144,7 +144,7 @@ public class AtomicSortedColumns implements ISortedColumns
         while (!ref.compareAndSet(current, modified));
     }
 
-    public void addColumn(IColumn column, Allocator allocator)
+    public void addColumn(Column column, Allocator allocator)
     {
         Holder current, modified;
         do
@@ -156,12 +156,12 @@ public class AtomicSortedColumns implements ISortedColumns
         while (!ref.compareAndSet(current, modified));
     }
 
-    public void addAll(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation)
+    public void addAll(ISortedColumns cm, Allocator allocator, Function<Column, Column> transformation)
     {
         addAllWithSizeDelta(cm, allocator, transformation, SecondaryIndexManager.nullUpdater);
     }
 
-    public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation, SecondaryIndexManager.Updater indexer)
+    public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function<Column, Column> transformation, SecondaryIndexManager.Updater indexer)
     {
         /*
         * This operation needs atomicity and isolation. To that end, we
@@ -185,7 +185,7 @@ public class AtomicSortedColumns implements ISortedColumns
             DeletionInfo newDelInfo = current.deletionInfo.add(cm.getDeletionInfo());
             modified = new Holder(current.map.clone(), newDelInfo);
 
-            for (IColumn column : cm.getSortedColumns())
+            for (Column column : cm.getSortedColumns())
             {
                 sizeDelta += modified.addColumn(transformation.apply(column), allocator, indexer);
                 // bail early if we know we've been beaten
@@ -198,7 +198,7 @@ public class AtomicSortedColumns implements ISortedColumns
         return sizeDelta;
     }
 
-    public boolean replace(IColumn oldColumn, IColumn newColumn)
+    public boolean replace(Column oldColumn, Column newColumn)
     {
         if (!oldColumn.name().equals(newColumn.name()))
             throw new IllegalArgumentException();
@@ -238,7 +238,7 @@ public class AtomicSortedColumns implements ISortedColumns
         while (!ref.compareAndSet(current, modified));
     }
 
-    public IColumn getColumn(ByteBuffer name)
+    public Column getColumn(ByteBuffer name)
     {
         return ref.get().map.get(name);
     }
@@ -248,12 +248,12 @@ public class AtomicSortedColumns implements ISortedColumns
         return ref.get().map.keySet();
     }
 
-    public Collection<IColumn> getSortedColumns()
+    public Collection<Column> getSortedColumns()
     {
         return ref.get().map.values();
     }
 
-    public Collection<IColumn> getReverseSortedColumns()
+    public Collection<Column> getReverseSortedColumns()
     {
         return ref.get().map.descendingMap().values();
     }
@@ -273,17 +273,17 @@ public class AtomicSortedColumns implements ISortedColumns
         return ref.get().map.isEmpty();
     }
 
-    public Iterator<IColumn> iterator()
+    public Iterator<Column> iterator()
     {
         return getSortedColumns().iterator();
     }
 
-    public Iterator<IColumn> iterator(ColumnSlice[] slices)
+    public Iterator<Column> iterator(ColumnSlice[] slices)
     {
         return new ColumnSlice.NavigableMapIterator(ref.get().map, slices);
     }
 
-    public Iterator<IColumn> reverseIterator(ColumnSlice[] slices)
+    public Iterator<Column> reverseIterator(ColumnSlice[] slices)
     {
         return new ColumnSlice.NavigableMapIterator(ref.get().map.descendingMap(), slices);
     }
@@ -295,20 +295,20 @@ public class AtomicSortedColumns implements ISortedColumns
 
     private static class Holder
     {
-        final SnapTreeMap<ByteBuffer, IColumn> map;
+        final SnapTreeMap<ByteBuffer, Column> map;
         final DeletionInfo deletionInfo;
 
         Holder(AbstractType<?> comparator)
         {
-            this(new SnapTreeMap<ByteBuffer, IColumn>(comparator), DeletionInfo.LIVE);
+            this(new SnapTreeMap<ByteBuffer, Column>(comparator), DeletionInfo.LIVE);
         }
 
-        Holder(SortedMap<ByteBuffer, IColumn> columns)
+        Holder(SortedMap<ByteBuffer, Column> columns)
         {
-            this(new SnapTreeMap<ByteBuffer, IColumn>(columns), DeletionInfo.LIVE);
+            this(new SnapTreeMap<ByteBuffer, Column>(columns), DeletionInfo.LIVE);
         }
 
-        Holder(SnapTreeMap<ByteBuffer, IColumn> map, DeletionInfo deletionInfo)
+        Holder(SnapTreeMap<ByteBuffer, Column> map, DeletionInfo deletionInfo)
         {
             this.map = map;
             this.deletionInfo = deletionInfo;
@@ -324,7 +324,7 @@ public class AtomicSortedColumns implements ISortedColumns
             return new Holder(map, info);
         }
 
-        Holder with(SnapTreeMap<ByteBuffer, IColumn> newMap)
+        Holder with(SnapTreeMap<ByteBuffer, Column> newMap)
         {
             return new Holder(newMap, deletionInfo);
         }
@@ -333,64 +333,49 @@ public class AtomicSortedColumns implements ISortedColumns
         // afterwards.
         Holder clear()
         {
-            return new Holder(new SnapTreeMap<ByteBuffer, IColumn>(map.comparator()), deletionInfo);
+            return new Holder(new SnapTreeMap<ByteBuffer, Column>(map.comparator()), deletionInfo);
         }
 
-        long addColumn(IColumn column, Allocator allocator, SecondaryIndexManager.Updater indexer)
+        long addColumn(Column column, Allocator allocator, SecondaryIndexManager.Updater indexer)
         {
             ByteBuffer name = column.name();
             while (true)
             {
-                IColumn oldColumn = map.putIfAbsent(name, column);
+                Column oldColumn = map.putIfAbsent(name, column);
                 if (oldColumn == null)
                 {
                     indexer.insert(column);
                     return column.dataSize();
                 }
 
-                if (oldColumn instanceof SuperColumn)
+                Column reconciledColumn = column.reconcile(oldColumn, allocator);
+                if (map.replace(name, oldColumn, reconciledColumn))
                 {
-                    assert column instanceof SuperColumn;
-                    long previousSize = oldColumn.dataSize();
-                    ((SuperColumn) oldColumn).putColumn((SuperColumn)column, allocator);
-                    return oldColumn.dataSize() - previousSize;
-                }
-                else
-                {
-                    IColumn reconciledColumn = column.reconcile(oldColumn, allocator);
-                    if (map.replace(name, oldColumn, reconciledColumn))
-                    {
-                        // for memtable updates we only care about oldcolumn, reconciledcolumn, but when compacting
-                        // we need to make sure we update indexes no matter the order we merge
-                        if (reconciledColumn == column)
-                            indexer.update(oldColumn, reconciledColumn);
-                        else
-                            indexer.update(column, reconciledColumn);
-                        return reconciledColumn.dataSize() - oldColumn.dataSize();
-                    }
-                    // We failed to replace column due to a concurrent update or a concurrent removal. Keep trying.
-                    // (Currently, concurrent removal should not happen (only updates), but let us support that anyway.)
+                    // for memtable updates we only care about oldcolumn, reconciledcolumn, but when compacting
+                    // we need to make sure we update indexes no matter the order we merge
+                    if (reconciledColumn == column)
+                        indexer.update(oldColumn, reconciledColumn);
+                    else
+                        indexer.update(column, reconciledColumn);
+                    return reconciledColumn.dataSize() - oldColumn.dataSize();
                 }
+                // We failed to replace column due to a concurrent update or a concurrent removal. Keep trying.
+                // (Currently, concurrent removal should not happen (only updates), but let us support that anyway.)
             }
         }
 
         void retainAll(ISortedColumns columns)
         {
-            Iterator<IColumn> iter = map.values().iterator();
-            Iterator<IColumn> toRetain = columns.iterator();
-            IColumn current = iter.hasNext() ? iter.next() : null;
-            IColumn retain = toRetain.hasNext() ? toRetain.next() : null;
+            Iterator<Column> iter = map.values().iterator();
+            Iterator<Column> toRetain = columns.iterator();
+            Column current = iter.hasNext() ? iter.next() : null;
+            Column retain = toRetain.hasNext() ? toRetain.next() : null;
             Comparator<? super ByteBuffer> comparator = map.comparator();
             while (current != null && retain != null)
             {
                 int c = comparator.compare(current.name(), retain.name());
                 if (c == 0)
                 {
-                    if (current instanceof SuperColumn)
-                    {
-                        assert retain instanceof SuperColumn;
-                        ((SuperColumn)current).retainAll((SuperColumn)retain);
-                    }
                     current = iter.hasNext() ? iter.next() : null;
                     retain = toRetain.hasNext() ? toRetain.next() : null;
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index 843cf44..fcbfa9b 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -40,7 +40,6 @@ import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.filter.IDiskAtomFilter;
 import org.apache.cassandra.db.filter.NamesQueryFilter;
 import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.marshal.LongType;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.db.marshal.UUIDType;
@@ -177,7 +176,7 @@ public class BatchlogManager implements BatchlogManagerMBean
                 if (row.cf == null || row.cf.isMarkedForDelete())
                     continue;
 
-                IColumn writtenAt = row.cf.getColumn(WRITTEN_AT);
+                Column writtenAt = row.cf.getColumn(WRITTEN_AT);
                 if (writtenAt == null || System.currentTimeMillis() > LongType.instance.compose(writtenAt.value()) + TIMEOUT)
                     replayBatch(row.key);
             }
@@ -199,13 +198,13 @@ public class BatchlogManager implements BatchlogManagerMBean
         logger.debug("Replaying batch {}", uuid);
 
         ColumnFamilyStore store = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.BATCHLOG_CF);
-        QueryFilter filter = QueryFilter.getIdentityFilter(key, new QueryPath(SystemTable.BATCHLOG_CF));
+        QueryFilter filter = QueryFilter.getIdentityFilter(key, SystemTable.BATCHLOG_CF);
         ColumnFamily batch = store.getColumnFamily(filter);
 
         if (batch == null || batch.isMarkedForDelete())
             return;
 
-        IColumn dataColumn = batch.getColumn(DATA);
+        Column dataColumn = batch.getColumn(DATA);
         try
         {
             if (dataColumn != null)
@@ -246,7 +245,7 @@ public class BatchlogManager implements BatchlogManagerMBean
     private static void deleteBatch(DecoratedKey key)
     {
         RowMutation rm = new RowMutation(Table.SYSTEM_KS, key.key);
-        rm.delete(new QueryPath(SystemTable.BATCHLOG_CF), FBUtilities.timestampMicros());
+        rm.delete(SystemTable.BATCHLOG_CF, FBUtilities.timestampMicros());
         rm.apply();
     }
 
@@ -262,7 +261,7 @@ public class BatchlogManager implements BatchlogManagerMBean
         IPartitioner partitioner = StorageService.getPartitioner();
         RowPosition minPosition = partitioner.getMinimumToken().minKeyBound();
         AbstractBounds<RowPosition> range = new Range<RowPosition>(minPosition, minPosition, partitioner);
-        return store.getRangeSlice(null, range, Integer.MAX_VALUE, columnFilter, null);
+        return store.getRangeSlice(range, Integer.MAX_VALUE, columnFilter, null);
     }
 
     /** force flush + compaction to reclaim space from replayed batches */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/CollationController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CollationController.java b/src/java/org/apache/cassandra/db/CollationController.java
index 8c1a939..3e659cb 100644
--- a/src/java/org/apache/cassandra/db/CollationController.java
+++ b/src/java/org/apache/cassandra/db/CollationController.java
@@ -54,16 +54,14 @@ public class CollationController
         this.filter = filter;
         this.gcBefore = gcBefore;
 
-        // AtomicSortedColumns doesn't work for super columns (see #3821)
         this.factory = mutableColumns
-                     ? cfs.metadata.cfType == ColumnFamilyType.Super ? ThreadSafeSortedColumns.factory() : AtomicSortedColumns.factory()
+                     ? AtomicSortedColumns.factory()
                      : ArrayBackedSortedColumns.factory();
     }
 
     public ColumnFamily getTopLevelColumns()
     {
         return filter.filter instanceof NamesQueryFilter
-               && (cfs.metadata.cfType == ColumnFamilyType.Standard || filter.path.superColumnName != null)
                && cfs.metadata.getDefaultValidator() != CounterColumnType.instance
                ? collectTimeOrderedData()
                : collectAllData();
@@ -110,7 +108,7 @@ public class CollationController
             // (reduceNameFilter removes columns that are known to be irrelevant)
             NamesQueryFilter namesFilter = (NamesQueryFilter) filter.filter;
             TreeSet<ByteBuffer> filterColumns = new TreeSet<ByteBuffer>(namesFilter.columns);
-            QueryFilter reducedFilter = new QueryFilter(filter.key, filter.path, namesFilter.withUpdatedColumns(filterColumns));
+            QueryFilter reducedFilter = new QueryFilter(filter.key, filter.cfName, namesFilter.withUpdatedColumns(filterColumns));
 
             /* add the SSTables on disk */
             Collections.sort(view.sstables, SSTable.maxTimestampComparator);
@@ -161,7 +159,7 @@ public class CollationController
             final ColumnFamily c2 = container;
             CloseableIterator<OnDiskAtom> toCollate = new SimpleAbstractColumnIterator()
             {
-                final Iterator<IColumn> iter = c2.iterator();
+                final Iterator<Column> iter = c2.iterator();
 
                 protected OnDiskAtom computeNext()
                 {
@@ -205,13 +203,10 @@ public class CollationController
     }
 
     /**
-     * remove columns from @param filter where we already have data in @param returnCF newer than @param sstableTimestamp
+     * remove columns from @param filter where we already have data in @param container newer than @param sstableTimestamp
      */
-    private void reduceNameFilter(QueryFilter filter, ColumnFamily returnCF, long sstableTimestamp)
+    private void reduceNameFilter(QueryFilter filter, ColumnFamily container, long sstableTimestamp)
     {
-        AbstractColumnContainer container = filter.path.superColumnName == null
-                                          ? returnCF
-                                          : (SuperColumn) returnCF.getColumn(filter.path.superColumnName);
         // MIN_VALUE means we don't know any information
         if (container == null || sstableTimestamp == Long.MIN_VALUE)
             return;
@@ -219,7 +214,7 @@ public class CollationController
         for (Iterator<ByteBuffer> iterator = ((NamesQueryFilter) filter.filter).columns.iterator(); iterator.hasNext(); )
         {
             ByteBuffer filterColumn = iterator.next();
-            IColumn column = container.getColumn(filterColumn);
+            Column column = container.getColumn(filterColumn);
             if (column != null && column.timestamp() > sstableTimestamp)
                 iterator.remove();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/Column.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Column.java b/src/java/org/apache/cassandra/db/Column.java
index ebfb9f5..4efcfbd 100644
--- a/src/java/org/apache/cassandra/db/Column.java
+++ b/src/java/org/apache/cassandra/db/Column.java
@@ -17,31 +17,34 @@
  */
 package org.apache.cassandra.db;
 
+import java.io.DataInput;
+import java.io.IOError;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.utils.Allocator;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.HeapAllocator;
 
 /**
  * Column is immutable, which prevents all kinds of confusion in a multithreaded environment.
- * (TODO: look at making SuperColumn immutable too.  This is trickier but is probably doable
- *  with something like PCollections -- http://code.google.com
  */
-
-public class Column implements IColumn
+public class Column implements OnDiskAtom
 {
+    public static final int MAX_NAME_LENGTH = FBUtilities.MAX_UNSIGNED_SHORT;
+
     private static final ColumnSerializer serializer = new ColumnSerializer();
-    private static final OnDiskAtom.Serializer onDiskSerializer = new OnDiskAtom.Serializer(serializer);
 
     public static ColumnSerializer serializer()
     {
@@ -50,7 +53,38 @@ public class Column implements IColumn
 
     public static OnDiskAtom.Serializer onDiskSerializer()
     {
-        return onDiskSerializer;
+        return OnDiskAtom.Serializer.instance;
+    }
+
+    public static Iterator<OnDiskAtom> onDiskIterator(final DataInput dis, final int count, final ColumnSerializer.Flag flag, final int expireBefore, final Descriptor.Version version)
+    {
+        return new Iterator<OnDiskAtom>()
+        {
+            int i = 0;
+
+            public boolean hasNext()
+            {
+                return i < count;
+            }
+
+            public OnDiskAtom next()
+            {
+                ++i;
+                try
+                {
+                    return onDiskSerializer().deserializeFromSSTable(dis, flag, expireBefore, version);
+                }
+                catch (IOException e)
+                {
+                    throw new IOError(e);
+                }
+            }
+
+            public void remove()
+            {
+                throw new UnsupportedOperationException();
+            }
+        };
     }
 
     protected final ByteBuffer name;
@@ -71,20 +105,20 @@ public class Column implements IColumn
     {
         assert name != null;
         assert value != null;
-        assert name.remaining() <= IColumn.MAX_NAME_LENGTH;
+        assert name.remaining() <= Column.MAX_NAME_LENGTH;
         this.name = name;
         this.value = value;
         this.timestamp = timestamp;
     }
 
-    public ByteBuffer name()
+    public Column withUpdatedName(ByteBuffer newName)
     {
-        return name;
+        return new Column(newName, value, timestamp);
     }
 
-    public Column getSubColumn(ByteBuffer columnName)
+    public ByteBuffer name()
     {
-        throw new UnsupportedOperationException("This operation is unsupported on simple columns.");
+        return name;
     }
 
     public ByteBuffer value()
@@ -92,11 +126,6 @@ public class Column implements IColumn
         return value;
     }
 
-    public Collection<IColumn> getSubColumns()
-    {
-        throw new UnsupportedOperationException("This operation is unsupported on simple columns.");
-    }
-
     public long timestamp()
     {
         return timestamp;
@@ -162,17 +191,7 @@ public class Column implements IColumn
         return 0;
     }
 
-    public void addColumn(IColumn column)
-    {
-        addColumn(null, null);
-    }
-
-    public void addColumn(IColumn column, Allocator allocator)
-    {
-        throw new UnsupportedOperationException("This operation is not supported for simple columns.");
-    }
-
-    public IColumn diff(IColumn column)
+    public Column diff(Column column)
     {
         if (timestamp() < column.timestamp())
         {
@@ -204,12 +223,12 @@ public class Column implements IColumn
         return Integer.MAX_VALUE;
     }
 
-    public IColumn reconcile(IColumn column)
+    public Column reconcile(Column column)
     {
         return reconcile(column, HeapAllocator.instance);
     }
 
-    public IColumn reconcile(IColumn column, Allocator allocator)
+    public Column reconcile(Column column, Allocator allocator)
     {
         // tombstones take precedence.  (if both are tombstones, then it doesn't matter which one we use.)
         if (isMarkedForDelete())
@@ -250,12 +269,12 @@ public class Column implements IColumn
         return result;
     }
 
-    public IColumn localCopy(ColumnFamilyStore cfs)
+    public Column localCopy(ColumnFamilyStore cfs)
     {
         return localCopy(cfs, HeapAllocator.instance);
     }
 
-    public IColumn localCopy(ColumnFamilyStore cfs, Allocator allocator)
+    public Column localCopy(ColumnFamilyStore cfs, Allocator allocator)
     {
         return new Column(cfs.internOrCopy(name, allocator), allocator.clone(value), timestamp);
     }
@@ -280,8 +299,7 @@ public class Column implements IColumn
 
     protected void validateName(CFMetaData metadata) throws MarshalException
     {
-        AbstractType<?> nameValidator = metadata.cfType == ColumnFamilyType.Super ? metadata.subcolumnComparator : metadata.comparator;
-        nameValidator.validate(name());
+        metadata.comparator.validate(name());
     }
 
     public void validateFields(CFMetaData metadata) throws MarshalException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java
index 28621d7..fd255d3 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.db;
 
+import java.io.DataInput;
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 import java.util.UUID;
@@ -29,10 +30,9 @@ import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.cassandra.cache.IRowCacheEntry;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.MarshalException;
-import org.apache.cassandra.io.IColumnSerializer;
+import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.ColumnStats;
 
 public class ColumnFamily extends AbstractColumnContainer implements IRowCacheEntry
@@ -90,12 +90,6 @@ public class ColumnFamily extends AbstractColumnContainer implements IRowCacheEn
         return cloneMeShallow(columns.getFactory(), columns.isInsertReversed());
     }
 
-    public AbstractType<?> getSubComparator()
-    {
-        IColumnSerializer s = getColumnSerializer();
-        return (s instanceof SuperColumnSerializer) ? ((SuperColumnSerializer) s).getComparator() : null;
-    }
-
     public ColumnFamilyType getType()
     {
         return cfm.cfType;
@@ -121,98 +115,38 @@ public class ColumnFamily extends AbstractColumnContainer implements IRowCacheEn
         return cfm;
     }
 
-    public IColumnSerializer getColumnSerializer()
-    {
-        return cfm.getColumnSerializer();
-    }
-
-    public OnDiskAtom.Serializer getOnDiskSerializer()
-    {
-        return cfm.getOnDiskSerializer();
-    }
-
-    public boolean isSuper()
-    {
-        return getType() == ColumnFamilyType.Super;
-    }
-
-    /**
-     * Same as addAll() but do a cloneMe of SuperColumn if necessary to
-     * avoid keeping references to the structure (see #3957).
-     */
-    public void addAllWithSCCopy(ColumnFamily cf, Allocator allocator)
-    {
-        if (cf.isSuper())
-        {
-            for (IColumn c : cf)
-            {
-                columns.addColumn(((SuperColumn)c).cloneMe(), allocator);
-            }
-            delete(cf);
-        }
-        else
-        {
-            addAll(cf, allocator);
-        }
-    }
-
-    public void addColumn(QueryPath path, ByteBuffer value, long timestamp)
+    public void addColumn(ByteBuffer name, ByteBuffer value, long timestamp)
     {
-        addColumn(path, value, timestamp, 0);
+        addColumn(name, value, timestamp, 0);
     }
 
-    public void addColumn(QueryPath path, ByteBuffer value, long timestamp, int timeToLive)
+    public void addColumn(ByteBuffer name, ByteBuffer value, long timestamp, int timeToLive)
     {
-        assert path.columnName != null : path;
         assert !metadata().getDefaultValidator().isCommutative();
-        Column column = Column.create(path.columnName, value, timestamp, timeToLive, metadata());
-        addColumn(path.superColumnName, column);
+        Column column = Column.create(name, value, timestamp, timeToLive, metadata());
+        addColumn(column);
     }
 
-    public void addCounter(QueryPath path, long value)
+    public void addCounter(ByteBuffer name, long value)
     {
-        assert path.columnName != null : path;
-        addColumn(path.superColumnName, new CounterUpdateColumn(path.columnName, value, System.currentTimeMillis()));
+        addColumn(new CounterUpdateColumn(name, value, System.currentTimeMillis()));
     }
 
-    public void addTombstone(QueryPath path, ByteBuffer localDeletionTime, long timestamp)
+    public void addTombstone(ByteBuffer name, ByteBuffer localDeletionTime, long timestamp)
     {
-        assert path.columnName != null : path;
-        addColumn(path.superColumnName, new DeletedColumn(path.columnName, localDeletionTime, timestamp));
-    }
-
-    public void addTombstone(QueryPath path, int localDeletionTime, long timestamp)
-    {
-        assert path.columnName != null : path;
-        addColumn(path.superColumnName, new DeletedColumn(path.columnName, localDeletionTime, timestamp));
+        addColumn(new DeletedColumn(name, localDeletionTime, timestamp));
     }
 
     public void addTombstone(ByteBuffer name, int localDeletionTime, long timestamp)
     {
-        addColumn(null, new DeletedColumn(name, localDeletionTime, timestamp));
-    }
-
-    public void addColumn(ByteBuffer superColumnName, Column column)
-    {
-        IColumn c;
-        if (superColumnName == null)
-        {
-            c = column;
-        }
-        else
-        {
-            assert isSuper();
-            c = new SuperColumn(superColumnName, getSubComparator());
-            c.addColumn(column); // checks subcolumn name
-        }
-        addColumn(c);
+        addColumn(new DeletedColumn(name, localDeletionTime, timestamp));
     }
 
     public void addAtom(OnDiskAtom atom)
     {
-        if (atom instanceof IColumn)
+        if (atom instanceof Column)
         {
-            addColumn((IColumn)atom);
+            addColumn((Column)atom);
         }
         else
         {
@@ -236,20 +170,20 @@ public class ColumnFamily extends AbstractColumnContainer implements IRowCacheEn
         ColumnFamily cfDiff = ColumnFamily.create(cfm);
         cfDiff.delete(cfComposite.deletionInfo());
 
-        // (don't need to worry about cfNew containing IColumns that are shadowed by
+        // (don't need to worry about cfNew containing Columns that are shadowed by
         // the delete tombstone, since cfNew was generated by CF.resolve, which
         // takes care of those for us.)
-        for (IColumn columnExternal : cfComposite)
+        for (Column columnExternal : cfComposite)
         {
             ByteBuffer cName = columnExternal.name();
-            IColumn columnInternal = this.columns.getColumn(cName);
+            Column columnInternal = this.columns.getColumn(cName);
             if (columnInternal == null)
             {
                 cfDiff.addColumn(columnExternal);
             }
             else
             {
-                IColumn columnDiff = columnInternal.diff(columnExternal);
+                Column columnDiff = columnInternal.diff(columnExternal);
                 if (columnDiff != null)
                 {
                     cfDiff.addColumn(columnDiff);
@@ -266,7 +200,7 @@ public class ColumnFamily extends AbstractColumnContainer implements IRowCacheEn
     int dataSize()
     {
         int size = deletionInfo().dataSize();
-        for (IColumn column : columns)
+        for (Column column : columns)
         {
             size += column.dataSize();
         }
@@ -276,7 +210,7 @@ public class ColumnFamily extends AbstractColumnContainer implements IRowCacheEn
     public long maxTimestamp()
     {
         long maxTimestamp = deletionInfo().maxTimestamp();
-        for (IColumn column : columns)
+        for (Column column : columns)
             maxTimestamp = Math.max(maxTimestamp, column.maxTimestamp());
         return maxTimestamp;
     }
@@ -329,17 +263,10 @@ public class ColumnFamily extends AbstractColumnContainer implements IRowCacheEn
 
     public void updateDigest(MessageDigest digest)
     {
-        for (IColumn column : columns)
+        for (Column column : columns)
             column.updateDigest(digest);
     }
 
-    public static AbstractType<?> getComparatorFor(String table, String columnFamilyName, ByteBuffer superColumnName)
-    {
-        return superColumnName == null
-               ? Schema.instance.getComparator(table, columnFamilyName)
-               : Schema.instance.getSubComparator(table, columnFamilyName);
-    }
-
     public static ColumnFamily diff(ColumnFamily cf1, ColumnFamily cf2)
     {
         if (cf1 == null)
@@ -368,7 +295,7 @@ public class ColumnFamily extends AbstractColumnContainer implements IRowCacheEn
     public void validateColumnFields() throws MarshalException
     {
         CFMetaData metadata = metadata();
-        for (IColumn column : this)
+        for (Column column : this)
         {
             column.validateFields(metadata);
         }
@@ -380,7 +307,7 @@ public class ColumnFamily extends AbstractColumnContainer implements IRowCacheEn
         long maxTimestampSeen = deletionInfo().maxTimestamp();
         StreamingHistogram tombstones = new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE);
 
-        for (IColumn column : columns)
+        for (Column column : columns)
         {
             minTimestampSeen = Math.min(minTimestampSeen, column.minTimestamp());
             maxTimestampSeen = Math.max(maxTimestampSeen, column.maxTimestamp());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java b/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
index bac1e8b..2b3b598 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
@@ -20,10 +20,10 @@ package org.apache.cassandra.db;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.UUID;
 
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.ISSTableSerializer;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.sstable.Descriptor;
@@ -62,13 +62,18 @@ public class ColumnFamilySerializer implements IVersionedSerializer<ColumnFamily
             dos.writeBoolean(true);
             serializeCfId(cf.id(), dos, version);
 
-            DeletionInfo.serializer().serialize(cf.deletionInfo(), dos, version);
+            if (cf.metadata().isSuper() && version < MessagingService.VERSION_20)
+            {
+                SuperColumns.serializeSuperColumnFamily(cf, dos, version);
+                return;
+            }
 
-            IColumnSerializer columnSerializer = cf.getColumnSerializer();
+            DeletionInfo.serializer().serialize(cf.deletionInfo(), dos, version);
+            ColumnSerializer columnSerializer = Column.serializer();
             int count = cf.getColumnCount();
             dos.writeInt(count);
             int written = 0;
-            for (IColumn column : cf)
+            for (Column column : cf)
             {
                 columnSerializer.serialize(column, dos);
                 written++;
@@ -83,32 +88,50 @@ public class ColumnFamilySerializer implements IVersionedSerializer<ColumnFamily
 
     public ColumnFamily deserialize(DataInput dis, int version) throws IOException
     {
-        return deserialize(dis, IColumnSerializer.Flag.LOCAL, TreeMapBackedSortedColumns.factory(), version);
+        return deserialize(dis, ColumnSerializer.Flag.LOCAL, TreeMapBackedSortedColumns.factory(), version);
     }
 
-    public ColumnFamily deserialize(DataInput dis, IColumnSerializer.Flag flag, ISortedColumns.Factory factory, int version) throws IOException
+    public ColumnFamily deserialize(DataInput dis, ColumnSerializer.Flag flag, ISortedColumns.Factory factory, int version) throws IOException
     {
         if (!dis.readBoolean())
             return null;
 
         ColumnFamily cf = ColumnFamily.create(deserializeCfId(dis, version), factory);
-        IColumnSerializer columnSerializer = cf.getColumnSerializer();
-        cf.delete(DeletionInfo.serializer().deserialize(dis, version, cf.getComparator()));
         int expireBefore = (int) (System.currentTimeMillis() / 1000);
-        int size = dis.readInt();
-        for (int i = 0; i < size; ++i)
+
+        if (cf.metadata().isSuper() && version < MessagingService.VERSION_20)
         {
-            cf.addColumn(columnSerializer.deserialize(dis, flag, expireBefore));
+            SuperColumns.deserializerSuperColumnFamily(dis, cf, flag, expireBefore, version);
+        }
+        else
+        {
+            cf.delete(DeletionInfo.serializer().deserialize(dis, version, cf.getComparator()));
+
+            ColumnSerializer columnSerializer = Column.serializer();
+            int size = dis.readInt();
+            for (int i = 0; i < size; ++i)
+            {
+                cf.addColumn(columnSerializer.deserialize(dis, flag, expireBefore));
+            }
         }
         return cf;
     }
 
     public long contentSerializedSize(ColumnFamily cf, TypeSizes typeSizes, int version)
     {
-        long size = DeletionInfo.serializer().serializedSize(cf.deletionInfo(), typeSizes, version);
-        size += typeSizes.sizeof(cf.getColumnCount());
-        for (IColumn column : cf)
-            size += column.serializedSize(typeSizes);
+        long size = 0L;
+
+        if (cf.metadata().isSuper() && version < MessagingService.VERSION_20)
+        {
+            size += SuperColumns.serializedSize(cf, typeSizes, version);
+        }
+        else
+        {
+            size += DeletionInfo.serializer().serializedSize(cf.deletionInfo(), typeSizes, version);
+            size += typeSizes.sizeof(cf.getColumnCount());
+            for (Column column : cf)
+                size += column.serializedSize(typeSizes);
+        }
         return size;
     }
 
@@ -142,14 +165,14 @@ public class ColumnFamilySerializer implements IVersionedSerializer<ColumnFamily
         throw new UnsupportedOperationException();
     }
 
-    public void deserializeColumnsFromSSTable(DataInput dis, ColumnFamily cf, int size, IColumnSerializer.Flag flag, int expireBefore, Descriptor.Version version) throws IOException
+    public void deserializeColumnsFromSSTable(DataInput dis, ColumnFamily cf, int size, ColumnSerializer.Flag flag, int expireBefore, Descriptor.Version version) throws IOException
     {
-        OnDiskAtom.Serializer atomSerializer = cf.getOnDiskSerializer();
-        for (int i = 0; i < size; ++i)
-            cf.addAtom(atomSerializer.deserializeFromSSTable(dis, flag, expireBefore, version));
+        Iterator<OnDiskAtom> iter = cf.metadata().getOnDiskIterator(dis, size, flag, expireBefore, version);
+        while (iter.hasNext())
+            cf.addAtom(iter.next());
     }
 
-    public void deserializeFromSSTable(DataInput dis, ColumnFamily cf, IColumnSerializer.Flag flag, Descriptor.Version version) throws IOException
+    public void deserializeFromSSTable(DataInput dis, ColumnFamily cf, ColumnSerializer.Flag flag, Descriptor.Version version) throws IOException
     {
         cf.delete(DeletionInfo.serializer().deserializeFromSSTable(dis, version));
         int size = dis.readInt();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 88719e3..9aa0d6d 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -54,7 +54,6 @@ import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.filter.ExtendedFilter;
 import org.apache.cassandra.db.filter.IDiskAtomFilter;
 import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.db.marshal.AbstractType;
@@ -752,10 +751,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             if (cachedRow instanceof RowCacheSentinel)
                 invalidateCachedRow(cacheKey);
             else
-                // columnFamily is what is written in the commit log. Because of the PeriodicCommitLog, this can be done in concurrency
-                // with this. So columnFamily shouldn't be modified and if it contains super columns, neither should they. So for super
-                // columns, we must make sure to clone them when adding to the cache. That's what addAllWithSCCopy does (see #3957)
-                ((ColumnFamily) cachedRow).addAllWithSCCopy(columnFamily, HeapAllocator.instance);
+                ((ColumnFamily) cachedRow).addAll(columnFamily, HeapAllocator.instance);
         }
     }
 
@@ -820,23 +816,10 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     private static void removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer)
     {
-        if (cf.isSuper())
-            removeDeletedSuper(cf, gcBefore);
-        else
-            removeDeletedStandard(cf, gcBefore, indexer);
-    }
-
-    public static void removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore)
-    {
-        removeDeletedColumnsOnly(cf, gcBefore, SecondaryIndexManager.nullUpdater);
-    }
-
-    private static void removeDeletedStandard(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer)
-    {
-        Iterator<IColumn> iter = cf.iterator();
+        Iterator<Column> iter = cf.iterator();
         while (iter.hasNext())
         {
-            IColumn c = iter.next();
+            Column c = iter.next();
             // remove columns if
             // (a) the column itself is gcable or
             // (b) the column is shadowed by a CF tombstone
@@ -848,36 +831,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
     }
 
-    private static void removeDeletedSuper(ColumnFamily cf, int gcBefore)
+    public static void removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore)
     {
-        // TODO assume deletion means "most are deleted?" and add to clone, instead of remove from original?
-        // this could be improved by having compaction, or possibly even removeDeleted, r/m the tombstone
-        // once gcBefore has passed, so if new stuff is added in it doesn't used the wrong algorithm forever
-        Iterator<IColumn> iter = cf.iterator();
-        while (iter.hasNext())
-        {
-            SuperColumn c = (SuperColumn)iter.next();
-            Iterator<IColumn> subIter = c.getSubColumns().iterator();
-            while (subIter.hasNext())
-            {
-                IColumn subColumn = subIter.next();
-                // remove subcolumns if
-                // (a) the subcolumn itself is gcable or
-                // (b) the supercolumn is shadowed by the CF and the column is not newer
-                // (b) the subcolumn is shadowed by the supercolumn
-                if (subColumn.getLocalDeletionTime() < gcBefore
-                    || cf.deletionInfo().isDeleted(c.name(), subColumn.timestamp())
-                    || c.deletionInfo().isDeleted(subColumn))
-                {
-                    subIter.remove();
-                }
-            }
-            c.maybeResetDeletionTimes(gcBefore);
-            if (c.getSubColumns().isEmpty() && !c.isMarkedForDelete())
-            {
-                iter.remove();
-            }
-        }
+        removeDeletedColumnsOnly(cf, gcBefore, SecondaryIndexManager.nullUpdater);
     }
 
     /**
@@ -1139,9 +1095,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return metric.writeLatency.recentLatencyHistogram.getBuckets(true);
     }
 
-    public ColumnFamily getColumnFamily(DecoratedKey key, QueryPath path, ByteBuffer start, ByteBuffer finish, boolean reversed, int limit)
+    public ColumnFamily getColumnFamily(DecoratedKey key, ByteBuffer start, ByteBuffer finish, boolean reversed, int limit)
     {
-        return getColumnFamily(QueryFilter.getSliceFilter(key, path, start, finish, reversed, limit));
+        return getColumnFamily(QueryFilter.getSliceFilter(key, name, start, finish, reversed, limit));
     }
 
     /**
@@ -1195,7 +1151,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
         try
         {
-            ColumnFamily data = getTopLevelColumns(QueryFilter.getIdentityFilter(filter.key, new QueryPath(name)),
+            ColumnFamily data = getTopLevelColumns(QueryFilter.getIdentityFilter(filter.key, name),
                                                    Integer.MIN_VALUE,
                                                    true);
             if (sentinelSuccess && data != null)
@@ -1244,9 +1200,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                 if (cf == null)
                     return null;
 
-                // TODO this is necessary because when we collate supercolumns together, we don't check
-                // their subcolumns for relevance, so we need to do a second prune post facto here.
-                result = cf.isSuper() ? removeDeleted(cf, gcBefore) : removeDeletedCF(cf, gcBefore);
+                result = removeDeletedCF(cf, gcBefore);
 
             }
         }
@@ -1268,9 +1222,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         ColumnFamily cf = cached.cloneMeShallow(ArrayBackedSortedColumns.factory(), filter.filter.isReversed());
         OnDiskAtomIterator ci = filter.getMemtableColumnIterator(cached, null);
         filter.collateOnDiskAtom(cf, Collections.singletonList(ci), gcBefore);
-        // TODO this is necessary because when we collate supercolumns together, we don't check
-        // their subcolumns for relevance, so we need to do a second prune post facto here.
-        return cf.isSuper() ? removeDeleted(cf, gcBefore) : removeDeletedCF(cf, gcBefore);
+        return removeDeletedCF(cf, gcBefore);
     }
 
     /**
@@ -1402,14 +1354,14 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
       * @param range Either a Bounds, which includes start key, or a Range, which does not.
       * @param columnFilter description of the columns we're interested in for each row
      */
-    public AbstractScanIterator getSequentialIterator(ByteBuffer superColumn, final AbstractBounds<RowPosition> range, IDiskAtomFilter columnFilter)
+    public AbstractScanIterator getSequentialIterator(final AbstractBounds<RowPosition> range, IDiskAtomFilter columnFilter)
     {
         assert !(range instanceof Range) || !((Range)range).isWrapAround() || range.right.isMinimum() : range;
 
         final RowPosition startWith = range.left;
         final RowPosition stopAt = range.right;
 
-        QueryFilter filter = new QueryFilter(null, new QueryPath(name, superColumn, null), columnFilter);
+        QueryFilter filter = new QueryFilter(null, name, columnFilter);
 
         final ViewFragment view = markReferenced(startWith, stopAt);
         Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), range.getString(metadata.getKeyValidator()));
@@ -1439,11 +1391,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
                     logger.trace("scanned {}", key);
 
-                    // TODO this is necessary because when we collate supercolumns together, we don't check
-                    // their subcolumns for relevance, so we need to do a second prune post facto here.
-                    return current.cf != null && current.cf.isSuper()
-                        ? new Row(current.key, removeDeleted(current.cf, gcBefore))
-                        : current;
+                    return current;
                 }
 
                 public void close() throws IOException
@@ -1461,14 +1409,14 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
     }
 
-    public List<Row> getRangeSlice(ByteBuffer superColumn, final AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter columnFilter, List<IndexExpression> rowFilter)
+    public List<Row> getRangeSlice(final AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter columnFilter, List<IndexExpression> rowFilter)
     {
-        return getRangeSlice(superColumn, range, maxResults, columnFilter, rowFilter, false, false);
+        return getRangeSlice(range, maxResults, columnFilter, rowFilter, false, false);
     }
 
-    public List<Row> getRangeSlice(ByteBuffer superColumn, final AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter columnFilter, List<IndexExpression> rowFilter, boolean countCQL3Rows, boolean isPaging)
+    public List<Row> getRangeSlice(final AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter columnFilter, List<IndexExpression> rowFilter, boolean countCQL3Rows, boolean isPaging)
     {
-        return filter(getSequentialIterator(superColumn, range, columnFilter), ExtendedFilter.create(this, columnFilter, rowFilter, maxResults, countCQL3Rows, isPaging));
+        return filter(getSequentialIterator(range, columnFilter), ExtendedFilter.create(this, columnFilter, rowFilter, maxResults, countCQL3Rows, isPaging));
     }
 
     public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter)
@@ -1503,8 +1451,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                     IDiskAtomFilter extraFilter = filter.getExtraFilter(data);
                     if (extraFilter != null)
                     {
-                        QueryPath path = new QueryPath(name);
-                        ColumnFamily cf = filter.cfs.getColumnFamily(new QueryFilter(rawRow.key, path, extraFilter));
+                        ColumnFamily cf = filter.cfs.getColumnFamily(new QueryFilter(rawRow.key, name, extraFilter));
                         if (cf != null)
                             data.addAll(cf, HeapAllocator.instance);
                     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index bd1c35a..b72638b 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -52,6 +52,8 @@ public class ColumnIndex
      */
     public static class Builder
     {
+        private static final OnDiskAtom.Serializer atomSerializer = Column.onDiskSerializer();
+
         private final ColumnIndex result;
         private final long indexOffset;
         private long startPosition = -1;
@@ -62,7 +64,6 @@ public class ColumnIndex
         private OnDiskAtom lastBlockClosing;
         private final DataOutput output;
         private final RangeTombstone.Tracker tombstoneTracker;
-        private final OnDiskAtom.Serializer atomSerializer;
         private int atomCount;
 
         public Builder(ColumnFamily cf,
@@ -73,7 +74,6 @@ public class ColumnIndex
             this.indexOffset = rowHeaderSize(key, cf.deletionInfo());
             this.result = new ColumnIndex(estimatedColumnCount);
             this.output = output;
-            this.atomSerializer = cf.getOnDiskSerializer();
             this.tombstoneTracker = new RangeTombstone.Tracker(cf.getComparator());
         }
 
@@ -116,7 +116,7 @@ public class ColumnIndex
             RangeTombstone tombstone = rangeIter.hasNext() ? rangeIter.next() : null;
             Comparator<ByteBuffer> comparator = cf.getComparator();
 
-            for (IColumn c : cf)
+            for (Column c : cf)
             {
                 while (tombstone != null && comparator.compare(c.name(), tombstone.min) >= 0)
                 {
@@ -146,7 +146,7 @@ public class ColumnIndex
         {
             atomCount++;
 
-            if (column instanceof IColumn)
+            if (column instanceof Column)
                 result.bloomFilter.add(column.name());
 
             if (firstColumn == null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/ColumnSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnSerializer.java b/src/java/org/apache/cassandra/db/ColumnSerializer.java
index d4a3e64..bc1d788 100644
--- a/src/java/org/apache/cassandra/db/ColumnSerializer.java
+++ b/src/java/org/apache/cassandra/db/ColumnSerializer.java
@@ -22,12 +22,12 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.io.FSReadError;
-import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
-public class ColumnSerializer implements IColumnSerializer
+public class ColumnSerializer implements ISerializer<Column>
 {
     public final static int DELETION_MASK        = 0x01;
     public final static int EXPIRATION_MASK      = 0x02;
@@ -35,7 +35,23 @@ public class ColumnSerializer implements IColumnSerializer
     public final static int COUNTER_UPDATE_MASK  = 0x08;
     public final static int RANGE_TOMBSTONE_MASK = 0x10;
 
-    public void serialize(IColumn column, DataOutput dos) throws IOException
+    /**
+     * Flag affecting deserialization behavior.
+     *  - LOCAL: for deserialization of local data (Expired columns are
+     *      converted to tombstones (to gain disk space)).
+     *  - FROM_REMOTE: for deserialization of data received from remote hosts
+     *      (Expired columns are converted to tombstone and counters have
+     *      their delta cleared)
+     *  - PRESERVE_SIZE: used when no transformation must be performed, i.e.,
+     *      when we must ensure that deserializing and reserializing the
+     *      result yield the exact same bytes. Streaming uses this.
+     */
+    public static enum Flag
+    {
+        LOCAL, FROM_REMOTE, PRESERVE_SIZE;
+    }
+
+    public void serialize(Column column, DataOutput dos) throws IOException
     {
         assert column.name().remaining() > 0;
         ByteBufferUtil.writeWithShortLength(column.name(), dos);
@@ -70,12 +86,12 @@ public class ColumnSerializer implements IColumnSerializer
      * deserialize comes from a remote host. If it does, then we must clear
      * the delta.
      */
-    public Column deserialize(DataInput dis, IColumnSerializer.Flag flag) throws IOException
+    public Column deserialize(DataInput dis, ColumnSerializer.Flag flag) throws IOException
     {
         return deserialize(dis, flag, (int) (System.currentTimeMillis() / 1000));
     }
 
-    public Column deserialize(DataInput dis, IColumnSerializer.Flag flag, int expireBefore) throws IOException
+    public Column deserialize(DataInput dis, ColumnSerializer.Flag flag, int expireBefore) throws IOException
     {
         ByteBuffer name = ByteBufferUtil.readWithShortLength(dis);
         if (name.remaining() <= 0)
@@ -85,7 +101,7 @@ public class ColumnSerializer implements IColumnSerializer
         return deserializeColumnBody(dis, name, b, flag, expireBefore);
     }
 
-    Column deserializeColumnBody(DataInput dis, ByteBuffer name, int mask, IColumnSerializer.Flag flag, int expireBefore) throws IOException
+    Column deserializeColumnBody(DataInput dis, ByteBuffer name, int mask, ColumnSerializer.Flag flag, int expireBefore) throws IOException
     {
         if ((mask & COUNTER_MASK) != 0)
         {
@@ -114,7 +130,7 @@ public class ColumnSerializer implements IColumnSerializer
         }
     }
 
-    public long serializedSize(IColumn column, TypeSizes type)
+    public long serializedSize(Column column, TypeSizes type)
     {
         return column.serializedSize(type);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/CounterColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterColumn.java b/src/java/org/apache/cassandra/db/CounterColumn.java
index a10cb57..a24687b 100644
--- a/src/java/org/apache/cassandra/db/CounterColumn.java
+++ b/src/java/org/apache/cassandra/db/CounterColumn.java
@@ -36,7 +36,6 @@ import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.MarshalException;
 import org.apache.cassandra.exceptions.OverloadedException;
 import org.apache.cassandra.exceptions.RequestExecutionException;
-import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.utils.Allocator;
 import org.apache.cassandra.service.AbstractWriteResponseHandler;
@@ -75,15 +74,21 @@ public class CounterColumn extends Column
         this.timestampOfLastDelete = timestampOfLastDelete;
     }
 
-    public static CounterColumn create(ByteBuffer name, ByteBuffer value, long timestamp, long timestampOfLastDelete, IColumnSerializer.Flag flag)
+    public static CounterColumn create(ByteBuffer name, ByteBuffer value, long timestamp, long timestampOfLastDelete, ColumnSerializer.Flag flag)
     {
         // #elt being negative means we have to clean delta
         short count = value.getShort(value.position());
-        if (flag == IColumnSerializer.Flag.FROM_REMOTE || (flag == IColumnSerializer.Flag.LOCAL && count < 0))
+        if (flag == ColumnSerializer.Flag.FROM_REMOTE || (flag == ColumnSerializer.Flag.LOCAL && count < 0))
             value = CounterContext.instance().clearAllDelta(value);
         return new CounterColumn(name, value, timestamp, timestampOfLastDelete);
     }
 
+    @Override
+    public Column withUpdatedName(ByteBuffer newName)
+    {
+        return new CounterColumn(newName, value, timestamp, timestampOfLastDelete);
+    }
+
     public long timestampOfLastDelete()
     {
         return timestampOfLastDelete;
@@ -111,7 +116,7 @@ public class CounterColumn extends Column
     }
 
     @Override
-    public IColumn diff(IColumn column)
+    public Column diff(Column column)
     {
         assert (column instanceof CounterColumn) || (column instanceof DeletedColumn) : "Wrong class type: " + column.getClass();
 
@@ -160,7 +165,7 @@ public class CounterColumn extends Column
     }
 
     @Override
-    public IColumn reconcile(IColumn column, Allocator allocator)
+    public Column reconcile(Column column, Allocator allocator)
     {
         assert (column instanceof CounterColumn) || (column instanceof DeletedColumn) : "Wrong class type: " + column.getClass();
 
@@ -208,13 +213,13 @@ public class CounterColumn extends Column
     }
 
     @Override
-    public IColumn localCopy(ColumnFamilyStore cfs)
+    public Column localCopy(ColumnFamilyStore cfs)
     {
         return new CounterColumn(cfs.internOrCopy(name, HeapAllocator.instance), ByteBufferUtil.clone(value), timestamp, timestampOfLastDelete);
     }
 
     @Override
-    public IColumn localCopy(ColumnFamilyStore cfs, Allocator allocator)
+    public Column localCopy(ColumnFamilyStore cfs, Allocator allocator)
     {
         return new CounterColumn(cfs.internOrCopy(name, allocator), allocator.clone(value), timestamp, timestampOfLastDelete);
     }
@@ -299,52 +304,25 @@ public class CounterColumn extends Column
     public static void mergeAndRemoveOldShards(DecoratedKey key, ColumnFamily cf, int gcBefore, int mergeBefore, boolean sendToOtherReplica)
     {
         ColumnFamily remoteMerger = null;
-        if (!cf.isSuper())
+
+        for (Column c : cf)
         {
-            for (IColumn c : cf)
+            if (!(c instanceof CounterColumn))
+                continue;
+            CounterColumn cc = (CounterColumn) c;
+            CounterColumn shardMerger = cc.computeOldShardMerger(mergeBefore);
+            CounterColumn merged = cc;
+            if (shardMerger != null)
             {
-                if (!(c instanceof CounterColumn))
-                    continue;
-                CounterColumn cc = (CounterColumn) c;
-                CounterColumn shardMerger = cc.computeOldShardMerger(mergeBefore);
-                CounterColumn merged = cc;
-                if (shardMerger != null)
-                {
-                    merged = (CounterColumn) cc.reconcile(shardMerger);
-                    if (remoteMerger == null)
-                        remoteMerger = cf.cloneMeShallow();
-                    remoteMerger.addColumn(merged);
-                }
-                CounterColumn cleaned = merged.removeOldShards(gcBefore);
-                if (cleaned != cc)
-                {
-                    cf.replace(cc, cleaned);
-                }
+                merged = (CounterColumn) cc.reconcile(shardMerger);
+                if (remoteMerger == null)
+                    remoteMerger = cf.cloneMeShallow();
+                remoteMerger.addColumn(merged);
             }
-        }
-        else
-        {
-            for (IColumn col : cf)
+            CounterColumn cleaned = merged.removeOldShards(gcBefore);
+            if (cleaned != cc)
             {
-                SuperColumn c = (SuperColumn)col;
-                for (IColumn subColumn : c.getSubColumns())
-                {
-                    if (!(subColumn instanceof CounterColumn))
-                        continue;
-                    CounterColumn cc = (CounterColumn) subColumn;
-                    CounterColumn shardMerger = cc.computeOldShardMerger(mergeBefore);
-                    CounterColumn merged = cc;
-                    if (shardMerger != null)
-                    {
-                        merged = (CounterColumn) cc.reconcile(shardMerger);
-                        if (remoteMerger == null)
-                            remoteMerger = cf.cloneMeShallow();
-                        remoteMerger.addColumn(c.name(), merged);
-                    }
-                    CounterColumn cleaned = merged.removeOldShards(gcBefore);
-                    if (cleaned != subColumn)
-                        c.replace(subColumn, cleaned);
-                }
+                cf.replace(cc, cleaned);
             }
         }
 
@@ -361,7 +339,7 @@ public class CounterColumn extends Column
         }
     }
 
-    public IColumn markDeltaToBeCleared()
+    public Column markDeltaToBeCleared()
     {
         return new CounterColumn(name, contextManager.markDeltaToBeCleared(value), timestamp, timestampOfLastDelete);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/CounterMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java
index d06df33..dfc4918 100644
--- a/src/java/org/apache/cassandra/db/CounterMutation.java
+++ b/src/java/org/apache/cassandra/db/CounterMutation.java
@@ -24,9 +24,11 @@ import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 
-import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.db.filter.NamesQueryFilter;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
@@ -92,8 +94,6 @@ public class CounterMutation implements IMutation
                 continue;
 
             ColumnFamily cf = row.cf;
-            if (cf.isSuper())
-                cf.retainAll(rowMutation.getColumnFamily(cf.metadata().cfId));
             replicationMutation.add(cf);
         }
         return replicationMutation;
@@ -101,8 +101,9 @@ public class CounterMutation implements IMutation
 
     private void addReadCommandFromColumnFamily(String table, ByteBuffer key, ColumnFamily columnFamily, List<ReadCommand> commands)
     {
-        QueryPath queryPath = new QueryPath(columnFamily.metadata().cfName);
-        commands.add(new SliceByNamesReadCommand(table, key, queryPath, columnFamily.getColumnNames()));
+        SortedSet s = new TreeSet<ByteBuffer>(columnFamily.metadata().comparator);
+        s.addAll(columnFamily.getColumnNames());
+        commands.add(new SliceByNamesReadCommand(table, key, columnFamily.metadata().cfName, new NamesQueryFilter(s)));
     }
 
     public MessageOut<CounterMutation> makeMutationMessage() throws IOException
@@ -128,7 +129,7 @@ public class CounterMutation implements IMutation
         {
             ColumnFamily cf = cf_.cloneMeShallow();
             ColumnFamilyStore cfs = table.getColumnFamilyStore(cf.id());
-            for (IColumn column : cf_)
+            for (Column column : cf_)
             {
                 cf.addColumn(column.localCopy(cfs), HeapAllocator.instance);
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/CounterUpdateColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterUpdateColumn.java b/src/java/org/apache/cassandra/db/CounterUpdateColumn.java
index 58241c0..9d9530e 100644
--- a/src/java/org/apache/cassandra/db/CounterUpdateColumn.java
+++ b/src/java/org/apache/cassandra/db/CounterUpdateColumn.java
@@ -49,14 +49,14 @@ public class CounterUpdateColumn extends Column
     }
 
     @Override
-    public IColumn diff(IColumn column)
+    public Column diff(Column column)
     {
         // Diff is used during reads, but we should never read those columns
         throw new UnsupportedOperationException("This operation is unsupported on CounterUpdateColumn.");
     }
 
     @Override
-    public IColumn reconcile(IColumn column, Allocator allocator)
+    public Column reconcile(Column column, Allocator allocator)
     {
         // The only time this could happen is if a batchAdd ships two
         // increment for the same column. Hence we simply sums the delta.
@@ -88,7 +88,7 @@ public class CounterUpdateColumn extends Column
     }
 
     @Override
-    public IColumn localCopy(ColumnFamilyStore cfs, Allocator allocator)
+    public Column localCopy(ColumnFamilyStore cfs, Allocator allocator)
     {
         return new CounterColumn(cfs.internOrCopy(name, allocator),
                                  CounterContext.instance().create(delta(), allocator),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/DefsTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefsTable.java b/src/java/org/apache/cassandra/db/DefsTable.java
index 94d1958..7fd5164 100644
--- a/src/java/org/apache/cassandra/db/DefsTable.java
+++ b/src/java/org/apache/cassandra/db/DefsTable.java
@@ -37,7 +37,6 @@ import org.apache.avro.specific.SpecificRecord;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.marshal.AsciiType;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.db.migration.avro.KsDef;
@@ -181,7 +180,7 @@ public class DefsTable
             if (Schema.invalidSchemaRow(row))
                 continue;
 
-            for (IColumn column : row.cf.columns)
+            for (Column column : row.cf.columns)
             {
                 Date columnDate = new Date(column.timestamp());
 
@@ -233,10 +232,10 @@ public class DefsTable
 
             RowMutation mutation = new RowMutation(Table.SYSTEM_KS, row.key.key);
 
-            for (IColumn column : row.cf.columns)
+            for (Column column : row.cf.columns)
             {
                 if (column.isLive())
-                    mutation.add(new QueryPath(columnFamily, null, column.name()), column.value(), microTimestamp);
+                    mutation.add(columnFamily, column.name(), column.value(), microTimestamp);
             }
 
             mutation.apply();
@@ -272,7 +271,7 @@ public class DefsTable
     private static Row serializedColumnFamilies(DecoratedKey ksNameKey)
     {
         ColumnFamilyStore cfsStore = SystemTable.schemaCFS(SystemTable.SCHEMA_COLUMNFAMILIES_CF);
-        return new Row(ksNameKey, cfsStore.getColumnFamily(QueryFilter.getIdentityFilter(ksNameKey, new QueryPath(SystemTable.SCHEMA_COLUMNFAMILIES_CF))));
+        return new Row(ksNameKey, cfsStore.getColumnFamily(QueryFilter.getIdentityFilter(ksNameKey, SystemTable.SCHEMA_COLUMNFAMILIES_CF)));
     }
 
     /**
@@ -290,8 +289,8 @@ public class DefsTable
         DecoratedKey vkey = StorageService.getPartitioner().decorateKey(toUTF8Bytes(version));
         Table defs = Table.open(Table.SYSTEM_KS);
         ColumnFamilyStore cfStore = defs.getColumnFamilyStore(OLD_SCHEMA_CF);
-        ColumnFamily cf = cfStore.getColumnFamily(QueryFilter.getIdentityFilter(vkey, new QueryPath(OLD_SCHEMA_CF)));
-        IColumn avroschema = cf.getColumn(DEFINITION_SCHEMA_COLUMN_NAME);
+        ColumnFamily cf = cfStore.getColumnFamily(QueryFilter.getIdentityFilter(vkey, OLD_SCHEMA_CF));
+        Column avroschema = cf.getColumn(DEFINITION_SCHEMA_COLUMN_NAME);
 
         Collection<KSMetaData> keyspaces = Collections.emptyList();
 
@@ -301,10 +300,10 @@ public class DefsTable
             org.apache.avro.Schema schema = org.apache.avro.Schema.parse(ByteBufferUtil.string(value));
 
             // deserialize keyspaces using schema
-            Collection<IColumn> columns = cf.getSortedColumns();
+            Collection<Column> columns = cf.getSortedColumns();
             keyspaces = new ArrayList<KSMetaData>(columns.size());
 
-            for (IColumn column : columns)
+            for (Column column : columns)
             {
                 if (column.name().equals(DEFINITION_SCHEMA_COLUMN_NAME))
                     continue;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/DeletedColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletedColumn.java b/src/java/org/apache/cassandra/db/DeletedColumn.java
index 18faeef..c1ca18c 100644
--- a/src/java/org/apache/cassandra/db/DeletedColumn.java
+++ b/src/java/org/apache/cassandra/db/DeletedColumn.java
@@ -38,6 +38,12 @@ public class DeletedColumn extends Column
     }
 
     @Override
+    public Column withUpdatedName(ByteBuffer newName)
+    {
+        return new DeletedColumn(newName, value, timestamp);
+    }
+
+    @Override
     public boolean isMarkedForDelete()
     {
         // We don't rely on the column implementation because it could mistakenly return false if
@@ -58,7 +64,7 @@ public class DeletedColumn extends Column
     }
 
     @Override
-    public IColumn reconcile(IColumn column, Allocator allocator)
+    public Column reconcile(Column column, Allocator allocator)
     {
         if (column instanceof DeletedColumn)
             return super.reconcile(column, allocator);
@@ -66,13 +72,13 @@ public class DeletedColumn extends Column
     }
 
     @Override
-    public IColumn localCopy(ColumnFamilyStore cfs)
+    public Column localCopy(ColumnFamilyStore cfs)
     {
         return new DeletedColumn(cfs.internOrCopy(name, HeapAllocator.instance), ByteBufferUtil.clone(value), timestamp);
     }
 
     @Override
-    public IColumn localCopy(ColumnFamilyStore cfs, Allocator allocator)
+    public Column localCopy(ColumnFamilyStore cfs, Allocator allocator)
     {
         return new DeletedColumn(cfs.internOrCopy(name, allocator), allocator.clone(value), timestamp);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/DeletionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionInfo.java b/src/java/org/apache/cassandra/db/DeletionInfo.java
index be64224..eab9f37 100644
--- a/src/java/org/apache/cassandra/db/DeletionInfo.java
+++ b/src/java/org/apache/cassandra/db/DeletionInfo.java
@@ -51,8 +51,12 @@ public class DeletionInfo
     {
         // Pre-1.1 node may return MIN_VALUE for non-deleted container, but the new default is MAX_VALUE
         // (see CASSANDRA-3872)
-        this(new DeletionTime(markedForDeleteAt, localDeletionTime == Integer.MIN_VALUE ? Integer.MAX_VALUE : localDeletionTime),
-             IntervalTree.<ByteBuffer, DeletionTime, RangeTombstone>emptyTree());
+        this(new DeletionTime(markedForDeleteAt, localDeletionTime == Integer.MIN_VALUE ? Integer.MAX_VALUE : localDeletionTime));
+    }
+
+    public DeletionInfo(DeletionTime topLevel)
+    {
+        this(topLevel, IntervalTree.<ByteBuffer, DeletionTime, RangeTombstone>emptyTree());
     }
 
     public DeletionInfo(ByteBuffer start, ByteBuffer end, Comparator<ByteBuffer> comparator, long markedForDeleteAt, int localDeletionTime)
@@ -94,7 +98,7 @@ public class DeletionInfo
      * @param column the column to check.
      * @return true if the column is deleted, false otherwise
      */
-    public boolean isDeleted(IColumn column)
+    public boolean isDeleted(Column column)
     {
         return isDeleted(column.name(), column.mostRecentLiveChangeAt());
     }
@@ -211,6 +215,11 @@ public class DeletionInfo
         return ranges.iterator();
     }
 
+    public List<DeletionTime> rangeCovering(ByteBuffer name)
+    {
+        return ranges.search(name);
+    }
+
     public int dataSize()
     {
         int size = TypeSizes.NATIVE.sizeof(topLevel.markedForDeleteAt);
@@ -331,7 +340,7 @@ public class DeletionInfo
 
         public DeletionInfo deserialize(DataInput in, int version, Comparator<ByteBuffer> comparator) throws IOException
         {
-            assert comparator != null;
+            assert version < MessagingService.VERSION_12 || comparator != null;
             DeletionTime topLevel = DeletionTime.serializer.deserialize(in);
             if (version < MessagingService.VERSION_12)
                 return new DeletionInfo(topLevel, IntervalTree.<ByteBuffer, DeletionTime, RangeTombstone>emptyTree());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/DeletionTime.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionTime.java b/src/java/org/apache/cassandra/db/DeletionTime.java
index 2b9efcd..c28b936 100644
--- a/src/java/org/apache/cassandra/db/DeletionTime.java
+++ b/src/java/org/apache/cassandra/db/DeletionTime.java
@@ -80,7 +80,7 @@ public class DeletionTime implements Comparable<DeletionTime>
         return localDeletionTime < gcBefore;
     }
 
-    public boolean isDeleted(IColumn column)
+    public boolean isDeleted(Column column)
     {
         return column.isMarkedForDelete() && column.getMarkedForDeleteAt() <= markedForDeleteAt;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/ExpiringColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ExpiringColumn.java b/src/java/org/apache/cassandra/db/ExpiringColumn.java
index 1d52182..fa21d27 100644
--- a/src/java/org/apache/cassandra/db/ExpiringColumn.java
+++ b/src/java/org/apache/cassandra/db/ExpiringColumn.java
@@ -24,7 +24,6 @@ import java.security.MessageDigest;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.MarshalException;
-import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.utils.Allocator;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -62,9 +61,9 @@ public class ExpiringColumn extends Column
     }
 
     /** @return Either a DeletedColumn, or an ExpiringColumn. */
-    public static Column create(ByteBuffer name, ByteBuffer value, long timestamp, int timeToLive, int localExpirationTime, int expireBefore, IColumnSerializer.Flag flag)
+    public static Column create(ByteBuffer name, ByteBuffer value, long timestamp, int timeToLive, int localExpirationTime, int expireBefore, ColumnSerializer.Flag flag)
     {
-        if (localExpirationTime >= expireBefore || flag == IColumnSerializer.Flag.PRESERVE_SIZE)
+        if (localExpirationTime >= expireBefore || flag == ColumnSerializer.Flag.PRESERVE_SIZE)
             return new ExpiringColumn(name, value, timestamp, timeToLive, localExpirationTime);
         // the column is now expired, we can safely return a simple tombstone
         return new DeletedColumn(name, localExpirationTime, timestamp);
@@ -76,6 +75,12 @@ public class ExpiringColumn extends Column
     }
 
     @Override
+    public Column withUpdatedName(ByteBuffer newName)
+    {
+        return new ExpiringColumn(newName, value, timestamp, timeToLive, localExpirationTime);
+    }
+
+    @Override
     public int dataSize()
     {
         return super.dataSize() + TypeSizes.NATIVE.sizeof(localExpirationTime) + TypeSizes.NATIVE.sizeof(timeToLive);
@@ -119,13 +124,13 @@ public class ExpiringColumn extends Column
     }
 
     @Override
-    public IColumn localCopy(ColumnFamilyStore cfs)
+    public Column localCopy(ColumnFamilyStore cfs)
     {
         return new ExpiringColumn(cfs.internOrCopy(name, HeapAllocator.instance), ByteBufferUtil.clone(value), timestamp, timeToLive, localExpirationTime);
     }
 
     @Override
-    public IColumn localCopy(ColumnFamilyStore cfs, Allocator allocator)
+    public Column localCopy(ColumnFamilyStore cfs, Allocator allocator)
     {
         ByteBuffer clonedName = cfs.maybeIntern(name);
         if (clonedName == null)


Mime
View raw message