beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [1/2] beam git commit: Remove IOChannelUtils from BigQuery TableRowWriter
Date Mon, 01 May 2017 23:04:14 GMT
Repository: beam
Updated Branches:
  refs/heads/master 36ed6dc3c -> a2292a7dc


Remove IOChannelUtils from BigQuery TableRowWriter


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cc519138
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cc519138
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cc519138

Branch: refs/heads/master
Commit: cc519138e3ea25b92cf710c6f297da2c472f0eef
Parents: 36ed6dc
Author: Vikas Kedigehalli <vikasrk@google.com>
Authored: Sun Apr 30 23:29:20 2017 -0700
Committer: Dan Halperin <dhalperi@google.com>
Committed: Mon May 1 16:03:55 2017 -0700

----------------------------------------------------------------------
 .../sdk/io/gcp/bigquery/TableRowWriter.java     | 27 ++++++++++----------
 .../io/gcp/bigquery/WriteBundlesToFiles.java    |  2 +-
 .../sdk/io/gcp/bigquery/WritePartition.java     |  2 +-
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |  2 +-
 4 files changed, 17 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/cc519138/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java
index cb51158..f9d8785 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java
@@ -26,7 +26,8 @@ import java.nio.channels.WritableByteChannel;
 import java.nio.charset.StandardCharsets;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.Context;
-import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,17 +40,17 @@ class TableRowWriter {
   private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
   private final String tempFilePrefix;
   private String id;
-  private String fileName;
+  private ResourceId resourceId;
   private WritableByteChannel channel;
   protected String mimeType = MimeTypes.TEXT;
   private CountingOutputStream out;
 
   public static final class Result {
-    final String filename;
+    final ResourceId resourceId;
     final long byteSize;
 
-    public Result(String filename, long byteSize) {
-      this.filename = filename;
+    public Result(ResourceId resourceId, long byteSize) {
+      this.resourceId = resourceId;
       this.byteSize = byteSize;
     }
   }
@@ -60,22 +61,22 @@ class TableRowWriter {
 
   public final void open(String uId) throws Exception {
     id = uId;
-    fileName = tempFilePrefix + id;
-    LOG.debug("Opening {}.", fileName);
-    channel = IOChannelUtils.create(fileName, mimeType);
+    resourceId = FileSystems.matchNewResource(tempFilePrefix + id, false);
+    LOG.debug("Opening {}.", resourceId);
+    channel = FileSystems.create(resourceId, mimeType);
     try {
       out = new CountingOutputStream(Channels.newOutputStream(channel));
-      LOG.debug("Writing header to {}.", fileName);
+      LOG.debug("Writing header to {}.", resourceId);
     } catch (Exception e) {
       try {
-        LOG.error("Writing header to {} failed, closing channel.", fileName);
+        LOG.error("Writing header to {} failed, closing channel.", resourceId);
         channel.close();
       } catch (IOException closeException) {
-        LOG.error("Closing channel for {} failed", fileName);
+        LOG.error("Closing channel for {} failed", resourceId);
       }
       throw e;
     }
-    LOG.debug("Starting write of bundle {} to {}.", this.id, fileName);
+    LOG.debug("Starting write of bundle {} to {}.", this.id, resourceId);
   }
 
   public void write(TableRow value) throws Exception {
@@ -85,6 +86,6 @@ class TableRowWriter {
 
   public final Result close() throws IOException {
     channel.close();
-    return new Result(fileName, out.getCount());
+    return new Result(resourceId, out.getCount());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/cc519138/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
index d337476..5f89067 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
@@ -140,7 +140,7 @@ class WriteBundlesToFiles extends DoFn<KV<TableDestination, TableRow>,
WriteBund
   public void finishBundle(Context c) throws Exception {
     for (Map.Entry<TableDestination, TableRowWriter> entry : writers.entrySet()) {
       TableRowWriter.Result result = entry.getValue().close();
-      c.output(new Result(result.filename, result.byteSize, entry.getKey()));
+      c.output(new Result(result.resourceId.toString(), result.byteSize, entry.getKey()));
     }
     writers.clear();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/cc519138/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
index 9414909..0ae1768 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
@@ -128,7 +128,7 @@ class WritePartition extends DoFn<String, KV<ShardedKey<TableDestination>,
List<
         TableRowWriter writer = new TableRowWriter(c.element());
         writer.open(UUID.randomUUID().toString());
         TableRowWriter.Result writerResult = writer.close();
-        results.add(new Result(writerResult.filename, writerResult.byteSize,
+        results.add(new Result(writerResult.resourceId.toString(), writerResult.byteSize,
             new TableDestination(singletonTable, singletonOutputTableDescription)));
       }
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/cc519138/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index a46c1fe..baa5621 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -1775,7 +1775,7 @@ public class BigQueryIOTest implements Serializable {
     for (int i = 0; i < numFiles; ++i) {
       String fileName = String.format("files%05d", i);
       writer.open(fileName);
-      fileNames.add(writer.close().filename);
+      fileNames.add(writer.close().resourceId.toString());
     }
     fileNames.add(tempFilePrefix + String.format("files%05d", numFiles));
 


Mime
View raw message