cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maedh...@apache.org
Subject [cassandra] branch cassandra-4.0 updated: Move RepairedDataInfo to the execution controller rather than the ReadCommand to avoid unintended sharing
Date Fri, 27 Aug 2021 17:00:50 GMT
This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-4.0 by this push:
     new 585bc69  Move RepairedDataInfo to the execution controller rather than the ReadCommand to avoid unintended sharing
585bc69 is described below

commit 585bc692918deea2b8c4b1098ee7e7478881f138
Author: Sam Tunnicliffe <sam@beobal.com>
AuthorDate: Wed Jun 2 12:54:42 2021 +0100

    Move RepairedDataInfo to the execution controller rather than the ReadCommand to avoid unintended sharing
    
    patch by Caleb Rackliffe; reviewed by Sam Tunnicliffe and Alex Petrov for CASSANDRA-16721
    
    Co-authored-by: Sam Tunnicliffe <sam@beobal.com>
    Co-authored-by: Caleb Rackliffe <calebrackliffe@gmail.com>
---
 CHANGES.txt                                        |   1 +
 .../apache/cassandra/concurrent/SEPExecutor.java   |   1 +
 .../cassandra/db/PartitionRangeReadCommand.java    |  16 +-
 src/java/org/apache/cassandra/db/ReadCommand.java  | 128 +++-----------
 .../cassandra/db/ReadCommandVerbHandler.java       |   9 +-
 .../cassandra/db/ReadExecutionController.java      |  75 ++++++--
 src/java/org/apache/cassandra/db/ReadResponse.java |   9 +-
 .../org/apache/cassandra/db/RepairedDataInfo.java  |  47 +++++-
 .../cassandra/db/SinglePartitionReadCommand.java   |  95 +++++------
 .../org/apache/cassandra/service/StorageProxy.java |  13 +-
 .../service/reads/AbstractReadExecutor.java        |   2 +-
 .../cassandra/service/reads/DataResolver.java      |   9 +-
 .../service/reads/range/RangeCommandIterator.java  |  19 +--
 .../service/reads/repair/AbstractReadRepair.java   |  57 ++++---
 .../distributed/test/RepairDigestTrackingTest.java | 162 +++++++++++++++---
 test/unit/org/apache/cassandra/Util.java           |  40 ++++-
 .../org/apache/cassandra/db/PartitionTest.java     |   9 +-
 .../org/apache/cassandra/db/ReadCommandTest.java   | 188 +++++++++++----------
 .../cassandra/db/ReadCommandVerbHandlerTest.java   |  69 +++++---
 .../org/apache/cassandra/db/ReadResponseTest.java  |  89 ++++++----
 .../db/SSTableAndMemTableDigestMatchTest.java      |   4 +-
 .../db/SinglePartitionSliceCommandTest.java        |   6 +-
 .../cassandra/db/compaction/CompactionsTest.java   |   5 -
 .../cassandra/service/reads/DataResolverTest.java  |   3 +-
 .../reads/repair/AbstractReadRepairTest.java       |   2 +-
 .../reads/repair/BlockingReadRepairTest.java       |   2 +-
 .../repair/DiagEventsBlockingReadRepairTest.java   |   3 +-
 .../reads/repair/ReadOnlyReadRepairTest.java       |   2 +-
 28 files changed, 641 insertions(+), 424 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 6e6481b..22818c9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0.1
+ * Move RepairedDataInfo to the execution controller rather than the ReadCommand to avoid unintended sharing (CASSANDRA-16721)
  * Bump zstd-jni version to 1.5.0-4 (CASSANDRA-16884)
  * Remove assumption that all urgent messages are small (CASSANDRA-16877)
  * ArrayClustering.unsharedHeapSize does not include the data so undercounts the heap size (CASSANDRA-16845)
diff --git a/src/java/org/apache/cassandra/concurrent/SEPExecutor.java b/src/java/org/apache/cassandra/concurrent/SEPExecutor.java
index ee81126..675e047 100644
--- a/src/java/org/apache/cassandra/concurrent/SEPExecutor.java
+++ b/src/java/org/apache/cassandra/concurrent/SEPExecutor.java
@@ -191,6 +191,7 @@ public class SEPExecutor extends AbstractLocalAwareExecutorService implements SE
         }
     }
 
+    @Override
     public void maybeExecuteImmediately(Runnable command)
     {
         FutureTask<?> ft = newTaskFor(command, null);
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index 82b6e8a..e55ab63 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -22,7 +22,6 @@ import java.util.concurrent.TimeUnit;
 
 import com.google.common.annotations.VisibleForTesting;
 
-import org.apache.cassandra.net.MessageFlag;
 import org.apache.cassandra.net.Verb;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -41,7 +40,6 @@ import org.apache.cassandra.io.sstable.format.SSTableReadsListener;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.metrics.TableMetrics;
-import org.apache.cassandra.net.Message;
 import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.StorageProxy;
@@ -256,20 +254,20 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
     }
 
     @VisibleForTesting
-    public UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadExecutionController executionController)
+    public UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadExecutionController controller)
     {
         ColumnFamilyStore.ViewFragment view = cfs.select(View.selectLive(dataRange().keyRange()));
         Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), dataRange().keyRange().getString(metadata().partitionKeyType));
 
         // fetch data from current memtable, historical memtables, and SSTables in the correct order.
-        InputCollector<UnfilteredPartitionIterator> inputCollector = iteratorsForRange(view);
+        InputCollector<UnfilteredPartitionIterator> inputCollector = iteratorsForRange(view, controller);
         try
         {
             for (Memtable memtable : view.memtables)
             {
                 @SuppressWarnings("resource") // We close on exception and on closing the result returned by this method
                 Memtable.MemtableUnfilteredPartitionIterator iter = memtable.makePartitionIterator(columnFilter(), dataRange());
-                oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, iter.getMinLocalDeletionTime());
+                controller.updateMinOldestUnrepairedTombstone(iter.getMinLocalDeletionTime());
                 inputCollector.addMemtableIterator(RTBoundValidator.validate(iter, RTBoundValidator.Stage.MEMTABLE, false));
             }
 
@@ -281,13 +279,13 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
                 inputCollector.addSSTableIterator(sstable, RTBoundValidator.validate(iter, RTBoundValidator.Stage.SSTABLE, false));
 
                 if (!sstable.isRepaired())
-                    oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
+                    controller.updateMinOldestUnrepairedTombstone(sstable.getMinLocalDeletionTime());
             }
             // iterators can be empty for offline tools
             if (inputCollector.isEmpty())
                 return EmptyIterators.unfilteredPartition(metadata());
 
