kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject [1/6] kudu git commit: block_manager: consolidate data directory management
Date Wed, 02 Nov 2016 21:39:43 GMT
Repository: kudu
Updated Branches:
  refs/heads/master 401a59274 -> bd4361254


block_manager: consolidate data directory management

This patch changes how data directories are handled. All of the duplicated
logic between the two block managers is now encapsulated in two new classes
(DataDir and DataDirManager), which can serve as the future foundation for
a "disk grouping" feature.

There should be few, if any, functional changes. Of note:
- This refactoring makes the LBM check instance file integrity during Open()
  for the first time. Given that the remainder of the instance file handling
  was identical in both block managers, this should be harmless.
- The LBM now opens per-directory instance files serially. This is a tiny
  amount of I/O next to container scanning, and while it can be done
  concurrently in the DataDirManager, I don't think it's worth the effort.

Change-Id: Ifc9d8999117a6383e7487b6c5bf065f10247b1d7
Reviewed-on: http://gerrit.cloudera.org:8080/4793
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <mpercy@apache.org>
Reviewed-by: Dinesh Bhat <dinesh@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/3a3f714e
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/3a3f714e
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/3a3f714e

Branch: refs/heads/master
Commit: 3a3f714e4df4ed1ddef5fdf7c55086d1dd0f4794
Parents: 401a592
Author: Adar Dembo <adar@cloudera.com>
Authored: Fri Oct 21 18:24:37 2016 -0700
Committer: Adar Dembo <adar@cloudera.com>
Committed: Wed Nov 2 18:11:03 2016 +0000

----------------------------------------------------------------------
 src/kudu/fs/CMakeLists.txt        |   1 +
 src/kudu/fs/block_manager.cc      |   2 -
 src/kudu/fs/block_manager.h       |   3 -
 src/kudu/fs/data_dirs.cc          | 327 ++++++++++++++++++++++++++++
 src/kudu/fs/data_dirs.h           | 148 +++++++++++++
 src/kudu/fs/file_block_manager.cc | 160 ++------------
 src/kudu/fs/file_block_manager.h  |  22 +-
 src/kudu/fs/log_block_manager.cc  | 376 +++++++++------------------------
 src/kudu/fs/log_block_manager.h   |  40 +---
 9 files changed, 609 insertions(+), 470 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/3a3f714e/src/kudu/fs/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/fs/CMakeLists.txt b/src/kudu/fs/CMakeLists.txt
index 23a713e..3f72bb6 100644
--- a/src/kudu/fs/CMakeLists.txt
+++ b/src/kudu/fs/CMakeLists.txt
@@ -30,6 +30,7 @@ add_library(kudu_fs
   block_manager.cc
   block_manager_metrics.cc
   block_manager_util.cc
+  data_dirs.cc
   file_block_manager.cc
   fs_manager.cc
   log_block_manager.cc)

