kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject [kudu] 02/04: [clock] move clock sync wait into HybridClock::Init()
Date Thu, 26 Sep 2019 17:12:00 GMT
This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit e110cb88b145edb6d536ae0806e02736f5bf51e5
Author: Alexey Serbin <alexey@apache.org>
AuthorDate: Tue Sep 24 22:11:31 2019 -0700

    [clock] move clock sync wait into HybridClock::Init()
    
    This patch moves the logic that waits for clock synchronisation from
    SystemNtp::Init() into HybridClock::Init().  This is to consolidate
    and reuse the logic for other clock sources, such as the built-in NTP
    client coming in one of the follow-up patches.
    
    Change-Id: I4bc5d3160a68932a31cde5ff221f74452bb14a21
    Reviewed-on: http://gerrit.cloudera.org:8080/14301
    Tested-by: Alexey Serbin <aserbin@cloudera.com>
    Reviewed-by: Adar Dembo <adar@cloudera.com>
---
 src/kudu/clock/hybrid_clock.cc | 54 +++++++++++++++++++++++++++++++++++-------
 src/kudu/clock/system_ntp.cc   | 54 +-----------------------------------------
 src/kudu/clock/time_service.h  |  8 +++++--
 3 files changed, 52 insertions(+), 64 deletions(-)

