beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [1/2] incubator-beam git commit: Demonstrate PubsubIO with NVP
Date Tue, 29 Nov 2016 05:21:49 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master ae06f759f -> aeff1d5c2


Demonstrate PubsubIO with NVP


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f9225981
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f9225981
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f9225981

Branch: refs/heads/master
Commit: f92259814964fb4d3b2381187247b3f11b5fe33f
Parents: ae06f75
Author: Sam McVeety <sgmc@google.com>
Authored: Sat Oct 29 19:02:51 2016 -0700
Committer: Dan Halperin <dhalperi@google.com>
Committed: Mon Nov 28 21:14:33 2016 -0800

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/PubsubIO.java   | 176 ++++++++++++++++---
 .../apache/beam/sdk/io/PubsubUnboundedSink.java |  23 ++-
 .../beam/sdk/io/PubsubUnboundedSource.java      |  40 +++--
 .../org/apache/beam/sdk/io/PubsubIOTest.java    |  43 +++--
 .../beam/sdk/io/PubsubUnboundedSinkTest.java    |  20 ++-
 .../beam/sdk/io/PubsubUnboundedSourceTest.java  |  14 +-
 6 files changed, 232 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f9225981/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
index 72a6399..9768788 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
@@ -31,11 +31,15 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.options.PubsubOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
 import org.apache.beam.sdk.util.CoderUtils;
