cassandra-pr mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ifesdjeen <...@git.apache.org>
Subject [GitHub] cassandra pull request #269: Review tr range movements
Date Fri, 21 Sep 2018 13:02:20 GMT
Github user ifesdjeen commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/269#discussion_r219488682
  
    --- Diff: src/java/org/apache/cassandra/dht/RangeStreamer.java ---
    @@ -338,165 +386,152 @@ else if (useStrictConsistency)
                                                       boolean useStrictConsistency,
                                                       TokenMetadata tmdBefore,
                                                       TokenMetadata tmdAfter,
    -                                                  Predicate<Replica> isAlive,
                                                       String keyspace,
    -                                                  Collection<Predicate<Replica>>
sourceFilters)
    -    {
    -        EndpointsByRange rangeAddresses = strat.getRangeAddresses(tmdBefore);
    -
    -        InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
    -        logger.debug ("Keyspace: {}", keyspace);
    -        logger.debug("To fetch RN: {}", fetchRanges);
    -        logger.debug("Fetch ranges: {}", rangeAddresses);
    -
    -        Predicate<Replica> testSourceFilters = and(sourceFilters);
    -        Function<EndpointsForRange, EndpointsForRange> sorted =
    -                endpoints -> snitchGetSortedListByProximity.apply(localAddress, endpoints);
    -
    -        //This list of replicas is just candidates. With strict consistency it's going
to be a narrow list.
    -        EndpointsByReplica.Mutable rangesToFetchWithPreferredEndpoints = new EndpointsByReplica.Mutable();
    -        for (Replica toFetch : fetchRanges)
    -        {
    -            //Replica that is sufficient to provide the data we need
    -            //With strict consistency and transient replication we may end up with multiple
types
    -            //so this isn't used with strict consistency
    -            Predicate<Replica> isSufficient = r -> (toFetch.isTransient() ||
r.isFull());
    -            Predicate<Replica> accept = r ->
    -                       isSufficient.test(r)                 // is sufficient
    -                    && !r.endpoint().equals(localAddress)   // is not self
    -                    && isAlive.test(r);                     // is alive
    -
    -            logger.debug("To fetch {}", toFetch);
    -            for (Range<Token> range : rangeAddresses.keySet())
    -            {
    -                if (range.contains(toFetch.range()))
    -                {
    -                    EndpointsForRange oldEndpoints = rangeAddresses.get(range);
    -
    -                    //Ultimately we populate this with whatever is going to be fetched
from to satisfy toFetch
    -                    //It could be multiple endpoints and we must fetch from all of them
if they are there
    -                    //With transient replication and strict consistency this is to get
the full data from a full replica and
    -                    //transient data from the transient replica losing data
    -                    EndpointsForRange sources;
    -                    if (useStrictConsistency)
    -                    {
    -                        //Start with two sets of who replicates the range before and
who replicates it after
    -                        EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toFetch.range().right,
tmdAfter);
    -                        logger.debug("Old endpoints {}", oldEndpoints);
    -                        logger.debug("New endpoints {}", newEndpoints);
    -
    -                        //Due to CASSANDRA-5953 we can have a higher RF then we have
endpoints.
    -                        //So we need to be careful to only be strict when endpoints ==
RF
    -                        if (oldEndpoints.size() == strat.getReplicationFactor().allReplicas)
    -                        {
    -                            Set<InetAddressAndPort> endpointsStillReplicated =
newEndpoints.endpoints();
    -                            // Remove new endpoints from old endpoints based on address
    -                            oldEndpoints = oldEndpoints.filter(r -> !endpointsStillReplicated.contains(r.endpoint()));
    -
    -                            if (!all(oldEndpoints, isAlive))
    -                                throw new IllegalStateException("A node required to move
the data consistently is down: "
    -                                                                + oldEndpoints.filter(not(isAlive)));
    -
    -                            if (oldEndpoints.size() > 1)
    -                                throw new AssertionError("Expected <= 1 endpoint but
found " + oldEndpoints);
    -
    -                            //If we are transitioning from transient to full and and
the set of replicas for the range is not changing
    -                            //we might end up with no endpoints to fetch from by address.
In that case we can pick any full replica safely
    -                            //since we are already a transient replica and the existing
replica remains.
    -                            //The old behavior where we might be asked to fetch ranges
we don't need shouldn't occur anymore.
    -                            //So it's an error if we don't find what we need.
    -                            if (oldEndpoints.isEmpty() && toFetch.isTransient())
    -                            {
    -                                throw new AssertionError("If there are no endpoints to
fetch from then we must be transitioning from transient to full for range " + toFetch);
    -                            }
    -
    -                            if (!any(oldEndpoints, isSufficient))
    -                            {
    -                                // need an additional replica
    -                                EndpointsForRange endpointsForRange = sorted.apply(rangeAddresses.get(range));
    -                                // include all our filters, to ensure we include a matching
node
    -                                Optional<Replica> fullReplica = Iterables.<Replica>tryFind(endpointsForRange,
and(accept, testSourceFilters)).toJavaUtil();
    -                                if (fullReplica.isPresent())
    -                                    oldEndpoints = Endpoints.concat(oldEndpoints, EndpointsForRange.of(fullReplica.get()));
    -                                else
    -                                    throw new IllegalStateException("Couldn't find any
matching sufficient replica out of " + endpointsForRange);
    -                            }
    -
    -                            //We have to check the source filters here to see if they
will remove any replicas
    -                            //required for strict consistency
    -                            if (!all(oldEndpoints, testSourceFilters))
    -                                throw new IllegalStateException("Necessary replicas for
strict consistency were removed by source filters: " + oldEndpoints.filter(not(testSourceFilters)));
    -                        }
    -                        else
    -                        {
    -                            oldEndpoints = sorted.apply(oldEndpoints.filter(accept));
    -                        }
    -
    -                        //Apply testSourceFilters that were given to us, and establish
everything remaining is alive for the strict case
    -                        sources = oldEndpoints.filter(testSourceFilters);
    -                    }
    -                    else
    -                    {
    -                        //Without strict consistency we have given up on correctness
so no point in fetching from
    -                        //a random full + transient replica since it's also likely to
lose data
    -                        //Also apply testSourceFilters that were given to us so we can
safely select a single source
    -                        sources = sorted.apply(rangeAddresses.get(range).filter(and(accept,
testSourceFilters)));
    -                        //Limit it to just the first possible source, we don't need more
than one and downstream
    -                        //will fetch from every source we supply
    -                        sources = sources.size() > 0 ? sources.subList(0, 1) : sources;
    -                    }
    -
    -                    // storing range and preferred endpoint set
    -                    rangesToFetchWithPreferredEndpoints.putAll(toFetch, sources, Conflict.NONE);
    -                    logger.debug("Endpoints to fetch for {} are {}", toFetch, sources);
    -                }
    -            }
    -
    -            EndpointsForRange addressList = rangesToFetchWithPreferredEndpoints.getIfPresent(toFetch);
    -            if (addressList == null)
    -                throw new IllegalStateException("Failed to find endpoints to fetch "
+ toFetch);
    -
    -            /*
    -             * When we move forwards (shrink our bucket) we are the one losing a range
and no one else loses
    -             * from that action (we also don't gain). When we move backwards there are
two people losing a range. One is a full replica
    -             * and the other is a transient replica. So we must need fetch from two places
in that case for the full range we gain.
    -             * For a transient range we only need to fetch from one.
    -             */
    -            if (useStrictConsistency && addressList.size() > 1 &&
(addressList.filter(Replica::isFull).size() > 1 || addressList.filter(Replica::isTransient).size()
> 1))
    -                throw new IllegalStateException(String.format("Multiple strict sources
found for %s, sources: %s", toFetch, addressList));
    -
    -            //We must have enough stuff to fetch from
    -            if ((toFetch.isFull() && !any(addressList, Replica::isFull)) || addressList.isEmpty())
    -            {
    -                if (strat.getReplicationFactor().allReplicas == 1)
    -                {
    -                    if (useStrictConsistency)
    -                    {
    -                        logger.warn("A node required to move the data consistently is
down");
    -                        throw new IllegalStateException("Unable to find sufficient sources
for streaming range " + toFetch + " in keyspace " + keyspace + " with RF=1. " +
    -                                                        "Ensure this keyspace contains
replicas in the source datacenter.");
    -                    }
    -                    else
    -                        logger.warn("Unable to find sufficient sources for streaming
range {} in keyspace {} with RF=1. " +
    -                                    "Keyspace might be missing data.", toFetch, keyspace);
    -
    -                }
    -                else
    -                {
    -                    if (useStrictConsistency)
    -                        logger.warn("A node required to move the data consistently is
down");
    -                    throw new IllegalStateException("Unable to find sufficient sources
for streaming range " + toFetch + " in keyspace " + keyspace);
    -                }
    -            }
    -        }
    -        return rangesToFetchWithPreferredEndpoints.asImmutableView();
    -    }
    +                                                  Collection<SourceFilter> sourceFilters)
    +     {
    +         EndpointsByRange rangeAddresses = strat.getRangeAddresses(tmdBefore);
    +
    +         InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
    +         logger.debug ("Keyspace: {}", keyspace);
    +         logger.debug("To fetch RN: {}", fetchRanges);
    +         logger.debug("Fetch ranges: {}", rangeAddresses);
    +
    +         Predicate<Replica> testSourceFilters = and(sourceFilters);
    +         Function<EndpointsForRange, EndpointsForRange> sorted =
    +         endpoints -> snitchGetSortedListByProximity.apply(localAddress, endpoints);
    +
    +         //This list of replicas is just candidates. With strict consistency it's going
to be a narrow list.
    +         EndpointsByReplica.Mutable rangesToFetchWithPreferredEndpoints = new EndpointsByReplica.Mutable();
    +         for (Replica toFetch : fetchRanges)
    +         {
    +             //Replica that is sufficient to provide the data we need
    +             //With strict consistency and transient replication we may end up with multiple
types
    +             //so this isn't used with strict consistency
    +             Predicate<Replica> isSufficient = r -> toFetch.isTransient() ||
r.isFull();
    +
    +             logger.debug("To fetch {}", toFetch);
    +             for (Range<Token> range : rangeAddresses.keySet())
    +             {
    +                 if (!range.contains(toFetch.range()))
    +                     continue;
    +
    +                 EndpointsForRange oldEndpoints = rangeAddresses.get(range);
    +
    +                 //Ultimately we populate this with whatever is going to be fetched from
to satisfy toFetch
    +                 //It could be multiple endpoints and we must fetch from all of them
if they are there
    +                 //With transient replication and strict consistency this is to get the
full data from a full replica and
    +                 //transient data from the transient replica losing data
    +                 EndpointsForRange sources;
    +                 if (useStrictConsistency)
    +                 {
    +                     //Start with two sets of who replicates the range before and who
replicates it after
    +                     EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toFetch.range().right,
tmdAfter);
    +                     logger.debug("Old endpoints {}", oldEndpoints);
    +                     logger.debug("New endpoints {}", newEndpoints);
    +
    +                     //Due to CASSANDRA-5953 we can have a higher RF then we have endpoints.
    +                     //So we need to be careful to only be strict when endpoints == RF
    +                     if (oldEndpoints.size() == strat.getReplicationFactor().allReplicas)
    +                     {
    +                         Set<InetAddressAndPort> endpointsStillReplicated = newEndpoints.endpoints();
    +                         // Remove new endpoints from old endpoints based on address
    +                         oldEndpoints = oldEndpoints.filter(r -> !endpointsStillReplicated.contains(r.endpoint()));
    --- End diff --
    
    Fixed


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


Mime
View raw message