storm-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Asfak Mahamud <m.as...@gmail.com>
Subject backtype.storm.Testing seems not affected by Conf.
Date Thu, 18 Dec 2014 11:32:50 GMT
Hi,

I have wanted to use the
https://github.com/xumingming/storm-lib/blob/master/src/jvm/storm/TestingApiDemo.java
to write a test for a sample topology. I think configuration is not
affecting my test topology. I have added a serialiser for SampleModel class
in the Conf but when I ran it complained it could not find the serialiser.

Code:

package com.ws.storm;
import java.io.Serializable;import java.nio.ByteBuffer;import
java.nio.CharBuffer;import java.nio.charset.Charset;import
java.nio.charset.CharsetEncoder;import java.util.HashMap;import
java.util.Map;
import org.junit.Test;
import backtype.storm.Config;import
backtype.storm.ILocalCluster;import backtype.storm.Testing;import
backtype.storm.generated.StormTopology;import
backtype.storm.spout.SpoutOutputCollector;import
backtype.storm.task.TopologyContext;import
backtype.storm.testing.CompleteTopologyParam;import
backtype.storm.testing.MkClusterParam;import
backtype.storm.testing.MockedSources;import
backtype.storm.testing.TestJob;import
backtype.storm.topology.BasicOutputCollector;import
backtype.storm.topology.OutputFieldsDeclarer;import
backtype.storm.topology.TopologyBuilder;import
backtype.storm.topology.base.BaseBasicBolt;import
backtype.storm.topology.base.BaseRichSpout;import
backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import
backtype.storm.tuple.Values;import backtype.storm.utils.Utils;
import com.esotericsoftware.kryo.Kryo;import
com.esotericsoftware.kryo.Serializer;import
com.esotericsoftware.kryo.io.Input;import
com.esotericsoftware.kryo.io.Output;

public class StormTopologyTest implements Serializable {

    private static final long serialVersionUID = 2767057917773290125L;

    class SampleModel {

        private ByteBuffer data;

        public SampleModel() {
        }

        public SampleModel(ByteBuffer data) {
            this.data = data;
        }

        public ByteBuffer getData() {
            return data;
        }

        public void setData(ByteBuffer data) {
            this.data = data;
        }

    }

    final class SampleModelSerializer extends Serializer<SampleModel> {

        @Override
        public SampleModel read(Kryo theKryo, Input theInput,
                Class<SampleModel> theMessage) {
            long aDataBytesLength = theInput.readLong();
            byte[] someData = theInput.readBytes((int) aDataBytesLength);
            return new SampleModel(ByteBuffer.wrap(someData));
        }

        @Override
        public void write(Kryo theKryo, Output theOutput, SampleModel
theMessage) {
            ByteBuffer someData = theMessage.getData();
            byte[] someDataBytes = someData.array();
            long aDataBytesLength = someDataBytes.length;

            theOutput.writeLong(aDataBytesLength);
            theOutput.writeBytes(someDataBytes);
        }
    }

    class SampleSpout1 extends BaseRichSpout {

        boolean _isDistributed;
        SpoutOutputCollector _collector;

        public SampleSpout1() throws Exception {
            this(true);
        }

        public SampleSpout1(boolean isDistributed) {
            _isDistributed = isDistributed;
        }

        public void open(Map conf, TopologyContext context,
                SpoutOutputCollector collector) {
            _collector = collector;
        }

        public void close() {
        }

        public void nextTuple() {
            Utils.sleep(100);
            try {
                Charset charset = Charset.forName("UTF-8");
                CharsetEncoder encoder = charset.newEncoder();
                _collector.emit(new Values(new SampleModel(encoder
                        .encode(CharBuffer.wrap("This is for test")))));
            } catch (Exception e) {
                e.printStackTrace();
            }

        }

        public void ack(Object msgId) {

        }