-            return checkCacheFilter(UnfilteredPartitionIterators.mergeLazily(inputCollector.finalizeIterators(cfs, nowInSec(), oldestUnrepairedTombstone)), cfs);
+            return checkCacheFilter(UnfilteredPartitionIterators.mergeLazily(inputCollector.finalizeIterators(cfs, nowInSec(), controller.oldestUnrepairedTombstone())), cfs);
         }
         catch (RuntimeException | Error e)
         {
@@ -321,10 +319,10 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
 
     private UnfilteredPartitionIterator checkCacheFilter(UnfilteredPartitionIterator iter, final ColumnFamilyStore cfs)
     {
-        class CacheFilter extends Transformation
+        class CacheFilter extends Transformation<BaseRowIterator<?>>
         {
             @Override
-            public BaseRowIterator applyToPartition(BaseRowIterator iter)
+            public BaseRowIterator<?> applyToPartition(BaseRowIterator<?> iter)
             {
                 // Note that we rely on the fact that until we actually advance 'iter', no really costly operation is actually done
                 // (except for reading the partition key from the index file) due to the call to mergeLazily in queryStorage.
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 7b889d1..d3aef4c 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.db;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiFunction;
@@ -81,6 +80,7 @@ import static org.apache.cassandra.db.partitions.UnfilteredPartitionIterators.Me
 public abstract class ReadCommand extends AbstractReadQuery
 {
     private static final int TEST_ITERATION_DELAY_MILLIS = Integer.parseInt(System.getProperty("cassandra.test.read_iteration_delay_ms", "0"));
+
     protected static final Logger logger = LoggerFactory.getLogger(ReadCommand.class);
     public static final IVersionedSerializer<ReadCommand> serializer = new Serializer();
 
@@ -91,13 +91,6 @@ public abstract class ReadCommand extends AbstractReadQuery
     // if a digest query, the version for which the digest is expected. Ignored if not a digest.
     private int digestVersion;
 
-    // for data queries, coordinators may request information on the repaired data used in constructing the response
-    private boolean trackRepairedStatus = false;
-    // tracker for repaired data, initialized to singleton null object
-    private RepairedDataInfo repairedDataInfo = RepairedDataInfo.NULL_REPAIRED_DATA_INFO;
-
-    int oldestUnrepairedTombstone = Integer.MAX_VALUE;
-
     @Nullable
     private final IndexMetadata index;
 
@@ -219,68 +212,6 @@ public abstract class ReadCommand extends AbstractReadQuery
     }
 
     /**
-     * Activates repaired data tracking for this command.
-     *
-     * When active, a digest will be created from data read from repaired SSTables. The digests
-     * from each replica can then be compared on the coordinator to detect any divergence in their
-     * repaired datasets. In this context, an sstable is considered repaired if it is marked
-     * repaired or has a pending repair session which has been committed.
-     * In addition to the digest, a set of ids for any pending but as yet uncommitted repair sessions
-     * is recorded and returned to the coordinator. This is to help reduce false positives caused
-     * by compaction lagging which can leave sstables from committed sessions in the pending state
-     * for a time.
-     */
-    public void trackRepairedStatus()
-    {
-        trackRepairedStatus = true;
-    }
-
-    /**
-     * Whether or not repaired status of any data read is being tracked or not
-     *
-     * @return Whether repaired status tracking is active for this command
-     */
-    public boolean isTrackingRepairedStatus()
-    {
-        return trackRepairedStatus;
-    }
-
-    /**
-     * Returns a digest of the repaired data read in the execution of this command.
-     *
-     * If either repaired status tracking is not active or the command has not yet been
-     * executed, then this digest will be an empty buffer.
-     * Otherwise, it will contain a digest* of the repaired data read, or empty buffer
-     * if no repaired data was read.
-     * @return digest of the repaired data read in the execution of the command
-     */
-    public ByteBuffer getRepairedDataDigest()
-    {
-        return repairedDataInfo.getDigest();
-    }
-
-    /**
-     * Returns a boolean indicating whether any relevant sstables were skipped during the read
-     * that produced the repaired data digest.
-     *
-     * If true, then no pending repair sessions or partition deletes have influenced the extent
-     * of the repaired sstables that went into generating the digest.
-     * This indicates whether or not the digest can reliably be used to infer consistency
-     * issues between the repaired sets across replicas.
-     *
-     * If either repaired status tracking is not active or the command has not yet been
-     * executed, then this will always return true.
-     *
-     * @return boolean to indicate confidence in the dwhether or not the digest of the repaired data can be
-     * reliably be used to infer inconsistency issues between the repaired sets across
-     * replicas.
-     */
-    public boolean isRepairedDataDigestConclusive()
-    {
-        return repairedDataInfo.isConclusive();
-    }
-
-    /**
      * Index (metadata) chosen for this query. Can be null.
      *
      * @return index (metadata) chosen for this query
@@ -358,11 +289,6 @@ public abstract class ReadCommand extends AbstractReadQuery
 
     protected abstract UnfilteredPartitionIterator queryStorage(ColumnFamilyStore cfs, ReadExecutionController executionController);
 
-    protected int oldestUnrepairedTombstone()
-    {
-        return oldestUnrepairedTombstone;
-    }
-
     /**
      * Whether the underlying {@code ClusteringIndexFilter} is reversed or not.
      *
@@ -371,15 +297,15 @@ public abstract class ReadCommand extends AbstractReadQuery
     public abstract boolean isReversed();
 
     @SuppressWarnings("resource")
-    public ReadResponse createResponse(UnfilteredPartitionIterator iterator)
+    public ReadResponse createResponse(UnfilteredPartitionIterator iterator, RepairedDataInfo rdi)
     {
         // validate that the sequence of RT markers is correct: open is followed by close, deletion times for both
         // ends equal, and there are no dangling RT bound in any partition.
         iterator = RTBoundValidator.validate(iterator, Stage.PROCESSED, true);
 
         return isDigestQuery()
-             ? ReadResponse.createDigestResponse(iterator, this)
-             : ReadResponse.createDataResponse(iterator, this);
+               ? ReadResponse.createDigestResponse(iterator, this)
+               : ReadResponse.createDataResponse(iterator, this, rdi);
     }
 
     long indexSerializedSize(int version)
@@ -448,22 +374,13 @@ public abstract class ReadCommand extends AbstractReadQuery
             Tracing.trace("Executing read on {}.{} using index {}", cfs.metadata.keyspace, cfs.metadata.name, index.getIndexMetadata().name);
         }
 
-        if (isTrackingRepairedStatus())
-        {
-            final DataLimits.Counter repairedReadCount = limits().newCounter(nowInSec(),
-                                                                             false,
-                                                                             selectsFullPartition(),
-                                                                             metadata().enforceStrictLiveness()).onlyCount();
-            repairedDataInfo = new RepairedDataInfo(repairedReadCount);
-        }
-
         UnfilteredPartitionIterator iterator = (null == searcher) ? queryStorage(cfs, executionController) : searcher.search(executionController);
         iterator = RTBoundValidator.validate(iterator, Stage.MERGED, false);
 
         try
         {
             iterator = withStateTracking(iterator);
-            iterator = RTBoundValidator.validate(withoutPurgeableTombstones(iterator, cfs), Stage.PURGED, false);
+            iterator = RTBoundValidator.validate(withoutPurgeableTombstones(iterator, cfs, executionController), Stage.PURGED, false);
             iterator = withMetricsRecording(iterator, cfs.metric, startTimeNanos);
 
             // If we've used a 2ndary index, we know the result already satisfy the primary expression used, so
@@ -482,7 +399,7 @@ public abstract class ReadCommand extends AbstractReadQuery
             // as the count is observed; if that happens in the middle of an open RT, its end bound will not be included.
             // If tracking repaired data, the counter is needed for overreading repaired data, otherwise we can
             // optimise the case where this.limit = DataLimits.NONE which skips an unnecessary transform
-            if (isTrackingRepairedStatus())
+            if (executionController.isTrackingRepairedStatus())
             {
                 DataLimits.Counter limit =
                     limits().newCounter(nowInSec(), false, selectsFullPartition(), metadata().enforceStrictLiveness());
@@ -490,7 +407,7 @@ public abstract class ReadCommand extends AbstractReadQuery
                 // ensure that a consistent amount of repaired data is read on each replica. This causes silent
                 // overreading from the repaired data set, up to limits(). The extra data is not visible to
                 // the caller, only iterated to produce the repaired data digest.
-                iterator = repairedDataInfo.extend(iterator, limit);
+                iterator = executionController.getRepairedDataInfo().extend(iterator, limit);
             }
             else
             {
@@ -509,9 +426,14 @@ public abstract class ReadCommand extends AbstractReadQuery
 
     protected abstract void recordLatency(TableMetrics metric, long latencyNanos);
 
+    public ReadExecutionController executionController(boolean trackRepairedStatus)
+    {
+        return ReadExecutionController.forCommand(this, trackRepairedStatus);
+    }
+
     public ReadExecutionController executionController()
     {
-        return ReadExecutionController.forCommand(this);
+        return ReadExecutionController.forCommand(this, false);
     }
 
     /**
@@ -697,13 +619,15 @@ public abstract class ReadCommand extends AbstractReadQuery
     // Skip purgeable tombstones. We do this because it's safe to do (post-merge of the memtable and sstable at least), it
     // can save us some bandwith, and avoid making us throw a TombstoneOverwhelmingException for purgeable tombstones (which
     // are to some extend an artefact of compaction lagging behind and hence counting them is somewhat unintuitive).
-    protected UnfilteredPartitionIterator withoutPurgeableTombstones(UnfilteredPartitionIterator iterator, ColumnFamilyStore cfs)
+    protected UnfilteredPartitionIterator withoutPurgeableTombstones(UnfilteredPartitionIterator iterator, 
+                                                                     ColumnFamilyStore cfs,
+                                                                     ReadExecutionController controller)
     {
         class WithoutPurgeableTombstones extends PurgeFunction
         {
             public WithoutPurgeableTombstones()
             {
-                super(nowInSec(), cfs.gcBefore(nowInSec()), oldestUnrepairedTombstone(),
+                super(nowInSec(), cfs.gcBefore(nowInSec()), controller.oldestUnrepairedTombstone(),
                       cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones(),
                       iterator.metadata().enforceStrictLiveness());
             }
@@ -744,7 +668,7 @@ public abstract class ReadCommand extends AbstractReadQuery
     }
 
     @SuppressWarnings("resource") // resultant iterators are closed by their callers
-    InputCollector<UnfilteredRowIterator> iteratorsForPartition(ColumnFamilyStore.ViewFragment view)
+    InputCollector<UnfilteredRowIterator> iteratorsForPartition(ColumnFamilyStore.ViewFragment view, ReadExecutionController controller)
     {
         final BiFunction<List<UnfilteredRowIterator>, RepairedDataInfo, UnfilteredRowIterator> merge =
             (unfilteredRowIterators, repairedDataInfo) -> {
@@ -757,11 +681,11 @@ public abstract class ReadCommand extends AbstractReadQuery
         // internal counter is satisfied
         final Function<UnfilteredRowIterator, UnfilteredPartitionIterator> postLimitPartitions =
             (rows) -> EmptyIterators.unfilteredPartition(metadata());
-        return new InputCollector<>(view, repairedDataInfo, merge, postLimitPartitions, isTrackingRepairedStatus());
+        return new InputCollector<>(view, controller, merge, postLimitPartitions);
     }
 
     @SuppressWarnings("resource") // resultant iterators are closed by their callers
-    InputCollector<UnfilteredPartitionIterator> iteratorsForRange(ColumnFamilyStore.ViewFragment view)
+    InputCollector<UnfilteredPartitionIterator> iteratorsForRange(ColumnFamilyStore.ViewFragment view, ReadExecutionController controller)
     {
         final BiFunction<List<UnfilteredPartitionIterator>, RepairedDataInfo, UnfilteredPartitionIterator> merge =
             (unfilteredPartitionIterators, repairedDataInfo) -> {
@@ -773,7 +697,7 @@ public abstract class ReadCommand extends AbstractReadQuery
         // Uses identity function to provide additional partitions to be consumed after the command's
         // DataLimits are satisfied. The input to the function will be the iterator of merged, repaired partitions
         // which we'll keep reading until the RepairedDataInfo's internal counter is satisfied.
-        return new InputCollector<>(view, repairedDataInfo, merge, Function.identity(), isTrackingRepairedStatus());
+        return new InputCollector<>(view, controller, merge, Function.identity());
     }
 
     /**
@@ -796,13 +720,13 @@ public abstract class ReadCommand extends AbstractReadQuery
         List<T> unrepairedIters;
 
         InputCollector(ColumnFamilyStore.ViewFragment view,
-                       RepairedDataInfo repairedDataInfo,
+                       ReadExecutionController controller,
                        BiFunction<List<T>, RepairedDataInfo, T> repairedMerger,
-                       Function<T, UnfilteredPartitionIterator> postLimitAdditionalPartitions,
-                       boolean isTrackingRepairedStatus)
+                       Function<T, UnfilteredPartitionIterator> postLimitAdditionalPartitions)
         {
-            this.repairedDataInfo = repairedDataInfo;
-            this.isTrackingRepairedStatus = isTrackingRepairedStatus;
+            this.repairedDataInfo = controller.getRepairedDataInfo();
+            this.isTrackingRepairedStatus = controller.isTrackingRepairedStatus();
+            
             if (isTrackingRepairedStatus)
             {
                 for (SSTableReader sstable : view.sstables)
diff --git a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
index 2c28ed9..a86852f 100644
--- a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
@@ -52,14 +52,11 @@ public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand>
         long timeout = message.expiresAtNanos() - message.createdAtNanos();
         command.setMonitoringTime(message.createdAtNanos(), message.isCrossNode(), timeout, DatabaseDescriptor.getSlowQueryTimeout(NANOSECONDS));
 
-        if (message.trackRepairedData())
-            command.trackRepairedStatus();
-
         ReadResponse response;
-        try (ReadExecutionController executionController = command.executionController();
-             UnfilteredPartitionIterator iterator = command.executeLocally(executionController))
+        try (ReadExecutionController controller = command.executionController(message.trackRepairedData());
+             UnfilteredPartitionIterator iterator = command.executeLocally(controller))
         {
-            response = command.createResponse(iterator);
+            response = command.createResponse(iterator, controller.getRepairedDataInfo());
         }
 
         if (!command.complete())
diff --git a/src/java/org/apache/cassandra/db/ReadExecutionController.java b/src/java/org/apache/cassandra/db/ReadExecutionController.java
index 73ddad8..5bcd84b 100644
--- a/src/java/org/apache/cassandra/db/ReadExecutionController.java
+++ b/src/java/org/apache/cassandra/db/ReadExecutionController.java
@@ -17,8 +17,12 @@
  */
 package org.apache.cassandra.db;
 
+import java.nio.ByteBuffer;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.index.Index;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.MonotonicClock;
@@ -42,12 +46,16 @@ public class ReadExecutionController implements AutoCloseable
 
     private final long createdAtNanos; // Only used while sampling
 
-    private ReadExecutionController(ReadCommand command,
-                                    OpOrder.Group baseOp,
-                                    TableMetadata baseMetadata,
-                                    ReadExecutionController indexController,
-                                    WriteContext writeContext,
-                                    long createdAtNanos)
+    private final RepairedDataInfo repairedDataInfo;
+    private int oldestUnrepairedTombstone = Integer.MAX_VALUE;
+
+    ReadExecutionController(ReadCommand command,
+                            OpOrder.Group baseOp,
+                            TableMetadata baseMetadata,
+                            ReadExecutionController indexController,
+                            WriteContext writeContext,
+                            long createdAtNanos,
+                            boolean trackRepairedStatus)
     {
         // We can have baseOp == null, but only when empty() is called, in which case the controller will never really be used
         // (which validForReadOn should ensure). But if it's not null, we should have the proper metadata too.
@@ -58,6 +66,19 @@ public class ReadExecutionController implements AutoCloseable
         this.writeContext = writeContext;
         this.command = command;
         this.createdAtNanos = createdAtNanos;
+
+        if (trackRepairedStatus)
+        {
+            DataLimits.Counter repairedReadCount = command.limits().newCounter(command.nowInSec(),
+                                                                               false,
+                                                                               command.selectsFullPartition(),
+                                                                               metadata().enforceStrictLiveness()).onlyCount();
+            repairedDataInfo = new RepairedDataInfo(repairedReadCount);
+        }
+        else
+        {
+            repairedDataInfo = RepairedDataInfo.NO_OP_REPAIRED_DATA_INFO;
+        }
     }
 
     public ReadExecutionController indexReadController()
@@ -70,6 +91,16 @@ public class ReadExecutionController implements AutoCloseable
         return writeContext;
     }
 
+    int oldestUnrepairedTombstone()
+    {
+        return oldestUnrepairedTombstone;
+    }
+    
+    void updateMinOldestUnrepairedTombstone(int candidate)
+    {
+        oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, candidate);
+    }
+
     boolean validForReadOn(ColumnFamilyStore cfs)
     {
         return baseOp != null && cfs.metadata.id.equals(baseMetadata.id);
@@ -77,7 +108,7 @@ public class ReadExecutionController implements AutoCloseable
 
     public static ReadExecutionController empty()
     {
-        return new ReadExecutionController(null, null, null, null, null, NO_SAMPLING);
+        return new ReadExecutionController(null, null, null, null, null, NO_SAMPLING, false);
     }
 
     /**
@@ -90,7 +121,7 @@ public class ReadExecutionController implements AutoCloseable
      * @return the created execution controller, which must always be closed.
      */
     @SuppressWarnings("resource") // ops closed during controller close
-    static ReadExecutionController forCommand(ReadCommand command)
+    static ReadExecutionController forCommand(ReadCommand command, boolean trackRepairedStatus)
     {
         ColumnFamilyStore baseCfs = Keyspace.openAndGetStore(command.metadata());
         ColumnFamilyStore indexCfs = maybeGetIndexCfs(baseCfs, command);
@@ -98,7 +129,7 @@ public class ReadExecutionController implements AutoCloseable
         long createdAtNanos = baseCfs.metric.topLocalReadQueryTime.isEnabled() ? clock.now() : NO_SAMPLING;
 
         if (indexCfs == null)
-            return new ReadExecutionController(command, baseCfs.readOrdering.start(), baseCfs.metadata(), null, null, createdAtNanos);
+            return new ReadExecutionController(command, baseCfs.readOrdering.start(), baseCfs.metadata(), null, null, createdAtNanos, trackRepairedStatus);
 
         OpOrder.Group baseOp = null;
         WriteContext writeContext = null;
@@ -107,14 +138,14 @@ public class ReadExecutionController implements AutoCloseable
         try
         {
             baseOp = baseCfs.readOrdering.start();
-            indexController = new ReadExecutionController(command, indexCfs.readOrdering.start(), indexCfs.metadata(), null, null, NO_SAMPLING);
+            indexController = new ReadExecutionController(command, indexCfs.readOrdering.start(), indexCfs.metadata(), null, null, NO_SAMPLING, false);
             /*
              * TODO: this should perhaps not open and maintain a writeOp for the full duration, but instead only *try*
              * to delete stale entries, without blocking if there's no room
              * as it stands, we open a writeOp and keep it open for the duration to ensure that should this CF get flushed to make room we don't block the reclamation of any room being made
              */
             writeContext = baseCfs.keyspace.getWriteHandler().createContextForRead();
-            return new ReadExecutionController(command, baseOp, baseCfs.metadata(), indexController, writeContext, createdAtNanos);
+            return new ReadExecutionController(command, baseOp, baseCfs.metadata(), indexController, writeContext, createdAtNanos, trackRepairedStatus);
         }
         catch (RuntimeException e)
         {
@@ -171,6 +202,28 @@ public class ReadExecutionController implements AutoCloseable
             addSample();
     }
 
+    public boolean isTrackingRepairedStatus()
+    {
+        return repairedDataInfo != RepairedDataInfo.NO_OP_REPAIRED_DATA_INFO;
+    }
+
+    @VisibleForTesting
+    public ByteBuffer getRepairedDataDigest()
+    {
+        return repairedDataInfo.getDigest();
+    }
+
+    @VisibleForTesting
+    public boolean isRepairedDataDigestConclusive()
+    {
+        return repairedDataInfo.isConclusive();
+    }
+    
+    public RepairedDataInfo getRepairedDataInfo()
+    {
+        return repairedDataInfo;
+    }
+
     private void addSample()
     {
         String cql = command.toCQLString();
diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java
index 3f6481d..52e6fd5 100644
--- a/src/java/org/apache/cassandra/db/ReadResponse.java
+++ b/src/java/org/apache/cassandra/db/ReadResponse.java
@@ -43,9 +43,9 @@ public abstract class ReadResponse
     {
     }
 
-    public static ReadResponse createDataResponse(UnfilteredPartitionIterator data, ReadCommand command)
+    public static ReadResponse createDataResponse(UnfilteredPartitionIterator data, ReadCommand command, RepairedDataInfo rdi)
     {
-        return new LocalDataResponse(data, command);
+        return new LocalDataResponse(data, command, rdi);
     }
 
     @VisibleForTesting
@@ -176,11 +176,10 @@ public abstract class ReadResponse
     // built on the owning node responding to a query
     private static class LocalDataResponse extends DataResponse
     {
-        private LocalDataResponse(UnfilteredPartitionIterator iter, ReadCommand command)
+        private LocalDataResponse(UnfilteredPartitionIterator iter, ReadCommand command, RepairedDataInfo rdi)
         {
             super(build(iter, command.columnFilter()),
-                  command.getRepairedDataDigest(),
-                  command.isRepairedDataDigestConclusive(),
+                  rdi.getDigest(), rdi.isConclusive(),
                   MessagingService.current_version,
                   DeserializationHelper.Flag.LOCAL);
         }
diff --git a/src/java/org/apache/cassandra/db/RepairedDataInfo.java b/src/java/org/apache/cassandra/db/RepairedDataInfo.java
index c136f26..f80b113 100644
--- a/src/java/org/apache/cassandra/db/RepairedDataInfo.java
+++ b/src/java/org/apache/cassandra/db/RepairedDataInfo.java
@@ -22,6 +22,8 @@ import java.nio.ByteBuffer;
 import java.util.function.LongPredicate;
 import java.util.concurrent.TimeUnit;
 
+import javax.annotation.concurrent.NotThreadSafe;
+
 import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.partitions.PurgeFunction;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
@@ -34,12 +36,28 @@ import org.apache.cassandra.metrics.TableMetrics;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
+@NotThreadSafe
 class RepairedDataInfo
 {
-    public static final RepairedDataInfo NULL_REPAIRED_DATA_INFO = new RepairedDataInfo(null)
+    public static final RepairedDataInfo NO_OP_REPAIRED_DATA_INFO = new RepairedDataInfo(null)
     {
-        boolean isConclusive(){ return true; }
-        ByteBuffer getDigest(){ return ByteBufferUtil.EMPTY_BYTE_BUFFER; }
+        @Override
+        public UnfilteredPartitionIterator withRepairedDataInfo(UnfilteredPartitionIterator iterator)
+        {
+            return iterator;
+        }
+
+        @Override
+        public UnfilteredRowIterator withRepairedDataInfo(UnfilteredRowIterator iterator)
+        {
+            return iterator;
+        }
+            
+        @Override
+        public UnfilteredPartitionIterator extend(UnfilteredPartitionIterator partitions, DataLimits.Counter limit)
+        {
+           return partitions;
+        }
     };
 
     // Keeps a digest of the partition currently being processed. Since we won't know
@@ -72,6 +90,14 @@ class RepairedDataInfo
         this.repairedCounter = repairedCounter;
     }
 
+    /**
+     * If either repaired status tracking is not active or the command has not yet been
+     * executed, then this digest will be an empty buffer.
+     * Otherwise, it will contain a digest of the repaired data read, or an empty buffer
+     * if no repaired data was read.
+     *
+     * @return a digest of the repaired data read during local execution of a command
+     */
     ByteBuffer getDigest()
     {
         if (calculatedDigest != null)
@@ -95,6 +121,21 @@ class RepairedDataInfo
         this.postLimitPartitions = postLimitPartitions;
     }
 
+    /**
+     * Returns a boolean indicating whether the repaired data digest is conclusive, i.e. whether
+     * no relevant sstables were skipped during the read that produced it.
+     *
+     * If true, then no pending repair sessions or partition deletes have influenced the extent
+     * of the repaired sstables that went into generating the digest.
+     * This indicates whether or not the digest can reliably be used to infer consistency
+     * issues between the repaired sets across replicas.
+     *
+     * If either repaired status tracking is not active or the command has not yet been
+     * executed, then this will always return true.
+     *
+     * @return boolean to indicate confidence in whether or not the digest of the repaired data can be
+     *         reliably used to infer inconsistency issues between the repaired sets across replicas
+     */
     boolean isConclusive()
     {
         return isConclusive;
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index b17506f..5e804c7 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -398,7 +398,7 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
     {
         // skip the row cache and go directly to sstables/memtable if repaired status of
         // data is being tracked. This is only requested after an initial digest mismatch
-        UnfilteredRowIterator partition = cfs.isRowCacheEnabled() && !isTrackingRepairedStatus()
+        UnfilteredRowIterator partition = cfs.isRowCacheEnabled() && !executionController.isTrackingRepairedStatus()
                                         ? getThroughCache(cfs, executionController)
                                         : queryMemtableAndDisk(cfs, executionController);
         return new SingletonUnfilteredPartitionIterator(partition);
@@ -566,10 +566,10 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
         assert executionController != null && executionController.validForReadOn(cfs);
         Tracing.trace("Executing single-partition query on {}", cfs.name);
 
-        return queryMemtableAndDiskInternal(cfs);
+        return queryMemtableAndDiskInternal(cfs, executionController);
     }
 
-    private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs)
+    private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs, ReadExecutionController controller)
     {
         /*
          * We have 2 main strategies:
@@ -589,20 +589,20 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
          *      and generate a digest over their merge, which procludes an early return.
          */
         if (clusteringIndexFilter() instanceof ClusteringIndexNamesFilter
-                && !metadata().isCounter()
-                && !queriesMulticellType()
-                && !isTrackingRepairedStatus())
+            && !metadata().isCounter()
+            && !queriesMulticellType()
+            && !controller.isTrackingRepairedStatus())
         {
-            return queryMemtableAndSSTablesInTimestampOrder(cfs, (ClusteringIndexNamesFilter)clusteringIndexFilter());
+            return queryMemtableAndSSTablesInTimestampOrder(cfs, (ClusteringIndexNamesFilter)clusteringIndexFilter(), controller);
         }
 
         Tracing.trace("Acquiring sstable references");
         ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey()));
-        Collections.sort(view.sstables, SSTableReader.maxTimestampDescending);
+        view.sstables.sort(SSTableReader.maxTimestampDescending);
         ClusteringIndexFilter filter = clusteringIndexFilter();
         long minTimestamp = Long.MAX_VALUE;
         long mostRecentPartitionTombstone = Long.MIN_VALUE;
-        InputCollector<UnfilteredRowIterator> inputCollector = iteratorsForPartition(view);
+        InputCollector<UnfilteredRowIterator> inputCollector = iteratorsForPartition(view, controller);
         try
         {
             for (Memtable memtable : view.memtables)
@@ -617,7 +617,7 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
                 UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition);
 
                 // Memtable data is always considered unrepaired
-                oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, partition.stats().minLocalDeletionTime);
+                controller.updateMinOldestUnrepairedTombstone(partition.stats().minLocalDeletionTime);
                 inputCollector.addMemtableIterator(RTBoundValidator.validate(iter, RTBoundValidator.Stage.MEMTABLE, false));
 
                 mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone,
@@ -636,13 +636,13 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
              * In other words, iterating in descending maxTimestamp order allow to do our mostRecentPartitionTombstone
              * elimination in one pass, and minimize the number of sstables for which we read a partition tombstone.
             */
-            Collections.sort(view.sstables, SSTableReader.maxTimestampDescending);
+            view.sstables.sort(SSTableReader.maxTimestampDescending);
             int nonIntersectingSSTables = 0;
             int includedDueToTombstones = 0;
 
             SSTableReadMetricsCollector metricsCollector = new SSTableReadMetricsCollector();
 
-            if (isTrackingRepairedStatus())
+            if (controller.isTrackingRepairedStatus())
                 Tracing.trace("Collecting data from sstables and tracking repaired status");
 
             for (SSTableReader sstable : view.sstables)
@@ -661,7 +661,7 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
                 if (shouldInclude(sstable))
                 {
                     if (!sstable.isRepaired())
-                        oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
+                        controller.updateMinOldestUnrepairedTombstone(sstable.getMinLocalDeletionTime());
 
                     // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
                     @SuppressWarnings("resource")
@@ -684,7 +684,7 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
                         if (!iter.partitionLevelDeletion().isLive())
                         {
                             if (!sstable.isRepaired())
-                                oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
+                                controller.updateMinOldestUnrepairedTombstone(sstable.getMinLocalDeletionTime());
                             inputCollector.addSSTableIterator(sstable, iter);
                             includedDueToTombstones++;
                             mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone,
@@ -707,7 +707,8 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
 
             StorageHook.instance.reportRead(cfs.metadata().id, partitionKey());
 
-            return withSSTablesIterated(inputCollector.finalizeIterators(cfs, nowInSec(), oldestUnrepairedTombstone), cfs.metric, metricsCollector);
+            List<UnfilteredRowIterator> iterators = inputCollector.finalizeIterators(cfs, nowInSec(), controller.oldestUnrepairedTombstone());
+            return withSSTablesIterated(iterators, cfs.metric, metricsCollector);
         }
         catch (RuntimeException | Error e)
         {
@@ -766,7 +767,7 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
             metrics.topReadPartitionFrequency.addSample(key.getKey(), 1);
         }
 
-        class UpdateSstablesIterated extends Transformation
+        class UpdateSstablesIterated extends Transformation<UnfilteredRowIterator>
         {
            public void onPartitionClose()
            {
@@ -774,7 +775,7 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
                metrics.updateSSTableIterated(mergedSSTablesIterated);
                Tracing.trace("Merged data from memtables and {} sstables", mergedSSTablesIterated);
            }
-        };
+        }
         return Transformation.apply(merged, new UpdateSstablesIterated());
     }
 
@@ -797,7 +798,7 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
      * no collection or counters are included).
      * This method assumes the filter is a {@code ClusteringIndexNamesFilter}.
      */
