Github user belliottsmith commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/269#discussion_r218383055
--- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java ---
@@ -337,165 +364,167 @@ else if (useStrictConsistency)
boolean useStrictConsistency,
TokenMetadata tmdBefore,
TokenMetadata tmdAfter,
- Predicate<Replica> isAlive,
String keyspace,
- Collection<Predicate<Replica>>
sourceFilters)
- {
- EndpointsByRange rangeAddresses = strat.getRangeAddresses(tmdBefore);
-
- InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
- logger.debug ("Keyspace: {}", keyspace);
- logger.debug("To fetch RN: {}", fetchRanges);
- logger.debug("Fetch ranges: {}", rangeAddresses);
-
- Predicate<Replica> testSourceFilters = and(sourceFilters);
- Function<EndpointsForRange, EndpointsForRange> sorted =
- endpoints -> snitchGetSortedListByProximity.apply(localAddress, endpoints);
-
- //This list of replicas is just candidates. With strict consistency it's going
to be a narrow list.
- EndpointsByReplica.Mutable rangesToFetchWithPreferredEndpoints = new EndpointsByReplica.Mutable();
- for (Replica toFetch : fetchRanges)
- {
- //Replica that is sufficient to provide the data we need
- //With strict consistency and transient replication we may end up with multiple
types
- //so this isn't used with strict consistency
- Predicate<Replica> isSufficient = r -> (toFetch.isTransient() ||
r.isFull());
- Predicate<Replica> accept = r ->
- isSufficient.test(r) // is sufficient
- && !r.endpoint().equals(localAddress) // is not self
- && isAlive.test(r); // is alive
-
- logger.debug("To fetch {}", toFetch);
- for (Range<Token> range : rangeAddresses.keySet())
- {
- if (range.contains(toFetch.range()))
- {
- EndpointsForRange oldEndpoints = rangeAddresses.get(range);
-
- //Ultimately we populate this with whatever is going to be fetched
from to satisfy toFetch
- //It could be multiple endpoints and we must fetch from all of them
if they are there
- //With transient replication and strict consistency this is to get
the full data from a full replica and
- //transient data from the transient replica losing data
- EndpointsForRange sources;
- if (useStrictConsistency)
- {
- //Start with two sets of who replicates the range before and
who replicates it after
- EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toFetch.range().right,
tmdAfter);
- logger.debug("Old endpoints {}", oldEndpoints);
- logger.debug("New endpoints {}", newEndpoints);
-
- //Due to CASSANDRA-5953 we can have a higher RF then we have
endpoints.
- //So we need to be careful to only be strict when endpoints ==
RF
- if (oldEndpoints.size() == strat.getReplicationFactor().allReplicas)
- {
- Set<InetAddressAndPort> endpointsStillReplicated =
newEndpoints.endpoints();
- // Remove new endpoints from old endpoints based on address
- oldEndpoints = oldEndpoints.filter(r -> !endpointsStillReplicated.contains(r.endpoint()));
-
- if (!all(oldEndpoints, isAlive))
- throw new IllegalStateException("A node required to move
the data consistently is down: "
- + oldEndpoints.filter(not(isAlive)));
-
- if (oldEndpoints.size() > 1)
- throw new AssertionError("Expected <= 1 endpoint but
found " + oldEndpoints);
-
- //If we are transitioning from transient to full and and
the set of replicas for the range is not changing
- //we might end up with no endpoints to fetch from by address.
In that case we can pick any full replica safely
- //since we are already a transient replica and the existing
replica remains.
- //The old behavior where we might be asked to fetch ranges
we don't need shouldn't occur anymore.
- //So it's an error if we don't find what we need.
- if (oldEndpoints.isEmpty() && toFetch.isTransient())
- {
- throw new AssertionError("If there are no endpoints to
fetch from then we must be transitioning from transient to full for range " + toFetch);
- }
-
- if (!any(oldEndpoints, isSufficient))
- {
- // need an additional replica
- EndpointsForRange endpointsForRange = sorted.apply(rangeAddresses.get(range));
- // include all our filters, to ensure we include a matching
node
- Optional<Replica> fullReplica = Iterables.<Replica>tryFind(endpointsForRange,
and(accept, testSourceFilters)).toJavaUtil();
- if (fullReplica.isPresent())
- oldEndpoints = Endpoints.concat(oldEndpoints, EndpointsForRange.of(fullReplica.get()));
- else
- throw new IllegalStateException("Couldn't find any
matching sufficient replica out of " + endpointsForRange);
- }
-
- //We have to check the source filters here to see if they
will remove any replicas
- //required for strict consistency
- if (!all(oldEndpoints, testSourceFilters))
- throw new IllegalStateException("Necessary replicas for
strict consistency were removed by source filters: " + oldEndpoints.filter(not(testSourceFilters)));
- }
- else
- {
- oldEndpoints = sorted.apply(oldEndpoints.filter(accept));
- }
-
- //Apply testSourceFilters that were given to us, and establish
everything remaining is alive for the strict case
- sources = oldEndpoints.filter(testSourceFilters);
- }
- else
- {
- //Without strict consistency we have given up on correctness
so no point in fetching from
- //a random full + transient replica since it's also likely to
lose data
- //Also apply testSourceFilters that were given to us so we can
safely select a single source
- sources = sorted.apply(rangeAddresses.get(range).filter(and(accept,
testSourceFilters)));
- //Limit it to just the first possible source, we don't need more
than one and downstream
- //will fetch from every source we supply
- sources = sources.size() > 0 ? sources.subList(0, 1) : sources;
- }
-
- // storing range and preferred endpoint set
- rangesToFetchWithPreferredEndpoints.putAll(toFetch, sources, Conflict.NONE);
- logger.debug("Endpoints to fetch for {} are {}", toFetch, sources);
- }
- }
-
- EndpointsForRange addressList = rangesToFetchWithPreferredEndpoints.getIfPresent(toFetch);
- if (addressList == null)
- throw new IllegalStateException("Failed to find endpoints to fetch "
+ toFetch);
-
- /*
- * When we move forwards (shrink our bucket) we are the one losing a range
and no one else loses
- * from that action (we also don't gain). When we move backwards there are
two people losing a range. One is a full replica
- * and the other is a transient replica. So we must need fetch from two places
in that case for the full range we gain.
- * For a transient range we only need to fetch from one.
- */
- if (useStrictConsistency && addressList.size() > 1 &&
(addressList.filter(Replica::isFull).size() > 1 || addressList.filter(Replica::isTransient).size()
> 1))
- throw new IllegalStateException(String.format("Multiple strict sources
found for %s, sources: %s", toFetch, addressList));
-
- //We must have enough stuff to fetch from
- if ((toFetch.isFull() && !any(addressList, Replica::isFull)) || addressList.isEmpty())
- {
- if (strat.getReplicationFactor().allReplicas == 1)
- {
- if (useStrictConsistency)
- {
- logger.warn("A node required to move the data consistently is
down");
- throw new IllegalStateException("Unable to find sufficient sources
for streaming range " + toFetch + " in keyspace " + keyspace + " with RF=1. " +
- "Ensure this keyspace contains
replicas in the source datacenter.");
- }
- else
- logger.warn("Unable to find sufficient sources for streaming
range {} in keyspace {} with RF=1. " +
- "Keyspace might be missing data.", toFetch, keyspace);
-
- }
- else
- {
- if (useStrictConsistency)
- logger.warn("A node required to move the data consistently is
down");
- throw new IllegalStateException("Unable to find sufficient sources
for streaming range " + toFetch + " in keyspace " + keyspace);
- }
- }
- }
- return rangesToFetchWithPreferredEndpoints.asImmutableView();
- }
+ Collection<SourceFilter> sourceFilters)
+ {
+ EndpointsByRange rangeAddresses = strat.getRangeAddresses(tmdBefore);
+
+ InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
+ logger.debug ("Keyspace: {}", keyspace);
+ logger.debug("To fetch RN: {}", fetchRanges);
+ logger.debug("Fetch ranges: {}", rangeAddresses);
+
+ Predicate<Replica> testSourceFilters = and(sourceFilters);
+ Function<EndpointsForRange, EndpointsForRange> sorted =
+ endpoints -> snitchGetSortedListByProximity.apply(localAddress, endpoints);
+
+ //This list of replicas is just candidates. With strict consistency it's going
to be a narrow list.
+ EndpointsByReplica.Mutable rangesToFetchWithPreferredEndpoints = new EndpointsByReplica.Mutable();
+ for (Replica toFetch : fetchRanges)
+ {
+ //Replica that is sufficient to provide the data we need
+ //With strict consistency and transient replication we may end up with multiple
types
+ //so this isn't used with strict consistency
+ Predicate<Replica> isSufficient = r -> (toFetch.isTransient() ||
r.isFull());
+
+ logger.debug("To fetch {}", toFetch);
+ for (Range<Token> range : rangeAddresses.keySet())
+ {
+ if (!range.contains(toFetch.range()))
+ continue;
+
+ EndpointsForRange oldEndpoints = rangeAddresses.get(range);
+
+ //Ultimately we populate this with whatever is going to be fetched from
to satisfy toFetch
+ //It could be multiple endpoints and we must fetch from all of them
if they are there
+ //With transient replication and strict consistency this is to get the
full data from a full replica and
+ //transient data from the transient replica losing data
+ EndpointsForRange sources;
+ if (useStrictConsistency)
+ {
+ //Start with two sets of who replicates the range before and who
replicates it after
+ EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toFetch.range().right,
tmdAfter);
+ logger.debug("Old endpoints {}", oldEndpoints);
+ logger.debug("New endpoints {}", newEndpoints);
+
+ //Due to CASSANDRA-5953 we can have a higher RF then we have endpoints.
+ //So we need to be careful to only be strict when endpoints == RF
+ if (oldEndpoints.size() == strat.getReplicationFactor().allReplicas)
+ {
+ Set<InetAddressAndPort> endpointsStillReplicated = newEndpoints.endpoints();
+ // Remove new endpoints from old endpoints based on address
+ oldEndpoints = oldEndpoints.filter(r -> !endpointsStillReplicated.contains(r.endpoint()));
+
+ if (oldEndpoints.size() > 1)
+ throw new AssertionError("Expected <= 1 endpoint but
found " + oldEndpoints);
+
+ //If we are transitioning from transient to full and and the
set of replicas for the range is not changing
+ //we might end up with no endpoints to fetch from by address.
In that case we can pick any full replica safely
+ //since we are already a transient replica and the existing
replica remains.
+ //The old behavior where we might be asked to fetch ranges we
don't need shouldn't occur anymore.
+ //So it's an error if we don't find what we need.
+ if (oldEndpoints.isEmpty() && toFetch.isTransient())
+ {
+ throw new AssertionError("If there are no endpoints to fetch
from then we must be transitioning from transient to full for range " + toFetch);
+ }
+
+ if (!any(oldEndpoints, isSufficient))
+ {
+ // need an additional replica
+ EndpointsForRange endpointsForRange = sorted.apply(rangeAddresses.get(range));
+ // include all our filters, to ensure we include a matching
node
+ Optional<Replica> fullReplica = Iterables.<Replica>tryFind(endpointsForRange,
and(isSufficient, testSourceFilters)).toJavaUtil();
+ if (fullReplica.isPresent())
+ oldEndpoints = Endpoints.concat(oldEndpoints, EndpointsForRange.of(fullReplica.get()));
+ else
+ throw new IllegalStateException("Couldn't find any matching
sufficient replica out of " + endpointsForRange);
+ }
+
+ //We have to check the source filters here to see if they will
remove any replicas
+ //required for strict consistency
+ if (!all(oldEndpoints, testSourceFilters))
+ {
+ StringBuilder failureMessage = new StringBuilder();
+ for (Replica r : oldEndpoints)
+ {
+ for (SourceFilter filter : sourceFilters)
+ {
+ if (!filter.apply(r))
+ {
+ failureMessage.append(filter.message(r));
+ break;
+ }
+ }
+ }
+ throw new IllegalStateException("Necessary replicas for
strict consistency were removed by source filters: " + failureMessage);
+ }
+ }
+ else
+ {
+ oldEndpoints = sorted.apply(oldEndpoints.filter(and(isSufficient,
testSourceFilters)));
--- End diff --
We are now applying testSourceFilters twice: once here, and once on L468.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org
|