storm-commits mailing list archives

From kabh...@apache.org
Subject [6/7] storm git commit: STORM-2953: Remove storm-kafka
Date Thu, 19 Jul 2018 03:35:02 GMT
STORM-2953: Remove storm-kafka


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e58ac3e0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e58ac3e0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e58ac3e0

Branch: refs/heads/master
Commit: e58ac3e033670aeb83e8543ab7b8779227ca10d8
Parents: 7f992a6
Author: Stig Rohde Døssing <srdo@apache.org>
Authored: Tue Jul 17 18:29:08 2018 +0200
Committer: Stig Rohde Døssing <srdo@apache.org>
Committed: Wed Jul 18 11:01:13 2018 +0200

----------------------------------------------------------------------
 docs/index.md                                   |   2 +-
 docs/storm-kafka-client.md                      |   7 +-
 docs/storm-kafka.md                             | 399 ------------------
 examples/storm-kafka-examples/pom.xml           | 110 -----
 .../kafka/trident/KafkaProducerTopology.java    |  75 ----
 .../trident/TridentKafkaConsumerTopology.java   |  42 --
 .../trident/TridentKafkaRandomStrings.java      |  82 ----
 .../kafka/trident/TridentKafkaTopology.java     |  83 ----
 external/storm-kafka-monitor/pom.xml            |   5 -
 .../storm/kafka/monitor/KafkaOffsetLagUtil.java | 298 +-------------
 .../kafka/monitor/OldKafkaSpoutOffsetQuery.java | 127 ------
 external/storm-kafka/README.md                  | 382 -----------------
 external/storm-kafka/pom.xml                    | 125 ------
 .../src/jvm/org/apache/storm/kafka/Broker.java  |  79 ----
 .../jvm/org/apache/storm/kafka/BrokerHosts.java |  20 -
 .../storm/kafka/ByteBufferSerializer.java       |  41 --
 .../storm/kafka/DynamicBrokersReader.java       | 208 ----------
 .../kafka/DynamicPartitionConnections.java      |  91 -----
 .../ExponentialBackoffMsgRetryManager.java      | 201 ---------
 .../storm/kafka/FailedFetchException.java       |  24 --
 .../storm/kafka/FailedMsgRetryManager.java      |  77 ----
 .../org/apache/storm/kafka/IntSerializer.java   |  42 --
 .../jvm/org/apache/storm/kafka/KafkaConfig.java |  49 ---
 .../jvm/org/apache/storm/kafka/KafkaError.java  |  38 --
 .../jvm/org/apache/storm/kafka/KafkaSpout.java  | 255 ------------
 .../jvm/org/apache/storm/kafka/KafkaUtils.java  | 288 -------------
 .../org/apache/storm/kafka/KeyValueScheme.java  |  21 -
 .../kafka/KeyValueSchemeAsMultiScheme.java      |  35 --
 .../storm/kafka/MessageMetadataScheme.java      |  21 -
 .../MessageMetadataSchemeAsMultiScheme.java     |  35 --
 .../jvm/org/apache/storm/kafka/Partition.java   |  85 ----
 .../storm/kafka/PartitionCoordinator.java       |  23 --
 .../apache/storm/kafka/PartitionManager.java    | 405 -------------------
 .../jvm/org/apache/storm/kafka/SpoutConfig.java |  58 ---
 .../apache/storm/kafka/StaticCoordinator.java   |  50 ---
 .../jvm/org/apache/storm/kafka/StaticHosts.java |  33 --
 .../storm/kafka/StaticPartitionConnections.java |  46 ---
 .../storm/kafka/StringKeyValueScheme.java       |  32 --
 .../kafka/StringMessageAndMetadataScheme.java   |  36 --
 .../storm/kafka/StringMultiSchemeWithTopic.java |  41 --
 .../org/apache/storm/kafka/StringScheme.java    |  44 --
 .../kafka/TopicOffsetOutOfRangeException.java   |  26 --
 .../org/apache/storm/kafka/ZkCoordinator.java   | 127 ------
 .../src/jvm/org/apache/storm/kafka/ZkHosts.java |  31 --
 .../src/jvm/org/apache/storm/kafka/ZkState.java | 112 -----
 .../org/apache/storm/kafka/bolt/KafkaBolt.java  | 172 --------
 .../FieldNameBasedTupleToKafkaMapper.java       |  43 --
 .../kafka/bolt/mapper/TupleToKafkaMapper.java   |  27 --
 .../bolt/selector/DefaultTopicSelector.java     |  29 --
 .../bolt/selector/FieldIndexTopicSelector.java  |  43 --
 .../bolt/selector/FieldNameTopicSelector.java   |  44 --
 .../kafka/bolt/selector/KafkaTopicSelector.java |  20 -
 .../apache/storm/kafka/trident/Coordinator.java |  46 ---
 .../storm/kafka/trident/DefaultCoordinator.java |  26 --
 .../trident/GlobalPartitionInformation.java     | 113 ------
 .../storm/kafka/trident/IBatchCoordinator.java  |  21 -
 .../storm/kafka/trident/IBrokerReader.java      |  24 --
 .../apache/storm/kafka/trident/MaxMetric.java   |  35 --
 .../kafka/trident/OpaqueTridentKafkaSpout.java  |  62 ---
 .../storm/kafka/trident/StaticBrokerReader.java |  44 --
 .../trident/TransactionalTridentKafkaSpout.java |  50 ---
 .../storm/kafka/trident/TridentKafkaConfig.java |  32 --
 .../kafka/trident/TridentKafkaEmitter.java      | 306 --------------
 .../storm/kafka/trident/TridentKafkaState.java  | 110 -----
 .../kafka/trident/TridentKafkaStateFactory.java |  57 ---
 .../kafka/trident/TridentKafkaUpdater.java      |  25 --
 .../storm/kafka/trident/ZkBrokerReader.java     |  79 ----
 .../FieldNameBasedTupleToKafkaMapper.java       |  36 --
 .../mapper/TridentTupleToKafkaMapper.java       |  22 -
 .../trident/selector/DefaultTopicSelector.java  |  29 --
 .../trident/selector/KafkaTopicSelector.java    |  20 -
 .../storm/kafka/DynamicBrokersReaderTest.java   | 245 -----------
 .../ExponentialBackoffMsgRetryManagerTest.java  | 279 -------------
 .../org/apache/storm/kafka/KafkaErrorTest.java  |  51 ---
 .../org/apache/storm/kafka/KafkaTestBroker.java | 177 --------
 .../org/apache/storm/kafka/KafkaUtilsTest.java  | 294 --------------
 .../storm/kafka/PartitionManagerTest.java       | 241 -----------
 .../storm/kafka/StringKeyValueSchemeTest.java   |  56 ---
 .../apache/storm/kafka/TestStringScheme.java    |  40 --
 .../test/org/apache/storm/kafka/TestUtils.java  |  97 -----
 .../apache/storm/kafka/TridentKafkaTest.java    |  75 ----
 .../apache/storm/kafka/ZkCoordinatorTest.java   | 191 ---------
 .../apache/storm/kafka/bolt/KafkaBoltTest.java  | 355 ----------------
 flux/flux-core/pom.xml                          |   5 +
 flux/flux-examples/pom.xml                      |   4 +
 flux/pom.xml                                    |   5 -
 pom.xml                                         |  31 +-
 .../apache/storm/utils/TopologySpoutLag.java    |  62 +--
 .../final-package/src/main/assembly/binary.xml  |   7 -
 89 files changed, 30 insertions(+), 8111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/docs/index.md
----------------------------------------------------------------------
diff --git a/docs/index.md b/docs/index.md
index 2697c47..135e563 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -91,7 +91,7 @@ But small change will not affect the user experience. We will notify the user wh
 * [Event Logging](Eventlogging.html)
 
 ### Integration With External Systems, and Other Libraries
-* [Apache Kafka Integration](storm-kafka.html), [New Kafka Consumer Integration](storm-kafka-client.html)
+* [Apache Kafka Integration](storm-kafka-client.html)
 * [Apache HBase Integration](storm-hbase.html)
 * [Apache HDFS Integration](storm-hdfs.html)
 * [Apache Hive Integration](storm-hive.html)

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/docs/storm-kafka-client.md
----------------------------------------------------------------------
diff --git a/docs/storm-kafka-client.md b/docs/storm-kafka-client.md
index cd7ad20..0b521a0 100644
--- a/docs/storm-kafka-client.md
+++ b/docs/storm-kafka-client.md
@@ -269,10 +269,9 @@ You can also override the kafka clients version while building from maven, with
 e.g. `mvn clean install -Dstorm.kafka.client.version=0.10.0.0`
 
 When selecting a kafka client version, you should ensure -
- 1. kafka api is compatible. storm-kafka-client module only supports **0.10 or newer** kafka client API. For older versions,
- you can use storm-kafka module (https://github.com/apache/storm/tree/master/external/storm-kafka).  
- 2. The kafka client selected by you should be wire compatible with the broker. e.g. 0.9.x client will not work with 
- 0.8.x broker. 
+ 1. The Kafka API must be compatible. The storm-kafka-client module only supports Kafka **0.10 or newer**. For older versions,
+ you can use the storm-kafka module (https://github.com/apache/storm/tree/1.x-branch/external/storm-kafka).  
+ 2. The Kafka client version you select should be wire compatible with the broker. Please see the [Kafka compatibility matrix](https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix).
 
 # Kafka Spout Performance Tuning
 

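Since the documentation change above points users at storm-kafka-client as the replacement, a minimal, hedged sketch of wiring up its spout may help readers migrating off the removed module. The broker address, topic, and group id below are placeholders, and the builder calls assume the storm-kafka-client `KafkaSpoutConfig` builder API for the 0.10+ consumer.

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;

public class KafkaClientSpoutSketch {
    public static StormTopology newTopology() {
        // Placeholder broker address, topic name, and consumer group; adjust for your cluster.
        KafkaSpoutConfig<String, String> spoutConfig =
            KafkaSpoutConfig.builder("localhost:9092", "test-topic")
                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group")
                .build();

        TopologyBuilder builder = new TopologyBuilder();
        // The storm-kafka-client spout plays the role of the removed org.apache.storm.kafka.KafkaSpout.
        builder.setSpout("kafka-spout", new KafkaSpout<>(spoutConfig), 1);
        // Attach bolts and submit via StormSubmitter as usual.
        return builder.createTopology();
    }
}
```
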
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/docs/storm-kafka.md
----------------------------------------------------------------------
diff --git a/docs/storm-kafka.md b/docs/storm-kafka.md
deleted file mode 100644
index 901d795..0000000
--- a/docs/storm-kafka.md
+++ /dev/null
@@ -1,399 +0,0 @@
----
-title: Storm Kafka Integration
-layout: documentation
-documentation: true
----
-
-Provides core Storm and Trident spout implementations for consuming data from Apache Kafka 0.8.x.
-
-## Spouts
-We support both Trident and core Storm spouts. Both spout implementations use a BrokerHosts interface that
-tracks the Kafka broker host to partition mapping and a KafkaConfig that controls some Kafka-related parameters.
-
-### BrokerHosts
-In order to initialize your Kafka spout/emitter you need to construct an instance of the marker interface BrokerHosts.
-Currently, we support the following two implementations:
-
-#### ZkHosts
-ZkHosts is what you should use if you want to dynamically track Kafka broker to partition mapping. This class uses
-Kafka's ZooKeeper entries to track brokerHost -> partition mapping. You can instantiate an object by calling
-
-```java
-public ZkHosts(String brokerZkStr, String brokerZkPath)
-public ZkHosts(String brokerZkStr)
-```
-
-Where brokerZkStr is just ip:port (e.g. localhost:2181). brokerZkPath is the root directory under which all the topics and
-partition information is stored. By default this is /brokers which is what the default Kafka implementation uses.
-
-By default, the broker-partition mapping is refreshed every 60 seconds from ZooKeeper. If you want to change it, you
-should set host.refreshFreqSecs to your chosen value.
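As an illustration only (this assumes `ZkHosts` exposes a public `refreshFreqSecs` field, and the 30-second value is arbitrary), the refresh interval could be tuned like so:

```java
ZkHosts hosts = new ZkHosts("localhost:2181");
// Refresh the broker -> partition mapping from ZooKeeper every 30 seconds instead of the default 60.
hosts.refreshFreqSecs = 30;
```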
-
-#### StaticHosts
-This is an alternative implementation where broker -> partition information is static. In order to construct an instance
-of this class, you need to first construct an instance of GlobalPartitionInformation.
-
-```java
-Broker brokerForPartition0 = new Broker("localhost");//localhost:9092
-Broker brokerForPartition1 = new Broker("localhost", 9092);//localhost:9092 but we specified the port explicitly
-Broker brokerForPartition2 = new Broker("localhost:9092");//localhost:9092 specified as one string.
-GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
-partitionInfo.addPartition(0, brokerForPartition0);//mapping from partition 0 to brokerForPartition0
-partitionInfo.addPartition(1, brokerForPartition1);//mapping from partition 1 to brokerForPartition1
-partitionInfo.addPartition(2, brokerForPartition2);//mapping from partition 2 to brokerForPartition2
-StaticHosts hosts = new StaticHosts(partitionInfo);
-```
-
-### KafkaConfig
-The second thing needed for constructing a kafkaSpout is an instance of KafkaConfig.
-
-```java
-public KafkaConfig(BrokerHosts hosts, String topic)
-public KafkaConfig(BrokerHosts hosts, String topic, String clientId)
-```
-
-The BrokerHosts can be any implementation of the BrokerHosts interface described above. The topic is the name of the Kafka topic.
-The optional clientId is used as part of the ZooKeeper path where the spout's current consumption offset is stored.
-
-There are 2 extensions of KafkaConfig currently in use.
-
-SpoutConfig is an extension of KafkaConfig that adds fields for the ZooKeeper connection info and for controlling
-behavior specific to KafkaSpout.
-The clientId will be used to identify requests made using the Kafka protocol.
-The zkRoot will be used as the root path under which your consumer's offsets are stored.
-The id should uniquely identify your spout.
-
-```java
-public SpoutConfig(BrokerHosts hosts, String topic, String clientId, String zkRoot, String id);
-public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id);
-```
-
-In addition to these parameters, SpoutConfig contains the following fields that control how KafkaSpout behaves:
-
-```java
-// setting for how often to save the current Kafka offset to ZooKeeper
-public long stateUpdateIntervalMs = 2000;
-
-// Retry strategy for failed messages
-public String failedMsgRetryManagerClass = ExponentialBackoffMsgRetryManager.class.getName();
-
-// Exponential back-off retry settings.  These are used by ExponentialBackoffMsgRetryManager for retrying messages after a bolt
-// calls OutputCollector.fail(). These come into effect only if ExponentialBackoffMsgRetryManager is being used.
-// Initial delay between successive retries
-public long retryInitialDelayMs = 0;
-public double retryDelayMultiplier = 1.0;
-
-// Maximum delay between successive retries    
-public long retryDelayMaxMs = 60 * 1000;
-// Failed message will be retried infinitely if retryLimit is less than zero. 
-public int retryLimit = -1;     
-```
-
-Core KafkaSpout only accepts an instance of SpoutConfig.
-
-TridentKafkaConfig is another extension of KafkaConfig.
-TridentKafkaEmitter only accepts TridentKafkaConfig.
-
-The KafkaConfig class also has a bunch of public variables that control your application's behavior. Here are the defaults:
-
-```java
-public int fetchSizeBytes = 1024 * 1024;
-public int socketTimeoutMs = 10000;
-public int fetchMaxWait = 10000;
-public int bufferSizeBytes = 1024 * 1024;
-public MultiScheme scheme = new RawMultiScheme();
-public boolean ignoreZkOffsets = false;
-public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
-public long maxOffsetBehind = Long.MAX_VALUE;
-public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
-public int metricsTimeBucketSizeInSecs = 60;
-```
-
-Most of them are self-explanatory, except MultiScheme.
-### MultiScheme
-MultiScheme is an interface that dictates how the ByteBuffer consumed from Kafka gets transformed into a storm tuple. It
-also controls the naming of your output field.
-
-```java
-public Iterable<List<Object>> deserialize(ByteBuffer ser);
-public Fields getOutputFields();
-```
-
-The default `RawMultiScheme` just takes the `ByteBuffer` and returns a tuple with the ByteBuffer converted to a `byte[]`. The name of the outputField is "bytes". There are alternative implementations like `SchemeAsMultiScheme` and `KeyValueSchemeAsMultiScheme` which can convert the `ByteBuffer` to `String`.
-
-There is also an extension of `SchemeAsMultiScheme`, `MessageMetadataSchemeAsMultiScheme`,
-which has an additional deserialize method that accepts the message `ByteBuffer` in addition to the `Partition` and `offset` associated with the message.
-
-```java
-public Iterable<List<Object>> deserializeMessageWithMetadata(ByteBuffer message, Partition partition, long offset)
-```
-
-This is useful for auditing/replaying messages from arbitrary points on a Kafka topic, saving the partition and offset of each message of a discrete stream instead of persisting the entire message.
-
-### Failed message retry
-FailedMsgRetryManager is an interface which defines the retry strategy for a failed message. The default implementation is ExponentialBackoffMsgRetryManager, which retries with exponential delays
-between consecutive retries. To use a custom implementation, set SpoutConfig.failedMsgRetryManagerClass to the fully qualified class name
-of the implementation. Here is the interface:
-
-```java
-// Spout initialization can go here. This can be called multiple times during lifecycle of a worker. 
-void prepare(SpoutConfig spoutConfig, Map stormConf);
-
-// Message corresponding to offset has failed. This method is called only if retryFurther returns true for offset.
-void failed(Long offset);
-
-// Message corresponding to offset has been acked.  
-void acked(Long offset);
-
-// Message corresponding to the offset has been re-emitted and is in transit.
-void retryStarted(Long offset);
-
-/**
- * The offset of message, which is to be re-emitted. Spout will fetch messages starting from this offset
- * and resend them, except completed messages.
- */
-Long nextFailedMessageToRetry();
-
-/**
- * @return True if the message corresponding to the offset should be emitted NOW. False otherwise.
- */
-boolean shouldReEmitMsg(Long offset);
-
-/**
- * Spout will clean up the state for this offset if false is returned. If this returns true,
- * the spout will call failed(offset) on the next call, and acked(offset) otherwise.
- */
-boolean retryFurther(Long offset);
-
-/**
- * Spout will call this method after retryFurther returns false.
- * This gives a chance for hooking up custom logic before all clean up.
- * @param partition,offset
- */
-void cleanOffsetAfterRetries(Partition partition, Long offset);
-
-/**
- * Clear any offsets before kafkaOffset. These offsets are no longer available in kafka.
- */
-Set<Long> clearOffsetsBefore(Long kafkaOffset);
-``` 
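As a hedged illustration, plugging in a custom retry strategy only requires setting the class name on the spout config; `MyRetryManager` below is a hypothetical implementation of the interface above, and the constructor arguments are placeholders:

```java
SpoutConfig spoutConfig = new SpoutConfig(hosts, "test-topic", "/kafka-offsets", "my-spout-id");
// Tell the spout to instantiate a custom FailedMsgRetryManager by its fully qualified class name.
spoutConfig.failedMsgRetryManagerClass = MyRetryManager.class.getName();
```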
-
-#### Version incompatibility
-In Storm versions prior to 1.0, the MultiScheme methods accepted a `byte[]` instead of a `ByteBuffer`. The `MultiScheme` and the related
-Scheme APIs were changed in version 1.0 to accept a `ByteBuffer` instead of a `byte[]`.
-
-This means that pre-1.0 Kafka spouts will not work with Storm versions 1.0 and higher. When running topologies on Storm version 1.0
-and higher, you must ensure that the storm-kafka version is at least 1.0. Pre-1.0 shaded topology jars that bundle
-storm-kafka classes must be rebuilt with storm-kafka version 1.0 to run on clusters with Storm 1.0 and higher.
-
-### Examples
-
-#### Core Spout
-
-```java
-BrokerHosts hosts = new ZkHosts(zkConnString);
-SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());
-spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
-KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
-```
-
-#### Trident Spout
-
-```java
-TridentTopology topology = new TridentTopology();
-BrokerHosts zk = new ZkHosts("localhost");
-TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "test-topic");
-spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
-OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
-```
-
-
-### How KafkaSpout stores offsets of a Kafka topic and recovers in case of failures
-
-As shown in the above KafkaConfig properties, you can control from where in the Kafka topic the spout begins to read by
-setting `KafkaConfig.startOffsetTime` as follows:
-
-1. `kafka.api.OffsetRequest.EarliestTime()`: read from the beginning of the topic (i.e. from the oldest messages onwards)
-2. `kafka.api.OffsetRequest.LatestTime()`: read from the end of the topic (i.e. any new messages that are being written to the topic)
-3. A Unix timestamp in milliseconds since the epoch (e.g. via `System.currentTimeMillis()`):
-   see [How do I accurately get offsets of messages for a certain timestamp using OffsetRequest?](https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowdoIaccuratelygetoffsetsofmessagesforacertaintimestampusingOffsetRequest?) in the Kafka FAQ
-
-As the topology runs, the Kafka spout keeps track of the offsets it has read and emitted by storing state information
-under the ZooKeeper path `SpoutConfig.zkRoot + "/" + SpoutConfig.id`.  In the case of failures it recovers from the last
-offset written to ZooKeeper.
-
-> **Important:**  When re-deploying a topology make sure that the settings for `SpoutConfig.zkRoot` and `SpoutConfig.id`
-> were not modified, otherwise the spout will not be able to read its previous consumer state information (i.e. the
-> offsets) from ZooKeeper -- which may lead to unexpected behavior and/or to data loss, depending on your use case.
-
-This means that when a topology has run once, the setting `KafkaConfig.startOffsetTime` will not have an effect for
-subsequent runs of the topology because now the topology will rely on the consumer state information (offsets) in
-ZooKeeper to determine from where it should begin (more precisely: resume) reading.
-If you want to force the spout to ignore any consumer state information stored in ZooKeeper, then you should
-set the parameter `KafkaConfig.ignoreZkOffsets` to `true`.  If `true`, the spout will always begin reading from the
-offset defined by `KafkaConfig.startOffsetTime` as described above.
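For example, here is a small sketch of forcing the spout to ignore stored offsets and re-read from the earliest available offset, using the fields described above (the topic, zkRoot, and id values are placeholders):

```java
SpoutConfig spoutConfig = new SpoutConfig(hosts, "test-topic", "/kafka-offsets", "my-spout-id");
// Ignore any consumer offsets previously written to ZooKeeper...
spoutConfig.ignoreZkOffsets = true;
// ...and start reading from the beginning of the topic instead.
spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
```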
-
-
-## Using storm-kafka with different versions of kafka
-
-Storm-kafka's Kafka dependency is defined with `provided` scope in Maven, meaning it will not be pulled in
-as a transitive dependency. This allows you to use a version of the Kafka dependency compatible with your Kafka cluster.
-
-When building a project with storm-kafka, you must explicitly add the Kafka dependency. For example, to
-use Kafka 0.8.1.1 built against Scala 2.10, you would use the following dependency in your `pom.xml`:
-
-```xml
-<dependency>
-    <groupId>org.apache.kafka</groupId>
-    <artifactId>kafka_2.10</artifactId>
-    <version>0.8.1.1</version>
-    <exclusions>
-        <exclusion>
-            <groupId>org.apache.zookeeper</groupId>
-            <artifactId>zookeeper</artifactId>
-        </exclusion>
-        <exclusion>
-            <groupId>log4j</groupId>
-            <artifactId>log4j</artifactId>
-        </exclusion>
-    </exclusions>
-</dependency>
-```
-
-Note that the ZooKeeper and log4j dependencies are excluded to prevent version conflicts with Storm's dependencies.
-
-You can also override the Kafka dependency version while building with Maven, using the parameters `kafka.version` and `kafka.artifact.id`,
-e.g. `mvn clean install -Dkafka.artifact.id=kafka_2.11 -Dkafka.version=0.9.0.1`
-
-When selecting a Kafka dependency version, you should ensure that:
-
-1. The Kafka API is compatible with storm-kafka. Currently, only the 0.8.x and 0.9.x client APIs are supported by the storm-kafka
-module. If you want to use a higher version, the storm-kafka-client module should be used instead.
-2. The Kafka client you select is wire compatible with the broker. e.g. a 0.9.x client will not work with an
-0.8.x broker.
-
-
-## Writing to Kafka as part of your topology
-You can create an instance of org.apache.storm.kafka.bolt.KafkaBolt and attach it as a component to your topology. If you
-are using Trident, you can use org.apache.storm.kafka.trident.TridentState, org.apache.storm.kafka.trident.TridentStateFactory and
-org.apache.storm.kafka.trident.TridentKafkaUpdater.
-
-You need to provide implementations of the following two interfaces:
-
-### TupleToKafkaMapper and TridentTupleToKafkaMapper
-These interfaces have 2 methods defined:
-
-```java
-K getKeyFromTuple(Tuple/TridentTuple tuple);
-V getMessageFromTuple(Tuple/TridentTuple tuple);
-```
-
-As the names suggest, these methods are called to map a tuple to a Kafka key and a Kafka message. If you just want one field
-as the key and one field as the value, then you can use the provided FieldNameBasedTupleToKafkaMapper.java
-implementation. In the KafkaBolt, for backward compatibility reasons, the implementation always looks for fields named "key" and "message" if you
-use the default constructor of FieldNameBasedTupleToKafkaMapper.
-Alternatively, you can specify different key and message fields by using the non-default constructor.
-In the TridentKafkaState you must specify the field names for the key and message, as there is no default constructor.
-These should be specified while constructing an instance of FieldNameBasedTupleToKafkaMapper.
-
-### KafkaTopicSelector and trident KafkaTopicSelector
-This interface has only one method
-
-```java
-public interface KafkaTopicSelector {
-    String getTopics(Tuple/TridentTuple tuple);
-}
-```
-The implementation of this interface should return the topic to which the tuple's key/message mapping needs to be published.
-You can return null and the message will be ignored. If you have one static topic name, you can use
-DefaultTopicSelector.java and set the name of the topic in the constructor.
-`FieldNameTopicSelector` and `FieldIndexTopicSelector` can be used to decide which topic a tuple's message should be pushed to.
-The user specifies a field name or field index in the tuple, and the selector uses that value as the name of the topic to publish the message to.
-When the topic name is not found, `KafkaBolt` will write messages to the default topic.
-Please make sure the default topic has been created.
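Here is a hedged sketch of such field-based routing (assuming `FieldNameTopicSelector` takes the tuple field name followed by a fallback topic, and that the mapper constructor takes the key and message field names):

```java
KafkaBolt<String, String> bolt = new KafkaBolt<String, String>()
        // Publish each tuple to the topic named in its "topic" field, falling back to "default-topic".
        .withTopicSelector(new FieldNameTopicSelector("topic", "default-topic"))
        .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<>("key", "message"));
```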
-
-### Specifying Kafka producer properties
-You can provide all the producer properties in your Storm topology by calling `KafkaBolt.withProducerProperties()` and `TridentKafkaStateFactory.withProducerProperties()`. Please see http://kafka.apache.org/documentation.html#newproducerconfigs,
-section "Important configuration properties for the producer", for more details.
-
-### Using wildcard kafka topic match
-You can do a wildcard topic match by adding the following config
-
-```java
-Config config = new Config();
-config.put("kafka.topic.wildcard.match",true);
-```
-
-After this you can specify a wildcard topic for matching, e.g. clickstream.*.log. This will match all topics such as clickstream.my.log, clickstream.cart.log, etc.
-
-
-### Putting it all together
-
-For the bolt :
-
-```java
-TopologyBuilder builder = new TopologyBuilder();
-
-Fields fields = new Fields("key", "message");
-FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
-            new Values("storm", "1"),
-            new Values("trident", "1"),
-            new Values("needs", "1"),
-            new Values("javadoc", "1")
-);
-spout.setCycle(true);
-builder.setSpout("spout", spout, 5);
-//set producer properties.
-Properties props = new Properties();
-props.put("bootstrap.servers", "localhost:9092");
-props.put("acks", "1");
-props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
-KafkaBolt bolt = new KafkaBolt()
-        .withProducerProperties(props)
-        .withTopicSelector(new DefaultTopicSelector("test"))
-        .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
-builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("spout");
-
-Config conf = new Config();
-
-StormSubmitter.submitTopology("kafkaboltTest", conf, builder.createTopology());
-```
-
-For Trident:
-
-```java
-Fields fields = new Fields("word", "count");
-FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
-        new Values("storm", "1"),
-        new Values("trident", "1"),
-        new Values("needs", "1"),
-        new Values("javadoc", "1")
-);
-spout.setCycle(true);
-
-TridentTopology topology = new TridentTopology();
-Stream stream = topology.newStream("spout1", spout);
-
-//set producer properties.
-Properties props = new Properties();
-props.put("bootstrap.servers", "localhost:9092");
-props.put("acks", "1");
-props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
-TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
-        .withProducerProperties(props)
-        .withKafkaTopicSelector(new DefaultTopicSelector("test"))
-        .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
-stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());
-
-Config conf = new Config();
-StormSubmitter.submitTopology("kafkaTridentTest", conf, topology.build());
-```
-
-## Committer Sponsors
-
- * P. Taylor Goetz ([ptgoetz@apache.org](mailto:ptgoetz@apache.org))
-

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/examples/storm-kafka-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-examples/pom.xml b/examples/storm-kafka-examples/pom.xml
deleted file mode 100644
index 13b5573..0000000
--- a/examples/storm-kafka-examples/pom.xml
+++ /dev/null
@@ -1,110 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements.  See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-        <artifactId>storm</artifactId>
-        <groupId>org.apache.storm</groupId>
-        <version>2.0.0-SNAPSHOT</version>
-        <relativePath>../../pom.xml</relativePath>
-    </parent>
-
-    <artifactId>storm-kafka-examples</artifactId>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-client</artifactId>
-            <version>${project.version}</version>
-            <scope>${provided.scope}</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-kafka</artifactId>
-            <version>${project.version}</version>
-            <scope>compile</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>${storm.kafka.artifact.id}</artifactId>
-            <version>${storm.kafka.version}</version>
-            <scope>compile</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka-clients</artifactId>
-            <version>${storm.kafka.version}</version>
-            <scope>compile</scope>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-shade-plugin</artifactId>
-                <configuration>
-                    <createDependencyReducedPom>true</createDependencyReducedPom>
-                    <filters>
-                        <filter>
-                            <artifact>*:*</artifact>
-                            <excludes>
-                                <exclude>META-INF/*.SF</exclude>
-                                <exclude>META-INF/*.sf</exclude>
-                                <exclude>META-INF/*.DSA</exclude>
-                                <exclude>META-INF/*.dsa</exclude>
-                                <exclude>META-INF/*.RSA</exclude>
-                                <exclude>META-INF/*.rsa</exclude>
-                                <exclude>META-INF/*.EC</exclude>
-                                <exclude>META-INF/*.ec</exclude>
-                                <exclude>META-INF/MSFTSIG.SF</exclude>
-                                <exclude>META-INF/MSFTSIG.RSA</exclude>
-                            </excludes>
-                        </filter>
-                    </filters>
-                </configuration>
-                <executions>
-                    <execution>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>shade</goal>
-                        </goals>
-                        <configuration>
-                            <transformers>
-                                <transformer
-                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
-                                <transformer
-                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-                                </transformer>
-                            </transformers>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-checkstyle-plugin</artifactId>
-                <!--Note - the version would be inherited-->
-                <configuration>
-                    <maxAllowedViolations>26</maxAllowedViolations>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
-</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/KafkaProducerTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/KafkaProducerTopology.java b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/KafkaProducerTopology.java
deleted file mode 100644
index d3f55de..0000000
--- a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/KafkaProducerTopology.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.trident;
-
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.kafka.bolt.KafkaBolt;
-import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
-import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
-import org.apache.storm.topology.TopologyBuilder;
-
-import java.util.Properties;
-import java.util.UUID;
-import org.apache.storm.lambda.LambdaSpout;
-import org.apache.storm.utils.Utils;
-
-public class KafkaProducerTopology {
-    /**
-     * Create a new topology that writes random UUIDs to Kafka.
-     *
-     * @param brokerUrl Kafka broker URL
-     * @param topicName Topic to which publish sentences
-     * @return A Storm topology that produces random UUIDs using a {@link LambdaSpout} and uses a {@link KafkaBolt} to publish the UUIDs to
-     *     the kafka topic specified
-     */
-    public static StormTopology newTopology(String brokerUrl, String topicName) {
-        final TopologyBuilder builder = new TopologyBuilder();
-        builder.setSpout("spout", () -> {
-            Utils.sleep(1000); //Throttle this spout a bit to avoid maxing out CPU
-            return UUID.randomUUID().toString();
-        });
-
-        /* The output field of the spout ("lambda") is provided as the boltMessageField
-          so that this gets written out as the message in the kafka topic.
-          The tuples have no key field, so the messages are written to Kafka without a key.*/
-        final KafkaBolt<String, String> bolt = new KafkaBolt<String, String>()
-            .withProducerProperties(newProps(brokerUrl, topicName))
-            .withTopicSelector(new DefaultTopicSelector(topicName))
-            .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<>("key", "lambda"));
-
-        builder.setBolt("forwardToKafka", bolt, 1).shuffleGrouping("spout");
-
-        return builder.createTopology();
-    }
-
-    /**
-     * @return the Storm config for the topology that publishes sentences to kafka using a kafka bolt.
-     */
-    private static Properties newProps(final String brokerUrl, final String topicName) {
-        return new Properties() {{
-            put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
-            put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
-            put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
-            put(ProducerConfig.CLIENT_ID_CONFIG, topicName);
-        }};
-    }
-}
-
-

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaConsumerTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaConsumerTopology.java b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaConsumerTopology.java
deleted file mode 100644
index ae76cf9..0000000
--- a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaConsumerTopology.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *  
- *   http://www.apache.org/licenses/LICENSE-2.0
- *  
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.trident;
-
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.trident.Stream;
-import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.operation.builtin.Debug;
-import org.apache.storm.trident.spout.ITridentDataSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TridentKafkaConsumerTopology {
-    protected static final Logger LOG = LoggerFactory.getLogger(TridentKafkaConsumerTopology.class);
-
-    /**
-     * Creates a new topology that prints inputs to stdout.
-     * @param tridentSpout The spout to use
-     */
-    public static StormTopology newTopology(ITridentDataSource tridentSpout) {
-        final TridentTopology tridentTopology = new TridentTopology();
-        final Stream spoutStream = tridentTopology.newStream("spout", tridentSpout).parallelismHint(2);
-        spoutStream.each(spoutStream.getOutputFields(), new Debug(false));
-        return tridentTopology.build();
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaRandomStrings.java
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaRandomStrings.java b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaRandomStrings.java
deleted file mode 100644
index a6c90e8..0000000
--- a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaRandomStrings.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-package org.apache.storm.kafka.trident;
-
-
-import org.apache.storm.Config;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.kafka.StringScheme;
-import org.apache.storm.kafka.ZkHosts;
-import org.apache.storm.spout.SchemeAsMultiScheme;
-
-import java.io.Serializable;
-
-/**
- * This example sets up a few topologies to put random strings in the "test" Kafka topic via the KafkaBolt,
- * and shows how to set up a Trident topology that reads from the "test" topic using the KafkaSpout.
- * Please ensure you have a Kafka instance running on localhost:9092 before you deploy this example.
- */
-public class TridentKafkaRandomStrings implements Serializable {
-    public static void main(String[] args) throws Exception {
-        final String[] zkBrokerUrl = parseUrl(args);
-        final String topicName = "test";
-        Config tpConf = new Config();
-        tpConf.setMaxSpoutPending(20);
-        String prodTpName = "kafkaBolt";
-        String consTpName = "kafka-topology-random-strings";
-
-        if (args.length == 3)  {
-            prodTpName = args[2] + "-producer";
-            consTpName = args[2] + "-consumer";
-        }
-        // Producer
-        StormSubmitter.submitTopology(prodTpName, tpConf, KafkaProducerTopology.newTopology(zkBrokerUrl[1], topicName));
-        // Consumer
-        StormSubmitter.submitTopology(consTpName, tpConf, TridentKafkaConsumerTopology.newTopology(
-                new TransactionalTridentKafkaSpout(newTridentKafkaConfig(zkBrokerUrl[0]))));
-    }
-
-    private static String[] parseUrl(String[] args) {
-        String zkUrl = "localhost:2181";        // the defaults.
-        String brokerUrl = "localhost:9092";
-
-        if (args.length > 3 || (args.length == 1 && args[0].matches("^-h|--help$"))) {
-            System.out.println("Usage: TridentKafkaWordCount [kafka zookeeper url] [kafka broker url] [topology name]");
-            System.out.println("   E.g TridentKafkaWordCount [" + zkUrl + "]" + " [" + brokerUrl + "] [wordcount]");
-            System.exit(1);
-        } else if (args.length == 1) {
-            zkUrl = args[0];
-        } else if (args.length == 2) {
-            zkUrl = args[0];
-            brokerUrl = args[1];
-        }
-
-        System.out.println("Using Kafka zookeeper url: " + zkUrl + " broker url: " + brokerUrl);
-        return new String[]{zkUrl, brokerUrl};
-    }
-
-    private static TridentKafkaConfig newTridentKafkaConfig(String zkUrl) {
-        ZkHosts hosts = new ZkHosts(zkUrl);
-        TridentKafkaConfig config = new TridentKafkaConfig(hosts, "test");
-        config.scheme = new SchemeAsMultiScheme(new StringScheme());
-
-        // Consume new data from the topic
-        config.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
-        return config;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaTopology.java b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaTopology.java
deleted file mode 100644
index ad785b8..0000000
--- a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaTopology.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.trident;
-
-import java.util.Properties;
-
-import org.apache.storm.Config;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper;
-import org.apache.storm.kafka.trident.selector.DefaultTopicSelector;
-import org.apache.storm.trident.Stream;
-import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.testing.FixedBatchSpout;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-
-public class TridentKafkaTopology {
-
-    private static StormTopology buildTopology(String brokerConnectionString) {
-        Fields fields = new Fields("word", "count");
-        FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
-                new Values("storm", "1"),
-                new Values("trident", "1"),
-                new Values("needs", "1"),
-                new Values("javadoc", "1")
-        );
-        spout.setCycle(true);
-
-        TridentTopology topology = new TridentTopology();
-        Stream stream = topology.newStream("spout1", spout);
-
-        Properties props = new Properties();
-        props.put("bootstrap.servers", brokerConnectionString);
-        props.put("acks", "1");
-        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
-        TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
-            .withProducerProperties(props)
-            .withKafkaTopicSelector(new DefaultTopicSelector("test"))
-            .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
-        stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());
-
-        return topology.build();
-    }
-
-    /**
-     * To run this topology ensure you have a kafka broker running and provide connection string to broker as argument.
-     * Create a topic test with command line,
-     * kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partition 1 --topic test
-     *
-     * run this program and run the kafka consumer:
-     * kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
-     *
-     * you should see the messages flowing through.
-     *
-     * @param args
-     * @throws Exception
-     */
-    public static void main(String[] args) throws Exception {
-        if(args.length < 1) {
-            System.out.println("Please provide kafka broker url ,e.g. localhost:9092");
-        }
-        StormSubmitter.submitTopology("wordCounter", new Config(), buildTopology(args[0]));
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka-monitor/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka-monitor/pom.xml b/external/storm-kafka-monitor/pom.xml
index 711dbea..eb9eb25 100644
--- a/external/storm-kafka-monitor/pom.xml
+++ b/external/storm-kafka-monitor/pom.xml
@@ -49,11 +49,6 @@
             <version>${storm.kafka.client.version}</version>
         </dependency>
         <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>${storm.kafka.artifact.id}</artifactId>
-            <version>${storm.kafka.version}</version>
-        </dependency>
-        <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-log4j12</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
index ef65bcb..78b6993 100644
--- a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
+++ b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
@@ -24,19 +24,11 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import kafka.api.OffsetRequest;
-import kafka.api.PartitionOffsetRequestInfo;
-import kafka.common.TopicAndPartition;
-import kafka.javaapi.OffsetResponse;
-import kafka.javaapi.consumer.SimpleConsumer;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.DefaultParser;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.RetryOneTime;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.PartitionInfo;
@@ -49,105 +41,30 @@ import org.json.simple.JSONValue;
 public class KafkaOffsetLagUtil {
     private static final String OPTION_TOPIC_SHORT = "t";
     private static final String OPTION_TOPIC_LONG = "topics";
-    private static final String OPTION_OLD_CONSUMER_SHORT = "o";
-    private static final String OPTION_OLD_CONSUMER_LONG = "old-spout";
     private static final String OPTION_BOOTSTRAP_BROKERS_SHORT = "b";
     private static final String OPTION_BOOTSTRAP_BROKERS_LONG = "bootstrap-brokers";
     private static final String OPTION_GROUP_ID_SHORT = "g";
     private static final String OPTION_GROUP_ID_LONG = "groupid";
-    private static final String OPTION_TOPIC_WILDCARD_SHORT = "w";
-    private static final String OPTION_TOPIC_WILDCARD_LONG = "wildcard-topic";
-    private static final String OPTION_PARTITIONS_SHORT = "p";
-    private static final String OPTION_PARTITIONS_LONG = "partitions";
-    private static final String OPTION_LEADERS_SHORT = "l";
-    private static final String OPTION_LEADERS_LONG = "leaders";
-    private static final String OPTION_ZK_SERVERS_SHORT = "z";
-    private static final String OPTION_ZK_SERVERS_LONG = "zk-servers";
-    private static final String OPTION_ZK_COMMITTED_NODE_SHORT = "n";
-    private static final String OPTION_ZK_COMMITTED_NODE_LONG = "zk-node";
-    private static final String OPTION_ZK_BROKERS_ROOT_SHORT = "r";
-    private static final String OPTION_ZK_BROKERS_ROOT_LONG = "zk-brokers-root-node";
     private static final String OPTION_SECURITY_PROTOCOL_SHORT = "s";
     private static final String OPTION_SECURITY_PROTOCOL_LONG = "security-protocol";
 
     public static void main(String args[]) {
         try {
-            List<KafkaOffsetLagResult> results;
             Options options = buildOptions();
             CommandLineParser parser = new DefaultParser();
             CommandLine commandLine = parser.parse(options, args);
             if (!commandLine.hasOption(OPTION_TOPIC_LONG)) {
                 printUsageAndExit(options, OPTION_TOPIC_LONG + " is required");
             }
-            if (commandLine.hasOption(OPTION_OLD_CONSUMER_LONG)) {
-                OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery;
-                if (commandLine.hasOption(OPTION_GROUP_ID_LONG) || commandLine.hasOption(OPTION_BOOTSTRAP_BROKERS_LONG) ||
-                    commandLine.hasOption(OPTION_SECURITY_PROTOCOL_LONG)) {
-                    printUsageAndExit(options, OPTION_GROUP_ID_LONG + " or " + OPTION_BOOTSTRAP_BROKERS_LONG + " or " +
-                                               OPTION_SECURITY_PROTOCOL_LONG + " is " +
-                                               "not accepted with option " + OPTION_OLD_CONSUMER_LONG);
-                }
-                if (!commandLine.hasOption(OPTION_ZK_SERVERS_LONG) || !commandLine.hasOption(OPTION_ZK_COMMITTED_NODE_LONG)) {
-                    printUsageAndExit(options, OPTION_ZK_SERVERS_LONG + " and " + OPTION_ZK_COMMITTED_NODE_LONG + " are required  with " +
-                                               OPTION_OLD_CONSUMER_LONG);
-                }
-                String[] topics = commandLine.getOptionValue(OPTION_TOPIC_LONG).split(",");
-                if (topics != null && topics.length > 1) {
-                    printUsageAndExit(options, "Multiple topics not supported with option " + OPTION_OLD_CONSUMER_LONG +
-                                               ". Either a single topic or a " +
-                                               "wildcard string for matching topics is supported");
-                }
-                if (commandLine.hasOption(OPTION_ZK_BROKERS_ROOT_LONG)) {
-                    if (commandLine.hasOption(OPTION_PARTITIONS_LONG) || commandLine.hasOption(OPTION_LEADERS_LONG)) {
-                        printUsageAndExit(options, OPTION_PARTITIONS_LONG + " or " + OPTION_LEADERS_LONG + " is not accepted with " +
-                                                   OPTION_ZK_BROKERS_ROOT_LONG);
-                    }
-                    oldKafkaSpoutOffsetQuery =
-                        new OldKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG), commandLine.getOptionValue
-                            (OPTION_ZK_SERVERS_LONG), commandLine.getOptionValue(OPTION_ZK_COMMITTED_NODE_LONG), commandLine.hasOption
-                            (OPTION_TOPIC_WILDCARD_LONG), commandLine.getOptionValue(OPTION_ZK_BROKERS_ROOT_LONG));
-                } else {
-                    if (commandLine.hasOption(OPTION_TOPIC_WILDCARD_LONG)) {
-                        printUsageAndExit(options, OPTION_TOPIC_WILDCARD_LONG + " is not supported without " + OPTION_ZK_BROKERS_ROOT_LONG);
-                    }
-                    if (!commandLine.hasOption(OPTION_PARTITIONS_LONG) || !commandLine.hasOption(OPTION_LEADERS_LONG)) {
-                        printUsageAndExit(options, OPTION_PARTITIONS_LONG + " and " + OPTION_LEADERS_LONG + " are required if " +
-                                                   OPTION_ZK_BROKERS_ROOT_LONG +
-                                                   " is not provided");
-                    }
-                    String[] partitions = commandLine.getOptionValue(OPTION_PARTITIONS_LONG).split(",");
-                    String[] leaders = commandLine.getOptionValue(OPTION_LEADERS_LONG).split(",");
-                    if (partitions.length != leaders.length) {
-                        printUsageAndExit(options, OPTION_PARTITIONS_LONG + " and " + OPTION_LEADERS_LONG + " need to be of same size");
-                    }
-                    oldKafkaSpoutOffsetQuery =
-                        new OldKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG), commandLine.getOptionValue
-                            (OPTION_ZK_SERVERS_LONG), commandLine.getOptionValue(OPTION_ZK_COMMITTED_NODE_LONG), commandLine.getOptionValue
-                            (OPTION_PARTITIONS_LONG), commandLine.getOptionValue(OPTION_LEADERS_LONG));
-                }
-                results = getOffsetLags(oldKafkaSpoutOffsetQuery);
-            } else {
-                String securityProtocol = commandLine.getOptionValue(OPTION_SECURITY_PROTOCOL_LONG);
-                String[] oldSpoutOptions = {
-                    OPTION_TOPIC_WILDCARD_LONG, OPTION_PARTITIONS_LONG, OPTION_LEADERS_LONG, OPTION_ZK_SERVERS_LONG,
-                    OPTION_ZK_COMMITTED_NODE_LONG, OPTION_ZK_BROKERS_ROOT_LONG
-                };
-                for (String oldOption : oldSpoutOptions) {
-                    if (commandLine.hasOption(oldOption)) {
-                        printUsageAndExit(options, oldOption + " is not accepted without " + OPTION_OLD_CONSUMER_LONG);
-                    }
-                }
-                if (!commandLine.hasOption(OPTION_GROUP_ID_LONG) || !commandLine.hasOption(OPTION_BOOTSTRAP_BROKERS_LONG)) {
-                    printUsageAndExit(options, OPTION_GROUP_ID_LONG + " and " + OPTION_BOOTSTRAP_BROKERS_LONG + " are required if " +
-                                               OPTION_OLD_CONSUMER_LONG +
-                                               " is not specified");
-                }
-                NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery =
-                    new NewKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG),
-                                                 commandLine.getOptionValue(OPTION_BOOTSTRAP_BROKERS_LONG),
-                                                 commandLine.getOptionValue(OPTION_GROUP_ID_LONG), securityProtocol);
-                results = getOffsetLags(newKafkaSpoutOffsetQuery);
+            String securityProtocol = commandLine.getOptionValue(OPTION_SECURITY_PROTOCOL_LONG);
+            if (!commandLine.hasOption(OPTION_GROUP_ID_LONG) || !commandLine.hasOption(OPTION_BOOTSTRAP_BROKERS_LONG)) {
+                printUsageAndExit(options, OPTION_GROUP_ID_LONG + " and " + OPTION_BOOTSTRAP_BROKERS_LONG + " are required");
             }
+            NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery =
+                new NewKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG),
+                    commandLine.getOptionValue(OPTION_BOOTSTRAP_BROKERS_LONG),
+                    commandLine.getOptionValue(OPTION_GROUP_ID_LONG), securityProtocol);
+            List<KafkaOffsetLagResult> results = getOffsetLags(newKafkaSpoutOffsetQuery);
 
             Map<String, Map<Integer, KafkaPartitionOffsetLag>> keyedResult = keyByTopicAndPartition(results);
             System.out.print(JSONValue.toJSONString(keyedResult));
@@ -188,29 +105,10 @@ public class KafkaOffsetLagUtil {
         options.addOption(OPTION_TOPIC_SHORT, OPTION_TOPIC_LONG, true,
                           "REQUIRED Topics (comma separated list) for fetching log head and spout committed " +
                           "offset");
-        options.addOption(OPTION_OLD_CONSUMER_SHORT, OPTION_OLD_CONSUMER_LONG, false, "Whether request is for old spout");
         options.addOption(OPTION_BOOTSTRAP_BROKERS_SHORT, OPTION_BOOTSTRAP_BROKERS_LONG, true,
                           "Comma separated list of bootstrap broker hosts for new " +
                           "consumer/spout e.g. hostname1:9092,hostname2:9092");
-        options.addOption(OPTION_GROUP_ID_SHORT, OPTION_GROUP_ID_LONG, true, "Group id of consumer (applicable only for new kafka spout) ");
-        options.addOption(OPTION_TOPIC_WILDCARD_SHORT, OPTION_TOPIC_WILDCARD_LONG, false,
-                          "Whether topic provided is a wildcard as supported by ZkHosts in " +
-                          "old spout");
-        options.addOption(OPTION_PARTITIONS_SHORT, OPTION_PARTITIONS_LONG, true, "Comma separated list of partitions corresponding to " +
-                                                                                 OPTION_LEADERS_LONG + " for old spout with StaticHosts");
-        options.addOption(OPTION_LEADERS_SHORT, OPTION_LEADERS_LONG, true, "Comma separated list of broker leaders corresponding to " +
-                                                                           OPTION_PARTITIONS_LONG +
-                                                                           " for old spout with StaticHosts e.g. hostname1:9092," +
-                                                                           "hostname2:9092");
-        options.addOption(OPTION_ZK_SERVERS_SHORT, OPTION_ZK_SERVERS_LONG, true,
-                          "Comma separated list of zk servers for fetching spout committed offsets  " +
-                          "and/or topic metadata for ZkHosts e.g hostname1:2181,hostname2:2181");
-        options.addOption(OPTION_ZK_COMMITTED_NODE_SHORT, OPTION_ZK_COMMITTED_NODE_LONG, true,
-                          "Zk node prefix where old kafka spout stores the committed" +
-                          " offsets without the topic and partition nodes");
-        options.addOption(OPTION_ZK_BROKERS_ROOT_SHORT, OPTION_ZK_BROKERS_ROOT_LONG, true,
-                          "Zk node prefix where kafka stores broker information e.g. " +
-                          "/brokers (applicable only for old kafka spout) ");
+        options.addOption(OPTION_GROUP_ID_SHORT, OPTION_GROUP_ID_LONG, true, "Group id of consumer");
         options.addOption(OPTION_SECURITY_PROTOCOL_SHORT, OPTION_SECURITY_PROTOCOL_LONG, true, "Security protocol to connect to kafka");
         return options;
     }
@@ -265,182 +163,4 @@ public class KafkaOffsetLagUtil {
         }};
     }
 
-    /**
-     * @param oldKafkaSpoutOffsetQuery represents the information needed to query kafka for log head and spout offsets
-     * @return log head offset, spout offset and lag for each partition
-     */
-    public static List<KafkaOffsetLagResult> getOffsetLags(OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery) throws Exception {
-        List<KafkaOffsetLagResult> result = new ArrayList<>();
-        Map<String, List<TopicPartition>> leaders = getLeadersAndTopicPartitions(oldKafkaSpoutOffsetQuery);
-        if (leaders != null) {
-            Map<String, Map<Integer, Long>> logHeadOffsets = getLogHeadOffsets(leaders);
-            Map<String, List<Integer>> topicPartitions = new HashMap<>();
-            for (Map.Entry<String, List<TopicPartition>> entry : leaders.entrySet()) {
-                for (TopicPartition topicPartition : entry.getValue()) {
-                    if (!topicPartitions.containsKey(topicPartition.topic())) {
-                        topicPartitions.put(topicPartition.topic(), new ArrayList<Integer>());
-                    }
-                    topicPartitions.get(topicPartition.topic()).add(topicPartition.partition());
-                }
-            }
-            Map<String, Map<Integer, Long>> oldConsumerOffsets = getOldConsumerOffsetsFromZk(topicPartitions, oldKafkaSpoutOffsetQuery);
-            for (Map.Entry<String, Map<Integer, Long>> topicOffsets : logHeadOffsets.entrySet()) {
-                for (Map.Entry<Integer, Long> partitionOffsets : topicOffsets.getValue().entrySet()) {
-                    Long consumerCommittedOffset =
-                        oldConsumerOffsets.get(topicOffsets.getKey()) != null ? (Long) oldConsumerOffsets.get(topicOffsets.getKey()).get
-                            (partitionOffsets.getKey()) : -1;
-                    consumerCommittedOffset = (consumerCommittedOffset == null ? -1 : consumerCommittedOffset);
-                    KafkaOffsetLagResult kafkaOffsetLagResult = new KafkaOffsetLagResult(topicOffsets.getKey(), partitionOffsets.getKey(),
-                                                                                         consumerCommittedOffset,
-                                                                                         partitionOffsets.getValue());
-                    result.add(kafkaOffsetLagResult);
-                }
-            }
-        }
-        return result;
-    }
-
-    private static Map<String, List<TopicPartition>> getLeadersAndTopicPartitions(OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery) throws
-        Exception {
-        Map<String, List<TopicPartition>> result = new HashMap<>();
-        // this means that kafka spout was configured with StaticHosts hosts (leader for partition)
-        if (oldKafkaSpoutOffsetQuery.getPartitions() != null) {
-            String[] partitions = oldKafkaSpoutOffsetQuery.getPartitions().split(",");
-            String[] leaders = oldKafkaSpoutOffsetQuery.getLeaders().split(",");
-            for (int i = 0; i < leaders.length; ++i) {
-                if (!result.containsKey(leaders[i])) {
-                    result.put(leaders[i], new ArrayList<TopicPartition>());
-                }
-                result.get(leaders[i]).add(new TopicPartition(oldKafkaSpoutOffsetQuery.getTopic(), Integer.parseInt(partitions[i])));
-            }
-        } else {
-            // else use zk nodes to figure out partitions and leaders for topics i.e. ZkHosts
-            CuratorFramework curatorFramework = null;
-            try {
-                String brokersZkNode = oldKafkaSpoutOffsetQuery.getBrokersZkPath();
-                if (!brokersZkNode.endsWith("/")) {
-                    brokersZkNode += "/";
-                }
-                String topicsZkPath = brokersZkNode + "topics";
-                curatorFramework =
-                    CuratorFrameworkFactory.newClient(oldKafkaSpoutOffsetQuery.getZkServers(), 20000, 15000, new RetryOneTime(1000));
-                curatorFramework.start();
-                List<String> topics = new ArrayList<>();
-                if (oldKafkaSpoutOffsetQuery.isWildCardTopic()) {
-                    List<String> children = curatorFramework.getChildren().forPath(topicsZkPath);
-                    for (String child : children) {
-                        if (child.matches(oldKafkaSpoutOffsetQuery.getTopic())) {
-                            topics.add(child);
-                        }
-                    }
-                } else {
-                    topics.add(oldKafkaSpoutOffsetQuery.getTopic());
-                }
-                for (String topic : topics) {
-                    String partitionsPath = topicsZkPath + "/" + topic + "/partitions";
-                    List<String> children = curatorFramework.getChildren().forPath(partitionsPath);
-                    for (int i = 0; i < children.size(); ++i) {
-                        byte[] leaderData = curatorFramework.getData().forPath(partitionsPath + "/" + i + "/state");
-                        Map<Object, Object> value = (Map<Object, Object>) JSONValue.parseWithException(new String(leaderData, "UTF-8"));
-                        Integer leader = ((Number) value.get("leader")).intValue();
-                        byte[] brokerData = curatorFramework.getData().forPath(brokersZkNode + "ids/" + leader);
-                        Map<Object, Object> broker = (Map<Object, Object>) JSONValue.parseWithException(new String(brokerData, "UTF-8"));
-                        String host = (String) broker.get("host");
-                        Integer port = ((Long) broker.get("port")).intValue();
-                        String leaderBroker = host + ":" + port;
-                        if (!result.containsKey(leaderBroker)) {
-                            result.put(leaderBroker, new ArrayList<TopicPartition>());
-                        }
-                        result.get(leaderBroker).add(new TopicPartition(topic, i));
-                    }
-                }
-            } finally {
-                if (curatorFramework != null) {
-                    curatorFramework.close();
-                }
-            }
-        }
-        return result;
-    }
-
-    private static Map<String, Map<Integer, Long>> getLogHeadOffsets(Map<String, List<TopicPartition>> leadersAndTopicPartitions) {
-        Map<String, Map<Integer, Long>> result = new HashMap<>();
-        if (leadersAndTopicPartitions != null) {
-            PartitionOffsetRequestInfo partitionOffsetRequestInfo = new PartitionOffsetRequestInfo(OffsetRequest.LatestTime(), 1);
-            SimpleConsumer simpleConsumer = null;
-            for (Map.Entry<String, List<TopicPartition>> leader : leadersAndTopicPartitions.entrySet()) {
-                try {
-                    simpleConsumer =
-                        new SimpleConsumer(leader.getKey().split(":")[0], Integer.parseInt(leader.getKey().split(":")[1]), 10000, 64 *
-                                                                                                                                  1024,
-                                           "LogHeadOffsetRequest");
-                    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
-                        new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
-                    for (TopicPartition topicPartition : leader.getValue()) {
-                        requestInfo
-                            .put(new TopicAndPartition(topicPartition.topic(), topicPartition.partition()), partitionOffsetRequestInfo);
-                        if (!result.containsKey(topicPartition.topic())) {
-                            result.put(topicPartition.topic(), new HashMap<Integer, Long>());
-                        }
-                    }
-                    kafka.javaapi.OffsetRequest request =
-                        new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(),
-                                                        "LogHeadOffsetRequest");
-                    OffsetResponse response = simpleConsumer.getOffsetsBefore(request);
-                    for (TopicPartition topicPartition : leader.getValue()) {
-                        result.get(topicPartition.topic())
-                              .put(topicPartition.partition(), response.offsets(topicPartition.topic(), topicPartition.partition())[0]);
-                    }
-                } finally {
-                    if (simpleConsumer != null) {
-                        simpleConsumer.close();
-                    }
-                }
-            }
-        }
-        return result;
-    }
-
-    private static Map<String, Map<Integer, Long>> getOldConsumerOffsetsFromZk(Map<String, List<Integer>> topicPartitions,
-                                                                               OldKafkaSpoutOffsetQuery
-                                                                                   oldKafkaSpoutOffsetQuery) throws Exception {
-        Map<String, Map<Integer, Long>> result = new HashMap<>();
-        CuratorFramework curatorFramework =
-            CuratorFrameworkFactory.newClient(oldKafkaSpoutOffsetQuery.getZkServers(), 20000, 15000, new RetryOneTime(1000));
-        curatorFramework.start();
-        String partitionPrefix = "partition_";
-        String zkPath = oldKafkaSpoutOffsetQuery.getZkPath();
-        if (zkPath.endsWith("/")) {
-            zkPath = zkPath.substring(0, zkPath.length() - 1);
-        }
-        if (curatorFramework.checkExists().forPath(zkPath) == null) {
-            throw new IllegalArgumentException(OPTION_ZK_COMMITTED_NODE_LONG + " '" + zkPath + "' does not exist.");
-        }
-        byte[] zkData;
-        try {
-            if (topicPartitions != null) {
-                for (Map.Entry<String, List<Integer>> topicEntry : topicPartitions.entrySet()) {
-                    Map<Integer, Long> partitionOffsets = new HashMap<>();
-                    for (Integer partition : topicEntry.getValue()) {
-                        String path =
-                            zkPath + "/" + (oldKafkaSpoutOffsetQuery.isWildCardTopic() ? topicEntry.getKey() + "/" : "") + partitionPrefix +
-                            partition;
-                        if (curatorFramework.checkExists().forPath(path) != null) {
-                            zkData = curatorFramework.getData().forPath(path);
-                            Map<Object, Object> offsetData =
-                                (Map<Object, Object>) JSONValue.parseWithException(new String(zkData, "UTF-8"));
-                            partitionOffsets.put(partition, (Long) offsetData.get("offset"));
-                        }
-                    }
-                    result.put(topicEntry.getKey(), partitionOffsets);
-                }
-            }
-        } finally {
-            if (curatorFramework != null) {
-                curatorFramework.close();
-            }
-        }
-        return result;
-    }
-
 }
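
After this change, main() always builds a NewKafkaSpoutOffsetQuery from the topic, bootstrap-brokers and group-id options and hands it to getOffsetLags. A minimal programmatic sketch of that remaining path is shown below; it assumes the new-consumer overload of getOffsetLags is public and static like the removed old-consumer overload above, that NewKafkaSpoutOffsetQuery and KafkaOffsetLagResult live in org.apache.storm.kafka.monitor alongside KafkaOffsetLagUtil, and that the class name OffsetLagExample plus all argument values are placeholders rather than code from this commit.

import java.util.List;

import org.apache.storm.kafka.monitor.KafkaOffsetLagResult;
import org.apache.storm.kafka.monitor.KafkaOffsetLagUtil;
import org.apache.storm.kafka.monitor.NewKafkaSpoutOffsetQuery;

public class OffsetLagExample {
    public static void main(String[] args) throws Exception {
        // Placeholder values; in the CLI they come from the OPTION_TOPIC,
        // OPTION_BOOTSTRAP_BROKERS and OPTION_GROUP_ID options parsed above.
        NewKafkaSpoutOffsetQuery query = new NewKafkaSpoutOffsetQuery(
                "myTopic",                        // topic(s), comma separated
                "hostname1:9092,hostname2:9092",  // bootstrap brokers
                "myConsumerGroup",                // consumer group id
                null);                            // security protocol, may be absent
        List<KafkaOffsetLagResult> lags = KafkaOffsetLagUtil.getOffsetLags(query);
        System.out.println(lags);
    }
}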

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/OldKafkaSpoutOffsetQuery.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/OldKafkaSpoutOffsetQuery.java b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/OldKafkaSpoutOffsetQuery.java
deleted file mode 100644
index ea80b64..0000000
--- a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/OldKafkaSpoutOffsetQuery.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.monitor;
-
-/**
- * Class representing information for querying kafka for log head offsets, spout offsets and the difference for old kafka spout
- */
-public class OldKafkaSpoutOffsetQuery {
-    private final String topic; //single topic or a wildcard topic
-    private final String zkServers; //comma separated list of zk servers and port e.g. hostname1:2181, hostname2:2181
-    private final String zkPath; //zk node prefix without topic/partition where committed offsets are stored
-    private final boolean isWildCardTopic; //if the topic is a wildcard
-    private final String brokersZkPath; //zk node prefix where kafka stores all broker information
-    private final String partitions; //comma separated list of partitions corresponding to leaders below (for StaticHosts)
-    private final String leaders;
-        //comma separated list of leader brokers and port corresponding to the partitions above (for StaticHosts) e.g.
-    // hostname1:9092,hostname2:9092
-
-    public OldKafkaSpoutOffsetQuery(String topic, String zkServers, String zkPath, boolean isWildCardTopic, String brokersZkPath) {
-        this(topic, zkServers, zkPath, isWildCardTopic, brokersZkPath, null, null);
-    }
-
-    public OldKafkaSpoutOffsetQuery(String topic, String zkServers, String zkPath, String partitions, String leaders) {
-        this(topic, zkServers, zkPath, false, null, partitions, leaders);
-
-    }
-
-    private OldKafkaSpoutOffsetQuery(String topic, String zkServers, String zkPath, boolean isWildCardTopic, String brokersZkPath,
-                                     String partitions, String
-                                         leaders) {
-        this.topic = topic;
-        this.zkServers = zkServers;
-        this.zkPath = zkPath;
-        this.isWildCardTopic = isWildCardTopic;
-        this.brokersZkPath = brokersZkPath;
-        this.partitions = partitions;
-        this.leaders = leaders;
-    }
-
-    @Override
-    public String toString() {
-        return "OldKafkaSpoutOffsetQuery{" +
-               "topic='" + topic + '\'' +
-               ", zkServers='" + zkServers + '\'' +
-               ", zkPath='" + zkPath + '\'' +
-               ", isWildCardTopic=" + isWildCardTopic +
-               ", brokersZkPath='" + brokersZkPath + '\'' +
-               ", partitions='" + partitions + '\'' +
-               ", leaders='" + leaders + '\'' +
-               '}';
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-
-        OldKafkaSpoutOffsetQuery that = (OldKafkaSpoutOffsetQuery) o;
-
-        if (isWildCardTopic != that.isWildCardTopic) return false;
-        if (topic != null ? !topic.equals(that.topic) : that.topic != null) return false;
-        if (zkServers != null ? !zkServers.equals(that.zkServers) : that.zkServers != null) return false;
-        if (zkPath != null ? !zkPath.equals(that.zkPath) : that.zkPath != null) return false;
-        if (brokersZkPath != null ? !brokersZkPath.equals(that.brokersZkPath) : that.brokersZkPath != null) return false;
-        if (partitions != null ? !partitions.equals(that.partitions) : that.partitions != null) return false;
-        return !(leaders != null ? !leaders.equals(that.leaders) : that.leaders != null);
-
-    }
-
-    @Override
-    public int hashCode() {
-        int result = topic != null ? topic.hashCode() : 0;
-        result = 31 * result + (zkServers != null ? zkServers.hashCode() : 0);
-        result = 31 * result + (zkPath != null ? zkPath.hashCode() : 0);
-        result = 31 * result + (isWildCardTopic ? 1 : 0);
-        result = 31 * result + (brokersZkPath != null ? brokersZkPath.hashCode() : 0);
-        result = 31 * result + (partitions != null ? partitions.hashCode() : 0);
-        result = 31 * result + (leaders != null ? leaders.hashCode() : 0);
-        return result;
-    }
-
-    public String getTopic() {
-
-        return topic;
-    }
-
-    public String getZkServers() {
-        return zkServers;
-    }
-
-    public String getZkPath() {
-        return zkPath;
-    }
-
-    public boolean isWildCardTopic() {
-        return isWildCardTopic;
-    }
-
-    public String getBrokersZkPath() {
-        return brokersZkPath;
-    }
-
-    public String getPartitions() {
-        return partitions;
-    }
-
-    public String getLeaders() {
-        return leaders;
-    }
-
-}
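
The Curator/SimpleConsumer code removed above existed only because the old spout kept its committed offsets in ZooKeeper and the tool had to locate partition leaders itself. With the new spout, both the committed offset and the log-end offset come straight from the Kafka consumer API, which is roughly what the surviving getOffsetLags(NewKafkaSpoutOffsetQuery) path relies on. The sketch below illustrates that idea with the plain consumer API; the class and method names are illustrative only (not code from this commit), and the caller is assumed to pass bootstrap.servers and group.id in the Properties.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class ConsumerLagSketch {
    /** Lag per partition = log-end offset minus the group's committed offset. */
    public static Map<TopicPartition, Long> offsetLags(Properties props, String topic) {
        try (KafkaConsumer<byte[], byte[]> consumer =
                 new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
            List<TopicPartition> partitions = new ArrayList<>();
            consumer.partitionsFor(topic)
                    .forEach(p -> partitions.add(new TopicPartition(topic, p.partition())));
            // Log-end offsets for every partition of the topic.
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
            Map<TopicPartition, Long> lags = new HashMap<>();
            for (TopicPartition tp : partitions) {
                OffsetAndMetadata committed = consumer.committed(tp);
                // Treat a group with no committed offset as being at the start of the log.
                long committedOffset = committed == null ? 0L : committed.offset();
                lags.put(tp, endOffsets.get(tp) - committedOffset);
            }
            return lags;
        }
    }
}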

