drill-commits mailing list archives

From sor...@apache.org
Subject [drill] 03/03: DRILL-6446: Support for EMIT outcome in TopN
Date Thu, 07 Jun 2018 23:14:10 GMT
This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 0b393ad628bfe29ce2ced2d5614db4bab1e4b76f
Author: Sorabh Hamirwasia <shamirwasia@maprtech.com>
AuthorDate: Tue May 8 19:09:38 2018 -0700

    DRILL-6446: Support for EMIT outcome in TopN
    	- Added comments for TopNBatch and PriorityQueueTemplate
    	- Added support for schema change across next() calls with a HyperVector in the incoming container. This is achieved by adding a new method in HyperVectorWrapper which updates the vector[] array
    	  holding multiple vectors with the provided ValueVector array, and by modifying RemovingRecordBatch's GenericSV4Copier to hold a reference to the VectorWrapper instead of the ValueVector[] for each column in the incoming batch
    	- Handled empty batches. There are two cases: empty batches at the beginning with EMIT outcome, and empty batches between consecutive EMIT outcomes after some batches with data and EMIT outcome have already been received.
      		Note: In the first case of an empty batch, TopN was returning only the EMIT outcome without properly creating the output container and SV4 vector. Because of that, if the first batch with
      		EMIT outcome is empty, TopN returns an empty batch with SV mode NONE; if a later batch then arrives with records and EMIT outcome, TopN generates an output batch with OK_NEW_SCHEMA (since
      		TopN always generates its first output batch of records with OK_NEW_SCHEMA, as it returns output in SV4 mode). Now suppose both batches with EMIT outcome were produced after processing the
    		first 2 rows of one input batch: this simulates a schema change across rows of the same incoming batch, which can never legitimately happen.
    
    		Note: In the second case of empty batches, the priority queue will be non-null but uninitialized. Also optimized TopN to send the EMIT outcome with the output batch that carries all the data
                    to return for the current iteration, rather than sending it with OK followed by an empty batch with EMIT outcome.
    
    closes #1293
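
A note on the heap strategy this commit relies on: TopN keeps at most "limit" record indexes in a min heap and compares every further record against the heap's root (the current minimum) to decide whether it replaces the root. A minimal, self-contained sketch of that idea (illustrative only; the class below is hypothetical and much simpler than Drill's actual PriorityQueueTemplate, which heapifies record indexes against value vectors rather than plain values):

    import java.util.PriorityQueue;

    // Illustrative bounded "top N largest" collector backed by a min heap.
    class TopNSketch {
      private final int limit;
      private final PriorityQueue<Integer> minHeap = new PriorityQueue<>();

      TopNSketch(int limit) { this.limit = limit; }

      void add(int value) {
        if (minHeap.size() < limit) {
          minHeap.offer(value);        // heap not full yet; offer() sifts up internally
        } else if (value > minHeap.peek()) {
          minHeap.poll();              // evict the current minimum (the root)
          minHeap.offer(value);        // the new value takes its place
        }                              // else: discard, the value cannot be in the top N
      }
    }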
---
 .../exec/physical/impl/TopN/PriorityQueue.java     |   9 +
 .../physical/impl/TopN/PriorityQueueTemplate.java  |  70 +-
 .../drill/exec/physical/impl/TopN/TopNBatch.java   | 362 +++++++++--
 .../physical/impl/svremover/AbstractSV2Copier.java |   1 -
 .../physical/impl/svremover/AbstractSV4Copier.java |  11 +-
 .../physical/impl/svremover/GenericSV4Copier.java  |   3 +-
 .../drill/exec/record/HyperVectorWrapper.java      |  22 +-
 .../exec/record/selection/SelectionVector4.java    |  25 +-
 .../physical/impl/TopN/TestTopNEmitOutcome.java    | 710 +++++++++++++++++++++
 .../impl/lateraljoin/TestE2EUnnestAndLateral.java  |  34 +
 10 files changed, 1167 insertions(+), 80 deletions(-)
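
Throughout the patch, SV4 ("four byte" selection vector) indexes pack a batch number into the upper 16 bits and an in-batch record offset into the lower 16 bits, as can be seen in GenericSV4Copier (inIndex >>> 16 and inIndex & 0xFFFF) and in SelectionVector4.getBatchIndex() below. A minimal sketch of that encoding (the helper class and names are illustrative, not part of the patch):

    // Illustrative helpers mirroring the SV4 index encoding used throughout this patch.
    final class Sv4Index {
      static int encode(int batchIndex, int recordIndex) {
        // batch number in the upper 16 bits, record offset in the lower 16 bits
        return (batchIndex << 16) | (recordIndex & 0xFFFF);
      }
      static int batchIndex(int sv4Index)  { return sv4Index >>> 16; }
      static int recordIndex(int sv4Index) { return sv4Index & 0xFFFF; }
    }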

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueue.java
index e398f47..3744b94 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueue.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueue.java
@@ -66,6 +66,13 @@ public interface PriorityQueue {
    */
   SelectionVector4 getFinalSv4();
 
+  /**
+   * Cleans up the old state of the queue and recreates it with a HyperContainer containing the vectors from the
+   * input container and the corresponding indexes (in SV4 format) from the input SelectionVector4.
+   * @param container
+   * @param vector4
+   * @throws SchemaChangeException
+   */
   void resetQueue(VectorContainer container, SelectionVector4 vector4) throws SchemaChangeException;
 
   /**
@@ -73,5 +80,7 @@ public interface PriorityQueue {
    */
   void cleanup();
 
+  boolean isInitialized();
+
   TemplateClassDefinition<PriorityQueue> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(PriorityQueue.class, PriorityQueueTemplate.class);
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
index 97e26b6..e39dce3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
@@ -40,10 +40,18 @@ import com.google.common.base.Stopwatch;
 public abstract class PriorityQueueTemplate implements PriorityQueue {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PriorityQueueTemplate.class);
 
-  private SelectionVector4 heapSv4; //This holds the heap
+  // This holds the min heap of record indexes; the heap condition is evaluated against the actual records. Only
+  // records meeting the heap condition have their indexes in this heap. The actual records are stored inside the
+  // hyperBatch. Since the hyperBatch contains ValueVectors from all the incoming batches, each index here encodes
+  // both a batch number and a record number.
+  private SelectionVector4 heapSv4;
   private SelectionVector4 finalSv4; //This is for final sorted output
+
+  // This stores the actual incoming record batches
   private ExpandableHyperContainer hyperBatch;
   private BufferAllocator allocator;
+
+  // Limit determines the number of records to output and to hold in the queue.
   private int limit;
   private int queueSize = 0;
   private int batchCount = 0;
@@ -54,7 +62,11 @@ public abstract class PriorityQueueTemplate implements PriorityQueue {
     this.limit = limit;
     this.allocator = allocator;
     @SuppressWarnings("resource")
+    // Allocate memory to store (limit + 1) indexes. Once the first limit record indexes are stored, each further
+    // record index is placed in slot (limit + 1) and compared against the root element of the heap to determine
+    // whether the new element belongs in the heap.
     final DrillBuf drillBuf = allocator.buffer(4 * (limit + 1));
+    // The heap is a SelectionVector4 since it stores record indexes relative to their batches.
     heapSv4 = new SelectionVector4(drillBuf, limit, Character.MAX_VALUE);
     this.hasSv2 = hasSv2;
   }
@@ -103,11 +115,17 @@ public abstract class PriorityQueueTemplate implements PriorityQueue {
     if (hasSv2) {
       sv2 = batch.getSv2();
     }
+    // This loop runs only until queueSize reaches the limit, i.e. until limit records have been seen across one or
+    // more batches. For each new record, siftUp (heapify) is called to restore the min-heap property.
     for (; queueSize < limit && count < batch.getRecordCount();  count++) {
       heapSv4.set(queueSize, batchCount, hasSv2 ? sv2.getIndex(count) : count);
       queueSize++;
       siftUp();
     }
+
+    // Every record beyond the limit is compared with the root element of the current heap, and heapify is performed
+    // if needed. Note: even though heapSv4 stores only limit + 1 indexes, the hyper batch still keeps all the
+    // records until purge is called.
     for (; count < batch.getRecordCount(); count++) {
       heapSv4.set(limit, batchCount, hasSv2 ? sv2.getIndex(count) : count);
       if (compare(limit, 0) < 0) {
@@ -153,15 +171,35 @@ public abstract class PriorityQueueTemplate implements PriorityQueue {
   public void cleanup() {
     if (heapSv4 != null) {
       heapSv4.clear();
+      heapSv4 = null;
     }
     if (hyperBatch != null) {
       hyperBatch.clear();
+      hyperBatch = null;
     }
     if (finalSv4 != null) {
       finalSv4.clear();
+      finalSv4 = null;
     }
+    batchCount = 0;
   }
 
+  /**
+   * When cleanup is called, heapSv4 is cleared and set to null; it is created again only during an init call. Hence
+   * it is used to determine whether the priority queue is initialized.
+   * @return - true - queue is initialized
+   *           false - queue is not initialized; init must be called before using the queue
+   */
+  public boolean isInitialized() {
+    return (heapSv4 != null);
+  }
+
+  /**
+   * Performs heapify for the record index just added as a leaf node in the array. The new record is compared with
+   * the record stored at its parent index and moved up as needed. Since the new record index flows up through the
+   * array, the operation is named siftUp.
+   * @throws SchemaChangeException
+   */
   private void siftUp() throws SchemaChangeException {
     int p = queueSize - 1;
     while (p > 0) {
@@ -174,6 +212,14 @@ public abstract class PriorityQueueTemplate implements PriorityQueue {
     }
   }
 
+  /**
+   * Compares the record referenced by the 0th element of heapSv4 (the root element) with the record referenced by
+   * the limit-th element of heapSv4 (the new element). If the root element is greater than the new element, the new
+   * element is discarded; otherwise the root element is replaced with the new element and heapify is performed again
+   * starting from the new root.
+   * This is done for every record seen after the queue has been filled with limit record indexes.
+   * @throws SchemaChangeException
+   */
   private void siftDown() throws SchemaChangeException {
     int p = 0;
     int next;
@@ -196,6 +242,12 @@ public abstract class PriorityQueueTemplate implements PriorityQueue {
     }
   }
 
+  /**
+   * Pops the root element, which holds the minimum value in the heap; here the root element is the index of the
+   * record with the minimum value. After extracting the root element, it swaps the root with the last element in
+   * heapSv4 and heapifies again (by calling siftDown).
+   * @return - index of the record with the minimum value
+   */
   public int pop() {
     int value = heapSv4.get(0);
     swap(0, queueSize - 1);
@@ -220,9 +272,25 @@ public abstract class PriorityQueueTemplate implements PriorityQueue {
     return doEval(sv1, sv2);
   }
 
+  /**
+   * Stores a reference to the hyperBatch container, which holds all the records across incoming batches. It is
+   * used in the doEval function to compare records in this hyper batch at the given indexes.
+   * @param incoming - reference to hyperBatch
+   * @param outgoing - null
+   * @throws SchemaChangeException
+   */
   public abstract void doSetup(@Named("incoming") VectorContainer incoming,
                                @Named("outgoing") RecordBatch outgoing)
                        throws SchemaChangeException;
+
+  /**
+   * Compares the records at leftIndex and rightIndex with respect to the min-heap condition. It is used in the
+   * {@link PriorityQueueTemplate#compare(int, int)} method while heapifying the queue.
+   * @param leftIndex
+   * @param rightIndex
+   * @return
+   * @throws SchemaChangeException
+   */
   public abstract int doEval(@Named("leftIndex") int leftIndex,
                              @Named("rightIndex") int rightIndex)
                       throws SchemaChangeException;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 366e4e8..a8c6804 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -52,6 +52,7 @@ import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.ExpandableHyperContainer;
+import org.apache.drill.exec.record.HyperVectorWrapper;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.SchemaUtil;
 import org.apache.drill.exec.record.SimpleRecordBatch;
@@ -69,6 +70,16 @@ import com.google.common.base.Stopwatch;
 import com.sun.codemodel.JConditional;
 import com.sun.codemodel.JExpr;
 
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+
+/**
+ * Operator batch which implements the TopN functionality. It is more efficient than (sort + limit) since, unlike
+ * sort, it doesn't have to store all the input data, sort it, and then apply the limit. Instead, it internally
+ * maintains a priority queue backed by a heap whose size equals the limit value.
+ */
 public class TopNBatch extends AbstractRecordBatch<TopN> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TopNBatch.class);
 
@@ -84,12 +95,15 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
   private boolean schemaChanged = false;
   private PriorityQueue priorityQueue;
   private TopN config;
-  SelectionVector4 sv4;
+  private SelectionVector4 sv4;
   private long countSincePurge;
   private int batchCount;
   private Copier copier;
   private boolean first = true;
   private int recordCount = 0;
+  private IterOutcome lastKnownOutcome = OK;
+  private boolean firstBatchForSchema = true;
+  private boolean hasOutputRecords = false;
 
   public TopNBatch(TopN popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
     super(popConfig, context);
@@ -117,12 +131,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
 
   @Override
   public void close() {
-    if (sv4 != null) {
-      sv4.clear();
-    }
-    if (priorityQueue != null) {
-      priorityQueue.cleanup();
-    }
+    releaseResource();
     super.close();
   }
 
@@ -134,6 +143,9 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
       case OK:
       case OK_NEW_SCHEMA:
         for (VectorWrapper<?> w : incoming) {
+          // TODO: Not sure why the special handling for AbstractContainerVector is needed, since creation of child
+          // vectors is handled correctly if the field is retrieved from the incoming vector and passed along, rather
+          // than creating a new Field instance based only on name and type.
           @SuppressWarnings("resource")
           ValueVector v = c.addOrGet(w.getField());
           if (v instanceof AbstractContainerVector) {
@@ -162,8 +174,10 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
         return;
       case NONE:
         state = BatchState.DONE;
+      case EMIT:
+        throw new IllegalStateException("Unexpected EMIT outcome received in buildSchema phase");
       default:
-        return;
+        throw new IllegalStateException("Unexpected outcome received in buildSchema phase");
     }
   }
 
@@ -171,47 +185,69 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
   public IterOutcome innerNext() {
     recordCount = 0;
     if (state == BatchState.DONE) {
-      return IterOutcome.NONE;
+      return NONE;
     }
-    if (schema != null) {
-      if (getSelectionVector4().next()) {
-        recordCount = sv4.getCount();
-        return IterOutcome.OK;
-      } else {
-        recordCount = 0;
-        return IterOutcome.NONE;
-      }
+
+    // Check if anything is remaining from previous record boundary
+    if (hasOutputRecords) {
+      return handleRemainingOutput();
     }
 
+    // Reset the TopN state for next iteration
+    resetTopNState();
+
     try{
+      boolean incomingHasSv2 = false;
+      switch (incoming.getSchema().getSelectionVectorMode()) {
+        case NONE: {
+          break;
+        }
+        case TWO_BYTE: {
+          incomingHasSv2 = true;
+          break;
+        }
+        case FOUR_BYTE: {
+          throw new SchemaChangeException("TopN doesn't support incoming with SV4 mode");
+        }
+        default:
+          throw new UnsupportedOperationException("Unsupported SV mode detected in TopN incoming batch");
+      }
+
       outer: while (true) {
         Stopwatch watch = Stopwatch.createStarted();
-        IterOutcome upstream;
         if (first) {
-          upstream = IterOutcome.OK_NEW_SCHEMA;
+          lastKnownOutcome = IterOutcome.OK_NEW_SCHEMA;
+          // Create the SV4 object upfront to be used for both empty and non-empty incoming batches at EMIT boundary
+          sv4 = new SelectionVector4(context.getAllocator(), 0);
           first = false;
         } else {
-          upstream = next(incoming);
+          lastKnownOutcome = next(incoming);
         }
-        if (upstream == IterOutcome.OK && schema == null) {
-          upstream = IterOutcome.OK_NEW_SCHEMA;
+        if (lastKnownOutcome == OK && schema == null) {
+          lastKnownOutcome = IterOutcome.OK_NEW_SCHEMA;
           container.clear();
         }
         logger.debug("Took {} us to get next", watch.elapsed(TimeUnit.MICROSECONDS));
-        switch (upstream) {
+        switch (lastKnownOutcome) {
         case NONE:
           break outer;
         case NOT_YET:
           throw new UnsupportedOperationException();
         case OUT_OF_MEMORY:
         case STOP:
-          return upstream;
+          return lastKnownOutcome;
         case OK_NEW_SCHEMA:
           // only change in the case that the schema truly changes.  Artificial schema changes are ignored.
+          // Schema change handling when EMIT is also seen is the same as without EMIT, i.e. it is handled only if
+          // union type is enabled.
+          container.clear();
+          firstBatchForSchema = true;
           if (!incoming.getSchema().equals(schema)) {
             if (schema != null) {
               if (!unionTypeEnabled) {
-                throw new UnsupportedOperationException("Sort doesn't currently support sorts with changing schemas.");
+                throw new UnsupportedOperationException(String.format("TopN currently doesn't support changing " +
+                  "schemas with union type disabled. Please try enabling union type: %s and re-execute the query",
+                  ExecConstants.ENABLE_UNION_TYPE_KEY));
               } else {
                 this.schema = SchemaUtil.mergeSchemas(this.schema, incoming.getSchema());
                 purgeAndResetPriorityQueue();
@@ -223,10 +259,15 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
           }
           // fall through.
         case OK:
+        case EMIT:
           if (incoming.getRecordCount() == 0) {
             for (VectorWrapper<?> w : incoming) {
               w.clear();
             }
+            // Release memory for incoming SV2 vector
+            if (incomingHasSv2) {
+              incoming.getSelectionVector2().clear();
+            }
             break;
           }
           countSincePurge += incoming.getRecordCount();
@@ -240,10 +281,16 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
           boolean success = false;
           try {
             if (priorityQueue == null) {
-              assert !schemaChanged;
               priorityQueue = createNewPriorityQueue(new ExpandableHyperContainer(batch.getContainer()), config.getLimit());
+            } else if (!priorityQueue.isInitialized()) {
+              // The priority queue was cleaned up after producing output for the previous record boundary; it must
+              // be initialized again for the next record boundary.
+              priorityQueue.init(config.getLimit(), oContext.getAllocator(),
+                schema.getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE);
             }
             priorityQueue.add(batch);
+            // Based on a static threshold on the number of batches, perform a purge to release the memory held by
+            // RecordBatches whose records are of no use, i.e. no longer fall in the TopN.
             if (countSincePurge > config.getLimit() && batchCount > batchPurgeThreshold) {
               purge();
               countSincePurge = 0;
@@ -259,25 +306,29 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
         default:
           throw new UnsupportedOperationException();
         }
+
+        // If the last seen outcome is EMIT then break out of the loop. This is done here because a batch carrying
+        // both records and the EMIT outcome must first be processed by the case statements above.
+        if (lastKnownOutcome == EMIT) {
+          break;
+        }
       }
 
-      if (schema == null || priorityQueue == null) {
+      // The priority queue can be null here if the first batch, received with OK_NEW_SCHEMA, is empty and the second
+      // next() call returned NONE or EMIT.
+      // It can be uninitialized here if only empty batches were received between two EMIT outcomes.
+      if (schema == null || (priorityQueue == null || !priorityQueue.isInitialized())) {
         // builder may be null at this point if the first incoming batch is empty
-        state = BatchState.DONE;
-        return IterOutcome.NONE;
+        return handleEmptyBatches(lastKnownOutcome);
       }
 
       priorityQueue.generate();
+      prepareOutputContainer(priorityQueue.getHyperBatch(), priorityQueue.getFinalSv4());
 
-      this.sv4 = priorityQueue.getFinalSv4();
-      container.clear();
-      for (VectorWrapper<?> w : priorityQueue.getHyperBatch()) {
-        container.add(w.getValueVectors());
-      }
-      container.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE);
-      recordCount = sv4.getCount();
-      return IterOutcome.OK_NEW_SCHEMA;
-
+      // With the EMIT outcome, control reaches here multiple times, whereas without it control reaches here only
+      // once. In the EMIT case, if there is a schema change in any iteration, it is handled based on
+      // lastKnownOutcome.
+      return getFinalOutcome();
     } catch(SchemaChangeException | ClassTransformationException | IOException ex) {
       kill(false);
       logger.error("Failure during query", ex);
@@ -286,14 +337,27 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
     }
   }
 
+  /**
+   * As the PriorityQueue is built up, it stores (in heapSv4) the indexes of the limit records which currently fall
+   * in the TopN category, but it also stores all the incoming RecordBatches, with all their records, inside a
+   * HyperContainer (hyperBatch). When a certain threshold of batches is reached, this method is called: it copies
+   * the limit records whose indexes are stored in heapSv4 out of the hyper batch into a new VectorContainer and
+   * releases all other records and their batches. The new VectorContainer is then stored inside the hyper batch
+   * and its corresponding indexes are stored in the heapSv4 vector. This avoids holding on to lots of record
+   * batches, which could create an OutOfMemory condition.
+   * @throws SchemaChangeException
+   */
   private void purge() throws SchemaChangeException {
     Stopwatch watch = Stopwatch.createStarted();
     VectorContainer c = priorityQueue.getHyperBatch();
+
+    // Simple VectorContainer which stores only the limit number of records. Only the records whose indexes are
+    // stored inside selectionVector4 below are copied from the hyper container to this simple container.
     VectorContainer newContainer = new VectorContainer(oContext);
     @SuppressWarnings("resource")
+    // SV4 storing the limit number of indexes
     SelectionVector4 selectionVector4 = priorityQueue.getSv4();
     SimpleSV4RecordBatch batch = new SimpleSV4RecordBatch(c, selectionVector4, context);
-    SimpleSV4RecordBatch newBatch = new SimpleSV4RecordBatch(newContainer, null, context);
     if (copier == null) {
       copier = GenericSV4Copier.createCopier(batch, newContainer, null);
     } else {
@@ -308,25 +372,14 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
     @SuppressWarnings("resource")
     SortRecordBatchBuilder builder = new SortRecordBatchBuilder(oContext.getAllocator());
     try {
-      do {
-        int count = selectionVector4.getCount();
-        int copiedRecords = copier.copyRecords(0, count);
-        assert copiedRecords == count;
-        for (VectorWrapper<?> v : newContainer) {
-          ValueVector.Mutator m = v.getValueVector().getMutator();
-          m.setValueCount(count);
-        }
-        newContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE);
-        newContainer.setRecordCount(count);
-        builder.add(newBatch);
-      } while (selectionVector4.next());
-      selectionVector4.clear();
-      c.clear();
+      // Purge all the existing batches into a new batch which holds only the selected records
+      copyToPurge(newContainer, builder);
+      // New VectorContainer that contains only the limit number of records; it is later passed to resetQueue to
+      // create the HyperContainer backing the priority queue.
       VectorContainer newQueue = new VectorContainer();
       builder.build(newQueue);
       priorityQueue.resetQueue(newQueue, builder.getSv4().createNewWrapperCurrent());
       builder.getSv4().clear();
-      selectionVector4.clear();
     } finally {
       DrillAutoCloseables.closeNoChecked(builder);
     }
@@ -414,25 +467,12 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
     @SuppressWarnings("resource")
     final SelectionVector4 selectionVector4 = priorityQueue.getSv4();
     final SimpleSV4RecordBatch batch = new SimpleSV4RecordBatch(c, selectionVector4, context);
-    final SimpleSV4RecordBatch newBatch = new SimpleSV4RecordBatch(newContainer, null, context);
     copier = GenericSV4Copier.createCopier(batch, newContainer, null);
     @SuppressWarnings("resource")
     SortRecordBatchBuilder builder = new SortRecordBatchBuilder(oContext.getAllocator());
     try {
-      do {
-        final int count = selectionVector4.getCount();
-        final int copiedRecords = copier.copyRecords(0, count);
-        assert copiedRecords == count;
-        for (VectorWrapper<?> v : newContainer) {
-          ValueVector.Mutator m = v.getValueVector().getMutator();
-          m.setValueCount(count);
-        }
-        newContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE);
-        newContainer.setRecordCount(count);
-        builder.add(newBatch);
-      } while (selectionVector4.next());
-      selectionVector4.clear();
-      c.clear();
+      // Purge all the existing batches into a new batch which holds only the selected records
+      copyToPurge(newContainer, builder);
       final VectorContainer oldSchemaContainer = new VectorContainer(oContext);
       builder.build(oldSchemaContainer);
       oldSchemaContainer.setRecordCount(builder.getSv4().getCount());
@@ -458,6 +498,190 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
     incoming.kill(sendUpstream);
   }
 
+  /**
+   * Resets the TopNBatch state so that the next incoming batches are processed independently of the batches already seen.
+   */
+  private void resetTopNState() {
+    lastKnownOutcome = OK;
+    countSincePurge = 0;
+    batchCount = 0;
+    hasOutputRecords = false;
+    releaseResource();
+  }
+
+  /**
+   * Releases resources held by the TopN batch, such as the sv4, the priority queue, and the outgoing container.
+   */
+  private void releaseResource() {
+    if (sv4 != null) {
+      sv4.clear();
+    }
+
+    if (priorityQueue != null) {
+      priorityQueue.cleanup();
+    }
+    container.zeroVectors();
+  }
+
+  /**
+   * Returns the final IterOutcome which TopN should return for this next() call. OK_NEW_SCHEMA is returned with the
+   * first output batch after a new schema is seen, as indicated by the firstBatchForSchema flag. That flag is also
+   * true for the very first output batch after the buildSchema() phase, since buildSchema() returned a dummy schema
+   * downstream without the correct SelectionVectorMode.
+   * Otherwise, when there is no schema change, either OK or EMIT is returned with the output batches, depending on
+   * whether EMIT has been seen. When EMIT is not seen, OK is always returned with an output batch. Once all the
+   * data has been returned, NONE is sent at the end.
+   *
+   * @return - IterOutcome - outcome to send downstream
+   */
+  private IterOutcome getFinalOutcome() {
+    IterOutcome outcomeToReturn;
+
+    if (firstBatchForSchema) {
+      outcomeToReturn = OK_NEW_SCHEMA;
+      firstBatchForSchema = false;
+    } else if (recordCount == 0) {
+      // Determine the outcome to return before calling resetTopNState(), since that resets lastKnownOutcome to OK
+      outcomeToReturn = lastKnownOutcome == EMIT ? EMIT : NONE;
+      resetTopNState();
+    } else if (lastKnownOutcome == EMIT) {
+      // In the EMIT case, check whether this output batch returns all the data. If so, return EMIT along with this
+      // output batch; otherwise return OK, and the remaining data will be sent downstream in subsequent next() calls.
+      final boolean hasMoreRecords = sv4.hasNext();
+      outcomeToReturn = (hasMoreRecords) ? OK : EMIT;
+      hasOutputRecords = hasMoreRecords;
+    } else {
+      outcomeToReturn = OK;
+    }
+
+    return outcomeToReturn;
+  }
+
+  /**
+   * Copies all the selected records into the new container, purging all the incoming batches into a single batch.
+   * @param newContainer - New container holding the ValueVectors with the selected records
+   * @param batchBuilder - Builder used to build batches of hyper vectors
+   * @throws SchemaChangeException
+   */
+  private void copyToPurge(VectorContainer newContainer, SortRecordBatchBuilder batchBuilder)
+    throws SchemaChangeException {
+    final VectorContainer c = priorityQueue.getHyperBatch();
+    final SelectionVector4 queueSv4 = priorityQueue.getSv4();
+    final SimpleSV4RecordBatch newBatch = new SimpleSV4RecordBatch(newContainer, null, context);
+
+    do {
+      // count is the limit number of records required by TopN batch
+      final int count = queueSv4.getCount();
+      // Copies count records from the hyper batch into the simple container
+      final int copiedRecords = copier.copyRecords(0, count);
+      assert copiedRecords == count;
+      for (VectorWrapper<?> v : newContainer) {
+        ValueVector.Mutator m = v.getValueVector().getMutator();
+        m.setValueCount(count);
+      }
+      newContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+      newContainer.setRecordCount(count);
+      // Store all the batches containing limit number of records
+      batchBuilder.add(newBatch);
+    } while (queueSv4.next());
+    // Release the memory stored for the priority queue heap to store indexes
+    queueSv4.clear();
+    // Release the memory from HyperBatch container
+    c.clear();
+  }
+
+  /**
+   * Prepares an output container with batches from the priority queue for each record boundary. When this is the
+   * first batch for the current schema (indicated by firstBatchForSchema being true), the output container is
+   * cleared and recreated with new HyperVectorWrapper objects and ValueVectors from the priority queue. When the
+   * schema has not changed, the container is prepared keeping the VectorWrapper and SV4 references as is, since
+   * that is what the downstream operator needs.
+   */
+  private void prepareOutputContainer(VectorContainer dataContainer, SelectionVector4 dataSv4) {
+    container.zeroVectors();
+    hasOutputRecords = true;
+    // Check if this is the first output batch for the new known schema. If yes then prepare the output container
+    // with the proper vectors, otherwise re-use the previous vectors.
+    if (firstBatchForSchema) {
+      container.clear();
+      for (VectorWrapper<?> w : dataContainer) {
+        container.add(w.getValueVectors());
+      }
+      container.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE);
+      sv4 = dataSv4;
+    } else {
+      // The schema didn't change, so keep the HyperVectorWrapper references in the outgoing container intact and
+      // populate each HyperVectorWrapper with the new list of vectors. The assumption here is that the order of
+      // ValueVectors stays the same across record boundaries unless a new schema is observed.
+      int index = 0;
+      for (VectorWrapper<?> w : dataContainer) {
+        HyperVectorWrapper wrapper = (HyperVectorWrapper<?>) container.getValueVector(index++);
+        wrapper.updateVectorList(w.getValueVectors());
+      }
+      // Since the SV4 reference is held by the downstream operator and there is no schema change, just copy the
+      // underlying buffer from the priority queue's sv4.
+      this.sv4.copy(dataSv4);
+    }
+    recordCount = sv4.getCount();
+    container.setRecordCount(recordCount);
+  }
+
+  /**
+   * Handles returning the correct outcome and setting the recordCount of the output container when next() is called
+   * multiple times for a single record boundary. It covers cases where some output was already returned at the
+   * current record boundary but either more data is left to return, or the proper outcome with an empty batch is
+   * still to be returned. Example: for the first EMIT record boundary, if all the records were returned in a
+   * previous call with OK_NEW_SCHEMA, this method handles returning an empty batch with EMIT in the subsequent next() call.
+   * @return - Outcome to return downstream
+   */
+  private IterOutcome handleRemainingOutput() {
+    // If the priority queue is not null, the incoming batches were non-empty; also check whether there are more
+    // records to send downstream for this record boundary.
+    if (priorityQueue != null && sv4.next()) {
+      recordCount = sv4.getCount();
+      container.setRecordCount(recordCount);
+    } else { // This means that either:
+      // 1) the priority queue was not null and all records have been sent downstream for this record boundary, or
+      // 2) the priority queue is null, since all the incoming batches were empty for the current record boundary
+      // (or EMIT outcome). In the previous call we must have returned OK_NEW_SCHEMA along with the SV4 container,
+      // so EMIT is returned now.
+      recordCount = 0;
+      container.setRecordCount(0);
+    }
+    return getFinalOutcome();
+  }
+
+  /**
+   * Prepares the output container and returns the proper outcome downstream when either NONE was seen or only empty
+   * batches were seen with the EMIT outcome. In either case the PriorityQueue has not been created yet, since no
+   * actual records have been received so far.
+   * @param incomingOutcome - outcome received from upstream. Either NONE or EMIT
+   * @return - outcome to return downstream. NONE when incomingOutcome is NONE. OK_NEW_SCHEMA/EMIT when incomingOutcome
+   * is EMIT and is first/non-first empty input batch respectively.
+   */
+  private IterOutcome handleEmptyBatches(IterOutcome incomingOutcome) {
+    IterOutcome outcomeToReturn = incomingOutcome;
+
+    // In the NONE case, change state to DONE and return NONE, whereas in the EMIT case TopN still has to keep
+    // working for future records.
+    if (incomingOutcome == NONE) {
+      state = BatchState.DONE;
+      container.clear();
+      recordCount = 0;
+      container.setRecordCount(recordCount);
+    } else if (incomingOutcome == EMIT) {
+      // The priority queue is null, which means no batch with data has been seen yet.
+      assert (countSincePurge == 0 && batchCount == 0);
+      final VectorContainer hyperContainer = new ExpandableHyperContainer(incoming.getContainer());
+      prepareOutputContainer(hyperContainer, sv4);
+
+      // update the outcome to return
+      outcomeToReturn = getFinalOutcome();
+    }
+
+    return outcomeToReturn;
+  }
+
   public static class SimpleSV4RecordBatch extends SimpleRecordBatch {
     private SelectionVector4 sv4;
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java
index d9f1c8e..321d9a8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java
@@ -34,7 +34,6 @@ public abstract class AbstractSV2Copier extends AbstractCopier {
     this.sv2 = incoming.getSelectionVector2();
 
     final int count = outgoing.getNumberOfColumns();
-
     vvIn = new ValueVector[count];
 
     {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java
index 4f3afc3..cd6af07 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java
@@ -22,10 +22,12 @@ import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.drill.exec.vector.ValueVector;
 
 public abstract class AbstractSV4Copier extends AbstractCopier {
-  protected ValueVector[][] vvIn;
+  // Store a VectorWrapper reference instead of a ValueVector[]. With EMIT outcome support, the underlying
+  // operator can generate multiple output batches with no schema changes, which changes the ValueVector[]
+  // reference but not the VectorWrapper reference.
+  protected VectorWrapper<?>[] vvIn;
   private SelectionVector4 sv4;
 
   @Override
@@ -34,14 +36,13 @@ public abstract class AbstractSV4Copier extends AbstractCopier {
     this.sv4 = incoming.getSelectionVector4();
 
     final int count = outgoing.getNumberOfColumns();
-
-    vvIn = new ValueVector[count][];
+    vvIn = new VectorWrapper[count];
 
     {
       int index = 0;
 
       for (VectorWrapper vectorWrapper: incoming) {
-        vvIn[index] = vectorWrapper.getValueVectors();
+        vvIn[index] = vectorWrapper;
         index++;
       }
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4Copier.java
index f9b153d..1f3d28b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4Copier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4Copier.java
@@ -30,7 +30,8 @@ public class GenericSV4Copier extends AbstractSV4Copier {
     int inOffset = inIndex & 0xFFFF;
     int inVector = inIndex >>> 16;
     for ( int i = 0;  i < vvIn.length;  i++ ) {
-      vvOut[i].copyEntry(outIndex, vvIn[i][inVector], inOffset);
+      ValueVector[] vectorsFromIncoming = vvIn[i].getValueVectors();
+      vvOut[i].copyEntry(outIndex, vectorsFromIncoming[inVector], inOffset);
     }
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
index 4e47051..eddc9df 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
@@ -26,6 +26,8 @@ import org.apache.drill.exec.vector.complex.FieldIdUtil;
 
 import com.google.common.base.Preconditions;
 
+import java.lang.reflect.Array;
+
 
 public class HyperVectorWrapper<T extends ValueVector> implements VectorWrapper<T>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HyperVectorWrapper.class);
@@ -138,7 +140,7 @@ public class HyperVectorWrapper<T extends ValueVector> implements VectorWrapper<
 
   @SuppressWarnings("unchecked")
   public void addVectors(ValueVector[] vv) {
-    vectors = (T[]) ArrayUtils.add(vectors, vv);
+    vectors = (T[]) ArrayUtils.addAll(vectors, vv);
   }
 
   /**
@@ -157,4 +159,22 @@ public class HyperVectorWrapper<T extends ValueVector> implements VectorWrapper<
       vectors[i].makeTransferPair(destionationVectors[i]).transfer();
     }
   }
+
+  /**
+   * Replaces the existing list of vectors in this HyperVectorWrapper with the newly provided ValueVector list.
+   * @param vv - New list of ValueVectors to be stored
+   */
+  @SuppressWarnings("unchecked")
+  public void updateVectorList(ValueVector[] vv) {
+    Preconditions.checkArgument(vv.length > 0);
+    Preconditions.checkArgument(getField().getType().equals(vv[0].getField().getType()));
+    // vectors.length will always be > 0 since in constructor that is enforced
+    Preconditions.checkArgument(vv[0].getClass().equals(vectors[0].getClass()));
+    clear();
+
+    final Class<?> clazz = vv[0].getClass();
+    final ValueVector[] c = (ValueVector[]) Array.newInstance(clazz, vv.length);
+    System.arraycopy(vv, 0, c, 0, vv.length);
+    vectors = (T[])c;
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
index 2c10d6d..a0b47ed 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
@@ -60,6 +60,10 @@ public class SelectionVector4 implements AutoCloseable {
     return length;
   }
 
+  private ByteBuf getData() {
+    return data;
+  }
+
   public void setCount(int length) {
     this.length = length;
     this.recordCount = length;
@@ -104,8 +108,7 @@ public class SelectionVector4 implements AutoCloseable {
   public boolean next() {
 //    logger.debug("Next called. Start: {}, Length: {}, recordCount: " + recordCount, start, length);
 
-    if (start + length >= recordCount) {
-
+    if (!hasNext()) {
       start = recordCount;
       length = 0;
 //      logger.debug("Setting count to zero.");
@@ -119,15 +122,33 @@ public class SelectionVector4 implements AutoCloseable {
     return true;
   }
 
+  public boolean hasNext() {
+    final int endIndex = start + length;
+    return endIndex < recordCount;
+  }
+
   public void clear() {
     start = 0;
     length = 0;
+    recordCount = 0;
     if (data != DeadBuf.DEAD_BUFFER) {
       data.release();
       data = DeadBuf.DEAD_BUFFER;
     }
   }
 
+  public void copy(SelectionVector4 fromSV4) {
+    clear();
+    this.recordCount = fromSV4.getTotalCount();
+    this.length = fromSV4.getCount();
+    this.data = fromSV4.getData();
+    // Retain the data buffer so that if fromSV4 clears the buffer it is not actually released until the copying
+    // SV4 has also released it.
+    if (data != DeadBuf.DEAD_BUFFER) {
+      this.data.retain();
+    }
+  }
+
   public static int getBatchIndex(int sv4Index) {
     return (sv4Index >> 16) & 0xFFFF;
   }
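
The buffer handoff in SelectionVector4.copy() above relies on reference counting: the copying SV4 retains the buffer, so the memory stays live until both the source and the copy have released it. A minimal illustration of that retain/release contract using Netty's ByteBuf directly (not Drill code; the buffer size is arbitrary):

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;

    ByteBuf buf = Unpooled.buffer(16); // refCnt == 1, held by the "source" SV4
    buf.retain();                      // refCnt == 2, the "copy" now also holds it
    buf.release();                     // refCnt == 1, source cleared; memory still live
    buf.release();                     // refCnt == 0, copy cleared; memory freed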
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNEmitOutcome.java
new file mode 100644
index 0000000..6066572
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNEmitOutcome.java
@@ -0,0 +1,710 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.TopN;
+
+import com.google.common.collect.Lists;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.drill.categories.OperatorTest;
+import org.apache.drill.exec.physical.config.TopN;
+import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
+import org.apache.drill.exec.physical.impl.MockRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.test.rowSet.RowSet;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@Category(OperatorTest.class)
+public class TestTopNEmitOutcome extends BaseTestOpBatchEmitOutcome {
+  //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestTopNEmitOutcome.class);
+
+  /**
+   * Verifies that the column count in the received batch matches the expected column count.
+   * @param batch - Incoming record batch
+   * @param expectedColCount - Expected count of columns in the record batch
+   */
+  private void verifyColumnCount(VectorAccessible batch, int expectedColCount) {
+    List<String> columns = Lists.newArrayList();
+    SelectionVector4 sv4 = batch.getSelectionVector4();
+    for (VectorWrapper<?> vw : batch) {
+      if (sv4 != null) {
+        columns.add(vw.getValueVectors()[0].getField().getName());
+      } else {
+        columns.add(vw.getValueVector().getField().getName());
+      }
+    }
+    assertEquals(String.format("Actual number of columns: %d is different than expected count: %d",
+      columns.size(), expectedColCount), columns.size(), expectedColCount);
+  }
+
+  /**
+   * Verifies the data received in the incoming record batch against the expected data stored inside the expected batch.
+   * Assumes the input record batch has an associated sv4.
+   * @param batch - incoming record batch
+   * @param sv4 - associated sv4 with incoming record batch
+   * @param expectedBatch - expected record batch with expected data
+   */
+  private void verifyBaseline(VectorAccessible batch, SelectionVector4 sv4, VectorContainer expectedBatch) {
+    assertTrue(sv4 != null);
+    List<String> columns = Lists.newArrayList();
+    for (VectorWrapper<?> vw : batch) {
+      columns.add(vw.getValueVectors()[0].getField().getName());
+    }
+
+    for (int j = 0; j < sv4.getCount(); j++) {
+      List<String> eValue = new ArrayList<>(columns.size());
+      List<String> value = new ArrayList<>(columns.size());
+
+      for (VectorWrapper<?> vw : batch) {
+        Object o = vw.getValueVectors()[sv4.get(j) >>> 16].getAccessor().getObject(sv4.get(j) & 65535);
+        decodeAndAddValue(o, value);
+      }
+
+      for (VectorWrapper<?> vw : expectedBatch) {
+        Object e = vw.getValueVector().getAccessor().getObject(j);
+        decodeAndAddValue(e, eValue);
+      }
+      assertTrue("Some of expected value didn't matches with actual value",eValue.equals(value));
+    }
+  }
+
+  private void decodeAndAddValue(Object currentValue, List<String> listToAdd) {
+    if (currentValue == null) {
+      listToAdd.add("null");
+    } else if (currentValue instanceof byte[]) {
+      listToAdd.add(new String((byte[]) currentValue));
+    } else {
+      listToAdd.add(currentValue.toString());
+    }
+  }
+
+  /**
+   * Verifies that if TopNBatch receives empty batches with OK_NEW_SCHEMA and EMIT outcomes then it correctly produces
+   * empty batches as output. The first empty output batch carries OK_NEW_SCHEMA and the second carries EMIT.
+   * @throws Exception
+   */
+  @Test
+  public void testTopNEmptyBatchEmitOutcome() {
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(OK_NEW_SCHEMA);
+    inputOutcomes.add(EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final TopN topNConfig = new TopN(null,
+      Lists.newArrayList(ordering("id_left", RelFieldCollation.Direction.ASCENDING,
+        RelFieldCollation.NullDirection.FIRST)), false, 10);
+    final TopNBatch topNBatch = new TopNBatch(topNConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+    assertTrue(topNBatch.next() == OK_NEW_SCHEMA);
+    outputRecordCount += topNBatch.getRecordCount();
+    assertTrue(topNBatch.next() == OK_NEW_SCHEMA);
+    assertTrue(topNBatch.next() == EMIT);
+    outputRecordCount += topNBatch.getRecordCount();
+    assertEquals(0, outputRecordCount);
+    assertTrue(topNBatch.next() == NONE);
+  }
+
+  /**
+   * Verifies that if TopNBatch receives a RecordBatch with EMIT outcome after the build-schema phase then it produces
+   * output for that input batch correctly. The first output batch is always returned with the OK_NEW_SCHEMA
+   * outcome, followed by an empty batch with EMIT. The test verifies the output order against the expected baseline.
+   * @throws Exception
+   */
+  @Test
+  public void testTopNNonEmptyBatchEmitOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .addRow(13, 130, "item13")
+      .addRow(4, 40, "item4")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(13, 130, "item13")
+      .addRow(4, 40, "item4")
+      .addRow(2, 20, "item2")
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final TopN topNConfig = new TopN(null,
+      Lists.newArrayList(ordering("id_left", RelFieldCollation.Direction.DESCENDING,
+        RelFieldCollation.NullDirection.FIRST)), false, 10);
+    final TopNBatch topNBatch = new TopNBatch(topNConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    outputRecordCount += topNBatch.getRecordCount();
+    assertEquals(0, outputRecordCount);
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    outputRecordCount += topNBatch.getRecordCount();
+    assertEquals(3, outputRecordCount);
+
+    // verify results
+    verifyColumnCount(topNBatch, inputSchema.size());
+    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet.container());
+
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
+    outputRecordCount += topNBatch.getRecordCount();
+    assertEquals(3, outputRecordCount);
+
+    // Release memory for row sets
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet.clear();
+  }
+
+  @Test
+  public void testTopNEmptyBatchFollowedByNonEmptyBatchEmitOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .addRow(13, 130, "item13")
+      .addRow(4, 40, "item4")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(13, 130, "item13")
+      .addRow(4, 40, "item4")
+      .addRow(2, 20, "item2")
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final TopN topNConfig = new TopN(null,
+      Lists.newArrayList(ordering("id_left", RelFieldCollation.Direction.DESCENDING,
+        RelFieldCollation.NullDirection.FIRST)), false, 10);
+    final TopNBatch topNBatch = new TopNBatch(topNConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    outputRecordCount += topNBatch.getRecordCount();
+    assertEquals(0, outputRecordCount);
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
+
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
+    outputRecordCount += topNBatch.getRecordCount();
+    assertEquals(3, outputRecordCount);
+
+    // verify results
+    verifyColumnCount(topNBatch, inputSchema.size());
+    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet.container());
+
+    // Release memory for row sets
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet.clear();
+  }
+
+  @Test
+  public void testTopNMultipleEmptyBatchFollowedByNonEmptyBatchEmitOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .addRow(13, 130, "item13")
+      .addRow(4, 40, "item4")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(13, 130, "item13")
+      .addRow(4, 40, "item4")
+      .addRow(2, 20, "item2")
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final TopN topNConfig = new TopN(null,
+      Lists.newArrayList(ordering("id_left", RelFieldCollation.Direction.DESCENDING,
+        RelFieldCollation.NullDirection.FIRST)), false, 10);
+    final TopNBatch topNBatch = new TopNBatch(topNConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    outputRecordCount += topNBatch.getRecordCount();
+    assertEquals(0, outputRecordCount);
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
+
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
+    outputRecordCount += topNBatch.getRecordCount();
+    assertEquals(3, outputRecordCount);
+
+    // verify results
+    verifyColumnCount(topNBatch, inputSchema.size());
+    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet.container());
+
+    // Release memory for row sets
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet.clear();
+  }
+
+  /**
+   * Verifies that when TopNBatch receives multiple non-empty record batches with EMIT outcomes in between, it
+   * produces output for each of those inputs correctly. Here the first non-empty batch arrives with OK_NEW_SCHEMA
+   * in the buildSchema phase, followed by an empty batch with EMIT outcome; for that combination TopN produces
+   * output for the records received so far along with the EMIT outcome. A second non-empty batch then arrives with
+   * OK outcome and is output separately. The test validates that the records in each output batch are in the
+   * correct order.
+   */
+  @Test
+  public void testTopNResetsAfterFirstEmitOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .addRow(3, 30, "item3")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(3, 30, "item3")
+      .addRow(2, 20, "item2")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final TopN topNConfig = new TopN(null,
+      Lists.newArrayList(ordering("id_left", RelFieldCollation.Direction.DESCENDING,
+        RelFieldCollation.NullDirection.FIRST)), false, 10);
+    final TopNBatch topNBatch = new TopNBatch(topNConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(1, topNBatch.getRecordCount());
+
+    // verify results with baseline
+    verifyColumnCount(topNBatch, inputSchema.size());
+    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet1.container());
+
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, topNBatch.getRecordCount());
+
+    // State is refreshed after EMIT, and the limit is evaluated anew on the next data batches
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(2, topNBatch.getRecordCount());
+
+    // verify results with baseline
+    verifyColumnCount(topNBatch, inputSchema.size());
+    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet2.container());
+
+    // Release memory for row sets
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet2.clear();
+    expectedRowSet1.clear();
+  }
+
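+  // Conceptually, the refresh verified above reduces to the following shape inside
+  // the operator (a simplified sketch with placeholder names, not the actual
+  // TopNBatch code):
+  //
+  //   if (upstreamOutcome == RecordBatch.IterOutcome.EMIT) {
+  //     packOutputBatchWithSv4();   // hand back everything gathered for this iteration
+  //     resetPriorityQueue();       // clear state so the next iteration starts fresh
+  //     return RecordBatch.IterOutcome.EMIT;
+  //   }
+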
+  /**
+   * Verifies TopNBatch correctness for the case where it receives a non-empty batch in the buildSchema phase
+   * followed by empty batches with OK and EMIT outcomes.
+   */
+  @Test
+  public void testTopN_NonEmptyFirst_EmptyOKEmitOutcome() {
+    final RowSet.SingleRowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.NONE);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final TopN topNConfig = new TopN(null,
+      Lists.newArrayList(ordering("id_left", RelFieldCollation.Direction.DESCENDING,
+        RelFieldCollation.NullDirection.FIRST)), false, 10);
+    final TopNBatch topNBatch = new TopNBatch(topNConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
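+    // The buffered row goes out with OK_NEW_SCHEMA (TopN's first data output carries
+    // the SV4 schema), so the EMIT boundary itself surfaces as an empty batch,
+    // followed by NONE.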
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(0, topNBatch.getRecordCount());
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(1, topNBatch.getRecordCount());
+
+    // verify results with baseline
+    verifyColumnCount(topNBatch, inputSchema.size());
+    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet1.container());
+
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, topNBatch.getRecordCount());
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.NONE);
+    // Release memory for row set
+    expectedRowSet1.clear();
+  }
+
+  /**
+   * Verifies that TopN applies its limit separately to each EMIT-bounded set of batches. With a limit of 1, the
+   * batch received in the buildSchema phase yields a single-record output followed by an empty EMIT; the state is
+   * then refreshed, and the limit is applied anew to the following batches, again yielding a single record.
+   */
+  @Test
+  public void testTopNMultipleOutputBatchWithLowerLimits() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(4, 40, "item4")
+      .addRow(2, 20, "item2")
+      .addRow(5, 50, "item5")
+      .addRow(3, 30, "item3")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(5, 50, "item5")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final TopN topNConfig = new TopN(null,
+      Lists.newArrayList(ordering("id_left", RelFieldCollation.Direction.DESCENDING,
+        RelFieldCollation.NullDirection.FIRST)), false, 1);
+    final TopNBatch topNBatch = new TopNBatch(topNConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(1, topNBatch.getRecordCount());
+
+    // verify results with baseline
+    verifyColumnCount(topNBatch, inputSchema.size());
+    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet1.container());
+
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, topNBatch.getRecordCount());
+
+    // State is refreshed after EMIT, and the limit is evaluated anew on the next data batches
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK);
+    assertEquals(1, topNBatch.getRecordCount());
+
+    // verify results with baseline
+    verifyColumnCount(topNBatch, inputSchema.size());
+    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet2.container());
+
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    // Release memory for row sets
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet2.clear();
+    expectedRowSet1.clear();
+  }
+
+  @Test
+  public void testTopNMultipleEMITOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .addRow(3, 30, "item3")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final TopN topNConfig = new TopN(null,
+      Lists.newArrayList(ordering("id_left", RelFieldCollation.Direction.DESCENDING,
+        RelFieldCollation.NullDirection.FIRST)), false, 10);
+    final TopNBatch topNBatch = new TopNBatch(topNConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
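+    // Expected exchange, as asserted below: a schema-only OK_NEW_SCHEMA, then
+    // OK_NEW_SCHEMA with 1 row followed by an empty EMIT for the first iteration,
+    // then EMIT with 2 rows for the second iteration and a final empty EMIT.
+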
+    // first limit evaluation
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(1, topNBatch.getRecordCount());
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, topNBatch.getRecordCount());
+
+    // After seeing EMIT, TopN refreshes its state and evaluates the limit anew on the next set of input batches
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(2, topNBatch.getRecordCount());
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, topNBatch.getRecordCount());
+
+    nonEmptyInputRowSet2.clear();
+  }
+
+  @Test
+  public void testTopNMultipleInputToSingleOutputBatch() throws Exception {
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .addRow(1, 10, "item1")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final TopN topNConfig = new TopN(null,
+      Lists.newArrayList(ordering("id_left", RelFieldCollation.Direction.DESCENDING,
+        RelFieldCollation.NullDirection.FIRST)), false, 10);
+    final TopNBatch topNBatch = new TopNBatch(topNConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
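+    // Both data batches arrive before the EMIT boundary and are buffered; TopN then
+    // returns them as one sorted output batch (OK_NEW_SCHEMA), with the EMIT itself
+    // surfacing as an empty batch.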
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(2, topNBatch.getRecordCount());
+
+    verifyColumnCount(topNBatch, inputSchema.size());
+    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet.container());
+
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, topNBatch.getRecordCount());
+
+    nonEmptyInputRowSet2.clear();
+  }
+
+  @Test
+  public void testTopNMultipleInputToMultipleOutputBatch_LowerLimits() {
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(7, 70, "item7")
+      .addRow(3, 30, "item3")
+      .addRow(13, 130, "item13")
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(17, 170, "item17")
+      .addRow(3, 30, "item3")
+      .addRow(13, 130, "item13")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(13, 130, "item13")
+      .addRow(7, 70, "item7")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(17, 170, "item17")
+      .addRow(13, 130, "item13")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet3.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final TopN topNConfig = new TopN(null,
+      Lists.newArrayList(ordering("id_left", RelFieldCollation.Direction.DESCENDING,
+        RelFieldCollation.NullDirection.FIRST)), false, 2);
+    final TopNBatch topNBatch = new TopNBatch(topNConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(2, topNBatch.getRecordCount());
+
+    verifyColumnCount(topNBatch, inputSchema.size());
+    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet1.container());
+
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, topNBatch.getRecordCount());
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(2, topNBatch.getRecordCount());
+
+    verifyColumnCount(topNBatch, inputSchema.size());
+    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet2.container());
+
+    nonEmptyInputRowSet2.clear();
+    nonEmptyInputRowSet3.clear();
+    expectedRowSet1.clear();
+    expectedRowSet2.clear();
+  }
+
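+  // A hypothetical helper capturing the wiring shared by the tests in this class
+  // (kept inline in each test above for clarity; shown here only as a sketch):
+  private TopNBatch buildTopN(int limit) {
+    final MockRecordBatch mockInput = new MockRecordBatch(operatorFixture.getFragmentContext(),
+      opContext, inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+    final TopN config = new TopN(null,
+      Lists.newArrayList(ordering("id_left", RelFieldCollation.Direction.DESCENDING,
+        RelFieldCollation.NullDirection.FIRST)), false, limit);
+    return new TopNBatch(config, operatorFixture.getFragmentContext(), mockInput);
+  }
+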
+  /*****************************************************************************************
+   Tests for validating regular TopN behavior with no EMIT outcome
+  ******************************************************************************************/
+  @Test
+  public void testTopN_WithEmptyNonEmptyBatchesAndOKOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(7, 70, "item7")
+      .addRow(3, 30, "item3")
+      .addRow(13, 130, "item13")
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(17, 170, "item17")
+      .addRow(23, 230, "item23")
+      .addRow(130, 1300, "item130")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(130, 1300, "item130")
+      .addRow(23, 230, "item23")
+      .addRow(17, 170, "item17")
+      .addRow(13, 130, "item13")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet3.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final TopN topNConfig = new TopN(null,
+      Lists.newArrayList(ordering("id_left", RelFieldCollation.Direction.DESCENDING,
+        RelFieldCollation.NullDirection.FIRST)), false, 4);
+    final TopNBatch topNBatch = new TopNBatch(topNConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(4, topNBatch.getRecordCount());
+
+    verifyColumnCount(topNBatch, inputSchema.size());
+    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet.container());
+
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    nonEmptyInputRowSet2.clear();
+    nonEmptyInputRowSet3.clear();
+    expectedRowSet.clear();
+  }
+
+  @Test
+  public void testRegularTopNWithEmptyDataSet() {
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final TopN topNConfig = new TopN(null,
+      Lists.newArrayList(ordering("id_left", RelFieldCollation.Direction.DESCENDING,
+        RelFieldCollation.NullDirection.FIRST)), false, 4);
+    final TopNBatch topNBatch = new TopNBatch(topNConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.NONE);
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
index 3a17ef5..f2cc366 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
@@ -70,6 +70,24 @@ public class TestE2EUnnestAndLateral extends BaseTestQuery {
   }
 
   @Test
+  public void testLateral_WithTopNInSubQuery() throws Exception {
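+    // ORDER BY ... LIMIT inside the LATERAL subquery is expected to plan as TopN
+    // (hence the test name), exercising the EMIT handling end to end.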
+    String sql = "SELECT customer.c_name, orders.o_id, orders.o_amount " +
+      "FROM cp.`lateraljoin/nested-customer.parquet` customer, LATERAL " +
+      "(SELECT t.ord.o_id as o_id, t.ord.o_amount as o_amount FROM UNNEST(customer.orders) t(ord) ORDER BY " +
+      "o_amount DESC LIMIT 1) orders";
+
+    testBuilder()
+      .sqlQuery(sql)
+      .unOrdered()
+      .baselineColumns("c_name", "o_id", "o_amount")
+      .baselineValues("customer1", 3.0,  294.5)
+      .baselineValues("customer2", 10.0,  724.5)
+      .baselineValues("customer3", 23.0,  772.2)
+      .baselineValues("customer4", 32.0,  1030.1)
+      .go();
+  }
+
+  @Test
   public void testOuterApply_WithFilterAndLimitInSubQuery() throws Exception {
     String Sql = "SELECT customer.c_name, customer.c_address, orders.o_id, orders.o_amount " +
       "FROM cp.`lateraljoin/nested-customer.parquet` customer OUTER APPLY " +
@@ -123,6 +141,22 @@ public class TestE2EUnnestAndLateral extends BaseTestQuery {
   }
 
   @Test
+  public void testMultipleBatchesLateral_WithTopNInSubQuery() throws Exception {
+    String sql = "SELECT customer.c_name, orders.o_orderkey, orders.o_totalprice " +
+      "FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " +
+      "(SELECT t.ord.o_orderkey as o_orderkey, t.ord.o_totalprice as o_totalprice FROM UNNEST(customer.c_orders) t(ord)" +
+      " ORDER BY o_totalprice DESC LIMIT 1) orders";
+
+    testBuilder()
+      .sqlQuery(sql)
+      .unOrdered()
+      .baselineColumns("c_name", "o_orderkey", "o_totalprice")
+      .baselineValues("Customer#000951313", (long)47035683, 306996.2)
+      .baselineValues("Customer#000007180", (long)54646821, 367189.55)
+      .go();
+  }
+
+  @Test
   public void testMultipleBatchesLateral_WithLimitFilterInSubQuery() throws Exception {
     String sql = "SELECT customer.c_name, customer.c_address, orders.o_orderkey, orders.o_totalprice " +
       "FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " +
