storm-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Susheel Kumar Gadalay <skgada...@gmail.com>
Subject Re: Write database basefunction
Date Tue, 09 Dec 2014 11:01:17 GMT
Put these lines in prepare method

        postgresConnector connector = new postgresConnector() ;
        Connection conn = connector.connectToDatabaseOrDie();


On 12/9/14, Sa Li <sa.in.vanc@gmail.com> wrote:
> Hi, Susheel
>
>
> Here is the conn,
>
> public class postgresConnector  {
>         private class Events {
>             public int id;
>             public String event_object;
>         }
>         public void connectIngress() {
>             Connection conn = null;
>             conn = connectToDatabaseOrDie();
>         }
>         public Connection connectToDatabaseOrDie() {
>             Connection conn = null;
>             try {
>                 Class.forName("org.postgresql.Driver");
>                 String url = "jdbc:postgresql://10.100.70.84:5432/ingest";
>                 conn = DriverManager.getConnection(url, "nn", "nn");
>                 System.out.println("DB connected .....");
>             } catch (ClassNotFoundException e) {
>                 e.printStackTrace();
>                 System.exit(1);
>             } catch (SQLException e)
>             {
>                 e.printStackTrace();
>                 System.exit(2);
>             }
>             return conn;
>         }
> }
>
> I change the writeDB as
>
> public static class WriteDB extends BaseFunction {
>         PreparedStatement ps = null ;
>         postgresConnector connector = new postgresConnector() ;
>         Connection conn = connector.connectToDatabaseOrDie();
>         @Override
>         public final void execute(final TridentTuple tuple, final
> TridentCollector collector) {
>             int user = tuple.getInteger(0);
>             String value = tuple.getString(1);
>             final StringBuilder queryBuilder = new StringBuilder()
>                     .append("INSERT INTO test.state(userid, event)
> VALUES(")
>                     .append(user)
>                     .append(", '")
>                     .append(value)
>                     .append("')");
>             try {
>                 ps = conn.prepareStatement(queryBuilder.toString()) ;
>                 ps.execute();
>                 collector.emit(new
> Values(tuple.getStringByField("event")));
>             } catch (SQLException ex) {
>                 System.err.println("Caught IOException: " +
> ex.getMessage());
>             }finally {
>                 if (ps != null) {
>                     try {
>                         ps.close();
>                     } catch (SQLException ex) {
>                     }
>                 }
>             }
>         }
>     }
>
> DB connected, but I got such error, I am sure what is wrong with the
> connection.
>
> DB connected .....
> 3985 [main] ERROR org.apache.storm.zookeeper.server.NIOServerCnxnFactory -
> Thread Thread[main,5,main] died
> java.lang.RuntimeException: java.io.NotSerializableException:
> org.postgresql.jdbc4.Jdbc4Connection
>         at
> backtype.storm.serialization.DefaultSerializationDelegate.serialize(DefaultSerializationDelegate.java:43)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at backtype.storm.utils.Utils.serialize(Utils.java:85)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at
> backtype.storm.topology.TopologyBuilder.createTopology(TopologyBuilder.java:106)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at
> storm.trident.topology.TridentTopologyBuilder.buildTopology(TridentTopologyBuilder.java:246)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at storm.trident.TridentTopology.build(TridentTopology.java:425)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at
> storm.ingress.KafkaIngressTopology.buildTridentKafkaTopology(KafkaIngressTopology.java:292)
> ~[kafka-storm-ingress-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at
> storm.ingress.KafkaIngressTopology.main(KafkaIngressTopology.java:318)
> ~[kafka-storm-ingress-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
> Caused by: java.io.NotSerializableException:
> org.postgresql.jdbc4.Jdbc4Connection
>         at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
> ~[na:1.7.0_65]
>         at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> ~[na:1.7.0_65]
>         at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> ~[na:1.7.0_65]
>         at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> ~[na:1.7.0_65]
>         at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> ~[na:1.7.0_65]
>         at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> ~[na:1.7.0_65]
>         at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> ~[na:1.7.0_65]
>         at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> ~[na:1.7.0_65]
>         at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> ~[na:1.7.0_65]
>         at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> ~[na:1.7.0_65]
>         at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> ~[na:1.7.0_65]
>         at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> ~[na:1.7.0_65]
>         at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> ~[na:1.7.0_65]
>         at
> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
> ~[na:1.7.0_65]
>         at java.util.HashMap.writeObject(HashMap.java:1128) ~[na:1.7.0_65]
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> ~[na:1.7.0_65]
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> ~[na:1.7.0_65]
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[na:1.7.0_65]
>         at java.lang.reflect.Method.invoke(Method.java:606) ~[na:1.7.0_65]
>         at
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
> ~[na:1.7.0_65]
>         at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
> ~[na:1.7.0_65]
>         at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> ~[na:1.7.0_65]
>         at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> ~[na:1.7.0_65]
>         at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> ~[na:1.7.0_65]
>         at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> ~[na:1.7.0_65]
>         at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> ~[na:1.7.0_65]
>         at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> ~[na:1.7.0_65]
>         at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> ~[na:1.7.0_65]
>         at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> ~[na:1.7.0_65]
>         at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> ~[na:1.7.0_65]
>         at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> ~[na:1.7.0_65]
>         at
> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
> ~[na:1.7.0_65]
>         at
> backtype.storm.serialization.DefaultSerializationDelegate.serialize(DefaultSerializationDelegate.java:39)
> ~[storm-core-0.9.3.jar:0.9.3]
>         ... 6 common frames omitted
>
>
> Thanks
>
> Alec
>
>
> On Fri, Dec 5, 2014 at 9:50 PM, Susheel Kumar Gadalay <skgadalay@gmail.com>
> wrote:
>
>> It is giving NPE in execute method of WriteDB execute method.
>>
>> You have declared
>>    private Connection conn = null ;
>>
>> and used it in execute
>>    ps = conn.prepareStatement(queryBuilder.toString()) ;
>>
>> Where is conn initialized?
>>
>> On 12/6/14, Sa Li <sa.in.vanc@gmail.com> wrote:
>> > Hi, all
>> >
>> > I wrote a writeDB function to write tuples into database,
>> >
>> > public static class WriteDB extends BaseFunction {
>> >       private Connection conn = null ;
>> >       PreparedStatement ps = null;
>> >
>> >      @Override
>> >      public final void execute(final TridentTuple tuple, final
>> > TridentCollector collector) {
>> >                       int user = tuple.getInteger(0);
>> >                       String value = tuple.getString(1);
>> >                       final StringBuilder queryBuilder = new
>> > StringBuilder()
>> >                                .append("INSERT INTO test.state(userid,
>> > event) VALUES(")
>> >                                .append(user)
>> >                                .append(", '")
>> >                                .append(value)
>> >                                .append("')");
>> >                      System.out.println(queryBuilder.toString());
>> >                      try {
>> >                               ps =
>> > conn.prepareStatement(queryBuilder.toString()) ;
>> >                               ps.execute();
>> >                               collector.emit(new
>> > Values(tuple.getStringByField("event")));
>> >                      }
>> >                      catch (SQLException ex) {
>> >                               System.err.println("Caught IOException: "
>> > +
>> > ex.getMessage());
>> >                      }      finally {
>> >                                             if (ps != null) {
>> >                                                              try {
>> >
>> > ps.close();
>> >                                                              } catch
>> > (SQLException ex) {
>> >                                                              }
>> >                                             }
>> >                      }
>> >         }
>> >  }
>> >
>> > This is the topology:
>> > topology.newStream("topictestspout", kafkaSpout)
>> >                          .each(new Fields("str"),
>> >                                    new JsonObjectParse(),
>> >                                    new Fields("userid","event"))
>> >                          .each(new Fields("userid","event"),
>> >                                     new WriteDB(),
>> >                                   new Fields("events"));
>> >
>> > However, I am getting such error:
>> > 7026 [Thread-12-b-0] ERROR backtype.storm.util - Async loop died!
>> > java.lang.RuntimeException: java.lang.NullPointerException
>> >         at
>> >
>> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
>> > ~[storm-core-0.9.3.jar:0.9.3]
>> >         at
>> >
>> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
>> > ~[storm-core-0.9.3.jar:0.9.3]
>> >         at
>> >
>> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
>> > ~[storm-core-0.9.3.jar:0.9.3]
>> >         at
>> >
>> backtype.storm.daemon.executor$fn__3441$fn__3453$fn__3500.invoke(executor.clj:748)
>> > ~[storm-core-0.9.3.jar:0.9.3]
>> >         at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463)
>> > ~[storm-core-0.9.3.jar:0.9.3]
>> >         at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>> >         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
>> > Caused by: java.lang.NullPointerException: null
>> >         at
>> >
>> storm.ingress.KafkaIngressTopology$WriteDB.execute(KafkaIngressTopology.java:146)
>> > ~[kafka-storm-ingress-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>> >         at
>> >
>> storm.trident.planner.processor.EachProcessor.execute(EachProcessor.java:65)
>> > ~[storm-core-0.9.3.jar:0.9.3]
>> >         at
>> >
>> storm.trident.planner.processor.AppendCollector.emit(AppendCollector.java:50)
>> > ~[storm-core-0.9.3.jar:0.9.3]
>> >         at
>> >
>> storm.ingress.KafkaIngressTopology$JsonObjectParse.execute(KafkaIngressTopology.java:122)
>> > ~[kafka-storm-ingress-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>> >         at
>> >
>> storm.trident.planner.processor.EachProcessor.execute(EachProcessor.java:65)
>> > ~[storm-core-0.9.3.jar:0.9.3]
>> >         at
>> >
>> storm.trident.planner.SubtopologyBolt$InitialReceiver.receive(SubtopologyBolt.java:206)
>> > ~[storm-core-0.9.3.jar:0.9.3]
>> >         at
>> > storm.trident.planner.SubtopologyBolt.execute(SubtopologyBolt.java:146)
>> > ~[storm-core-0.9.3.jar:0.9.3]
>> >         at
>> >
>> storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:369)
>> > ~[storm-core-0.9.3.jar:0.9.3]
>> >         at
>> >
>> backtype.storm.daemon.executor$fn__3441$tuple_action_fn__3443.invoke(executor.clj:633)
>> > ~[storm-core-0.9.3.jar:0.9.3]
>> >         at
>> >
>> backtype.storm.daemon.executor$mk_task_receiver$fn__3364.invoke(executor.clj:401)
>> > ~[storm-core-0.9.3.jar:0.9.3]
>> >         at
>> >
>> backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)
>> > ~[storm-core-0.9.3.jar:0.9.3]
>> >         at
>> >
>> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
>> > ~[storm-core-0.9.3.jar:0.9.3]
>> >         ... 6 common frames omitted
>> > 7027 [Thread-12-b-0] ERROR backtype.storm.daemon.executor -
>> > java.lang.RuntimeException: java.lang.NullPointerException
>> >         at
>> >
>> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
>> > ~[storm-core-0.9.3.jar:0.9.3]
>> >         at
>> >
>> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
>> > ~[storm-core-0.9.3.jar:0.9.3]
>> >         at
>> >
>> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
>> > ~[storm-core-0.9.3.jar:0.9.3]
>> >         at
>> >
>> backtype.storm.daemon.executor$fn__3441$fn__3453$fn__3500.invoke(executor.clj:748)
>> > ~[storm-core-0.9.3.jar:0.9.3]
>> >         at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463)
>> > ~[storm-core-0.9.3.jar:0.9.3]
>> >         at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>> >         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
>> > Caused by: java.lang.NullPointerException: null
>> >         at
>> >
>> storm.ingress.KafkaIngressTopology$WriteDB.execute(KafkaIngressTopology.java:146)
>> > ~[kafka-storm-ingress-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>> >         at
>> >
>> storm.trident.planner.processor.EachProcessor.execute(EachProcessor.java:65)
>> > ~[storm-core-0.9.3.jar:0.9.3]
>> >         at
>> >
>> storm.trident.planner.processor.AppendCollector.emit(AppendCollector.java:50)
>> > ~[storm-core-0.9.3.jar:0.9.3]
>> >         at
>> >
>> storm.ingress.KafkaIngressTopology$JsonObjectParse.execute(KafkaIngressTopology.java:122)
>> > ~[kafka-storm-ingress-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>> >         at
>> >
>> storm.trident.planner.processor.EachProcessor.execute(EachProcessor.java:65)
>> > ~[storm-core-0.9.3.jar:0.9.3]
>> >         at
>> >
>> storm.trident.planner.SubtopologyBolt$InitialReceiver.receive(SubtopologyBolt.java:206)
>> > ~[storm-core-0.9.3.jar:0.9.3]
>> >         at
>> > storm.trident.planner.SubtopologyBolt.execute(SubtopologyBolt.java:146)
>> > ~[storm-core-0.9.3.jar:0.9.3]
>> >         at
>> >
>> storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:369)
>> > ~[storm-core-0.9.3.jar:0.9.3]
>> >         at
>> >
>> backtype.storm.daemon.executor$fn__3441$tuple_action_fn__3443.invoke(executor.clj:633)
>> > ~[storm-core-0.9.3.jar:0.9.3]
>> >         at
>> >
>> backtype.storm.daemon.executor$mk_task_receiver$fn__3364.invoke(executor.clj:401)
>> > ~[storm-core-0.9.3.jar:0.9.3]
>> >         at
>> >
>> backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)
>> > ~[storm-core-0.9.3.jar:0.9.3]
>> >         at
>> >
>> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
>> > ~[storm-core-0.9.3.jar:0.9.3]
>> >         ... 6 common frames omitted
>> > 7231 [Thread-12-b-0] ERROR backtype.storm.util - Halting process:
>> ("Worker
>> > died")
>> > java.lang.RuntimeException: ("Worker died")
>> >         at
>> > backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325)
>> > [storm-core-0.9.3.jar:0.9.3]
>> >         at clojure.lang.RestFn.invoke(RestFn.java:423)
>> > [clojure-1.5.1.jar:na]
>> >         at
>> > backtype.storm.daemon.worker$fn__3808$fn__3809.invoke(worker.clj:452)
>> > [storm-core-0.9.3.jar:0.9.3]
>> >         at
>> >
>> backtype.storm.daemon.executor$mk_executor_data$fn__3274$fn__3275.invoke(executor.clj:240)
>> > [storm-core-0.9.3.jar:0.9.3]
>> >         at backtype.storm.util$async_loop$fn__464.invoke(util.clj:473)
>> > [storm-core-0.9.3.jar:0.9.3]
>> >         at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>> >         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
>> >
>> > Any idea why that happen?
>> >
>> >
>> > thanks
>> >
>> > Alec
>> >
>>
>

Mime
View raw message