storm-user mailing list archives

From 张博 <jyzhan...@gmail.com>
Subject Re: How to use Drools in the topology
Date Fri, 15 Sep 2017 01:39:55 GMT
The Drools version I depend on is 7.2.0.Final. The dependency in pom.xml is:
<dependency>
  <groupId>org.drools</groupId>
  <artifactId>drools-compiler</artifactId>
  <version>7.2.0.Final</version>
</dependency>

And the bolt is:
public class DealLimitBolt extends BaseRichBolt {

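  private static final Logger LOGGER = LoggerFactory.getLogger(DealLimitBolt.class);

  private OutputCollector collector;

  private MethodService methodService;

  private KieSession kieSession;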

  public void execute(Tuple input) {
    // Get the data
    String sentence = (String) input.getValue(0);

    // Convert the data
    PutDataPoint dataPoint = Json.fromJson(PutDataPoint.class, sentence);

    // Get the rule
    String key = String.valueOf(dataPoint.tags.get("gatewayId"))
        + String.valueOf(dataPoint.getTags().get("deviceId"))
        + dataPoint.getMetric() + "0";
    RuleLimitParam paramObj = (RuleLimitParam) MapService.getObject(key);
    LOGGER.info(Json.toJson(MapService.getObjects(), JsonFormat.tidy()));

    if (paramObj != null) {

      // The KieSession is used here
      LimitFact fact = new LimitFact();
      fact.setHigh(paramObj.high);
      fact.setLow(paramObj.low);
      fact.setOperate(paramObj.operate);
      fact.setValue(Float.valueOf(dataPoint.value));
      kieSession.insert(fact);
      kieSession.fireAllRules();

    }

    collector.ack(input);
  }


  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
    SpringTools instance = SpringTools.getInstance();
    ApplicationContext applicationContext = instance.getApplicationContext();
    methodService = applicationContext.getBean(MethodService.class);
    DroolsService droolsService = applicationContext.getBean(DroolsService.class);
    droolsService.getRules();

    // Initialize Drools: create the KieSession once per bolt instance.
    KieServices ks = KieServices.Factory.get();
    KieContainer kContainer = ks.getKieClasspathContainer();
    kieSession = kContainer.newKieSession("all-rule");
  }

  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // This bolt emits no tuples, so there are no output fields to declare.
  }
}

Drools also uses an XML file at resources/META-INF/kmodule.xml:
<?xml version="1.0" encoding="UTF-8"?>
<kmodule xmlns="http://www.drools.org/xsd/kmodule">
    <kbase name="rules" packages="com.rules">
        <ksession name="all-rule"/>
    </kbase>
</kmodule>

The rules are in resources/com/rules/DealLimit.drl.
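
Since getKieClasspathContainer() depends on finding both of these files on the classpath, one thing that may be worth checking is whether they are actually visible from inside the packaged jar. The following is only a minimal sketch using standard JDK calls; the class name ClasspathCheck and the method locate are hypothetical and not part of the topology above, and a positive result would not by itself explain the NPE.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

// Hypothetical diagnostic helper, not part of the original topology.
public class ClasspathCheck {

  /** Returns every classpath location where the named resource is visible. */
  public static List<URL> locate(String resourceName) {
    ClassLoader cl = Thread.currentThread().getContextClassLoader();
    if (cl == null) {
      cl = ClasspathCheck.class.getClassLoader();
    }
    try {
      return Collections.list(cl.getResources(resourceName));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    // Resource names taken from the message above.
    String[] names = {"META-INF/kmodule.xml", "com/rules/DealLimit.drl"};
    for (String name : names) {
      List<URL> found = locate(name);
      System.out.println(name + " -> " + (found.isEmpty() ? "NOT FOUND" : found));
    }
  }
}
```

Running the main method with the jar-with-dependencies on the classpath, or calling locate(...) from prepare() and logging the result, would show whether the worker JVM sees the same resources as the LocalCluster run.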



2017-09-15 0:03 GMT+08:00 Stig Rohde Døssing <srdo@apache.org>:

> There are a lot of differences between local cluster and production. The
> primary difference is that local clusters run all the code in one JVM,
> whereas a production cluster runs bolts spread across multiple worker JVMs,
> and the topology wiring code in a separate JVM on the Nimbus host.
>
> There is no initialization in this prepare method:
> @Override
>   public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
>     this.collector = collector;
>   }
>
> I think it's easier to help if you post the code you're using to
> initialize Drools, as well as which Drools artifact and version you're
> depending on.
>
> 2017-09-14 12:37 GMT+02:00 张博 <jyzhangbo@gmail.com>:
>
>> I only use Drools in the bolt, and I initialize it in the prepare method,
>> so I don't think that is the reason. It does run in the LocalCluster,
>> though. Do you know the difference between the LocalCluster and the
>> production cluster?
>>
>> 2017-09-13 23:27 GMT+08:00 Stig Rohde Døssing <srdo@apache.org>:
>>
>>> I'm not familiar with Drools, so I'm just guessing here, but are you
>>> doing any kind of setup of the KieContainer before submitting your
>>> topology? When you run your topology the bolt doesn't run in the same JVM
>>> as the topology setup code, so any setup done via static variables/methods
>>> won't transfer from the submitter JVM to the bolt JVM.
>>>
>>> If you need to run code before starting a worker, you might want to look
>>> at
>>> https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/hooks/BaseWorkerHook.java
>>> and
>>> https://storm.apache.org/releases/1.0.3/javadocs/org/apache/storm/topology/TopologyBuilder.html#addWorkerHook-org.apache.storm.hooks.IWorkerHook-.
>>>
>>> 2017-09-13 15:30 GMT+02:00 zhangwenwei <jerry.zww@icloud.com>:
>>>
>>>> According to the log, an NPE occurs when the method
>>>> kieContainer.newKieSession() is called.
>>>>
>>>> Best Regards,
>>>> Jerry Zhang
>>>>
>>>> > On 13 Sep 2017, at 14:15, 张博 <jyzhangbo@gmail.com> wrote:
>>>> >
>>>> > Hi!
>>>> > I want to use Drools in a bolt. It works normally in the LocalCluster,
>>>> > but when I deploy it to the production cluster it fails with an error.
>>>> > The bolt:
>>>> > public class DealLostBolt extends BaseRichBolt {
>>>> >
>>>> >   private static final long serialVersionUID = 1L;
>>>> >
>>>> >   private static final Logger LOGGER = LoggerFactory.getLogger("DEAL_LOST_BOLT");
>>>> >
>>>> >   private OutputCollector collector;
>>>> >
>>>> >   private KieSession kieSession;
>>>> >
>>>> >   private FactHandle factHandle;
>>>> >
>>>> >   @Override
>>>> >   public void execute(Tuple input) {
>>>> >     // Get the data
>>>> >     String sentence = (String) input.getValue(0);
>>>> >     LOGGER.info("DealLostBolt received data: " + sentence);
>>>> >
>>>> >     // Convert the data
>>>> >     PutDataPoint dataPoint = Json.fromJson(PutDataPoint.class, sentence);
>>>> >
>>>> >     KieServices ks = KieServices.Factory.get();
>>>> >     KieContainer kieContainer = ks.getKieClasspathContainer();
>>>> >     kieSession = kieContainer.newKieSession("all-rule");
>>>> >     kieSession.getAgenda().getAgendaGroup("deal-lost").setFocus();
>>>> >
>>>> >     factHandle = kieSession.insert(dataPoint);
>>>> >     kieSession.fireAllRules();
>>>> >     kieSession.delete(factHandle);
>>>> >
>>>> >     collector.emit(new Values(sentence));
>>>> >   }
>>>> >
>>>> >   @Override
>>>> >   public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>> >     declarer.declare(new Fields("value"));
>>>> >
>>>> >   }
>>>> >
>>>> >   @Override
>>>> >   public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
>>>> >     this.collector = collector;
>>>> >   }
>>>> >
>>>> > }
>>>> > The errors:
>>>> > java.lang.RuntimeException: java.lang.NullPointerException
>>>> >       at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:495) ~[storm-core-1.1.1.jar:1.1.1]
>>>> >       at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:460) ~[storm-core-1.1.1.jar:1.1.1]
>>>> >       at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73) ~[storm-core-1.1.1.jar:1.1.1]
>>>> >       at org.apache.storm.daemon.executor$fn__5030$fn__5043$fn__5096.invoke(executor.clj:848) ~[storm-core-1.1.1.jar:1.1.1]
>>>> >       at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.1.1.jar:1.1.1]
>>>> >       at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>>>> >       at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
>>>> > Caused by: java.lang.NullPointerException
>>>> >       at org.kie.internal.io.ResourceFactory.newByteArrayResource(ResourceFactory.java:66) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>>> >       at org.drools.compiler.kie.builder.impl.AbstractKieModule.getResource(AbstractKieModule.java:299) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>>> >       at org.drools.compiler.kie.builder.impl.AbstractKieModule.addResourceToCompiler(AbstractKieModule.java:264) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>>> >       at org.drools.compiler.kie.builder.impl.AbstractKieModule.addResourceToCompiler(AbstractKieModule.java:259) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>>> >       at org.drools.compiler.kie.builder.impl.AbstractKieProject.buildKnowledgePackages(AbstractKieProject.java:228) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>>> >       at org.drools.compiler.kie.builder.impl.AbstractKieModule.createKieBase(AbstractKieModule.java:206) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>>> >       at org.drools.compiler.kie.builder.impl.KieContainerImpl.createKieBase(KieContainerImpl.java:584) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>>> >       at org.drools.compiler.kie.builder.impl.KieContainerImpl.getKieBase(KieContainerImpl.java:552) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>>> >       at org.drools.compiler.kie.builder.impl.KieContainerImpl.newKieSession(KieContainerImpl.java:680) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>>> >       at org.drools.compiler.kie.builder.impl.KieContainerImpl.newKieSession(KieContainerImpl.java:648) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>>> >       at cn.ennwifi.storm.bolt.DealLostBolt.execute(DealLostBolt.java:52) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>>> >       at org.apache.storm.daemon.executor$fn__5030$tuple_action_fn__5032.invoke(executor.clj:729) ~[storm-core-1.1.1.jar:1.1.1]
>>>> >       at org.apache.storm.daemon.executor$mk_task_receiver$fn__4951.invoke(executor.clj:461) ~[storm-core-1.1.1.jar:1.1.1]
>>>> >       at org.apache.storm.disruptor$clojure_handler$reify__4465.onEvent(disruptor.clj:40) ~[storm-core-1.1.1.jar:1.1.1]
>>>> >       at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:482) ~[storm-core-1.1.1.jar:1.1.1]
>>>> >       ... 6 more
>>>> >
>>>> > Could somebody help me?
>>>> >
>>>> > Thanks!
>>>>
>>>>
>>>
>>
>
