drill-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] Ben-Zvi closed pull request #1444: DRILL-6709: Extended the batch stats utility to other operators
Date Sun, 09 Sep 2018 05:05:42 GMT
Ben-Zvi closed pull request #1444: DRILL-6709: Extended the batch stats utility to other operators
URL: https://github.com/apache/drill/pull/1444
 
 
   

This pull request was merged from a forked repository.
Because GitHub hides the original diff of a fork-based (foreign) pull
request once it has been merged, the diff is reproduced below for the
sake of provenance:

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index cda98fc7a51..cd24a4ca592 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -48,6 +48,7 @@
 import org.apache.drill.exec.testing.ControlsInjectorFactory;
 import org.apache.drill.exec.util.CallBack;
 import org.apache.drill.exec.util.record.RecordBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
 import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContext;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.NullableVarCharVector;
@@ -302,7 +303,7 @@ private void logRecordBatchStats() {
       return; // NOOP
     }
 
-    RecordBatchStats.logRecordBatchStats(getFQNForLogging(MAX_FQN_LENGTH), this, batchStatsContext);
+    RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, getFQNForLogging(MAX_FQN_LENGTH), this, batchStatsContext);
   }
 
   /** Might truncate the FQN if too long */
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index 28f8263d1f3..9de9aae06b3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -60,6 +60,8 @@
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.util.record.RecordBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.ValueVector;
@@ -159,9 +161,7 @@ public void update(RecordBatch incomingRecordBatch) {
       }
 
       updateIncomingStats();
-      if (logger.isDebugEnabled()) {
-        logger.debug("BATCH_STATS, incoming: {}", getRecordBatchSizer());
-      }
+      RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT, getRecordBatchSizer(), getRecordBatchStatsContext());
     }
   }
 
@@ -204,7 +204,9 @@ public HashAggBatch(HashAggregate popConfig, RecordBatch incoming, FragmentConte
     }
 
     hashAggMemoryManager = new HashAggMemoryManager(configuredBatchSize);
-    logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
+
+      RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+        "configured output batch size: %d", configuredBatchSize);
 
     columnMapping = CaseInsensitiveMap.newHashMap();
   }
@@ -474,15 +476,15 @@ private void updateStats() {
     stats.setLongStat(HashAggTemplate.Metric.AVG_OUTPUT_ROW_BYTES, hashAggMemoryManager.getAvgOutputRowWidth());
     stats.setLongStat(HashAggTemplate.Metric.OUTPUT_RECORD_COUNT, hashAggMemoryManager.getTotalOutputRecords());
 
-    if (logger.isDebugEnabled()) {
-      logger.debug("BATCH_STATS, incoming aggregate: count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
-        hashAggMemoryManager.getNumIncomingBatches(), hashAggMemoryManager.getAvgInputBatchSize(),
-        hashAggMemoryManager.getAvgInputRowWidth(), hashAggMemoryManager.getTotalInputRecords());
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "incoming aggregate: count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d",
+      hashAggMemoryManager.getNumIncomingBatches(), hashAggMemoryManager.getAvgInputBatchSize(),
+      hashAggMemoryManager.getAvgInputRowWidth(), hashAggMemoryManager.getTotalInputRecords());
 
-      logger.debug("BATCH_STATS, outgoing aggregate: count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
-        hashAggMemoryManager.getNumOutgoingBatches(), hashAggMemoryManager.getAvgOutputBatchSize(),
-        hashAggMemoryManager.getAvgOutputRowWidth(), hashAggMemoryManager.getTotalOutputRecords());
-    }
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "outgoing aggregate: count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d",
+      hashAggMemoryManager.getNumOutgoingBatches(), hashAggMemoryManager.getAvgOutputBatchSize(),
+      hashAggMemoryManager.getAvgOutputRowWidth(), hashAggMemoryManager.getTotalOutputRecords());
   }
   @Override
   public void close() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 65ca82972a0..e8ae30e6fe8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -70,7 +70,8 @@
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.WritableBatch;
-
+import org.apache.drill.exec.util.record.RecordBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
 import org.apache.drill.exec.vector.AllocationHelper;
 
 import org.apache.drill.exec.vector.FixedWidthVector;
@@ -1223,10 +1224,7 @@ public AggIterOutcome outputCurrentBatch() {
     }
 
     outgoing.getRecordBatchMemoryManager().updateOutgoingStats(numOutputRecords);
-
-    if (logger.isDebugEnabled()) {
-      logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(outgoing));
-    }
+    RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, outgoing, outgoing.getRecordBatchStatsContext());
 
     this.outcome = IterOutcome.OK;
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index d634fb24ad0..1623319c398 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -52,6 +52,8 @@
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.util.record.RecordBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.RepeatedMapVector;
 import org.apache.drill.exec.vector.complex.RepeatedValueVector;