http://git-wip-us.apache.org/repos/asf/kudu/blob/3a3f714e/src/kudu/fs/block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager.cc b/src/kudu/fs/block_manager.cc
index f50c59a..7f921ae 100644
--- a/src/kudu/fs/block_manager.cc
+++ b/src/kudu/fs/block_manager.cc
@@ -31,8 +31,6 @@ TAG_FLAG(block_manager_lock_dirs, unsafe);
 namespace kudu {
 namespace fs {
 
-const char* BlockManager::kInstanceMetadataFileName = "block_manager_instance";
-
 BlockManagerOptions::BlockManagerOptions()
   : read_only(false) {
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/3a3f714e/src/kudu/fs/block_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager.h b/src/kudu/fs/block_manager.h
index cc5c5a1..3f4cfe4 100644
--- a/src/kudu/fs/block_manager.h
+++ b/src/kudu/fs/block_manager.h
@@ -231,9 +231,6 @@ class BlockManager {
   //
   // On success, guarantees that outstanding data is durable.
   virtual Status CloseBlocks(const std::vector<WritableBlock*>& blocks) = 0;
-
- protected:
-  static const char* kInstanceMetadataFileName;
 };
 
 // Closes a group of blocks.

http://git-wip-us.apache.org/repos/asf/kudu/blob/3a3f714e/src/kudu/fs/data_dirs.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/data_dirs.cc b/src/kudu/fs/data_dirs.cc
new file mode 100644
index 0000000..632f5b1
--- /dev/null
+++ b/src/kudu/fs/data_dirs.cc
@@ -0,0 +1,327 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/fs/data_dirs.h"
+
+#include <cerrno>
+#include <deque>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
+#include "kudu/fs/block_manager.h"
+#include "kudu/fs/block_manager_util.h"
+#include "kudu/fs/fs.pb.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/atomic.h"
+#include "kudu/util/env.h"
+#include "kudu/util/env_util.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/oid_generator.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/status.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/threadpool.h"
+
+namespace kudu {
+
+namespace fs {
+
+using env_util::ScopedFileDeleter;
+using std::deque;
+using std::string;
+using std::unique_ptr;
+using std::unordered_set;
+using std::vector;
+using strings::Substitute;
+
+namespace {
+
+const char kInstanceMetadataFileName[] = "block_manager_instance";
+
+const char kHolePunchErrorMsg[] =
+    "Error during hole punch test. The log block manager requires a "
+    "filesystem with hole punching support such as ext4 or xfs. On el6, "
+    "kernel version 2.6.32-358 or newer is required. To run without hole "
+    "punching (at the cost of some efficiency and scalability), reconfigure "
+    "Kudu with --block_manager=file. Refer to the Kudu documentation for more "
+    "details. Raw error message follows";
+
+Status CheckHolePunch(Env* env, const string& path) {
+  // Hole-punch probe for 'path'. Sizes are arbitrary; const because they are never reassigned.
+  static const uint64_t kFileSize = 4096 * 4;
+  static const uint64_t kHoleOffset = 4096;
+  static const uint64_t kHoleSize = 8192;
+  static const uint64_t kPunchedFileSize = kFileSize - kHoleSize;
+
+  // Open a scratch file inside 'path'; any failure here is returned to the caller as-is.
+  string filename = JoinPathSegments(path, "hole_punch_test_file");
+  gscoped_ptr<RWFile> file;
+  RWFileOptions opts;
+  RETURN_NOT_OK(env->NewRWFile(opts, filename, &file));
+
+  // The file has been created; delete it on exit no matter what happens.
+  ScopedFileDeleter file_deleter(env, filename);
+
+  // Preallocate kFileSize bytes and check that the on-disk size matches exactly.
+  uint64_t sz;
+  RETURN_NOT_OK(file->PreAllocate(0, kFileSize));
+  RETURN_NOT_OK(env->GetFileSizeOnDisk(filename, &sz));
+  if (sz != kFileSize) {
+    return Status::IOError(Substitute(
+        "Unexpected pre-punch file size for $0: expected $1 but got $2",
+        filename, kFileSize, sz));
+  }
+
+  // Punch the hole; the on-disk size must now equal kPunchedFileSize.
+  RETURN_NOT_OK(file->PunchHole(kHoleOffset, kHoleSize));
+  RETURN_NOT_OK(env->GetFileSizeOnDisk(filename, &sz));
+  if (sz != kPunchedFileSize) {
+    return Status::IOError(Substitute(
+        "Unexpected post-punch file size for $0: expected $1 but got $2",
+        filename, kPunchedFileSize, sz));
+  }
+
+  return Status::OK();  // Both size checks passed.
+}
+
+} // anonymous namespace
+
+DataDir::DataDir(Env* env,
+                 string dir,
+                 unique_ptr<PathInstanceMetadataFile> metadata_file,
+                 unique_ptr<ThreadPool> pool)
+    : env_(env),  // Stored as a raw pointer.
+      dir_(std::move(dir)),  // By-value parameter is moved into the member.
+      metadata_file_(std::move(metadata_file)),  // Takes over the instance file object.
+      pool_(std::move(pool)),  // Takes over the thread pool.
+      is_shutdown_(false) {  // Set to true at the end of Shutdown().
+}
+
+DataDir::~DataDir() {
+  Shutdown();  // Returns immediately if Shutdown() has already run.
+}
+
+void DataDir::Shutdown() {
+  if (is_shutdown_) {  // Already shut down: nothing left to do.
+    return;
+  }
+
+  pool_->Wait();  // Wait on the pool before shutting it down.
+  pool_->Shutdown();
+  is_shutdown_ = true;  // Later calls (including the destructor's) return early.
+}
+
+void DataDir::ExecClosure(const Closure& task) {
+  Status s = pool_->SubmitClosure(task);  // Normal case: hand the task to the pool.
+  if (!s.ok()) {  // Submission failed: warn, then run the task on the calling thread.
+    WARN_NOT_OK(
+        s, "Could not submit task to thread pool, running it synchronously");
+    task.Run();
+  }
+}
+
+void DataDir::WaitOnClosures() {
+  pool_->Wait();  // Unlike Shutdown(), the pool itself is not shut down here.
+}
+
+DataDirManager::DataDirManager(Env* env,
+                               string block_manager_type,
+                               vector<string> paths)
+    : env_(env),
+      block_manager_type_(std::move(block_manager_type)),  // Passed to each instance file.
+      paths_(std::move(paths)),  // One entry per data directory.
+      data_dirs_next_(0) {  // The first GetNextDataDir() call returns data_dirs_[0].
+  DCHECK_GT(paths_.size(), 0);  // At least one path must be supplied.
+}
+
+DataDirManager::~DataDirManager() {
+  Shutdown();  // Calls Shutdown() on every DataDir in data_dirs_.
+}
+
+void DataDirManager::Shutdown() {
+  // Each dd->Shutdown() may wait on outstanding closures, hence the LOG_SLOW_EXECUTION wrapper.
+  LOG_SLOW_EXECUTION(INFO, 1000,
+                     Substitute("waiting on $0 block manager thread pools",
+                                data_dirs_.size())) {
+    for (const auto& dd : data_dirs_) {
+      dd->Shutdown();
+    }
+  }
+}
+
+Status DataDirManager::Create(int flags) {
+  deque<ScopedFileDeleter*> delete_on_failure;  // One deleter per path created below.
+  ElementDeleter d(&delete_on_failure);  // NOTE(review): presumably frees the deleters at exit.
+
+  // One UUID per path; each instance file records its own UUID plus the full list.
+  ObjectIdGenerator gen;
+  vector<string> all_uuids(paths_.size());
+  for (string& u : all_uuids) {
+    u = gen.Next();
+  }
+  int idx = 0;  // Position of the current path within all_uuids.
+
+  // Ensure the data paths exist and create the instance files.
+  unordered_set<string> to_sync;
+  for (const auto& p : paths_) {
+    bool created;
+    RETURN_NOT_OK_PREPEND(env_util::CreateDirIfMissing(env_, p, &created),
+                          Substitute("Could not create directory $0", p));
+    if (created) {  // Only directories created here are tracked for cleanup and syncing.
+      delete_on_failure.push_front(new ScopedFileDeleter(env_, p));
+      to_sync.insert(DirName(p));
+    }
+
+    if (flags & FLAG_CREATE_TEST_HOLE_PUNCH) {  // Optional hole-punch probe of this path.
+      RETURN_NOT_OK_PREPEND(CheckHolePunch(env_, p), kHolePunchErrorMsg);
+    }
+
+    string instance_filename = JoinPathSegments(p, kInstanceMetadataFileName);
+    PathInstanceMetadataFile metadata(env_, block_manager_type_,
+                                      instance_filename);
+    RETURN_NOT_OK_PREPEND(metadata.Create(all_uuids[idx], all_uuids), instance_filename);
+    delete_on_failure.push_front(new ScopedFileDeleter(env_, instance_filename));
+    idx++;
+  }
+
+  // If FLAG_CREATE_FSYNC is set, sync every entry queued in to_sync.
+  if (flags & FLAG_CREATE_FSYNC) {
+    for (const string& dir : to_sync) {
+      RETURN_NOT_OK_PREPEND(env_->SyncDir(dir),
+                            Substitute("Unable to synchronize directory $0", dir));
+    }
+  }
+
+  // Success: cancel every deleter so that nothing created above is removed.
+  for (ScopedFileDeleter* deleter : delete_on_failure) {
+    deleter->Cancel();
+  }
+  return Status::OK();
+}
+
+Status DataDirManager::Open(int max_data_dirs, LockMode mode) {
+  vector<PathInstanceMetadataFile*> instances;  // Raw pointers, passed to CheckIntegrity().
+  vector<unique_ptr<DataDir>> dds;  // Staged here; swapped into data_dirs_ at the end.
+
+  int i = 0;  // Running count, used only to name each thread pool.
+  for (const auto& p : paths_) {
+    // Load the data dir's instance file and, unless 'mode' is NONE, try to lock it.
+    string instance_filename = JoinPathSegments(p, kInstanceMetadataFileName);
+    gscoped_ptr<PathInstanceMetadataFile> instance(
+        new PathInstanceMetadataFile(env_, block_manager_type_,
+                                     instance_filename));
+    RETURN_NOT_OK_PREPEND(instance->LoadFromDisk(),
+                          Substitute("Could not open $0", instance_filename));
+    if (mode != LockMode::NONE) {
+      Status s = instance->Lock();
+      if (!s.ok()) {
+        Status new_status = s.CloneAndPrepend(Substitute(
+            "Could not lock $0", instance_filename));
+        if (mode == LockMode::OPTIONAL) {  // Lock failure is tolerated: warn and continue.
+          LOG(WARNING) << new_status.ToString();
+          LOG(WARNING) << "Proceeding without lock";
+        } else {  // MANDATORY: the lock failure is returned from Open().
+          DCHECK(LockMode::MANDATORY == mode);
+          RETURN_NOT_OK(new_status);
+        }
+      }
+    }
+    instances.push_back(instance.get());  // Ownership moves to the DataDir built below.
+
+    // Create a per-dir thread pool limited to a single thread.
+    gscoped_ptr<ThreadPool> pool;
+    RETURN_NOT_OK(ThreadPoolBuilder(Substitute("data dir $0", i))
+                  .set_max_threads(1)
+                  .Build(&pool));
+
+    // Create the data directory in-memory structure itself.
+    unique_ptr<DataDir> dd(new DataDir(
+        env_, p,
+        unique_ptr<PathInstanceMetadataFile>(instance.release()),
+        unique_ptr<ThreadPool>(pool.release())));
+
+    dds.emplace_back(std::move(dd));
+    i++;
+  }
+
+  RETURN_NOT_OK_PREPEND(PathInstanceMetadataFile::CheckIntegrity(instances),
+                        Substitute("Could not verify integrity of files: $0",
+                                   JoinStrings(paths_, ",")));
+
+  // Build both maps: uuid index -> DataDir and DataDir -> uuid index.
+  UuidIndexMap dd_by_uuid_idx;
+  ReverseUuidIndexMap uuid_idx_by_dd;
+  for (const auto& dd : dds) {
+    const PathSetPB& path_set = dd->instance()->metadata()->path_set();
+    uint32_t idx = -1;  // Sentinel: this dir's uuid not (yet) found in all_uuids.
+    for (int i = 0; i < path_set.all_uuids_size(); i++) {  // NOTE(review): shadows outer 'i'.
+      if (path_set.uuid() == path_set.all_uuids(i)) {
+        idx = i;
+        break;
+      }
+    }
+    DCHECK_NE(idx, -1); // Guaranteed by CheckIntegrity().
+    if (idx > max_data_dirs) {
+      return Status::NotSupported(
+          Substitute("Block manager supports a maximum of $0 paths", max_data_dirs));
+    }
+    InsertOrDie(&dd_by_uuid_idx, idx, dd.get());
+    InsertOrDie(&uuid_idx_by_dd, dd.get(), idx);
+  }
+
+  data_dirs_.swap(dds);  // Every step succeeded: install the staged state.
+  data_dir_by_uuid_idx_.swap(dd_by_uuid_idx);
+  uuid_idx_by_data_dir_.swap(uuid_idx_by_dd);
+  return Status::OK();
+}
+
+DataDir* DataDirManager::GetNextDataDir() {
+  // Round robin through the data dirs, retrying until the CompareAndSet succeeds.
+  int32_t cur_idx;
+  int32_t next_idx;
+  do {
+    cur_idx = data_dirs_next_.Load();
+    next_idx = (cur_idx + 1) % data_dirs_.size();  // Wraps back to 0 after the last dir.
+  } while (!data_dirs_next_.CompareAndSet(cur_idx, next_idx));
+
+  return data_dirs_[cur_idx].get();  // The index read above, not next_idx.
+}
+
+DataDir* DataDirManager::FindDataDirByUuidIndex(uint16_t uuid_idx) const {
+  return FindPtrOrNull(data_dir_by_uuid_idx_, uuid_idx);  // Looks up the map filled by Open().
+}
+
+bool DataDirManager::FindUuidIndexByDataDir(DataDir* dir, uint16_t* uuid_idx) const {
+  return FindCopy(uuid_idx_by_data_dir_, dir, uuid_idx);  // Looks up the map filled by Open().
+}
+
+} // namespace fs
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/3a3f714e/src/kudu/fs/data_dirs.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/data_dirs.h b/src/kudu/fs/data_dirs.h
new file mode 100644
index 0000000..1402077
--- /dev/null
+++ b/src/kudu/fs/data_dirs.h
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include "kudu/gutil/callback_forward.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/util/atomic.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+class Env;
+class ThreadPool;
+
+namespace fs {
+class PathInstanceMetadataFile;
+
+// A data directory in use by the block manager: its path, instance file, and thread pool.
+class DataDir {
+ public:
+  DataDir(Env* env,
+          std::string dir,
+          std::unique_ptr<PathInstanceMetadataFile> metadata_file,
+          std::unique_ptr<ThreadPool> pool);
+  ~DataDir();
+
+  // Shuts down this dir's thread pool, waiting for any closures submitted via
+  // ExecClosure() to finish first.
+  void Shutdown();
+
+  // Run a task on this dir's thread pool.
+  //
+  // Normally the task is performed asynchronously. However, if submission to
+  // the pool fails, it runs synchronously on the current thread.
+  void ExecClosure(const Closure& task);
+
+  // Waits for any outstanding closures submitted via ExecClosure() to finish.
+  void WaitOnClosures();
+
+  const std::string& dir() const { return dir_; }  // Path of this data directory.
+
+  const PathInstanceMetadataFile* instance() const {
+    return metadata_file_.get();  // The DataDir keeps ownership of the instance file object.
+  }
+
+ private:
+  Env* env_;  // Raw pointer supplied by the creator.
+  const std::string dir_;
+  const std::unique_ptr<PathInstanceMetadataFile> metadata_file_;
+  const std::unique_ptr<ThreadPool> pool_;  // Runs the closures passed to ExecClosure().
+
+  bool is_shutdown_;  // Plain bool; NOTE(review): no synchronization is visible around it.
+
+  DISALLOW_COPY_AND_ASSIGN(DataDir);
+};
+
+// Encapsulates knowledge of data directory management on behalf of block
+// managers.
+class DataDirManager {
+ public:
+  // Flags for Create(); they are distinct bits and are tested with bitwise AND.
+  static const int FLAG_CREATE_TEST_HOLE_PUNCH = 0x1;  // Probe each path for hole punching.
+  static const int FLAG_CREATE_FSYNC = 0x2;  // Sync newly created directories to disk.
+
+  enum class LockMode {
+    MANDATORY,  // Open() fails if an instance file cannot be locked.
+    OPTIONAL,  // A failed lock is logged as a warning and Open() proceeds.
+    NONE,  // Open() does not try to lock instance files.
+  };
+
+  DataDirManager(Env* env,
+                 std::string block_manager_type,
+                 std::vector<std::string> paths);
+  ~DataDirManager();
+
+  // Shuts down all directories' thread pools.
+  void Shutdown();
+
+  // Initializes the data directories on disk; 'flags' holds zero or more FLAG_CREATE_* bits.
+  //
+  // Returns an error if initialized directories already exist.
+  Status Create(int flags);
+
+  // Opens existing data directories from disk.
+  //
+  // Returns an error if the number of on-disk data directories found exceeds
+  // 'max_data_dirs', or if 'mode' is MANDATORY and locks could not be taken.
+  Status Open(int max_data_dirs, LockMode mode);
+
+  // Retrieves the next data directory. Directories are rotated
+  // via round-robin.
+  DataDir* GetNextDataDir();
+
+  // Finds a data directory by uuid index, returning nullptr if it can't be
+  // found.
+  //
+  // More information on uuid indexes and their relation to data directories
+  // can be found next to PathSetPB in fs.proto.
+  DataDir* FindDataDirByUuidIndex(uint16_t uuid_idx) const;
+
+  // Finds a uuid index by data directory, returning false if it can't be found.
+  bool FindUuidIndexByDataDir(DataDir* dir,
+                              uint16_t* uuid_idx) const;
+
+  const std::vector<std::unique_ptr<DataDir>>& data_dirs() const {
+    return data_dirs_;
+  }
+
+ private:
+  Env* env_;
+  const std::string block_manager_type_;
+  const std::vector<std::string> paths_;  // One entry per data directory.
+
+  std::vector<std::unique_ptr<DataDir>> data_dirs_;  // Filled by Open(); owns the DataDirs.
+
+  AtomicInt<int32_t> data_dirs_next_;  // Index that GetNextDataDir() hands out next.
+
+  typedef std::unordered_map<uint16_t, DataDir*> UuidIndexMap;
+  UuidIndexMap data_dir_by_uuid_idx_;  // Filled by Open(); pointers into data_dirs_.
+
+  typedef std::unordered_map<DataDir*, uint16_t> ReverseUuidIndexMap;
+  ReverseUuidIndexMap uuid_idx_by_data_dir_;  // Reverse mapping of the map above.
+
+  DISALLOW_COPY_AND_ASSIGN(DataDirManager);
+};
+
+} // namespace fs
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/3a3f714e/src/kudu/fs/file_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/file_block_manager.cc b/src/kudu/fs/file_block_manager.cc
index 75de798..2a49eb1 100644
--- a/src/kudu/fs/file_block_manager.cc
+++ b/src/kudu/fs/file_block_manager.cc
@@ -17,16 +17,10 @@
 
 #include "kudu/fs/file_block_manager.h"
 
-#include <deque>
 #include <string>
-#include <unordered_set>
 #include <vector>
 
 #include "kudu/fs/block_manager_metrics.h"
-#include "kudu/fs/block_manager_util.h"
-#include "kudu/fs/fs.pb.h"
-#include "kudu/gutil/map-util.h"
-#include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/atomic.h"
 #include "kudu/util/env.h"
@@ -34,15 +28,12 @@
 #include "kudu/util/malloc.h"
 #include "kudu/util/mem_tracker.h"
 #include "kudu/util/metrics.h"
-#include "kudu/util/oid_generator.h"
 #include "kudu/util/path_util.h"
 #include "kudu/util/random_util.h"
 #include "kudu/util/status.h"
 
-using kudu::env_util::ScopedFileDeleter;
 using std::shared_ptr;
 using std::string;
-using std::unordered_set;
 using std::vector;
 using strings::Substitute;
 
@@ -496,167 +487,57 @@ Status FileBlockManager::SyncMetadata(const internal::FileBlockLocation& locatio
 
 bool FileBlockManager::FindBlockPath(const BlockId& block_id,
                                      string* path) const {
-  PathInstanceMetadataFile* metadata_file = FindPtrOrNull(
-      root_paths_by_idx_, internal::FileBlockLocation::GetRootPathIdx(block_id));
-  if (metadata_file) {
+  DataDir* dir = dd_manager_.FindDataDirByUuidIndex(
+      internal::FileBlockLocation::GetRootPathIdx(block_id));
+  if (dir) {
     *path = internal::FileBlockLocation::FromBlockId(
-        metadata_file->path(), block_id).GetFullPath();
+        dir->dir(), block_id).GetFullPath();
   }
-  return metadata_file != nullptr;
+  return dir != nullptr;
 }
 
 FileBlockManager::FileBlockManager(Env* env, const BlockManagerOptions& opts)
   : env_(DCHECK_NOTNULL(env)),
     read_only_(opts.read_only),
-    root_paths_(opts.root_paths),
+    dd_manager_(env, kBlockManagerType, opts.root_paths),
     rand_(GetRandomSeed32()),
     next_block_id_(rand_.Next64()),
     mem_tracker_(MemTracker::CreateTracker(-1,
                                            "file_block_manager",
                                            opts.parent_mem_tracker)) {
-  DCHECK_GT(root_paths_.size(), 0);
   if (opts.metric_entity) {
     metrics_.reset(new internal::BlockManagerMetrics(opts.metric_entity));
   }
 }
 
 FileBlockManager::~FileBlockManager() {
-  STLDeleteValues(&root_paths_by_idx_);
 }
 
 Status FileBlockManager::Create() {
   CHECK(!read_only_);
-
-  deque<ScopedFileDeleter*> delete_on_failure;
-  ElementDeleter d(&delete_on_failure);
-
-  if (root_paths_.size() > kMaxPaths) {
-    return Status::NotSupported(
-        Substitute("File block manager supports a maximum of $0 paths", kMaxPaths));
-  }
-
-  // The UUIDs and indices will be included in every instance file.
-  ObjectIdGenerator oid_generator;
-  vector<string> all_uuids(root_paths_.size());
-  for (string& u : all_uuids) {
-    u = oid_generator.Next();
-  }
-  int idx = 0;
-
-  // Ensure the data paths exist and create the instance files.
-  unordered_set<string> to_sync;
-  for (const string& root_path : root_paths_) {
-    bool created;
-    RETURN_NOT_OK_PREPEND(env_util::CreateDirIfMissing(env_, root_path, &created),
-                          Substitute("Could not create directory $0", root_path));
-    if (created) {
-      delete_on_failure.push_front(new ScopedFileDeleter(env_, root_path));
-      to_sync.insert(DirName(root_path));
-    }
-
-    string instance_filename = JoinPathSegments(
-        root_path, kInstanceMetadataFileName);
-    PathInstanceMetadataFile metadata(env_, kBlockManagerType,
-                                      instance_filename);
-    RETURN_NOT_OK_PREPEND(metadata.Create(all_uuids[idx], all_uuids),
-                          Substitute("Could not create $0", instance_filename));
-    delete_on_failure.push_front(new ScopedFileDeleter(env_, instance_filename));
-    idx++;
-  }
-
-  // Ensure newly created directories are synchronized to disk.
-  if (FLAGS_enable_data_block_fsync) {
-    for (const string& dir : to_sync) {
-      RETURN_NOT_OK_PREPEND(env_->SyncDir(dir),
-                            Substitute("Unable to synchronize directory $0", dir));
-    }
-  }
-
-  // Success: don't delete any files.
-  for (ScopedFileDeleter* deleter : delete_on_failure) {
-    deleter->Cancel();
-  }
-  return Status::OK();
+  return dd_manager_.Create(
+      FLAGS_enable_data_block_fsync ? DataDirManager::FLAG_CREATE_FSYNC : 0);
 }
 
 Status FileBlockManager::Open() {
-  vector<PathInstanceMetadataFile*> instances;
-  ElementDeleter deleter(&instances);
-  instances.reserve(root_paths_.size());
-
-  for (const string& root_path : root_paths_) {
-    if (!env_->FileExists(root_path)) {
-      return Status::NotFound(Substitute(
-          "FileBlockManager at $0 not found", root_path));
-    }
-    string instance_filename = JoinPathSegments(
-        root_path, kInstanceMetadataFileName);
-    gscoped_ptr<PathInstanceMetadataFile> metadata(
-        new PathInstanceMetadataFile(env_, kBlockManagerType,
-                                     instance_filename));
-    RETURN_NOT_OK_PREPEND(metadata->LoadFromDisk(),
-                          Substitute("Could not open $0", instance_filename));
-    if (FLAGS_block_manager_lock_dirs) {
-      Status s = metadata->Lock();
-      if (!s.ok()) {
-        Status new_status = s.CloneAndPrepend(Substitute(
-            "Could not lock $0", instance_filename));
-        if (read_only_) {
-          // Not fatal in read-only mode.
-          LOG(WARNING) << new_status.ToString();
-          LOG(WARNING) << "Proceeding without lock";
-        } else {
-          return new_status;
-        }
-      }
-    }
-
-    instances.push_back(metadata.release());
-  }
-
-  RETURN_NOT_OK_PREPEND(PathInstanceMetadataFile::CheckIntegrity(instances),
-                        Substitute("Could not verify integrity of files: $0",
-                                   JoinStrings(root_paths_, ",")));
-
-  PathMap instances_by_idx;
-  for (PathInstanceMetadataFile* instance : instances) {
-    const PathSetPB& path_set = instance->metadata()->path_set();
-    uint32_t idx = -1;
-    for (int i = 0; i < path_set.all_uuids_size(); i++) {
-      if (path_set.uuid() == path_set.all_uuids(i)) {
-        idx = i;
-        break;
-      }
-    }
-    DCHECK_NE(idx, -1); // Guaranteed by CheckIntegrity().
-    if (idx > kMaxPaths) {
-      return Status::NotSupported(
-          Substitute("File block manager supports a maximum of $0 paths", kMaxPaths));
-    }
-    InsertOrDie(&instances_by_idx, idx, instance);
+  DataDirManager::LockMode mode;
+  if (!FLAGS_block_manager_lock_dirs) {
+    mode = DataDirManager::LockMode::NONE;
+  } else if (read_only_) {
+    mode = DataDirManager::LockMode::OPTIONAL;
+  } else {
+    mode = DataDirManager::LockMode::MANDATORY;
   }
-  instances.clear();
-  instances_by_idx.swap(root_paths_by_idx_);
-  next_root_path_ = root_paths_by_idx_.begin();
-  return Status::OK();
+  return dd_manager_.Open(kMaxPaths, mode);
 }
 
 Status FileBlockManager::CreateBlock(const CreateBlockOptions& opts,
                                      gscoped_ptr<WritableBlock>* block) {
   CHECK(!read_only_);
 
-  // Pick a root path using a simple round-robin block placement strategy.
-  uint16_t root_path_idx;
-  string root_path;
-  {
-    std::lock_guard<simple_spinlock> l(lock_);
-    root_path_idx = next_root_path_->first;
-    root_path = next_root_path_->second->path();
-    next_root_path_++;
-    if (next_root_path_ == root_paths_by_idx_.end()) {
-      next_root_path_ = root_paths_by_idx_.begin();
-    }
-  }
+  DataDir* dir = dd_manager_.GetNextDataDir();
+  uint16_t uuid_idx;
+  CHECK(dd_manager_.FindUuidIndexByDataDir(dir, &uuid_idx));
 
   string path;
   vector<string> created_dirs;
@@ -682,8 +563,7 @@ Status FileBlockManager::CreateBlock(const CreateBlockOptions& opts,
       id.SetId(next_block_id_.Increment());
     } while (id.IsNull());
 
-    location = internal::FileBlockLocation::FromParts(
-        root_path, root_path_idx, id);
+    location = internal::FileBlockLocation::FromParts(dir->dir(), uuid_idx, id);
     path = location.GetFullPath();
     RETURN_NOT_OK_PREPEND(location.CreateBlockDir(env_, &created_dirs), path);
     WritableFileOptions wr_opts;

http://git-wip-us.apache.org/repos/asf/kudu/blob/3a3f714e/src/kudu/fs/file_block_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/file_block_manager.h b/src/kudu/fs/file_block_manager.h
index 65fa2de..223f6db 100644
--- a/src/kudu/fs/file_block_manager.h
+++ b/src/kudu/fs/file_block_manager.h
@@ -18,7 +18,6 @@
 #ifndef KUDU_FS_FILE_BLOCK_MANAGER_H
 #define KUDU_FS_FILE_BLOCK_MANAGER_H
 
-#include <map>
 #include <memory>
 #include <string>
 #include <unordered_set>
@@ -26,6 +25,7 @@
 
 #include "kudu/fs/block_id.h"
 #include "kudu/fs/block_manager.h"
+#include "kudu/fs/data_dirs.h"
 #include "kudu/util/atomic.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/random.h"
@@ -38,7 +38,6 @@ class MetricEntity;
 class WritableFile;
 
 namespace fs {
-class PathInstanceMetadataFile;
 
 namespace internal {
 class FileBlockLocation;
@@ -103,8 +102,7 @@ class FileBlockManager : public BlockManager {
   // Looks up the path of the file backing a particular block ID.
   //
   // On success, overwrites 'path' with the file's path.
-  bool FindBlockPath(const BlockId& block_id,
-                     std::string* root_path) const;
+  bool FindBlockPath(const BlockId& block_id, std::string* path) const;
 
   Env* env() const { return env_; }
 
@@ -114,30 +112,20 @@ class FileBlockManager : public BlockManager {
   // If true, only read operations are allowed.
   const bool read_only_;
 
-  // Filesystem paths where all block directories are found.
-  const std::vector<std::string> root_paths_;
-
-  // Maps path indices their instance files.
-  //
-  // There's no need to synchronize access to the map as it is only written
-  // to during Create() and Open(); all subsequent accesses are reads.
-  typedef std::map<uint16_t, PathInstanceMetadataFile*> PathMap;
-  PathMap root_paths_by_idx_;
+  // Manages and owns all of the block manager's data directories.
+  DataDirManager dd_manager_;
 
   // For generating block IDs.
   ThreadSafeRandom rand_;
   AtomicInt<int64_t> next_block_id_;
 
-  // Protects 'dirty_dirs_' and 'next_root_path_'.
+  // Protects 'dirty_dirs_'.
   mutable simple_spinlock lock_;
 
   // Tracks the block directories which are dirty from block creation. This
   // lets us perform some simple coalescing when synchronizing metadata.
   std::unordered_set<std::string> dirty_dirs_;
 
-  // Points to the filesystem path to be used when creating the next block.
-  PathMap::iterator next_root_path_;
-
   // Metric container for the block manager.
   // May be null if instantiated without metrics.
   gscoped_ptr<internal::BlockManagerMetrics> metrics_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/3a3f714e/src/kudu/fs/log_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index 0046af9..4634e86 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -22,7 +22,10 @@
 
 #include "kudu/fs/block_manager_metrics.h"
 #include "kudu/fs/block_manager_util.h"
+#include "kudu/fs/data_dirs.h"
+#include "kudu/fs/fs.pb.h"
 #include "kudu/gutil/callback.h"
+#include "kudu/gutil/integral_types.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/strings/strcat.h"
 #include "kudu/gutil/strings/strip.h"
@@ -103,18 +106,18 @@ METRIC_DEFINE_counter(server, log_block_manager_unavailable_containers,
                       "Number of non-full log block containers that are under root paths "
                       "whose disks are full");
 
-using kudu::env_util::ScopedFileDeleter;
-using kudu::fs::internal::LogBlock;
-using kudu::fs::internal::LogBlockContainer;
-using kudu::pb_util::ReadablePBContainerFile;
-using kudu::pb_util::WritablePBContainerFile;
-using std::unordered_map;
-using std::unordered_set;
-using strings::Substitute;
 
 namespace kudu {
 
 namespace fs {
+
+using internal::LogBlock;
+using internal::LogBlockContainer;
+using pb_util::ReadablePBContainerFile;
+using pb_util::WritablePBContainerFile;
+using std::unordered_set;
+using strings::Substitute;
+
 namespace internal {
 
 ////////////////////////////////////////////////////////////
@@ -170,21 +173,20 @@ class LogBlockContainer {
 
   // Creates a new block container in 'dir'.
   static Status Create(LogBlockManager* block_manager,
-                       PathInstanceMetadataPB* instance,
-                       const std::string& dir,
+                       DataDir* dir,
                        gscoped_ptr<LogBlockContainer>* container);
 
   // Opens an existing block container in 'dir'.
   //
   // Every container is comprised of two files: "<dir>/<id>.data" and
-  // "<dir>/<id>.metadata". Together, 'dir' and 'id' fully describe both files.
+  // "<dir>/<id>.metadata". Together, 'dir' and 'id' fully describe both
+  // files.
   //
   // Returns Status::Aborted() in the case that the metadata and data files
   // both appear to have no data (e.g. due to a crash just after creating
   // one of them but before writing any records).
   static Status Open(LogBlockManager* block_manager,
-                     PathInstanceMetadataPB* instance,
-                     const std::string& dir,
+                     DataDir* dir,
                      const std::string& id,
                      gscoped_ptr<LogBlockContainer>* container);
 
@@ -265,14 +267,13 @@ class LogBlockContainer {
   // This function is thread unsafe.
   void UpdateBytesWritten(int64_t more_bytes);
 
-  // Run a task on this container's root path thread pool.
+  // Run a task on this container's data directory thread pool.
   //
   // Normally the task is performed asynchronously. However, if submission to
   // the pool fails, it runs synchronously on the current thread.
   void ExecClosure(const Closure& task);
 
   // Simple accessors.
-  std::string dir() const { return DirName(path_); }
   const std::string& ToString() const { return path_; }
   LogBlockManager* block_manager() const { return block_manager_; }
   int64_t total_bytes_written() const { return total_bytes_written_; }
@@ -280,12 +281,12 @@ class LogBlockContainer {
     return total_bytes_written_ >= FLAGS_log_container_max_size;
   }
   const LogBlockManagerMetrics* metrics() const { return metrics_; }
-  const PathInstanceMetadataPB* instance() const { return instance_; }
-  const std::string& root_path() const { return root_path_; }
+  const DataDir* data_dir() const { return data_dir_; }
+  const PathInstanceMetadataPB* instance() const { return data_dir_->instance()->metadata(); }
 
  private:
-  LogBlockContainer(LogBlockManager* block_manager, PathInstanceMetadataPB* instance,
-                    std::string root_path, std::string path,
+  LogBlockContainer(LogBlockManager* block_manager, DataDir* data_dir,
+                    std::string path,
                     gscoped_ptr<WritablePBContainerFile> metadata_writer,
                     gscoped_ptr<RWFile> data_file);
 
@@ -296,9 +297,8 @@ class LogBlockContainer {
   // The owning block manager. Must outlive the container itself.
   LogBlockManager* const block_manager_;
 
-  // The path to the container's root path. This is the root directory under
-  // which the container lives.
-  const std::string root_path_;
+  // The data directory where the container lives.
+  DataDir* data_dir_;
 
   // The path to the container's files. Equivalent to "<dir>/<id>" (see the
   // container constructor).
@@ -324,26 +324,25 @@ class LogBlockContainer {
   // as the block manager.
   const LogBlockManagerMetrics* metrics_;
 
-  const PathInstanceMetadataPB* instance_;
-
   DISALLOW_COPY_AND_ASSIGN(LogBlockContainer);
 };
 
 LogBlockContainer::LogBlockContainer(
-    LogBlockManager* block_manager, PathInstanceMetadataPB* instance,
-    string root_path, string path, gscoped_ptr<WritablePBContainerFile> metadata_writer,
+    LogBlockManager* block_manager,
+    DataDir* data_dir,
+    string path,
+    gscoped_ptr<WritablePBContainerFile> metadata_writer,
     gscoped_ptr<RWFile> data_file)
     : block_manager_(block_manager),
-      root_path_(std::move(root_path)),
+      data_dir_(data_dir),
       path_(std::move(path)),
       metadata_pb_writer_(std::move(metadata_writer)),
       data_file_(std::move(data_file)),
-      metrics_(block_manager->metrics()),
-      instance_(instance) {}
+      metrics_(block_manager->metrics()) {
+}
 
 Status LogBlockContainer::Create(LogBlockManager* block_manager,
-                                 PathInstanceMetadataPB* instance,
-                                 const string& root_path,
+                                 DataDir* dir,
                                  gscoped_ptr<LogBlockContainer>* container) {
   string common_path;
   string metadata_path;
@@ -362,7 +361,8 @@ Status LogBlockContainer::Create(LogBlockManager* block_manager,
     if (metadata_writer) {
       block_manager->env()->DeleteFile(metadata_path);
     }
-    common_path = JoinPathSegments(root_path, block_manager->oid_generator()->Next());
+    common_path = JoinPathSegments(dir->dir(),
+                                   block_manager->oid_generator()->Next());
     metadata_path = StrCat(common_path, LogBlockManager::kContainerMetadataFileSuffix);
     metadata_status = block_manager->env()->NewRWFile(wr_opts,
                                                       metadata_path,
@@ -383,8 +383,7 @@ Status LogBlockContainer::Create(LogBlockManager* block_manager,
         new WritablePBContainerFile(std::move(metadata_writer)));
     RETURN_NOT_OK(metadata_pb_writer->Init(BlockRecordPB()));
     container->reset(new LogBlockContainer(block_manager,
-                                           instance,
-                                           root_path,
+                                           dir,
                                            common_path,
                                            std::move(metadata_pb_writer),
                                            std::move(data_file)));
@@ -396,11 +395,11 @@ Status LogBlockContainer::Create(LogBlockManager* block_manager,
 }
 
 Status LogBlockContainer::Open(LogBlockManager* block_manager,
-                               PathInstanceMetadataPB* instance,
-                               const string& root_path, const string& id,
+                               DataDir* dir,
+                               const string& id,
                                gscoped_ptr<LogBlockContainer>* container) {
   Env* env = block_manager->env();
-  string common_path = JoinPathSegments(root_path, id);
+  string common_path = JoinPathSegments(dir->dir(), id);
   string metadata_path = StrCat(common_path, LogBlockManager::kContainerMetadataFileSuffix);
   string data_path = StrCat(common_path, LogBlockManager::kContainerDataFileSuffix);
 
@@ -455,13 +454,12 @@ Status LogBlockContainer::Open(LogBlockManager* block_manager,
 
   // Create the in-memory container and populate it.
   gscoped_ptr<LogBlockContainer> open_container(new LogBlockContainer(block_manager,
-                                                                      instance,
-                                                                      root_path,
+                                                                      dir,
                                                                       common_path,
                                                                       std::move(metadata_pb_writer),
                                                                       std::move(data_file)));
   VLOG(1) << "Opened log block container " << open_container->ToString();
-  container->reset(open_container.release());
+  container->swap(open_container);
   return Status::OK();
 }
 
@@ -668,14 +666,7 @@ void LogBlockContainer::UpdateBytesWritten(int64_t more_bytes) {
 }
 
 void LogBlockContainer::ExecClosure(const Closure& task) {
-  ThreadPool* pool = FindOrDie(block_manager()->thread_pools_by_root_path_,
-                               dir());
-  Status s = pool->SubmitClosure(task);
-  if (!s.ok()) {
-    WARN_NOT_OK(
-        s, "Could not submit task to thread pool, running it synchronously");
-    task.Run();
-  }
+  data_dir_->ExecClosure(task);
 }
 
 ////////////////////////////////////////////////////////////
@@ -1102,15 +1093,13 @@ LogBlockManager::LogBlockManager(Env* env, const BlockManagerOptions& opts)
   : mem_tracker_(MemTracker::CreateTracker(-1,
                                            "log_block_manager",
                                            opts.parent_mem_tracker)),
-    // TODO: C++11 provides a single-arg constructor
+    dd_manager_(env, kBlockManagerType, opts.root_paths),
     blocks_by_block_id_(10,
                         BlockMap::hasher(),
                         BlockMap::key_equal(),
                         BlockAllocator(mem_tracker_)),
     env_(DCHECK_NOTNULL(env)),
     read_only_(opts.read_only),
-    root_paths_(opts.root_paths),
-    root_paths_idx_(0),
     next_block_id_(1) {
 
   // HACK: when running in a test environment, we often instantiate many
@@ -1125,7 +1114,6 @@ LogBlockManager::LogBlockManager(Env* env, const BlockManagerOptions& opts)
     next_block_id_.Store(r.Next64());
   }
 
-  DCHECK_GT(root_paths_.size(), 0);
   if (opts.metric_entity) {
     metrics_.reset(new internal::LogBlockManagerMetrics(opts.metric_entity));
   }
@@ -1143,122 +1131,55 @@ LogBlockManager::~LogBlockManager() {
   // destroyed before their containers.
   blocks_by_block_id_.clear();
 
-  // As LogBlock destructors run, some blocks may be deleted, so we might be
-  // waiting here for a little while.
-  LOG_SLOW_EXECUTION(INFO, 1000,
-                     Substitute("waiting on $0 log block manager thread pools",
-                                thread_pools_by_root_path_.size())) {
-    for (const ThreadPoolMap::value_type& e :
-                  thread_pools_by_root_path_) {
-      ThreadPool* p = e.second;
-      p->Wait();
-      p->Shutdown();
-    }
-  }
+  // Containers may have outstanding tasks running on data directories; shut
+  // the data directories down before destroying the containers.
+  dd_manager_.Shutdown();
 
   STLDeleteElements(&all_containers_);
-  STLDeleteValues(&thread_pools_by_root_path_);
-  STLDeleteValues(&instances_by_root_path_);
 }
 
-static const char kHolePunchErrorMsg[] =
-    "Error during hole punch test. The log block manager requires a "
-    "filesystem with hole punching support such as ext4 or xfs. On el6, "
-    "kernel version 2.6.32-358 or newer is required. To run without hole "
-    "punching (at the cost of some efficiency and scalability), reconfigure "
-    "Kudu with --block_manager=file. Refer to the Kudu documentation for more "
-    "details. Raw error message follows";
-
 Status LogBlockManager::Create() {
   CHECK(!read_only_);
-
-  RETURN_NOT_OK(Init());
-
-  deque<ScopedFileDeleter*> delete_on_failure;
-  ElementDeleter d(&delete_on_failure);
-
-  // The UUIDs and indices will be included in every instance file.
-  vector<string> all_uuids(root_paths_.size());
-  for (string& u : all_uuids) {
-    u = oid_generator()->Next();
-  }
-  int idx = 0;
-
-  // Ensure the data paths exist and create the instance files.
-  unordered_set<string> to_sync;
-  for (const string& root_path : root_paths_) {
-    bool created;
-    RETURN_NOT_OK_PREPEND(env_util::CreateDirIfMissing(env_, root_path, &created),
-                          Substitute("Could not create directory $0", root_path));
-    if (created) {
-      delete_on_failure.push_front(new ScopedFileDeleter(env_, root_path));
-      to_sync.insert(DirName(root_path));
-    }
-
-    if (FLAGS_log_block_manager_test_hole_punching) {
-      RETURN_NOT_OK_PREPEND(CheckHolePunch(root_path), kHolePunchErrorMsg);
-    }
-
-    string instance_filename = JoinPathSegments(
-        root_path, kInstanceMetadataFileName);
-    PathInstanceMetadataFile metadata(env_, kBlockManagerType,
-                                      instance_filename);
-    RETURN_NOT_OK_PREPEND(metadata.Create(all_uuids[idx], all_uuids), instance_filename);
-    delete_on_failure.push_front(new ScopedFileDeleter(env_, instance_filename));
-    idx++;
-  }
-
-  // Ensure newly created directories are synchronized to disk.
-  if (FLAGS_enable_data_block_fsync) {
-    for (const string& dir : to_sync) {
-      RETURN_NOT_OK_PREPEND(env_->SyncDir(dir),
-                            Substitute("Unable to synchronize directory $0", dir));
-    }
-  }
-
-  // Success: don't delete any files.
-  for (ScopedFileDeleter* deleter : delete_on_failure) {
-    deleter->Cancel();
-  }
-  return Status::OK();
+  return dd_manager_.Create(FLAGS_enable_data_block_fsync ?
+      DataDirManager::FLAG_CREATE_TEST_HOLE_PUNCH | DataDirManager::FLAG_CREATE_FSYNC :
+      DataDirManager::FLAG_CREATE_TEST_HOLE_PUNCH);
 }
 
 Status LogBlockManager::Open() {
-  RETURN_NOT_OK(Init());
-
-  vector<Status> statuses(root_paths_.size());
-  unordered_map<string, PathInstanceMetadataFile*> metadata_files;
-  ValueDeleter deleter(&metadata_files);
-  for (const string& root_path : root_paths_) {
-    InsertOrDie(&metadata_files, root_path, nullptr);
+  DataDirManager::LockMode mode;
+  if (!FLAGS_block_manager_lock_dirs) {
+    mode = DataDirManager::LockMode::NONE;
+  } else if (read_only_) {
+    mode = DataDirManager::LockMode::OPTIONAL;
+  } else {
+    mode = DataDirManager::LockMode::MANDATORY;
   }
+  RETURN_NOT_OK(dd_manager_.Open(kuint32max, mode));
 
-  // Submit each open to its own thread pool and wait for them to complete.
+  vector<Status> statuses(dd_manager_.data_dirs().size());
   int i = 0;
-  for (const string& root_path : root_paths_) {
-    ThreadPool* pool = FindOrDie(thread_pools_by_root_path_, root_path);
-    RETURN_NOT_OK_PREPEND(pool->SubmitClosure(
-        Bind(&LogBlockManager::OpenRootPath,
+  for (const auto& dd : dd_manager_.data_dirs()) {
+    // Open the data dir asynchronously.
+    dd->ExecClosure(
+        Bind(&LogBlockManager::OpenDataDir,
              Unretained(this),
-             root_path,
-             &statuses[i],
-             &FindOrDie(metadata_files, root_path))),
-                          Substitute("Could not open root path $0", root_path));
+             dd.get(),
+             &statuses[i]));
     i++;
   }
-  for (const ThreadPoolMap::value_type& e :
-                thread_pools_by_root_path_) {
-    e.second->Wait();
+
+  // Wait for the opens to complete.
+  for (const auto& dd : dd_manager_.data_dirs()) {
+    dd->WaitOnClosures();
   }
 
-  // Ensure that no tasks failed.
-  for (const Status& s : statuses) {
+  // Ensure that no open failed.
+  for (const auto& s : statuses) {
     if (!s.ok()) {
       return s;
     }
   }
 
-  instances_by_root_path_.swap(metadata_files);
   return Status::OK();
 }
 
@@ -1269,10 +1190,10 @@ Status LogBlockManager::CreateBlock(const CreateBlockOptions& opts,
   // Root paths that are below their reserved space threshold. Initialize the
   // paths from the FullDiskCache. This function-local cache is necessary for
   // correctness in case the FullDiskCache expiration time is set to 0.
-  unordered_set<string> full_root_paths(root_paths_.size());
-  for (int i = 0; i < root_paths_.size(); i++) {
-    if (full_disk_cache_.IsRootFull(root_paths_[i])) {
-      InsertOrDie(&full_root_paths, root_paths_[i]);
+  unordered_set<const DataDir*> full_root_paths(dd_manager_.data_dirs().size());
+  for (const auto& dd : dd_manager_.data_dirs()) {
+    if (full_disk_cache_.IsRootFull(dd.get())) {
+      InsertOrDie(&full_root_paths, dd.get());
     }
   }
 
@@ -1287,41 +1208,31 @@ Status LogBlockManager::CreateBlock(const CreateBlockOptions& opts,
     container = GetAvailableContainer(full_root_paths);
     if (!container) {
       // If all root paths are full, we cannot allocate a block.
-      if (full_root_paths.size() == root_paths_.size()) {
+      if (full_root_paths.size() == dd_manager_.data_dirs().size()) {
         return Status::IOError("Unable to allocate block: All data directories are full. "
                                "Please free some disk space or consider changing the "
                                "fs_data_dirs_reserved_bytes configuration parameter",
                                "", ENOSPC);
       }
-      // Round robin through the root paths to select where the next
-      // container should live.
-      // TODO: Consider a more random scheme for block placement.
-      int32 cur_idx;
-      int32 next_idx;
+
+      DataDir* dir;
       do {
-        cur_idx = root_paths_idx_.Load();
-        next_idx = (cur_idx + 1) % root_paths_.size();
-      } while (!root_paths_idx_.CompareAndSet(cur_idx, next_idx) ||
-               ContainsKey(full_root_paths, root_paths_[cur_idx]));
-      string root_path = root_paths_[cur_idx];
-      if (full_disk_cache_.IsRootFull(root_path)) {
-        InsertOrDie(&full_root_paths, root_path);
+        dir = dd_manager_.GetNextDataDir();
+      } while (ContainsKey(full_root_paths, dir));
+      if (full_disk_cache_.IsRootFull(dir)) {
+        InsertOrDie(&full_root_paths, dir);
         continue;
       }
 
-      // Guaranteed by LogBlockManager::Open().
-      PathInstanceMetadataFile* instance = FindOrDie(instances_by_root_path_, root_path);
-
       gscoped_ptr<LogBlockContainer> new_container;
       RETURN_NOT_OK_PREPEND(LogBlockContainer::Create(this,
-                                                      instance->metadata(),
-                                                      root_path,
+                                                      dir,
                                                       &new_container),
-                            "Could not create new log block container at " + root_path);
+                            "Could not create new log block container at " + dir->dir());
       container = new_container.release();
       {
         std::lock_guard<simple_spinlock> l(lock_);
-        dirty_dirs_.insert(root_path);
+        dirty_dirs_.insert(dir->dir());
         AddNewContainerUnlocked(container);
       }
     }
@@ -1341,11 +1252,11 @@ Status LogBlockManager::CreateBlock(const CreateBlockOptions& opts,
       if (PREDICT_FALSE(s.IsIOError() && s.posix_code() == ENOSPC)) {
         LOG(ERROR) << Substitute("Log block manager: Insufficient disk space under path $0: "
                                  "Creation of new data blocks under this path can be retried after "
-                                 "$1 seconds: $2", container->root_path(),
+                                 "$1 seconds: $2", container->data_dir()->dir(),
                                  FLAGS_log_block_manager_full_disk_cache_seconds, s.ToString());
         // Blacklist this root globally and locally.
-        full_disk_cache_.MarkRootFull(container->root_path());
-        InsertOrDie(&full_root_paths, container->root_path());
+        full_disk_cache_.MarkRootFull(container->data_dir());
+        InsertOrDie(&full_root_paths, container->data_dir());
         MakeContainerAvailable(container);
         container = nullptr;
         continue;
@@ -1456,7 +1367,7 @@ void LogBlockManager::AddNewContainerUnlocked(LogBlockContainer* container) {
 }
 
 LogBlockContainer* LogBlockManager::GetAvailableContainer(
-    const unordered_set<string>& full_root_paths) {
+    const unordered_set<const DataDir*>& full_root_paths) {
   LogBlockContainer* container = nullptr;
   int64_t disk_full_containers_delta = 0;
   MonoTime now = MonoTime::Now();
@@ -1478,8 +1389,8 @@ LogBlockContainer* LogBlockManager::GetAvailableContainer(
       MonoTime expires;
       // Note: We must check 'full_disk_cache_' before 'full_root_paths' in
       // order to correctly use the expiry time provided by 'full_disk_cache_'.
-      if (full_disk_cache_.IsRootFull(container->root_path(), &expires) ||
-          ContainsKey(full_root_paths, container->root_path())) {
+      if (full_disk_cache_.IsRootFull(container->data_dir(), &expires) ||
+          ContainsKey(full_root_paths, container->data_dir())) {
         if (!expires.Initialized()) {
           // It's no longer in the cache but we still consider it unusable.
           // It will be moved back into 'available_containers_' on the next call.
@@ -1519,11 +1430,11 @@ Status LogBlockManager::SyncContainer(const LogBlockContainer& container) {
   bool to_sync = false;
   {
     std::lock_guard<simple_spinlock> l(lock_);
-    to_sync = dirty_dirs_.erase(container.dir());
+    to_sync = dirty_dirs_.erase(container.data_dir()->dir());
   }
 
   if (to_sync && FLAGS_enable_data_block_fsync) {
-    s = env_->SyncDir(container.dir());
+    s = env_->SyncDir(container.data_dir()->dir());
 
     // If SyncDir fails, the container directory must be restored to
     // dirty_dirs_. Otherwise a future successful LogWritableBlock::Close()
@@ -1534,7 +1445,7 @@ Status LogBlockManager::SyncContainer(const LogBlockContainer& container) {
     // we'll sync it again needlessly.
     if (!s.ok()) {
       std::lock_guard<simple_spinlock> l(lock_);
-      dirty_dirs_.insert(container.dir());
+      dirty_dirs_.insert(container.data_dir()->dir());
     }
   }
   return s;
@@ -1596,49 +1507,14 @@ scoped_refptr<LogBlock> LogBlockManager::RemoveLogBlock(const BlockId& block_id)
   return result;
 }
 
-void LogBlockManager::OpenRootPath(const string& root_path,
-                                   Status* result_status,
-                                   PathInstanceMetadataFile** result_metadata) {
-  if (!env_->FileExists(root_path)) {
-    *result_status = Status::NotFound(Substitute(
-        "LogBlockManager at $0 not found", root_path));
-    return;
-  }
-
-  // Open and lock the metadata instance file.
-  string instance_filename = JoinPathSegments(
-      root_path, kInstanceMetadataFileName);
-  gscoped_ptr<PathInstanceMetadataFile> metadata(
-      new PathInstanceMetadataFile(env_, kBlockManagerType,
-                                   instance_filename));
-  Status s = metadata->LoadFromDisk();
-  if (!s.ok()) {
-    *result_status = s.CloneAndPrepend(Substitute(
-        "Could not open $0", instance_filename));
-    return;
-  }
-  if (FLAGS_block_manager_lock_dirs) {
-    s = metadata->Lock();
-    if (!s.ok()) {
-      Status new_status = s.CloneAndPrepend(Substitute(
-          "Could not lock $0", instance_filename));
-      if (read_only_) {
-        // Not fatal in read-only mode.
-        LOG(WARNING) << new_status.ToString();
-        LOG(WARNING) << "Proceeding without lock";
-      } else {
-        *result_status = new_status;
-        return;
-      }
-    }
-  }
-
+void LogBlockManager::OpenDataDir(DataDir* dir,
+                                  Status* result_status) {
   // Find all containers and open them.
   vector<string> children;
-  s = env_->GetChildren(root_path, &children);
+  Status s = env_->GetChildren(dir->dir(), &children);
   if (!s.ok()) {
     *result_status = s.CloneAndPrepend(Substitute(
-        "Could not list children of $0", root_path));
+        "Could not list children of $0", dir->dir()));
     return;
   }
   for (const string& child : children) {
@@ -1647,8 +1523,7 @@ void LogBlockManager::OpenRootPath(const string& root_path,
       continue;
     }
     gscoped_ptr<LogBlockContainer> container;
-    s = LogBlockContainer::Open(this, metadata->metadata(),
-                                root_path, id, &container);
+    s = LogBlockContainer::Open(this, dir, id, &container);
     if (s.IsAborted()) {
       // Skip the container. Open() already handled logging for us.
       continue;
@@ -1713,7 +1588,6 @@ void LogBlockManager::OpenRootPath(const string& root_path,
   }
 
   *result_status = Status::OK();
-  *result_metadata = metadata.release();
 }
 
 void LogBlockManager::ProcessBlockRecord(const BlockRecordPB& record,
@@ -1760,67 +1634,11 @@ void LogBlockManager::ProcessBlockRecord(const BlockRecordPB& record,
   }
 }
 
-Status LogBlockManager::CheckHolePunch(const string& path) {
-  // Arbitrary constants.
-  static uint64_t kFileSize = 4096 * 4;
-  static uint64_t kHoleOffset = 4096;
-  static uint64_t kHoleSize = 8192;
-  static uint64_t kPunchedFileSize = kFileSize - kHoleSize;
-
-  // Open the test file.
-  string filename = JoinPathSegments(path, "hole_punch_test_file");
-  gscoped_ptr<RWFile> file;
-  RWFileOptions opts;
-  RETURN_NOT_OK(env_->NewRWFile(opts, filename, &file));
-
-  // The file has been created; delete it on exit no matter what happens.
-  ScopedFileDeleter file_deleter(env_, filename);
-
-  // Preallocate it, making sure the file's size is what we'd expect.
-  uint64_t sz;
-  RETURN_NOT_OK(file->PreAllocate(0, kFileSize));
-  RETURN_NOT_OK(env_->GetFileSizeOnDisk(filename, &sz));
-  if (sz != kFileSize) {
-    return Status::IOError(Substitute(
-        "Unexpected pre-punch file size for $0: expected $1 but got $2",
-        filename, kFileSize, sz));
-  }
-
-  // Punch the hole, testing the file's size again.
-  RETURN_NOT_OK(file->PunchHole(kHoleOffset, kHoleSize));
-  RETURN_NOT_OK(env_->GetFileSizeOnDisk(filename, &sz));
-  if (sz != kPunchedFileSize) {
-    return Status::IOError(Substitute(
-        "Unexpected post-punch file size for $0: expected $1 but got $2",
-        filename, kPunchedFileSize, sz));
-  }
-
-  return Status::OK();
-}
-
-Status LogBlockManager::Init() {
-  // Initialize thread pools.
-  ThreadPoolMap pools;
-  ValueDeleter d(&pools);
-  int i = 0;
-  for (const string& root : root_paths_) {
-    gscoped_ptr<ThreadPool> p;
-    RETURN_NOT_OK_PREPEND(ThreadPoolBuilder(Substitute("lbm root $0", i++))
-                          .set_max_threads(1)
-                          .Build(&p),
-                          "Could not build thread pool");
-    InsertOrDie(&pools, root, p.release());
-  }
-  thread_pools_by_root_path_.swap(pools);
-
-  return Status::OK();
-}
-
 std::string LogBlockManager::ContainerPathForTests(internal::LogBlockContainer* container) {
   return container->ToString();
 }
 
-bool FullDiskCache::IsRootFull(const std::string& root_path, MonoTime* expires_out) const {
+bool FullDiskCache::IsRootFull(const DataDir* root_path, MonoTime* expires_out) const {
   const MonoTime* expires;
   {
     shared_lock<rw_spinlock> l(lock_.get_lock());
@@ -1834,7 +1652,7 @@ bool FullDiskCache::IsRootFull(const std::string& root_path, MonoTime* expires_o
   return true; // Root is still full according to the cache.
 }
 
-void FullDiskCache::MarkRootFull(const string& root_path) {
+void FullDiskCache::MarkRootFull(const DataDir* root_path) {
   MonoTime expires = MonoTime::Now() +
       MonoDelta::FromSeconds(FLAGS_log_block_manager_full_disk_cache_seconds);
   std::lock_guard<percpu_rwlock> l(lock_);

http://git-wip-us.apache.org/repos/asf/kudu/blob/3a3f714e/src/kudu/fs/log_block_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.h b/src/kudu/fs/log_block_manager.h
index aff8faa..618af8a 100644
--- a/src/kudu/fs/log_block_manager.h
+++ b/src/kudu/fs/log_block_manager.h
@@ -30,6 +30,7 @@
 
 #include "kudu/fs/block_id.h"
 #include "kudu/fs/block_manager.h"
+#include "kudu/fs/data_dirs.h"
 #include "kudu/fs/fs.pb.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/ref_counted.h"
@@ -44,7 +45,6 @@ class MetricEntity;
 class ThreadPool;
 
 namespace fs {
-class PathInstanceMetadataFile;
 
 namespace internal {
 class LogBlock;
@@ -155,14 +155,14 @@ class FullDiskCache {
  public:
   // Returns true if the given 'root_path' has been marked full and the
   // associated cache entry has not expired.
-  bool IsRootFull(const std::string& root_path, MonoTime* expires_out = nullptr) const;
+  bool IsRootFull(const DataDir* root_path, MonoTime* expires_out = nullptr) const;
 
   // Marks the given 'root_path' as "full".
-  void MarkRootFull(const std::string& root_path);
+  void MarkRootFull(const DataDir* root_path);
 
  private:
   mutable percpu_rwlock lock_;
-  std::unordered_map<std::string, MonoTime> cache_;
+  std::unordered_map<const DataDir*, MonoTime> cache_;
 };
 
 // The log-backed block manager.
@@ -238,7 +238,7 @@ class LogBlockManager : public BlockManager {
   // 'full_root_paths' is a blacklist containing root paths that are full.
   // Containers with root paths in this list will not be returned.
   internal::LogBlockContainer* GetAvailableContainer(
-      const std::unordered_set<std::string>& full_root_paths);
+      const std::unordered_set<const DataDir*>& full_root_paths);
 
   // Indicate that this container is no longer in use and can be handed out
   // to other writers.
@@ -280,16 +280,10 @@ class LogBlockManager : public BlockManager {
                           internal::LogBlockContainer* container,
                           UntrackedBlockMap* block_map);
 
-  // Open a particular root path belonging to the block manager.
+  // Open a particular data directory belonging to the block manager.
   //
-  // Success or failure is set in 'result_status'. On success, also sets
-  // 'result_metadata' with an allocated metadata file.
-  void OpenRootPath(const std::string& root_path,
-                    Status* result_status,
-                    PathInstanceMetadataFile** result_metadata);
-
-  // Test for hole punching support at 'path'.
-  Status CheckHolePunch(const std::string& path);
+  // Success or failure is set in 'result_status'.
+  void OpenDataDir(DataDir* dir, Status* result_status);
 
   // Perform basic initialization.
   Status Init();
@@ -310,6 +304,9 @@ class LogBlockManager : public BlockManager {
   // Protects the block map, container structures, and 'dirty_dirs'.
   mutable simple_spinlock lock_;
 
+  // Manages and owns all of the block manager's data directories.
+  DataDirManager dd_manager_;
+
   // Maps block IDs to blocks that are now readable, either because they
   // already existed on disk when the block manager was opened, or because
   // they're WritableBlocks that were closed.
@@ -350,21 +347,6 @@ class LogBlockManager : public BlockManager {
   // If true, only read operations are allowed.
   const bool read_only_;
 
-  // Filesystem paths where all block directories are found.
-  const std::vector<std::string> root_paths_;
-
-  // Index of 'root_paths_' for the next created block.
-  AtomicInt<int32> root_paths_idx_;
-
-  // Maps root paths to instance metadata files found in each root path.
-  typedef std::unordered_map<std::string, PathInstanceMetadataFile*> InstanceMap;
-  InstanceMap instances_by_root_path_;
-
-  // Maps root paths to thread pools. Each pool runs at most one thread, and
-  // so serves as a "work queue" for that particular disk.
-  typedef std::unordered_map<std::string, ThreadPool*> ThreadPoolMap;
-  ThreadPoolMap thread_pools_by_root_path_;
-
   // A cache of which root paths are full as of the last time they were
   // checked. This cache expires its entries after some period of time.
   FullDiskCache full_disk_cache_;


Mime
View raw message