storm-user mailing list archives

From Susheel Kumar Gadalay <skgada...@gmail.com>
Subject Re: Write database basefunction
Date Sat, 06 Dec 2014 05:50:28 GMT
It is throwing an NPE in the execute method of WriteDB.

You have declared
   private Connection conn = null ;

and used it in execute
   ps = conn.prepareStatement(queryBuilder.toString()) ;

Where is conn initialized?
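
One common way to handle this in Trident is to open the connection in the function's prepare() method, which runs on the worker after the function has been deserialized, and to close it in cleanup(). The sketch below mirrors that lifecycle with plain JDBC so it compiles without Storm on the classpath. In the real topology the same bodies would go into overrides of BaseFunction's prepare(Map conf, TridentOperationContext context) and cleanup(). The class name WriteDbSketch and the JDBC URL passed to it are assumptions for illustration, not part of the original code.

```java
import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Hypothetical stand-in for WriteDB: same lifecycle, no Storm dependency.
class WriteDbSketch implements Serializable {
    // Placeholders instead of string concatenation, so values are bound safely.
    static final String INSERT_SQL =
            "INSERT INTO test.state(userid, event) VALUES (?, ?)";

    private final String jdbcUrl;       // assumed to be supplied by the topology
    private transient Connection conn;  // not serializable, so opened on the worker

    WriteDbSketch(String jdbcUrl) {
        this.jdbcUrl = jdbcUrl;
    }

    // In Trident this would be prepare(Map conf, TridentOperationContext context).
    void prepare() throws SQLException {
        conn = DriverManager.getConnection(jdbcUrl);
    }

    // In Trident this would be execute(TridentTuple tuple, TridentCollector collector).
    void execute(int user, String event) throws SQLException {
        if (conn == null) {
            // Fail with a clear message instead of a bare NullPointerException.
            throw new IllegalStateException("prepare() was not called");
        }
        try (PreparedStatement ps = conn.prepareStatement(INSERT_SQL)) {
            ps.setInt(1, user);
            ps.setString(2, event);
            ps.executeUpdate();
        }
    }

    // In Trident this would be cleanup().
    void cleanup() {
        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException ex) {
                System.err.println("Failed to close connection: " + ex.getMessage());
            }
            conn = null;
        }
    }
}
```

The connection field is marked transient because the function object is serialized when the topology is submitted, and a live java.sql.Connection cannot travel with it. Opening it in the constructor would therefore not help on the worker side.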

On 12/6/14, Sa Li <sa.in.vanc@gmail.com> wrote:
> Hi, all
>
> I wrote a writeDB function to write tuples into database,
>
> public static class WriteDB extends BaseFunction {
>     private Connection conn = null ;
>     PreparedStatement ps = null;
>
>     @Override
>     public final void execute(final TridentTuple tuple, final TridentCollector collector) {
>         int user = tuple.getInteger(0);
>         String value = tuple.getString(1);
>         final StringBuilder queryBuilder = new StringBuilder()
>                 .append("INSERT INTO test.state(userid, event) VALUES(")
>                 .append(user)
>                 .append(", '")
>                 .append(value)
>                 .append("')");
>         System.out.println(queryBuilder.toString());
>         try {
>             ps = conn.prepareStatement(queryBuilder.toString()) ;
>             ps.execute();
>             collector.emit(new Values(tuple.getStringByField("event")));
>         } catch (SQLException ex) {
>             System.err.println("Caught SQLException: " + ex.getMessage());
>         } finally {
>             if (ps != null) {
>                 try {
>                     ps.close();
>                 } catch (SQLException ex) {
>                 }
>             }
>         }
>     }
> }
>
> This is the topology:
> topology.newStream("topictestspout", kafkaSpout)
>         .each(new Fields("str"),
>               new JsonObjectParse(),
>               new Fields("userid", "event"))
>         .each(new Fields("userid", "event"),
>               new WriteDB(),
>               new Fields("events"));
>
> However, I am getting this error:
> 7026 [Thread-12-b-0] ERROR backtype.storm.util - Async loop died!
> java.lang.RuntimeException: java.lang.NullPointerException
>         at
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at
> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at
> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at
> backtype.storm.daemon.executor$fn__3441$fn__3453$fn__3500.invoke(executor.clj:748)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
> Caused by: java.lang.NullPointerException: null
>         at
> storm.ingress.KafkaIngressTopology$WriteDB.execute(KafkaIngressTopology.java:146)
> ~[kafka-storm-ingress-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at
> storm.trident.planner.processor.EachProcessor.execute(EachProcessor.java:65)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at
> storm.trident.planner.processor.AppendCollector.emit(AppendCollector.java:50)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at
> storm.ingress.KafkaIngressTopology$JsonObjectParse.execute(KafkaIngressTopology.java:122)
> ~[kafka-storm-ingress-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at
> storm.trident.planner.processor.EachProcessor.execute(EachProcessor.java:65)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at
> storm.trident.planner.SubtopologyBolt$InitialReceiver.receive(SubtopologyBolt.java:206)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at
> storm.trident.planner.SubtopologyBolt.execute(SubtopologyBolt.java:146)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at
> storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:369)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at
> backtype.storm.daemon.executor$fn__3441$tuple_action_fn__3443.invoke(executor.clj:633)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at
> backtype.storm.daemon.executor$mk_task_receiver$fn__3364.invoke(executor.clj:401)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at
> backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
> ~[storm-core-0.9.3.jar:0.9.3]
>         ... 6 common frames omitted
> 7027 [Thread-12-b-0] ERROR backtype.storm.daemon.executor -
> java.lang.RuntimeException: java.lang.NullPointerException
>         at
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at
> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at
> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at
> backtype.storm.daemon.executor$fn__3441$fn__3453$fn__3500.invoke(executor.clj:748)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
> Caused by: java.lang.NullPointerException: null
>         at
> storm.ingress.KafkaIngressTopology$WriteDB.execute(KafkaIngressTopology.java:146)
> ~[kafka-storm-ingress-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at
> storm.trident.planner.processor.EachProcessor.execute(EachProcessor.java:65)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at
> storm.trident.planner.processor.AppendCollector.emit(AppendCollector.java:50)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at
> storm.ingress.KafkaIngressTopology$JsonObjectParse.execute(KafkaIngressTopology.java:122)
> ~[kafka-storm-ingress-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at
> storm.trident.planner.processor.EachProcessor.execute(EachProcessor.java:65)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at
> storm.trident.planner.SubtopologyBolt$InitialReceiver.receive(SubtopologyBolt.java:206)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at
> storm.trident.planner.SubtopologyBolt.execute(SubtopologyBolt.java:146)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at
> storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:369)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at
> backtype.storm.daemon.executor$fn__3441$tuple_action_fn__3443.invoke(executor.clj:633)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at
> backtype.storm.daemon.executor$mk_task_receiver$fn__3364.invoke(executor.clj:401)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at
> backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
> ~[storm-core-0.9.3.jar:0.9.3]
>         ... 6 common frames omitted
> 7231 [Thread-12-b-0] ERROR backtype.storm.util - Halting process: ("Worker
> died")
> java.lang.RuntimeException: ("Worker died")
>         at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325)
> [storm-core-0.9.3.jar:0.9.3]
>         at clojure.lang.RestFn.invoke(RestFn.java:423)
> [clojure-1.5.1.jar:na]
>         at
> backtype.storm.daemon.worker$fn__3808$fn__3809.invoke(worker.clj:452)
> [storm-core-0.9.3.jar:0.9.3]
>         at
> backtype.storm.daemon.executor$mk_executor_data$fn__3274$fn__3275.invoke(executor.clj:240)
> [storm-core-0.9.3.jar:0.9.3]
>         at backtype.storm.util$async_loop$fn__464.invoke(util.clj:473)
> [storm-core-0.9.3.jar:0.9.3]
>         at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
>
> Any idea why that happens?
>
>
> thanks
>
> Alec
>