@@ -157,7 +159,7 @@ public void update() {
       // i.e. all rows fit within memory budget.
       setOutputRowCount(Math.min(columnSize.getElementCount(), getOutputRowCount()));
 
-      logger.debug("BATCH_STATS, incoming: {}", getRecordBatchSizer());
+      RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT, getRecordBatchSizer(), getRecordBatchStatsContext());
 
       updateIncomingStats();
     }
@@ -170,7 +172,8 @@ public FlattenRecordBatch(FlattenPOP pop, RecordBatch incoming, FragmentContext
     int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
     flattenMemoryManager = new FlattenMemoryManager(configuredBatchSize);
 
-    logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
+      RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+        "configured output batch size: %d", configuredBatchSize);
   }
 
   @Override
@@ -261,10 +264,7 @@ protected IterOutcome doWork() {
     }
 
     flattenMemoryManager.updateOutgoingStats(outputRecords);
-
-    if (logger.isDebugEnabled()) {
-      logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
-    }
+    RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext());
 
     // Get the final outcome based on hasRemainder since that will determine if all the incoming records were
     // consumed in current output batch or not
@@ -516,15 +516,15 @@ private void updateStats() {
     stats.setLongStat(Metric.AVG_OUTPUT_ROW_BYTES, flattenMemoryManager.getAvgOutputRowWidth());
     stats.setLongStat(Metric.OUTPUT_RECORD_COUNT, flattenMemoryManager.getTotalOutputRecords());
 
-    if (logger.isDebugEnabled()) {
-      logger.debug("BATCH_STATS, incoming aggregate: count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
-        flattenMemoryManager.getNumIncomingBatches(), flattenMemoryManager.getAvgInputBatchSize(),
-        flattenMemoryManager.getAvgInputRowWidth(), flattenMemoryManager.getTotalInputRecords());
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "incoming aggregate: count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d",
+      flattenMemoryManager.getNumIncomingBatches(), flattenMemoryManager.getAvgInputBatchSize(),
+      flattenMemoryManager.getAvgInputRowWidth(), flattenMemoryManager.getTotalInputRecords());
 
-      logger.debug("BATCH_STATS, outgoing aggregate: count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
-        flattenMemoryManager.getNumOutgoingBatches(), flattenMemoryManager.getAvgOutputBatchSize(),
-        flattenMemoryManager.getAvgOutputRowWidth(), flattenMemoryManager.getTotalOutputRecords());
-    }
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "outgoing aggregate: count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d",
+      flattenMemoryManager.getNumOutgoingBatches(), flattenMemoryManager.getAvgOutputBatchSize(),
+      flattenMemoryManager.getAvgOutputRowWidth(), flattenMemoryManager.getTotalOutputRecords());
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index dc40b24bc76..368bb5dc91b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -68,9 +68,10 @@
 import org.apache.drill.exec.record.JoinBatchMemoryManager;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.RecordBatchSizer;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.util.record.RecordBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
 import org.apache.drill.exec.vector.IntVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractContainerVector;
@@ -291,7 +292,7 @@ private void prefetchFirstBuildBatch() {
       buildBatch,
       () -> {
         batchMemoryManager.update(RIGHT_INDEX, 0, true);
-        logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
+        RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_RIGHT, batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX), getRecordBatchStatsContext());
       });
   }
 
@@ -306,7 +307,7 @@ private void prefetchFirstProbeBatch() {
       probeBatch,
       () -> {
         batchMemoryManager.update(LEFT_INDEX, 0);
-        logger.debug("BATCH_STATS, incoming left: {}", batchMemoryManager.getRecordBatchSizer(LEFT_INDEX));
+        RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_LEFT, batchMemoryManager.getRecordBatchSizer(LEFT_INDEX), getRecordBatchStatsContext());
       });
   }
 