-    private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFamilyStore cfs, ClusteringIndexNamesFilter filter)
+    private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFamilyStore cfs, ClusteringIndexNamesFilter filter, ReadExecutionController controller)
     {
         Tracing.trace("Acquiring sstable references");
         ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey()));
@@ -816,17 +817,16 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
                 if (iter.isEmpty())
                     continue;
 
-                result = add(
-                    RTBoundValidator.validate(iter, RTBoundValidator.Stage.MEMTABLE, false),
-                    result,
-                    filter,
-                    false
-                );
+                result = add(RTBoundValidator.validate(iter, RTBoundValidator.Stage.MEMTABLE, false),
+                             result,
+                             filter,
+                             false,
+                             controller);
             }
         }
 
         /* add the SSTables on disk */
-        Collections.sort(view.sstables, SSTableReader.maxTimestampDescending);
+        view.sstables.sort(SSTableReader.maxTimestampDescending);
         // read sorted sstables
         SSTableReadMetricsCollector metricsCollector = new SSTableReadMetricsCollector();
         for (SSTableReader sstable : view.sstables)
@@ -862,25 +862,23 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
                 {
                     if (!iter.partitionLevelDeletion().isLive())
                     {
-                        result = add(
-                            UnfilteredRowIterators.noRowsIterator(iter.metadata(),
-                                                                  iter.partitionKey(),
-                                                                  Rows.EMPTY_STATIC_ROW,
-                                                                  iter.partitionLevelDeletion(),
-                                                                  filter.isReversed()),
-                            result,
-                            filter,
-                            sstable.isRepaired()
-                        );
+                        result = add(UnfilteredRowIterators.noRowsIterator(iter.metadata(),
+                                                                           iter.partitionKey(),
+                                                                           Rows.EMPTY_STATIC_ROW,
+                                                                           iter.partitionLevelDeletion(),
+                                                                           filter.isReversed()),
+                                     result,
+                                     filter,
+                                     sstable.isRepaired(),
+                                     controller);
                     }
                     else
                     {
-                        result = add(
-                            RTBoundValidator.validate(iter, RTBoundValidator.Stage.SSTABLE, false),
-                            result,
-                            filter,
-                            sstable.isRepaired()
-                        );
+                        result = add(RTBoundValidator.validate(iter, RTBoundValidator.Stage.SSTABLE, false),
+                                     result,
+                                     filter,
+                                     sstable.isRepaired(),
+                                     controller);
                     }
                 }
 
