beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [12/50] [abbrv] beam git commit: support topicPartition in BeamKafkaTable
Date Fri, 17 Nov 2017 20:31:03 GMT
support topicPartition in BeamKafkaTable


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/92b3a9ac
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/92b3a9ac
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/92b3a9ac

Branch: refs/heads/tez-runner
Commit: 92b3a9acf04450313c3c2340f6921bf433c2dc04
Parents: acfab89
Author: mingmxu <mingmxu@ebay.com>
Authored: Fri Nov 10 19:51:37 2017 -0800
Committer: James Xu <xumingmingv@gmail.com>
Committed: Mon Nov 13 14:41:29 2017 +0800

----------------------------------------------------------------------
 .../sql/meta/provider/kafka/BeamKafkaTable.java | 39 +++++++++++++++-----
 1 file changed, 30 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/92b3a9ac/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
index 50f7496..8f663a3 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 
@@ -45,6 +46,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer;
 public abstract class BeamKafkaTable extends BaseBeamTable implements Serializable {
   private String bootstrapServers;
   private List<String> topics;
+  private List<TopicPartition> topicPartitions;
   private Map<String, Object> configUpdates;
 
   protected BeamKafkaTable(BeamRecordSqlType beamSqlRowType) {
@@ -58,6 +60,13 @@ public abstract class BeamKafkaTable extends BaseBeamTable implements Serializab
     this.topics = topics;
   }
 
+  public BeamKafkaTable(BeamRecordSqlType beamSqlRowType,
+      List<TopicPartition> topicPartitions, String bootstrapServers) {
+    super(beamSqlRowType);
+    this.bootstrapServers = bootstrapServers;
+    this.topicPartitions = topicPartitions;
+  }
+
   public BeamKafkaTable updateConsumerProperties(Map<String, Object> configUpdates)
{
     this.configUpdates = configUpdates;
     return this;
@@ -76,15 +85,27 @@ public abstract class BeamKafkaTable extends BaseBeamTable implements Serializable {
 
   @Override
   public PCollection<BeamRecord> buildIOReader(Pipeline pipeline) {
-    return PBegin.in(pipeline).apply("read",
-            KafkaIO.<byte[], byte[]>read()
-                .withBootstrapServers(bootstrapServers)
-                .withTopics(topics)
-                .updateConsumerProperties(configUpdates)
-                .withKeyDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
-                .withValueDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
-                .withoutMetadata())
-            .apply("in_format", getPTransformForInput());
+    KafkaIO.Read<byte[], byte[]> kafkaRead = null;
+    if (topics != null) {
+      kafkaRead = KafkaIO.<byte[], byte[]>read()
+      .withBootstrapServers(bootstrapServers)
+      .withTopics(topics)
+      .updateConsumerProperties(configUpdates)
+      .withKeyDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
+      .withValueDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of());
+    } else if (topicPartitions != null) {
+      kafkaRead = KafkaIO.<byte[], byte[]>read()
+          .withBootstrapServers(bootstrapServers)
+          .withTopicPartitions(topicPartitions)
+          .updateConsumerProperties(configUpdates)
+          .withKeyDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
+          .withValueDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of());
+    } else {
+      throw new IllegalArgumentException("One of topics and topicPartitions must be configured.");
+    }
+
+    return PBegin.in(pipeline).apply("read", kafkaRead.withoutMetadata())
+.apply("in_format", getPTransformForInput());
   }
 
   @Override


Mime
View raw message