kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject [kudu] branch master updated: file cache: unify across file types
Date Wed, 08 Jan 2020 23:17:16 GMT
This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 4dca6b0  file cache: unify across file types
4dca6b0 is described below

commit 4dca6b02a5e7b2e589646e79bc6e509f27c13c3d
Author: Adar Dembo <adar@cloudera.com>
AuthorDate: Tue Jan 7 17:22:00 2020 -0800

    file cache: unify across file types
    
    Back when the file cache was first added (commit 6a124f040), there was a
    choice between a templatized approach, instantiated once per file type, and
    a non-templatized approach usable by any file type. At the time, file cache
    usage was restricted to the block managers, so the templatized approach
    carried the day.
    
    Since then, we've seen more and more reports of users hitting fd limits due
    to high numbers of log segments. I'd like to address these by expanding the
    purview of the file cache to include all long-lived open fds, including
    segment files and index chunks.
    
    This patch lays the groundwork for that by de-templatizing the file cache,
    "unifying" it for all supported file types. Eventually, it'll be
    instantiated once per server and rolled out to the WAL, but for now only the
    underlying implementation changes.
    
    Change-Id: I1e5cf6cadb7f0d45492af8b8a7541853d587409c
    Reviewed-on: http://gerrit.cloudera.org:8080/14988
    Tested-by: Adar Dembo <adar@cloudera.com>
    Reviewed-by: Andrew Wong <awong@cloudera.com>
    Reviewed-by: Alexey Serbin <aserbin@cloudera.com>
---
 src/kudu/fs/file_block_manager.h        |   4 +-
 src/kudu/fs/log_block_manager.h         |   5 +-
 src/kudu/util/file_cache-stress-test.cc | 138 +++++++++-------
 src/kudu/util/file_cache-test.cc        |  70 +++++++-
 src/kudu/util/file_cache.cc             | 276 +++++++++++++++++++-------------
 src/kudu/util/file_cache.h              |  60 +++++--
 6 files changed, 355 insertions(+), 198 deletions(-)