@@ -898,12 +896,11 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
                 if (iter.isEmpty())
                     continue;
 
-                result = add(
-                    RTBoundValidator.validate(iter, RTBoundValidator.Stage.SSTABLE, false),
-                    result,
-                    filter,
-                    sstable.isRepaired()
-                );
+                result = add(RTBoundValidator.validate(iter, RTBoundValidator.Stage.SSTABLE, false),
+                             result,
+                             filter,
+                             sstable.isRepaired(),
+                             controller);
             }
         }
 
@@ -919,10 +916,10 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
         return result.unfilteredIterator(columnFilter(), Slices.ALL, clusteringIndexFilter().isReversed());
     }
 
-    private ImmutableBTreePartition add(UnfilteredRowIterator iter, ImmutableBTreePartition result, ClusteringIndexNamesFilter filter, boolean isRepaired)
+    private ImmutableBTreePartition add(UnfilteredRowIterator iter, ImmutableBTreePartition result, ClusteringIndexNamesFilter filter, boolean isRepaired, ReadExecutionController controller)
     {
         if (!isRepaired)
-            oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, iter.stats().minLocalDeletionTime);
+            controller.updateMinOldestUnrepairedTombstone(iter.stats().minLocalDeletionTime);
 
         int maxRows = Math.max(filter.requestedRows().size(), 1);
         if (result == null)
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 2f6ad38..41e0404 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1968,12 +1968,19 @@ public class StorageProxy implements StorageProxyMBean
     {
         private final ReadCommand command;
         private final ReadCallback handler;
+        private final boolean trackRepairedStatus;
 
         public LocalReadRunnable(ReadCommand command, ReadCallback handler)
         {
+            this(command, handler, false);
+        }
+
+        public LocalReadRunnable(ReadCommand command, ReadCallback handler, boolean trackRepairedStatus)
+        {
             super(Verb.READ_REQ);
             this.command = command;
             this.handler = handler;
+            this.trackRepairedStatus = trackRepairedStatus;
         }
 
         protected void runMayThrow()
@@ -1983,10 +1990,10 @@ public class StorageProxy implements StorageProxyMBean
                 command.setMonitoringTime(approxCreationTimeNanos, false, verb.expiresAfterNanos(), DatabaseDescriptor.getSlowQueryTimeout(NANOSECONDS));
 
                 ReadResponse response;
-                try (ReadExecutionController executionController = command.executionController();
-                     UnfilteredPartitionIterator iterator = command.executeLocally(executionController))
+                try (ReadExecutionController controller = command.executionController(trackRepairedStatus);
+                     UnfilteredPartitionIterator iterator = command.executeLocally(controller))
                 {
-                    response = command.createResponse(iterator);
+                    response = command.createResponse(iterator, controller.getRepairedDataInfo());
                 }
 
                 if (command.complete())
diff --git a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
index 8a83d3e..fd1b372 100644
--- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
@@ -155,7 +155,7 @@ public abstract class AbstractReadExecutor
         if (hasLocalEndpoint)
         {
             logger.trace("reading {} locally", readCommand.isDigestQuery() ? "digest" : "data");
-            Stage.READ.maybeExecuteImmediately(new LocalReadRunnable(command, handler));
+            Stage.READ.maybeExecuteImmediately(new LocalReadRunnable(readCommand, handler));
         }
     }
 
