storm-commits mailing list archives

From sati...@apache.org
Subject [1/2] storm git commit: STORM-1979: Storm Druid Connector implementation. This uses Druid's tranquility library.
Date Tue, 09 Aug 2016 05:37:39 GMT
Repository: storm
Updated Branches:
  refs/heads/1.x-branch 0253657f6 -> 7e90b93f7


STORM-1979: Storm Druid Connector implementation. This uses Druid's tranquility library.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d51e696b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d51e696b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d51e696b

Branch: refs/heads/1.x-branch
Commit: d51e696b7d538a5dda97a71960b3d6d04ea4e311
Parents: 0253657
Author: Manikumar Reddy O <manikumar.reddy@gmail.com>
Authored: Thu Jul 21 11:57:03 2016 +0000
Committer: Satish Duggana <sduggana@hortonworks.com>
Committed: Tue Aug 9 10:54:50 2016 +0530

----------------------------------------------------------------------
 external/storm-druid/README.md                  | 143 +++++++++++++++++++
 external/storm-druid/pom.xml                    |  83 +++++++++++
 .../apache/storm/druid/bolt/DruidBeamBolt.java  | 110 ++++++++++++++
 .../storm/druid/bolt/DruidBeamFactory.java      |  29 ++++
 .../apache/storm/druid/bolt/DruidConfig.java    | 104 ++++++++++++++
 .../druid/bolt/ITupleDruidEventMapper.java      |  38 +++++
 .../storm/druid/bolt/TupleDruidEventMapper.java |  44 ++++++
 .../storm/druid/trident/DruidBeamState.java     |  96 +++++++++++++
 .../druid/trident/DruidBeamStateFactory.java    |  42 ++++++
 .../druid/trident/DruidBeamStateUpdater.java    |  48 +++++++
 .../storm/druid/SampleDruidBeamFactoryImpl.java | 122 ++++++++++++++++
 .../storm/druid/SampleDruidBoltTopology.java    |  95 ++++++++++++
 .../druid/SampleDruidBoltTridentTopology.java   |  91 ++++++++++++
 .../apache/storm/druid/SimpleBatchSpout.java    |  95 ++++++++++++
 .../org/apache/storm/druid/SimpleSpout.java     |  68 +++++++++
 pom.xml                                         |   1 +
 storm-dist/binary/src/main/assembly/binary.xml  |  15 +-
 17 files changed, 1223 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d51e696b/external/storm-druid/README.md
----------------------------------------------------------------------
diff --git a/external/storm-druid/README.md b/external/storm-druid/README.md
new file mode 100644
index 0000000..75434f7
--- /dev/null
+++ b/external/storm-druid/README.md
@@ -0,0 +1,143 @@
+# Storm Druid Bolt and TridentState
+
+This module provides core Storm and Trident bolt implementations for writing data to the [Druid](http://druid.io/) data store.
+This implementation uses Druid's [Tranquility library](https://github.com/druid-io/tranquility) to send messages to Druid.
+
+Some of the implementation details are borrowed from the existing [Tranquility Storm Bolt](https://github.com/druid-io/tranquility/blob/master/docs/storm.md).
+This new bolt was added to support the latest Storm release and to maintain the bolt in the Storm repo.
+
+### Core Bolt
+The example below shows how to use the core bolt, `org.apache.storm.druid.bolt.DruidBeamBolt`.
+By default, this bolt expects to receive tuples in which the "event" field holds your event object.
+This logic can be changed by implementing the `ITupleDruidEventMapper` interface.
+
+```java
+
+   DruidBeamFactory druidBeamFactory = new SampleDruidBeamFactoryImpl(new HashMap<String, Object>());
+   DruidConfig druidConfig = DruidConfig.newBuilder().discardStreamId(DruidConfig.DEFAULT_DISCARD_STREAM_ID).build();
+   ITupleDruidEventMapper<Map<String, Object>> eventMapper = new TupleDruidEventMapper<>(TupleDruidEventMapper.DEFAULT_FIELD_NAME);
+   DruidBeamBolt<Map<String, Object>> druidBolt = new DruidBeamBolt<Map<String, Object>>(druidBeamFactory, eventMapper, druidConfig);
+   topologyBuilder.setBolt("druid-bolt", druidBolt).shuffleGrouping("event-gen");
+   topologyBuilder.setBolt("printer-bolt", new PrinterBolt()).shuffleGrouping("druid-bolt" , druidConfig.getDiscardStreamId());
+
+```
+
+
+### Trident State
+
+```java
+    DruidBeamFactory druidBeamFactory = new SampleDruidBeamFactoryImpl(new HashMap<String, Object>());
+    ITupleDruidEventMapper<Map<String, Object>> eventMapper = new TupleDruidEventMapper<>(TupleDruidEventMapper.DEFAULT_FIELD_NAME);
+
+    final Stream stream = tridentTopology.newStream("batch-event-gen", new SimpleBatchSpout(10));
+
+    stream.peek(new Consumer() {
+        @Override
+        public void accept(TridentTuple input) {
+             LOG.info("########### Received tuple: [{}]", input);
+         }
+    }).partitionPersist(new DruidBeamStateFactory<Map<String, Object>>(druidBeamFactory, eventMapper), new Fields("event"), new DruidBeamStateUpdater());
+
+```
+
+### Sample Beam Factory Implementation
+The Druid bolt must be supplied with a `DruidBeamFactory`. You can implement one using the [DruidBeams builder's](https://github.com/druid-io/tranquility/blob/master/core/src/main/scala/com/metamx/tranquility/druid/DruidBeams.scala) `buildBeam()` method.
+See the [Configuration documentation](https://github.com/druid-io/tranquility/blob/master/docs/configuration.md) for details.
+For more details, refer to the [Tranquility library](https://github.com/druid-io/tranquility) docs.
+
+```java
+
+public class SampleDruidBeamFactoryImpl implements DruidBeamFactory<Map<String, Object>> {
+
+    @Override
+    public Beam<Map<String, Object>> makeBeam(Map<?, ?> conf, IMetricsContext metrics) {
+
+
+        final String indexService = "druid/overlord"; // The druid.service name of the indexing service Overlord node.
+        final String discoveryPath = "/druid/discovery"; // Curator service discovery path. config: druid.discovery.curator.path
+        final String dataSource = "test"; //The name of the ingested datasource. Datasources can be thought of as tables.
+        final List<String> dimensions = ImmutableList.of("publisher", "advertiser");
+        List<AggregatorFactory> aggregators = ImmutableList.<AggregatorFactory>of(
+                new CountAggregatorFactory(
+                        "click"
+                )
+        );
+        // Tranquility needs to be able to extract timestamps from your object type (in this case, Map<String, Object>).
+        final Timestamper<Map<String, Object>> timestamper = new Timestamper<Map<String, Object>>()
+        {
+            @Override
+            public DateTime timestamp(Map<String, Object> theMap)
+            {
+                return new DateTime(theMap.get("timestamp"));
+            }
+        };
+
+        // Tranquility uses ZooKeeper (through Curator) for coordination.
+        final CuratorFramework curator = CuratorFrameworkFactory
+                .builder()
+                .connectString((String)conf.get("druid.tranquility.zk.connect")) //take config from storm conf
+                .retryPolicy(new ExponentialBackoffRetry(1000, 20, 30000))
+                .build();
+        curator.start();
+
+        // The JSON serialization of your object must have a timestamp field in a format that Druid understands. By default,
+        // Druid expects the field to be called "timestamp" and to be an ISO8601 timestamp.
+        final TimestampSpec timestampSpec = new TimestampSpec("timestamp", "auto", null);
+
+        // Tranquility needs to be able to serialize your object type to JSON for transmission to Druid. By default this is
+        // done with Jackson. If you want to provide an alternate serializer, you can provide your own via `.objectWriter(...)`.
+        // In this case, we won't provide one, so we're just using Jackson.
+        final Beam<Map<String, Object>> beam = DruidBeams
+                .builder(timestamper)
+                .curator(curator)
+                .discoveryPath(discoveryPath)
+                .location(DruidLocation.create(indexService, dataSource))
+                .timestampSpec(timestampSpec)
+                .rollup(DruidRollup.create(DruidDimensions.specific(dimensions), aggregators, QueryGranularities.MINUTE))
+                .tuning(
+                        ClusteredBeamTuning
+                                .builder()
+                                .segmentGranularity(Granularity.HOUR)
+                                .windowPeriod(new Period("PT10M"))
+                                .partitions(1)
+                                .replicants(1)
+                                .build()
+                )
+                .druidBeamConfig(
+                      DruidBeamConfig
+                           .builder()
+                           .indexRetryPeriod(new Period("PT10M"))
+                           .build())
+                .buildBeam();
+
+        return beam;
+    }
+}
+
+```
+
+Example code is available [here.](https://github.com/apache/storm/tree/master/external/storm-druid/src/test/java/org/apache/storm/druid)
+
+## License
+
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+## Committer Sponsors
+ * Sriharsha Chintalapani ([sriharsha@apache.org](mailto:sriharsha@apache.org))
+ * P. Taylor Goetz ([ptgoetz@apache.org](mailto:ptgoetz@apache.org))
+ * Satish Duggana ([satishd@apache.org](mailto:satishd@apache.org))
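
The sample beam factory in the README above reads a "timestamp" entry from each `Map<String, Object>` event and declares "publisher" and "advertiser" as dimensions. As a rough illustration of what a spout might put in the "event" field, the sketch below builds such a map using only the JDK. The class name `DruidEventSketch` and the helper `buildEvent` are hypothetical and not part of this commit, and the ISO 8601 string is an assumption based on the README's comment about the default timestamp format.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of storm-druid: shows the event shape
// that the README's SampleDruidBeamFactoryImpl appears to expect.
public class DruidEventSketch {

    // Builds one event with a "timestamp" entry plus the two dimensions
    // ("publisher", "advertiser") named in the sample beam factory.
    public static Map<String, Object> buildEvent(Instant when, String publisher, String advertiser) {
        Map<String, Object> event = new HashMap<>();
        // Instant.toString() yields an ISO 8601 string such as 2016-08-09T05:37:39Z.
        event.put("timestamp", when.toString());
        event.put("publisher", publisher);
        event.put("advertiser", advertiser);
        return event;
    }

    public static void main(String[] args) {
        System.out.println(buildEvent(Instant.now(), "example-publisher", "example-advertiser"));
    }
}
```

In a topology, a map like this would be emitted under the field name passed to `TupleDruidEventMapper` ("event" by default).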

http://git-wip-us.apache.org/repos/asf/storm/blob/d51e696b/external/storm-druid/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-druid/pom.xml b/external/storm-druid/pom.xml
new file mode 100644
index 0000000..0f50d76
--- /dev/null
+++ b/external/storm-druid/pom.xml
@@ -0,0 +1,83 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>2.0.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>storm-druid</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.druid</groupId>
+            <artifactId>tranquility-core_2.11</artifactId>
+            <version>0.8.2</version>
+        </dependency>
+        <dependency>
+            <groupId>io.druid</groupId>
+            <artifactId>druid-server</artifactId>
+            <version>0.9.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>2.11.8</version>
+        </dependency>
+        <dependency>
+            <groupId>com.twitter</groupId>
+            <artifactId>util-core_2.11</artifactId>
+            <version>6.30.0</version>
+        </dependency>
+        <!-- tranquility library depends on jackson 2.4.6 version -->
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+            <version>2.4.6</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>2.4.6</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.dataformat</groupId>
+            <artifactId>jackson-dataformat-smile</artifactId>
+            <version>2.4.6</version>
+        </dependency>
+
+        <!--test dependencies -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/d51e696b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java
new file mode 100644
index 0000000..721eaa1
--- /dev/null
+++ b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.druid.bolt;
+
+import com.metamx.tranquility.tranquilizer.MessageDroppedException;
+import com.metamx.tranquility.tranquilizer.Tranquilizer;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Basic bolt implementation for storing data to Druid datastore.
+ * <p/>
+ * This implementation uses Druid's Tranquility library (https://github.com/druid-io/tranquility)
+ * to send to druid store.
+ * Some of the concepts are borrowed from Tranquility storm connector implementation.
+ * (https://github.com/druid-io/tranquility/blob/master/docs/storm.md)
+ *
+ * By default this Bolt expects to receive tuples in which "event" field gives your event type.
+ * This logic can be changed by implementing ITupleDruidEventMapper interface.
+ * <p/>
+ *
+ */
+public class DruidBeamBolt<E> extends BaseRichBolt {
+
+    private volatile  OutputCollector collector;
+    private DruidBeamFactory<E> beamFactory = null;
+    private DruidConfig druidConfig = null;
+    private Tranquilizer<E> tranquilizer = null;
+    private ITupleDruidEventMapper<E> druidEventMapper = null;
+
+    public DruidBeamBolt(DruidBeamFactory<E> beamFactory, ITupleDruidEventMapper<E> druidEventMapper, DruidConfig druidConfig) {
+        this.beamFactory = beamFactory;
+        this.druidConfig = druidConfig;
+        this.druidEventMapper = druidEventMapper;
+    }
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+        tranquilizer = Tranquilizer.builder()
+                .maxBatchSize(druidConfig.getMaxBatchSize())
+                .maxPendingBatches(druidConfig.getMaxPendingBatches())
+                .lingerMillis(druidConfig.getLingerMillis())
+                .blockOnFull(druidConfig.isBlockOnFull())
+                .build(beamFactory.makeBeam(stormConf, context));
+        this.tranquilizer.start();
+    }
+
+    @Override
+    public void execute(final Tuple tuple) {
+      Future future = tranquilizer.send((druidEventMapper.getEvent(tuple)));
+      future.addEventListener(new FutureEventListener() {
+          @Override
+          public void onFailure(Throwable cause) {
+              if (cause instanceof MessageDroppedException) {
+                  collector.ack(tuple);
+                  if (druidConfig.getDiscardStreamId() != null)
+                      collector.emit(druidConfig.getDiscardStreamId(), new Values(tuple, System.currentTimeMillis()));
+              }
+              else {
+                  collector.fail(tuple);
+              }
+          }
+
+          @Override
+          public void onSuccess(Object value) {
+                  collector.ack(tuple);
+          }
+      });
+
+    }
+
+    @Override
+    public void cleanup() {
+        tranquilizer.stop();
+    }
+
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        if (druidConfig.getDiscardStreamId() != null) declarer.declareStream(druidConfig.getDiscardStreamId(), new Fields("tuple", "timestamp"));
+    }
+}
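
The failure handling in `DruidBeamBolt.execute` above treats a dropped message differently from other errors: a dropped tuple is acked (so it is not replayed) and optionally forwarded to the discard stream, while any other failure fails the tuple. The sketch below restates that decision as a plain function so the branches are easy to see. It is a simplified model under stated assumptions: `SendOutcomeSketch`, `DroppedException`, and the string action labels are hypothetical stand-ins, not Storm or Tranquility APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the bolt's onFailure branch; uses only the JDK.
public class SendOutcomeSketch {

    // Stand-in for com.metamx.tranquility.tranquilizer.MessageDroppedException.
    public static class DroppedException extends RuntimeException { }

    // Returns, in order, the collector actions the bolt takes for a failed send.
    public static List<String> actionsOnFailure(Throwable cause, String discardStreamId) {
        List<String> actions = new ArrayList<>();
        if (cause instanceof DroppedException) {
            // Dropped messages are acked so Storm does not replay them.
            actions.add("ack");
            if (discardStreamId != null) {
                // Forward the original tuple to the discard stream, if one is configured.
                actions.add("emit:" + discardStreamId);
            }
        } else {
            // Any other error fails the tuple, which lets Storm replay it.
            actions.add("fail");
        }
        return actions;
    }

    public static void main(String[] args) {
        System.out.println(actionsOnFailure(new DroppedException(), "druid-discard-stream"));
        System.out.println(actionsOnFailure(new IllegalStateException("boom"), "druid-discard-stream"));
    }
}
```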

http://git-wip-us.apache.org/repos/asf/storm/blob/d51e696b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamFactory.java b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamFactory.java
new file mode 100644
index 0000000..7d1866f
--- /dev/null
+++ b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.druid.bolt;
+
+import com.metamx.tranquility.beam.Beam;
+import org.apache.storm.task.IMetricsContext;
+
+import java.io.Serializable;
+import java.util.Map;
+
+public interface DruidBeamFactory<E>  extends  Serializable {
+    public Beam<E> makeBeam(Map<?,?> conf, IMetricsContext metrics);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d51e696b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidConfig.java b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidConfig.java
new file mode 100644
index 0000000..081d9ff
--- /dev/null
+++ b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidConfig.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.druid.bolt;
+
+import com.metamx.tranquility.tranquilizer.Tranquilizer;
+
+import java.io.Serializable;
+
+public class DruidConfig implements Serializable {
+
+    public static final String DEFAULT_DISCARD_STREAM_ID = "druid-discard-stream";
+
+    //Tranquilizer configs for DruidBeamBolt
+    private int maxBatchSize;
+    private int maxPendingBatches;
+    private long lingerMillis;
+    private boolean blockOnFull;
+    private String discardStreamId;
+
+    public int getMaxBatchSize() {
+        return maxBatchSize;
+    }
+
+    public int getMaxPendingBatches() {
+        return maxPendingBatches;
+    }
+
+    public long getLingerMillis() {
+        return lingerMillis;
+    }
+
+    public boolean isBlockOnFull() {
+        return blockOnFull;
+    }
+
+    public String getDiscardStreamId() {
+        return discardStreamId;
+    }
+
+    private DruidConfig(Builder builder) {
+        this.maxBatchSize = builder.maxBatchSize;
+        this.maxPendingBatches = builder.maxPendingBatches;
+        this.lingerMillis = builder.lingerMillis;
+        this.blockOnFull = builder.blockOnFull;
+        this.discardStreamId = builder.discardStreamId;
+    }
+
+    public static DruidConfig.Builder newBuilder() {
+        return new Builder();
+    }
+
+    public static class Builder {
+        private int maxBatchSize = Tranquilizer.DefaultMaxBatchSize();
+        private int maxPendingBatches = Tranquilizer.DefaultMaxPendingBatches();
+        private long lingerMillis = Tranquilizer.DefaultLingerMillis();
+        private boolean blockOnFull =  Tranquilizer.DefaultBlockOnFull();
+        private String discardStreamId = null;
+
+        public Builder maxBatchSize(int maxBatchSize) {
+            this.maxBatchSize = maxBatchSize;
+            return this;
+        }
+
+        public Builder maxPendingBatches(int maxPendingBatches) {
+            this.maxPendingBatches = maxPendingBatches;
+            return this;
+        }
+
+        public Builder lingerMillis(long lingerMillis) {
+            this.lingerMillis = lingerMillis;
+            return this;
+        }
+
+        public Builder blockOnFull(boolean blockOnFull) {
+            this.blockOnFull = blockOnFull;
+            return this;
+        }
+
+        public Builder discardStreamId(String discardStreamId) {
+            this.discardStreamId = discardStreamId;
+            return this;
+        }
+
+        public DruidConfig build() {
+            return new DruidConfig(this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d51e696b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/ITupleDruidEventMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/ITupleDruidEventMapper.java b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/ITupleDruidEventMapper.java
new file mode 100644
index 0000000..0ae0233
--- /dev/null
+++ b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/ITupleDruidEventMapper.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.druid.bolt;
+
+import org.apache.storm.tuple.ITuple;
+
+import java.io.Serializable;
+
+/**
+ * This class gives a mapping of a {@link ITuple} to Druid Event
+ *
+ */
+public interface ITupleDruidEventMapper<E> extends Serializable {
+
+    /**
+     * Returns a Druid Event for a given {@code tuple}.
+     *
+     * @param tuple tuple instance
+     */
+    public E getEvent(ITuple tuple);
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d51e696b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/TupleDruidEventMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/TupleDruidEventMapper.java b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/TupleDruidEventMapper.java
new file mode 100644
index 0000000..67b7cc0
--- /dev/null
+++ b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/TupleDruidEventMapper.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.druid.bolt;
+
+import org.apache.storm.tuple.ITuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Converts {@link ITuple} to Event
+ */
+public final class TupleDruidEventMapper<E> implements ITupleDruidEventMapper<E> {
+
+    public static final String DEFAULT_FIELD_NAME = "event";
+
+    private final String eventFieldName;
+
+    public TupleDruidEventMapper(String eventFieldName) {
+        this.eventFieldName = eventFieldName;
+    }
+
+    @Override
+    public E getEvent(ITuple tuple) {
+        return (E) tuple.getValueByField(eventFieldName);
+    }
+
+
+}
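
`TupleDruidEventMapper` above pulls the whole event out of a single tuple field. When the upstream component emits the event as several separate fields instead, a custom `ITupleDruidEventMapper` can assemble the map itself. The sketch below shows one possible shape for such a mapper. To stay runnable without Storm on the classpath it uses two hypothetical stand-in interfaces, `TupleView` (for the one `ITuple` method needed) and `EventMapper` (mirroring `ITupleDruidEventMapper`); a real implementation would implement the Storm interfaces directly.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a multi-field mapper; stand-in types replace the Storm ones.
public class MultiFieldMapperSketch {

    // Stand-in for the single org.apache.storm.tuple.ITuple method used here.
    public interface TupleView {
        Object getValueByField(String field);
    }

    // Same shape as ITupleDruidEventMapper<E>, but against the stand-in tuple type.
    public interface EventMapper<E> {
        E getEvent(TupleView tuple);
    }

    // Copies each named tuple field into the event map, keyed by field name.
    public static class MultiFieldMapper implements EventMapper<Map<String, Object>> {
        private final List<String> fieldNames;

        public MultiFieldMapper(List<String> fieldNames) {
            this.fieldNames = fieldNames;
        }

        @Override
        public Map<String, Object> getEvent(TupleView tuple) {
            Map<String, Object> event = new HashMap<>();
            for (String name : fieldNames) {
                event.put(name, tuple.getValueByField(name));
            }
            return event;
        }
    }
}
```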

http://git-wip-us.apache.org/repos/asf/storm/blob/d51e696b/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamState.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamState.java b/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamState.java
new file mode 100644
index 0000000..e59fea9
--- /dev/null
+++ b/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamState.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.druid.trident;
+
+import com.metamx.tranquility.beam.Beam;
+import com.metamx.tranquility.beam.SendResult;
+import com.twitter.util.Await;
+import com.twitter.util.Future;
+import org.apache.storm.druid.bolt.ITupleDruidEventMapper;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+
+/**
+ * Trident {@link State} implementation for Druid.
+ */
+public class DruidBeamState<E> implements State {
+    private static final Logger LOG = LoggerFactory.getLogger(DruidBeamState.class);
+
+    private Beam<E> beam = null;
+    private ITupleDruidEventMapper<E> druidEventMapper = null;
+
+    public DruidBeamState(Beam<E> beam, ITupleDruidEventMapper<E> druidEventMapper) {
+        this.beam = beam;
+        this.druidEventMapper = druidEventMapper;
+    }
+
+    public  List<E> update(List<TridentTuple> tuples, TridentCollector collector) {
+        List<E> events = new ArrayList<>(tuples.size());
+        for (TridentTuple tuple: tuples) {
+            events.add(druidEventMapper.getEvent(tuple));
+        }
+
+        LOG.info("Sending [{}] events", events.size());
+        scala.collection.immutable.List<E> scalaList = scala.collection.JavaConversions.collectionAsScalaIterable(events).toList();
+        Collection<Future<SendResult>> futureList = scala.collection.JavaConversions.asJavaCollection(beam.sendAll(scalaList));
+        List<E> discardedEvents = new ArrayList<>();
+
+        int index = 0;
+        for (Future<SendResult> future : futureList) {
+            try {
+                SendResult result = Await.result(future);
+                if (!result.sent()) {
+                    discardedEvents.add(events.get(index));
+                }
+            } catch (Exception e) {
+                LOG.error("Failed in writing messages to Druid", e);
+            }
+            index++;
+        }
+
+        return discardedEvents;
+
+    }
+
+    public void close() {
+        try {
+            Await.result(beam.close());
+        } catch (Exception e) {
+            LOG.error("Error while closing Druid beam client", e);
+        }
+    }
+
+    @Override
+    public void beginCommit(Long txid) {
+
+    }
+
+    @Override
+    public void commit(Long txid) {
+
+    }
+}
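
`DruidBeamState.update` above relies on the futures returned by `beam.sendAll` being in the same order as the submitted events, and uses a running index to map each unsent result back to its event. Note that an event whose future throws is only logged; it is not added to the discarded list. The sketch below reproduces that pairing logic with JDK types only. It is an approximation under stated assumptions: `CompletableFuture<Boolean>` stands in for Twitter's `Future<SendResult>` (with `true` meaning `result.sent()`), and `DiscardCollectorSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical JDK-only model of the index-based pairing in DruidBeamState.update.
public class DiscardCollectorSketch {

    // results.get(i) is assumed to correspond to events.get(i).
    public static <E> List<E> collectDiscarded(List<E> events, List<CompletableFuture<Boolean>> results) {
        List<E> discarded = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            try {
                boolean sent = results.get(i).get(); // blocks until this send completes
                if (!sent) {
                    discarded.add(events.get(i));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and stop waiting
                break;
            } catch (ExecutionException e) {
                // Mirrors the original: a failed send is reported but not counted as discarded.
                System.err.println("Failed in writing message " + i + ": " + e.getCause());
            }
        }
        return discarded;
    }
}
```

Whether failed sends should also be surfaced to the caller is a design choice the original leaves open; the sketch keeps the original behaviour rather than changing it.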

http://git-wip-us.apache.org/repos/asf/storm/blob/d51e696b/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamStateFactory.java b/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamStateFactory.java
new file mode 100644
index 0000000..b745cdd
--- /dev/null
+++ b/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamStateFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.druid.trident;
+
+import org.apache.storm.druid.bolt.DruidBeamFactory;
+import org.apache.storm.druid.bolt.ITupleDruidEventMapper;
+import org.apache.storm.task.IMetricsContext;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.state.StateFactory;
+
+import java.util.Map;
+
+public class DruidBeamStateFactory<E> implements StateFactory {
+    DruidBeamFactory<E> beamFactory = null;
+    ITupleDruidEventMapper<E> druidEventMapper = null;
+
+    public DruidBeamStateFactory(DruidBeamFactory<E> beamFactory, ITupleDruidEventMapper<E> druidEventMapper) {
+        this.beamFactory = beamFactory;
+        this.druidEventMapper = druidEventMapper;
+    }
+
+    @Override
+    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+        return new DruidBeamState<E>(beamFactory.makeBeam(conf, metrics), druidEventMapper);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d51e696b/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamStateUpdater.java b/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamStateUpdater.java
new file mode 100644
index 0000000..d8e2b78
--- /dev/null
+++ b/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamStateUpdater.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.druid.trident;
+
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.BaseStateUpdater;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+public class DruidBeamStateUpdater<E> extends BaseStateUpdater<DruidBeamState<E>> {
+    private static final Logger LOG = LoggerFactory.getLogger(DruidBeamStateUpdater.class);
+
+    @Override
+    public void updateState(DruidBeamState<E> state, List<TridentTuple> tuples, TridentCollector collector) {
+        List<E> discardedTuples = state.update(tuples, collector);
+        processDiscardedTuples(discardedTuples);
+    }
+
+    /**
+     * Users can override this method to process the discarded tuples.
+     * @param discardedTuples events whose send result reported that they were not sent
+     */
+    protected void processDiscardedTuples(List<E> discardedTuples) {
+        LOG.debug("discarded messages : [{}]", discardedTuples);
+    }
+
+}
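
DruidBeamStateUpdater exposes processDiscardedTuples() as an overridable hook, and the default only logs. The sketch below shows one way a subclass might use such a hook, for example to keep discarded events for later inspection. BaseUpdater and CollectingUpdater are hypothetical stand-ins written without Storm on the classpath; only the hook name is taken from the commit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for DruidBeamStateUpdater: it hands the discarded
// events to a protected hook that subclasses may override.
class BaseUpdater<E> {
    void updateState(List<E> discarded) {
        processDiscardedTuples(discarded);
    }

    protected void processDiscardedTuples(List<E> discardedTuples) {
        // default behaviour in the commit: log at debug level only
    }
}

// Hypothetical subclass that keeps discarded events instead of only logging them.
class CollectingUpdater<E> extends BaseUpdater<E> {
    final List<E> deadLetters = new ArrayList<>();

    @Override
    protected void processDiscardedTuples(List<E> discardedTuples) {
        deadLetters.addAll(discardedTuples);
    }

    public static void main(String[] args) {
        CollectingUpdater<String> updater = new CollectingUpdater<>();
        updater.updateState(Arrays.asList("late-event-1", "late-event-2"));
        System.out.println(updater.deadLetters.size());
    }
}
```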

http://git-wip-us.apache.org/repos/asf/storm/blob/d51e696b/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBeamFactoryImpl.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBeamFactoryImpl.java b/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBeamFactoryImpl.java
new file mode 100644
index 0000000..5c1b4ed
--- /dev/null
+++ b/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBeamFactoryImpl.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.druid;
+
+import com.google.common.collect.ImmutableList;
+import com.metamx.common.Granularity;
+import com.metamx.tranquility.beam.Beam;
+import com.metamx.tranquility.beam.ClusteredBeamTuning;
+import com.metamx.tranquility.druid.DruidBeamConfig;
+import com.metamx.tranquility.druid.DruidBeams;
+import com.metamx.tranquility.druid.DruidDimensions;
+import com.metamx.tranquility.druid.DruidLocation;
+import com.metamx.tranquility.druid.DruidRollup;
+import com.metamx.tranquility.typeclass.Timestamper;
+import io.druid.data.input.impl.TimestampSpec;
+import io.druid.granularity.QueryGranularities;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.storm.druid.bolt.DruidBeamFactory;
+import org.apache.storm.task.IMetricsContext;
+import org.joda.time.DateTime;
+import org.joda.time.Period;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Druid bolt must be supplied with a BeamFactory. You can implement one using the
+ * [DruidBeams builder's](https://github.com/druid-io/tranquility/blob/master/core/src/main/scala/com/metamx/tranquility/druid/DruidBeams.scala)
+ * "buildBeam()" method. See the [Configuration documentation](https://github.com/druid-io/tranquility/blob/master/docs/configuration.md) for details.
+ * For more details, refer to the [Tranquility library](https://github.com/druid-io/tranquility) docs.
+ */
+public class SampleDruidBeamFactoryImpl implements DruidBeamFactory<Map<String, Object>> {
+    Map<String, Object> factoryConf = null;
+
+
+    public SampleDruidBeamFactoryImpl(Map<String, Object> factoryConf) {
+        this.factoryConf = factoryConf; // This can be used to pass config values
+    }
+
+    @Override
+    public Beam<Map<String, Object>> makeBeam(Map<?, ?> conf, IMetricsContext metrics) {
+
+
+        final String indexService = "druid/overlord"; // Your overlord's druid.service
+        final String discoveryPath = "/druid/discovery"; // Your overlord's druid.discovery.curator.path
+        final String dataSource = "test";
+        final List<String> dimensions = ImmutableList.of("publisher", "advertiser");
+        List<AggregatorFactory> aggregator = ImmutableList.<AggregatorFactory>of(
+                new CountAggregatorFactory(
+                        "click"
+                )
+        );
+        // Tranquility needs to be able to extract timestamps from your object type (in this case, Map<String, Object>).
+        final Timestamper<Map<String, Object>> timestamper = new Timestamper<Map<String, Object>>()
+        {
+            @Override
+            public DateTime timestamp(Map<String, Object> theMap)
+            {
+                return new DateTime(theMap.get("timestamp"));
+            }
+        };
+
+        // Tranquility uses ZooKeeper (through Curator) for coordination.
+        final CuratorFramework curator = CuratorFrameworkFactory
+                .builder()
+                .connectString((String)conf.get("druid.tranquility.zk.connect")) // we can use Storm conf to get config values
+                .retryPolicy(new ExponentialBackoffRetry(1000, 20, 30000))
+                .build();
+        curator.start();
+
+        // The JSON serialization of your object must have a timestamp field in a format that Druid understands. By default,
+        // Druid expects the field to be called "timestamp" and to be an ISO8601 timestamp.
+        final TimestampSpec timestampSpec = new TimestampSpec("timestamp", "auto", null);
+
+        // Tranquility needs to be able to serialize your object type to JSON for transmission to Druid. By default this is
+        // done with Jackson. If you want to provide an alternate serializer, you can provide your own via ```.objectWriter(...)```.
+        // In this case, we won't provide one, so we're just using Jackson.
+        final Beam<Map<String, Object>> beam = DruidBeams
+                .builder(timestamper)
+                .curator(curator)
+                .discoveryPath(discoveryPath)
+                .location(DruidLocation.create(indexService, dataSource))
+                .timestampSpec(timestampSpec)
+                .rollup(DruidRollup.create(DruidDimensions.specific(dimensions), aggregator, QueryGranularities.MINUTE))
+                .tuning(
+                        ClusteredBeamTuning
+                                .builder()
+                                .segmentGranularity(Granularity.HOUR)
+                                .windowPeriod(new Period("PT10M"))
+                                .partitions(1)
+                                .replicants(1)
+                                .build()
+                )
+                .druidBeamConfig(
+                        DruidBeamConfig
+                                .builder()
+                                .indexRetryPeriod(new Period("PT10M"))
+                                .build())
+                .buildBeam();
+
+        return beam;
+    }
+}
\ No newline at end of file
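
The sample factory configures a rollup over the dimensions "publisher" and "advertiser" at MINUTE query granularity with a single count aggregator named "click". As a rough illustration of what such a rollup computes, the sketch below groups events by truncated minute plus the two dimensions and counts rows per group. It is plain Java, not Druid or Tranquility code: RollupSketch, event() and the string key layout are hypothetical, and java.time is used in place of Joda-Time. Note that a count aggregator counts rows, so the value of the incoming "click" field plays no part in the result.

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of minute-granularity rollup with a row count.
class RollupSketch {

    static Map<String, Object> event(String timestamp, String publisher, String advertiser) {
        Map<String, Object> e = new HashMap<>();
        e.put("timestamp", timestamp);
        e.put("publisher", publisher);
        e.put("advertiser", advertiser);
        return e;
    }

    static Map<String, Long> rollup(List<Map<String, Object>> events) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (Map<String, Object> e : events) {
            // Truncate the ISO-8601 timestamp to the start of its minute
            Instant minute = OffsetDateTime.parse((String) e.get("timestamp"))
                    .toInstant()
                    .truncatedTo(ChronoUnit.MINUTES);
            // One bucket per (minute, publisher, advertiser) combination
            String key = minute + "|" + e.get("publisher") + "|" + e.get("advertiser");
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> events = Arrays.asList(
                event("2016-08-09T05:37:10Z", "foo.com", "google.com"),
                event("2016-08-09T05:37:50Z", "foo.com", "google.com"),
                event("2016-08-09T05:38:01Z", "foo.com", "google.com"));
        System.out.println(rollup(events));
    }
}
```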

http://git-wip-us.apache.org/repos/asf/storm/blob/d51e696b/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTopology.java b/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTopology.java
new file mode 100644
index 0000000..99a6f67
--- /dev/null
+++ b/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTopology.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.druid;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.druid.bolt.DruidBeamBolt;
+import org.apache.storm.druid.bolt.DruidBeamFactory;
+import org.apache.storm.druid.bolt.DruidConfig;
+import org.apache.storm.druid.bolt.ITupleDruidEventMapper;
+import org.apache.storm.druid.bolt.TupleDruidEventMapper;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Tuple;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Sample application that uses the Druid bolt.
+ *
+ * To test this, a Druid deployment must be running. Refer to the Druid quickstart to run Druid:
+ * http://druid.io/docs/latest/tutorials/quickstart.html
+ */
+public class SampleDruidBoltTopology {
+
+    public static void main(String[] args) throws Exception {
+        if (args.length == 0) {
+            throw new IllegalArgumentException("There should be at least one argument. Run as `SampleDruidBoltTopology <zk-url>`");
+        }
+
+        TopologyBuilder topologyBuilder = new TopologyBuilder();
+
+        topologyBuilder.setSpout("event-gen", new SimpleSpout(), 5);
+        DruidBeamFactory<Map<String, Object>> druidBeamFactory = new SampleDruidBeamFactoryImpl(new HashMap<String, Object>());
+        DruidConfig druidConfig = DruidConfig.newBuilder().discardStreamId(DruidConfig.DEFAULT_DISCARD_STREAM_ID).build();
+        ITupleDruidEventMapper<Map<String, Object>> eventMapper = new TupleDruidEventMapper<>(TupleDruidEventMapper.DEFAULT_FIELD_NAME);
+        DruidBeamBolt<Map<String, Object>> druidBolt = new DruidBeamBolt<Map<String, Object>>(druidBeamFactory, eventMapper, druidConfig);
+        topologyBuilder.setBolt("druid-bolt", druidBolt).shuffleGrouping("event-gen");
+        topologyBuilder.setBolt("printer-bolt", new PrinterBolt()).shuffleGrouping("druid-bolt" , druidConfig.getDiscardStreamId());
+
+        Config conf = new Config();
+        conf.setDebug(true);
+        conf.put("druid.tranquility.zk.connect", args[0]);
+
+        if (args.length > 1) {
+            conf.setNumWorkers(3);
+
+            StormSubmitter.submitTopologyWithProgressBar(args[1], conf, topologyBuilder.createTopology());
+        } else {
+            conf.setMaxTaskParallelism(3);
+
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("druid-test", conf, topologyBuilder.createTopology());
+
+            Thread.sleep(30000);
+
+            cluster.shutdown();
+            System.exit(0);
+        }
+    }
+
+    private static class PrinterBolt extends BaseBasicBolt {
+
+        @Override
+        public void execute(Tuple tuple, BasicOutputCollector collector) {
+            System.out.println(tuple);
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer ofd) {
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d51e696b/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTridentTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTridentTopology.java b/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTridentTopology.java
new file mode 100644
index 0000000..0e20ecd
--- /dev/null
+++ b/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTridentTopology.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.druid;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.druid.bolt.DruidBeamFactory;
+import org.apache.storm.druid.bolt.ITupleDruidEventMapper;
+import org.apache.storm.druid.bolt.TupleDruidEventMapper;
+import org.apache.storm.druid.trident.DruidBeamStateFactory;
+import org.apache.storm.druid.trident.DruidBeamStateUpdater;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.Consumer;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Sample application that uses the Druid Trident state.
+ *
+ * To test this, a Druid deployment must be running. Refer to the Druid quickstart to run Druid:
+ * http://druid.io/docs/latest/tutorials/quickstart.html
+ */
+public class SampleDruidBoltTridentTopology {
+    private static final Logger LOG = LoggerFactory.getLogger(SampleDruidBoltTridentTopology.class);
+
+    public static void main(String[] args) throws Exception {
+        if (args.length == 0) {
+            throw new IllegalArgumentException("There should be at least one argument. Run as `SampleDruidBoltTridentTopology <zk-url>`");
+        }
+
+        TridentTopology tridentTopology = new TridentTopology();
+        DruidBeamFactory<Map<String, Object>> druidBeamFactory = new SampleDruidBeamFactoryImpl(new HashMap<String, Object>());
+        ITupleDruidEventMapper<Map<String, Object>> eventMapper = new TupleDruidEventMapper<>(TupleDruidEventMapper.DEFAULT_FIELD_NAME);
+
+        final Stream stream = tridentTopology.newStream("batch-event-gen", new SimpleBatchSpout(10));
+
+        stream.peek(new Consumer() {
+            @Override
+            public void accept(TridentTuple input) {
+                LOG.info("########### Received tuple: [{}]", input);
+            }
+        }).partitionPersist(new DruidBeamStateFactory<Map<String, Object>>(druidBeamFactory, eventMapper), new Fields("event"), new DruidBeamStateUpdater<Map<String, Object>>());
+
+        Config conf = new Config();
+
+
+
+        conf.setDebug(true);
+        conf.put("druid.tranquility.zk.connect", args[0]);
+
+        if (args.length > 1) {
+            conf.setNumWorkers(3);
+
+            StormSubmitter.submitTopologyWithProgressBar(args[1], conf, tridentTopology.build());
+        } else {
+            conf.setMaxTaskParallelism(3);
+
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("druid-test", conf, tridentTopology.build());
+
+            Thread.sleep(30000);
+
+            cluster.shutdown();
+            System.exit(0);
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d51e696b/external/storm-druid/src/test/java/org/apache/storm/druid/SimpleBatchSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/test/java/org/apache/storm/druid/SimpleBatchSpout.java b/external/storm-druid/src/test/java/org/apache/storm/druid/SimpleBatchSpout.java
new file mode 100644
index 0000000..cb30b7c
--- /dev/null
+++ b/external/storm-druid/src/test/java/org/apache/storm/druid/SimpleBatchSpout.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.druid;
+
+import org.apache.storm.Config;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.spout.IBatchSpout;
+import org.apache.storm.tuple.Fields;
+import org.joda.time.DateTime;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * BatchSpout implementation for event batch generation.
+ */
+public class SimpleBatchSpout implements IBatchSpout {
+
+    private int batchSize;
+    private final Map<Long, List<List<Object>>> batches = new HashMap<>();
+
+    public SimpleBatchSpout(int batchSize) {
+        this.batchSize = batchSize;
+    }
+
+    @Override
+    public void open(Map conf, TopologyContext context) {
+    }
+
+    @Override
+    public void emitBatch(long batchId, TridentCollector collector) {
+        List<List<Object>> values;
+        if(batches.containsKey(batchId)) {
+            values = batches.get(batchId);
+        } else {
+            values = new ArrayList<>();
+            for (int i = 0; i < batchSize; i++) {
+                List<Object> value = new ArrayList<>();
+                Map<String, Object> event = new LinkedHashMap<>();
+                event.put("timestamp", new DateTime().toString());
+                event.put("publisher", "foo.com");
+                event.put("advertiser", "google.com");
+                event.put("click", i);
+                value.add(event);
+                values.add(value);
+            }
+            batches.put(batchId, values);
+        }
+        for (List<Object> value : values) {
+            collector.emit(value);
+        }
+
+    }
+
+    @Override
+    public void ack(long batchId) {
+        batches.remove(batchId);
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        Config conf = new Config();
+        conf.setMaxTaskParallelism(1);
+        return conf;
+    }
+
+    @Override
+    public Fields getOutputFields() {
+        return SimpleSpout.DEFAULT_FIELDS;
+    }
+}
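
SimpleBatchSpout caches every generated batch under its batch id, so a replayed batch id emits the same tuples again until ack() removes the entry. The sketch below isolates that caching pattern in plain Java. BatchReplayCache is a hypothetical name, the events are reduced to strings, and the method signatures are simplified, so it is not the Storm IBatchSpout interface.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical reduction of the replay cache used by SimpleBatchSpout.
class BatchReplayCache {
    private final Map<Long, List<String>> batches = new HashMap<>();
    private int counter = 0;

    // Returns the cached batch for a known id, otherwise generates and caches a new one
    List<String> emitBatch(long batchId, int batchSize) {
        return batches.computeIfAbsent(batchId, id -> {
            List<String> values = new ArrayList<>();
            for (int i = 0; i < batchSize; i++) {
                values.add("event-" + counter++);
            }
            return values;
        });
    }

    // Once a batch is acknowledged it will not be replayed, so the cache entry can go
    void ack(long batchId) {
        batches.remove(batchId);
    }

    public static void main(String[] args) {
        BatchReplayCache cache = new BatchReplayCache();
        List<String> first = cache.emitBatch(1L, 3);
        List<String> replay = cache.emitBatch(1L, 3);
        System.out.println(first.equals(replay));
    }
}
```

Without the ack() cleanup the map would grow with every batch, which is why the spout removes entries there.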

http://git-wip-us.apache.org/repos/asf/storm/blob/d51e696b/external/storm-druid/src/test/java/org/apache/storm/druid/SimpleSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/test/java/org/apache/storm/druid/SimpleSpout.java b/external/storm-druid/src/test/java/org/apache/storm/druid/SimpleSpout.java
new file mode 100644
index 0000000..ec0f0bf
--- /dev/null
+++ b/external/storm-druid/src/test/java/org/apache/storm/druid/SimpleSpout.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.druid;
+
+import org.apache.storm.druid.bolt.TupleDruidEventMapper;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.joda.time.DateTime;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public class SimpleSpout extends BaseRichSpout {
+    SpoutOutputCollector _collector;
+    int i = 1;
+
+    public static final Fields DEFAULT_FIELDS = new Fields(TupleDruidEventMapper.DEFAULT_FIELD_NAME);
+
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        _collector = collector;
+    }
+
+    @Override
+    public void nextTuple() {
+        Utils.sleep(1000);
+        Map<String, Object> event = new LinkedHashMap<>();
+        event.put("timestamp", new DateTime().toString());
+        event.put("publisher", "foo.com");
+        event.put("advertiser", "google.com");
+        event.put("click", i++);
+        _collector.emit(new Values(event));
+    }
+
+    @Override
+    public void ack(Object id) {
+    }
+
+    @Override
+    public void fail(Object id) {
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(DEFAULT_FIELDS);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d51e696b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c6f03cf..184e816 100644
--- a/pom.xml
+++ b/pom.xml
@@ -292,6 +292,7 @@
         <module>external/storm-opentsdb</module>
         <module>external/storm-kafka-monitor</module>
         <module>external/storm-kinesis</module>
+        <module>external/storm-druid</module>
     </modules>
 
     <dependencies>

http://git-wip-us.apache.org/repos/asf/storm/blob/d51e696b/storm-dist/binary/src/main/assembly/binary.xml
----------------------------------------------------------------------
diff --git a/storm-dist/binary/src/main/assembly/binary.xml b/storm-dist/binary/src/main/assembly/binary.xml
index 0471501..1ad8f0d 100644
--- a/storm-dist/binary/src/main/assembly/binary.xml
+++ b/storm-dist/binary/src/main/assembly/binary.xml
@@ -382,7 +382,20 @@
                 <include>storm*jar</include>
             </includes>
         </fileSet>
-
+        <fileSet>
+            <directory>${project.basedir}/../../external/storm-druid/target</directory>
+            <outputDirectory>external/storm-druid</outputDirectory>
+            <includes>
+                <include>storm*jar</include>
+            </includes>
+        </fileSet>
+        <fileSet>
+            <directory>${project.basedir}/../../external/storm-druid</directory>
+            <outputDirectory>external/storm-druid</outputDirectory>
+            <includes>
+                <include>README.*</include>
+            </includes>
+        </fileSet>
         <!-- $STORM_HOME/extlib -->
         <fileSet>
             <directory></directory>

