storm-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Michel Hummel <hummel.mic...@gmail.com>
Subject Re: STORM-2979 : WorkerHooks EOFException during run_worker_shutdown_hooks
Date Fri, 23 Mar 2018 21:49:33 GMT
After some digging I finally found a workaround (a pull request is
pending to fix the issue in Storm).
Using a simple trick in the start method of the worker hook (rewinding
the ByteBuffer to allow the deserialization on stop), it works as
expected. Here is a simple example in case someone encounters the same
issue:

package myTest.stormWorkerHook;

import java.nio.ByteBuffer;
import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.hooks.BaseWorkerHook;
import org.apache.storm.task.WorkerTopologyContext;
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.operation.builtin.Count;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.trident.testing.MemoryMapState;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;


public class App {

    static class myHook extends BaseWorkerHook {
        // store the size of the serialization of this class
        int mySize = 0;

        public int getMySize() {
            return mySize;
        }

        public void setMySize(int mySize) {
            this.mySize = mySize;
        }

        /**
         *
         */
        private static final long serialVersionUID = 1L;

         public void start(Map stormConf, WorkerTopologyContext context) {
               System.out.println("### START HOOK");
               ByteBuffer bf =
context.getRawTopology().get_worker_hooks().get(0);
               // getting the byte data for deserialization has move
the position
               // to the end of the buffer
               int pos = bf.position();
               // rewind the position of the buffer to allow
               // the deserialization which will occur on STOp
               bf.position(pos-mySize);

            }

            /**
             * This method is called right before a worker shuts down
             */
            @Override
            public void shutdown() {
                   System.out.println("### STOP HOOK");
            }

    }



    /**
     * Trident function that reads the string in field 0 of the incoming tuple
     * (the "sentence") and emits one single-value tuple per space-separated word.
     */
    public static class Split extends BaseFunction {
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            final String[] words = tuple.getString(0).split(" ");
            for (int i = 0; i < words.length; i++) {
                collector.emit(new Values(words[i]));
            }
        }
    }

    /**
     * Builds a simple word-count Trident topology and registers a serialized
     * {@code myHook} worker hook on it.
     *
     * <p>The hook is serialized twice: once to learn the length of its serialized
     * form, and again after that length has been stored inside the hook, so the
     * deserialized hook knows how far to rewind its buffer.
     *
     * @return the built topology with the worker hook attached
     * @throws IllegalStateException if the serialized length changes between the
     *         two serialization passes (the stored size would then be wrong)
     */
    public static StormTopology buildTridentTopology() {
        // Spout cycles endlessly (setCycle(true)) through the sentences below.
        @SuppressWarnings("unchecked")
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
                new Values("the cow jumped over the moon"),
                new Values("the man went to the store and bought some candy"),
                new Values("four score and seven years ago"),
                new Values("how many apples can you eat"),
                new Values("to be or not to be the person"));
        spout.setCycle(true);

        // TridentTopology exposes the interface for constructing Trident computations.
        TridentTopology topology = new TridentTopology();

        // The resulting state is not needed here, so the return value is not kept.
        topology.newStream("spout1", spout)
                .parallelismHint(16)
                .each(new Fields("sentence"), new Split(), new Fields("word"))
                .groupBy(new Fields("word"))
                .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
                .parallelismHint(16);

        StormTopology topo = topology.build();

        myHook mh = new myHook();
        // Serialize once just to learn the size of the byte[] result.
        byte[] rawdata = Utils.javaSerialize(mh);
        // Store the size inside the hook; it is used to work around the
        // ByteBuffer position issue when the hook starts.
        mh.setMySize(rawdata.length);
        // Serialize again, now with the size as an attribute.
        rawdata = Utils.javaSerialize(mh);
        // The workaround only holds if setting the size did not change the length.
        if (rawdata.length != mh.getMySize()) {
            throw new IllegalStateException("Serialized hook size changed from "
                    + mh.getMySize() + " to " + rawdata.length);
        }

        topo.add_to_worker_hooks(ByteBuffer.wrap(rawdata));

        return topo;
    }




    /**
     * Runs the word-count topology on an in-process {@link LocalCluster}: submits
     * it, waits 10 seconds, kills it, waits another 10 seconds, then shuts the
     * cluster down.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if submitting or killing the topology fails, or the
     *         thread is interrupted while sleeping
     */
    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.setMaxSpoutPending(20);

        LocalCluster cluster = new LocalCluster();
        try {
            cluster.submitTopology("wordCounter", conf, buildTridentTopology());
            Thread.sleep(10000);
            cluster.killTopology("wordCounter");
            // NOTE(review): presumably leaves time for the kill (and the worker
            // shutdown hooks) to be processed before the cluster goes away.
            Thread.sleep(10000);
        } finally {
            // Always release the local cluster, even if something above failed.
            cluster.shutdown();
        }
    }




}

