rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From huzongt...@apache.org
Subject [rocketmq-client-go] branch native updated: [ISSUE #95] Support multiple NameServe (#96)
Date Mon, 08 Jul 2019 06:19:32 GMT
This is an automated email from the ASF dual-hosted git repository.

huzongtang pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 884475f  [ISSUE #95] Support multiple NameServe (#96)
884475f is described below

commit 884475fd06b71942f8463074cdb2cdb8a94a5542
Author: xujianhai666 <52450794+xujianhai666@users.noreply.github.com>
AuthorDate: Mon Jul 8 14:19:28 2019 +0800

    [ISSUE #95] Support multiple NameServe (#96)
    
    * fix message properties. fix #65
    
    * simplify commit
    
    * simplify commit
    
    * Support multiple NameServer. fix #95
    
    * add retry on queryTopicRouteInfoFromServer
    
    * move pkg to kernel
---
 examples/consumer/interceptor/main.go |   2 +-
 examples/consumer/simple/main.go      |   2 +-
 examples/producer/interceptor/main.go |   2 +-
 examples/producer/simple/main.go      |   2 +-
 go.mod                                |   1 +
 go.sum                                |   3 +
 internal/consumer/push_consumer.go    |  19 ++----
 internal/kernel/namesrv.go            | 107 ++++++++++++++++++++++++++++++++++
 internal/kernel/namesrv_test.go       |  63 ++++++++++++++++++++
 internal/kernel/route.go              |  24 ++++++--
 internal/producer/producer.go         |  21 ++-----
 primitive/options.go                  |   6 +-
 utils/errors.go                       |   4 +-
 utils/string.go                       |  21 -------
 utils/string_test.go                  |  30 +---------
 15 files changed, 217 insertions(+), 90 deletions(-)

diff --git a/examples/consumer/interceptor/main.go b/examples/consumer/interceptor/main.go
index 01fedc3..9db45a8 100644
--- a/examples/consumer/interceptor/main.go
+++ b/examples/consumer/interceptor/main.go
@@ -28,7 +28,7 @@ import (
 )
 
 func main() {
-	c, _ := consumer.NewPushConsumer("testGroup", "127.0.0.1:9876",
+	c, _ := consumer.NewPushConsumer("testGroup", []string{"127.0.0.1:9876"},
 		primitive.WithConsumerModel(primitive.Clustering),
 		primitive.WithConsumeFromWhere(primitive.ConsumeFromFirstOffset),
 		primitive.WithChainConsumerInterceptor(UserFistInterceptor(), UserSecondInterceptor()))
diff --git a/examples/consumer/simple/main.go b/examples/consumer/simple/main.go
index 70bbbd4..3bcf702 100644
--- a/examples/consumer/simple/main.go
+++ b/examples/consumer/simple/main.go
@@ -27,7 +27,7 @@ import (
 )
 
 func main() {
-	c, _ := consumer.NewPushConsumer("testGroup", "127.0.0.1:9876")
+	c, _ := consumer.NewPushConsumer("testGroup", []string{"127.0.0.1:9876"})
 	err := c.Subscribe("TopicTest", primitive.MessageSelector{}, func(ctx *primitive.ConsumeMessageContext,
 		msgs []*primitive.MessageExt) (primitive.ConsumeResult, error) {
 		fmt.Println("subscribe callback: %v", msgs)
diff --git a/examples/producer/interceptor/main.go b/examples/producer/interceptor/main.go
index c70eab3..a81a181 100644
--- a/examples/producer/interceptor/main.go
+++ b/examples/producer/interceptor/main.go
@@ -29,7 +29,7 @@ import (
 )
 
 func main() {
-	nameServerAddr := "127.0.0.1:9876"
+	nameServerAddr := []string{"127.0.0.1:9876"}
 	p, _ := producer.NewProducer(nameServerAddr, primitive.WithRetry(2),
 		primitive.WithChainProducerInterceptor(UserFirstInterceptor(), UserSecondInterceptor()))
 	err := p.Start()
diff --git a/examples/producer/simple/main.go b/examples/producer/simple/main.go
index 39c885d..542ec58 100644
--- a/examples/producer/simple/main.go
+++ b/examples/producer/simple/main.go
@@ -28,7 +28,7 @@ import (
 
 // Package main implements a simple producer to send message.
 func main() {
-	nameServerAddr := "127.0.0.1:9876"
+	nameServerAddr := []string{"127.0.0.1:9876"}
 	p, _ := producer.NewProducer(nameServerAddr, primitive.WithRetry(2))
 	err := p.Start()
 	if err != nil {
diff --git a/go.mod b/go.mod
index f8a9967..6fd011e 100644
--- a/go.mod
+++ b/go.mod
@@ -4,6 +4,7 @@ go 1.12
 
 require (
 	github.com/emirpasic/gods v1.12.0
+	github.com/pkg/errors v0.8.1
 	github.com/sirupsen/logrus v1.4.1
 	github.com/stretchr/testify v1.3.0
 	github.com/tidwall/gjson v1.2.1
diff --git a/go.sum b/go.sum
index 6580f88..cf90957 100644
--- a/go.sum
+++ b/go.sum
@@ -3,7 +3,10 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/emirpasic/gods v1.12.0 h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg=
 github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o=
+github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
+github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
+github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/sirupsen/logrus v1.4.1 h1:GL2rEmy6nsikmW0r8opw9JIRScdMF5hA8cOYLH7In1k=
diff --git a/internal/consumer/push_consumer.go b/internal/consumer/push_consumer.go
index 8acbc4c..d78f428 100644
--- a/internal/consumer/push_consumer.go
+++ b/internal/consumer/push_consumer.go
@@ -19,17 +19,15 @@ package consumer
 
 import (
 	"context"
-	"errors"
 	"fmt"
 	"math"
-	"os"
 	"strconv"
 	"time"
 
 	"github.com/apache/rocketmq-client-go/internal/kernel"
 	"github.com/apache/rocketmq-client-go/primitive"
 	"github.com/apache/rocketmq-client-go/rlog"
-	"github.com/apache/rocketmq-client-go/utils"
+	"github.com/pkg/errors"
 )
 
 // In most scenarios, this is the mostly recommended usage to consume messages.
@@ -63,24 +61,19 @@ type pushConsumer struct {
 	interceptor primitive.CInterceptor
 }
 
-func NewPushConsumer(consumerGroup string, nameServerAddr string, opts ...*primitive.ConsumerOption) (PushConsumer, error) {
-	if err := utils.VerifyIP(nameServerAddr); err != nil {
-		return nil, err
-	}
-	if nameServerAddr == "" {
-		rlog.Fatal("opts.NameServerAddr can't be empty")
-	}
-	err := os.Setenv(kernel.EnvNameServerAddr, nameServerAddr)
+func NewPushConsumer(consumerGroup string, nameServerAddrs []string, opts ...*primitive.ConsumerOption) (PushConsumer, error) {
+	srvs, err := kernel.NewNamesrv(nameServerAddrs...)
 	if err != nil {
-		rlog.Fatal("set env=EnvNameServerAddr error: %s ", err.Error())
+		return nil, errors.Wrap(err, "new Namesrv failed.")
 	}
+	kernel.RegisterNamsrv(srvs)
 
 	pushOpts := primitive.DefaultPushConsumerOptions()
 	for _, op := range opts {
 		op.Apply(&pushOpts)
 	}
 
-	pushOpts.NameServerAddr = nameServerAddr
+	pushOpts.NameServerAddrs = nameServerAddrs
 
 	dc := &defaultConsumer{
 		consumerGroup:  consumerGroup,
diff --git a/internal/kernel/namesrv.go b/internal/kernel/namesrv.go
new file mode 100644
index 0000000..7f29a8b
--- /dev/null
+++ b/internal/kernel/namesrv.go
@@ -0,0 +1,107 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kernel
+
+import (
+	"errors"
+	"regexp"
+	"strings"
+	"sync"
+)
+
+var (
+	ipRegex, _ = regexp.Compile(`^((25[0-5]|2[0-4]\d|((1\d{2})|([1-9]?\d)))\.){3}(25[0-5]|2[0-4]\d|((1\d{2})|([1-9]?\d)))`)
+
+	ErrNoNameserver = errors.New("nameServerAddrs can't be empty.")
+	ErrMultiIP      = errors.New("multiple IP addr does not support")
+	ErrIllegalIP    = errors.New("IP addr error")
+)
+
+// Namesrvs rocketmq namesrv instance.
+type Namesrvs struct {
+	// namesrv addr list
+	srvs []string
+
+	// lock for getNamesrv in case of update index race condition
+	lock sync.Locker
+
+	// index indicate the next position for getNamesrv
+	index int
+}
+
+// NewNamesrv init Namesrv from namesrv addr string.
+func NewNamesrv(s ...string) (*Namesrvs, error) {
+	if len(s) == 0 {
+		return nil, ErrNoNameserver
+	}
+
+	ss := s
+	if len(ss) == 1 {
+		// compatible with multi server env string: "a;b;c"
+		ss = strings.Split(s[0], ";")
+	}
+
+	for _, srv := range ss {
+		if err := verifyIP(srv); err != nil {
+			return nil, err
+		}
+	}
+
+	return &Namesrvs{
+		srvs: ss,
+		lock: new(sync.Mutex),
+	}, nil
+}
+
+// GetNamesrv return namesrv using round-robin strategy.
+func (s *Namesrvs) GetNamesrv() string {
+	s.lock.Lock()
+	defer s.lock.Unlock()
+
+	addr := s.srvs[s.index]
+	index := s.index + 1
+	if index < 0 {
+		index = -index
+	}
+	index %= len(s.srvs)
+	s.index = index
+	return addr
+}
+
+func (s *Namesrvs) Size() int {
+	return len(s.srvs)
+}
+
+func (s *Namesrvs) String() string {
+	return strings.Join(s.srvs, ";")
+}
+
+func verifyIP(ip string) error {
+	if strings.Contains(ip, ";") {
+		return ErrMultiIP
+	}
+	ips := ipRegex.FindAllString(ip, -1)
+	if len(ips) == 0 {
+		return ErrIllegalIP
+	}
+
+	if len(ips) > 1 {
+		return ErrMultiIP
+	}
+	return nil
+}
diff --git a/internal/kernel/namesrv_test.go b/internal/kernel/namesrv_test.go
new file mode 100644
index 0000000..accae0a
--- /dev/null
+++ b/internal/kernel/namesrv_test.go
@@ -0,0 +1,63 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kernel
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestVerifyIP(t *testing.T) {
+	IPs := "127.0.0.1:9876"
+	err := verifyIP(IPs)
+	assert.Nil(t, err)
+
+	IPs = "12.24.123.243:10911"
+	err = verifyIP(IPs)
+	assert.Nil(t, err)
+
+	IPs = "xa2.0.0.1:9876"
+	err = verifyIP(IPs)
+	assert.Equal(t, "IP addr error", err.Error())
+
+	IPs = "333.0.0.1:9876"
+	err = verifyIP(IPs)
+	assert.Equal(t, "IP addr error", err.Error())
+
+	IPs = "127.0.0.1:9876;12.24.123.243:10911"
+	err = verifyIP(IPs)
+	assert.Equal(t, "multiple IP addr does not support", err.Error())
+}
+
+// TestSelector test roundrobin selector in namesrv
+func TestSelector(t *testing.T) {
+	srvs := []string{"127.0.0.1:9876", "127.0.0.1:9879", "12.24.123.243:10911", "12.24.123.243:10915"}
+	namesrv, err := NewNamesrv(srvs...)
+	assert.Nil(t, err)
+
+	assert.Equal(t, srvs[0], namesrv.GetNamesrv())
+	assert.Equal(t, srvs[1], namesrv.GetNamesrv())
+	assert.Equal(t, srvs[2], namesrv.GetNamesrv())
+	assert.Equal(t, srvs[3], namesrv.GetNamesrv())
+	assert.Equal(t, srvs[0], namesrv.GetNamesrv())
+	assert.Equal(t, srvs[1], namesrv.GetNamesrv())
+	assert.Equal(t, srvs[2], namesrv.GetNamesrv())
+	assert.Equal(t, srvs[3], namesrv.GetNamesrv())
+	assert.Equal(t, srvs[0], namesrv.GetNamesrv())
+}
diff --git a/internal/kernel/route.go b/internal/kernel/route.go
index 416d7b8..6e012c5 100644
--- a/internal/kernel/route.go
+++ b/internal/kernel/route.go
@@ -21,7 +21,6 @@ import (
 	"encoding/json"
 	"errors"
 	"math/rand"
-	"os"
 	"sort"
 	"strconv"
 	"strings"
@@ -48,8 +47,14 @@ const (
 var (
 	ErrTopicNotExist = errors.New("topic not exist")
 	nameSrvClient    = remote.NewRemotingClient()
+
+	nameSrvs *Namesrvs
 )
 
+func RegisterNamsrv(s *Namesrvs) {
+	nameSrvs = s
+}
+
 var (
 	// brokerName -> *BrokerData
 	brokerAddressesMap sync.Map
@@ -261,10 +266,21 @@ func queryTopicRouteInfoFromServer(topic string) (*TopicRouteData, error) {
 	request := &GetRouteInfoRequest{
 		Topic: topic,
 	}
-	rc := remote.NewRemotingCommand(ReqGetRouteInfoByTopic, request, nil)
-	response, err := nameSrvClient.InvokeSync(getNameServerAddress(), rc, requestTimeout)
 
+	var (
+		response *remote.RemotingCommand
+		err error
+	)
+	for i := 0; i < nameSrvs.Size(); i++ {
+		rc := remote.NewRemotingCommand(ReqGetRouteInfoByTopic, request, nil)
+		response, err = nameSrvClient.InvokeSync(getNameServerAddress(), rc, requestTimeout)
+
+		if err != nil {
+			continue
+		}
+	}
 	if err != nil {
+		rlog.Errorf("connect to namesrv: %v failed.", nameSrvs)
 		return nil, err
 	}
 
@@ -372,7 +388,7 @@ func routeData2PublishInfo(topic string, data *TopicRouteData) *TopicPublishInfo
 }
 
 func getNameServerAddress() string {
-	return os.Getenv(EnvNameServerAddr)
+	return nameSrvs.GetNamesrv()
 }
 
 // TopicRouteData TopicRouteData
diff --git a/internal/producer/producer.go b/internal/producer/producer.go
index a559a98..5be553e 100644
--- a/internal/producer/producer.go
+++ b/internal/producer/producer.go
@@ -19,9 +19,7 @@ package producer
 
 import (
 	"context"
-	"errors"
 	"fmt"
-	"os"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -30,7 +28,7 @@ import (
 	"github.com/apache/rocketmq-client-go/internal/remote"
 	"github.com/apache/rocketmq-client-go/primitive"
 	"github.com/apache/rocketmq-client-go/rlog"
-	"github.com/apache/rocketmq-client-go/utils"
+	"github.com/pkg/errors"
 )
 
 type Producer interface {
@@ -40,24 +38,18 @@ type Producer interface {
 	SendOneWay(context.Context, *primitive.Message) error
 }
 
-func NewProducer(nameServerAddr string, opts ...*primitive.ProducerOption) (Producer, error) {
-	if err := utils.VerifyIP(nameServerAddr); err != nil {
-		return nil, err
-	}
-
-	if nameServerAddr == "" {
-		rlog.Fatal("nameServerAddr can't be empty")
-	}
-	err := os.Setenv(kernel.EnvNameServerAddr, nameServerAddr)
+func NewProducer(nameServerAddrs []string, opts ...*primitive.ProducerOption) (Producer, error) {
+	srvs, err := kernel.NewNamesrv(nameServerAddrs...)
 	if err != nil {
-		rlog.Fatal("set env=EnvNameServerAddr error: %s ", err.Error())
+		return nil, errors.Wrap(err, "new Namesrv failed.")
 	}
+	kernel.RegisterNamsrv(srvs)
 
 	popts := primitive.DefaultProducerOptions()
 	for _, opt := range opts {
 		opt.Apply(&popts)
 	}
-	popts.NameServerAddr = nameServerAddr
+	popts.NameServerAddrs = nameServerAddrs
 
 	producer := &defaultProducer{
 		group:   "default",
@@ -289,4 +281,3 @@ func (p *defaultProducer) IsPublishTopicNeedUpdate(topic string) bool {
 func (p *defaultProducer) IsUnitMode() bool {
 	return false
 }
-
diff --git a/primitive/options.go b/primitive/options.go
index 0e40415..2cbf63c 100644
--- a/primitive/options.go
+++ b/primitive/options.go
@@ -30,7 +30,7 @@ type ProducerOptions struct {
 	Interceptors []PInterceptor
 
 	ClientOption
-	NameServerAddr           string
+	NameServerAddrs          []string
 	GroupName                string
 	RetryTimesWhenSendFailed int
 	UnitMode                 bool
@@ -79,7 +79,7 @@ func WithRetry(retries int) *ProducerOption {
 
 type ConsumerOptions struct {
 	ClientOption
-	NameServerAddr string
+	NameServerAddrs []string
 
 	/**
 	 * Backtracking consumption time with second precision. Time format is
@@ -217,7 +217,7 @@ func (opt *ClientOption) String() string {
 }
 
 type ClientOption struct {
-	NameServerAddr    string
+	NameServerAddrs   string
 	ClientIP          string
 	InstanceName      string
 	UnitMode          bool
diff --git a/utils/errors.go b/utils/errors.go
index 0d96d97..a3c7ead 100644
--- a/utils/errors.go
+++ b/utils/errors.go
@@ -17,7 +17,9 @@ limitations under the License.
 
 package utils
 
-import "github.com/apache/rocketmq-client-go/rlog"
+import (
+	"github.com/apache/rocketmq-client-go/rlog"
+)
 
 func CheckError(action string, err error) {
 	if err != nil {
diff --git a/utils/string.go b/utils/string.go
index 427a8a5..a1f9950 100644
--- a/utils/string.go
+++ b/utils/string.go
@@ -18,14 +18,7 @@ limitations under the License.
 package utils
 
 import (
-	"errors"
 	"fmt"
-	"regexp"
-	"strings"
-)
-
-var (
-	ipRegex, _ = regexp.Compile(`^((25[0-5]|2[0-4]\d|((1\d{2})|([1-9]?\d)))\.){3}(25[0-5]|2[0-4]\d|((1\d{2})|([1-9]?\d)))`)
 )
 
 // HashString hashes a string to a unique hashcode.
@@ -48,17 +41,3 @@ func StrJoin(str, key string, value interface{}) string {
 	return str + key + ": " + fmt.Sprint(value) + ", "
 }
 
-func VerifyIP(ip string) error {
-	if strings.Contains(ip, ";") {
-		return errors.New("multiple IP addr does not support")
-	}
-	ips := ipRegex.FindAllString(ip, -1)
-	if len(ips) == 0 {
-		return errors.New("IP addr error")
-	}
-
-	if len(ips) > 1 {
-		return errors.New("multiple IP addr does not support")
-	}
-	return nil
-}
diff --git a/utils/string_test.go b/utils/string_test.go
index e0a9f70..5b55d23 100644
--- a/utils/string_test.go
+++ b/utils/string_test.go
@@ -1,29 +1 @@
-package utils
-
-import (
-	"testing"
-
-	"github.com/stretchr/testify/assert"
-)
-
-func TestVerifyIP(t *testing.T) {
-	IPs := "127.0.0.1:9876"
-	err := VerifyIP(IPs)
-	assert.Nil(t, err)
-
-	IPs = "12.24.123.243:10911"
-	err = VerifyIP(IPs)
-	assert.Nil(t, err)
-
-	IPs = "xa2.0.0.1:9876"
-	err = VerifyIP(IPs)
-	assert.Equal(t, "IP addr error", err.Error())
-
-	IPs = "333.0.0.1:9876"
-	err = VerifyIP(IPs)
-	assert.Equal(t, "IP addr error", err.Error())
-
-	IPs = "127.0.0.1:9876;12.24.123.243:10911"
-	err = VerifyIP(IPs)
-	assert.Equal(t, "multiple IP addr does not support", err.Error())
-}
+package utils
\ No newline at end of file


Mime
View raw message