cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r981906 - in /cassandra/trunk: interface/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/service/ src/java/org/apache/cassandra/thrift/ test/system/ test/unit/org/apache/cassandra/db/ test/unit/org/apache/cassandra/io/sstable/
Date Tue, 03 Aug 2010 14:58:15 GMT
Author: jbellis
Date: Tue Aug  3 14:58:14 2010
New Revision: 981906

URL: http://svn.apache.org/viewvc?rev=981906&view=rev
Log:
handle index scans across multiple nodes and consistency levels

Modified:
    cassandra/trunk/interface/cassandra.thrift
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java
    cassandra/trunk/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
    cassandra/trunk/test/system/test_thrift_server.py
    cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java

Modified: cassandra/trunk/interface/cassandra.thrift
URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/cassandra.thrift?rev=981906&r1=981905&r2=981906&view=diff
==============================================================================
--- cassandra/trunk/interface/cassandra.thrift (original)
+++ cassandra/trunk/interface/cassandra.thrift Tue Aug  3 14:58:14 2010
@@ -46,7 +46,7 @@ namespace rb CassandraThrift
 #           for every edit that doesn't result in a change to major/minor.
 #
 # See the Semantic Versioning Specification (SemVer) http://semver.org.
-const string VERSION = "9.0.0"
+const string VERSION = "10.0.0"
 
 
 #