@@ -488,9 +489,7 @@ public IterOutcome innerNext() {
           container.setRecordCount(outputRecords);
 
           batchMemoryManager.updateOutgoingStats(outputRecords);
-          if (logger.isDebugEnabled()) {
-            logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
-          }
+          RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext());
 
           /* We are here because of one the following
            * 1. Completed processing of all the records and we are done
@@ -1121,12 +1120,17 @@ public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context,
     final int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
     final double avail_mem_factor = (double) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR_VALIDATOR);
     int outputBatchSize = Math.min(configuredBatchSize, Integer.highestOneBit((int)(allocator.getLimit() * avail_mem_factor)));
-    logger.debug("BATCH_STATS, configured output batch size: {}, allocated memory {}, avail mem factor {}, output batch size: {}",
-      configuredBatchSize, allocator.getLimit(), avail_mem_factor, outputBatchSize);
 
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "configured output batch size: %d, allocated memory %d, avail mem factor %f, output batch size: %d",
+      configuredBatchSize, allocator.getLimit(), avail_mem_factor, outputBatchSize);
 
     batchMemoryManager = new JoinBatchMemoryManager(outputBatchSize, left, right, new HashSet<>());
-    logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
+
+
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "configured output batch size: %d", configuredBatchSize);
+
     enableRuntimeFilter = context.getOptions().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER);
   }
 
@@ -1242,19 +1246,20 @@ public void close() {
 
     updateMetrics();
 
-    if (logger.isDebugEnabled()) {
-      logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
-        batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
-        batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "incoming aggregate left: batch count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d",
+      batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
+      batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
 
-      logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
-        batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
-        batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "incoming aggregate right: batch count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d",
+      batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
+      batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
 
-      logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
-        batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
-        batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
-    }
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "outgoing aggregate: batch count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d",
+      batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
+      batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
 
     this.cleanup();
     super.close();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
index 5b7fd9cc67f..1aaf5e2028f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
@@ -39,6 +39,8 @@
 import org.apache.drill.exec.record.SchemaBuilder;
 import org.apache.drill.exec.record.VectorAccessibleUtilities;
 import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.util.record.RecordBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
 import org.apache.drill.exec.vector.IntVector;
 import org.apache.drill.exec.vector.ValueVector;
 
@@ -240,25 +242,26 @@ public IterOutcome innerNext() {
   public void close() {
     updateBatchMemoryManagerStats();
 
-    if (logger.isDebugEnabled()) {
-      logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {},  avg row bytes : {}, " +
-        "record count : {}", batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
-        batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
-        batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
-        batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
-
-      logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {},  avg row bytes : {}, " +
-        "record count : {}", batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
-        batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
-        batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
-        batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
-
-      logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {},  avg row bytes : {}, " +
-        "record count : {}", batchMemoryManager.getNumOutgoingBatches(),
-        batchMemoryManager.getAvgOutputBatchSize(),
-        batchMemoryManager.getAvgOutputRowWidth(),
-        batchMemoryManager.getTotalOutputRecords());
-    }
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "incoming aggregate left: batch count : %d, avg bytes : %d,  avg row bytes : %d, " +
+      "record count : %d", batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
+      batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
+      batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
+      batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
+
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "incoming aggregate right: batch count : %d, avg bytes : %d,  avg row bytes : %d, " +
+      "record count : %d", batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
+      batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
+      batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
+      batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
+
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "outgoing aggregate: batch count : %d, avg bytes : %d,  avg row bytes : %d, " +
+      "record count : %d", batchMemoryManager.getNumOutgoingBatches(),
+      batchMemoryManager.getAvgOutputBatchSize(),
+      batchMemoryManager.getAvgOutputRowWidth(),
+      batchMemoryManager.getTotalOutputRecords());
 
     super.close();
   }
@@ -733,11 +736,10 @@ private void finalizeOutputContainer() {
 
     batchMemoryManager.updateOutgoingStats(outputIndex);
 
-    if (logger.isDebugEnabled()) {
-      logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this));
-      logger.debug("Number of records emitted: {} and Allocator Stats: [AllocatedMem: {}, PeakMem: {}]",
+    RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext());
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "Number of records emitted: %d and Allocator Stats: [AllocatedMem: %d, PeakMem: %d]",
         outputIndex, container.getAllocator().getAllocatedMemory(), container.getAllocator().getPeakMemoryAllocation());
-    }
 
     // Update the output index for next output batch to zero
     outputIndex = 0;
@@ -1182,10 +1184,11 @@ private void updateMemoryManager(int inputIndex) {
     // a new output batch with new incoming then it will not cause any problem since outputIndex will be 0
     final int newOutputRowCount = batchMemoryManager.update(inputIndex, outputIndex);
 
-    if (logger.isDebugEnabled()) {
-      logger.debug("BATCH_STATS, incoming {}:\n {}", inputIndex == LEFT_INDEX ? "left" : "right",
-        batchMemoryManager.getRecordBatchSizer(inputIndex));
-      logger.debug("Previous OutputRowCount: {}, New OutputRowCount: {}", maxOutputRowCount, newOutputRowCount);
+    if (isRecordBatchStatsLoggingEnabled()) {
+      RecordBatchIOType type = inputIndex == LEFT_INDEX ? RecordBatchIOType.INPUT_LEFT : RecordBatchIOType.INPUT_RIGHT;
+      RecordBatchStats.logRecordBatchStats(type, batchMemoryManager.getRecordBatchSizer(inputIndex), getRecordBatchStatsContext());
+      RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+        "Previous OutputRowCount: %d, New OutputRowCount: %d", maxOutputRowCount, newOutputRowCount);
     }
 
     if (useMemoryManager) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 7b14e03ec39..72f776a5505 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -58,6 +58,8 @@
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.util.record.RecordBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractContainerVector;
 
@@ -123,7 +125,8 @@
     @Override
     public void update(int inputIndex) {
       status.setTargetOutputRowCount(super.update(inputIndex, status.getOutPosition()));
-      logger.debug("BATCH_STATS, incoming {}: {}", inputIndex == 0 ? "left" : "right", getRecordBatchSizer(inputIndex));
+      RecordBatchIOType type = inputIndex == 0 ? RecordBatchIOType.INPUT_LEFT : RecordBatchIOType.INPUT_RIGHT;
+      RecordBatchStats.logRecordBatchStats(type, getRecordBatchSizer(inputIndex), getRecordBatchStatsContext());
     }
   }
 
@@ -134,7 +137,8 @@ protected MergeJoinBatch(MergeJoinPOP popConfig, FragmentContext context, Record
     final int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
     batchMemoryManager = new MergeJoinMemoryManager(configuredBatchSize, left, right);
 
-    logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "configured output batch size: %d", configuredBatchSize);
 
     if (popConfig.getConditions().size() == 0) {
       throw new UnsupportedOperationException("Merge Join currently does not support cartesian join.  This join operator was configured with 0 conditions");
@@ -271,10 +275,7 @@ private void setRecordCountInContainer() {
       vw.getValueVector().getMutator().setValueCount(getRecordCount());
     }
 
-    if (logger.isDebugEnabled()) {
-      logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
-    }
-
+    RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext());
     batchMemoryManager.updateOutgoingStats(getRecordCount());
   }
 
@@ -282,23 +283,24 @@ private void setRecordCountInContainer() {
   public void close() {
     updateBatchMemoryManagerStats();
 
-    if (logger.isDebugEnabled()) {
-      logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
-        batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
-        batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
-        batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
-        batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
-
-      logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
-        batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
-        batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
-        batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
-        batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
-
-      logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
-        batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
-        batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
-    }
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "incoming aggregate left: batch count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d",
+      batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
+      batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
+      batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
+      batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
+
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "incoming aggregate right: batch count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d",
+      batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
+      batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
+      batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
+      batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
+
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "outgoing aggregate: batch count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d",
+      batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
+      batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
 
     super.close();
     leftIterator.close();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
index 8054d7fef79..e2f93ecf245 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
@@ -52,8 +52,9 @@
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.util.record.RecordBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
 import org.apache.drill.exec.record.JoinBatchMemoryManager;
-import org.apache.drill.exec.record.RecordBatchSizer;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import com.sun.codemodel.JExpr;
 import com.sun.codemodel.JExpression;
@@ -133,7 +134,9 @@ protected NestedLoopJoinBatch(NestedLoopJoinPOP popConfig, FragmentContext conte
     // get the output batch size from config.
     int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
     batchMemoryManager = new JoinBatchMemoryManager(configuredBatchSize, left, right, new HashSet<>());
-    logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
+
+      RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+        "configured output batch size: %d", configuredBatchSize);
   }
 
   /**
@@ -168,7 +171,8 @@ public IterOutcome innerNext() {
           case OK:
             // For right side, use aggregate i.e. average row width across batches
             batchMemoryManager.update(RIGHT_INDEX, 0, true);
-            logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
+            RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_RIGHT,
+              batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX), getRecordBatchStatsContext());
             addBatchToHyperContainer(right);
             break;
           case OUT_OF_MEMORY:
@@ -202,10 +206,7 @@ public IterOutcome innerNext() {
     container.setRecordCount(outputRecords);
     container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
 
-    if (logger.isDebugEnabled()) {
-      logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
-    }
-
+    RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext());
     logger.debug("Number of records emitted: " + outputRecords);
 
     return (outputRecords > 0) ? IterOutcome.OK : IterOutcome.NONE;
@@ -357,7 +358,8 @@ protected void buildSchema() throws SchemaChangeException {
       }
 
       batchMemoryManager.update(RIGHT_INDEX, 0, true);
-      logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
+      RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_RIGHT,
+        batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX), getRecordBatchStatsContext());
 
       if (leftUpstream != IterOutcome.NONE) {
         leftSchema = left.getSchema();
@@ -395,7 +397,8 @@ protected void buildSchema() throws SchemaChangeException {
       }
 
       batchMemoryManager.update(LEFT_INDEX, 0);
-      logger.debug("BATCH_STATS, incoming left: {}", batchMemoryManager.getRecordBatchSizer(LEFT_INDEX));
+      RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_LEFT,
+        batchMemoryManager.getRecordBatchSizer(LEFT_INDEX), getRecordBatchStatsContext());
 
       container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
 
@@ -422,23 +425,24 @@ private void addBatchToHyperContainer(RecordBatch inputBatch) {
   public void close() {
     updateBatchMemoryManagerStats();
 
-    if (logger.isDebugEnabled()) {
-      logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "incoming aggregate left: batch count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d",
               batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
               batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
               batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
               batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
 
-      logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "incoming aggregate right: batch count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d",
               batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
               batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
               batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
               batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
 
-      logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "outgoing aggregate: batch count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d",
               batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
               batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
-    }
 
     rightContainer.clear();
     rightCounts.clear();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
index adf681b58e9..3b8ab8d2e23 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
@@ -24,7 +24,8 @@
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
-
+import org.apache.drill.exec.util.record.RecordBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
 import javax.inject.Named;
 import java.util.LinkedList;
 import java.util.List;
@@ -192,7 +193,9 @@ private void resetAndGetNextLeft(int outputIndex) {
         break;
       case OK:
         setTargetOutputCount(outgoing.getBatchMemoryManager().update(left, LEFT_INDEX,outputIndex));
-        logger.debug("BATCH_STATS, incoming left: {}", outgoing.getBatchMemoryManager().getRecordBatchSizer(LEFT_INDEX));
+        RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_LEFT,
+          outgoing.getBatchMemoryManager().getRecordBatchSizer(LEFT_INDEX),
+          outgoing.getRecordBatchStatsContext());
         leftRecordCount = left.getRecordCount();
         break;
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
index 6b273d86365..81e54db905c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
@@ -28,6 +28,8 @@
 import org.apache.drill.exec.record.RecordBatchMemoryManager;
 import org.apache.drill.exec.record.RecordBatchSizer;
 import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.util.record.RecordBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
 import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.NullableVector;
 import org.apache.drill.exec.vector.ValueVector;
@@ -136,7 +138,6 @@ private void setOutgoingBatch(ProjectRecordBatch outgoingBatch) {
     public ProjectMemoryManager(int configuredOutputSize) {
         super(configuredOutputSize);
         outputColumnSizes = new HashMap<>();
-        logger.debug("BATCH_STATS, configuredOutputSize: {}", configuredOutputSize);
     }
 
     public boolean isComplex(MajorType majorType) {
@@ -253,6 +254,9 @@ public void init(RecordBatch incomingBatch, ProjectRecordBatch outgoingBatch) {
         setIncomingBatch(incomingBatch);
         setOutgoingBatch(outgoingBatch);
         reset();
+
+        RecordBatchStats.logRecordBatchStats(outgoingBatch.getRecordBatchStatsContext(),
+          "configuredOutputSize: %d", getOutputBatchSize());
     }
 
     private void reset() {
@@ -321,7 +325,7 @@ public void update() {
                     rowWidth, totalFixedWidthColumnWidth, totalVariableColumnWidth, totalComplexColumnWidth,
                     (batchSizerEndTime - updateStartTime),(updateEndTime - updateStartTime), this, incomingBatch);
 
-        logger.debug("BATCH_STATS, incoming: {}", getRecordBatchSizer());
+        RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT, getRecordBatchSizer(), outgoingBatch.getRecordBatchStatsContext());
         updateIncomingStats();
     }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 247f36f1934..4d55f00340f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -55,13 +55,14 @@
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.RecordBatchSizer;
 import org.apache.drill.exec.record.SimpleRecordBatch;
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.store.ColumnExplorer;
+import org.apache.drill.exec.util.record.RecordBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.UntypedNullHolder;
@@ -252,7 +253,7 @@ protected IterOutcome doWork() {
     }
 
     memoryManager.updateOutgoingStats(outputRecords);
-    logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
+    RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext());
 
     // Get the final outcome based on hasRemainder since that will determine if all the incoming records were
     // consumed in current output batch or not
@@ -298,7 +299,7 @@ private void handleRemainder() {
     }
 
     memoryManager.updateOutgoingStats(projRecords);
-    logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
+    RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext());
   }
 
   public void addComplexWriter(final ComplexWriter writer) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index 8df83ee79bb..7e16d6a35c6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -43,13 +43,14 @@
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatchMemoryManager;
-import org.apache.drill.exec.record.RecordBatchSizer;
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorAccessibleUtilities;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.resolver.TypeCastRules;
 import org.apache.drill.exec.util.VectorUtil;
+import org.apache.drill.exec.util.record.RecordBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
 import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.SchemaChangeCallBack;
 import org.apache.drill.exec.vector.ValueVector;
@@ -76,7 +77,9 @@ public UnionAllRecordBatch(UnionAll config, List<RecordBatch> children, Fragment
     // get the output batch size from config.
     int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
     batchMemoryManager = new RecordBatchMemoryManager(numInputs, configuredBatchSize);
-    logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
+
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "configured output batch size: %d", configuredBatchSize);
   }
 
   @Override
@@ -171,9 +174,7 @@ private IterOutcome doWork(BatchStatusWrappper batchStatus, boolean newSchema) t
     batchStatus.recordsProcessed += recordCount;
     batchMemoryManager.updateOutgoingStats(recordCount);
 
-    if (logger.isDebugEnabled()) {
-      logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
-    }
+    RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext());
 
     if (callBack.getSchemaChangedAndReset()) {
       return IterOutcome.OK_NEW_SCHEMA;
@@ -368,8 +369,9 @@ public boolean hasNext() {
         if (topStatus.prefetched) {
           topStatus.prefetched = false;
           batchMemoryManager.update(topStatus.batch, topStatus.inputIndex);
-          logger.debug("BATCH_STATS, incoming {}: {}", topStatus.inputIndex == 0 ? "left" : "right",
-            batchMemoryManager.getRecordBatchSizer(topStatus.inputIndex));
+          RecordBatchStats.logRecordBatchStats(topStatus.inputIndex == 0 ? RecordBatchIOType.INPUT_LEFT : RecordBatchIOType.INPUT_RIGHT,
+            batchMemoryManager.getRecordBatchSizer(topStatus.inputIndex),
+            getRecordBatchStatsContext());
           return Pair.of(topStatus.outcome, topStatus);
         } else {
 
@@ -387,8 +389,9 @@ public boolean hasNext() {
             topStatus.recordsProcessed = 0;
             topStatus.totalRecordsToProcess = topStatus.batch.getRecordCount();
             batchMemoryManager.update(topStatus.batch, topStatus.inputIndex);
-            logger.debug("BATCH_STATS, incoming {}: {}", topStatus.inputIndex == 0 ? "left" : "right",
-              batchMemoryManager.getRecordBatchSizer(topStatus.inputIndex));
+            RecordBatchStats.logRecordBatchStats(topStatus.inputIndex == 0 ? RecordBatchIOType.INPUT_LEFT : RecordBatchIOType.INPUT_RIGHT,
+              batchMemoryManager.getRecordBatchSizer(topStatus.inputIndex),
+              getRecordBatchStatsContext());
             return Pair.of(outcome, topStatus);
           case OUT_OF_MEMORY:
           case STOP:
@@ -421,19 +424,20 @@ public void close() {
     super.close();
     updateBatchMemoryManagerStats();
 
-    if (logger.isDebugEnabled()) {
-      logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
-        batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
-        batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "incoming aggregate left: batch count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d",
+      batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
+      batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
 
-      logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
-        batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
-        batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "incoming aggregate right: batch count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d",
+      batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
+      batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
 
-      logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
-        batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
-        batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
-    }
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "outgoing aggregate: batch count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d",
+      batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
+      batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
   }
 
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index d58d242c78e..5f6396731a9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -38,6 +38,8 @@
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.util.record.RecordBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
 import org.apache.drill.exec.vector.IntVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.RepeatedMapVector;
@@ -121,10 +123,7 @@ public void update() {
       // Limit to lower bound of total number of rows possible for this batch
       // i.e. all rows fit within memory budget.
       setOutputRowCount(Math.min(columnSize.getElementCount(), getOutputRowCount()));
-
-      if (logger.isDebugEnabled()) {
-        logger.debug("BATCH_STATS, incoming:\n {}", getRecordBatchSizer());
-      }
+      RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT, getRecordBatchSizer(), getRecordBatchStatsContext());
 
       updateIncomingStats();
     }
@@ -306,9 +305,7 @@ protected IterOutcome doWork() {
     // entire incoming recods has been unnested. If the entire records has been
     // unnested, we return EMIT and any blocking operators in the pipeline will
     // unblock.
-    if (logger.isDebugEnabled()) {
-      logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this));
-    }
+    RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext());
     return hasRemainder ? IterOutcome.OK : IterOutcome.EMIT;
   }
 
@@ -420,14 +417,15 @@ private void updateStats() {
     stats.setLongStat(Metric.AVG_OUTPUT_ROW_BYTES, memoryManager.getAvgOutputRowWidth());
     stats.setLongStat(Metric.OUTPUT_RECORD_COUNT, memoryManager.getTotalOutputRecords());
 
-    logger.debug("BATCH_STATS, incoming aggregate: batch count : {}, avg batch bytes : {},  avg row bytes : {}, record count : {}",
-        memoryManager.getNumIncomingBatches(), memoryManager.getAvgInputBatchSize(),
-        memoryManager.getAvgInputRowWidth(), memoryManager.getTotalInputRecords());
-
-    logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg batch bytes : {},  avg row bytes : {}, record count : {}",
-        memoryManager.getNumOutgoingBatches(), memoryManager.getAvgOutputBatchSize(),
-        memoryManager.getAvgOutputRowWidth(), memoryManager.getTotalOutputRecords());
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "incoming aggregate: batch count : %d, avg batch bytes : %d,  avg row bytes : %d, record count : %d",
+      memoryManager.getNumIncomingBatches(), memoryManager.getAvgInputBatchSize(),
+      memoryManager.getAvgInputRowWidth(), memoryManager.getTotalInputRecords());
 
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "outgoing aggregate: batch count : %d, avg batch bytes : %d,  avg row bytes : %d, record count : %d",
+      memoryManager.getNumOutgoingBatches(), memoryManager.getAvgOutputBatchSize(),
+      memoryManager.getAvgOutputRowWidth(), memoryManager.getTotalOutputRecords());
   }
 
   private TypedFieldId checkAndGetUnnestFieldId() throws SchemaChangeException {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index dfa1424ca75..c38de2d2b35 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -33,6 +33,7 @@
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.server.options.OptionValue;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContext;
 
 public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements CloseableRecordBatch {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(new Object() {}.getClass().getEnclosingClass());
@@ -41,6 +42,7 @@
   protected final T popConfig;
   protected final FragmentContext context;
   protected final OperatorContext oContext;
+  protected final RecordBatchStatsContext batchStatsContext;
   protected final OperatorStats stats;
   protected final boolean unionTypeEnabled;
   protected BatchState state;
@@ -58,6 +60,7 @@ protected AbstractRecordBatch(final T popConfig, final FragmentContext context,
     this.context = context;
     this.popConfig = popConfig;
     this.oContext = oContext;
+    this.batchStatsContext = new RecordBatchStatsContext(context, oContext);
     stats = oContext.getStats();
     container = new VectorContainer(this.oContext.getAllocator());
     if (buildSchema) {
@@ -241,4 +244,12 @@ public VectorContainer getOutgoingContainer() {
   public VectorContainer getContainer() {
     return  container;
   }
+
+  public RecordBatchStatsContext getRecordBatchStatsContext() {
+    return batchStatsContext;
+  }
+
+  public boolean isRecordBatchStatsLoggingEnabled() {
+    return batchStatsContext.isEnableBatchSzLogging();
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/OverflowSerDeUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/OverflowSerDeUtil.java
index 4a0e1e81ea6..14b31327cce 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/OverflowSerDeUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/OverflowSerDeUtil.java
@@ -85,10 +85,8 @@ static RecordOverflowContainer serialize(List<FieldOverflowEntry> fieldOverflowE
     // Allocate the required memory to serialize the overflow fields
     final DrillBuf buffer = allocator.buffer(bufferLength);
 
-    if (batchStatsContext.isEnableBatchSzLogging()) {
-      final String msg = String.format("Allocated a buffer of length [%d] to handle overflow", bufferLength);
-      RecordBatchStats.logRecordBatchStats(msg, batchStatsContext);
-    }
+    RecordBatchStats.logRecordBatchStats(batchStatsContext,
+      "Allocated a buffer of length [%d] to handle overflow", bufferLength);
 
     // Create the result object
     final RecordOverflowContainer recordOverflowContainer = new RecordOverflowContainer();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java
index 0b242440929..c61607ef5da 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.util.record;
 
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -134,25 +135,79 @@ private boolean isBatchStatsEnabledForOperator(FragmentContext context, Operator
     }
   }
 
+  /** Indicates whether a record batch is Input or Output */
+  public enum RecordBatchIOType {
+    INPUT ("incoming"),
+    INPUT_RIGHT ("incoming right"),
+    INPUT_LEFT ("incoming left"),
+    OUTPUT ("outgoing"),
+    PASSTHROUGH ("passthrough");
+
+    private final String ioTypeString;
+
+    private RecordBatchIOType(String ioTypeString) {
+      this.ioTypeString = ioTypeString;
+    }
+
+    /**
+     * @return IO Type string
+     */
+    public String getIOTypeString() {
+      return ioTypeString;
+    }
+  }
+
+  /**
+   * @see {@link RecordBatchStats#logRecordBatchStats(RecordBatchIOType, String, RecordBatchSizer, RecordBatchStatsContext)}
+   */
+  public static void logRecordBatchStats(RecordBatchIOType ioType,
+    String sourceId,
+    RecordBatch recordBatch,
+    RecordBatchStatsContext batchStatsContext) {
+
+    if (!batchStatsContext.isEnableBatchSzLogging()) {
+      return; // NOOP
+    }
+
+    logRecordBatchStats(ioType, sourceId, new RecordBatchSizer(recordBatch), batchStatsContext);
+  }
+
+  /**
+   * @see {@link RecordBatchStats#logRecordBatchStats(RecordBatchIOType, String, RecordBatchSizer, RecordBatchStatsContext)}
+   */
+  public static void logRecordBatchStats(RecordBatchIOType ioType,
+    RecordBatch recordBatch,
+    RecordBatchStatsContext batchStatsContext) {
+
+    if (!batchStatsContext.isEnableBatchSzLogging()) {
+      return; // NOOP
+    }
+
+    logRecordBatchStats(ioType, null, new RecordBatchSizer(recordBatch), batchStatsContext);
+  }
+
   /**
-   * @see {@link RecordBatchStats#logRecordBatchStats(String, RecordBatch, RecordBatchStatsContext)}
+   * @see {@link RecordBatchStats#logRecordBatchStats(RecordBatchIOType, String, RecordBatchSizer, RecordBatchStatsContext)}
    */
