drill-commits mailing list archives

From: sor...@apache.org
Subject: [drill] 01/04: DRILL-6824: Handle schema changes in MapRDBJsonRecordReader closes #1518
Date: Fri, 02 Nov 2018 21:19:59 GMT
This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit f4ef0d4bd8e290bc3b3b6a4bd45713e38e6c1edf
Author: Gautam Parai <gparai@maprtech.com>
AuthorDate: Fri Sep 21 18:44:38 2018 -0700

    DRILL-6824: Handle schema changes in MapRDBJsonRecordReader
    closes #1518
---
 .../store/mapr/db/json/MaprDBJsonRecordReader.java | 127 ++++++++++++++++-----
 .../drill/exec/physical/impl/OutputMutator.java    |   5 +
 .../apache/drill/exec/physical/impl/ScanBatch.java |   2 +
 .../store/parquet/ParquetRecordReaderTest.java     |   5 +
 4 files changed, 113 insertions(+), 26 deletions(-)
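
The gist of the change: MaprDBJsonRecordReader now iterates Documents instead of
DocumentReaders, and when writing a document throws a SchemaChangeException the
reader stashes that document, ends the current batch, and replays the stashed
document at the start of the next batch against a freshly built schema. A
minimal, self-contained sketch of that stash-and-replay pattern (toy types, not
Drill code; the "schema" here is just the record's shape):

    import java.util.Iterator;
    import java.util.List;

    class SchemaChangeDemo {
      enum SchemaState { SCHEMA_UNKNOWN, SCHEMA_INIT, SCHEMA_CHANGE }

      private SchemaState state = SchemaState.SCHEMA_UNKNOWN;
      private String pendingDoc;         // document that triggered the change
      private String currentSchema = ""; // toy "schema": the record's shape

      /** Fills 'out' with up to 'max' records; returns the batch row count. */
      int nextBatch(Iterator<String> docs, int max, List<String> out) {
        int count = 0;
        // Replay the document that ended the previous batch, if any,
        // against a schema rebuilt from that document.
        if (state == SchemaState.SCHEMA_CHANGE && count < max) {
          currentSchema = shapeOf(pendingDoc);
          out.add(pendingDoc);
          pendingDoc = null;
          count++;
        }
        state = SchemaState.SCHEMA_INIT;
        while (count < max && docs.hasNext()) {
          String doc = docs.next();
          if (!currentSchema.isEmpty() && !shapeOf(doc).equals(currentSchema)) {
            pendingDoc = doc;                  // stash and end this batch
            state = SchemaState.SCHEMA_CHANGE;
            break;
          }
          currentSchema = shapeOf(doc);
          out.add(doc);
          count++;
        }
        return count;
      }

      // Toy stand-in for schema discovery: classify by first character.
      private static String shapeOf(String doc) {
        return doc.isEmpty() ? "" : doc.substring(0, 1);
      }
    }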

diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
index 0be44e8..5b849ea 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
@@ -45,6 +45,7 @@ import org.apache.drill.exec.vector.BaseValueVector;
 import org.apache.drill.exec.vector.complex.fn.JsonReaderUtils;
 import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
 import org.apache.hadoop.fs.Path;
+import org.ojai.Document;
 import org.ojai.DocumentReader;
 import org.ojai.DocumentStream;
 import org.ojai.FieldPath;
@@ -77,6 +78,7 @@ import static org.ojai.DocumentConstants.ID_FIELD;
 
 public class MaprDBJsonRecordReader extends AbstractRecordReader {
   private static final Logger logger = LoggerFactory.getLogger(MaprDBJsonRecordReader.class);
+  protected enum SchemaState {SCHEMA_UNKNOWN, SCHEMA_INIT, SCHEMA_CHANGE};
 
   protected static final FieldPath[] ID_ONLY_PROJECTION = { ID_FIELD };
 
@@ -94,16 +96,19 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
   private OperatorContext operatorContext;
   protected VectorContainerWriter vectorWriter;
   private DBDocumentReaderBase reader;
+  Document document;
+  protected OutputMutator vectorWriterMutator;
 
   private DrillBuf buffer;
 
   private DocumentStream documentStream;
 
   private Iterator<DocumentReader> documentReaderIterators;
+  private Iterator<Document> documentIterator;
 
   private boolean includeId;
   private boolean idOnly;
-
+  private SchemaState schemaState;
   private boolean projectWholeDocument;
   private FieldProjector projector;
 
@@ -121,11 +126,16 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
   protected OjaiValueWriter valueWriter;
   protected DocumentReaderVectorWriter documentWriter;
   protected int maxRecordsToRead = -1;
+  protected DBDocumentReaderBase lastDocumentReader;
+  protected Document lastDocument;
 
   public MaprDBJsonRecordReader(MapRDBSubScanSpec subScanSpec, MapRDBFormatPlugin formatPlugin,
                                List<SchemaPath> projectedColumns, FragmentContext context, int maxRecords) {
     this(subScanSpec, formatPlugin, projectedColumns, context);
     this.maxRecordsToRead = maxRecords;
+    this.lastDocumentReader = null;
+    this.lastDocument = null;
+    this.schemaState = SchemaState.SCHEMA_UNKNOWN;
   }
 
   protected MaprDBJsonRecordReader(MapRDBSubScanSpec subScanSpec, MapRDBFormatPlugin formatPlugin,
@@ -264,34 +274,40 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
   @Override
   public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
     this.vectorWriter = new VectorContainerWriter(output, unionEnabled);
+    this.vectorWriterMutator = output;
     this.operatorContext = context;
 
     try {
       table.setOption(TableOption.EXCLUDEID, !includeId);
       documentStream = table.find(condition, scannedFields);
-      documentReaderIterators = documentStream.documentReaders().iterator();
-
-      if (allTextMode) {
-        valueWriter = new AllTextValueWriter(buffer);
-      } else if (readNumbersAsDouble) {
-        valueWriter = new NumbersAsDoubleValueWriter(buffer);
-      } else {
-        valueWriter = new OjaiValueWriter(buffer);
-      }
-
-      if (projectWholeDocument) {
-        documentWriter = new ProjectionPassthroughVectorWriter(valueWriter, projector, includeId);
-      } else if (isSkipQuery()) {
-        documentWriter = new RowCountVectorWriter(valueWriter);
-      } else if (idOnly) {
-        documentWriter = new IdOnlyVectorWriter(valueWriter);
-      } else {
-        documentWriter = new FieldTransferVectorWriter(valueWriter);
-      }
+      documentIterator = documentStream.iterator();
+      setupWriter();
     } catch (DBException ex) {
       throw new ExecutionSetupException(ex);
     }
   }
+  /*
+   * Setup the valueWriter and documentWriters based on config options
+   */
+  private void setupWriter() {
+    if (allTextMode) {
+      valueWriter = new AllTextValueWriter(buffer);
+    } else if (readNumbersAsDouble) {
+      valueWriter = new NumbersAsDoubleValueWriter(buffer);
+    } else {
+      valueWriter = new OjaiValueWriter(buffer);
+    }
+
+    if (projectWholeDocument) {
+      documentWriter = new ProjectionPassthroughVectorWriter(valueWriter, projector, includeId);
+    } else if (isSkipQuery()) {
+      documentWriter = new RowCountVectorWriter(valueWriter);
+    } else if (idOnly) {
+      documentWriter = new IdOnlyVectorWriter(valueWriter);
+    } else {
+      documentWriter = new FieldTransferVectorWriter(valueWriter);
+    }
+  }
 
   @Override
   public int next() {
@@ -303,33 +319,71 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
 
     int recordCount = 0;
     reader = null;
+    document = null;
 
     int maxRecordsForThisBatch = this.maxRecordsToRead >= 0?
         Math.min(BaseValueVector.INITIAL_VALUE_ALLOCATION, this.maxRecordsToRead) : BaseValueVector.INITIAL_VALUE_ALLOCATION;
 
+    try {
+      // If the last document caused a SchemaChange, create a new output schema for this scan batch
+      if (schemaState == SchemaState.SCHEMA_CHANGE && !ignoreSchemaChange) {
+        // Clear the ScanBatch vector container writer/mutator in order to be able to generate the new schema
+        vectorWriterMutator.clear();
+        vectorWriter = new VectorContainerWriter(vectorWriterMutator, unionEnabled);
+        logger.debug("Encountered schema change earlier use new writer {}", vectorWriter.toString());
+        document = lastDocument;
+        setupWriter();
+        if (recordCount < maxRecordsForThisBatch) {
+          vectorWriter.setPosition(recordCount);
+          if (document != null) {
+            reader = (DBDocumentReaderBase) document.asReader();
+            documentWriter.writeDBDocument(vectorWriter, reader);
+            recordCount++;
+          }
+        }
+      }
+    } catch (SchemaChangeException e) {
+      String err_row = reader.getId().asJsonString();
+      if (ignoreSchemaChange) {
+        logger.warn("{}. Dropping row '{}' from result.", e.getMessage(), err_row);
+        logger.debug("Stack trace:", e);
+      } else {
+          /* We should not encounter a SchemaChangeException here since this is the first document for this
+           * new schema. Something is very wrong - cannot handle any further!
+           */
+        throw dataReadError(logger, e, "SchemaChangeException for row '%s'.", err_row);
+      }
+    }
+    schemaState = SchemaState.SCHEMA_INIT;
     while(recordCount < maxRecordsForThisBatch) {
       vectorWriter.setPosition(recordCount);
       try {
-        reader = nextDocumentReader();
-        if (reader == null) {
+        document = nextDocument();
+        if (document == null) {
           break; // no more documents for this reader
         } else {
-          documentWriter.writeDBDocument(vectorWriter, reader);
+          documentWriter.writeDBDocument(vectorWriter, (DBDocumentReaderBase) document.asReader());
         }
         recordCount++;
       } catch (UserException e) {
         throw UserException.unsupportedError(e)
             .addContext(String.format("Table: %s, document id: '%s'",
                 table.getPath(),
-                reader == null ? null : IdCodec.asString(reader.getId())))
+                    document.asReader() == null ? null :
+                        IdCodec.asString(((DBDocumentReaderBase)document.asReader()).getId())))
             .build(logger);
       } catch (SchemaChangeException e) {
-        String err_row = reader.getId().asJsonString();
+        String err_row = ((DBDocumentReaderBase)document.asReader()).getId().asJsonString();
         if (ignoreSchemaChange) {
           logger.warn("{}. Dropping row '{}' from result.", e.getMessage(), err_row);
           logger.debug("Stack trace:", e);
         } else {
-          throw dataReadError(logger, e, "SchemaChangeException for row '%s'.", err_row);
+          /* Save the current document reader for next iteration. The recordCount is not updated so we
+           * would start from this reader on the next next() call
+           */
+          lastDocument = document;
+          schemaState = SchemaState.SCHEMA_CHANGE;
+          break;
         }
       }
     }
@@ -367,6 +421,27 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
     }
   }
 
+  protected Document nextDocument() {
+    final OperatorStats operatorStats = operatorContext == null ? null : operatorContext.getStats();
+    try {
+      if (operatorStats != null) {
+        operatorStats.startWait();
+      }
+      try {
+        if (!documentIterator.hasNext()) {
+          return null;
+        } else {
+          return documentIterator.next();
+        }
+      } finally {
+        if (operatorStats != null) {
+          operatorStats.stopWait();
+        }
+      }
+    } catch (DBException e) {
+      throw dataReadError(logger, e);
+    }
+  }
   /*
    * Extracts contiguous named segments from the SchemaPath, starting from the
    * root segment and build the FieldPath from it for projection.
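
One detail worth noting in nextDocument() above: the blocking fetch from the
OJAI iterator is bracketed by operatorStats.startWait()/stopWait() in a
try/finally, so wait time is charged even when hasNext()/next() throws. A small
generic sketch of the same pattern (Stats is a stand-in interface, not Drill's
OperatorStats):

    import java.util.Iterator;

    class WaitTimedIterator<T> {
      interface Stats { void startWait(); void stopWait(); }

      private final Iterator<T> delegate;
      private final Stats stats;  // null when no stats are being collected

      WaitTimedIterator(Iterator<T> delegate, Stats stats) {
        this.delegate = delegate;
        this.stats = stats;
      }

      /** Returns the next element, or null when exhausted, timing the wait. */
      T nextOrNull() {
        if (stats != null) {
          stats.startWait();
        }
        try {
          return delegate.hasNext() ? delegate.next() : null;
        } finally {
          if (stats != null) {
            stats.stopWait();
          }
        }
      }
    }
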
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
index b25b001..b78eaa1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
@@ -68,4 +68,9 @@ public interface OutputMutator {
    * @return the CallBack object for this mutator
    */
   public CallBack getCallBack();
+
+  /**
+   * Clear this mutator i.e. reset it to pristine condition
+   */
+  public void clear();
 }
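
The new clear() method gives readers a way to reset the downstream mutator to a
pristine state before regenerating a schema, which is what
MaprDBJsonRecordReader.next() does above. A hypothetical mutator honoring that
contract might look like this (SimpleMutator and its fields are illustrative
stand-ins, not Drill's implementation):

    import java.util.LinkedHashMap;
    import java.util.Map;

    class SimpleMutator {
      // Field name -> value vector; LinkedHashMap keeps schema column order.
      private final Map<String, Object> fieldVectors = new LinkedHashMap<>();
      private boolean schemaChanged = false;

      /** Create-or-get a vector; flags a schema change when a field is new. */
      Object addField(String name) {
        return fieldVectors.computeIfAbsent(name, n -> {
          schemaChanged = true;
          return new Object();  // placeholder for a real value vector
        });
      }

      /** Reports and resets the schema-changed flag. */
      boolean isNewSchema() {
        boolean changed = schemaChanged;
        schemaChanged = false;
        return changed;
      }

      /** Reset to pristine condition so the next batch rebuilds from scratch. */
      void clear() {
        fieldVectors.clear();
        schemaChanged = false;
      }
    }
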
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index a688f37..0aa8328 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -491,9 +491,11 @@ public class ScanBatch implements CloseableRecordBatch {
       return callBack;
     }
 
+    @Override
     public void clear() {
       regularFieldVectorMap.clear();
       implicitFieldVectorMap.clear();
+      container.clear();
       schemaChanged = false;
     }
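
Note that the override clears the underlying vector container as well as the
two field maps: value vectors own buffers, so dropping only the map references
would leave the container holding stale vectors. A toy illustration of that
release-then-forget pattern (Container and FakeVector are stand-ins, not Drill
classes):

    import java.util.ArrayList;
    import java.util.List;

    class Container {
      static class FakeVector {
        void release() { /* a real vector would free its buffer here */ }
      }

      private final List<FakeVector> vectors = new ArrayList<>();

      void add(FakeVector v) {
        vectors.add(v);
      }

      /** Release every vector's buffer before forgetting it. */
      void clear() {
        for (FakeVector v : vectors) {
          v.release();
        }
        vectors.clear();
      }
    }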
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index f29002e..1bd90b3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -364,6 +364,11 @@ public class ParquetRecordReaderTest extends BaseTestQuery {
    public CallBack getCallBack() {
      return null;
    }
+
+   @Override
+   public void clear() {
+     // Nothing to do!
+   }
  }
 
   private void validateFooters(final List<Footer> metadata) {

