Put these lines in the prepare method instead of declaring them as field initializers (the Connection field is not serializable, and the function object is serialized when the topology is built): postgresConnector connector = new postgresConnector() ; Connection conn = connector.connectToDatabaseOrDie(); On 12/9/14, Sa Li wrote: > Hi, Susheel > > > Here is the connector class: > > public class postgresConnector { > private class Events { > public int id; > public String event_object; > } > public void connectIngress() { > Connection conn = null; > conn = connectToDatabaseOrDie(); > } > public Connection connectToDatabaseOrDie() { > Connection conn = null; > try { > Class.forName("org.postgresql.Driver"); > String url = "jdbc:postgresql://10.100.70.84:5432/ingest"; > conn = DriverManager.getConnection(url, "nn", "nn"); > System.out.println("DB connected ....."); > } catch (ClassNotFoundException e) { > e.printStackTrace(); > System.exit(1); > } catch (SQLException e) > { > e.printStackTrace(); > System.exit(2); > } > return conn; > } > } > > I changed WriteDB as follows: > > public static class WriteDB extends BaseFunction { > PreparedStatement ps = null ; > postgresConnector connector = new postgresConnector() ; > Connection conn = connector.connectToDatabaseOrDie(); > @Override > public final void execute(final TridentTuple tuple, final > TridentCollector collector) { > int user = tuple.getInteger(0); > String value = tuple.getString(1); > final StringBuilder queryBuilder = new StringBuilder() > .append("INSERT INTO test.state(userid, event) > VALUES(") > .append(user) > .append(", '") > .append(value) > .append("')"); > try { > ps = conn.prepareStatement(queryBuilder.toString()) ; > ps.execute(); > collector.emit(new > Values(tuple.getStringByField("event"))); > } catch (SQLException ex) { > System.err.println("Caught IOException: " + > ex.getMessage()); > }finally { > if (ps != null) { > try { > ps.close(); > } catch (SQLException ex) { > } > } > } > } > } > > The DB connected, but I got the error below; I am not sure what is wrong with the > connection. > > DB connected ..... 
> 3985 [main] ERROR org.apache.storm.zookeeper.server.NIOServerCnxnFactory - > Thread Thread[main,5,main] died > java.lang.RuntimeException: java.io.NotSerializableException: > org.postgresql.jdbc4.Jdbc4Connection > at > backtype.storm.serialization.DefaultSerializationDelegate.serialize(DefaultSerializationDelegate.java:43) > ~[storm-core-0.9.3.jar:0.9.3] > at backtype.storm.utils.Utils.serialize(Utils.java:85) > ~[storm-core-0.9.3.jar:0.9.3] > at > backtype.storm.topology.TopologyBuilder.createTopology(TopologyBuilder.java:106) > ~[storm-core-0.9.3.jar:0.9.3] > at > storm.trident.topology.TridentTopologyBuilder.buildTopology(TridentTopologyBuilder.java:246) > ~[storm-core-0.9.3.jar:0.9.3] > at storm.trident.TridentTopology.build(TridentTopology.java:425) > ~[storm-core-0.9.3.jar:0.9.3] > at > storm.ingress.KafkaIngressTopology.buildTridentKafkaTopology(KafkaIngressTopology.java:292) > ~[kafka-storm-ingress-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] > at > storm.ingress.KafkaIngressTopology.main(KafkaIngressTopology.java:318) > ~[kafka-storm-ingress-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] > Caused by: java.io.NotSerializableException: > org.postgresql.jdbc4.Jdbc4Connection > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) > ~[na:1.7.0_65] > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > ~[na:1.7.0_65] > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > ~[na:1.7.0_65] > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > ~[na:1.7.0_65] > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > ~[na:1.7.0_65] > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > ~[na:1.7.0_65] > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > ~[na:1.7.0_65] > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > ~[na:1.7.0_65] > at > 
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > ~[na:1.7.0_65] > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > ~[na:1.7.0_65] > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > ~[na:1.7.0_65] > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > ~[na:1.7.0_65] > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > ~[na:1.7.0_65] > at > java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > ~[na:1.7.0_65] > at java.util.HashMap.writeObject(HashMap.java:1128) ~[na:1.7.0_65] > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > ~[na:1.7.0_65] > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > ~[na:1.7.0_65] > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > ~[na:1.7.0_65] > at java.lang.reflect.Method.invoke(Method.java:606) ~[na:1.7.0_65] > at > java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) > ~[na:1.7.0_65] > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495) > ~[na:1.7.0_65] > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > ~[na:1.7.0_65] > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > ~[na:1.7.0_65] > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > ~[na:1.7.0_65] > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > ~[na:1.7.0_65] > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > ~[na:1.7.0_65] > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > ~[na:1.7.0_65] > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > ~[na:1.7.0_65] > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > ~[na:1.7.0_65] > at > 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > ~[na:1.7.0_65] > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > ~[na:1.7.0_65] > at > java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > ~[na:1.7.0_65] > at > backtype.storm.serialization.DefaultSerializationDelegate.serialize(DefaultSerializationDelegate.java:39) > ~[storm-core-0.9.3.jar:0.9.3] > ... 6 common frames omitted > > > Thanks > > Alec > > > On Fri, Dec 5, 2014 at 9:50 PM, Susheel Kumar Gadalay > wrote: > >> It is throwing an NPE in the execute method of WriteDB. >> >> You have declared >> private Connection conn = null ; >> >> and used it in execute >> ps = conn.prepareStatement(queryBuilder.toString()) ; >> >> Where is conn initialized? >> >> On 12/6/14, Sa Li wrote: >> > Hi, all >> > >> > I wrote a WriteDB function to write tuples into the database: >> > >> > public static class WriteDB extends BaseFunction { >> > private Connection conn = null ; >> > PreparedStatement ps = null; >> > >> > @Override >> > public final void execute(final TridentTuple tuple, final >> > TridentCollector collector) { >> > int user = tuple.getInteger(0); >> > String value = tuple.getString(1); >> > final StringBuilder queryBuilder = new >> > StringBuilder() >> > .append("INSERT INTO test.state(userid, >> > event) VALUES(") >> > .append(user) >> > .append(", '") >> > .append(value) >> > .append("')"); >> > System.out.println(queryBuilder.toString()); >> > try { >> > ps = >> > conn.prepareStatement(queryBuilder.toString()) ; >> > ps.execute(); >> > collector.emit(new >> > Values(tuple.getStringByField("event"))); >> > } >> > catch (SQLException ex) { >> > System.err.println("Caught IOException: " >> > + >> > ex.getMessage()); >> > } finally { >> > if (ps != null) { >> > try { >> > >> > ps.close(); >> > } catch >> > (SQLException ex) { >> > } >> > } >> > } >> > } >> > } >> > >> > This is the topology: >> > topology.newStream("topictestspout", 
kafkaSpout) >> > .each(new Fields("str"), >> > new JsonObjectParse(), >> > new Fields("userid","event")) >> > .each(new Fields("userid","event"), >> > new WriteDB(), >> > new Fields("events")); >> > >> > However, I am getting such error: >> > 7026 [Thread-12-b-0] ERROR backtype.storm.util - Async loop died! >> > java.lang.RuntimeException: java.lang.NullPointerException >> > at >> > >> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) >> > ~[storm-core-0.9.3.jar:0.9.3] >> > at >> > >> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) >> > ~[storm-core-0.9.3.jar:0.9.3] >> > at >> > >> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) >> > ~[storm-core-0.9.3.jar:0.9.3] >> > at >> > >> backtype.storm.daemon.executor$fn__3441$fn__3453$fn__3500.invoke(executor.clj:748) >> > ~[storm-core-0.9.3.jar:0.9.3] >> > at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) >> > ~[storm-core-0.9.3.jar:0.9.3] >> > at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na] >> > at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65] >> > Caused by: java.lang.NullPointerException: null >> > at >> > >> storm.ingress.KafkaIngressTopology$WriteDB.execute(KafkaIngressTopology.java:146) >> > ~[kafka-storm-ingress-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> > at >> > >> storm.trident.planner.processor.EachProcessor.execute(EachProcessor.java:65) >> > ~[storm-core-0.9.3.jar:0.9.3] >> > at >> > >> storm.trident.planner.processor.AppendCollector.emit(AppendCollector.java:50) >> > ~[storm-core-0.9.3.jar:0.9.3] >> > at >> > >> storm.ingress.KafkaIngressTopology$JsonObjectParse.execute(KafkaIngressTopology.java:122) >> > ~[kafka-storm-ingress-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> > at >> > >> storm.trident.planner.processor.EachProcessor.execute(EachProcessor.java:65) >> > ~[storm-core-0.9.3.jar:0.9.3] >> > at >> > >> 
storm.trident.planner.SubtopologyBolt$InitialReceiver.receive(SubtopologyBolt.java:206) >> > ~[storm-core-0.9.3.jar:0.9.3] >> > at >> > storm.trident.planner.SubtopologyBolt.execute(SubtopologyBolt.java:146) >> > ~[storm-core-0.9.3.jar:0.9.3] >> > at >> > >> storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:369) >> > ~[storm-core-0.9.3.jar:0.9.3] >> > at >> > >> backtype.storm.daemon.executor$fn__3441$tuple_action_fn__3443.invoke(executor.clj:633) >> > ~[storm-core-0.9.3.jar:0.9.3] >> > at >> > >> backtype.storm.daemon.executor$mk_task_receiver$fn__3364.invoke(executor.clj:401) >> > ~[storm-core-0.9.3.jar:0.9.3] >> > at >> > >> backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58) >> > ~[storm-core-0.9.3.jar:0.9.3] >> > at >> > >> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) >> > ~[storm-core-0.9.3.jar:0.9.3] >> > ... 6 common frames omitted >> > 7027 [Thread-12-b-0] ERROR backtype.storm.daemon.executor - >> > java.lang.RuntimeException: java.lang.NullPointerException >> > at >> > >> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) >> > ~[storm-core-0.9.3.jar:0.9.3] >> > at >> > >> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) >> > ~[storm-core-0.9.3.jar:0.9.3] >> > at >> > >> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) >> > ~[storm-core-0.9.3.jar:0.9.3] >> > at >> > >> backtype.storm.daemon.executor$fn__3441$fn__3453$fn__3500.invoke(executor.clj:748) >> > ~[storm-core-0.9.3.jar:0.9.3] >> > at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) >> > ~[storm-core-0.9.3.jar:0.9.3] >> > at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na] >> > at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65] >> > Caused by: java.lang.NullPointerException: null >> > at >> > >> storm.ingress.KafkaIngressTopology$WriteDB.execute(KafkaIngressTopology.java:146) >> > 
~[kafka-storm-ingress-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> > at >> > >> storm.trident.planner.processor.EachProcessor.execute(EachProcessor.java:65) >> > ~[storm-core-0.9.3.jar:0.9.3] >> > at >> > >> storm.trident.planner.processor.AppendCollector.emit(AppendCollector.java:50) >> > ~[storm-core-0.9.3.jar:0.9.3] >> > at >> > >> storm.ingress.KafkaIngressTopology$JsonObjectParse.execute(KafkaIngressTopology.java:122) >> > ~[kafka-storm-ingress-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> > at >> > >> storm.trident.planner.processor.EachProcessor.execute(EachProcessor.java:65) >> > ~[storm-core-0.9.3.jar:0.9.3] >> > at >> > >> storm.trident.planner.SubtopologyBolt$InitialReceiver.receive(SubtopologyBolt.java:206) >> > ~[storm-core-0.9.3.jar:0.9.3] >> > at >> > storm.trident.planner.SubtopologyBolt.execute(SubtopologyBolt.java:146) >> > ~[storm-core-0.9.3.jar:0.9.3] >> > at >> > >> storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:369) >> > ~[storm-core-0.9.3.jar:0.9.3] >> > at >> > >> backtype.storm.daemon.executor$fn__3441$tuple_action_fn__3443.invoke(executor.clj:633) >> > ~[storm-core-0.9.3.jar:0.9.3] >> > at >> > >> backtype.storm.daemon.executor$mk_task_receiver$fn__3364.invoke(executor.clj:401) >> > ~[storm-core-0.9.3.jar:0.9.3] >> > at >> > >> backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58) >> > ~[storm-core-0.9.3.jar:0.9.3] >> > at >> > >> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) >> > ~[storm-core-0.9.3.jar:0.9.3] >> > ... 
6 common frames omitted >> > 7231 [Thread-12-b-0] ERROR backtype.storm.util - Halting process: >> ("Worker >> > died") >> > java.lang.RuntimeException: ("Worker died") >> > at >> > backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) >> > [storm-core-0.9.3.jar:0.9.3] >> > at clojure.lang.RestFn.invoke(RestFn.java:423) >> > [clojure-1.5.1.jar:na] >> > at >> > backtype.storm.daemon.worker$fn__3808$fn__3809.invoke(worker.clj:452) >> > [storm-core-0.9.3.jar:0.9.3] >> > at >> > >> backtype.storm.daemon.executor$mk_executor_data$fn__3274$fn__3275.invoke(executor.clj:240) >> > [storm-core-0.9.3.jar:0.9.3] >> > at backtype.storm.util$async_loop$fn__464.invoke(util.clj:473) >> > [storm-core-0.9.3.jar:0.9.3] >> > at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na] >> > at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65] >> > >> > Any idea why that happens? >> > >> > >> > thanks >> > >> > Alec >> > >> >