storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kabh...@apache.org
Subject [1/3] storm git commit: [STORM-2916] separate hdfs-blobstore from storm-hdfs
Date Thu, 01 Feb 2018 15:24:16 GMT
Repository: storm
Updated Branches:
  refs/heads/master 18045a3fc -> d68416b24


http://git-wip-us.apache.org/repos/asf/storm/blob/f58d4729/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
deleted file mode 100644
index 7130153..0000000
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
+++ /dev/null
@@ -1,395 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.hdfs.blobstore;
-
-import java.io.ByteArrayOutputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import javax.security.auth.Subject;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.storm.Config;
-import org.apache.storm.blobstore.AtomicOutputStream;
-import org.apache.storm.blobstore.BlobStore;
-import org.apache.storm.blobstore.BlobStoreAclHandler;
-import org.apache.storm.blobstore.BlobStoreFile;
-import org.apache.storm.blobstore.InputStreamWithMeta;
-import org.apache.storm.generated.AuthorizationException;
-import org.apache.storm.generated.KeyAlreadyExistsException;
-import org.apache.storm.generated.KeyNotFoundException;
-import org.apache.storm.generated.ReadableBlobMeta;
-import org.apache.storm.generated.SettableBlobMeta;
-import org.apache.storm.nimbus.NimbusInfo;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.storm.blobstore.BlobStoreAclHandler.ADMIN;
-import static org.apache.storm.blobstore.BlobStoreAclHandler.READ;
-import static org.apache.storm.blobstore.BlobStoreAclHandler.WRITE;
-
-/**
- * Provides a HDFS file system backed blob store implementation.
- * Note that this provides an api for having HDFS be the backing store for the blobstore,
- * it is not a service/daemon.
- *
- * We currently have NIMBUS_ADMINS and SUPERVISOR_ADMINS configuration. NIMBUS_ADMINS are given READ, WRITE and ADMIN
- * access whereas the SUPERVISOR_ADMINS are given READ access in order to read and download the blobs form the nimbus.
- *
- * The ACLs for the blob store are validated against whether the subject is a NIMBUS_ADMIN, SUPERVISOR_ADMIN or USER
- * who has read, write or admin privileges in order to perform respective operations on the blob.
- *
- * For hdfs blob store
- * 1. The USER interacts with nimbus to upload and access blobs through NimbusBlobStore Client API. Here, unlike
- * local blob store which stores the blobs locally, the nimbus talks to HDFS to upload the blobs.
- * 2. The USER sets the ACLs, and the blob access is validated against these ACLs.
- * 3. The SUPERVISOR interacts with nimbus through HdfsClientBlobStore to download the blobs. Here, unlike local
- * blob store the supervisor interacts with HDFS directly to download the blobs. The call to HdfsBlobStore is made as a "null"
- * subject. The blobstore gets the hadoop user and validates permissions for the supervisor.
- */
-public class HdfsBlobStore extends BlobStore {
-    public static final Logger LOG = LoggerFactory.getLogger(HdfsBlobStore.class);
-    private static final String DATA_PREFIX = "data_";
-    private static final String META_PREFIX = "meta_";
-    private static final HashMap<String, Subject> alreadyLoggedInUsers = new HashMap<>();
-
-    private BlobStoreAclHandler aclHandler;
-    private HdfsBlobStoreImpl hbs;
-    private Subject localSubject;
-    private Map<String, Object> conf;
-
-    /**
-     * Get the subject from Hadoop so we can use it to validate the acls. There is no direct
-     * interface from UserGroupInformation to get the subject, so do a doAs and get the context.
-     * We could probably run everything in the doAs but for now just grab the subject.
-     */
-    private Subject getHadoopUser() {
-        Subject subj;
-        try {
-            subj = UserGroupInformation.getCurrentUser().doAs(
-                    new PrivilegedAction<Subject>() {
-                        @Override
-                        public Subject run() {
-                            return Subject.getSubject(AccessController.getContext());
-                        }
-                    });
-        } catch (IOException e) {
-            throw new RuntimeException("Error creating subject and logging user in!", e);
-        }
-        return subj;
-    }
-
-    /**
-     * If who is null then we want to use the user hadoop says we are.
-     * Required for the supervisor to call these routines as its not
-     * logged in as anyone.
-     */
-    private Subject checkAndGetSubject(Subject who) {
-        if (who == null) {
-            return localSubject;
-        }
-        return who;
-    }
-
-    @Override
-    public void prepare(Map<String, Object> conf, String overrideBase, NimbusInfo nimbusInfo) {
-        this.conf = conf;
-        prepareInternal(conf, overrideBase, null);
-    }
-
-    /**
-     * Allow a Hadoop Configuration to be passed for testing. If it's null then the hadoop configs
-     * must be in your classpath.
-     */
-    protected void prepareInternal(Map<String, Object> conf, String overrideBase, Configuration hadoopConf) {
-        this.conf = conf;
-        if (overrideBase == null) {
-            overrideBase = (String)conf.get(Config.BLOBSTORE_DIR);
-        }
-        if (overrideBase == null) {
-            throw new RuntimeException("You must specify a blobstore directory for HDFS to use!");
-        }
-        LOG.debug("directory is: {}", overrideBase);
-        try {
-            // if a HDFS keytab/principal have been supplied login, otherwise assume they are
-            // logged in already or running insecure HDFS.
-            String principal = (String) conf.get(Config.BLOBSTORE_HDFS_PRINCIPAL);
-            String keyTab = (String) conf.get(Config.BLOBSTORE_HDFS_KEYTAB);
-
-            if (principal != null && keyTab != null) {
-                String combinedKey = principal + " from " + keyTab;
-                synchronized (alreadyLoggedInUsers) {
-                    localSubject = alreadyLoggedInUsers.get(combinedKey);
-                    if (localSubject == null) {
-                        UserGroupInformation.loginUserFromKeytab(principal, keyTab);
-                        localSubject = getHadoopUser();
-                        alreadyLoggedInUsers.put(combinedKey, localSubject);
-                    }
-                }
-            } else {
-                if (principal == null && keyTab != null) {
-                    throw new RuntimeException("You must specify an HDFS principal to go with the keytab!");
-
-                } else {
-                    if (principal != null && keyTab == null) {
-                        throw new RuntimeException("You must specify HDFS keytab go with the principal!");
-                    }
-                }
-                localSubject = getHadoopUser();
-            }
-        } catch (IOException e) {
-            throw new RuntimeException("Error logging in from keytab!", e);
-        }
-        aclHandler = new BlobStoreAclHandler(conf);
-        Path baseDir = new Path(overrideBase, BASE_BLOBS_DIR_NAME);
-        try {
-            if (hadoopConf != null) {
-                hbs = new HdfsBlobStoreImpl(baseDir, conf, hadoopConf);
-            } else {
-                hbs = new HdfsBlobStoreImpl(baseDir, conf);
-            }
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public AtomicOutputStream createBlob(String key, SettableBlobMeta meta, Subject who)
-            throws AuthorizationException, KeyAlreadyExistsException {
-        if (meta.get_replication_factor() <= 0) {
-            meta.set_replication_factor((int)conf.get(Config.STORM_BLOBSTORE_REPLICATION_FACTOR));
-        }
-        who = checkAndGetSubject(who);
-        validateKey(key);
-        aclHandler.normalizeSettableBlobMeta(key, meta, who, READ | WRITE | ADMIN);
-        BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl());
-        aclHandler.hasPermissions(meta.get_acl(), READ | WRITE | ADMIN, who, key);
-        if (hbs.exists(DATA_PREFIX + key)) {
-            throw new KeyAlreadyExistsException(key);
-        }
-        BlobStoreFileOutputStream mOut = null;
-        try {
-            BlobStoreFile metaFile = hbs.write(META_PREFIX + key, true);
-            metaFile.setMetadata(meta);
-            mOut = new BlobStoreFileOutputStream(metaFile);
-            mOut.write(Utils.thriftSerialize(meta));
-            mOut.close();
-            mOut = null;
-            BlobStoreFile dataFile = hbs.write(DATA_PREFIX + key, true);
-            dataFile.setMetadata(meta);
-            return new BlobStoreFileOutputStream(dataFile);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        } finally {
-            if (mOut != null) {
-                try {
-                    mOut.cancel();
-                } catch (IOException e) {
-                    //Ignored
-                }
-            }
-        }
-    }
-
-    @Override
-    public AtomicOutputStream updateBlob(String key, Subject who)
-            throws AuthorizationException, KeyNotFoundException {
-        who = checkAndGetSubject(who);
-        SettableBlobMeta meta = getStoredBlobMeta(key);
-        validateKey(key);
-        aclHandler.hasPermissions(meta.get_acl(), WRITE, who, key);
-        try {
-            BlobStoreFile dataFile = hbs.write(DATA_PREFIX + key, false);
-            dataFile.setMetadata(meta);
-            return new BlobStoreFileOutputStream(dataFile);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private SettableBlobMeta getStoredBlobMeta(String key) throws KeyNotFoundException {
-        InputStream in = null;
-        try {
-            BlobStoreFile pf = hbs.read(META_PREFIX + key);
-            try {
-                in = pf.getInputStream();
-            } catch (FileNotFoundException fnf) {
-                throw new KeyNotFoundException(key);
-            }
-            ByteArrayOutputStream out = new ByteArrayOutputStream();
-            byte[] buffer = new byte[2048];
-            int len;
-            while ((len = in.read(buffer)) > 0) {
-                out.write(buffer, 0, len);
-            }
-            in.close();
-            in = null;
-            return Utils.thriftDeserialize(SettableBlobMeta.class, out.toByteArray());
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        } finally {
-            if (in != null) {
-                try {
-                    in.close();
-                } catch (IOException e) {
-                    //Ignored
-                }
-            }
-        }
-    }
-
-    @Override
-    public ReadableBlobMeta getBlobMeta(String key, Subject who)
-            throws AuthorizationException, KeyNotFoundException {
-        who = checkAndGetSubject(who);
-        validateKey(key);
-        SettableBlobMeta meta = getStoredBlobMeta(key);
-        aclHandler.validateUserCanReadMeta(meta.get_acl(), who, key);
-        ReadableBlobMeta rbm = new ReadableBlobMeta();
-        rbm.set_settable(meta);
-        try {
-            BlobStoreFile pf = hbs.read(DATA_PREFIX + key);
-            rbm.set_version(pf.getModTime());
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-        return rbm;
-    }
-
-    @Override
-    public void setBlobMeta(String key, SettableBlobMeta meta, Subject who)
-            throws AuthorizationException, KeyNotFoundException {
-        if (meta.get_replication_factor() <= 0) {
-            meta.set_replication_factor((int)conf.get(Config.STORM_BLOBSTORE_REPLICATION_FACTOR));
-        }
-        who = checkAndGetSubject(who);
-        validateKey(key);
-        aclHandler.normalizeSettableBlobMeta(key,  meta, who, ADMIN);
-        BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl());
-        SettableBlobMeta orig = getStoredBlobMeta(key);
-        aclHandler.hasPermissions(orig.get_acl(), ADMIN, who, key);
-        BlobStoreFileOutputStream mOut = null;
-        writeMetadata(key, meta);
-    }
-
-    @Override
-    public void deleteBlob(String key, Subject who)
-            throws AuthorizationException, KeyNotFoundException {
-        who = checkAndGetSubject(who);
-        validateKey(key);
-        SettableBlobMeta meta = getStoredBlobMeta(key);
-        aclHandler.hasPermissions(meta.get_acl(), WRITE, who, key);
-        try {
-            hbs.deleteKey(DATA_PREFIX + key);
-            hbs.deleteKey(META_PREFIX + key);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public InputStreamWithMeta getBlob(String key, Subject who)
-            throws AuthorizationException, KeyNotFoundException {
-        who = checkAndGetSubject(who);
-        validateKey(key);
-        SettableBlobMeta meta = getStoredBlobMeta(key);
-        aclHandler.hasPermissions(meta.get_acl(), READ, who, key);
-        try {
-            return new BlobStoreFileInputStream(hbs.read(DATA_PREFIX + key));
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public Iterator<String> listKeys() {
-        try {
-            return new KeyTranslationIterator(hbs.listKeys(), DATA_PREFIX);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public void shutdown() {
-        //Empty
-    }
-
-    @Override
-    public int getBlobReplication(String key, Subject who) throws AuthorizationException, KeyNotFoundException {
-        who = checkAndGetSubject(who);
-        validateKey(key);
-        SettableBlobMeta meta = getStoredBlobMeta(key);
-        aclHandler.hasAnyPermissions(meta.get_acl(), READ | WRITE | ADMIN, who, key);
-        try {
-            return hbs.getBlobReplication(DATA_PREFIX + key);
-        } catch (IOException exp) {
-            throw new RuntimeException(exp);
-        }
-    }
-
-    @Override
-    public int updateBlobReplication(String key, int replication, Subject who) throws AuthorizationException, KeyNotFoundException {
-        who = checkAndGetSubject(who);
-        validateKey(key);
-        SettableBlobMeta meta = getStoredBlobMeta(key);
-        meta.set_replication_factor(replication);
-        aclHandler.hasAnyPermissions(meta.get_acl(), WRITE | ADMIN, who, key);
-        try {
-            writeMetadata(key, meta);
-            return hbs.updateBlobReplication(DATA_PREFIX + key, replication);
-        } catch (IOException exp) {
-            throw new RuntimeException(exp);
-        }
-    }
-
-    public void writeMetadata(String key, SettableBlobMeta meta)
-            throws AuthorizationException, KeyNotFoundException {
-        BlobStoreFileOutputStream mOut = null;
-        try {
-            BlobStoreFile hdfsFile = hbs.write(META_PREFIX + key, false);
-            hdfsFile.setMetadata(meta);
-            mOut = new BlobStoreFileOutputStream(hdfsFile);
-            mOut.write(Utils.thriftSerialize(meta));
-            mOut.close();
-            mOut = null;
-        } catch (IOException exp) {
-            throw new RuntimeException(exp);
-        } finally {
-            if (mOut != null) {
-                try {
-                    mOut.cancel();
-                } catch (IOException e) {
-                    //Ignored
-                }
-            }
-        }
-    }
-
-    public void fullCleanup(long age) throws IOException {
-        hbs.fullCleanup(age);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/f58d4729/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreFile.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreFile.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreFile.java
deleted file mode 100644
index 0192d94..0000000
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreFile.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.hdfs.blobstore;
-
-import org.apache.storm.blobstore.BlobStoreFile;
-import org.apache.storm.generated.SettableBlobMeta;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.regex.Matcher;
-
-public class HdfsBlobStoreFile extends BlobStoreFile {
-    public static final Logger LOG = LoggerFactory.getLogger(HdfsBlobStoreFile.class);
-
-    private final String _key;
-    private final boolean _isTmp;
-    private final Path _path;
-    private Long _modTime = null;
-    private final boolean _mustBeNew;
-    private final Configuration _hadoopConf;
-    private final FileSystem _fs;
-    private SettableBlobMeta meta;
-
-    // files are world-wide readable and owner writable
-    final public static FsPermission BLOBSTORE_FILE_PERMISSION =
-            FsPermission.createImmutable((short) 0644); // rw-r--r--
-
-    public HdfsBlobStoreFile(Path base, String name, Configuration hconf) {
-        if (BLOBSTORE_DATA_FILE.equals(name)) {
-            _isTmp = false;
-        } else {
-            Matcher m = TMP_NAME_PATTERN.matcher(name);
-            if (!m.matches()) {
-                throw new IllegalArgumentException("File name does not match '"+name+"' !~ "+TMP_NAME_PATTERN);
-            }
-            _isTmp = true;
-        }
-        _hadoopConf = hconf;
-        _key = base.getName();
-        _path = new Path(base, name);
-        _mustBeNew = false;
-        try {
-            _fs = _path.getFileSystem(_hadoopConf);
-        } catch (IOException e) {
-            throw new RuntimeException("Error getting filesystem for path: " + _path, e);
-        }
-    }
-
-    public HdfsBlobStoreFile(Path base, boolean isTmp, boolean mustBeNew, Configuration hconf) {
-        _key = base.getName();
-        _hadoopConf = hconf;
-        _isTmp = isTmp;
-        _mustBeNew = mustBeNew;
-        if (_isTmp) {
-            _path = new Path(base, System.currentTimeMillis()+TMP_EXT);
-        } else {
-            _path = new Path(base, BLOBSTORE_DATA_FILE);
-        }
-        try {
-            _fs = _path.getFileSystem(_hadoopConf);
-        } catch (IOException e) {
-            throw new RuntimeException("Error getting filesystem for path: " + _path, e);
-        }
-    }
-
-    @Override
-    public void delete() throws IOException {
-        _fs.delete(_path, true);
-    }
-
-    @Override
-    public boolean isTmp() {
-        return _isTmp;
-    }
-
-    @Override
-    public String getKey() {
-        return _key;
-    }
-
-    @Override
-    public long getModTime() throws IOException {
-        if (_modTime == null) {
-            FileSystem fs = _path.getFileSystem(_hadoopConf);
-            _modTime = fs.getFileStatus(_path).getModificationTime();
-        }
-        return _modTime;
-    }
-
-    private void checkIsNotTmp() {
-        if (!isTmp()) {
-            throw new IllegalStateException("Can only operate on a temporary blobstore file.");
-        }
-    }
-
-    private void checkIsTmp() {
-        if (isTmp()) {
-            throw new IllegalStateException("Cannot operate on a temporary blobstore file.");
-        }
-    }
-
-    @Override
-    public InputStream getInputStream() throws IOException {
-        checkIsTmp();
-        return _fs.open(_path);
-    }
-
-    @Override
-    public OutputStream getOutputStream() throws IOException {
-        checkIsNotTmp();
-        OutputStream out = null;
-        FsPermission fileperms = new FsPermission(BLOBSTORE_FILE_PERMISSION);
-        try {
-            out = _fs.create(_path, (short)this.getMetadata().get_replication_factor());
-            _fs.setPermission(_path, fileperms);
-            _fs.setReplication(_path, (short)this.getMetadata().get_replication_factor());
-        } catch (IOException e) {
-            //Try to create the parent directory, may not work
-            FsPermission dirperms = new FsPermission(HdfsBlobStoreImpl.BLOBSTORE_DIR_PERMISSION);
-            if (!_fs.mkdirs(_path.getParent(), dirperms)) {
-                LOG.warn("error creating parent dir: " + _path.getParent());
-            }
-            out = _fs.create(_path, (short)this.getMetadata().get_replication_factor());
-            _fs.setPermission(_path, dirperms);
-            _fs.setReplication(_path, (short)this.getMetadata().get_replication_factor());
-        }
-        if (out == null) {
-            throw new IOException("Error in creating: " + _path);
-        }
-        return out;
-    }
-
-    @Override
-    public void commit() throws IOException {
-        checkIsNotTmp();
-        // FileContext supports atomic rename, whereas FileSystem doesn't
-        FileContext fc = FileContext.getFileContext(_hadoopConf);
-        Path dest = new Path(_path.getParent(), BLOBSTORE_DATA_FILE);
-        if (_mustBeNew) {
-            fc.rename(_path, dest);
-        } else {
-            fc.rename(_path, dest, Options.Rename.OVERWRITE);
-        }
-        // Note, we could add support for setting the replication factor
-    }
-
-    @Override
-    public void cancel() throws IOException {
-        checkIsNotTmp();
-        delete();
-    }
-
-    @Override
-    public String toString() {
-        return _path+":"+(_isTmp ? "tmp": BlobStoreFile.BLOBSTORE_DATA_FILE)+":"+_key;
-    }
-
-    @Override
-    public long getFileLength() throws IOException {
-        return _fs.getFileStatus(_path).getLen();
-    }
-
-    @Override
-    public SettableBlobMeta getMetadata() {
-        return meta;
-    }
-
-    @Override
-    public void setMetadata(SettableBlobMeta meta) {
-        this.meta = meta;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/f58d4729/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
deleted file mode 100644
index eb9d0b8..0000000
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
+++ /dev/null
@@ -1,312 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.hdfs.blobstore;
-
-import org.apache.storm.Config;
-import org.apache.storm.blobstore.BlobStoreFile;
-import org.apache.storm.utils.ObjectReader;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Timer;
-import java.util.TimerTask;
-
-/**
- * HDFS blob store impl.
- */
-public class HdfsBlobStoreImpl {
-    private static final Logger LOG = LoggerFactory.getLogger(HdfsBlobStoreImpl.class);
-
-    private static final long FULL_CLEANUP_FREQ = 60 * 60 * 1000l;
-    private static final int BUCKETS = 1024;
-    private static final String BLOBSTORE_DATA = "data";
-    
-    private Timer timer;
-
-    public class KeyInHashDirIterator implements Iterator<String> {
-        private int currentBucket = 0;
-        private Iterator<String> it = null;
-        private String next = null;
-
-        public KeyInHashDirIterator() throws IOException {
-            primeNext();
-        }
-
-        private void primeNext() throws IOException {
-            while (it == null && currentBucket < BUCKETS) {
-                String name = String.valueOf(currentBucket);
-                Path dir = new Path(_fullPath, name);
-                try {
-                    it = listKeys(dir);
-                } catch (FileNotFoundException e) {
-                    it = null;
-                }
-                if (it == null || !it.hasNext()) {
-                    it = null;
-                    currentBucket++;
-                } else {
-                    next = it.next();
-                }
-            }
-        }
-
-        @Override
-        public boolean hasNext() {
-            return next != null;
-        }
-
-        @Override
-        public String next() {
-            if (!hasNext()) {
-                throw new NoSuchElementException();
-            }
-            String current = next;
-            next = null;
-            if (it != null) {
-                if (!it.hasNext()) {
-                    it = null;
-                    currentBucket++;
-                    try {
-                        primeNext();
-                    } catch (IOException e) {
-                        throw new RuntimeException(e);
-                    }
-                } else {
-                    next = it.next();
-                }
-            }
-            return current;
-        }
-
-        @Override
-        public void remove() {
-            throw new UnsupportedOperationException("Delete Not Supported");
-        }
-    }
-
-
-    private Path _fullPath;
-    private FileSystem _fs;
-    private Configuration _hadoopConf;
-
-    // blobstore directory is private!
-    final public static FsPermission BLOBSTORE_DIR_PERMISSION =
-            FsPermission.createImmutable((short) 0700); // rwx--------
-
-    public HdfsBlobStoreImpl(Path path, Map<String, Object> conf) throws IOException {
-        this(path, conf, new Configuration());
-    }
-
-    public HdfsBlobStoreImpl(Path path, Map<String, Object> conf,
-                             Configuration hconf) throws IOException {
-        LOG.info("Blob store based in {}", path);
-        _fullPath = path;
-        _hadoopConf = hconf;
-        _fs = path.getFileSystem(_hadoopConf);
-
-        if (!_fs.exists(_fullPath)) {
-            FsPermission perms = new FsPermission(BLOBSTORE_DIR_PERMISSION);
-            boolean success = _fs.mkdirs(_fullPath, perms);
-            if (!success) {
-                throw new IOException("Error creating blobstore directory: " + _fullPath);
-            }
-        }
-
-        Object shouldCleanup = conf.get(Config.BLOBSTORE_CLEANUP_ENABLE);
-        if (ObjectReader.getBoolean(shouldCleanup, false)) {
-            LOG.debug("Starting hdfs blobstore cleaner");
-            TimerTask cleanup = new TimerTask() {
-                @Override
-                public void run() {
-                    try {
-                        fullCleanup(FULL_CLEANUP_FREQ);
-                    } catch (IOException e) {
-                        LOG.error("Error trying to cleanup", e);
-                    }
-                }
-            };
-            timer = new Timer("HdfsBlobStore cleanup thread", true);
-            timer.scheduleAtFixedRate(cleanup, 0, FULL_CLEANUP_FREQ);
-        }
-    }
-
-    /**
-     * @return all keys that are available for reading.
-     * @throws IOException on any error.
-     */
-    public Iterator<String> listKeys() throws IOException {
-        return new KeyInHashDirIterator();
-    }
-
-    /**
-     * Get an input stream for reading a part.
-     *
-     * @param key the key of the part to read.
-     * @return the where to read the data from.
-     * @throws IOException on any error
-     */
-    public BlobStoreFile read(String key) throws IOException {
-        return new HdfsBlobStoreFile(getKeyDir(key), BLOBSTORE_DATA, _hadoopConf);
-    }
-
-    /**
-     * Get an object tied to writing the data.
-     *
-     * @param key the key of the part to write to.
-     * @param create whether the file needs to be new or not.
-     * @return an object that can be used to both write to, but also commit/cancel the operation.
-     * @throws IOException on any error
-     */
-    public BlobStoreFile write(String key, boolean create) throws IOException {
-        return new HdfsBlobStoreFile(getKeyDir(key), true, create, _hadoopConf);
-    }
-
-    /**
-     * Check if the key exists in the blob store.
-     *
-     * @param key the key to check for
-     * @return true if it exists else false.
-     */
-    public boolean exists(String key) {
-        Path dir = getKeyDir(key);
-        boolean res = false;
-        try {
-            _fs = dir.getFileSystem(_hadoopConf);
-            res = _fs.exists(dir);
-        } catch (IOException e) {
-            LOG.warn("Exception checking for exists on: " + key);
-        }
-        return res;
-    }
-
-    /**
-     * Delete a key from the blob store
-     *
-     * @param key the key to delete
-     * @throws IOException on any error
-     */
-    public void deleteKey(String key) throws IOException {
-        Path keyDir = getKeyDir(key);
-        HdfsBlobStoreFile pf = new HdfsBlobStoreFile(keyDir, BLOBSTORE_DATA,
-                _hadoopConf);
-        pf.delete();
-        delete(keyDir);
-    }
-
-    protected Path getKeyDir(String key) {
-        String hash = String.valueOf(Math.abs((long) key.hashCode()) % BUCKETS);
-        Path hashDir = new Path(_fullPath, hash);
-
-        Path ret = new Path(hashDir, key);
-        LOG.debug("{} Looking for {} in {}", new Object[]{_fullPath, key, hash});
-        return ret;
-    }
-
-    public void fullCleanup(long age) throws IOException {
-        long cleanUpIfBefore = System.currentTimeMillis() - age;
-        Iterator<String> keys = new KeyInHashDirIterator();
-        while (keys.hasNext()) {
-            String key = keys.next();
-            Path keyDir = getKeyDir(key);
-            Iterator<BlobStoreFile> i = listBlobStoreFiles(keyDir);
-            if (!i.hasNext()) {
-                //The dir is empty, so try to delete it, may fail, but that is OK
-                try {
-                    _fs.delete(keyDir, true);
-                } catch (Exception e) {
-                    LOG.warn("Could not delete " + keyDir + " will try again later");
-                }
-            }
-            while (i.hasNext()) {
-                BlobStoreFile f = i.next();
-                if (f.isTmp()) {
-                    if (f.getModTime() <= cleanUpIfBefore) {
-                        f.delete();
-                    }
-                }
-            }
-        }
-    }
-
-    protected Iterator<BlobStoreFile> listBlobStoreFiles(Path path) throws IOException {
-        ArrayList<BlobStoreFile> ret = new ArrayList<BlobStoreFile>();
-        FileStatus[] files = _fs.listStatus(new Path[]{path});
-        if (files != null) {
-            for (FileStatus sub : files) {
-                try {
-                    ret.add(new HdfsBlobStoreFile(sub.getPath().getParent(), sub.getPath().getName(),
-                            _hadoopConf));
-                } catch (IllegalArgumentException e) {
-                    //Ignored the file did not match
-                    LOG.warn("Found an unexpected file in {} {}", path, sub.getPath().getName());
-                }
-            }
-        }
-        return ret.iterator();
-    }
-
-    protected Iterator<String> listKeys(Path path) throws IOException {
-        ArrayList<String> ret = new ArrayList<String>();
-        FileStatus[] files = _fs.listStatus(new Path[]{path});
-        if (files != null) {
-            for (FileStatus sub : files) {
-                try {
-                    ret.add(sub.getPath().getName().toString());
-                } catch (IllegalArgumentException e) {
-                    //Ignored the file did not match
-                    LOG.debug("Found an unexpected file in {} {}", path, sub.getPath().getName());
-                }
-            }
-        }
-        return ret.iterator();
-    }
-
-    protected int getBlobReplication(String key) throws IOException {
-        Path path = getKeyDir(key);
-        Path dest = new Path(path, BLOBSTORE_DATA);
-        return _fs.getFileStatus(dest).getReplication();
-    }
-
-    protected int updateBlobReplication(String key, int replication) throws IOException {
-        Path path = getKeyDir(key);
-        Path dest = new Path(path, BLOBSTORE_DATA);
-        _fs.setReplication(dest, (short) replication);
-        return _fs.getFileStatus(dest).getReplication();
-    }
-
-    protected void delete(Path path) throws IOException {
-        _fs.delete(path, true);
-    }
-
-    public void shutdown() {
-        if (timer != null) {
-            timer.cancel();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/f58d4729/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java
deleted file mode 100644
index e68d5e3..0000000
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.hdfs.blobstore;
-
-import org.apache.storm.blobstore.AtomicOutputStream;
-import org.apache.storm.blobstore.ClientBlobStore;
-import org.apache.storm.blobstore.InputStreamWithMeta;
-import org.apache.storm.generated.AuthorizationException;
-import org.apache.storm.generated.ReadableBlobMeta;
-import org.apache.storm.generated.SettableBlobMeta;
-import org.apache.storm.generated.KeyAlreadyExistsException;
-import org.apache.storm.generated.KeyNotFoundException;
-import org.apache.storm.utils.NimbusClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Iterator;
-import java.util.Map;
-
-/**
- *  Client to access the HDFS blobStore. At this point, this is meant to only be used by the
- *  supervisor.  Don't trust who the client says they are so pass null for all Subjects.
- *
- *  The HdfsBlobStore implementation takes care of the null Subjects. It assigns Subjects
- *  based on what hadoop says who the users are. These users must be configured accordingly
- *  in the SUPERVISOR_ADMINS for ACL validation and for the supervisors to download the blobs.
- *  This API is only used by the supervisor in order to talk directly to HDFS.
- */
-public class HdfsClientBlobStore extends ClientBlobStore {
-    private static final Logger LOG = LoggerFactory.getLogger(HdfsClientBlobStore.class);
-    private HdfsBlobStore _blobStore;
-    private Map _conf;
-    private NimbusClient client;
-
-    @Override
-    public void prepare(Map<String, Object> conf) {
-        this._conf = conf;
-        _blobStore = new HdfsBlobStore();
-        _blobStore.prepare(conf, null, null);
-    }
-
-    @Override
-    public AtomicOutputStream createBlobToExtend(String key, SettableBlobMeta meta)
-            throws AuthorizationException, KeyAlreadyExistsException {
-        return _blobStore.createBlob(key, meta, null);
-    }
-
-    @Override
-    public AtomicOutputStream updateBlob(String key)
-            throws AuthorizationException, KeyNotFoundException {
-        return _blobStore.updateBlob(key, null);
-    }
-
-    @Override
-    public ReadableBlobMeta getBlobMeta(String key)
-            throws AuthorizationException, KeyNotFoundException {
-        return _blobStore.getBlobMeta(key, null);
-    }
-
-    @Override
-    public void setBlobMetaToExtend(String key, SettableBlobMeta meta)
-            throws AuthorizationException, KeyNotFoundException {
-        _blobStore.setBlobMeta(key, meta, null);
-    }
-
-    @Override
-    public void deleteBlob(String key) throws AuthorizationException, KeyNotFoundException {
-        _blobStore.deleteBlob(key, null);
-    }
-
-    @Override
-    public InputStreamWithMeta getBlob(String key)
-            throws AuthorizationException, KeyNotFoundException {
-        return _blobStore.getBlob(key, null);
-    }
-
-    @Override
-    public Iterator<String> listKeys() {
-        return _blobStore.listKeys();
-    }
-
-    @Override
-    public int getBlobReplication(String key) throws AuthorizationException, KeyNotFoundException {
-        return _blobStore.getBlobReplication(key, null);
-    }
-
-    @Override
-    public int updateBlobReplication(String key, int replication) throws AuthorizationException, KeyNotFoundException {
-        return _blobStore.updateBlobReplication(key, replication, null);
-    }
-
-    @Override
-    public boolean setClient(Map<String, Object> conf, NimbusClient client) {
-        this.client = client;
-        return true;
-    }
-
-    @Override
-    public void createStateInZookeeper(String key) {
-        // Do nothing
-    }
-
-    @Override
-    public void shutdown() {
-        close();
-    }
-
-    @Override
-    public void close() {
-        if(client != null) {
-            client.close();
-            client = null;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/f58d4729/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java
deleted file mode 100644
index 19ff38c..0000000
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java
+++ /dev/null
@@ -1,544 +0,0 @@
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.hdfs.blobstore;
-
-import org.apache.storm.Config;
-import org.apache.storm.blobstore.AtomicOutputStream;
-import org.apache.storm.blobstore.BlobStore;
-import org.apache.storm.blobstore.BlobStoreAclHandler;
-import org.apache.storm.generated.AccessControl;
-import org.apache.storm.generated.AuthorizationException;
-import org.apache.storm.generated.KeyNotFoundException;
-import org.apache.storm.generated.SettableBlobMeta;
-import org.apache.storm.generated.AccessControlType;
-
-import org.apache.storm.security.auth.FixedGroupsMapping;
-import org.apache.storm.security.auth.NimbusPrincipal;
-import org.apache.storm.security.auth.SingleUserPrincipal;
-import org.apache.commons.io.FileUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.security.auth.Subject;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.UUID;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.Iterator;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.apache.storm.hdfs.testing.MiniDFSClusterRule;
-import org.junit.ClassRule;
-
-public class BlobStoreTest {
-
-    @ClassRule
-    public static final MiniDFSClusterRule DFS_CLUSTER_RULE = new MiniDFSClusterRule();
-
-    private static final Logger LOG = LoggerFactory.getLogger(BlobStoreTest.class);
-    URI base;
-    File baseFile;
-    private static final Map<String, Object> CONF = new HashMap<>();
-    public static final int READ = 0x01;
-    public static final int WRITE = 0x02;
-    public static final int ADMIN = 0x04;
-
-    @Before
-    public void init() {
-        initializeConfigs();
-        baseFile = new File("/tmp/blob-store-test-" + UUID.randomUUID());
-        base = baseFile.toURI();
-    }
-
-    @After
-    public void cleanup()
-        throws IOException {
-        FileUtils.deleteDirectory(baseFile);
-    }
-
-    // Method which initializes nimbus admin
-    public static void initializeConfigs() {
-        CONF.put(Config.NIMBUS_ADMINS, "admin");
-        CONF.put(Config.NIMBUS_ADMINS_GROUPS, "adminsGroup");
-
-        // Construct a groups mapping for the FixedGroupsMapping class
-        Map<String, Set<String>> groupsMapping = new HashMap<>();
-        Set<String> groupSet = new HashSet<>();
-        groupSet.add("adminsGroup");
-        groupsMapping.put("adminsGroupsUser", groupSet);
-
-        // Now create a params map to put it in to our conf
-        Map<String, Object> paramMap = new HashMap<>();
-        paramMap.put(FixedGroupsMapping.STORM_FIXED_GROUP_MAPPING, groupsMapping);
-        CONF.put(Config.STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN, "org.apache.storm.security.auth.FixedGroupsMapping");
-        CONF.put(Config.STORM_GROUP_MAPPING_SERVICE_PARAMS, paramMap);
-        CONF.put(Config.NIMBUS_SUPERVISOR_USERS, "supervisor");
-    }
-
-    //Gets Nimbus Subject with NimbusPrincipal set on it
-    public static Subject getNimbusSubject() {
-        Subject nimbus = new Subject();
-        nimbus.getPrincipals().add(new NimbusPrincipal());
-        return nimbus;
-    }
-
-    // Overloading the assertStoreHasExactly method accomodate Subject in order to check for authorization
-    public static void assertStoreHasExactly(BlobStore store, Subject who, String... keys)
-        throws IOException, KeyNotFoundException, AuthorizationException {
-        Set<String> expected = new HashSet<>(Arrays.asList(keys));
-        Set<String> found = new HashSet<>();
-        Iterator<String> c = store.listKeys();
-        while (c.hasNext()) {
-            String keyName = c.next();
-            found.add(keyName);
-        }
-        Set<String> extra = new HashSet<>(found);
-        extra.removeAll(expected);
-        assertTrue("Found extra keys in the blob store " + extra, extra.isEmpty());
-        Set<String> missing = new HashSet<>(expected);
-        missing.removeAll(found);
-        assertTrue("Found keys missing from the blob store " + missing, missing.isEmpty());
-    }
-
-    public static void assertStoreHasExactly(BlobStore store, String... keys)
-        throws IOException, KeyNotFoundException, AuthorizationException {
-        assertStoreHasExactly(store, null, keys);
-    }
-
-    // Overloading the readInt method accomodate Subject in order to check for authorization (security turned on)
-    public static int readInt(BlobStore store, Subject who, String key) throws IOException, KeyNotFoundException, AuthorizationException {
-        try (InputStream in = store.getBlob(key, who)) {
-            return in.read();
-        }
-    }
-
-    public static int readInt(BlobStore store, String key)
-        throws IOException, KeyNotFoundException, AuthorizationException {
-        return readInt(store, null, key);
-    }
-
-    public static void readAssertEquals(BlobStore store, String key, int value)
-        throws IOException, KeyNotFoundException, AuthorizationException {
-        assertEquals(value, readInt(store, key));
-    }
-
-    // Checks for assertion when we turn on security
-    public void readAssertEqualsWithAuth(BlobStore store, Subject who, String key, int value)
-        throws IOException, KeyNotFoundException, AuthorizationException {
-        assertEquals(value, readInt(store, who, key));
-    }
-
-    private AutoCloseableBlobStoreContainer initHdfs(String dirName)
-        throws Exception {
-        Map<String, Object> conf = new HashMap<>();
-        conf.put(Config.BLOBSTORE_DIR, dirName);
-        conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, "org.apache.storm.security.auth.DefaultPrincipalToLocal");
-        conf.put(Config.STORM_BLOBSTORE_REPLICATION_FACTOR, 3);
-        HdfsBlobStore store = new HdfsBlobStore();
-        store.prepareInternal(conf, null, DFS_CLUSTER_RULE.getDfscluster().getConfiguration(0));
-        return new AutoCloseableBlobStoreContainer(store);
-    }
-
-    private static class AutoCloseableBlobStoreContainer implements AutoCloseable {
-
-        private final HdfsBlobStore blobStore;
-
-        public AutoCloseableBlobStoreContainer(HdfsBlobStore blobStore) {
-            this.blobStore = blobStore;
-        }
-
-        @Override
-        public void close() throws Exception {
-            this.blobStore.shutdown();
-        }
-
-    }
-
-    @Test
-    public void testHdfsReplication()
-        throws Exception {
-        try (AutoCloseableBlobStoreContainer container = initHdfs("/storm/blobstoreReplication")) {
-            testReplication("/storm/blobstoreReplication/test", container.blobStore);
-        }
-    }
-
-    @Test
-    public void testBasicHdfs()
-        throws Exception {
-        try (AutoCloseableBlobStoreContainer container = initHdfs("/storm/blobstore1")) {
-            testBasic(container.blobStore);
-        }
-    }
-
-    @Test
-    public void testMultipleHdfs()
-        throws Exception {
-        // use different blobstore dir so it doesn't conflict with other test
-        try (AutoCloseableBlobStoreContainer container = initHdfs("/storm/blobstore2")) {
-            testMultiple(container.blobStore);
-        }
-    }
-
-    @Test
-    public void testHdfsWithAuth()
-        throws Exception {
-        // use different blobstore dir so it doesn't conflict with other tests
-        try (AutoCloseableBlobStoreContainer container = initHdfs("/storm/blobstore3")) {
-            testWithAuthentication(container.blobStore);
-        }
-    }
-
-    // Test for replication.
-    public void testReplication(String path, BlobStore store)
-        throws Exception {
-        SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
-        metadata.set_replication_factor(4);
-        try (AtomicOutputStream out = store.createBlob("test", metadata, null)) {
-            out.write(1);
-        }
-        assertStoreHasExactly(store, "test");
-        assertEquals("Blobstore replication not matching", store.getBlobReplication("test", null), 4);
-        store.deleteBlob("test", null);
-
-        //Test for replication with NIMBUS as user
-        Subject admin = getSubject("admin");
-        metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
-        metadata.set_replication_factor(4);
-        try (AtomicOutputStream out = store.createBlob("test", metadata, admin)) {
-            out.write(1);
-        }
-        assertStoreHasExactly(store, "test");
-        assertEquals("Blobstore replication not matching", store.getBlobReplication("test", admin), 4);
-        store.updateBlobReplication("test", 5, admin);
-        assertEquals("Blobstore replication not matching", store.getBlobReplication("test", admin), 5);
-        store.deleteBlob("test", admin);
-
-        //Test for replication using SUPERVISOR access
-        Subject supervisor = getSubject("supervisor");
-        metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
-        metadata.set_replication_factor(4);
-        try (AtomicOutputStream out = store.createBlob("test", metadata, supervisor)) {
-            out.write(1);
-        }
-        assertStoreHasExactly(store, "test");
-        assertEquals("Blobstore replication not matching", store.getBlobReplication("test", supervisor), 4);
-        store.updateBlobReplication("test", 5, supervisor);
-        assertEquals("Blobstore replication not matching", store.getBlobReplication("test", supervisor), 5);
-        store.deleteBlob("test", supervisor);
-
-        Subject adminsGroupsUser = getSubject("adminsGroupsUser");
-        metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
-        metadata.set_replication_factor(4);
-        try (AtomicOutputStream out = store.createBlob("test", metadata, adminsGroupsUser)) {
-            out.write(1);
-        }
-        assertStoreHasExactly(store, "test");
-        assertEquals("Blobstore replication not matching", store.getBlobReplication("test", adminsGroupsUser), 4);
-        store.updateBlobReplication("test", 5, adminsGroupsUser);
-        assertEquals("Blobstore replication not matching", store.getBlobReplication("test", adminsGroupsUser), 5);
-        store.deleteBlob("test", adminsGroupsUser);
-
-        //Test for a user having read or write or admin access to read replication for a blob
-        String createSubject = "createSubject";
-        String writeSubject = "writeSubject";
-        String adminSubject = "adminSubject";
-        Subject who = getSubject(createSubject);
-        AccessControl writeAccess = new AccessControl(AccessControlType.USER, READ);
-        AccessControl adminAccess = new AccessControl(AccessControlType.USER, ADMIN);
-        writeAccess.set_name(writeSubject);
-        adminAccess.set_name(adminSubject);
-        List<AccessControl> acl = Arrays.asList(writeAccess, adminAccess);
-        metadata = new SettableBlobMeta(acl);
-        metadata.set_replication_factor(4);
-        try (AtomicOutputStream out = store.createBlob("test", metadata, who)) {
-            out.write(1);
-        }
-        assertStoreHasExactly(store, "test");
-        who = getSubject(writeSubject);
-        assertEquals("Blobstore replication not matching", store.getBlobReplication("test", who), 4);
-
-        //Test for a user having WRITE or ADMIN privileges to change replication of a blob
-        who = getSubject(adminSubject);
-        store.updateBlobReplication("test", 5, who);
-        assertEquals("Blobstore replication not matching", store.getBlobReplication("test", who), 5);
-        store.deleteBlob("test", getSubject(createSubject));
-    }
-
-    public Subject getSubject(String name) {
-        Subject subject = new Subject();
-        SingleUserPrincipal user = new SingleUserPrincipal(name);
-        subject.getPrincipals().add(user);
-        return subject;
-    }
-
-    // Check for Blobstore with authentication
-    public void testWithAuthentication(BlobStore store)
-        throws Exception {
-        //Test for Nimbus Admin
-        Subject admin = getSubject("admin");
-        assertStoreHasExactly(store);
-        SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
-        try (AtomicOutputStream out = store.createBlob("test", metadata, admin)) {
-            assertStoreHasExactly(store, "test");
-            out.write(1);
-        }
-        store.deleteBlob("test", admin);
-
-        //Test for Nimbus Groups Admin
-        Subject adminsGroupsUser = getSubject("adminsGroupsUser");
-        assertStoreHasExactly(store);
-        metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
-        try (AtomicOutputStream out = store.createBlob("test", metadata, adminsGroupsUser)) {
-            assertStoreHasExactly(store, "test");
-            out.write(1);
-        }
-        store.deleteBlob("test", adminsGroupsUser);
-
-        //Test for Supervisor Admin
-        Subject supervisor = getSubject("supervisor");
-        assertStoreHasExactly(store);
-        metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
-        try (AtomicOutputStream out = store.createBlob("test", metadata, supervisor)) {
-            assertStoreHasExactly(store, "test");
-            out.write(1);
-        }
-        store.deleteBlob("test", supervisor);
-
-        //Test for Nimbus itself as a user
-        Subject nimbus = getNimbusSubject();
-        assertStoreHasExactly(store);
-        metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
-        try (AtomicOutputStream out = store.createBlob("test", metadata, nimbus)) {
-            assertStoreHasExactly(store, "test");
-            out.write(1);
-        }
-        store.deleteBlob("test", nimbus);
-
-        // Test with a dummy test_subject for cases where subject !=null (security turned on)
-        Subject who = getSubject("test_subject");
-        assertStoreHasExactly(store);
-
-        // Tests for case when subject != null (security turned on) and
-        // acls for the blob are set to WORLD_EVERYTHING
-        metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
-        try (AtomicOutputStream out = store.createBlob("test", metadata, who)) {
-            out.write(1);
-        }
-        assertStoreHasExactly(store, "test");
-        // Testing whether acls are set to WORLD_EVERYTHING
-        assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
-        readAssertEqualsWithAuth(store, who, "test", 1);
-
-        LOG.info("Deleting test");
-        store.deleteBlob("test", who);
-        assertStoreHasExactly(store);
-
-        // Tests for case when subject != null (security turned on) and
-        // acls are not set for the blob (DEFAULT)
-        LOG.info("Creating test again");
-        metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
-        try (AtomicOutputStream out = store.createBlob("test", metadata, who)) {
-            out.write(2);
-        }
-        assertStoreHasExactly(store, "test");
-        // Testing whether acls are set to WORLD_EVERYTHING. Here the acl should not contain WORLD_EVERYTHING because
-        // the subject is neither null nor empty. The ACL should however contain USER_EVERYTHING as user needs to have
-        // complete access to the blob
-        assertTrue("ACL does not contain WORLD_EVERYTHING", !metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
-        readAssertEqualsWithAuth(store, who, "test", 2);
-
-        LOG.info("Updating test");
-        try (AtomicOutputStream out = store.updateBlob("test", who)) {
-            out.write(3);
-        }
-        assertStoreHasExactly(store, "test");
-        readAssertEqualsWithAuth(store, who, "test", 3);
-
-        LOG.info("Updating test again");
-        try (AtomicOutputStream out = store.updateBlob("test", who)) {
-            out.write(4);
-        }
-        LOG.info("SLEEPING");
-        Thread.sleep(2);
-        assertStoreHasExactly(store, "test");
-        readAssertEqualsWithAuth(store, who, "test", 3);
-
-        //Test for subject with no principals and acls set to WORLD_EVERYTHING
-        who = new Subject();
-        metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
-        LOG.info("Creating test");
-        try (AtomicOutputStream out = store.createBlob("test-empty-subject-WE", metadata, who)) {
-            out.write(2);
-        }
-        assertStoreHasExactly(store, "test-empty-subject-WE", "test");
-        // Testing whether acls are set to WORLD_EVERYTHING
-        assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
-        readAssertEqualsWithAuth(store, who, "test-empty-subject-WE", 2);
-
-        //Test for subject with no principals and acls set to DEFAULT
-        who = new Subject();
-        metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
-        LOG.info("Creating other");
-        try (AtomicOutputStream out = store.createBlob("test-empty-subject-DEF", metadata, who)) {
-            out.write(2);
-        }
-        assertStoreHasExactly(store, "test-empty-subject-DEF", "test", "test-empty-subject-WE");
-        // Testing whether acls are set to WORLD_EVERYTHING
-        assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
-        readAssertEqualsWithAuth(store, who, "test-empty-subject-DEF", 2);
-
-        if (store instanceof HdfsBlobStore) {
-            ((HdfsBlobStore) store).fullCleanup(1);
-        } else {
-            fail("Error the blobstore is of unknowntype");
-        }
-    }
-
-    public void testBasic(BlobStore store)
-        throws Exception {
-        assertStoreHasExactly(store);
-        LOG.info("Creating test");
-        // Tests for case when subject == null (security turned off) and
-        // acls for the blob are set to WORLD_EVERYTHING
-        SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
-        try (AtomicOutputStream out = store.createBlob("test", metadata, null)) {
-            out.write(1);
-        }
-        assertStoreHasExactly(store, "test");
-        // Testing whether acls are set to WORLD_EVERYTHING
-        assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
-        readAssertEquals(store, "test", 1);
-
-        LOG.info("Deleting test");
-        store.deleteBlob("test", null);
-        assertStoreHasExactly(store);
-
-        // The following tests are run for both hdfs and local store to test the
-        // update blob interface
-        metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
-        LOG.info("Creating test again");
-        try (AtomicOutputStream out = store.createBlob("test", metadata, null)) {
-            out.write(2);
-        }
-        assertStoreHasExactly(store, "test");
-        readAssertEquals(store, "test", 2);
-        LOG.info("Updating test");
-        try (AtomicOutputStream out = store.updateBlob("test", null)) {
-            out.write(3);
-        }
-        assertStoreHasExactly(store, "test");
-        readAssertEquals(store, "test", 3);
-
-        LOG.info("Updating test again");
-        try (AtomicOutputStream out = store.updateBlob("test", null)) {
-            out.write(4);
-        }
-        LOG.info("SLEEPING");
-        Thread.sleep(2);
-
-        if (store instanceof HdfsBlobStore) {
-            ((HdfsBlobStore) store).fullCleanup(1);
-        } else {
-            fail("Error the blobstore is of unknowntype");
-        }
-    }
-
-    public void testMultiple(BlobStore store)
-        throws Exception {
-        assertStoreHasExactly(store);
-        LOG.info("Creating test");
-        try (AtomicOutputStream out = store.createBlob("test", new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING), null)) {
-            out.write(1);
-        }
-        assertStoreHasExactly(store, "test");
-        readAssertEquals(store, "test", 1);
-
-        LOG.info("Creating other");
-        try (AtomicOutputStream out = store.createBlob("other", new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING),
-            null)) {
-            out.write(2);
-        }
-        assertStoreHasExactly(store, "test", "other");
-        readAssertEquals(store, "test", 1);
-        readAssertEquals(store, "other", 2);
-
-        LOG.info("Updating other");
-        try (AtomicOutputStream out = store.updateBlob("other", null)) {
-            out.write(5);
-        }
-        assertStoreHasExactly(store, "test", "other");
-        readAssertEquals(store, "test", 1);
-        readAssertEquals(store, "other", 5);
-
-        LOG.info("Deleting test");
-        store.deleteBlob("test", null);
-        assertStoreHasExactly(store, "other");
-        readAssertEquals(store, "other", 5);
-
-        LOG.info("Creating test again");
-        try (AtomicOutputStream out = store.createBlob("test", new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING),
-            null)) {
-            out.write(2);
-        }
-        assertStoreHasExactly(store, "test", "other");
-        readAssertEquals(store, "test", 2);
-        readAssertEquals(store, "other", 5);
-
-        LOG.info("Updating test");
-        try (AtomicOutputStream out = store.updateBlob("test", null)) {
-            out.write(3);
-        }
-        assertStoreHasExactly(store, "test", "other");
-        readAssertEquals(store, "test", 3);
-        readAssertEquals(store, "other", 5);
-
-        LOG.info("Deleting other");
-        store.deleteBlob("other", null);
-        assertStoreHasExactly(store, "test");
-        readAssertEquals(store, "test", 3);
-
-        LOG.info("Updating test again");
-        try (AtomicOutputStream out = store.updateBlob("test", null)) {
-            out.write(4);
-        }
-        LOG.info("SLEEPING");
-        Thread.sleep(2);
-
-        if (store instanceof HdfsBlobStore) {
-            ((HdfsBlobStore) store).fullCleanup(1);
-        } else {
-            fail("Error the blobstore is of unknowntype");
-        }
-        assertStoreHasExactly(store, "test");
-        readAssertEquals(store, "test", 3);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/f58d4729/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java
deleted file mode 100644
index 3628c79..0000000
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java
+++ /dev/null
@@ -1,226 +0,0 @@
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.hdfs.blobstore;
-
-import org.apache.storm.blobstore.BlobStoreFile;
-import org.apache.storm.generated.SettableBlobMeta;
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.charset.Charset;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-import static org.junit.Assert.*;
-
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import org.apache.storm.hdfs.testing.MiniDFSClusterRule;
-import org.junit.ClassRule;
-
-public class HdfsBlobStoreImplTest {
-
-    @ClassRule
-    public static final MiniDFSClusterRule DFS_CLUSTER_RULE = new MiniDFSClusterRule();
-
-    private static final Logger LOG = LoggerFactory.getLogger(HdfsBlobStoreImplTest.class);
-
-    // key dir needs to be number 0 to number of buckets, choose one so we know where to look
-    private static String KEYDIR = "0";
-    private Path blobDir = new Path("/storm/blobstore1");
-    private Path fullKeyDir = new Path(blobDir, KEYDIR);
-    private String BLOBSTORE_DATA = "data";
-
-    public class TestHdfsBlobStoreImpl extends HdfsBlobStoreImpl implements AutoCloseable {
-
-        public TestHdfsBlobStoreImpl(Path path, Map<String, Object> conf) throws IOException {
-            super(path, conf);
-        }
-
-        public TestHdfsBlobStoreImpl(Path path, Map<String, Object> conf,
-            Configuration hconf) throws IOException {
-            super(path, conf, hconf);
-        }
-
-        protected Path getKeyDir(String key) {
-            return new Path(new Path(blobDir, KEYDIR), key);
-        }
-
-        @Override
-        public void close() throws Exception {
-            this.shutdown();
-        }
-    }
-
-    // Be careful about adding additional tests as the dfscluster will be shared
-    @Test
-    public void testMultiple() throws Exception {
-        String testString = "testingblob";
-        String validKey = "validkeyBasic";
-
-        //Will be closed automatically when shutting down the DFS cluster
-        FileSystem fs = DFS_CLUSTER_RULE.getDfscluster().getFileSystem();
-        Map<String, Object> conf = new HashMap<>();
-
-        try (TestHdfsBlobStoreImpl hbs = new TestHdfsBlobStoreImpl(blobDir, conf, DFS_CLUSTER_RULE.getHadoopConf())) {
-            // should have created blobDir
-            assertTrue("BlobStore dir wasn't created", fs.exists(blobDir));
-            assertEquals("BlobStore dir was created with wrong permissions",
-                HdfsBlobStoreImpl.BLOBSTORE_DIR_PERMISSION, fs.getFileStatus(blobDir).getPermission());
-
-            // test exist with non-existent key
-            assertFalse("file exists but shouldn't", hbs.exists("bogus"));
-
-            // test write
-            BlobStoreFile pfile = hbs.write(validKey, false);
-            // Adding metadata to avoid null pointer exception
-            SettableBlobMeta meta = new SettableBlobMeta();
-            meta.set_replication_factor(1);
-            pfile.setMetadata(meta);
-            try (OutputStream ios = pfile.getOutputStream()) {
-                ios.write(testString.getBytes(StandardCharsets.UTF_8));
-            }
-
-            // test commit creates properly
-            assertTrue("BlobStore key dir wasn't created", fs.exists(fullKeyDir));
-            pfile.commit();
-            Path dataFile = new Path(new Path(fullKeyDir, validKey), BLOBSTORE_DATA);
-            assertTrue("blob data not committed", fs.exists(dataFile));
-            assertEquals("BlobStore dir was created with wrong permissions",
-                HdfsBlobStoreFile.BLOBSTORE_FILE_PERMISSION, fs.getFileStatus(dataFile).getPermission());
-            assertTrue("key doesn't exist but should", hbs.exists(validKey));
-
-            // test read
-            BlobStoreFile readpFile = hbs.read(validKey);
-            try (InputStream inStream = readpFile.getInputStream()) {
-                String readString = IOUtils.toString(inStream, StandardCharsets.UTF_8);
-                assertEquals("string read from blob doesn't match", testString, readString);
-            }
-
-            // test listkeys
-            Iterator<String> keys = hbs.listKeys();
-            assertTrue("blob has one key", keys.hasNext());
-            assertEquals("one key in blobstore", validKey, keys.next());
-
-            // delete
-            hbs.deleteKey(validKey);
-            assertFalse("key not deleted", fs.exists(dataFile));
-            assertFalse("key not deleted", hbs.exists(validKey));
-
-            // Now do multiple
-            String testString2 = "testingblob2";
-            String validKey2 = "validkey2";
-
-            // test write
-            pfile = hbs.write(validKey, false);
-            pfile.setMetadata(meta);
-            try (OutputStream ios = pfile.getOutputStream()) {
-                ios.write(testString.getBytes(StandardCharsets.UTF_8));
-            }
-
-            // test commit creates properly
-            assertTrue("BlobStore key dir wasn't created", fs.exists(fullKeyDir));
-            pfile.commit();
-            assertTrue("blob data not committed", fs.exists(dataFile));
-            assertEquals("BlobStore dir was created with wrong permissions",
-                HdfsBlobStoreFile.BLOBSTORE_FILE_PERMISSION, fs.getFileStatus(dataFile).getPermission());
-            assertTrue("key doesn't exist but should", hbs.exists(validKey));
-
-            // test write again
-            pfile = hbs.write(validKey2, false);
-            pfile.setMetadata(meta);
-            try (OutputStream ios2 = pfile.getOutputStream()) {
-                ios2.write(testString2.getBytes(StandardCharsets.UTF_8));
-            }
-
-            // test commit second creates properly
-            pfile.commit();
-            Path dataFile2 = new Path(new Path(fullKeyDir, validKey2), BLOBSTORE_DATA);
-            assertTrue("blob data not committed", fs.exists(dataFile2));
-            assertEquals("BlobStore dir was created with wrong permissions",
-                HdfsBlobStoreFile.BLOBSTORE_FILE_PERMISSION, fs.getFileStatus(dataFile2).getPermission());
-            assertTrue("key doesn't exist but should", hbs.exists(validKey2));
-
-            // test listkeys
-            keys = hbs.listKeys();
-            int total = 0;
-            boolean key1Found = false;
-            boolean key2Found = false;
-            while (keys.hasNext()) {
-                total++;
-                String key = keys.next();
-                if (key.equals(validKey)) {
-                    key1Found = true;
-                } else if (key.equals(validKey2)) {
-                    key2Found = true;
-                } else {
-                    fail("Found key that wasn't expected: " + key);
-                }
-            }
-            assertEquals("number of keys is wrong", 2, total);
-            assertTrue("blobstore missing key1", key1Found);
-            assertTrue("blobstore missing key2", key2Found);
-
-            // test read
-            readpFile = hbs.read(validKey);
-            try (InputStream inStream = readpFile.getInputStream()) {
-                String readString = IOUtils.toString(inStream, StandardCharsets.UTF_8);
-                assertEquals("string read from blob doesn't match", testString, readString);
-            }
-
-            // test read
-            readpFile = hbs.read(validKey2);
-            try (InputStream inStream = readpFile.getInputStream()) {
-                String readString = IOUtils.toString(inStream, StandardCharsets.UTF_8);
-                assertEquals("string read from blob doesn't match", testString2, readString);
-            }
-
-            hbs.deleteKey(validKey);
-            assertFalse("key not deleted", hbs.exists(validKey));
-            hbs.deleteKey(validKey2);
-            assertFalse("key not deleted", hbs.exists(validKey2));
-        }
-    }
-
-    @Test
-    public void testGetFileLength() throws Exception {
-        Map<String, Object> conf = new HashMap<>();
-        String validKey = "validkeyBasic";
-        String testString = "testingblob";
-        try (TestHdfsBlobStoreImpl hbs = new TestHdfsBlobStoreImpl(blobDir, conf, DFS_CLUSTER_RULE.getHadoopConf())) {
-            BlobStoreFile pfile = hbs.write(validKey, false);
-            // Adding metadata to avoid null pointer exception
-            SettableBlobMeta meta = new SettableBlobMeta();
-            meta.set_replication_factor(1);
-            pfile.setMetadata(meta);
-            try (OutputStream ios = pfile.getOutputStream()) {
-                ios.write(testString.getBytes(StandardCharsets.UTF_8));
-            }
-            assertEquals(testString.getBytes(StandardCharsets.UTF_8).length, pfile.getFileLength());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/f58d4729/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ccace42..dee254e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -351,6 +351,7 @@
         <module>external/storm-autocreds</module>
         <module>external/storm-kafka</module>
         <module>external/storm-hdfs</module>
+        <module>external/storm-hdfs-blobstore</module>
         <module>external/storm-hbase</module>
         <module>external/storm-hive</module>
         <module>external/storm-jdbc</module>


Mime
View raw message