storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ptgo...@apache.org
Subject [1/5] storm git commit: [STORM-1505] Add map and flatMap functions in trident stream
Date Wed, 03 Feb 2016 21:59:24 GMT
Repository: storm
Updated Branches:
  refs/heads/master 3d9481f40 -> 695f8c931


[STORM-1505] Add map and flatMap functions in trident stream

map and flatMap are common stream operations. Right now in Trident they have to be
implemented via each(), which also sends the input field values
in addition to the mapped field values, so dedicated map and flatMap operations make things
slightly more efficient and easier to use.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1832c369
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1832c369
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1832c369

Branch: refs/heads/master
Commit: 1832c369735576df61c6533ff772814b537e5d68
Parents: ecce1ce
Author: Arun Mahadevan <aiyer@hortonworks.com>
Authored: Wed Jan 27 12:32:33 2016 +0530
Committer: Arun Mahadevan <aiyer@hortonworks.com>
Committed: Wed Jan 27 16:04:59 2016 +0530

----------------------------------------------------------------------
 .../starter/trident/TridentMapExample.java      | 105 +++++++++++++++++++
 .../jvm/org/apache/storm/trident/Stream.java    |  43 +++++++-
 .../trident/operation/FlatMapFunction.java      |  36 +++++++
 .../storm/trident/operation/MapFunction.java    |  35 +++++++
 .../operation/impl/FlatMapFunctionExecutor.java |  52 +++++++++
 .../operation/impl/MapFunctionExecutor.java     |  50 +++++++++
 .../trident/planner/processor/MapProcessor.java |  87 +++++++++++++++
 7 files changed, 407 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/1832c369/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
