beam-commits mailing list archives

From reuven...@apache.org
Subject [1/2] beam git commit: Move file deletion into subsequent ParDo.
Date Tue, 26 Sep 2017 15:37:01 GMT
Repository: beam
Updated Branches:
  refs/heads/master 352f106f9 -> b2138b0d7


Move file deletion into subsequent ParDo.
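At a high level, the batch-load path stops deleting its temporary files inside the load step itself. Instead, the load DoFn emits the names of the files it consumed on a secondary output, and a later ParDo, applied after the load output has been committed, performs the deletion. A minimal sketch of that two-output shape, with illustrative class and tag names rather than the exact code from this commit:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;

// Sketch: a load step that reports which temporary files it used instead of deleting them inline.
class LoadAndReportFilesFn extends DoFn<KV<String, Iterable<String>>, KV<String, String>> {
  // Tag names are illustrative; WriteTables below uses similar tags.
  static final TupleTag<KV<String, String>> MAIN_OUTPUT = new TupleTag<KV<String, String>>() {};
  static final TupleTag<String> TEMP_FILES = new TupleTag<String>() {};

  @ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    String destination = c.element().getKey();
    // ... run the load job for c.element().getValue() here ...
    c.output(MAIN_OUTPUT, KV.of(destination, "load-result"));
    for (String file : c.element().getValue()) {
      // Do not delete here: if this bundle is retried, the files must still exist.
      c.output(TEMP_FILES, file);
    }
  }
}

The deletion itself then happens in a downstream transform applied to the TEMP_FILES output; see the WriteTables diff below.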


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6d9b8f06
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6d9b8f06
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6d9b8f06

Branch: refs/heads/master
Commit: 6d9b8f06eeb9a2b0728f1dcf3c1e669bbd53959d
Parents: 352f106
Author: Reuven Lax <relax@relax-macbookpro2.roam.corp.google.com>
Authored: Tue Sep 19 10:10:18 2017 -0700
Committer: Reuven Lax <relax@relax-macbookpro2.roam.corp.google.com>
Committed: Tue Sep 26 10:40:33 2017 -0400

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/LocalFileSystem.java |   9 +-
 .../org/apache/beam/sdk/io/FileSystemsTest.java |  15 +-
 .../beam/sdk/io/gcp/bigquery/BatchLoads.java    |  15 +-
 .../sdk/io/gcp/bigquery/TableDestination.java   |   5 +-
 .../io/gcp/bigquery/WriteBundlesToFiles.java    |  28 ++-
 .../bigquery/WriteGroupedRecordsToFiles.java    |   7 +-
 .../beam/sdk/io/gcp/bigquery/WriteTables.java   | 190 +++++++++++++------
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 122 +++++++-----
 .../sdk/io/gcp/bigquery/FakeJobService.java     |   5 +-
 9 files changed, 258 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6d9b8f06/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
index 5fe894d..3891b91 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
@@ -34,6 +34,7 @@ import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
 import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
 import java.nio.file.PathMatcher;
 import java.nio.file.Paths;
@@ -181,8 +182,12 @@ class LocalFileSystem extends FileSystem<LocalResourceId> {
   @Override
   protected void delete(Collection<LocalResourceId> resourceIds) throws IOException {
     for (LocalResourceId resourceId : resourceIds) {
-      LOG.debug("Deleting file {}", resourceId);
-      Files.delete(resourceId.getPath());
+      try {
+        Files.delete(resourceId.getPath());
+      } catch (NoSuchFileException e) {
+        LOG.info("Ignoring failed deletion of file {} which already does not exist: {}", resourceId,
+            e);
+      }
     }
   }
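With this change, deleting a local file that is already gone is treated as success rather than surfacing a NoSuchFileException, so callers of FileSystems.delete no longer need to pass IGNORE_MISSING_FILES for local paths. A small standalone sketch of the same tolerant-delete behaviour in plain java.nio (illustrative, not Beam code):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.List;

final class TolerantDelete {
  // Delete each path, ignoring paths that are already gone, mirroring the new LocalFileSystem behaviour.
  static void deleteIgnoringMissing(List<Path> paths) throws IOException {
    for (Path path : paths) {
      try {
        Files.delete(path);
      } catch (NoSuchFileException e) {
        // Already deleted, e.g. by a retried bundle: treat as success.
      }
    }
  }
}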
 

http://git-wip-us.apache.org/repos/asf/beam/blob/6d9b8f06/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java
index a75c54d..3e393bf 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java
@@ -83,18 +83,6 @@ public class FileSystemsTest {
   }
 
   @Test
-  public void testDeleteThrowsNoSuchFileException() throws Exception {
-    Path existingPath = temporaryFolder.newFile().toPath();
-    Path nonExistentPath = existingPath.resolveSibling("non-existent");
-
-    createFileWithContent(existingPath, "content1");
-
-    thrown.expect(NoSuchFileException.class);
-    FileSystems.delete(
-        toResourceIds(ImmutableList.of(existingPath, nonExistentPath), false /* isDirectory */));
-  }
-
-  @Test
   public void testDeleteIgnoreMissingFiles() throws Exception {
     Path existingPath = temporaryFolder.newFile().toPath();
     Path nonExistentPath = existingPath.resolveSibling("non-existent");
@@ -102,8 +90,7 @@ public class FileSystemsTest {
     createFileWithContent(existingPath, "content1");
 
     FileSystems.delete(
-        toResourceIds(ImmutableList.of(existingPath, nonExistentPath), false /* isDirectory */),
-        MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
+        toResourceIds(ImmutableList.of(existingPath, nonExistentPath), false /* isDirectory */));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/6d9b8f06/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index 76cf7e8..6d832e4 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -216,7 +216,6 @@ class BatchLoads<DestinationT>
                 .discardingFiredPanes());
     PCollection<WriteBundlesToFiles.Result<DestinationT>> results =
         writeShardedFiles(inputInGlobalWindow, tempFilePrefixView);
-
     // Apply the user's trigger before we start generating BigQuery load jobs.
     results =
         results.apply(
@@ -480,15 +479,14 @@ class BatchLoads<DestinationT>
         .apply("MultiPartitionsReshuffle", Reshuffle.<ShardedKey<DestinationT>,
List<String>>of())
         .apply(
             "MultiPartitionsWriteTables",
-            ParDo.of(
-                    new WriteTables<>(
+            new WriteTables<>(
                         false,
                         bigQueryServices,
                         jobIdTokenView,
                         WriteDisposition.WRITE_EMPTY,
                         CreateDisposition.CREATE_IF_NEEDED,
-                        dynamicDestinations))
-                .withSideInputs(sideInputs));
+                        sideInputs,
+                        dynamicDestinations));
   }
 
   // In the case where the files fit into a single load job, there's no need to write temporary
@@ -510,15 +508,14 @@ class BatchLoads<DestinationT>
         .apply("SinglePartitionsReshuffle", Reshuffle.<ShardedKey<DestinationT>,
List<String>>of())
         .apply(
             "SinglePartitionWriteTables",
-            ParDo.of(
-                    new WriteTables<>(
+            new WriteTables<>(
                         true,
                         bigQueryServices,
                         jobIdTokenView,
                         writeDisposition,
                         createDisposition,
-                        dynamicDestinations))
-                .withSideInputs(sideInputs));
+                        sideInputs,
+                        dynamicDestinations));
   }
 
   private WriteResult writeResult(Pipeline p) {
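With WriteTables reworked into a composite transform, BatchLoads passes the side inputs to its constructor, and the ParDo.of(...).withSideInputs(...) wiring moves inside WriteTables.expand(). A generic sketch of that pattern with hypothetical names (not the BatchLoads code itself):

import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

// Sketch: a composite transform that owns its side inputs and wires them up in expand().
class WriteWithSideInputs extends PTransform<PCollection<String>, PCollection<String>> {
  private final List<PCollectionView<?>> sideInputs;

  WriteWithSideInputs(List<PCollectionView<?>> sideInputs) {
    this.sideInputs = sideInputs;
  }

  // The per-element work; it can read whichever side inputs the composite was given.
  private static class PassThroughFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      c.output(c.element());
    }
  }

  @Override
  public PCollection<String> expand(PCollection<String> input) {
    // Callers only write input.apply(new WriteWithSideInputs(sideInputs)).
    return input.apply(ParDo.of(new PassThroughFn()).withSideInputs(sideInputs));
  }
}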

http://git-wip-us.apache.org/repos/asf/beam/blob/6d9b8f06/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
index ecc34d3..ce2e7c7 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
@@ -104,11 +104,12 @@ public class TableDestination implements Serializable {
     }
     TableDestination other = (TableDestination) o;
     return Objects.equals(this.tableSpec, other.tableSpec)
-        && Objects.equals(this.tableDescription, other.tableDescription);
+        && Objects.equals(this.tableDescription, other.tableDescription)
+        && Objects.equals(this.jsonTimePartitioning, other.jsonTimePartitioning);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(tableSpec, tableDescription);
+    return Objects.hash(tableSpec, tableDescription, jsonTimePartitioning);
   }
 }
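equals() now also compares jsonTimePartitioning, and hashCode() is updated over the same field list so the two stay consistent when TableDestination is used as a map or multimap key, as the updated BigQueryIOTest does. A minimal sketch of that convention with a hypothetical value class:

import java.util.Objects;

// Hypothetical value class: equals and hashCode are defined over the same fields.
final class Destination {
  final String tableSpec;
  final String timePartitioningJson; // may be null

  Destination(String tableSpec, String timePartitioningJson) {
    this.tableSpec = tableSpec;
    this.timePartitioningJson = timePartitioningJson;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof Destination)) {
      return false;
    }
    Destination other = (Destination) o;
    return Objects.equals(tableSpec, other.tableSpec)
        && Objects.equals(timePartitioningJson, other.timePartitioningJson);
  }

  @Override
  public int hashCode() {
    return Objects.hash(tableSpec, timePartitioningJson);
  }
}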

http://git-wip-us.apache.org/repos/asf/beam/blob/6d9b8f06/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
index e337f94..017d5c1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
@@ -30,6 +30,7 @@ import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
@@ -75,7 +76,7 @@ class WriteBundlesToFiles<DestinationT>
    * The result of the {@link WriteBundlesToFiles} transform. Corresponds to a single output file,
    * and encapsulates the table it is destined to as well as the file byte size.
    */
-  public static final class Result<DestinationT> implements Serializable {
+  static final class Result<DestinationT> implements Serializable {
     private static final long serialVersionUID = 1L;
     public final String filename;
     public final Long fileByteSize;
@@ -87,6 +88,31 @@ class WriteBundlesToFiles<DestinationT>
       this.fileByteSize = fileByteSize;
       this.destination = destination;
     }
+
+    @Override
+    public boolean equals(Object other) {
+      if (other instanceof Result) {
+        Result<DestinationT> o = (Result<DestinationT>) other;
+        return Objects.equals(this.filename, o.filename)
+            && Objects.equals(this.fileByteSize, o.fileByteSize)
+            && Objects.equals(this.destination, o.destination);
+      }
+      return  false;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(filename, fileByteSize, destination);
+    }
+
+    @Override
+    public String toString() {
+      return "Result{"
+          + "filename='" + filename + '\''
+          + ", fileByteSize=" + fileByteSize
+          + ", destination=" + destination
+          + '}';
+    }
   }
 
   /** a coder for the {@link Result} class. */

http://git-wip-us.apache.org/repos/asf/beam/blob/6d9b8f06/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java
index 887cb93..e82b29d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java
@@ -48,18 +48,21 @@ class WriteGroupedRecordsToFiles<DestinationT>
   public void processElement(ProcessContext c) throws Exception {
     String tempFilePrefix = c.sideInput(this.tempFilePrefix);
     TableRowWriter writer = new TableRowWriter(tempFilePrefix);
-    try (TableRowWriter ignored = writer) {
+    try {
       for (TableRow tableRow : c.element().getValue()) {
         if (writer.getByteSize() > maxFileSize) {
           writer.close();
+          writer = new TableRowWriter(tempFilePrefix);
           TableRowWriter.Result result = writer.getResult();
           c.output(new WriteBundlesToFiles.Result<>(
               result.resourceId.toString(), result.byteSize, c.element().getKey().getKey()));
-          writer = new TableRowWriter(tempFilePrefix);
         }
         writer.write(tableRow);
       }
+    } finally {
+      writer.close();
     }
+
     TableRowWriter.Result result = writer.getResult();
     c.output(
         new WriteBundlesToFiles.Result<>(
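The grouped-records writer now rotates to a fresh file whenever the current one exceeds maxFileSize, and the final close happens in a finally block so a failure cannot leak an open writer. A generic sketch of that rotate-and-close pattern with a hypothetical writer type (not Beam's TableRowWriter):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

final class RotatingWrite {
  // Hypothetical writer exposing just what the pattern needs.
  interface SizedWriter extends AutoCloseable {
    void write(String record) throws IOException;
    long byteSize();
    String finishedFileName(); // valid once close() has been called
    @Override
    void close() throws IOException;
  }

  interface WriterFactory {
    SizedWriter open() throws IOException;
  }

  static List<String> writeRotating(Iterable<String> records, long maxBytes, WriterFactory factory)
      throws Exception {
    List<String> files = new ArrayList<>();
    SizedWriter writer = factory.open();
    try {
      for (String record : records) {
        if (writer.byteSize() > maxBytes) {
          // Close the full file, record it, then continue with a fresh writer.
          writer.close();
          files.add(writer.finishedFileName());
          writer = factory.open();
        }
        writer.write(record);
      }
    } finally {
      writer.close(); // always close the last (possibly partial) file
    }
    files.add(writer.finishedFileName());
    return files;
  }
}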

http://git-wip-us.apache.org/repos/asf/beam/blob/6d9b8f06/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index a646f17..f8ed796 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -29,12 +29,13 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import java.io.IOException;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.FileSystems;
-import org.apache.beam.sdk.io.fs.MoveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
@@ -42,9 +43,22 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.ShardedKey;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,7 +75,8 @@ import org.slf4j.LoggerFactory;
  * {@link KV} maps the final table to itself.
  */
 class WriteTables<DestinationT>
-    extends DoFn<KV<ShardedKey<DestinationT>, List<String>>, KV<TableDestination, String>> {
+  extends PTransform<PCollection<KV<ShardedKey<DestinationT>, List<String>>>,
+    PCollection<KV<TableDestination, String>>> {
   private static final Logger LOG = LoggerFactory.getLogger(WriteTables.class);
 
   private final boolean singlePartition;
@@ -70,7 +85,84 @@ class WriteTables<DestinationT>
   private final WriteDisposition firstPaneWriteDisposition;
   private final CreateDisposition firstPaneCreateDisposition;
   private final DynamicDestinations<?, DestinationT> dynamicDestinations;
-  private Map<DestinationT, String> jsonSchemas = Maps.newHashMap();
+  private final List<PCollectionView<?>> sideInputs;
+  private final TupleTag<KV<TableDestination, String>> mainOutputTag;
+  private final TupleTag<String> temporaryFilesTag;
+
+
+  private class WriteTablesDoFn
+      extends DoFn<KV<ShardedKey<DestinationT>, List<String>>, KV<TableDestination, String>> {
+    private Map<DestinationT, String> jsonSchemas = Maps.newHashMap();
+
+    @StartBundle
+    public void startBundle(StartBundleContext c) {
+      // Clear the map on each bundle so we can notice side-input updates.
+      // (alternative is to use a cache with a TTL).
+      jsonSchemas.clear();
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      dynamicDestinations.setSideInputAccessorFromProcessContext(c);
+      DestinationT destination = c.element().getKey().getKey();
+      TableSchema tableSchema;
+      String jsonSchema = jsonSchemas.get(destination);
+      if (jsonSchema != null) {
+        tableSchema = BigQueryHelpers.fromJsonString(jsonSchema, TableSchema.class);
+      } else {
+        tableSchema = dynamicDestinations.getSchema(destination);
+        if (tableSchema != null) {
+          jsonSchemas.put(destination, BigQueryHelpers.toJsonString(tableSchema));
+        }
+      }
+
+      TableDestination tableDestination = dynamicDestinations.getTable(destination);
+      TableReference tableReference = tableDestination.getTableReference();
+      if (Strings.isNullOrEmpty(tableReference.getProjectId())) {
+        tableReference.setProjectId(
+            c.getPipelineOptions().as(BigQueryOptions.class).getProject());
+        tableDestination = new TableDestination(
+            tableReference, tableDestination.getTableDescription());
+      }
+
+      Integer partition = c.element().getKey().getShardNumber();
+      List<String> partitionFiles = Lists.newArrayList(c.element().getValue());
+      String jobIdPrefix = BigQueryHelpers.createJobId(
+          c.sideInput(jobIdToken), tableDestination, partition, c.pane().getIndex());
+
+      if (!singlePartition) {
+        tableReference.setTableId(jobIdPrefix);
+      }
+
+      WriteDisposition writeDisposition =
+          (c.pane().getIndex() == 0) ? firstPaneWriteDisposition : WriteDisposition.WRITE_APPEND;
+      CreateDisposition createDisposition =
+          (c.pane().getIndex() == 0) ? firstPaneCreateDisposition : CreateDisposition.CREATE_NEVER;
+      load(
+          bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
+          bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
+          jobIdPrefix,
+          tableReference,
+          tableDestination.getTimePartitioning(),
+          tableSchema,
+          partitionFiles,
+          writeDisposition,
+          createDisposition,
+          tableDestination.getTableDescription());
+      c.output(
+          mainOutputTag, KV.of(tableDestination, BigQueryHelpers.toJsonString(tableReference)));
+      for (String file : partitionFiles) {
+        c.output(temporaryFilesTag, file);
+      }
+    }
+  }
+
+  private class GarbageCollectTemporaryFiles extends DoFn<Iterable<String>, Void> {
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      removeTemporaryFiles(c.element());
+    }
+  }
 
   public WriteTables(
       boolean singlePartition,
@@ -78,74 +170,48 @@ class WriteTables<DestinationT>
       PCollectionView<String> jobIdToken,
       WriteDisposition writeDisposition,
       CreateDisposition createDisposition,
+      List<PCollectionView<?>> sideInputs,
       DynamicDestinations<?, DestinationT> dynamicDestinations) {
     this.singlePartition = singlePartition;
     this.bqServices = bqServices;
     this.jobIdToken = jobIdToken;
     this.firstPaneWriteDisposition = writeDisposition;
     this.firstPaneCreateDisposition = createDisposition;
+    this.sideInputs = sideInputs;
     this.dynamicDestinations = dynamicDestinations;
+    this.mainOutputTag = new TupleTag<>("WriteTablesMainOutput");
+    this.temporaryFilesTag = new TupleTag<>("TemporaryFiles");
   }
 
-  @StartBundle
-  public void startBundle(StartBundleContext c) {
-    // Clear the map on each bundle so we can notice side-input updates.
-    // (alternative is to use a cache with a TTL).
-    jsonSchemas.clear();
-  }
+  @Override
+  public PCollection<KV<TableDestination, String>> expand(
+      PCollection<KV<ShardedKey<DestinationT>, List<String>>> input) {
+    PCollectionTuple writeTablesOutputs = input.apply(ParDo.of(new WriteTablesDoFn())
+        .withSideInputs(sideInputs)
+        .withOutputTags(mainOutputTag, TupleTagList.of(temporaryFilesTag)));
 
-  @ProcessElement
-  public void processElement(ProcessContext c) throws Exception {
-    dynamicDestinations.setSideInputAccessorFromProcessContext(c);
-    DestinationT destination = c.element().getKey().getKey();
-    TableSchema tableSchema;
-    String jsonSchema = jsonSchemas.get(destination);
-    if (jsonSchema != null) {
-      tableSchema = BigQueryHelpers.fromJsonString(jsonSchema, TableSchema.class);
-    } else {
-      tableSchema = dynamicDestinations.getSchema(destination);
-      if (tableSchema != null) {
-        jsonSchemas.put(destination, BigQueryHelpers.toJsonString(tableSchema));
-      }
-    }
-
-    TableDestination tableDestination = dynamicDestinations.getTable(destination);
-    TableReference tableReference = tableDestination.getTableReference();
-    if (Strings.isNullOrEmpty(tableReference.getProjectId())) {
-      tableReference.setProjectId(
-          c.getPipelineOptions().as(BigQueryOptions.class).getProject());
-      tableDestination = new TableDestination(
-          tableReference, tableDestination.getTableDescription());
-    }
+    // Garbage collect temporary files.
+    // We mustn't start garbage collecting files until we are assured that the WriteTablesDoFn has
+    // succeeded in loading those files and won't be retried. Otherwise, we might fail part of the
+    // way through deleting temporary files, and retry WriteTablesDoFn. This will then fail due
+    // to missing files, causing either the entire workflow to fail or get stuck (depending on how
+    // the runner handles persistent failures).
+    writeTablesOutputs
+        .get(temporaryFilesTag)
+        .setCoder(StringUtf8Coder.of())
+        .apply(WithKeys.<Void, String>of((Void) null))
+        .setCoder(KvCoder.of(VoidCoder.of(), StringUtf8Coder.of()))
+        .apply(Window.<KV<Void, String>>into(new GlobalWindows())
+            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
+            .discardingFiredPanes())
+        .apply(GroupByKey.<Void, String>create())
+        .apply(Values.<Iterable<String>>create())
+        .apply(ParDo.of(new GarbageCollectTemporaryFiles()));
 
-    Integer partition = c.element().getKey().getShardNumber();
-    List<String> partitionFiles = Lists.newArrayList(c.element().getValue());
-    String jobIdPrefix = BigQueryHelpers.createJobId(
-            c.sideInput(jobIdToken), tableDestination, partition, c.pane().getIndex());
+    return writeTablesOutputs.get(mainOutputTag);
+  }
 
-    if (!singlePartition) {
-      tableReference.setTableId(jobIdPrefix);
-    }
 
-    WriteDisposition writeDisposition =
-        (c.pane().getIndex() == 0) ? firstPaneWriteDisposition : WriteDisposition.WRITE_APPEND;
-    CreateDisposition createDisposition =
-        (c.pane().getIndex() == 0) ? firstPaneCreateDisposition : CreateDisposition.CREATE_NEVER;
-    load(
-        bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
-        bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
-        jobIdPrefix,
-        tableReference,
-        tableDestination.getTimePartitioning(),
-        tableSchema,
-        partitionFiles,
-        writeDisposition,
-        createDisposition,
-        tableDestination.getTableDescription());
-    c.output(KV.of(tableDestination, BigQueryHelpers.toJsonString(tableReference)));
-
-    removeTemporaryFiles(partitionFiles);
-  }
 
   private void load(
       JobService jobService,
@@ -208,11 +274,11 @@ class WriteTables<DestinationT>
             BigQueryHelpers.jobToPrettyString(lastFailedLoadJob)));
   }
 
-  static void removeTemporaryFiles(Collection<String> files) throws IOException {
+  static void removeTemporaryFiles(Iterable<String> files) throws IOException {
     ImmutableList.Builder<ResourceId> fileResources = ImmutableList.builder();
-    for (String file: files) {
+    for (String file : files) {
       fileResources.add(FileSystems.matchNewResource(file, false/* isDirectory */));
     }
-    FileSystems.delete(fileResources.build(), MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
+    FileSystems.delete(fileResources.build());
   }
 }
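The expand() method above wires a cleanup stage onto the temporary-files output: every file name is keyed by a single null key, grouped (with an element-count trigger so the pipeline still fires inside the global window), and each grouped batch is deleted in a separate ParDo. Because the deletion runs in a downstream transform, it only happens once the load DoFn's output has been committed, which is what makes retries of the load step safe. A condensed sketch of that fan-in-and-delete wiring, with illustrative names that mirror the diff above:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

final class CleanupTempFiles {
  // Deletes one grouped batch of temporary files.
  static class DeleteFilesFn extends DoFn<Iterable<String>, Void> {
    @ProcessElement
    public void processElement(ProcessContext c) throws IOException {
      List<ResourceId> resources = new ArrayList<>();
      for (String file : c.element()) {
        resources.add(FileSystems.matchNewResource(file, false /* isDirectory */));
      }
      FileSystems.delete(resources);
    }
  }

  // Attach the cleanup to a PCollection of temp-file names, e.g. WriteTables' secondary output.
  static void attach(PCollection<String> tempFiles) {
    tempFiles
        .apply(WithKeys.<Void, String>of((Void) null))
        .setCoder(KvCoder.of(VoidCoder.of(), StringUtf8Coder.of()))
        .apply(Window.<KV<Void, String>>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
            .discardingFiredPanes())
        .apply(GroupByKey.<Void, String>create())
        .apply(Values.<Iterable<String>>create())
        .apply(ParDo.of(new DeleteFilesFn()));
  }
}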

http://git-wip-us.apache.org/repos/asf/beam/blob/6d9b8f06/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index ad4cbaa..5500b12 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -63,9 +63,6 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.math.BigDecimal;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
-import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -81,16 +78,13 @@ import java.util.regex.Pattern;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.ShardedKeyCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.GenerateSequence;
-import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonSchemaToTableSchema;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method;
@@ -130,7 +124,6 @@ import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
 import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -489,6 +482,40 @@ public class BigQueryIOTest implements Serializable {
   }
 
   @Test
+  public void testWriteEmptyPCollection() throws Exception {
+    BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
+    bqOptions.setProject("project-id");
+    bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
+
+    FakeDatasetService datasetService = new FakeDatasetService();
+    FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
+        .withJobService(new FakeJobService())
+        .withDatasetService(datasetService);
+
+    datasetService.createDataset("project-id", "dataset-id", "", "");
+
+    Pipeline p = TestPipeline.create(bqOptions);
+
+    TableSchema schema = new TableSchema()
+        .setFields(
+            ImmutableList.of(
+                new TableFieldSchema().setName("number").setType("INTEGER")));
+
+    p.apply(Create.empty(TableRowJsonCoder.of()))
+        .apply(BigQueryIO.writeTableRows()
+            .to("project-id:dataset-id.table-id")
+            .withTestServices(fakeBqServices)
+            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
+            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+            .withSchema(schema)
+            .withoutValidation());
+    p.run();
+    checkNotNull(datasetService.getTable(
+        BigQueryHelpers.parseTableSpec("project-id:dataset-id.table-id")));
+    testNumFiles(new File(bqOptions.getTempLocation()), 0);
+  }
+
+  @Test
   public void testWriteDynamicDestinationsBatch() throws Exception {
     writeDynamicDestinations(false);
   }
@@ -635,6 +662,7 @@ public class BigQueryIOTest implements Serializable {
       assertThat(datasetService.getAllRows("project-id", "dataset-id", "userid-" + entry.getKey()),
           containsInAnyOrder(Iterables.toArray(entry.getValue(), TableRow.class)));
     }
+    testNumFiles(new File(bqOptions.getTempLocation()), 0);
   }
 
   @Test
@@ -684,6 +712,7 @@ public class BigQueryIOTest implements Serializable {
         BigQueryHelpers.parseTableSpec("project-id:dataset-id.table-id"));
     assertEquals(schema, table.getSchema());
     assertEquals(timePartitioning, table.getTimePartitioning());
+    testNumFiles(new File(bqOptions.getTempLocation()), 0);
   }
 
   @Test
@@ -737,6 +766,7 @@ public class BigQueryIOTest implements Serializable {
     assertThat(
         datasetService.getAllRows("project-id", "dataset-id", "table-id"),
         containsInAnyOrder(Iterables.toArray(elements, TableRow.class)));
+    testNumFiles(new File(bqOptions.getTempLocation()), 0);
   }
 
   @Test
@@ -836,6 +866,7 @@ public class BigQueryIOTest implements Serializable {
     // Only row1 and row3 were successfully inserted.
     assertThat(datasetService.getAllRows("project-id", "dataset-id", "table-id"),
         containsInAnyOrder(row1, row3));
+    testNumFiles(new File(bqOptions.getTempLocation()), 0);
   }
 
   @Test
@@ -908,6 +939,7 @@ public class BigQueryIOTest implements Serializable {
             new TableRow().set("name", "b").set("number", 2),
             new TableRow().set("name", "c").set("number", 3),
             new TableRow().set("name", "d").set("number", 4)));
+    testNumFiles(new File(bqOptions.getTempLocation()), 0);
   }
 
   /**
@@ -1128,6 +1160,7 @@ public class BigQueryIOTest implements Serializable {
               new TableRow().set("name", String.format("number%d", i)).set("number", i),
               new TableRow().set("name", String.format("number%d", i + 5)).set("number",
i + 5)));
     }
+    testNumFiles(new File(bqOptions.getTempLocation()), 0);
   }
 
   @Test
@@ -1142,6 +1175,7 @@ public class BigQueryIOTest implements Serializable {
         .withDatasetService(datasetService);
     datasetService.createDataset("project-id", "dataset-id", "", "");
     Pipeline p = TestPipeline.create(bqOptions);
+
     p.apply(Create.of(
         new TableRow().set("name", "a").set("number", 1),
         new TableRow().set("name", "b").set("number", 2),
@@ -1160,6 +1194,7 @@ public class BigQueryIOTest implements Serializable {
       File tempDir = new File(bqOptions.getTempLocation());
       testNumFiles(tempDir, 0);
     }
+    testNumFiles(new File(bqOptions.getTempLocation()), 0);
   }
 
   @Test
@@ -2029,19 +2064,23 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testWriteTables() throws Exception {
-    p.enableAbandonedNodeEnforcement(false);
+    BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
+    bqOptions.setProject("project-id");
+    bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
 
     FakeDatasetService datasetService = new FakeDatasetService();
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
         .withJobService(new FakeJobService())
         .withDatasetService(datasetService);
     datasetService.createDataset("project-id", "dataset-id", "", "");
+
+    Pipeline p = TestPipeline.create(bqOptions);
+
     long numTables = 3;
     long numPartitions = 3;
     long numFilesPerPartition = 10;
-    String jobIdToken = "jobIdToken";
-    String stepUuid = "stepUuid";
-    Map<TableDestination, List<String>> expectedTempTables = Maps.newHashMap();
+    String jobIdToken = "jobId";
+    final Multimap<TableDestination, String> expectedTempTables = ArrayListMultimap.create();
 
     Path baseDir = Files.createTempDirectory(tempFolder, "testWriteTables");
 
@@ -2055,35 +2094,29 @@ public class BigQueryIOTest implements Serializable {
         for (int k = 0; k < numFilesPerPartition; ++k) {
           String filename = Paths.get(baseDir.toString(),
               String.format("files0x%08x_%05d", tempTableId.hashCode(), k)).toString();
-          ResourceId fileResource =
-              FileSystems.matchNewResource(filename, false /* isDirectory */);
-          try (WritableByteChannel channel = FileSystems.create(fileResource, MimeTypes.TEXT)) {
-            try (OutputStream output = Channels.newOutputStream(channel)) {
-              TableRow tableRow = new TableRow().set("name", tableName);
-              TableRowJsonCoder.of().encode(tableRow, output, Context.OUTER);
-              output.write("\n".getBytes(StandardCharsets.UTF_8));
-            }
+          TableRowWriter writer = new TableRowWriter(filename);
+          try (TableRowWriter ignored = writer) {
+            TableRow tableRow = new TableRow().set("name", tableName);
+            writer.write(tableRow);
           }
-          filesPerPartition.add(filename);
+          filesPerPartition.add(writer.getResult().resourceId.toString());
         }
         partitions.add(KV.of(ShardedKey.of(tableDestination.getTableSpec(), j),
             filesPerPartition));
 
-        List<String> expectedTables = expectedTempTables.get(tableDestination);
-        if (expectedTables == null) {
-          expectedTables = Lists.newArrayList();
-          expectedTempTables.put(tableDestination, expectedTables);
-        }
         String json = String.format(
             "{\"datasetId\":\"dataset-id\",\"projectId\":\"project-id\",\"tableId\":\"%s\"}",
             tempTableId);
-        expectedTables.add(json);
+        expectedTempTables.put(tableDestination, json);
       }
     }
 
+    PCollection<KV<ShardedKey<String>, List<String>>> writeTablesInput =
+        p.apply(Create.of(partitions));
     PCollectionView<String> jobIdTokenView = p
         .apply("CreateJobId", Create.of("jobId"))
         .apply(View.<String>asSingleton());
+    List<PCollectionView<?>> sideInputs = ImmutableList.<PCollectionView<?>>of(jobIdTokenView);
 
     WriteTables<String> writeTables =
         new WriteTables<>(
@@ -2092,26 +2125,29 @@ public class BigQueryIOTest implements Serializable {
             jobIdTokenView,
             WriteDisposition.WRITE_EMPTY,
             CreateDisposition.CREATE_IF_NEEDED,
+            sideInputs,
             new IdentityDynamicTables());
 
-    DoFnTester<KV<ShardedKey<String>, List<String>>,
-        KV<TableDestination, String>> tester = DoFnTester.of(writeTables);
-    tester.setSideInput(jobIdTokenView, GlobalWindow.INSTANCE, jobIdToken);
-    tester.getPipelineOptions().setTempLocation("tempLocation");
-    for (KV<ShardedKey<String>, List<String>> partition : partitions) {
-      tester.processElement(partition);
-    }
+    PCollection<KV<TableDestination, String>> writeTablesOutput =
+        writeTablesInput.apply(writeTables);
 
-    Map<TableDestination, List<String>> tempTablesResult = Maps.newHashMap();
-    for (KV<TableDestination, String> element : tester.takeOutputElements()) {
-      List<String> tables = tempTablesResult.get(element.getKey());
-      if (tables == null) {
-        tables = Lists.newArrayList();
-        tempTablesResult.put(element.getKey(), tables);
-      }
-      tables.add(element.getValue());
-    }
-    assertEquals(expectedTempTables, tempTablesResult);
+    PAssert.thatMultimap(writeTablesOutput)
+        .satisfies(
+            new SerializableFunction<Map<TableDestination, Iterable<String>>, Void>() {
+              @Override
+              public Void apply(Map<TableDestination, Iterable<String>> input) {
+                assertEquals(input.keySet(), expectedTempTables.keySet());
+                for (Map.Entry<TableDestination, Iterable<String>> entry : input.entrySet()) {
+                  @SuppressWarnings("unchecked")
+                  String[] expectedValues = Iterables.toArray(
+                      expectedTempTables.get(entry.getKey()), String.class);
+                  assertThat(entry.getValue(), containsInAnyOrder(expectedValues));
+                }
+                return null;
+              }
+            });
+    p.run();
+    testNumFiles(baseDir.toFile(), 0);
   }
 
   @Test
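Most of the updated tests end by asserting that the temporary directory is empty once the pipeline finishes, which is what confirms the new cleanup ParDo actually removed the files. The existing testNumFiles helper is not part of this diff; a hypothetical equivalent check could look like:

import static org.junit.Assert.assertEquals;

import java.io.File;

final class TempFileAssertions {
  // Hypothetical stand-in for testNumFiles: recursively count regular files under dir.
  static void assertNumFiles(File dir, int expected) {
    assertEquals(expected, countFiles(dir));
  }

  private static int countFiles(File dir) {
    int count = 0;
    File[] children = dir.listFiles();
    if (children == null) {
      return 0; // not a directory, or unreadable
    }
    for (File child : children) {
      count += child.isFile() ? 1 : countFiles(child);
    }
    return count;
  }
}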

http://git-wip-us.apache.org/repos/asf/beam/blob/6d9b8f06/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
index cc600d1..f13a7ab 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
@@ -63,7 +63,6 @@ import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.io.FileSystems;
-import org.apache.beam.sdk.io.fs.MoveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
@@ -129,8 +128,7 @@ class FakeJobService implements JobService, Serializable {
               filename + ThreadLocalRandom.current().nextInt(), false /* isDirectory */));
         }
 
-        FileSystems.copy(sourceFiles.build(), loadFiles.build(),
-            MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
+        FileSystems.copy(sourceFiles.build(), loadFiles.build());
         filesForLoadJobs.put(jobRef.getProjectId(), jobRef.getJobId(), loadFiles.build());
       }
 
@@ -325,6 +323,7 @@ class FakeJobService implements JobService, Serializable {
       rows.addAll(readRows(filename.toString()));
     }
     datasetService.insertAll(destination, rows, null);
+    FileSystems.delete(sourceFiles);
     return new JobStatus().setState("DONE");
   }
 

