Merge branch cassandra-2.2 into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1b5e3a9b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1b5e3a9b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1b5e3a9b
Branch: refs/heads/trunk
Commit: 1b5e3a9b1be0c945782492e269acb4ea44730ad3
Parents: a880739 98be5de
Author: blerer <benjamin.lerer@datastax.com>
Authored: Tue Oct 20 14:38:51 2015 +0200
Committer: blerer <benjamin.lerer@datastax.com>
Committed: Tue Oct 20 14:39:39 2015 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../service/pager/AbstractQueryPager.java | 21 ++++++++++++++++++--
.../service/pager/RangeSliceQueryPager.java | 5 +++--
.../service/pager/SinglePartitionPager.java | 3 +--
4 files changed, 24 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1b5e3a9b/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 25ad1fb,458d0d5..616ff47
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,35 -1,20 +1,36 @@@
-2.2.4
+3.0
+Merged from 2.2:
* Expose phi values from failure detector via JMX and tweak debug
and trace logging (CASSANDRA-9526)
- * Fix RangeNamesQueryPager (CASSANDRA-10509)
- * Deprecate Pig support (CASSANDRA-10542)
- * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
Merged from 2.1:
+ * Fix paging issues with partitions containing only static columns data (CASSANDRA-10381)
* Fix conditions on static columns (CASSANDRA-10264)
* AssertionError: attempted to delete non-existing file CommitLog (CASSANDRA-10377)
- * (cqlsh) Distinguish negative and positive infinity in output (CASSANDRA-10523)
- * (cqlsh) allow custom time_format for COPY TO (CASSANDRA-8970)
- * Don't allow startup if the node's rack has changed (CASSANDRA-10242)
- * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
-2.2.3
+3.0-rc2
+ * Fix SELECT DISTINCT queries between 2.2.2 nodes and 3.0 nodes (CASSANDRA-10473)
+ * Remove circular references in SegmentedFile (CASSANDRA-10543)
+ * Ensure validation of indexed values only occurs once per-partition (CASSANDRA-10536)
+ * Fix handling of static columns for range tombstones in thrift (CASSANDRA-10174)
+ * Support empty ColumnFilter for backward compatibility on empty IN (CASSANDRA-10471)
+ * Remove Pig support (CASSANDRA-10542)
+ * Fix LogFile throws Exception when assertion is disabled (CASSANDRA-10522)
+ * Revert CASSANDRA-7486, make CMS default GC, move GC config to
+ conf/jvm.options (CASSANDRA-10403)
+ * Fix TeeingAppender causing some logs to be truncated/empty (CASSANDRA-10447)
+ * Allow EACH_QUORUM for reads (CASSANDRA-9602)
+ * Fix potential ClassCastException while upgrading (CASSANDRA-10468)
+ * Fix NPE in MVs on update (CASSANDRA-10503)
+ * Only include modified cell data in indexing deltas (CASSANDRA-10438)
+ * Do not load keyspace when creating sstable writer (CASSANDRA-10443)
+ * If node is not yet gossiping write all MV updates to batchlog only (CASSANDRA-10413)
+ * Re-populate token metadata after commit log recovery (CASSANDRA-10293)
+ * Provide additional metrics for materialized views (CASSANDRA-10323)
+ * Flush system schema tables after local schema changes (CASSANDRA-10429)
+Merged from 2.2:
+ * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
+ * Fix the regression when using LIMIT with aggregates (CASSANDRA-10487)
* Avoid NoClassDefFoundError during DataDescriptor initialization on windows (CASSANDRA-10412)
* Preserve case of quoted Role & User names (CASSANDRA-10394)
* cqlsh pg-style-strings broken (CASSANDRA-10484)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1b5e3a9b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
index b92d1e1,2a35e4b..bdebd43
--- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
@@@ -17,179 -17,381 +17,196 @@@
*/
package org.apache.cassandra.service.pager;
-import java.util.*;
+import java.util.NoSuchElementException;
-import com.google.common.annotations.VisibleForTesting;
-
--import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.ColumnCounter;
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.exceptions.RequestValidationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.cassandra.service.ClientState;
abstract class AbstractQueryPager implements QueryPager
{
- private static final Logger logger = LoggerFactory.getLogger(AbstractQueryPager.class);
-
- private final ConsistencyLevel consistencyLevel;
- private final boolean localQuery;
-
- protected final CFMetaData cfm;
- protected final IDiskAtomFilter columnFilter;
- private final long timestamp;
+ protected final ReadCommand command;
+ protected final DataLimits limits;
+ protected final int protocolVersion;
private int remaining;
- private boolean exhausted;
- private boolean shouldFetchExtraRow;
-
- protected AbstractQueryPager(ConsistencyLevel consistencyLevel,
- int toFetch,
- boolean localQuery,
- String keyspace,
- String columnFamily,
- IDiskAtomFilter columnFilter,
- long timestamp)
- {
- this(consistencyLevel, toFetch, localQuery, Schema.instance.getCFMetaData(keyspace,
columnFamily), columnFilter, timestamp);
- }
-
- protected AbstractQueryPager(ConsistencyLevel consistencyLevel,
- int toFetch,
- boolean localQuery,
- CFMetaData cfm,
- IDiskAtomFilter columnFilter,
- long timestamp)
- {
- this.consistencyLevel = consistencyLevel;
- this.localQuery = localQuery;
-
- this.cfm = cfm;
- this.columnFilter = columnFilter;
- this.timestamp = timestamp;
-
- this.remaining = toFetch;
- }
-
-
- public List<Row> fetchPage(int pageSize) throws RequestValidationException, RequestExecutionException
- {
- if (isExhausted())
- return Collections.emptyList();
-
- int currentPageSize = nextPageSize(pageSize);
- List<Row> rows = filterEmpty(queryNextPage(currentPageSize, consistencyLevel,
localQuery));
-
- if (rows.isEmpty())
- {
- logger.debug("Got empty set of rows, considering pager exhausted");
- exhausted = true;
- return Collections.emptyList();
- }
-
- int liveCount = getPageLiveCount(rows);
- logger.debug("Fetched {} live rows", liveCount);
-
- // Because SP.getRangeSlice doesn't trim the result (see SP.trim()), liveCount may
be greater than what asked
- // (currentPageSize). This would throw off the paging logic so we trim the excess.
It's not extremely efficient
- // but most of the time there should be nothing or very little to trim.
- if (liveCount > currentPageSize)
- {
- rows = discardLast(rows, liveCount - currentPageSize);
- liveCount = currentPageSize;
- }
-
- remaining -= liveCount;
-
- // If we've got less than requested, there is no more query to do (but
- // we still need to return the current page)
- if (liveCount < currentPageSize)
- {
- logger.debug("Got result ({}) smaller than page size ({}), considering pager
exhausted", liveCount, currentPageSize);
- exhausted = true;
- }
-
- // If it's not the first query and the first column is the last one returned (likely
- // but not certain since paging can race with deletes/expiration), then remove the
- // first column.
- if (containsPreviousLast(rows.get(0)))
- {
- rows = discardFirst(rows);
- remaining++;
- }
- // Otherwise, if 'shouldFetchExtraRow' was set, we queried for one more than the
page size,
- // so if the page is full, trim the last entry
- else if (shouldFetchExtraRow && !exhausted)
- {
- // We've asked for one more than necessary
- rows = discardLast(rows);
- remaining++;
- }
-
- logger.debug("Remaining rows to page: {}", remaining);
- if (!isExhausted())
- shouldFetchExtraRow = recordLast(rows.get(rows.size() - 1));
+ // This is the last key we've been reading from (or can still be reading within). This
the key for
+ // which remainingInPartition makes sense: if we're starting another key, we should
reset remainingInPartition
+ // (and this is done in PagerIterator). This can be null (when we start).
+ private DecoratedKey lastKey;
+ private int remainingInPartition;
- return rows;
- }
+ private boolean exhausted;
- private List<Row> filterEmpty(List<Row> result)
+ protected AbstractQueryPager(ReadCommand command, int protocolVersion)
{
- for (Row row : result)
- {
- if (row.cf == null || !row.cf.hasColumns())
- {
- List<Row> newResult = new ArrayList<Row>(result.size() - 1);
- for (Row row2 : result)
- {
- if (row2.cf == null || !row2.cf.hasColumns())
- continue;
+ this.command = command;
+ this.protocolVersion = protocolVersion;
+ this.limits = command.limits();
- newResult.add(row2);
- }
- return newResult;
- }
- }
- return result;
+ this.remaining = limits.count();
+ this.remainingInPartition = limits.perPartitionCount();
}
- protected void restoreState(int remaining, boolean shouldFetchExtraRow)
+ public ReadOrderGroup startOrderGroup()
{
- this.remaining = remaining;
- this.shouldFetchExtraRow = shouldFetchExtraRow;
+ return command.startOrderGroup();
}
- public boolean isExhausted()
+ public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState
clientState) throws RequestValidationException, RequestExecutionException
{
- return exhausted || remaining == 0;
- }
+ if (isExhausted())
+ return PartitionIterators.EMPTY;
- public int maxRemaining()
- {
- return remaining;
+ pageSize = Math.min(pageSize, remaining);
+ return new PagerIterator(nextPageReadCommand(pageSize).execute(consistency, clientState),
limits.forPaging(pageSize), command.nowInSec());
}
- public long timestamp()
+ public PartitionIterator fetchPageInternal(int pageSize, ReadOrderGroup orderGroup)
throws RequestValidationException, RequestExecutionException
{
- return timestamp;
- }
+ if (isExhausted())
+ return PartitionIterators.EMPTY;
- private int nextPageSize(int pageSize)
- {
- return Math.min(remaining, pageSize) + (shouldFetchExtraRow ? 1 : 0);
+ pageSize = Math.min(pageSize, remaining);
+ return new PagerIterator(nextPageReadCommand(pageSize).executeInternal(orderGroup),
limits.forPaging(pageSize), command.nowInSec());
}
- public ColumnCounter columnCounter()
+ private class PagerIterator extends CountingPartitionIterator
{
- return columnFilter.columnCounter(cfm.comparator, timestamp);
- }
+ private final DataLimits pageLimits;
- protected abstract List<Row> queryNextPage(int pageSize, ConsistencyLevel consistency,
boolean localQuery) throws RequestValidationException, RequestExecutionException;
+ private Row lastRow;
- /**
- * Checks to see if the first row of a new page contains the last row from the previous
page.
- * @param first the first row of the new page
- * @return true if <code>first</code> contains the last from from the previous
page and it is live, false otherwise
- */
- protected abstract boolean containsPreviousLast(Row first);
+ private boolean isFirstPartition = true;
+ private RowIterator nextPartition;
- /**
- * Saves the paging state by recording the last seen partition key and cell name (where
applicable).
- * @param last the last row in the current page
- * @return true if an extra row should be fetched in the next page,false otherwise
- */
- protected abstract boolean recordLast(Row last);
-
- protected abstract boolean isReversed();
-
- private List<Row> discardFirst(List<Row> rows)
- {
- return discardFirst(rows, 1);
- }
-
- @VisibleForTesting
- List<Row> discardFirst(List<Row> rows, int toDiscard)
- {
- if (toDiscard == 0 || rows.isEmpty())
- return rows;
-
- int i = 0;
- DecoratedKey firstKey = null;
- ColumnFamily firstCf = null;
- while (toDiscard > 0 && i < rows.size())
+ private PagerIterator(PartitionIterator iter, DataLimits pageLimits, int nowInSec)
{
- Row first = rows.get(i++);
- firstKey = first.key;
- firstCf = first.cf.cloneMeShallow(isReversed());
- toDiscard -= isReversed()
- ? discardLast(first.cf, toDiscard, firstCf)
- : discardFirst(first.cf, toDiscard, firstCf);
+ super(iter, pageLimits, nowInSec);
+ this.pageLimits = pageLimits;
}
- // If there is less live data than to discard, all is discarded
- if (toDiscard > 0)
- return Collections.<Row>emptyList();
-
- // i is the index of the first row that we are sure to keep. On top of that,
- // we also keep firstCf is it hasn't been fully emptied by the last iteration above.
- int count = firstCf.getColumnCount();
- int newSize = rows.size() - (count == 0 ? i : i - 1);
- List<Row> newRows = new ArrayList<Row>(newSize);
- if (count != 0)
- newRows.add(new Row(firstKey, firstCf));
- newRows.addAll(rows.subList(i, rows.size()));
+ @Override
+ @SuppressWarnings("resource") // iter is closed by closing the result or in close()
+ public boolean hasNext()
+ {
+ while (nextPartition == null && super.hasNext())
+ {
+ if (nextPartition == null)
+ nextPartition = super.next();
- return newRows;
- }
+ DecoratedKey key = nextPartition.partitionKey();
+ if (lastKey == null || !lastKey.equals(key))
+ remainingInPartition = limits.perPartitionCount();
- private List<Row> discardLast(List<Row> rows)
- {
- return discardLast(rows, 1);
- }
+ lastKey = key;
- @VisibleForTesting
- List<Row> discardLast(List<Row> rows, int toDiscard)
- {
- if (toDiscard == 0 || rows.isEmpty())
- return rows;
+ // If this is the first partition of this page, this could be the continuation
of a partition we've started
+ // on the previous page. In which case, we could have the problem that the
partition has no more "regular"
+ // rows (but the page size is such we didn't knew before) but it does has
a static row. We should then skip
+ // the partition as returning it would means to the upper layer that the
partition has "only" static columns,
+ // which is not the case (and we know the static results have been sent
on the previous page).
+ if (isFirstPartition && isPreviouslyReturnedPartition(key) &&
!nextPartition.hasNext())
+ {
+ nextPartition.close();
+ nextPartition = null;
+ }
- int i = rows.size()-1;
- DecoratedKey lastKey = null;
- ColumnFamily lastCf = null;
- while (toDiscard > 0 && i >= 0)
- {
- Row last = rows.get(i--);
- lastKey = last.key;
- lastCf = last.cf.cloneMeShallow(isReversed());
- toDiscard -= isReversed()
- ? discardFirst(last.cf, toDiscard, lastCf)
- : discardLast(last.cf, toDiscard, lastCf);
+ isFirstPartition = false;
+ }
+ return nextPartition != null;
}
- // If there is less live data than to discard, all is discarded
- if (toDiscard > 0)
- return Collections.<Row>emptyList();
-
- // i is the index of the last row that we are sure to keep. On top of that,
- // we also keep lastCf is it hasn't been fully emptied by the last iteration above.
- int count = lastCf.getColumnCount();
- int newSize = count == 0 ? i+1 : i+2;
- List<Row> newRows = new ArrayList<Row>(newSize);
- newRows.addAll(rows.subList(0, i+1));
- if (count != 0)
- newRows.add(new Row(lastKey, lastCf));
-
- return newRows;
- }
+ @Override
+ @SuppressWarnings("resource") // iter is closed by closing the result
+ public RowIterator next()
+ {
+ if (!hasNext())
+ throw new NoSuchElementException();
- private int getPageLiveCount(List<Row> page)
- {
- int count = 0;
- for (Row row : page)
- count += columnCounter().countAll(row.cf).live();
- return count;
- }
+ RowIterator toReturn = nextPartition;
+ nextPartition = null;
- private int discardFirst(ColumnFamily cf, int toDiscard, ColumnFamily newCf)
- {
- boolean isReversed = isReversed();
- DeletionInfo.InOrderTester tester = cf.deletionInfo().inOrderTester(isReversed);
- return isReversed
- ? discardTail(cf, toDiscard, newCf, cf.reverseIterator(), tester)
- : discardHead(toDiscard, newCf, cf.iterator(), tester);
- }
+ return new RowPagerIterator(toReturn);
+ }
- private int discardLast(ColumnFamily cf, int toDiscard, ColumnFamily newCf)
- {
- boolean isReversed = isReversed();
- DeletionInfo.InOrderTester tester = cf.deletionInfo().inOrderTester(isReversed);
- return isReversed
- ? discardHead(toDiscard, newCf, cf.reverseIterator(), tester)
- : discardTail(cf, toDiscard, newCf, cf.iterator(), tester);
- }
+ @Override
+ public void close()
+ {
+ super.close();
+ if (nextPartition != null)
+ nextPartition.close();
- private int discardHead(int toDiscard, ColumnFamily copy, Iterator<Cell> iter,
DeletionInfo.InOrderTester tester)
- {
- ColumnCounter counter = columnCounter();
+ recordLast(lastKey, lastRow);
- List<Cell> staticCells = new ArrayList<>(cfm.staticColumns().size());
+ int counted = counter.counted();
+ remaining -= counted;
- remainingInPartition -= counter.countedInCurrentPartition();
++ // If the clustering of the last row returned is a static one, it means that
the partition was only
++ // containing data within the static columns. Therefore, there are not data
remaining within the partition.
++ if (lastRow != null && lastRow.clustering() == Clustering.STATIC_CLUSTERING)
++ {
++ remainingInPartition = 0;
++ }
++ else
++ {
++ remainingInPartition -= counter.countedInCurrentPartition();
++ }
+ exhausted = counted < pageLimits.count();
+ }
- // Discard the first 'toDiscard' live, non-static cells
- while (iter.hasNext())
+ private class RowPagerIterator extends WrappingRowIterator
{
- Cell c = iter.next();
-
- // if it's a static column, don't count it and save it to add to the trimmed
results
- ColumnDefinition columnDef = cfm.getColumnDefinition(c.name());
- if (columnDef != null && columnDef.kind == ColumnDefinition.Kind.STATIC)
+ RowPagerIterator(RowIterator iter)
{
- staticCells.add(c);
- continue;
+ super(iter);
}
- counter.count(c, tester);
-
- // once we've discarded the required amount, add the rest
- if (counter.live() > toDiscard)
+ @Override
++ public Row staticRow()
+ {
- for (Cell staticCell : staticCells)
- copy.addColumn(staticCell);
++ Row staticRow = super.staticRow();
++ if (!staticRow.isEmpty())
++ lastRow = staticRow;
++ return staticRow;
++ }
+
- copy.addColumn(c);
- while (iter.hasNext())
- copy.addColumn(iter.next());
++ @Override
+ public Row next()
+ {
+ lastRow = super.next();
+ return lastRow;
}
}
- int live = counter.live();
- // We want to take into account the row even if it was containing only static columns
- if (live == 0 && !staticCells.isEmpty())
- live = 1;
- return Math.min(live, toDiscard);
}
- private int discardTail(ColumnFamily cf, int toDiscard, ColumnFamily copy, Iterator<Cell>
iter, DeletionInfo.InOrderTester tester)
+ protected void restoreState(DecoratedKey lastKey, int remaining, int remainingInPartition)
{
- // Redoing the counting like that is not extremely efficient.
- // This is called only for reversed slices or in the case of a race between
- // paging and a deletion (pretty unlikely), so this is probably acceptable.
- int liveCount = columnCounter().countAll(cf).live();
-
- ColumnCounter counter = columnCounter();
- // Discard the last 'toDiscard' live (so stop adding as sound as we're past 'liveCount
- toDiscard')
- while (iter.hasNext())
- {
- Cell c = iter.next();
- counter.count(c, tester);
- if (counter.live() > liveCount - toDiscard)
- break;
+ this.lastKey = lastKey;
+ this.remaining = remaining;
+ this.remainingInPartition = remainingInPartition;
+ }
- copy.addColumn(c);
- }
- return Math.min(liveCount, toDiscard);
+ public boolean isExhausted()
+ {
+ return exhausted || remaining == 0 || ((this instanceof SinglePartitionPager) &&
remainingInPartition == 0);
}
- /**
- * Returns the first non-static cell in the ColumnFamily. This is necessary to avoid
recording a static column
- * as the "last" cell seen in a reversed query. Because we will always query static
columns alongside the normal
- * data for a page, they are not a good indicator of where paging should resume. When
we begin the next page, we
- * need to start from the last non-static cell.
- */
- protected Cell firstNonStaticCell(ColumnFamily cf)
+ public int maxRemaining()
{
- for (Cell cell : cf)
- {
- ColumnDefinition def = cfm.getColumnDefinition(cell.name());
- if (def == null || def.kind != ColumnDefinition.Kind.STATIC)
- return cell;
- }
- return null;
+ return remaining;
}
- protected static Cell lastCell(ColumnFamily cf)
+ protected int remainingInPartition()
{
- return cf.getReverseSortedColumns().iterator().next();
+ return remainingInPartition;
}
+
+ protected abstract ReadCommand nextPageReadCommand(int pageSize);
+ protected abstract void recordLast(DecoratedKey key, Row row);
+ protected abstract boolean isPreviouslyReturnedPartition(DecoratedKey key);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1b5e3a9b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
index 770875a,fd14c82..fd35b29
--- a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
@@@ -58,65 -66,68 +58,66 @@@ public class RangeSliceQueryPager exten
{
return lastReturnedKey == null
? null
- : new PagingState(lastReturnedKey.getKey(), lastReturnedName.toByteBuffer(),
maxRemaining());
+ : new PagingState(lastReturnedKey.getKey(), lastReturnedRow, maxRemaining(),
remainingInPartition());
}
- protected List<Row> queryNextPage(int pageSize, ConsistencyLevel consistencyLevel,
boolean localQuery)
+ protected ReadCommand nextPageReadCommand(int pageSize)
throws RequestExecutionException
{
- SliceQueryFilter sf = (SliceQueryFilter)columnFilter;
- AbstractBounds<RowPosition> keyRange = lastReturnedKey == null ? command.keyRange
: makeIncludingKeyBounds(lastReturnedKey);
- Composite start = lastReturnedName == null ? sf.start() : lastReturnedName;
- PagedRangeCommand pageCmd = new PagedRangeCommand(command.keyspace,
- command.columnFamily,
- command.timestamp,
- keyRange,
- sf,
- start,
- sf.finish(),
- command.rowFilter,
- pageSize,
- command.countCQL3Rows);
-
- return localQuery
- ? pageCmd.executeLocally()
- : StorageProxy.getRangeSlice(pageCmd, consistencyLevel);
- }
-
- protected boolean containsPreviousLast(Row first)
- {
- if (lastReturnedKey == null || !lastReturnedKey.equals(first.key))
- return false;
-
- // Same as SliceQueryPager, we ignore a deleted column
- Cell firstCell = isReversed() ? lastCell(first.cf) : firstNonStaticCell(first.cf);
- // If the row was containing only static columns it has already been returned and
we can skip it.
- if (firstCell == null)
- return true;
+ DataLimits limits;
+ DataRange fullRange = ((PartitionRangeReadCommand)command).dataRange();
+ DataRange pageRange;
+ if (lastReturnedKey == null)
+ {
+ pageRange = fullRange;
+ limits = command.limits().forPaging(pageSize);
+ }
+ else
+ {
+ // We want to include the last returned key only if we haven't achieved our
per-partition limit, otherwise, don't bother.
- boolean includeLastKey = remainingInPartition() > 0;
++ boolean includeLastKey = remainingInPartition() > 0 && lastReturnedRow
!= null;
+ AbstractBounds<PartitionPosition> bounds = makeKeyBounds(lastReturnedKey,
includeLastKey);
+ if (includeLastKey)
+ {
+ pageRange = fullRange.forPaging(bounds, command.metadata().comparator, lastReturnedRow.clustering(command.metadata()),
false);
+ limits = command.limits().forPaging(pageSize, lastReturnedKey.getKey(),
remainingInPartition());
+ }
+ else
+ {
+ pageRange = fullRange.forSubRange(bounds);
+ limits = command.limits().forPaging(pageSize);
+ }
+ }
- CFMetaData metadata = Schema.instance.getCFMetaData(command.keyspace, command.columnFamily);
- return !first.cf.deletionInfo().isDeleted(firstCell)
- && firstCell.isLive(timestamp())
- && firstCell.name().isSameCQL3RowAs(metadata.comparator, lastReturnedName);
+ // it won't hurt for the next page command to query the index manager
+ // again to check for an applicable index, so don't supply one here
+ return new PartitionRangeReadCommand(command.metadata(), command.nowInSec(), command.columnFilter(),
command.rowFilter(), limits, pageRange, Optional.empty());
}
- protected boolean recordLast(Row last)
+ protected void recordLast(DecoratedKey key, Row last)
{
- lastReturnedKey = last.key;
- lastReturnedName = (isReversed() ? firstNonStaticCell(last.cf) : lastCell(last.cf)).name();
- return true;
+ if (last != null)
+ {
+ lastReturnedKey = key;
- lastReturnedRow = PagingState.RowMark.create(command.metadata(), last, protocolVersion);
++ if (last.clustering() != Clustering.STATIC_CLUSTERING)
++ lastReturnedRow = PagingState.RowMark.create(command.metadata(), last, protocolVersion);
+ }
}
- protected boolean isReversed()
+ protected boolean isPreviouslyReturnedPartition(DecoratedKey key)
{
- return ((SliceQueryFilter)command.predicate).reversed;
+ // Note that lastReturnedKey can be null, but key cannot.
+ return key.equals(lastReturnedKey);
}
- private AbstractBounds<RowPosition> makeIncludingKeyBounds(RowPosition lastReturnedKey)
+ private AbstractBounds<PartitionPosition> makeKeyBounds(PartitionPosition lastReturnedKey,
boolean includeLastKey)
{
- // We always include lastReturnedKey since we may still be paging within a row,
- // and PagedRangeCommand will move over if we're not anyway
- AbstractBounds<RowPosition> bounds = command.keyRange;
+ AbstractBounds<PartitionPosition> bounds = ((PartitionRangeReadCommand)command).dataRange().keyRange();
if (bounds instanceof Range || bounds instanceof Bounds)
{
- return new Bounds<RowPosition>(lastReturnedKey, bounds.right);
+ return includeLastKey
+ ? new Bounds<PartitionPosition>(lastReturnedKey, bounds.right)
+ : new Range<PartitionPosition>(lastReturnedKey, bounds.right);
}
else
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1b5e3a9b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
index 7057e79,51bbf90..70d4559
--- a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
+++ b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
@@@ -31,57 -26,8 +31,56 @@@ import org.apache.cassandra.db.filter.*
*
* For use by MultiPartitionPager.
*/
-public interface SinglePartitionPager extends QueryPager
+public class SinglePartitionPager extends AbstractQueryPager
{
- public ByteBuffer key();
- public ColumnCounter columnCounter();
+ private static final Logger logger = LoggerFactory.getLogger(SinglePartitionPager.class);
+
+ private final SinglePartitionReadCommand<?> command;
+
+ private volatile PagingState.RowMark lastReturned;
+
+ public SinglePartitionPager(SinglePartitionReadCommand<?> command, PagingState
state, int protocolVersion)
+ {
+ super(command, protocolVersion);
+ this.command = command;
+
+ if (state != null)
+ {
+ lastReturned = state.rowMark;
+ restoreState(command.partitionKey(), state.remaining, state.remainingInPartition);
+ }
+ }
+
+ public ByteBuffer key()
+ {
+ return command.partitionKey().getKey();
+ }
+
+ public DataLimits limits()
+ {
+ return command.limits();
+ }
+
+ public PagingState state()
+ {
+ return lastReturned == null
+ ? null
+ : new PagingState(null, lastReturned, maxRemaining(), remainingInPartition());
+ }
+
+ protected ReadCommand nextPageReadCommand(int pageSize)
+ {
+ return command.forPaging(lastReturned == null ? null : lastReturned.clustering(command.metadata()),
pageSize);
+ }
+
+ protected void recordLast(DecoratedKey key, Row last)
+ {
- if (last != null)
++ if (last != null && last.clustering() != Clustering.STATIC_CLUSTERING)
+ lastReturned = PagingState.RowMark.create(command.metadata(), last, protocolVersion);
+ }
+
+ protected boolean isPreviouslyReturnedPartition(DecoratedKey key)
+ {
- // We're querying a single partition, so if it's not the first page, it is the previously
returned one.
+ return lastReturned != null;
+ }
}
|