diff --git a/src/java/org/apache/cassandra/service/reads/DataResolver.java b/src/java/org/apache/cassandra/service/reads/DataResolver.java
index 7c76336..6abb2ad 100644
--- a/src/java/org/apache/cassandra/service/reads/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/DataResolver.java
@@ -60,12 +60,19 @@ public class DataResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
 {
     private final boolean enforceStrictLiveness;
     private final ReadRepair<E, P> readRepair;
+    private final boolean trackRepairedStatus;
 
     public DataResolver(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, ReadRepair<E, P> readRepair, long queryStartNanoTime)
     {
+        this(command, replicaPlan, readRepair, queryStartNanoTime, false);
+    }
+
+    public DataResolver(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, ReadRepair<E, P> readRepair, long queryStartNanoTime, boolean trackRepairedStatus)
+    {
         super(command, replicaPlan, queryStartNanoTime);
         this.enforceStrictLiveness = command.metadata().enforceStrictLiveness();
         this.readRepair = readRepair;
+        this.trackRepairedStatus = trackRepairedStatus;
     }
 
     public PartitionIterator getData()
@@ -89,7 +96,7 @@ public class DataResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
         E replicas = replicaPlan().candidates().select(transform(messages, Message::from), false);
 
         // If requested, inspect each response for a digest of the replica's repaired data set
-        RepairedDataTracker repairedDataTracker = command.isTrackingRepairedStatus()
+        RepairedDataTracker repairedDataTracker = trackRepairedStatus
                                                   ? new RepairedDataTracker(getRepairedDataVerifier(command))
                                                   : null;
         if (repairedDataTracker != null)
diff --git a/src/java/org/apache/cassandra/service/reads/range/RangeCommandIterator.java b/src/java/org/apache/cassandra/service/reads/range/RangeCommandIterator.java
index ae7ee60..38014e2 100644
--- a/src/java/org/apache/cassandra/service/reads/range/RangeCommandIterator.java
+++ b/src/java/org/apache/cassandra/service/reads/range/RangeCommandIterator.java
@@ -176,26 +176,23 @@ class RangeCommandIterator extends AbstractIterator<RowIterator> implements Part
     private SingleRangeResponse query(ReplicaPlan.ForRangeRead replicaPlan, boolean isFirst)
     {
         PartitionRangeReadCommand rangeCommand = command.forSubRange(replicaPlan.range(), isFirst);
-        // If enabled, request repaired data tracking info from full replicas but
-        // only if there are multiple full replicas to compare results from
-        if (DatabaseDescriptor.getRepairedDataTrackingForRangeReadsEnabled()
-            && replicaPlan.contacts().filter(Replica::isFull).size() > 1)
-        {
-            command.trackRepairedStatus();
-            rangeCommand.trackRepairedStatus();
-        }
+        
+        // If enabled, request repaired data tracking info from full replicas, but
+        // only if there are multiple full replicas to compare results from.
+        boolean trackRepairedStatus = DatabaseDescriptor.getRepairedDataTrackingForRangeReadsEnabled()
+                                      && replicaPlan.contacts().filter(Replica::isFull).size() > 1;
 
         ReplicaPlan.SharedForRangeRead sharedReplicaPlan = ReplicaPlan.shared(replicaPlan);
         ReadRepair<EndpointsForRange, ReplicaPlan.ForRangeRead> readRepair =
                 ReadRepair.create(command, sharedReplicaPlan, queryStartNanoTime);
         DataResolver<EndpointsForRange, ReplicaPlan.ForRangeRead> resolver =
-                new DataResolver<>(rangeCommand, sharedReplicaPlan, readRepair, queryStartNanoTime);
+                new DataResolver<>(rangeCommand, sharedReplicaPlan, readRepair, queryStartNanoTime, trackRepairedStatus);
         ReadCallback<EndpointsForRange, ReplicaPlan.ForRangeRead> handler =
                 new ReadCallback<>(resolver, rangeCommand, sharedReplicaPlan, queryStartNanoTime);
 
         if (replicaPlan.contacts().size() == 1 && replicaPlan.contacts().get(0).isSelf())
         {
-            Stage.READ.execute(new StorageProxy.LocalReadRunnable(rangeCommand, handler));
+            Stage.READ.execute(new StorageProxy.LocalReadRunnable(rangeCommand, handler, trackRepairedStatus));
         }
         else
         {
@@ -203,7 +200,7 @@ class RangeCommandIterator extends AbstractIterator<RowIterator> implements Part
             {
                 Tracing.trace("Enqueuing request to {}", replica);
                 ReadCommand command = replica.isFull() ? rangeCommand : rangeCommand.copyAsTransientQuery(replica);
-                Message<ReadCommand> message = command.createMessage(command.isTrackingRepairedStatus() && replica.isFull());
+                Message<ReadCommand> message = command.createMessage(trackRepairedStatus && replica.isFull());
                 MessagingService.instance().sendWithCallback(message, replica.endpoint(), handler);
             }
         }
diff --git a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
index ca47612..086766e 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
@@ -20,10 +20,8 @@ package org.apache.cassandra.service.reads.repair;
 
 import java.util.function.Consumer;
 
-import com.google.common.base.Preconditions;
-
 import com.codahale.metrics.Meter;
-import com.google.common.base.Predicates;
+import com.google.common.base.Preconditions;
 
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -48,23 +46,22 @@ import org.apache.cassandra.tracing.Tracing;
 
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
-public abstract class AbstractReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
-        implements ReadRepair<E, P>
+public abstract class AbstractReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> implements ReadRepair<E, P>
 {
     protected final ReadCommand command;
     protected final long queryStartNanoTime;
     protected final ReplicaPlan.Shared<E, P> replicaPlan;
     protected final ColumnFamilyStore cfs;
 
-    private volatile DigestRepair digestRepair = null;
+    private volatile DigestRepair<E, P> digestRepair = null;
 
-    private static class DigestRepair
+    private static class DigestRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
     {
-        private final DataResolver dataResolver;
-        private final ReadCallback readCallback;
+        private final DataResolver<E, P> dataResolver;
+        private final ReadCallback<E, P> readCallback;
         private final Consumer<PartitionIterator> resultConsumer;
 
-        public DigestRepair(DataResolver dataResolver, ReadCallback readCallback, Consumer<PartitionIterator> resultConsumer)
+        public DigestRepair(DataResolver<E, P> dataResolver, ReadCallback<E, P> readCallback, Consumer<PartitionIterator> resultConsumer)
         {
             this.dataResolver = dataResolver;
             this.readCallback = readCallback;
@@ -87,13 +84,13 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, P extends Repli
         return replicaPlan.get();
     }
 
-    void sendReadCommand(Replica to, ReadCallback readCallback, boolean speculative)
+    void sendReadCommand(Replica to, ReadCallback<E, P> readCallback, boolean speculative, boolean trackRepairedStatus)
     {
         ReadCommand command = this.command;
-
+        
         if (to.isSelf())
         {
-            Stage.READ.maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(command, readCallback));
+            Stage.READ.maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(command, readCallback, trackRepairedStatus));
             return;
         }
 
@@ -112,8 +109,8 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, P extends Repli
             else type = to.isFull() ? "full" : "transient";
             Tracing.trace("Enqueuing {} data read to {}", type, to);
         }
-        // if enabled, request additional info about repaired data from any full replicas
-        Message<ReadCommand> message = command.createMessage(command.isTrackingRepairedStatus() && to.isFull());
+
+        Message<ReadCommand> message = command.createMessage(trackRepairedStatus && to.isFull());
         MessagingService.instance().sendWithCallback(message, to.endpoint(), readCallback);
     }
 
@@ -124,25 +121,35 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, P extends Repli
     {
         getRepairMeter().mark();
 
+        /*
+         * When repaired data tracking is enabled, a digest will be created from data reads from repaired SSTables.
+         * The digests from each replica can then be compared on the coordinator to detect any divergence in their
+         * repaired datasets. In this context, an SSTable is considered repaired if it is marked repaired or has a 
+         * pending repair session which has been committed. In addition to the digest, a set of ids for any pending but 
+         * as yet uncommitted repair sessions is recorded and returned to the coordinator. This is to help reduce false 
+         * positives caused by compaction lagging which can leave sstables from committed sessions in the pending state
+         * for a time.
+         */
+        boolean trackRepairedStatus = DatabaseDescriptor.getRepairedDataTrackingForPartitionReadsEnabled();
+
         // Do a full data read to resolve the correct response (and repair node that need be)
-        DataResolver<E, P> resolver = new DataResolver<>(command, replicaPlan, this, queryStartNanoTime);
+        DataResolver<E, P> resolver = new DataResolver<>(command, replicaPlan, this, queryStartNanoTime, trackRepairedStatus);
         ReadCallback<E, P> readCallback = new ReadCallback<>(resolver, command, replicaPlan, queryStartNanoTime);
 
-        digestRepair = new DigestRepair(resolver, readCallback, resultConsumer);
+        digestRepair = new DigestRepair<>(resolver, readCallback, resultConsumer);
 
         // if enabled, request additional info about repaired data from any full replicas
-        if (DatabaseDescriptor.getRepairedDataTrackingForPartitionReadsEnabled())
-            command.trackRepairedStatus();
-
         for (Replica replica : replicaPlan().contacts())
-            sendReadCommand(replica, readCallback, false);
+        {
+            sendReadCommand(replica, readCallback, false, trackRepairedStatus);
+        }
 
         ReadRepairDiagnostics.startRepair(this, replicaPlan(), digestResolver);
     }
 
     public void awaitReads() throws ReadTimeoutException
     {
-        DigestRepair repair = digestRepair;
+        DigestRepair<E, P> repair = digestRepair;
         if (repair == null)
             return;
 
@@ -163,18 +170,18 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, P extends Repli
     {
         Preconditions.checkState(command instanceof SinglePartitionReadCommand,
                                  "maybeSendAdditionalReads can only be called for SinglePartitionReadCommand");
-        DigestRepair repair = digestRepair;
+        DigestRepair<E, P> repair = digestRepair;
         if (repair == null)
             return;
 
         if (shouldSpeculate() && !repair.readCallback.await(cfs.sampleReadLatencyNanos, NANOSECONDS))
         {
-            Replica uncontacted = replicaPlan().firstUncontactedCandidate(Predicates.alwaysTrue());
+            Replica uncontacted = replicaPlan().firstUncontactedCandidate(replica -> true);
             if (uncontacted == null)
                 return;
 
             replicaPlan.addToContacts(uncontacted);
-            sendReadCommand(uncontacted, repair.readCallback, true);
+            sendReadCommand(uncontacted, repair.readCallback, true, false);
             ReadRepairMetrics.speculatedRead.mark();
             ReadRepairDiagnostics.speculatedRead(this, uncontacted.endpoint(), replicaPlan());
         }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java b/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
index 308702a..c8fc088 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
@@ -19,21 +19,25 @@
 package org.apache.cassandra.distributed.test;
 
 import java.io.IOException;
-import java.time.LocalDate;
-import java.time.format.DateTimeFormatter;
-import java.util.EnumSet;
-import java.util.Iterator;
-import java.util.Map;
+import java.util.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.cassandra.concurrent.SEPExecutor;
+import org.apache.cassandra.utils.Throwables;
 import org.junit.Assert;
 import org.junit.Test;
 
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Keyspace;
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
@@ -44,27 +48,31 @@ import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
 import org.apache.cassandra.io.sstable.metadata.MetadataType;
 import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 import org.apache.cassandra.service.ActiveRepairService;
-import org.apache.cassandra.service.SnapshotVerbHandler;
 import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.StorageProxy.LocalReadRunnable;
 import org.apache.cassandra.utils.DiagnosticSnapshotService;
 
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
 import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class RepairDigestTrackingTest extends TestBaseImpl
 {
     private static final String TABLE = "tbl";
-    private static final String KS_TABLE = KEYSPACE + "." + TABLE;
+    private static final String KS_TABLE = KEYSPACE + '.' + TABLE;
 
+    @SuppressWarnings("Convert2MethodRef")
     @Test
     public void testInconsistenciesFound() throws Throwable
     {
-        try (Cluster cluster = (Cluster) init(builder().withNodes(2).start()))
+        try (Cluster cluster = init(builder().withNodes(2).start()))
         {
 
-            cluster.get(1).runOnInstance(() -> {
-                StorageProxy.instance.enableRepairedDataTrackingForRangeReads();
-            });
+            cluster.get(1).runOnInstance(() -> StorageProxy.instance.enableRepairedDataTrackingForRangeReads());
 
             cluster.schemaChange("CREATE TABLE " + KS_TABLE+ " (k INT, c INT, v INT, PRIMARY KEY (k,c)) with read_repair='NONE'");
             for (int i = 0; i < 10; i++)
@@ -99,14 +107,13 @@ public class RepairDigestTrackingTest extends TestBaseImpl
         }
     }
 
+    @SuppressWarnings("Convert2MethodRef")
     @Test
     public void testPurgeableTombstonesAreIgnored() throws Throwable
     {
-        try (Cluster cluster = (Cluster) init(builder().withNodes(2).start()))
+        try (Cluster cluster = init(builder().withNodes(2).start()))
         {
-            cluster.get(1).runOnInstance(() -> {
-                StorageProxy.instance.enableRepairedDataTrackingForRangeReads();
-            });
+            cluster.get(1).runOnInstance(() -> StorageProxy.instance.enableRepairedDataTrackingForRangeReads());
 
             cluster.schemaChange("CREATE TABLE " + KS_TABLE + " (k INT, c INT, v1 INT, v2 INT, PRIMARY KEY (k,c)) WITH gc_grace_seconds=0");
             // on node1 only insert some tombstones, then flush
@@ -147,14 +154,13 @@ public class RepairDigestTrackingTest extends TestBaseImpl
         }
     }
 
+    @SuppressWarnings("Convert2MethodRef")
     @Test
     public void testSnapshottingOnInconsistency() throws Throwable
     {
         try (Cluster cluster = init(Cluster.create(2)))
         {
-            cluster.get(1).runOnInstance(() -> {
-                StorageProxy.instance.enableRepairedDataTrackingForPartitionReads();
-            });
+            cluster.get(1).runOnInstance(() -> StorageProxy.instance.enableRepairedDataTrackingForPartitionReads());
 
             cluster.schemaChange("CREATE TABLE " + KS_TABLE + " (k INT, c INT, v INT, PRIMARY KEY (k,c))");
             for (int i = 0; i < 10; i++)
@@ -191,9 +197,7 @@ public class RepairDigestTrackingTest extends TestBaseImpl
 
             // re-introduce a mismatch, enable snapshotting and try again
             cluster.get(1).executeInternal("INSERT INTO " + KS_TABLE + " (k, c, v) VALUES (0, ?, ?)", 5, 555);
-            cluster.get(1).runOnInstance(() -> {
-                StorageProxy.instance.enableSnapshotOnRepairedDataMismatch();
-            });
+            cluster.get(1).runOnInstance(() -> StorageProxy.instance.enableSnapshotOnRepairedDataMismatch());
 
             cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE + " WHERE k=0", ConsistencyLevel.ALL);
             ccAfter = getConfirmedInconsistencies(cluster.get(1));
@@ -340,6 +344,107 @@ public class RepairDigestTrackingTest extends TestBaseImpl
         }
     }
 
+    /**
+     * In CASSANDRA-16721, we discovered that if responses from remote replicas came back while the local runnable was 
+     * still executing, the fact that {@link ReadCommand} was mutable meant that the trackRepairedStatus flag on the
+     * command instance could move from false to true in executeLocally(), between setting the 
+     * RepairedDataInfo/gathering the sstables and calling extend(). When this happened, the RDI was still the 
+     * stand-in object NO_OP_REPAIRED_DATA_INFO, which has a null repairedDataCounter, and we hit the NPE.
+     * 
+     * Similarly, the trackRepairedStatus flag could be set after the point at which the RDI is set on the local 
+     * read, assigned to the repairedDataInfo in {@link ReadCommand}, and improperly shared between initial local read
+     * and the local read triggered by read repair.
+     * 
+     * These problems are sidestepped completely by CASSANDRA-16721, as an RDI instance is now created and destroyed 
+     * entirely within the scope of a single {@link LocalReadRunnable}, but this test still attempts to validate some
+     * assumptions about the cleanliness of the logs and the correctness of queries made when initial local reads and
+     * local reads triggered by read repair (after speculative reads) execute at roughly the same time.
+     */
+    @Test
+    public void testLocalDataAndRemoteRequestConcurrency() throws Exception
+    {
+        try (Cluster cluster = init(Cluster.build(3)
+                                           .withInstanceInitializer(BBHelper::install)
+                                           .withConfig(config -> config.set("repaired_data_tracking_for_partition_reads_enabled", true)
+                                                                       .with(GOSSIP)
+                                                                       .with(NETWORK))
+                                           .start()))
+        {
+            // A speculative read is the reason we have two remote replicas in play that can return results before
+            // the local replica does.
+            setupSchema(cluster, "create table " + KS_TABLE + " (id int primary key, t int) WITH speculative_retry = 'ALWAYS'");
+
+            cluster.get(1).executeInternal("INSERT INTO " + KS_TABLE + " (id, t) values (0, 0)");
+            cluster.get(2).executeInternal("INSERT INTO " + KS_TABLE + " (id, t) values (0, 0)");
+            cluster.get(3).executeInternal("INSERT INTO " + KS_TABLE + " (id, t) values (0, 1)");
+            cluster.forEach(c -> c.flush(KEYSPACE));
+            cluster.forEach(i -> i.runOnInstance(markAllRepaired()));
+            cluster.forEach(i -> i.runOnInstance(assertRepaired()));
+
+            long logPositionBeforeQuery = cluster.get(1).logs().mark();
+            Object[][] rows = cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE + " WHERE id=0", ConsistencyLevel.QUORUM);
+            assertEquals(1, rows.length);
+            
+            // Given we didn't write at QUORUM, both 0 and 1 are acceptable values.
+            assertTrue((int) rows[0][1] == 0 || (int) rows[0][1] == 1);
+
+            List<String> result = cluster.get(1).logs().grepForErrors(logPositionBeforeQuery).getResult();
+            assertEquals(Collections.emptyList(), result);
+            Assert.assertTrue("Encountered an error", result.isEmpty());
+        }
+    }
+
+    public static class BBHelper
+    {
+        private static final CyclicBarrier barrier = new CyclicBarrier(2);
+        
+        public static void install(ClassLoader classLoader, Integer num)
+        {
+            // Only install on the coordinating node, which is also a replica...
+            if (num == 1)
+            {
+                new ByteBuddy().rebase(SEPExecutor.class)
+                               .method(named("maybeExecuteImmediately"))
+                               .intercept(MethodDelegation.to(BBHelper.class))
+                               .make()
+                               .load(classLoader, ClassLoadingStrategy.Default.INJECTION);
+
+                new ByteBuddy().rebase(SinglePartitionReadCommand.class)
+                               .method(named("executeLocally"))
+                               .intercept(MethodDelegation.to(BBHelper.class))
+                               .make()
+                               .load(classLoader, ClassLoadingStrategy.Default.INJECTION);
+            }
+        }
+
+        @SuppressWarnings("unused")
+        public static void maybeExecuteImmediately(Runnable command)
+        {
+            // Force local read runnables (from initial read and read-repair) to execute in separate threads.
+            new Thread(command).start();
+        }
+
+        @SuppressWarnings({ "unused" })
+        public static UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController,
+                                                                 @SuperCall Callable<UnfilteredPartitionIterator> zuperCall)
+        {
+            try
+            {
+                if (executionController.metadata().name.equals(TABLE))
+                {
+                    // Force both the initial local read and the local read triggered by read-repair to proceed at
+                    // roughly the same time.
+                    barrier.await();
+                }
+                return zuperCall.call();
+            }
+            catch (Exception e)
+            {
+                throw Throwables.unchecked(e);
+            }
+        }
+    }
+
     private Object[][] rows(Object[][] head, Object[][]...tail)
     {
         return Stream.concat(Stream.of(head),
@@ -439,6 +544,7 @@ public class RepairDigestTrackingTest extends TestBaseImpl
         };
     }
 
+    @SuppressWarnings("UnstableApiUsage")
     private IInvokableInstance.SerializableRunnable assertSnapshotPresent(String snapshotName)
     {
         return () ->
@@ -474,4 +580,14 @@ public class RepairDigestTrackingTest extends TestBaseImpl
                                              .table
                                              .getCount());
     }
