rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ding...@apache.org
Subject [rocketmq-client-go] branch native updated: fix: using atomic status for producer/consumer (#320)
Date Thu, 05 Dec 2019 08:20:17 GMT
This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new c6b08aa  fix: using atomic status for producer/consumer (#320)
c6b08aa is described below

commit c6b08aafd5f8fe0b2d54b3377c5cc70b7bdf8d43
Author: xujianhai666 <52450794+xujianhai666@users.noreply.github.com>
AuthorDate: Thu Dec 5 16:20:06 2019 +0800

    fix: using atomic status for producer/consumer (#320)
    
    - using atomic for status
    
    Closes #318
---
 consumer/consumer.go      | 10 ++++++----
 consumer/pull_consumer.go |  7 ++++---
 consumer/push_consumer.go |  8 ++++----
 internal/model.go         |  2 +-
 producer/producer.go      |  9 +++++----
 5 files changed, 20 insertions(+), 16 deletions(-)

diff --git a/consumer/consumer.go b/consumer/consumer.go
index 070cd19..a10d3c5 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -25,6 +25,7 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/pkg/errors"
@@ -247,7 +248,7 @@ type defaultConsumer struct {
 	cType     ConsumeType
 	client    internal.RMQClient
 	mqChanged func(topic string, mqAll, mqDivided []*primitive.MessageQueue)
-	state     internal.ServiceState
+	state     int32
 	pause     bool
 	once      sync.Once
 	option    consumerOptions
@@ -287,13 +288,14 @@ func (dc *defaultConsumer) start() error {
 
 	dc.client.UpdateTopicRouteInfo()
 	dc.client.Start()
-	dc.state = internal.StateRunning
+	atomic.StoreInt32(&dc.state, int32(internal.StateRunning))
 	dc.consumerStartTimestamp = time.Now().UnixNano() / int64(time.Millisecond)
 	return nil
 }
 
 func (dc *defaultConsumer) shutdown() error {
-	dc.state = internal.StateShutdown
+	atomic.StoreInt32(&dc.state, int32(internal.StateShutdown))
+
 	mqs := make([]*primitive.MessageQueue, 0)
 	dc.processQueueTable.Range(func(key, value interface{}) bool {
 		k := key.(primitive.MessageQueue)
@@ -435,7 +437,7 @@ func (dc *defaultConsumer) SubscriptionDataList() []*internal.SubscriptionData
{
 }
 
 func (dc *defaultConsumer) makeSureStateOK() error {
-	if dc.state != internal.StateRunning {
+	if atomic.LoadInt32(&dc.state) != int32(internal.StateRunning) {
 		return fmt.Errorf("state not running, actually: %v", dc.state)
 	}
 	return nil
diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
index 6a6fa64..f7c8ef4 100644
--- a/consumer/pull_consumer.go
+++ b/consumer/pull_consumer.go
@@ -82,9 +82,10 @@ func NewPullConsumer(options ...Option) (*defaultPullConsumer, error) {
 	}
 
 	dc := &defaultConsumer{
+		client:        internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil),
 		consumerGroup: defaultOpts.GroupName,
 		cType:         _PullConsume,
-		state:         internal.StateCreateJust,
+		state:         int32(internal.StateCreateJust),
 		prCh:          make(chan PullRequest, 4),
 		model:         defaultOpts.ConsumerModel,
 		option:        defaultOpts,
@@ -99,7 +100,7 @@ func NewPullConsumer(options ...Option) (*defaultPullConsumer, error) {
 }
 
 func (c *defaultPullConsumer) Start() error {
-	c.state = internal.StateRunning
+	atomic.StoreInt32(&c.state, int32(internal.StateRunning))
 
 	var err error
 	c.once.Do(func() {
@@ -208,7 +209,7 @@ func (c *defaultPullConsumer) pull(ctx context.Context, mq *primitive.MessageQue
 }
 
 func (c *defaultPullConsumer) makeSureStateOK() error {
-	if c.state != internal.StateRunning {
+	if atomic.LoadInt32(&c.state) != int32(internal.StateRunning) {
 		return fmt.Errorf("the consumer state is [%d], not running", c.state)
 	}
 	return nil
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 98b0033..c915a32 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -23,6 +23,7 @@ import (
 	"math"
 	"strconv"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/pkg/errors"
@@ -92,7 +93,7 @@ func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
 		client:         internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil),
 		consumerGroup:  defaultOpts.GroupName,
 		cType:          _PushConsume,
-		state:          internal.StateCreateJust,
+		state:          int32(internal.StateCreateJust),
 		prCh:           make(chan PullRequest, 4),
 		model:          defaultOpts.ConsumerModel,
 		consumeOrderly: defaultOpts.ConsumeOrderly,
@@ -130,12 +131,11 @@ func (pc *pushConsumer) Start() error {
 			"messageModel":           pc.model,
 			"unitMode":               pc.unitMode,
 		})
-		pc.state = internal.StateStartFailed
+		atomic.StoreInt32(&pc.state, int32(internal.StateStartFailed))
 		pc.validate()
 
 		err = pc.client.RegisterConsumer(pc.consumerGroup, pc)
 		if err != nil {
-			pc.state = internal.StateStartFailed
 			rlog.Error("the consumer group has been created, specify another one", map[string]interface{}{
 				rlog.LogKeyConsumerGroup: pc.consumerGroup,
 			})
@@ -220,7 +220,7 @@ func (pc *pushConsumer) Shutdown() error {
 
 func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector,
 	f func(context.Context, ...*primitive.MessageExt) (ConsumeResult, error)) error {
-	if pc.state != internal.StateCreateJust {
+	if atomic.LoadInt32(&pc.state) != int32(internal.StateCreateJust) {
 		return errors.New("subscribe topic only started before")
 	}
 	if pc.option.Namespace != "" {
diff --git a/internal/model.go b/internal/model.go
index ea1fdac..61ecbbc 100644
--- a/internal/model.go
+++ b/internal/model.go
@@ -39,7 +39,7 @@ type (
 	// groupName of consumer
 	consumeType string
 
-	ServiceState int
+	ServiceState int32
 )
 
 const (
diff --git a/producer/producer.go b/producer/producer.go
index 78cd8d9..287df58 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -22,6 +22,7 @@ import (
 	"fmt"
 	"strconv"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/pkg/errors"
@@ -42,7 +43,7 @@ var (
 type defaultProducer struct {
 	group       string
 	client      internal.RMQClient
-	state       internal.ServiceState
+	state       int32
 	options     producerOptions
 	publishInfo sync.Map
 	callbackCh  chan interface{}
@@ -77,20 +78,20 @@ func NewDefaultProducer(opts ...Option) (*defaultProducer, error) {
 }
 
 func (p *defaultProducer) Start() error {
-	p.state = internal.StateRunning
+	atomic.StoreInt32(&p.state, int32(internal.StateRunning))
 	p.client.RegisterProducer(p.group, p)
 	p.client.Start()
 	return nil
 }
 
 func (p *defaultProducer) Shutdown() error {
-	p.state = internal.StateShutdown
+	atomic.StoreInt32(&p.state, int32(internal.StateShutdown))
 	p.client.Shutdown()
 	return nil
 }
 
 func (p *defaultProducer) checkMsg(msg *primitive.Message) error {
-	if p.state != internal.StateRunning {
+	if atomic.LoadInt32(&p.state) != int32(internal.StateRunning) {
 		return ErrNotRunning
 	}
 


Mime
View raw message