Repository: storm Updated Branches: refs/heads/master 528958c27 -> eb6107b93 STORM-851: Storm Solr Connector 1. SolrUpdate Bolt 2. Trident State implementation 3. Fields Mapper 4. JSON Mapper 5. Integration Tests Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4ab6e0c0 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4ab6e0c0 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4ab6e0c0 Branch: refs/heads/master Commit: 4ab6e0c02aaddbd1ad0b1334a5c458f940271827 Parents: 528958c Author: Hugo Louro Authored: Thu Jul 16 19:10:30 2015 -0700 Committer: Hugo Louro Committed: Tue Aug 25 08:18:17 2015 -0700 ---------------------------------------------------------------------- .gitignore | 2 +- external/storm-solr/pom.xml | 110 +++++++++++++ .../java/org/apache/storm/solr/bolt/.gitignore | 1 + .../storm/solr/bolt/AbstractSolrBolt.java | 33 ++++ .../apache/storm/solr/bolt/SolrUpdateBolt.java | 107 ++++++++++++ .../storm/solr/config/CountBasedCommit.java | 39 +++++ .../storm/solr/config/SolrCommitStrategy.java | 14 ++ .../apache/storm/solr/config/SolrConfig.java | 26 +++ .../org/apache/storm/solr/mapper/.gitignore | 1 + .../storm/solr/mapper/SolrFieldsMapper.java | 144 ++++++++++++++++ .../storm/solr/mapper/SolrJsonMapper.java | 97 +++++++++++ .../apache/storm/solr/mapper/SolrMapper.java | 17 ++ .../storm/solr/mapper/SolrMapperException.java | 9 + .../org/apache/storm/solr/schema/CopyField.java | 35 ++++ .../org/apache/storm/solr/schema/Field.java | 45 +++++ .../org/apache/storm/solr/schema/FieldType.java | 48 ++++++ .../org/apache/storm/solr/schema/Schema.java | 99 +++++++++++ .../storm/solr/schema/SolrFieldTypeFinder.java | 165 +++++++++++++++++++ .../schema/builder/RestJsonSchemaBuilder.java | 53 ++++++ .../solr/schema/builder/SchemaBuilder.java | 12 ++ .../apache/storm/solr/trident/SolrState.java | 66 ++++++++ .../storm/solr/trident/SolrStateFactory.java | 44 +++++ 
.../apache/storm/solr/trident/SolrUpdater.java | 32 ++++ .../storm/solr/spout/SolrFieldsSpout.java | 60 +++++++ .../apache/storm/solr/spout/SolrJsonSpout.java | 112 +++++++++++++ .../org/apache/storm/solr/topology/.gitignore | 1 + .../storm/solr/topology/SolrFieldsTopology.java | 40 +++++ .../storm/solr/topology/SolrJsonTopology.java | 33 ++++ .../storm/solr/topology/SolrTopology.java | 67 ++++++++ .../solr/trident/SolrFieldsTridentTopology.java | 30 ++++ .../solr/trident/SolrJsonTridentTopology.java | 30 ++++ .../org/apache/storm/solr/util/TestUtil.java | 17 ++ pom.xml | 4 + 33 files changed, 1592 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index 02816a1..4e08a2e 100644 --- a/.gitignore +++ b/.gitignore @@ -28,7 +28,7 @@ target *.ipr *.iws .idea -.* +#.* !/.travis.yml !/.gitignore _site http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/pom.xml ---------------------------------------------------------------------- diff --git a/external/storm-solr/pom.xml b/external/storm-solr/pom.xml new file mode 100644 index 0000000..4213e0c --- /dev/null +++ b/external/storm-solr/pom.xml @@ -0,0 +1,110 @@ + + + + storm + org.apache.storm + 0.11.0-SNAPSHOT + ../../pom.xml + + 4.0.0 + + storm-solr + + + + Hugo-Louro + Hugo Louro + hmclouro@gmail.com + + + + + + org.apache.storm + storm-core + ${project.version} + provided + + + + org.apache.solr + solr-solrj + 5.2.1 + compile + + + org.apache.solr + solr-core + 5.2.1 + test + + + org.apache.solr + solr-test-framework + 5.2.1 + test + + + commons-codec + commons-codec + 1.3 + + + commons-httpclient + commons-httpclient + 3.1 + + + commons-io + commons-io + 1.4 + + + + junit + junit + 4.11 + test + + + com.google.code.gson + gson + 2.3.1 + + + + + + + + + + 
org.apache.maven.plugins + maven-jar-plugin + 2.5 + + + + test-jar + + + + + + maven-assembly-plugin + + + + org.apache.storm.solr.topology.SolrFieldsTopology + + + + jar-with-dependencies + + + + + + \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/.gitignore ---------------------------------------------------------------------- diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/.gitignore b/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/.gitignore new file mode 100644 index 0000000..1adc831 --- /dev/null +++ b/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/.gitignore @@ -0,0 +1 @@ +# Created by .ignore support plugin (hsz.mobi) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/AbstractSolrBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/AbstractSolrBolt.java b/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/AbstractSolrBolt.java new file mode 100644 index 0000000..cd3261f --- /dev/null +++ b/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/AbstractSolrBolt.java @@ -0,0 +1,33 @@ +package org.apache.storm.solr.bolt; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichBolt; +import backtype.storm.tuple.Tuple; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.storm.solr.config.SolrConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +/** + * Created by hlouro on 7/17/15. 
+ */ +public abstract class AbstractSolrBolt extends BaseRichBolt { + protected OutputCollector collector; + protected SolrConfig solrConfig; + protected SolrClient solrClient; + + public AbstractSolrBolt(SolrConfig solrConfig) { + this.solrConfig = solrConfig; + } + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.collector = collector; + solrClient = new CloudSolrClient(solrConfig.getZkHostString()); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java b/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java new file mode 100644 index 0000000..fe90b73 --- /dev/null +++ b/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java @@ -0,0 +1,107 @@ +package org.apache.storm.solr.bolt; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.Tuple; +import org.apache.solr.client.solrj.SolrRequest; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.storm.solr.config.SolrCommitStrategy; +import org.apache.storm.solr.config.SolrConfig; +import org.apache.storm.solr.mapper.SolrMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + * Created by hlouro on 7/19/15. 
+ */ +public class SolrUpdateBolt extends AbstractSolrBolt { + private final Logger logger = LoggerFactory.getLogger(SolrUpdateBolt.class); + private final SolrMapper solrMapper; + private final SolrCommitStrategy commitStgy; + private List toCommitTuples; + private final String ackFailLock = "LOCK"; //serializable lock + + + public SolrUpdateBolt(SolrConfig solrConfig, SolrMapper solrMapper) { + this(solrConfig, solrMapper, null); + } + + public SolrUpdateBolt(SolrConfig solrConfig, SolrMapper solrMapper, SolrCommitStrategy commitStgy) { + super(solrConfig); + this.solrMapper = solrMapper; + this.commitStgy = commitStgy; + logger.info("Created {} with the following configuration: " + + "[SolrConfig = {}], [SolrMapper = {}], [CommitStgy = {}]", + this.getClass().getSimpleName(), solrConfig, solrMapper, commitStgy); + } + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + super.prepare(stormConf, context, collector); + this.toCommitTuples = new LinkedList<>(); + } + + @Override + public void execute(Tuple tuple) { + try { + SolrRequest request = solrMapper.toSolrRequest(tuple); + solrClient.request(request, solrMapper.getCollection()); + ack(tuple); + } catch (Exception e) { + fail(tuple, e); + } + } + + private void ack(Tuple tuple) throws SolrServerException, IOException { + if (commitStgy == null) { + collector.ack(tuple); + } else { + synchronized(ackFailLock) { + toCommitTuples.add(tuple); + commitStgy.update(); + } + if (commitStgy.commit()) { + solrClient.commit(solrMapper.getCollection()); + ackCommittedTuples(); + } + } + } + + private void ackCommittedTuples() { + List toAckTuples = getQueuedTuples(); + for (Tuple tuple : toAckTuples) { + collector.ack(tuple); + } + } + + private void fail(Tuple tuple, Exception e) { + collector.reportError(e); + + if (commitStgy == null) { + collector.fail(tuple); + } else { + List failedTuples = getQueuedTuples(); + for (Tuple failedTuple : failedTuples) { + 
collector.fail(failedTuple); + } + } + } + + private List getQueuedTuples() { + List queuedTuples; + synchronized(ackFailLock) { + queuedTuples = toCommitTuples; + toCommitTuples = new LinkedList<>(); + } + return queuedTuples; + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { } +} http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/main/java/org/apache/storm/solr/config/CountBasedCommit.java ---------------------------------------------------------------------- diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/config/CountBasedCommit.java b/external/storm-solr/src/main/java/org/apache/storm/solr/config/CountBasedCommit.java new file mode 100644 index 0000000..3877980 --- /dev/null +++ b/external/storm-solr/src/main/java/org/apache/storm/solr/config/CountBasedCommit.java @@ -0,0 +1,39 @@ +package org.apache.storm.solr.config; + +/** + * Class defining a count based commit strategy. When the count reaches the commit threshold, + * SolrInputDocuments are committed to Solr. + * + * Created by hlouro on 7/29/15. 
+ */ +public class CountBasedCommit implements SolrCommitStrategy { + private int threshHold; + private int count; + + /** + * Initializes a count based commit strategy with the specified threshold + * + * @param threshold The commit threshold, defining when SolrInputDocuments should be committed to Solr + * */ + public CountBasedCommit(int threshold) { + if (threshold < 1) { + throw new IllegalArgumentException("Threshold must be a positive integer: " + threshold); + } + this.threshHold = threshold; + } + + @Override + public boolean commit() { + return count != 0 && count % threshHold == 0; + } + + + @Override + public void update() { + count++; + } + + public int getCount() { + return count; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/main/java/org/apache/storm/solr/config/SolrCommitStrategy.java ---------------------------------------------------------------------- diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/config/SolrCommitStrategy.java b/external/storm-solr/src/main/java/org/apache/storm/solr/config/SolrCommitStrategy.java new file mode 100644 index 0000000..0ec9149 --- /dev/null +++ b/external/storm-solr/src/main/java/org/apache/storm/solr/config/SolrCommitStrategy.java @@ -0,0 +1,14 @@ +package org.apache.storm.solr.config; + +import java.io.Serializable; + +/** + * Strategy definining when the Solr Bolt should commit the request to Solr. + *

+ * Created by hlouro on 7/29/15. + */ +public interface SolrCommitStrategy extends Serializable { + boolean commit(); + + void update(); +} http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/main/java/org/apache/storm/solr/config/SolrConfig.java ---------------------------------------------------------------------- diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/config/SolrConfig.java b/external/storm-solr/src/main/java/org/apache/storm/solr/config/SolrConfig.java new file mode 100644 index 0000000..6a75ae7 --- /dev/null +++ b/external/storm-solr/src/main/java/org/apache/storm/solr/config/SolrConfig.java @@ -0,0 +1,26 @@ +package org.apache.storm.solr.config; + +import org.apache.solr.client.solrj.SolrClient; + +import java.io.Serializable; + +/** + * Class containing Solr configuration to be made available to Storm Solr bolts. Any configuration needed in + * the bolts should be put in this class. + *

+ * Created by hlouro on 7/29/15. + */ +public class SolrConfig implements Serializable { + private String zkHostString; + + /** + * @param zkHostString Zookeeper host string as defined in the {@link SolrClient} constructor + * */ + public SolrConfig(String zkHostString) { + this.zkHostString = zkHostString; + } + + public String getZkHostString() { + return zkHostString; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/.gitignore ---------------------------------------------------------------------- diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/.gitignore b/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/.gitignore new file mode 100644 index 0000000..1adc831 --- /dev/null +++ b/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/.gitignore @@ -0,0 +1 @@ +# Created by .ignore support plugin (hsz.mobi) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrFieldsMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrFieldsMapper.java b/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrFieldsMapper.java new file mode 100644 index 0000000..df89a47 --- /dev/null +++ b/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrFieldsMapper.java @@ -0,0 +1,144 @@ +package org.apache.storm.solr.mapper; + +import static org.apache.storm.solr.schema.SolrFieldTypeFinder.FieldTypeWrapper; + +import backtype.storm.tuple.ITuple; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrRequest; +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.common.SolrInputDocument; +import 
org.apache.storm.solr.schema.Field; +import org.apache.storm.solr.schema.Schema; +import org.apache.storm.solr.schema.builder.SchemaBuilder; +import org.apache.storm.solr.schema.SolrFieldTypeFinder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.LinkedList; +import java.util.List; + +/** + * Created by hlouro on 7/24/15. + */ +public class SolrFieldsMapper implements SolrMapper { + private static final Logger logger = LoggerFactory.getLogger(SolrFieldsMapper.class); + private String collection; + private SolrFieldTypeFinder typeFinder; + private String multiValueFieldToken; + + public static class Builder { + private String collection; + private SolrFieldTypeFinder typeFinder; + private String multiValueFieldToken = "|"; + + public Builder(SchemaBuilder schemaBuilder) { + setTypeFinder(schemaBuilder); + } + + //TODO Handle the case where there may be no schema + private void setTypeFinder(SchemaBuilder schemaBuilder) { + Schema schema = schemaBuilder.getSchema(); + typeFinder = new SolrFieldTypeFinder(schema); + } + + public Builder setCollection(String collection) { + this.collection = collection; + return this; + } + + public Builder setDefaultCollection(SolrClient solrClient) { + String defaultCollection = null; + if (solrClient instanceof CloudSolrClient) { + defaultCollection = ((CloudSolrClient) solrClient).getDefaultCollection(); + } + this.collection = defaultCollection; + return this; + } + + /** + * Sets the token that separates multivalue fields in tuples. 
The default token is | + * */ + public Builder setMultiValueFieldToken(String multiValueFieldToken) { + this.multiValueFieldToken = multiValueFieldToken; + return this; + } + + public SolrFieldsMapper build() { + return new SolrFieldsMapper(this); + } + } + + private SolrFieldsMapper(Builder builder) { + this.collection = builder.collection; + this.typeFinder = builder.typeFinder; + this.multiValueFieldToken = builder.multiValueFieldToken; + } + + @Override + public String getCollection() { + return collection; + } + + @Override + public SolrRequest toSolrRequest(List tuples) throws SolrMapperException { + List docs = new LinkedList<>(); + for (ITuple tuple : tuples) { + docs.add(buildDocument(tuple)); + } + UpdateRequest request = new UpdateRequest(); + request.add(docs); + return request; + } + + @Override + public SolrRequest toSolrRequest(ITuple tuple) throws SolrMapperException { + SolrInputDocument doc = buildDocument(tuple); + UpdateRequest request = new UpdateRequest(); + request.add(doc); + return request; + } + + private SolrInputDocument buildDocument(ITuple tuple) { + SolrInputDocument doc = new SolrInputDocument(); + + for (String tupleField : tuple.getFields()) { + FieldTypeWrapper fieldTypeWrapper = typeFinder.getFieldTypeWrapper(tupleField); + if (fieldTypeWrapper != null) { + Field field = fieldTypeWrapper.getField(); + if (field.isMultiValued()) { + addMultivalueFieldToDoc(doc, tupleField, tuple); + } else { + addFieldToDoc(doc, tupleField, tuple); + } + } else { + logger.info("Field [{}] does NOT match static or dynamic field declared in schema. 
Not added to document", tupleField); + } + } + return doc; + } + + private void addFieldToDoc(SolrInputDocument doc, String tupleField, ITuple tuple) { + Object val = getValue(tupleField, tuple); + logger.info("Adding to document (field, val) = ({}, {})", tupleField, val); + doc.addField(tupleField, val); + } + + private void addMultivalueFieldToDoc(SolrInputDocument doc, String tupleField, ITuple tuple) { + String[] values = getValues(tupleField, tuple); + for (String value : values) { + logger.info("Adding {} to multivalue field document {}", value, tupleField); + doc.addField(tupleField, value); + } + } + + private Object getValue(String field, ITuple tuple) { + return tuple.getValueByField(field); + } + + private String[] getValues(String field, ITuple tuple) { + String multiValueField = tuple.getStringByField(field); + String[] values = multiValueField.split(multiValueFieldToken); + return values; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrJsonMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrJsonMapper.java b/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrJsonMapper.java new file mode 100644 index 0000000..ef5bb3c --- /dev/null +++ b/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrJsonMapper.java @@ -0,0 +1,97 @@ +package org.apache.storm.solr.mapper; + +import backtype.storm.tuple.ITuple; +import com.google.gson.Gson; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrRequest; +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest; +import org.apache.solr.common.util.ContentStream; +import org.apache.solr.common.util.ContentStreamBase; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import 
java.util.List; + +/** + * Created by hlouro on 7/24/15. + */ +public class SolrJsonMapper implements SolrMapper { + private static final Logger logger = LoggerFactory.getLogger(SolrJsonMapper.class); + private static final String JSON_UPDATE_URL = "/update/json/docs"; + private static final String CONTENT_TYPE = "application/json";; + + private final String jsonTupleField; + private final String collection; + + public SolrJsonMapper(String collection, String jsonTupleField) { + this.collection = collection; + this.jsonTupleField = jsonTupleField; + } + + /** Uses default collection */ + public SolrJsonMapper(SolrClient solrClient, String jsonTupleField) { + String defaultCollection = null; + if (solrClient instanceof CloudSolrClient) { + defaultCollection = ((CloudSolrClient) solrClient).getDefaultCollection(); + } + this.collection = defaultCollection; + this.jsonTupleField = jsonTupleField; + } + + @Override + public String getCollection() { + return collection; + } + + @Override + public SolrRequest toSolrRequest(List tuples) throws SolrMapperException { + final String jsonList = getJsonFromTuples(tuples); + return createtSolrRequest(jsonList); + } + + @Override + public SolrRequest toSolrRequest(ITuple tuple) throws SolrMapperException { + final String json = getJsonFromTuple(tuple); + return createtSolrRequest(json); + } + + private SolrRequest createtSolrRequest(String json) { + ContentStreamUpdateRequest request = new ContentStreamUpdateRequest(JSON_UPDATE_URL); + ContentStream cs = new ContentStreamBase.StringStream(json, CONTENT_TYPE); + request.addContentStream(cs); + logger.info("Request generated with JSON: " + json); + return request; + } + + private String getJsonFromTuples(List tuples) throws SolrMapperException { + final StringBuilder jsonListBuilder = new StringBuilder("["); + for (ITuple tuple : tuples) { + final String json = getJsonFromTuple(tuple); + jsonListBuilder.append(json).append(","); + } + 
jsonListBuilder.setCharAt(jsonListBuilder.length() - 1, ']'); + return jsonListBuilder.toString(); + } + + private String getJsonFromTuple(ITuple tuple) throws SolrMapperException { + String json = ""; + if (tuple.contains(jsonTupleField)) { + json = doGetJson(tuple.getValueByField(jsonTupleField)); + } else { + throw new SolrMapperException("Tuple does not contain JSON object"); + } + return json; + } + + private String doGetJson(Object value) { + String json = ""; + if (value instanceof String) { + json = (String) value; // Object associated with JSON field is already JSON + } else { + Gson gson = new Gson(); + json = gson.toJson(value); // Serializes a Java object to JSON + } + return json; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrMapper.java b/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrMapper.java new file mode 100644 index 0000000..165498d --- /dev/null +++ b/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrMapper.java @@ -0,0 +1,17 @@ +package org.apache.storm.solr.mapper; + +import backtype.storm.tuple.ITuple; +import backtype.storm.tuple.Tuple; +import org.apache.solr.client.solrj.SolrRequest; + +import java.io.Serializable; +import java.util.List; + +/** + * Created by hlouro on 7/22/15. 
+ */ +public interface SolrMapper extends Serializable { + String getCollection(); + SolrRequest toSolrRequest(ITuple tuple) throws SolrMapperException; + SolrRequest toSolrRequest(List tuples) throws SolrMapperException; +} http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrMapperException.java ---------------------------------------------------------------------- diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrMapperException.java b/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrMapperException.java new file mode 100644 index 0000000..b9ad5a3 --- /dev/null +++ b/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrMapperException.java @@ -0,0 +1,9 @@ +package org.apache.storm.solr.mapper; + +/** + * Created by hlouro on 7/24/15. + */ +public class SolrMapperException extends Exception { + public SolrMapperException(String msg) { + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/main/java/org/apache/storm/solr/schema/CopyField.java ---------------------------------------------------------------------- diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/schema/CopyField.java b/external/storm-solr/src/main/java/org/apache/storm/solr/schema/CopyField.java new file mode 100644 index 0000000..9fd291e --- /dev/null +++ b/external/storm-solr/src/main/java/org/apache/storm/solr/schema/CopyField.java @@ -0,0 +1,35 @@ +package org.apache.storm.solr.schema; + +import java.io.Serializable; + +/** + * Created by hlouro on 7/27/15. 
+ */ +public class CopyField implements Serializable { + private String source; + private String dest; + + public String getSource() { + return source; + } + + public String getDest() { + return dest; + } + + public void setSource(String source) { + this.source = source; + } + + public void setDest(String dest) { + this.dest = dest; + } + + @Override + public String toString() { + return "CopyField{" + + "source='" + source + '\'' + + ", dest='" + dest + '\'' + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/main/java/org/apache/storm/solr/schema/Field.java ---------------------------------------------------------------------- diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/schema/Field.java b/external/storm-solr/src/main/java/org/apache/storm/solr/schema/Field.java new file mode 100644 index 0000000..55f9589 --- /dev/null +++ b/external/storm-solr/src/main/java/org/apache/storm/solr/schema/Field.java @@ -0,0 +1,45 @@ +package org.apache.storm.solr.schema; + +import java.io.Serializable; + +/** + * Created by hlouro on 7/27/15. 
+ */ +public class Field implements Serializable { + private String name; + private String type; + private boolean multiValued; + + public String getName() { + return name; + } + + public String getType() { + return type; + } + + public boolean isMultiValued() { + return multiValued; + } + + public void setName(String name) { + this.name = name; + } + + public void setType(String type) { + this.type = type; + } + + public void setMultiValued(boolean multiValued) { + this.multiValued = multiValued; + } + + @Override + public String toString() { + return "Field{" + + "name='" + name + '\'' + + ", type='" + type + '\'' + + ", multiValued=" + multiValued + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/main/java/org/apache/storm/solr/schema/FieldType.java ---------------------------------------------------------------------- diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/schema/FieldType.java b/external/storm-solr/src/main/java/org/apache/storm/solr/schema/FieldType.java new file mode 100644 index 0000000..23bce92 --- /dev/null +++ b/external/storm-solr/src/main/java/org/apache/storm/solr/schema/FieldType.java @@ -0,0 +1,48 @@ +package org.apache.storm.solr.schema; + +import com.google.gson.annotations.SerializedName; + +import java.io.Serializable; + +/** + * Created by hlouro on 7/27/15. 
+ */ +public class FieldType implements Serializable { + private String name; + @SerializedName("class") + private String clazz; + private boolean multiValued; + + public String getName() { + return name; + } + + public String getClazz() { + return clazz; + } + + public boolean isMultiValued() { + return multiValued; + } + + public void setName(String name) { + this.name = name; + } + + public void setClazz(String clazz) { + this.clazz = clazz; + } + + public void setMultiValued(boolean multiValued) { + this.multiValued = multiValued; + } + + @Override + public String toString() { + return "FieldType{" + + "name='" + name + '\'' + + ", clazz='" + clazz + '\'' + + ", multiValued=" + multiValued + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/main/java/org/apache/storm/solr/schema/Schema.java ---------------------------------------------------------------------- diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/schema/Schema.java b/external/storm-solr/src/main/java/org/apache/storm/solr/schema/Schema.java new file mode 100644 index 0000000..91e1e09 --- /dev/null +++ b/external/storm-solr/src/main/java/org/apache/storm/solr/schema/Schema.java @@ -0,0 +1,99 @@ +package org.apache.storm.solr.schema; + +import java.io.Serializable; +import java.util.Collections; +import java.util.List; + +/** + * Class representing the SolrSchema as returned by the REST call to the URL of the form + * http://localhost:8983/solr/gettingstarted/schema. This particular URL returns the schema in JSON format for the + * Solr quickstart example running locally. + *

+ * Created by hlouro on 7/27/15. + */ +public class Schema implements Serializable { + private String name; + private String version; + private String uniqueKey; + private List fieldTypes; + private List fields; + private List dynamicFields; + private List copyFields; + + public String getName() { + return name; + } + + public String getVersion() { + return version; + } + + public String getUniqueKey() { + return uniqueKey; + } + + public List getFieldTypes() { + return Collections.unmodifiableList(fieldTypes); + } + + public List getFields() { + return Collections.unmodifiableList(fields); + } + + public List getDynamicFields() { + return Collections.unmodifiableList(dynamicFields); + } + + public List getCopyFields() { + return Collections.unmodifiableList(copyFields); + } + + public void setName(String name) { + this.name = name; + } + + public void setVersion(String version) { + this.version = version; + } + + public void setUniqueKey(String uniqueKey) { + this.uniqueKey = uniqueKey; + } + + public void setFieldTypes(List fieldTypes) { + this.fieldTypes = fieldTypes; + } + + public void setFields(List fields) { + this.fields = fields; + } + + public void setDynamicFields(List dynamicFields) { + this.dynamicFields = dynamicFields; + } + + public void setCopyFields(List copyFields) { + this.copyFields = copyFields; + } + + @Override + public String toString() { + return "Schema{" + + "name='" + name + '\'' + + ", version='" + version + '\'' + + ", uniqueKey='" + uniqueKey + '\'' + + ", fieldTypes=" + fieldTypes + + ", fields=" + fields + + ", dynamicFields=" + dynamicFields + + ", copyFields=" + copyFields + + '}'; + } + + public static class SchemaWrapper implements Serializable { + Schema schema; + + public Schema getSchema() { + return schema; + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/main/java/org/apache/storm/solr/schema/SolrFieldTypeFinder.java 
---------------------------------------------------------------------- diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/schema/SolrFieldTypeFinder.java b/external/storm-solr/src/main/java/org/apache/storm/solr/schema/SolrFieldTypeFinder.java new file mode 100644 index 0000000..2cebcf5 --- /dev/null +++ b/external/storm-solr/src/main/java/org/apache/storm/solr/schema/SolrFieldTypeFinder.java @@ -0,0 +1,165 @@ +package org.apache.storm.solr.schema; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +/** + * Class containing all the information relating fields with their types. This information is wrapped in the class + * {@link FieldTypeWrapper} + *

+ * Created by hlouro on 7/27/15. + */ +public class SolrFieldTypeFinder implements Serializable { + private static final Logger logger = LoggerFactory.getLogger(SolrFieldTypeFinder.class); + private Schema schema; + private Map fieldToWrapper; + + + /** + * Class wrapping all the information for fields and types + * */ + public static class FieldTypeWrapper implements Serializable { + Field field; + FieldType type; + + public FieldTypeWrapper(Field field, FieldType type) { + this.field = field; + this.type = type; + } + + public Field getField() { + return field; + } + + public FieldType getType() { + return type; + } + + @Override + public String toString() { + return "FieldTypeWrapper{" + + "field=" + field + + ", type=" + type + + '}'; + } + } + + /** + * Initiates class containing all the information relating fields with their types. + * This information is parsed from the schema + * @param schema SolrSchema containing the information about fields and types + * */ + public SolrFieldTypeFinder(Schema schema) { + if (schema == null) { + throw new IllegalArgumentException("Schema object is null"); + } + this.schema = schema; + this.fieldToWrapper = new HashMap<>(); + buildMap(); + } + + private void buildMap() { + final List fieldTypes = schema.getFieldTypes(); + // static fields + buildMapForFields(fieldTypes, schema.getFields()); + // dynamic fields + buildMapForFields(fieldTypes, schema.getDynamicFields()); + System.out.println("Completed building Field/Type Map: " + fieldToWrapper); + if (logger.isDebugEnabled()) { + logger.debug("Completed building Field/Type Map: " + fieldToWrapper); + } + } + + private void buildMapForFields(List fieldTypes, List fields) { + for (Field field: fields) { + String type = field.getType(); + int idx = indexOf(fieldTypes, type); // idx - index of the type of this field in the FieldType list + if (idx != -1) { + fieldToWrapper.put(field.getName(), new FieldTypeWrapper(field, fieldTypes.get(idx))); + } + } + } + + private int 
indexOf(List fieldTypes, String type) { + int i = 0; + for (FieldType fieldType : fieldTypes) { + if (fieldType.getName().equals(type)) { + return i; + } + i++; + } + return -1; + } + + /** + * Finds the schema defined field that matches the input parameter, if any. It can be a dynamic field, in + * which case it will return the pattern of the dynamic field that matches the input parameter. + * @param fieldName The name of the field to get info for + * @return The {@link FieldTypeWrapper} that matches the input parameter, or null if none found + * */ + public FieldTypeWrapper getFieldTypeWrapper(String fieldName) { + FieldTypeWrapper typeWrapper = fieldToWrapper.get(fieldName); + // => field name does not match static field, test if it matches dynamic field + if (typeWrapper == null) { + for (String pattern : fieldToWrapper.keySet()) { + if (matchesDynamicField(fieldName, pattern)) { + typeWrapper = fieldToWrapper.get(pattern); + } + } + } + logger.debug("Solr Field and Type info: {}, {}", fieldName, typeWrapper); + return typeWrapper; + } + + public Set getAllSolrFieldTypes() { + Collection typeWrappers = fieldToWrapper.values(); + Set fieldTypeClasses = new TreeSet<>(); + for (FieldTypeWrapper typeWrapper : typeWrappers) { + fieldTypeClasses.add(typeWrapper.getType().getClazz()); + } + logger.debug("Field type classes present in schema: {}", fieldTypeClasses); + return fieldTypeClasses; + } + + public boolean matchesField(String fieldName) { + return fieldToWrapper.containsKey(fieldName); + } + + public boolean matchesDynamicField(String fieldName) { + for (String pattern : fieldToWrapper.keySet()) { + if (matchesDynamicField(fieldName, pattern)) { + return true; + } + } + if (logger.isDebugEnabled()) { + logger.debug("Field [{}] did NOT match any dynamic field present in {}", fieldName, fieldToWrapper.keySet()); + } + return false; + } + + public boolean matchesDynamicField(String fieldName, String pattern) { + if (pattern == null) { + throw new 
IllegalArgumentException("pattern and fieldName arguments cannot be null"); + } + if (pattern.startsWith("*")) { + if (fieldName.endsWith(pattern.substring(1))) { + logger.debug("Field [{}] MATCHES dynamic field {}", fieldName, pattern); + return true; + } + } else if (pattern.endsWith("*")) { + if (fieldName.startsWith(pattern.substring(0, pattern.length()-1))) { + logger.debug("Field [{}] MATCHES dynamic field {}", fieldName, pattern); + return true; + } + } + return false; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/main/java/org/apache/storm/solr/schema/builder/RestJsonSchemaBuilder.java ---------------------------------------------------------------------- diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/schema/builder/RestJsonSchemaBuilder.java b/external/storm-solr/src/main/java/org/apache/storm/solr/schema/builder/RestJsonSchemaBuilder.java new file mode 100644 index 0000000..52bc16d --- /dev/null +++ b/external/storm-solr/src/main/java/org/apache/storm/solr/schema/builder/RestJsonSchemaBuilder.java @@ -0,0 +1,53 @@ +package org.apache.storm.solr.schema.builder; + +import com.google.gson.Gson; +import org.apache.storm.solr.schema.Schema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URL; +import java.util.Scanner; + +/** + * Class that builds the {@link Schema} object from the JSON representation of the schema as returned by the + * URL of the form http://localhost:8983/solr/gettingstarted/schema/ . This particular URL returns the schema + * in JSON format for the gettingstarted example running locally. + *

+ * Created by hlouro on 7/28/15. + */ +public class RestJsonSchemaBuilder implements SchemaBuilder { + private static final Logger logger = LoggerFactory.getLogger(RestJsonSchemaBuilder.class); + private Schema schema; + + + /** Urls with the form http://localhost:8983/solr/gettingstarted/schema/ returns the schema in JSON format */ + public RestJsonSchemaBuilder(String solrHost, String solrPort, String collection) throws IOException { + this(new URL("http://" + solrHost + ":" + solrPort + "/solr/" + collection + "/schema/")); + } + + public RestJsonSchemaBuilder(String url) throws IOException { + this(new URL(url)); + } + + public RestJsonSchemaBuilder(URL url) throws IOException { + downloadSchema(url); + } + + private void downloadSchema(URL url) throws IOException { + String result; + logger.debug("Downloading Solr schema info from: " + url); + Scanner scanner = new Scanner(url.openStream()); + result = scanner.useDelimiter("\\Z").next(); + logger.debug("JSON Schema: " + result); + + Gson gson = new Gson(); + Schema.SchemaWrapper schemaWrapper = gson.fromJson(result, Schema.SchemaWrapper.class); + this.schema = schemaWrapper.getSchema(); + } + + @Override + public Schema getSchema() { + return schema; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/main/java/org/apache/storm/solr/schema/builder/SchemaBuilder.java ---------------------------------------------------------------------- diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/schema/builder/SchemaBuilder.java b/external/storm-solr/src/main/java/org/apache/storm/solr/schema/builder/SchemaBuilder.java new file mode 100644 index 0000000..a3456e0 --- /dev/null +++ b/external/storm-solr/src/main/java/org/apache/storm/solr/schema/builder/SchemaBuilder.java @@ -0,0 +1,12 @@ +package org.apache.storm.solr.schema.builder; + +import org.apache.storm.solr.schema.Schema; + +import java.io.Serializable; + +/** + * Created by hlouro on 7/28/15. 
+ */ +public interface SchemaBuilder extends Serializable { + Schema getSchema(); +} http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrState.java ---------------------------------------------------------------------- diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrState.java b/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrState.java new file mode 100644 index 0000000..3ec8b70 --- /dev/null +++ b/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrState.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.solr.trident; + +import backtype.storm.topology.FailedException; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrRequest; +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.storm.solr.config.SolrConfig; +import org.apache.storm.solr.mapper.SolrMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import storm.trident.state.State; +import storm.trident.tuple.TridentTuple; + +import java.util.List; + +public class SolrState implements State { + private static final Logger logger = LoggerFactory.getLogger(SolrState.class); + + private final SolrConfig solrConfig; + private final SolrMapper solrMapper; + private SolrClient solrClient; + + public SolrState(SolrConfig solrConfig, SolrMapper solrMapper) { + this.solrConfig = solrConfig; + this.solrMapper = solrMapper; + } + + protected void prepare() { + solrClient = new CloudSolrClient(solrConfig.getZkHostString()); + } + + @Override + public void beginCommit(Long aLong){ } + + @Override + public void commit(Long aLong) { } + + public void updateState(List tuples) { + try { + SolrRequest solrRequest = solrMapper.toSolrRequest(tuples); + solrClient.request(solrRequest, solrMapper.getCollection()); + solrClient.commit(solrMapper.getCollection()); + } catch (Exception e) { + final String msg = String.format("%s", tuples); + logger.warn(msg); + throw new FailedException(msg, e); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrStateFactory.java ---------------------------------------------------------------------- diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrStateFactory.java b/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrStateFactory.java new file mode 100644 index 0000000..d3bbdc4 --- /dev/null +++ 
b/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrStateFactory.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.solr.trident; + +import backtype.storm.task.IMetricsContext; +import org.apache.storm.solr.config.SolrCommitStrategy; +import org.apache.storm.solr.config.SolrConfig; +import org.apache.storm.solr.mapper.SolrMapper; +import storm.trident.state.State; +import storm.trident.state.StateFactory; + +import java.util.Map; + +public class SolrStateFactory implements StateFactory { + private final SolrConfig solrConfig; + private final SolrMapper solrMapper; + + public SolrStateFactory(SolrConfig solrConfig, SolrMapper solrMapper) { + this.solrConfig = solrConfig; + this.solrMapper = solrMapper; + } + + @Override + public State makeState(Map map, IMetricsContext iMetricsContext, int partitionIndex, int numPartitions) { + SolrState state = new SolrState(solrConfig, solrMapper); + state.prepare(); + return state; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrUpdater.java ---------------------------------------------------------------------- diff --git 
a/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrUpdater.java b/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrUpdater.java new file mode 100644 index 0000000..3736cb3 --- /dev/null +++ b/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrUpdater.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.solr.trident; + +import storm.trident.operation.TridentCollector; +import storm.trident.state.BaseStateUpdater; +import storm.trident.tuple.TridentTuple; + +import java.util.List; + +public class SolrUpdater extends BaseStateUpdater { + + @Override + public void updateState(SolrState solrState, List tuples, TridentCollector collector) { + solrState.updateState(tuples); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/test/java/org/apache/storm/solr/spout/SolrFieldsSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-solr/src/test/java/org/apache/storm/solr/spout/SolrFieldsSpout.java b/external/storm-solr/src/test/java/org/apache/storm/solr/spout/SolrFieldsSpout.java new file mode 100644 index 0000000..0d85f75 --- /dev/null +++ b/external/storm-solr/src/test/java/org/apache/storm/solr/spout/SolrFieldsSpout.java @@ -0,0 +1,60 @@ +package org.apache.storm.solr.spout; + +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichSpout; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Values; +import com.google.common.collect.Lists; +import org.apache.storm.solr.util.TestUtil; + +import java.util.List; +import java.util.Map; +import java.util.Random; + +/** + * Created by hlouro on 7/24/15. 
+ */ +public class SolrFieldsSpout extends BaseRichSpout { + private SpoutOutputCollector collector; + public static final List listValues = Lists.newArrayList( + getValues("1"), getValues("2"), getValues("3")); + + private static Values getValues(String suf) { + String suffix = "_hmcl_fields_test_val_" + suf; + return new Values( + "id" + suffix, + TestUtil.getDate(), + "dc_title" + suffix, + "dynamic_field" + suffix + "_txt", // to match dynamic fields of the form "*_txt" + "non_matching_field" + suffix); // this field won't be indexed by solr + } + + @Override + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + this.collector = collector; + } + + @Override + public void nextTuple() { + final Random rand = new Random(); + final Values values = listValues.get(rand.nextInt(listValues.size())); + collector.emit(values); + Thread.yield(); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(getOutputFields()); + } + + public Fields getOutputFields() { + return new Fields("id","date","dc_title","dynamic_field_txt","non_matching_field"); + } + + @Override + public void close() { + super.close(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/test/java/org/apache/storm/solr/spout/SolrJsonSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-solr/src/test/java/org/apache/storm/solr/spout/SolrJsonSpout.java b/external/storm-solr/src/test/java/org/apache/storm/solr/spout/SolrJsonSpout.java new file mode 100644 index 0000000..7352614 --- /dev/null +++ b/external/storm-solr/src/test/java/org/apache/storm/solr/spout/SolrJsonSpout.java @@ -0,0 +1,112 @@ +package org.apache.storm.solr.spout; + +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichSpout; 
+import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Values; +import com.google.common.collect.Lists; +import com.google.gson.Gson; +import org.junit.Test; + +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Random; + +/** + * Created by hlouro on 7/24/15. + */ +public class SolrJsonSpout extends BaseRichSpout { + private SpoutOutputCollector collector; + private static final List listValues = Lists.newArrayList( + new Values((new JsonSchema("_hmcl_json_test_1")).toJson()), + new Values((new JsonSchema("_hmcl_json_test_2")).toJson()), + new Values((new JsonSchema("_hmcl_json_test_3")).toJson()), + new Values(new JsonSchema("_hmcl_json_test_4")), + new Values(new JsonSchema("_hmcl_json_test_5"))); + + @Override + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + this.collector = collector; + } + + @Override + public void nextTuple() { + final Random rand = new Random(); + final Values values = listValues.get(rand.nextInt(listValues.size())); + collector.emit(values); + Thread.yield(); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(getOutputFields()); + } + + public Fields getOutputFields() { + return new Fields("JSON"); + } + + @Override + public void close() { //TODO + super.close(); + } + + public static class JsonSchema { + private String id; + private String date; + private String dc_title; + + private static final Gson gson = new Gson(); + + public JsonSchema(String suffix) { + this.id = "id" + suffix; + this.date = getDate(); + this.dc_title = "dc_title" + suffix; + } + + public JsonSchema(String id, String date, String dc_title) { + this.id = id; + this.date = date; + this.dc_title = dc_title; + } + + // copy constructor + public JsonSchema(JsonSchema jsonSchema) { + this.id = jsonSchema.id; + this.date = jsonSchema.date; + this.dc_title = 
jsonSchema.dc_title; + } + + public String toJson() { + String json = gson.toJson(this); + System.out.println(json); // TODO log + return json; + } + + //TODO: clean this up + public static JsonSchema fromJson(String jsonStr) { + return new JsonSchema(gson.fromJson(jsonStr, JsonSchema.class)); + } + + private String getDate() { + DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); + String date = df.format(new Date()); + System.out.println(date); + return date; + } + } + + //TODO Delete + @Test + public void test() { + SolrJsonSpout solrJsonSpout = new SolrJsonSpout(); + System.out.println("stop"); + } + + +} http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/test/java/org/apache/storm/solr/topology/.gitignore ---------------------------------------------------------------------- diff --git a/external/storm-solr/src/test/java/org/apache/storm/solr/topology/.gitignore b/external/storm-solr/src/test/java/org/apache/storm/solr/topology/.gitignore new file mode 100644 index 0000000..1adc831 --- /dev/null +++ b/external/storm-solr/src/test/java/org/apache/storm/solr/topology/.gitignore @@ -0,0 +1 @@ +# Created by .ignore support plugin (hsz.mobi) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrFieldsTopology.java ---------------------------------------------------------------------- diff --git a/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrFieldsTopology.java b/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrFieldsTopology.java new file mode 100644 index 0000000..27d57db --- /dev/null +++ b/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrFieldsTopology.java @@ -0,0 +1,40 @@ +package org.apache.storm.solr.topology; + +import backtype.storm.generated.StormTopology; +import backtype.storm.topology.TopologyBuilder; +import 
org.apache.storm.solr.bolt.SolrUpdateBolt; +import org.apache.storm.solr.config.CountBasedCommit; +import org.apache.storm.solr.config.SolrCommitStrategy; +import org.apache.storm.solr.mapper.SolrFieldsMapper; +import org.apache.storm.solr.mapper.SolrMapper; +import org.apache.storm.solr.schema.builder.RestJsonSchemaBuilder; +import org.apache.storm.solr.spout.SolrFieldsSpout; + +import java.io.IOException; + +/** + * Created by hlouro on 7/31/15. + */ +public class SolrFieldsTopology extends SolrTopology { + public static void main(String[] args) throws Exception { + SolrFieldsTopology solrJsonTopology = new SolrFieldsTopology(); + solrJsonTopology.run(args); + } + + protected SolrMapper getSolrMapper() throws IOException { + return new SolrFieldsMapper.Builder( + new RestJsonSchemaBuilder("localhost", "8983", COLLECTION)).setCollection(COLLECTION).build(); + } + + protected SolrCommitStrategy getSolrCommitStgy() { + return new CountBasedCommit(2); // To Commit to Solr and Ack according to the commit strategy + } + + protected StormTopology getTopology() throws IOException { + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("SolrFieldsSpout", new SolrFieldsSpout()); + builder.setBolt("SolrUpdateBolt", new SolrUpdateBolt(getSolrConfig(), getSolrMapper(), getSolrCommitStgy())) + .shuffleGrouping("SolrFieldsSpout"); + return builder.createTopology(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrJsonTopology.java ---------------------------------------------------------------------- diff --git a/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrJsonTopology.java b/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrJsonTopology.java new file mode 100644 index 0000000..bcb3f55 --- /dev/null +++ b/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrJsonTopology.java @@ -0,0 +1,33 @@ +package 
org.apache.storm.solr.topology; + +import backtype.storm.generated.StormTopology; +import backtype.storm.topology.TopologyBuilder; +import org.apache.storm.solr.bolt.SolrUpdateBolt; +import org.apache.storm.solr.mapper.SolrJsonMapper; +import org.apache.storm.solr.mapper.SolrMapper; +import org.apache.storm.solr.spout.SolrJsonSpout; + +import java.io.IOException; + +/** + * Created by hlouro on 7/31/15. + */ +public class SolrJsonTopology extends SolrTopology { + public static void main(String[] args) throws Exception { + SolrJsonTopology solrJsonTopology = new SolrJsonTopology(); + solrJsonTopology.run(args); + } + + protected SolrMapper getSolrMapper() throws IOException { + final String jsonTupleField = "JSON"; + return new SolrJsonMapper(COLLECTION, jsonTupleField); + } + + protected StormTopology getTopology() throws IOException { + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("SolrJsonSpout", new SolrJsonSpout()); + builder.setBolt("SolrUpdateBolt", new SolrUpdateBolt(getSolrConfig(), getSolrMapper(), getSolrCommitStgy())) + .shuffleGrouping("SolrJsonSpout"); + return builder.createTopology(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrTopology.java ---------------------------------------------------------------------- diff --git a/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrTopology.java b/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrTopology.java new file mode 100644 index 0000000..38aae80 --- /dev/null +++ b/external/storm-solr/src/test/java/org/apache/storm/solr/topology/SolrTopology.java @@ -0,0 +1,67 @@ +package org.apache.storm.solr.topology; + +import backtype.storm.Config; +import backtype.storm.LocalCluster; +import backtype.storm.StormSubmitter; +import backtype.storm.generated.StormTopology; +import org.apache.solr.client.solrj.SolrClient; +import 
org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.storm.solr.config.SolrCommitStrategy; +import org.apache.storm.solr.config.SolrConfig; + +import java.io.IOException; + +/** + * Created by hlouro on 7/24/15. + */ +public abstract class SolrTopology { + protected static String COLLECTION = "gettingstarted"; + protected static SolrClient solrClient = getSolrClient(); + + public void run(String[] args) throws Exception { + final StormTopology topology = getTopology(); + final Config config = getConfig(); + + if (args.length == 0) { + submitTopologyLocalCluster(topology, config); + } else { + submitTopologyRemoteCluster(args[1], topology, config); + } + } + + protected abstract StormTopology getTopology() throws IOException; + + protected void submitTopologyRemoteCluster(String arg, StormTopology topology, Config config) throws Exception { + StormSubmitter.submitTopology(arg, config, topology); + } + + protected void submitTopologyLocalCluster(StormTopology topology, Config config) throws InterruptedException { + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology("test", config, topology); + Thread.sleep(10000); + cluster.killTopology("test"); + cluster.shutdown(); + System.exit(0); + } + + protected Config getConfig() { + Config config = new Config(); + config.setDebug(true); + return config; + } + + protected SolrCommitStrategy getSolrCommitStgy() { + return null; // To Commit to Solr and Ack every tuple + } + + protected static SolrConfig getSolrConfig() { + String zkHostString = "127.0.0.1:9983"; // zkHostString for Solr gettingstarted example + return new SolrConfig(zkHostString); + } + + protected static SolrClient getSolrClient() { + String zkHostString = "127.0.0.1:9983"; // zkHostString for Solr gettingstarted example + return new CloudSolrClient(zkHostString); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/test/java/org/apache/storm/solr/trident/SolrFieldsTridentTopology.java 
---------------------------------------------------------------------- diff --git a/external/storm-solr/src/test/java/org/apache/storm/solr/trident/SolrFieldsTridentTopology.java b/external/storm-solr/src/test/java/org/apache/storm/solr/trident/SolrFieldsTridentTopology.java new file mode 100644 index 0000000..86dabe3 --- /dev/null +++ b/external/storm-solr/src/test/java/org/apache/storm/solr/trident/SolrFieldsTridentTopology.java @@ -0,0 +1,30 @@ +package org.apache.storm.solr.trident; + +import backtype.storm.generated.StormTopology; +import backtype.storm.tuple.Fields; +import org.apache.storm.solr.spout.SolrFieldsSpout; +import org.apache.storm.solr.topology.SolrFieldsTopology; +import storm.trident.Stream; +import storm.trident.TridentTopology; +import storm.trident.state.StateFactory; + +import java.io.IOException; + +/** + * Created by hlouro on 7/31/15. + */ +public class SolrFieldsTridentTopology extends SolrFieldsTopology { + public static void main(String[] args) throws Exception { + SolrFieldsTridentTopology solrJsonTopology = new SolrFieldsTridentTopology(); + solrJsonTopology.run(args); + } + + protected StormTopology getTopology() throws IOException { + final TridentTopology topology = new TridentTopology(); + final SolrFieldsSpout spout = new SolrFieldsSpout(); + final Stream stream = topology.newStream("SolrFieldsSpout", spout); + final StateFactory solrStateFactory = new SolrStateFactory(getSolrConfig(), getSolrMapper()); + stream.partitionPersist(solrStateFactory, spout.getOutputFields(), new SolrUpdater(), new Fields()); + return topology.build(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/test/java/org/apache/storm/solr/trident/SolrJsonTridentTopology.java ---------------------------------------------------------------------- diff --git a/external/storm-solr/src/test/java/org/apache/storm/solr/trident/SolrJsonTridentTopology.java 
b/external/storm-solr/src/test/java/org/apache/storm/solr/trident/SolrJsonTridentTopology.java new file mode 100644 index 0000000..8f28909 --- /dev/null +++ b/external/storm-solr/src/test/java/org/apache/storm/solr/trident/SolrJsonTridentTopology.java @@ -0,0 +1,30 @@ +package org.apache.storm.solr.trident; + +import backtype.storm.generated.StormTopology; +import backtype.storm.tuple.Fields; +import org.apache.storm.solr.spout.SolrJsonSpout; +import org.apache.storm.solr.topology.SolrJsonTopology; +import storm.trident.Stream; +import storm.trident.TridentTopology; +import storm.trident.state.StateFactory; + +import java.io.IOException; + +/** + * Created by hlouro on 7/31/15. + */ +public class SolrJsonTridentTopology extends SolrJsonTopology { + public static void main(String[] args) throws Exception { + SolrJsonTridentTopology solrJsonTopology = new SolrJsonTridentTopology(); + solrJsonTopology.run(args); + } + + protected StormTopology getTopology() throws IOException { + final TridentTopology topology = new TridentTopology(); + final SolrJsonSpout spout = new SolrJsonSpout(); + final Stream stream = topology.newStream("SolrJsonSpout", spout); + final StateFactory solrStateFactory = new SolrStateFactory(getSolrConfig(), getSolrMapper()); + stream.partitionPersist(solrStateFactory, spout.getOutputFields(), new SolrUpdater(), new Fields()); + return topology.build(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/external/storm-solr/src/test/java/org/apache/storm/solr/util/TestUtil.java ---------------------------------------------------------------------- diff --git a/external/storm-solr/src/test/java/org/apache/storm/solr/util/TestUtil.java b/external/storm-solr/src/test/java/org/apache/storm/solr/util/TestUtil.java new file mode 100644 index 0000000..66bc4e6 --- /dev/null +++ b/external/storm-solr/src/test/java/org/apache/storm/solr/util/TestUtil.java @@ -0,0 +1,17 @@ +package org.apache.storm.solr.util; + +import java.text.DateFormat; 
+import java.text.SimpleDateFormat; +import java.util.Date; + +/** + * Created by hlouro on 7/31/15. + */ +public class TestUtil { + public static String getDate() { + DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); + String date = df.format(new Date()); + System.out.println(date); + return date; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/4ab6e0c0/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 1443a36..c2b73bc 100644 --- a/pom.xml +++ b/pom.xml @@ -169,8 +169,12 @@ external/storm-redis external/storm-eventhubs external/flux +<<<<<<< HEAD examples/storm-starter external/storm-elasticsearch +======= + external/storm-solr +>>>>>>> STORM-851: Storm Solr Connector