kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject [kudu] branch master updated: fs: move file cache to server
Date Mon, 13 Jan 2020 23:02:06 GMT
This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 69172e8  fs: move file cache to server
69172e8 is described below

commit 69172e80ce2c1c1eaed19242d08037372f38ed19
Author: Adar Dembo <adar@cloudera.com>
AuthorDate: Fri Jan 10 16:49:36 2020 -0800

    fs: move file cache to server
    
    To use the file cache in the WAL, it must be hoisted out from under the
    block manager into a more central location. I opted for ServerBase rather
    than FsManager given that some tests use multiple FsManagers.
    
    In an effort to avoid excessive plumbing in FsManager-using tests, I
    resuscitated the non-file-cache block manager code paths. It's not too much
    complexity and the PREDICT_TRUE macros should mitigate the cost of the
    branches somewhat. Both cached and non-cached code paths have test coverage:
    1. Block manager tests explicitly instantiate a file cache.
    2. All other non-server tests do not.
    3. Server tests use the server's file cache.
    
    Change-Id: Ice92c3622c954b06b773c58d51f08082010d7de3
    Reviewed-on: http://gerrit.cloudera.org:8080/15011
    Reviewed-by: Andrew Wong <awong@cloudera.com>
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <aserbin@cloudera.com>
---
 src/kudu/fs/block_manager-stress-test.cc |  55 +++++++--------
 src/kudu/fs/block_manager-test.cc        |  19 +++---
 src/kudu/fs/block_manager.cc             |  63 -----------------
 src/kudu/fs/block_manager.h              |   6 --
 src/kudu/fs/file_block_manager.cc        |  20 ++++--
 src/kudu/fs/file_block_manager.h         |   5 +-
 src/kudu/fs/fs_manager.cc                |  10 +--
 src/kudu/fs/fs_manager.h                 |   7 ++
 src/kudu/fs/log_block_manager-test.cc    |  48 ++++++++-----
 src/kudu/fs/log_block_manager.cc         | 114 ++++++++++++++++++++-----------
 src/kudu/fs/log_block_manager.h          |   5 +-
 src/kudu/server/server_base.cc           |  52 +++++++++++++-
 src/kudu/server/server_base.h            |   4 ++
 13 files changed, 230 insertions(+), 178 deletions(-)

diff --git a/src/kudu/fs/block_manager-stress-test.cc b/src/kudu/fs/block_manager-stress-test.cc
index 828c10a..9ea3798 100644
--- a/src/kudu/fs/block_manager-stress-test.cc
+++ b/src/kudu/fs/block_manager-stress-test.cc
@@ -49,7 +49,9 @@
 #include "kudu/util/env.h"
 #include "kudu/util/faststring.h"
 #include "kudu/util/file_cache-test-util.h"
+#include "kudu/util/file_cache.h"
 #include "kudu/util/locks.h"
+#include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/random.h"
 #include "kudu/util/slice.h"
@@ -61,7 +63,6 @@
 DECLARE_bool(cache_force_single_shard);
 DECLARE_double(log_container_excess_space_before_cleanup_fraction);
 DECLARE_double(log_container_live_metadata_before_compact_ratio);
-DECLARE_int64(block_manager_max_open_files);
 DECLARE_uint64(log_container_max_size);
 DECLARE_uint64(log_container_preallocate_bytes);
 
