From commits-return-5299-apmail-drill-commits-archive=drill.apache.org@drill.apache.org Wed Nov 22 22:52:07 2017 Return-Path: X-Original-To: apmail-drill-commits-archive@www.apache.org Delivered-To: apmail-drill-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7A37910CCA for ; Wed, 22 Nov 2017 22:52:07 +0000 (UTC) Received: (qmail 33619 invoked by uid 500); 22 Nov 2017 22:52:06 -0000 Delivered-To: apmail-drill-commits-archive@drill.apache.org Received: (qmail 32674 invoked by uid 500); 22 Nov 2017 22:52:06 -0000 Mailing-List: contact commits-help@drill.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: commits@drill.apache.org Delivered-To: mailing list commits@drill.apache.org Received: (qmail 32280 invoked by uid 99); 22 Nov 2017 22:52:05 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 22 Nov 2017 22:52:05 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 938E7F5F9C; Wed, 22 Nov 2017 22:52:03 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: parthc@apache.org To: commits@drill.apache.org Date: Wed, 22 Nov 2017 22:52:08 -0000 Message-Id: <61a8ce09bddc45499da0601870fd7549@git.apache.org> In-Reply-To: <6b7b21ddbc8b44d4b56c35faf668e64c@git.apache.org> References: <6b7b21ddbc8b44d4b56c35faf668e64c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [06/12] drill git commit: DRILL-5936: Refactor MergingRecordBatch based on code inspection DRILL-5936: Refactor MergingRecordBatch based on code inspection This closes #1025 Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/36abdd79 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/36abdd79 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/36abdd79 Branch: refs/heads/master Commit: 36abdd79fe57a596dc1c508306acefdd6b3163ea Parents: 23e6565 Author: Vlad Rozov Authored: Mon Nov 6 17:55:56 2017 -0800 Committer: Parth Chandra Committed: Wed Nov 22 10:35:06 2017 -0800 ---------------------------------------------------------------------- .../impl/mergereceiver/MergingRecordBatch.java | 41 +++++++++----------- 1 file changed, 18 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/36abdd79/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java index a7d3f39..f9ceff2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java @@ -101,7 +101,7 @@ public class MergingRecordBatch extends AbstractRecordBatch private MergingReceiverGeneratorBase merger; private final MergingReceiverPOP config; private boolean hasRun = false; - private boolean prevBatchWasFull = false; + private boolean outgoingBatchHasSpace = true; private boolean hasMoreIncoming = true; private int outgoingPosition = 0; @@ -177,11 +177,11 @@ public class MergingRecordBatch extends AbstractRecordBatch } boolean schemaChanged = false; - if (prevBatchWasFull) { + if (!outgoingBatchHasSpace) { logger.debug("Outgoing vectors were full on last iteration"); allocateOutgoing(); outgoingPosition = 0; - prevBatchWasFull = false; + outgoingBatchHasSpace = true; } if (!hasMoreIncoming) { @@ -398,14 +398,13 @@ public class MergingRecordBatch extends AbstractRecordBatch // finished lazy initialization } - while (!pqueue.isEmpty()) { - // pop next value from pq and copy to outgoing batch - final Node node = pqueue.peek(); - if (!copyRecordToOutgoingBatch(node)) { - logger.debug("Outgoing vectors space is full; breaking"); - prevBatchWasFull = true; + while (outgoingBatchHasSpace) { + // poll next value from pq and copy to outgoing batch + final Node node = pqueue.poll(); + if (node == null) { + break; } - pqueue.poll(); + outgoingBatchHasSpace = copyRecordToOutgoingBatch(node); if (node.valueIndex == batchLoaders[node.batchId].getRecordCount() - 1) { // reached the end of an incoming record batch @@ -448,11 +447,7 @@ public class MergingRecordBatch extends AbstractRecordBatch // this batch is empty; since the pqueue no longer references this batch, it will be // ignored in subsequent iterations. - if (prevBatchWasFull) { - break; - } else { - continue; - } + continue; } final UserBitShared.RecordBatchDef rbd = incomingBatches[node.batchId].getHeader().getDef(); @@ -469,15 +464,13 @@ public class MergingRecordBatch extends AbstractRecordBatch // add front value from batch[x] to priority queue if (batchLoaders[node.batchId].getRecordCount() != 0) { - pqueue.add(new Node(node.batchId, 0)); + node.valueIndex = 0; + pqueue.add(node); } } else { - pqueue.add(new Node(node.batchId, node.valueIndex + 1)); - } - - if (prevBatchWasFull) { - break; + node.valueIndex++; + pqueue.add(node); } } @@ -786,6 +779,8 @@ public class MergingRecordBatch extends AbstractRecordBatch * @param node Reference to the next record to copy from the incoming batches */ private boolean copyRecordToOutgoingBatch(final Node node) { + assert outgoingPosition < OUTGOING_BATCH_SIZE + : String.format("Outgoing position %d must be less than bath size %d", outgoingPosition, OUTGOING_BATCH_SIZE); assert ++outputCounts[node.batchId] <= inputCounts[node.batchId] : String.format("Stream %d input count: %d output count %d", node.batchId, inputCounts[node.batchId], outputCounts[node.batchId]); final int inIndex = (node.batchId << 16) + node.valueIndex; @@ -794,8 +789,8 @@ public class MergingRecordBatch extends AbstractRecordBatch } catch (SchemaChangeException e) { throw new UnsupportedOperationException(e); } - outgoingPosition++; - if (outgoingPosition == OUTGOING_BATCH_SIZE) { + if (++outgoingPosition == OUTGOING_BATCH_SIZE) { + logger.debug("Outgoing vectors space is full (batch size {}).", OUTGOING_BATCH_SIZE); return false; } return true;