@@ -254,8 +254,8 @@ struct IndexExpression {
 
 struct IndexClause {
     1: required list<IndexExpression> expressions
-    2: required i32 count=100,
-    3: optional binary start_key,
+    2: required binary start_key,
+    3: required i32 count=100,
 }
 
 /**
@@ -274,12 +274,6 @@ struct KeyRange {
     5: required i32 count=100
 }
 
-struct RowPredicate {
-    1: optional list<binary> keys,
-    2: optional KeyRange key_range,
-    3: optional IndexClause index_clause
-}
-
 /**
     A KeySlice is key followed by the data it maps to. A collection of KeySlice is returned
by the get_range_slice operation.
 
@@ -409,16 +403,6 @@ service Cassandra {
                             throws (1:InvalidRequestException ire, 2:UnavailableException
ue, 3:TimedOutException te),
 
   /**
-    Performs a get_slice for column_parent and predicate for the given keys in parallel.
-    @Deprecated; use `scan`
-  */
-  map<binary,list<ColumnOrSuperColumn>> multiget_slice(1:required list<binary>
keys, 
-                                                       2:required ColumnParent column_parent,

-                                                       3:required SlicePredicate predicate,

-                                                       4:required ConsistencyLevel consistency_level=ONE)
-                                        throws (1:InvalidRequestException ire, 2:UnavailableException
ue, 3:TimedOutException te),
-
-  /**
     returns the number of columns matching <code>predicate</code> for a particular
<code>key</code>, 
     <code>ColumnFamily</code> and optionally <code>SuperColumn</code>.
   */
@@ -429,6 +413,15 @@ service Cassandra {
       throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException
te),
 
   /**
+    Performs a get_slice for column_parent and predicate for the given keys in parallel.
+  */
+  map<binary,list<ColumnOrSuperColumn>> multiget_slice(1:required list<binary>
keys, 
+                                                       2:required ColumnParent column_parent,

+                                                       3:required SlicePredicate predicate,

+                                                       4:required ConsistencyLevel consistency_level=ONE)
+                                        throws (1:InvalidRequestException ire, 2:UnavailableException
ue, 3:TimedOutException te),
+
+  /**
     Perform a get_count in parallel on the given list<binary> keys. The return value
maps keys to the count found.
   */
   map<binary, i32> multiget_count(1:required string keyspace,
@@ -439,8 +432,7 @@ service Cassandra {
       throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException
te),
 
   /**
-   returns a subset of columns for a range of keys.
-   @Deprecated; use `scan`
+   returns a subset of columns for a contiguous range of keys.
   */
   list<KeySlice> get_range_slices(1:required ColumnParent column_parent, 
                                   2:required SlicePredicate predicate,
@@ -448,20 +440,13 @@ service Cassandra {
                                   4:required ConsistencyLevel consistency_level=ONE)
                  throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException
te),
 
-  /** Returns the subset of columns specified in SlicePredicate for the rows requested in
RowsPredicate */
-  list<KeySlice> scan(1:required ColumnParent column_parent,
-                      2:required RowPredicate row_predicate,
-                      3:required SlicePredicate column_predicate,
-                      4:required ConsistencyLevel consistency_level=ONE)
+  /** Returns the subset of columns specified in SlicePredicate for the rows matching the
IndexClause */
+  list<KeySlice> get_indexed_slices(1:required ColumnParent column_parent,
+                                    2:required IndexClause index_clause,
+                                    3:required SlicePredicate column_predicate,
+                                    4:required ConsistencyLevel consistency_level=ONE)
                  throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException
te),
 
-  /** Counts the subset of columns specified in SlicePredicate for the rows requested in
RowsPredicate */
-  list<KeyCount> scan_count(1:required ColumnParent column_parent,
-                           2:required RowPredicate row_predicate,
-                           3:required SlicePredicate column_predicate,
-                           4:required ConsistencyLevel consistency_level=ONE)
-      throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException
te),
-
   # modification methods
 
   /**

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=981906&r1=981905&r2=981906&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Tue Aug  3 14:58:14
2010
@@ -1055,42 +1055,56 @@ public class ColumnFamilyStore implement
         return rows;
     }
 
-    public List<Row> scan(IndexClause indexClause, IFilter dataFilter)
+    public List<Row> scan(IndexClause clause, AbstractBounds range, IFilter dataFilter)
     {
         // TODO: allow merge join instead of just one index + loop
-        IndexExpression first = highestSelectivityPredicate(indexClause);
+        IndexExpression first = highestSelectivityPredicate(clause);
         ColumnFamilyStore indexCFS = getIndexedColumnFamilyStore(first.column_name);
         assert indexCFS != null;
         DecoratedKey indexKey = indexCFS.partitioner_.decorateKey(first.value);
-        QueryFilter indexFilter = QueryFilter.getSliceFilter(indexKey,
-                                                             new QueryPath(indexCFS.getColumnFamilyName()),
-                                                             ArrayUtils.EMPTY_BYTE_ARRAY,
-                                                             ArrayUtils.EMPTY_BYTE_ARRAY,
-                                                             null,
-                                                             false,
-                                                             indexClause.count);
 
         List<Row> rows = new ArrayList<Row>();
-        ColumnFamily indexRow = indexCFS.getColumnFamily(indexFilter);
-        if (indexRow == null)
-            return rows;
-
-        for (byte[] dataKey : indexRow.getColumnNames())
-        {
-            DecoratedKey dk = partitioner_.decorateKey(dataKey);
-            ColumnFamily data = getColumnFamily(new QueryFilter(dk, new QueryPath(columnFamily_),
dataFilter));
-            boolean accepted = true;
-            for (IndexExpression expression : indexClause.expressions)
+        byte[] startKey = clause.start_key;
+        
+        outer:
+        while (true)
+        {
+            /* we don't have a way to get the key back from the DK -- we just have a token
--
+             * so, we need to loop after starting with start_key, until we get to keys in
the given `range`.
+             * But, if the calling StorageProxy is doing a good job estimating data from
each range, the range
+             * should be pretty close to `start_key`. */
+            QueryFilter indexFilter = QueryFilter.getSliceFilter(indexKey,
+                                                                 new QueryPath(indexCFS.getColumnFamilyName()),
+                                                                 startKey,
+                                                                 ArrayUtils.EMPTY_BYTE_ARRAY,
+                                                                 null,
+                                                                 false,
+                                                                 clause.count);
+            ColumnFamily indexRow = indexCFS.getColumnFamily(indexFilter);
+            if (indexRow == null)
+                break;
+
+            byte[] dataKey = null;
+            int n = 0;
+            Iterator<byte[]> iter = indexRow.getColumnNames().iterator();
+            while (iter.hasNext())
             {
-                // (we can skip "first" since we already know it's satisfied)
-                if (expression != first && !satisfies(data, expression))
-                {
-                    accepted = false;
-                    break;
-                }
+                dataKey = iter.next();
+                n++;
+                DecoratedKey dk = partitioner_.decorateKey(dataKey);
+                if (!range.right.equals(partitioner_.getMinimumToken()) && range.right.compareTo(dk.token)
< 0)
+                    break outer;
+                if (!range.contains(dk.token))
+                    continue;
+                ColumnFamily data = getColumnFamily(new QueryFilter(dk, new QueryPath(columnFamily_),
dataFilter));
+                if (satisfies(data, clause, first))
+                    rows.add(new Row(dk, data));
+                if (rows.size() == clause.count)
+                    break outer;
             }
