beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Work logged] (BEAM-5412) TFRecordIO fails with records larger than 8K
Date Fri, 21 Sep 2018 02:55:00 GMT

     [ https://issues.apache.org/jira/browse/BEAM-5412?focusedWorklogId=146197&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-146197
]

ASF GitHub Bot logged work on BEAM-5412:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 21/Sep/18 02:54
            Start Date: 21/Sep/18 02:54
    Worklog Time Spent: 10m 
      Work Description: chamikaramj closed pull request #6440: [BEAM-5412][BEAM-5408] Fixes
a bug that limited the size of TFRecords
URL: https://github.com/apache/beam/pull/6440
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
index a6ac486bba0..274df369d72 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
@@ -619,20 +619,39 @@ public int recordLength(byte[] data) {
         return null;
       }
       checkState(headerBytes == HEADER_LEN, "Not a valid TFRecord. Fewer than 12 bytes.");
+
       header.rewind();
       long length = header.getLong();
+      long lengthHash = hashLong(length);
       int maskedCrc32OfLength = header.getInt();
-      checkState(hashLong(length) == maskedCrc32OfLength, "Mismatch of length mask");
+      if (lengthHash != maskedCrc32OfLength) {
+        throw new IOException(
+            String.format(
+                "Mistmatch of length mask when reading a record. Expected %d but received %d.",
+                maskedCrc32OfLength, lengthHash));
+      }
 
       ByteBuffer data = ByteBuffer.allocate((int) length);
-      checkState(inChannel.read(data) == length, "Invalid data");
+      while (data.hasRemaining() && inChannel.read(data) >= 0) {}
+      if (data.hasRemaining()) {
+        throw new IOException(
+            String.format(
+                "EOF while reading record of length %d. Read only %d bytes. Input might be truncated.",
+                length, data.position()));
+      }
 
       footer.clear();
       inChannel.read(footer);
       footer.rewind();
-      int maskedCrc32OfData = footer.getInt();
 
-      checkState(hashBytes(data.array()) == maskedCrc32OfData, "Mismatch of data mask");
+      int maskedCrc32OfData = footer.getInt();
+      int dataHash = hashBytes(data.array());
+      if (dataHash != maskedCrc32OfData) {
+        throw new IOException(
+            String.format(
+                "Mistmatch of data mask when reading a record. Expected %d but received %d.",
+                maskedCrc32OfData, dataHash));
+      }
       return data.array();
     }
 
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java
index 5ff5b1c880f..fbd69a3f898 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java
@@ -84,7 +84,8 @@
   private static final String[] FOO_BAR_RECORDS = {"foo", "bar"};
 
   private static final Iterable<String> EMPTY = Collections.emptyList();
-  private static final Iterable<String> LARGE = makeLines(1000);
+  private static final Iterable<String> LARGE = makeLines(1000, 4);
+  private static final Iterable<String> LARGE_RECORDS = makeLines(100, 100000);
 
   @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
 
@@ -287,6 +288,18 @@ public void runTestRoundTripZlibFilesWithAuto() throws IOException {
     runTestRoundTrip(LARGE, 10, ".tfrecords", DEFLATE, AUTO);
   }
 
+  @Test
+  @Category(NeedsRunner.class)
+  public void runTestRoundTripLargeRecords() throws IOException {
+    runTestRoundTrip(LARGE_RECORDS, 10, ".tfrecords", UNCOMPRESSED, UNCOMPRESSED);
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void runTestRoundTripLargeRecordsGzip() throws IOException {
+    runTestRoundTrip(LARGE_RECORDS, 10, ".tfrecords", GZIP, GZIP);
+  }
+
   private void runTestRoundTrip(
       Iterable<String> elems,
       int numShards,
@@ -344,10 +357,15 @@ private void runTestRoundTrip(
     readPipeline.run();
   }
 
-  private static Iterable<String> makeLines(int n) {
+  private static Iterable<String> makeLines(int n, int minRecordSize) {
     List<String> ret = Lists.newArrayList();
+    StringBuilder recordBuilder = new StringBuilder();
+    for (int i = 0; i < minRecordSize; i++) {
+      recordBuilder.append("x");
+    }
+    String record = recordBuilder.toString();
     for (int i = 0; i < n; ++i) {
-      ret.add("word" + i);
+      ret.add(record + " " + i);
     }
     return ret;
   }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 146197)
    Time Spent: 2h 50m  (was: 2h 40m)

> TFRecordIO fails with records larger than 8K
> --------------------------------------------
>
>                 Key: BEAM-5412
>                 URL: https://issues.apache.org/jira/browse/BEAM-5412
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-text
>    Affects Versions: 2.4.0
>            Reporter: Raghu Angadi
>            Assignee: Chamikara Jayalath
>            Priority: Major
>          Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> This was reported on [Stackoverflow|https://stackoverflow.com/questions/52284639/beam-java-sdk-with-tfrecord-and-compression-gzip].
The TFRecordIO reader assumes that a single call to {{channel.read()}} returns as many bytes as can fit in
the input buffer. However, {{read()}} can return fewer bytes than requested, which causes the following assertion to fail: https://github.com/apache/beam/blob/release-2.4.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java#L642



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message