storm-commits mailing list archives

From srihar...@apache.org
Subject [2/6] storm git commit: STORM-1839: Storm spout implementation for Amazon Kinesis Streams.
Date Wed, 03 Aug 2016 04:58:19 GMT
STORM-1839: Storm spout implementation for Amazon Kinesis Streams.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/de68c267
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/de68c267
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/de68c267

Branch: refs/heads/master
Commit: de68c267fcb7555c7729c9377d3f6d1e504ec25e
Parents: 8115ef5
Author: Priyank <pshah@hortonworks.com>
Authored: Tue Jul 12 12:17:54 2016 -0700
Committer: Priyank <pshah@hortonworks.com>
Committed: Sat Jul 23 00:09:45 2016 -0700

----------------------------------------------------------------------
 external/storm-kinesis/README.md                | 139 +++++
 external/storm-kinesis/pom.xml                  |  69 +++
 .../org/apache/storm/kinesis/spout/Config.java  | 166 ++++++
 .../kinesis/spout/CredentialsProviderChain.java |  39 ++
 .../spout/ExponentialBackoffRetrier.java        | 164 ++++++
 .../spout/FailedMessageRetryHandler.java        |  48 ++
 .../kinesis/spout/KinesisConnectionInfo.java    | 137 +++++
 .../storm/kinesis/spout/KinesisMessageId.java   |  73 +++
 .../kinesis/spout/KinesisRecordsManager.java    | 566 +++++++++++++++++++
 .../storm/kinesis/spout/KinesisSpout.java       |  88 +++
 .../kinesis/spout/RecordToTupleMapper.java      |  38 ++
 .../org/apache/storm/kinesis/spout/ZkInfo.java  | 153 +++++
 .../kinesis/spout/test/KinesisBoltTest.java     |  32 ++
 .../spout/test/KinesisSpoutTopology.java        |  38 ++
 .../spout/test/TestRecordToTupleMapper.java     |  42 ++
 pom.xml                                         |   1 +
 storm-dist/binary/src/main/assembly/binary.xml  |  14 +
 17 files changed, 1807 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/de68c267/external/storm-kinesis/README.md
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/README.md b/external/storm-kinesis/README.md
new file mode 100644
index 0000000..8ff2816
--- /dev/null
+++ b/external/storm-kinesis/README.md
@@ -0,0 +1,139 @@
+# Storm Kinesis Spout
+Provides a core Storm spout for consuming data from a stream in Amazon Kinesis Streams. By default, it stores the sequence numbers that can be committed in ZooKeeper
+and, on restart, starts consuming records after the last committed sequence number. Below is a code sample that creates a topology using the spout; each
+object used to configure the spout is explained after it. Ideally, the number of spout tasks should equal the number of shards in the Kinesis stream. However, each task
+can read from more than one shard.
+
+```java
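+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.apache.storm.Config;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.AlreadyAliveException;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.InvalidTopologyException;
+// wildcard import; the spout's Config is referenced by its fully qualified name below to avoid clashing with org.apache.storm.Config
+import org.apache.storm.kinesis.spout.*;
+import org.apache.storm.kinesis.spout.test.KinesisBoltTest;
+import org.apache.storm.kinesis.spout.test.TestRecordToTupleMapper;
+import org.apache.storm.topology.TopologyBuilder;
+import java.util.Date;
+
+public class KinesisSpoutTopology {
+    public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {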
+        String topologyName = args[0];
+        RecordToTupleMapper recordToTupleMapper = new TestRecordToTupleMapper();
+        KinesisConnectionInfo kinesisConnectionInfo = new KinesisConnectionInfo(new CredentialsProviderChain(), new ClientConfiguration(), Regions.US_WEST_2,
+                1000);
+        org.apache.storm.kinesis.spout.Config config = new org.apache.storm.kinesis.spout.Config(args[1], ShardIteratorType.TRIM_HORIZON,
+                recordToTupleMapper, new Date(), new ExponentialBackoffRetrier(), new ZkInfo(), kinesisConnectionInfo, 10000L);
+        KinesisSpout kinesisSpout = new KinesisSpout(config);
+        TopologyBuilder topologyBuilder = new TopologyBuilder();
+        topologyBuilder.setSpout("spout", kinesisSpout, 3);
+        topologyBuilder.setBolt("bolt", new KinesisBoltTest(), 1).shuffleGrouping("spout");
+        Config topologyConfig = new Config();
+        topologyConfig.setDebug(true);
+        topologyConfig.setNumWorkers(3);
+        StormSubmitter.submitTopology(topologyName, topologyConfig, topologyBuilder.createTopology());
+    }
+}
+```
+As you can see above, the spout takes a `Config` object (`org.apache.storm.kinesis.spout.Config`, not Storm's topology `Config`) in its constructor. The `Config` constructor takes the 8 arguments explained below.
+
+#### `String` streamName
+Name of the Kinesis stream to consume data from.
+
+#### `ShardIteratorType` shardIteratorType
+Three types are supported: TRIM_HORIZON (beginning of the shard), LATEST and AT_TIMESTAMP; `Config` rejects AT_SEQUENCE_NUMBER and AFTER_SEQUENCE_NUMBER. By default this argument is ignored if state for
+the shards is found in ZooKeeper, so it only applies the first time a topology is started. If you want it to apply in subsequent runs of the topology, you
+will need to clear the state of the ZooKeeper node used for storing sequence numbers.
+#### `RecordToTupleMapper` recordToTupleMapper
+An implementation of the `RecordToTupleMapper` interface that the spout calls to convert a Kinesis record to a Storm tuple. It has two methods: `getOutputFields`
+tells the spout which fields are present in the tuples returned by `getTuple`. If `getTuple` returns null, the record is not emitted and is acked.
+```java
+    Fields getOutputFields ();
+    List<Object> getTuple (Record record);
+```
+
+#### `Date` timestamp
+Used in conjunction with the AT_TIMESTAMP shardIteratorType argument, and required when that type is chosen. It makes the spout fetch records from Kinesis starting at that time or later. The
+time used by Kinesis is the server-side timestamp that Kinesis associates with the record.
+
+#### `FailedMessageRetryHandler` failedMessageRetryHandler
+An implementation of the `FailedMessageRetryHandler` interface. This module provides an implementation, `ExponentialBackoffRetrier`, that supports an exponential backoff retry
+mechanism for failed messages. That implementation has two constructors. The default no-args constructor configures the first retry at 100 milliseconds and
+subsequent retries at Math.pow(2, i-1) seconds, where i is the retry number in the range 2 to Long.MAX_VALUE; 2 is the base of the exponential function in seconds.
+The other constructor takes the delay in milliseconds for the first retry as its first argument, the base of the exponential function in seconds as its second argument and the maximum number of
+retries as its third argument. The methods of this interface and how they work together with the spout are explained below.
+```java
+    boolean failed (KinesisMessageId messageId);
+    KinesisMessageId getNextFailedMessageToRetry ();
+    void failedMessageEmitted (KinesisMessageId messageId);
+    void acked (KinesisMessageId messageId);
+```
+`failed` is called for every tuple that fails in the spout. It should return true if the failed message is scheduled to be retried, and false otherwise.
+
+`getNextFailedMessageToRetry` is called first every time the spout wants to emit a tuple. It should return a message to be retried, if there is one,
+or null otherwise. Note that it may return null when no message is due for retry at that moment; however, it should eventually
+return every message for which `failed` returned true.
+
+`failedMessageEmitted` is called if the spout successfully fetches the record from Kinesis and emits it. If not, the implementation should return the same
+message when `getNextFailedMessageToRetry` is called again.
+
+`acked` is called once the failed message has been re-emitted and successfully acked. If the re-emitted message fails again, `failed` is called again.
+
+#### `ZkInfo` zkInfo
+An object encapsulating information for ZooKeeper interaction. It has two constructors. One takes zkUrl as its first argument, which
+is a comma-separated string of ZooKeeper host:port pairs; zkNode as the second, used as the root node for storing committed sequence numbers; session timeout
+in milliseconds as the third; connection timeout in milliseconds as the fourth; commit interval in milliseconds for committing sequence numbers to ZooKeeper as the fifth;
+the number of ZooKeeper client connection retry attempts as the sixth; and the retry interval in milliseconds to wait before retrying to connect as the seventh. The default no-args
+constructor uses the values ["localhost:2181", "/kinesisOffsets", 20000, 15000, 10000L, 3, 2000].
+
+#### `KinesisConnectionInfo` kinesisConnectionInfo
+An object that captures the arguments for connecting to Kinesis using the Kinesis client. Its constructor takes an implementation of `AWSCredentialsProvider`
+as the first argument. This module provides an implementation called `CredentialsProviderChain` that allows the spout to authenticate with Kinesis using one of
+these 5 mechanisms, tried in this order: `EnvironmentVariableCredentialsProvider`, `SystemPropertiesCredentialsProvider`, `ClasspathPropertiesFileCredentialsProvider`,
+`InstanceProfileCredentialsProvider` and `ProfileCredentialsProvider`. The constructor takes a `ClientConfiguration` object as the second argument for configuring the Kinesis
+client, a `Regions` value as the third argument that sets the region the client connects to, and recordsLimit as the fourth argument, which is the maximum number
+of records the Kinesis client will retrieve in each GetRecords request. Choose this limit carefully based on the record size, the Kinesis
+throughput limits and the per-tuple latency of the Storm topology. If one task reads from more than one shard, that also affects
+the choice of limit.
+
+#### `Long` maxUncommittedRecords
+The maximum number of uncommitted sequence numbers allowed per task. Once this number is reached, the spout will not fetch any new records from
+Kinesis. Uncommitted sequence numbers are all the messages for a task that have not yet been committed to ZooKeeper. This is different from the
+topology-level max pending setting. For example, suppose this value is set to 10 and the spout has emitted sequence numbers 1 to 10, with sequence number 1 pending
+and 2 to 10 acked. The number of uncommitted sequence numbers is 10, because no sequence number in the range 1 to 10 can be committed to ZooKeeper while 1 is pending.
+However, Storm can still call nextTuple on the spout because there is only 1 pending message, so without this limit the acked but uncommitted records could grow without bound.
+ 
+### Maven dependencies
+This module was tested with AWS SDK version 1.10.77.
+
+```xml
+    <dependencies>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk</artifactId>
+            <version>${aws-java-sdk.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-framework</artifactId>
+            <version>${curator.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.googlecode.json-simple</groupId>
+            <artifactId>json-simple</artifactId>
+        </dependency>
+    </dependencies>
+```
+
+## Future Work
+Handle merging and splitting of shards in Kinesis, add a Trident spout implementation, and add metrics.
+
+## Committer Sponsors
+
+ * Sriharsha Chintalapani ([sriharsha@apache.org](mailto:sriharsha@apache.org))
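A minimal, self-contained sketch of the `RecordToTupleMapper` contract described in the README above: declare the output fields, build one tuple per record, and return null for records that should be acked without being emitted. So that it runs without Storm or the AWS SDK on the classpath, it assumes two hypothetical stand-ins: `SimpleRecord` in place of the SDK's `Record` (with a `byte[]` payload instead of a `ByteBuffer`) and `SimpleRecordMapper`, which uses a `List<String>` in place of Storm's `Fields`. Skipping empty payloads is only an illustrative choice, not behaviour of the module.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for com.amazonaws.services.kinesis.model.Record (byte[] instead of ByteBuffer).
final class SimpleRecord {
    final String partitionKey;
    final String sequenceNumber;
    final byte[] data;

    SimpleRecord(String partitionKey, String sequenceNumber, byte[] data) {
        this.partitionKey = partitionKey;
        this.sequenceNumber = sequenceNumber;
        this.data = data;
    }
}

// Hypothetical interface with the same shape as RecordToTupleMapper; List<String> stands in for Storm's Fields.
interface SimpleRecordMapper {
    List<String> getOutputFields();
    List<Object> getTuple(SimpleRecord record);
}

// Decodes the payload as UTF-8 and emits (partitionKey, sequenceNumber, message).
class Utf8RecordMapper implements SimpleRecordMapper {
    @Override
    public List<String> getOutputFields() {
        return Arrays.asList("partitionKey", "sequenceNumber", "message");
    }

    @Override
    public List<Object> getTuple(SimpleRecord record) {
        // Returning null tells the spout to ack the record without emitting a tuple (per the README).
        if (record.data == null || record.data.length == 0) {
            return null;
        }
        String message = new String(record.data, StandardCharsets.UTF_8);
        // Values must line up one-to-one with getOutputFields().
        return Arrays.<Object>asList(record.partitionKey, record.sequenceNumber, message);
    }
}
```

The one-to-one alignment between `getOutputFields()` and the values returned by `getTuple` is what lets a downstream bolt read the tuple by field name.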

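The retry schedule of `ExponentialBackoffRetrier` described in the README above (first retry after `initialDelayMillis`, retry i >= 2 after baseSeconds^(i-1) seconds) can be checked with the small sketch below. `BackoffSchedule.delayMillis` is a hypothetical helper, not part of the module: it reports the delay in milliseconds, whereas the class's own `getRetryTime` adds the delay, in nanoseconds, to `System.nanoTime()` at the time of the failure. Like that method, it caps values that would overflow at `Long.MAX_VALUE` instead of letting them wrap around.

```java
// Hypothetical helper mirroring the delay rule of ExponentialBackoffRetrier, expressed in milliseconds.
class BackoffSchedule {
    static long delayMillis(long retryNum, long initialDelayMillis, long baseSeconds) {
        if (retryNum < 1) {
            throw new IllegalArgumentException("retryNum starts at 1");
        }
        // First retry: fixed initial delay
        if (retryNum == 1) {
            return initialDelayMillis;
        }
        // Retry i >= 2: baseSeconds^(i-1) seconds
        double millis = Math.pow(baseSeconds, retryNum - 1) * 1000.0;
        // Cap instead of overflowing when the delay no longer fits in a long
        return millis >= (double) Long.MAX_VALUE ? Long.MAX_VALUE : (long) millis;
    }

    public static void main(String[] args) {
        // Defaults of the no-args constructor: 100 ms initial delay, base 2 seconds
        for (long i = 1; i <= 5; i++) {
            System.out.println("retry " + i + " -> " + delayMillis(i, 100L, 2L) + " ms");
        }
    }
}
```

With the no-args defaults, retries 1 to 5 wait 100, 2000, 4000, 8000 and 16000 ms. Because the default maxRetries is Long.MAX_VALUE, a message that keeps failing is effectively retried indefinitely at ever longer intervals.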
http://git-wip-us.apache.org/repos/asf/storm/blob/de68c267/external/storm-kinesis/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/pom.xml b/external/storm-kinesis/pom.xml
new file mode 100644
index 0000000..f1872dc
--- /dev/null
+++ b/external/storm-kinesis/pom.xml
@@ -0,0 +1,69 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+     http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>2.0.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <packaging>jar</packaging>
+    <artifactId>storm-kinesis</artifactId>
+    <name>storm-kinesis</name>
+
+    <properties>
+        <aws-java-sdk.version>1.10.77</aws-java-sdk.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk</artifactId>
+            <version>${aws-java-sdk.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-framework</artifactId>
+            <version>${curator.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.googlecode.json-simple</groupId>
+            <artifactId>json-simple</artifactId>
+        </dependency>
+    </dependencies>
+
+</project>
+

http://git-wip-us.apache.org/repos/asf/storm/blob/de68c267/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/Config.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/Config.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/Config.java
new file mode 100644
index 0000000..5be8de9
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/Config.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Date;
+
+public class Config implements Serializable {
+    // kinesis stream name to read from
+    private final String streamName;
+    // shard iterator type based on the kinesis api - only TRIM_HORIZON (beginning of shard), LATEST and AT_TIMESTAMP are supported
+    private final ShardIteratorType shardIteratorType;
+    // implementation for converting a Kinesis record to a storm tuple
+    private final RecordToTupleMapper recordToTupleMapper;
+    // timestamp to be used for shardIteratorType AT_TIMESTAMP - can be null
+    private final Date timestamp;
+    // implementation for handling the failed messages retry logic
+    private final FailedMessageRetryHandler failedMessageRetryHandler;
+    // object capturing all zk related information for storing committed sequence numbers
+    private final ZkInfo zkInfo;
+    // object representing information on parameters to use while connecting to kinesis using kinesis client
+    private final KinesisConnectionInfo kinesisConnectionInfo;
+    // maximum number of messages that are still not committed to zk. once reached, it prevents the spout from emitting further records.
+    // e.g. if 1 failed and 2,3,4,5... have all been acked by storm, they still can't be committed to zk because 1 is still in the failed set. As a result
+    // the acked queue can grow without bound with none of them committed to zk. topology max pending does not help since from storm's view they are acked
+    private final Long maxUncommittedRecords;
+
+    public Config (String streamName, ShardIteratorType shardIteratorType, RecordToTupleMapper recordToTupleMapper, Date timestamp, FailedMessageRetryHandler
+            failedMessageRetryHandler, ZkInfo zkInfo, KinesisConnectionInfo kinesisConnectionInfo, Long maxUncommittedRecords) {
+        this.streamName = streamName;
+        this.shardIteratorType = shardIteratorType;
+        this.recordToTupleMapper = recordToTupleMapper;
+        this.timestamp = timestamp;
+        this.failedMessageRetryHandler = failedMessageRetryHandler;
+        this.zkInfo = zkInfo;
+        this.kinesisConnectionInfo = kinesisConnectionInfo;
+        this.maxUncommittedRecords = maxUncommittedRecords;
+        validate();
+    }
+
+    private void validate () {
+        if (streamName == null || streamName.length() < 1) {
+            throw new IllegalArgumentException("streamName is required and cannot be of length 0.");
+        }
+        if (shardIteratorType == null || shardIteratorType.equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) || shardIteratorType.equals(ShardIteratorType
+                .AT_SEQUENCE_NUMBER)) {
+            throw new IllegalArgumentException("shardIteratorType has to be one of the " + ShardIteratorType.AT_TIMESTAMP + "," + ShardIteratorType.LATEST +
+                    "," + ShardIteratorType.TRIM_HORIZON);
+        }
+        if (shardIteratorType.equals(ShardIteratorType.AT_TIMESTAMP) && timestamp == null) {
+            throw new IllegalArgumentException("timestamp must be provided if shardIteratorType is " + ShardIteratorType.AT_TIMESTAMP);
+        }
+        if (recordToTupleMapper == null) {
+            throw new IllegalArgumentException("recordToTupleMapper cannot be null");
+        }
+        if (failedMessageRetryHandler == null) {
+            throw new IllegalArgumentException("failedMessageRetryHandler cannot be null");
+        }
+        if (zkInfo == null) {
+            throw new IllegalArgumentException("zkInfo cannot be null");
+        }
+        if (kinesisConnectionInfo == null) {
+            throw new IllegalArgumentException("kinesisConnectionInfo cannot be null");
+        }
+        if (maxUncommittedRecords == null || maxUncommittedRecords < 1) {
+            throw new IllegalArgumentException("maxUncommittedRecords has to be a positive integer");
+        }
+    }
+
+    public String getStreamName() {
+        return streamName;
+    }
+
+    public ShardIteratorType getShardIteratorType() {
+        return shardIteratorType;
+    }
+
+    public RecordToTupleMapper getRecordToTupleMapper() {
+        return recordToTupleMapper;
+    }
+
+    public Date getTimestamp() {
+        return timestamp;
+    }
+
+    public FailedMessageRetryHandler getFailedMessageRetryHandler () {
+        return failedMessageRetryHandler;
+    }
+
+    public ZkInfo getZkInfo () {
+        return zkInfo;
+    }
+
+    public KinesisConnectionInfo getKinesisConnectionInfo () {
+        return kinesisConnectionInfo;
+    }
+
+    public Long getMaxUncommittedRecords () {
+        return maxUncommittedRecords;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        Config config = (Config) o;
+
+        if (streamName != null ? !streamName.equals(config.streamName) : config.streamName != null) return false;
+        if (shardIteratorType != config.shardIteratorType) return false;
+        if (recordToTupleMapper != null ? !recordToTupleMapper.equals(config.recordToTupleMapper) : config.recordToTupleMapper != null) return false;
+        if (timestamp != null ? !timestamp.equals(config.timestamp) : config.timestamp != null) return false;
+        if (zkInfo != null ? !zkInfo.equals(config.zkInfo) : config.zkInfo != null) return false;
+        if (kinesisConnectionInfo != null ? !kinesisConnectionInfo.equals(config.kinesisConnectionInfo) : config.kinesisConnectionInfo != null) return false;
+        if (maxUncommittedRecords != null ? !maxUncommittedRecords.equals(config.maxUncommittedRecords) : config.maxUncommittedRecords != null) return false;
+        return !(failedMessageRetryHandler != null ? !failedMessageRetryHandler.equals(config.failedMessageRetryHandler) : config.failedMessageRetryHandler
+                != null);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = streamName != null ? streamName.hashCode() : 0;
+        result = 31 * result + (shardIteratorType != null ? shardIteratorType.hashCode() : 0);
+        result = 31 * result + (recordToTupleMapper != null ? recordToTupleMapper.hashCode() : 0);
+        result = 31 * result + (timestamp != null ? timestamp.hashCode() : 0);
+        result = 31 * result + (zkInfo != null ? zkInfo.hashCode() : 0);
+        result = 31 * result + (kinesisConnectionInfo != null ? kinesisConnectionInfo.hashCode() : 0);
+        result = 31 * result + (failedMessageRetryHandler != null ? failedMessageRetryHandler.hashCode() : 0);
+        result = 31 * result + (maxUncommittedRecords != null ? maxUncommittedRecords.hashCode() : 0);
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "Config{" +
+                "streamName='" + streamName + '\'' +
+                ", shardIteratorType=" + shardIteratorType +
+                ", recordToTupleMapper=" + recordToTupleMapper +
+                ", timestamp=" + timestamp +
+                ", zkInfo=" + zkInfo +
+                ", kinesisConnectionInfo=" + kinesisConnectionInfo +
+                ", failedMessageRetryHandler =" + failedMessageRetryHandler +
+                ", maxUncommittedRecords=" + maxUncommittedRecords +
+                '}';
+    }
+}
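The `maxUncommittedRecords` comment in `Config` above, and the README example with sequence numbers 1 to 10, can be made concrete with a small model of one task's commit bookkeeping. `UncommittedTracker` is hypothetical and is not how `KinesisRecordsManager` is implemented: it tracks a single shard and simplifies Kinesis sequence numbers, which are numeric strings, to `long`s. It commits the longest run of the smallest emitted sequence numbers that are all acked, so one pending message holds back everything emitted after it.

```java
import java.util.TreeSet;

// Hypothetical single-shard model of commit bookkeeping; sequence numbers simplified to longs.
class UncommittedTracker {
    // Emitted but not yet committed, in sequence-number order
    private final TreeSet<Long> uncommitted = new TreeSet<>();
    // Acked by Storm but not yet committed (waiting on an earlier pending or failed message)
    private final TreeSet<Long> ackedNotCommitted = new TreeSet<>();
    private Long lastCommitted = null;

    void emitted(long seq) {
        uncommitted.add(seq);
    }

    void acked(long seq) {
        if (uncommitted.contains(seq)) {
            ackedNotCommitted.add(seq);
        }
    }

    // Advances the commit point over the acked prefix; returns the last committed sequence number, or null if none yet.
    Long commit() {
        while (!uncommitted.isEmpty() && ackedNotCommitted.contains(uncommitted.first())) {
            long seq = uncommitted.pollFirst();
            ackedNotCommitted.remove(seq);
            lastCommitted = seq;
        }
        return lastCommitted;
    }

    int uncommittedCount() {
        return uncommitted.size();
    }

    // The spout stops fetching new records from Kinesis once the limit is reached.
    boolean canFetchMore(long maxUncommittedRecords) {
        return uncommittedCount() < maxUncommittedRecords;
    }
}
```

In the README scenario (1 to 10 emitted, 2 to 10 acked, limit 10), `commit()` cannot advance, the uncommitted count stays at 10 and fetching stops, even though Storm sees only one pending tuple. Once 1 is acked, a single commit moves the commit point to 10 and fetching resumes.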

http://git-wip-us.apache.org/repos/asf/storm/blob/de68c267/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/CredentialsProviderChain.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/CredentialsProviderChain.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/CredentialsProviderChain.java
new file mode 100644
index 0000000..4287ae0
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/CredentialsProviderChain.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import com.amazonaws.auth.AWSCredentialsProviderChain;
+import com.amazonaws.auth.ClasspathPropertiesFileCredentialsProvider;
+import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
+import com.amazonaws.auth.InstanceProfileCredentialsProvider;
+import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
+import com.amazonaws.auth.profile.ProfileCredentialsProvider;
+
+/**
+ * Chain of credential providers, tried in order, that is used to authenticate with kinesis
+ */
+public class CredentialsProviderChain extends AWSCredentialsProviderChain {
+    public CredentialsProviderChain () {
+        super(new EnvironmentVariableCredentialsProvider(),
+                new SystemPropertiesCredentialsProvider(),
+                new ClasspathPropertiesFileCredentialsProvider(),
+                new InstanceProfileCredentialsProvider(),
+                new ProfileCredentialsProvider());
+    }
+}
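`CredentialsProviderChain` relies on `AWSCredentialsProviderChain` trying its providers in the order they are passed to the constructor and using the first one that returns credentials. The sketch below models only that ordering rule with plain Java suppliers. `FirstMatchChain` and the provider labels are hypothetical; the real SDK class returns an `AWSCredentials` object rather than a provider name and has its own failure behaviour, which this sketch does not reproduce.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical model of "first provider that yields credentials wins"; not the AWS SDK class.
class FirstMatchChain {
    // LinkedHashMap keeps providers in insertion order, which is the order they are tried in
    private final Map<String, Supplier<Optional<String>>> providers = new LinkedHashMap<>();

    FirstMatchChain add(String name, Supplier<Optional<String>> provider) {
        providers.put(name, provider);
        return this;
    }

    // Returns the name of the first provider that supplies a credential
    String resolve() {
        for (Map.Entry<String, Supplier<Optional<String>>> entry : providers.entrySet()) {
            if (entry.getValue().get().isPresent()) {
                return entry.getKey();
            }
        }
        throw new IllegalStateException("No provider in the chain supplied credentials");
    }
}
```

For example, if no environment variables are set but Java system properties are, the system-properties provider wins and the later providers (classpath file, instance profile, profile file) are never consulted.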

http://git-wip-us.apache.org/repos/asf/storm/blob/de68c267/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java
new file mode 100644
index 0000000..f357f30
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+public class ExponentialBackoffRetrier implements FailedMessageRetryHandler, Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(ExponentialBackoffRetrier.class);
+    // Wait interval for retrying after first failure
+    private final Long initialDelayMillis;
+    // Base for exponential function in seconds for retrying for second, third and so on failures
+    private final Long baseSeconds;
+    // Maximum number of retries
+    private final Long maxRetries;
+    // map to track number of failures for each kinesis message that failed
+    private Map<KinesisMessageId, Long> failCounts = new HashMap<>();
+    // map to track next retry time for each kinesis message that failed
+    private Map<KinesisMessageId, Long> retryTimes = new HashMap<>();
+    // sorted set of records to be retried based on retry time. earliest retryTime record comes first
+    private SortedSet<KinesisMessageId> retryMessageSet = new TreeSet<>(new RetryTimeComparator());
+
+    /**
+     * no args constructor that uses defaults of 100 ms for first retry, max retries of Long.MAX_VALUE and an exponential backoff of Math.pow(2, i-1) secs for
+     * retry i where i = 2, 3, ...
+     */
+    public ExponentialBackoffRetrier () {
+        this(100L, 2L, Long.MAX_VALUE);
+    }
+
+    /**
+     *
+     * @param initialDelayMillis delay in milliseconds for first retry
+     * @param baseSeconds base for exponent function in seconds
+     * @param maxRetries maximum number of retries before the record is discarded/acked
+     */
+    public ExponentialBackoffRetrier (Long initialDelayMillis, Long baseSeconds, Long maxRetries) {
+        this.initialDelayMillis = initialDelayMillis;
+        this.baseSeconds = baseSeconds;
+        this.maxRetries = maxRetries;
+        validate();
+    }
+
+    private void validate () {
+        if (initialDelayMillis < 0) {
+            throw new IllegalArgumentException("initialDelayMillis cannot be negative." );
+        }
+        if (baseSeconds < 0) {
+            throw new IllegalArgumentException("baseSeconds cannot be negative.");
+        }
+        if (maxRetries < 0) {
+            throw new IllegalArgumentException("maxRetries cannot be negative.");
+        }
+    }
+    @Override
+    public boolean failed(KinesisMessageId messageId) {
+        LOG.debug("Handling failed message " + messageId);
+        // if maxRetries is 0, don't retry and return false as per interface contract
+        if (maxRetries == 0) {
+            LOG.debug("maxRetries set to 0. Hence not queueing " + messageId);
+            return false;
+        }
+        // if first failure add it to the count map
+        if (!failCounts.containsKey(messageId)) {
+            failCounts.put(messageId, 0L);
+        }
+        // increment the fail count as we started with 0
+        Long failCount = failCounts.get(messageId);
+        failCounts.put(messageId, ++failCount);
+        // if fail count is greater than maxRetries, discard or ack. for e.g. for maxRetries 3, 4 failures are allowed at maximum
+        if (failCount > maxRetries) {
+            LOG.debug("maxRetries reached so dropping " + messageId);
+            failCounts.remove(messageId);
+            return false;
+        }
+        // if reached so far, add it to the set of messages waiting to be retried with next retry time based on how many times it failed
+        retryTimes.put(messageId, getRetryTime(failCount));
+        retryMessageSet.add(messageId);
+        LOG.debug("Scheduled " + messageId + " for retry at " + retryTimes.get(messageId) + " and retry attempt " + failCount);
+        return true;
+    }
+
+    @Override
+    public void acked(KinesisMessageId messageId) {
+        // message was acked after being retried. so clear the state for that message
+        LOG.debug("Ack received for " + messageId + ". Hence cleaning state.");
+        failCounts.remove(messageId);
+    }
+
+    @Override
+    public KinesisMessageId getNextFailedMessageToRetry() {
+        KinesisMessageId result = null;
+        // return the message with the earliest retry time, but only if that retry time has already passed (<= current time)
+        if (!retryMessageSet.isEmpty()) {
+            result = retryMessageSet.first();
+            if (!(retryTimes.get(result) <= System.nanoTime())) {
+                result = null;
+            }
+        }
+        LOG.debug("Returning " + result + " to spout for retrying.");
+        return result;
+    }
+
+    @Override
+    public void failedMessageEmitted(KinesisMessageId messageId) {
+        // the spout re-emitted the message we returned for retrying, so remove it from the retry set and wait for its ack or fail.
+        // keep it in the counts map so that a further failure is counted and an ack clears it
+        LOG.debug("Spout says " + messageId + " emitted. Hence removing it from queue and waiting for its ack or fail");
+        retryMessageSet.remove(messageId);
+        retryTimes.remove(messageId);
+    }
+
+    // private helper method to get the retry time for retry attempt retryNum (caps it at Long.MAX_VALUE on long overflow)
+    private Long getRetryTime (Long retryNum) {
+        Long retryTime = System.nanoTime();
+        Long nanoMultiplierForMillis = 1000000L;
+        // if first retry then retry time = current time + initial delay
+        if (retryNum == 1) {
+            retryTime += initialDelayMillis * nanoMultiplierForMillis;
+        } else {
+            // else use the exponential backoff logic and handle long overflow
+            Long maxValue = Long.MAX_VALUE;
+            double time = Math.pow(baseSeconds, retryNum - 1) * 1000 * nanoMultiplierForMillis;
+            // cap at Long.MAX_VALUE if the delay itself, or delay + current time, exceeds the long range.
+            // the second predicate relies on long addition wrapping around to a smaller value on overflow
+            if ((time >= maxValue.doubleValue()) || ((retryTime + (long) time) < retryTime)) {
+                retryTime = maxValue;
+            } else {
+                retryTime += (long) time;
+            }
+        }
+        return retryTime;
+    }
+
+    private class RetryTimeComparator implements Serializable, Comparator<KinesisMessageId> {
+        @Override
+        public int compare(KinesisMessageId o1, KinesisMessageId o2) {
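+            int byRetryTime = retryTimes.get(o1).compareTo(retryTimes.get(o2));
+            if (byRetryTime != 0) {
+                return byRetryTime;
+            }
+            // tie-break on the message id so that distinct messages with the same retry time (for example several capped
+            // at Long.MAX_VALUE) are not treated as duplicates and silently dropped by the TreeSet
+            return o1.toString().compareTo(o2.toString());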
+        }
+    }
+}

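ExponentialBackoffRetrier.getRetryTime waits initialDelayMillis before the first retry and baseSeconds^(n-1) seconds before retry n, capping the result at Long.MAX_VALUE on overflow. Below is a standalone sketch of that schedule, not the class itself: the class name BackoffSketch and the explicit `now` parameter (so it can be tested without System.nanoTime()) are our assumptions, and it swaps the wraparound check for Math.addExact (Java 8+), also guarding the first-retry branch.

```java
import java.util.concurrent.TimeUnit;

public class BackoffSketch {

    /** Adds a non-negative delay to a timestamp, saturating at Long.MAX_VALUE instead of wrapping. */
    static long saturatingAdd(long now, long delayNanos) {
        try {
            return Math.addExact(now, delayNanos);
        } catch (ArithmeticException overflow) {
            return Long.MAX_VALUE;
        }
    }

    /**
     * Absolute retry time (same units as System.nanoTime()) for retry attempt retryNum >= 1:
     * attempt 1 waits initialDelayMillis, attempt n > 1 waits baseSeconds^(n - 1) seconds.
     */
    static long retryTime(long now, long retryNum, long initialDelayMillis, long baseSeconds) {
        if (retryNum == 1) {
            // TimeUnit.toNanos already saturates at Long.MAX_VALUE on overflow
            return saturatingAdd(now, TimeUnit.MILLISECONDS.toNanos(initialDelayMillis));
        }
        double delayNanos = Math.pow(baseSeconds, retryNum - 1) * TimeUnit.SECONDS.toNanos(1);
        // a (long) cast would silently clamp values at or above 2^63, so handle them explicitly
        if (delayNanos >= (double) Long.MAX_VALUE) {
            return Long.MAX_VALUE;
        }
        return saturatingAdd(now, (long) delayNanos);
    }

    public static void main(String[] args) {
        for (long attempt = 1; attempt <= 4; attempt++) {
            System.out.println("attempt " + attempt + " -> " + retryTime(0L, attempt, 100L, 2L) + " ns");
        }
        System.out.println("attempt 100 -> " + retryTime(0L, 100L, 100L, 2L) + " ns");
    }
}
```

With initialDelayMillis = 100 and baseSeconds = 2 this yields delays of 0.1 s, 2 s, 4 s, 8 s, ... after `now`, and very large attempt numbers saturate instead of wrapping to a time in the past.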
http://git-wip-us.apache.org/repos/asf/storm/blob/de68c267/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/FailedMessageRetryHandler.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/FailedMessageRetryHandler.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/FailedMessageRetryHandler.java
new file mode 100644
index 0000000..bb0e450
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/FailedMessageRetryHandler.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import java.io.Serializable;
+
+public interface FailedMessageRetryHandler extends Serializable {
+    /**
+     * Called when the message with messageId failed in the spout.
+     * @param messageId id of the failed message
+     * @return true if this failed message was scheduled to be retried, false otherwise
+     */
+    boolean failed (KinesisMessageId messageId);
+
+    /**
+     * Called when the message with messageId was acked in the spout.
+     * @param messageId id of the acked message
+     */
+    void acked (KinesisMessageId messageId);
+
+    /**
+     * Get the id of the next failed message to retry, if any.
+     * @return messageId of the next message to retry, or null if no message is ready to be retried
+     */
+    KinesisMessageId getNextFailedMessageToRetry ();
+
+    /**
+     * Called when the message with messageId, returned by the last call to getNextFailedMessageToRetry, was emitted (retried) by the spout.
+     * @param messageId id of the re-emitted message
+     */
+    void failedMessageEmitted (KinesisMessageId messageId);
+}

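The FailedMessageRetryHandler contract is a four-step lifecycle: failed() either schedules a retry or tells the spout to give up, getNextFailedMessageToRetry() hands back a ready message or null, failedMessageEmitted() dequeues it while keeping its fail count, and acked() clears its state. A minimal sketch of a hypothetical handler following that sequence, assuming String ids in place of KinesisMessageId and retries with no delay (unlike ExponentialBackoffRetrier):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical handler following the FailedMessageRetryHandler call sequence, with String ids
 * standing in for KinesisMessageId and no backoff delay between retries.
 */
public class ImmediateRetryHandlerSketch {
    private final int maxRetries;
    private final Map<String, Integer> failCounts = new HashMap<>();
    private final Deque<String> retryQueue = new ArrayDeque<>();

    public ImmediateRetryHandlerSketch(int maxRetries) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries cannot be negative.");
        }
        this.maxRetries = maxRetries;
    }

    /** Returns true if the message was queued for retry, false if the spout should give up on it. */
    public boolean failed(String messageId) {
        int failCount = failCounts.merge(messageId, 1, Integer::sum);
        if (failCount > maxRetries) {
            failCounts.remove(messageId);
            return false;
        }
        retryQueue.addLast(messageId);
        return true;
    }

    /** Clears the fail count for the message. */
    public void acked(String messageId) {
        failCounts.remove(messageId);
    }

    /** Next message to re-emit, or null if nothing is waiting. */
    public String getNextFailedMessageToRetry() {
        return retryQueue.peekFirst();
    }

    /** The spout re-emitted the message: dequeue it but keep its fail count. */
    public void failedMessageEmitted(String messageId) {
        retryQueue.remove(messageId);
    }

    public static void main(String[] args) {
        ImmediateRetryHandlerSketch handler = new ImmediateRetryHandlerSketch(2);
        for (int attempt = 1; attempt <= 3; attempt++) {
            boolean scheduled = handler.failed("seq-1");
            System.out.println("failure " + attempt + " scheduled=" + scheduled
                    + " next=" + handler.getNextFailedMessageToRetry());
            if (scheduled) {
                handler.failedMessageEmitted("seq-1");
            }
        }
    }
}
```

As in ExponentialBackoffRetrier, maxRetries = 2 allows two retries, so the third failure returns false and the spout would ack the message instead of retrying it.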
http://git-wip-us.apache.org/repos/asf/storm/blob/de68c267/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnectionInfo.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnectionInfo.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnectionInfo.java
new file mode 100644
index 0000000..5d9454a
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnectionInfo.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.regions.Regions;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.objenesis.strategy.StdInstantiatorStrategy;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Serializable;
+import java.util.Arrays;
+
+public class KinesisConnectionInfo implements Serializable {
+    private final byte[] serializedKinesisCredsProvider;
+    private final byte[] serializedkinesisClientConfig;
+    private final Integer recordsLimit;
+    private final Regions region;
+
+    private transient AWSCredentialsProvider credentialsProvider;
+    private transient ClientConfiguration clientConfiguration;
+
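+    /**
+     * Connection details for the Kinesis client. The credentials provider and client configuration are stored as
+     * Kryo-serialized bytes and deserialized lazily on first use.
+     * @param credentialsProvider implementation to provide credentials to connect to Kinesis
+     * @param clientConfiguration client configuration to pass to the Kinesis client
+     * @param region region to connect to
+     * @param recordsLimit max records to be fetched in a getRecords request to Kinesis
+     */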
+    public KinesisConnectionInfo (AWSCredentialsProvider credentialsProvider, ClientConfiguration clientConfiguration, Regions region, Integer recordsLimit) {
+        if (recordsLimit == null || recordsLimit <= 0) {
+            throw new IllegalArgumentException("recordsLimit has to be a positive integer");
+        }
+        if (region == null) {
+            throw new IllegalArgumentException("region cannot be null");
+        }
+        serializedKinesisCredsProvider = getKryoSerializedBytes(credentialsProvider);
+        serializedkinesisClientConfig = getKryoSerializedBytes(clientConfiguration);
+        this.recordsLimit = recordsLimit;
+        this.region = region;
+
+        this.credentialsProvider = null;
+        this.clientConfiguration = null;
+    }
+
+    public Integer getRecordsLimit() {
+        return recordsLimit;
+    }
+
+    public AWSCredentialsProvider getCredentialsProvider() {
+        if (credentialsProvider == null) {
+            credentialsProvider = (AWSCredentialsProvider) this.getKryoDeserializedObject(serializedKinesisCredsProvider);
+        }
+        return credentialsProvider;
+    }
+
+    public ClientConfiguration getClientConfiguration() {
+        if (clientConfiguration == null) {
+            clientConfiguration = (ClientConfiguration) this.getKryoDeserializedObject(serializedkinesisClientConfig);
+        }
+        return clientConfiguration;
+    }
+
+    public Regions getRegion() {
+        return region;
+    }
+
+    private byte[] getKryoSerializedBytes (final Object obj) {
+        final Kryo kryo = new Kryo();
+        final ByteArrayOutputStream os = new ByteArrayOutputStream();
+        final Output output = new Output(os);
+        kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
+        kryo.writeClassAndObject(output, obj);
+        output.flush();
+        return os.toByteArray();
+    }
+
+    private Object getKryoDeserializedObject (final byte[] ser) {
+        final Kryo kryo = new Kryo();
+        final Input input = new Input(new ByteArrayInputStream(ser));
+        kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
+        return kryo.readClassAndObject(input);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        KinesisConnectionInfo that = (KinesisConnectionInfo) o;
+
+        if (!Arrays.equals(serializedKinesisCredsProvider, that.serializedKinesisCredsProvider)) return false;
+        if (!Arrays.equals(serializedkinesisClientConfig, that.serializedkinesisClientConfig)) return false;
+        if (region != null ? !region.equals(that.region) : that.region != null) return false;
+        return !(recordsLimit != null ? !recordsLimit.equals(that.recordsLimit) : that.recordsLimit != null);
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = serializedKinesisCredsProvider != null ? Arrays.hashCode(serializedKinesisCredsProvider) : 0;
+        result = 31 * result + (serializedkinesisClientConfig != null ? Arrays.hashCode(serializedkinesisClientConfig) : 0);
+        result = 31 * result + (region != null ? region.hashCode() : 0);
+        result = 31 * result + (recordsLimit != null ? recordsLimit.hashCode() : 0);
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "KinesisConnectionInfo{" +
+                "serializedKinesisCredsProvider=" + Arrays.toString(serializedKinesisCredsProvider) +
+                ", serializedkinesisClientConfig=" + Arrays.toString(serializedkinesisClientConfig) +
+                ", region=" + region +
+                ", recordsLimit=" + recordsLimit +
+                '}';
+    }
+}

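KinesisConnectionInfo keeps the credentials provider and client configuration as Kryo-serialized bytes plus transient, lazily rebuilt fields, so the object itself stays Serializable and the real objects are reconstructed on first use after it has been shipped. A sketch of the same pattern, with standard java.io serialization swapped in for Kryo (so, unlike the original, it only accepts Serializable payloads; the class name is hypothetical):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Hypothetical holder: stores a payload as bytes and rebuilds it lazily into a transient field. */
public class LazyPayloadSketch implements Serializable {
    private final byte[] serializedPayload;
    // transient: not shipped with the holder, rebuilt on first access after deserialization
    private transient Object payload;

    public LazyPayloadSketch(Serializable payload) {
        this.serializedPayload = toBytes(payload);
    }

    public Object getPayload() {
        if (payload == null) {
            payload = fromBytes(serializedPayload);
        }
        return payload;
    }

    static byte[] toBytes(Object obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static Object fromBytes(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        LazyPayloadSketch original = new LazyPayloadSketch("client-config");
        // simulate shipping the holder to a worker: the transient field arrives as null
        LazyPayloadSketch shipped = (LazyPayloadSketch) fromBytes(toBytes(original));
        System.out.println(shipped.getPayload());
    }
}
```

Keeping the bytes, rather than the rebuilt objects, as the non-transient state is also what lets equals() and hashCode() compare the byte arrays with Arrays.equals and Arrays.hashCode.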
http://git-wip-us.apache.org/repos/asf/storm/blob/de68c267/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisMessageId.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisMessageId.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisMessageId.java
new file mode 100644
index 0000000..dd239f1
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisMessageId.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+public class KinesisMessageId {
+    private final String streamName;
+    private final String shardId;
+    private final String sequenceNumber;
+
+    KinesisMessageId (String streamName, String shardId, String sequenceNumber) {
+        this.streamName = streamName;
+        this.shardId = shardId;
+        this.sequenceNumber = sequenceNumber;
+    }
+
+    public String getStreamName () {
+        return streamName;
+    }
+
+    public String getShardId () {
+        return shardId;
+    }
+
+    public String getSequenceNumber () {
+        return sequenceNumber;
+    }
+
+    @Override
+    public String toString () {
+        return "KinesisMessageId{" +
+                "streamName='" + streamName + '\'' +
+                ", shardId='" + shardId + '\'' +
+                ", sequenceNumber='" + sequenceNumber + '\'' +
+                '}';
+    }
+
+    @Override
+    public boolean equals (Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        KinesisMessageId that = (KinesisMessageId) o;
+
+        if (streamName != null ? !streamName.equals(that.streamName) : that.streamName != null) return false;
+        if (shardId != null ? !shardId.equals(that.shardId) : that.shardId != null) return false;
+        return !(sequenceNumber != null ? !sequenceNumber.equals(that.sequenceNumber) : that.sequenceNumber != null);
+
+    }
+
+    @Override
+    public int hashCode () {
+        int result = streamName != null ? streamName.hashCode() : 0;
+        result = 31 * result + (shardId != null ? shardId.hashCode() : 0);
+        result = 31 * result + (sequenceNumber != null ? sequenceNumber.hashCode() : 0);
+        return result;
+    }
+}

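KinesisRecordsManager.commit(), in the file that follows, commits per shard the highest acked sequence number that is lower than every sequence number still emitted or failed. A standalone sketch of that rule on TreeSet&lt;BigInteger&gt;, assuming the per-shard sets are passed in directly (the class and method names are hypothetical):

```java
import java.math.BigInteger;
import java.util.Iterator;
import java.util.TreeSet;

public class CommitBoundSketch {

    /**
     * Returns the highest acked sequence number lower than every emitted or failed one, or null if
     * none qualifies. Acked numbers up to and including the returned one are removed from acked.
     */
    static BigInteger sequenceNumberToCommit(TreeSet<BigInteger> acked,
                                             TreeSet<BigInteger> emitted,
                                             TreeSet<BigInteger> failed) {
        // the smallest pending (emitted or failed) sequence number bounds what can be committed
        BigInteger bound = failed.isEmpty() ? null : failed.first();
        if (!emitted.isEmpty() && (bound == null || emitted.first().compareTo(bound) < 0)) {
            bound = emitted.first();
        }
        BigInteger toCommit = null;
        Iterator<BigInteger> it = acked.iterator();
        while (it.hasNext()) {
            BigInteger seq = it.next();
            if (bound != null && seq.compareTo(bound) >= 0) {
                break; // this and every later acked number sits at or after a pending record
            }
            toCommit = seq;
            it.remove();
        }
        return toCommit;
    }

    static TreeSet<BigInteger> seqs(long... values) {
        TreeSet<BigInteger> set = new TreeSet<>();
        for (long v : values) {
            set.add(BigInteger.valueOf(v));
        }
        return set;
    }

    public static void main(String[] args) {
        TreeSet<BigInteger> acked = seqs(1, 3, 5);
        // 2 is still pending, so only 1 can be committed
        System.out.println(sequenceNumberToCommit(acked, seqs(2, 4, 6), seqs()));
        // once 2 is acked (4 and 6 still emitted), 2 and 3 become committable and 3 is returned
        acked.add(BigInteger.valueOf(2));
        System.out.println(sequenceNumberToCommit(acked, seqs(4, 6), seqs()));
    }
}
```

For acked {1, 3, 5} and emitted {2, 4, 6} it returns 1, matching the example in the commit() comment; committing the highest safe number is enough because a restart resumes after it.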
http://git-wip-us.apache.org/repos/asf/storm/blob/de68c267/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java
new file mode 100644
index 0000000..df2f97d
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java
@@ -0,0 +1,566 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import com.amazonaws.regions.Region;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.GetRecordsRequest;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
+import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
+import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.Shard;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.zookeeper.CreateMode;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+class KinesisRecordsManager {
+    private static final Logger LOG = LoggerFactory.getLogger(KinesisRecordsManager.class);
+    // zk interaction object
+    private transient CuratorFramework curatorFramework;
+    // Kinesis Spout Config object
+    private transient final Config config;
+    // Queue of records per shard that were fetched from Kinesis and are waiting to be emitted
+    private transient Map<String, LinkedList<Record>> toEmitPerShard = new HashMap<>();
+    // Map of records that were fetched from Kinesis as a result of failure and are waiting to be emitted
+    private transient Map<KinesisMessageId, Record> failedandFetchedRecords = new HashMap<>();
+    // Sequence numbers per shard that have been emitted. TreeSet as we need to remove on ack or fail, while keeping them sorted to figure out the
+    // sequence number to commit. Logic explained in commit()
+    private transient Map<String, TreeSet<BigInteger>> emittedPerShard = new HashMap<>();
+    // sorted acked sequence numbers - needed to figure out what sequence number can be committed
+    private transient Map<String, TreeSet<BigInteger>> ackedPerShard = new HashMap<>();
+    // sorted failed sequence numbers - needed to figure out what sequence number can be committed
+    private transient Map<String, TreeSet<BigInteger>> failedPerShard = new HashMap<>();
+    // shard iterator corresponding to position in shard for new messages
+    private transient Map<String, String> shardIteratorPerShard = new HashMap<>();
+    // last fetched sequence number corresponding to position in shard
+    private transient Map<String, String> fetchedSequenceNumberPerShard = new HashMap<>();
+    // shard iterator corresponding to position in shard for failed messages
+    private transient Map<KinesisMessageId, String> shardIteratorPerFailedMessage = new HashMap<>();
+    // timestamp to decide when to commit to zk again
+    private transient long lastCommitTime;
+    // boolean to track deactivated state
+    private transient boolean deactivated;
+    private transient AmazonKinesisClient kinesisClient;
+
+    KinesisRecordsManager (Config config) {
+        this.config = config;
+    }
+
+    void initialize (int myTaskIndex, int totalTasks) {
+        deactivated = false;
+        lastCommitTime = System.currentTimeMillis();
+        initializeKinesisClient();
+        initializeCurator();
+        List<Shard> shards = this.getShards();
+        LOG.info("myTaskIndex is " + myTaskIndex);
+        LOG.info("totalTasks is " + totalTasks);
+        int i = myTaskIndex;
+        while (i < shards.size()) {
+            LOG.info("Shard id " + shards.get(i).getShardId() + " assigned to task " + myTaskIndex);
+            toEmitPerShard.put(shards.get(i).getShardId(), new LinkedList<Record>());
+            i += totalTasks;
+        }
+        initializeFetchedSequenceNumbers();
+        refreshShardIteratorsForNewRecords();
+    }
+
+    void next (SpoutOutputCollector collector) {
+        if (shouldCommit()) {
+            commit();
+        }
+        KinesisMessageId failedMessageId = config.getFailedMessageRetryHandler().getNextFailedMessageToRetry();
+        if (failedMessageId  != null) {
+            // if the retry handler returns a message that is not in the failed set then ignore it. This should never happen
+            BigInteger failedSequenceNumber = new BigInteger(failedMessageId.getSequenceNumber());
+            if (failedPerShard.containsKey(failedMessageId.getShardId()) && failedPerShard.get(failedMessageId.getShardId()).contains(failedSequenceNumber)) {
+                if (!failedandFetchedRecords.containsKey(failedMessageId)) {
+                    fetchFailedRecords(failedMessageId);
+                }
+                if (emitFailedRecord(collector, failedMessageId)) {
+                    failedPerShard.get(failedMessageId.getShardId()).remove(failedSequenceNumber);
+                    config.getFailedMessageRetryHandler().failedMessageEmitted(failedMessageId);
+                    return;
+                } else {
+                    LOG.debug("failedMessageEmitted not called on retrier for " + failedMessageId + ". This can happen a few times but should not happen " +
+                            "infinitely");
+                }
+            } else {
+                LOG.debug("failedPerShard does not contain " + failedMessageId + ". This should never happen.");
+            }
+        }
+        LOG.debug("No failed record to emit for now. Hence will try to emit new records");
+        // if the maximum uncommitted records count has been reached, don't emit any new records and return
+        if (!(getUncommittedRecordsCount() < config.getMaxUncommittedRecords())) {
+            LOG.debug("Maximum uncommitted records count reached, so not emitting any new records and returning");
+            return;
+        }
+        // early return as no shard is assigned - probably because number of executors > number of shards
+        if (toEmitPerShard.isEmpty()) {
+            LOG.debug("No shard is assigned to this task. Hence not emitting any tuple.");
+            return;
+        }
+
+        if (shouldFetchNewRecords()) {
+            fetchNewRecords();
+        }
+        emitNewRecord(collector);
+    }
+
+    void ack (KinesisMessageId kinesisMessageId) {
+        // for an acked message add it to acked set and remove it from emitted and failed
+        String shardId = kinesisMessageId.getShardId();
+        BigInteger sequenceNumber = new BigInteger(kinesisMessageId.getSequenceNumber());
+        LOG.debug("Ack received for shardId: " + shardId + " sequenceNumber: " + sequenceNumber);
+        if (!ackedPerShard.containsKey(shardId)) {
+            ackedPerShard.put(shardId, new TreeSet<BigInteger>());
+        }
+        ackedPerShard.get(shardId).add(sequenceNumber);
+        if (emittedPerShard.containsKey(shardId)) {
+            TreeSet<BigInteger> emitted = emittedPerShard.get(shardId);
+            emitted.remove(sequenceNumber);
+        }
+        if (failedPerShard.containsKey(shardId)) {
+            failedPerShard.get(shardId).remove(sequenceNumber);
+        }
+        if (failedandFetchedRecords.containsKey(kinesisMessageId)) {
+            config.getFailedMessageRetryHandler().acked(kinesisMessageId);
+            failedandFetchedRecords.remove(kinesisMessageId);
+        }
+        // keep committing when topology is deactivated since ack and fail keep getting called on deactivated topology
+        if (deactivated) {
+            commit();
+        }
+    }
+
+    void fail (KinesisMessageId kinesisMessageId) {
+        String shardId = kinesisMessageId.getShardId();
+        BigInteger sequenceNumber = new BigInteger(kinesisMessageId.getSequenceNumber());
+        LOG.debug("Fail received for shardId: " + shardId + " sequenceNumber: " + sequenceNumber);
+        // for a failed message add it to failed set if it will be retried, otherwise ack it; remove from emitted either way
+        if (config.getFailedMessageRetryHandler().failed(kinesisMessageId)) {
+            if (!failedPerShard.containsKey(shardId)) {
+                failedPerShard.put(shardId, new TreeSet<BigInteger>());
+            }
+            failedPerShard.get(shardId).add(sequenceNumber);
+            TreeSet<BigInteger> emitted = emittedPerShard.get(shardId);
+            emitted.remove(sequenceNumber);
+        } else {
+            ack(kinesisMessageId);
+        }
+        // keep committing when topology is deactivated since ack and fail keep getting called on deactivated topology
+        if (deactivated) {
+            commit();
+        }
+    }
+
+    void commit () {
+        // Logic for deciding what sequence number to commit: find the highest acked sequence number X such that there is no sequence number Y in
+        // emitted or failed that satisfies X > Y. E.g. if acked is 1,3,5 and emitted is 2,4,6, then we can only commit 1 and not 3 because 2 is still pending
+        for (String shardId: toEmitPerShard.keySet()) {
+            if (ackedPerShard.containsKey(shardId)) {
+                BigInteger commitSequenceNumberBound = null;
+                if (failedPerShard.containsKey(shardId) && !failedPerShard.get(shardId).isEmpty()) {
+                    commitSequenceNumberBound = failedPerShard.get(shardId).first();
+                }
+                if (emittedPerShard.containsKey(shardId) && !emittedPerShard.get(shardId).isEmpty()) {
+                    BigInteger smallestEmittedSequenceNumber = emittedPerShard.get(shardId).first();
+                    if (commitSequenceNumberBound == null || (commitSequenceNumberBound.compareTo(smallestEmittedSequenceNumber) > 0)) {
+                        commitSequenceNumberBound = smallestEmittedSequenceNumber;
+                    }
+                }
+                Iterator<BigInteger> ackedSequenceNumbers = ackedPerShard.get(shardId).iterator();
+                BigInteger ackedSequenceNumberToCommit = null;
+                while (ackedSequenceNumbers.hasNext()) {
+                    BigInteger ackedSequenceNumber = ackedSequenceNumbers.next();
+                    if (commitSequenceNumberBound == null || (commitSequenceNumberBound.compareTo(ackedSequenceNumber) > 0)) {
+                        ackedSequenceNumberToCommit = ackedSequenceNumber;
+                        ackedSequenceNumbers.remove();
+                    } else {
+                        break;
+                    }
+                }
+                if (ackedSequenceNumberToCommit != null) {
+                    Map<Object, Object> state = new HashMap<>();
+                    state.put("committedSequenceNumber", ackedSequenceNumberToCommit.toString());
+                    LOG.debug("Committing sequence number " + ackedSequenceNumberToCommit.toString() + " for shardId " + shardId);
+                    String path = getZkPath(shardId);
+                    commitState(path, state);
+                }
+            }
+        }
+        lastCommitTime = System.currentTimeMillis();
+    }
+
+    void activate () {
+        LOG.info("Activate called");
+        deactivated = false;
+        initializeKinesisClient();
+    }
+
+    void deactivate () {
+        LOG.info("Deactivate called");
+        deactivated = true;
+        commit();
+        shutdownKinesisClient();
+    }
+
+    void close () {
+        commit();
+        shutdownKinesisClient();
+        shutdownCurator();
+    }
+
+    private String getZkPath (String shardId) {
+        String path = "";
+        if (!config.getZkInfo().getZkNode().startsWith("/")) {
+            path += "/";
+        }
+        path += config.getZkInfo().getZkNode();
+        if (!config.getZkInfo().getZkNode().endsWith("/")) {
+            path += "/";
+        }
+        path += (config.getStreamName() + "/" + shardId);
+        return path;
+    }
+
+    private void commitState (String path, Map<Object, Object> state) {
+        byte[] bytes = JSONValue.toJSONString(state).getBytes(Charset.forName("UTF-8"));
+        try {
+            if (curatorFramework.checkExists().forPath(path) == null) {
+                curatorFramework.create()
+                        .creatingParentsIfNeeded()
+                        .withMode(CreateMode.PERSISTENT)
+                        .forPath(path, bytes);
+            } else {
+                curatorFramework.setData().forPath(path, bytes);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private Map<Object, Object> readState (String path) {
+        try {
+            Map<Object, Object> state = null;
+            byte[] b = null;
+            if (curatorFramework.checkExists().forPath(path) != null) {
+                b = curatorFramework.getData().forPath(path);
+            }
+            if (b != null) {
+                state = (Map<Object, Object>) JSONValue.parse(new String(b, "UTF-8"));
+            }
+            return state;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    // fetch records from Kinesis starting at the sequence number of the message passed as argument. Any other fetched records that are also in the
+    // failed set are kept in memory too, to avoid going to Kinesis again for their retries
+    private void fetchFailedRecords (KinesisMessageId kinesisMessageId) {
+        // if shard iterator not present for this message, get it
+        if (!shardIteratorPerFailedMessage.containsKey(kinesisMessageId)) {
+            refreshShardIteratorForFailedRecord(kinesisMessageId);
+        }
+        String shardIterator = shardIteratorPerFailedMessage.get(kinesisMessageId);
+        LOG.debug("Fetching failed records for shard id: " + kinesisMessageId.getShardId() + " at sequence number " + kinesisMessageId.getSequenceNumber() +
+                " using shardIterator " + shardIterator);
+        try {
+            GetRecordsResult getRecordsResult = fetchRecords(shardIterator);
+            if (getRecordsResult != null) {
+                List<Record> records = getRecordsResult.getRecords();
+                LOG.debug("Records size from fetchFailedRecords is " + records.size());
+                // update the shard iterator to next one in case this fetch does not give the message.
+                shardIteratorPerFailedMessage.put(kinesisMessageId, getRecordsResult.getNextShardIterator());
+                if (records.size() == 0) {
+                    LOG.debug("No records returned from kinesis. Hence sleeping for 1 second");
+                    Thread.sleep(1000);
+                } else {
+                    // add all fetched records to the set of failed records if they are present in failed set
+                    for (Record record: records) {
+                        KinesisMessageId current = new KinesisMessageId(kinesisMessageId.getStreamName(), kinesisMessageId.getShardId(), record.getSequenceNumber());
+                        if (failedPerShard.get(kinesisMessageId.getShardId()).contains(new BigInteger(current.getSequenceNumber()))) {
+                            failedandFetchedRecords.put(current, record);
+                            shardIteratorPerFailedMessage.remove(current);
+                        }
+                    }
+                }
+            }
+        } catch (InterruptedException ie) {
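+            LOG.debug("Thread interrupted while sleeping", ie);
+            // restore the interrupt status so callers can still observe it
+            Thread.currentThread().interrupt();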
+        } catch (ExpiredIteratorException ex) {
+            LOG.debug("shardIterator for failedRecord " + kinesisMessageId + " has expired. Refreshing shardIterator");
+            refreshShardIteratorForFailedRecord(kinesisMessageId);
+        } catch (ProvisionedThroughputExceededException pe) {
+            try {
+                LOG.debug("ProvisionedThroughputExceededException occurred. Check your limits. Sleeping for 1 second.", pe);
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                LOG.debug("Thread interrupted exception", e);
+            }
+        }
+    }
+
+    private void fetchNewRecords () {
+        for (Map.Entry<String, LinkedList<Record>> entry : toEmitPerShard.entrySet()) {
+            String shardId = entry.getKey();
+            try {
+                String shardIterator = shardIteratorPerShard.get(shardId);
+                LOG.debug("Fetching new records for shard id :" + shardId + " using shardIterator " + shardIterator + " after sequence number " +
+                        fetchedSequenceNumberPerShard.get(shardId));
+                GetRecordsResult getRecordsResult = fetchRecords(shardIterator);
+                if (getRecordsResult != null) {
+                    List<Record> records = getRecordsResult.getRecords();
+                    LOG.debug("Records size from fetchNewRecords is " + records.size());
+                    // advance the shard iterator so the next fetch continues after this batch
+                    shardIteratorPerShard.put(shardId, getRecordsResult.getNextShardIterator());
+                    if (records.size() == 0) {
+                        LOG.debug("No records returned from kinesis. Hence sleeping for 1 second");
+                        Thread.sleep(1000);
+                    } else {
+                        entry.getValue().addAll(records);
+                        fetchedSequenceNumberPerShard.put(shardId, records.get(records.size() - 1).getSequenceNumber());
+                    }
+                }
+            } catch (InterruptedException ie) {
+                LOG.debug("Thread interrupted while sleeping", ie);
+            } catch (ExpiredIteratorException ex) {
+                LOG.debug("shardIterator for shardId " + shardId + " has expired. Refreshing shardIterator");
+                refreshShardIteratorForNewRecords(shardId);
+            } catch (ProvisionedThroughputExceededException pe) {
+                try {
+                    LOG.debug("ProvisionedThroughputExceededException occurred. Check your limits. Sleeping for 1 second.", pe);
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    LOG.debug("Thread interrupted exception", e);
+                }
+            }
+        }
+    }
+
+    private GetRecordsResult fetchRecords (String shardIterator) {
+        List<Record> records = new ArrayList<>();
+        GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
+        getRecordsRequest.setShardIterator(shardIterator);
+        getRecordsRequest.setLimit(config.getKinesisConnectionInfo().getRecordsLimit());
+        GetRecordsResult getRecordsResult = kinesisClient.getRecords(getRecordsRequest);
+        return getRecordsResult;
+    }
+
+    private List<Shard> getShards () {
+        DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
+        describeStreamRequest.setStreamName(config.getStreamName());
+        List<Shard> shards = new ArrayList<>();
+        String exclusiveStartShardId = null;
+        do {
+            describeStreamRequest.setExclusiveStartShardId(exclusiveStartShardId);
+            DescribeStreamResult describeStreamResult = kinesisClient.describeStream(describeStreamRequest);
+            shards.addAll(describeStreamResult.getStreamDescription().getShards());
+            if (describeStreamResult.getStreamDescription().getHasMoreShards() && shards.size() > 0) {
+                exclusiveStartShardId = shards.get(shards.size() - 1).getShardId();
+            } else {
+                exclusiveStartShardId = null;
+            }
+        } while ( exclusiveStartShardId != null );
+        LOG.info("Number of shards for stream " + config.getStreamName() + " is " + shards.size());
+        return shards;
+    }
+
+    private void emitNewRecord (SpoutOutputCollector collector) {
+        for (Map.Entry<String, LinkedList<Record>> entry: toEmitPerShard.entrySet()) {
+            String shardId = entry.getKey();
+            LinkedList<Record> listOfRecords = entry.getValue();
+            Record record;
+            while ((record = listOfRecords.pollFirst()) != null) {
+                KinesisMessageId kinesisMessageId = new KinesisMessageId(config.getStreamName(), shardId, record.getSequenceNumber());
+                if (emitRecord(collector, record, kinesisMessageId)) {
+                    return;
+                }
+            }
+        }
+    }
+
+    private boolean emitFailedRecord (SpoutOutputCollector collector, KinesisMessageId kinesisMessageId) {
+        if (!failedandFetchedRecords.containsKey(kinesisMessageId)) {
+            return false;
+        }
+        return emitRecord(collector, failedandFetchedRecords.get(kinesisMessageId), kinesisMessageId);
+    }
+
+    private boolean emitRecord (SpoutOutputCollector collector, Record record, KinesisMessageId kinesisMessageId) {
+        boolean result = false;
+        List<Object> tuple = config.getRecordToTupleMapper().getTuple(record);
+        // if the mapper returns a non-empty tuple, emit it and record its sequence number in emittedPerShard so a later ack or fail can be matched to it
+        if (tuple != null && tuple.size() > 0) {
+            collector.emit(tuple, kinesisMessageId);
+            if (!emittedPerShard.containsKey(kinesisMessageId.getShardId())) {
+                emittedPerShard.put(kinesisMessageId.getShardId(), new TreeSet<BigInteger>());
+            }
+            emittedPerShard.get(kinesisMessageId.getShardId()).add(new BigInteger(record.getSequenceNumber()));
+            result = true;
+        } else {
+            // ack the record so it is not reprocessed after a restart, then move on to the next message
+            LOG.debug("Record " + record + " did not return a tuple to emit. Hence acking it");
+            ack(kinesisMessageId);
+        }
+        return result;
+    }
+
+    private boolean shouldCommit () {
+        return (System.currentTimeMillis() - lastCommitTime >= config.getZkInfo().getCommitIntervalMs());
+    }
+
+    private void initializeFetchedSequenceNumbers () {
+        for (String shardId : toEmitPerShard.keySet()) {
+            Map<Object, Object> state = readState(getZkPath(shardId));
+            // if ZooKeeper has committed state for this shard, resume fetching from its committedSequenceNumber
+            if (state != null) {
+                Object committedSequenceNumber = state.get("committedSequenceNumber");
+                LOG.info("State read is committedSequenceNumber: " + committedSequenceNumber + " shardId:" + shardId);
+                if (committedSequenceNumber != null) {
+                    fetchedSequenceNumberPerShard.put(shardId, (String) committedSequenceNumber);
+                }
+            }
+        }
+    }
+
+    private void refreshShardIteratorsForNewRecords () {
+        for (String shardId: toEmitPerShard.keySet()) {
+            refreshShardIteratorForNewRecords(shardId);
+        }
+    }
+
+    private void refreshShardIteratorForNewRecords (String shardId) {
+        String shardIterator = null;
+        String lastFetchedSequenceNumber = fetchedSequenceNumberPerShard.get(shardId);
+        ShardIteratorType shardIteratorType = (lastFetchedSequenceNumber == null ? config.getShardIteratorType() : ShardIteratorType
+                .AFTER_SEQUENCE_NUMBER);
+        // resume after the last fetched sequence number if there is one; otherwise start from the configured shard iterator type
+        shardIterator = this.getShardIterator(shardId, shardIteratorType, lastFetchedSequenceNumber, config.getTimestamp());
+        if (shardIterator != null && !shardIterator.isEmpty()) {
+            LOG.debug("Refreshing shard iterator for new records for shardId " + shardId + " with shardIterator " + shardIterator);
+            shardIteratorPerShard.put(shardId, shardIterator);
+        }
+    }
+
+    private void refreshShardIteratorForFailedRecord (KinesisMessageId kinesisMessageId) {
+        String shardIterator = null;
+        // position the shard iterator at the failed message's own sequence number so the retry fetch starts with that record
+        shardIterator = this.getShardIterator(kinesisMessageId.getShardId(), ShardIteratorType.AT_SEQUENCE_NUMBER, kinesisMessageId.getSequenceNumber(), null);
+        if (shardIterator != null && !shardIterator.isEmpty()) {
+            LOG.debug("Refreshing shard iterator for failed records for message " + kinesisMessageId + " with shardIterator " + shardIterator);
+            shardIteratorPerFailedMessage.put(kinesisMessageId, shardIterator);
+        }
+    }
+
+    private String getShardIterator (String shardId, ShardIteratorType shardIteratorType, String sequenceNumber, Date timestamp) {
+        String shardIterator = "";
+        try {
+            GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
+            getShardIteratorRequest.setStreamName(config.getStreamName());
+            getShardIteratorRequest.setShardId(shardId);
+            getShardIteratorRequest.setShardIteratorType(shardIteratorType);
+            if (shardIteratorType.equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) || shardIteratorType.equals(ShardIteratorType.AT_SEQUENCE_NUMBER)) {
+                getShardIteratorRequest.setStartingSequenceNumber(sequenceNumber);
+            } else if (shardIteratorType.equals(ShardIteratorType.AT_TIMESTAMP)) {
+                getShardIteratorRequest.setTimestamp(timestamp);
+            }
+            GetShardIteratorResult getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest);
+            if (getShardIteratorResult != null) {
+                shardIterator = getShardIteratorResult.getShardIterator();
+            }
+        } catch (Exception e) {
+            LOG.debug("Exception occurred while getting shardIterator for shard " + shardId + " shardIteratorType " + shardIteratorType + " sequence number " +
+                    sequenceNumber + " timestamp " + timestamp, e);
+        }
+        LOG.debug("Returning shardIterator " + shardIterator + " for shardId " + shardId + " shardIteratorType " + shardIteratorType + " sequenceNumber " +
+                sequenceNumber + " timestamp " + timestamp);
+        return shardIterator;
+    }
+
+    private Long getUncommittedRecordsCount () {
+        Long result = 0L;
+        for (Map.Entry<String, TreeSet<BigInteger>> emitted: emittedPerShard.entrySet()) {
+            result += emitted.getValue().size();
+        }
+        for (Map.Entry<String, TreeSet<BigInteger>> acked: ackedPerShard.entrySet()) {
+            result += acked.getValue().size();
+        }
+        for (Map.Entry<String, TreeSet<BigInteger>> failed: failedPerShard.entrySet()) {
+            result += failed.getValue().size();
+        }
+        LOG.debug("Returning uncommittedRecordsCount as " + result);
+        return result;
+    }
+
+    private boolean shouldFetchNewRecords () {
+        // check whether any shard already has fetched records waiting to be emitted, in which case don't fetch more
+        boolean fetchRecords = true;
+        for (Map.Entry<String, LinkedList<Record>> entry: toEmitPerShard.entrySet()) {
+            if (!entry.getValue().isEmpty()) {
+                fetchRecords = false;
+                break;
+            }
+        }
+        return fetchRecords;
+    }
+
+    private void initializeCurator () {
+        ZkInfo zkInfo = config.getZkInfo();
+        curatorFramework = CuratorFrameworkFactory.newClient(zkInfo.getZkUrl(), zkInfo.getSessionTimeoutMs(), zkInfo.getConnectionTimeoutMs(), new
+                RetryNTimes(zkInfo.getRetryAttempts(), zkInfo.getRetryIntervalMs()));
+        curatorFramework.start();
+    }
+
+    private void initializeKinesisClient () {
+        kinesisClient = new AmazonKinesisClient(config.getKinesisConnectionInfo().getCredentialsProvider(), config.getKinesisConnectionInfo().getClientConfiguration());
+        kinesisClient.setRegion(Region.getRegion(config.getKinesisConnectionInfo().getRegion()));
+    }
+
+    private void shutdownCurator () {
+        curatorFramework.close();
+    }
+
+    private void shutdownKinesisClient () {
+        kinesisClient.shutdown();
+    }
+
+}
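getShards() pages through DescribeStream results: it passes the last shard id collected so far as exclusiveStartShardId and stops once a response reports no more shards. The loop can be sketched without the AWS SDK as below. ShardPage, ShardPageSource, listAllShards and pagedSource are hypothetical stand-ins for the DescribeStreamResult paging, not SDK or storm-kinesis types.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for one DescribeStream response page: the shard ids it
// contains and whether more shards remain. Not an AWS SDK type.
final class ShardPage {
    final List<String> shardIds;
    final boolean hasMoreShards;

    ShardPage(List<String> shardIds, boolean hasMoreShards) {
        this.shardIds = shardIds;
        this.hasMoreShards = hasMoreShards;
    }
}

// Hypothetical paging interface; a null exclusiveStartShardId requests the first page.
interface ShardPageSource {
    ShardPage describe(String exclusiveStartShardId);
}

final class ShardLister {
    // Same loop shape as getShards(): request pages, each starting after the
    // last shard id collected so far, until a page reports no more shards.
    static List<String> listAllShards(ShardPageSource source) {
        List<String> shards = new ArrayList<>();
        String exclusiveStartShardId = null;
        do {
            ShardPage page = source.describe(exclusiveStartShardId);
            shards.addAll(page.shardIds);
            exclusiveStartShardId = (page.hasMoreShards && !shards.isEmpty())
                    ? shards.get(shards.size() - 1)
                    : null;
        } while (exclusiveStartShardId != null);
        return shards;
    }

    // In-memory fake that serves `all` in pages of `pageSize`, for local testing only.
    static ShardPageSource pagedSource(List<String> all, int pageSize) {
        return start -> {
            int from = (start == null) ? 0 : all.indexOf(start) + 1;
            int to = Math.min(from + pageSize, all.size());
            return new ShardPage(new ArrayList<>(all.subList(from, to)), to < all.size());
        };
    }
}
```

With five shard ids and a page size of 2, listAllShards makes three describe calls and returns all five ids in order.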

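Kinesis sequence numbers arrive as decimal strings, and the records manager tracks them as BigInteger values in per-shard TreeSets. fetchFailedRecords() keeps only the fetched records whose sequence numbers are still in the shard's failed set. A standalone sketch of that matching step follows. FailedRecordMatcher, FetchedRecord and matchFailed are hypothetical names, and FetchedRecord stands in for the SDK's Record.

```java
import java.math.BigInteger;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

final class FailedRecordMatcher {
    // Hypothetical stand-in for a fetched Kinesis record (sequence number + payload).
    static final class FetchedRecord {
        final String sequenceNumber;
        final String payload;

        FetchedRecord(String sequenceNumber, String payload) {
            this.sequenceNumber = sequenceNumber;
            this.payload = payload;
        }
    }

    // Returns the fetched records whose sequence numbers are still in the
    // shard's failed set, keyed by sequence number and kept in fetch order.
    static Map<String, FetchedRecord> matchFailed(TreeSet<BigInteger> failedSequenceNumbers,
                                                  List<FetchedRecord> fetched) {
        Map<String, FetchedRecord> matched = new LinkedHashMap<>();
        for (FetchedRecord record : fetched) {
            // Numeric comparison: as plain strings, "10" would sort before "9".
            if (failedSequenceNumbers.contains(new BigInteger(record.sequenceNumber))) {
                matched.put(record.sequenceNumber, record);
            }
        }
        return matched;
    }
}
```

Storing sequence numbers as BigInteger keeps the TreeSet in numeric order, which string ordering would not guarantee for values of different lengths.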
http://git-wip-us.apache.org/repos/asf/storm/blob/de68c267/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisSpout.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisSpout.java
new file mode 100644
index 0000000..1ead4c0
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisSpout.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class KinesisSpout extends BaseRichSpout {
+
+    private final Config config;
+    private transient KinesisRecordsManager kinesisRecordsManager;
+    private transient SpoutOutputCollector collector;
+
+    public KinesisSpout (Config config) {
+        this.config = config;
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(config.getRecordToTupleMapper().getOutputFields());
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration () {
+        return super.getComponentConfiguration();
+    }
+
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        this.collector = collector;
+        kinesisRecordsManager = new KinesisRecordsManager(config);
+        kinesisRecordsManager.initialize(context.getThisTaskIndex(), context.getComponentTasks(context.getThisComponentId()).size());
+    }
+
+    @Override
+    public void close() {
+        kinesisRecordsManager.close();
+    }
+
+    @Override
+    public void activate() {
+        kinesisRecordsManager.activate();
+    }
+
+    @Override
+    public void deactivate() {
+        kinesisRecordsManager.deactivate();
+    }
+
+    @Override
+    public void ack(Object msgId) {
+        kinesisRecordsManager.ack((KinesisMessageId) msgId);
+    }
+
+    @Override
+    public void fail(Object msgId) {
+        kinesisRecordsManager.fail((KinesisMessageId) msgId);
+    }
+
+    @Override
+    public void nextTuple() {
+        kinesisRecordsManager.next(collector);
+    }
+}
+
+
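open() passes the task's index and the spout's total task count to KinesisRecordsManager.initialize(), which suggests that shards are divided among the spout's tasks. The assignment code itself is not part of this excerpt, so the sketch below only shows one plausible round-robin scheme under that assumption; ShardAssignment and shardsForTask are hypothetical, not the module's API.

```java
import java.util.ArrayList;
import java.util.List;

final class ShardAssignment {
    // Hypothetical round-robin assignment: task i of n takes every shard whose
    // position in the (identically sorted) shard list is congruent to i modulo n.
    static List<String> shardsForTask(List<String> sortedShardIds, int taskIndex, int totalTasks) {
        if (totalTasks <= 0 || taskIndex < 0 || taskIndex >= totalTasks) {
            throw new IllegalArgumentException("taskIndex must be in [0, totalTasks)");
        }
        List<String> mine = new ArrayList<>();
        for (int i = taskIndex; i < sortedShardIds.size(); i += totalTasks) {
            mine.add(sortedShardIds.get(i));
        }
        return mine;
    }
}
```

Under this scheme every shard goes to exactly one task as long as all tasks see the same sorted shard list; a task whose index is at least the shard count receives no shards.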

http://git-wip-us.apache.org/repos/asf/storm/blob/de68c267/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/RecordToTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/RecordToTupleMapper.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/RecordToTupleMapper.java
new file mode 100644
index 0000000..c806539
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/RecordToTupleMapper.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kinesis.spout;
+
+import com.amazonaws.services.kinesis.model.Record;
+import org.apache.storm.tuple.Fields;
+
+import java.util.List;
+
+public interface RecordToTupleMapper {
+    /**
+     *
+     * @return names of fields in the emitted tuple
+     */
+    Fields getOutputFields ();
+
+    /**
+     *
+     * @param record kinesis record
+     * @return storm tuple to be emitted for this record, or null (or an empty list) if no tuple should be emitted
+     */
+    List<Object> getTuple (Record record);
+}
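An implementation of RecordToTupleMapper declares its output fields and turns each Record into a list of values, returning null when nothing should be emitted. The sketch below decodes the payload as UTF-8, which is an assumption about the producer's data format. To stay runnable without Storm or the AWS SDK it uses a hypothetical SimpleRecord stand-in and a plain list of field names instead of Record and Fields.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for com.amazonaws.services.kinesis.model.Record,
// keeping only the getters this sketch reads.
final class SimpleRecord {
    private final String partitionKey;
    private final String sequenceNumber;
    private final ByteBuffer data;

    SimpleRecord(String partitionKey, String sequenceNumber, ByteBuffer data) {
        this.partitionKey = partitionKey;
        this.sequenceNumber = sequenceNumber;
        this.data = data;
    }

    String getPartitionKey() { return partitionKey; }
    String getSequenceNumber() { return sequenceNumber; }
    ByteBuffer getData() { return data; }
}

// Shape of a RecordToTupleMapper implementation, written against the stand-in
// so it compiles without Storm or the AWS SDK on the classpath.
final class Utf8RecordMapper {
    // What getOutputFields() would wrap in new Fields(...) in a real mapper.
    static final List<String> OUTPUT_FIELDS = Arrays.asList("partitionKey", "sequenceNumber", "message");

    static List<Object> getTuple(SimpleRecord record) {
        ByteBuffer data = record.getData();
        if (data == null || !data.hasRemaining()) {
            // Nothing to emit; emitRecord() acks such records instead of emitting them.
            return null;
        }
        // Decode a duplicate so the record's own buffer position is left unchanged.
        String message = StandardCharsets.UTF_8.decode(data.duplicate()).toString();
        return Arrays.<Object>asList(record.getPartitionKey(), record.getSequenceNumber(), message);
    }
}
```

Decoding a duplicate rather than the buffer itself matters if anything else reads the same record later, since decoding advances the buffer's position.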

http://git-wip-us.apache.org/repos/asf/storm/blob/de68c267/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkInfo.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkInfo.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkInfo.java
new file mode 100644
index 0000000..17bcd6f
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkInfo.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import java.io.Serializable;
+
+public class ZkInfo implements Serializable {
+    // comma-separated list of ZooKeeper host:port pairs to connect to, e.g. localhost:2181
+    private final String zkUrl;
+    // zk node under which to commit the sequence number of messages. e.g. /committed_sequence_numbers
+    private final String zkNode;
+    // zk session timeout in milliseconds
+    private final Integer sessionTimeoutMs;
+    // zk connection timeout in milliseconds
+    private final Integer connectionTimeoutMs;
+    // interval at which to commit offsets to zk in milliseconds
+    private final Long commitIntervalMs;
+    // number of retry attempts for zk
+    private final Integer retryAttempts;
+    // time to sleep between retries in milliseconds
+    private final Integer retryIntervalMs;
+
+    /**
+     * Default constructor with settings for a local ZooKeeper at localhost:2181
+     */
+    public ZkInfo () {
+        this("localhost:2181", "/kinesisOffsets", 20000, 15000, 10000L, 3, 2000);
+    }
+
+    public ZkInfo (String zkUrl, String zkNode, Integer sessionTimeoutMs, Integer connectionTimeoutMs, Long commitIntervalMs, Integer retryAttempts, Integer
+            retryIntervalMs) {
+        this.zkUrl = zkUrl;
+        this.zkNode = zkNode;
+        this.sessionTimeoutMs = sessionTimeoutMs;
+        this.connectionTimeoutMs = connectionTimeoutMs;
+        this.commitIntervalMs = commitIntervalMs;
+        this.retryAttempts = retryAttempts;
+        this.retryIntervalMs = retryIntervalMs;
+        validate();
+    }
+
+    public String getZkUrl() {
+        return zkUrl;
+    }
+
+    public String getZkNode() {
+        return zkNode;
+    }
+
+    public Integer getSessionTimeoutMs() {
+        return sessionTimeoutMs;
+    }
+
+    public Integer getConnectionTimeoutMs() {
+        return connectionTimeoutMs;
+    }
+
+    public Long getCommitIntervalMs() {
+        return commitIntervalMs;
+    }
+
+    public Integer getRetryAttempts() {
+        return retryAttempts;
+    }
+
+    public Integer getRetryIntervalMs() {
+        return retryIntervalMs;
+    }
+
+    private void validate () {
+
+        if (zkUrl == null || zkUrl.length() < 1) {
+            throw new IllegalArgumentException("zkUrl must be specified to connect to zookeeper");
+        }
+        if (zkNode == null || zkNode.length() < 1) {
+            throw new IllegalArgumentException("zkNode must be specified");
+        }
+        checkPositive(sessionTimeoutMs, "sessionTimeoutMs");
+        checkPositive(connectionTimeoutMs, "connectionTimeoutMs");
+        checkPositive(commitIntervalMs, "commitIntervalMs");
+        checkPositive(retryAttempts, "retryAttempts");
+        checkPositive(retryIntervalMs, "retryIntervalMs");
+    }
+
+    private void checkPositive (Integer argument, String name) {
+        if (argument == null || argument <= 0) {
+            throw new IllegalArgumentException(name + " must be positive");
+        }
+    }
+    private void checkPositive (Long argument, String name) {
+        if (argument == null || argument <= 0) {
+            throw new IllegalArgumentException(name + " must be positive");
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "ZkInfo{" +
+                "zkUrl='" + zkUrl + '\'' +
+                ", zkNode='" + zkNode + '\'' +
+                ", sessionTimeoutMs=" + sessionTimeoutMs +
+                ", connectionTimeoutMs=" + connectionTimeoutMs +
+                ", commitIntervalMs=" + commitIntervalMs +
+                ", retryAttempts=" + retryAttempts +
+                ", retryIntervalMs=" + retryIntervalMs +
+                '}';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        ZkInfo zkInfo = (ZkInfo) o;
+
+        if (zkUrl != null ? !zkUrl.equals(zkInfo.zkUrl) : zkInfo.zkUrl != null) return false;
+        if (zkNode != null ? !zkNode.equals(zkInfo.zkNode) : zkInfo.zkNode != null) return false;
+        if (sessionTimeoutMs != null ? !sessionTimeoutMs.equals(zkInfo.sessionTimeoutMs) : zkInfo.sessionTimeoutMs != null) return false;
+        if (connectionTimeoutMs != null ? !connectionTimeoutMs.equals(zkInfo.connectionTimeoutMs) : zkInfo.connectionTimeoutMs != null) return false;
+        if (commitIntervalMs != null ? !commitIntervalMs.equals(zkInfo.commitIntervalMs) : zkInfo.commitIntervalMs != null) return false;
+        if (retryAttempts != null ? !retryAttempts.equals(zkInfo.retryAttempts) : zkInfo.retryAttempts != null) return false;
+        return !(retryIntervalMs != null ? !retryIntervalMs.equals(zkInfo.retryIntervalMs) : zkInfo.retryIntervalMs != null);
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = zkUrl != null ? zkUrl.hashCode() : 0;
+        result = 31 * result + (zkNode != null ? zkNode.hashCode() : 0);
+        result = 31 * result + (sessionTimeoutMs != null ? sessionTimeoutMs.hashCode() : 0);
+        result = 31 * result + (connectionTimeoutMs != null ? connectionTimeoutMs.hashCode() : 0);
+        result = 31 * result + (commitIntervalMs != null ? commitIntervalMs.hashCode() : 0);
+        result = 31 * result + (retryAttempts != null ? retryAttempts.hashCode() : 0);
+        result = 31 * result + (retryIntervalMs != null ? retryIntervalMs.hashCode() : 0);
+        return result;
+    }
+}
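ZkInfo.validate() is meant to reject any numeric setting that is missing or not positive. Because the fields are boxed Integer and Long values, the null test has to short-circuit with || before the value is unboxed; otherwise a null argument raises a NullPointerException instead of the intended IllegalArgumentException. A standalone sketch of that check, using a hypothetical PositiveCheck class that accepts any Number:

```java
final class PositiveCheck {
    // Rejects a missing or non-positive setting. The null test comes first and
    // short-circuits with ||, so a null value is never unboxed.
    static void checkPositive(Number argument, String name) {
        if (argument == null || argument.longValue() <= 0) {
            throw new IllegalArgumentException(name + " must be positive");
        }
    }
}
```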