-            if (accepted)
-                rows.add(new Row(dk, data));
+            startKey = dataKey;
+            if (n < clause.count)
+                break;
         }
 
         return rows;
@@ -1115,10 +1129,19 @@ public class ColumnFamilyStore implement
         return best;
     }
 
-    private static boolean satisfies(ColumnFamily data, IndexExpression expression)
+    private static boolean satisfies(ColumnFamily data, IndexClause clause, IndexExpression
first)
     {
-        IColumn column = data.getColumn(expression.column_name);
-        return column != null && Arrays.equals(column.value(), expression.value);
+        for (IndexExpression expression : clause.expressions)
+        {
+            // (we can skip "first" since we already know it's satisfied)
+            if (expression == first)
+                continue;
+            // check column data vs expression
+            IColumn column = data.getColumn(expression.column_name);
+            if (column != null && !Arrays.equals(column.value(), expression.value))
+                 return false;
+        }
+        return true;
     }
 
     public AbstractType getComparator()

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java?rev=981906&r1=981905&r2=981906&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java Tue Aug  3 14:58:14
2010
@@ -23,6 +23,7 @@ import java.io.*;
 import java.util.Arrays;
 
 import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.io.ICompactSerializer2;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.net.Message;
@@ -42,14 +43,16 @@ public class IndexScanCommand
     public final String column_family;
     public final IndexClause index_clause;
     public final SlicePredicate predicate;
+    public final AbstractBounds range;
 
-    public IndexScanCommand(String keyspace, String column_family, IndexClause index_clause,
SlicePredicate predicate)
+    public IndexScanCommand(String keyspace, String column_family, IndexClause index_clause,
SlicePredicate predicate, AbstractBounds range)
     {
 
         this.keyspace = keyspace;
         this.column_family = column_family;
         this.index_clause = index_clause;
         this.predicate = predicate;
+        this.range = range;
     }
 
     public Message getMessage()
@@ -85,6 +88,7 @@ public class IndexScanCommand
             TSerializer ser = new TSerializer(new TBinaryProtocol.Factory());
             FBUtilities.serialize(ser, o.index_clause, out);
             FBUtilities.serialize(ser, o.predicate, out);
+            AbstractBounds.serializer().serialize(o.range, out);
         }
 
         public IndexScanCommand deserialize(DataInput in) throws IOException
@@ -97,8 +101,9 @@ public class IndexScanCommand
             FBUtilities.deserialize(dser, indexClause, in);
             SlicePredicate predicate = new SlicePredicate();
             FBUtilities.deserialize(dser, predicate, in);
+            AbstractBounds range = AbstractBounds.serializer().deserialize(in);
 
-            return new IndexScanCommand(keyspace, columnFamily, indexClause, predicate);
+            return new IndexScanCommand(keyspace, columnFamily, indexClause, predicate, range);
         }
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java?rev=981906&r1=981905&r2=981906&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java Tue Aug
 3 14:58:14 2010
@@ -18,6 +18,8 @@
 
 package org.apache.cassandra.service;
 
