cassandra-pr mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aweisberg <...@git.apache.org>
Subject [GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...
Date Mon, 10 Dec 2018 17:58:48 GMT
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/283#discussion_r239953888
  
    --- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java ---
    @@ -21,133 +21,505 @@
     import java.net.InetAddress;
     import java.net.UnknownHostException;
     import java.util.*;
    +import java.util.concurrent.CancellationException;
     import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.ExecutionException;
     import java.util.concurrent.ScheduledFuture;
     import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +import java.util.concurrent.atomic.AtomicLong;
     import java.util.stream.Collectors;
     
    -import com.codahale.metrics.ExponentiallyDecayingReservoir;
    +import com.google.common.annotations.VisibleForTesting;
    +import com.google.common.util.concurrent.Futures;
    +import com.google.common.util.concurrent.MoreExecutors;
    +import com.google.common.util.concurrent.RateLimiter;
    +import com.google.common.util.concurrent.SettableFuture;
     
    -import com.codahale.metrics.Snapshot;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
     import org.apache.cassandra.concurrent.ScheduledExecutors;
     import org.apache.cassandra.config.DatabaseDescriptor;
    +import org.apache.cassandra.exceptions.RequestFailureReason;
     import org.apache.cassandra.gms.ApplicationState;
     import org.apache.cassandra.gms.EndpointState;
     import org.apache.cassandra.gms.Gossiper;
     import org.apache.cassandra.gms.VersionedValue;
    +import org.apache.cassandra.locator.dynamicsnitch.DynamicEndpointSnitchHistogram;
    +import org.apache.cassandra.net.IAsyncCallbackWithFailure;
    +import org.apache.cassandra.net.LatencyMeasurementType;
    +import org.apache.cassandra.net.MessageIn;
    +import org.apache.cassandra.net.MessageOut;
     import org.apache.cassandra.net.MessagingService;
    +import org.apache.cassandra.net.PingMessage;
     import org.apache.cassandra.service.StorageService;
     import org.apache.cassandra.utils.FBUtilities;
     import org.apache.cassandra.utils.MBeanWrapper;
     
    +import static org.apache.cassandra.net.MessagingService.Verb.PING;
    +import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.LARGE_MESSAGE;
    +import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.SMALL_MESSAGE;
    +
    +
     /**
      * A dynamic snitch that sorts endpoints by latency with an adapted phi failure detector
    + * Note that the subclasses (e.g. {@link DynamicEndpointSnitchHistogram}) are responsible
for actually measuring
    + * latency and providing an ISnitchMeasurement implementation back to this class.
      */
    -public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber,
DynamicEndpointSnitchMBean
    +public abstract class DynamicEndpointSnitch extends AbstractEndpointSnitch implements
ILatencySubscriber, DynamicEndpointSnitchMBean
     {
    -    private static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity");
    +    private static final Logger logger = LoggerFactory.getLogger(DynamicEndpointSnitch.class);
    +
    +    // Latency measurement and ranking. The samples contain latency measurements and
the scores are used for ranking
    +    protected boolean registered = false;
    +    protected static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity");
    +    protected volatile Map<InetAddressAndPort, Double> scores = new HashMap<>();
     
    -    private static final double ALPHA = 0.75; // set to 0.75 to make EDS more biased
to towards the newer values
    -    private static final int WINDOW_SIZE = 100;
    +    protected final Map<InetAddressAndPort, AnnotatedMeasurement> samples = new
ConcurrentHashMap<>();
     
    -    private volatile int dynamicUpdateInterval = DatabaseDescriptor.getDynamicUpdateInterval();
    -    private volatile int dynamicResetInterval = DatabaseDescriptor.getDynamicResetInterval();
    -    private volatile double dynamicBadnessThreshold = DatabaseDescriptor.getDynamicBadnessThreshold();
    +    // Latency probe functionality for actively probing endpoints that we haven't measured
recently but are ranking
    +    public static final long MAX_PROBE_INTERVAL_MS = Long.getLong("cassandra.dynamic_snitch_max_probe_interval_ms",
60 * 10 * 1000L);
    +    public static final long MIN_PROBE_INTERVAL_MS = Long.getLong("cassandra.dynamic_snitch_min_probe_interval_ms",
60 * 1000L) ;
    +    // The probe rate is set later when configuration is read in applyConfigChanges
    +    protected static final RateLimiter probeRateLimiter = RateLimiter.create(1);
    +    protected static final DebuggableScheduledThreadPoolExecutor latencyProbeExecutor
= new DebuggableScheduledThreadPoolExecutor("LatencyProbes");
    +    private long lastUpdateSamplesNanos = System.nanoTime();
    +
    +    // User configuration of the snitch tunables
    +    protected volatile int dynamicUpdateInterval = -1;
    +    protected volatile int dynamicSampleUpdateInterval = -1;
    +    protected volatile double dynamicBadnessThreshold = 0;
     
         // the score for a merged set of endpoints must be this much worse than the score
for separate endpoints to
         // warrant not merging two ranges into a single range
         private static final double RANGE_MERGING_PREFERENCE = 1.5;
     
         private String mbeanName;
    -    private boolean registered = false;
    -
    -    private volatile HashMap<InetAddressAndPort, Double> scores = new HashMap<>();
    -    private final ConcurrentHashMap<InetAddressAndPort, ExponentiallyDecayingReservoir>
samples = new ConcurrentHashMap<>();
    +    private boolean mbeanRegistered = false;
     
         public final IEndpointSnitch subsnitch;
     
    -    private volatile ScheduledFuture<?> updateSchedular;
    -    private volatile ScheduledFuture<?> resetSchedular;
    -
    -    private final Runnable update;
    -    private final Runnable reset;
    +    private volatile ScheduledFuture<?> updateScoresScheduler = null;
    +    private volatile ScheduledFuture<?> updateSamplesScheduler = null;
     
         public DynamicEndpointSnitch(IEndpointSnitch snitch)
         {
             this(snitch, null);
         }
     
    -    public DynamicEndpointSnitch(IEndpointSnitch snitch, String instance)
    +    protected DynamicEndpointSnitch(IEndpointSnitch snitch, String instance)
         {
             mbeanName = "org.apache.cassandra.db:type=DynamicEndpointSnitch";
             if (instance != null)
                 mbeanName += ",instance=" + instance;
             subsnitch = snitch;
    -        update = new Runnable()
    +
    +        if (DatabaseDescriptor.isDaemonInitialized())
             {
    -            public void run()
    +            open();
    +        }
    +    }
    +
    +    /**
    +     * Update configuration of the background tasks and restart the various scheduler
tasks
    +     * if the configured rates for these tasks have changed.
    +     */
    +    public void applyConfigChanges(int newDynamicUpdateInternal, int newDynamicSampleUpdateInterval,
double newDynamicBadnessThreshold)
    +    {
    +        if (DatabaseDescriptor.isDaemonInitialized())
    +        {
    +            if (dynamicUpdateInterval != newDynamicUpdateInternal || updateScoresScheduler
== null)
                 {
    -                updateScores();
    +                cancelAndWait(updateScoresScheduler, Math.max(1, dynamicUpdateInterval),
TimeUnit.MILLISECONDS);
    +                updateScoresScheduler = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(this::updateScores,
newDynamicUpdateInternal, newDynamicUpdateInternal, TimeUnit.MILLISECONDS);
                 }
    -        };
    -        reset = new Runnable()
    +
    +            if (dynamicSampleUpdateInterval != newDynamicSampleUpdateInterval || updateSamplesScheduler
== null)
    +            {
    +                cancelAndWait(updateSamplesScheduler, Math.max(1, dynamicSampleUpdateInterval),
TimeUnit.MILLISECONDS);
    +                if (newDynamicSampleUpdateInterval > 0)
    +                    updateSamplesScheduler = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(this::updateSamples,
newDynamicSampleUpdateInterval, newDynamicSampleUpdateInterval, TimeUnit.MILLISECONDS);
    +            }
    +        }
    +
    +        dynamicUpdateInterval = newDynamicUpdateInternal;
    +        dynamicSampleUpdateInterval = newDynamicSampleUpdateInterval;
    +        dynamicBadnessThreshold = newDynamicBadnessThreshold;
    +
    +        if (dynamicSampleUpdateInterval > 0)
    +            probeRateLimiter.setRate(dynamicSampleUpdateInterval);
    +    }
    +
    +    public synchronized void open()
    +    {
    +        applyConfigChanges(DatabaseDescriptor.getDynamicUpdateInterval(),
    +                           DatabaseDescriptor.getDynamicSampleUpdateInterval(),
    +                           DatabaseDescriptor.getDynamicBadnessThreshold());
    +
    +        MBeanWrapper.instance.registerMBean(this, mbeanName);
    +        mbeanRegistered = true;
    +    }
    +
    +    public synchronized void close()
    +    {
    +        cancelAndWait(updateScoresScheduler, Math.max(1, dynamicUpdateInterval), TimeUnit.MILLISECONDS);
    +        cancelAndWait(updateSamplesScheduler, Math.max(1, dynamicSampleUpdateInterval),
TimeUnit.MILLISECONDS);
    +        updateScoresScheduler = null;
    +        updateSamplesScheduler = null;
    +
    +        for (AnnotatedMeasurement measurement : samples.values())
             {
    -            public void run()
    +            cancelAndWait(measurement.probeFuture, PING.getTimeout(), TimeUnit.MILLISECONDS);
    --- End diff --
    
    Why is the PING timeout the appropriate wait time here?
    
    Also, I think this should first cancel all of the probe futures and then wait on them, rather than cancelling and waiting on each one in turn.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


Mime
View raw message