storm-user mailing list archives

From sam mohel <sammoh...@gmail.com>
Subject Re: Storm + sklearn
Date Sun, 03 Jan 2016 07:26:16 GMT
As far as I know (I'm new to Storm too), you should run mvn clean and then
mvn package after any change to the code, because you later submit your
topology as a jar file, and Maven has to rebuild that jar for your changes
to be included.

I hope someone can help you with your code.
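
In the meantime, one thing that may be worth checking in predict.py (I have
not run your code, so treat this as a guess): a ShellBolt talks to the script
by exchanging JSON messages over stdin and stdout, so a bare print can corrupt
that stream, and a NumPy array such as the result of clf.predict cannot be
passed to json.dumps as-is. The bolt also declares two output fields
("number", "pred") but emits only one value. Below is a minimal sketch of
building the emitted values from plain Python types. build_emit_values is a
hypothetical helper name, not part of storm.py, and a list stands in for the
prediction so the sketch runs without NumPy.

```python
import json


def build_emit_values(number, pred):
    """Return [number, label] using plain Python types only.

    `pred` is assumed to be a one-element sequence, such as the array
    returned by clf.predict for a single sample.
    """
    label = pred[0]
    # NumPy scalars provide .item(), which returns a built-in Python scalar;
    # plain ints and floats lack it and pass through unchanged.
    if hasattr(label, "item"):
        label = label.item()
    return [number, label]


# Stand-in for a prediction (hypothetical data, not real classifier output).
values = build_emit_values(3, [2])
# The result survives JSON encoding, which the multilang protocol relies on.
encoded = json.dumps(values)
```

Inside process() this would look something like
storm.emit(build_emit_values(tup.values[0], pred)), with storm.log(...) used
instead of print for debugging output.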

On Sun, Jan 3, 2016 at 9:19 AM, K Zharas <kgzharas@gmail.com> wrote:

> Hi. After running storm-starter's topologies, I decided to create my own
> topology that uses sklearn. However, I get errors, as I expected.
>
> My questions are
> 1) Am I on the right path to integrating sklearn into Storm? How can I
> fix this topology?
> 2) Do I have to run "mvn clean install -DskipTests=true" and "mvn
> package" after any changes?
>
> Here is my topology (basically I changed WordCountTopology)
>
> *TestTopology.java*
> ----------------------------------------------------------------------
> public class TestTopology {
>   public static class Predict extends ShellBolt implements IRichBolt {
>
>     public Predict() {
>       super("python", "predict.py");
>     }
>
>     @Override
>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>       declarer.declare(new Fields("number", "pred"));
>     }
>
>     @Override
>     public Map<String, Object> getComponentConfiguration() {
>       return null;
>     }
>   }
>
>   public static void main(String[] args) throws Exception {
>
>     TopologyBuilder builder = new TopologyBuilder();
>
>     builder.setSpout("spout", new RandomNumberSpout(), 5);
>
>     builder.setBolt("predict", new Predict(), 8).shuffleGrouping("spout");
>
>     Config conf = new Config();
>     conf.setDebug(true);
>
>     if (args != null && args.length > 0) {
>       conf.setNumWorkers(3);
>
>       StormSubmitter.submitTopologyWithProgressBar(args[0], conf,
>           builder.createTopology());
>     }
>     else {
>       conf.setMaxTaskParallelism(3);
>
>       LocalCluster cluster = new LocalCluster();
>       cluster.submitTopology("test", conf, builder.createTopology());
>
>       Thread.sleep(10000);
>
>       cluster.shutdown();
>     }
>   }
> }
> ----------------------------------------------------------------------
>
> *RandomNumberSpout.java*
> ----------------------------------------------------------------------
> public class RandomNumberSpout extends BaseRichSpout {
>   SpoutOutputCollector _collector;
>   Random _rand;
>
>   @Override
>   public void open(Map conf, TopologyContext context,
>       SpoutOutputCollector collector) {
>     _collector = collector;
>     _rand = new Random();
>   }
>
>   @Override
>   public void nextTuple() {
>     Utils.sleep(100);
>     int number = 3;
>     _collector.emit(new Values(number));
>   }
>
>   @Override
>   public void ack(Object id) {
>   }
>
>   @Override
>   public void fail(Object id) {
>   }
>
>   @Override
>   public void declareOutputFields(OutputFieldsDeclarer declarer) {
>     declarer.declare(new Fields("number"));
>   }
> }
> ----------------------------------------------------------------------
>
> *predict.py*
> ----------------------------------------------------------------------
> import storm
>
> import numpy as np
> X = np.array([[-1, -1], [-2, -1], [-3, -2], [1, 1], [2, 1], [3, 2]])
> Y = np.array([1, 1, 1, 2, 2, 2])
> from sklearn.naive_bayes import GaussianNB
> clf = GaussianNB()
> clf.fit(X, Y)
>
> class PredictBolt(storm.BasicBolt):
>     def process(self, tup):
>         pred = clf.predict([[tup.values[0], tup.values[0]]])
>         print pred
>         storm.emit([pred])
>
> PredictBolt().run()
> ----------------------------------------------------------------------
>
> Thank you.
>
> Best regards,
> Zharas Andamassov
>
