kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject [4/4] kudu git commit: util: remove old failure detector and resettable heartbeater code
Date Wed, 23 Aug 2017 01:17:05 GMT
util: remove old failure detector and resettable heartbeater code

With the move to periodic timers, these classes are no longer needed.

Change-Id: I7a8f77a56ed66e06e182a70e18b2837a264a0a4e
Reviewed-on: http://gerrit.cloudera.org:8080/7736
Reviewed-by: Dan Burkert <danburkert@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/e8693965
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/e8693965
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/e8693965

Branch: refs/heads/master
Commit: e869396543c0d6dba984e0d515a726c3e29747f2
Parents: 21b0f3d
Author: Adar Dembo <adar@cloudera.com>
Authored: Fri Aug 18 19:13:14 2017 -0700
Committer: Adar Dembo <adar@cloudera.com>
Committed: Wed Aug 23 01:15:20 2017 +0000

----------------------------------------------------------------------
 build-support/iwyu/iwyu-filter.awk           |   1 -
 src/kudu/util/CMakeLists.txt                 |   4 -
 src/kudu/util/failure_detector-test.cc       | 118 ------------
 src/kudu/util/failure_detector.cc            | 218 ----------------------
 src/kudu/util/failure_detector.h             | 177 ------------------
 src/kudu/util/resettable_heartbeater-test.cc | 108 -----------
 src/kudu/util/resettable_heartbeater.cc      | 185 ------------------
 src/kudu/util/resettable_heartbeater.h       |  80 --------
 8 files changed, 891 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/e8693965/build-support/iwyu/iwyu-filter.awk