diff --git a/src/kudu/fs/file_block_manager.h b/src/kudu/fs/file_block_manager.h
index 877b14a..032c672 100644
--- a/src/kudu/fs/file_block_manager.h
+++ b/src/kudu/fs/file_block_manager.h
@@ -37,7 +37,6 @@ namespace kudu {
 class BlockId;
 class Env;
 class MemTracker;
-class RandomAccessFile;
 
 namespace fs {
 class DataDirManager;
@@ -49,7 +48,6 @@ class FileBlockDeletionTransaction;
 class FileBlockLocation;
 class FileReadableBlock;
 class FileWritableBlock;
-
 struct BlockManagerMetrics;
 } // namespace internal
 
@@ -136,7 +134,7 @@ class FileBlockManager : public BlockManager {
   const BlockManagerOptions opts_;
 
   // Manages files opened for reading.
-  FileCache<RandomAccessFile> file_cache_;
+  FileCache file_cache_;
 
   // For generating block IDs.
   ThreadSafeRandom rand_;
diff --git a/src/kudu/fs/log_block_manager.h b/src/kudu/fs/log_block_manager.h
index 016b597..fcdafac 100644
--- a/src/kudu/fs/log_block_manager.h
+++ b/src/kudu/fs/log_block_manager.h
@@ -46,11 +46,10 @@
 namespace kudu {
 
 class Env;
-class RWFile;
 
 namespace fs {
-class Dir;
 class DataDirManager;
+class Dir;
 class FsErrorManager;
 struct FsReport;
 
@@ -442,7 +441,7 @@ class LogBlockManager : public BlockManager {
                      boost::optional<int64_t>> block_limits_by_data_dir_;
 
   // Manages files opened for reading.
-  FileCache<RWFile> file_cache_;
+  FileCache file_cache_;
 
   // Holds (and owns) all containers loaded from disk.
   std::unordered_map<std::string,
diff --git a/src/kudu/util/file_cache-stress-test.cc b/src/kudu/util/file_cache-stress-test.cc
index babc1d7..8fa1b74 100644
--- a/src/kudu/util/file_cache-stress-test.cc
+++ b/src/kudu/util/file_cache-stress-test.cc
@@ -15,8 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "kudu/util/file_cache.h"
-
+#include <algorithm>
 #include <cstddef>
 #include <cstdint>
 #include <deque>
@@ -31,7 +30,6 @@
 #include <vector>
 
 #include <gflags/gflags.h>
-#include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
@@ -39,9 +37,11 @@
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/split.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/strings/util.h"
 #include "kudu/util/countdown_latch.h"
 #include "kudu/util/env.h"
 #include "kudu/util/file_cache-test-util.h"
+#include "kudu/util/file_cache.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
@@ -73,7 +73,6 @@ namespace kudu {
 // FD limit to enforce during the test.
 static const int kTestMaxOpenFiles = 100;
 
-template <class FileType>
 class FileCacheStressTest : public KuduTest {
 
 // Like CHECK_OK(), but dumps the contents of the cache before failing.
@@ -102,10 +101,10 @@ class FileCacheStressTest : public KuduTest {
     // Use a single shard. Otherwise, the cache can be a little bit "sloppy"
     // depending on the number of CPUs on the system.
     FLAGS_cache_force_single_shard = true;
-    cache_.reset(new FileCache<FileType>("test",
-                                         env_,
-                                         kTestMaxOpenFiles,
-                                         scoped_refptr<MetricEntity>()));
+    cache_.reset(new FileCache("test",
+                               env_,
+                               kTestMaxOpenFiles,
+                               scoped_refptr<MetricEntity>()));
   }
 
   void SetUp() override {
@@ -118,8 +117,14 @@ class FileCacheStressTest : public KuduTest {
     MetricMap metrics;
 
     do {
-      // Create a new file with some (0-32k) random data in it.
-      string next_file_name = GetTestPath(oid_generator.Next());
+      // Choose randomly between creating a file to be opened read-only and one
+      // to be opened read-write.
+      bool use_rwf = rand.OneIn(2);
+
+      // Create a new file with some (0-32k) random data in it. The file name's
+      // prefix signals which file type it should be opened as.
+      string next_file_name = GetTestPath((use_rwf ? kRWFPrefix : kRAFPrefix) +
+                                          oid_generator.Next());
       {
         unique_ptr<WritableFile> next_file;
         CHECK_OK(env_->NewWritableFile(next_file_name, &next_file));
@@ -143,7 +148,8 @@ class FileCacheStressTest : public KuduTest {
     Random rand(rand_.Next32());
 
     // Active opened files in this thread.
-    deque<shared_ptr<FileType>> files;
+    deque<shared_ptr<RWFile>> rwfs;
+    deque<shared_ptr<RandomAccessFile>> rafs;
 
     // Metrics generated by this thread. They will be merged into the main
     // metrics map when the thread is done.
@@ -164,25 +170,46 @@ class FileCacheStressTest : public KuduTest {
         if (!GetRandomFile(OPEN, &rand, &to_open)) {
           continue;
         }
-        shared_ptr<FileType> new_file;
-        TEST_CHECK_OK(cache_->OpenExistingFile(to_open, &new_file));
+        if (HasPrefixString(BaseName(to_open), kRWFPrefix)) {
+          shared_ptr<RWFile> rwf;
+          TEST_CHECK_OK(cache_->OpenExistingFile(to_open, &rwf));
+          rwfs.emplace_back(std::move(rwf));
+        } else {
+          CHECK(HasPrefixString(BaseName(to_open), kRAFPrefix));
+
+          shared_ptr<RandomAccessFile> raf;
+          TEST_CHECK_OK(cache_->OpenExistingFile(to_open, &raf));
+          rafs.emplace_back(std::move(raf));
+        }
         FinishedOpen(to_open);
         metrics[BaseName(to_open)]["open"]++;
-        files.emplace_back(new_file);
       } else if (next_action < 35) {
         // Close a file.
-        if (files.empty()) {
+        if (rwfs.empty() && rafs.empty()) {
           continue;
         }
-        shared_ptr<FileType> file = files.front();
-        files.pop_front();
-        metrics[BaseName(file->filename())]["close"]++;
+        shared_ptr<File> f;
+        if (rafs.empty() || (!rwfs.empty() && rand.OneIn(2))) {
+          f = rwfs.front();
+          rwfs.pop_front();
+        } else {
+          f = rafs.front();
+          rafs.pop_front();
+        }
+        metrics[BaseName(f->filename())]["close"]++;
       } else if (next_action < 70) {
         // Read a random chunk from a file.
-        TEST_CHECK_OK(ReadRandomChunk(files, &metrics, &rand));
+        if (rwfs.empty() && rafs.empty()) {
+          continue;
+        }
+        if (rafs.empty() || (!rwfs.empty() && rand.OneIn(2))) {
+          TEST_CHECK_OK(ReadRandomChunk(rwfs, &metrics, &rand));
+        } else {
+          TEST_CHECK_OK(ReadRandomChunk(rafs, &metrics, &rand));
+        }
       } else if (next_action < 90) {
         // Write a random chunk to a file.
-        TEST_CHECK_OK(WriteRandomChunk(files, &metrics, &rand));
+        TEST_CHECK_OK(WriteRandomChunk(rwfs, &metrics, &rand));
       } else if (next_action < 100) {
         // Delete a file.
         string to_delete;
@@ -204,6 +231,9 @@ class FileCacheStressTest : public KuduTest {
   const MetricMap& metrics() const { return metrics_; }
 
  private:
+  static constexpr const char* const kRWFPrefix = "rwf-";
+  static constexpr const char* const kRAFPrefix = "raf-";
+
   enum GetMode {
     OPEN,
     DELETE
@@ -246,6 +276,7 @@ class FileCacheStressTest : public KuduTest {
 
   // Reads a random chunk of data from a random file in 'files'. On success,
   // writes to 'metrics'.
+  template <class FileType>
   static Status ReadRandomChunk(const deque<shared_ptr<FileType>>& files,
                                 MetricMap* metrics,
                                 Random* rand) {
@@ -267,11 +298,22 @@ class FileCacheStressTest : public KuduTest {
 
   // Writes a random chunk of data to a random file in 'files'. On success,
   // updates 'metrics'.
-  //
-  // No-op for file implementations that don't support writing.
-  static Status WriteRandomChunk(const deque<shared_ptr<FileType>>& files,
+  static Status WriteRandomChunk(const deque<shared_ptr<RWFile>>& files,
                                  MetricMap* metrics,
-                                 Random* rand);
+                                 Random* rand) {
+    if (files.empty()) {
+      return Status::OK();
+    }
+    const shared_ptr<RWFile>& file = files[rand->Uniform(files.size())];
+
+    uint64_t file_size;
+    RETURN_NOT_OK(file->Size(&file_size));
+    uint64_t off = file_size > 0 ? rand->Uniform(file_size) : 0;
+    uint8_t buf[64];
+    RETURN_NOT_OK(file->Write(off, GenerateRandomChunk(buf, sizeof(buf), rand)));
+    (*metrics)[BaseName(file->filename())]["write"]++;
+    return Status::OK();
+  }
 
   static Slice GenerateRandomChunk(uint8_t* buffer, size_t max_length, Random* rand) {
     size_t len = rand->Uniform(max_length);
@@ -292,7 +334,7 @@ class FileCacheStressTest : public KuduTest {
     }
   }
 
-  unique_ptr<FileCache<FileType>> cache_;
+  unique_ptr<FileCache> cache_;
 
   // Used to seed per-thread PRNGs.
   ThreadSafeRandom rand_;
@@ -316,63 +358,35 @@ class FileCacheStressTest : public KuduTest {
   MetricMap metrics_;
 };
 
-template <>
-Status FileCacheStressTest<RWFile>::WriteRandomChunk(
-    const deque<shared_ptr<RWFile>>& files,
-    MetricMap* metrics,
-    Random* rand) {
-  if (files.empty()) {
-    return Status::OK();
-  }
-  const shared_ptr<RWFile>& file = files[rand->Uniform(files.size())];
-
-  uint64_t file_size;
-  RETURN_NOT_OK(file->Size(&file_size));
-  uint64_t off = file_size > 0 ? rand->Uniform(file_size) : 0;
-  uint8_t buf[64];
-  RETURN_NOT_OK(file->Write(off, GenerateRandomChunk(buf, sizeof(buf), rand)));
-  (*metrics)[BaseName(file->filename())]["write"]++;
-  return Status::OK();
-}
-
-template <>
-Status FileCacheStressTest<RandomAccessFile>::WriteRandomChunk(
-    const deque<shared_ptr<RandomAccessFile>>& /* unused */,
-    MetricMap* /* unused */,
-    Random* /* unused */) {
-  return Status::OK();
-}
-
-typedef ::testing::Types<RWFile, RandomAccessFile> FileTypes;
-TYPED_TEST_CASE(FileCacheStressTest, FileTypes);
-
-TYPED_TEST(FileCacheStressTest, TestStress) {
+TEST_F(FileCacheStressTest, TestStress) {
   OverrideFlagForSlowTests("test_num_producer_threads", "2");
   OverrideFlagForSlowTests("test_num_consumer_threads", "8");
   OverrideFlagForSlowTests("test_duration_secs", "30");
 
   // Start the threads.
   PeriodicOpenFdChecker checker(
-      this->env_,
-      this->GetTestPath("*"),           // only count within our test dir
+      env_,
+      GetTestPath("*"),                 // only count within our test dir
       kTestMaxOpenFiles +               // cache capacity
       FLAGS_test_num_producer_threads + // files being written
       FLAGS_test_num_consumer_threads); // files being opened
   checker.Start();
   vector<thread> producers;
+  producers.reserve(FLAGS_test_num_producer_threads);
   for (int i = 0; i < FLAGS_test_num_producer_threads; i++) {
-    producers.emplace_back(&FileCacheStressTest<TypeParam>::ProducerThread, this);
+    producers.emplace_back(&FileCacheStressTest::ProducerThread, this);
   }
   vector<thread> consumers;
+  consumers.reserve(FLAGS_test_num_consumer_threads);
   for (int i = 0; i < FLAGS_test_num_consumer_threads; i++) {
-    consumers.emplace_back(&FileCacheStressTest<TypeParam>::ConsumerThread, this);
+    consumers.emplace_back(&FileCacheStressTest::ConsumerThread, this);
   }
 
   // Let the test run.
   SleepFor(MonoDelta::FromSeconds(FLAGS_test_duration_secs));
 
   // Stop the threads.
-  this->NotifyThreads();
+  NotifyThreads();
   checker.Stop();
   for (auto& p : producers) {
     p.join();
@@ -383,7 +397,7 @@ TYPED_TEST(FileCacheStressTest, TestStress) {
 
   // Log the metrics.
   unordered_map<string, int> action_counts;
-  for (const auto& file_action_pair : this->metrics()) {
+  for (const auto& file_action_pair : metrics()) {
     for (const auto& action_count_pair : file_action_pair.second) {
       VLOG(2) << Substitute("$0: $1: $2",
                             file_action_pair.first,
diff --git a/src/kudu/util/file_cache-test.cc b/src/kudu/util/file_cache-test.cc
index 4abd743..0c2dea8 100644
--- a/src/kudu/util/file_cache-test.cc
+++ b/src/kudu/util/file_cache-test.cc
@@ -85,10 +85,10 @@ class FileCacheTest : public KuduTest {
 
  protected:
   Status ReinitCache(int max_open_files) {
-    cache_.reset(new FileCache<FileType>("test",
-                                         env_,
-                                         max_open_files,
-                                         nullptr));
+    cache_.reset(new FileCache("test",
+                               env_,
+                               max_open_files,
+                               /*entity=*/ nullptr));
     return cache_->Init();
   }
 
@@ -111,7 +111,7 @@ class FileCacheTest : public KuduTest {
 
   Random rand_;
   int initial_open_fds_;
-  unique_ptr<FileCache<FileType>> cache_;
+  unique_ptr<FileCache> cache_;
 };
 
 typedef ::testing::Types<RWFile, RandomAccessFile> FileTypes;
@@ -184,7 +184,7 @@ TYPED_TEST(FileCacheTest, TestBasicOperations) {
 }
 
 TYPED_TEST(FileCacheTest, TestDeletion) {
-  // Deleting a file that doesn't exist does nothing/
+  // Deleting a file that doesn't exist does nothing.
   ASSERT_TRUE(this->cache_->DeleteFile("/does/not/exist").IsNotFound());
 
   // Create a test file, then delete it. It will be deleted immediately.
@@ -323,6 +323,7 @@ TYPED_TEST(FileCacheTest, TestNoRecursiveDeadlock) {
   ASSERT_OK(this->WriteTestFile(kFile, "test data"));
 
   vector<std::thread> threads;
+  threads.reserve(2);
   for (int i = 0; i < 2; i++) {
     threads.emplace_back([&]() {
       for (int i = 0; i < 10000; i++) {
@@ -345,11 +346,66 @@ TEST_F(RandomAccessFileCacheTest, TestMemoryFootprintDoesNotCrash) {
   ASSERT_OK(this->WriteTestFile(kFile, "test data"));
 
   shared_ptr<RandomAccessFile> f;
-  ASSERT_OK(this->cache_->OpenExistingFile(kFile, &f));
+  ASSERT_OK(cache_->OpenExistingFile(kFile, &f));
 
   // This used to crash due to a kudu_malloc_usable_size() call on a memory
   // address that wasn't the start of an actual heap allocation.
   LOG(INFO) << f->memory_footprint();
 }
 
+class MixedFileCacheTest : public KuduTest {
+};
+
+TEST_F(MixedFileCacheTest, TestBothFileTypes) {
+  const string kFile1 = GetTestPath("foo");
+  const string kData1 = "test data 1";
+  const string kFile2 = GetTestPath("foo2");
+  const string kData2 = "test data 2";
+
+  // Create the two test files.
+  {
+    unique_ptr<RWFile> f;
+    ASSERT_OK(env_->NewRWFile(kFile1, &f));
+    ASSERT_OK(f->Write(0, kData1));
+    ASSERT_OK(env_->NewRWFile(kFile2, &f));
+    ASSERT_OK(f->Write(0, kData2));
+  }
+
+  FileCache cache("test", env_, 1, /*entity=*/ nullptr);
+  ASSERT_OK(cache.Init());
+
+  // Open the test files, each as a different file type.
+  shared_ptr<RWFile> rwf;
+  ASSERT_OK(cache.OpenExistingFile(kFile1, &rwf));
+  shared_ptr<RandomAccessFile> raf;
+  ASSERT_OK(cache.OpenExistingFile(kFile2, &raf));
+
+  // Verify the correct file contents for each test file.
+  uint64_t size;
+  ASSERT_OK(rwf->Size(&size));
+  uint8_t buf[16];
+  Slice s1(buf, size);
+  ASSERT_OK(rwf->Read(0, s1));
+  ASSERT_EQ(kData1, s1);
+  ASSERT_OK(raf->Size(&size));
+  Slice s2(buf, size);
+  ASSERT_OK(raf->Read(0, s2));
+  ASSERT_EQ(kData2, s2);
+
+  // It's okay to reopen the test file using the same file type, but not with a
+  // different file type.
+  //
+  // These checks are expensive so they're only done in DEBUG mode.
+  shared_ptr<RWFile> rwf2;
+  ASSERT_OK(cache.OpenExistingFile(kFile1, &rwf2));
+  shared_ptr<RandomAccessFile> raf2;
+  ASSERT_OK(cache.OpenExistingFile(kFile2, &raf2));
+#ifndef NDEBUG
+  ASSERT_DEATH({ cache.OpenExistingFile(kFile1, &raf); },
+               "!FindDescriptorUnlocked");
+  ASSERT_DEATH({ cache.OpenExistingFile(kFile2, &rwf); },
+               "!FindDescriptorUnlocked");
+#endif
+}
+
 } // namespace kudu
diff --git a/src/kudu/util/file_cache.cc b/src/kudu/util/file_cache.cc
index c5ed6f4..06e8965 100644
--- a/src/kudu/util/file_cache.cc
+++ b/src/kudu/util/file_cache.cc
@@ -68,14 +68,13 @@ FileType* CacheValueToFileType(Slice s) {
       s.mutable_data()));
 }
 
-template <class FileType>
 class EvictionCallback : public Cache::EvictionCallback {
  public:
   EvictionCallback() {}
 
   void EvictedEntry(Slice key, Slice value) override {
     VLOG(2) << "Evicted fd belonging to " << key.ToString();
-    delete CacheValueToFileType<FileType>(value);
+    delete CacheValueToFileType<File>(value);
   }
 
  private:
@@ -93,7 +92,7 @@ class ScopedOpenedDescriptor;
 template <class FileType>
 class BaseDescriptor {
  public:
-  BaseDescriptor(FileCache<FileType>* file_cache,
+  BaseDescriptor(FileCache* file_cache,
                  string filename)
       : file_cache_(file_cache),
         file_name_(std::move(filename)) {}
@@ -169,7 +168,7 @@ class BaseDescriptor {
   bool invalidated() const { return flags_.load() & INVALIDATED; }
 
  private:
-  FileCache<FileType>* file_cache_;
+  FileCache* file_cache_;
   const string file_name_;
   enum Flags {
     FILE_DELETED = 1 << 0,
@@ -226,7 +225,7 @@ class Descriptor : public FileType {
 template <>
 class Descriptor<RWFile> : public RWFile {
  public:
-  Descriptor(FileCache<RWFile>* file_cache, const string& filename)
+  Descriptor(FileCache* file_cache, const string& filename)
       : base_(file_cache, filename) {}
 
   ~Descriptor() = default;
@@ -307,7 +306,7 @@ class Descriptor<RWFile> : public RWFile {
   }
 
  private:
-  friend class FileCache<RWFile>;
+  friend class ::kudu::FileCache;
 
   Status Init() {
     return once_.Init(&Descriptor<RWFile>::InitOnce, this);
@@ -353,7 +352,7 @@ class Descriptor<RWFile> : public RWFile {
 template <>
 class Descriptor<RandomAccessFile> : public RandomAccessFile {
  public:
-  Descriptor(FileCache<RandomAccessFile>* file_cache, const string& filename)
+  Descriptor(FileCache* file_cache, const string& filename)
       : base_(file_cache, filename) {}
 
   ~Descriptor() = default;
@@ -405,7 +404,7 @@ class Descriptor<RandomAccessFile> : public RandomAccessFile {
   }
 
  private:
-  friend class FileCache<RandomAccessFile>;
+  friend class ::kudu::FileCache;
 
   Status Init() {
     return once_.Init(&Descriptor<RandomAccessFile>::InitOnce, this);
@@ -448,14 +447,15 @@ class Descriptor<RandomAccessFile> : public RandomAccessFile {
 
 } // namespace internal
 
-template <class FileType>
-FileCache<FileType>::FileCache(const string& cache_name,
-                               Env* env,
-                               int max_open_files,
-                               const scoped_refptr<MetricEntity>& entity)
+const char* const FileCache::kAlreadyDeleted = "File already marked as deleted";
+
+FileCache::FileCache(const string& cache_name,
+                     Env* env,
+                     int max_open_files,
+                     const scoped_refptr<MetricEntity>& entity)
     : env_(env),
       cache_name_(cache_name),
-      eviction_cb_(new EvictionCallback<FileType>()),
+      eviction_cb_(new EvictionCallback()),
       cache_(NewCache(max_open_files, cache_name)),
       running_(1) {
   if (entity) {
@@ -466,60 +466,98 @@ FileCache<FileType>::FileCache(const string& cache_name,
                           cache_name, max_open_files);
 }
 
-template <class FileType>
-FileCache<FileType>::~FileCache() {
+FileCache::~FileCache() {
   running_.CountDown();
   if (descriptor_expiry_thread_) {
     descriptor_expiry_thread_->Join();
   }
 }
 
-template <class FileType>
-Status FileCache<FileType>::Init() {
+Status FileCache::Init() {
   return Thread::Create("cache", Substitute("$0-evict", cache_name_),
                         &FileCache::RunDescriptorExpiry, this,
                         &descriptor_expiry_thread_);
 }
 
-template <class FileType>
-Status FileCache<FileType>::OpenExistingFile(const string& file_name,
-                                             shared_ptr<FileType>* file) {
-  shared_ptr<internal::Descriptor<FileType>> desc;
+template <>
+Status FileCache::OpenExistingFile(const string& file_name,
+                                   shared_ptr<RWFile>* file) {
+  shared_ptr<internal::Descriptor<RWFile>> d;
   {
-    // Find an existing descriptor, or create one if none exists.
     std::lock_guard<simple_spinlock> l(lock_);
-    RETURN_NOT_OK(FindDescriptorUnlocked(file_name, &desc));
-    if (desc) {
-      VLOG(2) << "Found existing descriptor: " << desc->filename();
-    } else {
-      desc = std::make_shared<internal::Descriptor<FileType>>(this, file_name);
-      InsertOrDie(&descriptors_, file_name, desc);
-      VLOG(2) << "Created new descriptor: " << desc->filename();
-    }
+    d = FindDescriptorUnlocked(file_name, FindMode::CREATE_IF_NOT_EXIST, &rwf_descs_);
+    DCHECK(d);
+
+    // Enforce the invariant that a particular file name may only be used by one
+    // descriptor at a time. This is expensive so it's only done in DEBUG mode.
+    DCHECK(!FindDescriptorUnlocked(file_name, FindMode::DONT_CREATE, &raf_descs_));
+  }
+  if (d->base_.deleted()) {
+    return Status::NotFound(kAlreadyDeleted, file_name);
   }
 
-  // Check that the underlying file can be opened (no-op for found
-  // descriptors). Done outside the lock.
-  RETURN_NOT_OK(desc->Init());
-  *file = std::move(desc);
+  // Check that the underlying file can be opened (no-op for found descriptors).
+  //
+  // Done outside the lock.
+  RETURN_NOT_OK(d->Init());
+  *file = std::move(d);
   return Status::OK();
 }
 
-template <class FileType>
-Status FileCache<FileType>::DeleteFile(const string& file_name) {
+template <>
+Status FileCache::OpenExistingFile(const string& file_name,
+                                   shared_ptr<RandomAccessFile>* file) {
+  shared_ptr<internal::Descriptor<RandomAccessFile>> d;
   {
     std::lock_guard<simple_spinlock> l(lock_);
-    shared_ptr<internal::Descriptor<FileType>> desc;
-    RETURN_NOT_OK(FindDescriptorUnlocked(file_name, &desc));
+    d = FindDescriptorUnlocked(file_name, FindMode::CREATE_IF_NOT_EXIST, &raf_descs_);
+    DCHECK(d);
 
-    if (desc) {
-      VLOG(2) << "Marking file for deletion: " << file_name;
-      desc->base_.MarkDeleted();
-      return Status::OK();
+    // Enforce the invariant that a particular file name may only be used by one
+    // descriptor at a time. This is expensive so it's only done in DEBUG mode.
+    DCHECK(!FindDescriptorUnlocked(file_name, FindMode::DONT_CREATE, &rwf_descs_));
+  }
+  if (d->base_.deleted()) {
+    return Status::NotFound(kAlreadyDeleted, file_name);
+  }
+
+  // Check that the underlying file can be opened (no-op for found descriptors).
+  //
+  // Done outside the lock.
+  RETURN_NOT_OK(d->Init());
+  *file = std::move(d);
+  return Status::OK();
+}
+
+Status FileCache::DeleteFile(const string& file_name) {
+  // Mark any outstanding descriptor as deleted. Because there may only be one
+  // descriptor per file name, we can short circuit the search if we find a
+  // descriptor in the first map.
+  {
+    std::lock_guard<simple_spinlock> l(lock_);
+    {
+      auto d = FindDescriptorUnlocked(file_name, FindMode::DONT_CREATE, &rwf_descs_);
+      if (d) {
+        if (d->base_.deleted()) {
+          return Status::NotFound(kAlreadyDeleted, file_name);
+        }
+        d->base_.MarkDeleted();
+        return Status::OK();
+      }
+    }
+    {
+      auto d = FindDescriptorUnlocked(file_name, FindMode::DONT_CREATE, &raf_descs_);
+      if (d) {
+        if (d->base_.deleted()) {
+          return Status::NotFound(kAlreadyDeleted, file_name);
+        }
+        d->base_.MarkDeleted();
+        return Status::OK();
+      }
     }
   }
 
-  // There is no outstanding descriptor. Delete the file now.
+  // There are no outstanding descriptors. Delete the file now.
   //
   // Make sure it's been fully evicted from the cache (perhaps it was opened
   // previously?) so that the filesystem can reclaim the file data instantly.
@@ -527,123 +565,147 @@ Status FileCache<FileType>::DeleteFile(const string& file_name)
{
   return env_->DeleteFile(file_name);
 }
 
-template <class FileType>
-void FileCache<FileType>::Invalidate(const string& file_name) {
-  // Ensure that there is an invalidated descriptor in the map for this filename.
+void FileCache::Invalidate(const string& file_name) {
+  // Ensure that there are invalidated descriptors in both maps for this
+  // filename. This ensures that any concurrent opens during this method will
+  // see the invalidation and result in a CHECK failure.
   //
-  // This ensures that any concurrent OpenExistingFile() during this method wil
-  // see the invalidation and issue a CHECK failure.
-  shared_ptr<internal::Descriptor<FileType>> desc;
+  // Note: this temporarily violates the invariant that no two descriptors may
+  // share the same file name. That's OK because the invalidation CHECK failure
+  // occurs before the client trips on the broken invariant.
+  shared_ptr<internal::Descriptor<RWFile>> rwf_desc;
+  shared_ptr<internal::Descriptor<RandomAccessFile>> raf_desc;
   {
-    // Find an existing descriptor, or create one if none exists.
     std::lock_guard<simple_spinlock> l(lock_);
-    auto it = descriptors_.find(file_name);
-    if (it != descriptors_.end()) {
-      desc = it->second.lock();
-    }
-    if (!desc) {
-      desc = std::make_shared<internal::Descriptor<FileType>>(this, file_name);
-      descriptors_.emplace(file_name, desc);
-    }
+    rwf_desc = FindDescriptorUnlocked(file_name, FindMode::CREATE_IF_NOT_EXIST,
+                                      &rwf_descs_);
+    DCHECK(rwf_desc);
+    rwf_desc->base_.MarkInvalidated();
 
-    desc->base_.MarkInvalidated();
+    raf_desc = FindDescriptorUnlocked(file_name, FindMode::CREATE_IF_NOT_EXIST,
+                                      &raf_descs_);
+    DCHECK(raf_desc);
+    raf_desc->base_.MarkInvalidated();
   }
+
   // Remove it from the cache so that if the same path is opened again, we
   // will re-open a new FD rather than retrieving one that might have been
   // cached prior to invalidation.
   cache_->Erase(file_name);
 
-  // Remove the invalidated descriptor from the map. We are guaranteed it
-  // is still there because we've held a strong reference to it for
-  // the duration of this method, and no other methods erase strong
-  // references from the map.
+  // Remove the invalidated descriptors from the maps. We are guaranteed they
+  // are still there because we've held strong references to them for the
+  // duration of this method, and no other methods erase strong references from
+  // the maps.
   {
     std::lock_guard<simple_spinlock> l(lock_);
-    CHECK_EQ(1, descriptors_.erase(file_name));
+    CHECK_EQ(1, rwf_descs_.erase(file_name));
+    CHECK_EQ(1, raf_descs_.erase(file_name));
   }
 }
 
-template <class FileType>
-int FileCache<FileType>::NumDescriptorsForTests() const {
+size_t FileCache::NumDescriptorsForTests() const {
   std::lock_guard<simple_spinlock> l(lock_);
-  return descriptors_.size();
+  return rwf_descs_.size() + raf_descs_.size();
+}
+
+string FileCache::ToDebugString() const {
+  string ret;
+
+  // We need to iterate through the descriptor maps, so make temporary copies
+  // of them.
+  DescriptorMap<RWFile> rwfs_copy;
+  DescriptorMap<RandomAccessFile> rafs_copy;
+  {
+    std::lock_guard<simple_spinlock> l(lock_);
+    rwfs_copy = rwf_descs_;
+    rafs_copy = raf_descs_;
+  }
+
+  // Dump the contents of the copies.
+  ret += MapToDebugString(rwfs_copy, "rwf");
+  ret += MapToDebugString(rafs_copy, "raf");
+  return ret;
 }
 
 template <class FileType>
-string FileCache<FileType>::ToDebugString() const {
-  std::lock_guard<simple_spinlock> l(lock_);
+string FileCache::MapToDebugString(const DescriptorMap<FileType>& descs,
+                                   const string& prefix) {
   string ret;
-  for (const auto& e : descriptors_) {
+  for (const auto& e : descs) {
     bool strong = false;
     bool deleted = false;
     bool opened = false;
-    shared_ptr<internal::Descriptor<FileType>> desc = e.second.lock();
-    if (desc) {
+    shared_ptr<internal::Descriptor<FileType>> d = e.second.lock();
+    if (d) {
       strong = true;
-      if (desc->base_.deleted()) {
+      if (d->base_.deleted()) {
         deleted = true;
       }
-      internal::ScopedOpenedDescriptor<FileType> o(
-          desc->base_.LookupFromCache());
-      if (o.opened()) {
+      internal::ScopedOpenedDescriptor<FileType> sod(d->base_.LookupFromCache());
+      if (sod.opened()) {
         opened = true;
       }
     }
     if (strong) {
-      ret += Substitute("$0 (S$1$2)\n", e.first,
+      ret += Substitute("$0: $1 (S$2$3)\n", prefix, e.first,
                         deleted ? "D" : "", opened ? "O" : "");
     } else {
-      ret += Substitute("$0\n", e.first);
+      ret += Substitute("$0: $1\n", prefix, e.first);
     }
   }
   return ret;
 }
 
 template <class FileType>
-Status FileCache<FileType>::FindDescriptorUnlocked(
+shared_ptr<internal::Descriptor<FileType>> FileCache::FindDescriptorUnlocked(
     const string& file_name,
-    shared_ptr<internal::Descriptor<FileType>>* file) {
+    FindMode mode,
+    DescriptorMap<FileType>* descs) {
   DCHECK(lock_.is_locked());
 
-  auto it = descriptors_.find(file_name);
-  if (it != descriptors_.end()) {
+  shared_ptr<internal::Descriptor<FileType>> d;
+  auto it = descs->find(file_name);
+  if (it != descs->end()) {
     // Found the descriptor. Has it expired?
-    shared_ptr<internal::Descriptor<FileType>> desc = it->second.lock();
-    if (desc) {
-      CHECK(!desc->base_.invalidated());
-      if (desc->base_.deleted()) {
-        return Status::NotFound("File already marked for deletion", file_name);
-      }
+    d = it->second.lock();
+    if (d) {
+      CHECK(!d->base_.invalidated());
 
       // Descriptor is still valid, return it.
-      if (file) {
-        *file = desc;
-      }
-      return Status::OK();
+      VLOG(2) << "Found existing descriptor: " << file_name;
+      return d;
     }
     // Descriptor has expired; erase it and pretend we found nothing.
-    descriptors_.erase(it);
+    descs->erase(it);
   }
-  return Status::OK();
+
+  if (mode == FindMode::CREATE_IF_NOT_EXIST) {
+    d = std::make_shared<internal::Descriptor<FileType>>(this, file_name);
+    EmplaceOrDie(descs, file_name, d);
+    VLOG(2) << "Created new descriptor: " << file_name;
+  }
+  return d;
 }
 
 template <class FileType>
-void FileCache<FileType>::RunDescriptorExpiry() {
+void FileCache::ExpireDescriptorsFromMap(DescriptorMap<FileType>* descs) {
+  for (auto it = descs->begin(); it != descs->end();) {
+    if (it->second.expired()) {
+      it = descs->erase(it);
+    } else {
+      it++;
+    }
+  }
+}
+
+void FileCache::RunDescriptorExpiry() {
   while (!running_.WaitFor(MonoDelta::FromMilliseconds(
       FLAGS_file_cache_expiry_period_ms))) {
     std::lock_guard<simple_spinlock> l(lock_);
-    for (auto it = descriptors_.begin(); it != descriptors_.end();) {
-      if (it->second.expired()) {
-        it = descriptors_.erase(it);
-      } else {
-        it++;
-      }
-    }
+    ExpireDescriptorsFromMap(&rwf_descs_);
+    ExpireDescriptorsFromMap(&raf_descs_);
   }
 }
 
-// Explicit specialization for callers outside this compilation unit.
-template class FileCache<RWFile>;
-template class FileCache<RandomAccessFile>;
-
 } // namespace kudu
diff --git a/src/kudu/util/file_cache.h b/src/kudu/util/file_cache.h
index 021c758..e8c3250 100644
--- a/src/kudu/util/file_cache.h
+++ b/src/kudu/util/file_cache.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <cstddef>
 #include <memory>
 #include <string>
 #include <unordered_map>
@@ -33,12 +34,13 @@
 namespace kudu {
 
 class Env;
+class RWFile;
+class RandomAccessFile;
 
 namespace internal {
 
 template <class FileType>
 class BaseDescriptor;
-
 template <class FileType>
 class Descriptor;
 
@@ -61,19 +63,19 @@ class Thread;
 // The core of the client-facing API is the cache descriptor. A descriptor
 // uniquely identifies an opened file. To a client, a descriptor is just an
 // open file interface of the variety defined in util/env.h. Clients open
-// descriptors via the OpenExistingFile() cache method.
+// descriptors via the OpenExisting*() cache methods.
 //
 // Descriptors are shared objects; an existing descriptor is handed back to a
 // client if a file with the same name is already opened. To facilitate
-// descriptor sharing, the file cache maintains a by-file-name descriptor map.
-// The values are weak references to the descriptors so that map entries don't
-// affect the descriptor lifecycle.
+// descriptor sharing, the file cache maintains by-file-name descriptor maps
+// (one per file type). The values are weak references to the descriptors so
+// that map entries don't affect the descriptor lifecycle.
 //
 // LRU cache
 // ---------
 // The lower half of the file cache is a standard LRU cache whose keys are file
 // names and whose values are pointers to opened file objects allocated on the
-// heap. Unlike the descriptor map, this cache has an upper bound on capacity,
+// heap. Unlike the descriptor maps, this cache has an upper bound on capacity,
 // and handles are evicted (and closed) according to an LRU algorithm.
 //
 // Whenever a descriptor is used by a client in file I/O, its file name is used
@@ -92,7 +94,6 @@ class Thread;
 // descriptor, the file is deleted immediately.
 //
 // Every public method in the file cache is thread safe.
-template <class FileType>
 class FileCache {
  public:
   // Creates a new file cache.
@@ -120,6 +121,7 @@ class FileCache {
   // The descriptor is opened immediately to verify that the on-disk file can
   // be opened, but may be closed later if the cache reaches its upper bound on
   // the number of open files.
+  template <class FileType>
   Status OpenExistingFile(const std::string& file_name,
                           std::shared_ptr<FileType>* file);
 
@@ -153,30 +155,56 @@ class FileCache {
   // from multiple threads.
   void Invalidate(const std::string& file_name);
 
-  // Returns the number of entries in the descriptor map.
+  // Returns the number of entries in the descriptor maps.
   //
   // Only intended for unit tests.
-  int NumDescriptorsForTests() const;
+  size_t NumDescriptorsForTests() const;
 
   // Dumps the contents of the file cache. Intended for debugging.
   std::string ToDebugString() const;
 
  private:
-  friend class internal::BaseDescriptor<FileType>;
+  friend class internal::BaseDescriptor<RWFile>;
+  friend class internal::BaseDescriptor<RandomAccessFile>;
+
+  template <class FileType>
+  using DescriptorMap = std::unordered_map<std::string,
+                                           std::weak_ptr<internal::Descriptor<FileType>>>;
 
-  template<class FileType2>
+  template <class FileType>
   FRIEND_TEST(FileCacheTest, TestBasicOperations);
 
+  // Dumps the descriptor map 'descs'. All output will be prefixed by 'prefix'.
+  template <class FileType>
+  static std::string MapToDebugString(const DescriptorMap<FileType>& descs,
+                                      const std::string& prefix);
+
+  // Removes all expired descriptors from 'descs'.
+  template <class FileType>
+  static void ExpireDescriptorsFromMap(DescriptorMap<FileType>* descs);
+
   // Looks up a descriptor by file name.
   //
   // Must be called with 'lock_' held.
-  Status FindDescriptorUnlocked(
+  enum class FindMode {
+    // Only return an existing descriptor from the map; don't create a new one.
+    DONT_CREATE,
+
+    // Create a new descriptor if one did not exist in the map.
+    CREATE_IF_NOT_EXIST,
+  };
+  template <class FileType>
+  std::shared_ptr<internal::Descriptor<FileType>> FindDescriptorUnlocked(
       const std::string& file_name,
-      std::shared_ptr<internal::Descriptor<FileType>>* file);
+      FindMode mode,
+      DescriptorMap<FileType>* descs);
 
-  // Periodically removes expired descriptors from 'descriptors_'.
+  // Periodically removes expired descriptors from the descriptor maps.
   void RunDescriptorExpiry();
 
+  // Status message prefix for files that have already been marked as deleted.
+  static const char* const kAlreadyDeleted;
+
   // Interface to the underlying filesystem.
   Env* env_;
 
@@ -194,8 +222,8 @@ class FileCache {
   mutable simple_spinlock lock_;
 
   // Maps filenames to descriptors.
-  std::unordered_map<std::string,
-                     std::weak_ptr<internal::Descriptor<FileType>>> descriptors_;
+  DescriptorMap<RWFile> rwf_descs_;
+  DescriptorMap<RandomAccessFile> raf_descs_;
 
   // Calls RunDescriptorExpiry() in a loop until 'running_' isn't set.
   scoped_refptr<Thread> descriptor_expiry_thread_;


Mime
View raw message