Github user ifesdjeen commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/269#discussion_r219488654
--- Diff: src/java/org/apache/cassandra/service/RangeRelocator.java ---
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Future;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Multimap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.RangeStreamer;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.EndpointsByReplica;
+import org.apache.cassandra.locator.EndpointsForRange;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.locator.RangesByEndpoint;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.streaming.StreamOperation;
+import org.apache.cassandra.streaming.StreamPlan;
+import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+
+@VisibleForTesting
+public class RangeRelocator
+{
+ private static final Logger logger = LoggerFactory.getLogger(StorageService.class); // NOTE(review): registered under StorageService, not RangeRelocator - confirm this log category is intentional
+
+ private final StreamPlan streamPlan = new StreamPlan(StreamOperation.RELOCATION); // plan that calculateToFromStreams() registers transfers and requests on
+ private final InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort(); // broadcast address of this node, captured once at construction
+ private final TokenMetadata tokenMetaCloneAllSettled; // result of tmd.cloneAfterAllSettled(); passed as the "after" metadata
+ // clone to avoid concurrent modification in calculateNaturalReplicas
+ private final TokenMetadata tokenMetaClone; // result of tmd.cloneOnlyTokenMap(); passed as the "before" metadata
+ private final Collection<Token> tokens; // new tokens iterated by calculateToFromStreams(); null when built via the no-arg test constructor
+ private final List<String> keyspaceNames; // keyspaces iterated by calculateToFromStreams(); null when built via the no-arg test constructor
+
+
+ RangeRelocator(Collection<Token> tokens, List<String> keyspaceNames,
TokenMetadata tmd)
+ {
+ this.tokens = tokens;
+ this.keyspaceNames = keyspaceNames;
+ this.tokenMetaCloneAllSettled = tmd.cloneAfterAllSettled();
+ // clone to avoid concurrent modification in calculateNaturalReplicas
+ this.tokenMetaClone = tmd.cloneOnlyTokenMap();
+ }
+
+ /**
+  * Test-only constructor. Leaves the tokens, the keyspace names and both token metadata
+  * clones as {@code null}.
+  */
+ @VisibleForTesting
+ public RangeRelocator()
+ {
+     this.tokenMetaClone = null;
+     this.tokenMetaCloneAllSettled = null;
+     this.keyspaceNames = null;
+     this.tokens = null;
+ }
+
+ /**
+  * Delegates to {@link RangeStreamer} to pick the preferred endpoints for {@code fetchRanges}, plugging in
+  * the real dependencies: proximity sorting from the configured endpoint snitch,
+  * {@code StorageService.useStrictConsistency}, a source filter built on {@code FailureDetector.instance}
+  * and a filter excluding the local node. The result is converted to a work map.
+  *
+  * @param fetchRanges ranges to fetch
+  * @param strategy    replication strategy handed to the range streamer
+  * @param keyspace    keyspace name handed to the range streamer
+  * @param tmdBefore   token metadata describing the ring before the change
+  * @param tmdAfter    token metadata describing the ring after the change
+  * @return work map produced by {@code RangeStreamer.convertPreferredEndpointsToWorkMap}
+  */
+ private static Multimap<InetAddressAndPort, RangeStreamer.FetchReplica> calculateRangesToFetchWithPreferredEndpoints(
+         RangesAtEndpoint fetchRanges,
+         AbstractReplicationStrategy strategy,
+         String keyspace,
+         TokenMetadata tmdBefore,
+         TokenMetadata tmdAfter)
+ {
+     return RangeStreamer.convertPreferredEndpointsToWorkMap(
+             RangeStreamer.calculateRangesToFetchWithPreferredEndpoints(
+                     DatabaseDescriptor.getEndpointSnitch()::sortedByProximity,
+                     strategy,
+                     fetchRanges,
+                     StorageService.useStrictConsistency,
+                     tmdBefore,
+                     tmdAfter,
+                     keyspace,
+                     Arrays.asList(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance),
+                                   new RangeStreamer.ExcludeLocalNodeFilter())));
+ }
+
+ /**
+  * Calculates the endpoints that the current ranges have to be streamed to, if any, by comparing the
+  * natural replicas of each range under {@code tmdBefore} and {@code tmdAfter}.
+  * In some situations the node will handle current ranges as part of the new ranges.
+  *
+  * @param streamRanges replicas whose ranges are candidates for streaming
+  * @param strat        replication strategy used to calculate the natural replicas
+  * @param tmdBefore    token metadata describing the ring before the change
+  * @param tmdAfter     token metadata describing the ring after the change
+  * @return immutable view of the ranges to stream, keyed by destination endpoint
+  * @throws AssertionError if a brand-new full replica would have to be streamed from a transient range
+  */
+ public static RangesByEndpoint calculateRangesToStreamWithEndpoints(RangesAtEndpoint streamRanges,
+                                                                     AbstractReplicationStrategy strat,
+                                                                     TokenMetadata tmdBefore,
+                                                                     TokenMetadata tmdAfter)
+ {
+     RangesByEndpoint.Mutable byDestination = new RangesByEndpoint.Mutable();
+     for (Replica toStream : streamRanges)
+     {
+         // If the range we are sending is full, only send it to the new full replica.
+         // There will also be a new transient replica we need to send the data to, but not the repaired data.
+         Token rangeEnd = toStream.range().right;
+         EndpointsForRange before = strat.calculateNaturalReplicas(rangeEnd, tmdBefore);
+         EndpointsForRange after = strat.calculateNaturalReplicas(rangeEnd, tmdAfter);
+         logger.debug("Need to stream {}, current endpoints {}, new endpoints {}", toStream, before, after);
+
+         for (Replica destination : after)
+         {
+             Replica previous = before.byEndpoint().get(destination.endpoint());
+
+             if (previous == null)
+             {
+                 // Completely new range for this endpoint
+                 if (toStream.isTransient() && destination.isFull())
+                     throw new AssertionError(String.format("Need to stream %s, but only have %s which is transient and not full", destination, toStream));
+
+                 for (Range<Token> overlap : destination.range().intersectionWith(toStream.range()))
+                     byDestination.put(destination.endpoint(), destination.decorateSubrange(overlap));
+             }
+             else if (!destination.equals(previous))
+             {
+                 // First subtract what the endpoint already has; this is skipped only when the endpoint
+                 // goes from transient to full, in which case the whole range stays a candidate.
+                 Set<Range<Token>> candidates;
+                 if (previous.isFull() || !destination.isFull())
+                     candidates = toStream.range().subtract(previous.range());
+                 else
+                     candidates = Collections.singleton(toStream.range());
+
+                 // Now we only stream what is still replicated
+                 for (Range<Token> candidate : candidates)
+                 {
+                     for (Range<Token> tokenRange : candidate.intersectionWith(destination.range()))
+                         byDestination.put(destination.endpoint(), destination.decorateSubrange(tokenRange));
+                 }
+             }
+             // otherwise the replica is unchanged for this endpoint: nothing to do
+         }
+     }
+     return byDestination.asImmutableView();
+ }
+
+ /**
+  * Populates {@code streamPlan} for the relocation. For every keyspace in {@code keyspaceNames} and every
+  * token in {@code tokens}, it works out the ranges this node has to stream to other endpoints and the
+  * ranges it has to request from them, then registers both on the plan.
+  *
+  * @throws AssertionError if, while iterating, the local address does not own exactly one token in
+  *                        {@code tokenMetaClone}
+  */
+ public void calculateToFromStreams()
+ {
+     logger.debug("Current tmd: {}, Updated tmd: {}", tokenMetaClone, tokenMetaCloneAllSettled);
+
+     for (String keyspace : keyspaceNames)
+     {
+         // replication strategy of the current keyspace
+         AbstractReplicationStrategy strategy = Keyspace.open(keyspace).getReplicationStrategy();
+
+         logger.info("Calculating ranges to stream and request for keyspace {}", keyspace);
+         //From what I have seen we only ever call this with a single token from StorageService.move(Token)
+         for (Token newToken : tokens)
+         {
+             // exactly one current token is expected for the local node
+             Collection<Token> currentTokens = tokenMetaClone.getTokens(localAddress);
+             if (currentTokens.size() != 1)
+             {
+                 throw new AssertionError("Unexpected current tokens: " + currentTokens);
+             }
+
+             // calculated parts of the ranges to request/stream from/to nodes in the ring
+             Pair<RangesAtEndpoint, RangesAtEndpoint> streamAndFetchOwnRanges;
+
+             //In the single node token move there is nothing to do and Range subtraction is broken
+             //so it's easier to just identify this case up front.
+             if (tokenMetaClone.getTopology()
+                               .getDatacenterEndpoints()
+                               .get(DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort()))
+                               .size() > 1)
+             {
+                 // getting collection of the currently used ranges by this keyspace
+                 RangesAtEndpoint currentReplicas = strategy.getAddressReplicas(localAddress);
+
+                 // collection of ranges which this node will serve after move to the new token
+                 RangesAtEndpoint updatedReplicas = strategy.getPendingAddressRanges(tokenMetaClone, newToken, localAddress);
+
+                 streamAndFetchOwnRanges = calculateStreamAndFetchRanges(currentReplicas, updatedReplicas);
+             }
+             else
+             {
+                 streamAndFetchOwnRanges = Pair.create(RangesAtEndpoint.empty(localAddress), RangesAtEndpoint.empty(localAddress));
+             }
+
+             RangesByEndpoint rangesToStream = calculateRangesToStreamWithEndpoints(streamAndFetchOwnRanges.left, strategy, tokenMetaClone, tokenMetaCloneAllSettled);
+             // parameterized logging instead of string concatenation; the emitted message is unchanged
+             logger.info("Endpoint ranges to stream to {}", rangesToStream);
+
+             // stream ranges
+             for (InetAddressAndPort address : rangesToStream.keySet())
+             {
+                 // look the ranges up once and reuse them for both the log line and the plan
+                 RangesAtEndpoint ranges = rangesToStream.get(address);
+                 logger.debug("Will stream range {} of keyspace {} to endpoint {}", ranges, keyspace, address);
+                 streamPlan.transferRanges(address, keyspace, ranges);
+             }
+
+             Multimap<InetAddressAndPort, RangeStreamer.FetchReplica> rangesToFetch = calculateRangesToFetchWithPreferredEndpoints(streamAndFetchOwnRanges.right, strategy, keyspace, tokenMetaClone, tokenMetaCloneAllSettled);
+
+             // stream requests: split our local replicas by whether the remote source is full or transient
+             rangesToFetch.asMap().forEach((address, sourceAndOurReplicas) -> {
+                 RangesAtEndpoint full = sourceAndOurReplicas.stream()
+                                                             .filter(pair -> pair.remote.isFull())
+                                                             .map(pair -> pair.local)
+                                                             .collect(RangesAtEndpoint.collector(localAddress));
+                 RangesAtEndpoint trans = sourceAndOurReplicas.stream()
+                                                              .filter(pair -> pair.remote.isTransient())
+                                                              .map(pair -> pair.local)
+                                                              .collect(RangesAtEndpoint.collector(localAddress));
+                 logger.debug("Will request range {} of keyspace {} from endpoint {}", rangesToFetch.get(address), keyspace, address);
+                 streamPlan.requestRanges(address, keyspace, full, trans);
+             });
+
+             logger.debug("Keyspace {}: work map {}.", keyspace, rangesToFetch);
+         }
+     }
+ }
+
+ /**
+ * Calculate pair of ranges to stream/fetch for given two range collections
+ * (current ranges for keyspace and ranges after move to new token)
+ *
+ * With transient replication the added wrinkle is that if a range transitions from
full to transient then
+ * we need to stream the range despite the fact that we are retaining it as transient.
Some replica
+ * somewhere needs to transition from transient to full and we will be the source.
+ *
+ * If the range is transient and is transitioning to full then always fetch even
if the range was already transient
+ * since a transiently replicated range obviously needs to fetch data to become full.
+ *
+ * This is why there is a continue after checking for intersection because intersection
is not sufficient reason
+ * to do the subtraction since we might need to stream/fetch data anyways.
+ *
+ * @param currentRanges collection of the ranges by current token
+ * @param updatedRanges collection of the ranges after token is changed
+ * @return pair of ranges to stream/fetch for given current and updated range collections
+ */
+ public static Pair<RangesAtEndpoint, RangesAtEndpoint> calculateStreamAndFetchRanges(RangesAtEndpoint
currentRanges, RangesAtEndpoint updatedRanges)
+ {
+ // FIXME: transient replication
--- End diff --
Done
---
---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org
|