storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kabh...@apache.org
Subject [2/3] storm git commit: [STORM-2916] separate hdfs-blobstore from storm-hdfs
Date Thu, 01 Feb 2018 15:24:17 GMT
[STORM-2916] separate hdfs-blobstore from storm-hdfs


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f58d4729
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f58d4729
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f58d4729

Branch: refs/heads/master
Commit: f58d472994297d240af8a478d660c3be8a69bd97
Parents: f37a6bd
Author: Ethan Li <ethanopensource@gmail.com>
Authored: Mon Jan 29 16:15:01 2018 -0600
Committer: Ethan Li <ethanopensource@gmail.com>
Committed: Tue Jan 30 15:05:38 2018 -0600

----------------------------------------------------------------------
 external/storm-blobstore-migration/pom.xml      |   2 +-
 external/storm-hdfs-blobstore/pom.xml           | 251 +++++++++
 .../storm/hdfs/blobstore/HdfsBlobStore.java     | 394 ++++++++++++++
 .../storm/hdfs/blobstore/HdfsBlobStoreFile.java | 196 +++++++
 .../storm/hdfs/blobstore/HdfsBlobStoreImpl.java | 312 +++++++++++
 .../hdfs/blobstore/HdfsClientBlobStore.java     | 130 +++++
 .../storm/hdfs/blobstore/BlobStoreTest.java     | 540 ++++++++++++++++++
 .../hdfs/blobstore/HdfsBlobStoreImplTest.java   | 224 ++++++++
 .../storm/hdfs/blobstore/HdfsBlobStore.java     | 395 --------------
 .../storm/hdfs/blobstore/HdfsBlobStoreFile.java | 196 -------
 .../storm/hdfs/blobstore/HdfsBlobStoreImpl.java | 312 -----------
 .../hdfs/blobstore/HdfsClientBlobStore.java     | 130 -----
 .../storm/hdfs/blobstore/BlobStoreTest.java     | 544 -------------------
 .../hdfs/blobstore/HdfsBlobStoreImplTest.java   | 226 --------
 pom.xml                                         |   1 +
 15 files changed, 2049 insertions(+), 1804 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f58d4729/external/storm-blobstore-migration/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-blobstore-migration/pom.xml b/external/storm-blobstore-migration/pom.xml
index c530eec..bd1af90 100644
--- a/external/storm-blobstore-migration/pom.xml
+++ b/external/storm-blobstore-migration/pom.xml
@@ -47,7 +47,7 @@ limitations under the License.
         </dependency>
         <dependency>
             <groupId>org.apache.storm</groupId>
-            <artifactId>storm-hdfs</artifactId>
+            <artifactId>storm-hdfs-blobstore</artifactId>
             <version>${project.version}</version>
             <exclusions>
                 <!--log4j-over-slf4j must be excluded for hadoop-minicluster

