storm-user mailing list archives

From Stig Rohde Døssing <s...@apache.org>
Subject Re: How to use Drools in the topology
Date Wed, 13 Sep 2017 15:27:10 GMT
I'm not familiar with Drools, so I'm just guessing here, but are you doing
any kind of setup of the KieContainer before submitting your topology? When
you run your topology the bolt doesn't run in the same JVM as the topology
setup code, so any setup done via static variables/methods won't transfer
from the submitter JVM to the bolt JVM.
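
To illustrate the point above, here is a minimal sketch in plain Java, with no Storm or Drools classes involved. The names `PrepareDemo`, `RuleComponent` and `roundTrip` are hypothetical and only stand in for a bolt and for the trip from the submitter to a worker. Storm ships component instances to the workers using Java serialization, so anything held in a transient field (or set up through statics in the submitter) is not there on the receiving side until the component sets it up again, which is what prepare() is for.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class PrepareDemo {

    /** Hypothetical stand-in for a bolt; this is not a Storm class. */
    static class RuleComponent implements Serializable {
        private static final long serialVersionUID = 1L;

        // Stand-in for a heavyweight, non-serializable resource such as a KieSession.
        private transient StringBuilder engine;

        // Plays the role of prepare(): creates the resource on the side where it is called.
        void prepare() {
            engine = new StringBuilder("ready");
        }

        boolean isReady() {
            return engine != null;
        }
    }

    /** Serializes and deserializes the component, imitating shipping it to another JVM. */
    static RuleComponent roundTrip(RuleComponent component)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(component);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (RuleComponent) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        RuleComponent submitted = new RuleComponent();
        submitted.prepare(); // setup done on the "submitter" side

        RuleComponent onWorker = roundTrip(submitted);
        // The transient resource did not survive the trip.
        System.out.println("ready after shipping: " + onWorker.isReady());

        onWorker.prepare(); // setup repeated on the "worker" side
        System.out.println("ready after prepare(): " + onWorker.isReady());
    }
}
```

In a real bolt the same idea would mean doing the setup inside prepare() rather than in the code that builds and submits the topology. Whether that alone fixes the NPE in this thread is an open question.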

If you need to run code before starting a worker, you might want to look at
https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/hooks/BaseWorkerHook.java
and
https://storm.apache.org/releases/1.0.3/javadocs/org/apache/storm/topology/TopologyBuilder.html#addWorkerHook-org.apache.storm.hooks.IWorkerHook-
.

2017-09-13 15:30 GMT+02:00 zhangwenwei <jerry.zww@icloud.com>:

> According to the log, an NPE occurs when the method
> kieContainer.newKieSession() is called.
>
> Best Regards,
> Jerry Zhang
>
> > On 13 Sep 2017, at 14:15, 张博 <jyzhangbo@gmail.com> wrote:
> >
> > Hi!
> > I want to use Drools in a bolt. It works fine in the LocalCluster, but when I deploy it to the production cluster, it fails with an error.
> > The bolt:
> > public class DealLostBolt extends BaseRichBolt {
> >
> >   private static final long serialVersionUID = 1L;
> >
> >   private static final Logger LOGGER = LoggerFactory.getLogger("DEAL_LOST_BOLT");
> >
> >   private OutputCollector collector;
> >
> >   private KieSession kieSession;
> >
> >   private FactHandle factHandle;
> >
> >   @Override
> >   public void execute(Tuple input) {
> >     // Get the data
> >     String sentence = (String) input.getValue(0);
> >     LOGGER.info("Data received by DealLostBolt: " + sentence);
> >
> >     // Convert the data
> >     PutDataPoint dataPoint = Json.fromJson(PutDataPoint.class, sentence);
> >
> >     KieServices ks = KieServices.Factory.get();
> >     KieContainer kieContainer = ks.getKieClasspathContainer();
> >     kieSession = kieContainer.newKieSession("all-rule");
> >     kieSession.getAgenda().getAgendaGroup("deal-lost").setFocus();
> >
> >     factHandle = kieSession.insert(dataPoint);
> >     kieSession.fireAllRules();
> >     kieSession.delete(factHandle);
> >
> >     collector.emit(new Values(sentence));
> >   }
> >
> >   @Override
> >   public void declareOutputFields(OutputFieldsDeclarer declarer) {
> >     declarer.declare(new Fields("value"));
> >
> >   }
> >
> >   @Override
> >   public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
> >     this.collector = collector;
> >   }
> >
> > }
> > The errors:
> > java.lang.RuntimeException: java.lang.NullPointerException
> >       at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:495) ~[storm-core-1.1.1.jar:1.1.1]
> >       at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:460) ~[storm-core-1.1.1.jar:1.1.1]
> >       at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73) ~[storm-core-1.1.1.jar:1.1.1]
> >       at org.apache.storm.daemon.executor$fn__5030$fn__5043$fn__5096.invoke(executor.clj:848) ~[storm-core-1.1.1.jar:1.1.1]
> >       at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.1.1.jar:1.1.1]
> >       at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
> >       at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
> > Caused by: java.lang.NullPointerException
> >       at org.kie.internal.io.ResourceFactory.newByteArrayResource(ResourceFactory.java:66) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
> >       at org.drools.compiler.kie.builder.impl.AbstractKieModule.getResource(AbstractKieModule.java:299) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
> >       at org.drools.compiler.kie.builder.impl.AbstractKieModule.addResourceToCompiler(AbstractKieModule.java:264) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
> >       at org.drools.compiler.kie.builder.impl.AbstractKieModule.addResourceToCompiler(AbstractKieModule.java:259) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
> >       at org.drools.compiler.kie.builder.impl.AbstractKieProject.buildKnowledgePackages(AbstractKieProject.java:228) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
> >       at org.drools.compiler.kie.builder.impl.AbstractKieModule.createKieBase(AbstractKieModule.java:206) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
> >       at org.drools.compiler.kie.builder.impl.KieContainerImpl.createKieBase(KieContainerImpl.java:584) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
> >       at org.drools.compiler.kie.builder.impl.KieContainerImpl.getKieBase(KieContainerImpl.java:552) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
> >       at org.drools.compiler.kie.builder.impl.KieContainerImpl.newKieSession(KieContainerImpl.java:680) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
> >       at org.drools.compiler.kie.builder.impl.KieContainerImpl.newKieSession(KieContainerImpl.java:648) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
> >       at cn.ennwifi.storm.bolt.DealLostBolt.execute(DealLostBolt.java:52) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
> >       at org.apache.storm.daemon.executor$fn__5030$tuple_action_fn__5032.invoke(executor.clj:729) ~[storm-core-1.1.1.jar:1.1.1]
> >       at org.apache.storm.daemon.executor$mk_task_receiver$fn__4951.invoke(executor.clj:461) ~[storm-core-1.1.1.jar:1.1.1]
> >       at org.apache.storm.disruptor$clojure_handler$reify__4465.onEvent(disruptor.clj:40) ~[storm-core-1.1.1.jar:1.1.1]
> >       at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:482) ~[storm-core-1.1.1.jar:1.1.1]
> >       ... 6 more
> >
> > Could somebody help me?
> >
> > Thanks!
>
>