-  public static void logRecordBatchStats(RecordBatch recordBatch,
+  public static void logRecordBatchStats(RecordBatchIOType ioType,
+    RecordBatchSizer recordBatchSizer,
     RecordBatchStatsContext batchStatsContext) {
 
-    logRecordBatchStats(null, recordBatch, batchStatsContext);
+    logRecordBatchStats(ioType, null, recordBatchSizer, batchStatsContext);
   }
 
   /**
    * Logs record batch statistics for the input record batch (logging happens only
    * when record statistics logging is enabled).
    *
+   * @param ioType whether the record batch is an input and/or an output
    * @param sourceId optional source identifier for scanners
-   * @param recordBatch a set of records
+   * @param batchSizer contains batch sizing information
    * @param batchStatsContext batch stats context object
    */
-  public static void logRecordBatchStats(String sourceId,
-    RecordBatch recordBatch,
+  public static void logRecordBatchStats(RecordBatchIOType ioType,
+    String sourceId,
+    RecordBatchSizer batchSizer,
     RecordBatchStatsContext batchStatsContext) {
 
     if (!batchStatsContext.isEnableBatchSzLogging()) {
@@ -161,7 +216,7 @@ public static void logRecordBatchStats(String sourceId,
 
     final String statsId = batchStatsContext.getContextOperatorId();
     final boolean verbose = batchStatsContext.isEnableFgBatchSzLogging();
-    final String msg = printRecordBatchStats(statsId, sourceId, recordBatch, verbose);
+    final String msg = printRecordBatchStats(statsId, ioType, sourceId, batchSizer, verbose);
 
     logBatchStatsMsg(batchStatsContext, msg, false);
   }
@@ -170,7 +225,6 @@ public static void logRecordBatchStats(String sourceId,
    * Logs a generic batch statistics message
    *
    * @param message log message
-   * @param batchStatsLogging
    * @param batchStatsContext batch stats context object
    */
   public static void logRecordBatchStats(String message,
@@ -183,6 +237,25 @@ public static void logRecordBatchStats(String message,
     logBatchStatsMsg(batchStatsContext, message, true);
   }
 
+  /**
+   * Logs a generic batch statistics message
+   *
+   * @param batchStatsContext batch stats context object
+   * @param format a string format as in {@link String#format} method
+   * @param args format's arguments
+   */
+  public static void logRecordBatchStats(RecordBatchStatsContext batchStatsContext,
+    String format,
+    Object...args) {
+
+    if (!batchStatsContext.isEnableBatchSzLogging()) {
+      return; // NOOP
+    }
+
+    final String message = String.format(format, args);
+    logBatchStatsMsg(batchStatsContext, message, true);
+  }
+
   /**
    * @param allocator dumps allocator statistics
    * @return string with allocator statistics
@@ -212,27 +285,30 @@ private RecordBatchStats() {
    * Constructs record batch statistics for the input record batch
    *
    * @param stats instance identifier
+   * @param ioType whether the record batch is an input and/or an output
    * @param sourceId optional source identifier for scanners
-   * @param recordBatch a set of records
+   * @param batchSizer contains batch sizing information
    * @param verbose whether to include fine-grained stats
    *
    * @return a string containing the record batch statistics
    */
   private static String printRecordBatchStats(String statsId,
+    RecordBatchIOType ioType,
     String sourceId,
-    RecordBatch recordBatch,
+    RecordBatchSizer batchSizer,
     boolean verbose) {
 
-    final RecordBatchSizer batchSizer = new RecordBatchSizer(recordBatch);
     final StringBuilder msg = new StringBuilder();
 
     msg.append(BATCH_STATS_PREFIX);
-    msg.append(" Originator: {");
+    msg.append(" Operator: {");
     msg.append(statsId);
     if (sourceId != null) {
       msg.append(':');
       msg.append(sourceId);
     }
+    msg.append("}, IO Type: {");
+    msg.append(toString(ioType));
     msg.append("}, Batch size: {");
     msg.append( "  Records: " );
     msg.append(batchSizer.rowCount());
@@ -269,7 +345,8 @@ private static void logBatchStatsMsg(RecordBatchStatsContext batchStatsContext,
     boolean includePrefix) {
 
     if (includePrefix) {
-      msg = BATCH_STATS_PREFIX + '\t' + msg;
+      final String statsId = batchStatsContext.getContextOperatorId();
+      msg = BATCH_STATS_PREFIX + " Operator: {" + statsId + "} " + msg;
     }
 
     if (batchStatsContext.useInfoLevelLogging()) {
@@ -279,4 +356,19 @@ private static void logBatchStatsMsg(RecordBatchStatsContext batchStatsContext,
     }
   }
 
+  private static String toString(RecordBatchIOType ioType) {
+    Preconditions.checkNotNull(ioType, "The record batch IO type cannot be null");
+
+    switch (ioType) {
+      case INPUT: return "incoming";
+      case INPUT_RIGHT: return "incoming right";
+      case INPUT_LEFT: return "incoming left";
+      case OUTPUT: return "outgoing";
+      case PASSTHROUGH: return "passthrough";
+
+      default: throw new RuntimeException("Unexpected record batch IO type..");
+    }
+
+  }
+
 }
\ No newline at end of file


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message