storm-user mailing list archives

From 张博 <jyzhan...@gmail.com>
Subject Re: How to use Drools in the topology
Date Fri, 15 Sep 2017 07:22:25 GMT
I changed the way I package the project and now it runs OK, but I don't know why.
The approach that failed is shown below. It produced the jar
"se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar", which I deployed to production.
<plugin>
  <artifactId>maven-assembly-plugin</artifactId>
  <version>2.4</version>
  <configuration>
    <descriptorRefs>
      <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
    <archive>
      <manifest>
        <addClasspath>true</addClasspath>
        <mainClass>cn.ennwifi.storm.Application</mainClass>
      </manifest>
    </archive>
  </configuration>
  <executions>
    <execution>
      <id>make-assembly</id>
      <phase>package</phase>
      <goals>
        <goal>single</goal>
      </goals>
    </execution>
  </executions>
</plugin>

Today I changed it to:
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-dependency-plugin</artifactId>
  <version>3.0.0</version>
  <executions>
    <execution>
      <id>copy-dependencies</id>
      <phase>package</phase>
      <goals>
        <goal>copy-dependencies</goal>
      </goals>
      <configuration>
        <outputDirectory>target/libs</outputDirectory>
        <includeScope>runtime</includeScope>
      </configuration>
    </execution>
  </executions>
</plugin>

Then I put se-storm-0.0.1-SNAPSHOT.jar into storm/extlib, together with the
jars from target/libs/*.jar. The problem is solved, but why?

I compared the layout of the two jars: the *.xml and *.drl files that
Drools uses are in the same locations in both.
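
Instead of inspecting the jars by hand, the same comparison can be made at runtime by asking the class loader where it finds the Drools resources. The following is only a minimal sketch and is not from this thread: the class name ClasspathResourceCheck and the method locate are made up for illustration, and the two resource names are the ones given in the quoted message below. It relies only on the standard ClassLoader.getResources call, and it does not by itself explain the failure; it just shows which jar or directory supplies each file, and whether more than one does.

```java
import java.io.IOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

// Hypothetical helper (not part of Storm or Drools): lists where the
// class loader finds the resources that Drools needs.
public class ClasspathResourceCheck {

  /** Returns every classpath location that provides the given resource name. */
  static List<URL> locate(ClassLoader loader, String resourceName) throws IOException {
    return Collections.list(loader.getResources(resourceName));
  }

  public static void main(String[] args) throws IOException {
    ClassLoader loader = ClasspathResourceCheck.class.getClassLoader();
    // Resource names taken from the thread; adjust them for another project.
    String[] names = {"META-INF/kmodule.xml", "com/rules/DealLimit.drl"};
    for (String name : names) {
      List<URL> hits = locate(loader, name);
      System.out.println(name + " -> " + hits.size() + " location(s)");
      for (URL url : hits) {
        System.out.println("  " + url);
      }
    }
  }
}
```

Calling locate from the bolt's prepare method and logging the result would show what the worker JVM sees, which may differ from what a local run sees.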

2017-09-15 9:39 GMT+08:00 张博 <jyzhangbo@gmail.com>:

> The Drools version I depend on is 7.2.0.Final. The dependency in pom.xml
> is:
> <dependency>
> <groupId>org.drools</groupId>
> <artifactId>drools-compiler</artifactId>
> <version>7.2.0.Final</version>
> </dependency>
>
> And the bolt is:
> public class DealLimitBolt extends BaseRichBolt {
>
>   private OutputCollector collector;
>
>   private KieSession kieSession;
>
>   public void execute(Tuple input) {
>     // Get the data
>     String sentence = (String) input.getValue(0);
>
>     // Convert the data
>     PutDataPoint dataPoint = Json.fromJson(PutDataPoint.class, sentence);
>
>     // Look up the rule
>     String key = String.valueOf(dataPoint.tags.get("gatewayId"))
>         + String.valueOf(dataPoint.getTags().get("deviceId"))
>         + dataPoint.getMetric() + "0";
>     RuleLimitParam paramObj = (RuleLimitParam) MapService.getObject(key);
>     LOGGER.info(Json.toJson(MapService.getObjects(), JsonFormat.tidy()));
>
>     if (paramObj != null) {
>
>       // The kieSession is used here.
>       LimitFact fact = new LimitFact();
>       fact.setHigh(paramObj.high);
>       fact.setLow(paramObj.low);
>       fact.setOperate(paramObj.operate);
>       fact.setValue(Float.valueOf(dataPoint.value));
>       kieSession.insert(fact);
>       kieSession.fireAllRules();
>
>     }
>
>     collector.ack(input);
>   }
>
>
>   public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
>     this.collector = collector;
>     SpringTools instance = SpringTools.getInstance();
>     ApplicationContext applicationContext = instance.getApplicationContext();
>     methodService = applicationContext.getBean(MethodService.class);
>     DroolsService droolsService = applicationContext.getBean(DroolsService.class);
>     droolsService.getRules();
>
>     // This is the initialization.
>     KieServices ks = KieServices.Factory.get();
>     KieContainer kContainer = ks.getKieClasspathContainer();
>     kieSession = kContainer.newKieSession("all-rule");
>   }
>
>   public void declareOutputFields(OutputFieldsDeclarer declarer) {
>     // No output fields: this bolt does not emit tuples.
>   }
> }
>
> Drools also uses an XML file at resources/META-INF/kmodule.xml:
> <?xml version="1.0" encoding="UTF-8"?>
> <kmodule xmlns="http://www.drools.org/xsd/kmodule">
>     <kbase name="rules" packages="com.rules">
>         <ksession name="all-rule"/>
>     </kbase>
> </kmodule>
>
> I put the rules in resources/com/rules/DealLimit.drl.
>
>
>
> 2017-09-15 0:03 GMT+08:00 Stig Rohde Døssing <srdo@apache.org>:
>
>> There are a lot of differences between local cluster and production. The
>> primary difference is that local clusters run all the code in one JVM,
>> whereas a production cluster runs bolts spread across multiple worker JVMs,
>> and the topology wiring code in a separate JVM on the Nimbus host.
>>
>> There is no initialization in this prepare method:
>> @Override
>>   public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
>>     this.collector = collector;
>>   }
>>
>> I think it's easier to help if you post the code you're using to
>> initialize Drools, as well as which Drools artifact and version you're
>> depending on.
>>
>> 2017-09-14 12:37 GMT+02:00 张博 <jyzhangbo@gmail.com>:
>>
>>> I only use Drools in the bolt, and I initialize it in the prepare method,
>>> so I don't think that is the reason. But it does run in the LocalCluster.
>>> Do you know the difference between the LocalCluster and the production
>>> cluster?
>>>
>>> 2017-09-13 23:27 GMT+08:00 Stig Rohde Døssing <srdo@apache.org>:
>>>
>>>> I'm not familiar with Drools, so I'm just guessing here, but are you
>>>> doing any kind of setup of the KieContainer before submitting your
>>>> topology? When you run your topology the bolt doesn't run in the same JVM
>>>> as the topology setup code, so any setup done via static variables/methods
>>>> won't transfer from the submitter JVM to the bolt JVM.
>>>>
>>>> If you need to run code before starting a worker, you might want to
>>>> look at
>>>> https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/hooks/BaseWorkerHook.java
>>>> and
>>>> https://storm.apache.org/releases/1.0.3/javadocs/org/apache/storm/topology/TopologyBuilder.html#addWorkerHook-org.apache.storm.hooks.IWorkerHook-.
>>>>
>>>> 2017-09-13 15:30 GMT+02:00 zhangwenwei <jerry.zww@icloud.com>:
>>>>
>>>>> According to the log, an NPE occurs when the method
>>>>> kieContainer.newKieSession() is called.
>>>>>
>>>>> Best Regards,
>>>>> Jerry Zhang
>>>>>
>>>>> > On 13 Sep 2017, at 14:15, 张博 <jyzhangbo@gmail.com> wrote:
>>>>> >
>>>>> > Hi!
>>>>> > I want to use Drools in a bolt. It works normally in the
>>>>> > LocalCluster, but when I deploy it to the production cluster it fails
>>>>> > with an error.
>>>>> > The bolt:
>>>>> > public class DealLostBolt extends BaseRichBolt {
>>>>> >
>>>>> >   private static final long serialVersionUID = 1L;
>>>>> >
>>>>> >   private static final Logger LOGGER = LoggerFactory.getLogger("DEAL_LOST_BOLT");
>>>>> >
>>>>> >   private OutputCollector collector;
>>>>> >
>>>>> >   private KieSession kieSession;
>>>>> >
>>>>> >   private FactHandle factHandle;
>>>>> >
>>>>> >   @Override
>>>>> >   public void execute(Tuple input) {
>>>>> >     // Get the data
>>>>> >     String sentence = (String) input.getValue(0);
>>>>> >     LOGGER.info("Data received by DealLostBolt: " + sentence);
>>>>> >
>>>>> >     // Convert the data
>>>>> >     PutDataPoint dataPoint = Json.fromJson(PutDataPoint.class, sentence);
>>>>> >
>>>>> >     KieServices ks = KieServices.Factory.get();
>>>>> >     KieContainer kieContainer = ks.getKieClasspathContainer();
>>>>> >     kieSession = kieContainer.newKieSession("all-rule");
>>>>> >     kieSession.getAgenda().getAgendaGroup("deal-lost").setFocus();
>>>>> >
>>>>> >     factHandle = kieSession.insert(dataPoint);
>>>>> >     kieSession.fireAllRules();
>>>>> >     kieSession.delete(factHandle);
>>>>> >
>>>>> >     collector.emit(new Values(sentence));
>>>>> >   }
>>>>> >
>>>>> >   @Override
>>>>> >   public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>>> >     declarer.declare(new Fields("value"));
>>>>> >
>>>>> >   }
>>>>> >
>>>>> >   @Override
>>>>> >   public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
>>>>> >     this.collector = collector;
>>>>> >   }
>>>>> >
>>>>> > }
>>>>> > The errors:
>>>>> > java.lang.RuntimeException: java.lang.NullPointerException
>>>>> >       at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:495) ~[storm-core-1.1.1.jar:1.1.1]
>>>>> >       at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:460) ~[storm-core-1.1.1.jar:1.1.1]
>>>>> >       at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73) ~[storm-core-1.1.1.jar:1.1.1]
>>>>> >       at org.apache.storm.daemon.executor$fn__5030$fn__5043$fn__5096.invoke(executor.clj:848) ~[storm-core-1.1.1.jar:1.1.1]
>>>>> >       at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.1.1.jar:1.1.1]
>>>>> >       at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>>>>> >       at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
>>>>> > Caused by: java.lang.NullPointerException
>>>>> >       at org.kie.internal.io.ResourceFactory.newByteArrayResource(ResourceFactory.java:66) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>>>> >       at org.drools.compiler.kie.builder.impl.AbstractKieModule.getResource(AbstractKieModule.java:299) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>>>> >       at org.drools.compiler.kie.builder.impl.AbstractKieModule.addResourceToCompiler(AbstractKieModule.java:264) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>>>> >       at org.drools.compiler.kie.builder.impl.AbstractKieModule.addResourceToCompiler(AbstractKieModule.java:259) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>>>> >       at org.drools.compiler.kie.builder.impl.AbstractKieProject.buildKnowledgePackages(AbstractKieProject.java:228) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>>>> >       at org.drools.compiler.kie.builder.impl.AbstractKieModule.createKieBase(AbstractKieModule.java:206) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>>>> >       at org.drools.compiler.kie.builder.impl.KieContainerImpl.createKieBase(KieContainerImpl.java:584) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>>>> >       at org.drools.compiler.kie.builder.impl.KieContainerImpl.getKieBase(KieContainerImpl.java:552) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>>>> >       at org.drools.compiler.kie.builder.impl.KieContainerImpl.newKieSession(KieContainerImpl.java:680) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>>>> >       at org.drools.compiler.kie.builder.impl.KieContainerImpl.newKieSession(KieContainerImpl.java:648) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>>>> >       at cn.ennwifi.storm.bolt.DealLostBolt.execute(DealLostBolt.java:52) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>>>> >       at org.apache.storm.daemon.executor$fn__5030$tuple_action_fn__5032.invoke(executor.clj:729) ~[storm-core-1.1.1.jar:1.1.1]
>>>>> >       at org.apache.storm.daemon.executor$mk_task_receiver$fn__4951.invoke(executor.clj:461) ~[storm-core-1.1.1.jar:1.1.1]
>>>>> >       at org.apache.storm.disruptor$clojure_handler$reify__4465.onEvent(disruptor.clj:40) ~[storm-core-1.1.1.jar:1.1.1]
>>>>> >       at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:482) ~[storm-core-1.1.1.jar:1.1.1]
>>>>> >       ... 6 more
>>>>> >
>>>>> > Could somebody help me?
>>>>> >
>>>>> > Thanks!
>>>>>
>>>>>
>>>>
>>>
>>
>