+
+    private void setupSchema(Cluster cluster, String cql)
+    {
+        cluster.schemaChange(cql);
+        // disable auto compaction to prevent nodes from trying to compact
+        // new sstables with ones we've modified to mark repaired
+        cluster.forEach(i -> i.runOnInstance(() -> Keyspace.open(KEYSPACE)
+                                                           .getColumnFamilyStore(TABLE)
+                                                           .disableAutoCompaction()));
+    }
 }
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index 23736e9..e0d29b6 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -336,9 +336,16 @@ public class Util
 
     public static List<ImmutableBTreePartition> getAllUnfiltered(ReadCommand command)
     {
+        try (ReadExecutionController controller = command.executionController())
+        {
+            return getAllUnfiltered(command, controller);
+        }
+    }
+    
+    public static List<ImmutableBTreePartition> getAllUnfiltered(ReadCommand command, ReadExecutionController controller)
+    {
         List<ImmutableBTreePartition> results = new ArrayList<>();
-        try (ReadExecutionController executionController = command.executionController();
-             UnfilteredPartitionIterator iterator = command.executeLocally(executionController))
+        try (UnfilteredPartitionIterator iterator = command.executeLocally(controller))
         {
             while (iterator.hasNext())
             {
@@ -353,9 +360,16 @@ public class Util
 
     public static List<FilteredPartition> getAll(ReadCommand command)
     {
+        try (ReadExecutionController controller = command.executionController())
+        {
+            return getAll(command, controller);
+        }
+    }
+    
+    public static List<FilteredPartition> getAll(ReadCommand command, ReadExecutionController controller)
+    {
         List<FilteredPartition> results = new ArrayList<>();
-        try (ReadExecutionController executionController = command.executionController();
-             PartitionIterator iterator = command.executeInternal(executionController))
+        try (PartitionIterator iterator = command.executeInternal(controller))
         {
             while (iterator.hasNext())
             {
@@ -405,8 +419,15 @@ public class Util
 
     public static ImmutableBTreePartition getOnlyPartitionUnfiltered(ReadCommand cmd)
     {
-        try (ReadExecutionController executionController = cmd.executionController();
-             UnfilteredPartitionIterator iterator = cmd.executeLocally(executionController))
+        try (ReadExecutionController controller = cmd.executionController())
+        {
+            return getOnlyPartitionUnfiltered(cmd, controller);
+        }
+    }
+    
+    public static ImmutableBTreePartition getOnlyPartitionUnfiltered(ReadCommand cmd, ReadExecutionController controller)
+    {
+        try (UnfilteredPartitionIterator iterator = cmd.executeLocally(controller))
         {
             assert iterator.hasNext() : "Expecting a single partition but got nothing";
             try (UnfilteredRowIterator partition = iterator.next())
@@ -419,7 +440,12 @@ public class Util
 
     public static FilteredPartition getOnlyPartition(ReadCommand cmd)
     {
-        try (ReadExecutionController executionController = cmd.executionController();
+        return getOnlyPartition(cmd, false);
+    }
+    
+    public static FilteredPartition getOnlyPartition(ReadCommand cmd, boolean trackRepairedStatus)
+    {
+        try (ReadExecutionController executionController = cmd.executionController(trackRepairedStatus);
              PartitionIterator iterator = cmd.executeInternal(executionController))
         {
             assert iterator.hasNext() : "Expecting a single partition but got nothing";
diff --git a/test/unit/org/apache/cassandra/db/PartitionTest.java b/test/unit/org/apache/cassandra/db/PartitionTest.java
index 1700835..fd3e04d 100644
--- a/test/unit/org/apache/cassandra/db/PartitionTest.java
+++ b/test/unit/org/apache/cassandra/db/PartitionTest.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.security.NoSuchAlgorithmException;
 import java.util.Arrays;
 
-import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -31,7 +30,6 @@ import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 import org.apache.cassandra.db.marshal.AsciiType;
-import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
@@ -43,7 +41,6 @@ import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
-import static junit.framework.Assert.assertTrue;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -106,11 +103,11 @@ public class PartitionTest
         CachedPartition deserialized = CachedPartition.cacheSerializer.deserialize(new DataInputBuffer(bufOut.getData()));
 
         assertEquals(partition.columns().regulars.size(), deserialized.columns().regulars.size());
-        assertTrue(deserialized.columns().regulars.getSimple(1).equals(partition.columns().regulars.getSimple(1)));
-        assertTrue(deserialized.columns().regulars.getSimple(5).equals(partition.columns().regulars.getSimple(5)));
+        assertEquals(deserialized.columns().regulars.getSimple(1), partition.columns().regulars.getSimple(1));
+        assertEquals(deserialized.columns().regulars.getSimple(5), partition.columns().regulars.getSimple(5));
 
         ColumnMetadata cDef = cfs.metadata().getColumn(ByteBufferUtil.bytes("val8"));
-        assertTrue(partition.lastRow().getCell(cDef).buffer().equals(deserialized.lastRow().getCell(cDef).buffer()));
+        assertEquals(partition.lastRow().getCell(cDef).buffer(), deserialized.lastRow().getCell(cDef).buffer());
         assert deserialized.partitionKey().equals(partition.partitionKey());
     }
 
diff --git a/test/unit/org/apache/cassandra/db/ReadCommandTest.java b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
index dcd6331..8eec769 100644
--- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
@@ -49,7 +49,6 @@ import org.apache.cassandra.db.marshal.SetType;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.db.rows.RowIterator;
-import org.apache.cassandra.db.rows.SerializationHelper;
 import org.apache.cassandra.db.rows.Unfiltered;
 import org.apache.cassandra.db.rows.DeserializationHelper;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
@@ -679,8 +678,7 @@ public class ReadCommandTest
         assertEquals(0, readCount(sstables.get(0)));
         assertEquals(0, readCount(sstables.get(1)));
         ReadCommand withTracking = readCommand.copy();
-        withTracking.trackRepairedStatus();
-        Util.getAll(withTracking);
+        Util.getAll(withTracking, withTracking.executionController(true));
         assertEquals(1, readCount(sstables.get(0)));
         assertEquals(1, readCount(sstables.get(1)));
 
@@ -793,17 +791,20 @@ public class ReadCommandTest
         for (DecoratedKey key : keys)
         {
             ReadCommand cmd = Util.cmd(cfs, key).withNowInSeconds(nowInSec).build();
-            cmd.trackRepairedStatus();
-            Partition partition = Util.getOnlyPartitionUnfiltered(cmd);
-            assertFalse(partition.isEmpty());
-            partition.unfilteredIterator().forEachRemaining(u -> {
-                // must be either a RT, or a row containing some kind of deletion
-                assertTrue(u.isRangeTombstoneMarker() || ((Row)u).hasDeletion(cmd.nowInSec()));
-            });
-            ByteBuffer digestWithTombstones = cmd.getRepairedDataDigest();
-            // None should generate an empty digest
-            assertDigestsDiffer(EMPTY_BYTE_BUFFER, digestWithTombstones);
-            digestsWithTombstones.put(key, digestWithTombstones);
+
+            try (ReadExecutionController controller = cmd.executionController(true))
+            {
+                Partition partition = Util.getOnlyPartitionUnfiltered(cmd, controller);
+                assertFalse(partition.isEmpty());
+                partition.unfilteredIterator().forEachRemaining(u -> {
+                    // must be either a RT, or a row containing some kind of deletion
+                    assertTrue(u.isRangeTombstoneMarker() || ((Row) u).hasDeletion(cmd.nowInSec()));
+                });
+                ByteBuffer digestWithTombstones = controller.getRepairedDataDigest();
+                // None should generate an empty digest
+                assertDigestsDiffer(EMPTY_BYTE_BUFFER, digestWithTombstones);
+                digestsWithTombstones.put(key, digestWithTombstones);
+            }
         }
 
         // Make tombstones eligible for purging and re-run cmd with an incremented nowInSec
@@ -813,27 +814,29 @@ public class ReadCommandTest
         for (DecoratedKey key : keys)
         {
             ReadCommand cmd = Util.cmd(cfs, key).withNowInSeconds(nowInSec + 60).build();
-            cmd.trackRepairedStatus();
-            Partition partition = Util.getOnlyPartitionUnfiltered(cmd);
-            assertFalse(partition.isEmpty());
-            partition.unfilteredIterator().forEachRemaining(u -> {
-                // After purging, only rows without any deletions should remain.
-                // The one exception is "key2:cc" which has a regular column tombstone which is not
-                // eligible for purging. This is to prevent the partition from being fully purged
-                // when its RT is removed.
-                assertTrue(u.isRow());
-                Row r = (Row)u;
-                assertTrue(!r.hasDeletion(cmd.nowInSec())
-                           || (key.equals(keys[2]) && r.clustering()
-                                                       .bufferAt(0)
-                                                       .equals(AsciiType.instance.fromString("cc"))));
-
-            });
-            ByteBuffer digestWithoutTombstones = cmd.getRepairedDataDigest();
-            // not an empty digest
-            assertDigestsDiffer(EMPTY_BYTE_BUFFER, digestWithoutTombstones);
-            // should not match the pre-purge digest
-            assertDigestsDiffer(digestsWithTombstones.get(key), digestWithoutTombstones);
+            
+            try (ReadExecutionController controller = cmd.executionController(true))
+            {
+                Partition partition = Util.getOnlyPartitionUnfiltered(cmd, controller);
+                assertFalse(partition.isEmpty());
+                partition.unfilteredIterator().forEachRemaining(u -> {
+                    // After purging, only rows without any deletions should remain.
+                    // The one exception is "key2:cc" which has a regular column tombstone which is not
+                    // eligible for purging. This is to prevent the partition from being fully purged
+                    // when its RT is removed.
+                    assertTrue(u.isRow());
+                    Row r = (Row) u;
+                    assertTrue(!r.hasDeletion(cmd.nowInSec())
+                               || (key.equals(keys[2]) && r.clustering()
+                                                           .bufferAt(0)
+                                                           .equals(AsciiType.instance.fromString("cc"))));
+                });
+                ByteBuffer digestWithoutTombstones = controller.getRepairedDataDigest();
+                // not an empty digest
+                assertDigestsDiffer(EMPTY_BYTE_BUFFER, digestWithoutTombstones);
+                // should not match the pre-purge digest
+                assertDigestsDiffer(digestsWithTombstones.get(key), digestWithoutTombstones);
+            }
         }
     }
 
@@ -864,8 +867,7 @@ public class ReadCommandTest
 
         // Overread up to (limit - 1) if tracking is enabled
         cmd = cmd.copy();
-        cmd.trackRepairedStatus();
-        readAndCheckRowCount(Collections.singletonList(Util.getOnlyPartition(cmd)), limit);
+        readAndCheckRowCount(Collections.singletonList(Util.getOnlyPartition(cmd, true)), limit);
         // overread count is always < limit as the first read is counted during merging (and so is expected)
         assertEquals(limit - 1, getAndResetOverreadCount(cfs));
 
@@ -885,8 +887,7 @@ public class ReadCommandTest
 
         // Overread up to (limit - 1) if tracking is enabled
         cmd = cmd.copy();
-        cmd.trackRepairedStatus();
-        readAndCheckRowCount(Util.getAll(cmd), limit);
+        readAndCheckRowCount(Util.getAll(cmd, cmd.executionController(true)), limit);
         assertEquals(limit - 1, getAndResetOverreadCount(cfs));
 
         // if limit already requires reading all repaired data, no overreads should be recorded
@@ -986,26 +987,30 @@ public class ReadCommandTest
         cfs.forceBlockingFlush();
         cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null));
 
-        command.trackRepairedStatus();
-        List<ImmutableBTreePartition> partitions = Util.getAllUnfiltered(command);
-        assertEquals(1, partitions.size());
-        ByteBuffer digestWithTombstones = command.getRepairedDataDigest();
-        assertTrue(ByteBufferUtil.compareUnsigned(EMPTY_BYTE_BUFFER, digestWithTombstones) != 0);
+        try (ReadExecutionController controller = command.executionController(true))
+        {
+            List<ImmutableBTreePartition> partitions = Util.getAllUnfiltered(command, controller);
+            assertEquals(1, partitions.size());
+            ByteBuffer digestWithTombstones = controller.getRepairedDataDigest();
+            assertTrue(ByteBufferUtil.compareUnsigned(EMPTY_BYTE_BUFFER, digestWithTombstones) != 0);
 
-        // Make tombstones eligible for purging and re-run cmd with an incremented nowInSec
-        setGCGrace(cfs, 0);
+            // Make tombstones eligible for purging and re-run cmd with an incremented nowInSec
+            setGCGrace(cfs, 0);
+        }
 
         AbstractReadCommandBuilder builder = command instanceof PartitionRangeReadCommand
                                              ? Util.cmd(cfs)
                                              : Util.cmd(cfs, Util.dk("key"));
         builder.withNowInSeconds(command.nowInSec() + 60);
         command = builder.build();
-        command.trackRepairedStatus();
 
-        partitions = Util.getAllUnfiltered(command);
-        assertTrue(partitions.isEmpty());
-        ByteBuffer digestWithoutTombstones = command.getRepairedDataDigest();
-        assertEquals(0, ByteBufferUtil.compareUnsigned(EMPTY_BYTE_BUFFER, digestWithoutTombstones));
+        try (ReadExecutionController controller = command.executionController(true))
+        {
+            List<ImmutableBTreePartition> partitions = Util.getAllUnfiltered(command, controller);
+            assertTrue(partitions.isEmpty());
+            ByteBuffer digestWithoutTombstones = controller.getRepairedDataDigest();
+            assertEquals(0, ByteBufferUtil.compareUnsigned(EMPTY_BYTE_BUFFER, digestWithoutTombstones));
+        }
     }
 
     /**
@@ -1036,23 +1041,29 @@ public class ReadCommandTest
         RowUpdateBuilder.deleteRow(cfs.metadata(), 0, ByteBufferUtil.bytes("key-1"), "cc").apply();
         cfs.forceBlockingFlush();
 
-        command.trackRepairedStatus();
-        List<ImmutableBTreePartition> partitions = Util.getAllUnfiltered(command);
-        assertEquals(1, partitions.size());
-        ByteBuffer digestWithoutPurgedPartition = command.getRepairedDataDigest();
-        assertTrue(ByteBufferUtil.compareUnsigned(EMPTY_BYTE_BUFFER, digestWithoutPurgedPartition) != 0);
-
+        ByteBuffer digestWithoutPurgedPartition = null;
+        
+        try (ReadExecutionController controller = command.executionController(true))
+        {
+            List<ImmutableBTreePartition> partitions = Util.getAllUnfiltered(command, controller);
+            assertEquals(1, partitions.size());
+            digestWithoutPurgedPartition = controller.getRepairedDataDigest();
+            assertTrue(ByteBufferUtil.compareUnsigned(EMPTY_BYTE_BUFFER, digestWithoutPurgedPartition) != 0);
+        }
+        
         // mark the sstable containing the purged partition as repaired, so both partitions are now
         // read during in the digest calculation. Because the purged partition is entirely
         // discarded, the resultant digest should match the earlier one.
         cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null));
         command = Util.cmd(cfs).withNowInSeconds(command.nowInSec()).build();
-        command.trackRepairedStatus();
 
-        partitions = Util.getAllUnfiltered(command);
-        assertEquals(1, partitions.size());
-        ByteBuffer digestWithPurgedPartition = command.getRepairedDataDigest();
-        assertEquals(0, ByteBufferUtil.compareUnsigned(digestWithPurgedPartition, digestWithoutPurgedPartition));
+        try (ReadExecutionController controller = command.executionController(true))
+        {
+            List<ImmutableBTreePartition> partitions = Util.getAllUnfiltered(command, controller);
+            assertEquals(1, partitions.size());
+            ByteBuffer digestWithPurgedPartition = controller.getRepairedDataDigest();
+            assertEquals(0, ByteBufferUtil.compareUnsigned(digestWithPurgedPartition, digestWithoutPurgedPartition));
+        }
     }
 
     @Test
@@ -1079,19 +1090,22 @@ public class ReadCommandTest
 
         int nowInSec = FBUtilities.nowInSeconds() + 10;
         ReadCommand cmd = Util.cmd(cfs, key).withNowInSeconds(nowInSec).build();
-        cmd.trackRepairedStatus();
-        Partition partition = Util.getOnlyPartitionUnfiltered(cmd);
-        assertFalse(partition.isEmpty());
-        // check that
-        try (UnfilteredRowIterator rows = partition.unfilteredIterator())
+
+        try (ReadExecutionController controller = cmd.executionController(true))
         {
-            assertFalse(rows.isEmpty());
-            Unfiltered unfiltered = rows.next();
-            assertFalse(rows.hasNext());
-            assertTrue(unfiltered.isRow());
-            assertFalse(((Row) unfiltered).hasDeletion(nowInSec));
+            Partition partition = Util.getOnlyPartitionUnfiltered(cmd, controller);
+            assertFalse(partition.isEmpty());
+            // check that
+            try (UnfilteredRowIterator rows = partition.unfilteredIterator())
+            {
+                assertFalse(rows.isEmpty());
+                Unfiltered unfiltered = rows.next();
+                assertFalse(rows.hasNext());
+                assertTrue(unfiltered.isRow());
+                assertFalse(((Row) unfiltered).hasDeletion(nowInSec));
+            }
+            assertEquals(EMPTY_BYTE_BUFFER, controller.getRepairedDataDigest());
         }
-        assertEquals(EMPTY_BYTE_BUFFER, cmd.getRepairedDataDigest());
     }
 
     private long readCount(SSTableReader sstable)
@@ -1126,9 +1140,11 @@ public class ReadCommandTest
         cacheHits = cfs.metric.rowCacheHit.getCount();
 
         ReadCommand withRepairedInfo = readCommand.copy();
-        withRepairedInfo.trackRepairedStatus();
-        Util.getAll(withRepairedInfo);
-        assertEquals(cacheHits, cfs.metric.rowCacheHit.getCount());
+        try (ReadExecutionController controller = withRepairedInfo.executionController(true))
+        {
+            Util.getAll(withRepairedInfo, controller);
+            assertEquals(cacheHits, cfs.metric.rowCacheHit.getCount());
+        }
     }
 
     @Test (expected = IllegalArgumentException.class)
@@ -1302,16 +1318,18 @@ public class ReadCommandTest
         for (int i = 0; i < 10; i++)
         {
             ReadCommand withRepairedInfo = command.copy();
-            withRepairedInfo.trackRepairedStatus();
-
-            List<FilteredPartition> partitions = Util.getAll(withRepairedInfo);
-            assertEquals(expectedPartitions, partitions.size());
-            partitions.forEach(p -> assertEquals(expectedRowsPerPartition, p.rowCount()));
 
-            ByteBuffer digest = withRepairedInfo.getRepairedDataDigest();
-            digests.add(digest);
-            assertEquals(1, digests.size());
-            assertEquals(expectConclusive, withRepairedInfo.isRepairedDataDigestConclusive());
+            try (ReadExecutionController controller = withRepairedInfo.executionController(true))
+            {
+                List<FilteredPartition> partitions = Util.getAll(withRepairedInfo, controller);
+                assertEquals(expectedPartitions, partitions.size());
+                partitions.forEach(p -> assertEquals(expectedRowsPerPartition, p.rowCount()));
+
+                ByteBuffer digest = controller.getRepairedDataDigest();
+                digests.add(digest);
+                assertEquals(1, digests.size());
+                assertEquals(expectConclusive, controller.isRepairedDataDigestConclusive());
+            }
         }
         return digests.iterator().next();
     }
diff --git a/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java b/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java
index 8682273..44c065a 100644
--- a/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.TokenMetadata;
@@ -91,46 +92,45 @@ public class ReadCommandVerbHandlerTest
     @Test
     public void setRepairedDataTrackingFlagIfHeaderPresent()
     {
-        ReadCommand command = command(metadata);
-        assertFalse(command.isTrackingRepairedStatus());
-
-        handler.doVerb(Message.builder(READ_REQ, command)
+        TrackingSinglePartitionReadCommand command = new TrackingSinglePartitionReadCommand(metadata);
+        assertFalse(command.isTrackingRepairedData());
+        handler.doVerb(Message.builder(READ_REQ, (ReadCommand) command)
                               .from(peer())
                               .withFlag(MessageFlag.TRACK_REPAIRED_DATA)
                               .withId(messageId())
                               .build());
-        assertTrue(command.isTrackingRepairedStatus());
+        assertTrue(command.isTrackingRepairedData());
     }
 
     @Test
     public void dontSetRepairedDataTrackingFlagUnlessHeaderPresent()
     {
-        ReadCommand command = command(metadata);
-        assertFalse(command.isTrackingRepairedStatus());
-        handler.doVerb(Message.builder(READ_REQ, command)
+        TrackingSinglePartitionReadCommand command = new TrackingSinglePartitionReadCommand(metadata);
+        assertFalse(command.isTrackingRepairedData());
+        handler.doVerb(Message.builder(READ_REQ, (ReadCommand) command)
                               .from(peer())
                               .withId(messageId())
                               .withParam(ParamType.TRACE_SESSION, UUID.randomUUID())
                               .build());
-        assertFalse(command.isTrackingRepairedStatus());
+        assertFalse(command.isTrackingRepairedData());
     }
 
     @Test
     public void dontSetRepairedDataTrackingFlagIfHeadersEmpty()
     {
-        ReadCommand command = command(metadata);
-        assertFalse(command.isTrackingRepairedStatus());
-        handler.doVerb(Message.builder(READ_REQ, command)
+        TrackingSinglePartitionReadCommand command = new TrackingSinglePartitionReadCommand(metadata);
+        assertFalse(command.isTrackingRepairedData());
+        handler.doVerb(Message.builder(READ_REQ, (ReadCommand) command)
                               .withId(messageId())
                               .from(peer())
                               .build());
-        assertFalse(command.isTrackingRepairedStatus());
+        assertFalse(command.isTrackingRepairedData());
     }
 
     @Test (expected = InvalidRequestException.class)
     public void rejectsRequestWithNonMatchingTransientness()
     {
-        ReadCommand command = command(metadata_with_transient);
+        ReadCommand command = new TrackingSinglePartitionReadCommand(metadata_with_transient);
         handler.doVerb(Message.builder(READ_REQ, command)
                               .from(peer())
                               .withId(messageId())
@@ -154,19 +154,36 @@ public class ReadCommandVerbHandlerTest
         }
     }
 
-    private static SinglePartitionReadCommand command(TableMetadata metadata)
+    private static class TrackingSinglePartitionReadCommand extends SinglePartitionReadCommand
     {
-        return new SinglePartitionReadCommand(false,
-                                              0,
-                                              false,
-                                              metadata,
-                                              FBUtilities.nowInSeconds(),
-                                              ColumnFilter.all(metadata),
-                                              RowFilter.NONE,
-                                              DataLimits.NONE,
-                                              KEY,
-                                              new ClusteringIndexSliceFilter(Slices.ALL, false),
-                                              null);
+        private boolean trackingRepairedData = false;
+        
+        TrackingSinglePartitionReadCommand(TableMetadata metadata)
+        {
+            super(false,
+                  0,
+                  false,
+                  metadata,
+                  FBUtilities.nowInSeconds(),
+                  ColumnFilter.all(metadata),
+                  RowFilter.NONE,
+                  DataLimits.NONE,
+                  KEY,
+                  new ClusteringIndexSliceFilter(Slices.ALL, false),
+                  null);
+        }
+
+        @Override
+        public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController)
+        {
+            trackingRepairedData = executionController.isTrackingRepairedStatus();
+            return super.executeLocally(executionController);
+        }
+
+        public boolean isTrackingRepairedData()
+        {
+            return trackingRepairedData;
+        }
     }
 
     private static DecoratedKey key(TableMetadata metadata, int key)
diff --git a/test/unit/org/apache/cassandra/db/ReadResponseTest.java b/test/unit/org/apache/cassandra/db/ReadResponseTest.java
index 6e1a804..29fa784 100644
--- a/test/unit/org/apache/cassandra/db/ReadResponseTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadResponseTest.java
@@ -45,7 +45,6 @@ import static org.junit.Assert.fail;
 
 public class ReadResponseTest
 {
-
     private final Random random = new Random();
     private TableMetadata metadata;
 
@@ -63,8 +62,9 @@ public class ReadResponseTest
     public void fromCommandWithConclusiveRepairedDigest()
     {
         ByteBuffer digest = digest();
-        ReadCommand command = command(key(), metadata, digest, true);
-        ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata));
+        ReadCommand command = command(key(), metadata);
+        StubRepairedDataInfo rdi = new StubRepairedDataInfo(digest, true);
+        ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata), rdi);
         assertTrue(response.isRepairedDigestConclusive());
         assertEquals(digest, response.repairedDataDigest());
         verifySerDe(response);
@@ -74,8 +74,9 @@ public class ReadResponseTest
     public void fromCommandWithInconclusiveRepairedDigest()
     {
         ByteBuffer digest = digest();
-        ReadCommand command = command(key(), metadata, digest, false);
-        ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata));
+        ReadCommand command = command(key(), metadata);
+        StubRepairedDataInfo rdi = new StubRepairedDataInfo(digest, false);
+        ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata), rdi);
         assertFalse(response.isRepairedDigestConclusive());
         assertEquals(digest, response.repairedDataDigest());
         verifySerDe(response);
@@ -84,8 +85,9 @@ public class ReadResponseTest
     @Test
     public void fromCommandWithConclusiveEmptyRepairedDigest()
     {
-        ReadCommand command = command(key(), metadata, ByteBufferUtil.EMPTY_BYTE_BUFFER, true);
-        ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata));
+        ReadCommand command = command(key(), metadata);
+        StubRepairedDataInfo rdi = new StubRepairedDataInfo(ByteBufferUtil.EMPTY_BYTE_BUFFER, true);
+        ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata), rdi);
         assertTrue(response.isRepairedDigestConclusive());
         assertEquals(ByteBufferUtil.EMPTY_BYTE_BUFFER, response.repairedDataDigest());
         verifySerDe(response);
@@ -94,8 +96,9 @@ public class ReadResponseTest
     @Test
     public void fromCommandWithInconclusiveEmptyRepairedDigest()
     {
-        ReadCommand command = command(key(), metadata, ByteBufferUtil.EMPTY_BYTE_BUFFER, false);
-        ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata));
+        ReadCommand command = command(key(), metadata);
+        StubRepairedDataInfo rdi = new StubRepairedDataInfo(ByteBufferUtil.EMPTY_BYTE_BUFFER, false);
+        ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata), rdi);
         assertFalse(response.isRepairedDigestConclusive());
         assertEquals(ByteBufferUtil.EMPTY_BYTE_BUFFER, response.repairedDataDigest());
         verifySerDe(response);
@@ -109,7 +112,8 @@ public class ReadResponseTest
     public void digestResponseErrorsIfRepairedDataDigestRequested()
     {
         ReadCommand command = digestCommand(key(), metadata);
-        ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata));
+        StubRepairedDataInfo rdi = new StubRepairedDataInfo(ByteBufferUtil.EMPTY_BYTE_BUFFER, true);
+        ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata), rdi);
         assertTrue(response.isDigestResponse());
         assertFalse(response.mayIncludeRepairedDigest());
         response.repairedDataDigest();
@@ -119,7 +123,8 @@ public class ReadResponseTest
     public void digestResponseErrorsIfIsConclusiveRequested()
     {
         ReadCommand command = digestCommand(key(), metadata);
-        ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata));
+        StubRepairedDataInfo rdi = new StubRepairedDataInfo(ByteBufferUtil.EMPTY_BYTE_BUFFER, true);
+        ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata), rdi);
         assertTrue(response.isDigestResponse());
         assertFalse(response.mayIncludeRepairedDigest());
         response.isRepairedDigestConclusive();
@@ -129,7 +134,8 @@ public class ReadResponseTest
     public void digestResponseErrorsIfIteratorRequested()
     {
         ReadCommand command = digestCommand(key(), metadata);
-        ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata));
+        StubRepairedDataInfo rdi = new StubRepairedDataInfo(ByteBufferUtil.EMPTY_BYTE_BUFFER, true);
+        ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata), rdi);
         assertTrue(response.isDigestResponse());
         assertFalse(response.mayIncludeRepairedDigest());
         response.makeIterator(command);
@@ -143,12 +149,14 @@ public class ReadResponseTest
         // requests, only following a digest mismatch. Having a test doesn't hurt though
         int key = key();
         ByteBuffer digest1 = digest();
-        ReadCommand command1 = command(key, metadata, digest1, true);
-        ReadResponse response1 = command1.createResponse(EmptyIterators.unfilteredPartition(metadata));
+        ReadCommand command1 = command(key, metadata);
+        StubRepairedDataInfo rdi1 = new StubRepairedDataInfo(digest1, true);
+        ReadResponse response1 = command1.createResponse(EmptyIterators.unfilteredPartition(metadata), rdi1);
 
         ByteBuffer digest2 = digest();
-        ReadCommand command2 = command(key, metadata, digest2, false);
-        ReadResponse response2 = command1.createResponse(EmptyIterators.unfilteredPartition(metadata));
+        ReadCommand command2 = command(key, metadata);
+        StubRepairedDataInfo rdi2 = new StubRepairedDataInfo(digest2, false);
+        ReadResponse response2 = command1.createResponse(EmptyIterators.unfilteredPartition(metadata), rdi2);
 
         assertEquals(response1.digest(command1), response2.digest(command2));
     }
@@ -207,24 +215,42 @@ public class ReadResponseTest
 
     private ReadCommand digestCommand(int key, TableMetadata metadata)
     {
-        return new StubReadCommand(key, metadata, true, ByteBufferUtil.EMPTY_BYTE_BUFFER, true);
+        return new StubReadCommand(key, metadata, true);
     }
 
-    private ReadCommand command(int key, TableMetadata metadata, ByteBuffer repairedDigest, boolean conclusive)
+    private ReadCommand command(int key, TableMetadata metadata)
     {
-        return new StubReadCommand(key, metadata, false, repairedDigest, conclusive);
+        return new StubReadCommand(key, metadata, false);
     }
 
-    private static class StubReadCommand extends SinglePartitionReadCommand
+    private static class StubRepairedDataInfo extends RepairedDataInfo
     {
-
         private final ByteBuffer repairedDigest;
         private final boolean conclusive;
 
-        StubReadCommand(int key, TableMetadata metadata,
-                        boolean isDigest,
-                        final ByteBuffer repairedDigest,
-                        final boolean conclusive)
+        public StubRepairedDataInfo(ByteBuffer repairedDigest, boolean conclusive)
+        {
+            super(null);
+            this.repairedDigest = repairedDigest;
+            this.conclusive = conclusive;
+        }
+        
+        @Override
+        public ByteBuffer getDigest()
+        {
+            return repairedDigest;
+        }
+        
+        @Override
+        public boolean isConclusive()
+        {
+            return conclusive;
+        }
+    }
+
+    private static class StubReadCommand extends SinglePartitionReadCommand
+    {
+        StubReadCommand(int key, TableMetadata metadata, boolean isDigest)
         {
             super(isDigest,
                   0,
@@ -237,20 +263,13 @@ public class ReadResponseTest
                   metadata.partitioner.decorateKey(ByteBufferUtil.bytes(key)),
                   null,
                   null);
-            this.repairedDigest = repairedDigest;
-            this.conclusive = conclusive;
+           
         }
 
         @Override
-        public ByteBuffer getRepairedDataDigest()
+        public boolean selectsFullPartition()
         {
-            return repairedDigest;
-        }
-
-        @Override
-        public boolean isRepairedDataDigestConclusive()
-        {
-            return conclusive;
+            return true;
         }
 
         public UnfilteredPartitionIterator executeLocally(ReadExecutionController controller)
diff --git a/test/unit/org/apache/cassandra/db/SSTableAndMemTableDigestMatchTest.java b/test/unit/org/apache/cassandra/db/SSTableAndMemTableDigestMatchTest.java
index 1b31abf..6f3158a 100644
--- a/test/unit/org/apache/cassandra/db/SSTableAndMemTableDigestMatchTest.java
+++ b/test/unit/org/apache/cassandra/db/SSTableAndMemTableDigestMatchTest.java
@@ -192,9 +192,9 @@ public class SSTableAndMemTableDigestMatchTest extends CQLTester
                                                  new ClusteringIndexNamesFilter(clusteringSet, false)).copyAsDigestQuery();
         cmd.setDigestVersion(MessagingService.current_version);
         ReadResponse resp;
-        try (ReadExecutionController ctrl = ReadExecutionController.forCommand(cmd); UnfilteredRowIterator iterator = cmd.queryMemtableAndDisk(cfs, ctrl))
+        try (ReadExecutionController ctrl = ReadExecutionController.forCommand(cmd, false); UnfilteredRowIterator iterator = cmd.queryMemtableAndDisk(cfs, ctrl))
         {
-            resp = ReadResponse.createDataResponse(new SingletonUnfilteredPartitionIterator(iterator), cmd);
+            resp = ReadResponse.createDataResponse(new SingletonUnfilteredPartitionIterator(iterator), cmd, ctrl.getRepairedDataInfo());
             logger.info("Response is: {}", resp.toDebugString(cmd, key));
             ByteBuffer digest = resp.digest(cmd);
             return ByteBufferUtil.bytesToHex(digest);
diff --git a/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java b/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
index 9572f28..692b0c5 100644
--- a/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
@@ -266,7 +266,7 @@ public class SinglePartitionSliceCommandTest
         // check (de)serialized iterator for memtable static cell
         try (ReadExecutionController executionController = cmd.executionController(); UnfilteredPartitionIterator pi = cmd.executeLocally(executionController))
         {
-            response = ReadResponse.createDataResponse(pi, cmd);
+            response = ReadResponse.createDataResponse(pi, cmd, executionController.getRepairedDataInfo());
         }
 
         out = new DataOutputBuffer((int) ReadResponse.serializer.serializedSize(response, MessagingService.VERSION_30));
@@ -282,7 +282,7 @@ public class SinglePartitionSliceCommandTest
         Schema.instance.getColumnFamilyStoreInstance(metadata.id).forceBlockingFlush();
         try (ReadExecutionController executionController = cmd.executionController(); UnfilteredPartitionIterator pi = cmd.executeLocally(executionController))
         {
-            response = ReadResponse.createDataResponse(pi, cmd);
+            response = ReadResponse.createDataResponse(pi, cmd, executionController.getRepairedDataInfo());
         }
         out = new DataOutputBuffer((int) ReadResponse.serializer.serializedSize(response, MessagingService.VERSION_30));
         ReadResponse.serializer.serialize(response, out, MessagingService.VERSION_30);
@@ -491,7 +491,7 @@ public class SinglePartitionSliceCommandTest
         SinglePartitionReadQuery.Group<SinglePartitionReadCommand> query = (SinglePartitionReadQuery.Group<SinglePartitionReadCommand>) stmt.getQuery(QueryOptions.DEFAULT, 0);
         Assert.assertEquals(1, query.queries.size());
         SinglePartitionReadCommand command = Iterables.getOnlyElement(query.queries);
-        try (ReadExecutionController controller = ReadExecutionController.forCommand(command);
+        try (ReadExecutionController controller = ReadExecutionController.forCommand(command, false);
              UnfilteredPartitionIterator partitions = command.executeLocally(controller))
         {
             assert partitions.hasNext();
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index 3bdb158..a78c7d4 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -19,7 +19,6 @@
 package org.apache.cassandra.db.compaction;
 
 import java.io.File;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -42,7 +41,6 @@ import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.DeletionTime;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.db.RangeTombstone;
 import org.apache.cassandra.db.ReadExecutionController;
 import org.apache.cassandra.db.RowUpdateBuilder;
@@ -54,7 +52,6 @@ import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.filter.RowFilter;
 import org.apache.cassandra.db.marshal.ValueAccessors;
-import org.apache.cassandra.db.marshal.AsciiType;
 import org.apache.cassandra.db.partitions.FilteredPartition;
 import org.apache.cassandra.db.partitions.ImmutableBTreePartition;
 import org.apache.cassandra.db.partitions.PartitionIterator;
@@ -63,8 +60,6 @@ import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.db.rows.RowIterator;
 import org.apache.cassandra.db.rows.Unfiltered;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
-import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.dht.ByteOrderedPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
diff --git a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
index faae913..900a40f 100644
--- a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
@@ -107,7 +107,6 @@ public class DataResolverTest extends AbstractReadResponseTest
         }
 
         command = Util.cmd(cfs, dk).withNowInSeconds(nowInSec).build();
-        command.trackRepairedStatus();
         readRepair = new TestableReadRepair(command);
         Token token = Murmur3Partitioner.instance.getMinimumToken();
         EndpointsForRange.Builder replicas = EndpointsForRange.builder(ReplicaUtils.FULL_RANGE, num);
@@ -1255,7 +1254,7 @@ public class DataResolverTest extends AbstractReadResponseTest
 
             public TestableDataResolver(ReadCommand command, ReplicaPlan.SharedForRangeRead plan, ReadRepair readRepair, long queryStartNanoTime)
             {
-                super(command, plan, readRepair, queryStartNanoTime);
+                super(command, plan, readRepair, queryStartNanoTime, true);
             }
 
             protected RepairedDataVerifier getRepairedDataVerifier(ReadCommand command)
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
index badcd35..d36808f 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
@@ -191,7 +191,7 @@ public abstract  class AbstractReadRepairTest
     static Message<ReadResponse> msg(InetAddressAndPort from, Cell<?>... cells)
     {
         UnfilteredPartitionIterator iter = new SingletonUnfilteredPartitionIterator(update(cells).unfilteredIterator());
-        return Message.builder(INTERNAL_RSP, ReadResponse.createDataResponse(iter, command))
+        return Message.builder(INTERNAL_RSP, ReadResponse.createDataResponse(iter, command, command.executionController().getRepairedDataInfo()))
                       .from(from)
                       .build();
     }
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
index 8562db7..43a1275 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
@@ -90,7 +90,7 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest
         ReadCallback readCallback = null;
 
         @Override
-        void sendReadCommand(Replica to, ReadCallback callback, boolean speculative)
+        void sendReadCommand(Replica to, ReadCallback callback, boolean speculative, boolean trackRepairedStatus)
         {
             assert readCallback == null || readCallback == callback;
             readCommandRecipients.add(to.endpoint());
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
index 7806a3f..ae83efb 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
@@ -142,7 +142,8 @@ public class DiagEventsBlockingReadRepairTest extends AbstractReadRepairTest
             Assert.assertNotNull(e.toMap());
         }
 
-        void sendReadCommand(Replica to, ReadCallback callback, boolean speculative)
+        @Override
+        void sendReadCommand(Replica to, ReadCallback callback, boolean speculative, boolean trackRepairedStatus)
         {
             assert readCallback == null || readCallback == callback;
             readCallback = callback;
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
index c0af493..5ea790b 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
@@ -49,7 +49,7 @@ public class ReadOnlyReadRepairTest extends AbstractReadRepairTest
         ReadCallback readCallback = null;
 
         @Override
-        void sendReadCommand(Replica to, ReadCallback callback, boolean speculative)
+        void sendReadCommand(Replica to, ReadCallback callback, boolean speculative, boolean trackRepairedStatus)
         {
             assert readCallback == null || readCallback == callback;
             readCommandRecipients.add(to.endpoint());

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


Mime
View raw message