storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kabh...@apache.org
Subject [1/3] storm git commit: STORM-2103: Introduce new sql external module: storm-sql-mongodb
Date Sat, 05 Nov 2016 01:55:53 GMT
Repository: storm
Updated Branches:
  refs/heads/1.x-branch 109448005 -> c760e1395


STORM-2103: Introduce new sql external module: storm-sql-mongodb


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0920b6d4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0920b6d4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0920b6d4

Branch: refs/heads/1.x-branch
Commit: 0920b6d4d291e5b0d67f0eb38fa23e38122d997f
Parents: 1094480
Author: vesense <best.wangxin@163.com>
Authored: Fri Oct 28 13:38:42 2016 +0800
Committer: Jungtaek Lim <kabhwan@gmail.com>
Committed: Sat Nov 5 10:50:55 2016 +0900

----------------------------------------------------------------------
 external/sql/pom.xml                            |   1 +
 .../apache/storm/sql/parser/SqlCreateTable.java |  20 ++-
 .../test/org/apache/storm/sql/TestStormSql.java |  11 +-
 .../storm-sql-external/storm-sql-kafka/pom.xml  |   4 -
 .../sql/kafka/KafkaDataSourcesProvider.java     |  29 ++---
 .../sql/kafka/TestKafkaDataSourcesProvider.java |  31 +++--
 .../storm-sql-mongodb/pom.xml                   |  84 +++++++++++++
 .../sql/mongodb/MongoDataSourcesProvider.java   | 116 ++++++++++++++++++
 ...apache.storm.sql.runtime.DataSourcesProvider |  16 +++
 .../mongodb/TestMongoDataSourcesProvider.java   | 122 +++++++++++++++++++
 .../storm-sql-external/storm-sql-redis/pom.xml  |   4 -
 .../sql/redis/RedisDataSourcesProvider.java     |  27 +---
 .../sql/redis/TestRedisDataSourcesProvider.java |  26 ++--
 .../storm/sql/runtime/DataSourcesProvider.java  |   5 +-
 .../storm/sql/runtime/DataSourcesRegistry.java  |   5 +-
 .../storm/sql/runtime/utils/FieldInfoUtils.java |  39 ++++++
 storm-dist/binary/src/main/assembly/binary.xml  |   7 ++
 17 files changed, 452 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0920b6d4/external/sql/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/pom.xml b/external/sql/pom.xml
index 406069b..3fd6a34 100644
--- a/external/sql/pom.xml
+++ b/external/sql/pom.xml
@@ -41,5 +41,6 @@
         <module>storm-sql-runtime</module>
         <module>storm-sql-external/storm-sql-kafka</module>
         <module>storm-sql-external/storm-sql-redis</module>
+        <module>storm-sql-external/storm-sql-mongodb</module>
     </modules>
 </project>

http://git-wip-us.apache.org/repos/asf/storm/blob/0920b6d4/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java
b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java
index 8ac52ed..d810d3a 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.sql.parser;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlKind;
@@ -29,8 +30,11 @@ import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.util.ImmutableNullableList;
 