new file mode 100644
index 0000000..c493e04
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalDRPC;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.FlatMapFunction;
+import org.apache.storm.trident.operation.MapFunction;
+import org.apache.storm.trident.operation.builtin.Count;
+import org.apache.storm.trident.operation.builtin.FilterNull;
+import org.apache.storm.trident.operation.builtin.MapGet;
+import org.apache.storm.trident.operation.builtin.Sum;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.testing.MemoryMapState;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A simple example that demonstrates the usage of {@link org.apache.storm.trident.Stream#map(MapFunction)} and
+ * {@link org.apache.storm.trident.Stream#flatMap(FlatMapFunction)} functions.
+ *
+ * <p>Sentences from a cycling {@link FixedBatchSpout} are split into words with {@code flatMap}, upper-cased with
+ * {@code map} and counted per word into a {@link MemoryMapState}. A DRPC stream splits and upper-cases its argument
+ * the same way and sums the stored counts of the requested words.
+ */
+public class TridentMapExample {
+
+    /** Upper-cases the first field of each tuple, producing a single-value result. */
+    private static final MapFunction toUpper = new MapFunction() {
+        @Override
+        public Values execute(Values input) {
+            return new Values(((String) input.get(0)).toUpperCase());
+        }
+    };
+
+    /** Splits the first field of each tuple on single spaces, producing one single-value result per word. */
+    private static final FlatMapFunction split = new FlatMapFunction() {
+        @Override
+        public Iterable<Values> execute(Values input) {
+            List<Values> valuesList = new ArrayList<>();
+            for (String word : ((String) input.get(0)).split(" ")) {
+                valuesList.add(new Values(word));
+            }
+            return valuesList;
+        }
+    };
+
+    /**
+     * Builds the word-count topology together with its DRPC query stream.
+     *
+     * @param drpc the local DRPC server to attach the query stream to; {@link #main} passes {@code null} when
+     *             submitting to a remote cluster
+     * @return the built topology
+     */
+    public static StormTopology buildTopology(LocalDRPC drpc) {
+        FixedBatchSpout spout = new FixedBatchSpout(
+                new Fields("word"), 3,
+                new Values("the cow jumped over the moon"),
+                new Values("the man went to the store and bought some candy"),
+                new Values("four score and seven years ago"),
+                new Values("how many apples can you eat"),
+                new Values("to be or not to be the person"));
+        spout.setCycle(true);
+
+        TridentTopology topology = new TridentTopology();
+        TridentState wordCounts = topology.newStream("spout1", spout).parallelismHint(16)
+                .flatMap(split)
+                .map(toUpper)
+                .groupBy(new Fields("word"))
+                .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
+                .parallelismHint(16);
+
+        topology.newDRPCStream("words", drpc)
+                .flatMap(split)
+                // The stored keys were upper-cased above, so query words must be too or lookups never match.
+                .map(toUpper)
+                .groupBy(new Fields("args"))
+                .stateQuery(wordCounts, new Fields("args"), new MapGet(), new Fields("count"))
+                .each(new Fields("count"), new FilterNull())
+                .aggregate(new Fields("count"), new Sum(), new Fields("sum"));
+        return topology.build();
+    }
+
+    /**
+     * Runs the example. Without arguments it runs in-process and issues one DRPC query per second, 100 times.
+     * Otherwise it submits the topology to a cluster under the name {@code args[0]}.
+     *
+     * @param args optional topology name for remote submission
+     * @throws Exception if submission or a DRPC call fails
+     */
+    public static void main(String[] args) throws Exception {
+        Config conf = new Config();
+        conf.setMaxSpoutPending(20);
+        if (args.length == 0) {
+            LocalDRPC drpc = new LocalDRPC();
+            LocalCluster cluster = new LocalCluster();
+            try {
+                cluster.submitTopology("wordCounter", conf, buildTopology(drpc));
+                for (int i = 0; i < 100; i++) {
+                    System.out.println("DRPC RESULT: " + drpc.execute("words", "CAT THE DOG JUMPED"));
+                    Thread.sleep(1000);
+                }
+            } finally {
+                // Release the in-process cluster and DRPC server once the demo loop ends (or fails).
+                cluster.shutdown();
+                drpc.shutdown();
+            }
+        } else {
+            conf.setNumWorkers(3);
+            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(null));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/1832c369/storm-core/src/jvm/org/apache/storm/trident/Stream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/Stream.java b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
index fb2497a..32daa33 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/Stream.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
@@ -21,6 +21,11 @@ import org.apache.storm.generated.Grouping;
 import org.apache.storm.generated.NullStruct;
 import org.apache.storm.trident.fluent.ChainedAggregatorDeclarer;
 import org.apache.storm.grouping.CustomStreamGrouping;
+import org.apache.storm.trident.operation.FlatMapFunction;
+import org.apache.storm.trident.operation.MapFunction;
+import org.apache.storm.trident.operation.impl.FlatMapFunctionExecutor;
+import org.apache.storm.trident.operation.impl.MapFunctionExecutor;
+import org.apache.storm.trident.planner.processor.MapProcessor;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.utils.Utils;
 import org.apache.storm.trident.fluent.GlobalAggregationScheme;
@@ -323,7 +328,43 @@ public class Stream implements IAggregatableStream {
     
+    /**
+     * Applies {@code filter} to the {@code inputFields} of each tuple in this stream. The filter is wrapped in a
+     * {@code FilterExecutor} and passed to {@code each} with empty function fields, so no new fields are added.
+     */
     public Stream each(Fields inputFields, Filter filter) {
         return each(inputFields, new FilterExecutor(filter), new Fields());
-    }    
+    }
+
+    /**
+     * Returns a stream consisting of the result of applying the given mapping function to the values of this stream.
+     *
+     * <p>The resulting stream keeps the output fields of this stream, so the {@code Values} returned by
+     * {@code function} must line up one-to-one with {@link #getOutputFields()}. Only the mapped values are emitted;
+     * unlike {@code each}, the original field values are not carried along.
+     *
+     * @param function a mapping function to be applied to each value in this stream; must not be {@code null}
+     * @return the new stream
+     * @throws IllegalArgumentException if {@code function} is {@code null}
+     */
+    public Stream map(MapFunction function) {
+        if (function == null) {
+            // Fail while the topology is being built rather than with an NPE when the first tuple is processed.
+            throw new IllegalArgumentException("map function must not be null");
+        }
+        projectionValidation(getOutputFields());
+        return _topology.addSourcedNode(this,
+                                        new ProcessorNode(
+                                                _topology.getUniqueStreamId(),
+                                                _name,
+                                                getOutputFields(),
+                                                getOutputFields(),
+                                                new MapProcessor(getOutputFields(), new MapFunctionExecutor(function))));
+    }
+
+    /**
+     * Returns a stream consisting of the results of replacing each value of this stream with the contents
+     * produced by applying the provided mapping function to each value. This has the effect of applying
+     * a one-to-many transformation to the values of the stream, and then flattening the resulting elements
+     * into a new stream.
+     *
+     * <p>The resulting stream keeps the output fields of this stream, so every {@code Values} produced by
+     * {@code function} must line up one-to-one with {@link #getOutputFields()}.
+     *
+     * @param function a mapping function to be applied to each value in this stream which produces new values;
+     *                 must not be {@code null}
+     * @return the new stream
+     * @throws IllegalArgumentException if {@code function} is {@code null}
+     */
+    public Stream flatMap(FlatMapFunction function) {
+        if (function == null) {
+            // Fail while the topology is being built rather than with an NPE when the first tuple is processed.
+            throw new IllegalArgumentException("flatMap function must not be null");
+        }
+        projectionValidation(getOutputFields());
+        return _topology.addSourcedNode(this,
+                                        new ProcessorNode(
+                                                _topology.getUniqueStreamId(),
+                                                _name,
+                                                getOutputFields(),
+                                                getOutputFields(),
+                                                new MapProcessor(getOutputFields(), new FlatMapFunctionExecutor(function))));
+    }
     
     public ChainedAggregatorDeclarer chainedAgg() {
         return new ChainedAggregatorDeclarer(this, new BatchGlobalAggScheme());

http://git-wip-us.apache.org/repos/asf/storm/blob/1832c369/storm-core/src/jvm/org/apache/storm/trident/operation/FlatMapFunction.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/FlatMapFunction.java b/storm-core/src/jvm/org/apache/storm/trident/operation/FlatMapFunction.java
new file mode 100644
index 0000000..b19f811
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/FlatMapFunction.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.trident.operation;
+
+import org.apache.storm.tuple.Values;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * A one-to-many transformation function, used by
+ * {@link org.apache.storm.trident.Stream#flatMap(FlatMapFunction)}.
+ *
+ * <p>This interface extends {@link Serializable}, so implementations (including anonymous classes) should only
+ * capture serializable state.
+ */
+public interface FlatMapFunction extends Serializable {
+    /**
+     * Invoked by the framework for each value in a stream.
+     *
+     * <p>Each element of the returned iterable is emitted as a separate tuple under the same fields as the input
+     * stream, so every {@link Values} should have as many entries as {@code input}. Return an empty iterable
+     * (not {@code null}) to emit nothing.
+     *
+     * @param input the input value
+     * @return an iterable over the resultant values; must not be {@code null}
+     */
+    Iterable<Values> execute(Values input);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/1832c369/storm-core/src/jvm/org/apache/storm/trident/operation/MapFunction.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/MapFunction.java b/storm-core/src/jvm/org/apache/storm/trident/operation/MapFunction.java
new file mode 100644
index 0000000..8611afd
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/MapFunction.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.trident.operation;
+
+import org.apache.storm.tuple.Values;
+
+import java.io.Serializable;
+
+/**
+ * A one-to-one transformation function, used by {@link org.apache.storm.trident.Stream#map(MapFunction)}.
+ *
+ * <p>This interface extends {@link Serializable}, so implementations (including anonymous classes) should only
+ * capture serializable state.
+ */
+public interface MapFunction extends Serializable {
+    /**
+     * Invoked by the framework for each value in a stream.
+     *
+     * <p>The returned {@link Values} replace the input values and are emitted under the same fields as the input
+     * stream, so they should contain exactly as many entries as {@code input}.
+     *
+     * @param input the input value
+     * @return the transformed value; must not be {@code null}
+     */
+    Values execute(Values input);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/1832c369/storm-core/src/jvm/org/apache/storm/trident/operation/impl/FlatMapFunctionExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/impl/FlatMapFunctionExecutor.java b/storm-core/src/jvm/org/apache/storm/trident/operation/impl/FlatMapFunctionExecutor.java
new file mode 100644
index 0000000..8ab2a17
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/impl/FlatMapFunctionExecutor.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.trident.operation.impl;
+
+import org.apache.storm.trident.operation.FlatMapFunction;
+import org.apache.storm.trident.operation.Function;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.operation.TridentOperationContext;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.Map;
+
+/**
+ * Adapts a {@link FlatMapFunction} to the Trident {@link Function} interface. The tuple's values are copied into
+ * a {@link Values} instance and passed to the wrapped function. Every {@link Values} it returns is emitted through
+ * the collector.
+ */
+public class FlatMapFunctionExecutor implements Function {
+    private final FlatMapFunction function;
+
+    /**
+     * @param function the one-to-many function to apply to each tuple
+     */
+    public FlatMapFunctionExecutor(FlatMapFunction function) {
+        this.function = function;
+    }
+
+    /**
+     * Applies the wrapped function to {@code tuple} and emits each resulting {@link Values}.
+     *
+     * @throws NullPointerException if the function returns a {@code null} iterable or a {@code null} element
+     */
+    @Override
+    public void execute(TridentTuple tuple, TridentCollector collector) {
+        Iterable<Values> results = function.execute(new Values(tuple.getValues().toArray()));
+        if (results == null) {
+            throw new NullPointerException("FlatMapFunction " + function.getClass().getName()
+                    + " returned null; return an empty Iterable to emit nothing");
+        }
+        for (Values values : results) {
+            if (values == null) {
+                throw new NullPointerException("FlatMapFunction " + function.getClass().getName()
+                        + " produced a null Values element");
+            }
+            collector.emit(values);
+        }
+    }
+
+    @Override
+    public void prepare(Map conf, TridentOperationContext context) {
+        // Nothing to prepare: FlatMapFunction exposes no lifecycle hooks.
+    }
+
+    @Override
+    public void cleanup() {
+        // Nothing to release: FlatMapFunction exposes no lifecycle hooks.
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/1832c369/storm-core/src/jvm/org/apache/storm/trident/operation/impl/MapFunctionExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/impl/MapFunctionExecutor.java b/storm-core/src/jvm/org/apache/storm/trident/operation/impl/MapFunctionExecutor.java
new file mode 100644
index 0000000..a751570
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/impl/MapFunctionExecutor.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.trident.operation.impl;
+
+import org.apache.storm.trident.operation.Function;
+import org.apache.storm.trident.operation.MapFunction;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.operation.TridentOperationContext;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.Map;
+
+/**
+ * Adapts a {@link MapFunction} to the Trident {@link Function} interface. The tuple's values are copied into a
+ * {@link Values} instance and passed to the wrapped function, and its single result is emitted through the collector.
+ */
+public class MapFunctionExecutor implements Function {
+    private final MapFunction function;
+
+    /**
+     * @param function the one-to-one function to apply to each tuple
+     */
+    public MapFunctionExecutor(MapFunction function) {
+        this.function = function;
+    }
+
+    /**
+     * Applies the wrapped function to {@code tuple} and emits its result.
+     *
+     * @throws NullPointerException if the function returns {@code null}
+     */
+    @Override
+    public void execute(TridentTuple tuple, TridentCollector collector) {
+        Values result = function.execute(new Values(tuple.getValues().toArray()));
+        if (result == null) {
+            throw new NullPointerException("MapFunction " + function.getClass().getName() + " returned null");
+        }
+        collector.emit(result);
+    }
+
+    @Override
+    public void prepare(Map conf, TridentOperationContext context) {
+        // Nothing to prepare: MapFunction exposes no lifecycle hooks.
+    }
+
+    @Override
+    public void cleanup() {
+        // Nothing to release: MapFunction exposes no lifecycle hooks.
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/1832c369/storm-core/src/jvm/org/apache/storm/trident/planner/processor/MapProcessor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/planner/processor/MapProcessor.java b/storm-core/src/jvm/org/apache/storm/trident/planner/processor/MapProcessor.java
new file mode 100644
index 0000000..2fd0a4c
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/planner/processor/MapProcessor.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.trident.planner.processor;
+
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.trident.operation.FlatMapFunction;
+import org.apache.storm.trident.operation.Function;
+import org.apache.storm.trident.operation.MapFunction;
+import org.apache.storm.trident.operation.TridentOperationContext;
+import org.apache.storm.trident.planner.ProcessorContext;
+import org.apache.storm.trident.planner.TridentProcessor;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.tuple.TridentTupleView;
+import org.apache.storm.tuple.Fields;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Processor for executing {@link org.apache.storm.trident.Stream#map(MapFunction)} and
+ * {@link org.apache.storm.trident.Stream#flatMap(FlatMapFunction)} functions.
+ *
+ * <p>Each incoming tuple is projected onto the configured input fields and handed to the wrapped {@link Function},
+ * which emits its results through a {@link FreshCollector}. The processor keeps no per-batch state, so
+ * {@link #startBatch} and {@link #finishBatch} do nothing.
+ */
+public class MapProcessor implements TridentProcessor {
+    private final Function _function;
+    private final Fields _inputFields;
+    // The collector and projection are created in prepare(), so they are null until the processor is prepared.
+    private FreshCollector _collector;
+    private TridentTupleView.ProjectionFactory _projection;
+
+    /**
+     * @param inputFields the fields of the parent tuple to project and pass to {@code function}
+     * @param function    the function to execute (the map/flatMap executor supplied by {@code Stream})
+     */
+    public MapProcessor(Fields inputFields, Function function) {
+        _function = function;
+        _inputFields = inputFields;
+    }
+
+    /**
+     * Sets up the collector and input projection and prepares the wrapped function.
+     *
+     * @throws RuntimeException if this processor does not have exactly one parent
+     */
+    @Override
+    public void prepare(Map conf, TopologyContext context, TridentContext tridentContext) {
+        List<TridentTuple.Factory> parents = tridentContext.getParentTupleFactories();
+        if (parents.size() != 1) {
+            throw new RuntimeException("Map operation can only have one parent, but got " + parents.size());
+        }
+        _collector = new FreshCollector(tridentContext);
+        _projection = new TridentTupleView.ProjectionFactory(parents.get(0), _inputFields);
+        _function.prepare(conf, new TridentOperationContext(context, _projection));
+    }
+
+    @Override
+    public void cleanup() {
+        _function.cleanup();
+    }
+
+    @Override
+    public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
+        // Bind the collector to this batch's context before the function emits through it.
+        _collector.setContext(processorContext);
+        _function.execute(_projection.create(tuple), _collector);
+    }
+
+    @Override
+    public void startBatch(ProcessorContext processorContext) {
+        // No per-batch state to initialize.
+    }
+
+    @Override
+    public void finishBatch(ProcessorContext processorContext) {
+        // No per-batch state to flush.
+    }
+
+    @Override
+    public TridentTuple.Factory getOutputFactory() {
+        return _collector.getOutputFactory();
+    }
+}


Mime
View raw message