rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wenf...@apache.org
Subject [rocketmq-client-go] branch native updated: [ISSUE #356] feat(consumer): redesign stat (#357)
Date Mon, 06 Jan 2020 02:09:49 GMT
This is an automated email from the ASF dual-hosted git repository.

wenfeng pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 2ace48a  [ISSUE #356] feat(consumer): redesign stat (#357)
2ace48a is described below

commit 2ace48afd7c054fa6231e28407f7e40f76c84ad6
Author: xujianhai666 <52450794+xujianhai666@users.noreply.github.com>
AuthorDate: Mon Jan 6 10:09:43 2020 +0800

    [ISSUE #356] feat(consumer): redesign stat (#357)
    
    * feat(consumer): redesign stat
---
 consumer/statistics.go | 89 ++++++++++++++++++++++++++++++++++++--------------
 1 file changed, 65 insertions(+), 24 deletions(-)

diff --git a/consumer/statistics.go b/consumer/statistics.go
index 2448c74..a58ff9e 100644
--- a/consumer/statistics.go
+++ b/consumer/statistics.go
@@ -29,6 +29,7 @@ import (
 
 var (
 	csListLock sync.Mutex
+	closeOnce  sync.Once
 
 	topicAndGroupConsumeOKTPS     *statsItemSet
 	topicAndGroupConsumeRT        *statsItemSet
@@ -98,11 +99,13 @@ func GetConsumeStatus(group, topic string) ConsumeStatus {
 }
 
 func ShutDownStatis() {
-	topicAndGroupConsumeOKTPS.closed = true
-	topicAndGroupConsumeRT.closed = true
-	topicAndGroupConsumeFailedTPS.closed = true
-	topicAndGroupPullTPS.closed = true
-	topicAndGroupPullRT.closed = true
+	closeOnce.Do(func() {
+		close(topicAndGroupConsumeOKTPS.closed)
+		close(topicAndGroupConsumeRT.closed)
+		close(topicAndGroupConsumeFailedTPS.closed)
+		close(topicAndGroupPullTPS.closed)
+		close(topicAndGroupPullRT.closed)
+	})
 }
 
 func getPullRT(group, topic string) statsSnapshot {
@@ -132,12 +135,13 @@ func getConsumeFailedTPS(group, topic string) statsSnapshot {
 type statsItemSet struct {
 	statsName      string
 	statsItemTable sync.Map
-	closed         bool
+	closed         chan struct{}
 }
 
 func newStatsItemSet(statsName string) *statsItemSet {
 	sis := &statsItemSet{
 		statsName: statsName,
+		closed:    make(chan struct{}),
 	}
 	sis.init()
 	return sis
@@ -145,47 +149,84 @@ func newStatsItemSet(statsName string) *statsItemSet {
 
 func (sis *statsItemSet) init() {
 	go func() {
-		for !sis.closed {
-			sis.samplingInSeconds()
-			time.Sleep(10 * time.Second)
+		ticker := time.NewTicker(10 * time.Second)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-sis.closed:
+				return
+			case <-ticker.C:
+				sis.samplingInSeconds()
+
+			}
 		}
 	}()
 
 	go func() {
-		for !sis.closed {
-			sis.samplingInMinutes()
-			time.Sleep(10 * time.Minute)
+		ticker := time.NewTicker(10 * time.Minute)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-sis.closed:
+				return
+			case <-ticker.C:
+				sis.samplingInMinutes()
+			}
 		}
 	}()
 
 	go func() {
-		for !sis.closed {
-			sis.samplingInHour()
-			time.Sleep(time.Hour)
+		ticker := time.NewTicker(time.Hour)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-sis.closed:
+				return
+			case <-ticker.C:
+				sis.samplingInHour()
+			}
 		}
 	}()
 
 	go func() {
 		time.Sleep(nextMinutesTime().Sub(time.Now()))
-		for !sis.closed {
-			sis.printAtMinutes()
-			time.Sleep(time.Minute)
+		ticker := time.NewTicker(time.Minute)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-sis.closed:
+				return
+			case <-ticker.C:
+				sis.printAtMinutes()
+			}
 		}
 	}()
 
 	go func() {
 		time.Sleep(nextHourTime().Sub(time.Now()))
-		for !sis.closed {
-			sis.printAtHour()
-			time.Sleep(time.Hour)
+		ticker := time.NewTicker(time.Hour)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-sis.closed:
+				return
+			case <-ticker.C:
+				sis.printAtHour()
+			}
 		}
 	}()
 
 	go func() {
 		time.Sleep(nextMonthTime().Sub(time.Now()))
-		for !sis.closed {
-			sis.printAtDay()
-			time.Sleep(24 * time.Hour)
+		ticker := time.NewTicker(24 * time.Hour)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-sis.closed:
+				return
+			case <-ticker.C:
+				sis.printAtDay()
+			}
 		}
 	}()
 }


Mime
View raw message