+import java.io.IOException;
 import java.net.URI;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Properties;
 
 public class SqlCreateTable extends SqlCall {
   public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator(
@@ -109,7 +113,7 @@ public class SqlCreateTable extends SqlCall {
   }
 
   public URI location() {
-    return URI.create(SqlLiteral.stringValue(location));
+    return URI.create(getString(location));
   }
 
   public String inputFormatClass() {
@@ -120,8 +124,18 @@ public class SqlCreateTable extends SqlCall {
     return getString(outputFormatClass);
   }
 
-  public String properties() {
-    return getString(properties);
+  public Properties properties() {
+    Properties props = new Properties();
+    if (properties != null) {
+      try {
+        ObjectMapper mapper = new ObjectMapper();
+        HashMap<String, Object> map = mapper.readValue(getString(properties), HashMap.class);
+        props.putAll(map);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return props;
   }
 
   private String getString(SqlNode n) {

http://git-wip-us.apache.org/repos/asf/storm/blob/0920b6d4/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
index b0cffdd..1470344 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
@@ -37,6 +37,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 public class TestStormSql {
   private static class MockDataSourceProvider implements DataSourcesProvider {
@@ -54,7 +55,7 @@ public class TestStormSql {
 
     @Override
     public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String
outputFormatClass,
-                                                  String properties, List<FieldInfo>
fields) {
+                                                  Properties properties, List<FieldInfo>
fields) {
       return new TestUtils.MockSqlTridentDataSource();
     }
   }
@@ -74,7 +75,7 @@ public class TestStormSql {
 
     @Override
     public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String
outputFormatClass,
-                                                  String properties, List<FieldInfo>
fields) {
+                                                  Properties properties, List<FieldInfo>
fields) {
       return new TestUtils.MockSqlTridentDataSource();
     }
   }
@@ -94,7 +95,7 @@ public class TestStormSql {
 
     @Override
     public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String
outputFormatClass,
-                                                  String properties, List<FieldInfo>
fields) {
+                                                  Properties properties, List<FieldInfo>
fields) {
       return new TestUtils.MockSqlTridentGroupedDataSource();
     }
   }
@@ -114,7 +115,7 @@ public class TestStormSql {
 
     @Override
     public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String
outputFormatClass,
-                                                  String properties, List<FieldInfo>
fields) {
+                                                  Properties properties, List<FieldInfo>
fields) {
       return new TestUtils.MockSqlTridentJoinDataSourceEmp();
     }
   }
@@ -134,7 +135,7 @@ public class TestStormSql {
 
     @Override
     public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String
outputFormatClass,
-                                                  String properties, List<FieldInfo>
fields) {
+                                                  Properties properties, List<FieldInfo>
fields) {
       return new TestUtils.MockSqlTridentJoinDataSourceDept();
     }
   }

http://git-wip-us.apache.org/repos/asf/storm/blob/0920b6d4/external/sql/storm-sql-external/storm-sql-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-kafka/pom.xml b/external/sql/storm-sql-external/storm-sql-kafka/pom.xml
index 021409b..104fa09 100644
--- a/external/sql/storm-sql-external/storm-sql-kafka/pom.xml
+++ b/external/sql/storm-sql-external/storm-sql-kafka/pom.xml
@@ -71,10 +71,6 @@
             <version>${storm.kafka.version}</version>
         </dependency>
         <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-databind</artifactId>
-        </dependency>
-        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/storm/blob/0920b6d4/external/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
b/external/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
index 19fab05..5037616 100644
--- a/external/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
+++ b/external/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
@@ -20,7 +20,6 @@ package org.apache.storm.sql.kafka;
 import org.apache.storm.kafka.trident.TridentKafkaStateFactory;
 import org.apache.storm.kafka.trident.TridentKafkaUpdater;
 import org.apache.storm.spout.SchemeAsMultiScheme;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import org.apache.storm.sql.runtime.*;
 import org.apache.storm.kafka.ZkHosts;
@@ -33,7 +32,6 @@ import org.apache.storm.sql.runtime.serde.json.JsonSerializer;
 import org.apache.storm.trident.spout.ITridentDataSource;
 import org.apache.storm.trident.tuple.TridentTuple;
 
-import java.io.IOException;
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.*;
@@ -71,13 +69,13 @@ public class KafkaDataSourcesProvider implements DataSourcesProvider {
     private final String topic;
     private final int primaryKeyIndex;
     private final List<String> fields;
-    private final String producerProperties;
+    private final Properties props;
     private KafkaTridentDataSource(TridentKafkaConfig conf, String topic, int primaryKeyIndex,
-                                   String producerProperties, List<String> fields)
{
+                                   Properties props, List<String> fields) {
       this.conf = conf;
       this.topic = topic;
       this.primaryKeyIndex = primaryKeyIndex;
-      this.producerProperties = producerProperties;
+      this.props = props;
       this.fields = fields;
     }
 
@@ -88,21 +86,12 @@ public class KafkaDataSourcesProvider implements DataSourcesProvider {
 
     @Override
     public SqlTridentConsumer getConsumer() {
-      Preconditions.checkNotNull(producerProperties,
-          "Writable Kafka Table " + topic + " must contain producer config");
-      Properties props = new Properties();
-      try {
-        ObjectMapper mapper = new ObjectMapper();
-        @SuppressWarnings("unchecked")
-        HashMap<String, Object> map = mapper.readValue(producerProperties, HashMap.class);
-        @SuppressWarnings("unchecked")
-        HashMap<String, Object> producerConfig = (HashMap<String, Object>) map.get("producer");
-        props.putAll(producerConfig);
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
+      Preconditions.checkArgument(!props.isEmpty(),
+              "Writable Kafka Table " + topic + " must contain producer config");
+      HashMap<String, Object> producerConfig = (HashMap<String, Object>) props.get("producer");
+      props.putAll(producerConfig);
       Preconditions.checkState(props.containsKey("bootstrap.servers"),
-          "Writable Kafka Table " + topic + " must contain \"bootstrap.servers\" config");
+              "Writable Kafka Table " + topic + " must contain \"bootstrap.servers\" config");
 
       JsonSerializer serializer = new JsonSerializer(fields);
       SqlKafkaMapper mapper = new SqlKafkaMapper(primaryKeyIndex, serializer);
@@ -131,7 +120,7 @@ public class KafkaDataSourcesProvider implements DataSourcesProvider {
 
   @Override
   public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String
outputFormatClass,
-                                                String properties, List<FieldInfo>
fields) {
+                                                Properties properties, List<FieldInfo>
fields) {
     int port = uri.getPort() != -1 ? uri.getPort() : DEFAULT_ZK_PORT;
     ZkHosts zk = new ZkHosts(uri.getHost() + ":" + port, uri.getPath());
     Map<String, String> values = parseURIParams(uri.getQuery());

http://git-wip-us.apache.org/repos/asf/storm/blob/0920b6d4/external/sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
b/external/sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
index 2725893..0cde492 100644
--- a/external/sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
+++ b/external/sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
@@ -17,7 +17,6 @@
  */
 package org.apache.storm.sql.kafka;
 
-import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -37,9 +36,7 @@ import org.mockito.internal.util.reflection.Whitebox;
 
 import java.net.URI;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
+import java.util.*;
 import java.util.concurrent.Future;
 
 import static org.mockito.Mockito.any;
@@ -51,24 +48,26 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
 
 public class TestKafkaDataSourcesProvider {
   private static final List<FieldInfo> FIELDS = ImmutableList.of(
-      new FieldInfo("ID", int.class, true),
-      new FieldInfo("val", String.class, false));
+          new FieldInfo("ID", int.class, true),
+          new FieldInfo("val", String.class, false));
   private static final List<String> FIELD_NAMES = ImmutableList.of("ID", "val");
   private static final JsonSerializer SERIALIZER = new JsonSerializer(FIELD_NAMES);
-  private static final String TBL_PROPERTIES = Joiner.on('\n').join(
-      "{\"producer\": {",
-      "\"bootstrap.servers\": \"localhost:9092\",",
-      "\"acks\": \"1\",",
-      "\"key.serializer\": \"org.apache.kafka.common.serialization.StringSerializer\",",
-      "\"value.serializer\": \"org.apache.kafka.common.serialization.StringSerializer\"",
-      "}",
-      "}"
-  );
+  private static final Properties TBL_PROPERTIES = new Properties();
+
+  static {
+    Map<String,Object> map = new HashMap<>();
+    map.put("bootstrap.servers", "localhost:9092");
+    map.put("acks", "1");
+    map.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+    map.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+    TBL_PROPERTIES.put("producer", map);
+  }
+
   @SuppressWarnings("unchecked")
   @Test
   public void testKafkaSink() {
     ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(
-        URI.create("kafka://mock?topic=foo"), null, null, TBL_PROPERTIES, FIELDS);
+            URI.create("kafka://mock?topic=foo"), null, null, TBL_PROPERTIES, FIELDS);
     Assert.assertNotNull(ds);
 
     ISqlTridentDataSource.SqlTridentConsumer consumer = ds.getConsumer();

http://git-wip-us.apache.org/repos/asf/storm/blob/0920b6d4/external/sql/storm-sql-external/storm-sql-mongodb/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-mongodb/pom.xml b/external/sql/storm-sql-external/storm-sql-mongodb/pom.xml
new file mode 100644
index 0000000..40fde8a
--- /dev/null
+++ b/external/sql/storm-sql-external/storm-sql-mongodb/pom.xml
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>2.0.0-SNAPSHOT</version>
+        <relativePath>../../../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>storm-sql-mongodb</artifactId>
+
+    <developers>
+        <developer>
+            <id>vesense</id>
+            <name>Xin Wang</name>
+            <email>data.xinwang@gmail.com</email>
+        </developer>
+    </developers>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-sql-runtime</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-sql-runtime</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-mongodb</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    <build>
+        <sourceDirectory>src/jvm</sourceDirectory>
+        <testSourceDirectory>src/test</testSourceDirectory>
+        <resources>
+            <resource>
+                <directory>${basedir}/src/resources</directory>
+            </resource>
+        </resources>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/0920b6d4/external/sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java
b/external/sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java
new file mode 100644
index 0000000..3c012c5
--- /dev/null
+++ b/external/sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.mongodb;
+
+import com.google.common.base.Preconditions;
+import org.apache.storm.mongodb.common.mapper.MongoMapper;
+import org.apache.storm.mongodb.trident.state.MongoState;
+import org.apache.storm.mongodb.trident.state.MongoStateFactory;
+import org.apache.storm.mongodb.trident.state.MongoStateUpdater;
+import org.apache.storm.sql.runtime.*;
+import org.apache.storm.sql.runtime.serde.json.JsonSerializer;
+import org.apache.storm.sql.runtime.utils.FieldInfoUtils;
+import org.apache.storm.trident.spout.ITridentDataSource;
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.state.StateUpdater;
+import org.apache.storm.tuple.ITuple;
+import org.bson.Document;
+
+import java.net.URI;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Create a MongoDB sink based on the URI and properties. The URI has the format of
+ * mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]].
+ * The properties are in JSON format which specifies the name of the MongoDB collection and
etc.
+ */
+public class MongoDataSourcesProvider implements DataSourcesProvider {
+
+  private static class MongoTridentDataSource implements ISqlTridentDataSource {
+    private final String url;
+    private final Properties props;
+    private final List<FieldInfo> fields;
+
+    private MongoTridentDataSource(String url, Properties props, List<FieldInfo> fields)
{
+      this.url = url;
+      this.props = props;
+      this.fields = fields;
+    }
+
+    @Override
+    public ITridentDataSource getProducer() {
+      throw new UnsupportedOperationException(this.getClass().getName() + " doesn't provide
Producer");
+    }
+
+    @Override
+    public SqlTridentConsumer getConsumer() {
+      Preconditions.checkArgument(!props.isEmpty(), "Writable MongoDB must contain collection
config");
+      List<String> fieldNames = FieldInfoUtils.getFieldNames(fields);
+      String serField = props.getProperty("trident.ser.field", "tridentSerField");
+      MongoMapper mapper = new TridentMongoMapper(serField, new JsonSerializer(fieldNames));
+
+      MongoState.Options options = new MongoState.Options()
+          .withUrl(url)
+          .withCollectionName(props.getProperty("collection.name"))
+          .withMapper(mapper);
+
+      StateFactory stateFactory = new MongoStateFactory(options);
+      StateUpdater stateUpdater = new MongoStateUpdater();
+
+      return new SimpleSqlTridentConsumer(stateFactory, stateUpdater);
+    }
+  }
+
+  private static class TridentMongoMapper implements MongoMapper {
+    private final String serField;
+    private final IOutputSerializer serializer;
+
+    private TridentMongoMapper(String serField, IOutputSerializer serializer) {
+      this.serField = serField;
+      this.serializer = serializer;
+    }
+
+    @Override
+    public Document toDocument(ITuple tuple) {
+      Document document = new Document();
+      byte[] array = serializer.write(tuple.getValues(), null).array();
+      document.append(serField, array);
+      return document;
+    }
+  }
+
+  @Override
+  public String scheme() {
+    return "mongodb";
+  }
+
+  @Override
+  public DataSource construct(URI uri, String inputFormatClass, String outputFormatClass,
+                              List<FieldInfo> fields) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String
outputFormatClass,
+                                                Properties properties, List<FieldInfo>
fields) {
+
+    return new MongoTridentDataSource(uri.toString(), properties, fields);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0920b6d4/external/sql/storm-sql-external/storm-sql-mongodb/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-mongodb/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
b/external/sql/storm-sql-external/storm-sql-mongodb/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
new file mode 100644
index 0000000..e46d794
--- /dev/null
+++ b/external/sql/storm-sql-external/storm-sql-mongodb/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.storm.sql.mongodb.MongoDataSourcesProvider
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/0920b6d4/external/sql/storm-sql-external/storm-sql-mongodb/src/test/org/apache/storm/sql/mongodb/TestMongoDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-mongodb/src/test/org/apache/storm/sql/mongodb/TestMongoDataSourcesProvider.java
b/external/sql/storm-sql-external/storm-sql-mongodb/src/test/org/apache/storm/sql/mongodb/TestMongoDataSourcesProvider.java
new file mode 100644
index 0000000..bc0c695
--- /dev/null
+++ b/external/sql/storm-sql-external/storm-sql-mongodb/src/test/org/apache/storm/sql/mongodb/TestMongoDataSourcesProvider.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.mongodb;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.storm.mongodb.common.MongoDBClient;
+import org.apache.storm.mongodb.trident.state.MongoState;
+import org.apache.storm.mongodb.trident.state.MongoStateFactory;
+import org.apache.storm.mongodb.trident.state.MongoStateUpdater;
+import org.apache.storm.sql.runtime.DataSourcesRegistry;
+import org.apache.storm.sql.runtime.FieldInfo;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.apache.storm.sql.runtime.serde.json.JsonSerializer;
+import org.apache.storm.trident.state.StateUpdater;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.bson.Document;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.internal.util.reflection.Whitebox;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+public class TestMongoDataSourcesProvider {
+  private static final List<FieldInfo> FIELDS = ImmutableList.of(
+      new FieldInfo("ID", int.class, true),
+      new FieldInfo("val", String.class, false));
+  private static final List<String> FIELD_NAMES = ImmutableList.of("ID", "val");
+  private static final JsonSerializer SERIALIZER = new JsonSerializer(FIELD_NAMES);
+  private static final Properties TBL_PROPERTIES = new Properties();
+
+  static {
+    TBL_PROPERTIES.put("collection.name", "collection1");
+    TBL_PROPERTIES.put("trident.ser.field", "tridentSerField");
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testMongoSink() {
+    ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(
+            URI.create("mongodb://127.0.0.1:27017/test"), null, null, TBL_PROPERTIES, FIELDS);
+    Assert.assertNotNull(ds);
+
+    ISqlTridentDataSource.SqlTridentConsumer consumer = ds.getConsumer();
+
+    Assert.assertEquals(MongoStateFactory.class, consumer.getStateFactory().getClass());
+    Assert.assertEquals(MongoStateUpdater.class, consumer.getStateUpdater().getClass());
+
+    MongoState state = (MongoState) consumer.getStateFactory().makeState(Collections.emptyMap(),
null, 0, 1);
+    StateUpdater stateUpdater = consumer.getStateUpdater();
+
+    MongoDBClient mongoClient = mock(MongoDBClient.class);
+    Whitebox.setInternalState(state, "mongoClient", mongoClient);
+
+    List<TridentTuple> tupleList = mockTupleList();
+
+    for (TridentTuple t : tupleList) {
+      state.updateState(Collections.singletonList(t), null);
+      verify(mongoClient).insert(argThat(new MongoArgMatcher(t)) , eq(true));
+    }
+
+    verifyNoMoreInteractions(mongoClient);
+  }
+
+  private static List<TridentTuple> mockTupleList() {
+    List<TridentTuple> tupleList = new ArrayList<>();
+    TridentTuple t0 = mock(TridentTuple.class);
+    TridentTuple t1 = mock(TridentTuple.class);
+    doReturn(1).when(t0).get(0);
+    doReturn(2).when(t1).get(0);
+    doReturn(Lists.<Object>newArrayList(1, "2")).when(t0).getValues();
+    doReturn(Lists.<Object>newArrayList(2, "3")).when(t1).getValues();
+    tupleList.add(t0);
+    tupleList.add(t1);
+    return tupleList;
+  }
+
+  private static class MongoArgMatcher extends ArgumentMatcher<List<Document>>
{
+    private final TridentTuple tuple;
+
+    private MongoArgMatcher(TridentTuple tuple) {
+      this.tuple = tuple;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public boolean matches(Object o) {
+      Document doc = ((List<Document>)o).get(0);
+      ByteBuffer buf = ByteBuffer.wrap((byte[])doc.get(TBL_PROPERTIES.getProperty("trident.ser.field")));
+      ByteBuffer b = SERIALIZER.write(tuple.getValues(), null);
+      return b.equals(buf);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0920b6d4/external/sql/storm-sql-external/storm-sql-redis/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-redis/pom.xml b/external/sql/storm-sql-external/storm-sql-redis/pom.xml
index 41cc3b5..b0bb18d 100644
--- a/external/sql/storm-sql-external/storm-sql-redis/pom.xml
+++ b/external/sql/storm-sql-external/storm-sql-redis/pom.xml
@@ -54,10 +54,6 @@
             <scope>provided</scope>
         </dependency>
         <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-databind</artifactId>
-        </dependency>
-        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/storm/blob/0920b6d4/external/sql/storm-sql-external/storm-sql-redis/src/jvm/org/apache/storm/sql/redis/RedisDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-redis/src/jvm/org/apache/storm/sql/redis/RedisDataSourcesProvider.java
b/external/sql/storm-sql-external/storm-sql-redis/src/jvm/org/apache/storm/sql/redis/RedisDataSourcesProvider.java
index 3333523..0bd37ca 100644
--- a/external/sql/storm-sql-external/storm-sql-redis/src/jvm/org/apache/storm/sql/redis/RedisDataSourcesProvider.java
+++ b/external/sql/storm-sql-external/storm-sql-redis/src/jvm/org/apache/storm/sql/redis/RedisDataSourcesProvider.java
@@ -17,10 +17,7 @@
  */
 package org.apache.storm.sql.redis;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 import org.apache.storm.redis.common.config.JedisClusterConfig;
 import org.apache.storm.redis.common.config.JedisPoolConfig;
 import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
@@ -36,18 +33,17 @@ import org.apache.storm.sql.runtime.IOutputSerializer;
 import org.apache.storm.sql.runtime.ISqlTridentDataSource;
 import org.apache.storm.sql.runtime.SimpleSqlTridentConsumer;
 import org.apache.storm.sql.runtime.serde.json.JsonSerializer;
+import org.apache.storm.sql.runtime.utils.FieldInfoUtils;
 import org.apache.storm.trident.spout.ITridentDataSource;
 import org.apache.storm.trident.state.StateFactory;
 import org.apache.storm.trident.state.StateUpdater;
 import org.apache.storm.tuple.ITuple;
 import redis.clients.util.JedisURIHelper;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Properties;
 
@@ -62,13 +58,6 @@ public class RedisDataSourcesProvider implements DataSourcesProvider {
   private static final int DEFAULT_REDIS_PORT = 6379;
   private static final int DEFAULT_TIMEOUT = 2000;
 
-  private static class FieldNameExtractor implements Function<FieldInfo, String>, Serializable
{
-    @Override
-    public String apply(FieldInfo fieldInfo) {
-      return fieldInfo.name();
-    }
-  }
-
   private abstract static class AbstractRedisTridentDataSource implements ISqlTridentDataSource,
Serializable {
     protected abstract StateFactory newStateFactory();
     protected abstract StateUpdater newStateUpdater(RedisStoreMapper storeMapper);
@@ -89,7 +78,7 @@ public class RedisDataSourcesProvider implements DataSourcesProvider {
     @Override
     public SqlTridentConsumer getConsumer() {
       RedisDataTypeDescription dataTypeDescription = getDataTypeDesc(props);
-      List<String> fieldNames = Lists.transform(fields, new FieldNameExtractor());
+      List<String> fieldNames = FieldInfoUtils.getFieldNames(fields);
 
       RedisStoreMapper storeMapper = new TridentRedisStoreMapper(dataTypeDescription, fields,
new JsonSerializer(fieldNames));
 
@@ -159,7 +148,7 @@ public class RedisDataSourcesProvider implements DataSourcesProvider {
   }
 
   @Override
-  public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String
outputFormatClass, String properties, List<FieldInfo> fields) {
+  public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String
outputFormatClass, Properties props, List<FieldInfo> fields) {
     Preconditions.checkArgument(JedisURIHelper.isValid(uri), "URI is not valid for Redis:
" + uri);
 
     String host = uri.getHost();
@@ -167,16 +156,6 @@ public class RedisDataSourcesProvider implements DataSourcesProvider
{
     int dbIdx = JedisURIHelper.getDBIndex(uri);
     String password = JedisURIHelper.getPassword(uri);
 
-    Properties props = new Properties();
-    try {
-      ObjectMapper mapper = new ObjectMapper();
-      @SuppressWarnings("unchecked")
-      HashMap<String, Object> map = mapper.readValue(properties, HashMap.class);
-      props.putAll(map);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-
     int timeout = Integer.parseInt(props.getProperty("redis.timeout", String.valueOf(DEFAULT_TIMEOUT)));
 
     boolean clusterMode = Boolean.valueOf(props.getProperty("use.redis.cluster", "false"));

http://git-wip-us.apache.org/repos/asf/storm/blob/0920b6d4/external/sql/storm-sql-external/storm-sql-redis/src/test/org/apache/storm/sql/redis/TestRedisDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-external/storm-sql-redis/src/test/org/apache/storm/sql/redis/TestRedisDataSourcesProvider.java
b/external/sql/storm-sql-external/storm-sql-redis/src/test/org/apache/storm/sql/redis/TestRedisDataSourcesProvider.java
index 1072576..94d4949 100644
--- a/external/sql/storm-sql-external/storm-sql-redis/src/test/org/apache/storm/sql/redis/TestRedisDataSourcesProvider.java
+++ b/external/sql/storm-sql-external/storm-sql-redis/src/test/org/apache/storm/sql/redis/TestRedisDataSourcesProvider.java
@@ -17,7 +17,6 @@
  */
 package org.apache.storm.sql.redis;
 
-import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import org.apache.storm.redis.trident.state.RedisClusterState;
@@ -43,6 +42,7 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Properties;
 
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doReturn;
@@ -58,20 +58,16 @@ public class TestRedisDataSourcesProvider {
   private static final List<String> FIELD_NAMES = ImmutableList.of("ID", "val");
   private static final String ADDITIONAL_KEY = "hello";
   private static final JsonSerializer SERIALIZER = new JsonSerializer(FIELD_NAMES);
-  private static final String TBL_PROPERTIES = Joiner.on('\n').join(
-      "{",
-      "\"data.type\": \"HASH\",",
-      "\"data.additional.key\": \"" + ADDITIONAL_KEY + "\"",
-      "}"
-  );
-
-  private static final String CLUSTER_TBL_PROPERTIES = Joiner.on('\n').join(
-          "{",
-          "\"data.type\": \"HASH\",",
-          "\"data.additional.key\": \"" + ADDITIONAL_KEY + "\",",
-          "\"use.redis.cluster\": \"true\"",
-          "}"
-  );
+  private static final Properties TBL_PROPERTIES = new Properties();
+  private static final Properties CLUSTER_TBL_PROPERTIES = new Properties();
+
+  static {
+    TBL_PROPERTIES.put("data.type", "HASH");
+    TBL_PROPERTIES.put("data.additional.key", ADDITIONAL_KEY);
+    CLUSTER_TBL_PROPERTIES.put("data.type", "HASH");
+    CLUSTER_TBL_PROPERTIES.put("data.additional.key", ADDITIONAL_KEY);
+    CLUSTER_TBL_PROPERTIES.put("use.redis.cluster", "true");
+  }
 
   @SuppressWarnings("unchecked")
   @Test

http://git-wip-us.apache.org/repos/asf/storm/blob/0920b6d4/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java
b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java
index 85e20d9..dbece9c 100644
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java
@@ -19,6 +19,7 @@ package org.apache.storm.sql.runtime;
 
 import java.net.URI;
 import java.util.List;
+import java.util.Properties;
 
 public interface DataSourcesProvider {
   /**
@@ -41,6 +42,6 @@ public interface DataSourcesProvider {
       List<FieldInfo> fields);
 
   ISqlTridentDataSource constructTrident(
-      URI uri, String inputFormatClass, String outputFormatClass,
-      String properties, List<FieldInfo> fields);
+          URI uri, String inputFormatClass, String outputFormatClass,
+          Properties properties, List<FieldInfo> fields);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/0920b6d4/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
index b2221ff..dfefb01 100644
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
@@ -24,6 +24,7 @@ import java.net.URI;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.ServiceLoader;
 
 public class DataSourcesRegistry {
@@ -56,8 +57,8 @@ public class DataSourcesRegistry {
   }
 
   public static ISqlTridentDataSource constructTridentDataSource(
-      URI uri, String inputFormatClass, String outputFormatClass,
-      String properties, List<FieldInfo> fields) {
+          URI uri, String inputFormatClass, String outputFormatClass,
+          Properties properties, List<FieldInfo> fields) {
     DataSourcesProvider provider = providers.get(uri.getScheme());
     if (provider == null) {
       return null;

http://git-wip-us.apache.org/repos/asf/storm/blob/0920b6d4/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/FieldInfoUtils.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/FieldInfoUtils.java
b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/FieldInfoUtils.java
new file mode 100644
index 0000000..efd5d25
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/utils/FieldInfoUtils.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime.utils;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import org.apache.storm.sql.runtime.FieldInfo;
+
+import java.io.Serializable;
+import java.util.List;
+
+public final class FieldInfoUtils {
+
+    public static List<String> getFieldNames(List<FieldInfo> fields) {
+        return Lists.transform(fields, new FieldNameExtractor());
+    }
+
+    private static class FieldNameExtractor implements Function<FieldInfo, String>,
Serializable {
+        @Override
+        public String apply(FieldInfo fieldInfo) {
+            return fieldInfo.name();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0920b6d4/storm-dist/binary/src/main/assembly/binary.xml
----------------------------------------------------------------------
diff --git a/storm-dist/binary/src/main/assembly/binary.xml b/storm-dist/binary/src/main/assembly/binary.xml
index 56abed6..afb831c 100644
--- a/storm-dist/binary/src/main/assembly/binary.xml
+++ b/storm-dist/binary/src/main/assembly/binary.xml
@@ -312,6 +312,13 @@
             </includes>
         </fileSet>
         <fileSet>
+            <directory>${project.basedir}/../../external/sql/storm-sql-external/storm-sql-mongodb/target</directory>
+            <outputDirectory>external/sql/storm-sql-external/storm-sql-mongodb</outputDirectory>
+            <includes>
+                <include>storm*jar</include>
+            </includes>
+        </fileSet>
+        <fileSet>
             <directory>${project.basedir}/../../external/sql/storm-sql-runtime/target/app-assembler/repo</directory>
             <outputDirectory>external/sql/storm-sql-runtime</outputDirectory>
             <includes>


Mime
View raw message