kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ale...@apache.org
Subject [kudu] 02/02: [util] interface to invalidate cache entries
Date Thu, 02 May 2019 21:08:47 GMT
This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 0a8bdd42705f84e58df263a192ccb4cc2109dbf1
Author: Alexey Serbin <alexey@apache.org>
AuthorDate: Mon Apr 29 23:40:46 2019 -0700

    [util] interface to invalidate cache entries
    
    Added a means to invalidate entries in Cache.  Basically, it's a way
    to iterate over all entries in a cache while having an ability to
    remove an entry based on a set of criteria specified using an entry's
    key and value.  The generic usage pattern of the introduced invalidation
    mechanism is to purge entries matching criteria based on their keys
    and values.
    
    In particular, that's useful for periodic scrubbing tasks which get
    rid of expired entries in a TTLCache.  It's about invalidating expired
    entries in a TTL cache even if the cache hasn't reached its capacity
    yet.  The latter, being an optimization, allows for more effective
    usage of the OS memory in the presence of multiple concurrent processes.
    
    Added corresponding tests as well.
    
    Change-Id: I1e340c3270b5580114151cdd40afe82576c18099
    Reviewed-on: http://gerrit.cloudera.org:8080/13196
    Tested-by: Kudu Jenkins
    Reviewed-by: Adar Dembo <adar@cloudera.com>
    Reviewed-by: Andrew Wong <awong@cloudera.com>
---
 src/kudu/util/cache-test.cc     | 97 +++++++++++++++++++++++++++++++++++++++++
 src/kudu/util/cache.cc          | 63 ++++++++++++++++++++++++++
 src/kudu/util/cache.h           | 94 ++++++++++++++++++++++++++++++++++++++-
 src/kudu/util/nvm_cache.cc      | 52 +++++++++++++++++++++-
 src/kudu/util/ttl_cache-test.cc | 51 ++++++++++++++++++++++
 src/kudu/util/ttl_cache.h       |  1 +
 6 files changed, 354 insertions(+), 4 deletions(-)

diff --git a/src/kudu/util/cache-test.cc b/src/kudu/util/cache-test.cc
index a2c401d..454f2a8 100644
--- a/src/kudu/util/cache-test.cc
+++ b/src/kudu/util/cache-test.cc
@@ -15,6 +15,7 @@
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
+#include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/block_cache_metrics.h"
@@ -332,6 +333,102 @@ TEST_P(CacheTest, HeavyEntries) {
   ASSERT_LE(cached_weight, cache_size() + cache_size() / 10);
 }
 
