storm-user mailing list archives

From Stig Rohde Døssing <s...@apache.org>
Subject Re: How to use Drools in the topology
Date Thu, 14 Sep 2017 16:03:38 GMT
There are a lot of differences between a local cluster and a production
cluster. The primary difference is that a local cluster runs all the code in
one JVM, whereas a production cluster runs bolts spread across multiple worker
JVMs, and runs the topology wiring code in a separate submitter JVM (often on
the Nimbus host).

There is no initialization in this prepare method:
  @Override
  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
  }

I think it's easier to help if you post the code you're using to initialize
Drools, as well as which Drools artifact and version you're depending on.
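
In case it helps in the meantime, here is a minimal sketch of why setup
belongs in prepare rather than in the submitting code. It uses plain Java
serialization as a stand-in for shipping a bolt to a worker. PrepareSketch,
SketchBolt, Engine and ship are hypothetical names made up for illustration,
not Storm or Drools classes, and this may well not be what causes your NPE.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class PrepareSketch {

    /** Hypothetical stand-in for a non-serializable engine such as a rule session. */
    static class Engine {
        String fire(String fact) {
            return "fired:" + fact;
        }
    }

    /** Hypothetical bolt-like component; not a real Storm class. */
    static class SketchBolt implements Serializable {
        private static final long serialVersionUID = 1L;

        // transient: skipped by Java serialization, so it is null after shipping.
        transient Engine engine;

        // Plays the role of prepare(): meant to run in the JVM that executes tuples.
        void prepare() {
            engine = new Engine();
        }

        String execute(String fact) {
            return engine.fire(fact);
        }
    }

    /** Serializes and deserializes the component, imitating a trip to a worker JVM. */
    static SketchBolt ship(SketchBolt bolt) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(bolt);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SketchBolt) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        SketchBolt submitted = new SketchBolt();
        submitted.prepare();                          // setup done on the "submitter" side
        SketchBolt onWorker = ship(submitted);
        System.out.println(onWorker.engine == null);  // true: the setup did not survive
        onWorker.prepare();                           // setup redone on the "worker" side
        System.out.println(onWorker.execute("dataPoint")); // fired:dataPoint
    }
}
```

The engine built before shipping is gone on the other side, and only the
second prepare call makes execute usable again. A real bolt would build its
KieSession in prepare in the same way.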

2017-09-14 12:37 GMT+02:00 张博 <jyzhangbo@gmail.com>:

> I only use Drools in the bolt, and I initialize it in the prepare method, so
> I don't think that is the reason. It does run in the LocalCluster, though. Do
> you know the difference between the LocalCluster and the production cluster?
>
> 2017-09-13 23:27 GMT+08:00 Stig Rohde Døssing <srdo@apache.org>:
>
>> I'm not familiar with Drools, so I'm just guessing here, but are you
>> doing any kind of setup of the KieContainer before submitting your
>> topology? When you run your topology the bolt doesn't run in the same JVM
>> as the topology setup code, so any setup done via static variables/methods
>> won't transfer from the submitter JVM to the bolt JVM.
>>
>> If you need to run code before starting a worker, you might want to look at
>> https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/hooks/BaseWorkerHook.java
>> and
>> https://storm.apache.org/releases/1.0.3/javadocs/org/apache/storm/topology/TopologyBuilder.html#addWorkerHook-org.apache.storm.hooks.IWorkerHook-.
>>
>> 2017-09-13 15:30 GMT+02:00 zhangwenwei <jerry.zww@icloud.com>:
>>
>>> According to the log, an NPE occurs when
>>> kieContainer.newKieSession() is called.
>>>
>>> Best Regards,
>>> Jerry Zhang
>>>
>>> > On 13 Sep 2017, at 14:15, 张博 <jyzhangbo@gmail.com> wrote:
>>> >
>>> > Hi!
>>> > I want to use Drools in a bolt. It works normally in the LocalCluster,
>>> > but when I deploy it to the production cluster it fails with an error.
>>> > The bolt:
>>> > public class DealLostBolt extends BaseRichBolt {
>>> >
>>> >   private static final long serialVersionUID = 1L;
>>> >
>>> >   private static final Logger LOGGER = LoggerFactory.getLogger("DEAL_LOST_BOLT");
>>> >
>>> >   private OutputCollector collector;
>>> >
>>> >   private KieSession kieSession;
>>> >
>>> >   private FactHandle factHandle;
>>> >
>>> >   @Override
>>> >   public void execute(Tuple input) {
>>> >     // Get the data
>>> >     String sentence = (String) input.getValue(0);
>>> >     LOGGER.info("Data received by DealLostBolt: " + sentence);
>>> >
>>> >     // Convert the data
>>> >     PutDataPoint dataPoint = Json.fromJson(PutDataPoint.class, sentence);
>>> >
>>> >     KieServices ks = KieServices.Factory.get();
>>> >     KieContainer kieContainer = ks.getKieClasspathContainer();
>>> >     kieSession = kieContainer.newKieSession("all-rule");
>>> >     kieSession.getAgenda().getAgendaGroup("deal-lost").setFocus();
>>> >
>>> >     factHandle = kieSession.insert(dataPoint);
>>> >     kieSession.fireAllRules();
>>> >     kieSession.delete(factHandle);
>>> >
>>> >     collector.emit(new Values(sentence));
>>> >   }
>>> >
>>> >   @Override
>>> >   public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>> >     declarer.declare(new Fields("value"));
>>> >
>>> >   }
>>> >
>>> >   @Override
>>> >   public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
>>> >     this.collector = collector;
>>> >   }
>>> >
>>> > }
>>> > The errors:
>>> > java.lang.RuntimeException: java.lang.NullPointerException
>>> >       at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:495) ~[storm-core-1.1.1.jar:1.1.1]
>>> >       at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:460) ~[storm-core-1.1.1.jar:1.1.1]
>>> >       at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73) ~[storm-core-1.1.1.jar:1.1.1]
>>> >       at org.apache.storm.daemon.executor$fn__5030$fn__5043$fn__5096.invoke(executor.clj:848) ~[storm-core-1.1.1.jar:1.1.1]
>>> >       at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.1.1.jar:1.1.1]
>>> >       at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>>> >       at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
>>> > Caused by: java.lang.NullPointerException
>>> >       at org.kie.internal.io.ResourceFactory.newByteArrayResource(ResourceFactory.java:66) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>> >       at org.drools.compiler.kie.builder.impl.AbstractKieModule.getResource(AbstractKieModule.java:299) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>> >       at org.drools.compiler.kie.builder.impl.AbstractKieModule.addResourceToCompiler(AbstractKieModule.java:264) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>> >       at org.drools.compiler.kie.builder.impl.AbstractKieModule.addResourceToCompiler(AbstractKieModule.java:259) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>> >       at org.drools.compiler.kie.builder.impl.AbstractKieProject.buildKnowledgePackages(AbstractKieProject.java:228) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>> >       at org.drools.compiler.kie.builder.impl.AbstractKieModule.createKieBase(AbstractKieModule.java:206) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>> >       at org.drools.compiler.kie.builder.impl.KieContainerImpl.createKieBase(KieContainerImpl.java:584) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>> >       at org.drools.compiler.kie.builder.impl.KieContainerImpl.getKieBase(KieContainerImpl.java:552) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>> >       at org.drools.compiler.kie.builder.impl.KieContainerImpl.newKieSession(KieContainerImpl.java:680) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>> >       at org.drools.compiler.kie.builder.impl.KieContainerImpl.newKieSession(KieContainerImpl.java:648) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>> >       at cn.ennwifi.storm.bolt.DealLostBolt.execute(DealLostBolt.java:52) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>> >       at org.apache.storm.daemon.executor$fn__5030$tuple_action_fn__5032.invoke(executor.clj:729) ~[storm-core-1.1.1.jar:1.1.1]
>>> >       at org.apache.storm.daemon.executor$mk_task_receiver$fn__4951.invoke(executor.clj:461) ~[storm-core-1.1.1.jar:1.1.1]
>>> >       at org.apache.storm.disruptor$clojure_handler$reify__4465.onEvent(disruptor.clj:40) ~[storm-core-1.1.1.jar:1.1.1]
>>> >       at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:482) ~[storm-core-1.1.1.jar:1.1.1]
>>> >       ... 6 more
>>> >
>>> > Could somebody help me?
>>> >
>>> > Thanks!
>>>
>>>
>>
>