http://git-wip-us.apache.org/repos/asf/storm/blob/f58d4729/external/storm-hdfs-blobstore/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-hdfs-blobstore/pom.xml b/external/storm-hdfs-blobstore/pom.xml
new file mode 100644
index 0000000..6383e11
--- /dev/null
+++ b/external/storm-hdfs-blobstore/pom.xml
@@ -0,0 +1,251 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>2.0.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>storm-hdfs-blobstore</artifactId>
+
+    <developers>
+        <developer>
+            <id>ptgoetz</id>
+            <name>P. Taylor Goetz</name>
+            <email>ptgoetz@gmail.com</email>
+        </developer>
+    </developers>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-client</artifactId>
+            <version>${project.version}</version>
+            <scope>${provided.scope}</scope>
+            <exclusions>
+                <!--log4j-over-slf4j must be excluded for hadoop-minicluster
+                    see: http://stackoverflow.com/q/20469026/3542091 -->
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>log4j-over-slf4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <version>${hadoop.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>junit</groupId>
+                    <artifactId>junit</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.mockito</groupId>
+                    <artifactId>mockito-all</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.mortbay.jetty</groupId>
+                    <artifactId>jetty-util</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.mortbay.jetty</groupId>
+                    <artifactId>jetty</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.servlet</groupId>
+                    <artifactId>servlet-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jersey</groupId>
+                    <artifactId>jersey-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jersey</groupId>
+                    <artifactId>jersey-server</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.servlet.jsp</groupId>
+                    <artifactId>jsp-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-codec</groupId>
+                    <artifactId>commons-codec</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-minikdc</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.directory.server</groupId>
+                    <artifactId>apacheds-kerberos-codec</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.httpcomponents</groupId>
+                    <artifactId>httpclient</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${hadoop.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.servlet</groupId>
+                    <artifactId>servlet-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.mortbay.jetty</groupId>
+                    <artifactId>jetty</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.mortbay.jetty</groupId>
+                    <artifactId>jetty-util</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jersey</groupId>
+                    <artifactId>jersey-json</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jersey</groupId>
+                    <artifactId>jersey-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jersey</groupId>
+                    <artifactId>jersey-server</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.servlet.jsp</groupId>
+                    <artifactId>jsp-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-minicluster</artifactId>
+            <version>${hadoop.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-hdfs</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <reuseForks>false</reuseForks>
+                    <forkCount>1</forkCount>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>2.2</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-clean-plugin</artifactId>
+                <version>2.5</version>
+                <executions>
+                    <execution>
+                        <id>cleanup</id>
+                        <phase>clean</phase>
+                        <goals>
+                            <goal>clean</goal>
+                        </goals>
+                        <configuration>
+                            <excludeDefaultDirectories>true</excludeDefaultDirectories>
+                            <filesets>
+                                <fileset>
+                                    <directory>./build/</directory>
+                                </fileset>
+                            </filesets>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+                <!--Note - the version would be inherited-->
+                <configuration>
+                    <maxAllowedViolations>80</maxAllowedViolations>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/f58d4729/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
new file mode 100644
index 0000000..a4b108f
--- /dev/null
+++ b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
@@ -0,0 +1,394 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hdfs.blobstore;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.storm.Config;
+import org.apache.storm.blobstore.AtomicOutputStream;
+import org.apache.storm.blobstore.BlobStore;
+import org.apache.storm.blobstore.BlobStoreAclHandler;
+import org.apache.storm.blobstore.BlobStoreFile;
+import org.apache.storm.blobstore.InputStreamWithMeta;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.KeyAlreadyExistsException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.ReadableBlobMeta;
+import org.apache.storm.generated.SettableBlobMeta;
+import org.apache.storm.nimbus.NimbusInfo;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import java.io.ByteArrayOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.apache.storm.blobstore.BlobStoreAclHandler.*;
+
+/**
+ * Provides a HDFS file system backed blob store implementation.
+ * Note that this provides an api for having HDFS be the backing store for the blobstore,
+ * it is not a service/daemon.
+ *
+ * We currently have NIMBUS_ADMINS and SUPERVISOR_ADMINS configuration. NIMBUS_ADMINS are given READ, WRITE and ADMIN
+ * access whereas the SUPERVISOR_ADMINS are given READ access in order to read and download the blobs form the nimbus.
+ *
+ * The ACLs for the blob store are validated against whether the subject is a NIMBUS_ADMIN, SUPERVISOR_ADMIN or USER
+ * who has read, write or admin privileges in order to perform respective operations on the blob.
+ *
+ * For hdfs blob store
+ * 1. The USER interacts with nimbus to upload and access blobs through NimbusBlobStore Client API. Here, unlike
+ * local blob store which stores the blobs locally, the nimbus talks to HDFS to upload the blobs.
+ * 2. The USER sets the ACLs, and the blob access is validated against these ACLs.
+ * 3. The SUPERVISOR interacts with nimbus through HdfsClientBlobStore to download the blobs. Here, unlike local
+ * blob store the supervisor interacts with HDFS directly to download the blobs. The call to HdfsBlobStore is made as a "null"
+ * subject. The blobstore gets the hadoop user and validates permissions for the supervisor.
+ */
+public class HdfsBlobStore extends BlobStore {
+    public static final Logger LOG = LoggerFactory.getLogger(HdfsBlobStore.class);
+    private static final String DATA_PREFIX = "data_";
+    private static final String META_PREFIX = "meta_";
+    private static final HashMap<String, Subject> alreadyLoggedInUsers = new HashMap<>();
+
+    private BlobStoreAclHandler aclHandler;
+    private HdfsBlobStoreImpl hbs;
+    private Subject localSubject;
+    private Map<String, Object> conf;
+
+    /**
+     * Get the subject from Hadoop so we can use it to validate the acls. There is no direct
+     * interface from UserGroupInformation to get the subject, so do a doAs and get the context.
+     * We could probably run everything in the doAs but for now just grab the subject.
+     */
+    private Subject getHadoopUser() {
+        Subject subj;
+        try {
+            subj = UserGroupInformation.getCurrentUser().doAs(
+                    new PrivilegedAction<Subject>() {
+                        @Override
+                        public Subject run() {
+                            return Subject.getSubject(AccessController.getContext());
+                        }
+                    });
+        } catch (IOException e) {
+            throw new RuntimeException("Error creating subject and logging user in!", e);
+        }
+        return subj;
+    }
+
+    /**
+     * If who is null then we want to use the user hadoop says we are.
+     * Required for the supervisor to call these routines as its not
+     * logged in as anyone.
+     */
+    private Subject checkAndGetSubject(Subject who) {
+        if (who == null) {
+            return localSubject;
+        }
+        return who;
+    }
+
+    @Override
+    public void prepare(Map<String, Object> conf, String overrideBase, NimbusInfo nimbusInfo) {
+        this.conf = conf;
+        prepareInternal(conf, overrideBase, null);
+    }
+
+    /**
+     * Allow a Hadoop Configuration to be passed for testing. If it's null then the hadoop configs
+     * must be in your classpath.
+     */
+    protected void prepareInternal(Map<String, Object> conf, String overrideBase, Configuration hadoopConf) {
+        this.conf = conf;
+        if (overrideBase == null) {
+            overrideBase = (String)conf.get(Config.BLOBSTORE_DIR);
+        }
+        if (overrideBase == null) {
+            throw new RuntimeException("You must specify a blobstore directory for HDFS to use!");
+        }
+        LOG.debug("directory is: {}", overrideBase);
+        try {
+            // if a HDFS keytab/principal have been supplied login, otherwise assume they are
+            // logged in already or running insecure HDFS.
+            String principal = (String) conf.get(Config.BLOBSTORE_HDFS_PRINCIPAL);
+            String keyTab = (String) conf.get(Config.BLOBSTORE_HDFS_KEYTAB);
+
+            if (principal != null && keyTab != null) {
+                String combinedKey = principal + " from " + keyTab;
+                synchronized (alreadyLoggedInUsers) {
+                    localSubject = alreadyLoggedInUsers.get(combinedKey);
+                    if (localSubject == null) {
+                        UserGroupInformation.loginUserFromKeytab(principal, keyTab);
+                        localSubject = getHadoopUser();
+                        alreadyLoggedInUsers.put(combinedKey, localSubject);
+                    }
+                }
+            } else {
+                if (principal == null && keyTab != null) {
+                    throw new RuntimeException("You must specify an HDFS principal to go with the keytab!");
+
+                } else {
+                    if (principal != null && keyTab == null) {
+                        throw new RuntimeException("You must specify HDFS keytab go with the principal!");
+                    }
+                }
+                localSubject = getHadoopUser();
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("Error logging in from keytab!", e);
+        }
+        aclHandler = new BlobStoreAclHandler(conf);
+        Path baseDir = new Path(overrideBase, BASE_BLOBS_DIR_NAME);
+        try {
+            if (hadoopConf != null) {
+                hbs = new HdfsBlobStoreImpl(baseDir, conf, hadoopConf);
+            } else {
+                hbs = new HdfsBlobStoreImpl(baseDir, conf);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public AtomicOutputStream createBlob(String key, SettableBlobMeta meta, Subject who)
+            throws AuthorizationException, KeyAlreadyExistsException {
+        if (meta.get_replication_factor() <= 0) {
+            meta.set_replication_factor((int)conf.get(Config.STORM_BLOBSTORE_REPLICATION_FACTOR));
+        }
+        who = checkAndGetSubject(who);
+        validateKey(key);
+        aclHandler.normalizeSettableBlobMeta(key, meta, who, READ | WRITE | ADMIN);
+        BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl());
+        aclHandler.hasPermissions(meta.get_acl(), READ | WRITE | ADMIN, who, key);
+        if (hbs.exists(DATA_PREFIX + key)) {
+            throw new KeyAlreadyExistsException(key);
+        }
+        BlobStoreFileOutputStream mOut = null;
+        try {
+            BlobStoreFile metaFile = hbs.write(META_PREFIX + key, true);
+            metaFile.setMetadata(meta);
+            mOut = new BlobStoreFileOutputStream(metaFile);
+            mOut.write(Utils.thriftSerialize(meta));
+            mOut.close();
+            mOut = null;
+            BlobStoreFile dataFile = hbs.write(DATA_PREFIX + key, true);
+            dataFile.setMetadata(meta);
+            return new BlobStoreFileOutputStream(dataFile);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            if (mOut != null) {
+                try {
+                    mOut.cancel();
+                } catch (IOException e) {
+                    //Ignored
+                }
+            }
+        }
+    }
+
+    @Override
+    public AtomicOutputStream updateBlob(String key, Subject who)
+            throws AuthorizationException, KeyNotFoundException {
+        who = checkAndGetSubject(who);
+        SettableBlobMeta meta = getStoredBlobMeta(key);
+        validateKey(key);
+        aclHandler.hasPermissions(meta.get_acl(), WRITE, who, key);
+        try {
+            BlobStoreFile dataFile = hbs.write(DATA_PREFIX + key, false);
+            dataFile.setMetadata(meta);
+            return new BlobStoreFileOutputStream(dataFile);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private SettableBlobMeta getStoredBlobMeta(String key) throws KeyNotFoundException {
+        InputStream in = null;
+        try {
+            BlobStoreFile pf = hbs.read(META_PREFIX + key);
+            try {
+                in = pf.getInputStream();
+            } catch (FileNotFoundException fnf) {
+                throw new KeyNotFoundException(key);
+            }
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            byte[] buffer = new byte[2048];
+            int len;
+            while ((len = in.read(buffer)) > 0) {
+                out.write(buffer, 0, len);
+            }
+            in.close();
+            in = null;
+            return Utils.thriftDeserialize(SettableBlobMeta.class, out.toByteArray());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            if (in != null) {
+                try {
+                    in.close();
+                } catch (IOException e) {
+                    //Ignored
+                }
+            }
+        }
+    }
+
+    @Override
+    public ReadableBlobMeta getBlobMeta(String key, Subject who)
+            throws AuthorizationException, KeyNotFoundException {
+        who = checkAndGetSubject(who);
+        validateKey(key);
+        SettableBlobMeta meta = getStoredBlobMeta(key);
+        aclHandler.validateUserCanReadMeta(meta.get_acl(), who, key);
+        ReadableBlobMeta rbm = new ReadableBlobMeta();
+        rbm.set_settable(meta);
+        try {
+            BlobStoreFile pf = hbs.read(DATA_PREFIX + key);
+            rbm.set_version(pf.getModTime());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return rbm;
+    }
+
+    @Override
+    public void setBlobMeta(String key, SettableBlobMeta meta, Subject who)
+            throws AuthorizationException, KeyNotFoundException {
+        if (meta.get_replication_factor() <= 0) {
+            meta.set_replication_factor((int)conf.get(Config.STORM_BLOBSTORE_REPLICATION_FACTOR));
+        }
+        who = checkAndGetSubject(who);
+        validateKey(key);
+        aclHandler.normalizeSettableBlobMeta(key,  meta, who, ADMIN);
+        BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl());
+        SettableBlobMeta orig = getStoredBlobMeta(key);
+        aclHandler.hasPermissions(orig.get_acl(), ADMIN, who, key);
+        BlobStoreFileOutputStream mOut = null;
+        writeMetadata(key, meta);
+    }
+
+    @Override
+    public void deleteBlob(String key, Subject who)
+            throws AuthorizationException, KeyNotFoundException {
+        who = checkAndGetSubject(who);
+        validateKey(key);
+        SettableBlobMeta meta = getStoredBlobMeta(key);
+        aclHandler.hasPermissions(meta.get_acl(), WRITE, who, key);
+        try {
+            hbs.deleteKey(DATA_PREFIX + key);
+            hbs.deleteKey(META_PREFIX + key);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public InputStreamWithMeta getBlob(String key, Subject who)
+            throws AuthorizationException, KeyNotFoundException {
+        who = checkAndGetSubject(who);
+        validateKey(key);
+        SettableBlobMeta meta = getStoredBlobMeta(key);
+        aclHandler.hasPermissions(meta.get_acl(), READ, who, key);
+        try {
+            return new BlobStoreFileInputStream(hbs.read(DATA_PREFIX + key));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public Iterator<String> listKeys() {
+        try {
+            return new KeyTranslationIterator(hbs.listKeys(), DATA_PREFIX);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void shutdown() {
+        //Empty
+    }
+
+    @Override
+    public int getBlobReplication(String key, Subject who) throws AuthorizationException, KeyNotFoundException {
+        who = checkAndGetSubject(who);
+        validateKey(key);
+        SettableBlobMeta meta = getStoredBlobMeta(key);
+        aclHandler.hasAnyPermissions(meta.get_acl(), READ | WRITE | ADMIN, who, key);
+        try {
+            return hbs.getBlobReplication(DATA_PREFIX + key);
+        } catch (IOException exp) {
+            throw new RuntimeException(exp);
+        }
+    }
+
+    @Override
+    public int updateBlobReplication(String key, int replication, Subject who) throws AuthorizationException, KeyNotFoundException {
+        who = checkAndGetSubject(who);
+        validateKey(key);
+        SettableBlobMeta meta = getStoredBlobMeta(key);
+        meta.set_replication_factor(replication);
+        aclHandler.hasAnyPermissions(meta.get_acl(), WRITE | ADMIN, who, key);
+        try {
+            writeMetadata(key, meta);
+            return hbs.updateBlobReplication(DATA_PREFIX + key, replication);
+        } catch (IOException exp) {
+            throw new RuntimeException(exp);
+        }
+    }
+
+    public void writeMetadata(String key, SettableBlobMeta meta)
+            throws AuthorizationException, KeyNotFoundException {
+        BlobStoreFileOutputStream mOut = null;
+        try {
+            BlobStoreFile hdfsFile = hbs.write(META_PREFIX + key, false);
+            hdfsFile.setMetadata(meta);
+            mOut = new BlobStoreFileOutputStream(hdfsFile);
+            mOut.write(Utils.thriftSerialize(meta));
+            mOut.close();
+            mOut = null;
+        } catch (IOException exp) {
+            throw new RuntimeException(exp);
+        } finally {
+            if (mOut != null) {
+                try {
+                    mOut.cancel();
+                } catch (IOException e) {
+                    //Ignored
+                }
+            }
+        }
+    }
+
+    public void fullCleanup(long age) throws IOException {
+        hbs.fullCleanup(age);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/f58d4729/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreFile.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreFile.java b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreFile.java
new file mode 100644
index 0000000..3021e66
--- /dev/null
+++ b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreFile.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.blobstore;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.storm.blobstore.BlobStoreFile;
+import org.apache.storm.generated.SettableBlobMeta;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.regex.Matcher;
+
+public class HdfsBlobStoreFile extends BlobStoreFile {
+    public static final Logger LOG = LoggerFactory.getLogger(HdfsBlobStoreFile.class);
+
+    private final String _key;
+    private final boolean _isTmp;
+    private final Path _path;
+    private Long _modTime = null;
+    private final boolean _mustBeNew;
+    private final Configuration _hadoopConf;
+    private final FileSystem _fs;
+    private SettableBlobMeta meta;
+
+    // files are world-wide readable and owner writable
+    final public static FsPermission BLOBSTORE_FILE_PERMISSION =
+            FsPermission.createImmutable((short) 0644); // rw-r--r--
+
+    public HdfsBlobStoreFile(Path base, String name, Configuration hconf) {
+        if (BLOBSTORE_DATA_FILE.equals(name)) {
+            _isTmp = false;
+        } else {
+            Matcher m = TMP_NAME_PATTERN.matcher(name);
+            if (!m.matches()) {
+                throw new IllegalArgumentException("File name does not match '"+name+"' !~ "+TMP_NAME_PATTERN);
+            }
+            _isTmp = true;
+        }
+        _hadoopConf = hconf;
+        _key = base.getName();
+        _path = new Path(base, name);
+        _mustBeNew = false;
+        try {
+            _fs = _path.getFileSystem(_hadoopConf);
+        } catch (IOException e) {
+            throw new RuntimeException("Error getting filesystem for path: " + _path, e);
+        }
+    }
+
+    public HdfsBlobStoreFile(Path base, boolean isTmp, boolean mustBeNew, Configuration hconf) {
+        _key = base.getName();
+        _hadoopConf = hconf;
+        _isTmp = isTmp;
+        _mustBeNew = mustBeNew;
+        if (_isTmp) {
+            _path = new Path(base, System.currentTimeMillis()+TMP_EXT);
+        } else {
+            _path = new Path(base, BLOBSTORE_DATA_FILE);
+        }
+        try {
+            _fs = _path.getFileSystem(_hadoopConf);
+        } catch (IOException e) {
+            throw new RuntimeException("Error getting filesystem for path: " + _path, e);
+        }
+    }
+
+    @Override
+    public void delete() throws IOException {
+        _fs.delete(_path, true);
+    }
+
+    @Override
+    public boolean isTmp() {
+        return _isTmp;
+    }
+
+    @Override
+    public String getKey() {
+        return _key;
+    }
+
+    @Override
+    public long getModTime() throws IOException {
+        if (_modTime == null) {
+            FileSystem fs = _path.getFileSystem(_hadoopConf);
+            _modTime = fs.getFileStatus(_path).getModificationTime();
+        }
+        return _modTime;
+    }
+
+    private void checkIsNotTmp() {
+        if (!isTmp()) {
+            throw new IllegalStateException("Can only operate on a temporary blobstore file.");
+        }
+    }
+
+    private void checkIsTmp() {
+        if (isTmp()) {
+            throw new IllegalStateException("Cannot operate on a temporary blobstore file.");
+        }
+    }
+
+    @Override
+    public InputStream getInputStream() throws IOException {
+        checkIsTmp();
+        return _fs.open(_path);
+    }
+
+    @Override
+    public OutputStream getOutputStream() throws IOException {
+        checkIsNotTmp();
+        OutputStream out = null;
+        FsPermission fileperms = new FsPermission(BLOBSTORE_FILE_PERMISSION);
+        try {
+            out = _fs.create(_path, (short)this.getMetadata().get_replication_factor());
+            _fs.setPermission(_path, fileperms);
+            _fs.setReplication(_path, (short)this.getMetadata().get_replication_factor());
+        } catch (IOException e) {
+            //Try to create the parent directory, may not work
+            FsPermission dirperms = new FsPermission(HdfsBlobStoreImpl.BLOBSTORE_DIR_PERMISSION);
+            if (!_fs.mkdirs(_path.getParent(), dirperms)) {
+                LOG.warn("error creating parent dir: " + _path.getParent());
+            }
+            out = _fs.create(_path, (short)this.getMetadata().get_replication_factor());
+            _fs.setPermission(_path, dirperms);
+            _fs.setReplication(_path, (short)this.getMetadata().get_replication_factor());
+        }
+        if (out == null) {
+            throw new IOException("Error in creating: " + _path);
+        }
+        return out;
+    }
+
+    @Override
+    public void commit() throws IOException {
+        checkIsNotTmp();
+        // FileContext supports atomic rename, whereas FileSystem doesn't
+        FileContext fc = FileContext.getFileContext(_hadoopConf);
+        Path dest = new Path(_path.getParent(), BLOBSTORE_DATA_FILE);
+        if (_mustBeNew) {
+            fc.rename(_path, dest);
+        } else {
+            fc.rename(_path, dest, Options.Rename.OVERWRITE);
+        }
+        // Note, we could add support for setting the replication factor
+    }
+
+    @Override
+    public void cancel() throws IOException {
+        checkIsNotTmp();
+        delete();
+    }
+
+    @Override
+    public String toString() {
+        return _path+":"+(_isTmp ? "tmp": BlobStoreFile.BLOBSTORE_DATA_FILE)+":"+_key;
+    }
+
+    @Override
+    public long getFileLength() throws IOException {
+        return _fs.getFileStatus(_path).getLen();
+    }
+
+    @Override
+    public SettableBlobMeta getMetadata() {
+        return meta;
+    }
+
+    @Override
+    public void setMetadata(SettableBlobMeta meta) {
+        this.meta = meta;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/f58d4729/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
new file mode 100644
index 0000000..702a16f
--- /dev/null
+++ b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
@@ -0,0 +1,312 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.blobstore;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.storm.Config;
+import org.apache.storm.blobstore.BlobStoreFile;
+import org.apache.storm.utils.ObjectReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Timer;
+import java.util.TimerTask;
+
+/**
+ * HDFS blob store impl.
+ */
+public class HdfsBlobStoreImpl {
+    private static final Logger LOG = LoggerFactory.getLogger(HdfsBlobStoreImpl.class);
+
+    private static final long FULL_CLEANUP_FREQ = 60 * 60 * 1000l;
+    private static final int BUCKETS = 1024;
+    private static final String BLOBSTORE_DATA = "data";
+    
+    private Timer timer;
+
+    public class KeyInHashDirIterator implements Iterator<String> {
+        private int currentBucket = 0;
+        private Iterator<String> it = null;
+        private String next = null;
+
+        public KeyInHashDirIterator() throws IOException {
+            primeNext();
+        }
+
+        private void primeNext() throws IOException {
+            while (it == null && currentBucket < BUCKETS) {
+                String name = String.valueOf(currentBucket);
+                Path dir = new Path(_fullPath, name);
+                try {
+                    it = listKeys(dir);
+                } catch (FileNotFoundException e) {
+                    it = null;
+                }
+                if (it == null || !it.hasNext()) {
+                    it = null;
+                    currentBucket++;
+                } else {
+                    next = it.next();
+                }
+            }
+        }
+
+        @Override
+        public boolean hasNext() {
+            return next != null;
+        }
+
+        @Override
+        public String next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            String current = next;
+            next = null;
+            if (it != null) {
+                if (!it.hasNext()) {
+                    it = null;
+                    currentBucket++;
+                    try {
+                        primeNext();
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                } else {
+                    next = it.next();
+                }
+            }
+            return current;
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException("Delete Not Supported");
+        }
+    }
+
+
+    private Path _fullPath;
+    private FileSystem _fs;
+    private Configuration _hadoopConf;
+
+    // blobstore directory is private!
+    final public static FsPermission BLOBSTORE_DIR_PERMISSION =
+            FsPermission.createImmutable((short) 0700); // rwx--------
+
+    public HdfsBlobStoreImpl(Path path, Map<String, Object> conf) throws IOException {
+        this(path, conf, new Configuration());
+    }
+
+    public HdfsBlobStoreImpl(Path path, Map<String, Object> conf,
+                             Configuration hconf) throws IOException {
+        LOG.info("Blob store based in {}", path);
+        _fullPath = path;
+        _hadoopConf = hconf;
+        _fs = path.getFileSystem(_hadoopConf);
+
+        if (!_fs.exists(_fullPath)) {
+            FsPermission perms = new FsPermission(BLOBSTORE_DIR_PERMISSION);
+            boolean success = _fs.mkdirs(_fullPath, perms);
+            if (!success) {
+                throw new IOException("Error creating blobstore directory: " + _fullPath);
+            }
+        }
+
+        Object shouldCleanup = conf.get(Config.BLOBSTORE_CLEANUP_ENABLE);
+        if (ObjectReader.getBoolean(shouldCleanup, false)) {
+            LOG.debug("Starting hdfs blobstore cleaner");
+            TimerTask cleanup = new TimerTask() {
+                @Override
+                public void run() {
+                    try {
+                        fullCleanup(FULL_CLEANUP_FREQ);
+                    } catch (IOException e) {
+                        LOG.error("Error trying to cleanup", e);
+                    }
+                }
+            };
+            timer = new Timer("HdfsBlobStore cleanup thread", true);
+            timer.scheduleAtFixedRate(cleanup, 0, FULL_CLEANUP_FREQ);
+        }
+    }
+
+    /**
+     * @return all keys that are available for reading.
+     * @throws IOException on any error.
+     */
+    public Iterator<String> listKeys() throws IOException {
+        return new KeyInHashDirIterator();
+    }
+
+    /**
+     * Get an input stream for reading a part.
+     *
+     * @param key the key of the part to read.
+     * @return the where to read the data from.
+     * @throws IOException on any error
+     */
+    public BlobStoreFile read(String key) throws IOException {
+        return new HdfsBlobStoreFile(getKeyDir(key), BLOBSTORE_DATA, _hadoopConf);
+    }
+
+    /**
+     * Get an object tied to writing the data.
+     *
+     * @param key the key of the part to write to.
+     * @param create whether the file needs to be new or not.
+     * @return an object that can be used to both write to, but also commit/cancel the operation.
+     * @throws IOException on any error
+     */
+    public BlobStoreFile write(String key, boolean create) throws IOException {
+        return new HdfsBlobStoreFile(getKeyDir(key), true, create, _hadoopConf);
+    }
+
+    /**
+     * Check if the key exists in the blob store.
+     *
+     * @param key the key to check for
+     * @return true if it exists else false.
+     */
+    public boolean exists(String key) {
+        Path dir = getKeyDir(key);
+        boolean res = false;
+        try {
+            _fs = dir.getFileSystem(_hadoopConf);
+            res = _fs.exists(dir);
+        } catch (IOException e) {
+            LOG.warn("Exception checking for exists on: " + key);
+        }
+        return res;
+    }
+
+    /**
+     * Delete a key from the blob store
+     *
+     * @param key the key to delete
+     * @throws IOException on any error
+     */
+    public void deleteKey(String key) throws IOException {
+        Path keyDir = getKeyDir(key);
+        HdfsBlobStoreFile pf = new HdfsBlobStoreFile(keyDir, BLOBSTORE_DATA,
+                _hadoopConf);
+        pf.delete();
+        delete(keyDir);
+    }
+
+    protected Path getKeyDir(String key) {
+        String hash = String.valueOf(Math.abs((long) key.hashCode()) % BUCKETS);
+        Path hashDir = new Path(_fullPath, hash);
+
+        Path ret = new Path(hashDir, key);
+        LOG.debug("{} Looking for {} in {}", new Object[]{_fullPath, key, hash});
+        return ret;
+    }
+
+    public void fullCleanup(long age) throws IOException {
+        long cleanUpIfBefore = System.currentTimeMillis() - age;
+        Iterator<String> keys = new KeyInHashDirIterator();
+        while (keys.hasNext()) {
+            String key = keys.next();
+            Path keyDir = getKeyDir(key);
+            Iterator<BlobStoreFile> i = listBlobStoreFiles(keyDir);
+            if (!i.hasNext()) {
+                //The dir is empty, so try to delete it, may fail, but that is OK
+                try {
+                    _fs.delete(keyDir, true);
+                } catch (Exception e) {
+                    LOG.warn("Could not delete " + keyDir + " will try again later");
+                }
+            }
+            while (i.hasNext()) {
+                BlobStoreFile f = i.next();
+                if (f.isTmp()) {
+                    if (f.getModTime() <= cleanUpIfBefore) {
+                        f.delete();
+                    }
+                }
+            }
+        }
+    }
+
+    protected Iterator<BlobStoreFile> listBlobStoreFiles(Path path) throws IOException {
+        ArrayList<BlobStoreFile> ret = new ArrayList<BlobStoreFile>();
+        FileStatus[] files = _fs.listStatus(new Path[]{path});
+        if (files != null) {
+            for (FileStatus sub : files) {
+                try {
+                    ret.add(new HdfsBlobStoreFile(sub.getPath().getParent(), sub.getPath().getName(),
+                            _hadoopConf));
+                } catch (IllegalArgumentException e) {
+                    //Ignored the file did not match
+                    LOG.warn("Found an unexpected file in {} {}", path, sub.getPath().getName());
+                }
+            }
+        }
+        return ret.iterator();
+    }
+
+    protected Iterator<String> listKeys(Path path) throws IOException {
+        ArrayList<String> ret = new ArrayList<String>();
+        FileStatus[] files = _fs.listStatus(new Path[]{path});
+        if (files != null) {
+            for (FileStatus sub : files) {
+                try {
+                    ret.add(sub.getPath().getName().toString());
+                } catch (IllegalArgumentException e) {
+                    //Ignored the file did not match
+                    LOG.debug("Found an unexpected file in {} {}", path, sub.getPath().getName());
+                }
+            }
+        }
+        return ret.iterator();
+    }
+
+    protected int getBlobReplication(String key) throws IOException {
+        Path path = getKeyDir(key);
+        Path dest = new Path(path, BLOBSTORE_DATA);
+        return _fs.getFileStatus(dest).getReplication();
+    }
+
+    protected int updateBlobReplication(String key, int replication) throws IOException {
+        Path path = getKeyDir(key);
+        Path dest = new Path(path, BLOBSTORE_DATA);
+        _fs.setReplication(dest, (short) replication);
+        return _fs.getFileStatus(dest).getReplication();
+    }
+
+    protected void delete(Path path) throws IOException {
+        _fs.delete(path, true);
+    }
+
+    public void shutdown() {
+        if (timer != null) {
+            timer.cancel();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/f58d4729/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java
new file mode 100644
index 0000000..2cb4dc3
--- /dev/null
+++ b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.blobstore;
+
+import org.apache.storm.blobstore.AtomicOutputStream;
+import org.apache.storm.blobstore.ClientBlobStore;
+import org.apache.storm.blobstore.InputStreamWithMeta;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.KeyAlreadyExistsException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.ReadableBlobMeta;
+import org.apache.storm.generated.SettableBlobMeta;
+import org.apache.storm.utils.NimbusClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ *  Client to access the HDFS blobStore. At this point, this is meant to only be used by the
+ *  supervisor.  Don't trust who the client says they are so pass null for all Subjects.
+ *
+ *  The HdfsBlobStore implementation takes care of the null Subjects. It assigns Subjects
+ *  based on what hadoop says who the users are. These users must be configured accordingly
+ *  in the SUPERVISOR_ADMINS for ACL validation and for the supervisors to download the blobs.
+ *  This API is only used by the supervisor in order to talk directly to HDFS.
+ */
+public class HdfsClientBlobStore extends ClientBlobStore {
+    private static final Logger LOG = LoggerFactory.getLogger(HdfsClientBlobStore.class);
+    private HdfsBlobStore _blobStore;
+    private Map _conf;
+    private NimbusClient client;
+
+    @Override
+    public void prepare(Map<String, Object> conf) {
+        this._conf = conf;
+        _blobStore = new HdfsBlobStore();
+        _blobStore.prepare(conf, null, null);
+    }
+
+    @Override
+    public AtomicOutputStream createBlobToExtend(String key, SettableBlobMeta meta)
+            throws AuthorizationException, KeyAlreadyExistsException {
+        return _blobStore.createBlob(key, meta, null);
+    }
+
+    @Override
+    public AtomicOutputStream updateBlob(String key)
+            throws AuthorizationException, KeyNotFoundException {
+        return _blobStore.updateBlob(key, null);
+    }
+
+    @Override
+    public ReadableBlobMeta getBlobMeta(String key)
+            throws AuthorizationException, KeyNotFoundException {
+        return _blobStore.getBlobMeta(key, null);
+    }
+
+    @Override
+    public void setBlobMetaToExtend(String key, SettableBlobMeta meta)
+            throws AuthorizationException, KeyNotFoundException {
+        _blobStore.setBlobMeta(key, meta, null);
+    }
+
+    @Override
+    public void deleteBlob(String key) throws AuthorizationException, KeyNotFoundException {
+        _blobStore.deleteBlob(key, null);
+    }
+
+    @Override
+    public InputStreamWithMeta getBlob(String key)
+            throws AuthorizationException, KeyNotFoundException {
+        return _blobStore.getBlob(key, null);
+    }
+
+    @Override
+    public Iterator<String> listKeys() {
+        return _blobStore.listKeys();
+    }
+
+    @Override
+    public int getBlobReplication(String key) throws AuthorizationException, KeyNotFoundException {
+        return _blobStore.getBlobReplication(key, null);
+    }
+
+    @Override
+    public int updateBlobReplication(String key, int replication) throws AuthorizationException, KeyNotFoundException {
+        return _blobStore.updateBlobReplication(key, replication, null);
+    }
+
+    @Override
+    public boolean setClient(Map<String, Object> conf, NimbusClient client) {
+        this.client = client;
+        return true;
+    }
+
+    @Override
+    public void createStateInZookeeper(String key) {
+        // Do nothing
+    }
+
+    @Override
+    public void shutdown() {
+        close();
+    }
+
+    @Override
+    public void close() {
+        if(client != null) {
+            client.close();
+            client = null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/f58d4729/external/storm-hdfs-blobstore/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs-blobstore/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java b/external/storm-hdfs-blobstore/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java
new file mode 100644
index 0000000..a125793
--- /dev/null
+++ b/external/storm-hdfs-blobstore/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java
@@ -0,0 +1,540 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.blobstore;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.storm.Config;
+import org.apache.storm.blobstore.AtomicOutputStream;
+import org.apache.storm.blobstore.BlobStore;
+import org.apache.storm.blobstore.BlobStoreAclHandler;
+import org.apache.storm.generated.AccessControl;
+import org.apache.storm.generated.AccessControlType;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.SettableBlobMeta;
+import org.apache.storm.hdfs.testing.MiniDFSClusterRule;
+import org.apache.storm.security.auth.FixedGroupsMapping;
+import org.apache.storm.security.auth.NimbusPrincipal;
+import org.apache.storm.security.auth.SingleUserPrincipal;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.junit.Assert.*;
+
+public class BlobStoreTest {
+
+    @ClassRule
+    public static final MiniDFSClusterRule DFS_CLUSTER_RULE = new MiniDFSClusterRule();
+
+    private static final Logger LOG = LoggerFactory.getLogger(BlobStoreTest.class);
+    URI base;
+    File baseFile;
+    private static final Map<String, Object> CONF = new HashMap<>();
+    public static final int READ = 0x01;
+    public static final int WRITE = 0x02;
+    public static final int ADMIN = 0x04;
+
+    @Before
+    public void init() {
+        initializeConfigs();
+        baseFile = new File("/tmp/blob-store-test-" + UUID.randomUUID());
+        base = baseFile.toURI();
+    }
+
+    @After
+    public void cleanup()
+        throws IOException {
+        FileUtils.deleteDirectory(baseFile);
+    }
+
+    // Method which initializes nimbus admin
+    public static void initializeConfigs() {
+        CONF.put(Config.NIMBUS_ADMINS, "admin");
+        CONF.put(Config.NIMBUS_ADMINS_GROUPS, "adminsGroup");
+
+        // Construct a groups mapping for the FixedGroupsMapping class
+        Map<String, Set<String>> groupsMapping = new HashMap<>();
+        Set<String> groupSet = new HashSet<>();
+        groupSet.add("adminsGroup");
+        groupsMapping.put("adminsGroupsUser", groupSet);
+
+        // Now create a params map to put it in to our conf
+        Map<String, Object> paramMap = new HashMap<>();
+        paramMap.put(FixedGroupsMapping.STORM_FIXED_GROUP_MAPPING, groupsMapping);
+        CONF.put(Config.STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN, "org.apache.storm.security.auth.FixedGroupsMapping");
+        CONF.put(Config.STORM_GROUP_MAPPING_SERVICE_PARAMS, paramMap);
+        CONF.put(Config.NIMBUS_SUPERVISOR_USERS, "supervisor");
+    }
+
+    //Gets Nimbus Subject with NimbusPrincipal set on it
+    public static Subject getNimbusSubject() {
+        Subject nimbus = new Subject();
+        nimbus.getPrincipals().add(new NimbusPrincipal());
+        return nimbus;
+    }
+
+    // Overloading the assertStoreHasExactly method accomodate Subject in order to check for authorization
+    public static void assertStoreHasExactly(BlobStore store, Subject who, String... keys)
+        throws IOException, KeyNotFoundException, AuthorizationException {
+        Set<String> expected = new HashSet<>(Arrays.asList(keys));
+        Set<String> found = new HashSet<>();
+        Iterator<String> c = store.listKeys();
+        while (c.hasNext()) {
+            String keyName = c.next();
+            found.add(keyName);
+        }
+        Set<String> extra = new HashSet<>(found);
+        extra.removeAll(expected);
+        assertTrue("Found extra keys in the blob store " + extra, extra.isEmpty());
+        Set<String> missing = new HashSet<>(expected);
+        missing.removeAll(found);
+        assertTrue("Found keys missing from the blob store " + missing, missing.isEmpty());
+    }
+
+    public static void assertStoreHasExactly(BlobStore store, String... keys)
+        throws IOException, KeyNotFoundException, AuthorizationException {
+        assertStoreHasExactly(store, null, keys);
+    }
+
+    // Overloading the readInt method accomodate Subject in order to check for authorization (security turned on)
+    public static int readInt(BlobStore store, Subject who, String key) throws IOException, KeyNotFoundException, AuthorizationException {
+        try (InputStream in = store.getBlob(key, who)) {
+            return in.read();
+        }
+    }
+
+    public static int readInt(BlobStore store, String key)
+        throws IOException, KeyNotFoundException, AuthorizationException {
+        return readInt(store, null, key);
+    }
+
+    public static void readAssertEquals(BlobStore store, String key, int value)
+        throws IOException, KeyNotFoundException, AuthorizationException {
+        assertEquals(value, readInt(store, key));
+    }
+
+    // Checks for assertion when we turn on security
+    public void readAssertEqualsWithAuth(BlobStore store, Subject who, String key, int value)
+        throws IOException, KeyNotFoundException, AuthorizationException {
+        assertEquals(value, readInt(store, who, key));
+    }
+
+    private AutoCloseableBlobStoreContainer initHdfs(String dirName)
+        throws Exception {
+        Map<String, Object> conf = new HashMap<>();
+        conf.put(Config.BLOBSTORE_DIR, dirName);
+        conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, "org.apache.storm.security.auth.DefaultPrincipalToLocal");
+        conf.put(Config.STORM_BLOBSTORE_REPLICATION_FACTOR, 3);
+        HdfsBlobStore store = new HdfsBlobStore();
+        store.prepareInternal(conf, null, DFS_CLUSTER_RULE.getDfscluster().getConfiguration(0));
+        return new AutoCloseableBlobStoreContainer(store);
+    }
+
+    private static class AutoCloseableBlobStoreContainer implements AutoCloseable {
+
+        private final HdfsBlobStore blobStore;
+
+        public AutoCloseableBlobStoreContainer(HdfsBlobStore blobStore) {
+            this.blobStore = blobStore;
+        }
+
+        @Override
+        public void close() throws Exception {
+            this.blobStore.shutdown();
+        }
+
+    }
+
+    @Test
+    public void testHdfsReplication()
+        throws Exception {
+        try (AutoCloseableBlobStoreContainer container = initHdfs("/storm/blobstoreReplication")) {
+            testReplication("/storm/blobstoreReplication/test", container.blobStore);
+        }
+    }
+
+    @Test
+    public void testBasicHdfs()
+        throws Exception {
+        try (AutoCloseableBlobStoreContainer container = initHdfs("/storm/blobstore1")) {
+            testBasic(container.blobStore);
+        }
+    }
+
+    @Test
+    public void testMultipleHdfs()
+        throws Exception {
+        // use different blobstore dir so it doesn't conflict with other test
+        try (AutoCloseableBlobStoreContainer container = initHdfs("/storm/blobstore2")) {
+            testMultiple(container.blobStore);
+        }
+    }
+
+    @Test
+    public void testHdfsWithAuth()
+        throws Exception {
+        // use different blobstore dir so it doesn't conflict with other tests
+        try (AutoCloseableBlobStoreContainer container = initHdfs("/storm/blobstore3")) {
+            testWithAuthentication(container.blobStore);
+        }
+    }
+
+    // Test for replication.
+    public void testReplication(String path, BlobStore store)
+        throws Exception {
+        SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
+        metadata.set_replication_factor(4);
+        try (AtomicOutputStream out = store.createBlob("test", metadata, null)) {
+            out.write(1);
+        }
+        assertStoreHasExactly(store, "test");
+        assertEquals("Blobstore replication not matching", store.getBlobReplication("test", null), 4);
+        store.deleteBlob("test", null);
+
+        //Test for replication with NIMBUS as user
+        Subject admin = getSubject("admin");
+        metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+        metadata.set_replication_factor(4);
+        try (AtomicOutputStream out = store.createBlob("test", metadata, admin)) {
+            out.write(1);
+        }
+        assertStoreHasExactly(store, "test");
+        assertEquals("Blobstore replication not matching", store.getBlobReplication("test", admin), 4);
+        store.updateBlobReplication("test", 5, admin);
+        assertEquals("Blobstore replication not matching", store.getBlobReplication("test", admin), 5);
+        store.deleteBlob("test", admin);
+
+        //Test for replication using SUPERVISOR access
+        Subject supervisor = getSubject("supervisor");
+        metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+        metadata.set_replication_factor(4);
+        try (AtomicOutputStream out = store.createBlob("test", metadata, supervisor)) {
+            out.write(1);
+        }
+        assertStoreHasExactly(store, "test");
+        assertEquals("Blobstore replication not matching", store.getBlobReplication("test", supervisor), 4);
+        store.updateBlobReplication("test", 5, supervisor);
+        assertEquals("Blobstore replication not matching", store.getBlobReplication("test", supervisor), 5);
+        store.deleteBlob("test", supervisor);
+
+        Subject adminsGroupsUser = getSubject("adminsGroupsUser");
+        metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+        metadata.set_replication_factor(4);
+        try (AtomicOutputStream out = store.createBlob("test", metadata, adminsGroupsUser)) {
+            out.write(1);
+        }
+        assertStoreHasExactly(store, "test");
+        assertEquals("Blobstore replication not matching", store.getBlobReplication("test", adminsGroupsUser), 4);
+        store.updateBlobReplication("test", 5, adminsGroupsUser);
+        assertEquals("Blobstore replication not matching", store.getBlobReplication("test", adminsGroupsUser), 5);
+        store.deleteBlob("test", adminsGroupsUser);
+
+        //Test for a user having read or write or admin access to read replication for a blob
+        String createSubject = "createSubject";
+        String writeSubject = "writeSubject";
+        String adminSubject = "adminSubject";
+        Subject who = getSubject(createSubject);
+        AccessControl writeAccess = new AccessControl(AccessControlType.USER, READ);
+        AccessControl adminAccess = new AccessControl(AccessControlType.USER, ADMIN);
+        writeAccess.set_name(writeSubject);
+        adminAccess.set_name(adminSubject);
+        List<AccessControl> acl = Arrays.asList(writeAccess, adminAccess);
+        metadata = new SettableBlobMeta(acl);
+        metadata.set_replication_factor(4);
+        try (AtomicOutputStream out = store.createBlob("test", metadata, who)) {
+            out.write(1);
+        }
+        assertStoreHasExactly(store, "test");
+        who = getSubject(writeSubject);
+        assertEquals("Blobstore replication not matching", store.getBlobReplication("test", who), 4);
+
+        //Test for a user having WRITE or ADMIN privileges to change replication of a blob
+        who = getSubject(adminSubject);
+        store.updateBlobReplication("test", 5, who);
+        assertEquals("Blobstore replication not matching", store.getBlobReplication("test", who), 5);
+        store.deleteBlob("test", getSubject(createSubject));
+    }
+
+    public Subject getSubject(String name) {
+        Subject subject = new Subject();
+        SingleUserPrincipal user = new SingleUserPrincipal(name);
+        subject.getPrincipals().add(user);
+        return subject;
+    }
+
+    // Check for Blobstore with authentication
+    public void testWithAuthentication(BlobStore store)
+        throws Exception {
+        //Test for Nimbus Admin
+        Subject admin = getSubject("admin");
+        assertStoreHasExactly(store);
+        SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+        try (AtomicOutputStream out = store.createBlob("test", metadata, admin)) {
+            assertStoreHasExactly(store, "test");
+            out.write(1);
+        }
+        store.deleteBlob("test", admin);
+
+        //Test for Nimbus Groups Admin
+        Subject adminsGroupsUser = getSubject("adminsGroupsUser");
+        assertStoreHasExactly(store);
+        metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+        try (AtomicOutputStream out = store.createBlob("test", metadata, adminsGroupsUser)) {
+            assertStoreHasExactly(store, "test");
+            out.write(1);
+        }
+        store.deleteBlob("test", adminsGroupsUser);
+
+        //Test for Supervisor Admin
+        Subject supervisor = getSubject("supervisor");
+        assertStoreHasExactly(store);
+        metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+        try (AtomicOutputStream out = store.createBlob("test", metadata, supervisor)) {
+            assertStoreHasExactly(store, "test");
+            out.write(1);
+        }
+        store.deleteBlob("test", supervisor);
+
+        //Test for Nimbus itself as a user
+        Subject nimbus = getNimbusSubject();
+        assertStoreHasExactly(store);
+        metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+        try (AtomicOutputStream out = store.createBlob("test", metadata, nimbus)) {
+            assertStoreHasExactly(store, "test");
+            out.write(1);
+        }
+        store.deleteBlob("test", nimbus);
+
+        // Test with a dummy test_subject for cases where subject !=null (security turned on)
+        Subject who = getSubject("test_subject");
+        assertStoreHasExactly(store);
+
+        // Tests for case when subject != null (security turned on) and
+        // acls for the blob are set to WORLD_EVERYTHING
+        metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
+        try (AtomicOutputStream out = store.createBlob("test", metadata, who)) {
+            out.write(1);
+        }
+        assertStoreHasExactly(store, "test");
+        // Testing whether acls are set to WORLD_EVERYTHING
+        assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+        readAssertEqualsWithAuth(store, who, "test", 1);
+
+        LOG.info("Deleting test");
+        store.deleteBlob("test", who);
+        assertStoreHasExactly(store);
+
+        // Tests for case when subject != null (security turned on) and
+        // acls are not set for the blob (DEFAULT)
+        LOG.info("Creating test again");
+        metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+        try (AtomicOutputStream out = store.createBlob("test", metadata, who)) {
+            out.write(2);
+        }
+        assertStoreHasExactly(store, "test");
+        // Testing whether acls are set to WORLD_EVERYTHING. Here the acl should not contain WORLD_EVERYTHING because
+        // the subject is neither null nor empty. The ACL should however contain USER_EVERYTHING as user needs to have
+        // complete access to the blob
+        assertTrue("ACL does not contain WORLD_EVERYTHING", !metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+        readAssertEqualsWithAuth(store, who, "test", 2);
+
+        LOG.info("Updating test");
+        try (AtomicOutputStream out = store.updateBlob("test", who)) {
+            out.write(3);
+        }
+        assertStoreHasExactly(store, "test");
+        readAssertEqualsWithAuth(store, who, "test", 3);
+
+        LOG.info("Updating test again");
+        try (AtomicOutputStream out = store.updateBlob("test", who)) {
+            out.write(4);
+        }
+        LOG.info("SLEEPING");
+        Thread.sleep(2);
+        assertStoreHasExactly(store, "test");
+        readAssertEqualsWithAuth(store, who, "test", 3);
+
+        //Test for subject with no principals and acls set to WORLD_EVERYTHING
+        who = new Subject();
+        metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
+        LOG.info("Creating test");
+        try (AtomicOutputStream out = store.createBlob("test-empty-subject-WE", metadata, who)) {
+            out.write(2);
+        }
+        assertStoreHasExactly(store, "test-empty-subject-WE", "test");
+        // Testing whether acls are set to WORLD_EVERYTHING
+        assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+        readAssertEqualsWithAuth(store, who, "test-empty-subject-WE", 2);
+
+        //Test for subject with no principals and acls set to DEFAULT
+        who = new Subject();
+        metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+        LOG.info("Creating other");
+        try (AtomicOutputStream out = store.createBlob("test-empty-subject-DEF", metadata, who)) {
+            out.write(2);
+        }
+        assertStoreHasExactly(store, "test-empty-subject-DEF", "test", "test-empty-subject-WE");
+        // Testing whether acls are set to WORLD_EVERYTHING
+        assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+        readAssertEqualsWithAuth(store, who, "test-empty-subject-DEF", 2);
+
+        if (store instanceof HdfsBlobStore) {
+            ((HdfsBlobStore) store).fullCleanup(1);
+        } else {
+            fail("Error the blobstore is of unknowntype");
+        }
+    }
+
+    public void testBasic(BlobStore store)
+        throws Exception {
+        assertStoreHasExactly(store);
+        LOG.info("Creating test");
+        // Tests for case when subject == null (security turned off) and
+        // acls for the blob are set to WORLD_EVERYTHING
+        SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
+        try (AtomicOutputStream out = store.createBlob("test", metadata, null)) {
+            out.write(1);
+        }
+        assertStoreHasExactly(store, "test");
+        // Testing whether acls are set to WORLD_EVERYTHING
+        assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+        readAssertEquals(store, "test", 1);
+
+        LOG.info("Deleting test");
+        store.deleteBlob("test", null);
+        assertStoreHasExactly(store);
+
+        // The following tests are run for both hdfs and local store to test the
+        // update blob interface
+        metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
+        LOG.info("Creating test again");
+        try (AtomicOutputStream out = store.createBlob("test", metadata, null)) {
+            out.write(2);
+        }
+        assertStoreHasExactly(store, "test");
+        readAssertEquals(store, "test", 2);
+        LOG.info("Updating test");
+        try (AtomicOutputStream out = store.updateBlob("test", null)) {
+            out.write(3);
+        }
+        assertStoreHasExactly(store, "test");
+        readAssertEquals(store, "test", 3);
+
+        LOG.info("Updating test again");
+        try (AtomicOutputStream out = store.updateBlob("test", null)) {
+            out.write(4);
+        }
+        LOG.info("SLEEPING");
+        Thread.sleep(2);
+
+        if (store instanceof HdfsBlobStore) {
+            ((HdfsBlobStore) store).fullCleanup(1);
+        } else {
+            fail("Error the blobstore is of unknowntype");
+        }
+    }
+
+    public void testMultiple(BlobStore store)
+        throws Exception {
+        assertStoreHasExactly(store);
+        LOG.info("Creating test");
+        try (AtomicOutputStream out = store.createBlob("test", new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING), null)) {
+            out.write(1);
+        }
+        assertStoreHasExactly(store, "test");
+        readAssertEquals(store, "test", 1);
+
+        LOG.info("Creating other");
+        try (AtomicOutputStream out = store.createBlob("other", new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING),
+            null)) {
+            out.write(2);
+        }
+        assertStoreHasExactly(store, "test", "other");
+        readAssertEquals(store, "test", 1);
+        readAssertEquals(store, "other", 2);
+
+        LOG.info("Updating other");
+        try (AtomicOutputStream out = store.updateBlob("other", null)) {
+            out.write(5);
+        }
+        assertStoreHasExactly(store, "test", "other");
+        readAssertEquals(store, "test", 1);
+        readAssertEquals(store, "other", 5);
+
+        LOG.info("Deleting test");
+        store.deleteBlob("test", null);
+        assertStoreHasExactly(store, "other");
+        readAssertEquals(store, "other", 5);
+
+        LOG.info("Creating test again");
+        try (AtomicOutputStream out = store.createBlob("test", new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING),
+            null)) {
+            out.write(2);
+        }
+        assertStoreHasExactly(store, "test", "other");
+        readAssertEquals(store, "test", 2);
+        readAssertEquals(store, "other", 5);
+
+        LOG.info("Updating test");
+        try (AtomicOutputStream out = store.updateBlob("test", null)) {
+            out.write(3);
+        }
+        assertStoreHasExactly(store, "test", "other");
+        readAssertEquals(store, "test", 3);
+        readAssertEquals(store, "other", 5);
+
+        LOG.info("Deleting other");
+        store.deleteBlob("other", null);
+        assertStoreHasExactly(store, "test");
+        readAssertEquals(store, "test", 3);
+
+        LOG.info("Updating test again");
+        try (AtomicOutputStream out = store.updateBlob("test", null)) {
+            out.write(4);
+        }
+        LOG.info("SLEEPING");
+        Thread.sleep(2);
+
+        if (store instanceof HdfsBlobStore) {
+            ((HdfsBlobStore) store).fullCleanup(1);
+        } else {
+            fail("Error the blobstore is of unknowntype");
+        }
+        assertStoreHasExactly(store, "test");
+        readAssertEquals(store, "test", 3);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/f58d4729/external/storm-hdfs-blobstore/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs-blobstore/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java b/external/storm-hdfs-blobstore/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java
new file mode 100644
index 0000000..752e563
--- /dev/null
+++ b/external/storm-hdfs-blobstore/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java
@@ -0,0 +1,224 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.blobstore;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.storm.blobstore.BlobStoreFile;
+import org.apache.storm.generated.SettableBlobMeta;
+import org.apache.storm.hdfs.testing.MiniDFSClusterRule;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+public class HdfsBlobStoreImplTest {
+
+    @ClassRule
+    public static final MiniDFSClusterRule DFS_CLUSTER_RULE = new MiniDFSClusterRule();
+
+    private static final Logger LOG = LoggerFactory.getLogger(HdfsBlobStoreImplTest.class);
+
+    // key dir needs to be number 0 to number of buckets, choose one so we know where to look
+    private static String KEYDIR = "0";
+    private Path blobDir = new Path("/storm/blobstore1");
+    private Path fullKeyDir = new Path(blobDir, KEYDIR);
+    private String BLOBSTORE_DATA = "data";
+
+    public class TestHdfsBlobStoreImpl extends HdfsBlobStoreImpl implements AutoCloseable {
+
+        public TestHdfsBlobStoreImpl(Path path, Map<String, Object> conf) throws IOException {
+            super(path, conf);
+        }
+
+        public TestHdfsBlobStoreImpl(Path path, Map<String, Object> conf,
+            Configuration hconf) throws IOException {
+            super(path, conf, hconf);
+        }
+
+        protected Path getKeyDir(String key) {
+            return new Path(new Path(blobDir, KEYDIR), key);
+        }
+
+        @Override
+        public void close() throws Exception {
+            this.shutdown();
+        }
+    }
+
+    // Be careful about adding additional tests as the dfscluster will be shared
+    @Test
+    public void testMultiple() throws Exception {
+        String testString = "testingblob";
+        String validKey = "validkeyBasic";
+
+        //Will be closed automatically when shutting down the DFS cluster
+        FileSystem fs = DFS_CLUSTER_RULE.getDfscluster().getFileSystem();
+        Map<String, Object> conf = new HashMap<>();
+
+        try (TestHdfsBlobStoreImpl hbs = new TestHdfsBlobStoreImpl(blobDir, conf, DFS_CLUSTER_RULE.getHadoopConf())) {
+            // should have created blobDir
+            assertTrue("BlobStore dir wasn't created", fs.exists(blobDir));
+            assertEquals("BlobStore dir was created with wrong permissions",
+                HdfsBlobStoreImpl.BLOBSTORE_DIR_PERMISSION, fs.getFileStatus(blobDir).getPermission());
+
+            // test exist with non-existent key
+            assertFalse("file exists but shouldn't", hbs.exists("bogus"));
+
+            // test write
+            BlobStoreFile pfile = hbs.write(validKey, false);
+            // Adding metadata to avoid null pointer exception
+            SettableBlobMeta meta = new SettableBlobMeta();
+            meta.set_replication_factor(1);
+            pfile.setMetadata(meta);
+            try (OutputStream ios = pfile.getOutputStream()) {
+                ios.write(testString.getBytes(StandardCharsets.UTF_8));
+            }
+
+            // test commit creates properly
+            assertTrue("BlobStore key dir wasn't created", fs.exists(fullKeyDir));
+            pfile.commit();
+            Path dataFile = new Path(new Path(fullKeyDir, validKey), BLOBSTORE_DATA);
+            assertTrue("blob data not committed", fs.exists(dataFile));
+            assertEquals("BlobStore dir was created with wrong permissions",
+                HdfsBlobStoreFile.BLOBSTORE_FILE_PERMISSION, fs.getFileStatus(dataFile).getPermission());
+            assertTrue("key doesn't exist but should", hbs.exists(validKey));
+
+            // test read
+            BlobStoreFile readpFile = hbs.read(validKey);
+            try (InputStream inStream = readpFile.getInputStream()) {
+                String readString = IOUtils.toString(inStream, StandardCharsets.UTF_8);
+                assertEquals("string read from blob doesn't match", testString, readString);
+            }
+
+            // test listkeys
+            Iterator<String> keys = hbs.listKeys();
+            assertTrue("blob has one key", keys.hasNext());
+            assertEquals("one key in blobstore", validKey, keys.next());
+
+            // delete
+            hbs.deleteKey(validKey);
+            assertFalse("key not deleted", fs.exists(dataFile));
+            assertFalse("key not deleted", hbs.exists(validKey));
+
+            // Now do multiple
+            String testString2 = "testingblob2";
+            String validKey2 = "validkey2";
+
+            // test write
+            pfile = hbs.write(validKey, false);
+            pfile.setMetadata(meta);
+            try (OutputStream ios = pfile.getOutputStream()) {
+                ios.write(testString.getBytes(StandardCharsets.UTF_8));
+            }
+
+            // test commit creates properly
+            assertTrue("BlobStore key dir wasn't created", fs.exists(fullKeyDir));
+            pfile.commit();
+            assertTrue("blob data not committed", fs.exists(dataFile));
+            assertEquals("BlobStore dir was created with wrong permissions",
+                HdfsBlobStoreFile.BLOBSTORE_FILE_PERMISSION, fs.getFileStatus(dataFile).getPermission());
+            assertTrue("key doesn't exist but should", hbs.exists(validKey));
+
+            // test write again
+            pfile = hbs.write(validKey2, false);
+            pfile.setMetadata(meta);
+            try (OutputStream ios2 = pfile.getOutputStream()) {
+                ios2.write(testString2.getBytes(StandardCharsets.UTF_8));
+            }
+
+            // test commit second creates properly
+            pfile.commit();
+            Path dataFile2 = new Path(new Path(fullKeyDir, validKey2), BLOBSTORE_DATA);
+            assertTrue("blob data not committed", fs.exists(dataFile2));
+            assertEquals("BlobStore dir was created with wrong permissions",
+                HdfsBlobStoreFile.BLOBSTORE_FILE_PERMISSION, fs.getFileStatus(dataFile2).getPermission());
+            assertTrue("key doesn't exist but should", hbs.exists(validKey2));
+
+            // test listkeys
+            keys = hbs.listKeys();
+            int total = 0;
+            boolean key1Found = false;
+            boolean key2Found = false;
+            while (keys.hasNext()) {
+                total++;
+                String key = keys.next();
+                if (key.equals(validKey)) {
+                    key1Found = true;
+                } else if (key.equals(validKey2)) {
+                    key2Found = true;
+                } else {
+                    fail("Found key that wasn't expected: " + key);
+                }
+            }
+            assertEquals("number of keys is wrong", 2, total);
+            assertTrue("blobstore missing key1", key1Found);
+            assertTrue("blobstore missing key2", key2Found);
+
+            // test read
+            readpFile = hbs.read(validKey);
+            try (InputStream inStream = readpFile.getInputStream()) {
+                String readString = IOUtils.toString(inStream, StandardCharsets.UTF_8);
+                assertEquals("string read from blob doesn't match", testString, readString);
+            }
+
+            // test read
+            readpFile = hbs.read(validKey2);
+            try (InputStream inStream = readpFile.getInputStream()) {
+                String readString = IOUtils.toString(inStream, StandardCharsets.UTF_8);
+                assertEquals("string read from blob doesn't match", testString2, readString);
+            }
+
+            hbs.deleteKey(validKey);
+            assertFalse("key not deleted", hbs.exists(validKey));
+            hbs.deleteKey(validKey2);
+            assertFalse("key not deleted", hbs.exists(validKey2));
+        }
+    }
+
+    @Test
+    public void testGetFileLength() throws Exception {
+        Map<String, Object> conf = new HashMap<>();
+        String validKey = "validkeyBasic";
+        String testString = "testingblob";
+        try (TestHdfsBlobStoreImpl hbs = new TestHdfsBlobStoreImpl(blobDir, conf, DFS_CLUSTER_RULE.getHadoopConf())) {
+            BlobStoreFile pfile = hbs.write(validKey, false);
+            // Adding metadata to avoid null pointer exception
+            SettableBlobMeta meta = new SettableBlobMeta();
+            meta.set_replication_factor(1);
+            pfile.setMetadata(meta);
+            try (OutputStream ios = pfile.getOutputStream()) {
+                ios.write(testString.getBytes(StandardCharsets.UTF_8));
+            }
+            assertEquals(testString.getBytes(StandardCharsets.UTF_8).length, pfile.getFileLength());
+        }
+    }
+}


Mime
View raw message