kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject [kudu] branch master updated: [maintenance] Support priorities for tables in MM compaction
Date Tue, 21 May 2019 20:23:39 GMT
This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new c584d0d  [maintenance] Support priorities for tables in MM compaction
c584d0d is described below

commit c584d0def49ac545121247794d80c8e0fefd8f6c
Author: Yingchun Lai <405403881@qq.com>
AuthorDate: Mon Mar 25 23:35:01 2019 -0400

    [maintenance] Support priorities for tables in MM compaction
    
    This commit adds a feature to specify different priorities for table compaction.
    
    In a Kudu cluster with thousands of tables, it's hard for a specified tablet's
    maintenance OPs to be launched when their scores are not the highest, even if
    the table the tablet belongs to is high priority for Kudu users. This patch
    allows administrators to specify different priorities for tables via gflags, so
    that the maintenance OPs of high priority tables have a greater chance of being launched.
    
    Change-Id: I3ea3b73505157678a8fb551656123b64e6bfb304
    Reviewed-on: http://gerrit.cloudera.org:8080/12852
    Tested-by: Adar Dembo <adar@cloudera.com>
    Reviewed-by: Adar Dembo <adar@cloudera.com>
---
 src/kudu/tablet/tablet.h                  |   1 +
 src/kudu/tablet/tablet_metadata.h         |   2 +-
 src/kudu/tablet/tablet_mm_ops-test.cc     |   3 +-
 src/kudu/tablet/tablet_mm_ops.cc          |  15 ++-
 src/kudu/tablet/tablet_mm_ops.h           |  33 ++---
 src/kudu/tablet/tablet_replica.cc         |   2 +-
 src/kudu/tablet/tablet_replica.h          |   7 +-
 src/kudu/tablet/tablet_replica_mm_ops.cc  |  23 +++-
 src/kudu/tablet/tablet_replica_mm_ops.h   |  67 +++++-----
 src/kudu/util/maintenance_manager-test.cc | 177 +++++++++++++++++++++++----
 src/kudu/util/maintenance_manager.cc      | 197 +++++++++++++++++++++---------
 src/kudu/util/maintenance_manager.h       |  23 +++-
 src/kudu/util/maintenance_manager.proto   |  11 +-
 13 files changed, 408 insertions(+), 153 deletions(-)

diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h
index 05c7c9b..c4a5691 100644
--- a/src/kudu/tablet/tablet.h
+++ b/src/kudu/tablet/tablet.h
@@ -417,6 +417,7 @@ class Tablet {
   // This method is thread-safe.
   void CancelMaintenanceOps();
 
+  const std::string& table_id() const { return metadata_->table_id(); }
   const std::string& tablet_id() const { return metadata_->tablet_id(); }
 
   // Return the metrics for this tablet.
diff --git a/src/kudu/tablet/tablet_metadata.h b/src/kudu/tablet/tablet_metadata.h
index 62e7545..adf0d4c 100644
--- a/src/kudu/tablet/tablet_metadata.h
+++ b/src/kudu/tablet/tablet_metadata.h
@@ -119,7 +119,7 @@ class TabletMetadata : public RefCountedThreadSafe<TabletMetadata> {
     return partition_;
   }
 
-  std::string table_id() const {
+  const std::string& table_id() const {
     DCHECK_NE(state_, kNotLoadedYet);
     return table_id_;
   }
diff --git a/src/kudu/tablet/tablet_mm_ops-test.cc b/src/kudu/tablet/tablet_mm_ops-test.cc
index a2f5572..3a13386 100644
--- a/src/kudu/tablet/tablet_mm_ops-test.cc
+++ b/src/kudu/tablet/tablet_mm_ops-test.cc
@@ -62,7 +62,7 @@ class KuduTabletMmOpsTest : public TabletTestBase<IntKeyTestSetup<INT64>> {
   void StatsShouldChange(MaintenanceOp* op) {
     SleepFor(MonoDelta::FromMilliseconds(1));
     op->UpdateStats(&stats_);
-    ASSERT_TRUE(next_time_ < stats_.last_modified());
+    ASSERT_LT(next_time_, stats_.last_modified());
     next_time_ = stats_.last_modified();
   }
 
@@ -70,7 +70,6 @@ class KuduTabletMmOpsTest : public TabletTestBase<IntKeyTestSetup<INT64>> {
     SleepFor(MonoDelta::FromMilliseconds(1));
     op->UpdateStats(&stats_);
     ASSERT_EQ(next_time_, stats_.last_modified());
-    next_time_ = stats_.last_modified();
   }
 
   void TestFirstCall(MaintenanceOp* op) {
diff --git a/src/kudu/tablet/tablet_mm_ops.cc b/src/kudu/tablet/tablet_mm_ops.cc
index c71bc6e..5948061 100644
--- a/src/kudu/tablet/tablet_mm_ops.cc
+++ b/src/kudu/tablet/tablet_mm_ops.cc
@@ -24,6 +24,7 @@
 #include <gflags/gflags.h>
 #include <glog/logging.h>
 
+#include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/tablet/rowset.h"
 #include "kudu/tablet/tablet.h"
@@ -87,6 +88,10 @@ string TabletOpBase::LogPrefix() const {
   return tablet_->LogPrefix();
 }
 
+const std::string& TabletOpBase::table_id() const {
+  return tablet_->table_id();
+}
+
 ////////////////////////////////////////////////////////////
 // CompactRowSetsOp
 ////////////////////////////////////////////////////////////
@@ -262,12 +267,12 @@ void MajorDeltaCompactionOp::UpdateStats(MaintenanceOpStats* stats) {
   // cached stats.
   TabletMetrics* metrics = tablet_->metrics();
   if (metrics) {
-    int64_t new_num_mrs_flushed = metrics->flush_mrs_duration->TotalCount();
-    int64_t new_num_dms_flushed = metrics->flush_dms_duration->TotalCount();
-    int64_t new_num_rs_compacted = metrics->compact_rs_duration->TotalCount();
-    int64_t new_num_rs_minor_delta_compacted =
+    uint64_t new_num_mrs_flushed = metrics->flush_mrs_duration->TotalCount();
+    uint64_t new_num_dms_flushed = metrics->flush_dms_duration->TotalCount();
+    uint64_t new_num_rs_compacted = metrics->compact_rs_duration->TotalCount();
+    uint64_t new_num_rs_minor_delta_compacted =
         metrics->delta_minor_compact_rs_duration->TotalCount();
-    int64_t new_num_rs_major_delta_compacted =
+    uint64_t new_num_rs_major_delta_compacted =
         metrics->delta_major_compact_rs_duration->TotalCount();
     if (prev_stats_.valid() &&
         new_num_mrs_flushed == last_num_mrs_flushed_ &&
diff --git a/src/kudu/tablet/tablet_mm_ops.h b/src/kudu/tablet/tablet_mm_ops.h
index 8fc865e..6180652 100644
--- a/src/kudu/tablet/tablet_mm_ops.h
+++ b/src/kudu/tablet/tablet_mm_ops.h
@@ -22,7 +22,6 @@
 #include <string>
 
 #include "kudu/gutil/macros.h"
-#include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/maintenance_manager.h"
@@ -43,6 +42,8 @@ class TabletOpBase : public MaintenanceOp {
   std::string LogPrefix() const;
 
  protected:
+  const std::string& table_id() const override;
+
   Tablet* const tablet_;
 };
 
@@ -57,15 +58,15 @@ class CompactRowSetsOp : public TabletOpBase {
  public:
   explicit CompactRowSetsOp(Tablet* tablet);
 
-  virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;
+  void UpdateStats(MaintenanceOpStats* stats) override;
 
-  virtual bool Prepare() OVERRIDE;
+  bool Prepare() override;
 
-  virtual void Perform() OVERRIDE;
+  void Perform() override;
 
-  virtual scoped_refptr<Histogram> DurationHistogram() const OVERRIDE;
+  scoped_refptr<Histogram> DurationHistogram() const override;
 
-  virtual scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const OVERRIDE;
+  scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const override;
 
  private:
   mutable simple_spinlock lock_;
@@ -83,15 +84,15 @@ class MinorDeltaCompactionOp : public TabletOpBase {
  public:
   explicit MinorDeltaCompactionOp(Tablet* tablet);
 
-  virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;
+  void UpdateStats(MaintenanceOpStats* stats) override;
 
-  virtual bool Prepare() OVERRIDE;
+  bool Prepare() override;
 
-  virtual void Perform() OVERRIDE;
+  void Perform() override;
 
-  virtual scoped_refptr<Histogram> DurationHistogram() const OVERRIDE;
+  scoped_refptr<Histogram> DurationHistogram() const override;
 
-  virtual scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const OVERRIDE;
+  scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const override;
 
  private:
   mutable simple_spinlock lock_;
@@ -109,15 +110,15 @@ class MajorDeltaCompactionOp : public TabletOpBase {
  public:
   explicit MajorDeltaCompactionOp(Tablet* tablet);
 
-  virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;
+  void UpdateStats(MaintenanceOpStats* stats) override;
 
-  virtual bool Prepare() OVERRIDE;
+  bool Prepare() override;
 
-  virtual void Perform() OVERRIDE;
+  void Perform() override;
 
-  virtual scoped_refptr<Histogram> DurationHistogram() const OVERRIDE;
+  scoped_refptr<Histogram> DurationHistogram() const override;
 
-  virtual scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const OVERRIDE;
+  scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const override;
 
  private:
   mutable simple_spinlock lock_;
diff --git a/src/kudu/tablet/tablet_replica.cc b/src/kudu/tablet/tablet_replica.cc
index c7b231b..a284215 100644
--- a/src/kudu/tablet/tablet_replica.cc
+++ b/src/kudu/tablet/tablet_replica.cc
@@ -38,6 +38,7 @@
 #include "kudu/consensus/log_anchor_registry.h"
 #include "kudu/consensus/opid.pb.h"
 #include "kudu/consensus/raft_consensus.h"
+#include "kudu/fs/data_dirs.h"
 #include "kudu/gutil/basictypes.h"
 #include "kudu/gutil/bind.h"
 #include "kudu/gutil/bind_helpers.h"
@@ -124,7 +125,6 @@ TabletReplica::TabletReplica(
     Callback<void(const std::string& reason)> mark_dirty_clbk)
     : meta_(DCHECK_NOTNULL(std::move(meta))),
       cmeta_manager_(DCHECK_NOTNULL(std::move(cmeta_manager))),
-      tablet_id_(meta_->tablet_id()),
       local_peer_pb_(std::move(local_peer_pb)),
       log_anchor_registry_(new LogAnchorRegistry()),
       apply_pool_(apply_pool),
diff --git a/src/kudu/tablet/tablet_replica.h b/src/kudu/tablet/tablet_replica.h
index bf9d63d..50fab2d 100644
--- a/src/kudu/tablet/tablet_replica.h
+++ b/src/kudu/tablet/tablet_replica.h
@@ -254,10 +254,8 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
     return log_anchor_registry_;
   }
 
-  // Returns the tablet_id of the tablet managed by this TabletReplica.
-  // Returns the correct tablet_id even if the underlying tablet is not available
-  // yet.
-  const std::string& tablet_id() const { return tablet_id_; }
+  const std::string& table_id() const { return meta_->table_id(); }
+  const std::string& tablet_id() const { return meta_->tablet_id(); }
 
   // Convenience method to return the permanent_uuid of this peer.
   std::string permanent_uuid() const { return tablet_->metadata()->fs_manager()->uuid(); }
@@ -322,7 +320,6 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
   const scoped_refptr<TabletMetadata> meta_;
   const scoped_refptr<consensus::ConsensusMetadataManager> cmeta_manager_;
 
-  const std::string tablet_id_;
   const consensus::RaftPeerPB local_peer_pb_;
   scoped_refptr<log::LogAnchorRegistry> log_anchor_registry_; // Assigned in tablet_replica-test
 
diff --git a/src/kudu/tablet/tablet_replica_mm_ops.cc b/src/kudu/tablet/tablet_replica_mm_ops.cc
index 879425a..08b7ac6 100644
--- a/src/kudu/tablet/tablet_replica_mm_ops.cc
+++ b/src/kudu/tablet/tablet_replica_mm_ops.cc
@@ -21,11 +21,13 @@
 #include <mutex>
 #include <ostream>
 #include <string>
+#include <utility>
 
 #include <gflags/gflags.h>
 #include <glog/logging.h>
 
 #include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/tablet/tablet_metrics.h"
 #include "kudu/util/flag_tags.h"
@@ -122,6 +124,20 @@ void FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(MaintenanceOpStats
 }
 
 //
+// TabletReplicaOpBase.
+//
+TabletReplicaOpBase::TabletReplicaOpBase(std::string name,
+                                         IOUsage io_usage,
+                                         TabletReplica* tablet_replica)
+    : MaintenanceOp(std::move(name), io_usage),
+      tablet_replica_(tablet_replica) {
+}
+
+const std::string& TabletReplicaOpBase::table_id() const {
+  return tablet_replica_->table_id();
+}
+
+//
 // FlushMRSOp.
 //
 
@@ -260,9 +276,10 @@ scoped_refptr<AtomicGauge<uint32_t> > FlushDeltaMemStoresOp::RunningGauge() cons
 //
 
 LogGCOp::LogGCOp(TabletReplica* tablet_replica)
-    : MaintenanceOp(StringPrintf("LogGCOp(%s)", tablet_replica->tablet()->tablet_id().c_str()),
-                    MaintenanceOp::LOW_IO_USAGE),
-      tablet_replica_(tablet_replica),
+    : TabletReplicaOpBase(StringPrintf("LogGCOp(%s)",
+                                       tablet_replica->tablet()->tablet_id().c_str()),
+                          MaintenanceOp::LOW_IO_USAGE,
+                          tablet_replica),
       log_gc_duration_(METRIC_log_gc_duration.Instantiate(
                            tablet_replica->tablet()->GetMetricEntity())),
       log_gc_running_(METRIC_log_gc_running.Instantiate(
diff --git a/src/kudu/tablet/tablet_replica_mm_ops.h b/src/kudu/tablet/tablet_replica_mm_ops.h
index 2404b48..62b4cb9 100644
--- a/src/kudu/tablet/tablet_replica_mm_ops.h
+++ b/src/kudu/tablet/tablet_replica_mm_ops.h
@@ -21,7 +21,6 @@
 #include <cstdint>
 #include <string>
 
-#include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/stringprintf.h"
 #include "kudu/tablet/tablet.h"
@@ -47,86 +46,92 @@ class FlushOpPerfImprovementPolicy {
   FlushOpPerfImprovementPolicy() {}
 };
 
+class TabletReplicaOpBase : public MaintenanceOp {
+ public:
+  explicit TabletReplicaOpBase(std::string name, IOUsage io_usage, TabletReplica* tablet_replica);
+
+ protected:
+  const std::string& table_id() const override;
+
+  TabletReplica *const tablet_replica_;
+};
+
 // Maintenance op for MRS flush. Only one can happen at a time.
-class FlushMRSOp : public MaintenanceOp {
+class FlushMRSOp : public TabletReplicaOpBase {
  public:
   explicit FlushMRSOp(TabletReplica* tablet_replica)
-    : MaintenanceOp(StringPrintf("FlushMRSOp(%s)", tablet_replica->tablet()->tablet_id().c_str()),
-                    MaintenanceOp::HIGH_IO_USAGE),
-      tablet_replica_(tablet_replica) {
+    : TabletReplicaOpBase(StringPrintf("FlushMRSOp(%s)",
+                                       tablet_replica->tablet()->tablet_id().c_str()),
+                          MaintenanceOp::HIGH_IO_USAGE,
+                          tablet_replica) {
     time_since_flush_.start();
   }
 
-  virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;
+  void UpdateStats(MaintenanceOpStats* stats) override;
 
-  virtual bool Prepare() OVERRIDE;
+  bool Prepare() override;
 
-  virtual void Perform() OVERRIDE;
+  void Perform() override;
 
-  virtual scoped_refptr<Histogram> DurationHistogram() const OVERRIDE;
+  scoped_refptr<Histogram> DurationHistogram() const override;
 
-  virtual scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const OVERRIDE;
+  scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const override;
 
  private:
   // Lock protecting time_since_flush_.
   mutable simple_spinlock lock_;
   Stopwatch time_since_flush_;
-
-  TabletReplica *const tablet_replica_;
 };
 
 // Maintenance op for DMS flush.
 // Reports stats for all the DMS this tablet contains but only flushes one in Perform().
-class FlushDeltaMemStoresOp : public MaintenanceOp {
+class FlushDeltaMemStoresOp : public TabletReplicaOpBase {
  public:
   explicit FlushDeltaMemStoresOp(TabletReplica* tablet_replica)
-    : MaintenanceOp(StringPrintf("FlushDeltaMemStoresOp(%s)",
-                                 tablet_replica->tablet()->tablet_id().c_str()),
-                    MaintenanceOp::HIGH_IO_USAGE),
-      tablet_replica_(tablet_replica) {
+    : TabletReplicaOpBase(StringPrintf("FlushDeltaMemStoresOp(%s)",
+                                       tablet_replica->tablet()->tablet_id().c_str()),
+                          MaintenanceOp::HIGH_IO_USAGE,
+                          tablet_replica) {
     time_since_flush_.start();
   }
 
-  virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;
+  void UpdateStats(MaintenanceOpStats* stats) override;
 
-  virtual bool Prepare() OVERRIDE {
+  bool Prepare() override {
     return true;
   }
 
-  virtual void Perform() OVERRIDE;
+  void Perform() override;
 
-  virtual scoped_refptr<Histogram> DurationHistogram() const OVERRIDE;
+  scoped_refptr<Histogram> DurationHistogram() const override;
 
-  virtual scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const OVERRIDE;
+  scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const override;
 
  private:
   // Lock protecting time_since_flush_
   mutable simple_spinlock lock_;
   Stopwatch time_since_flush_;
-
-  TabletReplica *const tablet_replica_;
 };
 
 // Maintenance task that runs log GC. Reports log retention that represents the amount of data
 // that can be GC'd.
 //
 // Only one LogGC op can run at a time.
-class LogGCOp : public MaintenanceOp {
+class LogGCOp : public TabletReplicaOpBase {
  public:
   explicit LogGCOp(TabletReplica* tablet_replica);
 
-  virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;
+  void UpdateStats(MaintenanceOpStats* stats) override;
 
-  virtual bool Prepare() OVERRIDE;
+  bool Prepare() override;
 
-  virtual void Perform() OVERRIDE;
+  void Perform() override;
 
-  virtual scoped_refptr<Histogram> DurationHistogram() const OVERRIDE;
+  scoped_refptr<Histogram> DurationHistogram() const override;
 
-  virtual scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const OVERRIDE;
+  scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const override;
 
  private:
-  TabletReplica *const tablet_replica_;
   scoped_refptr<Histogram> log_gc_duration_;
   scoped_refptr<AtomicGauge<uint32_t> > log_gc_running_;
   mutable Semaphore sem_;
diff --git a/src/kudu/util/maintenance_manager-test.cc b/src/kudu/util/maintenance_manager-test.cc
index 6777e06..8619453 100644
--- a/src/kudu/util/maintenance_manager-test.cc
+++ b/src/kudu/util/maintenance_manager-test.cc
@@ -15,24 +15,28 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include "kudu/util/maintenance_manager.h"
+
+#include <math.h>
+
+#include <algorithm>
 #include <atomic>
 #include <cstdint>
+#include <list>
 #include <memory>
 #include <mutex>
 #include <ostream>
 #include <string>
 #include <utility>
-#include <vector>
 
 #include <boost/bind.hpp> // IWYU pragma: keep
+#include <gflags/gflags.h>
 #include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
-#include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/substitute.h"
-#include "kudu/util/maintenance_manager.h"
 #include "kudu/util/maintenance_manager.pb.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
@@ -42,9 +46,9 @@
 #include "kudu/util/test_util.h"
 #include "kudu/util/thread.h"
 
+using std::list;
 using std::shared_ptr;
 using std::string;
-using std::vector;
 using strings::Substitute;
 
 METRIC_DEFINE_entity(test);
@@ -57,17 +61,29 @@ METRIC_DEFINE_histogram(test, maintenance_op_duration,
                         kudu::MetricUnit::kSeconds, "", 60000000LU, 2);
 
 DECLARE_int64(log_target_replay_size_mb);
+DECLARE_string(maintenance_manager_table_priorities);
+DECLARE_double(maintenance_op_multiplier);
+DECLARE_int32(max_priority_range);
+
 
 namespace kudu {
 
-static const int kHistorySize = 4;
+static const int kHistorySize = 6;
 static const char kFakeUuid[] = "12345";
 
 class MaintenanceManagerTest : public KuduTest {
  public:
   void SetUp() override {
+    StartManager(2);
+  }
+
+  void TearDown() override {
+    StopManager();
+  }
+
+  void StartManager(int32_t num_threads) {
     MaintenanceManager::Options options;
-    options.num_threads = 2;
+    options.num_threads = num_threads;
     options.polling_interval_ms = 1;
     options.history_size = kHistorySize;
     manager_.reset(new MaintenanceManager(options, kFakeUuid));
@@ -78,7 +94,7 @@ class MaintenanceManagerTest : public KuduTest {
     ASSERT_OK(manager_->Start());
   }
 
-  void TearDown() override {
+  void StopManager() {
     manager_->Shutdown();
   }
 
@@ -95,7 +111,8 @@ TEST_F(MaintenanceManagerTest, TestCreateAndShutdown) {
 class TestMaintenanceOp : public MaintenanceOp {
  public:
   TestMaintenanceOp(const std::string& name,
-                    IOUsage io_usage)
+                    IOUsage io_usage,
+                    std::string table_id = "fake.table_id")
     : MaintenanceOp(name, io_usage),
       ram_anchored_(500),
       logs_retained_bytes_(0),
@@ -105,12 +122,13 @@ class TestMaintenanceOp : public MaintenanceOp {
       maintenance_ops_running_(METRIC_maintenance_ops_running.Instantiate(metric_entity_, 0)),
       remaining_runs_(1),
       prepared_runs_(0),
-      sleep_time_(MonoDelta::FromSeconds(0)) {
+      sleep_time_(MonoDelta::FromSeconds(0)),
+      table_id_(std::move(table_id)) {
   }
 
-  virtual ~TestMaintenanceOp() {}
+  ~TestMaintenanceOp() override = default;
 
-  virtual bool Prepare() OVERRIDE {
+  bool Prepare() override {
     std::lock_guard<Mutex> guard(lock_);
     if (remaining_runs_ == 0) {
       return false;
@@ -121,7 +139,7 @@ class TestMaintenanceOp : public MaintenanceOp {
     return true;
   }
 
-  virtual void Perform() OVERRIDE {
+  void Perform() override {
     {
       std::lock_guard<Mutex> guard(lock_);
       DLOG(INFO) << "Performing op " << name();
@@ -135,7 +153,7 @@ class TestMaintenanceOp : public MaintenanceOp {
     SleepFor(sleep_time_);
   }
 
-  virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE {
+  void UpdateStats(MaintenanceOpStats* stats) override {
     std::lock_guard<Mutex> guard(lock_);
     stats->set_runnable(remaining_runs_ > 0);
     stats->set_ram_anchored(ram_anchored_);
@@ -168,14 +186,18 @@ class TestMaintenanceOp : public MaintenanceOp {
     perf_improvement_ = perf_improvement;
   }
 
-  virtual scoped_refptr<Histogram> DurationHistogram() const OVERRIDE {
+  scoped_refptr<Histogram> DurationHistogram() const override {
     return maintenance_op_duration_;
   }
 
-  virtual scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const OVERRIDE {
+  scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const override {
     return maintenance_ops_running_;
   }
 
+  const std::string& table_id() const override {
+    return table_id_;
+  }
+
  private:
   Mutex lock_;
 
@@ -195,6 +217,7 @@ class TestMaintenanceOp : public MaintenanceOp {
 
   // The amount of time each op invocation will sleep.
   MonoDelta sleep_time_;
+  std::string table_id_;
 };
 
 // Create an op and wait for it to start running.  Unregister it while it is
@@ -266,7 +289,7 @@ TEST_F(MaintenanceManagerTest, TestMemoryPressure) {
 TEST_F(MaintenanceManagerTest, TestLogRetentionPrioritization) {
   const int64_t kMB = 1024 * 1024;
 
-  manager_->Shutdown();
+  StopManager();
 
   TestMaintenanceOp op1("op1", MaintenanceOp::LOW_IO_USAGE);
   op1.set_ram_anchored(0);
@@ -302,13 +325,13 @@ TEST_F(MaintenanceManagerTest, TestLogRetentionPrioritization) {
   FLAGS_log_target_replay_size_mb = 50;
   op_and_why = manager_->FindBestOp();
   ASSERT_EQ(&op3, op_and_why.first);
-  EXPECT_EQ(op_and_why.second, "104857600 bytes log retention");
+  EXPECT_EQ(op_and_why.second, "104857600 bytes log retention, and flush 200 bytes memory");
 
   manager_->UnregisterOp(&op3);
 
   op_and_why = manager_->FindBestOp();
   ASSERT_EQ(&op2, op_and_why.first);
-  EXPECT_EQ(op_and_why.second, "104857600 bytes log retention");
+  EXPECT_EQ(op_and_why.second, "104857600 bytes log retention, and flush 100 bytes memory");
 
   manager_->UnregisterOp(&op2);
 }
@@ -341,10 +364,11 @@ TEST_F(MaintenanceManagerTest, TestRunningInstances) {
   manager_->GetMaintenanceManagerStatusDump(&status_pb);
   ASSERT_EQ(status_pb.running_operations_size(), 0);
 }
+
 // Test adding operations and make sure that the history of recently completed operations
 // is correct in that it wraps around and doesn't grow.
 TEST_F(MaintenanceManagerTest, TestCompletedOpsHistory) {
-  for (int i = 0; i < 5; i++) {
+  for (int i = 0; i < kHistorySize + 1; i++) {
     string name = Substitute("op$0", i);
     TestMaintenanceOp op(name, MaintenanceOp::HIGH_IO_USAGE);
     op.set_perf_improvement(1);
@@ -358,12 +382,123 @@ TEST_F(MaintenanceManagerTest, TestCompletedOpsHistory) {
 
     MaintenanceManagerStatusPB status_pb;
     manager_->GetMaintenanceManagerStatusDump(&status_pb);
-    // The size should be at most the history_size.
-    ASSERT_GE(kHistorySize, status_pb.completed_operations_size());
+    // The size should equal to the current completed OP size,
+    // and should be at most the kHistorySize.
+    ASSERT_EQ(std::min(kHistorySize, i + 1), status_pb.completed_operations_size());
     // The most recently completed op should always be first, even if we wrap
     // around.
     ASSERT_EQ(name, status_pb.completed_operations(0).name());
   }
 }
 
+// Test maintenance OP factors.
+// The OPs on different priority levels have different OP score multipliers.
+TEST_F(MaintenanceManagerTest, TestOpFactors) {
+  StopManager();
+
+  ASSERT_GE(FLAGS_max_priority_range, 1);
+  ASSERT_NE("", gflags::SetCommandLineOption(
+      "maintenance_manager_table_priorities",
+      Substitute("table_id_1:$0;table_id_2:$1;table_id_3:$2;table_id_4:$3;table_id_5:$4",
+                 -FLAGS_max_priority_range - 1, -1, 0, 1, FLAGS_max_priority_range + 1).c_str()));
+  TestMaintenanceOp op1("op1", MaintenanceOp::HIGH_IO_USAGE, "table_id_1");
+  TestMaintenanceOp op2("op2", MaintenanceOp::HIGH_IO_USAGE, "table_id_2");
+  TestMaintenanceOp op3("op3", MaintenanceOp::HIGH_IO_USAGE, "table_id_3");
+  TestMaintenanceOp op4("op4", MaintenanceOp::HIGH_IO_USAGE, "table_id_4");
+  TestMaintenanceOp op5("op5", MaintenanceOp::HIGH_IO_USAGE, "table_id_5");
+  TestMaintenanceOp op6("op6", MaintenanceOp::HIGH_IO_USAGE, "table_id_6");
+
+  manager_->UpdateTablePriorities();
+
+  ASSERT_DOUBLE_EQ(pow(FLAGS_maintenance_op_multiplier, -FLAGS_max_priority_range),
+                   manager_->PerfImprovement(1, op1.table_id()));
+  ASSERT_DOUBLE_EQ(pow(FLAGS_maintenance_op_multiplier, -1),
+                   manager_->PerfImprovement(1, op2.table_id()));
+  ASSERT_DOUBLE_EQ(1, manager_->PerfImprovement(1, op3.table_id()));
+  ASSERT_DOUBLE_EQ(FLAGS_maintenance_op_multiplier, manager_->PerfImprovement(1, op4.table_id()));
+  ASSERT_DOUBLE_EQ(pow(FLAGS_maintenance_op_multiplier, FLAGS_max_priority_range),
+                   manager_->PerfImprovement(1, op5.table_id()));
+  ASSERT_DOUBLE_EQ(1, manager_->PerfImprovement(1, op6.table_id()));
+}
+
+// Test priority OP launching.
+TEST_F(MaintenanceManagerTest, TestPriorityOpLaunch) {
+  StopManager();
+  StartManager(1);
+
+  ASSERT_NE("", gflags::SetCommandLineOption(
+      "maintenance_manager_table_priorities",
+      Substitute("table_id_1:$0;table_id_2:$1;table_id_3:$2;table_id_4:$3;table_id_5:$4",
+                 -FLAGS_max_priority_range - 1, -1, 0, 1, FLAGS_max_priority_range + 1).c_str()));
+
+  TestMaintenanceOp op1("op1", MaintenanceOp::HIGH_IO_USAGE, "table_id_1");
+  op1.set_perf_improvement(10);
+  op1.set_remaining_runs(1);
+  op1.set_sleep_time(MonoDelta::FromMilliseconds(1));
+
+  TestMaintenanceOp op2("op2", MaintenanceOp::HIGH_IO_USAGE, "table_id_2");
+  op2.set_perf_improvement(10);
+  op2.set_remaining_runs(1);
+  op2.set_sleep_time(MonoDelta::FromMilliseconds(1));
+
+  TestMaintenanceOp op3("op3", MaintenanceOp::HIGH_IO_USAGE, "table_id_3");
+  op3.set_perf_improvement(10);
+  op3.set_remaining_runs(1);
+  op3.set_sleep_time(MonoDelta::FromMilliseconds(1));
+
+  TestMaintenanceOp op4("op4", MaintenanceOp::HIGH_IO_USAGE, "table_id_4");
+  op4.set_perf_improvement(10);
+  op4.set_remaining_runs(1);
+  op4.set_sleep_time(MonoDelta::FromMilliseconds(1));
+
+  TestMaintenanceOp op5("op5", MaintenanceOp::HIGH_IO_USAGE, "table_id_5");
+  op5.set_perf_improvement(10);
+  op5.set_remaining_runs(1);
+  op5.set_sleep_time(MonoDelta::FromMilliseconds(1));
+
+  TestMaintenanceOp op6("op6", MaintenanceOp::HIGH_IO_USAGE, "table_id_6");
+  op6.set_perf_improvement(12);
+  op6.set_remaining_runs(1);
+  op6.set_sleep_time(MonoDelta::FromMilliseconds(1));
+
+  manager_->RegisterOp(&op1);
+  manager_->RegisterOp(&op2);
+  manager_->RegisterOp(&op3);
+  manager_->RegisterOp(&op4);
+  manager_->RegisterOp(&op5);
+  manager_->RegisterOp(&op6);
+
+  ASSERT_EVENTUALLY([&]() {
+    MaintenanceManagerStatusPB status_pb;
+    manager_->GetMaintenanceManagerStatusDump(&status_pb);
+    ASSERT_EQ(status_pb.completed_operations_size(), 6);
+  });
+
+  // Wait for instances to complete.
+  manager_->UnregisterOp(&op1);
+  manager_->UnregisterOp(&op2);
+  manager_->UnregisterOp(&op3);
+  manager_->UnregisterOp(&op4);
+  manager_->UnregisterOp(&op5);
+  manager_->UnregisterOp(&op6);
+
+  // Check that running instances are removed from collection after completion.
+  MaintenanceManagerStatusPB status_pb;
+  manager_->GetMaintenanceManagerStatusDump(&status_pb);
+  ASSERT_EQ(status_pb.running_operations_size(), 0);
+  ASSERT_EQ(status_pb.completed_operations_size(), 6);
+  // In perf_improvement score ascending order, the latter completed OP will list former.
+  list<string> ordered_ops({"op1",
+                            "op2",
+                            "op3",
+                            "op4",
+                            "op6",
+                            "op5"});
+  ASSERT_EQ(ordered_ops.size(), status_pb.completed_operations().size());
+  for (const auto& instance : status_pb.completed_operations()) {
+    ASSERT_EQ(ordered_ops.front(), instance.name());
+    ordered_ops.pop_front();
+  }
+}
+
 } // namespace kudu
diff --git a/src/kudu/util/maintenance_manager.cc b/src/kudu/util/maintenance_manager.cc
index ea607aa..f783717 100644
--- a/src/kudu/util/maintenance_manager.cc
+++ b/src/kudu/util/maintenance_manager.cc
@@ -17,8 +17,9 @@
 
 #include "kudu/util/maintenance_manager.h"
 
+#include <algorithm>
 #include <cinttypes>
-#include <cstddef>
+#include <cmath>
 #include <cstdint>
 #include <memory>
 #include <mutex>
@@ -26,6 +27,7 @@
 #include <string>
 #include <type_traits>
 #include <utility>
+#include <vector>
 
 #include <boost/bind.hpp>
 #include <gflags/gflags.h>
@@ -33,6 +35,8 @@
 #include "kudu/gutil/dynamic_annotations.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/numbers.h"
+#include "kudu/gutil/strings/split.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/debug/trace_event.h"
 #include "kudu/util/debug/trace_logging.h"
@@ -50,6 +54,8 @@
 
 using std::pair;
 using std::string;
+using std::vector;
+using strings::Split;
 using strings::Substitute;
 
 DEFINE_int32(maintenance_manager_num_threads, 1,
@@ -91,6 +97,29 @@ DEFINE_double(data_gc_prioritization_prob, 0.5,
              "such as delta compaction.");
 TAG_FLAG(data_gc_prioritization_prob, experimental);
 
+DEFINE_string(maintenance_manager_table_priorities, "",
+              "Priorities of tables, semicolon-separated list of table-priority pairs, and each "
+              "table-priority pair is combined by table id, colon and priority level. Priority "
+              "level is ranged in [-FLAGS_max_priority_range, FLAGS_max_priority_range]");
+TAG_FLAG(maintenance_manager_table_priorities, advanced);
+TAG_FLAG(maintenance_manager_table_priorities, experimental);
+TAG_FLAG(maintenance_manager_table_priorities, runtime);
+
+DEFINE_double(maintenance_op_multiplier, 1.1,
+              "Multiplier applied on different priority levels, table maintenance OPs on level N "
+              "has multiplier of FLAGS_maintenance_op_multiplier^N, the last score will be "
+              "multiplied by this multiplier. Note: this multiplier is only take effect on "
+              "compaction OPs");
+TAG_FLAG(maintenance_op_multiplier, advanced);
+TAG_FLAG(maintenance_op_multiplier, experimental);
+TAG_FLAG(maintenance_op_multiplier, runtime);
+
+DEFINE_int32(max_priority_range, 5,
+             "Maximal priority range of OPs.");
+TAG_FLAG(max_priority_range, advanced);
+TAG_FLAG(max_priority_range, experimental);
+TAG_FLAG(max_priority_range, runtime);
+
 namespace kudu {
 
 MaintenanceOpStats::MaintenanceOpStats() {
@@ -107,7 +136,7 @@ void MaintenanceOpStats::Clear() {
   last_modified_ = MonoTime();
 }
 
-MaintenanceOp::MaintenanceOp(std::string name, IOUsage io_usage)
+MaintenanceOp::MaintenanceOp(string name, IOUsage io_usage)
     : name_(std::move(name)),
       running_(0),
       cancel_(false),
@@ -129,10 +158,10 @@ MaintenanceManagerStatusPB_OpInstancePB OpInstance::DumpToPB() const
{
   pb.set_thread_id(thread_id);
   pb.set_name(name);
   if (duration.Initialized()) {
-    pb.set_duration_millis(duration.ToMilliseconds());
+    pb.set_duration_millis(static_cast<int32_t>(duration.ToMilliseconds()));
   }
   MonoDelta delta(MonoTime::Now() - start_mono_time);
-  pb.set_millis_since_start(delta.ToMilliseconds());
+  pb.set_millis_since_start(static_cast<int32_t>(delta.ToMilliseconds()));
   return pb;
 }
 
@@ -143,7 +172,7 @@ const MaintenanceManager::Options MaintenanceManager::kDefaultOptions
= {
 };
 
 MaintenanceManager::MaintenanceManager(const Options& options,
-                                       std::string server_uuid)
+                                       string server_uuid)
   : server_uuid_(std::move(server_uuid)),
     num_threads_(options.num_threads <= 0 ?
                  FLAGS_maintenance_manager_num_threads : options.num_threads),
@@ -264,8 +293,7 @@ void MaintenanceManager::RunSchedulerThread() {
     //    1) there are no free threads available to perform a maintenance op.
     // or 2) we just tried to schedule an op but found nothing to run.
     // However, if it's time to shut down, we want to do so immediately.
-    while ((running_ops_ >= num_threads_ || prev_iter_found_no_work || disabled_for_tests())
&&
-           !shutdown_) {
+    while (CouldNotLaunchNewOp(prev_iter_found_no_work)) {
       cond_.WaitFor(polling_interval);
       prev_iter_found_no_work = false;
     }
@@ -274,42 +302,46 @@ void MaintenanceManager::RunSchedulerThread() {
       return;
     }
 
-    // Find the best op.
-    auto best_op_and_why = FindBestOp();
-    auto* op = best_op_and_why.first;
-    const auto& note = best_op_and_why.second;
+    // TODO(yingchun): move it to SetFlag, callback once as a gflags setter handler.
+    UpdateTablePriorities();
 
     // If we found no work to do, then we should sleep before trying again to schedule.
     // Otherwise, we can go right into trying to find the next op.
-    prev_iter_found_no_work = (op == nullptr);
-    if (!op) {
-      VLOG_AND_TRACE_WITH_PREFIX("maintenance", 2)
-          << "No maintenance operations look worth doing.";
-      continue;
-    }
+    prev_iter_found_no_work = !FindAndLaunchOp(&guard);
+  }
+}
 
-    // Prepare the maintenance operation.
-    op->running_++;
-    running_ops_++;
-    guard.unlock();
-    bool ready = op->Prepare();
-    guard.lock();
-    if (!ready) {
-      LOG_WITH_PREFIX(INFO) << "Prepare failed for " << op->name()
-                            << ". Re-running scheduler.";
-      op->running_--;
-      running_ops_--;
-      op->cond_->Signal();
-      continue;
-    }
+// Picks the best op via FindBestOp() and, if there is one, prepares it and
+// submits it to the thread pool.
+//
+// 'guard' is unlocked around the call to op->Prepare() and re-locked
+// afterwards.
+//
+// Returns false only when FindBestOp() yields no op. Returns true both when
+// the op was submitted and when Prepare() failed; RunSchedulerThread() stores
+// the negation in prev_iter_found_no_work.
+bool MaintenanceManager::FindAndLaunchOp(std::unique_lock<Mutex>* guard) {
+  // Find the best op.
+  auto best_op_and_why = FindBestOp();
+  auto* op = best_op_and_why.first;
+  const auto& note = best_op_and_why.second;
 
-    LOG_AND_TRACE_WITH_PREFIX("maintenance", INFO)
-        << Substitute("Scheduling $0: $1", op->name(), note);
-    // Run the maintenance operation.
-    Status s = thread_pool_->SubmitFunc(boost::bind(
-        &MaintenanceManager::LaunchOp, this, op));
-    CHECK(s.ok());
+  if (!op) {
+    VLOG_AND_TRACE_WITH_PREFIX("maintenance", 2)
+        << "No maintenance operations look worth doing.";
+    return false;
   }
+
+  // Prepare the maintenance operation.
+  // The running counters are bumped before the lock is dropped for Prepare().
+  IncreaseOpCount(op);
+  guard->unlock();
+  bool ready = op->Prepare();
+  guard->lock();
+  if (!ready) {
+    LOG_WITH_PREFIX(INFO) << "Prepare failed for " << op->name()
+                          << ". Re-running scheduler.";
+    // Undo the bookkeeping done above and signal the op's condition variable.
+    DecreaseOpCount(op);
+    op->cond_->Signal();
+    return true;
+  }
+
+  LOG_AND_TRACE_WITH_PREFIX("maintenance", INFO)
+      << Substitute("Scheduling $0: $1", op->name(), note);
+  // Run the maintenance operation.
+  CHECK_OK(thread_pool_->SubmitFunc(boost::bind(&MaintenanceManager::LaunchOp, this,
op)));
+
+  return true;
 }
 
 // Finding the best operation goes through four filters:
@@ -335,8 +367,7 @@ void MaintenanceManager::RunSchedulerThread() {
 pair<MaintenanceOp*, string> MaintenanceManager::FindBestOp() {
   TRACE_EVENT0("maintenance", "MaintenanceManager::FindBestOp");
 
-  size_t free_threads = num_threads_ - running_ops_;
-  if (free_threads == 0) {
+  if (!HasFreeThreads()) {
     return {nullptr, "no free threads"};
   }
 
@@ -348,7 +379,7 @@ pair<MaintenanceOp*, string> MaintenanceManager::FindBestOp() {
 
   int64_t most_logs_retained_bytes = 0;
   int64_t most_logs_retained_bytes_ram_anchored = 0;
-  MaintenanceOp* most_logs_retained_bytes_op = nullptr;
+  MaintenanceOp* most_logs_retained_bytes_ram_anchored_op = nullptr;
 
   int64_t most_data_retained_bytes = 0;
   MaintenanceOp* most_data_retained_bytes_op = nullptr;
@@ -367,8 +398,8 @@ pair<MaintenanceOp*, string> MaintenanceManager::FindBestOp() {
     }
 
     const auto logs_retained_bytes = stats.logs_retained_bytes();
-    if (logs_retained_bytes > low_io_most_logs_retained_bytes &&
-        op->io_usage() == MaintenanceOp::LOW_IO_USAGE) {
+    if (op->io_usage() == MaintenanceOp::LOW_IO_USAGE &&
+        logs_retained_bytes > low_io_most_logs_retained_bytes) {
       low_io_most_logs_retained_bytes_op = op;
       low_io_most_logs_retained_bytes = logs_retained_bytes;
       VLOG_AND_TRACE_WITH_PREFIX("maintenance", 2)
@@ -387,7 +418,7 @@ pair<MaintenanceOp*, string> MaintenanceManager::FindBestOp() {
     if (std::make_pair(logs_retained_bytes, ram_anchored) >
         std::make_pair(most_logs_retained_bytes,
                        most_logs_retained_bytes_ram_anchored)) {
-      most_logs_retained_bytes_op = op;
+      most_logs_retained_bytes_ram_anchored_op = op;
       most_logs_retained_bytes = logs_retained_bytes;
       most_logs_retained_bytes_ram_anchored = ram_anchored;
     }
@@ -401,7 +432,7 @@ pair<MaintenanceOp*, string> MaintenanceManager::FindBestOp() {
                         op->name(), data_retained_bytes);
     }
 
-    const auto perf_improvement = stats.perf_improvement();
+    const auto perf_improvement = PerfImprovement(stats.perf_improvement(), op->table_id());
     if ((!best_perf_improvement_op) ||
         (perf_improvement > best_perf_improvement)) {
       best_perf_improvement_op = op;
@@ -420,7 +451,7 @@ pair<MaintenanceOp*, string> MaintenanceManager::FindBestOp() {
   double capacity_pct;
   if (memory_pressure_func_(&capacity_pct)) {
     if (!most_ram_anchored_op) {
-      std::string msg = StringPrintf("System under memory pressure "
+      string msg = StringPrintf("System under memory pressure "
           "(%.2f%% of limit used). However, there are no ops currently "
           "runnable which would free memory.", capacity_pct);
       KLOG_EVERY_N_SECS(WARNING, 5) << msg;
@@ -432,10 +463,13 @@ pair<MaintenanceOp*, string> MaintenanceManager::FindBestOp()
{
     return {most_ram_anchored_op, std::move(note)};
   }
 
-  if (most_logs_retained_bytes_op &&
+  // Look at ops that free up more log retention, and also free up more memory.
+  if (most_logs_retained_bytes_ram_anchored_op &&
       most_logs_retained_bytes / 1024 / 1024 >= FLAGS_log_target_replay_size_mb) {
-    string note = Substitute("$0 bytes log retention", most_logs_retained_bytes);
-    return {most_logs_retained_bytes_op, std::move(note)};
+    string note = Substitute("$0 bytes log retention, and flush $1 bytes memory",
+                             most_logs_retained_bytes,
+                             most_logs_retained_bytes_ram_anchored);
+    return {most_logs_retained_bytes_ram_anchored_op, std::move(note)};
   }
 
   // Look at ops that we can run quickly that free up data on disk.
@@ -449,6 +483,7 @@ pair<MaintenanceOp*, string> MaintenanceManager::FindBestOp() {
     VLOG(1) << "Skipping data GC due to prioritizing perf improvement";
   }
 
+  // Look at ops that can improve read/write performance most.
   if (best_perf_improvement_op && best_perf_improvement > 0) {
     string note = StringPrintf("perf score=%.6f", best_perf_improvement);
     return {best_perf_improvement_op, std::move(note)};
@@ -456,6 +491,16 @@ pair<MaintenanceOp*, string> MaintenanceManager::FindBestOp() {
   return {nullptr, "no ops with positive improvement"};
 }
 
+// Scales an op's raw perf-improvement score by the priority configured for
+// its table: score * FLAGS_maintenance_op_multiplier^priority. Tables with no
+// entry in table_priorities_ keep the raw score unchanged.
+double MaintenanceManager::PerfImprovement(double perf_improvement,
+                                           const string& table_id) const {
+  int32_t level = 0;
+  const bool has_priority = FindCopy(table_priorities_, table_id, &level);
+  if (has_priority) {
+    const double weight = std::pow(FLAGS_maintenance_op_multiplier, level);
+    return perf_improvement * weight;
+  }
+  return perf_improvement;
+}
+
 void MaintenanceManager::LaunchOp(MaintenanceOp* op) {
   int64_t thread_id = Thread::CurrentThreadId();
   OpInstance op_instance;
@@ -481,9 +526,7 @@ void MaintenanceManager::LaunchOp(MaintenanceOp* op) {
     completed_ops_count_++;
 
     op->DurationHistogram()->Increment(op_instance.duration.ToMilliseconds());
-
-    running_ops_--;
-    op->running_--;
+    DecreaseOpCount(op);
     op->cond_->Signal();
     cond_.Signal(); // Wake up scheduler.
   });
@@ -507,9 +550,6 @@ void MaintenanceManager::LaunchOp(MaintenanceOp* op) {
 void MaintenanceManager::GetMaintenanceManagerStatusDump(MaintenanceManagerStatusPB* out_pb)
{
   DCHECK(out_pb != nullptr);
   std::lock_guard<Mutex> guard(lock_);
-  auto best_op_and_why = FindBestOp();
-  auto* best_op = best_op_and_why.first;
-
   for (const auto& val : ops_) {
     MaintenanceManagerStatusPB_MaintenanceOpPB* op_pb = out_pb->add_registered_operations();
     MaintenanceOp* op(val.first);
@@ -527,10 +567,6 @@ void MaintenanceManager::GetMaintenanceManagerStatusDump(MaintenanceManagerStatu
       op_pb->set_logs_retained_bytes(0);
       op_pb->set_perf_improvement(0.0);
     }
-
-    if (best_op == op) {
-      out_pb->mutable_best_op()->CopyFrom(*op_pb);
-    }
   }
 
   {
@@ -540,8 +576,9 @@ void MaintenanceManager::GetMaintenanceManagerStatusDump(MaintenanceManagerStatu
     }
   }
 
+  // The latest completed op will be dumped at first.
   for (int n = 1; n <= completed_ops_.size(); n++) {
-    int i = completed_ops_count_ - n;
+    int64_t i = completed_ops_count_ - n;
     if (i < 0) break;
     const auto& completed_op = completed_ops_[i % completed_ops_.size()];
 
@@ -551,8 +588,48 @@ void MaintenanceManager::GetMaintenanceManagerStatusDump(MaintenanceManagerStatu
   }
 }
 
-std::string MaintenanceManager::LogPrefix() const {
+string MaintenanceManager::LogPrefix() const {
   return Substitute("P $0: ", server_uuid_);
 }
 
+// Returns true if fewer ops are running than there are maintenance threads.
+bool MaintenanceManager::HasFreeThreads() {
+  const int32_t free_threads = num_threads_ - running_ops_;
+  return free_threads > 0;
+}
+
+// Returns true while the scheduler should keep waiting: not shutting down,
+// and either no thread is free, the previous iteration found no work, or the
+// manager is disabled for tests.
+bool MaintenanceManager::CouldNotLaunchNewOp(bool prev_iter_found_no_work) {
+  const bool blocked =
+      !HasFreeThreads() || prev_iter_found_no_work || disabled_for_tests();
+  return blocked && !shutdown_;
+}
+
+// Rebuilds table_priorities_ from the current value of
+// --maintenance_manager_table_priorities.
+//
+// The flag is a semicolon-separated list of "<table id>:<priority>" pairs.
+// Each priority is clamped to
+// [-FLAGS_max_priority_range, FLAGS_max_priority_range].
+//
+// The update is all-or-nothing: if any entry is malformed (not exactly two
+// colon-separated fields, or a priority that is not a base-10 int32), a
+// warning is logged and table_priorities_ is left untouched.
+void MaintenanceManager::UpdateTablePriorities() {
+  string table_priorities_str;
+  CHECK(google::GetCommandLineOption("maintenance_manager_table_priorities",
+                                     &table_priorities_str));
+
+  TablePriorities table_priorities;
+  for (const auto& table_priority_str :
+       Split(table_priorities_str, ";", strings::SkipEmpty())) {
+    vector<string> table_priority = Split(table_priority_str, ":", strings::SkipEmpty());
+    int32_t value = 0;
+    // Check the field count before indexing: an entry such as "abc" or "abc:"
+    // splits into fewer than two fields and must not be dereferenced.
+    if (table_priority.size() != 2 ||
+        !safe_strto32_base(table_priority[1].c_str(), &value, 10)) {
+      LOG(WARNING) << "Some error occured when parse flag maintenance_manager_table_priorities: "
+          << table_priorities_str;
+      return;
+    }
+    value = std::max(value, -FLAGS_max_priority_range);
+    value = std::min(value, FLAGS_max_priority_range);
+    table_priorities[table_priority[0]] = value;
+  }
+  table_priorities_.swap(table_priorities);
+}
+
+// Records one more running instance, both on 'op' and in the manager-wide
+// running_ops_ counter.
+void MaintenanceManager::IncreaseOpCount(MaintenanceOp *op) {
+  op->running_ += 1;
+  running_ops_ += 1;
+}
+
+// Records one fewer running instance, both on 'op' and in the manager-wide
+// running_ops_ counter.
+void MaintenanceManager::DecreaseOpCount(MaintenanceOp *op) {
+  op->running_ -= 1;
+  running_ops_ -= 1;
+}
+
 } // namespace kudu
diff --git a/src/kudu/util/maintenance_manager.h b/src/kudu/util/maintenance_manager.h
index c5dd8ac..a4e6b96 100644
--- a/src/kudu/util/maintenance_manager.h
+++ b/src/kudu/util/maintenance_manager.h
@@ -229,6 +229,9 @@ class MaintenanceOp {
     cancel_.Store(true);
   }
 
+ protected:
+  virtual const std::string& table_id() const = 0;
+
  private:
   DISALLOW_COPY_AND_ASSIGN(MaintenanceOp);
 
@@ -302,8 +305,11 @@ class MaintenanceManager : public std::enable_shared_from_this<MaintenanceManage
 
  private:
   FRIEND_TEST(MaintenanceManagerTest, TestLogRetentionPrioritization);
+  FRIEND_TEST(MaintenanceManagerTest, TestOpFactors);
+
   typedef std::map<MaintenanceOp*, MaintenanceOpStats,
           MaintenanceOpComparator> OpMapTy;
+  typedef std::unordered_map<std::string, int32_t> TablePriorities;
 
   // Return true if tests have currently disabled the maintenance
   // manager by way of changing the gflags at runtime.
@@ -311,17 +317,32 @@ class MaintenanceManager : public std::enable_shared_from_this<MaintenanceManage
 
   void RunSchedulerThread();
 
+  bool FindAndLaunchOp(std::unique_lock<Mutex>* guard);
+
   // Find the best op, or null if there is nothing we want to run.
   //
   // Returns the op, as well as a string explanation of why that op was chosen,
   // suitable for logging.
   std::pair<MaintenanceOp*, std::string> FindBestOp();
 
+  double PerfImprovement(double perf_improvement,
+                         const std::string& table_id) const;
+
   void LaunchOp(MaintenanceOp* op);
 
   std::string LogPrefix() const;
 
+  bool HasFreeThreads();
+
+  bool CouldNotLaunchNewOp(bool prev_iter_found_no_work);
+
+  void UpdateTablePriorities();
+
+  void IncreaseOpCount(MaintenanceOp *op);
+  void DecreaseOpCount(MaintenanceOp *op);
+
   const std::string server_uuid_;
+  TablePriorities table_priorities_;
   const int32_t num_threads_;
   OpMapTy ops_; // Registered operations.
   Mutex lock_;
@@ -330,7 +351,7 @@ class MaintenanceManager : public std::enable_shared_from_this<MaintenanceManage
   ConditionVariable cond_;
   bool shutdown_;
   int32_t polling_interval_ms_;
-  uint64_t running_ops_;
+  int32_t running_ops_;
   // Vector used as a circular buffer for recently completed ops. Elements need to be added
at
   // the completed_ops_count_ % the vector's size and then the count needs to be incremented.
   std::vector<OpInstance> completed_ops_;
diff --git a/src/kudu/util/maintenance_manager.proto b/src/kudu/util/maintenance_manager.proto
index b6b1203..77c8cf8 100644
--- a/src/kudu/util/maintenance_manager.proto
+++ b/src/kudu/util/maintenance_manager.proto
@@ -26,7 +26,7 @@ message MaintenanceManagerStatusPB {
     // Number of times this operation is currently running.
     required uint32 running = 2;
     required bool runnable = 3;
-    required uint64 ram_anchored_bytes = 4;
+    required int64 ram_anchored_bytes = 4;
     required int64 logs_retained_bytes = 5;
     required double perf_improvement = 6;
   }
@@ -40,15 +40,12 @@ message MaintenanceManagerStatusPB {
     required int32 millis_since_start = 4;
   }
 
-  // The next operation that would run.
-  optional MaintenanceOpPB best_op = 1;
-
   // List of all the operations.
-  repeated MaintenanceOpPB registered_operations = 2;
+  repeated MaintenanceOpPB registered_operations = 1;
 
   // This list isn't in order of anything. Can contain the same operation multiple times.
-  repeated OpInstancePB running_operations = 3;
+  repeated OpInstancePB running_operations = 2;
 
   // This list isn't in order of anything. Can contain the same operation multiple times.
-  repeated OpInstancePB completed_operations = 4;
+  repeated OpInstancePB completed_operations = 3;
 }


Mime
View raw message