pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sz...@apache.org
Subject svn commit: r1850245 - in /pig/trunk: CHANGES.txt src/org/apache/pig/impl/io/InterRecordReader.java test/org/apache/pig/test/TestBinInterSedes.java
Date Thu, 03 Jan 2019 15:54:11 GMT
Author: szita
Date: Thu Jan  3 15:54:11 2019
New Revision: 1850245

URL: http://svn.apache.org/viewvc?rev=1850245&view=rev
Log:
PIG-5373: InterRecordReader might skip records if certain sync markers are used (szita)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java
    pig/trunk/test/org/apache/pig/test/TestBinInterSedes.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1850245&r1=1850244&r2=1850245&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Jan  3 15:54:11 2019
@@ -88,6 +88,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-5373: InterRecordReader might skip records if certain sync markers are used (szita)
+
 PIG-5370: Union onschema + columnprune dropping used fields (knoguchi)
 
 PIG-5362: Parameter substitution of shell cmd results doesn't handle backslash (wlauer via rohini)

Modified: pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java?rev=1850245&r1=1850244&r2=1850245&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java Thu Jan  3 15:54:11 2019
@@ -20,6 +20,7 @@ package org.apache.pig.impl.io;
 import java.io.DataInputStream;
 import java.io.IOException;
 
+import org.apache.commons.collections4.queue.CircularFifoQueue;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -89,35 +90,34 @@ public class InterRecordReader extends R
      * @return true if marker was observed, false if EOF or EndOfSplit was reached
      * @throws IOException
      */
-  private boolean skipUntilMarkerOrSplitEndOrEOF() throws IOException {
+  public boolean skipUntilMarkerOrSplitEndOrEOF() throws IOException {
       int b = Integer.MIN_VALUE;
-outer:while (b != -1) {
-          if (b != syncMarker[0]) {
+      CircularFifoQueue<Integer> queue = new CircularFifoQueue(syncMarker.length);
+      outer:while (b != -1) {
+          //There may be a case where we read through a whole split without a marker, then we shouldn't proceed
+          // because the records are from the next split which another reader would pick up too
+          //One exception of reading past split end is if at least the first byte of the marker was seen before split
+          // end.
+          if (in.getPosition() >= (end+syncMarker.length-1)) {
+              return false;
+          }
+          b = in.read();
 
-              //There may be a case where we read through a whole split without a marker, then we shouldn't proceed
-              // because the records are from the next split which another reader would pick up too
-              if (in.getPosition() >= end) {
-                  return false;
-              }
-              b = in.read();
-              if ((byte) b != syncMarker[0] && b != -1) {
-                  continue;
-              }
-              if (b == -1) return false;
+          //EOF reached
+          if (b == -1) return false;
+
+          queue.add(b);
+          if (queue.size() != queue.maxSize()) {
+              //Not enough bytes read yet
+              continue outer;
           }
-          int i = 1;
-          while (i < syncMarker.length) {
-              b = in.read();
-              if (b == -1) return false;
-              if ((byte) b != syncMarker[i]) {
-                  if (in.getPosition() > end) {
-                      //Again we should not read past the split end, only if at least the first byte of marker was seen before it
-                      return false;
-                  }
+          int i = 0;
+          for (Integer seenByte : queue){
+              if (syncMarker[i++] != seenByte.byteValue()) {
                   continue outer;
               }
-              ++i;
           }
+          //Found marker: queue content equals sync marker
           lastSyncPos = in.getPosition();
           return true;
       }

Modified: pig/trunk/test/org/apache/pig/test/TestBinInterSedes.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestBinInterSedes.java?rev=1850245&r1=1850244&r2=1850245&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestBinInterSedes.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestBinInterSedes.java Thu Jan  3 15:54:11 2019
@@ -18,6 +18,7 @@
 package org.apache.pig.test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -26,6 +27,7 @@ import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -42,8 +44,11 @@ import org.apache.pig.data.InterSedes;
 import org.apache.pig.data.InterSedesFactory;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.io.InterRecordReader;
 import org.apache.pig.impl.util.TupleFormat;
 import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
 
 public class TestBinInterSedes {
     private static final TupleFactory mTupleFactory = TupleFactory.getInstance();
@@ -437,6 +442,45 @@ public class TestBinInterSedes {
 
     }
 
+    /**
+     * Tests all combination where:
+     * sync marker is {x, y, 4}
+     * data is {127, -2, 2, z, x, y, 4, 1, 2, 3}
+     * x,y,z in [-128,127]
+     * This means that a sync marker has to be found in all iterations (total=16,777,216)
+     * @throws Exception
+     */
+    @Test
+    public void testPrefixSyncMarkers() throws Exception {
+        long defaultInterval = PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_INTERVAL_DEFAULT;
+
+        for (int b0 = -128; b0 <= 127; b0++) {
+            for (int b1 = -128; b1 <= 127; b1++) {
+                for (int b2 = -128; b2 <= 127; b2++) {
+                    byte[] syncMarker = new byte[]{(byte) b0, (byte) b1, (byte)4};
+                    byte[] data = new byte[]{127, -1, 2, (byte) b2, (byte) b0, (byte) b1, 4, 1, 2, 3};
+
+                    ByteArrayInputStream bi = new ByteArrayInputStream(data);
+                    BufferedPositionedInputStream bpi = new BufferedPositionedInputStream(bi);
+
+                    InterRecordReader reader = new InterRecordReader(syncMarker.length, defaultInterval);
+                    Whitebox.setInternalState(reader, "syncMarker", syncMarker);
+                    Whitebox.setInternalState(reader, "end", data.length);
+                    Whitebox.setInternalState(reader, "in", bpi);
+
+                    try {
+                        boolean ret = reader.skipUntilMarkerOrSplitEndOrEOF();
+                        assertTrue("Marker should have been found: " + "marker: " +
+                                Arrays.toString(syncMarker) + " , data: " + Arrays.toString(data),ret);
+                    } finally {
+                        bpi.close();
+                    }
+
+                }
+            }
+        }
+    }
+
     private void testSerTuple(Tuple t, byte[] expected) throws Exception {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         DataOutput out = new DataOutputStream(baos);



Mime
View raw message