storm-commits mailing list archives

From bo...@apache.org
Subject [1/3] storm git commit: STORM-1154: SequenceFileBolt needs unit tests
Date Fri, 06 Nov 2015 18:06:44 GMT
Repository: storm
Updated Branches:
  refs/heads/master 7f14dfd88 -> b57da7b9e


STORM-1154: SequenceFileBolt needs unit tests


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ac25d6d3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ac25d6d3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ac25d6d3

Branch: refs/heads/master
Commit: ac25d6d3bf27efb9367cd537bfc7732b8fea0c0f
Parents: de579be
Author: Aaron Dossett <aaron.dossett@target.com>
Authored: Tue Nov 3 07:52:17 2015 -0600
Committer: Aaron Dossett <aaron.dossett@target.com>
Committed: Tue Nov 3 07:52:17 2015 -0600

----------------------------------------------------------------------
 .../storm/hdfs/bolt/TestSequenceFileBolt.java   | 186 +++++++++++++++++++
 1 file changed, 186 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ac25d6d3/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java
new file mode 100644
index 0000000..5c760ef
--- /dev/null
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.bolt;
+
+import backtype.storm.Config;
+import backtype.storm.task.GeneralTopologyContext;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.TupleImpl;
+import backtype.storm.tuple.Values;
+import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
+import org.apache.storm.hdfs.bolt.format.FileNameFormat;
+import org.apache.storm.hdfs.bolt.format.SequenceFormat;
+import org.apache.storm.hdfs.bolt.format.DefaultSequenceFormat;
+import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
+import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
+import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
+import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
+import org.junit.*;
+
+import org.junit.rules.ExpectedException;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import static org.mockito.Mockito.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+
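+/**
+ * Unit tests for SequenceFileBolt, exercising sync and rotation policies
+ * against an in-process MiniDFSCluster.
+ */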
+public class TestSequenceFileBolt {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestSequenceFileBolt.class);
+
+    private String hdfsURI;
+    private DistributedFileSystem fs;
+    private MiniDFSCluster hdfsCluster;
+    private static final String testRoot = "/unittest";
+    Tuple tuple1 = generateTestTuple(1L, "first tuple");
+    Tuple tuple2 = generateTestTuple(2L, "second tuple");
+
+    @Mock private OutputCollector collector;
+    @Mock private TopologyContext topologyContext;
+    @Rule public ExpectedException thrown = ExpectedException.none();
+
+    @Before
+    public void setup() throws Exception {
+        MockitoAnnotations.initMocks(this);
+        Configuration conf = new Configuration();
+        conf.set("fs.trash.interval", "10");
+        conf.setBoolean("dfs.permissions", true);
+        File baseDir = new File("./target/hdfs/").getAbsoluteFile();
+        FileUtil.fullyDelete(baseDir);
+        conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+
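+        // Start an in-process HDFS mini-cluster and record its URI for the bolt under test.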
+        MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
+        hdfsCluster = builder.build();
+        fs = hdfsCluster.getFileSystem();
+        hdfsURI = "hdfs://localhost:" + hdfsCluster.getNameNodePort() + "/";
+    }
+
+    @After
+    public void shutDown() throws IOException {
+        fs.close();
+        hdfsCluster.shutdown();
+    }
+
+    @Test
+    public void testTwoTuplesTwoFiles() throws IOException {
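+        // CountSyncPolicy(1) syncs after every tuple, and the tiny rotation size
+        // (~10 bytes) forces a new file per tuple.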
+        SequenceFileBolt bolt = makeSeqBolt(hdfsURI, 1, .00001f);
+
+        bolt.prepare(new Config(), topologyContext, collector);
+        bolt.execute(tuple1);
+        bolt.execute(tuple2);
+
+        verify(collector).ack(tuple1);
+        verify(collector).ack(tuple2);
+
+        Assert.assertEquals(2, countNonZeroLengthFiles(testRoot));
+    }
+
+    @Test
+    public void testTwoTuplesOneFile() throws IOException {
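+        // CountSyncPolicy(2) syncs (and acks) only after the second tuple; the 10,000 MB
+        // rotation threshold is never reached, so both tuples share one file.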
+        SequenceFileBolt bolt = makeSeqBolt(hdfsURI, 2, 10000f);
+        bolt.prepare(new Config(), topologyContext, collector);
+        bolt.execute(tuple1);
+
+        verifyZeroInteractions(collector);
+
+        bolt.execute(tuple2);
+        verify(collector).ack(tuple1);
+        verify(collector).ack(tuple2);
+
+        Assert.assertEquals(1, countNonZeroLengthFiles(testRoot));
+    }
+
+    @Test
+    public void testFailedSync() throws IOException {
+        SequenceFileBolt bolt = makeSeqBolt(hdfsURI, 1, .00001f);
+        bolt.prepare(new Config(), topologyContext, collector);
+
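+        // Safe mode makes the HDFS namespace read-only, so the write below must fail.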
+        fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+        // All writes/syncs will fail so this should cause a RuntimeException
+        thrown.expect(RuntimeException.class);
+        bolt.execute(tuple1);
+    }
+
+    private SequenceFileBolt makeSeqBolt(String nameNodeAddr, int countSync, float rotationSizeMB) {
+
+        SyncPolicy fieldsSyncPolicy = new CountSyncPolicy(countSync);
+
+        FileRotationPolicy fieldsRotationPolicy =
+                new FileSizeRotationPolicy(rotationSizeMB, FileSizeRotationPolicy.Units.MB);
+
+        FileNameFormat fieldsFileNameFormat = new DefaultFileNameFormat().withPath(testRoot);
+
+        SequenceFormat seqFormat = new DefaultSequenceFormat("key", "value");
+
+        return new SequenceFileBolt()
+                .withFsUrl(nameNodeAddr)
+                .withFileNameFormat(fieldsFileNameFormat)
+                .withRotationPolicy(fieldsRotationPolicy)
+                .withSequenceFormat(seqFormat)
+                .withSyncPolicy(fieldsSyncPolicy);
+    }
+
+    private Tuple generateTestTuple(Long key, String value) {
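+        // Build a minimal topology context that declares "key"/"value" output fields
+        // to match the bolt's DefaultSequenceFormat.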
+        TopologyBuilder builder = new TopologyBuilder();
+        GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(),
+                new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
+            @Override
+            public Fields getComponentOutputFields(String componentId, String streamId) {
+                return new Fields("key", "value");
+            }
+        };
+        return new TupleImpl(topologyContext, new Values(key, value), 1, "");
+    }
+
+    // Compares how files were actually written against expectations based on the total
+    // amount of data written and the rotation policies in effect.
+    private int countNonZeroLengthFiles(String path) throws IOException {
+        Path p = new Path(path);
+        int nonZero = 0;
+
+        for (FileStatus file : fs.listStatus(p)) {
+            if (file.getLen() > 0) {
+                nonZero++;
+            }
+        }
+
+        return nonZero;
+    }
+
+}

