storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srihar...@apache.org
Subject [1/5] storm git commit: STORM-851: Storm Solr Connector 1. SolrUpdate Bolt 2. Trident State implementation 3. Fields Mapper 4. JSON Mapper 5. Integration Tests
Date Thu, 27 Aug 2015 17:03:50 GMT
Repository: storm
Updated Branches:
  refs/heads/master 528958c27 -> eb6107b93


STORM-851: Storm Solr Connector
1. SolrUpdate Bolt
2. Trident State implementation
3. Fields Mapper
4. JSON Mapper
5. Integration Tests


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4ab6e0c0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4ab6e0c0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4ab6e0c0

Branch: refs/heads/master
Commit: 4ab6e0c02aaddbd1ad0b1334a5c458f940271827
Parents: 528958c
Author: Hugo Louro <hmclouro@gmail.com>
Authored: Thu Jul 16 19:10:30 2015 -0700
Committer: Hugo Louro <hmclouro@gmail.com>
Committed: Tue Aug 25 08:18:17 2015 -0700

----------------------------------------------------------------------
 .gitignore                                      |   2 +-
 external/storm-solr/pom.xml                     | 110 +++++++++++++
 .../java/org/apache/storm/solr/bolt/.gitignore  |   1 +
 .../storm/solr/bolt/AbstractSolrBolt.java       |  33 ++++
 .../apache/storm/solr/bolt/SolrUpdateBolt.java  | 107 ++++++++++++
 .../storm/solr/config/CountBasedCommit.java     |  39 +++++
 .../storm/solr/config/SolrCommitStrategy.java   |  14 ++
 .../apache/storm/solr/config/SolrConfig.java    |  26 +++
 .../org/apache/storm/solr/mapper/.gitignore     |   1 +
 .../storm/solr/mapper/SolrFieldsMapper.java     | 144 ++++++++++++++++
 .../storm/solr/mapper/SolrJsonMapper.java       |  97 +++++++++++
 .../apache/storm/solr/mapper/SolrMapper.java    |  17 ++
 .../storm/solr/mapper/SolrMapperException.java  |   9 +
 .../org/apache/storm/solr/schema/CopyField.java |  35 ++++
 .../org/apache/storm/solr/schema/Field.java     |  45 +++++
 .../org/apache/storm/solr/schema/FieldType.java |  48 ++++++
 .../org/apache/storm/solr/schema/Schema.java    |  99 +++++++++++
 .../storm/solr/schema/SolrFieldTypeFinder.java  | 165 +++++++++++++++++++
 .../schema/builder/RestJsonSchemaBuilder.java   |  53 ++++++
 .../solr/schema/builder/SchemaBuilder.java      |  12 ++
 .../apache/storm/solr/trident/SolrState.java    |  66 ++++++++
 .../storm/solr/trident/SolrStateFactory.java    |  44 +++++
 .../apache/storm/solr/trident/SolrUpdater.java  |  32 ++++
 .../storm/solr/spout/SolrFieldsSpout.java       |  60 +++++++
 .../apache/storm/solr/spout/SolrJsonSpout.java  | 112 +++++++++++++
 .../org/apache/storm/solr/topology/.gitignore   |   1 +
 .../storm/solr/topology/SolrFieldsTopology.java |  40 +++++
 .../storm/solr/topology/SolrJsonTopology.java   |  33 ++++
 .../storm/solr/topology/SolrTopology.java       |  67 ++++++++
 .../solr/trident/SolrFieldsTridentTopology.java |  30 ++++
 .../solr/trident/SolrJsonTridentTopology.java   |  30 ++++
 .../org/apache/storm/solr/util/TestUtil.java    |  17 ++
 pom.xml                                         |   4 +
 33 files changed, 1592 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 02816a1..4e08a2e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -28,7 +28,7 @@ target
 *.ipr
 *.iws
 .idea
-.*
+#.*
 !/.travis.yml
 !/.gitignore
 _site

