orc-commits mailing list archives

From: omal...@apache.org
Subject: orc git commit: ORC-248. PhysicalFsWriter will sometimes pass a negative amount of requested padding to the shims.
Date: Tue, 01 May 2018 20:34:09 GMT
Repository: orc
Updated Branches:
  refs/heads/master a03ddd188 -> 837a1bd7c


ORC-248. PhysicalFsWriter will sometimes pass a negative amount of
requested padding to the shims.

Fixes #259

Signed-off-by: Owen O'Malley <omalley@apache.org>
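
For context, the old PhysicalFsWriter computed its padding request as
blockSize - (stripeStart - blockStart) and handed the result straight to
HadoopShims.padStreamToBlock. A rough illustration, using assumed values
rather than anything from a real run, of how that expression goes negative
once the tracked blockStart falls more than a block behind the writer's
position:

    // Hypothetical values, for illustration only.
    long blockSize   = 64 * 1024;   // 64 KB HDFS block
    long blockStart  = 0;           // stale block origin
    long stripeStart = 130 * 1024;  // writer is already two blocks past it
    long padding = blockSize - (stripeStart - blockStart);  // -66 KB

The rewritten padStripe below instead derives the bytes already written into
the current block as (stripeStart - blockOffset) % blockSize, which is always
in the range [0, blockSize), so the requested padding can no longer be
negative.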


Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/837a1bd7
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/837a1bd7
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/837a1bd7

Branch: refs/heads/master
Commit: 837a1bd7c4c9a5c82b7525f65ed8eaa60a5a1b52
Parents: a03ddd1
Author: Owen O'Malley <omalley@apache.org>
Authored: Mon Apr 30 14:10:07 2018 -0700
Committer: Owen O'Malley <omalley@apache.org>
Committed: Tue May 1 13:33:26 2018 -0700

----------------------------------------------------------------------
 java/core/src/java/org/apache/orc/OrcConf.java  |   5 +-
 java/core/src/java/org/apache/orc/OrcFile.java  |  37 +++
 .../org/apache/orc/impl/PhysicalFsWriter.java   |  96 +++---
 .../apache/orc/impl/TestPhysicalFsWriter.java   | 331 +++++++++++++++++++
 .../java/org/apache/orc/impl/HadoopShims.java   |   9 +-
 .../org/apache/orc/impl/HadoopShimsCurrent.java |  12 +-
 .../org/apache/orc/impl/HadoopShimsPre2_3.java  |  17 +-
 .../org/apache/orc/impl/HadoopShimsPre2_6.java  |   5 +-
 .../org/apache/orc/impl/HadoopShimsPre2_7.java  |   5 +-
 site/_docs/hive-config.md                       |   7 +
 10 files changed, 441 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/orc/blob/837a1bd7/java/core/src/java/org/apache/orc/OrcConf.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/OrcConf.java b/java/core/src/java/org/apache/orc/OrcConf.java
index 443da87..d92f776 100644
--- a/java/core/src/java/org/apache/orc/OrcConf.java
+++ b/java/core/src/java/org/apache/orc/OrcConf.java
@@ -152,7 +152,10 @@ public enum OrcConf {
   OVERWRITE_OUTPUT_FILE("orc.overwrite.output.file", "orc.overwrite.output.file", false,
     "A boolean flag to enable overwriting of the output file if it already exists.\n"),
  IS_SCHEMA_EVOLUTION_CASE_SENSITIVE("orc.schema.evolution.case.sensitive", "orc.schema.evolution.case.sensitive", true,
-          "A boolean flag to determine if the comparision of field names in schema evolution is case sensitive .\n")
+          "A boolean flag to determine if the comparision of field names in schema evolution is case sensitive .\n"),
+  WRITE_VARIABLE_LENGTH_BLOCKS("orc.write.variable.length.blocks", null, false,
+      "A boolean flag as to whether the ORC writer should write variable length\n"
+      + "HDFS blocks.")
   ;
 
   private final String attribute;
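
The new orc.write.variable.length.blocks entry above has no Hive synonym
(the second constructor argument is null) and defaults to false. A minimal
sketch of enabling it through the Hadoop Configuration that later feeds
OrcFile.writerOptions; the class name is made up for illustration, only the
property key comes from the entry above:

    import org.apache.hadoop.conf.Configuration;

    public class EnableVariableLengthBlocks {
      // Returns a Configuration with the new option switched on; the value is
      // read back via OrcConf.WRITE_VARIABLE_LENGTH_BLOCKS when WriterOptions
      // is constructed.
      public static Configuration configure() {
        Configuration conf = new Configuration();
        conf.setBoolean("orc.write.variable.length.blocks", true);
        return conf;
      }
    }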

http://git-wip-us.apache.org/repos/asf/orc/blob/837a1bd7/java/core/src/java/org/apache/orc/OrcFile.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/OrcFile.java b/java/core/src/java/org/apache/orc/OrcFile.java
index 3ff7666..c9262e9 100644
--- a/java/core/src/java/org/apache/orc/OrcFile.java
+++ b/java/core/src/java/org/apache/orc/OrcFile.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.orc.impl.HadoopShims;
+import org.apache.orc.impl.HadoopShimsFactory;
 import org.apache.orc.impl.MemoryManagerImpl;
 import org.apache.orc.impl.OrcTail;
 import org.apache.orc.impl.ReaderImpl;
@@ -389,6 +391,8 @@ public class OrcFile {
     private PhysicalWriter physicalWriter;
     private WriterVersion writerVersion = CURRENT_WRITER;
     private boolean overwrite;
+    private boolean writeVariableLengthBlocks;
+    private HadoopShims shims;
 
     protected WriterOptions(Properties tableProperties, Configuration conf) {
       configuration = conf;
@@ -428,6 +432,9 @@ public class OrcFile {
           BloomFilterVersion.fromString(
               OrcConf.BLOOM_FILTER_WRITE_VERSION.getString(tableProperties,
                   conf));
+      shims = HadoopShimsFactory.get();
+      writeVariableLengthBlocks =
+          OrcConf.WRITE_VARIABLE_LENGTH_BLOCKS.getBoolean(tableProperties,conf);
     }
 
     /**
@@ -622,6 +629,28 @@ public class OrcFile {
     }
 
     /**
+     * Should the ORC file writer use HDFS variable length blocks, if they
+     * are available?
+     * @param value the new value
+     * @return this
+     */
+    public WriterOptions writeVariableLengthBlocks(boolean value) {
+      writeVariableLengthBlocks = value;
+      return this;
+    }
+
+    /**
+     * Set the HadoopShims to use.
+     * This is only for testing.
+     * @param value the new value
+     * @return this
+     */
+    public WriterOptions setShims(HadoopShims value) {
+      this.shims = value;
+      return this;
+    }
+
+    /**
      * Manually set the writer version.
      * This is an internal API.
      * @param version the version to write
@@ -722,6 +751,14 @@ public class OrcFile {
     public WriterVersion getWriterVersion() {
       return writerVersion;
     }
+
+    public boolean getWriteVariableLengthBlocks() {
+      return writeVariableLengthBlocks;
+    }
+
+    public HadoopShims getHadoopShims() {
+      return shims;
+    }
   }
 
   /**

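With the two new WriterOptions methods in place, callers can opt in per
writer instead of per Configuration. A minimal sketch, assuming the usual
OrcFile.createWriter flow; the path and schema are placeholders, not part of
this commit:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.orc.OrcFile;
    import org.apache.orc.TypeDescription;
    import org.apache.orc.Writer;

    public class VariableLengthBlockWriter {
      // Creates a writer that asks for variable length HDFS blocks.
      public static Writer create(Configuration conf) throws IOException {
        TypeDescription schema = TypeDescription.fromString("struct<x:int>");
        return OrcFile.createWriter(new Path("/tmp/example.orc"),
            OrcFile.writerOptions(conf)
                .setSchema(schema)
                .writeVariableLengthBlocks(true));
      }
    }

The setShims option added alongside it is documented as a test-only hook and
is exercised by the new TestPhysicalFsWriter below.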
http://git-wip-us.apache.org/repos/asf/orc/blob/837a1bd7/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
index 2521e6d..7a2be5b 100644
--- a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
+++ b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
@@ -44,31 +44,32 @@ public class PhysicalFsWriter implements PhysicalWriter {
   private static final Logger LOG = LoggerFactory.getLogger(PhysicalFsWriter.class);
 
   private static final int HDFS_BUFFER_SIZE = 256 * 1024;
-  private static final HadoopShims shims = HadoopShimsFactory.get();
 
   private FSDataOutputStream rawWriter;
   // the compressed metadata information outStream
-  private OutStream writer = null;
+  private OutStream writer;
   // a protobuf outStream around streamFactory
-  private CodedOutputStream protobufWriter = null;
+  private CodedOutputStream protobufWriter;
 
   private final Path path;
+  private final HadoopShims shims;
   private final long blockSize;
   private final int bufferSize;
-  private final double paddingTolerance;
-  private final long defaultStripeSize;
+  private final int maxPadding;
   private final CompressionKind compress;
   private CompressionCodec codec;
   private final boolean addBlockPadding;
+  private final boolean writeVariableLengthBlocks;
 
   // the streams that make up the current stripe
   private final Map<StreamName, BufferedStream> streams =
     new TreeMap<>();
 
-  private long adjustedStripeSize;
   private long headerLength;
   private long stripeStart;
-  private long blockStart;
+  // The position of the last time we wrote a short block, which becomes the
+  // natural blocks
+  private long blockOffset;
   private int metadataLength;
   private int footerLength;
 
@@ -76,7 +77,7 @@ public class PhysicalFsWriter implements PhysicalWriter {
                           Path path,
                           OrcFile.WriterOptions opts) throws IOException {
     this.path = path;
-    this.defaultStripeSize = this.adjustedStripeSize = opts.getStripeSize();
+    long defaultStripeSize = opts.getStripeSize();
     this.addBlockPadding = opts.getBlockPadding();
     if (opts.isEnforceBufferSize()) {
       this.bufferSize = opts.getBufferSize();
@@ -86,17 +87,20 @@ public class PhysicalFsWriter implements PhysicalWriter {
           opts.getBufferSize());
     }
     this.compress = opts.getCompress();
-    this.paddingTolerance = opts.getPaddingTolerance();
+    this.maxPadding = (int) (opts.getPaddingTolerance() * defaultStripeSize);
     this.blockSize = opts.getBlockSize();
     LOG.info("ORC writer created for path: {} with stripeSize: {} blockSize: {}" +
         " compression: {} bufferSize: {}", path, defaultStripeSize, blockSize,
         compress, bufferSize);
     rawWriter = fs.create(path, opts.getOverwrite(), HDFS_BUFFER_SIZE,
         fs.getDefaultReplication(path), blockSize);
+    blockOffset = 0;
     codec = OrcCodecPool.getCodec(compress);
     writer = new OutStream("metadata", bufferSize, codec,
         new DirectStream(rawWriter));
     protobufWriter = CodedOutputStream.newInstance(writer);
+    writeVariableLengthBlocks = opts.getWriteVariableLengthBlocks();
+    shims = opts.getHadoopShims();
   }
 
   @Override
@@ -130,49 +134,41 @@ public class PhysicalFsWriter implements PhysicalWriter {
     return size;
   }
 
-  private void padStripe(long indexSize, long dataSize, int footerSize) throws IOException {
-    this.stripeStart = rawWriter.getPos();
-    final long currentStripeSize = indexSize + dataSize + footerSize;
-    final long available = blockSize - (stripeStart - blockStart);
-    final long overflow = currentStripeSize - adjustedStripeSize;
-    final float availRatio = (float) available / (float) defaultStripeSize;
-
-    if (availRatio > 0.0f && availRatio < 1.0f
-        && availRatio > paddingTolerance) {
-      // adjust default stripe size to fit into remaining space, also adjust
-      // the next stripe for correction based on the current stripe size
-      // and user specified padding tolerance. Since stripe size can overflow
-      // the default stripe size we should apply this correction to avoid
-      // writing portion of last stripe to next hdfs block.
-      double correction = overflow > 0 ? (double) overflow
-          / (double) adjustedStripeSize : 0.0;
-
-      // correction should not be greater than user specified padding
-      // tolerance
-      correction = correction > paddingTolerance ? paddingTolerance
-          : correction;
-
-      // adjust next stripe size based on current stripe estimate correction
-      adjustedStripeSize = (long) ((1.0f - correction) * (availRatio * defaultStripeSize));
-    } else if (availRatio >= 1.0) {
-      adjustedStripeSize = defaultStripeSize;
+  private static final byte[] ZEROS = new byte[64*1024];
+
+  private static void writeZeros(OutputStream output,
+                                 long remaining) throws IOException {
+    while (remaining > 0) {
+      long size = Math.min(ZEROS.length, remaining);
+      output.write(ZEROS, 0, (int) size);
+      remaining -= size;
     }
+  }
 
-    if (addBlockPadding) {
-      if (availRatio < paddingTolerance) {
-        long padding = blockSize - (stripeStart - blockStart);
-        LOG.info(String.format("Padding ORC by %d bytes (<=  %.2f * %d)",
-            padding, availRatio, defaultStripeSize));
-        stripeStart += shims.padStreamToBlock(rawWriter, padding);
-        blockStart = stripeStart; // new block
-        adjustedStripeSize = defaultStripeSize;
+  /**
+   * Do any required shortening of the HDFS block or padding to avoid stradling
+   * HDFS blocks. This is called before writing the current stripe.
+   * @param stripeSize the number of bytes in the current stripe
+   */
+  private void padStripe(long stripeSize) throws IOException {
+    this.stripeStart = rawWriter.getPos();
+    long previousBytesInBlock = (stripeStart - blockOffset) % blockSize;
+    // We only have options if this isn't the first stripe in the block
+    if (previousBytesInBlock > 0) {
+      if (previousBytesInBlock + stripeSize >= blockSize) {
+        // Try making a short block
+        if (writeVariableLengthBlocks &&
+            shims.endVariableLengthBlock(rawWriter)) {
+          blockOffset = stripeStart;
+        } else if (addBlockPadding) {
+          // if we cross the block boundary, figure out what we should do
+          long padding = blockSize - previousBytesInBlock;
+          if (padding <= maxPadding) {
+            writeZeros(rawWriter, padding);
+            stripeStart += padding;
+          }
+        }
       }
-    } else if (currentStripeSize < blockSize
-        && (stripeStart - blockStart) + currentStripeSize > blockSize) {
-      // even if you don't intend to pad, reset the default stripe size when crossing a
-      // block boundary
-      adjustedStripeSize = defaultStripeSize;
-      blockStart = stripeStart + (stripeStart + currentStripeSize) % blockSize;
     }
   }
 
@@ -370,7 +366,7 @@ public class PhysicalFsWriter implements PhysicalWriter {
 
     OrcProto.StripeFooter footer = footerBuilder.build();
     // Do we need to pad the file so the stripe doesn't straddle a block boundary?
-    padStripe(indexSize, dataSize, footer.getSerializedSize());
+    padStripe(indexSize + dataSize + footer.getSerializedSize());
 
     // write out the data streams
     for (Map.Entry<StreamName, BufferedStream> pair : streams.entrySet()) {

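The net effect of the new padStripe is easiest to see with concrete numbers.
A worked example with assumed values (64 KB blocks and a padding tolerance
of 0.05 on a 32 KB stripe, giving maxPadding = 1638 bytes; none of these are
taken from a real run):

    long blockSize  = 64 * 1024;                     // 65536
    int  maxPadding = (int) (0.05 * 32 * 1024);      // 1638
    long stripeSize = 62 * 1024;                     // next stripe to write

    // Case 1: 63 KB already in the block, so the stripe would straddle the
    // boundary. padding = 65536 - 64512 = 1024 <= 1638, so zero-fill.
    long previousBytesInBlock = 63 * 1024;
    long padding = blockSize - previousBytesInBlock; // 1024 -> writeZeros

    // Case 2: 62 KB already in the block; the stripe still straddles, but
    // padding = 65536 - 63488 = 2048 > 1638, so neither pad nor shorten and
    // the stripe is simply allowed to cross the block boundary.

    // Case 3: with orc.write.variable.length.blocks enabled on HDFS >= 2.7,
    // both cases above are handled first by
    // shims.endVariableLengthBlock(rawWriter), which ends the block at the
    // current position, wastes no bytes, and resets blockOffset.

In every case the computed padding stays in (0, blockSize) and is written
locally by writeZeros, so a negative amount can no longer reach the shims.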
http://git-wip-us.apache.org/repos/asf/orc/blob/837a1bd7/java/core/src/test/org/apache/orc/impl/TestPhysicalFsWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/test/org/apache/orc/impl/TestPhysicalFsWriter.java b/java/core/src/test/org/apache/orc/impl/TestPhysicalFsWriter.java
new file mode 100644
index 0000000..6d2d298
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/impl/TestPhysicalFsWriter.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.OrcProto;
+import org.apache.orc.PhysicalWriter;
+import org.apache.orc.TypeDescription;
+import org.junit.Test;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestPhysicalFsWriter {
+
+  final Configuration conf = new Configuration();
+
+  static class MemoryOutputStream extends OutputStream {
+    private final List<byte[]> contents;
+
+    MemoryOutputStream(List<byte[]> contents) {
+      this.contents = contents;
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+      contents.add(new byte[]{(byte) b});
+    }
+
+    @Override
+    public void write(byte[] a, int offset, int length) {
+      byte[] buffer = new byte[length];
+      System.arraycopy(a, offset, buffer, 0, length);
+      contents.add(buffer);
+    }
+  }
+
+  static class MemoryFileSystem extends FileSystem {
+
+    @Override
+    public URI getUri() {
+      try {
+        return new URI("test:///");
+      } catch (URISyntaxException e) {
+        throw new IllegalStateException("bad url", e);
+      }
+    }
+
+    @Override
+    public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+      return null;
+    }
+
+    @Override
+    public FSDataOutputStream create(Path f, FsPermission permission,
+                                     boolean overwrite, int bufferSize,
+                                     short replication, long blockSize,
+                                     Progressable progress) throws IOException {
+      List<byte[]> contents = new ArrayList<>();
+      fileContents.put(f, contents);
+      return new FSDataOutputStream(new MemoryOutputStream(contents));
+    }
+
+    @Override
+    public FSDataOutputStream append(Path f, int bufferSize,
+                                     Progressable progress) throws IOException {
+      throw new UnsupportedOperationException("append not supported");
+    }
+
+    @Override
+    public boolean rename(Path src, Path dst) throws IOException {
+      boolean result = fileContents.containsKey(src) &&
+          !fileContents.containsKey(dst);
+      if (result) {
+        List<byte[]> contents = fileContents.remove(src);
+        fileContents.put(dst, contents);
+      }
+      return result;
+    }
+
+    @Override
+    public boolean delete(Path f, boolean recursive) throws IOException {
+      boolean result = fileContents.containsKey(f);
+      fileContents.remove(f);
+      return result;
+    }
+
+    @Override
+    public FileStatus[] listStatus(Path f) throws IOException {
+      return new FileStatus[]{getFileStatus(f)};
+    }
+
+    @Override
+    public void setWorkingDirectory(Path new_dir) {
+      currentWorkingDirectory = new_dir;
+    }
+
+    @Override
+    public Path getWorkingDirectory() {
+      return currentWorkingDirectory;
+    }
+
+    @Override
+    public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+      return false;
+    }
+
+    @Override
+    public FileStatus getFileStatus(Path f) throws IOException {
+      List<byte[]> contents = fileContents.get(f);
+      if (contents != null) {
+        long sum = 0;
+        for(byte[] b: contents) {
+          sum += b.length;
+        }
+        return new FileStatus(sum, false, 1, 256 * 1024, 0, f);
+      }
+      return null;
+    }
+
+    private final Map<Path, List<byte[]>> fileContents = new HashMap<>();
+    private Path currentWorkingDirectory = new Path("/");
+  }
+
+  @Test
+  public void testStripePadding() throws IOException {
+    TypeDescription schema = TypeDescription.fromString("int");
+    OrcFile.WriterOptions opts =
+        OrcFile.writerOptions(conf)
+            .stripeSize(32 * 1024)
+            .blockSize(64 * 1024)
+            .compress(CompressionKind.NONE)
+            .setSchema(schema);
+    MemoryFileSystem fs = new MemoryFileSystem();
+    PhysicalFsWriter writer = new PhysicalFsWriter(fs, new Path("test1.orc"),
+        opts);
+    writer.writeHeader();
+    StreamName stream0 = new StreamName(0, OrcProto.Stream.Kind.DATA);
+    PhysicalWriter.OutputReceiver output = writer.createDataStream(stream0);
+    byte[] buffer = new byte[1024];
+    for(int i=0; i < buffer.length; ++i) {
+      buffer[i] = (byte) i;
+    }
+    for(int i=0; i < 63; ++i) {
+      output.output(ByteBuffer.wrap(buffer));
+    }
+    OrcProto.StripeFooter.Builder footer = OrcProto.StripeFooter.newBuilder();
+    OrcProto.StripeInformation.Builder dirEntry =
+        OrcProto.StripeInformation.newBuilder();
+    writer.finalizeStripe(footer, dirEntry);
+    // check to make sure that it laid it out without padding
+    assertEquals(0L, dirEntry.getIndexLength());
+    assertEquals(63 * 1024L, dirEntry.getDataLength());
+    assertEquals(3, dirEntry.getOffset());
+    for(int i=0; i < 62; ++i) {
+      output.output(ByteBuffer.wrap(buffer));
+    }
+    footer = OrcProto.StripeFooter.newBuilder();
+    dirEntry = OrcProto.StripeInformation.newBuilder();
+    writer.finalizeStripe(footer, dirEntry);
+    // the second one should pad
+    assertEquals(64 * 1024, dirEntry.getOffset());
+    assertEquals(62 * 1024, dirEntry.getDataLength());
+    long endOfStripe = dirEntry.getOffset() + dirEntry.getIndexLength() +
+        dirEntry.getDataLength() + dirEntry.getFooterLength();
+
+    for(int i=0; i < 3; ++i) {
+      output.output(ByteBuffer.wrap(buffer));
+    }
+    footer = OrcProto.StripeFooter.newBuilder();
+    dirEntry = OrcProto.StripeInformation.newBuilder();
+    writer.finalizeStripe(footer, dirEntry);
+    // the third one should be over the padding limit
+    assertEquals(endOfStripe, dirEntry.getOffset());
+    assertEquals(3 * 1024, dirEntry.getDataLength());
+  }
+
+  @Test
+  public void testNoStripePadding() throws IOException {
+    TypeDescription schema = TypeDescription.fromString("int");
+    OrcFile.WriterOptions opts =
+        OrcFile.writerOptions(conf)
+            .blockPadding(false)
+            .stripeSize(32 * 1024)
+            .blockSize(64 * 1024)
+            .compress(CompressionKind.NONE)
+            .setSchema(schema);
+    MemoryFileSystem fs = new MemoryFileSystem();
+    PhysicalFsWriter writer = new PhysicalFsWriter(fs, new Path("test1.orc"),
+        opts);
+    writer.writeHeader();
+    StreamName stream0 = new StreamName(0, OrcProto.Stream.Kind.DATA);
+    PhysicalWriter.OutputReceiver output = writer.createDataStream(stream0);
+    byte[] buffer = new byte[1024];
+    for(int i=0; i < buffer.length; ++i) {
+      buffer[i] = (byte) i;
+    }
+    for(int i=0; i < 63; ++i) {
+      output.output(ByteBuffer.wrap(buffer));
+    }
+    OrcProto.StripeFooter.Builder footer = OrcProto.StripeFooter.newBuilder();
+    OrcProto.StripeInformation.Builder dirEntry =
+        OrcProto.StripeInformation.newBuilder();
+    writer.finalizeStripe(footer, dirEntry);
+    // check to make sure that it laid it out without padding
+    assertEquals(0L, dirEntry.getIndexLength());
+    assertEquals(63 * 1024L, dirEntry.getDataLength());
+    assertEquals(3, dirEntry.getOffset());
+    long endOfStripe = dirEntry.getOffset() + dirEntry.getDataLength()
+        + dirEntry.getFooterLength();
+    for(int i=0; i < 62; ++i) {
+      output.output(ByteBuffer.wrap(buffer));
+    }
+    footer = OrcProto.StripeFooter.newBuilder();
+    dirEntry = OrcProto.StripeInformation.newBuilder();
+    writer.finalizeStripe(footer, dirEntry);
+    // no padding, because we turned it off
+    assertEquals(endOfStripe, dirEntry.getOffset());
+    assertEquals(62 * 1024, dirEntry.getDataLength());
+  }
+
+  static class MockHadoopShim implements HadoopShims {
+    long lastShortBlock = -1;
+
+    @Override
+    public DirectDecompressor getDirectDecompressor(DirectCompressionType codec) {
+      return null;
+    }
+
+    @Override
+    public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in, ByteBufferPoolShim pool) throws IOException {
+      return null;
+    }
+
+    @Override
+    public boolean endVariableLengthBlock(OutputStream output) throws IOException {
+      if (output instanceof FSDataOutputStream) {
+        lastShortBlock = ((FSDataOutputStream) output).getPos();
+        return true;
+      }
+      return false;
+    }
+
+    @Override
+    public KeyProvider getKeyProvider(Configuration conf, Random random) throws IOException {
+      return null;
+    }
+  }
+
+  @Test
+  public void testShortBlock() throws IOException {
+    MockHadoopShim shim = new MockHadoopShim();
+    TypeDescription schema = TypeDescription.fromString("int");
+    OrcFile.WriterOptions opts =
+        OrcFile.writerOptions(conf)
+            .blockPadding(false)
+            .stripeSize(32 * 1024)
+            .blockSize(64 * 1024)
+            .compress(CompressionKind.NONE)
+            .setSchema(schema)
+            .setShims(shim)
+            .writeVariableLengthBlocks(true);
+    MemoryFileSystem fs = new MemoryFileSystem();
+    PhysicalFsWriter writer = new PhysicalFsWriter(fs, new Path("test1.orc"),
+        opts);
+    writer.writeHeader();
+    StreamName stream0 = new StreamName(0, OrcProto.Stream.Kind.DATA);
+    PhysicalWriter.OutputReceiver output = writer.createDataStream(stream0);
+    byte[] buffer = new byte[1024];
+    for(int i=0; i < buffer.length; ++i) {
+      buffer[i] = (byte) i;
+    }
+    for(int i=0; i < 63; ++i) {
+      output.output(ByteBuffer.wrap(buffer));
+    }
+    OrcProto.StripeFooter.Builder footer = OrcProto.StripeFooter.newBuilder();
+    OrcProto.StripeInformation.Builder dirEntry =
+        OrcProto.StripeInformation.newBuilder();
+    writer.finalizeStripe(footer, dirEntry);
+    // check to make sure that it laid it out without padding
+    assertEquals(0L, dirEntry.getIndexLength());
+    assertEquals(63 * 1024L, dirEntry.getDataLength());
+    assertEquals(3, dirEntry.getOffset());
+    long endOfStripe = dirEntry.getOffset() + dirEntry.getDataLength()
+        + dirEntry.getFooterLength();
+    for(int i=0; i < 62; ++i) {
+      output.output(ByteBuffer.wrap(buffer));
+    }
+    footer = OrcProto.StripeFooter.newBuilder();
+    dirEntry = OrcProto.StripeInformation.newBuilder();
+    writer.finalizeStripe(footer, dirEntry);
+    // we should get a short block and no padding
+    assertEquals(endOfStripe, dirEntry.getOffset());
+    assertEquals(62 * 1024, dirEntry.getDataLength());
+    assertEquals(endOfStripe, shim.lastShortBlock);
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/837a1bd7/java/shims/src/java/org/apache/orc/impl/HadoopShims.java
----------------------------------------------------------------------
diff --git a/java/shims/src/java/org/apache/orc/impl/HadoopShims.java b/java/shims/src/java/org/apache/orc/impl/HadoopShims.java
index 6ba7866..cdc43ac 100644
--- a/java/shims/src/java/org/apache/orc/impl/HadoopShims.java
+++ b/java/shims/src/java/org/apache/orc/impl/HadoopShims.java
@@ -116,11 +116,12 @@ public interface HadoopShims {
   }
 
   /**
-   * Allow block boundaries to be reached by zero-fill or variable length block
-   * markers (in HDFS).
-   * @return the number of bytes written
+   * End the OutputStream's current block at the current location.
+   * This is only available on HDFS on Hadoop >= 2.7, but will return false
+   * otherwise.
+   * @return was a variable length block created?
    */
-  long padStreamToBlock(OutputStream output, long padding) throws IOException;
+  boolean endVariableLengthBlock(OutputStream output) throws IOException;
 
   /**
    * A source of crypto keys. This is usually backed by a Ranger KMS.

http://git-wip-us.apache.org/repos/asf/orc/blob/837a1bd7/java/shims/src/java/org/apache/orc/impl/HadoopShimsCurrent.java
----------------------------------------------------------------------
diff --git a/java/shims/src/java/org/apache/orc/impl/HadoopShimsCurrent.java b/java/shims/src/java/org/apache/orc/impl/HadoopShimsCurrent.java
index 95c254b..ff11159 100644
--- a/java/shims/src/java/org/apache/orc/impl/HadoopShimsCurrent.java
+++ b/java/shims/src/java/org/apache/orc/impl/HadoopShimsCurrent.java
@@ -49,15 +49,13 @@ public class HadoopShimsCurrent implements HadoopShims {
   }
 
   @Override
-  public long padStreamToBlock(OutputStream output,
-                               long padding) throws IOException {
+  public boolean endVariableLengthBlock(OutputStream output) throws IOException {
     if (output instanceof HdfsDataOutputStream) {
-      ((HdfsDataOutputStream) output).hsync(
-          EnumSet.of(HdfsDataOutputStream.SyncFlag.END_BLOCK));
-      return 0; // no padding
-    } else {
-      return HadoopShimsPre2_3.padStream(output, padding);
+      HdfsDataOutputStream hdfs = (HdfsDataOutputStream) output;
+      hdfs.hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.END_BLOCK));
+      return true;
     }
+    return false;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/orc/blob/837a1bd7/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_3.java
----------------------------------------------------------------------
diff --git a/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_3.java b/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_3.java
index c76c4b9..1d55147 100644
--- a/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_3.java
+++ b/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_3.java
@@ -49,22 +49,9 @@ public class HadoopShimsPre2_3 implements HadoopShims {
     return null;
   }
 
-  private static final int BUFFER_SIZE = 256  * 1024;
-
-  static long padStream(OutputStream output,
-                        long padding) throws IOException {
-    byte[] pad = new byte[(int) Math.min(BUFFER_SIZE, padding)]; // always clear
-    while (padding > 0) {
-      int writeLen = (int) Math.min(padding, pad.length);
-      output.write(pad, 0, writeLen);
-      padding -= writeLen;
-    }
-    return padding;
-  }
-
   @Override
-  public long padStreamToBlock(OutputStream output, long padding) throws IOException {
-    return padStream(output, padding);
+  public boolean endVariableLengthBlock(OutputStream output) {
+    return false;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/orc/blob/837a1bd7/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_6.java
----------------------------------------------------------------------
diff --git a/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_6.java b/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_6.java
index 775ce0d..618e4c8 100644
--- a/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_6.java
+++ b/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_6.java
@@ -124,9 +124,8 @@ public class HadoopShimsPre2_6 implements HadoopShims {
   }
 
   @Override
-  public long padStreamToBlock(OutputStream output,
-                               long padding) throws IOException {
-    return HadoopShimsPre2_3.padStream(output, padding);
+  public boolean endVariableLengthBlock(OutputStream output) {
+    return false;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/orc/blob/837a1bd7/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_7.java
----------------------------------------------------------------------
diff --git a/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_7.java b/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_7.java
index f405f28..17296ab 100644
--- a/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_7.java
+++ b/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_7.java
@@ -60,9 +60,8 @@ public class HadoopShimsPre2_7 implements HadoopShims {
   }
 
   @Override
-  public long padStreamToBlock(OutputStream output,
-                               long padding) throws IOException {
-    return HadoopShimsPre2_3.padStream(output, padding);
+  public boolean endVariableLengthBlock(OutputStream output) {
+    return false;
   }
 
   static String buildKeyVersionName(KeyMetadata key) {

http://git-wip-us.apache.org/repos/asf/orc/blob/837a1bd7/site/_docs/hive-config.md
----------------------------------------------------------------------
diff --git a/site/_docs/hive-config.md b/site/_docs/hive-config.md
index 7945cdf..b463a6b 100644
--- a/site/_docs/hive-config.md
+++ b/site/_docs/hive-config.md
@@ -179,4 +179,11 @@ There are many Hive configuration properties related to ORC files:
       the compression level of higher level compression codec. Value can be
       SPEED or COMPRESSION.</td>
 </tr>
+<tr>
+  <td>orc.write.variable.length.blocks</td>
+  <td>false</td>
+  <td>Should the ORC writer use HDFS variable length blocks, if they are
+      available? If the new stripe would straddle a block, Hadoop is &ge; 2.7,
+      and this is enabled, it will end the block before the new stripe.</td>
+</tr>
 </table>
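
The same key documented above is also honored as a table property, since the
WriterOptions constructor reads it with
OrcConf.WRITE_VARIABLE_LENGTH_BLOCKS.getBoolean(tableProperties, conf). A
minimal sketch of that path, with a made-up class name:

    import java.util.Properties;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.orc.OrcFile;

    public class TablePropertyExample {
      // Builds WriterOptions from table properties rather than the
      // Configuration; the property key is the one documented above.
      public static OrcFile.WriterOptions options(Configuration conf) {
        Properties tableProps = new Properties();
        tableProps.setProperty("orc.write.variable.length.blocks", "true");
        return OrcFile.writerOptions(tableProps, conf);
      }
    }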

