beam-commits mailing list archives

From j...@apache.org
Subject [2/3] beam git commit: KafkaIO javadoc fixups
Date Tue, 09 May 2017 19:46:08 GMT
KafkaIO javadoc fixups


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fdd10892
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fdd10892
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fdd10892

Branch: refs/heads/master
Commit: fdd10892df750604af0e68bddcbda1275ceac8ed
Parents: d3ff8da
Author: Eugene Kirpichov <kirpichov@google.com>
Authored: Tue May 9 12:25:15 2017 -0700
Committer: Eugene Kirpichov <kirpichov@google.com>
Committed: Tue May 9 12:25:15 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 88 +++++++++++---------
 1 file changed, 49 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/fdd10892/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index cb31ea2..a1130fc 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -329,7 +329,7 @@ public class KafkaIO {
     }
 
     /**
-     * Returns a new {@link Read} with Kafka consumer pointing to {@code bootstrapServers}.
+     * Sets the bootstrap servers for the Kafka consumer.
      */
     public Read<K, V> withBootstrapServers(String bootstrapServers) {
       return updateConsumerProperties(
@@ -338,8 +338,9 @@ public class KafkaIO {
     }
 
     /**
-     * Returns a new {@link Read} that reads from the topic.
-     * See {@link UnboundedKafkaSource#split(int, PipelineOptions)} for description
+     * Sets the topic to read from.
+     *
+     * <p>See {@link UnboundedKafkaSource#split(int, PipelineOptions)} for description
      * of how the partitions are distributed among the splits.
      */
     public Read<K, V> withTopic(String topic) {
@@ -347,9 +348,10 @@ public class KafkaIO {
     }
 
     /**
-     * Returns a new {@link Read} that reads from the topics. All the partitions from each
+     * Sets a list of topics to read from. All the partitions from each
      * of the topics are read.
-     * See {@link UnboundedKafkaSource#split(int, PipelineOptions)} for description
+     *
+     * <p>See {@link UnboundedKafkaSource#split(int, PipelineOptions)} for description
      * of how the partitions are distributed among the splits.
      */
     public Read<K, V> withTopics(List<String> topics) {
@@ -359,9 +361,10 @@ public class KafkaIO {
     }
 
     /**
-     * Returns a new {@link Read} that reads from the partitions. This allows reading only a subset
+     * Sets a list of partitions to read from. This allows reading only a subset
      * of partitions for one or more topics when (if ever) needed.
-     * See {@link UnboundedKafkaSource#split(int, PipelineOptions)} for description
+     *
+     * <p>See {@link UnboundedKafkaSource#split(int, PipelineOptions)} for description
      * of how the partitions are distributed among the splits.
      */
    public Read<K, V> withTopicPartitions(List<TopicPartition> topicPartitions) {
@@ -370,48 +373,51 @@ public class KafkaIO {
     }
 
     /**
-     * Returns new {@link Read} with a Kafka {@link Deserializer} to interpret key bytes read
-     * from Kafka. In addition, Beam also needs a Coder to serialize and deserialize key objects
-     * at runtime. KafkaIO tries to infer coders for many of the common types (bytes, strings,
-     * integers, etc) based on Deserializer class. It might not be able to do so for some types.
-     * Please use {@link #withKeyDeserializerAndCoder(Class, Coder)} to provide the key coder
-     * explicitly.
+     * Sets a Kafka {@link Deserializer} to interpret key bytes read from Kafka.
      *
-     * @see KafkaIO.Read#withKeyDeserializerAndCoder
+     * <p>In addition, Beam also needs a {@link Coder} to serialize and deserialize key objects at
+     * runtime. KafkaIO tries to infer a coder for the key based on the {@link Deserializer} class,
+     * however in case that fails, you can use {@link #withKeyDeserializerAndCoder(Class, Coder)} to
+     * provide the key coder explicitly.
      */
    public Read<K, V> withKeyDeserializer(Class<? extends Deserializer<K>> keyDeserializer) {
       return toBuilder().setKeyDeserializer(keyDeserializer).build();
     }
 
     /**
-     * Returns new {@link Read} with a Kafka {@link Deserializer} to deserializer key bytes read
-     * from Kafka and {@link Coder} for serializing and deserializing the keys objects at runtime.
+     * Sets a Kafka {@link Deserializer} for interpreting key bytes read from Kafka along with a
+     * {@link Coder} for helping the Beam runner materialize key objects at runtime if necessary.
+     *
+     * <p>Use this method only if your pipeline doesn't work with plain {@link
+     * #withKeyDeserializer(Class)}.
      */
-    public Read<K, V> withKeyDeserializerAndCoder(Class<? extends Deserializer<K>> keyDeserializer,
-                                                  Coder<K> keyCoder) {
-
-      return toBuilder()
-          .setKeyDeserializer(keyDeserializer)
-          .setKeyCoder(keyCoder)
-          .build();
+    public Read<K, V> withKeyDeserializerAndCoder(
+        Class<? extends Deserializer<K>> keyDeserializer, Coder<K> keyCoder) {
+      return toBuilder().setKeyDeserializer(keyDeserializer).setKeyCoder(keyCoder).build();
     }
 
+    /**
+     * Sets a Kafka {@link Deserializer} to interpret value bytes read from Kafka.
+     *
+     * <p>In addition, Beam also needs a {@link Coder} to serialize and deserialize value objects at
+     * runtime. KafkaIO tries to infer a coder for the value based on the {@link Deserializer}
+     * class, however in case that fails, you can use {@link #withValueDeserializerAndCoder(Class,
+     * Coder)} to provide the value coder explicitly.
+     */
    public Read<K, V> withValueDeserializer(Class<? extends Deserializer<V>> valueDeserializer) {
       return toBuilder().setValueDeserializer(valueDeserializer).build();
     }
 
     /**
-     * Returns new {@link Read} with a Kafka {@link Deserializer} to deserializer value bytes read
-     * from Kafka and {@link Coder} for serializing and deserializing the value objects at runtime.
+     * Sets a Kafka {@link Deserializer} for interpreting value bytes read from Kafka along with a
+     * {@link Coder} for helping the Beam runner materialize value objects at runtime if necessary.
+     *
+     * <p>Use this method only if your pipeline doesn't work with plain {@link
+     * #withValueDeserializer(Class)}.
      */
     public Read<K, V> withValueDeserializerAndCoder(
-        Class<? extends Deserializer<V>> valueDeserializer,
-        Coder<V> valueCoder) {
-
-      return toBuilder()
-          .setValueDeserializer(valueDeserializer)
-          .setValueCoder(valueCoder)
-          .build();
+        Class<? extends Deserializer<V>> valueDeserializer, Coder<V> valueCoder) {
+      return toBuilder().setValueDeserializer(valueDeserializer).setValueCoder(valueCoder).build();
     }
 
     /**
@@ -1342,15 +1348,16 @@ public class KafkaIO {
     }
 
     /**
-     * Returns a new {@link Write} transform that writes to given topic.
+     * Sets the Kafka topic to write to.
      */
     public Write<K, V> withTopic(String topic) {
       return toBuilder().setTopic(topic).build();
     }
 
     /**
-     * Returns a new {@link Write} with {@link Serializer} for serializing key (if any) to bytes.
-     * A key is optional while writing to Kafka. Note when a key is set, its hash is used to
+     * Sets a {@link Serializer} for serializing key (if any) to bytes.
+     *
+     * <p>A key is optional while writing to Kafka. Note when a key is set, its hash is used to
      * determine partition in Kafka (see {@link ProducerRecord} for more details).
      */
    public Write<K, V> withKeySerializer(Class<? extends Serializer<K>> keySerializer) {
@@ -1358,12 +1365,15 @@ public class KafkaIO {
     }
 
     /**
-     * Returns a new {@link Write} with {@link Serializer} for serializing value to bytes.
+     * Sets a {@link Serializer} for serializing value to bytes.
      */
    public Write<K, V> withValueSerializer(Class<? extends Serializer<V>> valueSerializer) {
       return toBuilder().setValueSerializer(valueSerializer).build();
     }
 
+    /**
+     * Adds the given producer properties, overriding old values of properties with the same key.
+     */
    public Write<K, V> updateProducerProperties(Map<String, Object> configUpdates) {
       Map<String, Object> config = updateKafkaProperties(getProducerConfig(),
           IGNORED_PRODUCER_PROPERTIES, configUpdates);
@@ -1371,7 +1381,7 @@ public class KafkaIO {
     }
 
     /**
-     * Returns a new {@link Write} with a custom function to create Kafka producer. Primarily used
+     * Sets a custom function to create Kafka producer. Primarily used
      * for tests. Default is {@link KafkaProducer}
      */
     public Write<K, V> withProducerFactoryFn(
@@ -1380,8 +1390,8 @@ public class KafkaIO {
     }
 
     /**
-     * Returns a new transform that writes just the values to Kafka. This is useful for writing
-     * collections of values rather thank {@link KV}s.
+     * Writes just the values to Kafka. This is useful for writing collections of values rather
+     * than {@link KV}s.
      */
     public PTransform<PCollection<V>, PDone> values() {
       return new KafkaValueWrite<>(toBuilder().build());

