storm-user mailing list archives

From 张博 <jyzhan...@gmail.com>
Subject Re: How to use Drools in the topology
Date Thu, 14 Sep 2017 10:37:41 GMT
I only use Drools inside the bolt, and I initialize it in the prepare
method, so I don't think that is the cause. It does run in the LocalCluster,
though. Do you know the difference between the LocalCluster and the
production cluster?
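
For reference on the point about prepare: Storm Java-serializes the bolt instance when the topology is built and deserializes it inside the worker, which on a production cluster is a separate JVM. The sketch below uses only the JDK to imitate that hop. SketchBolt, Engine and roundTrip are hypothetical names made up for illustration, not Storm or Drools classes. It shows that transient state set up before the hop is gone afterwards, and that a prepare-style call on the receiving side rebuilds it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class PrepareLifecycleSketch {

    /** Hypothetical stand-in for a non-serializable engine such as a rules session. */
    static class Engine {
        String evaluate(String fact) {
            return "evaluated:" + fact;
        }
    }

    /** Hypothetical stand-in for a bolt: serializable, with its engine kept transient. */
    static class SketchBolt implements Serializable {
        private static final long serialVersionUID = 1L;

        // transient: this field is never written into the serialized form
        private transient Engine engine;

        /** Plays the role of prepare(): builds the engine on whichever side calls it. */
        void prepare() {
            engine = new Engine();
        }

        boolean isPrepared() {
            return engine != null;
        }

        String execute(String input) {
            return engine.evaluate(input);
        }
    }

    /** Serializes and deserializes a value, imitating the hop from submitter to worker. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        SketchBolt submitted = new SketchBolt();
        submitted.prepare(); // setup done on the "submitter" side

        SketchBolt onWorker = roundTrip(submitted);
        System.out.println("prepared after round trip: " + onWorker.isPrepared());

        onWorker.prepare(); // setup repeated on the "worker" side
        System.out.println(onWorker.execute("point-1"));
    }
}
```

The same reasoning is why engine setup in a bolt usually belongs in prepare rather than in the constructor or in code that runs before submission.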

2017-09-13 23:27 GMT+08:00 Stig Rohde Døssing <srdo@apache.org>:

> I'm not familiar with Drools, so I'm just guessing here, but are you doing
> any kind of setup of the KieContainer before submitting your topology? When
> you run your topology the bolt doesn't run in the same JVM as the topology
> setup code, so any setup done via static variables/methods won't transfer
> from the submitter JVM to the bolt JVM.
>
> If you need to run code before starting a worker, you might want to look at
> https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/hooks/BaseWorkerHook.java
> and
> https://storm.apache.org/releases/1.0.3/javadocs/org/apache/storm/topology/TopologyBuilder.html#addWorkerHook-org.apache.storm.hooks.IWorkerHook-.
>
> 2017-09-13 15:30 GMT+02:00 zhangwenwei <jerry.zww@icloud.com>:
>
>> According to the log, an NPE occurs when kieContainer.newKieSession() is
>> called.
>>
>> Best Regards,
>> Jerry Zhang
>>
>> > On 13 Sep 2017, at 14:15, 张博 <jyzhangbo@gmail.com> wrote:
>> >
>> > Hi!
>> > I want to use Drools in a bolt. It works normally in the LocalCluster,
>> > but when I deploy it to the production cluster, it throws an error.
>> > The bolt:
>> > public class DealLostBolt extends BaseRichBolt {
>> >
>> >   private static final long serialVersionUID = 1L;
>> >
>> >   private static final Logger LOGGER = LoggerFactory.getLogger("DEAL_LOST_BOLT");
>> >
>> >   private OutputCollector collector;
>> >
>> >   private KieSession kieSession;
>> >
>> >   private FactHandle factHandle;
>> >
>> >   @Override
>> >   public void execute(Tuple input) {
>> >     // Get the data
>> >     String sentence = (String) input.getValue(0);
>> >     LOGGER.info("Data received by DealLostBolt: " + sentence);
>> >
>> >     // Convert the data
>> >     PutDataPoint dataPoint = Json.fromJson(PutDataPoint.class, sentence);
>> >
>> >     KieServices ks = KieServices.Factory.get();
>> >     KieContainer kieContainer = ks.getKieClasspathContainer();
>> >     kieSession = kieContainer.newKieSession("all-rule");
>> >     kieSession.getAgenda().getAgendaGroup("deal-lost").setFocus();
>> >
>> >     factHandle = kieSession.insert(dataPoint);
>> >     kieSession.fireAllRules();
>> >     kieSession.delete(factHandle);
>> >
>> >     collector.emit(new Values(sentence));
>> >   }
>> >
>> >   @Override
>> >   public void declareOutputFields(OutputFieldsDeclarer declarer) {
>> >     declarer.declare(new Fields("value"));
>> >
>> >   }
>> >
>> >   @Override
>> >   public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
>> >     this.collector = collector;
>> >   }
>> >
>> > }
>> > The error:
>> > java.lang.RuntimeException: java.lang.NullPointerException
>> >       at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:495) ~[storm-core-1.1.1.jar:1.1.1]
>> >       at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:460) ~[storm-core-1.1.1.jar:1.1.1]
>> >       at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73) ~[storm-core-1.1.1.jar:1.1.1]
>> >       at org.apache.storm.daemon.executor$fn__5030$fn__5043$fn__5096.invoke(executor.clj:848) ~[storm-core-1.1.1.jar:1.1.1]
>> >       at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.1.1.jar:1.1.1]
>> >       at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>> >       at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
>> > Caused by: java.lang.NullPointerException
>> >       at org.kie.internal.io.ResourceFactory.newByteArrayResource(ResourceFactory.java:66) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>> >       at org.drools.compiler.kie.builder.impl.AbstractKieModule.getResource(AbstractKieModule.java:299) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>> >       at org.drools.compiler.kie.builder.impl.AbstractKieModule.addResourceToCompiler(AbstractKieModule.java:264) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>> >       at org.drools.compiler.kie.builder.impl.AbstractKieModule.addResourceToCompiler(AbstractKieModule.java:259) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>> >       at org.drools.compiler.kie.builder.impl.AbstractKieProject.buildKnowledgePackages(AbstractKieProject.java:228) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>> >       at org.drools.compiler.kie.builder.impl.AbstractKieModule.createKieBase(AbstractKieModule.java:206) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>> >       at org.drools.compiler.kie.builder.impl.KieContainerImpl.createKieBase(KieContainerImpl.java:584) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>> >       at org.drools.compiler.kie.builder.impl.KieContainerImpl.getKieBase(KieContainerImpl.java:552) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>> >       at org.drools.compiler.kie.builder.impl.KieContainerImpl.newKieSession(KieContainerImpl.java:680) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>> >       at org.drools.compiler.kie.builder.impl.KieContainerImpl.newKieSession(KieContainerImpl.java:648) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>> >       at cn.ennwifi.storm.bolt.DealLostBolt.execute(DealLostBolt.java:52) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>> >       at org.apache.storm.daemon.executor$fn__5030$tuple_action_fn__5032.invoke(executor.clj:729) ~[storm-core-1.1.1.jar:1.1.1]
>> >       at org.apache.storm.daemon.executor$mk_task_receiver$fn__4951.invoke(executor.clj:461) ~[storm-core-1.1.1.jar:1.1.1]
>> >       at org.apache.storm.disruptor$clojure_handler$reify__4465.onEvent(disruptor.clj:40) ~[storm-core-1.1.1.jar:1.1.1]
>> >       at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:482) ~[storm-core-1.1.1.jar:1.1.1]
>> >       ... 6 more
>> >
>> > Could somebody help me?
>> >
>> > Thanks!
>>
>>
>
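
In the quoted trace, the NullPointerException is raised in ResourceFactory.newByteArrayResource while KieContainerImpl builds the KieBase, and the Drools classes are loaded from the jar-with-dependencies file. One thing that may differ between the LocalCluster and a worker is which classpath resources are actually visible. This is only a guess at where to look, not a diagnosis. The sketch below is a hypothetical JDK-only helper (ClasspathResourceCheck and locate are made-up names) that lists every classpath location providing a given resource. META-INF/kmodule.xml is used as the default because that is where Drools keeps its module descriptor.

```java
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ClasspathResourceCheck {

    /** Returns every classpath location that provides the named resource. */
    static List<URL> locate(String resourceName) throws IOException {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            // Fall back to the loader of this class if no context loader is set
            loader = ClasspathResourceCheck.class.getClassLoader();
        }
        return new ArrayList<>(Collections.list(loader.getResources(resourceName)));
    }

    public static void main(String[] args) throws IOException {
        // Assumption: the resource of interest is the Drools module descriptor
        String name = args.length > 0 ? args[0] : "META-INF/kmodule.xml";
        List<URL> hits = locate(name);
        System.out.println(name + " found " + hits.size() + " time(s)");
        for (URL hit : hits) {
            System.out.println("  " + hit);
        }
    }
}
```

Calling locate from the bolt's prepare method and logging the result in both environments would show whether the descriptor and the rule files are visible in each case.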