----------------------------------------------------------------------
diff --git a/build-support/iwyu/iwyu-filter.awk b/build-support/iwyu/iwyu-filter.awk
index b68dbb4..06977e3 100644
--- a/build-support/iwyu/iwyu-filter.awk
+++ b/build-support/iwyu/iwyu-filter.awk
@@ -134,7 +134,6 @@ BEGIN {
   muted["kudu/util/bit-util-test.cc"]
   muted["kudu/util/compression/compression-test.cc"]
   muted["kudu/util/env_util-test.cc"]
-  muted["kudu/util/failure_detector-test.cc"]
   muted["kudu/util/file_cache-stress-test.cc"]
   muted["kudu/util/group_varint-test.cc"]
   muted["kudu/util/minidump.cc"]

http://git-wip-us.apache.org/repos/asf/kudu/blob/e8693965/src/kudu/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/util/CMakeLists.txt b/src/kudu/util/CMakeLists.txt
index 836facf..077f156 100644
--- a/src/kudu/util/CMakeLists.txt
+++ b/src/kudu/util/CMakeLists.txt
@@ -134,7 +134,6 @@ set(UTIL_SRCS
   env.cc env_posix.cc env_util.cc
   errno.cc
   faststring.cc
-  failure_detector.cc
   fault_injection.cc
   file_cache.cc
   flags.cc
@@ -173,7 +172,6 @@ set(UTIL_SRCS
   pb_util-internal.cc
   process_memory.cc
   random_util.cc
-  resettable_heartbeater.cc
   rolling_log.cc
   rw_mutex.cc
   rwc_lock.cc
@@ -340,7 +338,6 @@ ADD_KUDU_TEST(easy_json-test)
 ADD_KUDU_TEST(env-test LABELS no_tsan)
 ADD_KUDU_TEST(env_util-test)
 ADD_KUDU_TEST(errno-test)
-ADD_KUDU_TEST(failure_detector-test)
 ADD_KUDU_TEST(faststring-test)
 ADD_KUDU_TEST(file_cache-test)
 ADD_KUDU_TEST(file_cache-stress-test RUN_SERIAL true)
@@ -375,7 +372,6 @@ ADD_KUDU_TEST(path_util-test)
 ADD_KUDU_TEST(process_memory-test RUN_SERIAL true)
 ADD_KUDU_TEST(random-test)
 ADD_KUDU_TEST(random_util-test)
-ADD_KUDU_TEST(resettable_heartbeater-test)
 ADD_KUDU_TEST(rle-test)
 ADD_KUDU_TEST(rolling_log-test)
 ADD_KUDU_TEST(rw_mutex-test)

http://git-wip-us.apache.org/repos/asf/kudu/blob/e8693965/src/kudu/util/failure_detector-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/failure_detector-test.cc b/src/kudu/util/failure_detector-test.cc
deleted file mode 100644
index f8f4478..0000000
--- a/src/kudu/util/failure_detector-test.cc
+++ /dev/null
@@ -1,118 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <algorithm>
-#include <ostream>
-#include <string>
-
-#include <glog/logging.h>
-#include <gtest/gtest.h>
-
-#include "kudu/gutil/bind.h"
-#include "kudu/gutil/bind_helpers.h"
-#include "kudu/gutil/gscoped_ptr.h"
-#include "kudu/gutil/map-util.h"
-#include "kudu/gutil/ref_counted.h"
-#include "kudu/util/countdown_latch.h"
-#include "kudu/util/failure_detector.h"
-#include "kudu/util/monotime.h"
-#include "kudu/util/status.h"
-#include "kudu/util/test_macros.h"
-#include "kudu/util/test_util.h"
-
-namespace kudu {
-
-// How often we expect a node to heartbeat to assert its "aliveness".
-static const int kExpectedHeartbeatPeriodMillis = 100;
-
-// Number of heartbeats after which the FD will consider the node dead.
-static const int kMaxMissedHeartbeats = 2;
-
-// Let's check for failures every 100ms on average +/- 10ms.
-static const int kFailureMonitorMeanMillis = 100;
-static const int kFailureMonitorStddevMillis = 10;
-
-static const char* kNodeName = "node-1";
-static const char* kTestTabletName = "test-tablet";
-
-class FailureDetectorTest : public KuduTest {
- public:
-  FailureDetectorTest()
-    : KuduTest(),
-      latch_(1),
-      monitor_(new RandomizedFailureMonitor(SeedRandom(),
-                                            kFailureMonitorMeanMillis,
-                                            kFailureMonitorStddevMillis)) {
-  }
-
-  void FailureFunction(const std::string& name, const Status& status) {
-    LOG(INFO) << "Detected failure of " << name;
-    latch_.CountDown();
-  }
-
- protected:
-  void WaitForFailure() {
-    latch_.Wait();
-  }
-
-  CountDownLatch latch_;
-  gscoped_ptr<RandomizedFailureMonitor> monitor_;
-};
-
-// Tests that we can track a node, that while we notify that we're received messages from
-// that node everything is ok and that once we stop doing so the failure detection function
-// gets called.
-TEST_F(FailureDetectorTest, TestDetectsFailure) {
-  ASSERT_OK(monitor_->Start());
-
-  scoped_refptr<FailureDetector> detector(new TimedFailureDetector(
-      MonoDelta::FromMilliseconds(kExpectedHeartbeatPeriodMillis * kMaxMissedHeartbeats)));
-
-  monitor_->MonitorFailureDetector(kTestTabletName, detector);
-  ASSERT_FALSE(detector->IsTracking(kNodeName));
-  ASSERT_OK(detector->Track(kNodeName,
-                            MonoTime::Now(),
-                            Bind(&FailureDetectorTest::FailureFunction, Unretained(this))));
-  ASSERT_TRUE(detector->IsTracking(kNodeName));
-
-  const int kNumPeriodsToWait = 4;  // Num heartbeat periods to wait for a failure.
-  const int kUpdatesPerPeriod = 10; // Num updates we give per period to minimize test flakiness.
-
-  for (int i = 0; i < kNumPeriodsToWait * kUpdatesPerPeriod; i++) {
-    // Report in (heartbeat) to the detector.
-    ASSERT_OK(detector->MessageFrom(kNodeName, MonoTime::Now()));
-
-    // We sleep for a fraction of heartbeat period, to minimize test flakiness.
-    SleepFor(MonoDelta::FromMilliseconds(kExpectedHeartbeatPeriodMillis / kUpdatesPerPeriod));
-
-    // The latch shouldn't have counted down, since the node's been reporting that
-    // it's still alive.
-    ASSERT_EQ(1, latch_.count());
-  }
-
-  // If we stop reporting he node is alive the failure callback is eventually
-  // triggered and we exit.
-  WaitForFailure();
-
-  ASSERT_OK(detector->UnTrack(kNodeName));
-  ASSERT_FALSE(detector->IsTracking(kNodeName));
-
-  ASSERT_OK(monitor_->UnmonitorFailureDetector(kTestTabletName));
-  monitor_->Shutdown();
-}
-
-}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/e8693965/src/kudu/util/failure_detector.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/failure_detector.cc b/src/kudu/util/failure_detector.cc
deleted file mode 100644
index 7adb403..0000000
--- a/src/kudu/util/failure_detector.cc
+++ /dev/null
@@ -1,218 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "kudu/util/failure_detector.h"
-
-#include <cstddef>
-#include <mutex>
-#include <ostream>
-#include <unordered_map>
-
-#include <glog/logging.h>
-
-#include "kudu/gutil/basictypes.h"
-#include "kudu/gutil/gscoped_ptr.h"
-#include "kudu/gutil/map-util.h"
-#include "kudu/gutil/stl_util.h"
-#include "kudu/gutil/strings/substitute.h"
-#include "kudu/util/status.h"
-#include "kudu/util/thread.h"
-
-using std::string;
-using std::unordered_map;
-using strings::Substitute;
-
-namespace kudu {
-
-const int64_t RandomizedFailureMonitor::kMinWakeUpTimeMillis = 10;
-
-TimedFailureDetector::TimedFailureDetector(MonoDelta failure_period)
-    : failure_period_(failure_period) {}
-
-TimedFailureDetector::~TimedFailureDetector() {
-  STLDeleteValues(&nodes_);
-}
-
-Status TimedFailureDetector::Track(const string& name,
-                                   const MonoTime& now,
-                                   const FailureDetectedCallback& callback) {
-  std::lock_guard<simple_spinlock> lock(lock_);
-  gscoped_ptr<Node> node(new Node);
-  node->permanent_name = name;
-  node->callback = callback;
-  node->last_heard_of = now;
-  node->status = ALIVE;
-  if (!InsertIfNotPresent(&nodes_, name, node.get())) {
-    return Status::AlreadyPresent(
-        Substitute("Node with name '$0' is already being monitored", name));
-  }
-  ignore_result(node.release());
-  return Status::OK();
-}
-
-Status TimedFailureDetector::UnTrack(const string& name) {
-  std::lock_guard<simple_spinlock> lock(lock_);
-  Node* node = EraseKeyReturnValuePtr(&nodes_, name);
-  if (PREDICT_FALSE(node == NULL)) {
-    return Status::NotFound(Substitute("Node with name '$0' not found", name));
-  }
-  delete node;
-  return Status::OK();
-}
-
-bool TimedFailureDetector::IsTracking(const std::string& name) {
-  std::lock_guard<simple_spinlock> lock(lock_);
-  return ContainsKey(nodes_, name);
-}
-
-Status TimedFailureDetector::MessageFrom(const std::string& name, const MonoTime& now) {
-  VLOG(3) << "Received message from " << name << " at " << now.ToString();
-  std::lock_guard<simple_spinlock> lock(lock_);
-  Node* node = FindPtrOrNull(nodes_, name);
-  if (node == NULL) {
-    VLOG(1) << "Not tracking node: " << name;
-    return Status::NotFound(Substitute("Message from unknown node '$0'", name));
-  }
-  node->last_heard_of = now;
-  node->status = ALIVE;
-  return Status::OK();
-}
-
-FailureDetector::NodeStatus TimedFailureDetector::GetNodeStatusUnlocked(const std::string& name,
-                                                                        const MonoTime& now) {
-  Node* node = FindOrDie(nodes_, name);
-  if ((now - node->last_heard_of) > failure_period_) {
-    node->status = DEAD;
-  }
-  return node->status;
-}
-
-void TimedFailureDetector::CheckForFailures(const MonoTime& now) {
-  typedef unordered_map<string, FailureDetectedCallback> CallbackMap;
-  CallbackMap callbacks;
-  {
-    std::lock_guard<simple_spinlock> lock(lock_);
-    for (const NodeMap::value_type& entry : nodes_) {
-      if (GetNodeStatusUnlocked(entry.first, now) == DEAD) {
-        InsertOrDie(&callbacks, entry.first, entry.second->callback);
-      }
-    }
-  }
-  // Invoke failure callbacks outside of lock.
-  for (const CallbackMap::value_type& entry : callbacks) {
-    const string& node_name = entry.first;
-    const FailureDetectedCallback& callback = entry.second;
-    callback.Run(node_name, Status::RemoteError(Substitute("Node '$0' failed", node_name)));
-  }
-}
-
-RandomizedFailureMonitor::RandomizedFailureMonitor(uint32_t random_seed,
-                                                   int64_t period_mean_millis,
-                                                   int64_t period_stddev_millis)
-    : period_mean_millis_(period_mean_millis),
-      period_stddev_millis_(period_stddev_millis),
-      random_(random_seed),
-      run_latch_(0),
-      shutdown_(false) {
-}
-
-RandomizedFailureMonitor::~RandomizedFailureMonitor() {
-  Shutdown();
-}
-
-Status RandomizedFailureMonitor::Start() {
-  CHECK(!thread_);
-  run_latch_.Reset(1);
-  return Thread::Create("failure-monitors", "failure-monitor",
-                        &RandomizedFailureMonitor::RunThread,
-                        this, &thread_);
-}
-
-void RandomizedFailureMonitor::Shutdown() {
-  if (!thread_) {
-    return;
-  }
-
-  {
-    std::lock_guard<simple_spinlock> l(lock_);
-    if (shutdown_) {
-      return;
-    }
-    shutdown_ = true;
-  }
-
-  run_latch_.CountDown();
-  CHECK_OK(ThreadJoiner(thread_.get()).Join());
-  thread_.reset();
-}
-
-Status RandomizedFailureMonitor::MonitorFailureDetector(const string& name,
-                                                        const scoped_refptr<FailureDetector>& fd) {
-  std::lock_guard<simple_spinlock> l(lock_);
-  bool inserted = InsertIfNotPresent(&fds_, name, fd);
-  if (PREDICT_FALSE(!inserted)) {
-    return Status::AlreadyPresent(Substitute("Already monitoring failure detector '$0'", name));
-  }
-  return Status::OK();
-}
-
-Status RandomizedFailureMonitor::UnmonitorFailureDetector(const string& name) {
-  std::lock_guard<simple_spinlock> l(lock_);
-  int count = fds_.erase(name);
-  if (PREDICT_FALSE(count == 0)) {
-    return Status::NotFound(Substitute("Failure detector '$0' not found", name));
-  }
-  return Status::OK();
-}
-
-void RandomizedFailureMonitor::RunThread() {
-  VLOG(1) << "Failure monitor thread starting";
-
-  while (true) {
-    int64_t wait_millis = random_.Normal(period_mean_millis_, period_stddev_millis_);
-    if (wait_millis < kMinWakeUpTimeMillis) {
-      wait_millis = kMinWakeUpTimeMillis;
-    }
-
-    MonoDelta wait_delta = MonoDelta::FromMilliseconds(wait_millis);
-    VLOG(3) << "RandomizedFailureMonitor sleeping for: " << wait_delta.ToString();
-    if (run_latch_.WaitFor(wait_delta)) {
-      // CountDownLatch reached 0.
-      std::lock_guard<simple_spinlock> lock(lock_);
-      // Check if we were told to shutdown.
-      if (shutdown_) {
-        // Latch fired: exit loop.
-        VLOG(1) << "RandomizedFailureMonitor thread shutting down";
-        return;
-      }
-    }
-
-    // Take a copy of the FD map under the lock.
-    FDMap fds_copy;
-    {
-      std::lock_guard<simple_spinlock> l(lock_);
-      fds_copy = fds_;
-    }
-
-    MonoTime now = MonoTime::Now();
-    for (const FDMap::value_type& entry : fds_copy) {
-      entry.second->CheckForFailures(now);
-    }
-  }
-}
-
-}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/e8693965/src/kudu/util/failure_detector.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/failure_detector.h b/src/kudu/util/failure_detector.h
deleted file mode 100644
index 1e986b4..0000000
--- a/src/kudu/util/failure_detector.h
+++ /dev/null
@@ -1,177 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef KUDU_UTIL_FAILURE_DETECTOR_H_
-#define KUDU_UTIL_FAILURE_DETECTOR_H_
-
-#include <cstdint>
-#include <string>
-#include <unordered_map>
-
-#include "kudu/gutil/callback.h"
-#include "kudu/gutil/macros.h"
-#include "kudu/gutil/port.h"
-#include "kudu/gutil/ref_counted.h"
-#include "kudu/util/countdown_latch.h"
-#include "kudu/util/locks.h"
-#include "kudu/util/monotime.h"
-#include "kudu/util/random.h"
-
-namespace kudu {
-class Status;
-class Thread;
-
-// A generic interface for failure detector implementations.
-// A failure detector is responsible for deciding whether a certain server is dead or alive.
-class FailureDetector : public RefCountedThreadSafe<FailureDetector> {
- public:
-  enum NodeStatus {
-    DEAD,
-    ALIVE
-  };
-  typedef std::unordered_map<std::string, NodeStatus> StatusMap;
-
-  typedef Callback<void(const std::string& name,
-                        const Status& status)> FailureDetectedCallback;
-
-  virtual ~FailureDetector() {}
-
-  // Registers a node with 'name' in the failure detector.
-  //
-  // If it returns Status::OK() the failure detector will from now
-  // expect messages from the machine with 'name' and will trigger
-  // 'callback' if a failure is detected.
-  //
-  // Returns Status::AlreadyPresent() if a machine with 'name' is
-  // already registered in this failure detector.
-  virtual Status Track(const std::string& name,
-                       const MonoTime& now,
-                       const FailureDetectedCallback& callback) = 0;
-
-  // Stops tracking node with 'name'.
-  virtual Status UnTrack(const std::string& name) = 0;
-
-  // Return true iff the named entity is currently being tracked.
-  virtual bool IsTracking(const std::string& name) = 0;
-
-  // Records that a message from machine with 'name' was received at 'now'.
-  virtual Status MessageFrom(const std::string& name, const MonoTime& now) = 0;
-
-  // Checks the failure status of each tracked node. If the failure criteria is
-  // met, the failure callback is invoked.
-  virtual void CheckForFailures(const MonoTime& now) = 0;
-};
-
-// A simple failure detector implementation that considers a node dead
-// when they have not reported by a certain time interval.
-class TimedFailureDetector : public FailureDetector {
- public:
-  // Some monitorable entity.
-  struct Node {
-    std::string permanent_name;
-    MonoTime last_heard_of;
-    FailureDetectedCallback callback;
-    NodeStatus status;
-  };
-
-  explicit TimedFailureDetector(MonoDelta failure_period);
-  virtual ~TimedFailureDetector();
-
-  virtual Status Track(const std::string& name,
-                       const MonoTime& now,
-                       const FailureDetectedCallback& callback) OVERRIDE;
-
-  virtual Status UnTrack(const std::string& name) OVERRIDE;
-
-  virtual bool IsTracking(const std::string& name) OVERRIDE;
-
-  virtual Status MessageFrom(const std::string& name, const MonoTime& now) OVERRIDE;
-
-  virtual void CheckForFailures(const MonoTime& now) OVERRIDE;
-
- private:
-  typedef std::unordered_map<std::string, Node*> NodeMap;
-
-  // Check if the named failure detector has failed.
-  // Does not invoke the callback.
-  FailureDetector::NodeStatus GetNodeStatusUnlocked(const std::string& name,
-                                                    const MonoTime& now);
-
-  const MonoDelta failure_period_;
-  mutable simple_spinlock lock_;
-  NodeMap nodes_;
-
-  DISALLOW_COPY_AND_ASSIGN(TimedFailureDetector);
-};
-
-// A randomized failure monitor that wakes up in normally-distributed intervals
-// and runs CheckForFailures() on each failure detector it monitors.
-//
-// The wake up interval is defined by a normal distribution with the specified
-// mean and standard deviation, in milliseconds, with minimum possible value
-// pinned at kMinWakeUpTimeMillis.
-//
-// We use a random wake up interval to avoid thundering herd / lockstep problems
-// when multiple nodes react to the failure of another node.
-class RandomizedFailureMonitor {
- public:
-  // The minimum time the FailureMonitor will wait.
-  static const int64_t kMinWakeUpTimeMillis;
-
-  RandomizedFailureMonitor(uint32_t random_seed,
-                           int64_t period_mean_millis,
-                           int64_t period_std_dev_millis);
-  ~RandomizedFailureMonitor();
-
-  // Starts the failure monitor.
-  Status Start();
-
-  // Stops the failure monitor.
-  void Shutdown();
-
-  // Adds a failure detector to be monitored.
-  Status MonitorFailureDetector(const std::string& name,
-                                const scoped_refptr<FailureDetector>& fd);
-
-  // Unmonitors the failure detector with the specified name.
-  Status UnmonitorFailureDetector(const std::string& name);
-
- private:
-  typedef std::unordered_map<std::string, scoped_refptr<FailureDetector> > FDMap;
-
-  // Runs the monitor thread.
-  void RunThread();
-
-  // Mean & std. deviation of random period to sleep for between checking the
-  // failure detectors.
-  const int64_t period_mean_millis_;
-  const int64_t period_stddev_millis_;
-  ThreadSafeRandom random_;
-
-  scoped_refptr<Thread> thread_;
-  CountDownLatch run_latch_;
-
-  mutable simple_spinlock lock_;
-  FDMap fds_;
-  bool shutdown_; // Whether the failure monitor should shut down.
-
-  DISALLOW_COPY_AND_ASSIGN(RandomizedFailureMonitor);
-};
-
-}  // namespace kudu
-
-#endif /* KUDU_UTIL_FAILURE_DETECTOR_H_ */

http://git-wip-us.apache.org/repos/asf/kudu/blob/e8693965/src/kudu/util/resettable_heartbeater-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/resettable_heartbeater-test.cc b/src/kudu/util/resettable_heartbeater-test.cc
deleted file mode 100644
index e3989b7..0000000
--- a/src/kudu/util/resettable_heartbeater-test.cc
+++ /dev/null
@@ -1,108 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "kudu/util/resettable_heartbeater.h"
-
-#include <cstdint>
-#include <ostream>
-#include <string>
-
-#include <boost/bind.hpp> // IWYU pragma: keep
-#include <glog/logging.h>
-#include <gtest/gtest.h>
-
-#include "kudu/gutil/gscoped_ptr.h"
-#include "kudu/util/countdown_latch.h"
-#include "kudu/util/monotime.h"
-#include "kudu/util/status.h"
-#include "kudu/util/test_macros.h"
-#include "kudu/util/test_util.h"
-
-namespace kudu {
-
-// Number of heartbeats we want to observe before allowing the test to end.
-static const int kNumHeartbeats = 2;
-
-class ResettableHeartbeaterTest : public KuduTest {
- public:
-  ResettableHeartbeaterTest()
-    : KuduTest(),
-      latch_(kNumHeartbeats) {
-  }
-
- protected:
-  void CreateHeartbeater(uint64_t period_ms, const std::string& name) {
-    period_ms_ = period_ms;
-    heartbeater_.reset(
-        new ResettableHeartbeater(name,
-                                  MonoDelta::FromMilliseconds(period_ms),
-                                  boost::bind(&ResettableHeartbeaterTest::HeartbeatFunction,
-                                              this)));
-  }
-
-  Status HeartbeatFunction() {
-    latch_.CountDown();
-    return Status::OK();
-  }
-
-  void WaitForCountDown() {
-    // Wait a large multiple (in the worst case) of the required time before we
-    // time out and fail the test. Large to avoid test flakiness.
-    const uint64_t kMaxWaitMillis = period_ms_ * kNumHeartbeats * 20;
-    CHECK(latch_.WaitFor(MonoDelta::FromMilliseconds(kMaxWaitMillis)))
-        << "Failed to count down " << kNumHeartbeats << " times in " << kMaxWaitMillis
-        << " ms: latch count == " << latch_.count();
-  }
-
-  CountDownLatch latch_;
-  uint64_t period_ms_;
-  gscoped_ptr<ResettableHeartbeater> heartbeater_;
-};
-
-// Tests that if Reset() is not called the heartbeat method is called
-// the expected number of times.
-TEST_F(ResettableHeartbeaterTest, TestRegularHeartbeats) {
-  const int64_t kHeartbeatPeriodMillis = 100; // Heartbeat every 100ms.
-  CreateHeartbeater(kHeartbeatPeriodMillis, CURRENT_TEST_NAME());
-  ASSERT_OK(heartbeater_->Start());
-  WaitForCountDown();
-  ASSERT_OK(heartbeater_->Stop());
-}
-
-// Tests that if we Reset() the heartbeater in a period smaller than
-// the heartbeat period the heartbeat method never gets called.
-// After we stop resetting heartbeats should resume as normal
-TEST_F(ResettableHeartbeaterTest, TestResetHeartbeats) {
-  const int64_t kHeartbeatPeriodMillis = 800;   // Heartbeat every 800ms.
-  const int64_t kNumResetSlicesPerPeriod = 40;  // Reset 40 times per heartbeat period.
-  // Reset once every 800ms / 40 = 20ms.
-  const int64_t kResetPeriodMillis = kHeartbeatPeriodMillis / kNumResetSlicesPerPeriod;
-
-  CreateHeartbeater(kHeartbeatPeriodMillis, CURRENT_TEST_NAME());
-  ASSERT_OK(heartbeater_->Start());
-  // Call Reset() in a loop for 2 heartbeat periods' worth of time, with sleeps
-  // in-between as defined above.
-  for (int i = 0; i < kNumResetSlicesPerPeriod * 2; i++) {
-    heartbeater_->Reset();
-    ASSERT_EQ(kNumHeartbeats, latch_.count()); // Ensure we haven't counted down, yet.
-    SleepFor(MonoDelta::FromMilliseconds(kResetPeriodMillis));
-  }
-  WaitForCountDown();
-  ASSERT_OK(heartbeater_->Stop());
-}
-
-}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/e8693965/src/kudu/util/resettable_heartbeater.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/resettable_heartbeater.cc b/src/kudu/util/resettable_heartbeater.cc
deleted file mode 100644
index 5ec87fc..0000000
--- a/src/kudu/util/resettable_heartbeater.cc
+++ /dev/null
@@ -1,185 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "kudu/util/resettable_heartbeater.h"
-
-#include <algorithm>
-#include <cstdint>
-#include <cstdlib>
-#include <mutex>
-#include <ostream>
-
-#include <glog/logging.h>
-
-#include "kudu/gutil/ref_counted.h"
-#include "kudu/gutil/strings/substitute.h"
-#include "kudu/util/countdown_latch.h"
-#include "kudu/util/locks.h"
-#include "kudu/util/monotime.h"
-#include "kudu/util/random.h"
-#include "kudu/util/status.h"
-#include "kudu/util/thread.h"
-
-namespace kudu {
-using std::string;
-
-class ResettableHeartbeaterThread {
- public:
-  ResettableHeartbeaterThread(std::string name, MonoDelta period,
-                              HeartbeatFunction function);
-
-  Status Start();
-  Status Stop();
-  void Reset();
-
- private:
-  void RunThread();
-  bool IsCurrentThread() const;
-
-  const string name_;
-
-  // The heartbeat period.
-  const MonoDelta period_;
-
-  // The function to call to perform the heartbeat
-  const HeartbeatFunction function_;
-
-  // The actual running thread (NULL before it is started)
-  scoped_refptr<kudu::Thread> thread_;
-
-  CountDownLatch run_latch_;
-
-  // Whether the heartbeater should shutdown.
-  bool shutdown_;
-
-  // lock that protects access to 'shutdown_' and to 'run_latch_'
-  // Reset() method.
-  mutable simple_spinlock lock_;
-  DISALLOW_COPY_AND_ASSIGN(ResettableHeartbeaterThread);
-};
-
-ResettableHeartbeater::ResettableHeartbeater(const std::string& name,
-                                             MonoDelta period,
-                                             HeartbeatFunction function)
-    : thread_(new ResettableHeartbeaterThread(name, period, function)) {
-}
-
-Status ResettableHeartbeater::Start() {
-  return thread_->Start();
-}
-
-Status ResettableHeartbeater::Stop() {
-  return thread_->Stop();
-}
-void ResettableHeartbeater::Reset() {
-  thread_->Reset();
-}
-
-ResettableHeartbeater::~ResettableHeartbeater() {
-  WARN_NOT_OK(Stop(), "Unable to stop heartbeater thread");
-}
-
-ResettableHeartbeaterThread::ResettableHeartbeaterThread(
-    std::string name, MonoDelta period, HeartbeatFunction function)
-    : name_(std::move(name)),
-      period_(period),
-      function_(std::move(function)),
-      run_latch_(0),
-      shutdown_(false) {}
-
-void ResettableHeartbeaterThread::RunThread() {
-  CHECK(IsCurrentThread());
-  VLOG(1) << "Heartbeater: " << name_ << " thread starting";
-
-  bool prev_reset_was_manual = false;
-  Random rng(random());
-  while (true) {
-    MonoDelta wait_period = period_;
-    if (prev_reset_was_manual) {
-      // When the caller does a manual reset, we randomize the subsequent wait
-      // timeout between period_/2 and period_. This builds in some jitter so
-      // multiple tablets on the same TS don't end up heartbeating in lockstep.
-      int64_t half_period_ms = period_.ToMilliseconds() / 2;
-      wait_period = MonoDelta::FromMilliseconds(
-          half_period_ms +
-          rng.NextDoubleFraction() * half_period_ms);
-      prev_reset_was_manual = false;
-    }
-    if (run_latch_.WaitFor(wait_period)) {
-      // CountDownLatch reached 0 -- this means there was a manual reset.
-      prev_reset_was_manual = true;
-      std::lock_guard<simple_spinlock> lock(lock_);
-      // check if we were told to shutdown
-      if (shutdown_) {
-        // Latch fired -- exit loop
-        VLOG(1) << "Heartbeater: " << name_ << " thread finished";
-        return;
-      } else {
-        // otherwise it's just a reset, reset the latch
-        // and continue;
-        run_latch_.Reset(1);
-        continue;
-      }
-    }
-
-    Status s = function_();
-    if (!s.ok()) {
-      LOG(WARNING)<< "Failed to heartbeat in heartbeater: " << name_
-      << " Status: " << s.ToString();
-      continue;
-    }
-  }
-}
-
-bool ResettableHeartbeaterThread::IsCurrentThread() const {
-  return thread_.get() == kudu::Thread::current_thread();
-}
-
-Status ResettableHeartbeaterThread::Start() {
-  CHECK(thread_ == nullptr);
-  run_latch_.Reset(1);
-  return kudu::Thread::Create("heartbeater", strings::Substitute("$0-heartbeat", name_),
-                              &ResettableHeartbeaterThread::RunThread,
-                              this, &thread_);
-}
-
-void ResettableHeartbeaterThread::Reset() {
-  if (!thread_) {
-    return;
-  }
-  run_latch_.CountDown();
-}
-
-Status ResettableHeartbeaterThread::Stop() {
-  if (!thread_) {
-    return Status::OK();
-  }
-
-  {
-    std::lock_guard<simple_spinlock> l(lock_);
-    if (shutdown_) {
-      return Status::OK();
-    }
-    shutdown_ = true;
-  }
-
-  run_latch_.CountDown();
-  RETURN_NOT_OK(ThreadJoiner(thread_.get()).Join());
-  return Status::OK();
-}
-
-}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/e8693965/src/kudu/util/resettable_heartbeater.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/resettable_heartbeater.h b/src/kudu/util/resettable_heartbeater.h
deleted file mode 100644
index 5699f91..0000000
--- a/src/kudu/util/resettable_heartbeater.h
+++ /dev/null
@@ -1,80 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef KUDU_UTIL_RESETTABLE_HEARTBEATER_H_
-#define KUDU_UTIL_RESETTABLE_HEARTBEATER_H_
-
-#include <string>
-
-#include <boost/function.hpp>
-
-#include "kudu/gutil/gscoped_ptr.h"
-#include "kudu/gutil/macros.h"
-
-namespace kudu {
-class MonoDelta;
-class Status;
-class ResettableHeartbeaterThread;
-
-typedef boost::function<Status()> HeartbeatFunction;
-
-// A resettable hearbeater that takes a function and calls
-// it to perform a regular heartbeat, unless Reset() is called
-// in which case the heartbeater resets the heartbeat period.
-// The point is to send "I'm Alive" heartbeats only if no regular
-// messages are sent in the same period.
-//
-// TODO Eventually this should be used instead of the master heartbeater
-// as it shares a lot of logic with the exception of the specific master
-// stuff (and the fact that it is resettable).
-//
-// TODO We'll have a lot of these per server, so eventually we need
-// to refactor this so that multiple heartbeaters share something like
-// java's ScheduledExecutor.
-//
-// TODO Do something about failed hearbeats, right now this is just
-// logging. Probably could take more arguments and do more of an
-// exponential backoff.
-//
-// This class is thread safe.
-class ResettableHeartbeater {
- public:
-  ResettableHeartbeater(const std::string& name,
-                        MonoDelta period,
-                        HeartbeatFunction function);
-
-  // Starts the heartbeater
-  Status Start();
-
-  // Stops the hearbeater
-  Status Stop();
-
-  // Resets the heartbeat period.
-  // When this is called, the subsequent heartbeat has some built-in jitter and
-  // may trigger before a full period (as specified to the constructor).
-  void Reset();
-
-  ~ResettableHeartbeater();
- private:
-  gscoped_ptr<ResettableHeartbeaterThread> thread_;
-
-  DISALLOW_COPY_AND_ASSIGN(ResettableHeartbeater);
-};
-
-}  // namespace kudu
-
-#endif /* KUDU_UTIL_RESETTABLE_HEARTBEATER_H_ */


Mime
View raw message