storm-commits mailing list archives

From bo...@apache.org
Subject [1/4] storm git commit: STORM-2209: Update documents adding new integration for some external systems
Date Wed, 23 Nov 2016 20:13:01 GMT
Repository: storm
Updated Branches:
  refs/heads/master ecb3fd896 -> 33c155f7f


STORM-2209: Update documents adding new integration for some external systems


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8705605c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8705605c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8705605c

Branch: refs/heads/master
Commit: 8705605c46da22bae0af6fff3a6f9caf84339bb3
Parents: a912cf3
Author: Xin Wang <best.wangxin@163.com>
Authored: Sun Nov 20 13:49:29 2016 +0800
Committer: Xin Wang <best.wangxin@163.com>
Committed: Sun Nov 20 13:49:29 2016 +0800

----------------------------------------------------------------------
 docs/index.md              |  11 ++-
 docs/storm-cassandra.md    |  23 -----
 docs/storm-druid.md        | 119 +++++++++++++++++++++++++
 docs/storm-kafka-client.md | 188 ++++++++++++++++++++++++++++++++++++++++
 docs/storm-kinesis.md      | 136 +++++++++++++++++++++++++++++
 docs/storm-mongodb.md      |  23 -----
 docs/storm-opentsdb.md     |  52 +++++++++++
 docs/storm-redis.md        |  24 -----
 8 files changed, 505 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/8705605c/docs/index.md
----------------------------------------------------------------------
diff --git a/docs/index.md b/docs/index.md
index f951adf..b842cda 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -66,7 +66,7 @@ Trident is an alternative interface to Storm. It provides exactly-once processin
 * [Event Logging](Eventlogging.html)
 
 ### Integration With External Systems, and Other Libraries
-* [Apache Kafka Integration](storm-kafka.html)
+* [Apache Kafka Integration](storm-kafka.html), [New Kafka Consumer Integration](storm-kafka-client.html)
 * [Apache HBase Integration](storm-hbase.html)
 * [Apache HDFS Integration](storm-hdfs.html)
 * [Apache Hive Integration](storm-hive.html)
@@ -79,8 +79,17 @@ Trident is an alternative interface to Storm. It provides exactly-once processin
 * [Elasticsearch Integration](storm-elasticsearch.html)
 * [MQTT Integration](storm-mqtt.html)
 * [Mongodb Integration](storm-mongodb.html)
+* [OpenTSDB Integration](storm-opentsdb.html)
+* [Kinesis Integration](storm-kinesis.html)
+* [Druid Integration](storm-druid.html)
 * [Kestrel Integration](Kestrel-and-Storm.html)
 
+#### Container, Resource Management System Integration
+
+* [YARN Integration](https://github.com/yahoo/storm-yarn), [YARN Integration via Slider](http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.3.2/bk_yarn_resource_mgt/content/ref-7d103a48-7c2e-4b7b-aab5-62c739a32ee0.1.html)
+* [Mesos Integration](https://github.com/mesos/storm)
+* [Docker Integration](https://hub.docker.com/_/storm/)
+
 ### Advanced
 
 * [Defining a non-JVM language DSL for Storm](Defining-a-non-jvm-language-dsl-for-storm.html)

http://git-wip-us.apache.org/repos/asf/storm/blob/8705605c/docs/storm-cassandra.md
----------------------------------------------------------------------
diff --git a/docs/storm-cassandra.md b/docs/storm-cassandra.md
index c674fbc..47fabbd 100644
--- a/docs/storm-cassandra.md
+++ b/docs/storm-cassandra.md
@@ -230,26 +230,3 @@ Below `state` API for `querying` data from Cassandra.
         TridentState selectState = topology.newStaticState(selectWeatherStationStateFactory);
 stream = stream.stateQuery(selectState, new Fields("weather_station_id"), new CassandraQuery(), new Fields("name"));
 ```
-
-## License
-
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-
-## Committer Sponsors
- * Sriharha Chintalapani ([sriharsha@apache.org](mailto:sriharsha@apache.org))
- * P. Taylor Goetz ([ptgoetz@apache.org](mailto:ptgoetz@apache.org))

http://git-wip-us.apache.org/repos/asf/storm/blob/8705605c/docs/storm-druid.md
----------------------------------------------------------------------
diff --git a/docs/storm-druid.md b/docs/storm-druid.md
new file mode 100644
index 0000000..bed50dc
--- /dev/null
+++ b/docs/storm-druid.md
@@ -0,0 +1,119 @@
+# Storm Druid Bolt and TridentState
+
+This module provides core Storm and Trident bolt implementations for writing data to the [Druid](http://druid.io/) data store.
+This implementation uses Druid's [Tranquility library](https://github.com/druid-io/tranquility) to send messages to Druid.
+
+Some of the implementation details are borrowed from the existing [Tranquility Storm Bolt](https://github.com/druid-io/tranquility/blob/master/docs/storm.md).
+This new bolt was added to support the latest Storm release and to maintain the bolt in the Storm repo.
+
+### Core Bolt
+The example below describes the usage of the core bolt, `org.apache.storm.druid.bolt.DruidBeamBolt`.
+By default this bolt expects to receive tuples in which the "event" field carries your event.
+This logic can be changed by implementing the `ITupleDruidEventMapper` interface.
+
+```java
+
+   DruidBeamFactory druidBeamFactory = new SampleDruidBeamFactoryImpl(new HashMap<String, Object>());
+   DruidConfig druidConfig = DruidConfig.newBuilder().discardStreamId(DruidConfig.DEFAULT_DISCARD_STREAM_ID).build();
+   ITupleDruidEventMapper<Map<String, Object>> eventMapper = new TupleDruidEventMapper<>(TupleDruidEventMapper.DEFAULT_FIELD_NAME);
+   DruidBeamBolt<Map<String, Object>> druidBolt = new DruidBeamBolt<Map<String, Object>>(druidBeamFactory, eventMapper, druidConfig);
+   topologyBuilder.setBolt("druid-bolt", druidBolt).shuffleGrouping("event-gen");
+   topologyBuilder.setBolt("printer-bolt", new PrinterBolt()).shuffleGrouping("druid-bolt", druidConfig.getDiscardStreamId());
+
+```
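+
+If the default "event" field does not fit your tuples, a custom event mapper can be plugged in. The following is a minimal sketch, not taken from the storm-druid sources: the single `getEvent(ITuple)` method and the "payload" field name are assumptions inferred from `TupleDruidEventMapper`, so verify the interface before reusing it.
+
+```java
+// Hypothetical mapper that reads the Druid event from a tuple field named "payload"
+// instead of the default "event" field. Method signature assumed from TupleDruidEventMapper.
+public class PayloadTupleDruidEventMapper implements ITupleDruidEventMapper<Map<String, Object>> {
+
+    @Override
+    public Map<String, Object> getEvent(ITuple tuple) {
+        // The tuple is expected to carry a Map<String, Object> in its "payload" field.
+        return (Map<String, Object>) tuple.getValueByField("payload");
+    }
+}
+```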
+
+
+### Trident State
+
+```java
+    DruidBeamFactory druidBeamFactory = new SampleDruidBeamFactoryImpl(new HashMap<String, Object>());
+    ITupleDruidEventMapper<Map<String, Object>> eventMapper = new TupleDruidEventMapper<>(TupleDruidEventMapper.DEFAULT_FIELD_NAME);
+
+    final Stream stream = tridentTopology.newStream("batch-event-gen", new SimpleBatchSpout(10));
+
+    stream.peek(new Consumer() {
+        @Override
+        public void accept(TridentTuple input) {
+             LOG.info("########### Received tuple: [{}]", input);
+         }
+    }).partitionPersist(new DruidBeamStateFactory<Map<String, Object>>(druidBeamFactory, eventMapper), new Fields("event"), new DruidBeamStateUpdater());
+
+```
+
+### Sample Beam Factory Implementation
+The Druid bolt must be supplied with a `BeamFactory`. You can implement one of these using the [DruidBeams builder's](https://github.com/druid-io/tranquility/blob/master/core/src/main/scala/com/metamx/tranquility/druid/DruidBeams.scala) `buildBeam()` method.
+See the [Configuration documentation](https://github.com/druid-io/tranquility/blob/master/docs/configuration.md) for details.
+For more details, refer to the [Tranquility library](https://github.com/druid-io/tranquility) docs.
+
+```java
+
+public class SampleDruidBeamFactoryImpl implements DruidBeamFactory<Map<String, Object>> {
+
+    @Override
+    public Beam<Map<String, Object>> makeBeam(Map<?, ?> conf, IMetricsContext metrics) {
+
+
+        final String indexService = "druid/overlord"; // The druid.service name of the indexing service Overlord node.
+        final String discoveryPath = "/druid/discovery"; // Curator service discovery path. config: druid.discovery.curator.path
+        final String dataSource = "test"; // The name of the ingested datasource. Datasources can be thought of as tables.
+        final List<String> dimensions = ImmutableList.of("publisher", "advertiser");
+        List<AggregatorFactory> aggregators = ImmutableList.<AggregatorFactory>of(
+                new CountAggregatorFactory(
+                        "click"
+                )
+        );
+        // Tranquility needs to be able to extract timestamps from your object type (in this case, Map<String, Object>).
+        final Timestamper<Map<String, Object>> timestamper = new Timestamper<Map<String, Object>>()
+        {
+            @Override
+            public DateTime timestamp(Map<String, Object> theMap)
+            {
+                return new DateTime(theMap.get("timestamp"));
+            }
+        };
+
+        // Tranquility uses ZooKeeper (through Curator) for coordination.
+        final CuratorFramework curator = CuratorFrameworkFactory
+                .builder()
+                .connectString((String) conf.get("druid.tranquility.zk.connect")) // take config from storm conf
+                .retryPolicy(new ExponentialBackoffRetry(1000, 20, 30000))
+                .build();
+        curator.start();
+
+        // The JSON serialization of your object must have a timestamp field in a format that Druid understands. By default,
+        // Druid expects the field to be called "timestamp" and to be an ISO8601 timestamp.
+        final TimestampSpec timestampSpec = new TimestampSpec("timestamp", "auto", null);
+
+        // Tranquility needs to be able to serialize your object type to JSON for transmission to Druid. By default this is
+        // done with Jackson. If you want to provide an alternate serializer, you can provide your own via `.objectWriter(...)`.
+        // In this case, we won't provide one, so we're just using Jackson.
+        final Beam<Map<String, Object>> beam = DruidBeams
+                .builder(timestamper)
+                .curator(curator)
+                .discoveryPath(discoveryPath)
+                .location(DruidLocation.create(indexService, dataSource))
+                .timestampSpec(timestampSpec)
+                .rollup(DruidRollup.create(DruidDimensions.specific(dimensions), aggregators, QueryGranularities.MINUTE))
+                .tuning(
+                        ClusteredBeamTuning
+                                .builder()
+                                .segmentGranularity(Granularity.HOUR)
+                                .windowPeriod(new Period("PT10M"))
+                                .partitions(1)
+                                .replicants(1)
+                                .build()
+                )
+                .druidBeamConfig(
+                      DruidBeamConfig
+                           .builder()
+                           .indexRetryPeriod(new Period("PT10M"))
+                           .build())
+                .buildBeam();
+
+        return beam;
+    }
+}
+
+```
+
+Example code is available [here.](https://github.com/apache/storm/tree/master/external/storm-druid/src/test/java/org/apache/storm/druid)

http://git-wip-us.apache.org/repos/asf/storm/blob/8705605c/docs/storm-kafka-client.md
----------------------------------------------------------------------
diff --git a/docs/storm-kafka-client.md b/docs/storm-kafka-client.md
new file mode 100644
index 0000000..c8e038f
--- /dev/null
+++ b/docs/storm-kafka-client.md
@@ -0,0 +1,188 @@
+# Storm Kafka Spout with New Kafka Consumer API
+
+Apache Storm Spout implementation to consume data from Apache Kafka versions 0.10 onwards (please see [Apache Kafka Version Compatibility](#compatibility)).
+
+The Apache Storm Spout allows clients to consume data from Kafka starting at offsets as defined by the offset strategy specified in `FirstPollOffsetStrategy`.
+In case of failure, the Kafka Spout will restart consuming messages from the offset that matches the chosen `FirstPollOffsetStrategy`.
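+
+As a rough, non-compiling sketch in the same spirit as the reference snippets further below, the strategy is chosen on the config builder. The enum values listed in the comment are assumptions based on the 1.x `KafkaSpoutConfig` API; check the Javadoc of your Storm version.
+
+```java
+// Hedged sketch: choosing where the spout starts polling. kafkaConsumerProps, kafkaSpoutStreams,
+// tuplesBuilder and retryService are the objects built in the usage examples below.
+// Assumed strategies: EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST.
+KafkaSpoutConfig<String, String> strategyConfig = new KafkaSpoutConfig.Builder<String, String>(
+        kafkaConsumerProps, kafkaSpoutStreams, tuplesBuilder, retryService)
+        .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
+        .build();
+```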
+
+The Kafka Spout implementation allows you to specify the stream (`KafkaSpoutStream`) associated with each topic or topic wildcard. `KafkaSpoutStream` represents the stream and output fields. For named topics use `KafkaSpoutStreamsNamedTopics`, and for topic wildcards use `KafkaSpoutStreamsWildcardTopics`.
+
+The `KafkaSpoutTuplesBuilder` wraps all the logic that builds `Tuple`s from `ConsumerRecord`s. The logic is provided by the user through implementing the appropriate number of `KafkaSpoutTupleBuilder` instances. For named topics use `KafkaSpoutTuplesBuilderNamedTopics`, and for topic wildcards use `KafkaSpoutTuplesBuilderWildcardTopics`.
+
+Multiple topics and topic wildcards can use the same `KafkaSpoutTupleBuilder` implementation, as long as the logic to build `Tuple`s from `ConsumerRecord`s is identical.
+
+
+# Usage Examples
+
+### Create a Kafka Spout
+
+The code snippet below is extracted from the example in the [test](https://github.com/apache/storm/tree/master/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test) module. The code that is common for named topics and topic wildcards is in the first box. The specific implementations are in the appropriate section.
+
+These snippets serve as a reference and do not compile. If you would like to reuse this code in your implementation, please obtain it from the test module, where it is complete.
+
+```java
+KafkaSpout<String, String> kafkaSpout = new KafkaSpout<>(kafkaSpoutConfig);
+
+KafkaSpoutConfig kafkaSpoutConfig = new KafkaSpoutConfig.Builder<String, String>(kafkaConsumerProps, kafkaSpoutStreams, tuplesBuilder, retryService)
+        .setOffsetCommitPeriodMs(10_000)
+        .setFirstPollOffsetStrategy(EARLIEST)
+        .setMaxUncommittedOffsets(250)
+        .build();
+
+Map<String, Object> kafkaConsumerProps = new HashMap<>();
+kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "127.0.0.1:9092");
+kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.GROUP_ID, "kafkaSpoutTestGroup");
+kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
+kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
+
+KafkaSpoutRetryService retryService = new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(500),
+        KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2), Integer.MAX_VALUE, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10));
+```
+
+### Named Topics
+```java
+KafkaSpoutStreams kafkaSpoutStreams = new KafkaSpoutStreamsNamedTopics.Builder(outputFields, STREAMS[0], new String[]{TOPICS[0], TOPICS[1]})
+            .addStream(outputFields, STREAMS[0], new String[]{TOPICS[2]})  // contents of topic test2 sent to test_stream
+            .addStream(outputFields1, STREAMS[2], new String[]{TOPICS[2]})  // contents of topic test2 sent to test2_stream
+            .build();
+            
+KafkaSpoutTuplesBuilder<String, String> tuplesBuilder = new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(
+            new TopicsTest0Test1TupleBuilder<String, String>(TOPICS[0], TOPICS[1]),
+            new TopicTest2TupleBuilder<String, String>(TOPICS[2]))
+            .build();
+            
+String[] STREAMS = new String[]{"test_stream", "test1_stream", "test2_stream"};
+String[] TOPICS = new String[]{"test", "test1", "test2"};
+
+Fields outputFields = new Fields("topic", "partition", "offset", "key", "value");
+Fields outputFields1 = new Fields("topic", "partition", "offset");
+```
+
+### Topic Wildcards
+```java
+KafkaSpoutStreams kafkaSpoutStreams = new KafkaSpoutStreamsWildcardTopics(
+            new KafkaSpoutStream(outputFields, STREAM, Pattern.compile(TOPIC_WILDCARD_PATTERN)));
+
+KafkaSpoutTuplesBuilder<String, String> tuplesBuilder = new TopicsTest0Test1TupleBuilder<>(TOPIC_WILDCARD_PATTERN);
+
+String STREAM = "test_wildcard_stream";
+String TOPIC_WILDCARD_PATTERN = "test[1|2]";
+
+Fields outputFields = new Fields("topic", "partition", "offset", "key", "value");
+```
+
+### Create a simple Topology using the Kafka Spout:
+
+
+```java
+TopologyBuilder tp = new TopologyBuilder();
+tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams())),
1);
+tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAMS[0]);
+tp.setBolt("kafka_bolt_1", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAMS[2]);
+tp.createTopology();
+```
+
+# Build And Run Bundled Examples  
+To be able to run the examples you must first build the java code in the package `storm-kafka-client`,
+and then generate an uber jar with all the dependencies.
+
+## Use the Maven Shade Plugin to Build the Uber Jar
+
+Add the following to `REPO_HOME/storm/external/storm-kafka-client/pom.xml`
+```xml
+<plugin>
+    <groupId>org.apache.maven.plugins</groupId>
+    <artifactId>maven-shade-plugin</artifactId>
+    <version>2.4.1</version>
+    <executions>
+        <execution>
+            <phase>package</phase>
+            <goals>
+                <goal>shade</goal>
+            </goals>
+            <configuration>
+                <transformers>
+                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                        <mainClass>org.apache.storm.kafka.spout.test.KafkaSpoutTopologyMain</mainClass>
+                    </transformer>
+                </transformers>
+            </configuration>
+        </execution>
+    </executions>
+</plugin>
+```
+
+Create the uber jar by running the command:
+
+`mvn package -f REPO_HOME/storm/external/storm-kafka-client/pom.xml`
+
+This will create the uber jar file with the name and location matching the following pattern:
+ 
+`REPO_HOME/storm/external/storm-kafka-client/target/storm-kafka-client-1.0.x.jar`
+
+### Run Storm Topology
+
+Copy the file `REPO_HOME/storm/external/storm-kafka-client/target/storm-kafka-client-1.0.x.jar` to `STORM_HOME/extlib`
+
+Using the Kafka command line tools, create three topics [test, test1, test2] and use the Kafka console producer to populate the topics with some data.
+
+Execute the command `STORM_HOME/bin/storm jar REPO_HOME/storm/external/storm-kafka-client/target/storm-kafka-client-1.0.x.jar org.apache.storm.kafka.spout.test.KafkaSpoutTopologyMain`
+
+With debug level logs enabled it is possible to see the messages of each topic being redirected to the appropriate Bolt, as determined
+by the stream definitions and the choice of shuffle grouping.
+
+## Using storm-kafka-client with different versions of Kafka
+
+Storm-kafka-client's Kafka dependency is defined as `provided` scope in Maven, meaning it will not be pulled in
+as a transitive dependency. This allows you to use a Kafka dependency version compatible with your Kafka cluster.
+
+When building a project with storm-kafka-client, you must explicitly add the Kafka clients dependency. For example, to
+use kafka-clients 0.10.0.0, you would add the following dependency to your `pom.xml`:
+
+```xml
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>0.10.0.0</version>
+        </dependency>
+```
+
+You can also override the kafka-clients version while building from Maven, with the parameter `storm.kafka.client.version`,
+e.g. `mvn clean install -Dstorm.kafka.client.version=0.10.0.0`
+
+When selecting a kafka client version, you should ensure that:
+ 1. The Kafka API is compatible. The storm-kafka-client module only supports the **0.10 or newer** Kafka client API. For older versions,
+ you can use the storm-kafka module (https://github.com/apache/storm/tree/master/external/storm-kafka).
+ 2. The Kafka client you select is wire compatible with the broker, e.g. a 0.9.x client will not work with
+ a 0.8.x broker.
+
+# Kafka Spout Performance Tuning
+
+The Kafka spout provides two internal parameters to control its performance. The parameters can be set using the [KafkaSpoutConfig](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java) methods [setOffsetCommitPeriodMs](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L189-L193) and [setMaxUncommittedOffsets](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L211-L217).
+
+* "offset.commit.period.ms" controls how often the spout commits to Kafka
+* "max.uncommitted.offsets" controls how many offsets can be pending commit before another poll can take place
+<br/>
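+
+A minimal sketch of applying these settings (together with the poll timeout discussed below) on the same config builder used in the usage example above; the values simply repeat the defaults listed further down and are illustrative, not recommendations:
+
+```java
+// Hedged sketch: wiring the performance-related settings onto KafkaSpoutConfig.Builder.
+KafkaSpoutConfig<String, String> tunedConfig = new KafkaSpoutConfig.Builder<String, String>(
+        kafkaConsumerProps, kafkaSpoutStreams, tuplesBuilder, retryService)
+        .setOffsetCommitPeriodMs(30_000)       // offset.commit.period.ms
+        .setMaxUncommittedOffsets(10_000_000)  // max.uncommitted.offsets
+        .setPollTimeoutMs(200)                 // KafkaConsumer poll timeout
+        .build();
+```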
+
+The [Kafka consumer config](http://kafka.apache.org/documentation.html#consumerconfigs) parameters may also have an impact on the performance of the spout. The following Kafka parameters are likely the most influential in the spout performance:
+
+* "fetch.min.bytes"
+* "fetch.max.wait.ms"
+* [Kafka Consumer](http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html) instance poll timeout, which is specified for each Kafka spout using the [KafkaSpoutConfig](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java) method [setPollTimeoutMs](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L180-L184)
+<br/>
+
+Depending on the structure of your Kafka cluster, distribution of the data, and availability of data to poll, these parameters will have to be configured appropriately. Please refer to the Kafka documentation on Kafka parameter tuning.
+
+### Default values
+
+Currently the Kafka spout has the following default values, which have been shown to give good performance in the test environment as described in this [blog post](https://hortonworks.com/blog/microbenchmarking-storm-1-0-performance/)
+
+* poll.timeout.ms = 200
+* offset.commit.period.ms = 30000   (30s)
+* max.uncommitted.offsets = 10000000
+<br/>
+
+There will be a blog post coming soon analyzing the trade-offs of these tuning parameters, and comparing the performance of the Kafka Spouts using the Kafka client API introduced in 0.9 (new implementation) and in prior versions (prior implementation).
+
+# Future Work
+ Implement comprehensive metrics. Trident spout is coming soon.

http://git-wip-us.apache.org/repos/asf/storm/blob/8705605c/docs/storm-kinesis.md
----------------------------------------------------------------------
diff --git a/docs/storm-kinesis.md b/docs/storm-kinesis.md
new file mode 100644
index 0000000..b23c10d
--- /dev/null
+++ b/docs/storm-kinesis.md
@@ -0,0 +1,136 @@
+# Storm Kinesis Spout
+Provides a core Storm spout for consuming data from a stream in Amazon Kinesis Streams. It stores the sequence numbers that can be committed in ZooKeeper and,
+by default, starts consuming records after that sequence number on restart. Below is a code sample that creates a sample topology using the spout. Each
+object used in configuring the spout is explained below. Ideally, the number of spout tasks should be equal to the number of shards in Kinesis. However, each task
+can read from more than one shard.
+
+```java
+public class KinesisSpoutTopology {
+    public static void main (String args[]) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
+        String topologyName = args[0];
+        RecordToTupleMapper recordToTupleMapper = new TestRecordToTupleMapper();
+        KinesisConnectionInfo kinesisConnectionInfo = new KinesisConnectionInfo(new CredentialsProviderChain(), new ClientConfiguration(), Regions.US_WEST_2,
+                1000);
+        ZkInfo zkInfo = new ZkInfo("localhost:2181", "/kinesisOffsets", 20000, 15000, 10000L, 3, 2000);
+        KinesisConfig kinesisConfig = new KinesisConfig(args[1], ShardIteratorType.TRIM_HORIZON,
+                recordToTupleMapper, new Date(), new ExponentialBackoffRetrier(), zkInfo, kinesisConnectionInfo, 10000L);
+        KinesisSpout kinesisSpout = new KinesisSpout(kinesisConfig);
+        TopologyBuilder topologyBuilder = new TopologyBuilder();
+        topologyBuilder.setSpout("spout", kinesisSpout, 3);
+        topologyBuilder.setBolt("bolt", new KinesisBoltTest(), 1).shuffleGrouping("spout");
+        Config topologyConfig = new Config();
+        topologyConfig.setDebug(true);
+        topologyConfig.setNumWorkers(3);
+        StormSubmitter.submitTopology(topologyName, topologyConfig, topologyBuilder.createTopology());
+    }
+}
+```
+As you can see above, the spout takes a `KinesisConfig` object in its constructor. The `KinesisConfig` constructor takes 8 objects, as explained below.
+
+#### `String` streamName
+name of kinesis stream to consume data from
+
+#### `ShardIteratorType` shardIteratorType
+3 types are supported - TRIM_HORIZON (beginning of shard), LATEST and AT_TIMESTAMP. By default this argument is ignored if state for the shards
+is found in ZooKeeper. Hence these will only apply the first time a topology is started. If you want to use any of these in subsequent runs of the topology, you
+will need to clear the state of the ZooKeeper node used for storing sequence numbers.
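+
+For example, to start from a point in time with AT_TIMESTAMP, the `KinesisConfig` from the topology example above could be built roughly as sketched below (illustrative only; the stream name and timestamp are placeholders, and any previously stored ZooKeeper state would take precedence as noted above):
+
+```java
+// Hedged sketch: same KinesisConfig constructor as in the topology example, but starting
+// at a specific timestamp instead of TRIM_HORIZON.
+Date startAt = new Date(System.currentTimeMillis() - 3600 * 1000L); // one hour ago
+KinesisConfig kinesisConfig = new KinesisConfig("my-stream", ShardIteratorType.AT_TIMESTAMP,
+        recordToTupleMapper, startAt, new ExponentialBackoffRetrier(), zkInfo, kinesisConnectionInfo, 10000L);
+```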
+
+#### `RecordToTupleMapper` recordToTupleMapper
+an implementation of the `RecordToTupleMapper` interface that the spout will call to convert a Kinesis record to a Storm tuple. It has two methods. getOutputFields
+tells the spout the fields that will be present in the tuple emitted from the getTuple method. If getTuple returns null, the record will be acked.
+```java
+    Fields getOutputFields ();
+    List<Object> getTuple (Record record);
+```
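+
+A minimal mapper might look like the sketch below. It is an illustration only: it assumes `Record` is the AWS SDK `com.amazonaws.services.kinesis.model.Record` carrying UTF-8 text, and the `TestRecordToTupleMapper` in the test module remains the authoritative reference.
+
+```java
+// Hedged sketch of a RecordToTupleMapper that emits the partition key and the record payload as a string.
+public class StringRecordToTupleMapper implements RecordToTupleMapper {
+
+    @Override
+    public Fields getOutputFields() {
+        return new Fields("partitionKey", "payload");
+    }
+
+    @Override
+    public List<Object> getTuple(Record record) {
+        // Decode a duplicate of the ByteBuffer so the original read position is left untouched.
+        String payload = StandardCharsets.UTF_8.decode(record.getData().duplicate()).toString();
+        return Arrays.asList(record.getPartitionKey(), payload);
+    }
+}
+```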
+
+#### `Date` timestamp
+used in conjunction with the AT_TIMESTAMP shardIteratorType argument. This will make the spout fetch records from Kinesis starting at that time or later. The
+time used by Kinesis is the server side time associated with the record by Kinesis.
+
+#### `FailedMessageRetryHandler` failedMessageRetryHandler
+an implementation of the `FailedMessageRetryHandler` interface. By default this module provides an implementation that supports an exponential backoff retry
+mechanism for failed messages. That implementation has two constructors. The default no-args constructor will configure the first retry at 100 milliseconds and
+subsequent retries at Math.pow(2, i-1) where i is the retry number in the range 2 to Long.MAX_VALUE. 2 represents the base of the exponential function in seconds.
+The other constructor takes the retry interval in millis for the first retry as first argument, the base of the exponential function in seconds as second argument, and the number of
+retries as third argument. The methods of this interface, and how they work in accord with the spout, are explained below.
+```java
+    boolean failed (KinesisMessageId messageId);
+    KinesisMessageId getNextFailedMessageToRetry ();
+    void failedMessageEmitted (KinesisMessageId messageId);
+    void acked (KinesisMessageId messageId);
+```
+The failed method will be called on every tuple that failed in the spout. It should return true if that failed message is scheduled to be retried, false otherwise.
+
+The getNextFailedMessageToRetry method will be called as the first thing every time the spout wants to emit a tuple. It should return a message that should be retried
+if any, or null otherwise. Note that it can return null in the case it does not have any message to retry as of that moment. However, it should eventually
+return every message for which it returned true when the failed method was called for that message.
+
+failedMessageEmitted will be called if the spout successfully manages to get the record from Kinesis and emit it. If not, the implementation should return the same
+message when getNextFailedMessageToRetry is called again.
+
+acked will be called once the failed message was re-emitted and successfully acked by the spout. If it was failed again by the spout, failed will be called again.
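+
+As a rough, hedged example, the provided `ExponentialBackoffRetrier` could be configured with its non-default constructor along the lines of the sketch below. The argument order follows the description above, but the exact parameter types are an assumption, so check the class before relying on it.
+
+```java
+// Hedged sketch (types assumed): first retry after 50 ms, exponential base of 2 seconds,
+// and at most 5 retries before a failed message is no longer retried.
+FailedMessageRetryHandler retryHandler = new ExponentialBackoffRetrier(50L, 2L, 5L);
+```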
+
+#### `ZkInfo` zkInfo
+an object encapsulating information for ZooKeeper interaction. The constructor takes zkUrl as first argument, which is a comma separated string of zk host and
+port, zkNode as second, which will be used as the root node for storing committed sequence numbers, session timeout as third in milliseconds, connection timeout
+as fourth in milliseconds, commit interval as fifth in milliseconds for committing sequence numbers to ZooKeeper, retry attempts as sixth for zk client
+connection retry attempts, and retry interval as seventh in milliseconds for the time to wait before retrying to connect.
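+
+For reference, this is the same constructor call used in the topology example above, annotated argument by argument (values are illustrative):
+
+```java
+ZkInfo zkInfo = new ZkInfo(
+        "localhost:2181",   // zkUrl: comma separated zk host:port pairs
+        "/kinesisOffsets",  // zkNode: root node for storing committed sequence numbers
+        20000,              // session timeout in milliseconds
+        15000,              // connection timeout in milliseconds
+        10000L,             // commit interval in milliseconds for committing sequence numbers
+        3,                  // retry attempts for the zk client connection
+        2000);              // retry interval in milliseconds between connection attempts
+```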
+
+#### `KinesisConnectionInfo` kinesisConnectionInfo
+an object that captures arguments for connecting to Kinesis using the Kinesis client. It has a constructor that takes an implementation of `AWSCredentialsProvider`
+as first argument. This module provides an implementation called `CredentialsProviderChain` that allows the spout to authenticate with Kinesis using one of
+the 5 mechanisms in this order - `EnvironmentVariableCredentialsProvider`, `SystemPropertiesCredentialsProvider`, `ClasspathPropertiesFileCredentialsProvider`,
+`InstanceProfileCredentialsProvider`, `ProfileCredentialsProvider`. It takes an object of `ClientConfiguration` as second argument for configuring the Kinesis
+client, `Regions` as third argument that sets the region to connect to on the client, and recordsLimit as the fourth argument, which represents the maximum number
+of records the Kinesis client will retrieve for every GetRecords request. This limit should be carefully chosen based on the size of the record, Kinesis
+throughput rate limits and per tuple latency in Storm for the topology. Also, if one task will be reading from more than one shard then that will also affect
+the choice of the limit argument.
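+
+Again for reference, the annotated version of the constructor call from the topology example (values are illustrative):
+
+```java
+KinesisConnectionInfo kinesisConnectionInfo = new KinesisConnectionInfo(
+        new CredentialsProviderChain(),  // AWSCredentialsProvider implementation
+        new ClientConfiguration(),       // configuration for the kinesis client
+        Regions.US_WEST_2,               // region the client connects to
+        1000);                           // recordsLimit: max records per GetRecords request
+```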
+
+#### `Long` maxUncommittedRecords
+this represents the maximum number of uncommitted sequence numbers allowed per task. Once this number is reached the spout will not fetch any new records from
+Kinesis. Uncommitted sequence numbers are defined as the sum of all the messages for a task that have not been committed to ZooKeeper. This is different from
+the topology level max pending messages. For example, if this value is set to 10, and the spout emitted sequence numbers 1 to 10, with sequence number 1 pending
+and 2 to 10 acked, then the number of uncommitted sequence numbers is 10, since no sequence number in the range 1 to 10 can be committed to zk.
+However, storm can still call next tuple on the spout because there is only 1 pending message.
+ 
+### Maven dependencies
+The AWS SDK version that this was tested with is 1.10.77.
+
+```xml
+ <dependencies>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk</artifactId>
+            <version>${aws-java-sdk.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-framework</artifactId>
+            <version>${curator.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.googlecode.json-simple</groupId>
+            <artifactId>json-simple</artifactId>
+        </dependency>
+ </dependencies>
+```
+
+# Future Work
+Handle merging or splitting of shards in Kinesis, a Trident spout implementation, and metrics.

http://git-wip-us.apache.org/repos/asf/storm/blob/8705605c/docs/storm-mongodb.md
----------------------------------------------------------------------
diff --git a/docs/storm-mongodb.md b/docs/storm-mongodb.md
index 90994bd..133bac3 100644
--- a/docs/storm-mongodb.md
+++ b/docs/storm-mongodb.md
@@ -174,26 +174,3 @@ To use the `MongoUpdateBolt`,  you construct an instance of it by specifying Mon
         //updateBolt.withUpsert(true);
  ```
 
-## License
-
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-
-## Committer Sponsors
-
- * Sriharsha Chintalapani ([sriharsha@apache.org](mailto:sriharsha@apache.org))
- 

http://git-wip-us.apache.org/repos/asf/storm/blob/8705605c/docs/storm-opentsdb.md
----------------------------------------------------------------------
diff --git a/docs/storm-opentsdb.md b/docs/storm-opentsdb.md
new file mode 100644
index 0000000..11995ce
--- /dev/null
+++ b/docs/storm-opentsdb.md
@@ -0,0 +1,52 @@
+# Storm OpenTSDB Bolt and TridentState
+  
+OpenTSDB offers scalable and highly available storage for time series data. It consists of
+Time Series Daemon (TSD) servers along with command line utilities. Each TSD connects to the
+configured HBase cluster to push/query the data.
+
+Time series data point consists of:
+ - a metric name.
+ - a UNIX timestamp (seconds or milliseconds since Epoch).
+ - a value (64 bit integer or single-precision floating point value).
+ - a set of tags (key-value pairs) that describe the time series the point belongs to.
+
+The Storm bolt and Trident state create the above time series data from a tuple based on the given `TupleMetricPointMapper`.
+
+This module provides core Storm and Trident bolt implementations for writing data to OpenTSDB.
+
+Time series data points are written with an at-least-once guarantee and duplicate data points should be handled as mentioned [here](http://opentsdb.net/docs/build/html/user_guide/writing.html#duplicate-data-points) in OpenTSDB.
+
+## Examples
+
+### Core Bolt
+The example below describes the usage of the core bolt, `org.apache.storm.opentsdb.bolt.OpenTsdbBolt`.
+
+```java
+
+        OpenTsdbClient.Builder builder =  OpenTsdbClient.newBuilder(openTsdbUrl).sync(30_000).returnDetails();
+        final OpenTsdbBolt openTsdbBolt = new OpenTsdbBolt(builder, Collections.singletonList(TupleOpenTsdbDatapointMapper.DEFAULT_MAPPER));
+        openTsdbBolt.withBatchSize(10).withFlushInterval(2000);
+        topologyBuilder.setBolt("opentsdb", openTsdbBolt).shuffleGrouping("metric-gen");
+        
+```
+
+
+### Trident State
+
+```java
+
+        final OpenTsdbStateFactory openTsdbStateFactory =
+                new OpenTsdbStateFactory(OpenTsdbClient.newBuilder(tsdbUrl),
+                        Collections.singletonList(TupleOpenTsdbDatapointMapper.DEFAULT_MAPPER));
+        TridentTopology tridentTopology = new TridentTopology();
+        
+        final Stream stream = tridentTopology.newStream("metric-tsdb-stream", new MetricGenSpout());
+        
+        stream.peek(new Consumer() {
+            @Override
+            public void accept(TridentTuple input) {
+                LOG.info("########### Received tuple: [{}]", input);
+            }
+        }).partitionPersist(openTsdbStateFactory, MetricGenSpout.DEFAULT_METRIC_FIELDS, new OpenTsdbStateUpdater());
+        
+```

http://git-wip-us.apache.org/repos/asf/storm/blob/8705605c/docs/storm-redis.md
----------------------------------------------------------------------
diff --git a/docs/storm-redis.md b/docs/storm-redis.md
index adbac68..87541b9 100644
--- a/docs/storm-redis.md
+++ b/docs/storm-redis.md
@@ -232,27 +232,3 @@ RedisClusterState
                                 new RedisClusterStateQuerier(lookupMapper),
                                 new Fields("columnName","columnValue"));
 ```
-
-## License
-
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-
-## Committer Sponsors
-
- * Robert Evans ([@revans2](https://github.com/revans2))
- * Jungtaek Lim ([@HeartSaVioR](https://github.com/HeartSaVioR))