        public void fail(Object msgId) {

        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("record"));
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            if (!_isDistributed) {
                Map<String, Object> ret = new HashMap<String, Object>();
                ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
                return ret;
            } else {
                return null;
            }
        }
    }

    class SampleBolt extends BaseBasicBolt {

        private static final long serialVersionUID = 6859091283261443785L;

        public SampleBolt() {
        }

        @Override
        public void execute(Tuple theTuple, BasicOutputCollector theCollector) {
            SampleModel aSampleModel = (SampleModel) theTuple.getValue(0);
            theCollector.emit(new Values(aSampleModel.getData()));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("out"));
        }
    }

    @Test
    public void testRealTimeTopology() {
        MkClusterParam mkClusterParam = new MkClusterParam();
        mkClusterParam.setSupervisors(4);
        Config daemonConf = new Config();
        daemonConf.put(Config.STORM_LOCAL_MODE_ZMQ, true);
        mkClusterParam.setDaemonConf(daemonConf);

        Testing.withLocalCluster(mkClusterParam, new TestJob() {

            @Override
            public void run(ILocalCluster cluster) throws Exception {
                StormTopology aTopology = createTopology();
                Config aConfig = createConfig();

                Charset charset = Charset.forName("UTF-8");
                CharsetEncoder encoder = charset.newEncoder();
                MockedSources mockedSources = new MockedSources();
                mockedSources.addMockData("spout",
                        new Values(new SampleModel(encoder
                                .encode(CharBuffer.wrap("This is for test")))));

                CompleteTopologyParam aCompleteTopologyParam = new
CompleteTopologyParam();
                aCompleteTopologyParam.setStormConf(aConfig);
                aCompleteTopologyParam.setMockedSources(mockedSources);

                Map result = Testing.completeTopology(cluster, aTopology,
                        aCompleteTopologyParam);
                System.out.println("\n\nResult " + result + " \n\n");
            }
        });
    }

    private StormTopology createTopology() {
        TopologyBuilder aBuilder = new TopologyBuilder();
        aBuilder.setSpout("spout", new SampleSpout1(true), 2);
        aBuilder.setBolt("aBolt", new SampleBolt(), 2)
                .shuffleGrouping("spout");
        return aBuilder.createTopology();
    }

    private static Config createConfig() {
        Config conf = new Config();
        conf.setNumAckers(1);
        conf.setDebug(false);
        conf.setNumWorkers(2);
        conf.setMaxSpoutPending(10);
        conf.registerSerialization(java.nio.ByteBuffer.class);
        conf.registerSerialization(SampleModel.class,
SampleModelSerializer.class);
        conf.setSkipMissingKryoRegistrations(true);
        conf.setFallBackOnJavaSerialization(false);
        return conf;
    }
}


*Error:*

java.lang.RuntimeException: java.io.NotSerializableException:
com.ws.storm.StormTopologyTest$SampleModel
at
backtype.storm.serialization.DefaultSerializationDelegate.serialize(DefaultSerializationDelegate.java:43)
at backtype.storm.utils.Utils.serialize(Utils.java:85)
at backtype.storm.thrift$serialize_component_object.invoke(thrift.clj:164)
at backtype.storm.testing$complete_topology.doInvoke(testing.clj:478)
at clojure.lang.RestFn.invoke(RestFn.java:826)
at backtype.storm.testing4j$_completeTopology.invoke(testing4j.clj:61)
at backtype.storm.Testing.completeTopology(Unknown Source)
at com.ws.storm.StormTopologyTest$1.run(StormTopologyTest.java:193)
at backtype.storm.testing4j$_withLocalCluster.invoke(testing4j.clj:87)
at backtype.storm.Testing.withLocalCluster(Unknown Source)
at
com.ws.storm.StormTopologyTest.testRealTimeTopology(StormTopologyTest.java:175)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
at
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
at
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
at
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
at
org.junit.runners.BlockJUnit4ClassRunner.runNotIgnored(BlockJUnit4ClassRunner.java:79)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:71)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:49)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:193)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:52)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:191)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:42)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:184)
at org.junit.runners.ParentRunner.run(ParentRunner.java:236)
at
org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
at
org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
at
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
at
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
at
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)
Caused by: java.io.NotSerializableException:
com.ws.storm.StormTopologyTest$SampleModel
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1165)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:329)
at java.util.ArrayList.writeObject(ArrayList.java:570)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:950)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1482)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1413)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1159)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1535)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1413)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1159)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:329)
at java.util.ArrayList.writeObject(ArrayList.java:570)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:950)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1482)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1413)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1159)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1535)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1413)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1159)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:329)
at
backtype.storm.serialization.DefaultSerializationDelegate.serialize(DefaultSerializationDelegate.java:39)
... 33 more


Best regards and thanks,
Asfak

Mime
View raw message