storm-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From 张博 <jyzhan...@gmail.com>
Subject How to use Drools in the topology
Date Wed, 13 Sep 2017 06:15:35 GMT
Hi!
Now I want to use Drools in a blot,it works normal in the LocalCluster, but
when I put it to the production cluster,it has error.
The blot:
public class DealLostBolt extends BaseRichBolt {

  private static final long serialVersionUID = 1L;

  private static final Logger LOGGER =
LoggerFactory.getLogger("DEAL_LOST_BOLT");

  private OutputCollector collector;

  private KieSession kieSession;

  private FactHandle factHandle;

  @Override
  public void execute(Tuple input) {
    // 获取数据
    String sentence = (String) input.getValue(0);
    LOGGER.info("DealLostBolt获取到的数据:" + sentence);

    // 数据转换
    PutDataPoint dataPoint = Json.fromJson(PutDataPoint.class, sentence);

    KieServices ks = KieServices.Factory.get();
    KieContainer kieContainer = ks.getKieClasspathContainer();
    kieSession = kieContainer.newKieSession("all-rule");
    kieSession.getAgenda().getAgendaGroup("deal-lost").setFocus();

    factHandle = kieSession.insert(dataPoint);
    kieSession.fireAllRules();
    kieSession.delete(factHandle);

    collector.emit(new Values(sentence));
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("value"));

  }

  @Override
  public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
    this.collector = collector;
  }

}
The erros:
java.lang.RuntimeException: java.lang.NullPointerException
at
org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:495)
~[storm-core-1.1.1.jar:1.1.1]
at
org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:460)
~[storm-core-1.1.1.jar:1.1.1]
at
org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73)
~[storm-core-1.1.1.jar:1.1.1]
at
org.apache.storm.daemon.executor$fn__5030$fn__5043$fn__5096.invoke(executor.clj:848)
~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484)
[storm-core-1.1.1.jar:1.1.1]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
Caused by: java.lang.NullPointerException
at
org.kie.internal.io.ResourceFactory.newByteArrayResource(ResourceFactory.java:66)
~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at
org.drools.compiler.kie.builder.impl.AbstractKieModule.getResource(AbstractKieModule.java:299)
~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at
org.drools.compiler.kie.builder.impl.AbstractKieModule.addResourceToCompiler(AbstractKieModule.java:264)
~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at
org.drools.compiler.kie.builder.impl.AbstractKieModule.addResourceToCompiler(AbstractKieModule.java:259)
~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at
org.drools.compiler.kie.builder.impl.AbstractKieProject.buildKnowledgePackages(AbstractKieProject.java:228)
~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at
org.drools.compiler.kie.builder.impl.AbstractKieModule.createKieBase(AbstractKieModule.java:206)
~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at
org.drools.compiler.kie.builder.impl.KieContainerImpl.createKieBase(KieContainerImpl.java:584)
~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at
org.drools.compiler.kie.builder.impl.KieContainerImpl.getKieBase(KieContainerImpl.java:552)
~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at
org.drools.compiler.kie.builder.impl.KieContainerImpl.newKieSession(KieContainerImpl.java:680)
~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at
org.drools.compiler.kie.builder.impl.KieContainerImpl.newKieSession(KieContainerImpl.java:648)
~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at cn.ennwifi.storm.bolt.DealLostBolt.execute(DealLostBolt.java:52)
~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at
org.apache.storm.daemon.executor$fn__5030$tuple_action_fn__5032.invoke(executor.clj:729)
~[storm-core-1.1.1.jar:1.1.1]
at
org.apache.storm.daemon.executor$mk_task_receiver$fn__4951.invoke(executor.clj:461)
~[storm-core-1.1.1.jar:1.1.1]
at
org.apache.storm.disruptor$clojure_handler$reify__4465.onEvent(disruptor.clj:40)
~[storm-core-1.1.1.jar:1.1.1]
at
org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:482)
~[storm-core-1.1.1.jar:1.1.1]
... 6 more

Could somebody help me?

Thanks!

Mime
View raw message