storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s...@apache.org
Subject [2/3] storm git commit: STORM-3290: Split configuration for storm-kafka-client Trident and non-Trident spout into two classes
Date Thu, 29 Nov 2018 18:19:11 GMT
STORM-3290: Split configuration for storm-kafka-client Trident and non-Trident spout into two classes


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/63de17ed
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/63de17ed
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/63de17ed

Branch: refs/heads/master
Commit: 63de17eda9220163867ea051d03adc44eaa7d113
Parents: 40f1b61
Author: Stig Rohde Døssing <srdo@apache.org>
Authored: Tue Nov 20 13:03:25 2018 +0100
Committer: Stig Rohde Døssing <srdo@apache.org>
Committed: Tue Nov 20 19:57:25 2018 +0100

----------------------------------------------------------------------
 .../KafkaSpoutTopologyMainNamedTopics.java      |   2 +-
 .../KafkaSpoutTopologyMainWildcardTopics.java   |   2 +-
 .../TridentKafkaClientTopologyNamedTopics.java  |  27 +-
 ...ridentKafkaClientTopologyWildcardTopics.java |  12 +-
 .../apache/storm/perf/KafkaClientHdfsTopo.java  |   3 +-
 .../perf/KafkaClientSpoutNullBoltTopo.java      |   3 +-
 .../kafka/spout/FirstPollOffsetStrategy.java    |  42 +++
 .../apache/storm/kafka/spout/KafkaSpout.java    |  11 +-
 .../storm/kafka/spout/KafkaSpoutConfig.java     | 330 ++++---------------
 .../spout/internal/CommonKafkaSpoutConfig.java  | 264 +++++++++++++++
 .../kafka/spout/internal/ConsumerFactory.java   |   4 +-
 .../spout/internal/ConsumerFactoryDefault.java  |   6 +-
 .../spout/trident/KafkaTridentSpoutConfig.java  | 100 ++++++
 .../trident/KafkaTridentSpoutCoordinator.java   |   8 +-
 .../spout/trident/KafkaTridentSpoutEmitter.java |  32 +-
 .../spout/trident/KafkaTridentSpoutOpaque.java  |   4 +-
 .../trident/KafkaTridentSpoutTransactional.java |   4 +-
 .../trident/internal/OutputFieldsExtractor.java |   4 +-
 .../kafka/spout/KafkaSpoutAbstractTest.java     |   4 +-
 .../storm/kafka/spout/KafkaSpoutConfigTest.java |   1 -
 .../kafka/spout/KafkaSpoutReactivationTest.java |   4 +-
 .../kafka/spout/KafkaSpoutRebalanceTest.java    |   2 +-
 ...outTopologyDeployActivateDeactivateTest.java |   2 +-
 .../kafka/spout/MaxUncommittedOffsetTest.java   |   2 +-
 .../SingleTopicKafkaSpoutConfiguration.java     |   2 +-
 .../KafkaTridentSpoutBatchMetadataTest.java     |   2 -
 .../trident/KafkaTridentSpoutEmitterTest.java   |  16 +-
 .../KafkaTridentSpoutOpaqueCoordinatorTest.java |  11 +-
 ...ngleTopicKafkaTridentSpoutConfiguration.java |  47 +++
 29 files changed, 591 insertions(+), 360 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainNamedTopics.java
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainNamedTopics.java b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainNamedTopics.java
index 6703738..beb7948 100644
--- a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainNamedTopics.java
+++ b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainNamedTopics.java
@@ -18,7 +18,7 @@
 
 package org.apache.storm.kafka.spout;
 
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.EARLIEST;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.storm.Config;

http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainWildcardTopics.java
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainWildcardTopics.java b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainWildcardTopics.java
index 714f476..a574078 100644
--- a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainWildcardTopics.java
+++ b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainWildcardTopics.java
@@ -18,7 +18,7 @@
 
 package org.apache.storm.kafka.spout;
 
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.EARLIEST;
 
 import java.util.regex.Pattern;
 import org.apache.kafka.clients.consumer.ConsumerConfig;

http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientTopologyNamedTopics.java
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientTopologyNamedTopics.java b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientTopologyNamedTopics.java
index 6fecb5c..3c92a22 100644
--- a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientTopologyNamedTopics.java
+++ b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientTopologyNamedTopics.java
@@ -18,11 +18,10 @@
 
 package org.apache.storm.kafka.trident;
 
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.EARLIEST;
 
 import java.io.Serializable;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.storm.Config;
@@ -32,10 +31,7 @@ import org.apache.storm.generated.AuthorizationException;
 import org.apache.storm.generated.InvalidTopologyException;
 import org.apache.storm.kafka.bolt.KafkaProducerTopology;
 import org.apache.storm.kafka.spout.Func;
-import org.apache.storm.kafka.spout.KafkaSpoutConfig;
-import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
-import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
-import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
+import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutConfig;
 import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
 import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutTransactional;
 import org.apache.storm.trident.spout.ITridentDataSource;
@@ -52,12 +48,12 @@ public class TridentKafkaClientTopologyNamedTopics {
     private static final String TOPIC_2 = "test-trident-1";
     private static final String KAFKA_LOCAL_BROKER = "localhost:9092";
 
-    private KafkaTridentSpoutOpaque<String, String> newKafkaTridentSpoutOpaque(KafkaSpoutConfig<String, String> spoutConfig) {
+    private KafkaTridentSpoutOpaque<String, String> newKafkaTridentSpoutOpaque(KafkaTridentSpoutConfig<String, String> spoutConfig) {
         return new KafkaTridentSpoutOpaque<>(spoutConfig);
     }
     
     private KafkaTridentSpoutTransactional<String, String> newKafkaTridentSpoutTransactional(
-        KafkaSpoutConfig<String, String> spoutConfig) {
+        KafkaTridentSpoutConfig<String, String> spoutConfig) {
         return new KafkaTridentSpoutTransactional<>(spoutConfig);
     }
 
@@ -74,23 +70,14 @@ public class TridentKafkaClientTopologyNamedTopics {
         }
     }
     
-    protected KafkaSpoutConfig<String, String> newKafkaSpoutConfig(String bootstrapServers) {
-        return KafkaSpoutConfig.builder(bootstrapServers, TOPIC_1, TOPIC_2)
-            .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup_" + System.nanoTime())
+    protected KafkaTridentSpoutConfig<String, String> newKafkaSpoutConfig(String bootstrapServers) {
+        return KafkaTridentSpoutConfig.builder(bootstrapServers, TOPIC_1, TOPIC_2)
             .setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 200)
             .setRecordTranslator(JUST_VALUE_FUNC, new Fields("str"))
-            .setRetry(newRetryService())
-            .setOffsetCommitPeriodMs(10_000)
             .setFirstPollOffsetStrategy(EARLIEST)
-            .setMaxUncommittedOffsets(250)
             .build();
     }
 
-    protected KafkaSpoutRetryService newRetryService() {
-        return new KafkaSpoutRetryExponentialBackoff(new TimeInterval(500L, TimeUnit.MICROSECONDS),
-            TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10));
-    }
-
     public static void main(String[] args) throws Exception {
         new TridentKafkaClientTopologyNamedTopics().run(args);
     }
