storm-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Sa Li <sa.in.v...@gmail.com>
Subject Write database basefunction
Date Fri, 05 Dec 2014 21:29:58 GMT
Hi, all

I wrote a writeDB function to write tuples into database,

public static class WriteDB extends BaseFunction {
      private Connection conn = null ;
      PreparedStatement ps = null;

     @Override
     public final void execute(final TridentTuple tuple, final
TridentCollector collector) {
                      int user = tuple.getInteger(0);
                      String value = tuple.getString(1);
                      final StringBuilder queryBuilder = new StringBuilder()
                               .append("INSERT INTO test.state(userid,
event) VALUES(")
                               .append(user)
                               .append(", '")
                               .append(value)
                               .append("')");
                     System.out.println(queryBuilder.toString());
                     try {
                              ps =
conn.prepareStatement(queryBuilder.toString()) ;
                              ps.execute();
                              collector.emit(new
Values(tuple.getStringByField("event")));
                     }
                     catch (SQLException ex) {
                              System.err.println("Caught IOException: " +
ex.getMessage());
                     }      finally {
                                            if (ps != null) {
                                                             try {

ps.close();
                                                             } catch
(SQLException ex) {
                                                             }
                                            }
                     }
        }
 }

This is the topology:
topology.newStream("topictestspout", kafkaSpout)
                         .each(new Fields("str"),
                                   new JsonObjectParse(),
                                   new Fields("userid","event"))
                         .each(new Fields("userid","event"),
                                    new WriteDB(),
                                  new Fields("events"));

However, I am getting such error:
7026 [Thread-12-b-0] ERROR backtype.storm.util - Async loop died!
java.lang.RuntimeException: java.lang.NullPointerException
        at
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
~[storm-core-0.9.3.jar:0.9.3]
        at
backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
~[storm-core-0.9.3.jar:0.9.3]
        at
backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
~[storm-core-0.9.3.jar:0.9.3]
        at
backtype.storm.daemon.executor$fn__3441$fn__3453$fn__3500.invoke(executor.clj:748)
~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463)
~[storm-core-0.9.3.jar:0.9.3]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
Caused by: java.lang.NullPointerException: null
        at
storm.ingress.KafkaIngressTopology$WriteDB.execute(KafkaIngressTopology.java:146)
~[kafka-storm-ingress-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
        at
storm.trident.planner.processor.EachProcessor.execute(EachProcessor.java:65)
~[storm-core-0.9.3.jar:0.9.3]
        at
storm.trident.planner.processor.AppendCollector.emit(AppendCollector.java:50)
~[storm-core-0.9.3.jar:0.9.3]
        at
storm.ingress.KafkaIngressTopology$JsonObjectParse.execute(KafkaIngressTopology.java:122)
~[kafka-storm-ingress-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
        at
storm.trident.planner.processor.EachProcessor.execute(EachProcessor.java:65)
~[storm-core-0.9.3.jar:0.9.3]
        at
storm.trident.planner.SubtopologyBolt$InitialReceiver.receive(SubtopologyBolt.java:206)
~[storm-core-0.9.3.jar:0.9.3]
        at
storm.trident.planner.SubtopologyBolt.execute(SubtopologyBolt.java:146)
~[storm-core-0.9.3.jar:0.9.3]
        at
storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:369)
~[storm-core-0.9.3.jar:0.9.3]
        at
backtype.storm.daemon.executor$fn__3441$tuple_action_fn__3443.invoke(executor.clj:633)
~[storm-core-0.9.3.jar:0.9.3]
        at
backtype.storm.daemon.executor$mk_task_receiver$fn__3364.invoke(executor.clj:401)
~[storm-core-0.9.3.jar:0.9.3]
        at
backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)
~[storm-core-0.9.3.jar:0.9.3]
        at
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
~[storm-core-0.9.3.jar:0.9.3]
        ... 6 common frames omitted
7027 [Thread-12-b-0] ERROR backtype.storm.daemon.executor -
java.lang.RuntimeException: java.lang.NullPointerException
        at
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
~[storm-core-0.9.3.jar:0.9.3]
        at
backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
~[storm-core-0.9.3.jar:0.9.3]
        at
backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
~[storm-core-0.9.3.jar:0.9.3]
        at
backtype.storm.daemon.executor$fn__3441$fn__3453$fn__3500.invoke(executor.clj:748)
~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463)
~[storm-core-0.9.3.jar:0.9.3]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
Caused by: java.lang.NullPointerException: null
        at
storm.ingress.KafkaIngressTopology$WriteDB.execute(KafkaIngressTopology.java:146)
~[kafka-storm-ingress-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
        at
storm.trident.planner.processor.EachProcessor.execute(EachProcessor.java:65)
~[storm-core-0.9.3.jar:0.9.3]
        at
storm.trident.planner.processor.AppendCollector.emit(AppendCollector.java:50)
~[storm-core-0.9.3.jar:0.9.3]
        at
storm.ingress.KafkaIngressTopology$JsonObjectParse.execute(KafkaIngressTopology.java:122)
~[kafka-storm-ingress-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
        at
storm.trident.planner.processor.EachProcessor.execute(EachProcessor.java:65)
~[storm-core-0.9.3.jar:0.9.3]
        at
storm.trident.planner.SubtopologyBolt$InitialReceiver.receive(SubtopologyBolt.java:206)
~[storm-core-0.9.3.jar:0.9.3]
        at
storm.trident.planner.SubtopologyBolt.execute(SubtopologyBolt.java:146)
~[storm-core-0.9.3.jar:0.9.3]
        at
storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:369)
~[storm-core-0.9.3.jar:0.9.3]
        at
backtype.storm.daemon.executor$fn__3441$tuple_action_fn__3443.invoke(executor.clj:633)
~[storm-core-0.9.3.jar:0.9.3]
        at
backtype.storm.daemon.executor$mk_task_receiver$fn__3364.invoke(executor.clj:401)
~[storm-core-0.9.3.jar:0.9.3]
        at
backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)
~[storm-core-0.9.3.jar:0.9.3]
        at
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
~[storm-core-0.9.3.jar:0.9.3]
        ... 6 common frames omitted
7231 [Thread-12-b-0] ERROR backtype.storm.util - Halting process: ("Worker
died")
java.lang.RuntimeException: ("Worker died")
        at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325)
[storm-core-0.9.3.jar:0.9.3]
        at clojure.lang.RestFn.invoke(RestFn.java:423)
[clojure-1.5.1.jar:na]
        at
backtype.storm.daemon.worker$fn__3808$fn__3809.invoke(worker.clj:452)
[storm-core-0.9.3.jar:0.9.3]
        at
backtype.storm.daemon.executor$mk_executor_data$fn__3274$fn__3275.invoke(executor.clj:240)
[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.util$async_loop$fn__464.invoke(util.clj:473)
[storm-core-0.9.3.jar:0.9.3]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]

Any idea why that happen?


thanks

Alec

Mime
View raw message