storm-user mailing list archives

From "P. Taylor Goetz" <ptgo...@gmail.com>
Subject Re: WorkerHook deserialization problem
Date Fri, 28 Oct 2016 20:15:05 GMT
I was able to verify this to be a bug in how worker hooks work in local mode.

In trying to see if this affects distributed mode as well, I found a more serious issue that
prevents workers from shutting down gracefully (and thus prevents shutdown hooks from running):

https://issues.apache.org/jira/browse/STORM-2176

So for the time being I don’t believe worker shutdown hooks work in either local or distributed
mode. I can confirm the start portion of worker hooks functions properly, but not shutdown.
Hopefully we will be able to fix both these issues in an upcoming release.

-Taylor


> On Oct 21, 2016, at 9:58 AM, Kevin Peek <kpeek@salesforce.com> wrote:
> 
> I am running into problems with WorkerHooks on a local cluster. Even using only a BaseWorkerHook,
> I get an Exception. When I run the following code, an EOFException is thrown - it seems the
> Worker is trying to deserialize an empty byte[] for one of the WorkerHooks. Comment out the
> line adding the hook and this runs fine.
> 
> Can someone help me understand what is going wrong here, and whether this is strictly
> an issue with the LocalCluster and how I am using it?
> 
> 
> TopologyBuilder builder = new TopologyBuilder();
> builder.setSpout("spoutId", new RandomNumberSpout());
> builder.addWorkerHook(new BaseWorkerHook());
> StormTopology topology = builder.createTopology();
> Config config = new Config();
> config.setMessageTimeoutSecs(1);
> String topologyName = "dummy-topology";
> 
> LocalCluster cluster = new LocalCluster();
> cluster.submitTopology(topologyName, config, topology);
> Thread.sleep(5000);
> cluster.killTopology(topologyName);
> Thread.sleep(10000);
> cluster.shutdown();
> 
> 
> Produces:
> 
> 
> java.lang.RuntimeException: java.io.EOFException
> 
> 	at org.apache.storm.utils.Utils.javaDeserialize(Utils.java:185)
> 	at org.apache.storm.daemon.worker$run_worker_shutdown_hooks$iter__8540__8544$fn__8545.invoke(worker.clj:576)
> 	at clojure.lang.LazySeq.sval(LazySeq.java:40)
> 	at clojure.lang.LazySeq.seq(LazySeq.java:49)
> 	at clojure.lang.RT.seq(RT.java:507)
> 	at clojure.core$seq__4128.invoke(core.clj:137)
> 	at clojure.core$dorun.invoke(core.clj:3009)
> 	at clojure.core$doall.invoke(core.clj:3025)
> 	at org.apache.storm.daemon.worker$run_worker_shutdown_hooks.invoke(worker.clj:574)
> 	at org.apache.storm.daemon.worker$fn__8555$exec_fn__2466__auto__$reify__8557$shutdown_STAR___8577.invoke(worker.clj:691)
> 	at org.apache.storm.daemon.worker$fn__8555$exec_fn__2466__auto__$reify$reify__8603.shutdown(worker.clj:704)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:497)
> 	at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93)
> 	at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313)
> 	at org.apache.storm.process_simulator$kill_process.invoke(process_simulator.clj:46)
> 	at org.apache.storm.daemon.supervisor$shutdown_worker.invoke(supervisor.clj:286)
> 	at org.apache.storm.daemon.supervisor$fn__9307$exec_fn__2466__auto__$reify__9332.shutdown_all_workers(supervisor.clj:852)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:497)
> 	at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93)
> 	at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313)
> 	at org.apache.storm.testing$kill_local_storm_cluster.invoke(testing.clj:199)
> 	at org.apache.storm.LocalCluster$_shutdown.invoke(LocalCluster.clj:66)
> 	at org.apache.storm.LocalCluster.shutdown(Unknown Source)
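
The EOFException at the top of the trace is consistent with plain Java deserialization being handed an empty byte[], as Kevin suspected. The following is a minimal sketch of that behavior using only java.io; it does not call any Storm code, and the class and method names (EmptyDeserializeDemo, serialize, failsWithEof) are hypothetical names chosen for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

// Hypothetical demo class; not part of Storm.
public class EmptyDeserializeDemo {

    // Serialize an object with standard Java serialization.
    static byte[] serialize(Object obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        }
        return bos.toByteArray();
    }

    // Returns true if deserializing the given bytes fails with EOFException.
    static boolean failsWithEof(byte[] bytes) throws IOException, ClassNotFoundException {
        // The ObjectInputStream constructor reads the stream header, so an
        // empty input fails before readObject() is ever reached.
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            ois.readObject();
            return false;
        } catch (EOFException e) {
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(failsWithEof(new byte[0]));       // empty payload
        System.out.println(failsWithEof(serialize("hook"))); // valid payload
    }
}
```

If the sketch matches what happens in the worker, the hook bytes are already empty by the time the shutdown path tries to read them, so the problem would lie in how the serialized hooks reach the worker rather than in BaseWorkerHook itself.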

