This is an automated email from the ASF dual-hosted git repository.
sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit f4ef0d4bd8e290bc3b3b6a4bd45713e38e6c1edf
Author: Gautam Parai <gparai@maprtech.com>
AuthorDate: Fri Sep 21 18:44:38 2018 -0700
DRILL-6824: Handle schema changes in MapRDBJsonRecordReader
closes #1518
---
.../store/mapr/db/json/MaprDBJsonRecordReader.java | 127 ++++++++++++++++-----
.../drill/exec/physical/impl/OutputMutator.java | 5 +
.../apache/drill/exec/physical/impl/ScanBatch.java | 2 +
.../store/parquet/ParquetRecordReaderTest.java | 5 +
4 files changed, 113 insertions(+), 26 deletions(-)
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
index 0be44e8..5b849ea 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
@@ -45,6 +45,7 @@ import org.apache.drill.exec.vector.BaseValueVector;
import org.apache.drill.exec.vector.complex.fn.JsonReaderUtils;
import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
import org.apache.hadoop.fs.Path;
+import org.ojai.Document;
import org.ojai.DocumentReader;
import org.ojai.DocumentStream;
import org.ojai.FieldPath;
@@ -77,6 +78,7 @@ import static org.ojai.DocumentConstants.ID_FIELD;
public class MaprDBJsonRecordReader extends AbstractRecordReader {
private static final Logger logger = LoggerFactory.getLogger(MaprDBJsonRecordReader.class);
+ protected enum SchemaState {SCHEMA_UNKNOWN, SCHEMA_INIT, SCHEMA_CHANGE};
protected static final FieldPath[] ID_ONLY_PROJECTION = { ID_FIELD };
@@ -94,16 +96,19 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
private OperatorContext operatorContext;
protected VectorContainerWriter vectorWriter;
private DBDocumentReaderBase reader;
+ Document document;
+ protected OutputMutator vectorWriterMutator;
private DrillBuf buffer;
private DocumentStream documentStream;
private Iterator<DocumentReader> documentReaderIterators;
+ private Iterator<Document> documentIterator;
private boolean includeId;
private boolean idOnly;
-
+ private SchemaState schemaState;
private boolean projectWholeDocument;
private FieldProjector projector;
@@ -121,11 +126,16 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
protected OjaiValueWriter valueWriter;
protected DocumentReaderVectorWriter documentWriter;
protected int maxRecordsToRead = -1;
+ protected DBDocumentReaderBase lastDocumentReader;
+ protected Document lastDocument;
public MaprDBJsonRecordReader(MapRDBSubScanSpec subScanSpec, MapRDBFormatPlugin formatPlugin,
List<SchemaPath> projectedColumns, FragmentContext context, int maxRecords) {
this(subScanSpec, formatPlugin, projectedColumns, context);
this.maxRecordsToRead = maxRecords;
+ this.lastDocumentReader = null;
+ this.lastDocument = null;
+ this.schemaState = SchemaState.SCHEMA_UNKNOWN;
}
protected MaprDBJsonRecordReader(MapRDBSubScanSpec subScanSpec, MapRDBFormatPlugin formatPlugin,
@@ -264,34 +274,40 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
@Override
public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException
{
this.vectorWriter = new VectorContainerWriter(output, unionEnabled);
+ this.vectorWriterMutator = output;
this.operatorContext = context;
try {
table.setOption(TableOption.EXCLUDEID, !includeId);
documentStream = table.find(condition, scannedFields);
- documentReaderIterators = documentStream.documentReaders().iterator();
-
- if (allTextMode) {
- valueWriter = new AllTextValueWriter(buffer);
- } else if (readNumbersAsDouble) {
- valueWriter = new NumbersAsDoubleValueWriter(buffer);
- } else {
- valueWriter = new OjaiValueWriter(buffer);
- }
-
- if (projectWholeDocument) {
- documentWriter = new ProjectionPassthroughVectorWriter(valueWriter, projector, includeId);
- } else if (isSkipQuery()) {
- documentWriter = new RowCountVectorWriter(valueWriter);
- } else if (idOnly) {
- documentWriter = new IdOnlyVectorWriter(valueWriter);
- } else {
- documentWriter = new FieldTransferVectorWriter(valueWriter);
- }
+ documentIterator = documentStream.iterator();
+ setupWriter();
} catch (DBException ex) {
throw new ExecutionSetupException(ex);
}
}
+ /*
+ * Set up the valueWriter and documentWriter based on config options.
+ */
+ private void setupWriter() {
+ if (allTextMode) {
+ valueWriter = new AllTextValueWriter(buffer);
+ } else if (readNumbersAsDouble) {
+ valueWriter = new NumbersAsDoubleValueWriter(buffer);
+ } else {
+ valueWriter = new OjaiValueWriter(buffer);
+ }
+
+ if (projectWholeDocument) {
+ documentWriter = new ProjectionPassthroughVectorWriter(valueWriter, projector, includeId);
+ } else if (isSkipQuery()) {
+ documentWriter = new RowCountVectorWriter(valueWriter);
+ } else if (idOnly) {
+ documentWriter = new IdOnlyVectorWriter(valueWriter);
+ } else {
+ documentWriter = new FieldTransferVectorWriter(valueWriter);
+ }
+ }
@Override
public int next() {
@@ -303,33 +319,71 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
int recordCount = 0;
reader = null;
+ document = null;
int maxRecordsForThisBatch = this.maxRecordsToRead >= 0?
Math.min(BaseValueVector.INITIAL_VALUE_ALLOCATION, this.maxRecordsToRead) : BaseValueVector.INITIAL_VALUE_ALLOCATION;
+ try {
+ // If the last document caused a SchemaChange, create a new output schema for this scan batch
+ if (schemaState == SchemaState.SCHEMA_CHANGE && !ignoreSchemaChange) {
+ // Clear the ScanBatch vector container writer/mutator in order to be able to generate the new schema
+ vectorWriterMutator.clear();
+ vectorWriter = new VectorContainerWriter(vectorWriterMutator, unionEnabled);
+ logger.debug("Encountered schema change earlier use new writer {}", vectorWriter.toString());
+ document = lastDocument;
+ setupWriter();
+ if (recordCount < maxRecordsForThisBatch) {
+ vectorWriter.setPosition(recordCount);
+ if (document != null) {
+ reader = (DBDocumentReaderBase) document.asReader();
+ documentWriter.writeDBDocument(vectorWriter, reader);
+ recordCount++;
+ }
+ }
+ }
+ } catch (SchemaChangeException e) {
+ String err_row = reader.getId().asJsonString();
+ if (ignoreSchemaChange) {
+ logger.warn("{}. Dropping row '{}' from result.", e.getMessage(), err_row);
+ logger.debug("Stack trace:", e);
+ } else {
+ /* We should not encounter a SchemaChangeException here since this is the first document for this
+ * new schema. Something is very wrong; we cannot handle this any further!
+ */
+ throw dataReadError(logger, e, "SchemaChangeException for row '%s'.", err_row);
+ }
+ }
+ schemaState = SchemaState.SCHEMA_INIT;
while(recordCount < maxRecordsForThisBatch) {
vectorWriter.setPosition(recordCount);
try {
- reader = nextDocumentReader();
- if (reader == null) {
+ document = nextDocument();
+ if (document == null) {
break; // no more documents for this reader
} else {
- documentWriter.writeDBDocument(vectorWriter, reader);
+ documentWriter.writeDBDocument(vectorWriter, (DBDocumentReaderBase) document.asReader());
}
recordCount++;
} catch (UserException e) {
throw UserException.unsupportedError(e)
.addContext(String.format("Table: %s, document id: '%s'",
table.getPath(),
- reader == null ? null : IdCodec.asString(reader.getId())))
+ document.asReader() == null ? null :
+ IdCodec.asString(((DBDocumentReaderBase)document.asReader()).getId())))
.build(logger);
} catch (SchemaChangeException e) {
- String err_row = reader.getId().asJsonString();
+ String err_row = ((DBDocumentReaderBase)document.asReader()).getId().asJsonString();
if (ignoreSchemaChange) {
logger.warn("{}. Dropping row '{}' from result.", e.getMessage(), err_row);
logger.debug("Stack trace:", e);
} else {
- throw dataReadError(logger, e, "SchemaChangeException for row '%s'.", err_row);
+ /* Save the current document for the next iteration. The recordCount is not updated, so we
+ * will resume from this document on the next next() call.
+ */
+ lastDocument = document;
+ schemaState = SchemaState.SCHEMA_CHANGE;
+ break;
}
}
}
@@ -367,6 +421,27 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
}
}
+ protected Document nextDocument() {
+ final OperatorStats operatorStats = operatorContext == null ? null : operatorContext.getStats();
+ try {
+ if (operatorStats != null) {
+ operatorStats.startWait();
+ }
+ try {
+ if (!documentIterator.hasNext()) {
+ return null;
+ } else {
+ return documentIterator.next();
+ }
+ } finally {
+ if (operatorStats != null) {
+ operatorStats.stopWait();
+ }
+ }
+ } catch (DBException e) {
+ throw dataReadError(logger, e);
+ }
+ }
/*
* Extracts contiguous named segments from the SchemaPath, starting from the
* root segment and build the FieldPath from it for projection.
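Taken together, the reader changes above implement a small replay loop: a SchemaChangeException stashes the offending document in lastDocument and ends the batch early, and the following next() call resets the mutator, rebuilds the writers via setupWriter(), and writes the stashed document first. Below is a self-contained toy sketch of that pattern; the names are plain-Java stand-ins, not Drill APIs, a record starting with '!' simulates a schema change, and StringBuilder stands in for the vector writer.

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    public class SchemaRetrySketch {
      enum SchemaState { SCHEMA_UNKNOWN, SCHEMA_INIT, SCHEMA_CHANGE }

      private SchemaState schemaState = SchemaState.SCHEMA_UNKNOWN;
      private String lastDocument;              // the stashed offending record
      private final Iterator<String> documents;

      SchemaRetrySketch(List<String> docs) {
        this.documents = docs.iterator();
      }

      int next(int maxRecords, StringBuilder batch) {
        int recordCount = 0;
        if (schemaState == SchemaState.SCHEMA_CHANGE) {
          batch.setLength(0);                       // like mutator.clear(): start a fresh writer
          batch.append(lastDocument).append('\n');  // replay the stashed record first
          recordCount++;
        }
        schemaState = SchemaState.SCHEMA_INIT;
        while (recordCount < maxRecords && documents.hasNext()) {
          String doc = documents.next();
          if (doc.startsWith("!")) {                // simulated schema change
            lastDocument = doc;                     // recordCount is NOT advanced for it
            schemaState = SchemaState.SCHEMA_CHANGE;
            break;                                  // end the batch; replay on the next call
          }
          batch.append(doc).append('\n');
          recordCount++;
        }
        return recordCount;
      }

      public static void main(String[] args) {
        SchemaRetrySketch reader = new SchemaRetrySketch(Arrays.asList("a", "b", "!c", "d"));
        StringBuilder batch = new StringBuilder();
        int n;
        while ((n = reader.next(4, batch)) > 0) {   // prints [a, b], then [!c, d]
          System.out.println(n + " record(s):\n" + batch);
          batch.setLength(0);
        }
      }
    }

As in the patch, the record that triggers the change is never counted into the short batch; it is carried over and becomes the first row written under the new schema.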
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
index b25b001..b78eaa1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
@@ -68,4 +68,9 @@ public interface OutputMutator {
* @return the CallBack object for this mutator
*/
public CallBack getCallBack();
+
+ /**
+ * Clear this mutator, i.e., reset it to its pristine condition.
+ */
+ public void clear();
}
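The new interface method gives a reader a way to wipe every vector the mutator has materialized so that schema discovery can start over. A minimal stand-in illustrating the intended contract (toy types, not Drill's ValueVector machinery):

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Toy stand-in for a mutator that owns named, growable buffers the way
    // the real OutputMutator owns ValueVectors.
    public class ToyMutator {
      private final Map<String, StringBuilder> vectors = new LinkedHashMap<>();

      // Readers call this once per discovered field; repeat calls reuse it.
      public StringBuilder addField(String name) {
        return vectors.computeIfAbsent(name, n -> new StringBuilder());
      }

      // The new contract: after clear() the mutator is indistinguishable from
      // a freshly constructed one, so subsequent addField() calls rebuild the
      // schema from scratch.
      public void clear() {
        vectors.clear();
      }

      public int fieldCount() {
        return vectors.size();
      }
    }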
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index a688f37..0aa8328 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -491,9 +491,11 @@ public class ScanBatch implements CloseableRecordBatch {
return callBack;
}
+ @Override
public void clear() {
regularFieldVectorMap.clear();
implicitFieldVectorMap.clear();
+ container.clear();
schemaChanged = false;
}
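The added container.clear() matters because the field maps and the batch's vector container are two views over the same vectors; resetting only the maps would leave the old schema's vectors reachable downstream. A toy demonstration of that aliasing (plain collections, not Drill code):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class ClearBothViews {
      public static void main(String[] args) {
        Map<String, List<Integer>> fieldVectorMap = new HashMap<>(); // mutator's lookup view
        List<List<Integer>> container = new ArrayList<>();           // downstream's ordered view

        List<Integer> vector = new ArrayList<>(Arrays.asList(1, 2, 3));
        fieldVectorMap.put("a", vector);
        container.add(vector);

        fieldVectorMap.clear();
        System.out.println(container.size()); // 1 -- old-schema vector still visible
        container.clear();                    // the extra step the patch adds
        System.out.println(container.size()); // 0 -- truly pristine
      }
    }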
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index f29002e..1bd90b3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -364,6 +364,11 @@ public class ParquetRecordReaderTest extends BaseTestQuery {
public CallBack getCallBack() {
return null;
}
+
+ @Override
+ public void clear() {
+ // Nothing to do!
+ }
}
private void validateFooters(final List<Footer> metadata) {