drill-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From adeneche <...@git.apache.org>
Subject [GitHub] drill pull request: DRILL-3871: Off by one error while reading bin...
Date Thu, 29 Oct 2015 19:26:27 GMT
Github user adeneche commented on a diff in the pull request:

    https://github.com/apache/drill/pull/219#discussion_r43432757
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java
---
    @@ -29,117 +29,152 @@
     import parquet.hadoop.metadata.ColumnChunkMetaData;
     
     abstract class NullableColumnReader<V extends ValueVector> extends ColumnReader<V>{
    -
    -  int nullsFound;
    -  // used to skip nulls found
    -  int rightBitShift;
    -  // used when copying less than a byte worth of data at a time, to indicate the number
of used bits in the current byte
    -  int bitsUsed;
    -  BaseDataValueVector castedBaseVector;
    -  NullableVectorDefinitionSetter castedVectorMutator;
    -  long definitionLevelsRead;
    -  long totalDefinitionLevelsRead;
    +    private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NullableColumnReader.class);
    +  protected BaseDataValueVector castedBaseVector;
    +  protected NullableVectorDefinitionSetter castedVectorMutator;
    +  private long definitionLevelsRead = 0;
     
       NullableColumnReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor
descriptor, ColumnChunkMetaData columnChunkMetaData,
                    boolean fixedLength, V v, SchemaElement schemaElement) throws ExecutionSetupException
{
         super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v,
schemaElement);
         castedBaseVector = (BaseDataValueVector) v;
         castedVectorMutator = (NullableVectorDefinitionSetter) v.getMutator();
    -    totalDefinitionLevelsRead = 0;
       }
     
    -
    -  @Override
    -  public void processPages(long recordsToReadInThisPass) throws IOException {
    -    int indexInOutputVector = 0;
    +  @Override public void processPages(long recordsToReadInThisPass)
    +      throws IOException {
         readStartInBytes = 0;
         readLength = 0;
         readLengthInBits = 0;
         recordsReadInThisIteration = 0;
         vectorData = castedBaseVector.getBuffer();
     
    -      // values need to be spaced out where nulls appear in the column
    -      // leaving blank space for nulls allows for random access to values
    -      // to optimize copying data out of the buffered disk stream, runs of defined values
    -      // are located and copied together, rather than copying individual values
    -
    -      long runStart = pageReader.readPosInBytes;
    -      int runLength;
    -      int currentDefinitionLevel;
    -      boolean lastValueWasNull;
    -      boolean lastRunBrokenByNull = false;
    -      while (indexInOutputVector < recordsToReadInThisPass && indexInOutputVector
< valueVec.getValueCapacity()){
    -        // read a page if needed
    +    // values need to be spaced out where nulls appear in the column
    +    // leaving blank space for nulls allows for random access to values
    +    // to optimize copying data out of the buffered disk stream, runs of defined values
    +    // are located and copied together, rather than copying individual values
    +
    +    int runLength = -1;     // number of non-null records in this pass.
    +    int nullRunLength = -1; // number of consecutive null records that we read.
    +    int currentDefinitionLevel = -1;
    +    int readCount = 0; // the record number we last read.
    +    int writeCount = 0; // the record number we last wrote to the value vector.
    +                        // This was previously the indexInOutputVector variable
    +    boolean haveMoreData; // true if we have more data and have not filled the vector
    +
    +    while (readCount < recordsToReadInThisPass && writeCount < valueVec.getValueCapacity())
{
    +      // read a page if needed
           if (!pageReader.hasPage()
    -            || ((readStartInBytes + readLength >= pageReader.byteLength &&
bitsUsed == 0) &&
    -          definitionLevelsRead >= pageReader.currentPageCount)) {
    -          if (!pageReader.next()) {
    -            break;
    -          }
    -          definitionLevelsRead = 0;
    +          || (definitionLevelsRead >= pageReader.currentPageCount)) {
    +        if (!pageReader.next()) {
    +          break;
             }
    -        lastValueWasNull = true;
    -        runLength = 0;
    -        if (lastRunBrokenByNull ) {
    -          nullsFound = 1;
    -          lastRunBrokenByNull = false;
    -        } else  {
    -          nullsFound = 0;
    -        }
    -        // loop to find the longest run of defined values available, can be preceded
by several nulls
    -        while(indexInOutputVector < recordsToReadInThisPass
    -            && indexInOutputVector < valueVec.getValueCapacity()
    -          && definitionLevelsRead < pageReader.currentPageCount) {
    +        //New page. Reset the definition level.
    +        currentDefinitionLevel = -1;
    +        definitionLevelsRead = 0;
    +        recordsReadInThisIteration = 0;
    +      }
    +
    +      nullRunLength = 0;
    +      runLength = 0;
    +
    +      //
    +      // Let's skip the next run of nulls if any ...
    +      //
    +
    +      // If we are reentering this loop, the currentDefinitionLevel has already been
read
    +      if (currentDefinitionLevel < 0) {
    +        currentDefinitionLevel = pageReader.definitionLevels.readInteger();
    +      }
    +      haveMoreData = readCount < recordsToReadInThisPass
    +          && writeCount + nullRunLength < valueVec.getValueCapacity()
    +          && definitionLevelsRead < pageReader.currentPageCount;
    +      while (haveMoreData && currentDefinitionLevel < columnDescriptor
    +          .getMaxDefinitionLevel()) {
    +        readCount++;
    +        nullRunLength++;
    +        definitionLevelsRead++;
    +        haveMoreData = readCount < recordsToReadInThisPass
    +            && writeCount + nullRunLength < valueVec.getValueCapacity()
    +            && definitionLevelsRead < pageReader.currentPageCount;
    +        if (haveMoreData) {
               currentDefinitionLevel = pageReader.definitionLevels.readInteger();
    -          definitionLevelsRead++;
    -          indexInOutputVector++;
    -          totalDefinitionLevelsRead++;
    -          if ( currentDefinitionLevel < columnDescriptor.getMaxDefinitionLevel()){
    -            // a run of non-null values was found, break out of this loop to do a read
in the outer loop
    -            if ( ! lastValueWasNull ){
    -              lastRunBrokenByNull = true;
    -              break;
    -            }
    -            nullsFound++;
    -            lastValueWasNull = true;
    -          }
    -          else{
    -            if (lastValueWasNull){
    -              runLength = 0;
    -              lastValueWasNull = false;
    -            }
    -            runLength++;
    -            castedVectorMutator.setIndexDefined(indexInOutputVector - 1);
    -          }
             }
    -        valuesReadInCurrentPass += nullsFound;
    +      }
    +      //
    +      // Write the nulls if any
    +      //
    +      if (nullRunLength > 0) {
    +        int writerIndex =
    +            ((BaseDataValueVector) valueVec).getBuffer().writerIndex();
    +        castedBaseVector.getBuffer().setIndex(0, writerIndex + (int) Math
    --- End diff --
    
    The previous code seemed to handle `dataTypeLengthInBits <= 8` as a separate case. Is that
handling no longer needed?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message