storm-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From K Zharas <kgzha...@gmail.com>
Subject Storm + sklearn
Date Sun, 03 Jan 2016 07:19:32 GMT
Hi. After running the storm-starter topologies, I decided to create my own
topology that uses sklearn. However, as I expected, I am getting errors.

My questions are
1) Am I on the right path for integrating sklearn into Storm? How can I fix
this topology?
2) Do I have to run "mvn clean install -DskipTests=true" and "mvn package"
after any changes?

Here is my topology (basically I changed WordCountTopology)

*TestTopology.java*
----------------------------------------------------------------------
/**
 * Wires a {@code RandomNumberSpout} to a shell bolt that is constructed with
 * the command {@code python predict.py}, then either submits the topology
 * through {@code StormSubmitter} or runs it in a {@code LocalCluster}.
 */
public class TestTopology {

  /** Shell bolt constructed with the command {@code python predict.py}. */
  public static class Predict extends ShellBolt implements IRichBolt {

    public Predict() {
      super("python", "predict.py");
    }

    /** Declares two output fields: "number" and "pred". */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      Fields outputFields = new Fields("number", "pred");
      declarer.declare(outputFields);
    }

    /** Returns {@code null}; no component-specific configuration is supplied. */
    @Override
    public Map<String, Object> getComponentConfiguration() {
      return null;
    }
  }

  /**
   * Builds the topology and runs it.
   *
   * @param args when non-empty, {@code args[0]} is used as the topology name
   *     for {@code StormSubmitter}; otherwise the topology runs in a
   *     {@code LocalCluster} for ten seconds under the name "test"
   */
  public static void main(String[] args) throws Exception {
    TopologyBuilder topology = new TopologyBuilder();
    topology.setSpout("spout", new RandomNumberSpout(), 5);
    topology.setBolt("predict", new Predict(), 8).shuffleGrouping("spout");

    Config config = new Config();
    config.setDebug(true);

    boolean hasTopologyName = args != null && args.length > 0;
    if (hasTopologyName) {
      config.setNumWorkers(3);
      StormSubmitter.submitTopologyWithProgressBar(args[0], config, topology.createTopology());
      return;
    }

    // No topology name given: run in-process, then shut down after ten seconds.
    config.setMaxTaskParallelism(3);
    LocalCluster localCluster = new LocalCluster();
    localCluster.submitTopology("test", config, topology.createTopology());
    Thread.sleep(10000);
    localCluster.shutdown();
  }
}
----------------------------------------------------------------------

*RandomNumberSpout.java*
----------------------------------------------------------------------
/**
 * Spout that emits a single-field tuple ("number") on each call to
 * {@link #nextTuple()}. The emitted value is currently the constant 3;
 * {@code _rand} is created in {@link #open} but is not used.
 */
public class RandomNumberSpout extends BaseRichSpout {
  SpoutOutputCollector _collector;
  Random _rand;

  /** Stores the collector and creates the {@code Random} instance. */
  @Override
  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    this._collector = collector;
    this._rand = new Random();
  }

  /** Sleeps for 100 ms, then emits one tuple containing the value 3. */
  @Override
  public void nextTuple() {
    Utils.sleep(100);
    final int value = 3;
    Values tuple = new Values(value);
    _collector.emit(tuple);
  }

  @Override
  public void ack(Object id) {
    // Intentionally empty.
  }

  @Override
  public void fail(Object id) {
    // Intentionally empty.
  }

  /** Declares the single output field "number". */
  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    Fields outputFields = new Fields("number");
    declarer.declare(outputFields);
  }
}
----------------------------------------------------------------------

*predict.py*
----------------------------------------------------------------------
import storm

import numpy as np
from sklearn.naive_bayes import GaussianNB

# Tiny two-feature training set: three samples of class 1, three of class 2.
X = np.array([[-1, -1], [-2, -1], [-3, -2], [1, 1], [2, 1], [3, 2]])
Y = np.array([1, 1, 1, 2, 2, 2])

# The classifier is fitted once at start-up and reused for every tuple.
clf = GaussianNB()
clf.fit(X, Y)


class PredictBolt(storm.BasicBolt):
    """Classifies each incoming number and emits ``[number, pred]``."""

    def process(self, tup):
        number = tup.values[0]
        # The model was fitted on two features, so the number is used for both.
        pred = clf.predict([[number, number]])
        # predict() returns a numpy array; convert the single label to a plain
        # int so it can be JSON-encoded when the tuple is emitted.
        label = int(pred[0])
        # Do not print: stdout carries the multi-lang protocol messages to the
        # parent process. Log through storm.log instead.
        storm.log("number=%s pred=%s" % (number, label))
        # Emit two values to match the bolt's declared fields ("number", "pred").
        storm.emit([number, label])


PredictBolt().run()
----------------------------------------------------------------------

Thank you.

Best regards,
Zharas Andamassov

Mime
View raw message