+TEST_P(CacheTest, InvalidateAllEntries) {
+  constexpr const int kEntriesNum = 1024;
+  // This scenario assumes no evictions are done at the cache capacity.
+  ASSERT_LE(kEntriesNum, cache_size());
+
+  // Running invalidation on empty cache should yield no invalidated entries.
+  ASSERT_EQ(0, cache_->Invalidate({}));
+  for (auto i = 0; i < kEntriesNum; ++i) {
+    Insert(i, i);
+  }
+  // Remove a few entries from the cache (sparse pattern of keys).
+  constexpr const int kSparseKeys[] = {1, 100, 101, 500, 501, 512, 999, 1001};
+  for (const auto key : kSparseKeys) {
+    Erase(key);
+  }
+  ASSERT_EQ(ARRAYSIZE(kSparseKeys), evicted_keys_.size());
+
+  // All inserted entries, except for the removed ones, should be invalidated.
+  ASSERT_EQ(kEntriesNum - ARRAYSIZE(kSparseKeys), cache_->Invalidate({}));
+  // In the end, no entries should be left in the cache.
+  ASSERT_EQ(kEntriesNum, evicted_keys_.size());
+}
+
+TEST_P(CacheTest, InvalidateNoEntries) {
+  constexpr const int kEntriesNum = 10;
+  // This scenario assumes no evictions are done at the cache capacity.
+  ASSERT_LE(kEntriesNum, cache_size());
+
+  const Cache::ValidityFunc func = [](Slice /* key */, Slice /* value */) {
+    return true;
+  };
+  // Running invalidation on empty cache should yield no invalidated entries.
+  ASSERT_EQ(0, cache_->Invalidate({ func }));
+
+  for (auto i = 0; i < kEntriesNum; ++i) {
+    Insert(i, i);
+  }
+
+  // No entries should be invalidated since the validity function considers
+  // all entries valid.
+  ASSERT_EQ(0, cache_->Invalidate({ func }));
+  ASSERT_TRUE(evicted_keys_.empty());
+}
+
+TEST_P(CacheTest, InvalidateNoEntriesNoAdvanceIterationFunctor) {
+  constexpr const int kEntriesNum = 256;
+  // This scenario assumes no evictions are done at the cache capacity.
+  ASSERT_LE(kEntriesNum, cache_size());
+
+  const Cache::InvalidationControl ctl = {
+    Cache::kInvalidateAllEntriesFunc,
+    [](size_t /* valid_entries_count */, size_t /* invalid_entries_count */) {
+      // Never advance over the item list.
+      return false;
+    }
+  };
+
+  // Running invalidation on empty cache should yield no invalidated entries.
+  ASSERT_EQ(0, cache_->Invalidate(ctl));
+
+  for (auto i = 0; i < kEntriesNum; ++i) {
+    Insert(i, i);
+  }
+
+  // No entries should be invalidated since the iteration functor doesn't
+  // advance over the list of entries, even if every entry is declared invalid.
+  ASSERT_EQ(0, cache_->Invalidate(ctl));
+  // In the end, all entries should be in the cache.
+  ASSERT_EQ(0, evicted_keys_.size());
+}
+
+TEST_P(CacheTest, InvalidateOddKeyEntries) {
+  constexpr const int kEntriesNum = 64;
+  // This scenario assumes no evictions are done at the cache capacity.
+  ASSERT_LE(kEntriesNum, cache_size());
+
+  const Cache::ValidityFunc func = [](Slice key, Slice /* value */) {
+    return DecodeInt(key) % 2 == 0;
+  };
+  // Running invalidation on empty cache should yield no invalidated entries.
+  ASSERT_EQ(0, cache_->Invalidate({ func }));
+
+  for (auto i = 0; i < kEntriesNum; ++i) {
+    Insert(i, i);
+  }
+  ASSERT_EQ(kEntriesNum / 2, cache_->Invalidate({ func }));
+  ASSERT_EQ(kEntriesNum / 2, evicted_keys_.size());
+  for (auto i = 0; i < kEntriesNum; ++i) {
+    if (i % 2 == 0) {
+      ASSERT_EQ(i,  Lookup(i));
+    } else {
+      ASSERT_EQ(-1,  Lookup(i));
+    }
+  }
+}
+
 // This class is dedicated for scenarios specific for FIFOCache.
 // The scenarios use a single-shard cache for simpler logic.
 class FIFOCacheTest : public CacheBaseTest {
diff --git a/src/kudu/util/cache.cc b/src/kudu/util/cache.cc
index b823515..665626d 100644
--- a/src/kudu/util/cache.cc
+++ b/src/kudu/util/cache.cc
@@ -60,6 +60,16 @@ namespace kudu {
 Cache::~Cache() {
 }
 
+const Cache::ValidityFunc Cache::kInvalidateAllEntriesFunc = [](
+    Slice /* key */, Slice /* value */) {
+  return false;
+};
+
+const Cache::IterationFunc Cache::kIterateOverAllEntriesFunc = [](
+    size_t /* valid_entries_num */, size_t /* invalid_entries_num */) {
+  return true;
+};
+
 namespace {
 
 typedef simple_spinlock MutexType;
@@ -221,6 +231,7 @@ class CacheShard {
   Cache::Handle* Lookup(const Slice& key, uint32_t hash, bool caching);
   void Release(Cache::Handle* handle);
   void Erase(const Slice& key, uint32_t hash);
+  size_t Invalidate(const Cache::InvalidationControl& ctl);
 
  private:
   void RL_Remove(RLHandle* e);
@@ -484,6 +495,50 @@ void CacheShard<policy>::Erase(const Slice& key, uint32_t hash)
{
   }
 }
 
+template<Cache::EvictionPolicy policy>
+size_t CacheShard<policy>::Invalidate(const Cache::InvalidationControl& ctl) {
+  size_t invalid_entry_count = 0;
+  size_t valid_entry_count = 0;
+  RLHandle* to_remove_head = nullptr;
+
+  {
+    std::lock_guard<MutexType> l(mutex_);
+
+    // rl_.next is the oldest (a.k.a. least relevant) entry in the recency list.
+    RLHandle* h = rl_.next;
+    while (h != nullptr && h != &rl_ &&
+           ctl.iteration_func(valid_entry_count, invalid_entry_count)) {
+      if (ctl.validity_func(h->key(), h->value())) {
+        // Continue iterating over the list.
+        h = h->next;
+        ++valid_entry_count;
+        continue;
+      }
+      // Copy the handle slated for removal.
+      RLHandle* h_to_remove = h;
+      // Prepare for next iteration of the cycle.
+      h = h->next;
+
+      RL_Remove(h_to_remove);
+      table_.Remove(h_to_remove->key(), h_to_remove->hash);
+      if (Unref(h_to_remove)) {
+        h_to_remove->next = to_remove_head;
+        to_remove_head = h_to_remove;
+      }
+      ++invalid_entry_count;
+    }
+  }
+  // Once removed from the lookup table and the recency list, the entries
+  // with no references left must be deallocated because Cache::Release()
+  // won't be called for them from elsewhere.
+  while (to_remove_head != nullptr) {
+    RLHandle* next = to_remove_head->next;
+    FreeEntry(to_remove_head);
+    to_remove_head = next;
+  }
+  return invalid_entry_count;
+}
+
 // Determine the number of bits of the hash that should be used to determine
 // the cache shard. This, in turn, determines the number of shards.
 int DetermineShardBits() {
@@ -610,6 +665,14 @@ class ShardedCache : public Cache {
   uint8_t* MutableValue(PendingHandle* h) override {
     return reinterpret_cast<RLHandle*>(h)->mutable_val_ptr();
   }
+
+  size_t Invalidate(const InvalidationControl& ctl) override {
+    size_t invalidated_count = 0;
+    for (auto& shard: shards_) {
+      invalidated_count += shard->Invalidate(ctl);
+    }
+    return invalidated_count;
+  }
 };
 
 }  // end anonymous namespace
diff --git a/src/kudu/util/cache.h b/src/kudu/util/cache.h
index e52cd57..32213cc 100644
--- a/src/kudu/util/cache.h
+++ b/src/kudu/util/cache.h
@@ -19,9 +19,11 @@
 
 #include <cstddef>
 #include <cstdint>
+#include <functional>
 #include <iosfwd>
 #include <memory>
 #include <string>
+#include <utility>
 
 #include "kudu/gutil/macros.h"
 #include "kudu/util/slice.h"
@@ -71,7 +73,7 @@ class Cache {
   //
   // Sample usage:
   //
-  //   Cache* cache = NewLRUCache(...);
+  //   unique_ptr<Cache> cache(NewCache(...));
   //   ...
   //   {
   //     unique_ptr<Cache::Handle, Cache::HandleDeleter> h(
@@ -81,7 +83,7 @@ class Cache {
   //
   // Or:
   //
-  //   Cache* cache = NewLRUCache(...);
+  //   unique_ptr<Cache> cache(NewCache(...));
   //   ...
   //   {
   //     Cache::UniqueHandle h(cache->Lookup(...), Cache::HandleDeleter(cache));
@@ -232,6 +234,94 @@ class Cache {
   // Free 'ptr', which must have been previously allocated using 'Allocate'.
   virtual void Free(PendingHandle* ptr) = 0;
 
+  // Forward declaration to simplify the layout of types/typedefs needed for the
+  // Invalidate() method while trying to adhere to the code style guide.
+  struct InvalidationControl;
+
+  // Invalidate cache's entries, effectively evicting non-valid ones from the
+  // cache. The invalidation process iterates over cache's recency list(s),
+  // from the oldest (less relevant) entries to the newest (more relevant) ones.
+  //
+  // The provided control structure 'ctl' is responsible for the following:
+  //   * determine whether an entry is valid or not
+  //   * determine how to iterate over the entries in the cache's recency list
+  //
+  // NOTE: The invalidation process might hold a lock while iterating over
+  //       the cache's entries. Using proper IterationFunc might help to reduce
+  //       contention with the concurrent request for the cache's contents.
+  //       See the in-line documentation for IterationFunc for more details.
+  virtual size_t Invalidate(const InvalidationControl& ctl) = 0;
+
+  // Functor to define a criterion on a cache entry's validity. Upon call
+  // of Cache::Invalidate() method, if the functor returns 'false' for the
+  // specified key and value, the cache evicts the entry, otherwise the entry
+  // stays in the cache.
+  typedef std::function<bool(Slice /* key */,
+                             Slice /* value */)>
+      ValidityFunc;
+
+  // Functor to define whether to continue or stop iterating over the cache's
+  // entries based on the number of encountered invalid and valid entries
+  // during the Cache::Invalidate() call. If a cache contains multiple
+  // sub-caches (e.g., shards), those parameters are per sub-cache. For example,
+  // in case of multi-shard cache, when the 'iteration_func' returns 'false',
+  // the invalidation at current shard stops and switches to the next
+  // non-yet-processed shard, if any is present.
+  //
+  // The choice of the signature for the iteration functor is to allow for
+  // effective purging of non-valid (e.g., expired) entries in caches with
+  // the FIFO eviction policy (e.g., TTL caches).
+  //
+  // The first parameter of the functor is useful for short-circuiting
+  // the invalidation process once some valid entries have been encountered.
+  // For example, that's useful in case if the recency list has its entries
+  // ordered in FIFO-like order (e.g., TTL cache with FIFO eviction policy),
+  // so most-likely-invalid entries are in the very beginning of the list.
+  // In the latter case, once a valid (e.g., not yet expired) entry is
+  // encountered, there is no need to iterate any further: all the entries past
+  // the first valid one in the recency list should be valid as well.
+  //
+  // The second parameter is useful when the validity criterion is fuzzy,
+  // but there is a target number of entries to invalidate during each
+  // invocation of the Invalidate() method or there is some logic that reads
+  // the cache's metric(s) once the given number of entries have been evicted:
+  // e.g., compare the result memory footprint of the cache against a threshold
+  // to decide whether to continue invalidation of entries.
+  //
+  // Summing both parameters of the functor is useful when it's necessary to
+  // limit the number of entries processed per one invocation of the
+  // Invalidate() method. It makes sense in cases when a 'lazy' invalidation
+  // process is run by a periodic task along with a significant amount of
+  // concurrent requests to the cache, and the number of entries in the cache
+  // is huge. Given the fact that in most cases it's necessary to guard
+  // the access to the cache's recency list while iterating over its entries,
+  // limiting the number of entries to process at once allows for better control
+  // over the duration of the guarded/locked sections.
+  typedef std::function<bool(size_t /* valid_entries_num */,
+                             size_t /* invalid_entries_num */)>
+      IterationFunc;
+
+  // A helper function for 'validity_func' of the Invalidate() method:
+  // invalidate all entries.
+  static const ValidityFunc kInvalidateAllEntriesFunc;
+
+  // A helper function for 'iteration_func' of the Invalidate() method:
+  // examine all entries.
+  static const IterationFunc kIterateOverAllEntriesFunc;
+
+  // Control structure for the Invalidate() method. Combines the validity
+  // and the iteration functors.
+  struct InvalidationControl {
+    // NOLINTNEXTLINE(google-explicit-constructor)
+    InvalidationControl(ValidityFunc vfunctor = kInvalidateAllEntriesFunc,
+                        IterationFunc ifunctor = kIterateOverAllEntriesFunc)
+        : validity_func(std::move(vfunctor)),
+          iteration_func(std::move(ifunctor)) {
+    }
+    const ValidityFunc validity_func;
+    const IterationFunc iteration_func;
+  };
+
  private:
   DISALLOW_COPY_AND_ASSIGN(Cache);
 };
diff --git a/src/kudu/util/nvm_cache.cc b/src/kudu/util/nvm_cache.cc
index 20d086b..4cf2828 100644
--- a/src/kudu/util/nvm_cache.cc
+++ b/src/kudu/util/nvm_cache.cc
@@ -208,6 +208,7 @@ class NvmLRUCache {
   Cache::Handle* Lookup(const Slice& key, uint32_t hash, bool caching);
   void Release(Cache::Handle* handle);
   void Erase(const Slice& key, uint32_t hash);
+  size_t Invalidate(const Cache::InvalidationControl& ctl);
   void* AllocateAndRetry(size_t size);
 
  private:
@@ -456,8 +457,47 @@ void NvmLRUCache::Erase(const Slice& key, uint32_t hash) {
     FreeEntry(e);
   }
 }
-static const int kNumShardBits = 4;
-static const int kNumShards = 1 << kNumShardBits;
+
+size_t NvmLRUCache::Invalidate(const Cache::InvalidationControl& ctl) {
+  size_t invalid_entry_count = 0;
+  size_t valid_entry_count = 0;
+  LRUHandle* to_remove_head = nullptr;
+
+  {
+    std::lock_guard<MutexType> l(mutex_);
+
+    // lru_.next is the oldest entry in the recency list.
+    LRUHandle* h = lru_.next;
+    while (h != nullptr && h != &lru_ &&
+           ctl.iteration_func(valid_entry_count, invalid_entry_count)) {
+      if (ctl.validity_func(h->key(), h->value())) {
+        // Continue iterating over the list.
+        h = h->next;
+        ++valid_entry_count;
+        continue;
+      }
+      // Copy the handle slated for removal.
+      LRUHandle* h_to_remove = h;
+      // Prepare for next iteration of the cycle.
+      h = h->next;
+
+      NvmLRU_Remove(h_to_remove);
+      table_.Remove(h_to_remove->key(), h_to_remove->hash);
+      if (Unref(h_to_remove)) {
+        h_to_remove->next = to_remove_head;
+        to_remove_head = h_to_remove;
+      }
+      ++invalid_entry_count;
+    }
+  }
+
+  FreeLRUEntries(to_remove_head);
+
+  return invalid_entry_count;
+}
+
+constexpr const int kNumShardBits = 4;
+constexpr const int kNumShards = 1 << kNumShardBits;
 
 class ShardedLRUCache : public Cache {
  private:
@@ -556,6 +596,14 @@ class ShardedLRUCache : public Cache {
   virtual void Free(PendingHandle* ph) OVERRIDE {
     vmem_free(vmp_, ph);
   }
+
+  size_t Invalidate(const InvalidationControl& ctl) override {
+    size_t invalidated_count = 0;
+    for (auto& shard: shards_) {
+      invalidated_count += shard->Invalidate(ctl);
+    }
+    return invalidated_count;
+  }
 };
 
 } // end anonymous namespace
diff --git a/src/kudu/util/ttl_cache-test.cc b/src/kudu/util/ttl_cache-test.cc
index 3123de0..e81991c 100644
--- a/src/kudu/util/ttl_cache-test.cc
+++ b/src/kudu/util/ttl_cache-test.cc
@@ -31,8 +31,10 @@
 #include "kudu/gutil/integral_types.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/cache.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/slice.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 #include "kudu/util/ttl_cache_metrics.h"
@@ -371,4 +373,53 @@ TEST_F(TTLCacheTest, Basic) {
   ASSERT_EQ(cache_capacity + 1, GetCacheUsage());
 }
 
+// Test invalidation of expired entries in the underlying cache.
+TEST_F(TTLCacheTest, InvalidationOfExpiredEntries) {
+  constexpr size_t cache_capacity = 512;
+  const auto entry_ttl = MonoDelta::FromMilliseconds(250);
+
+  TTLTestCache cache(cache_capacity, entry_ttl);
+  {
+    unique_ptr<TTLCacheTestMetrics> metrics(
+        new TTLCacheTestMetrics(metric_entity_));
+    cache.SetMetrics(std::move(metrics));
+  }
+
+  const Cache::InvalidationControl ctl = {
+    [&](Slice /* key */, Slice value) {
+      CHECK_EQ(sizeof(TTLTestCache::Entry), value.size());
+      const auto* entry = reinterpret_cast<const TTLTestCache::Entry*>(
+          value.data());
+      return (entry->exp_time > MonoTime::Now());
+    },
+    [&](size_t valid_entry_count, size_t /* invalid_entry_count */) {
+      return valid_entry_count == 0;
+    }
+  };
+  ASSERT_EQ(0, cache.cache_->Invalidate(ctl));
+
+  for (auto i = 0; i < cache_capacity / 2; ++i) {
+    cache.Put(Substitute("0$0", i), unique_ptr<TestValue>(new TestValue), 1);
+  }
+  ASSERT_EQ(0, cache.cache_->Invalidate(ctl));
+  ASSERT_EQ(0, GetCacheEvictionsExpired());
+
+  SleepFor(entry_ttl);
+
+  ASSERT_EQ(cache_capacity / 2, cache.cache_->Invalidate(ctl));
+  ASSERT_EQ(cache_capacity / 2, GetCacheEvictionsExpired());
+  ASSERT_EQ(0, GetCacheUsage());
+
+  for (auto i = 0; i < cache_capacity / 2; ++i) {
+    cache.Put(Substitute("1$0", i), unique_ptr<TestValue>(new TestValue), 1);
+  }
+  SleepFor(entry_ttl);
+  for (auto i = 0; i < cache_capacity / 2; ++i) {
+    cache.Put(Substitute("2$0", i), unique_ptr<TestValue>(new TestValue), 1);
+  }
+  ASSERT_EQ(cache_capacity / 2, cache.cache_->Invalidate(ctl));
+  ASSERT_EQ(cache_capacity, GetCacheEvictionsExpired());
+  ASSERT_EQ(cache_capacity / 2, GetCacheUsage());
+}
+
 } // namespace kudu
diff --git a/src/kudu/util/ttl_cache.h b/src/kudu/util/ttl_cache.h
index 95fb52e..65911a8 100644
--- a/src/kudu/util/ttl_cache.h
+++ b/src/kudu/util/ttl_cache.h
@@ -194,6 +194,7 @@ class TTLCache {
 
  private:
   friend class EvictionCallback;
+  FRIEND_TEST(TTLCacheTest, InvalidationOfExpiredEntries);
 
   // An entry to store in the underlying FIFO cache.
   struct Entry {


Mime
View raw message