http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-solr/pom.xml b/external/storm-solr/pom.xml
new file mode 100644
index 0000000..4213e0c
--- /dev/null
+++ b/external/storm-solr/pom.xml
@@ -0,0 +1,110 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>0.11.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>storm-solr</artifactId>
+
+    <developers>
+        <developer>
+            <id>Hugo-Louro</id>
+            <name>Hugo Louro</name>
+            <email>hmclouro@gmail.com</email>
+        </developer>
+    </developers>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <!-- Solr and its dependencies -->
+        <dependency>
+            <groupId>org.apache.solr</groupId>
+            <artifactId>solr-solrj</artifactId>
+            <version>5.2.1</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.solr</groupId>
+            <artifactId>solr-core</artifactId>
+            <version>5.2.1</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.solr</groupId>
+            <artifactId>solr-test-framework</artifactId>
+            <version>5.2.1</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>commons-codec</groupId>
+            <artifactId>commons-codec</artifactId>
+            <version>1.3</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-httpclient</groupId>
+            <artifactId>commons-httpclient</artifactId>
+            <version>3.1</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <version>1.4</version>
+        </dependency>
+        <!--test-->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.11</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.code.gson</groupId>
+            <artifactId>gson</artifactId>
+            <version>2.3.1</version>
+        </dependency>
+        <!--<dependency>-->
+            <!--<groupId>com.googlecode.json-simple</groupId>-->
+            <!--<artifactId>json-simple</artifactId>-->
+        <!--</dependency>-->
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>2.5</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <archive>
+                        <manifest>
+                            <mainClass>org.apache.storm.solr.topology.SolrFieldsTopology</mainClass>
+                        </manifest>
+                    </archive>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/.gitignore
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/.gitignore b/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/.gitignore
new file mode 100644
index 0000000..1adc831
--- /dev/null
+++ b/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/.gitignore
@@ -0,0 +1 @@
+# Created by .ignore support plugin (hsz.mobi)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/AbstractSolrBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/AbstractSolrBolt.java b/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/AbstractSolrBolt.java
new file mode 100644
index 0000000..cd3261f
--- /dev/null
+++ b/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/AbstractSolrBolt.java
@@ -0,0 +1,33 @@
+package org.apache.storm.solr.bolt;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Tuple;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.storm.solr.config.SolrConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Base class for Storm bolts that talk to Solr. It keeps the {@link SolrConfig} handed to the
+ * constructor and, when the bolt is prepared, stores the collector and creates a
+ * {@link CloudSolrClient} from the configured ZooKeeper host string.
+ */
+public abstract class AbstractSolrBolt extends BaseRichBolt {
+    protected OutputCollector collector;
+    protected SolrConfig solrConfig;
+    protected SolrClient solrClient;
+
+    /**
+     * @param solrConfig configuration whose ZooKeeper host string is read when the bolt is prepared
+     */
+    public AbstractSolrBolt(SolrConfig solrConfig) {
+        this.solrConfig = solrConfig;
+    }
+
+    /**
+     * Remembers the collector and builds the Solr client that subclasses use.
+     */
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+        final String zkHost = this.solrConfig.getZkHostString();
+        this.solrClient = new CloudSolrClient(zkHost);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java b/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java
new file mode 100644
index 0000000..fe90b73
--- /dev/null
+++ b/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java
@@ -0,0 +1,107 @@
+package org.apache.storm.solr.bolt;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.storm.solr.config.SolrCommitStrategy;
+import org.apache.storm.solr.config.SolrConfig;
+import org.apache.storm.solr.mapper.SolrMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by hlouro on 7/19/15.
+ */
+public class SolrUpdateBolt extends AbstractSolrBolt {
+    private final Logger logger = LoggerFactory.getLogger(SolrUpdateBolt.class);
+    private final SolrMapper solrMapper;
+    private final SolrCommitStrategy commitStgy;
+    private List<Tuple> toCommitTuples;
+    private final String ackFailLock = "LOCK";      //serializable lock
+
+
+    public SolrUpdateBolt(SolrConfig solrConfig, SolrMapper solrMapper) {
+        this(solrConfig, solrMapper, null);
+    }
+
+    public SolrUpdateBolt(SolrConfig solrConfig, SolrMapper solrMapper, SolrCommitStrategy commitStgy) {
+        super(solrConfig);
+        this.solrMapper = solrMapper;
+        this.commitStgy = commitStgy;
+        logger.info("Created {} with the following configuration: " +
+                    "[SolrConfig = {}], [SolrMapper = {}], [CommitStgy = {}]",
+                    this.getClass().getSimpleName(), solrConfig, solrMapper, commitStgy);
+    }
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        super.prepare(stormConf, context, collector);
+        this.toCommitTuples = new LinkedList<>();
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+        try {
+            SolrRequest request = solrMapper.toSolrRequest(tuple);
+            solrClient.request(request, solrMapper.getCollection());
+            ack(tuple);
+        } catch (Exception e) {
+            fail(tuple, e);
+        }
+    }
+
+    private void ack(Tuple tuple) throws SolrServerException, IOException {
+        if (commitStgy == null) {
+            collector.ack(tuple);
+        } else {
+            synchronized(ackFailLock) {
+                toCommitTuples.add(tuple);
+                commitStgy.update();
+            }
+            if (commitStgy.commit()) {
+                solrClient.commit(solrMapper.getCollection());
+                ackCommittedTuples();
+            }
+        }
+    }
+
+    private void ackCommittedTuples() {
+        List<Tuple> toAckTuples = getQueuedTuples();
+        for (Tuple tuple : toAckTuples) {
+            collector.ack(tuple);
+        }
+    }
+
+    private void fail(Tuple tuple, Exception e) {
+        collector.reportError(e);
+
+        if (commitStgy == null) {
+            collector.fail(tuple);
+        } else {
+            List<Tuple> failedTuples = getQueuedTuples();
+            for (Tuple failedTuple : failedTuples) {
+                collector.fail(failedTuple);
+            }
+        }
+    }
+
+    private List<Tuple> getQueuedTuples() {
+        List<Tuple> queuedTuples;
+        synchronized(ackFailLock) {
+            queuedTuples = toCommitTuples;
+            toCommitTuples = new LinkedList<>();
+        }
+        return queuedTuples;
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) { }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/main/java/org/apache/storm/solr/config/CountBasedCommit.java
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/config/CountBasedCommit.java b/external/storm-solr/src/main/java/org/apache/storm/solr/config/CountBasedCommit.java
new file mode 100644
index 0000000..3877980
--- /dev/null
+++ b/external/storm-solr/src/main/java/org/apache/storm/solr/config/CountBasedCommit.java
@@ -0,0 +1,39 @@
+package org.apache.storm.solr.config;
+
+/**
+ * Commit strategy driven by a running count of updates: a commit is due whenever the count is a
+ * non-zero multiple of the configured threshold.
+ */
+public class CountBasedCommit implements SolrCommitStrategy {
+    private int threshHold;
+    private int count;
+
+    /**
+     * Creates a count based commit strategy.
+     *
+     * @param threshold number of updates between commits; must be at least 1
+     * @throws IllegalArgumentException if {@code threshold} is smaller than 1
+     */
+    public CountBasedCommit(int threshold) {
+        if (threshold >= 1) {
+            this.threshHold = threshold;
+        } else {
+            throw new IllegalArgumentException("Threshold must be a positive integer: " + threshold);
+        }
+    }
+
+    /**
+     * @return {@code true} when at least one update was counted and the count is a multiple of the threshold
+     */
+    @Override
+    public boolean commit() {
+        if (count == 0) {
+            return false;
+        }
+        return (count % threshHold) == 0;
+    }
+
+    /** Counts one more update. */
+    @Override
+    public void update() {
+        count += 1;
+    }
+
+    /**
+     * @return the number of updates counted so far
+     */
+    public int getCount() {
+        return count;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/main/java/org/apache/storm/solr/config/SolrCommitStrategy.java
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/config/SolrCommitStrategy.java b/external/storm-solr/src/main/java/org/apache/storm/solr/config/SolrCommitStrategy.java
new file mode 100644
index 0000000..0ec9149
--- /dev/null
+++ b/external/storm-solr/src/main/java/org/apache/storm/solr/config/SolrCommitStrategy.java
@@ -0,0 +1,14 @@
+package org.apache.storm.solr.config;
+
+import java.io.Serializable;
+
+/**
+ * Strategy defining when the Solr Bolt should commit the request to Solr.
+ * <p></p>
+ * Created by hlouro on 7/29/15.
+ */
+public interface SolrCommitStrategy extends Serializable {
+    /**
+     * @return {@code true} when a commit should be issued now
+     */
+    boolean commit();
+
+    /**
+     * Notifies the strategy of one more update, so that it can refresh whatever state
+     * {@link #commit()} is based on.
+     */
+    void update();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/main/java/org/apache/storm/solr/config/SolrConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/config/SolrConfig.java b/external/storm-solr/src/main/java/org/apache/storm/solr/config/SolrConfig.java
new file mode 100644
index 0000000..6a75ae7
--- /dev/null
+++ b/external/storm-solr/src/main/java/org/apache/storm/solr/config/SolrConfig.java
@@ -0,0 +1,26 @@
+package org.apache.storm.solr.config;
+
+import org.apache.solr.client.solrj.SolrClient;
+
+import java.io.Serializable;
+
+/**
+ * Immutable holder for the Solr configuration that is made available to Storm Solr bolts. Any
+ * configuration needed in the bolts should be put in this class.
+ */
+public class SolrConfig implements Serializable {
+    // Assigned once in the constructor and never changed afterwards.
+    private final String zkHostString;
+
+    /**
+     * @param zkHostString ZooKeeper host string, in the form accepted by the
+     *                     {@code CloudSolrClient} constructor
+     */
+    public SolrConfig(String zkHostString) {
+        this.zkHostString = zkHostString;
+    }
+
+    /**
+     * @return the ZooKeeper host string given at construction time
+     */
+    public String getZkHostString() {
+        return zkHostString;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/.gitignore
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/.gitignore b/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/.gitignore
new file mode 100644
index 0000000..1adc831
--- /dev/null
+++ b/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/.gitignore
@@ -0,0 +1 @@
+# Created by .ignore support plugin (hsz.mobi)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrFieldsMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrFieldsMapper.java b/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrFieldsMapper.java
new file mode 100644
index 0000000..df89a47
--- /dev/null
+++ b/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrFieldsMapper.java
@@ -0,0 +1,144 @@
+package org.apache.storm.solr.mapper;
+
+import static org.apache.storm.solr.schema.SolrFieldTypeFinder.FieldTypeWrapper;
+
+import backtype.storm.tuple.ITuple;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.storm.solr.schema.Field;
+import org.apache.storm.solr.schema.Schema;
+import org.apache.storm.solr.schema.builder.SchemaBuilder;
+import org.apache.storm.solr.schema.SolrFieldTypeFinder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * {@link SolrMapper} that turns tuples into Solr update requests field by field. A tuple field is
+ * copied into the document only when the schema's type finder knows it; a field the schema marks
+ * as multi-valued is read as a String and split on the configured token, each piece being added
+ * as a separate value.
+ */
+public class SolrFieldsMapper implements SolrMapper {
+    private static final Logger logger = LoggerFactory.getLogger(SolrFieldsMapper.class);
+    private String collection;
+    private SolrFieldTypeFinder typeFinder;
+    private String multiValueFieldToken;
+
+    /** Builder for {@link SolrFieldsMapper}. The multi-value token defaults to {@code |}. */
+    public static class Builder {
+        private String collection;
+        private SolrFieldTypeFinder typeFinder;
+        private String multiValueFieldToken = "|";
+
+        /**
+         * @param schemaBuilder supplies the schema used to look up field types
+         */
+        public Builder(SchemaBuilder schemaBuilder) {
+            setTypeFinder(schemaBuilder);
+        }
+
+        //TODO Handle the case where there may be no schema
+        private void setTypeFinder(SchemaBuilder schemaBuilder) {
+            Schema schema = schemaBuilder.getSchema();
+            typeFinder = new SolrFieldTypeFinder(schema);
+        }
+
+        /** Sets the collection the generated requests are meant for. */
+        public Builder setCollection(String collection) {
+            this.collection = collection;
+            return this;
+        }
+
+        /**
+         * Uses the client's default collection. The collection becomes {@code null} when the
+         * client is not a {@link CloudSolrClient}.
+         */
+        public Builder setDefaultCollection(SolrClient solrClient) {
+            String defaultCollection = null;
+            if (solrClient instanceof CloudSolrClient) {
+                defaultCollection = ((CloudSolrClient) solrClient).getDefaultCollection();
+            }
+            this.collection = defaultCollection;
+            return this;
+        }
+
+        /**
+         * Sets the token that separates multivalue fields in tuples. The default token is |.
+         * The token is matched literally, not as a regular expression.
+         * */
+        public Builder setMultiValueFieldToken(String multiValueFieldToken) {
+            this.multiValueFieldToken = multiValueFieldToken;
+            return this;
+        }
+
+        /** Creates the mapper from the values collected so far. */
+        public SolrFieldsMapper build() {
+            return new SolrFieldsMapper(this);
+        }
+    }
+
+    private SolrFieldsMapper(Builder builder) {
+        this.collection = builder.collection;
+        this.typeFinder = builder.typeFinder;
+        this.multiValueFieldToken = builder.multiValueFieldToken;
+    }
+
+    @Override
+    public String getCollection() {
+        return collection;
+    }
+
+    /** Builds one update request holding a document per tuple. */
+    @Override
+    public SolrRequest toSolrRequest(List<? extends ITuple> tuples) throws SolrMapperException {
+        List<SolrInputDocument> docs = new LinkedList<>();
+        for (ITuple tuple : tuples) {
+            docs.add(buildDocument(tuple));
+        }
+        UpdateRequest request = new UpdateRequest();
+        request.add(docs);
+        return request;
+    }
+
+    /** Builds an update request holding the single document made from the tuple. */
+    @Override
+    public SolrRequest toSolrRequest(ITuple tuple) throws SolrMapperException {
+        SolrInputDocument doc = buildDocument(tuple);
+        UpdateRequest request = new UpdateRequest();
+        request.add(doc);
+        return request;
+    }
+
+    /** Copies every tuple field known to the schema into a new document; other fields are skipped. */
+    private SolrInputDocument buildDocument(ITuple tuple) {
+        SolrInputDocument doc = new SolrInputDocument();
+
+        for (String tupleField : tuple.getFields()) {
+            FieldTypeWrapper fieldTypeWrapper = typeFinder.getFieldTypeWrapper(tupleField);
+            if (fieldTypeWrapper != null) {
+                Field field = fieldTypeWrapper.getField();
+                if (field.isMultiValued()) {
+                    addMultivalueFieldToDoc(doc, tupleField, tuple);
+                } else {
+                    addFieldToDoc(doc, tupleField, tuple);
+                }
+            } else {
+                logger.info("Field [{}] does NOT match static or dynamic field declared in schema. Not added to document", tupleField);
+            }
+        }
+        return doc;
+    }
+
+    private void addFieldToDoc(SolrInputDocument doc, String tupleField, ITuple tuple) {
+        Object val = getValue(tupleField, tuple);
+        logger.info("Adding to document (field, val) = ({}, {})", tupleField, val);
+        doc.addField(tupleField, val);
+    }
+
+    private void addMultivalueFieldToDoc(SolrInputDocument doc, String tupleField, ITuple tuple) {
+        String[] values = getValues(tupleField, tuple);
+        for (String value : values) {
+            logger.info("Adding {} to multivalue field document {}", value, tupleField);
+            doc.addField(tupleField, value);
+        }
+    }
+
+    private Object getValue(String field, ITuple tuple) {
+        return tuple.getValueByField(field);
+    }
+
+    /** Reads the field as a String and splits it on the multi-value token. */
+    private String[] getValues(String field, ITuple tuple) {
+        String multiValueField = tuple.getStringByField(field);
+        // String.split takes a regular expression. Unquoted, the default token "|" is an empty
+        // alternation that matches between every character, so it must be quoted as a literal.
+        return multiValueField.split(java.util.regex.Pattern.quote(multiValueFieldToken));
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrJsonMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrJsonMapper.java b/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrJsonMapper.java
new file mode 100644
index 0000000..ef5bb3c
--- /dev/null
+++ b/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrJsonMapper.java
@@ -0,0 +1,97 @@
+package org.apache.storm.solr.mapper;
+
+import backtype.storm.tuple.ITuple;
+import com.google.gson.Gson;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest;
+import org.apache.solr.common.util.ContentStream;
+import org.apache.solr.common.util.ContentStreamBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * {@link SolrMapper} that reads a JSON value from one named tuple field and wraps it in a content
+ * stream update request. A String value is passed through unchanged; any other object is
+ * serialized to JSON with Gson.
+ */
+public class SolrJsonMapper implements SolrMapper {
+    private static final Logger logger = LoggerFactory.getLogger(SolrJsonMapper.class);
+    private static final String JSON_UPDATE_URL = "/update/json/docs";
+    private static final String CONTENT_TYPE = "application/json";
+    // Gson instances are thread-safe and reusable; static, so it is not part of the serialized mapper.
+    private static final Gson GSON = new Gson();
+
+    private final String jsonTupleField;
+    private final String collection;
+
+    /**
+     * @param collection     collection the generated requests are meant for
+     * @param jsonTupleField name of the tuple field that carries the JSON
+     */
+    public SolrJsonMapper(String collection, String jsonTupleField) {
+        this.collection = collection;
+        this.jsonTupleField = jsonTupleField;
+    }
+
+    /**
+     * Uses the client's default collection. The collection is {@code null} when the client is not
+     * a {@link CloudSolrClient}.
+     */
+    public SolrJsonMapper(SolrClient solrClient, String jsonTupleField) {
+        String defaultCollection = null;
+        if (solrClient instanceof CloudSolrClient) {
+            defaultCollection = ((CloudSolrClient) solrClient).getDefaultCollection();
+        }
+        this.collection = defaultCollection;
+        this.jsonTupleField = jsonTupleField;
+    }
+
+    @Override
+    public String getCollection() {
+        return collection;
+    }
+
+    /**
+     * Builds one request whose body is a JSON array with one element per tuple.
+     *
+     * @throws SolrMapperException if any tuple lacks the JSON field
+     */
+    @Override
+    public SolrRequest toSolrRequest(List<? extends ITuple> tuples) throws SolrMapperException {
+        final String jsonList = getJsonFromTuples(tuples);
+        return createSolrRequest(jsonList);
+    }
+
+    /**
+     * Builds a request whose body is the JSON carried by the tuple.
+     *
+     * @throws SolrMapperException if the tuple lacks the JSON field
+     */
+    @Override
+    public SolrRequest toSolrRequest(ITuple tuple) throws SolrMapperException {
+        final String json = getJsonFromTuple(tuple);
+        return createSolrRequest(json);
+    }
+
+    private SolrRequest createSolrRequest(String json) {
+        ContentStreamUpdateRequest request = new ContentStreamUpdateRequest(JSON_UPDATE_URL);
+        ContentStream cs = new ContentStreamBase.StringStream(json, CONTENT_TYPE);
+        request.addContentStream(cs);
+        logger.info("Request generated with JSON: {}", json);
+        return request;
+    }
+
+    /** Joins the JSON of every tuple into a JSON array; an empty list yields {@code []}. */
+    private String getJsonFromTuples(List<? extends ITuple> tuples) throws SolrMapperException {
+        final StringBuilder jsonListBuilder = new StringBuilder("[");
+        // The separator goes before every element but the first. Overwriting a trailing comma
+        // would instead clobber the opening bracket when the list is empty.
+        String separator = "";
+        for (ITuple tuple : tuples) {
+            jsonListBuilder.append(separator).append(getJsonFromTuple(tuple));
+            separator = ",";
+        }
+        return jsonListBuilder.append(']').toString();
+    }
+
+    private String getJsonFromTuple(ITuple tuple) throws SolrMapperException {
+        if (!tuple.contains(jsonTupleField)) {
+            throw new SolrMapperException("Tuple does not contain JSON object");
+        }
+        return doGetJson(tuple.getValueByField(jsonTupleField));
+    }
+
+    private String doGetJson(Object value) {
+        if (value instanceof String) {
+            return (String) value;      // Object associated with JSON field is already JSON
+        }
+        return GSON.toJson(value);      // Serializes a Java object to JSON
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrMapper.java b/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrMapper.java
new file mode 100644
index 0000000..165498d
--- /dev/null
+++ b/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrMapper.java
@@ -0,0 +1,17 @@
+package org.apache.storm.solr.mapper;
+
+import backtype.storm.tuple.ITuple;
+import backtype.storm.tuple.Tuple;
+import org.apache.solr.client.solrj.SolrRequest;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Converts Storm tuples into Solr requests and names the collection those requests are meant for.
+ * Implementations must be serializable.
+ * <p></p>
+ * Created by hlouro on 7/22/15.
+ */
+public interface SolrMapper extends Serializable {
+    /**
+     * @return the name of the Solr collection associated with this mapper
+     */
+    String getCollection();
+
+    /**
+     * @param tuple the tuple to convert
+     * @return the Solr request built from the tuple
+     * @throws SolrMapperException if the tuple cannot be mapped to a request
+     */
+    SolrRequest toSolrRequest(ITuple tuple) throws SolrMapperException;
+
+    /**
+     * @param tuples the tuples to convert
+     * @return a single Solr request built from all the tuples
+     * @throws SolrMapperException if any of the tuples cannot be mapped
+     */
+    SolrRequest toSolrRequest(List<? extends ITuple> tuples) throws SolrMapperException;
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrMapperException.java
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrMapperException.java b/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrMapperException.java
new file mode 100644
index 0000000..b9ad5a3
--- /dev/null
+++ b/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrMapperException.java
@@ -0,0 +1,9 @@
+package org.apache.storm.solr.mapper;
+
+/**
+ * Checked exception signalling that a tuple could not be mapped to a Solr request.
+ */
+public class SolrMapperException extends Exception {
+    /**
+     * @param msg description of the mapping failure, returned by {@link #getMessage()}
+     */
+    public SolrMapperException(String msg) {
+        // Hand the message to Exception; otherwise it is dropped and getMessage() returns null.
+        super(msg);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/main/java/org/apache/storm/solr/schema/CopyField.java
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/schema/CopyField.java b/external/storm-solr/src/main/java/org/apache/storm/solr/schema/CopyField.java
new file mode 100644
index 0000000..9fd291e
--- /dev/null
+++ b/external/storm-solr/src/main/java/org/apache/storm/solr/schema/CopyField.java
@@ -0,0 +1,35 @@
+package org.apache.storm.solr.schema;
+
+import java.io.Serializable;
+
+/**
+ * Serializable bean with the {@code source} and {@code dest} properties of a copy field.
+ */
+public class CopyField implements Serializable {
+    private String source;
+    private String dest;
+
+    /** @return the source property */
+    public String getSource() {
+        return this.source;
+    }
+
+    /** @param source the new source property */
+    public void setSource(String source) {
+        this.source = source;
+    }
+
+    /** @return the dest property */
+    public String getDest() {
+        return this.dest;
+    }
+
+    /** @param dest the new dest property */
+    public void setDest(String dest) {
+        this.dest = dest;
+    }
+
+    /** Renders both properties, each wrapped in single quotes. */
+    @Override
+    public String toString() {
+        return String.format("CopyField{source='%s', dest='%s'}", source, dest);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/main/java/org/apache/storm/solr/schema/Field.java
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/schema/Field.java b/external/storm-solr/src/main/java/org/apache/storm/solr/schema/Field.java
new file mode 100644
index 0000000..55f9589
--- /dev/null
+++ b/external/storm-solr/src/main/java/org/apache/storm/solr/schema/Field.java
@@ -0,0 +1,50 @@
+package org.apache.storm.solr.schema;
+
+import java.io.Serializable;
+
+/**
+ * Serializable bean describing a schema field: its name, the name of its type and a multi-valued flag.
+ * <p></p>
+ * Created by hlouro on 7/27/15.
+ */
+public class Field implements Serializable {
+    private String name;
+    private String type;
+    private boolean multiValued;
+
+    /** @return the field name, or null if it was never set */
+    public String getName() {
+        return this.name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /** @return the name of the field's type, or null if it was never set */
+    public String getType() {
+        return this.type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    /** @return the multi-valued flag; false unless explicitly set */
+    public boolean isMultiValued() {
+        return this.multiValued;
+    }
+
+    public void setMultiValued(boolean multiValued) {
+        this.multiValued = multiValued;
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("Field{");
+        text.append("name='").append(name).append('\'');
+        text.append(", type='").append(type).append('\'');
+        text.append(", multiValued=").append(multiValued);
+        return text.append('}').toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/main/java/org/apache/storm/solr/schema/FieldType.java
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/schema/FieldType.java b/external/storm-solr/src/main/java/org/apache/storm/solr/schema/FieldType.java
new file mode 100644
index 0000000..23bce92
--- /dev/null
+++ b/external/storm-solr/src/main/java/org/apache/storm/solr/schema/FieldType.java
@@ -0,0 +1,54 @@
+package org.apache.storm.solr.schema;
+
+import com.google.gson.annotations.SerializedName;
+
+import java.io.Serializable;
+
+/**
+ * Serializable bean describing a schema field type: its name, its class name (bound to the
+ * {@code "class"} property through {@link SerializedName}) and a multi-valued flag.
+ * <p></p>
+ * Created by hlouro on 7/27/15.
+ */
+public class FieldType implements Serializable {
+    private String name;
+    @SerializedName("class")
+    private String clazz;
+    private boolean multiValued;
+
+    /** @return the type name, or null if it was never set */
+    public String getName() {
+        return this.name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /** @return the value of the {@code "class"} property, or null if it was never set */
+    public String getClazz() {
+        return this.clazz;
+    }
+
+    public void setClazz(String clazz) {
+        this.clazz = clazz;
+    }
+
+    /** @return the multi-valued flag; false unless explicitly set */
+    public boolean isMultiValued() {
+        return this.multiValued;
+    }
+
+    public void setMultiValued(boolean multiValued) {
+        this.multiValued = multiValued;
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("FieldType{");
+        text.append("name='").append(name).append('\'');
+        text.append(", clazz='").append(clazz).append('\'');
+        text.append(", multiValued=").append(multiValued);
+        return text.append('}').toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/main/java/org/apache/storm/solr/schema/Schema.java
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/schema/Schema.java b/external/storm-solr/src/main/java/org/apache/storm/solr/schema/Schema.java
new file mode 100644
index 0000000..91e1e09
--- /dev/null
+++ b/external/storm-solr/src/main/java/org/apache/storm/solr/schema/Schema.java
@@ -0,0 +1,118 @@
+package org.apache.storm.solr.schema;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Class representing the SolrSchema as returned by the REST call to the URL of the form
+ * http://localhost:8983/solr/gettingstarted/schema. This particular URL returns the schema in JSON format for the
+ * <a href="http://lucene.apache.org/solr/quickstart.html">Solr quickstart</a> example running locally.
+ * <p></p>
+ * The list getters return read-only views and never return null: a list that was never set (for instance
+ * because the corresponding section was missing from the JSON) is reported as an empty list.
+ * <p></p>
+ * Created by hlouro on 7/27/15.
+ */
+public class Schema implements Serializable {
+    private String name;
+    private String version;
+    private String uniqueKey;
+    private List<FieldType> fieldTypes;
+    private List<Field> fields;
+    private List<Field> dynamicFields;
+    private List<CopyField> copyFields;
+
+    public String getName() {
+        return name;
+    }
+
+    public String getVersion() {
+        return version;
+    }
+
+    public String getUniqueKey() {
+        return uniqueKey;
+    }
+
+    /** @return read-only view of the field types; empty, never null, when none were set */
+    public List<FieldType> getFieldTypes() {
+        return readOnly(fieldTypes);
+    }
+
+    /** @return read-only view of the static fields; empty, never null, when none were set */
+    public List<Field> getFields() {
+        return readOnly(fields);
+    }
+
+    /** @return read-only view of the dynamic fields; empty, never null, when none were set */
+    public List<Field> getDynamicFields() {
+        return readOnly(dynamicFields);
+    }
+
+    /** @return read-only view of the copy fields; empty, never null, when none were set */
+    public List<CopyField> getCopyFields() {
+        return readOnly(copyFields);
+    }
+
+    /**
+     * Wraps a possibly-null list in an unmodifiable view. Collections.unmodifiableList(null) throws a
+     * NullPointerException, so a missing list is mapped to the immutable empty list instead.
+     */
+    private static <T> List<T> readOnly(List<T> list) {
+        if (list == null) {
+            return Collections.emptyList();
+        }
+        return Collections.unmodifiableList(list);
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public void setVersion(String version) {
+        this.version = version;
+    }
+
+    public void setUniqueKey(String uniqueKey) {
+        this.uniqueKey = uniqueKey;
+    }
+
+    public void setFieldTypes(List<FieldType> fieldTypes) {
+        this.fieldTypes = fieldTypes;
+    }
+
+    public void setFields(List<Field> fields) {
+        this.fields = fields;
+    }
+
+    public void setDynamicFields(List<Field> dynamicFields) {
+        this.dynamicFields = dynamicFields;
+    }
+
+    public void setCopyFields(List<CopyField> copyFields) {
+        this.copyFields = copyFields;
+    }
+
+    @Override
+    public String toString() {
+        return "Schema{" +
+                "name='" + name + '\'' +
+                ", version='" + version + '\'' +
+                ", uniqueKey='" + uniqueKey + '\'' +
+                ", fieldTypes=" + fieldTypes +
+                ", fields=" + fields +
+                ", dynamicFields=" + dynamicFields +
+                ", copyFields=" + copyFields +
+                '}';
+    }
+
+    /** Holder with a single {@code schema} property; RestJsonSchemaBuilder deserializes the JSON response into it. */
+    public static class SchemaWrapper implements Serializable {
+        Schema schema;
+
+        public Schema getSchema() {
+            return schema;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/main/java/org/apache/storm/solr/schema/SolrFieldTypeFinder.java
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/schema/SolrFieldTypeFinder.java b/external/storm-solr/src/main/java/org/apache/storm/solr/schema/SolrFieldTypeFinder.java
new file mode 100644
index 0000000..2cebcf5
--- /dev/null
+++ b/external/storm-solr/src/main/java/org/apache/storm/solr/schema/SolrFieldTypeFinder.java
@@ -0,0 +1,180 @@
+package org.apache.storm.solr.schema;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * Class containing all the information relating fields with their types. This information is wrapped in the class
+ * {@link FieldTypeWrapper}
+ * <p></p>
+ * Created by hlouro on 7/27/15.
+ */
+public class SolrFieldTypeFinder implements Serializable {
+    private static final Logger logger = LoggerFactory.getLogger(SolrFieldTypeFinder.class);
+    private Schema schema;
+    private Map<String, FieldTypeWrapper> fieldToWrapper;
+
+
+    /**
+     * Class wrapping all the information for fields and types
+     * */
+    public static class FieldTypeWrapper implements Serializable {
+        Field field;
+        FieldType type;
+
+        public FieldTypeWrapper(Field field, FieldType type) {
+            this.field = field;
+            this.type = type;
+        }
+
+        public Field getField() {
+            return field;
+        }
+
+        public FieldType getType() {
+            return type;
+        }
+
+        @Override
+        public String toString() {
+            return "FieldTypeWrapper{" +
+                    "field=" + field +
+                    ", type=" + type +
+                    '}';
+        }
+    }
+
+    /**
+     * Initiates class containing all the information relating fields with their types.
+     * This information is parsed from the schema
+     * @param schema SolrSchema containing the information about fields and types
+     * @throws IllegalArgumentException if schema is null
+     * */
+    public SolrFieldTypeFinder(Schema schema) {
+        if (schema == null) {
+            throw new IllegalArgumentException("Schema object is null");
+        }
+        this.schema = schema;
+        this.fieldToWrapper = new HashMap<>();
+        buildMap();
+    }
+
+    /** Indexes the static and the dynamic fields of the schema by field name (or dynamic field pattern). */
+    private void buildMap() {
+        final List<FieldType> fieldTypes = schema.getFieldTypes();
+        // static fields
+        buildMapForFields(fieldTypes, schema.getFields());
+        // dynamic fields
+        buildMapForFields(fieldTypes, schema.getDynamicFields());
+        // Reported through the logger only; the map is no longer dumped to System.out as well
+        logger.debug("Completed building Field/Type Map: {}", fieldToWrapper);
+    }
+
+    /** Maps every field whose type name is found in fieldTypes; fields with an unknown type are skipped. */
+    private void buildMapForFields(List<FieldType> fieldTypes, List<Field> fields) {
+        for (Field field: fields) {
+            String type = field.getType();
+            int idx = indexOf(fieldTypes, type);    // idx - index of the type of this field in the FieldType list
+            if (idx != -1) {
+              fieldToWrapper.put(field.getName(), new FieldTypeWrapper(field, fieldTypes.get(idx)));
+            }
+        }
+    }
+
+    /** @return position of the first FieldType whose name equals type, or -1 when there is none */
+    private int indexOf(List<FieldType> fieldTypes, String type) {
+        int i = 0;
+        for (FieldType fieldType : fieldTypes) {
+            if (fieldType.getName().equals(type)) {
+                return i;
+            }
+            i++;
+        }
+        return -1;
+    }
+
+    /**
+     * Finds the schema defined field that matches the input parameter, if any. It can be a dynamic field, in
+     * which case it will return the pattern of the dynamic field that matches the input parameter.
+     * @param fieldName The name of the field to get info for
+     * @return The {@link FieldTypeWrapper} that matches the input parameter, or null if none found
+     * @throws IllegalArgumentException if fieldName is null and the schema defines at least one field
+     * */
+    public FieldTypeWrapper getFieldTypeWrapper(String fieldName) {
+        FieldTypeWrapper typeWrapper = fieldToWrapper.get(fieldName);
+            // => field name does not match static field, test if it matches dynamic field
+        if (typeWrapper == null) {
+            // NOTE: every key is tested, so when several patterns match, the last one in map iteration order wins
+            for (String pattern : fieldToWrapper.keySet()) {
+                if (matchesDynamicField(fieldName, pattern)) {
+                    typeWrapper = fieldToWrapper.get(pattern);
+                }
+            }
+        }
+        logger.debug("Solr Field and Type info: {}, {}", fieldName, typeWrapper);
+        return typeWrapper;
+    }
+
+    /** @return sorted set of the class names ({@link FieldType#getClazz()}) of the types of all mapped fields */
+    public Set<String> getAllSolrFieldTypes() {
+        Collection<FieldTypeWrapper> typeWrappers = fieldToWrapper.values();
+        Set<String> fieldTypeClasses = new TreeSet<>();
+        for (FieldTypeWrapper typeWrapper : typeWrappers) {
+            fieldTypeClasses.add(typeWrapper.getType().getClazz());
+        }
+        logger.debug("Field type classes present in schema: {}", fieldTypeClasses);
+        return fieldTypeClasses;
+    }
+
+    /** @return true if fieldName is exactly one of the mapped names (no pattern matching is applied) */
+    public boolean matchesField(String fieldName) {
+        return fieldToWrapper.containsKey(fieldName);
+    }
+
+    /** @return true if fieldName matches at least one of the mapped names taken as a dynamic field pattern */
+    public boolean matchesDynamicField(String fieldName) {
+        for (String pattern : fieldToWrapper.keySet()) {
+            if (matchesDynamicField(fieldName, pattern)) {
+                return true;
+            }
+        }
+        if (logger.isDebugEnabled()) {
+            logger.debug("Field [{}] did NOT match any dynamic field present in {}", fieldName, fieldToWrapper.keySet());
+        }
+        return false;
+    }
+
+    /**
+     * Tests a field name against one pattern: "*suffix" matches names ending with suffix and "prefix*" matches
+     * names starting with prefix. A pattern that neither starts nor ends with "*" never matches.
+     *
+     * @param fieldName name to test
+     * @param pattern dynamic field pattern
+     * @return true if fieldName matches pattern
+     * @throws IllegalArgumentException if pattern or fieldName is null
+     */
+    public boolean matchesDynamicField(String fieldName, String pattern) {
+        if (pattern == null || fieldName == null) {     // fieldName used to go unchecked and failed with an NPE
+            throw new IllegalArgumentException("pattern and fieldName arguments cannot be null");
+        }
+        if (pattern.startsWith("*")) {
+            if (fieldName.endsWith(pattern.substring(1))) {
+                logger.debug("Field [{}] MATCHES dynamic field {}", fieldName, pattern);
+                return true;
+            }
+        } else if (pattern.endsWith("*")) {
+            if (fieldName.startsWith(pattern.substring(0, pattern.length()-1))) {
+                logger.debug("Field [{}] MATCHES dynamic field {}", fieldName, pattern);
+                return true;
+            }
+        }
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/main/java/org/apache/storm/solr/schema/builder/RestJsonSchemaBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/schema/builder/RestJsonSchemaBuilder.java b/external/storm-solr/src/main/java/org/apache/storm/solr/schema/builder/RestJsonSchemaBuilder.java
new file mode 100644
index 0000000..52bc16d
--- /dev/null
+++ b/external/storm-solr/src/main/java/org/apache/storm/solr/schema/builder/RestJsonSchemaBuilder.java
@@ -0,0 +1,75 @@
+package org.apache.storm.solr.schema.builder;
+
+import com.google.gson.Gson;
+import org.apache.storm.solr.schema.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.Scanner;
+
+/**
+ * Class that builds the {@link Schema} object from the JSON representation of the schema as returned by the
+ * URL of the form http://localhost:8983/solr/gettingstarted/schema/ . This particular URL returns the schema
+ * in JSON format for the gettingstarted example running locally.
+ * <p></p>
+ * The schema is downloaded once, by the constructor.
+ * <p></p>
+ * Created by hlouro on 7/28/15.
+ */
+public class RestJsonSchemaBuilder implements SchemaBuilder {
+    private static final Logger logger = LoggerFactory.getLogger(RestJsonSchemaBuilder.class);
+    private Schema schema;
+
+
+    /** Urls with the form http://localhost:8983/solr/gettingstarted/schema/ returns the schema in JSON format */
+    public RestJsonSchemaBuilder(String solrHost, String solrPort, String collection) throws IOException {
+        this(new URL("http://" + solrHost + ":" + solrPort + "/solr/" + collection + "/schema/"));
+    }
+
+    /** @param url location returning the schema in JSON format */
+    public RestJsonSchemaBuilder(String url) throws IOException {
+        this(new URL(url));
+    }
+
+    /** @param url location returning the schema in JSON format */
+    public RestJsonSchemaBuilder(URL url) throws IOException {
+        downloadSchema(url);
+    }
+
+    /**
+     * Reads the whole response of the URL and parses it into the {@link Schema}.
+     *
+     * @throws IOException if the URL cannot be read, or if the response is empty or carries no JSON document
+     */
+    private void downloadSchema(URL url) throws IOException {
+        logger.debug("Downloading Solr schema info from: {}", url);
+        final String result;
+        // try-with-resources: closing the Scanner also closes the stream opened on the URL
+        try (Scanner scanner = new Scanner(url.openStream(), "UTF-8")) {
+            scanner.useDelimiter("\\Z");
+            if (!scanner.hasNext()) {
+                // Scanner hides read failures; surface the real one before reporting an empty response
+                if (scanner.ioException() != null) {
+                    throw scanner.ioException();
+                }
+                throw new IOException("Empty schema response from: " + url);
+            }
+            result = scanner.next();
+        }
+        logger.debug("JSON Schema: {}", result);
+
+        Gson gson = new Gson();
+        Schema.SchemaWrapper schemaWrapper = gson.fromJson(result, Schema.SchemaWrapper.class);
+        if (schemaWrapper == null) {
+            throw new IOException("No JSON document in schema response from: " + url);
+        }
+        this.schema = schemaWrapper.getSchema();
+    }
+
+    @Override
+    public Schema getSchema() {
+        return schema;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/main/java/org/apache/storm/solr/schema/builder/SchemaBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/schema/builder/SchemaBuilder.java b/external/storm-solr/src/main/java/org/apache/storm/solr/schema/builder/SchemaBuilder.java
new file mode 100644
index 0000000..a3456e0
--- /dev/null
+++ b/external/storm-solr/src/main/java/org/apache/storm/solr/schema/builder/SchemaBuilder.java
@@ -0,0 +1,12 @@
+package org.apache.storm.solr.schema.builder;
+
+import org.apache.storm.solr.schema.Schema;
+
+import java.io.Serializable;
+
+/**
+ * Serializable provider of a {@link Schema}. Created by hlouro on 7/28/15.
+ */
+public interface SchemaBuilder extends Serializable {
+    Schema getSchema();     // the Schema made available by this builder
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrState.java
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrState.java b/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrState.java
new file mode 100644
index 0000000..3ec8b70
--- /dev/null
+++ b/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrState.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.solr.trident;
+
+import backtype.storm.topology.FailedException;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.storm.solr.config.SolrConfig;
+import org.apache.storm.solr.mapper.SolrMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import storm.trident.state.State;
+import storm.trident.tuple.TridentTuple;
+
+import java.util.List;
+
+/**
+ * Trident {@link State} that sends each batch of tuples to Solr through the request built by a {@link SolrMapper}.
+ */
+public class SolrState implements State {
+    private static final Logger logger = LoggerFactory.getLogger(SolrState.class);
+
+    private final SolrConfig solrConfig;
+    private final SolrMapper solrMapper;
+    private SolrClient solrClient;
+
+    public SolrState(SolrConfig solrConfig, SolrMapper solrMapper) {
+        this.solrConfig = solrConfig;
+        this.solrMapper = solrMapper;
+    }
+
+    /** Creates the Solr client from the ZooKeeper host string of the config; must run before updateState. */
+    protected void prepare() {
+        solrClient = new CloudSolrClient(solrConfig.getZkHostString());
+    }
+
+    /** No-op: the commit to Solr is issued by {@link #updateState(List)}. */
+    @Override
+    public void beginCommit(Long aLong){ }
+
+    /** No-op: the commit to Solr is issued by {@link #updateState(List)}. */
+    @Override
+    public void commit(Long aLong) { }
+
+    /**
+     * Sends the request built from the tuples to the mapper's collection, then commits that collection.
+     *
+     * @param tuples batch of tuples to write to Solr
+     * @throws FailedException wrapping any exception raised while mapping, sending or committing
+     */
+    public void updateState(List<TridentTuple> tuples) {
+        try {
+            SolrRequest solrRequest = solrMapper.toSolrRequest(tuples);
+            solrClient.request(solrRequest, solrMapper.getCollection());
+            solrClient.commit(solrMapper.getCollection());
+        } catch (Exception e) {
+            // Say what failed and log the stack trace; a bare tuple list gives the reader no context
+            final String msg = "Failed to update Solr with tuples: " + tuples;
+            logger.warn(msg, e);
+            throw new FailedException(msg, e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrStateFactory.java b/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrStateFactory.java
new file mode 100644
index 0000000..d3bbdc4
--- /dev/null
+++ b/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrStateFactory.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.solr.trident;
+
+import backtype.storm.task.IMetricsContext;
+import org.apache.storm.solr.config.SolrCommitStrategy;
+import org.apache.storm.solr.config.SolrConfig;
+import org.apache.storm.solr.mapper.SolrMapper;
+import storm.trident.state.State;
+import storm.trident.state.StateFactory;
+
+import java.util.Map;
+
+/**
+ * {@link StateFactory} handing out {@link SolrState} instances built from one config and one mapper.
+ */
+public class SolrStateFactory implements StateFactory {
+    private final SolrConfig solrConfig;
+    private final SolrMapper solrMapper;
+
+    public SolrStateFactory(SolrConfig solrConfig, SolrMapper solrMapper) {
+        this.solrMapper = solrMapper;
+        this.solrConfig = solrConfig;
+    }
+
+    /**
+     * Builds a new {@link SolrState} and prepares it before returning it. None of the arguments is used.
+     */
+    @Override
+    public State makeState(Map map, IMetricsContext iMetricsContext, int partitionIndex, int numPartitions) {
+        final SolrState preparedState = new SolrState(this.solrConfig, this.solrMapper);
+        preparedState.prepare();
+        return preparedState;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrUpdater.java b/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrUpdater.java
new file mode 100644
index 0000000..3736cb3
--- /dev/null
+++ b/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrUpdater.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.solr.trident;
+
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.BaseStateUpdater;
+import storm.trident.tuple.TridentTuple;
+
+import java.util.List;
+
+public class SolrUpdater extends BaseStateUpdater<SolrState>  {
+    /** Hands the batch of tuples to {@link SolrState#updateState(List)}; the collector is not used. */
+    @Override
+    public void updateState(SolrState solrState, List<TridentTuple> tuples, TridentCollector collector) {
+        solrState.updateState(tuples);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/test/java/org/apache/storm/solr/spout/SolrFieldsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/test/java/org/apache/storm/solr/spout/SolrFieldsSpout.java b/external/storm-solr/src/test/java/org/apache/storm/solr/spout/SolrFieldsSpout.java
new file mode 100644
index 0000000..0d85f75
--- /dev/null
+++ b/external/storm-solr/src/test/java/org/apache/storm/solr/spout/SolrFieldsSpout.java
@@ -0,0 +1,65 @@
+package org.apache.storm.solr.spout;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import com.google.common.collect.Lists;
+import org.apache.storm.solr.util.TestUtil;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * Test spout emitting, on every call to nextTuple, one of three fixed tuples picked at random.
+ * <p></p>
+ * Created by hlouro on 7/24/15.
+ */
+public class SolrFieldsSpout extends BaseRichSpout {
+    private SpoutOutputCollector collector;
+    private Random rand;
+    public static final List<Values> listValues = Lists.newArrayList(
+            getValues("1"), getValues("2"), getValues("3"));
+
+    /** Builds one tuple whose string values carry the given suffix; the second value comes from TestUtil.getDate(). */
+    private static Values getValues(String suf) {
+        String suffix = "_hmcl_fields_test_val_" + suf;
+        return new Values(
+                "id" + suffix,
+                TestUtil.getDate(),
+                "dc_title" + suffix,
+                "dynamic_field" + suffix + "_txt",      // to match dynamic fields of the form "*_txt"
+                "non_matching_field" + suffix);         // this field won't be indexed by solr
+    }
+
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        this.collector = collector;
+        this.rand = new Random();   // created once here rather than on every nextTuple call
+    }
+
+    @Override
+    public void nextTuple() {
+        final Values values = listValues.get(rand.nextInt(listValues.size()));
+        collector.emit(values);
+        Thread.yield();
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(getOutputFields());
+    }
+
+    /** @return the names of the five values of every emitted tuple, in emission order */
+    public Fields getOutputFields() {
+        return new Fields("id","date","dc_title","dynamic_field_txt","non_matching_field");
+    }
+
+    @Override
+    public void close() {
+        super.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/test/java/org/apache/storm/solr/spout/SolrJsonSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/test/java/org/apache/storm/solr/spout/SolrJsonSpout.java b/external/storm-solr/src/test/java/org/apache/storm/solr/spout/SolrJsonSpout.java
new file mode 100644
index 0000000..7352614
--- /dev/null
+++ b/external/storm-solr/src/test/java/org/apache/storm/solr/spout/SolrJsonSpout.java
@@ -0,0 +1,121 @@
+package org.apache.storm.solr.spout;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import com.google.common.collect.Lists;
+import com.google.gson.Gson;
+import org.junit.Test;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TimeZone;
+
+/**
+ * Test spout emitting, on every call to nextTuple, one of five fixed single-value tuples picked at random.
+ * Three of them carry a JSON string and two carry a {@link JsonSchema} object.
+ * <p></p>
+ * Created by hlouro on 7/24/15.
+ */
+public class SolrJsonSpout extends BaseRichSpout {
+    private SpoutOutputCollector collector;
+    private Random rand;
+    private static final List<Values> listValues = Lists.newArrayList(
+            new Values((new JsonSchema("_hmcl_json_test_1")).toJson()),
+            new Values((new JsonSchema("_hmcl_json_test_2")).toJson()),
+            new Values((new JsonSchema("_hmcl_json_test_3")).toJson()),
+            new Values(new JsonSchema("_hmcl_json_test_4")),
+            new Values(new JsonSchema("_hmcl_json_test_5")));
+
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        this.collector = collector;
+        this.rand = new Random();   // created once here rather than on every nextTuple call
+    }
+
+    @Override
+    public void nextTuple() {
+        final Values values = listValues.get(rand.nextInt(listValues.size()));
+        collector.emit(values);
+        Thread.yield();
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(getOutputFields());
+    }
+
+    public Fields getOutputFields() {
+        return new Fields("JSON");
+    }
+
+    @Override
+    public void close() {   //TODO
+        super.close();
+    }
+
+    /** Document with the fields id, date and dc_title, convertible to and from JSON with Gson. */
+    public static class JsonSchema {
+        private String id;
+        private String date;
+        private String dc_title;
+
+        private static final Gson gson = new Gson();
+
+        public JsonSchema(String suffix) {
+            this.id = "id" + suffix;
+            this.date = getDate();
+            this.dc_title = "dc_title" + suffix;
+        }
+
+        public JsonSchema(String id, String date, String dc_title) {
+            this.id = id;
+            this.date = date;
+            this.dc_title = dc_title;
+        }
+
+        // copy constructor
+        public JsonSchema(JsonSchema jsonSchema) {
+            this.id = jsonSchema.id;
+            this.date = jsonSchema.date;
+            this.dc_title = jsonSchema.dc_title;
+        }
+
+        public String toJson() {
+            String json = gson.toJson(this);
+            System.out.println(json);   // TODO log
+            return json;
+        }
+
+        //TODO: clean this up
+        public static JsonSchema fromJson(String jsonStr) {
+            return new JsonSchema(gson.fromJson(jsonStr, JsonSchema.class));
+        }
+
+        /** @return the current time as a UTC timestamp of the form 2015-07-24T10:15:30Z */
+        private String getDate() {
+            DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
+            // 'Z' is a quoted literal, so the formatter must be switched to UTC for the value to be correct
+            df.setTimeZone(TimeZone.getTimeZone("UTC"));
+            String date = df.format(new Date());
+            System.out.println(date);
+            return date;
+        }
+    }
+
+    //TODO Delete
+    @Test
+    public void test() {
+        SolrJsonSpout solrJsonSpout = new SolrJsonSpout();
+        System.out.println("stop");
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/test/java/org/apache/storm/solr/topology/.gitignore
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/test/java/org/apache/storm/solr/topology/.gitignore b/external/storm-solr/src/test/java/org/apache/storm/solr/topology/.gitignore
new file mode 100644
index 0000000..1adc831
--- /dev/null
+++ b/external/storm-solr/src/test/java/org/apache/storm/solr/topology/.gitignore
@@ -0,0 +1 @@
+# Created by .ignore support plugin (hsz.mobi)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrFieldsTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrFieldsTopology.java b/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrFieldsTopology.java
new file mode 100644
index 0000000..27d57db
--- /dev/null
+++ b/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrFieldsTopology.java
@@ -0,0 +1,45 @@
+package org.apache.storm.solr.topology;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.TopologyBuilder;
+import org.apache.storm.solr.bolt.SolrUpdateBolt;
+import org.apache.storm.solr.config.CountBasedCommit;
+import org.apache.storm.solr.config.SolrCommitStrategy;
+import org.apache.storm.solr.mapper.SolrFieldsMapper;
+import org.apache.storm.solr.mapper.SolrMapper;
+import org.apache.storm.solr.schema.builder.RestJsonSchemaBuilder;
+import org.apache.storm.solr.spout.SolrFieldsSpout;
+
+import java.io.IOException;
+
+/**
+ * Topology wiring a {@link SolrFieldsSpout} into a {@link SolrUpdateBolt} that uses a {@link SolrFieldsMapper}.
+ * <p></p>
+ * Created by hlouro on 7/31/15.
+ */
+public class SolrFieldsTopology extends SolrTopology {
+    public static void main(String[] args) throws Exception {
+        new SolrFieldsTopology().run(args);
+    }
+
+    /** Builds a fields mapper for COLLECTION from the schema downloaded from localhost, port 8983. */
+    protected SolrMapper getSolrMapper() throws IOException {
+        final RestJsonSchemaBuilder schemaBuilder = new RestJsonSchemaBuilder("localhost", "8983", COLLECTION);
+        return new SolrFieldsMapper.Builder(schemaBuilder).setCollection(COLLECTION).build();
+    }
+
+    protected SolrCommitStrategy getSolrCommitStgy() {
+        final int count = 2;    // To Commit to Solr and Ack according to the commit strategy
+        return new CountBasedCommit(count);
+    }
+
+    /** Connects the spout to the update bolt with a shuffle grouping. */
+    protected StormTopology getTopology() throws IOException {
+        final String spoutId = "SolrFieldsSpout";
+        final TopologyBuilder topologyBuilder = new TopologyBuilder();
+        topologyBuilder.setSpout(spoutId, new SolrFieldsSpout());
+        final SolrUpdateBolt updateBolt = new SolrUpdateBolt(getSolrConfig(), getSolrMapper(), getSolrCommitStgy());
+        topologyBuilder.setBolt("SolrUpdateBolt", updateBolt).shuffleGrouping(spoutId);
+        return topologyBuilder.createTopology();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrJsonTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrJsonTopology.java b/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrJsonTopology.java
new file mode 100644
index 0000000..bcb3f55
--- /dev/null
+++ b/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrJsonTopology.java
@@ -0,0 +1,33 @@
+package org.apache.storm.solr.topology;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.TopologyBuilder;
+import org.apache.storm.solr.bolt.SolrUpdateBolt;
+import org.apache.storm.solr.mapper.SolrJsonMapper;
+import org.apache.storm.solr.mapper.SolrMapper;
+import org.apache.storm.solr.spout.SolrJsonSpout;
+
+import java.io.IOException;
+
+/**
+ * Example topology that wires a {@link SolrJsonSpout} to a {@link SolrUpdateBolt} through a
+ * shuffle grouping, using a {@link SolrJsonMapper}.
+ */
+public class SolrJsonTopology extends SolrTopology {
+    private static final String SPOUT_ID = "SolrJsonSpout";
+    private static final String BOLT_ID = "SolrUpdateBolt";
+    private static final String JSON_TUPLE_FIELD = "JSON";
+
+    /**
+     * Entry point: creates the topology object and hands the command line to
+     * {@link SolrTopology#run(String[])}.
+     */
+    public static void main(String[] args) throws Exception {
+        new SolrJsonTopology().run(args);
+    }
+
+    /**
+     * Builds the mapper used by the update bolt: a {@link SolrJsonMapper} for {@code COLLECTION}
+     * that is given {@code "JSON"} as the tuple field name.
+     */
+    protected SolrMapper getSolrMapper() throws IOException {
+        return new SolrJsonMapper(COLLECTION, JSON_TUPLE_FIELD);
+    }
+
+    /**
+     * Assembles the spout and the update bolt into a topology; the bolt receives the spout's
+     * tuples through a shuffle grouping and uses the commit strategy inherited from the base class.
+     */
+    @Override
+    protected StormTopology getTopology() throws IOException {
+        final TopologyBuilder topologyBuilder = new TopologyBuilder();
+        topologyBuilder.setSpout(SPOUT_ID, new SolrJsonSpout());
+
+        final SolrUpdateBolt updateBolt =
+                new SolrUpdateBolt(getSolrConfig(), getSolrMapper(), getSolrCommitStgy());
+        topologyBuilder.setBolt(BOLT_ID, updateBolt).shuffleGrouping(SPOUT_ID);
+
+        return topologyBuilder.createTopology();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrTopology.java b/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrTopology.java
new file mode 100644
index 0000000..38aae80
--- /dev/null
+++ b/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrTopology.java
@@ -0,0 +1,67 @@
+package org.apache.storm.solr.topology;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.generated.StormTopology;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.storm.solr.config.SolrCommitStrategy;
+import org.apache.storm.solr.config.SolrConfig;
+
+import java.io.IOException;
+
+/**
+ * Base class for the example topologies in this package.
+ *
+ * <p>Subclasses provide the topology through {@link #getTopology()}. {@link #run(String[])} then
+ * submits it to an in-process {@link LocalCluster} when no command-line arguments are given, or to
+ * a remote cluster otherwise. The collection name and ZooKeeper address are hard-coded to the
+ * values of the Solr "gettingstarted" example.
+ */
+public abstract class SolrTopology {
+    protected static String COLLECTION = "gettingstarted";
+    // Created once when this class is initialized; not referenced elsewhere within this class.
+    protected static SolrClient solrClient = getSolrClient();
+
+    /**
+     * Builds the topology and its configuration and submits them.
+     *
+     * @param args command-line arguments. When empty, the topology runs on a local cluster.
+     *             Otherwise it is submitted remotely under the name {@code args[1]}, or under
+     *             {@code args[0]} when only one argument is supplied.
+     * @throws Exception if building or submitting the topology fails
+     */
+    public void run(String[] args) throws Exception {
+        final StormTopology topology = getTopology();
+        final Config config = getConfig();
+
+        if (args.length == 0) {
+            submitTopologyLocalCluster(topology, config);
+        } else {
+            // Reading args[1] unconditionally threw ArrayIndexOutOfBoundsException for a single
+            // argument; fall back to args[0] in that case and keep args[1] whenever it exists.
+            final String topologyName = args.length > 1 ? args[1] : args[0];
+            submitTopologyRemoteCluster(topologyName, topology, config);
+        }
+    }
+
+    /**
+     * Creates the topology to submit.
+     *
+     * @throws IOException if the subclass fails while building the topology
+     */
+    protected abstract StormTopology getTopology() throws IOException;
+
+    /**
+     * Submits the topology through {@link StormSubmitter}.
+     *
+     * @param arg name under which the topology is submitted
+     */
+    protected void submitTopologyRemoteCluster(String arg, StormTopology topology, Config config) throws Exception {
+        StormSubmitter.submitTopology(arg, config, topology);
+    }
+
+    /**
+     * Runs the topology under the name {@code "test"} on a new {@link LocalCluster} for ten
+     * seconds, kills it, shuts the cluster down and terminates the JVM with status 0.
+     *
+     * <p>The cluster is shut down even when submitting, sleeping or killing throws; in that case
+     * the exception propagates and {@code System.exit} is not reached.
+     *
+     * @throws InterruptedException if the thread is interrupted while the topology is running
+     */
+    protected void submitTopologyLocalCluster(StormTopology topology, Config config) throws InterruptedException {
+        LocalCluster cluster = new LocalCluster();
+        try {
+            cluster.submitTopology("test", config, topology);
+            Thread.sleep(10000);
+            cluster.killTopology("test");
+        } finally {
+            cluster.shutdown();
+        }
+        System.exit(0);
+    }
+
+    /** Returns a new {@link Config} with debug enabled. */
+    protected Config getConfig() {
+        Config config = new Config();
+        config.setDebug(true);
+        return config;
+    }
+
+    /**
+     * Returns the commit strategy handed to the Solr bolt. The default is {@code null}, which is
+     * meant to commit to Solr and ack every tuple; subclasses may override it.
+     */
+    protected SolrCommitStrategy getSolrCommitStgy() {
+        return null;                          // To Commit to Solr and Ack every tuple
+    }
+
+    /** Returns a {@link SolrConfig} built from the hard-coded ZooKeeper host string. */
+    protected static SolrConfig getSolrConfig() {
+        String zkHostString = "127.0.0.1:9983";  // zkHostString for Solr gettingstarted example
+        return new SolrConfig(zkHostString);
+    }
+
+    /** Returns a new {@link CloudSolrClient} built from the hard-coded ZooKeeper host string. */
+    protected static SolrClient getSolrClient() {
+        String zkHostString = "127.0.0.1:9983";  // zkHostString for Solr gettingstarted example
+        return new CloudSolrClient(zkHostString);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/test/java/org/apache/storm/solr/trident/SolrFieldsTridentTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/test/java/org/apache/storm/solr/trident/SolrFieldsTridentTopology.java b/external/storm-solr/src/test/java/org/apache/storm/solr/trident/SolrFieldsTridentTopology.java
new file mode 100644
index 0000000..86dabe3
--- /dev/null
+++ b/external/storm-solr/src/test/java/org/apache/storm/solr/trident/SolrFieldsTridentTopology.java
@@ -0,0 +1,30 @@
+package org.apache.storm.solr.trident;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.tuple.Fields;
+import org.apache.storm.solr.spout.SolrFieldsSpout;
+import org.apache.storm.solr.topology.SolrFieldsTopology;
+import storm.trident.Stream;
+import storm.trident.TridentTopology;
+import storm.trident.state.StateFactory;
+
+import java.io.IOException;
+
+/**
+ * Trident variant of {@link SolrFieldsTopology}: the tuples of a {@link SolrFieldsSpout} stream are
+ * persisted through a {@link SolrStateFactory} and a {@link SolrUpdater}.
+ */
+public class SolrFieldsTridentTopology extends SolrFieldsTopology {
+    private static final String STREAM_ID = "SolrFieldsSpout";
+
+    /**
+     * Entry point: creates the topology object and hands the command line to the inherited
+     * {@code run} method.
+     */
+    public static void main(String[] args) throws Exception {
+        new SolrFieldsTridentTopology().run(args);
+    }
+
+    /**
+     * Builds the Trident topology: one stream fed by the spout, persisted with the spout's output
+     * fields as input, using the Solr config and mapper inherited from the parent classes. An
+     * empty {@link Fields} is passed as the last {@code partitionPersist} argument.
+     */
+    @Override
+    protected StormTopology getTopology() throws IOException {
+        final TridentTopology trident = new TridentTopology();
+        final SolrFieldsSpout fieldsSpout = new SolrFieldsSpout();
+        final Stream tuples = trident.newStream(STREAM_ID, fieldsSpout);
+
+        final StateFactory stateFactory = new SolrStateFactory(getSolrConfig(), getSolrMapper());
+        tuples.partitionPersist(stateFactory, fieldsSpout.getOutputFields(), new SolrUpdater(), new Fields());
+
+        return trident.build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/test/java/org/apache/storm/solr/trident/SolrJsonTridentTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/test/java/org/apache/storm/solr/trident/SolrJsonTridentTopology.java b/external/storm-solr/src/test/java/org/apache/storm/solr/trident/SolrJsonTridentTopology.java
new file mode 100644
index 0000000..8f28909
--- /dev/null
+++ b/external/storm-solr/src/test/java/org/apache/storm/solr/trident/SolrJsonTridentTopology.java
@@ -0,0 +1,30 @@
+package org.apache.storm.solr.trident;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.tuple.Fields;
+import org.apache.storm.solr.spout.SolrJsonSpout;
+import org.apache.storm.solr.topology.SolrJsonTopology;
+import storm.trident.Stream;
+import storm.trident.TridentTopology;
+import storm.trident.state.StateFactory;
+
+import java.io.IOException;
+
+/**
+ * Trident variant of {@link SolrJsonTopology}: the tuples of a {@link SolrJsonSpout} stream are
+ * persisted through a {@link SolrStateFactory} and a {@link SolrUpdater}.
+ */
+public class SolrJsonTridentTopology extends SolrJsonTopology {
+    private static final String STREAM_ID = "SolrJsonSpout";
+
+    /**
+     * Entry point: creates the topology object and hands the command line to the inherited
+     * {@code run} method.
+     */
+    public static void main(String[] args) throws Exception {
+        new SolrJsonTridentTopology().run(args);
+    }
+
+    /**
+     * Builds the Trident topology: one stream fed by the spout, persisted with the spout's output
+     * fields as input, using the Solr config and mapper inherited from the parent classes. An
+     * empty {@link Fields} is passed as the last {@code partitionPersist} argument.
+     */
+    @Override
+    protected StormTopology getTopology() throws IOException {
+        final TridentTopology trident = new TridentTopology();
+        final SolrJsonSpout jsonSpout = new SolrJsonSpout();
+        final Stream tuples = trident.newStream(STREAM_ID, jsonSpout);
+
+        final StateFactory stateFactory = new SolrStateFactory(getSolrConfig(), getSolrMapper());
+        tuples.partitionPersist(stateFactory, jsonSpout.getOutputFields(), new SolrUpdater(), new Fields());
+
+        return trident.build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/test/java/org/apache/storm/solr/util/TestUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/test/java/org/apache/storm/solr/util/TestUtil.java b/external/storm-solr/src/test/java/org/apache/storm/solr/util/TestUtil.java
new file mode 100644
index 0000000..66bc4e6
--- /dev/null
+++ b/external/storm-solr/src/test/java/org/apache/storm/solr/util/TestUtil.java
@@ -0,0 +1,17 @@
+package org.apache.storm.solr.util;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+/**
+ * Date helper for the storm-solr tests.
+ */
+public class TestUtil {
+    /**
+     * Formats the current time as {@code yyyy-MM-dd'T'HH:mm:ss'Z'} in UTC, prints the result to
+     * standard output and returns it.
+     *
+     * @return the current UTC time, e.g. {@code 2015-07-31T17:03:50Z}
+     */
+    public static String getDate() {
+        DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
+        // The trailing 'Z' is a quoted literal, not a zone pattern, so the formatter must be
+        // pinned to UTC; otherwise the JVM's default zone is used and the string claims UTC
+        // while carrying local time.
+        df.setTimeZone(java.util.TimeZone.getTimeZone("UTC"));
+        String date = df.format(new Date());
+        System.out.println(date);
+        return date;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1443a36..c2b73bc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -169,8 +169,9 @@
         <module>external/storm-redis</module>
         <module>external/storm-eventhubs</module>
         <module>external/flux</module>
         <module>examples/storm-starter</module>
         <module>external/storm-elasticsearch</module>
+        <module>external/storm-solr</module>
     </modules>
 
     <scm>


Mime
View raw message