diff --git a/src/kudu/clock/hybrid_clock.cc b/src/kudu/clock/hybrid_clock.cc
index bec8cba..6db1990 100644
--- a/src/kudu/clock/hybrid_clock.cc
+++ b/src/kudu/clock/hybrid_clock.cc
@@ -63,14 +63,22 @@ DEFINE_string(time_source, "system",
               "'system' or 'mock' (for tests only)");
 TAG_FLAG(time_source, experimental);
 DEFINE_validator(time_source, [](const char* /* flag_name */, const string& value) {
-    if (boost::iequals(value, "system") ||
-        boost::iequals(value, "mock")) {
-      return true;
-    }
-    LOG(ERROR) << "unknown value for 'time_source': '" << value << "'"
-               << " (expected one of 'system' or 'mock')";
-    return false;
-  });
+  if (boost::iequals(value, "system") ||
+      boost::iequals(value, "mock")) {
+    return true;
+  }
+  LOG(ERROR) << "unknown value for 'time_source': '" << value << "'"
+             << " (expected one of 'system' or 'mock')";
+  return false;
+});
+
+DEFINE_int32(ntp_initial_sync_wait_secs, 60,
+             "Amount of time in seconds to wait for clock synchronisation at "
+             "startup. A value of zero means Kudu will fail to start "
+             "if the clock is unsynchronized. This flag can prevent Kudu from "
+             "crashing if it starts before NTP can synchronize the clock.");
+TAG_FLAG(ntp_initial_sync_wait_secs, evolving);
+TAG_FLAG(ntp_initial_sync_wait_secs, advanced);
 
 METRIC_DEFINE_gauge_uint64(server, hybrid_clock_timestamp,
                            "Hybrid Clock Timestamp",
@@ -126,7 +134,35 @@ Status HybridClock::Init() {
   } else {
     return Status::InvalidArgument("invalid NTP source", FLAGS_time_source);
   }
-  RETURN_NOT_OK(time_service_->Init());
+
+  const auto wait_s = FLAGS_ntp_initial_sync_wait_secs;
+  const auto deadline = MonoTime::Now() + MonoDelta::FromSeconds(wait_s);
+  bool need_log = true;
+  Status s;
+  do {
+    s = time_service_->Init();
+    if (!s.IsServiceUnavailable()) {
+      break;
+    }
+    if (need_log) {
+      // Log about what's going on, just once.
+      if (wait_s > 0) {
+        LOG(INFO) << Substitute("waiting up to --ntp_initial_sync_wait_secs=$0 "
+                                "seconds for the clock to synchronize", wait_s);
+      } else {
+        LOG(INFO) << Substitute("not waiting for clock synchronization: "
+                                "--ntp_initial_sync_wait_secs=$0 is nonpositive",
+                                wait_s);
+      }
+      need_log = false;
+    }
+    SleepFor(MonoDelta::FromSeconds(1));
+  } while (MonoTime::Now() < deadline);
+
+  if (!s.ok()) {
+    time_service_->DumpDiagnostics(/* log= */nullptr);
+    return s.CloneAndPrepend("timed out waiting for clock synchronisation");
+  }
 
   state_ = kInitialized;
 
diff --git a/src/kudu/clock/system_ntp.cc b/src/kudu/clock/system_ntp.cc
index 9993838..9163175 100644
--- a/src/kudu/clock/system_ntp.cc
+++ b/src/kudu/clock/system_ntp.cc
@@ -25,7 +25,6 @@
 #include <string>
 #include <vector>
 
-#include <gflags/gflags.h>
 #include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 
@@ -33,23 +32,13 @@
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/errno.h"
-#include "kudu/util/flag_tags.h"
 #include "kudu/util/logging.h"
-#include "kudu/util/monotime.h"
 #include "kudu/util/path_util.h"
 #include "kudu/util/status.h"
 #include "kudu/util/subprocess.h"
 
 DECLARE_bool(inject_unsync_time_errors);
 
-DEFINE_int32(ntp_initial_sync_wait_secs, 60,
-             "Amount of time in seconds to wait for NTP to synchronize the "
-             "clock at startup. A value of zero means Kudu will fail to start "
-             "if the clock is unsynchronized. This flag can prevent Kudu from "
-             "crashing if it starts before NTP can synchronize the clock.");
-TAG_FLAG(ntp_initial_sync_wait_secs, evolving);
-TAG_FLAG(ntp_initial_sync_wait_secs, advanced);
-
 using std::string;
 using std::vector;
 using strings::Substitute;
@@ -115,36 +104,6 @@ void TryRun(vector<string> cmd, vector<string>* log) {
 
 }
 
-Status WaitForNtp() {
-  int32_t wait_secs = FLAGS_ntp_initial_sync_wait_secs;
-  if (wait_secs <= 0) {
-    LOG(INFO) << Substitute("Not waiting for clock synchronization: "
-                            "--ntp_initial_sync_wait_secs=$0 is nonpositive",
-                            wait_secs);
-    return Status::OK();
-  }
-  LOG(INFO) << Substitute("Waiting up to --ntp_initial_sync_wait_secs=$0 "
-                          "seconds for the clock to synchronize", wait_secs);
-
-  // We previously relied on ntpd/chrony support tools to wait, but that
-  // approach doesn't work in environments where ntpd is unreachable but the
-  // clock is still synchronized (i.e. running inside a Linux container).
-  //
-  // Now we just interrogate the kernel directly.
-  Status s;
-  for (int i = 0; i < wait_secs; i++) {
-    timex timex;
-    s = CallAdjTime(&timex);
-    if (s.ok() || !s.IsServiceUnavailable()) {
-      return s;
-    }
-    SleepFor(MonoDelta::FromSeconds(1));
-  }
-
-  // Return the last failure.
-  return s.CloneAndPrepend("Timed out waiting for clock sync");
-}
-
 } // anonymous namespace
 
 void SystemNtp::DumpDiagnostics(vector<string>* log) const {
@@ -172,19 +131,9 @@ void SystemNtp::DumpDiagnostics(vector<string>* log) const {
   TryRun({"chronyc", "-n", "sources"}, log);
 }
 
-
 Status SystemNtp::Init() {
   timex timex;
-  Status s = CallAdjTime(&timex);
-  if (s.IsServiceUnavailable()) {
-    s = WaitForNtp().AndThen([&timex]() {
-          return CallAdjTime(&timex);
-        });
-  }
-  if (!s.ok()) {
-    DumpDiagnostics(/* log= */nullptr);
-    return s;
-  }
+  RETURN_NOT_OK(CallAdjTime(&timex));
 
   // Calculate the sleep skew adjustment according to the max tolerance of the clock.
   // Tolerance comes in parts per million but needs to be applied a scaling factor.
@@ -197,7 +146,6 @@ Status SystemNtp::Init() {
   return Status::OK();
 }
 
-
 Status SystemNtp::WalltimeWithError(uint64_t *now_usec,
                                     uint64_t *error_usec) {
   // Read the time. This will return an error if the clock is not synchronized.
diff --git a/src/kudu/clock/time_service.h b/src/kudu/clock/time_service.h
index aa64255..cc28714 100644
--- a/src/kudu/clock/time_service.h
+++ b/src/kudu/clock/time_service.h
@@ -32,8 +32,12 @@ class TimeService {
   TimeService() = default;
   virtual ~TimeService() = default;
 
-  // Initialize the NTP source, validating that it is available and
-  // properly synchronized.
+  // Initialize the NTP source, validating that it is available and properly
+  // synchronized: Status::OK() is returned in such case. If the source
+  // is not yet synchronized, then Status::ServiceUnavailable() is returned:
+  // a caller may call this method again to eventually get Status::OK().
+  // In case of other non-OK() return statuses, the caller should not invoke
+  // this method again.
   virtual Status Init() = 0;
 
   // Return the current wall time in microseconds since the Unix epoch in '*now_usec'.


Mime
View raw message