This is an automated email from the ASF dual-hosted git repository.
boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a77818a feat: Allow Pub/Sub Lite Sink to support increasing partitions
new aa98bcf Merge pull request #13787 from [BEAM-10114] Allow Pub/Sub Lite Sink to support increasing partitions
a77818a is described below
commit a77818a7e75d13d7174b4cde5cd418080975e87d
Author: Evan Palmer <palmere@google.com>
AuthorDate: Thu Jan 21 14:19:49 2021 -0500
feat: Allow Pub/Sub Lite Sink to support increasing partitions
---
.../beam/sdk/io/gcp/pubsublite/Publishers.java | 24 ++++++++++++----------
1 file changed, 13 insertions(+), 11 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/Publishers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/Publishers.java
index c44c3df..a0a075e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/Publishers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/Publishers.java
@@ -22,9 +22,10 @@ import static com.google.cloud.pubsublite.internal.UncheckedApiPreconditions.che
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.internal.Publisher;
+import com.google.cloud.pubsublite.internal.wire.PartitionCountWatchingPublisher;
+import com.google.cloud.pubsublite.internal.wire.PartitionCountWatchingPublisherSettings;
import com.google.cloud.pubsublite.internal.wire.PubsubContext;
import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework;
-import com.google.cloud.pubsublite.internal.wire.RoutingPublisherBuilder;
import com.google.cloud.pubsublite.internal.wire.SinglePartitionPublisherBuilder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken;
@@ -42,15 +43,16 @@ class Publishers {
checkArgument(token.isSupertypeOf(supplied.getClass()));
return (Publisher<PublishMetadata>) supplied;
}
- return RoutingPublisherBuilder.newBuilder()
- .setTopic(options.topicPath())
- .setPublisherFactory(
- partition ->
- SinglePartitionPublisherBuilder.newBuilder()
- .setTopic(options.topicPath())
- .setPartition(partition)
- .setContext(PubsubContext.of(FRAMEWORK))
- .build())
- .build();
+ return new PartitionCountWatchingPublisher(
+ PartitionCountWatchingPublisherSettings.newBuilder()
+ .setTopic(options.topicPath())
+ .setPublisherFactory(
+ partition ->
+ SinglePartitionPublisherBuilder.newBuilder()
+ .setTopic(options.topicPath())
+ .setPartition(partition)
+ .setContext(PubsubContext.of(FRAMEWORK))
+ .build())
+ .build());
}
}
|