Repository: cassandra
Updated Branches:
refs/heads/cassandra-3.0 425bfaf20 -> 62ffa355e
Provide additional metrics for materialized views
patch by clnwsu; reviewed by carlyeks for CASSANDRA-10323
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/62ffa355
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/62ffa355
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/62ffa355
Branch: refs/heads/cassandra-3.0
Commit: 62ffa355ee0431c87ee9ec665c35f881bcad4c74
Parents: 425bfaf
Author: Chris Lohfink <Chris.Lohfink@datastax.com>
Authored: Mon Sep 28 16:45:02 2015 -0500
Committer: Sylvain Lebresne <sylvain@datastax.com>
Committed: Fri Oct 2 17:09:25 2015 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/Keyspace.java | 27 +++++++---
src/java/org/apache/cassandra/db/Mutation.java | 6 ++-
src/java/org/apache/cassandra/db/view/View.java | 3 ++
.../apache/cassandra/db/view/ViewBuilder.java | 4 +-
.../apache/cassandra/db/view/ViewManager.java | 19 ++-----
.../cassandra/metrics/KeyspaceMetrics.java | 8 ++-
.../apache/cassandra/metrics/TableMetrics.java | 57 ++++++++++++++++++--
.../cassandra/metrics/ViewWriteMetrics.java | 18 ++++++-
.../apache/cassandra/service/StorageProxy.java | 22 +++++---
10 files changed, 128 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffa355/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5fddce9..5bf70ca 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0
+ * Provide additional metrics for materialized views (CASSANDRA-10323)
* Flush system schema tables after local schema changes (CASSANDRA-10429)
Merged from 2.2:
* cqlsh prompt includes name of keyspace after failed `use` statement (CASSANDRA-10369)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffa355/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index cf34e9a..293f8a3 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -20,9 +20,8 @@ package org.apache.cassandra.db;
import java.io.File;
import java.io.IOException;
import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Future;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import com.google.common.base.Function;
@@ -30,10 +29,7 @@ import com.google.common.collect.Iterables;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Config;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.config.*;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.compaction.CompactionManager;
@@ -420,6 +416,7 @@ public class Keyspace
if (requiresViewUpdate)
{
+ mutation.viewLockAcquireStart.compareAndSet(0L, System.currentTimeMillis());
lock = ViewManager.acquireLockFor(mutation.key().getKey());
if (lock == null)
@@ -444,6 +441,17 @@ public class Keyspace
return;
}
}
+ else
+ {
+ long acquireTime = System.currentTimeMillis() - mutation.viewLockAcquireStart.get();
+ if (!isClReplay)
+ {
+ for(UUID cfid : mutation.getColumnFamilyIds())
+ {
+ columnFamilyStores.get(cfid).metric.viewLockAcquireTime.update(acquireTime,
TimeUnit.MILLISECONDS);
+ }
+ }
+ }
}
int nowInSec = FBUtilities.nowInSeconds();
try (OpOrder.Group opGroup = writeOrder.start())
@@ -464,13 +472,14 @@ public class Keyspace
logger.error("Attempting to mutate non-existant table {} ({}.{})", upd.metadata().cfId,
upd.metadata().ksName, upd.metadata().cfName);
continue;
}
+ AtomicLong baseComplete = new AtomicLong(Long.MAX_VALUE);
if (requiresViewUpdate)
{
try
{
Tracing.trace("Creating materialized view mutations from base table
replica");
- viewManager.pushViewReplicaUpdates(upd, !isClReplay);
+ viewManager.pushViewReplicaUpdates(upd, !isClReplay, baseComplete);
}
catch (Throwable t)
{
@@ -486,6 +495,8 @@ public class Keyspace
? cfs.indexManager.newUpdateTransaction(upd,
opGroup, nowInSec)
: UpdateTransaction.NO_OP;
cfs.apply(upd, indexTransaction, opGroup, replayPosition);
+ if (requiresViewUpdate)
+ baseComplete.set(System.currentTimeMillis());
}
}
finally
http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffa355/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java
index 7696e04..cbc7e17 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -20,13 +20,14 @@ package org.apache.cassandra.db;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.SerializationHelper;
-import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
@@ -56,6 +57,9 @@ public class Mutation implements IMutation
// Time at which this mutation was instantiated
public final long createdAt = System.currentTimeMillis();
+ // keep track of when mutation has started waiting for a MV partition lock
+ public final AtomicLong viewLockAcquireStart = new AtomicLong(0);
+
public Mutation(String keyspaceName, DecoratedKey key)
{
this(keyspaceName, key, new HashMap<>());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffa355/src/java/org/apache/cassandra/db/view/View.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/View.java b/src/java/org/apache/cassandra/db/view/View.java
index 0a7f747..87ea2ec 100644
--- a/src/java/org/apache/cassandra/db/view/View.java
+++ b/src/java/org/apache/cassandra/db/view/View.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.view;
import java.nio.ByteBuffer;
import java.util.*;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
@@ -527,6 +528,7 @@ public class View
*/
private void readLocalRows(TemporalRow.Set rowSet)
{
+ long start = System.currentTimeMillis();
SinglePartitionSliceBuilder builder = new SinglePartitionSliceBuilder(baseCfs, rowSet.dk);
for (TemporalRow temporalRow : rowSet)
@@ -551,6 +553,7 @@ public class View
}
}
}
+ baseCfs.metric.viewReadTime.update(System.currentTimeMillis() - start, TimeUnit.MILLISECONDS);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffa355/src/java/org/apache/cassandra/db/view/ViewBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilder.java b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
index 0a0fe08..35b023b 100644
--- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java
+++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.db.view;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import com.google.common.base.Function;
@@ -70,6 +71,7 @@ public class ViewBuilder extends CompactionInfo.Holder
private void buildKey(DecoratedKey key)
{
+ AtomicLong noBase = new AtomicLong(Long.MAX_VALUE);
ReadQuery selectQuery = view.getReadQuery();
if (!selectQuery.selectsKey(key))
return;
@@ -92,7 +94,7 @@ public class ViewBuilder extends CompactionInfo.Holder
Collection<Mutation> mutations = view.createMutations(partition,
temporalRows, true);
if (mutations != null)
- StorageProxy.mutateMV(key.getKey(), mutations, true);
+ StorageProxy.mutateMV(key.getKey(), mutations, true, noBase);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffa355/src/java/org/apache/cassandra/db/view/ViewManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewManager.java b/src/java/org/apache/cassandra/db/view/ViewManager.java
index 2364ed1..efadd72 100644
--- a/src/java/org/apache/cassandra/db/view/ViewManager.java
+++ b/src/java/org/apache/cassandra/db/view/ViewManager.java
@@ -18,15 +18,10 @@
package org.apache.cassandra.db.view;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import com.google.common.collect.Lists;
@@ -34,11 +29,7 @@ import com.google.common.util.concurrent.Striped;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.ViewDefinition;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.IMutation;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.*;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.dht.Token;
@@ -128,7 +119,7 @@ public class ViewManager
* Calculates and pushes updates to the views replicas. The replicas are determined by
* {@link ViewUtils#getViewNaturalEndpoint(String, Token, Token)}.
*/
- public void pushViewReplicaUpdates(PartitionUpdate update, boolean writeCommitLog)
+ public void pushViewReplicaUpdates(PartitionUpdate update, boolean writeCommitLog, AtomicLong
baseComplete)
{
List<Mutation> mutations = null;
TemporalRow.Set temporalRows = null;
@@ -146,7 +137,7 @@ public class ViewManager
}
if (mutations != null)
- StorageProxy.mutateMV(update.partitionKey().getKey(), mutations, writeCommitLog);
+ StorageProxy.mutateMV(update.partitionKey().getKey(), mutations, writeCommitLog,
baseComplete);
}
public boolean updatesAffectView(Collection<? extends IMutation> mutations, boolean
coordinatorBatchlog)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffa355/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
index 9c886b0..62add07 100644
--- a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
@@ -21,6 +21,7 @@ import java.util.Set;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Timer;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
@@ -29,7 +30,6 @@ import com.google.common.collect.Sets;
import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
-
/**
* Metrics for {@link ColumnFamilyStore}.
*/
@@ -81,6 +81,10 @@ public class KeyspaceMetrics
public final Histogram liveScannedHistogram;
/** Column update time delta on this Keyspace */
public final Histogram colUpdateTimeDeltaHistogram;
+ /** time taken acquiring the partition lock for materialized view updates on this keyspace
*/
+ public final Timer viewLockAcquireTime;
+ /** time taken during the local read of a materialized view update */
+ public final Timer viewReadTime;
/** CAS Prepare metric */
public final LatencyMetrics casPrepare;
/** CAS Propose metrics */
@@ -224,6 +228,8 @@ public class KeyspaceMetrics
tombstoneScannedHistogram = Metrics.histogram(factory.createMetricName("TombstoneScannedHistogram"));
liveScannedHistogram = Metrics.histogram(factory.createMetricName("LiveScannedHistogram"));
colUpdateTimeDeltaHistogram = Metrics.histogram(factory.createMetricName("ColUpdateTimeDeltaHistogram"));
+ viewLockAcquireTime = Metrics.timer(factory.createMetricName("ViewLockAcquireTime"));
+ viewReadTime = Metrics.timer(factory.createMetricName("ViewReadTime"));
// add manually since histograms do not use createKeyspaceGauge method
allMetrics.addAll(Lists.newArrayList("SSTablesPerReadHistogram", "TombstoneScannedHistogram",
"LiveScannedHistogram"));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffa355/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index 124b8ca..3cd5b5b 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -20,16 +20,15 @@ package org.apache.cassandra.metrics;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
-
-import com.google.common.collect.Maps;
+import java.util.concurrent.TimeUnit;
import com.codahale.metrics.*;
import com.codahale.metrics.Timer;
+import com.google.common.collect.Maps;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Memtable;
import org.apache.cassandra.db.lifecycle.SSTableSet;
-import org.apache.cassandra.db.lifecycle.View;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.utils.EstimatedHistogram;
@@ -37,7 +36,6 @@ import org.apache.cassandra.utils.TopKSampler;
import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
-
/**
* Metrics for {@link ColumnFamilyStore}.
*/
@@ -116,6 +114,10 @@ public class TableMetrics
public final TableHistogram liveScannedHistogram;
/** Column update time delta on this CF */
public final TableHistogram colUpdateTimeDeltaHistogram;
+ /** time taken acquiring the partition lock for materialized view updates for this table
*/
+ public final TableTimer viewLockAcquireTime;
+ /** time taken during the local read of a materialized view update */
+ public final TableTimer viewReadTime;
/** Disk space used by snapshot files which */
public final Gauge<Long> trueSnapshotsSize;
/** Row cache hits, but result out of range */
@@ -615,6 +617,19 @@ public class TableMetrics
coordinatorScanLatency = Metrics.timer(factory.createMetricName("CoordinatorScanLatency"));
waitingOnFreeMemtableSpace = Metrics.histogram(factory.createMetricName("WaitingOnFreeMemtableSpace"));
+ // We do not want to capture view mutation specific metrics for a view
+ // They only makes sense to capture on the base table
+ if (cfs.metadata.isView())
+ {
+ viewLockAcquireTime = null;
+ viewReadTime = null;
+ }
+ else
+ {
+ viewLockAcquireTime = createTableTimer("ViewLockAcquireTime", cfs.keyspace.metric.viewLockAcquireTime);
+ viewReadTime = createTableTimer("ViewReadTime", cfs.keyspace.metric.viewReadTime);
+ }
+
trueSnapshotsSize = createTableGauge("SnapshotsSize", new Gauge<Long>()
{
public Long getValue()
@@ -751,6 +766,21 @@ public class TableMetrics
globalAliasFactory.createMetricName(alias)));
}
+ protected TableTimer createTableTimer(String name, Timer keyspaceTimer)
+ {
+ return createTableTimer(name, name, keyspaceTimer);
+ }
+
+ protected TableTimer createTableTimer(String name, String alias, Timer keyspaceTimer)
+ {
+ Timer cfTimer = Metrics.timer(factory.createMetricName(name), aliasFactory.createMetricName(alias));
+ register(name, alias, cfTimer);
+ return new TableTimer(cfTimer,
+ keyspaceTimer,
+ Metrics.timer(globalFactory.createMetricName(name),
+ globalAliasFactory.createMetricName(alias)));
+ }
+
/**
* Registers a metric to be removed when unloading CF.
* @return true if first time metric with that name has been registered
@@ -782,6 +812,25 @@ public class TableMetrics
}
}
+ public static class TableTimer
+ {
+ public final Timer[] all;
+ public final Timer cf;
+ private TableTimer(Timer cf, Timer keyspace, Timer global)
+ {
+ this.cf = cf;
+ this.all = new Timer[]{cf, keyspace, global};
+ }
+
+ public void update(long i, TimeUnit unit)
+ {
+ for(Timer timer : all)
+ {
+ timer.update(i, unit);
+ }
+ }
+ }
+
static class TableMetricNameFactory implements MetricNameFactory
{
private final String keyspaceName;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffa355/src/java/org/apache/cassandra/metrics/ViewWriteMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ViewWriteMetrics.java b/src/java/org/apache/cassandra/metrics/ViewWriteMetrics.java
index c99cc5c..df98865 100644
--- a/src/java/org/apache/cassandra/metrics/ViewWriteMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ViewWriteMetrics.java
@@ -18,19 +18,31 @@
package org.apache.cassandra.metrics;
-import com.codahale.metrics.Counter;
-
import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Timer;
+import com.codahale.metrics.Gauge;
+
public class ViewWriteMetrics extends ClientRequestMetrics
{
public final Counter viewReplicasAttempted;
public final Counter viewReplicasSuccess;
+ // time between when mutation is applied to local memtable to when CL.ONE is achieved
on MV
+ public final Timer viewWriteLatency;
public ViewWriteMetrics(String scope) {
super(scope);
viewReplicasAttempted = Metrics.counter(factory.createMetricName("ViewReplicasAttempted"));
viewReplicasSuccess = Metrics.counter(factory.createMetricName("ViewReplicasSuccess"));
+ viewWriteLatency = Metrics.timer(factory.createMetricName("ViewWriteLatency"));
+ Metrics.register(factory.createMetricName("ViewPendingMutations"), new Gauge<Long>()
+ {
+ public Long getValue()
+ {
+ return viewReplicasAttempted.getCount() - viewReplicasSuccess.getCount();
+ }
+ });
}
public void release()
@@ -38,5 +50,7 @@ public class ViewWriteMetrics extends ClientRequestMetrics
super.release();
Metrics.remove(factory.createMetricName("ViewReplicasAttempted"));
Metrics.remove(factory.createMetricName("ViewReplicasSuccess"));
+ Metrics.remove(factory.createMetricName("ViewWriteLatency"));
+ Metrics.remove(factory.createMetricName("ViewPendingMutations"));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffa355/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 5c94f08..d1142fc 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -35,20 +36,19 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.batchlog.Batch;
+import org.apache.cassandra.batchlog.BatchlogManager;
+import org.apache.cassandra.batchlog.LegacyBatchlogMigrator;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.HintedHandOffManager;
-import org.apache.cassandra.batchlog.*;
-import org.apache.cassandra.batchlog.LegacyBatchlogMigrator;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.rows.RowIterator;
-import org.apache.cassandra.db.view.ViewManager;
import org.apache.cassandra.db.view.ViewUtils;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.exceptions.*;
@@ -650,8 +650,10 @@ public class StorageProxy implements StorageProxyMBean
* across all replicas.
*
* @param mutations the mutations to be applied across the replicas
+ * @param writeCommitLog if commitlog should be written
+ * @param baseComplete time from epoch in ms that the local base mutation was(or will
be) completed
*/
- public static void mutateMV(ByteBuffer dataKey, Collection<Mutation> mutations,
boolean writeCommitLog)
+ public static void mutateMV(ByteBuffer dataKey, Collection<Mutation> mutations,
boolean writeCommitLog, AtomicLong baseComplete)
throws UnavailableException, OverloadedException, WriteTimeoutException
{
Tracing.trace("Determining replicas for mutation");
@@ -684,13 +686,17 @@ public class StorageProxy implements StorageProxyMBean
consistencyLevel,
consistencyLevel,
naturalEndpoints,
+ baseComplete,
WriteType.BATCH,
cleanup);
// When local node is the endpoint and there are no pending nodes we can
// Just apply the mutation locally.
if (pairedEndpoint.equals(FBUtilities.getBroadcastAddress()) && wrapper.handler.pendingEndpoints.isEmpty())
+ {
mutation.apply(writeCommitLog);
+ viewWriteMetrics.viewReplicasSuccess.inc();
+ }
else
wrappers.add(wrapper);
}
@@ -980,6 +986,7 @@ public class StorageProxy implements StorageProxyMBean
ConsistencyLevel
consistency_level,
ConsistencyLevel
batchConsistencyLevel,
List<InetAddress>
naturalEndpoints,
+ AtomicLong baseComplete,
WriteType writeType,
BatchlogResponseHandler.BatchlogCleanup
cleanup)
{
@@ -988,7 +995,10 @@ public class StorageProxy implements StorageProxyMBean
String keyspaceName = mutation.getKeyspaceName();
Token tk = mutation.key().getToken();
Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk,
keyspaceName);
- AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(naturalEndpoints,
pendingEndpoints, consistency_level, null, writeType);
+ AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(naturalEndpoints,
pendingEndpoints, consistency_level, () -> {
+ long delay = Math.max(0, System.currentTimeMillis() - baseComplete.get());
+ viewWriteMetrics.viewWriteLatency.update(delay, TimeUnit.MILLISECONDS);
+ }, writeType);
BatchlogResponseHandler<IMutation> batchHandler = new ViewWriteMetricsWrapped(writeHandler,
batchConsistencyLevel.blockFor(keyspace), cleanup);
return new WriteResponseHandlerWrapper(batchHandler, mutation);
}
|