+import java.util.List;
+
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.net.IVerbHandler;
@@ -36,7 +38,8 @@ public class IndexScanVerbHandler implem
         {
             IndexScanCommand command = IndexScanCommand.read(message);
             ColumnFamilyStore cfs = Table.open(command.keyspace).getColumnFamilyStore(command.column_family);
-            RangeSliceReply reply = new RangeSliceReply(cfs.scan(command.index_clause, QueryFilter.getFilter(command.predicate,
cfs.getComparator())));
+            List<Row> rows = cfs.scan(command.index_clause, command.range, QueryFilter.getFilter(command.predicate,
cfs.getComparator()));
+            RangeSliceReply reply = new RangeSliceReply(rows);
             Message response = reply.getReply(message);
             if (logger.isDebugEnabled())
                 logger.debug("Sending " + reply+ " to " + message.getMessageId() + "@" +
message.getFrom());

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=981906&r1=981905&r2=981906&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Tue Aug  3 14:58:14
2010
@@ -40,10 +40,7 @@ import org.apache.cassandra.concurrent.S
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.dht.*;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.TokenMetadata;
@@ -51,9 +48,7 @@ import org.apache.cassandra.net.IAsyncCa
 import org.apache.cassandra.net.IAsyncResult;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.thrift.ConsistencyLevel;
-import org.apache.cassandra.thrift.InvalidRequestException;
-import org.apache.cassandra.thrift.UnavailableException;
+import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.LatencyTracker;
 import org.apache.cassandra.utils.WrappedRunnable;
@@ -496,7 +491,7 @@ public class StorageProxy implements Sto
         List<AbstractBounds> ranges = getRestrictedRanges(command.range);
         // now scan until we have enough results
         List<Row> rows = new ArrayList<Row>(command.max_keys);
-        for (AbstractBounds range : getRangeIterator(ranges, command.range.left))
+        for (AbstractBounds range : ranges)
         {
             List<InetAddress> liveEndpoints = StorageService.instance.getLiveNaturalEndpoints(command.keyspace,
range.right);
 
@@ -531,8 +526,7 @@ public class StorageProxy implements Sto
                 RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(command.keyspace,
liveEndpoints);
                 AbstractReplicationStrategy rs = StorageService.instance.getReplicationStrategy(command.keyspace);
                 QuorumResponseHandler<List<Row>> handler = rs.getQuorumResponseHandler(resolver,
consistency_level);
-                // TODO bail early if live endpoints can't satisfy requested
-                // consistency level
+                // TODO bail early if live endpoints can't satisfy requested consistency
level
                 for (InetAddress endpoint : liveEndpoints) 
                 {
                     MessagingService.instance.sendRR(message, endpoint, handler);
@@ -637,43 +631,6 @@ public class StorageProxy implements Sto
     }
 
     /**
-     * returns an iterator that will return ranges in ring order, starting with the one that
contains the start token
-     */
-    private static Iterable<AbstractBounds> getRangeIterator(final List<AbstractBounds>
ranges, Token start)
-    {
-        // find the one to start with
-        int i;
-        for (i = 0; i < ranges.size(); i++)
-        {
-            AbstractBounds range = ranges.get(i);
-            if (range.contains(start) || range.left.equals(start))
-                break;
-        }
-        AbstractBounds range = ranges.get(i);
-        assert range.contains(start) || range.left.equals(start); // make sure the loop didn't
just end b/c ranges were exhausted
-
-        // return an iterable that starts w/ the correct range and iterates the rest in ring
order
-        final int begin = i;
-        return new Iterable<AbstractBounds>()
-        {
-            public Iterator<AbstractBounds> iterator()
-            {
-                return new AbstractIterator<AbstractBounds>()
-                {
-                    int n = 0;
-
-                    protected AbstractBounds computeNext()
-                    {
-                        if (n == ranges.size())
-                            return endOfData();
-                        return ranges.get((begin + n++) % ranges.size());
-                    }
-                };
-            }
-        };
-    }
-
-    /**
      * compute all ranges we're going to query, in sorted order, so that we get the correct
results back.
      *  1) computing range intersections is necessary because nodes can be replica destinations
for many ranges,
      *     so if we do not restrict each scan to the specific range we want we will get duplicate
results.
@@ -720,6 +677,15 @@ public class StorageProxy implements Sto
                 // sort in order that the original query range would see them.
                 int queryOrder1 = queryRange.left.compareTo(o1.left);
                 int queryOrder2 = queryRange.left.compareTo(o2.left);
+
+                // check for exact match with query start
+                assert !(queryOrder1 == 0 && queryOrder2 == 0);
+                if (queryOrder1 == 0)
+                    return -1;
+                if (queryOrder2 == 0)
+                    return 1;
+
+                // order segments in order they should be traversed
                 if (queryOrder1 < queryOrder2)
                     return -1; // o1 comes after query start, o2 wraps to after
                 if (queryOrder1 > queryOrder2)
@@ -785,26 +751,51 @@ public class StorageProxy implements Sto
         return writeStats.getRecentLatencyMicros();
     }
 
-    public static List<Row> scan(IndexScanCommand command, ConsistencyLevel consistency_level)
+    public static List<Row> scan(String keyspace, String column_family, IndexClause
index_clause, SlicePredicate column_predicate, ConsistencyLevel consistency_level)
     throws IOException, TimeoutException
     {
         IPartitioner p = StorageService.getPartitioner();
-        Token startToken = command.index_clause.start_key == null ? p.getMinimumToken() :
p.getToken(command.index_clause.start_key);
-        List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.keyspace,
startToken);
-        // TODO iterate through endpoints in token order like getRangeSlice
-        Message message = command.getMessage();
-        RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(command.keyspace,
endpoints);
-        AbstractReplicationStrategy rs = StorageService.instance.getReplicationStrategy(command.keyspace);
-        QuorumResponseHandler<List<Row>> handler = rs.getQuorumResponseHandler(resolver,
consistency_level);
-        MessagingService.instance.sendRR(message, endpoints.get(0), handler);
-        try
-        {
-            return handler.get();
-        }
-        catch (DigestMismatchException e)
+
+        Token leftToken = index_clause.start_key == null ? p.getMinimumToken() : p.getToken(index_clause.start_key);
+        List<AbstractBounds> ranges = getRestrictedRanges(new Bounds(leftToken, p.getMinimumToken()));
+        logger.debug("scan ranges are " + StringUtils.join(ranges, ","));
+
+        // now scan until we have enough results
+        List<Row> rows = new ArrayList<Row>(index_clause.count);
+        for (AbstractBounds range : ranges)
         {
-            throw new RuntimeException(e);
+            List<InetAddress> liveEndpoints = StorageService.instance.getLiveNaturalEndpoints(keyspace,
range.right);
+            DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getLocalAddress(),
liveEndpoints);
+
+            // collect replies and resolve according to consistency level
+            RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(keyspace,
liveEndpoints);
+            AbstractReplicationStrategy rs = StorageService.instance.getReplicationStrategy(keyspace);
+            QuorumResponseHandler<List<Row>> handler = rs.getQuorumResponseHandler(resolver,
consistency_level);
+            // TODO bail early if live endpoints can't satisfy requested consistency level
+            IndexScanCommand command = new IndexScanCommand(keyspace, column_family, index_clause,
column_predicate, range);
+            Message message = command.getMessage();
+            for (InetAddress endpoint : liveEndpoints)
+            {
+                MessagingService.instance.sendRR(message, endpoint, handler);
+                if (logger.isDebugEnabled())
+                    logger.debug("reading " + command + " from " + message.getMessageId()
+ "@" + endpoint);
+            }
+
+            List<Row> theseRows;
+            try
+            {
+                theseRows = handler.get();
+            }
+            catch (DigestMismatchException e)
+            {
+                throw new RuntimeException(e);
+            }
+            rows.addAll(theseRows);
+            if (rows.size() >= index_clause.count)
+                return rows.subList(0, index_clause.count);
         }
+
+        return rows;
     }
 
     static class weakReadLocalCallable implements Callable<Object>

Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=981906&r1=981905&r2=981906&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Tue Aug  3 14:58:14
2010
@@ -522,12 +522,6 @@ public class CassandraServer implements 
         String keyspace = keySpace.get();
         checkKeyspaceAndLoginAuthorized(AccessLevel.READONLY);
 
-        return getRangeSlicesInternal(keyspace, column_parent, range, predicate, consistency_level);
-    }
-
-    private List<KeySlice> getRangeSlicesInternal(String keyspace, ColumnParent column_parent,
KeyRange range, SlicePredicate predicate, ConsistencyLevel consistency_level)
-    throws InvalidRequestException, UnavailableException, TimedOutException
-    {
         ThriftValidation.validateColumnParent(keyspace, column_parent);
         ThriftValidation.validatePredicate(keyspace, column_parent, predicate);
         ThriftValidation.validateKeyRange(range);
@@ -584,48 +578,21 @@ public class CassandraServer implements 
         return keySlices;
     }
 
-    public List<KeySlice> scan(ColumnParent column_parent, RowPredicate row_predicate,
SlicePredicate column_predicate, ConsistencyLevel consistency_level) throws InvalidRequestException,
UnavailableException, TimedOutException, TException
+    public List<KeySlice> get_indexed_slices(ColumnParent column_parent, IndexClause
index_clause, SlicePredicate column_predicate, ConsistencyLevel consistency_level) throws
InvalidRequestException, UnavailableException, TimedOutException, TException
     {
         if (logger.isDebugEnabled())
             logger.debug("scan");
 
         checkKeyspaceAndLoginAuthorized(AccessLevel.READONLY);
-
-        if (row_predicate.keys != null)
-        {
-            Map<byte[], List<ColumnOrSuperColumn>> rowMap = multigetSliceInternal(keySpace.get(),
row_predicate.keys, column_parent, column_predicate, consistency_level);
-            List<KeySlice> rows = new ArrayList<KeySlice>(rowMap.size());
-            for (Map.Entry<byte[], List<ColumnOrSuperColumn>> entry : rowMap.entrySet())
-            {
-                rows.add(new KeySlice(entry.getKey(), entry.getValue()));
-            }
-            return rows;
-        }
-
-        if (row_predicate.key_range != null)
-        {
-            return getRangeSlicesInternal(keySpace.get(), column_parent, row_predicate.key_range,
column_predicate, consistency_level);
-        }
-
-        if (row_predicate.index_clause != null)
-        {
-            return scanIndexInternal(keySpace.get(), column_parent, row_predicate.index_clause,
column_predicate, consistency_level);
-        }
-
-        throw new InvalidRequestException("row predicate must specify keys, key_range, or
index_clause");
-    }
-
-    private List<KeySlice> scanIndexInternal(String keyspace, ColumnParent column_parent,
IndexClause index_clause, SlicePredicate predicate, ConsistencyLevel consistency_level)
-    throws InvalidRequestException, TimedOutException
-    {
+        String keyspace = keySpace.get();
         ThriftValidation.validateColumnParent(keyspace, column_parent);
-        ThriftValidation.validatePredicate(keyspace, column_parent, predicate);
+        ThriftValidation.validatePredicate(keyspace, column_parent, column_predicate);
         ThriftValidation.validateIndexClauses(keyspace, column_parent.column_family, index_clause);
 
-        List<Row> rows = null;
+        List<Row> rows;
         try
         {
-            rows = StorageProxy.scan(new IndexScanCommand(keyspace, column_parent.column_family,
index_clause, predicate), consistency_level);
+            rows = StorageProxy.scan(keyspace, column_parent.column_family, index_clause,
column_predicate, consistency_level);
         }
         catch (IOException e)
         {
@@ -635,18 +602,7 @@ public class CassandraServer implements 
         {
             throw new TimedOutException();
         }
-        return thriftifyKeySlices(rows, column_parent, predicate);
-    }
-
-    public List<KeyCount> scan_count(ColumnParent column_parent, RowPredicate row_predicate,
SlicePredicate column_predicate, ConsistencyLevel consistency_level) throws InvalidRequestException,
UnavailableException, TimedOutException, TException
-    {
-        List<KeySlice> rows = scan(column_parent, row_predicate, column_predicate,
consistency_level);
-        List<KeyCount> rowCounts = new ArrayList<KeyCount>(rows.size());
-        for (KeySlice slice : rows)
-        {
-            rowCounts.add(new KeyCount(slice.key, slice.columns.size()));
-        }
-        return rowCounts;
+        return thriftifyKeySlices(rows, column_parent, column_predicate);
     }
 
     public Set<String> describe_keyspaces() throws TException

Modified: cassandra/trunk/test/system/test_thrift_server.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/system/test_thrift_server.py?rev=981906&r1=981905&r2=981906&view=diff
==============================================================================
--- cassandra/trunk/test/system/test_thrift_server.py (original)
+++ cassandra/trunk/test/system/test_thrift_server.py Tue Aug  3 14:58:14 2010
@@ -1273,20 +1273,21 @@ class TestMutations(ThriftTester):
         # simple query on one index expression
         cp = ColumnParent('Indexed1')
         sp = SlicePredicate(slice_range=SliceRange('', ''))
-        clause = IndexClause([IndexExpression('birthdate', IndexOperator.EQ, _i64(1))])
-        result = client.scan(cp, RowPredicate(index_clause=clause), sp, ConsistencyLevel.ONE)
+        clause = IndexClause([IndexExpression('birthdate', IndexOperator.EQ, _i64(1))], '')
+        result = client.get_indexed_slices(cp, clause, sp, ConsistencyLevel.ONE)
         assert len(result) == 1, result
         assert result[0].key == 'key1'
         assert len(result[0].columns) == 1, result[0].columns
 
         # solo unindexed expression is invalid
-        clause = IndexClause([IndexExpression('b', IndexOperator.EQ, _i64(1))])
-        _expect_exception(lambda: client.scan(cp, RowPredicate(index_clause=clause), sp,
ConsistencyLevel.ONE), InvalidRequestException)
+        clause = IndexClause([IndexExpression('b', IndexOperator.EQ, _i64(1))], '')
+        _expect_exception(lambda: client.get_indexed_slices(cp, clause, sp, ConsistencyLevel.ONE),
InvalidRequestException)
 
         # but unindexed expression added to indexed one is ok
         clause = IndexClause([IndexExpression('b', IndexOperator.EQ, _i64(3)),
-                              IndexExpression('birthdate', IndexOperator.EQ, _i64(3))])
-        result = client.scan(cp, RowPredicate(index_clause=clause), sp, ConsistencyLevel.ONE)
+                              IndexExpression('birthdate', IndexOperator.EQ, _i64(3))],
+                             '')
+        result = client.get_indexed_slices(cp, clause, sp, ConsistencyLevel.ONE)
         assert len(result) == 1, result
         assert result[0].key == 'key3'
         assert len(result[0].columns) == 2, result[0].columns

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=981906&r1=981905&r2=981906&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java Tue Aug 
3 14:58:14 2010
@@ -23,6 +23,8 @@ import java.util.*;
 import java.util.concurrent.ExecutionException;
 
 import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.StringUtils;
+
 import static org.junit.Assert.assertNull;
 import org.junit.Test;
 
@@ -31,6 +33,7 @@ import org.apache.cassandra.CleanupHelpe
 import org.apache.cassandra.Util;
 import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
 import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.IndexClause;
 import org.apache.cassandra.thrift.IndexExpression;
@@ -172,12 +175,14 @@ public class ColumnFamilyStoreTest exten
         rm.apply();
 
         IndexExpression expr = new IndexExpression("birthdate".getBytes("UTF8"), IndexOperator.EQ,
FBUtilities.toByteArray(1L));
-        IndexClause clause = new IndexClause(Arrays.asList(expr), 100);
+        IndexClause clause = new IndexClause(Arrays.asList(expr), ArrayUtils.EMPTY_BYTE_ARRAY,
100);
         IFilter filter = new IdentityQueryFilter();
-        List<Row> rows = Table.open("Keyspace1").getColumnFamilyStore("Indexed1").scan(clause,
filter);
+        IPartitioner p = StorageService.getPartitioner();
+        Range range = new Range(p.getMinimumToken(), p.getMinimumToken());
+        List<Row> rows = Table.open("Keyspace1").getColumnFamilyStore("Indexed1").scan(clause,
range, filter);
 
         assert rows != null;
-        assert rows.size() == 2;
+        assert rows.size() == 2 : StringUtils.join(rows, ",");
         assert Arrays.equals("k1".getBytes(), rows.get(0).key.key);
         assert Arrays.equals("k3".getBytes(), rows.get(1).key.key);
         assert Arrays.equals(FBUtilities.toByteArray(1L), rows.get(0).cf.getColumn("birthdate".getBytes("UTF8")).value());

Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java?rev=981906&r1=981905&r2=981906&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java Tue Aug
 3 14:58:14 2010
@@ -20,7 +20,10 @@ import org.apache.cassandra.db.Timestamp
 import org.apache.cassandra.db.filter.IFilter;
 import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.IndexClause;
 import org.apache.cassandra.thrift.IndexExpression;
 import org.apache.cassandra.thrift.IndexOperator;
@@ -65,9 +68,11 @@ public class SSTableWriterTest extends C
         cfs.addSSTable(sstr);
         
         IndexExpression expr = new IndexExpression("birthdate".getBytes("UTF8"), IndexOperator.EQ,
FBUtilities.toByteArray(1L));
-        IndexClause clause = new IndexClause(Arrays.asList(expr), 100);
+        IndexClause clause = new IndexClause(Arrays.asList(expr), "".getBytes(), 100);
         IFilter filter = new IdentityQueryFilter();
-        List<Row> rows = cfs.scan(clause, filter);
+        IPartitioner p = StorageService.getPartitioner();
+        Range range = new Range(p.getMinimumToken(), p.getMinimumToken());
+        List<Row> rows = cfs.scan(clause, range, filter);
         
         assertEquals("IndexExpression should return two rows on recoverAndOpen",2, rows.size());
         assertTrue("First result should be 'k1'",Arrays.equals("k1".getBytes(), rows.get(0).key.key));



Mime
View raw message