@@ -134,7 +138,7 @@ public class PubsubIO {
    * Populate common {@link DisplayData} between Pubsub source and sink.
    */
   private static void populateCommonDisplayData(DisplayData.Builder builder,
-      String timestampLabel, String idLabel, PubsubTopic topic) {
+      String timestampLabel, String idLabel, String topic) {
     builder
         .addIfNotNull(DisplayData.item("timestampLabel", timestampLabel)
             .withLabel("Timestamp Label Attribute"))
@@ -142,7 +146,7 @@ public class PubsubIO {
             .withLabel("ID Label Attribute"));
 
     if (topic != null) {
-      builder.add(DisplayData.item("topic", topic.asPath())
+      builder.add(DisplayData.item("topic", topic)
           .withLabel("Pubsub Topic"));
     }
   }
@@ -253,6 +257,61 @@ public class PubsubIO {
   }
 
   /**
+   * Used to build a {@link ValueProvider} for {@link PubsubSubscription}.
+   */
+  private static class SubscriptionTranslator
+      implements SerializableFunction<String, PubsubSubscription> {
+    @Override
+    public PubsubSubscription apply(String from) {
+      return PubsubSubscription.fromPath(from);
+    }
+  }
+
+  /**
+   * Used to build a {@link ValueProvider} for {@link SubscriptionPath}.
+   */
+  private static class SubscriptionPathTranslator
+      implements SerializableFunction<PubsubSubscription, SubscriptionPath> {
+    @Override
+    public SubscriptionPath apply(PubsubSubscription from) {
+      return PubsubClient.subscriptionPathFromName(from.project, from.subscription);
+    }
+  }
+
+  /**
+   * Used to build a {@link ValueProvider} for {@link PubsubTopic}.
+   */
+  private static class TopicTranslator
+      implements SerializableFunction<String, PubsubTopic> {
+    @Override
+    public PubsubTopic apply(String from) {
+      return PubsubTopic.fromPath(from);
+    }
+  }
+
+  /**
+   * Used to build a {@link ValueProvider} for {@link TopicPath}.
+   */
+  private static class TopicPathTranslator
+      implements SerializableFunction<PubsubTopic, TopicPath> {
+    @Override
+    public TopicPath apply(PubsubTopic from) {
+      return PubsubClient.topicPathFromName(from.project, from.topic);
+    }
+  }
+
+  /**
+   * Used to build a {@link ValueProvider} for {@link ProjectPath}.
+   */
+  private static class ProjectPathTranslator
+      implements SerializableFunction<PubsubTopic, ProjectPath> {
+    @Override
+    public ProjectPath apply(PubsubTopic from) {
+      return PubsubClient.projectPathFromId(from.project);
+    }
+  }
+
+  /**
    * Class representing a Cloud Pub/Sub Topic.
    */
   public static class PubsubTopic implements Serializable {
@@ -380,6 +439,13 @@ public class PubsubIO {
      * by the runner.
      */
     public static Bound<String> topic(String topic) {
+      return new Bound<>(DEFAULT_PUBSUB_CODER).topic(StaticValueProvider.of(topic));
+    }
+
+    /**
+     * Like {@code topic()} but with a {@link ValueProvider}.
+     */
+    public static Bound<String> topic(ValueProvider<String> topic) {
       return new Bound<>(DEFAULT_PUBSUB_CODER).topic(topic);
     }
 
@@ -391,6 +457,13 @@ public class PubsubIO {
      * of the {@code subscription} string.
      */
     public static Bound<String> subscription(String subscription) {
+      return new Bound<>(DEFAULT_PUBSUB_CODER).subscription(StaticValueProvider.of(subscription));
+    }
+
+    /**
+     * Like {@code topic()} but with a {@link ValueProvider}.
+     */
+    public static Bound<String> subscription(ValueProvider<String> subscription)
{
       return new Bound<>(DEFAULT_PUBSUB_CODER).subscription(subscription);
     }
 
@@ -484,10 +557,10 @@ public class PubsubIO {
      */
     public static class Bound<T> extends PTransform<PBegin, PCollection<T>>
{
       /** The Cloud Pub/Sub topic to read from. */
-      @Nullable private final PubsubTopic topic;
+      @Nullable private final ValueProvider<PubsubTopic> topic;
 
       /** The Cloud Pub/Sub subscription to read from. */
-      @Nullable private final PubsubSubscription subscription;
+      @Nullable private final ValueProvider<PubsubSubscription> subscription;
 
       /** The name of the message attribute to read timestamps from. */
       @Nullable private final String timestampLabel;
@@ -508,9 +581,9 @@ public class PubsubIO {
         this(null, null, null, null, coder, null, 0, null);
       }
 
-      private Bound(String name, PubsubSubscription subscription, PubsubTopic topic,
-          String timestampLabel, Coder<T> coder, String idLabel, int maxNumRecords,
-          Duration maxReadTime) {
+      private Bound(String name, ValueProvider<PubsubSubscription> subscription,
+          ValueProvider<PubsubTopic> topic, String timestampLabel, Coder<T> coder,
+          String idLabel, int maxNumRecords, Duration maxReadTime) {
         super(name);
         this.subscription = subscription;
         this.topic = topic;
@@ -535,8 +608,16 @@ public class PubsubIO {
        * <p>Does not modify this object.
        */
       public Bound<T> subscription(String subscription) {
-        return new Bound<>(name, PubsubSubscription.fromPath(subscription), topic,
timestampLabel,
-            coder, idLabel, maxNumRecords, maxReadTime);
+        return subscription(StaticValueProvider.of(subscription));
+      }
+
+      /**
+       * Like {@code subscription()} but with a {@link ValueProvider}.
+       */
+      public Bound<T> subscription(ValueProvider<String> subscription) {
+        return new Bound<>(name,
+            NestedValueProvider.of(subscription, new SubscriptionTranslator()),
+            topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime);
       }
 
       /**
@@ -548,8 +629,16 @@ public class PubsubIO {
        * <p>Does not modify this object.
        */
       public Bound<T> topic(String topic) {
-        return new Bound<>(name, subscription, PubsubTopic.fromPath(topic), timestampLabel,
coder,
-            idLabel, maxNumRecords, maxReadTime);
+        return topic(StaticValueProvider.of(topic));
+      }
+
+      /**
+       * Like {@code topic()} but with a {@link ValueProvider}.
+       */
+      public Bound<T> topic(ValueProvider<String> topic) {
+        return new Bound<>(name, subscription,
+            NestedValueProvider.of(topic, new TopicTranslator()),
+            timestampLabel, coder, idLabel, maxNumRecords, maxReadTime);
       }
 
       /**
@@ -629,15 +718,14 @@ public class PubsubIO {
                       .apply(ParDo.of(new PubsubBoundedReader()))
                       .setCoder(coder);
         } else {
-          @Nullable ProjectPath projectPath =
-              topic == null ? null : PubsubClient.projectPathFromId(topic.project);
-          @Nullable TopicPath topicPath =
-              topic == null ? null : PubsubClient.topicPathFromName(topic.project, topic.topic);
-          @Nullable SubscriptionPath subscriptionPath =
+          @Nullable ValueProvider<ProjectPath> projectPath =
+              topic == null ? null : NestedValueProvider.of(topic, new ProjectPathTranslator());
+          @Nullable ValueProvider<TopicPath> topicPath =
+              topic == null ? null : NestedValueProvider.of(topic, new TopicPathTranslator());
+          @Nullable ValueProvider<SubscriptionPath> subscriptionPath =
               subscription == null
                   ? null
-                  : PubsubClient.subscriptionPathFromName(
-                      subscription.project, subscription.subscription);
+                  : NestedValueProvider.of(subscription, new SubscriptionPathTranslator());
           return input.getPipeline().begin()
                       .apply(new PubsubUnboundedSource<T>(
                           FACTORY, projectPath, topicPath, subscriptionPath,
@@ -648,7 +736,11 @@ public class PubsubIO {
       @Override
       public void populateDisplayData(DisplayData.Builder builder) {
         super.populateDisplayData(builder);
-        populateCommonDisplayData(builder, timestampLabel, idLabel, topic);
+        String topicString =
+            topic == null ? null
+            : topic.isAccessible() ? topic.get().asPath()
+            : topic.toString();
+        populateCommonDisplayData(builder, timestampLabel, idLabel, topicString);
 
         builder
             .addIfNotNull(DisplayData.item("maxReadTime", maxReadTime)
@@ -657,8 +749,10 @@ public class PubsubIO {
               .withLabel("Maximum Read Records"), 0);
 
         if (subscription != null) {
-          builder.add(DisplayData.item("subscription", subscription.asPath())
-            .withLabel("Pubsub Subscription"));
+          String subscriptionString = subscription.isAccessible()
+              ? subscription.get().asPath() : subscription.toString();
+          builder.add(DisplayData.item("subscription", subscriptionString)
+              .withLabel("Pubsub Subscription"));
         }
       }
 
@@ -668,10 +762,18 @@ public class PubsubIO {
       }
 
       public PubsubTopic getTopic() {
+        return topic == null ? null : topic.get();
+      }
+
+      public ValueProvider<PubsubTopic> getTopicProvider() {
         return topic;
       }
 
       public PubsubSubscription getSubscription() {
+        return subscription == null ? null : subscription.get();
+      }
+
+      public ValueProvider<PubsubSubscription> getSubscriptionProvider() {
         return subscription;
       }
 
@@ -820,6 +922,13 @@ public class PubsubIO {
      * {@code topic} string.
      */
     public static Bound<String> topic(String topic) {
+      return new Bound<>(DEFAULT_PUBSUB_CODER).topic(StaticValueProvider.of(topic));
+    }
+
+    /**
+     * Like {@code topic()} but with a {@link ValueProvider}.
+     */
+    public static Bound<String> topic(ValueProvider<String> topic) {
       return new Bound<>(DEFAULT_PUBSUB_CODER).topic(topic);
     }
 
@@ -869,7 +978,7 @@ public class PubsubIO {
      */
     public static class Bound<T> extends PTransform<PCollection<T>, PDone>
{
       /** The Cloud Pub/Sub topic to publish to. */
-      @Nullable private final PubsubTopic topic;
+      @Nullable private final ValueProvider<PubsubTopic> topic;
       /** The name of the message attribute to publish message timestamps in. */
       @Nullable private final String timestampLabel;
       /** The name of the message attribute to publish unique message IDs in. */
@@ -881,7 +990,8 @@ public class PubsubIO {
       }
 
       private Bound(
-          String name, PubsubTopic topic, String timestampLabel, String idLabel, Coder<T>
coder) {
+          String name, ValueProvider<PubsubTopic> topic, String timestampLabel,
+          String idLabel, Coder<T> coder) {
         super(name);
         this.topic = topic;
         this.timestampLabel = timestampLabel;
@@ -899,7 +1009,15 @@ public class PubsubIO {
        * <p>Does not modify this object.
        */
       public Bound<T> topic(String topic) {
-        return new Bound<>(name, PubsubTopic.fromPath(topic), timestampLabel, idLabel,
coder);
+        return topic(StaticValueProvider.of(topic));
+      }
+
+      /**
+       * Like {@code topic()} but with a {@link ValueProvider}.
+       */
+      public Bound<T> topic(ValueProvider<String> topic) {
+        return new Bound<>(name, NestedValueProvider.of(topic, new TopicTranslator()),
+            timestampLabel, idLabel, coder);
       }
 
       /**
@@ -950,7 +1068,7 @@ public class PubsubIO {
           case UNBOUNDED:
             return input.apply(new PubsubUnboundedSink<T>(
                 FACTORY,
-                PubsubClient.topicPathFromName(topic.project, topic.topic),
+                NestedValueProvider.of(topic, new TopicPathTranslator()),
                 coder,
                 timestampLabel,
                 idLabel,
@@ -962,7 +1080,9 @@ public class PubsubIO {
       @Override
       public void populateDisplayData(DisplayData.Builder builder) {
         super.populateDisplayData(builder);
-        populateCommonDisplayData(builder, timestampLabel, idLabel, topic);
+        String topicString = topic.isAccessible()
+            ? topic.get().asPath() : topic.toString();
+        populateCommonDisplayData(builder, timestampLabel, idLabel, topicString);
       }
 
       @Override
@@ -971,6 +1091,10 @@ public class PubsubIO {
       }
 
       public PubsubTopic getTopic() {
+        return topic.get();
+      }
+
+      public ValueProvider<PubsubTopic> getTopicProvider() {
         return topic;
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f9225981/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
index 179abf6..1e369c8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
@@ -40,6 +40,7 @@ import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.options.PubsubOptions;
+import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
@@ -205,7 +206,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>,
PDone> {
   private static class WriterFn
       extends DoFn<KV<Integer, Iterable<OutgoingMessage>>, Void> {
     private final PubsubClientFactory pubsubFactory;
-    private final TopicPath topic;
+    private final ValueProvider<TopicPath> topic;
     private final String timestampLabel;
     private final String idLabel;
     private final int publishBatchSize;
@@ -225,8 +226,8 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>,
PDone> {
         createAggregator("bytes", new Sum.SumLongFn());
 
     WriterFn(
-        PubsubClientFactory pubsubFactory, TopicPath topic, String timestampLabel,
-        String idLabel, int publishBatchSize, int publishBatchBytes) {
+        PubsubClientFactory pubsubFactory, ValueProvider<TopicPath> topic,
+        String timestampLabel, String idLabel, int publishBatchSize, int publishBatchBytes)
{
       this.pubsubFactory = pubsubFactory;
       this.topic = topic;
       this.timestampLabel = timestampLabel;
@@ -241,7 +242,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>,
PDone> {
      */
     private void publishBatch(List<OutgoingMessage> messages, int bytes)
         throws IOException {
-      int n = pubsubClient.publish(topic, messages);
+      int n = pubsubClient.publish(topic.get(), messages);
       checkState(n == messages.size(), "Attempted to publish %s messages but %s were successful",
                  messages.size(), n);
       batchCounter.addValue(1L);
@@ -290,7 +291,11 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>,
PDone> {
     @Override
     public void populateDisplayData(Builder builder) {
       super.populateDisplayData(builder);
-      builder.add(DisplayData.item("topic", topic.getPath()));
+        String topicString =
+            topic == null ? null
+            : topic.isAccessible() ? topic.get().getPath()
+            : topic.toString();
+      builder.add(DisplayData.item("topic", topicString));
       builder.add(DisplayData.item("transport", pubsubFactory.getKind()));
       builder.addIfNotNull(DisplayData.item("timestampLabel", timestampLabel));
       builder.addIfNotNull(DisplayData.item("idLabel", idLabel));
@@ -309,7 +314,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>,
PDone> {
   /**
    * Pubsub topic to publish to.
    */
-  private final TopicPath topic;
+  private final ValueProvider<TopicPath> topic;
 
   /**
    * Coder for elements. It is the responsibility of the underlying Pubsub transport to
@@ -363,7 +368,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>,
PDone> {
   @VisibleForTesting
   PubsubUnboundedSink(
       PubsubClientFactory pubsubFactory,
-      TopicPath topic,
+      ValueProvider<TopicPath> topic,
       Coder<T> elementCoder,
       String timestampLabel,
       String idLabel,
@@ -386,7 +391,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>,
PDone> {
 
   public PubsubUnboundedSink(
       PubsubClientFactory pubsubFactory,
-      TopicPath topic,
+      ValueProvider<TopicPath> topic,
       Coder<T> elementCoder,
       String timestampLabel,
       String idLabel,
@@ -397,7 +402,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>,
PDone> {
   }
 
   public TopicPath getTopic() {
-    return topic;
+    return topic.get();
   }
 
   @Nullable

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f9225981/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
index bfacb71..4ec8389 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
@@ -52,6 +52,7 @@ import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PubsubOptions;
+import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -1161,7 +1162,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin,
PCollection<T>>
         createAggregator("elements", new Sum.SumLongFn());
 
     private final PubsubClientFactory pubsubFactory;
-    private final SubscriptionPath subscription;
+    private final ValueProvider<SubscriptionPath> subscription;
     @Nullable
     private final String timestampLabel;
     @Nullable
@@ -1169,7 +1170,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin,
PCollection<T>>
 
     public StatsFn(
         PubsubClientFactory pubsubFactory,
-        SubscriptionPath subscription,
+        ValueProvider<SubscriptionPath> subscription,
         @Nullable
             String timestampLabel,
         @Nullable
@@ -1189,7 +1190,11 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin,
PCollection<T>>
     @Override
     public void populateDisplayData(Builder builder) {
       super.populateDisplayData(builder);
-      builder.add(DisplayData.item("subscription", subscription.getPath()));
+        String subscriptionString =
+            subscription == null ? null
+            : subscription.isAccessible() ? subscription.get().getPath()
+            : subscription.toString();
+      builder.add(DisplayData.item("subscription", subscriptionString));
       builder.add(DisplayData.item("transport", pubsubFactory.getKind()));
       builder.addIfNotNull(DisplayData.item("timestampLabel", timestampLabel));
       builder.addIfNotNull(DisplayData.item("idLabel", idLabel));
@@ -1215,14 +1220,14 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin,
PCollection<T>>
    * Project under which to create a subscription if only the {@link #topic} was given.
    */
   @Nullable
-  private final ProjectPath project;
+  private final ValueProvider<ProjectPath> project;
 
   /**
    * Topic to read from. If {@literal null}, then {@link #subscription} must be given.
    * Otherwise {@link #subscription} must be null.
    */
   @Nullable
-  private final TopicPath topic;
+  private final ValueProvider<TopicPath> topic;
 
   /**
    * Subscription to read from. If {@literal null} then {@link #topic} must be given.
@@ -1233,7 +1238,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin,
PCollection<T>>
    * subscription is never deleted.
    */
   @Nullable
-  private SubscriptionPath subscription;
+  private ValueProvider<SubscriptionPath> subscription;
 
   /**
    * Coder for elements. Elements are effectively double-encoded: first to a byte array
@@ -1260,9 +1265,9 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin,
PCollection<T>>
   PubsubUnboundedSource(
       Clock clock,
       PubsubClientFactory pubsubFactory,
-      @Nullable ProjectPath project,
-      @Nullable TopicPath topic,
-      @Nullable SubscriptionPath subscription,
+      @Nullable ValueProvider<ProjectPath> project,
+      @Nullable ValueProvider<TopicPath> topic,
+      @Nullable ValueProvider<SubscriptionPath> subscription,
       Coder<T> elementCoder,
       @Nullable String timestampLabel,
       @Nullable String idLabel) {
@@ -1285,9 +1290,9 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin,
PCollection<T>>
    */
   public PubsubUnboundedSource(
       PubsubClientFactory pubsubFactory,
-      @Nullable ProjectPath project,
-      @Nullable TopicPath topic,
-      @Nullable SubscriptionPath subscription,
+      @Nullable ValueProvider<ProjectPath> project,
+      @Nullable ValueProvider<TopicPath> topic,
+      @Nullable ValueProvider<SubscriptionPath> subscription,
       Coder<T> elementCoder,
       @Nullable String timestampLabel,
       @Nullable String idLabel) {
@@ -1300,17 +1305,17 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin,
PCollection<T>>
 
   @Nullable
   public ProjectPath getProject() {
-    return project;
+    return project == null ? null : project.get();
   }
 
   @Nullable
   public TopicPath getTopic() {
-    return topic;
+    return topic == null ? null : topic.get();
   }
 
   @Nullable
   public SubscriptionPath getSubscription() {
-    return subscription;
+    return subscription == null ? null : subscription.get();
   }
 
   @Nullable
@@ -1335,8 +1340,11 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin,
PCollection<T>>
     try {
       try (PubsubClient pubsubClient =
           pubsubFactory.newClient(timestampLabel, idLabel, options.as(PubsubOptions.class)))
{
+        checkState(project.isAccessible(), "createRandomSubscription must be called at runtime.");
+        checkState(topic.isAccessible(), "createRandomSubscription must be called at runtime.");
         SubscriptionPath subscriptionPath =
-            pubsubClient.createRandomSubscription(project, topic, DEAULT_ACK_TIMEOUT_SEC);
+            pubsubClient.createRandomSubscription(
+                project.get(), topic.get(), DEAULT_ACK_TIMEOUT_SEC);
         LOG.warn(
             "Created subscription {} to topic {}."
                 + " Note this subscription WILL NOT be deleted when the pipeline terminates",

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f9225981/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
index 086b726..b73afb2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
@@ -20,9 +20,12 @@ package org.apache.beam.sdk.io;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 
 import java.util.Set;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
@@ -65,29 +68,13 @@ public class PubsubIOTest {
   }
 
   @Test
-  public void testTopicValidationBadCharacter() throws Exception {
-    thrown.expect(IllegalArgumentException.class);
-    PubsubIO.Read.topic("projects/my-project/topics/abc-*-abc");
-  }
-
-  @Test
-  public void testTopicValidationTooLong() throws Exception {
-    thrown.expect(IllegalArgumentException.class);
-    PubsubIO.Read.topic(new StringBuilder().append("projects/my-project/topics/A-really-long-one-")
-        .append("111111111111111111111111111111111111111111111111111111111111111111111111111111111")
-        .append("111111111111111111111111111111111111111111111111111111111111111111111111111111111")
-        .append("1111111111111111111111111111111111111111111111111111111111111111111111111111")
-        .toString());
-  }
-
-  @Test
   public void testReadDisplayData() {
     String topic = "projects/project/topics/topic";
     String subscription = "projects/project/subscriptions/subscription";
     Duration maxReadTime = Duration.standardMinutes(5);
     PubsubIO.Read.Bound<String> read = PubsubIO.Read
-        .topic(topic)
-        .subscription(subscription)
+        .topic(StaticValueProvider.of(topic))
+        .subscription(StaticValueProvider.of(subscription))
         .timestampLabel("myTimestamp")
         .idLabel("myId")
         .maxNumRecords(1234)
@@ -104,6 +91,26 @@ public class PubsubIOTest {
   }
 
   @Test
+  public void testNullTopic() {
+    String subscription = "projects/project/subscriptions/subscription";
+    PubsubIO.Read.Bound<String> read = PubsubIO.Read
+        .subscription(StaticValueProvider.of(subscription));
+    assertNull(read.getTopic());
+    assertNotNull(read.getSubscription());
+    assertNotNull(DisplayData.from(read));
+  }
+
+  @Test
+  public void testNullSubscription() {
+    String topic = "projects/project/topics/topic";
+    PubsubIO.Read.Bound<String> read = PubsubIO.Read
+        .topic(StaticValueProvider.of(topic));
+    assertNotNull(read.getTopic());
+    assertNull(read.getSubscription());
+    assertNotNull(DisplayData.from(read));
+  }
+
+  @Test
   @Category(RunnableOnService.class)
   public void testPrimitiveReadDisplayData() {
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f9225981/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
index 4edd9c1..518136f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.PubsubUnboundedSink.RecordIdMethod;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -84,9 +85,9 @@ public class PubsubUnboundedSinkTest {
              PubsubTestClient.createFactoryForPublish(TOPIC, outgoing,
                                                       ImmutableList.<OutgoingMessage>of()))
{
       PubsubUnboundedSink<String> sink =
-          new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), TIMESTAMP_LABEL,
ID_LABEL,
-                                    NUM_SHARDS, batchSize, batchBytes, Duration.standardSeconds(2),
-                                    RecordIdMethod.DETERMINISTIC);
+          new PubsubUnboundedSink<>(factory, StaticValueProvider.of(TOPIC), StringUtf8Coder.of(),
+              TIMESTAMP_LABEL, ID_LABEL, NUM_SHARDS, batchSize, batchBytes,
+              Duration.standardSeconds(2), RecordIdMethod.DETERMINISTIC);
       TestPipeline p = TestPipeline.create();
       p.apply(Create.of(ImmutableList.of(DATA)))
        .apply(ParDo.of(new Stamp()))
@@ -113,9 +114,9 @@ public class PubsubUnboundedSinkTest {
              PubsubTestClient.createFactoryForPublish(TOPIC, outgoing,
                                                       ImmutableList.<OutgoingMessage>of()))
{
       PubsubUnboundedSink<String> sink =
-          new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), TIMESTAMP_LABEL,
ID_LABEL,
-                                    NUM_SHARDS, batchSize, batchBytes, Duration.standardSeconds(2),
-                                    RecordIdMethod.DETERMINISTIC);
+          new PubsubUnboundedSink<>(factory, StaticValueProvider.of(TOPIC), StringUtf8Coder.of(),
+              TIMESTAMP_LABEL, ID_LABEL, NUM_SHARDS, batchSize, batchBytes,
+              Duration.standardSeconds(2), RecordIdMethod.DETERMINISTIC);
       TestPipeline p = TestPipeline.create();
       p.apply(Create.of(data))
        .apply(ParDo.of(new Stamp()))
@@ -148,9 +149,10 @@ public class PubsubUnboundedSinkTest {
              PubsubTestClient.createFactoryForPublish(TOPIC, outgoing,
                                                       ImmutableList.<OutgoingMessage>of()))
{
       PubsubUnboundedSink<String> sink =
-          new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), TIMESTAMP_LABEL,
ID_LABEL,
-                                    NUM_SHARDS, batchSize, batchBytes, Duration.standardSeconds(2),
-                                    RecordIdMethod.DETERMINISTIC);
+          new PubsubUnboundedSink<>(factory, StaticValueProvider.of(TOPIC),
+              StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL,
+              NUM_SHARDS, batchSize, batchBytes, Duration.standardSeconds(2),
+              RecordIdMethod.DETERMINISTIC);
       TestPipeline p = TestPipeline.create();
       p.apply(Create.of(data))
        .apply(ParDo.of(new Stamp()))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f9225981/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
index bbc6c12..f6165c5 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
@@ -45,6 +45,7 @@ import org.apache.beam.sdk.io.PubsubUnboundedSource.PubsubReader;
 import org.apache.beam.sdk.io.PubsubUnboundedSource.PubsubSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.util.CoderUtils;
@@ -91,8 +92,9 @@ public class PubsubUnboundedSourceTest {
     };
     factory = PubsubTestClient.createFactoryForPull(clock, SUBSCRIPTION, ACK_TIMEOUT_S, incoming);
     PubsubUnboundedSource<String> source =
-        new PubsubUnboundedSource<>(clock, factory, null, null, SUBSCRIPTION, StringUtf8Coder.of(),
-                                    TIMESTAMP_LABEL, ID_LABEL);
+        new PubsubUnboundedSource<>(
+            clock, factory, null, null, StaticValueProvider.of(SUBSCRIPTION),
+            StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL);
     primSource = new PubsubSource<>(source);
   }
 
@@ -332,8 +334,8 @@ public class PubsubUnboundedSourceTest {
     PubsubUnboundedSource<String> source =
         new PubsubUnboundedSource<>(
             factory,
-            PubsubClient.projectPathFromId("my_project"),
-            topicPath,
+            StaticValueProvider.of(PubsubClient.projectPathFromId("my_project")),
+            StaticValueProvider.of(topicPath),
             null,
             StringUtf8Coder.of(),
             null,
@@ -363,8 +365,8 @@ public class PubsubUnboundedSourceTest {
     PubsubUnboundedSource<String> source =
         new PubsubUnboundedSource<>(
             factory,
-            PubsubClient.projectPathFromId("my_project"),
-            topicPath,
+            StaticValueProvider.of(PubsubClient.projectPathFromId("my_project")),
+            StaticValueProvider.of(topicPath),
             null,
             StringUtf8Coder.of(),
             null,


Mime
View raw message