2018-03-05 9:38 GMT+01:00 Robin Perice <robin.perice@thales-services.fr>:
> Hi everybody,
>
> I'm trying to use the interface IWorkerHook with LocalCluster.
>
> For the moment I use BaseWorkerHook, registred like this :
>
> final BaseWorkerHook hook = new BaseWorkerHook();
> final ByteBuffer serializedHook =
> ByteBuffer.wrap(Utils.javaSerialize(hook));
> topo.add_to_worker_hooks(hook);
>
> After killing the topology, Storm tries to deserialize the shutdown hooks.
> At this point a RuntimeException is thrown :
>
> java.lang.RuntimeException: java.io.EOFException
>     at org.apache.storm.utils.Utils.javaDeserialize(Utils.java:254)
>     at
> org.apache.storm.daemon.worker$run_worker_shutdown_hooks$iter__5456__5460$fn__5461.invoke(worker.clj:578)
>     at clojure.lang.LazySeq.sval(LazySeq.java:40)
>     at clojure.lang.LazySeq.seq(LazySeq.java:49)
>     at clojure.lang.RT.seq(RT.java:507)
>     at clojure.core$seq__4128.invoke(core.clj:137)
>     at clojure.core$dorun.invoke(core.clj:3009)
>     at clojure.core$doall.invoke(core.clj:3025)
>     at
> org.apache.storm.daemon.worker$run_worker_shutdown_hooks.invoke(worker.clj:576)
>     at
> org.apache.storm.daemon.worker$fn__5471$exec_fn__1371__auto__$reify__5473$shutdown_STAR___5493.invoke(worker.clj:693)
>     at
> org.apache.storm.daemon.worker$fn__5471$exec_fn__1371__auto__$reify$reify__5519.shutdown(worker.clj:706)
>     at
> org.apache.storm.ProcessSimulator.killProcess(ProcessSimulator.java:67)
>     at
> org.apache.storm.daemon.supervisor.LocalContainer.kill(LocalContainer.java:59)
>     at
> org.apache.storm.daemon.supervisor.Slot.killContainerForChangedAssignment(Slot.java:311)
>     at org.apache.storm.daemon.supervisor.Slot.handleRunning(Slot.java:527)
>     at
> org.apache.storm.daemon.supervisor.Slot.stateMachineStep(Slot.java:265)
>     at org.apache.storm.daemon.supervisor.Slot.run(Slot.java:741)
> Caused by: java.io.EOFException
>     at
> java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2680)
>     at
> java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:3155)
>     at
> java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:864)
>     at java.io.ObjectInputStream.<init>(ObjectInputStream.java:360)
>     at org.apache.storm.utils.Utils.javaDeserialize(Utils.java:245)
>     ... 16 more
>
>
> I described the problem in this issue :
> https://issues.apache.org/jira/browse/STORM-2979
>
>
> Any help will be appreciated :)
>
>
> Regards,
>
> Robin

Mime
View raw message