@@ -109,7 +96,7 @@ public class TridentKafkaClientTopologyNamedTopics {
         StormSubmitter.submitTopology(TOPIC_1 + "-producer", tpConf, KafkaProducerTopology.newTopology(brokerUrl, TOPIC_1));
         StormSubmitter.submitTopology(TOPIC_2 + "-producer", tpConf, KafkaProducerTopology.newTopology(brokerUrl, TOPIC_2));
         // Consumer
-        KafkaSpoutConfig<String, String> spoutConfig = newKafkaSpoutConfig(brokerUrl);
+        KafkaTridentSpoutConfig<String, String> spoutConfig = newKafkaSpoutConfig(brokerUrl);
         ITridentDataSource spout = isOpaque ? newKafkaTridentSpoutOpaque(spoutConfig) : newKafkaTridentSpoutTransactional(spoutConfig);
         StormSubmitter.submitTopology("topics-consumer", tpConf,
             TridentKafkaConsumerTopology.newTopology(spout));

http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientTopologyWildcardTopics.java
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientTopologyWildcardTopics.java b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientTopologyWildcardTopics.java
index d770ac4..7da8f91 100644
--- a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientTopologyWildcardTopics.java
+++ b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientTopologyWildcardTopics.java
@@ -18,11 +18,11 @@
 
 package org.apache.storm.kafka.trident;
 
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.EARLIEST;
 
 import java.util.regex.Pattern;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutConfig;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
 
@@ -33,15 +33,11 @@ public class TridentKafkaClientTopologyWildcardTopics extends TridentKafkaClient
     private static final Pattern TOPIC_WILDCARD_PATTERN = Pattern.compile("test-trident(-1)?");
 
     @Override
-    protected KafkaSpoutConfig<String,String> newKafkaSpoutConfig(String bootstrapServers) {
-        return KafkaSpoutConfig.builder(bootstrapServers, TOPIC_WILDCARD_PATTERN)
-                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
+    protected KafkaTridentSpoutConfig<String,String> newKafkaSpoutConfig(String bootstrapServers) {
+        return KafkaTridentSpoutConfig.builder(bootstrapServers, TOPIC_WILDCARD_PATTERN)
                 .setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 200)
                 .setRecordTranslator((r) -> new Values(r.value()), new Fields("str"))
-                .setRetry(newRetryService())
-                .setOffsetCommitPeriodMs(10_000)
                 .setFirstPollOffsetStrategy(EARLIEST)
-                .setMaxUncommittedOffsets(250)
                 .build();
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientHdfsTopo.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientHdfsTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientHdfsTopo.java
index 67117e4..e269873 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientHdfsTopo.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientHdfsTopo.java
@@ -29,6 +29,7 @@ import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
 import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
 import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
 import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
+import org.apache.storm.kafka.spout.FirstPollOffsetStrategy;
 import org.apache.storm.kafka.spout.KafkaSpout;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig;
 import org.apache.storm.perf.utils.Helper;
@@ -80,7 +81,7 @@ public class KafkaClientHdfsTopo {
 
         KafkaSpoutConfig<String, String> spoutConfig = KafkaSpoutConfig.builder(bootstrapHosts, topicName)
                                                                        .setFirstPollOffsetStrategy(
-                                                                           KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST)
+                                                                           FirstPollOffsetStrategy.EARLIEST)
                                                                        .build();
 
         KafkaSpout<String, String> spout = new KafkaSpout<>(spoutConfig);

http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientSpoutNullBoltTopo.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientSpoutNullBoltTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientSpoutNullBoltTopo.java
index a75785d..4ac1ed8 100644
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientSpoutNullBoltTopo.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientSpoutNullBoltTopo.java
@@ -20,6 +20,7 @@ import java.util.Map;
 import java.util.Optional;
 import org.apache.storm.Config;
 import org.apache.storm.generated.StormTopology;
+import org.apache.storm.kafka.spout.FirstPollOffsetStrategy;
 import org.apache.storm.kafka.spout.KafkaSpout;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
@@ -72,7 +73,7 @@ public class KafkaClientSpoutNullBoltTopo {
                                                                             .setProcessingGuarantee(processingGuarantee)
                                                                             .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
                                                                             .setFirstPollOffsetStrategy(
-                                                                                KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST)
+                                                                                FirstPollOffsetStrategy.EARLIEST)
                                                                             .setTupleTrackingEnforced(true)
                                                                             .build();
 

http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/FirstPollOffsetStrategy.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/FirstPollOffsetStrategy.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/FirstPollOffsetStrategy.java
new file mode 100644
index 0000000..f2e14cb
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/FirstPollOffsetStrategy.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+/**
+ * Defines how the spout seeks the offset to be used in the first poll to Kafka upon topology deployment. By default this parameter is set
+ * to UNCOMMITTED_EARLIEST.
+ */
+public enum FirstPollOffsetStrategy {
+    /**
+     * The kafka spout polls records starting at the first offset of the partition, regardless of previous commits. This setting only takes
+     * effect on topology deployment.
+     */
+    EARLIEST,
+    /**
+     * The kafka spout polls records starting at the end of the partition, regardless of previous commits. This setting only takes effect on
+     * topology deployment.
+     */
+    LATEST,
+    /**
+     * The kafka spout polls records from the last committed offset, if any. If no offset has been committed it behaves as EARLIEST.
+     */
+    UNCOMMITTED_EARLIEST,
+    /**
+     * The kafka spout polls records from the last committed offset, if any. If no offset has been committed it behaves as LATEST.
+     */
+    UNCOMMITTED_LATEST;
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 7d7a856..5ebef80 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -18,10 +18,10 @@
 
 package org.apache.storm.kafka.spout;
 
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST;
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
+import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.EARLIEST;
+import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.LATEST;
+import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
+import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.util.ArrayList;
@@ -47,7 +47,6 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.RetriableException;
-import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
 import org.apache.storm.kafka.spout.internal.CommitMetadataManager;
 import org.apache.storm.kafka.spout.internal.ConsumerFactory;
@@ -145,7 +144,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
         rebalanceListener = new KafkaSpoutConsumerRebalanceListener();
 
-        consumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig);
+        consumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig.getKafkaProps());
 
         tupleListener.open(conf, context);
         if (canRegisterMetrics()) {

http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index b17a47c..b6059b5 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -18,47 +18,34 @@
 
 package org.apache.storm.kafka.spout;
 
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
 import java.util.Set;
 import java.util.regex.Pattern;
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.storm.Config;
 import org.apache.storm.annotation.InterfaceStability;
 import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
+import org.apache.storm.kafka.spout.internal.CommonKafkaSpoutConfig;
 import org.apache.storm.kafka.spout.subscription.ManualPartitioner;
-import org.apache.storm.kafka.spout.subscription.NamedTopicFilter;
-import org.apache.storm.kafka.spout.subscription.PatternTopicFilter;
-import org.apache.storm.kafka.spout.subscription.RoundRobinManualPartitioner;
 import org.apache.storm.kafka.spout.subscription.TopicFilter;
-import org.apache.storm.tuple.Fields;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribing topics.
  */
-public class KafkaSpoutConfig<K, V> implements Serializable {
+public class KafkaSpoutConfig<K, V> extends CommonKafkaSpoutConfig<K, V> {
 
     private static final long serialVersionUID = 141902646130682494L;
-    // 200ms
-    public static final long DEFAULT_POLL_TIMEOUT_MS = 200;
     // 30s
     public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 30_000;
     // Retry forever
     public static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE;
     // 10,000,000 records => 80MBs of memory footprint in the worst case
     public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000_000;
-    // 2s
-    public static final long DEFAULT_PARTITION_REFRESH_PERIOD_MS = 2_000;
-
-    public static final FirstPollOffsetStrategy DEFAULT_FIRST_POLL_OFFSET_STRATEGY = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
 
     public static final KafkaSpoutRetryService DEFAULT_RETRY_SERVICE =
         new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2),
@@ -71,21 +58,11 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
 
     public static final int DEFAULT_METRICS_TIME_BUCKET_SIZE_SECONDS = 60;
 
-
-    // Kafka consumer configuration
-    private final Map<String, Object> kafkaProps;
-    private final TopicFilter topicFilter;
-    private final ManualPartitioner topicPartitioner;
-    private final long pollTimeoutMs;
-
     // Kafka spout configuration
-    private final RecordTranslator<K, V> translator;
     private final long offsetCommitPeriodMs;
     private final int maxUncommittedOffsets;
-    private final FirstPollOffsetStrategy firstPollOffsetStrategy;
     private final KafkaSpoutRetryService retryService;
     private final KafkaTupleListener tupleListener;
-    private final long partitionRefreshPeriodMs;
     private final boolean emitNullTuples;
     private final ProcessingGuarantee processingGuarantee;
     private final boolean tupleTrackingEnforced;
@@ -97,18 +74,11 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
      * @param builder The Builder to construct the KafkaSpoutConfig from
      */
     public KafkaSpoutConfig(Builder<K, V> builder) {
-        setKafkaPropsForProcessingGuarantee(builder);
-        this.kafkaProps = builder.kafkaProps;
-        this.topicFilter = builder.topicFilter;
-        this.topicPartitioner = builder.topicPartitioner;
-        this.translator = builder.translator;
-        this.pollTimeoutMs = builder.pollTimeoutMs;
+        super(builder.setKafkaPropsForProcessingGuarantee());
         this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
-        this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
         this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
         this.retryService = builder.retryService;
         this.tupleListener = builder.tupleListener;
-        this.partitionRefreshPeriodMs = builder.partitionRefreshPeriodMs;
         this.emitNullTuples = builder.emitNullTuples;
         this.processingGuarantee = builder.processingGuarantee;
         this.tupleTrackingEnforced = builder.tupleTrackingEnforced;
@@ -116,36 +86,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
     }
 
     /**
-     * Defines how the {@link KafkaSpout} seeks the offset to be used in the first poll to Kafka upon topology deployment.
-     * By default this parameter is set to UNCOMMITTED_EARLIEST. 
-     */
-    public enum FirstPollOffsetStrategy {
-        /**
-         * The kafka spout polls records starting in the first offset of the partition, regardless of previous commits. This setting only
-         * takes effect on topology deployment
-         */
-        EARLIEST,
-        /**
-         * The kafka spout polls records with offsets greater than the last offset in the partition, regardless of previous commits. This
-         * setting only takes effect on topology deployment
-         */
-        LATEST,
-        /**
-         * The kafka spout polls records from the last committed offset, if any. If no offset has been committed it behaves as EARLIEST
-         */
-        UNCOMMITTED_EARLIEST,
-        /**
-         * The kafka spout polls records from the last committed offset, if any. If no offset has been committed it behaves as LATEST
-         */
-        UNCOMMITTED_LATEST;
-
-        @Override
-        public String toString() {
-            return "FirstPollOffsetStrategy{" + super.toString() + "}";
-        }
-    }
-
-    /**
      * This enum controls when the tuple with the {@link ConsumerRecord} for an offset is marked as processed,
      * i.e. when the offset can be committed to Kafka. The default value is AT_LEAST_ONCE.
      * The commit interval is controlled by {@link KafkaSpoutConfig#getOffsetsCommitPeriodMs() }, if the mode commits on an interval.
@@ -173,34 +113,27 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         NO_GUARANTEE,
     }
 
-    public static class Builder<K, V> {
+    public static class Builder<K, V> extends CommonKafkaSpoutConfig.Builder<K, V, Builder<K, V>> {
 
-        private final Map<String, Object> kafkaProps;
-        private final TopicFilter topicFilter;
-        private final ManualPartitioner topicPartitioner;
-        private RecordTranslator<K, V> translator;
-        private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
         private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS;
-        private FirstPollOffsetStrategy firstPollOffsetStrategy = DEFAULT_FIRST_POLL_OFFSET_STRATEGY;
         private int maxUncommittedOffsets = DEFAULT_MAX_UNCOMMITTED_OFFSETS;
         private KafkaSpoutRetryService retryService = DEFAULT_RETRY_SERVICE;
         private KafkaTupleListener tupleListener = DEFAULT_TUPLE_LISTENER;
-        private long partitionRefreshPeriodMs = DEFAULT_PARTITION_REFRESH_PERIOD_MS;
         private boolean emitNullTuples = false;
         private ProcessingGuarantee processingGuarantee = DEFAULT_PROCESSING_GUARANTEE;
         private boolean tupleTrackingEnforced = false;
         private int metricsTimeBucketSizeInSecs = DEFAULT_METRICS_TIME_BUCKET_SIZE_SECONDS;
 
         public Builder(String bootstrapServers, String... topics) {
-            this(bootstrapServers, new NamedTopicFilter(topics), new RoundRobinManualPartitioner());
+            super(bootstrapServers, topics);
         }
 
         public Builder(String bootstrapServers, Set<String> topics) {
-            this(bootstrapServers, new NamedTopicFilter(topics), new RoundRobinManualPartitioner());
+            super(bootstrapServers, topics);
         }
 
         public Builder(String bootstrapServers, Pattern topics) {
-            this(bootstrapServers, new PatternTopicFilter(topics), new RoundRobinManualPartitioner());
+            super(bootstrapServers, topics);
         }
 
         /**
@@ -211,58 +144,11 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
          * @param topicPartitioner The topic partitioner defining which topics and partitions are assinged to each spout task
          */
         public Builder(String bootstrapServers, TopicFilter topicFilter, ManualPartitioner topicPartitioner) {
-            kafkaProps = new HashMap<>();
-            if (bootstrapServers == null || bootstrapServers.isEmpty()) {
-                throw new IllegalArgumentException("bootstrap servers cannot be null");
-            }
-            kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-            this.topicFilter = topicFilter;
-            this.topicPartitioner = topicPartitioner;
-            this.translator = new DefaultRecordTranslator<>();
-        }
-
-        /**
-         * Set a {@link KafkaConsumer} property. 
-         */
-        public Builder<K, V> setProp(String key, Object value) {
-            kafkaProps.put(key, value);
-            return this;
-        }
-
-        /**
-         * Set multiple {@link KafkaConsumer} properties. 
-         */
-        public Builder<K, V> setProp(Map<String, Object> props) {
-            kafkaProps.putAll(props);
-            return this;
-        }
-
-        /**
-         * Set multiple {@link KafkaConsumer} properties. 
-         */
-        public Builder<K, V> setProp(Properties props) {
-            props.forEach((key, value) -> {
-                if (key instanceof String) {
-                    kafkaProps.put((String) key, value);
-                } else {
-                    throw new IllegalArgumentException("Kafka Consumer property keys must be Strings");
-                }
-            });
-            return this;
+            super(bootstrapServers, topicFilter, topicPartitioner);
         }
 
         //Spout Settings
         /**
-         * Specifies the time, in milliseconds, spent waiting in poll if data is not available. Default is 2s.
-         *
-         * @param pollTimeoutMs time in ms
-         */
-        public Builder<K, V> setPollTimeoutMs(long pollTimeoutMs) {
-            this.pollTimeoutMs = pollTimeoutMs;
-            return this;
-        }
-
-        /**
          * Specifies the period, in milliseconds, the offset commit task is periodically called. Default is 15s.
          *
          * <p>This setting only has an effect if the configured {@link ProcessingGuarantee} is {@link ProcessingGuarantee#AT_LEAST_ONCE} or
@@ -292,17 +178,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         }
 
         /**
-         * Sets the offset used by the Kafka spout in the first poll to Kafka broker upon process start. Please refer to to the
-         * documentation in {@link FirstPollOffsetStrategy}
-         *
-         * @param firstPollOffsetStrategy Offset used by Kafka spout first poll
-         */
-        public Builder<K, V> setFirstPollOffsetStrategy(FirstPollOffsetStrategy firstPollOffsetStrategy) {
-            this.firstPollOffsetStrategy = firstPollOffsetStrategy;
-            return this;
-        }
-
-        /**
          * Sets the retry service for the spout to use.
          *
          * <p>This setting only has an effect if the configured {@link ProcessingGuarantee} is {@link ProcessingGuarantee#AT_LEAST_ONCE}.
@@ -332,46 +207,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
             return this;
         }
 
-        public Builder<K, V> setRecordTranslator(RecordTranslator<K, V> translator) {
-            this.translator = translator;
-            return this;
-        }
-
-        /**
-         * Configure a translator with tuples to be emitted on the default stream.
-         *
-         * @param func extracts and turns a Kafka ConsumerRecord into a list of objects to be emitted
-         * @param fields the names of the fields extracted
-         * @return this to be able to chain configuration
-         */
-        public Builder<K, V> setRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields) {
-            return setRecordTranslator(new SimpleRecordTranslator<>(func, fields));
-        }
-
-        /**
-         * Configure a translator with tuples to be emitted to a given stream.
-         *
-         * @param func extracts and turns a Kafka ConsumerRecord into a list of objects to be emitted
-         * @param fields the names of the fields extracted
-         * @param stream the stream to emit the tuples on
-         * @return this to be able to chain configuration
-         */
-        public Builder<K, V> setRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields, String stream) {
-            return setRecordTranslator(new SimpleRecordTranslator<>(func, fields, stream));
-        }
-
-        /**
-         * Sets partition refresh period in milliseconds. This is how often Kafka will be polled to check for new topics and/or new
-         * partitions.
-         *
-         * @param partitionRefreshPeriodMs time in milliseconds
-         * @return the builder (this)
-         */
-        public Builder<K, V> setPartitionRefreshPeriodMs(long partitionRefreshPeriodMs) {
-            this.partitionRefreshPeriodMs = partitionRefreshPeriodMs;
-            return this;
-        }
-
         /**
          * Specifies if the spout should emit null tuples to the component downstream, or rather not emit and directly ack them. By default
          * this parameter is set to false, which means that null tuples are not emitted.
@@ -417,6 +252,47 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
             this.metricsTimeBucketSizeInSecs = metricsTimeBucketSizeInSecs;
             return this;
         }
+        
+        private Builder<K, V> withStringDeserializers() {
+            setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+            setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+            return this;
+        }
+        
+        private Builder<K, V> setKafkaPropsForProcessingGuarantee() {
+            if (getKafkaProps().containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
+                throw new IllegalStateException("The KafkaConsumer " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
+                    + " setting is not supported."
+                    + " You can configure similar behavior through KafkaSpoutConfig.Builder.setProcessingGuarantee");
+            }
+            String autoOffsetResetPolicy = (String) getKafkaProps().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+            if (processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE) {
+                if (autoOffsetResetPolicy == null) {
+                    /*
+                     * If the user wants to explicitly set an auto offset reset policy, we should respect it, but when the spout is
+                     * configured for at-least-once processing we should default to seeking to the earliest offset in case there's an offset
+                     * out of range error, rather than seeking to the latest (Kafka's default). This type of error will typically happen
+                     * when the consumer requests an offset that was deleted.
+                     */
+                    LOG.info("Setting Kafka consumer property '{}' to 'earliest' to ensure at-least-once processing",
+                        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+                    setProp(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+                } else if (!autoOffsetResetPolicy.equals("earliest") && !autoOffsetResetPolicy.equals("none")) {
+                    LOG.warn("Cannot guarantee at-least-once processing with auto.offset.reset.policy other than 'earliest' or 'none'."
+                        + " Some messages may be skipped.");
+                }
+            } else if (processingGuarantee == ProcessingGuarantee.AT_MOST_ONCE) {
+                if (autoOffsetResetPolicy != null
+                    && (!autoOffsetResetPolicy.equals("latest") && !autoOffsetResetPolicy.equals("none"))) {
+                    LOG.warn("Cannot guarantee at-most-once processing with auto.offset.reset.policy other than 'latest' or 'none'."
+                        + " Some messages may be processed more than once.");
+                }
+            }
+            LOG.info("Setting Kafka consumer property '{}' to 'false', because the spout does not support auto-commit",
+                ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
+            setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+            return this;
+        }
 
         public KafkaSpoutConfig<K, V> build() {
             return new KafkaSpoutConfig<>(this);
@@ -431,7 +307,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
      * @return The new builder
      */
     public static Builder<String, String> builder(String bootstrapServers, String... topics) {
-        return setStringDeserializers(new Builder<>(bootstrapServers, topics));
+        return new Builder<String, String>(bootstrapServers, topics).withStringDeserializers();
     }
 
     /**
@@ -442,7 +318,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
      * @return The new builder
      */
     public static Builder<String, String> builder(String bootstrapServers, Set<String> topics) {
-        return setStringDeserializers(new Builder<>(bootstrapServers, topics));
+        return new Builder<String, String>(bootstrapServers, topics).withStringDeserializers();
     }
 
     /**
@@ -453,71 +329,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
      * @return The new builder
      */
     public static Builder<String, String> builder(String bootstrapServers, Pattern topics) {
-        return setStringDeserializers(new Builder<>(bootstrapServers, topics));
-    }
-
-    private static Builder<String, String> setStringDeserializers(Builder<String, String> builder) {
-        builder.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        builder.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        return builder;
-    }
-
-    private static void setKafkaPropsForProcessingGuarantee(Builder<?, ?> builder) {
-        if (builder.kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
-            throw new IllegalStateException("The KafkaConsumer " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
-                + " setting is not supported. You can configure similar behavior through KafkaSpoutConfig.Builder.setProcessingGuarantee");
-        }
-        String autoOffsetResetPolicy = (String) builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
-        if (builder.processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE) {
-            if (autoOffsetResetPolicy == null) {
-                /*
-                 * If the user wants to explicitly set an auto offset reset policy, we should respect it, but when the spout is configured
-                 * for at-least-once processing we should default to seeking to the earliest offset in case there's an offset out of range
-                 * error, rather than seeking to the latest (Kafka's default). This type of error will typically happen when the consumer
-                 * requests an offset that was deleted.
-                 */
-                LOG.info("Setting Kafka consumer property '{}' to 'earliest' to ensure at-least-once processing",
-                    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
-                builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-            } else if (!autoOffsetResetPolicy.equals("earliest") && !autoOffsetResetPolicy.equals("none")) {
-                LOG.warn("Cannot guarantee at-least-once processing with auto.offset.reset.policy other than 'earliest' or 'none'."
-                    + " Some messages may be skipped.");
-            }
-        } else if (builder.processingGuarantee == ProcessingGuarantee.AT_MOST_ONCE) {
-            if (autoOffsetResetPolicy != null
-                && (!autoOffsetResetPolicy.equals("latest") && !autoOffsetResetPolicy.equals("none"))) {
-                LOG.warn("Cannot guarantee at-most-once processing with auto.offset.reset.policy other than 'latest' or 'none'."
-                    + " Some messages may be processed more than once.");
-            }
-        }
-        LOG.info("Setting Kafka consumer property '{}' to 'false', because the spout does not support auto-commit",
-            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
-        builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
-    }
-
-    /**
-     * Gets the properties that will be passed to the KafkaConsumer.
-     *
-     * @return The Kafka properties map
-     */
-    public Map<String, Object> getKafkaProps() {
-        return kafkaProps;
-    }
-
-    public TopicFilter getTopicFilter() {
-        return topicFilter;
-    }
-
-    public ManualPartitioner getTopicPartitioner() {
-        return topicPartitioner;
-    }
-
-    public RecordTranslator<K, V> getTranslator() {
-        return translator;
-    }
-
-    public long getPollTimeoutMs() {
-        return pollTimeoutMs;
+        return new Builder<String, String>(bootstrapServers, topics).withStringDeserializers();
     }
 
     public long getOffsetsCommitPeriodMs() {
@@ -533,11 +345,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
     }
 
     public String getConsumerGroupId() {
-        return (String) kafkaProps.get(ConsumerConfig.GROUP_ID_CONFIG);
-    }
-
-    public FirstPollOffsetStrategy getFirstPollOffsetStrategy() {
-        return firstPollOffsetStrategy;
+        return (String) getKafkaProps().get(ConsumerConfig.GROUP_ID_CONFIG);
     }
 
     public int getMaxUncommittedOffsets() {
@@ -552,10 +360,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         return tupleListener;
     }
 
-    public long getPartitionRefreshPeriodMs() {
-        return partitionRefreshPeriodMs;
-    }
-
     public boolean isEmitNullTuples() {
         return emitNullTuples;
     }
@@ -566,19 +370,15 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
 
     @Override
     public String toString() {
-        return "KafkaSpoutConfig{"
-            + "kafkaProps=" + kafkaProps
-            + ", pollTimeoutMs=" + pollTimeoutMs
-            + ", offsetCommitPeriodMs=" + offsetCommitPeriodMs
-            + ", maxUncommittedOffsets=" + maxUncommittedOffsets
-            + ", firstPollOffsetStrategy=" + firstPollOffsetStrategy
-            + ", topicFilter=" + topicFilter
-            + ", topicPartitioner=" + topicPartitioner
-            + ", translator=" + translator
-            + ", retryService=" + retryService
-            + ", tupleListener=" + tupleListener
-            + ", processingGuarantee=" + processingGuarantee
-            + ", metricsTimeBucketSizeInSecs=" + metricsTimeBucketSizeInSecs
-            + '}';
+        return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
+            .append("offsetCommitPeriodMs", offsetCommitPeriodMs)
+            .append("maxUncommittedOffsets", maxUncommittedOffsets)
+            .append("retryService", retryService)
+            .append("tupleListener", tupleListener)
+            .append("processingGuarantee", processingGuarantee)
+            .append("emitNullTuples", emitNullTuples)
+            .append("tupleTrackingEnforced", tupleTrackingEnforced)
+            .append("metricsTimeBucketSizeInSecs", metricsTimeBucketSizeInSecs)
+            .toString();
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommonKafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommonKafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommonKafkaSpoutConfig.java
new file mode 100644
index 0000000..9210d09
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommonKafkaSpoutConfig.java
@@ -0,0 +1,264 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.internal;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Pattern;
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.storm.kafka.spout.DefaultRecordTranslator;
+import org.apache.storm.kafka.spout.FirstPollOffsetStrategy;
+import org.apache.storm.kafka.spout.Func;
+import org.apache.storm.kafka.spout.RecordTranslator;
+import org.apache.storm.kafka.spout.SimpleRecordTranslator;
+import org.apache.storm.kafka.spout.subscription.ManualPartitioner;
+import org.apache.storm.kafka.spout.subscription.NamedTopicFilter;
+import org.apache.storm.kafka.spout.subscription.PatternTopicFilter;
+import org.apache.storm.kafka.spout.subscription.RoundRobinManualPartitioner;
+import org.apache.storm.kafka.spout.subscription.TopicFilter;
+import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class CommonKafkaSpoutConfig<K, V> implements Serializable {
+    // 200ms
+    public static final long DEFAULT_POLL_TIMEOUT_MS = 200;
+    // 2s
+    public static final long DEFAULT_PARTITION_REFRESH_PERIOD_MS = 2_000;
+
+    public static final FirstPollOffsetStrategy DEFAULT_FIRST_POLL_OFFSET_STRATEGY = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
+
+    public static final Logger LOG = LoggerFactory.getLogger(CommonKafkaSpoutConfig.class);
+
+    // Kafka consumer configuration
+    private final Map<String, Object> kafkaProps;
+    private final TopicFilter topicFilter;
+    private final ManualPartitioner topicPartitioner;
+    private final long pollTimeoutMs;
+
+    // Kafka spout configuration
+    private final RecordTranslator<K, V> translator;
+    private final FirstPollOffsetStrategy firstPollOffsetStrategy;
+    private final long partitionRefreshPeriodMs;
+    
+    /**
+     * Creates a new CommonKafkaSpoutConfig using a Builder.
+     *
+     * @param builder The Builder to construct the CommonKafkaSpoutConfig from
+     */
+    public CommonKafkaSpoutConfig(Builder<K, V, ?> builder) {
+        this.kafkaProps = builder.kafkaProps;
+        this.topicFilter = builder.topicFilter;
+        this.topicPartitioner = builder.topicPartitioner;
+        this.translator = builder.translator;
+        this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
+        this.pollTimeoutMs = builder.pollTimeoutMs;
+        this.partitionRefreshPeriodMs = builder.partitionRefreshPeriodMs;
+    }
+
+    public abstract static class Builder<K, V, T extends Builder<K, V, T>> {
+
+        private final Map<String, Object> kafkaProps;
+        private final TopicFilter topicFilter;
+        private final ManualPartitioner topicPartitioner;
+        private RecordTranslator<K, V> translator;
+        private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
+        private FirstPollOffsetStrategy firstPollOffsetStrategy = DEFAULT_FIRST_POLL_OFFSET_STRATEGY;
+        private long partitionRefreshPeriodMs = DEFAULT_PARTITION_REFRESH_PERIOD_MS;
+
+        public Builder(String bootstrapServers, String... topics) {
+            this(bootstrapServers, new NamedTopicFilter(topics), new RoundRobinManualPartitioner());
+        }
+
+        public Builder(String bootstrapServers, Set<String> topics) {
+            this(bootstrapServers, new NamedTopicFilter(topics), new RoundRobinManualPartitioner());
+        }
+
+        public Builder(String bootstrapServers, Pattern topics) {
+            this(bootstrapServers, new PatternTopicFilter(topics), new RoundRobinManualPartitioner());
+        }
+
+        /**
+         * Create a KafkaSpoutConfig builder with default property values and no key/value deserializers.
+         *
+         * @param bootstrapServers The bootstrap servers the consumer will use
+         * @param topicFilter The topic filter defining which topics and partitions the spout will read
+         * @param topicPartitioner The topic partitioner defining which topics and partitions are assigned to each spout task
+         */
+        public Builder(String bootstrapServers, TopicFilter topicFilter, ManualPartitioner topicPartitioner) {
+            kafkaProps = new HashMap<>();
+            if (bootstrapServers == null || bootstrapServers.isEmpty()) {
+                throw new IllegalArgumentException("bootstrap servers cannot be null");
+            }
+            kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+            this.topicFilter = topicFilter;
+            this.topicPartitioner = topicPartitioner;
+            this.translator = new DefaultRecordTranslator<>();
+        }
+
+        /**
+         * Set a {@link KafkaConsumer} property. 
+         */
+        public T setProp(String key, Object value) {
+            kafkaProps.put(key, value);
+            return (T)this;
+        }
+
+        /**
+         * Set multiple {@link KafkaConsumer} properties. 
+         */
+        public T setProp(Map<String, Object> props) {
+            kafkaProps.putAll(props);
+            return (T)this;
+        }
+
+        /**
+         * Set multiple {@link KafkaConsumer} properties. 
+         */
+        public T setProp(Properties props) {
+            props.forEach((key, value) -> {
+                if (key instanceof String) {
+                    kafkaProps.put((String) key, value);
+                } else {
+                    throw new IllegalArgumentException("Kafka Consumer property keys must be Strings");
+                }
+            });
+            return (T)this;
+        }
+
+        //Spout Settings
+        /**
+         * Specifies the time, in milliseconds, spent waiting in poll if data is not available. Default is 200ms.
+         *
+         * @param pollTimeoutMs time in ms
+         */
+        public T setPollTimeoutMs(long pollTimeoutMs) {
+            this.pollTimeoutMs = pollTimeoutMs;
+            return (T)this;
+        }
+
+        /**
+         * Sets the offset used by the Kafka spout in the first poll to Kafka broker upon process start. Please refer to the
+         * documentation in {@link FirstPollOffsetStrategy}
+         *
+         * @param firstPollOffsetStrategy Offset used by Kafka spout first poll
+         */
+        public T setFirstPollOffsetStrategy(FirstPollOffsetStrategy firstPollOffsetStrategy) {
+            this.firstPollOffsetStrategy = firstPollOffsetStrategy;
+            return (T)this;
+        }
+
+        public T setRecordTranslator(RecordTranslator<K, V> translator) {
+            this.translator = translator;
+            return (T)this;
+        }
+
+        /**
+         * Configure a translator with tuples to be emitted on the default stream.
+         *
+         * @param func extracts and turns a Kafka ConsumerRecord into a list of objects to be emitted
+         * @param fields the names of the fields extracted
+         * @return this to be able to chain configuration
+         */
+        public T setRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields) {
+            return setRecordTranslator(new SimpleRecordTranslator<>(func, fields));
+        }
+
+        /**
+         * Configure a translator with tuples to be emitted to a given stream.
+         *
+         * @param func extracts and turns a Kafka ConsumerRecord into a list of objects to be emitted
+         * @param fields the names of the fields extracted
+         * @param stream the stream to emit the tuples on
+         * @return this to be able to chain configuration
+         */
+        public T setRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields, String stream) {
+            return setRecordTranslator(new SimpleRecordTranslator<>(func, fields, stream));
+        }
+
+        /**
+         * Sets partition refresh period in milliseconds. This is how often Kafka will be polled to check for new topics and/or new
+         * partitions.
+         *
+         * @param partitionRefreshPeriodMs time in milliseconds
+         * @return the builder (this)
+         */
+        public T setPartitionRefreshPeriodMs(long partitionRefreshPeriodMs) {
+            this.partitionRefreshPeriodMs = partitionRefreshPeriodMs;
+            return (T)this;
+        }
+        
+        protected Map<String, Object> getKafkaProps() {
+            return kafkaProps;
+        }
+
+        public abstract CommonKafkaSpoutConfig<K, V> build();
+    }
+
+    /**
+     * Gets the properties that will be passed to the KafkaConsumer.
+     *
+     * @return The Kafka properties map
+     */
+    public Map<String, Object> getKafkaProps() {
+        return kafkaProps;
+    }
+
+    public TopicFilter getTopicFilter() {
+        return topicFilter;
+    }
+
+    public ManualPartitioner getTopicPartitioner() {
+        return topicPartitioner;
+    }
+
+    public RecordTranslator<K, V> getTranslator() {
+        return translator;
+    }
+
+    public FirstPollOffsetStrategy getFirstPollOffsetStrategy() {
+        return firstPollOffsetStrategy;
+    }
+
+    public long getPollTimeoutMs() {
+        return pollTimeoutMs;
+    }
+    
+    public long getPartitionRefreshPeriodMs() {
+        return partitionRefreshPeriodMs;
+    }
+
+    @Override
+    public String toString() {
+        return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
+            .append("kafkaProps", kafkaProps)
+            .append("partitionRefreshPeriodMs", partitionRefreshPeriodMs)
+            .append("pollTimeoutMs", pollTimeoutMs)
+            .append("topicFilter", topicFilter)
+            .append("topicPartitioner", topicPartitioner)
+            .append("translator", translator)
+            .toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/ConsumerFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/ConsumerFactory.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/ConsumerFactory.java
index 5ca7080..469a5f6 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/ConsumerFactory.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/ConsumerFactory.java
@@ -17,12 +17,12 @@
 package org.apache.storm.kafka.spout.internal;
 
 import java.io.Serializable;
+import java.util.Map;
 import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.storm.kafka.spout.KafkaSpoutConfig;
 
 /**
  * This is here to enable testing.
  */
 public interface ConsumerFactory<K, V> extends Serializable {
-    public Consumer<K,V> createConsumer(KafkaSpoutConfig<K, V> kafkaSpoutConfig);
+    public Consumer<K,V> createConsumer(Map<String, Object> consumerProps);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/ConsumerFactoryDefault.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/ConsumerFactoryDefault.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/ConsumerFactoryDefault.java
index c384376..382808b 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/ConsumerFactoryDefault.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/ConsumerFactoryDefault.java
@@ -16,14 +16,14 @@
 
 package org.apache.storm.kafka.spout.internal;
 
+import java.util.Map;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.storm.kafka.spout.KafkaSpoutConfig;
 
 public class ConsumerFactoryDefault<K, V> implements ConsumerFactory<K, V> {
 
     @Override
-    public KafkaConsumer<K, V> createConsumer(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
-        return new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps());
+    public KafkaConsumer<K, V> createConsumer(Map<String, Object> consumerProps) {
+        return new KafkaConsumer<>(consumerProps);
     }
     
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutConfig.java
new file mode 100644
index 0000000..f62040f
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutConfig.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import java.util.Set;
+import java.util.regex.Pattern;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.storm.kafka.spout.internal.CommonKafkaSpoutConfig;
+import org.apache.storm.kafka.spout.subscription.ManualPartitioner;
+import org.apache.storm.kafka.spout.subscription.TopicFilter;
+
+/**
+ * Defines the required Kafka-related configuration for the Trident spouts.
+ */
+public class KafkaTridentSpoutConfig<K, V> extends CommonKafkaSpoutConfig<K, V> {
+
+    private static final long serialVersionUID = 1L;
+
+    public KafkaTridentSpoutConfig(Builder<K, V> builder) {
+        super(builder);
+    }
+
+    /**
+     * Factory method that creates a Builder with String key/value deserializers.
+     *
+     * @param bootstrapServers The bootstrap servers for the consumer
+     * @param topics The topics to subscribe to
+     * @return The new builder
+     */
+    public static Builder<String, String> builder(String bootstrapServers, String... topics) {
+        return new Builder<String, String>(bootstrapServers, topics).withStringDeserializers();
+    }
+
+    /**
+     * Factory method that creates a Builder with String key/value deserializers.
+     *
+     * @param bootstrapServers The bootstrap servers for the consumer
+     * @param topics The topics to subscribe to
+     * @return The new builder
+     */
+    public static Builder<String, String> builder(String bootstrapServers, Set<String> topics) {
+        return new Builder<String, String>(bootstrapServers, topics).withStringDeserializers();
+    }
+
+    /**
+     * Factory method that creates a Builder with String key/value deserializers.
+     *
+     * @param bootstrapServers The bootstrap servers for the consumer
+     * @param topics The topic pattern to subscribe to
+     * @return The new builder
+     */
+    public static Builder<String, String> builder(String bootstrapServers, Pattern topics) {
+        return new Builder<String, String>(bootstrapServers, topics).withStringDeserializers();
+    }
+
+    public static class Builder<K, V> extends CommonKafkaSpoutConfig.Builder<K, V, Builder<K, V>> {
+
+        public Builder(String bootstrapServers, String... topics) {
+            super(bootstrapServers, topics);
+        }
+
+        public Builder(String bootstrapServers, Set<String> topics) {
+            super(bootstrapServers, topics);
+        }
+
+        public Builder(String bootstrapServers, Pattern topics) {
+            super(bootstrapServers, topics);
+        }
+
+        public Builder(String bootstrapServers, TopicFilter topicFilter, ManualPartitioner topicPartitioner) {
+            super(bootstrapServers, topicFilter, topicPartitioner);
+        }
+        
+        private Builder<K, V> withStringDeserializers() {
+            setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+            setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+            return this;
+        }
+
+        @Override
+        public KafkaTridentSpoutConfig<K, V> build() {
+            return new KafkaTridentSpoutConfig<>(this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutCoordinator.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutCoordinator.java
index 4e46d4c..d8f097b 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutCoordinator.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutCoordinator.java
@@ -44,7 +44,7 @@ public class KafkaTridentSpoutCoordinator<K,V> implements
     private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutCoordinator.class);
 
     private final TopicPartitionSerializer tpSerializer = new TopicPartitionSerializer();
-    private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
+    private final KafkaTridentSpoutConfig<K, V> kafkaSpoutConfig;
     private final Timer refreshAssignmentTimer;
     private final Consumer<K, V> consumer;
     
@@ -54,14 +54,14 @@ public class KafkaTridentSpoutCoordinator<K,V> implements
      * Creates a new coordinator based on the given spout config.
      * @param kafkaSpoutConfig The spout config to use
      */
-    public KafkaTridentSpoutCoordinator(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
+    public KafkaTridentSpoutCoordinator(KafkaTridentSpoutConfig<K, V> kafkaSpoutConfig) {
         this(kafkaSpoutConfig, new ConsumerFactoryDefault<>());
     }
     
-    KafkaTridentSpoutCoordinator(KafkaSpoutConfig<K, V> kafkaSpoutConfig, ConsumerFactory<K, V> consumerFactory) {
+    KafkaTridentSpoutCoordinator(KafkaTridentSpoutConfig<K, V> kafkaSpoutConfig, ConsumerFactory<K, V> consumerFactory) {
         this.kafkaSpoutConfig = kafkaSpoutConfig;
         this.refreshAssignmentTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);
-        this.consumer = consumerFactory.createConsumer(kafkaSpoutConfig);
+        this.consumer = consumerFactory.createConsumer(kafkaSpoutConfig.getKafkaProps());
         LOG.debug("Created {}", this.toString());
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
index 22f21c7..ed86136 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
@@ -18,10 +18,10 @@
 
 package org.apache.storm.kafka.spout.trident;
 
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST;
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
+import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.EARLIEST;
+import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.LATEST;
+import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
+import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.io.Serializable;
@@ -39,7 +39,7 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.kafka.spout.FirstPollOffsetStrategy;
 import org.apache.storm.kafka.spout.RecordTranslator;
 import org.apache.storm.kafka.spout.TopicPartitionComparator;
 import org.apache.storm.kafka.spout.internal.ConsumerFactory;
@@ -58,14 +58,14 @@ public class KafkaTridentSpoutEmitter<K, V> implements Serializable {
 
     // Kafka
     private final Consumer<K, V> consumer;
-    private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
+    private final KafkaTridentSpoutConfig<K, V> kafkaSpoutConfig;
     private final TopicAssigner topicAssigner;
 
     // The first seek offset for each topic partition, i.e. the offset this spout instance started processing at.
     private final Map<TopicPartition, Long> tpToFirstSeekOffset = new HashMap<>();
 
     private final long pollTimeoutMs;
-    private final KafkaSpoutConfig.FirstPollOffsetStrategy firstPollOffsetStrategy;
+    private final FirstPollOffsetStrategy firstPollOffsetStrategy;
     private final RecordTranslator<K, V> translator;
     private final TopicPartitionSerializer tpSerializer = new TopicPartitionSerializer();
     private final TopologyContext topologyContext;
@@ -76,15 +76,15 @@ public class KafkaTridentSpoutEmitter<K, V> implements Serializable {
      * @param kafkaSpoutConfig The kafka spout config
      * @param topologyContext The topology context
      */
-    public KafkaTridentSpoutEmitter(KafkaSpoutConfig<K, V> kafkaSpoutConfig, TopologyContext topologyContext) {
+    public KafkaTridentSpoutEmitter(KafkaTridentSpoutConfig<K, V> kafkaSpoutConfig, TopologyContext topologyContext) {
         this(kafkaSpoutConfig, topologyContext, new ConsumerFactoryDefault<>(), new TopicAssigner());
     }
 
     @VisibleForTesting
-    KafkaTridentSpoutEmitter(KafkaSpoutConfig<K, V> kafkaSpoutConfig, TopologyContext topologyContext,
+    KafkaTridentSpoutEmitter(KafkaTridentSpoutConfig<K, V> kafkaSpoutConfig, TopologyContext topologyContext,
         ConsumerFactory<K, V> consumerFactory, TopicAssigner topicAssigner) {
         this.kafkaSpoutConfig = kafkaSpoutConfig;
-        this.consumer = consumerFactory.createConsumer(kafkaSpoutConfig);
+        this.consumer = consumerFactory.createConsumer(kafkaSpoutConfig.getKafkaProps());
         this.topologyContext = topologyContext;
         this.translator = kafkaSpoutConfig.getTranslator();
         this.topicAssigner = topicAssigner;
@@ -191,8 +191,8 @@ public class KafkaTridentSpoutEmitter<K, V> implements Serializable {
     }
 
     private boolean isFirstPollOffsetStrategyIgnoringCommittedOffsets() {
-        return firstPollOffsetStrategy == KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST
-            || firstPollOffsetStrategy == KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST;
+        return firstPollOffsetStrategy == FirstPollOffsetStrategy.EARLIEST
+            || firstPollOffsetStrategy == FirstPollOffsetStrategy.LATEST;
     }
 
     private void throwIfEmittingForUnassignedPartition(TopicPartition currBatchTp) {
@@ -347,14 +347,14 @@ public class KafkaTridentSpoutEmitter<K, V> implements Serializable {
 
         @Override
         public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-            LOG.info("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]",
-                kafkaSpoutConfig.getConsumerGroupId(), consumer, partitions);
+            LOG.info("Partitions revoked. [consumer={}, topic-partitions={}]",
+                consumer, partitions);
         }
 
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-            LOG.info("Partitions reassignment. [consumer-group={}, consumer={}, topic-partitions={}]",
-                kafkaSpoutConfig.getConsumerGroupId(), consumer, partitions);
+            LOG.info("Partitions reassignment. [consumer={}, topic-partitions={}]",
+                consumer, partitions);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
index 420e6f1..e20c89b 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
@@ -35,13 +35,13 @@ public class KafkaTridentSpoutOpaque<K,V> implements IOpaquePartitionedTridentSp
 
     private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutOpaque.class);
 
-    private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
+    private final KafkaTridentSpoutConfig<K, V> kafkaSpoutConfig;
     private final OutputFieldsExtractor outputFieldsExtractor;
     
     /**
      * Creates a new opaque transactional Trident Kafka spout.
      */
-    public KafkaTridentSpoutOpaque(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
+    public KafkaTridentSpoutOpaque(KafkaTridentSpoutConfig<K, V> kafkaSpoutConfig) {
         this.kafkaSpoutConfig = kafkaSpoutConfig;
         this.outputFieldsExtractor = new OutputFieldsExtractor();
         LOG.debug("Created {}", this.toString());

http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java
index 2d1e9de..2eb3f29 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java
@@ -30,13 +30,13 @@ public class KafkaTridentSpoutTransactional<K,V> implements IPartitionedTridentS
         Serializable {
     private static final long serialVersionUID = 1L;
     
-    private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
+    private final KafkaTridentSpoutConfig<K, V> kafkaSpoutConfig;
     private final OutputFieldsExtractor outputFieldsExtractor;
 
     /**
      * Creates a new non-opaque transactional Trident Kafka spout.
      */
-    public KafkaTridentSpoutTransactional(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
+    public KafkaTridentSpoutTransactional(KafkaTridentSpoutConfig<K, V> kafkaSpoutConfig) {
         this.kafkaSpoutConfig = kafkaSpoutConfig;
         this.outputFieldsExtractor = new OutputFieldsExtractor();
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/internal/OutputFieldsExtractor.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/internal/OutputFieldsExtractor.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/internal/OutputFieldsExtractor.java
index 93a2a05..e702fef 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/internal/OutputFieldsExtractor.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/internal/OutputFieldsExtractor.java
@@ -17,8 +17,8 @@
 package org.apache.storm.kafka.spout.trident.internal;
 
 import java.io.Serializable;
-import org.apache.storm.kafka.spout.KafkaSpoutConfig;
 import org.apache.storm.kafka.spout.RecordTranslator;
+import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutConfig;
 import org.apache.storm.tuple.Fields;
 
 public class OutputFieldsExtractor implements Serializable {
@@ -28,7 +28,7 @@ public class OutputFieldsExtractor implements Serializable {
      * Extract the output fields from the config.
      * Throws an error if there are multiple declared output streams, since Trident only supports one output stream per spout.
      */
-    public <K, V> Fields getOutputFields(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
+    public <K, V> Fields getOutputFields(KafkaTridentSpoutConfig<K, V> kafkaSpoutConfig) {
         RecordTranslator<K, V> translator = kafkaSpoutConfig.getTranslator();
         int numStreams = translator.streams().size();
         if (numStreams > 1) {

http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java
index f51b159..b05f132 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java
@@ -95,7 +95,7 @@ public abstract class KafkaSpoutAbstractTest {
 
         return new ConsumerFactory<String, String>() {
             @Override
-            public KafkaConsumer<String, String> createConsumer(KafkaSpoutConfig<String, String> kafkaSpoutConfig) {
+            public KafkaConsumer<String, String> createConsumer(Map<String, Object> consumerProps) {
                 return consumerSpy;
             }
 
@@ -103,7 +103,7 @@ public abstract class KafkaSpoutAbstractTest {
     }
 
     KafkaConsumer<String, String> createConsumerSpy() {
-        return spy(new ConsumerFactoryDefault<String, String>().createConsumer(spoutConfig));
+        return spy(new ConsumerFactoryDefault<String, String>().createConsumer(spoutConfig.getKafkaProps()));
     }
 
     @AfterEach

http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
index e168f07..90a1f1b 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
@@ -27,7 +27,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import java.util.HashMap;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
index 27c7372..af9f221 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
@@ -36,7 +36,6 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.KafkaUnitExtension;
-import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
 import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
 import org.apache.storm.kafka.spout.internal.ConsumerFactory;
 import org.apache.storm.kafka.spout.internal.ConsumerFactoryDefault;
@@ -44,7 +43,6 @@ import org.apache.storm.kafka.spout.subscription.TopicAssigner;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.utils.Time;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -78,7 +76,7 @@ public class KafkaSpoutReactivationTest {
                 .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords)
                 .build();
         ConsumerFactory<String, String> consumerFactory = new ConsumerFactoryDefault<>();
-        this.consumerSpy = spy(consumerFactory.createConsumer(spoutConfig));
+        this.consumerSpy = spy(consumerFactory.createConsumer(spoutConfig.getKafkaProps()));
         ConsumerFactory<String, String> consumerFactoryMock = mock(ConsumerFactory.class);
         when(consumerFactoryMock.createConsumer(any()))
             .thenReturn(consumerSpy);

http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
index 5740b3f..111b4c3 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
@@ -208,7 +208,7 @@ public class KafkaSpoutRebalanceTest {
 
         TopicAssigner assignerMock = mock(TopicAssigner.class);
         KafkaSpout<String, String> spout = new KafkaSpout<>(createKafkaSpoutConfigBuilder(topicFilterMock, partitionerMock, -1)
-            .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
+            .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
             .build(), consumerFactory, assignerMock);
         String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
         TopicPartition assignedPartition = new TopicPartition(topic, 1);

http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java
index f45aaec..3c3c106 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java
@@ -36,7 +36,7 @@ public class KafkaSpoutTopologyDeployActivateDeactivateTest extends KafkaSpoutAb
             KafkaSpoutConfig.builder("127.0.0.1:" + kafkaUnitExtension.getKafkaUnit().getKafkaPort(),
                 Pattern.compile(SingleTopicKafkaSpoutConfiguration.TOPIC)))
             .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
-            .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST)
+            .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.EARLIEST)
             .build();
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
index d7febc0..547a27e 100755
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
@@ -82,7 +82,7 @@ public class MaxUncommittedOffsetTest {
         //The spout must be able to reemit all retriable tuples, even if the maxPollRecords is set to a low value compared to maxUncommittedOffsets.
         assertThat("Current tests require maxPollRecords < maxUncommittedOffsets", maxPollRecords, lessThanOrEqualTo(maxUncommittedOffsets));
         spout = new KafkaSpout<>(spoutConfig);
-        new ConsumerFactoryDefault<String, String>().createConsumer(spoutConfig);
+        new ConsumerFactoryDefault<String, String>().createConsumer(spoutConfig.getKafkaProps());
     }
 
     private void prepareSpout(int msgCount) throws Exception {

http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java
index f7e0a96..7c2e588 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java
@@ -17,8 +17,8 @@
  */
 package org.apache.storm.kafka.spout.config.builder;
 
+import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.EARLIEST;
 import static org.apache.storm.kafka.spout.KafkaSpoutConfig.DEFAULT_MAX_RETRIES;
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig;

http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java
index 7f95b74..0f419be 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java
@@ -16,8 +16,6 @@
 
 package org.apache.storm.kafka.spout.trident;
 
-import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutBatchMetadata;
-
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
 


Mime
View raw message