@@ -84,6 +85,7 @@ DEFINE_int32(num_inconsistencies, 16,
 DEFINE_string(block_manager_paths, "", "Comma-separated list of paths to "
               "use for block storage. If empty, will use the default unit "
               "test path");
+DEFINE_int32(max_open_files, 32, "Maximum size of the test's file cache");
 
 using std::string;
 using std::shared_ptr;
@@ -117,23 +119,21 @@ template <typename T>
 class BlockManagerStressTest : public KuduTest {
  public:
   BlockManagerStressTest() :
-    rand_seed_(SeedRandom()),
-    stop_latch_(1),
-    test_error_manager_(new FsErrorManager()),
-    test_tablet_name_("test_tablet"),
-    total_blocks_written_(0),
-    total_bytes_written_(0),
-    total_blocks_read_(0),
-    total_bytes_read_(0),
-    total_blocks_deleted_(0) {
+      rand_seed_(SeedRandom()),
+      stop_latch_(1),
+      file_cache_("test_cache", env_, FLAGS_max_open_files,
+                  scoped_refptr<MetricEntity>()),
+      test_tablet_name_("test_tablet"),
+      total_blocks_written_(0),
+      total_bytes_written_(0),
+      total_blocks_read_(0),
+      total_bytes_read_(0),
+      total_blocks_deleted_(0) {
 
     // Increase the number of containers created.
     FLAGS_log_container_max_size = 1 * 1024 * 1024;
     FLAGS_log_container_preallocate_bytes = 1 * 1024 * 1024;
 
-    // Ensure the file cache is under stress too.
-    FLAGS_block_manager_max_open_files = 32;
-
     // Maximize the amount of cleanup triggered by the extra space heuristic.
     FLAGS_log_container_excess_space_before_cleanup_fraction = 0.0;
 
@@ -146,16 +146,13 @@ class BlockManagerStressTest : public KuduTest {
 
     // Defer block manager creation until after the above flags are set.
     bm_.reset(CreateBlockManager());
-    bm_->Open(nullptr);
-    dd_manager_->CreateDataDirGroup(test_tablet_name_);
+    CHECK_OK(file_cache_.Init());
+    CHECK_OK(bm_->Open(nullptr));
+    CHECK_OK(dd_manager_->CreateDataDirGroup(test_tablet_name_));
     CHECK_OK(dd_manager_->GetDataDirGroupPB(test_tablet_name_, &test_group_pb_));
   }
 
   virtual void TearDown() override {
-    // Ensure the proper destructor order. The directory manager must outlive
-    // the block manager.
-    bm_.reset();
-
     // If non-standard paths were provided we need to delete them in between
     // test runs.
     if (!FLAGS_block_manager_paths.empty()) {
@@ -164,7 +161,6 @@ class BlockManagerStressTest : public KuduTest {
                     Substitute("Couldn't recursively delete $0", dd));
       }
     }
-    dd_manager_.reset();
   }
 
   BlockManager* CreateBlockManager() {
@@ -188,8 +184,8 @@ class BlockManagerStressTest : public KuduTest {
       CHECK_OK(DataDirManager::OpenExistingForTests(env_, data_dirs,
           DataDirManagerOptions(), &dd_manager_));
     }
-    return new T(env_, dd_manager_.get(), test_error_manager_.get(),
-                 BlockManagerOptions());
+    return new T(env_, dd_manager_.get(), &error_manager_,
+                 &file_cache_, BlockManagerOptions());
   }
 
   void RunTest(double secs) {
@@ -266,14 +262,13 @@ class BlockManagerStressTest : public KuduTest {
   // Protects written_blocks_.
   simple_spinlock lock_;
 
-  // The block manager.
-  unique_ptr<BlockManager> bm_;
-
-  // The directory manager.
   unique_ptr<DataDirManager> dd_manager_;
 
-  // The error manager.
-  unique_ptr<FsErrorManager> test_error_manager_;
+  FsErrorManager error_manager_;
+
+  FileCache file_cache_;
+
+  unique_ptr<BlockManager> bm_;
 
   // Test group of disk to spread data across.
   DataDirGroupPB test_group_pb_;
@@ -484,7 +479,7 @@ void BlockManagerStressTest<T>::DeleterThread() {
 
 template <>
 int BlockManagerStressTest<FileBlockManager>::GetMaxFdCount() const {
-  return FLAGS_block_manager_max_open_files +
+  return FLAGS_max_open_files +
       // Each open block exists outside the file cache.
       (FLAGS_num_writer_threads * FLAGS_block_group_size * FLAGS_block_group_number) +
       // Each reader thread can open a file outside the cache if its lookup
@@ -495,7 +490,7 @@ int BlockManagerStressTest<FileBlockManager>::GetMaxFdCount() const {
 
 template <>
 int BlockManagerStressTest<LogBlockManager>::GetMaxFdCount() const {
-  return FLAGS_block_manager_max_open_files +
+  return FLAGS_max_open_files +
       // If all containers are full, each open block could theoretically
       // result in a new container, which is two files briefly outside the
       // cache (before they are inserted and evict other cached files).
diff --git a/src/kudu/fs/block_manager-test.cc b/src/kudu/fs/block_manager-test.cc
index 1acb0d2..404c591 100644
--- a/src/kudu/fs/block_manager-test.cc
+++ b/src/kudu/fs/block_manager-test.cc
@@ -50,6 +50,7 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/array_view.h" // IWYU pragma: keep
 #include "kudu/util/env.h"
+#include "kudu/util/file_cache.h"
 #include "kudu/util/mem_tracker.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
@@ -119,11 +120,12 @@ template <typename T>
 class BlockManagerTest : public KuduTest {
  public:
   BlockManagerTest() :
-    test_tablet_name_("test_tablet"),
-    test_block_opts_(CreateBlockOptions({ test_tablet_name_ })),
-    test_error_manager_(new FsErrorManager()),
-    bm_(CreateBlockManager(scoped_refptr<MetricEntity>(),
-                           shared_ptr<MemTracker>())) {
+      test_tablet_name_("test_tablet"),
+      test_block_opts_(CreateBlockOptions({ test_tablet_name_ })),
+      file_cache_("test_cache", env_, 1, scoped_refptr<MetricEntity>()),
+      bm_(CreateBlockManager(scoped_refptr<MetricEntity>(),
+                             shared_ptr<MemTracker>())) {
+    CHECK_OK(file_cache_.Init());
   }
 
   virtual void SetUp() override {
@@ -167,8 +169,8 @@ class BlockManagerTest : public KuduTest {
     BlockManagerOptions opts;
     opts.metric_entity = metric_entity;
     opts.parent_mem_tracker = parent_mem_tracker;
-    return new T(env_, this->dd_manager_.get(), test_error_manager_.get(),
-                 std::move(opts));
+    return new T(env_, this->dd_manager_.get(), &error_manager_,
+                 &file_cache_, std::move(opts));
   }
 
   Status ReopenBlockManager(const scoped_refptr<MetricEntity>& metric_entity,
@@ -231,8 +233,9 @@ class BlockManagerTest : public KuduTest {
   DataDirGroupPB test_group_pb_;
   string test_tablet_name_;
   CreateBlockOptions test_block_opts_;
-  unique_ptr<FsErrorManager> test_error_manager_;
+  FsErrorManager error_manager_;
   unique_ptr<DataDirManager> dd_manager_;
+  FileCache file_cache_;
   unique_ptr<T> bm_;
 };
 
diff --git a/src/kudu/fs/block_manager.cc b/src/kudu/fs/block_manager.cc
index 7156c91..4efeb0a 100644
--- a/src/kudu/fs/block_manager.cc
+++ b/src/kudu/fs/block_manager.cc
@@ -17,19 +17,9 @@
 
 #include "kudu/fs/block_manager.h"
 
-#include <algorithm>
-#include <mutex>
-#include <ostream>
-
 #include <gflags/gflags.h>
-#include <glog/logging.h>
 
-#include "kudu/gutil/integral_types.h"
 #include "kudu/gutil/macros.h"
-#include "kudu/gutil/strings/numbers.h"
-#include "kudu/gutil/strings/substitute.h"
-#include "kudu/util/env.h"
-#include "kudu/util/faststring.h"
 #include "kudu/util/flag_tags.h"
 
 // The default value is optimized for throughput in the case that
@@ -53,64 +43,11 @@ DEFINE_string(block_manager_preflush_control, "finalize",
               "never be pre-flushed but still be flushed when closed.");
 TAG_FLAG(block_manager_preflush_control, experimental);
 
-DEFINE_int64(block_manager_max_open_files, -1,
-             "Maximum number of open file descriptors to be used for data "
-             "blocks. If -1, Kudu will automatically calculate this value. "
-             "This is a soft limit. It is an error to use a value of 0.");
-TAG_FLAG(block_manager_max_open_files, advanced);
-TAG_FLAG(block_manager_max_open_files, evolving);
-
-static bool ValidateMaxOpenFiles(const char* /*flagname*/, int64_t value) {
-  if (value == 0 || value < -1) {
-    LOG(ERROR) << "Invalid max open files: cannot be " << value;
-    return false;
-  }
-  return true;
-}
-DEFINE_validator(block_manager_max_open_files, &ValidateMaxOpenFiles);
-
-using strings::Substitute;
-
 namespace kudu {
 namespace fs {
 
 BlockManagerOptions::BlockManagerOptions()
   : read_only(false) {}
 
-int64_t GetFileCacheCapacityForBlockManager(Env* env) {
-  // Maximize this process' open file limit first, if possible.
-  static std::once_flag once;
-  std::call_once(once, [&]() {
-    env->IncreaseResourceLimit(Env::ResourceLimitType::OPEN_FILES_PER_PROCESS);
-  });
-
-  uint64_t rlimit =
-      env->GetResourceLimit(Env::ResourceLimitType::OPEN_FILES_PER_PROCESS);
-  // See block_manager_max_open_files.
-  if (FLAGS_block_manager_max_open_files == -1) {
-    // Use file-max as a possible upper bound.
-    faststring buf;
-    uint64_t buf_val;
-    if (ReadFileToString(env, "/proc/sys/fs/file-max", &buf).ok() &&
-        safe_strtou64(buf.ToString(), &buf_val)) {
-      rlimit = std::min(rlimit, buf_val);
-    }
-
-    // Callers of this function expect a signed 64-bit integer, and rlimit
-    // is an uint64_t type, so we need to avoid overflow.
-    // The percentage we currently use is 40% by default, and although in fact
-    // 40% of any value of the `uint64_t` type must be less than `kint64max`,
-    // but the percentage may be adjusted in the future, such as to 60%, so to
-    // prevent accidental overflow, we cap rlimit here.
-    return std::min((rlimit / 5) * 2, static_cast<uint64_t>(kint64max));
-  }
-  LOG_IF(FATAL, FLAGS_block_manager_max_open_files > rlimit) <<
-      Substitute(
-          "Configured open file limit (block_manager_max_open_files) $0 "
-          "exceeds process open file limit (ulimit) $1",
-          FLAGS_block_manager_max_open_files, rlimit);
-  return FLAGS_block_manager_max_open_files;
-}
-
 } // namespace fs
 } // namespace kudu
diff --git a/src/kudu/fs/block_manager.h b/src/kudu/fs/block_manager.h
index 43dbeac..87239c3 100644
--- a/src/kudu/fs/block_manager.h
+++ b/src/kudu/fs/block_manager.h
@@ -30,10 +30,8 @@
 namespace kudu {
 
 class BlockId;
-class Env;
 class MemTracker;
 class Slice;
-
 template <typename T>
 class ArrayView;
 
@@ -318,9 +316,5 @@ class BlockDeletionTransaction {
   virtual Status CommitDeletedBlocks(std::vector<BlockId>* deleted) = 0;
 };
 
-// Compute an upper bound for a file cache embedded within a block manager
-// using resource limits obtained from the system.
-int64_t GetFileCacheCapacityForBlockManager(Env* env);
-
 } // namespace fs
 } // namespace kudu
diff --git a/src/kudu/fs/file_block_manager.cc b/src/kudu/fs/file_block_manager.cc
index cef55c1..0c5796c 100644
--- a/src/kudu/fs/file_block_manager.cc
+++ b/src/kudu/fs/file_block_manager.cc
@@ -695,13 +695,13 @@ bool FileBlockManager::FindBlockPath(const BlockId& block_id,
 FileBlockManager::FileBlockManager(Env* env,
                                    DataDirManager* dd_manager,
                                    FsErrorManager* error_manager,
+                                   FileCache* file_cache,
                                    BlockManagerOptions opts)
   : env_(DCHECK_NOTNULL(env)),
     dd_manager_(dd_manager),
     error_manager_(DCHECK_NOTNULL(error_manager)),
     opts_(std::move(opts)),
-    file_cache_("fbm", env_, GetFileCacheCapacityForBlockManager(env_),
-                opts_.metric_entity),
+    file_cache_(file_cache),
     rand_(GetRandomSeed32()),
     next_block_id_(rand_.Next64()),
     mem_tracker_(MemTracker::CreateTracker(-1,
@@ -716,8 +716,6 @@ FileBlockManager::~FileBlockManager() {
 }
 
 Status FileBlockManager::Open(FsReport* report) {
-  RETURN_NOT_OK(file_cache_.Init());
-
   // Prepare the filesystem report and either return or log it.
   FsReport local_report;
   set<int> failed_dirs = dd_manager_->GetFailedDirs();
@@ -830,7 +828,13 @@ Status FileBlockManager::OpenBlock(const BlockId& block_id,
   VLOG(1) << "Opening block with id " << block_id.ToString() << " at " << path;
 
   shared_ptr<RandomAccessFile> reader;
-  RETURN_NOT_OK_FBM_DISK_FAILURE(file_cache_.OpenExistingFile(path, &reader));
+  if (PREDICT_TRUE(file_cache_)) {
+    RETURN_NOT_OK_FBM_DISK_FAILURE(file_cache_->OpenExistingFile(path, &reader));
+  } else {
+    unique_ptr<RandomAccessFile> r;
+    RETURN_NOT_OK_FBM_DISK_FAILURE(env_->NewRandomAccessFile(path, &r));
+    reader.reset(r.release());
+  }
   block->reset(new internal::FileReadableBlock(this, block_id, reader));
   return Status::OK();
 }
@@ -854,7 +858,11 @@ Status FileBlockManager::DeleteBlock(const BlockId& block_id) {
     return Status::NotFound(
         Substitute("Block $0 not found", block_id.ToString()));
   }
-  RETURN_NOT_OK_FBM_DISK_FAILURE(file_cache_.DeleteFile(path));
+  if (PREDICT_TRUE(file_cache_)) {
+    RETURN_NOT_OK_FBM_DISK_FAILURE(file_cache_->DeleteFile(path));
+  } else {
+    RETURN_NOT_OK_FBM_DISK_FAILURE(env_->DeleteFile(path));
+  }
 
   // We don't bother fsyncing the parent directory as there's nothing to be
   // gained by ensuring that the deletion is made durable. Even if we did
diff --git a/src/kudu/fs/file_block_manager.h b/src/kudu/fs/file_block_manager.h
index 032c672..38b0653 100644
--- a/src/kudu/fs/file_block_manager.h
+++ b/src/kudu/fs/file_block_manager.h
@@ -27,7 +27,6 @@
 #include "kudu/fs/block_manager.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/util/atomic.h"
-#include "kudu/util/file_cache.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/random.h"
 #include "kudu/util/status.h"
@@ -36,6 +35,7 @@ namespace kudu {
 
 class BlockId;
 class Env;
+class FileCache;
 class MemTracker;
 
 namespace fs {
@@ -74,6 +74,7 @@ class FileBlockManager : public BlockManager {
   FileBlockManager(Env* env,
                    DataDirManager* dd_manager,
                    FsErrorManager* error_manager,
+                   FileCache* file_cache,
                    BlockManagerOptions opts);
 
   virtual ~FileBlockManager();
@@ -134,7 +135,7 @@ class FileBlockManager : public BlockManager {
   const BlockManagerOptions opts_;
 
   // Manages files opened for reading.
-  FileCache file_cache_;
+  FileCache* file_cache_;
 
   // For generating block IDs.
   ThreadSafeRandom rand_;
diff --git a/src/kudu/fs/fs_manager.cc b/src/kudu/fs/fs_manager.cc
index 62d7ab1..a60b922 100644
--- a/src/kudu/fs/fs_manager.cc
+++ b/src/kudu/fs/fs_manager.cc
@@ -140,7 +140,8 @@ FsManagerOpts::FsManagerOpts()
     metadata_root(FLAGS_fs_metadata_dir),
     block_manager_type(FLAGS_block_manager),
     read_only(false),
-    update_instances(UpdateInstanceBehavior::UPDATE_AND_IGNORE_FAILURES) {
+    update_instances(UpdateInstanceBehavior::UPDATE_AND_IGNORE_FAILURES),
+    file_cache(nullptr) {
   data_roots = strings::Split(FLAGS_fs_data_dirs, ",", strings::SkipEmpty());
 }
 
@@ -149,7 +150,8 @@ FsManagerOpts::FsManagerOpts(const string& root)
     data_roots({ root }),
     block_manager_type(FLAGS_block_manager),
     read_only(false),
-    update_instances(UpdateInstanceBehavior::UPDATE_AND_IGNORE_FAILURES) {}
+    update_instances(UpdateInstanceBehavior::UPDATE_AND_IGNORE_FAILURES),
+    file_cache(nullptr) {}
 
 FsManager::FsManager(Env* env, FsManagerOpts opts)
   : env_(DCHECK_NOTNULL(env)),
@@ -299,10 +301,10 @@ void FsManager::InitBlockManager() {
   bm_opts.read_only = opts_.read_only;
   if (opts_.block_manager_type == "file") {
     block_manager_.reset(new FileBlockManager(
-        env_, dd_manager_.get(), error_manager_.get(), std::move(bm_opts)));
+        env_, dd_manager_.get(), error_manager_.get(), opts_.file_cache, std::move(bm_opts)));
   } else {
     block_manager_.reset(new LogBlockManager(
-        env_, dd_manager_.get(), error_manager_.get(), std::move(bm_opts)));
+        env_, dd_manager_.get(), error_manager_.get(), opts_.file_cache, std::move(bm_opts)));
   }
 }
 
diff --git a/src/kudu/fs/fs_manager.h b/src/kudu/fs/fs_manager.h
index 796c145..c719be7 100644
--- a/src/kudu/fs/fs_manager.h
+++ b/src/kudu/fs/fs_manager.h
@@ -44,6 +44,7 @@ DECLARE_bool(enable_data_block_fsync);
 namespace kudu {
 
 class BlockId;
+class FileCache;
 class InstanceMetadataPB;
 class MemTracker;
 
@@ -123,6 +124,12 @@ struct FsManagerOpts {
   //
   // Defaults to UPDATE_AND_IGNORE_FAILURES.
   fs::UpdateInstanceBehavior update_instances;
+
+  // The file cache to be used for long-lived opened files (e.g. in the block
+  // manager). If null, opened files will not be cached.
+  //
+  // Defaults to null.
+  FileCache* file_cache;
 };
 
 // FsManager provides helpers to read data and metadata files,
diff --git a/src/kudu/fs/log_block_manager-test.cc b/src/kudu/fs/log_block_manager-test.cc
index 7166516..e1e2676 100644
--- a/src/kudu/fs/log_block_manager-test.cc
+++ b/src/kudu/fs/log_block_manager-test.cc
@@ -55,6 +55,7 @@
 #include "kudu/gutil/strings/util.h"
 #include "kudu/util/atomic.h"
 #include "kudu/util/env.h"
+#include "kudu/util/file_cache.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/path_util.h"
 #include "kudu/util/pb_util.h"
@@ -81,7 +82,6 @@ DECLARE_double(env_inject_eio);
 DECLARE_double(log_container_excess_space_before_cleanup_fraction);
 DECLARE_double(log_container_live_metadata_before_compact_ratio);
 DECLARE_int32(fs_target_data_dirs_per_tablet);
-DECLARE_int64(block_manager_max_open_files);
 DECLARE_int64(log_container_max_blocks);
 DECLARE_string(block_manager_preflush_control);
 DECLARE_string(env_inject_eio_globs);
@@ -113,10 +113,14 @@ class LogBlockContainer;
 class LogBlockManagerTest : public KuduTest {
  public:
   LogBlockManagerTest() :
-    test_tablet_name_("test_tablet"),
-    test_block_opts_({ test_tablet_name_ }),
-    test_error_manager_(new FsErrorManager()),
-    bm_(CreateBlockManager(scoped_refptr<MetricEntity>())) {
+      test_tablet_name_("test_tablet"),
+      test_block_opts_({ test_tablet_name_ }),
+      // Use a small file cache (smaller than the number of containers).
+      //
+      // Not strictly necessary except for TestDeleteFromContainerAfterMetadataCompaction.
+      file_cache_("test_cache", env_, 50, scoped_refptr<MetricEntity>()),
+      bm_(CreateBlockManager(scoped_refptr<MetricEntity>())) {
+    CHECK_OK(file_cache_.Init());
   }
 
   void SetUp() override {
@@ -137,10 +141,11 @@ class LogBlockManagerTest : public KuduTest {
       CHECK_OK(DataDirManager::CreateNewForTests(env_, test_data_dirs,
           DataDirManagerOptions(), &dd_manager_));
     }
+
     BlockManagerOptions opts;
     opts.metric_entity = metric_entity;
-    return new LogBlockManager(env_, dd_manager_.get(), test_error_manager_.get(),
-                               std::move(opts));
+    return new LogBlockManager(env_, dd_manager_.get(), &error_manager_,
+                               &file_cache_, std::move(opts));
   }
 
   Status ReopenBlockManager(const scoped_refptr<MetricEntity>& metric_entity = nullptr,
@@ -224,7 +229,8 @@ class LogBlockManagerTest : public KuduTest {
   CreateBlockOptions test_block_opts_;
 
   unique_ptr<DataDirManager> dd_manager_;
-  unique_ptr<FsErrorManager> test_error_manager_;
+  FsErrorManager error_manager_;
+  FileCache file_cache_;
   unique_ptr<LogBlockManager> bm_;
 
  private:
@@ -1506,10 +1512,14 @@ TEST_F(LogBlockManagerTest, TestDeleteDeadContainersAtStartup) {
   FLAGS_log_container_max_size = 0;
 
   // Create one container.
-  unique_ptr<WritableBlock> block;
-  ASSERT_OK(bm_->CreateBlock(test_block_opts_, &block));
-  ASSERT_OK(block->Append("a"));
-  ASSERT_OK(block->Close());
+  BlockId block_id;
+  {
+    unique_ptr<WritableBlock> block;
+    ASSERT_OK(bm_->CreateBlock(test_block_opts_, &block));
+    ASSERT_OK(block->Append("a"));
+    ASSERT_OK(block->Close());
+    block_id = block->id();
+  }
   string data_file_name;
   string metadata_file_name;
   NO_FATALS(GetOnlyContainerDataFile(&data_file_name));
@@ -1525,7 +1535,7 @@ TEST_F(LogBlockManagerTest, TestDeleteDeadContainersAtStartup) {
   {
     shared_ptr<BlockDeletionTransaction> deletion_transaction =
         this->bm_->NewDeletionTransaction();
-    deletion_transaction->AddDeletedBlock(block->id());
+    deletion_transaction->AddDeletedBlock(block_id);
     vector<BlockId> deleted;
     ASSERT_OK(deletion_transaction->CommitDeletedBlocks(&deleted));
   }
@@ -1602,8 +1612,6 @@ TEST_F(LogBlockManagerTest, TestCompactFullContainerMetadataAtStartup) {
 TEST_F(LogBlockManagerTest, TestDeleteFromContainerAfterMetadataCompaction) {
   // Compact aggressively.
   FLAGS_log_container_live_metadata_before_compact_ratio = 0.99;
-  // Use a small file cache (smaller than the number of containers).
-  FLAGS_block_manager_max_open_files = 50;
   // Use a single shard so that we have an accurate max cache capacity
   // regardless of the number of cores on the machine.
   FLAGS_cache_force_single_shard = true;
@@ -1681,7 +1689,7 @@ TEST_F(LogBlockManagerTest, TestOpenWithFailedDirectories) {
       DataDirManagerOptions(), &dd_manager_));
 
   // Wire in a callback to fail data directories.
-  test_error_manager_->SetErrorNotificationCb(ErrorHandlerType::DISK_ERROR,
+  error_manager_.SetErrorNotificationCb(ErrorHandlerType::DISK_ERROR,
       Bind(&DataDirManager::MarkDirFailedByUuid, Unretained(dd_manager_.get())));
   bm_.reset(CreateBlockManager(nullptr));
 
@@ -1990,6 +1998,10 @@ TEST_F(LogBlockManagerTest, TestHalfPresentContainer) {
   };
 
   const auto CreateMetadataFile = [&] () {
+    // We're often recreating an existing file, so we must invalidate any
+    // entry in the file cache first.
+    file_cache_.Invalidate(metadata_file_name);
+
     unique_ptr<WritableFile> metadata_file_writer;
     ASSERT_OK(env_->NewWritableFile(metadata_file_name, &metadata_file_writer));
     ASSERT_OK(metadata_file_writer->Append(Slice("a")));
@@ -1997,6 +2009,10 @@ TEST_F(LogBlockManagerTest, TestHalfPresentContainer) {
   };
 
   const auto CreateDataFile = [&] () {
+    // We're often recreating an existing file, so we must invalidate any
+    // entry in the file cache first.
+    file_cache_.Invalidate(data_file_name);
+
     unique_ptr<WritableFile> data_file_writer;
     ASSERT_OK(env_->NewWritableFile(data_file_name, &data_file_writer));
     data_file_writer->Close();
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index b6c4c4b..b09f319 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -720,10 +720,21 @@ LogBlockContainer::~LogBlockContainer() {
     CHECK(!block_manager_->opts_.read_only);
     string data_file_name = data_file_->filename();
     string metadata_file_name = metadata_file_->filename();
-    CONTAINER_DISK_FAILURE(block_manager_->file_cache_.DeleteFile(data_file_name),
-        "Could not delete dead container data file " + data_file_name);
-    CONTAINER_DISK_FAILURE(block_manager_->file_cache_.DeleteFile(metadata_file_name),
-        "Could not delete dead container metadata file " + metadata_file_name);
+    string data_failure_msg =
+        "Could not delete dead container data file " + data_file_name;
+    string metadata_failure_msg =
+        "Could not delete dead container metadata file " + metadata_file_name;
+    if (PREDICT_TRUE(block_manager_->file_cache_)) {
+      CONTAINER_DISK_FAILURE(block_manager_->file_cache_->DeleteFile(data_file_name),
+                             data_failure_msg);
+      CONTAINER_DISK_FAILURE(block_manager_->file_cache_->DeleteFile(metadata_file_name),
+                             metadata_failure_msg);
+    } else {
+      CONTAINER_DISK_FAILURE(block_manager_->env_->DeleteFile(data_file_name),
+                             data_failure_msg);
+      CONTAINER_DISK_FAILURE(block_manager_->env_->DeleteFile(metadata_file_name),
+                             metadata_failure_msg);
+    }
   }
 }
 
@@ -746,24 +757,28 @@ Status LogBlockContainer::Create(LogBlockManager* block_manager,
   string data_path;
   Status metadata_status;
   Status data_status;
-  unique_ptr<RWFile> metadata_writer;
-  unique_ptr<RWFile> data_file;
-  RWFileOptions wr_opts;
-  wr_opts.mode = Env::MUST_CREATE;
+  shared_ptr<RWFile> metadata_writer;
+  shared_ptr<RWFile> data_file;
 
   // Repeat in the event of a container id collision (unlikely).
   //
   // When looping, we delete any created-and-orphaned files.
   do {
+    unique_ptr<RWFile> rwf;
+
     if (metadata_writer) {
       block_manager->env()->DeleteFile(metadata_path);
     }
     common_path = JoinPathSegments(dir->dir(),
                                    block_manager->oid_generator()->Next());
     metadata_path = StrCat(common_path, LogBlockManager::kContainerMetadataFileSuffix);
+    RWFileOptions wr_opts;
+    wr_opts.mode = Env::MUST_CREATE;
     metadata_status = block_manager->env()->NewRWFile(wr_opts,
                                                       metadata_path,
-                                                      &metadata_writer);
+                                                      &rwf);
+    metadata_writer.reset(rwf.release());
+
     if (data_file) {
       block_manager->env()->DeleteFile(data_path);
     }
@@ -772,29 +787,27 @@ Status LogBlockContainer::Create(LogBlockManager* block_manager,
     rw_opts.mode = Env::MUST_CREATE;
     data_status = block_manager->env()->NewRWFile(rw_opts,
                                                   data_path,
-                                                  &data_file);
+                                                  &rwf);
+    data_file.reset(rwf.release());
   } while (PREDICT_FALSE(metadata_status.IsAlreadyPresent() ||
                          data_status.IsAlreadyPresent()));
   if (metadata_status.ok() && data_status.ok()) {
-    unique_ptr<WritablePBContainerFile> metadata_file;
-    shared_ptr<RWFile> cached_data_file;
-
-    metadata_writer.reset();
-    shared_ptr<RWFile> cached_metadata_writer;
-    RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->file_cache_.OpenExistingFile(
-        metadata_path, &cached_metadata_writer));
-    metadata_file.reset(new WritablePBContainerFile(
-        std::move(cached_metadata_writer)));
-
-    data_file.reset();
-    RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->file_cache_.OpenExistingFile(
-        data_path, &cached_data_file));
+    if (PREDICT_TRUE(block_manager->file_cache_)) {
+      metadata_writer.reset();
+      RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->file_cache_->OpenExistingFile(
+          metadata_path, &metadata_writer));
+      data_file.reset();
+      RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->file_cache_->OpenExistingFile(
+          data_path, &data_file));
+    }
+    unique_ptr<WritablePBContainerFile> metadata_file(new WritablePBContainerFile(
+        std::move(metadata_writer)));
     RETURN_NOT_OK_CONTAINER_DISK_FAILURE(metadata_file->CreateNew(BlockRecordPB()));
 
     container->reset(new LogBlockContainer(block_manager,
                                            dir,
                                            std::move(metadata_file),
-                                           std::move(cached_data_file)));
+                                           std::move(data_file)));
     VLOG(1) << "Created log block container " << (*container)->ToString();
   }
 
@@ -819,16 +832,28 @@ Status LogBlockContainer::Open(LogBlockManager* block_manager,
 
   // Open the existing metadata and data files for writing.
   shared_ptr<RWFile> metadata_file;
-  RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->file_cache_.OpenExistingFile(
-      metadata_path, &metadata_file));
+  shared_ptr<RWFile> data_file;
+  if (PREDICT_TRUE(block_manager->file_cache_)) {
+    RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->file_cache_->OpenExistingFile(
+        metadata_path, &metadata_file));
+    RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->file_cache_->OpenExistingFile(
+        data_path, &data_file));
+  } else {
+    RWFileOptions opts;
+    opts.mode = Env::MUST_EXIST;
+    unique_ptr<RWFile> rwf;
+    RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->env()->NewRWFile(opts,
+        metadata_path, &rwf));
+    metadata_file.reset(rwf.release());
+    RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->env()->NewRWFile(opts,
+        data_path, &rwf));
+    data_file.reset(rwf.release());
+  }
+
   unique_ptr<WritablePBContainerFile> metadata_pb_writer;
   metadata_pb_writer.reset(new WritablePBContainerFile(std::move(metadata_file)));
   RETURN_NOT_OK_CONTAINER_DISK_FAILURE(metadata_pb_writer->OpenExisting());
 
-  shared_ptr<RWFile> data_file;
-  RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->file_cache_.OpenExistingFile(
-        data_path, &data_file));
-
   uint64_t data_file_size;
   RETURN_NOT_OK_CONTAINER_DISK_FAILURE(data_file->Size(&data_file_size));
 
@@ -1198,8 +1223,17 @@ Status LogBlockContainer::SyncMetadata() {
 
 Status LogBlockContainer::ReopenMetadataWriter() {
   shared_ptr<RWFile> f;
-  RETURN_NOT_OK_HANDLE_ERROR(block_manager_->file_cache_.OpenExistingFile(
-      metadata_file_->filename(), &f));
+  if (PREDICT_TRUE(block_manager_->file_cache_)) {
+    RETURN_NOT_OK_HANDLE_ERROR(block_manager_->file_cache_->OpenExistingFile(
+        metadata_file_->filename(), &f));
+  } else {
+    unique_ptr<RWFile> f_uniq;
+    RWFileOptions opts;
+    opts.mode = Env::MUST_EXIST;
+    RETURN_NOT_OK_HANDLE_ERROR(block_manager_->env_->NewRWFile(opts,
+        metadata_file_->filename(), &f_uniq));
+    f.reset(f_uniq.release());
+  }
   unique_ptr<WritablePBContainerFile> w;
   w.reset(new WritablePBContainerFile(std::move(f)));
   RETURN_NOT_OK_HANDLE_ERROR(w->OpenExisting());
@@ -1909,6 +1943,7 @@ const map<int64_t, int64_t> LogBlockManager::kPerFsBlockSizeBlockLimits({
 LogBlockManager::LogBlockManager(Env* env,
                                  DataDirManager* dd_manager,
                                  FsErrorManager* error_manager,
+                                 FileCache* file_cache,
                                  BlockManagerOptions opts)
   : env_(DCHECK_NOTNULL(env)),
     dd_manager_(DCHECK_NOTNULL(dd_manager)),
@@ -1917,8 +1952,7 @@ LogBlockManager::LogBlockManager(Env* env,
     mem_tracker_(MemTracker::CreateTracker(-1,
                                            "log_block_manager",
                                            opts_.parent_mem_tracker)),
-    file_cache_("lbm", env, GetFileCacheCapacityForBlockManager(env),
-                opts_.metric_entity),
+    file_cache_(file_cache),
     buggy_el6_kernel_(IsBuggyEl6Kernel(env->GetKernelRelease())),
     next_block_id_(1) {
   managed_block_shards_.resize(kBlockMapChunk);
@@ -1986,8 +2020,6 @@ LogBlockManager::~LogBlockManager() {
   } while (false)
 
 Status LogBlockManager::Open(FsReport* report) {
-  RETURN_NOT_OK(file_cache_.Init());
-
   // Establish (and log) block limits for each data directory using kernel,
   // filesystem, and gflags information.
   for (const auto& dd : dd_manager_->dirs()) {
@@ -2984,10 +3016,12 @@ Status LogBlockManager::RewriteMetadataFile(const LogBlockContainer& container,
                                          "could not get file size of temporary metadata file");
   RETURN_NOT_OK_LBM_DISK_FAILURE_PREPEND(env_->RenameFile(tmp_file_name, metadata_file_name),
                                          "could not rename temporary metadata file");
-  // Evict the old path from the file cache, so that when we re-open the new
-  // metadata file for write, we don't accidentally get a cache hit on the
-  // old file descriptor pointing to the now-deleted old version.
-  file_cache_.Invalidate(metadata_file_name);
+  if (PREDICT_TRUE(file_cache_)) {
+    // Evict the old path from the file cache, so that when we re-open the new
+    // metadata file for write, we don't accidentally get a cache hit on the
+    // old file descriptor pointing to the now-deleted old version.
+    file_cache_->Invalidate(metadata_file_name);
+  }
 
   tmp_deleter.cancel();
   *file_bytes_delta = (static_cast<int64_t>(old_metadata_size) - new_metadata_size);
diff --git a/src/kudu/fs/log_block_manager.h b/src/kudu/fs/log_block_manager.h
index fcdafac..91c95f3 100644
--- a/src/kudu/fs/log_block_manager.h
+++ b/src/kudu/fs/log_block_manager.h
@@ -37,7 +37,6 @@
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/util/atomic.h"
-#include "kudu/util/file_cache.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/mem_tracker.h"
 #include "kudu/util/oid_generator.h"
@@ -46,6 +45,7 @@
 namespace kudu {
 
 class Env;
+class FileCache;
 
 namespace fs {
 class DataDirManager;
@@ -189,6 +189,7 @@ class LogBlockManager : public BlockManager {
   LogBlockManager(Env* env,
                   DataDirManager* dd_manager,
                   FsErrorManager* error_manager,
+                  FileCache* file_cache,
                   BlockManagerOptions opts);
 
   virtual ~LogBlockManager();
@@ -441,7 +442,7 @@ class LogBlockManager : public BlockManager {
                      boost::optional<int64_t>> block_limits_by_data_dir_;
 
   // Manages files opened for reading.
-  FileCache file_cache_;
+  FileCache* file_cache_;
 
   // Holds (and owns) all containers loaded from disk.
   std::unordered_map<std::string,
diff --git a/src/kudu/server/server_base.cc b/src/kudu/server/server_base.cc
index 7539bc0..dcca690 100644
--- a/src/kudu/server/server_base.cc
+++ b/src/kudu/server/server_base.cc
@@ -17,11 +17,12 @@
 
 #include "kudu/server/server_base.h"
 
+#include <algorithm>
 #include <cstdint>
 #include <functional>
+#include <mutex>
 #include <sstream>
 #include <string>
-#include <utility>
 #include <vector>
 
 #include <boost/algorithm/string/predicate.hpp>
@@ -38,7 +39,9 @@
 #include "kudu/common/wire_protocol.pb.h"
 #include "kudu/fs/fs_manager.h"
 #include "kudu/fs/fs_report.h"
+#include "kudu/gutil/integral_types.h"
 #include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/numbers.h"
 #include "kudu/gutil/strings/strcat.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/walltime.h"
@@ -63,6 +66,8 @@
 #include "kudu/server/webserver.h"
 #include "kudu/util/atomic.h"
 #include "kudu/util/env.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/file_cache.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/flag_validators.h"
 #include "kudu/util/flags.h"
@@ -211,6 +216,11 @@ DEFINE_uint64(gc_tcmalloc_memory_interval_seconds, 30,
 TAG_FLAG(gc_tcmalloc_memory_interval_seconds, advanced);
 TAG_FLAG(gc_tcmalloc_memory_interval_seconds, runtime);
 
+DEFINE_uint64(server_max_open_files, 0,
+              "Maximum number of open file descriptors. If 0, Kudu will "
+              "automatically calculate this value. This is a soft limit");
+TAG_FLAG(server_max_open_files, advanced);
+
 DECLARE_bool(use_hybrid_clock);
 DECLARE_int32(dns_resolver_max_threads_num);
 DECLARE_uint32(dns_resolver_cache_capacity_mb);
@@ -352,6 +362,41 @@ shared_ptr<MemTracker> CreateMemTrackerForServer() {
   return shared_ptr<MemTracker>(MemTracker::CreateTracker(-1, id_str));
 }
 
+int64_t GetFileCacheCapacity(Env* env) {
+  // Raise this process' open file limit first (attempted only once), if possible.
+  static std::once_flag once;
+  std::call_once(once, [&]() {
+    env->IncreaseResourceLimit(Env::ResourceLimitType::OPEN_FILES_PER_PROCESS);
+  });
+
+  uint64_t rlimit =
+      env->GetResourceLimit(Env::ResourceLimitType::OPEN_FILES_PER_PROCESS);
+  // A server_max_open_files value of 0 means the capacity is calculated here.
+  if (FLAGS_server_max_open_files == 0) {
+    // Use /proc/sys/fs/file-max as an upper bound if it can be read and parsed.
+    faststring buf;
+    uint64_t buf_val;
+    if (ReadFileToString(env, "/proc/sys/fs/file-max", &buf).ok() &&
+        safe_strtou64(buf.ToString(), &buf_val)) {
+      rlimit = std::min(rlimit, buf_val);
+    }
+
+    // Callers of this function expect a signed 64-bit integer, but rlimit is
+    // a uint64_t, so the returned value must not exceed kint64max. The
+    // capacity is currently 40% of rlimit ((rlimit / 5) * 2), and 40% of any
+    // uint64_t value is already less than kint64max. The percentage may be
+    // raised in the future (e.g. to 60%), however, so the result is capped
+    // explicitly here to guard against accidental overflow.
+    return std::min((rlimit / 5) * 2, static_cast<uint64_t>(kint64max));
+  }
+  LOG_IF(FATAL, FLAGS_server_max_open_files > rlimit) <<
+      Substitute(
+          "Configured open file limit (server_max_open_files) $0 "
+          "exceeds process open file limit (ulimit) $1",
+          FLAGS_server_max_open_files, rlimit);
+  return FLAGS_server_max_open_files;
+}
+
 } // anonymous namespace
 
 ServerBase::ServerBase(string name, const ServerBaseOptions& options,
@@ -362,6 +407,8 @@ ServerBase::ServerBase(string name, const ServerBaseOptions& options,
       metric_registry_(new MetricRegistry()),
       metric_entity_(METRIC_ENTITY_server.Instantiate(metric_registry_.get(),
                                                       metric_namespace)),
+      file_cache_(new FileCache("file cache", options.env,
+                                GetFileCacheCapacity(options.env), metric_entity_)),
       rpc_server_(new RpcServer(options.rpc_opts)),
       result_tracker_(new rpc::ResultTracker(shared_ptr<MemTracker>(
           MemTracker::CreateTracker(-1, "result-tracker", mem_tracker_)))),
@@ -380,6 +427,7 @@ ServerBase::ServerBase(string name, const ServerBaseOptions& options,
   fs_opts.block_manager_type = options.fs_opts.block_manager_type;
   fs_opts.wal_root = options.fs_opts.wal_root;
   fs_opts.data_roots = options.fs_opts.data_roots;
+  fs_opts.file_cache = file_cache_.get();
   fs_manager_.reset(new FsManager(options.env, std::move(fs_opts)));
 
   if (FLAGS_use_hybrid_clock) {
@@ -444,6 +492,8 @@ Status ServerBase::Init() {
 
   RETURN_NOT_OK(security::InitKerberosForServer(FLAGS_principal, FLAGS_keytab_file));
 
+  RETURN_NOT_OK(file_cache_->Init());
+
   fs::FsReport report;
   Status s = fs_manager_->Open(&report);
   // No instance files existed. Try creating a new FS layout.
diff --git a/src/kudu/server/server_base.h b/src/kudu/server/server_base.h
index 77e4c7d..c744007 100644
--- a/src/kudu/server/server_base.h
+++ b/src/kudu/server/server_base.h
@@ -31,6 +31,7 @@
 namespace kudu {
 
 class DnsResolver;
+class FileCache;
 class FsManager;
 class MemTracker;
 class MetricEntity;
@@ -104,6 +105,8 @@ class ServerBase {
 
   DnsResolver* dns_resolver() const { return dns_resolver_.get(); }
 
+  FileCache* file_cache() const { return file_cache_.get(); }
+
   // Return a PB describing the status of the server (version info, bound ports, etc)
   Status GetStatusPB(ServerStatusPB* status) const;
 
@@ -175,6 +178,7 @@ class ServerBase {
   std::shared_ptr<MemTracker> mem_tracker_;
   std::unique_ptr<MetricRegistry> metric_registry_;
   scoped_refptr<MetricEntity> metric_entity_;
+  std::unique_ptr<FileCache> file_cache_;
   std::unique_ptr<FsManager> fs_manager_;
   std::unique_ptr<RpcServer> rpc_server_;
   std::unique_ptr<Webserver> web_server_;


Mime
View raw message