DRILL-6197: Skip duplicate entry for OperatorStats
org.apache.drill.exec.ops.FragmentStats should skip injecting the org.apache.drill.exec.ops.OperatorStats
instance for these operators:
org.apache.drill.exec.proto.beans.CoreOperatorType.SCREEN
org.apache.drill.exec.proto.beans.CoreOperatorType.SINGLE_SENDER
org.apache.drill.exec.proto.beans.CoreOperatorType.BROADCAST_SENDER
org.apache.drill.exec.proto.beans.CoreOperatorType.HASH_PARTITION_SENDER
closes #1141
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/6af651fc
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/6af651fc
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/6af651fc
Branch: refs/heads/master
Commit: 6af651fcde8059dbf557a1f2f897557425fb950d
Parents: 161a046
Author: Kunal Khatua <kkhatua@maprtech.com>
Authored: Wed Feb 28 14:07:27 2018 -0800
Committer: Arina Ielchiieva <arina.yelchiyeva@gmail.com>
Committed: Sat Mar 3 19:47:48 2018 +0200
----------------------------------------------------------------------
.../apache/drill/exec/ops/FragmentStats.java | 20 +++++++++++---------
.../drill/exec/physical/impl/BaseRootExec.java | 16 ++++++++--------
2 files changed, 19 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/6af651fc/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java
index a173073..cdad6e4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java
@@ -17,21 +17,22 @@
*/
package org.apache.drill.exec.ops;
-import java.util.List;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
-import com.google.common.collect.Lists;
-
/**
* Holds statistics of a particular (minor) fragment.
*/
public class FragmentStats {
// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentStats.class);
- private List<OperatorStats> operators = Lists.newArrayList();
+ private Map<ImmutablePair<Integer, Integer>, OperatorStats> operators = new
LinkedHashMap<>();
private final long startTime;
private final DrillbitEndpoint endpoint;
private final BufferAllocator allocator;
@@ -47,8 +48,8 @@ public class FragmentStats {
prfB.setMaxMemoryUsed(allocator.getPeakMemoryAllocation());
prfB.setEndTime(System.currentTimeMillis());
prfB.setEndpoint(endpoint);
- for(OperatorStats o : operators){
- prfB.addOperatorProfile(o.getProfile());
+ for(Entry<ImmutablePair<Integer, Integer>, OperatorStats> o : operators.entrySet()){
+ prfB.addOperatorProfile(o.getValue().getProfile());
}
}
@@ -62,13 +63,14 @@ public class FragmentStats {
public OperatorStats newOperatorStats(final OpProfileDef profileDef, final BufferAllocator
allocator) {
final OperatorStats stats = new OperatorStats(profileDef, allocator);
if(profileDef.operatorType != -1) {
- operators.add(stats);
+ @SuppressWarnings("unused")
+ OperatorStats existingStatsHolder = addOperatorStats(stats);
}
return stats;
}
- public void addOperatorStats(OperatorStats stats) {
- operators.add(stats);
+ public OperatorStats addOperatorStats(OperatorStats stats) {
+ return operators.put(new ImmutablePair<>(stats.operatorId, stats.operatorType),
stats);
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/6af651fc/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
index 82887ec..bf52d04 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
@@ -43,20 +43,20 @@ public abstract class BaseRootExec implements RootExec {
private List<CloseableRecordBatch> operators;
public BaseRootExec(final RootFragmentContext fragmentContext, final PhysicalOperator config)
throws OutOfMemoryException {
- this.oContext = fragmentContext.newOperatorContext(config, stats);
- stats = new OperatorStats(new OpProfileDef(config.getOperatorId(),
- config.getOperatorType(), OperatorUtilities.getChildCount(config)),
- oContext.getAllocator());
- fragmentContext.getStats().addOperatorStats(this.stats);
- this.fragmentContext = fragmentContext;
+ this(fragmentContext, null, config);
}
public BaseRootExec(final RootFragmentContext fragmentContext, final OperatorContext oContext,
final PhysicalOperator config) throws OutOfMemoryException {
- this.oContext = oContext;
+ if (oContext == null) {
+ this.oContext = fragmentContext.newOperatorContext(config, stats);
+ } else {
+ this.oContext = oContext;
+ }
+ //Creating new stat for appending to list
stats = new OperatorStats(new OpProfileDef(config.getOperatorId(),
config.getOperatorType(), OperatorUtilities.getChildCount(config)),
- oContext.getAllocator());
+ this.oContext.getAllocator());
fragmentContext.getStats().addOperatorStats(this.stats);
this.fragmentContext = fragmentContext;
}
|