storm-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Kevin Peek <kp...@salesforce.com>
Subject WorkerHook deserialization problem
Date Fri, 21 Oct 2016 13:58:08 GMT
I am running into problems with WorkerHooks on a local cluster. Even using
only a BaseWorkerHook, I get an Exception. When I run the following code,
an EOFException is thrown - it seems the Worker is trying to deserialize an
empty byte[] for one of the WorkerHooks. Comment out the line adding the
hook and this runs fine.

Can someone help me understand what is going wrong here and whether or not
this is strictly an issue with the LocalCluster and how I am using it.


TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spoutId", new RandomNumberSpout());
builder.addWorkerHook(new BaseWorkerHook());
StormTopology topology = builder.createTopology();
Config config = new Config();
config.setMessageTimeoutSecs(1);
String topologyName = "dummy-topology";

LocalCluster cluster = new LocalCluster();
cluster.submitTopology(topologyName, config, topology);
Thread.sleep(5000);
cluster.killTopology(topologyName);
Thread.sleep(10000);
cluster.shutdown();


Produces:


java.lang.RuntimeException: java.io.EOFException

at org.apache.storm.utils.Utils.javaDeserialize(Utils.java:185)
at
org.apache.storm.daemon.worker$run_worker_shutdown_hooks$iter__8540__8544$fn__8545.invoke(worker.clj:576)
at clojure.lang.LazySeq.sval(LazySeq.java:40)
at clojure.lang.LazySeq.seq(LazySeq.java:49)
at clojure.lang.RT.seq(RT.java:507)
at clojure.core$seq__4128.invoke(core.clj:137)
at clojure.core$dorun.invoke(core.clj:3009)
at clojure.core$doall.invoke(core.clj:3025)
at
org.apache.storm.daemon.worker$run_worker_shutdown_hooks.invoke(worker.clj:574)
at
org.apache.storm.daemon.worker$fn__8555$exec_fn__2466__auto__$reify__8557$shutdown_STAR___8577.invoke(worker.clj:691)
at
org.apache.storm.daemon.worker$fn__8555$exec_fn__2466__auto__$reify$reify__8603.shutdown(worker.clj:704)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93)
at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313)
at
org.apache.storm.process_simulator$kill_process.invoke(process_simulator.clj:46)
at
org.apache.storm.daemon.supervisor$shutdown_worker.invoke(supervisor.clj:286)
at
org.apache.storm.daemon.supervisor$fn__9307$exec_fn__2466__auto__$reify__9332.shutdown_all_workers(supervisor.clj:852)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93)
at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313)
at org.apache.storm.testing$kill_local_storm_cluster.invoke(testing.clj:199)
at org.apache.storm.LocalCluster$_shutdown.invoke(LocalCluster.clj:66)
at org.apache.storm.LocalCluster.shutdown(Unknown Source)

Mime
View raw message