rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ding...@apache.org
Subject [rocketmq-client-go] branch native updated: feat: support nameserver resolver (#457)
Date Sun, 26 Apr 2020 06:56:32 GMT
This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 17a373f  feat: support nameserver resolver (#457)
17a373f is described below

commit 17a373f0392d5d8e106d3d5ecf0a6a3ec8e8bd3b
Author: xujianhai666 <52450794+xujianhai666@users.noreply.github.com>
AuthorDate: Sun Apr 26 14:56:27 2020 +0800

    feat: support nameserver resolver (#457)
    
    use nameserver resolver instead of nameserver address, we can pass
    passThrough for direct address, httpResolver for a domain, an env resolver
    for env param.
---
 .travis.yml                                |   6 +-
 api.go                                     |   1 +
 consumer/consumer.go                       |   6 +-
 consumer/mock_offset_store.go              |   3 +-
 consumer/offset_store.go                   |   8 +-
 consumer/option.go                         |  39 +++--
 consumer/pull_consumer.go                  |   2 +-
 consumer/push_consumer.go                  |   2 +-
 consumer/push_consumer_test.go             |   5 +-
 consumer/strategy_test.go                  |   3 +-
 examples/consumer/acl/main.go              |   2 +-
 examples/consumer/broadcast/main.go        |   2 +-
 examples/consumer/delay/main.go            |   2 +-
 examples/consumer/interceptor/main.go      |   2 +-
 examples/consumer/namespace/main.go        |   2 +-
 examples/consumer/orderly/main.go          |   2 +-
 examples/consumer/pull/main.go             |   2 +-
 examples/consumer/retry/concurrent/main.go |   2 +-
 examples/consumer/retry/order/main.go      |   2 +-
 examples/consumer/simple/main.go           |   2 +-
 examples/consumer/strategy/main.go         |   2 +-
 examples/consumer/tag/main.go              |   2 +-
 examples/consumer/trace/main.go            |   2 +-
 examples/producer/acl/main.go              |   2 +-
 examples/producer/async/main.go            |   2 +-
 examples/producer/batch/main.go            |   2 +-
 examples/producer/delay/main.go            |   2 +-
 examples/producer/interceptor/main.go      |   2 +-
 examples/producer/namespace/main.go        |   2 +-
 examples/producer/simple/main.go           |   2 +-
 examples/producer/tag/main.go              |   2 +-
 examples/producer/trace/main.go            |   2 +-
 examples/producer/transaction/main.go      |   2 +-
 internal/client.go                         |  46 +++---
 internal/mock_namesrv.go                   |   8 +-
 internal/model.go                          |   2 +-
 internal/namesrv.go                        | 124 ++++------------
 internal/namesrv_test.go                   | 129 ++---------------
 internal/remote/codec.go                   |   2 +-
 internal/remote/codec_test.go              |   3 +-
 internal/remote/mock_remote_client.go      |   3 +-
 internal/route_test.go                     |   2 +-
 internal/trace.go                          |   2 +-
 primitive/base.go                          |  28 ++++
 primitive/base_test.go                     |  18 +++
 primitive/nsresolver.go                    | 219 +++++++++++++++++++++++++++++
 primitive/nsresolver_test.go               | 133 ++++++++++++++++++
 producer/option.go                         |  37 +++--
 producer/producer.go                       |   5 +-
 producer/producer_test.go                  |   8 +-
 50 files changed, 562 insertions(+), 328 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index e65ed79..d4ed1ac 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -3,9 +3,8 @@ language: go
 go:
   - "1.11.x"
   - "1.12.x"
-
+  - "1.13.x"
 go_import_path: github.com/apache/rocketmq-client-go/v2
-
 env:
   global:
     - NAME_SERVER_ADDRESS=127.0.0.1:9876
@@ -27,7 +26,10 @@ before_script:
   - nohup sh bin/mqbroker -n localhost:9876 &
 
 script:
+  - cd ${TRAVIS_HOME}
+  - ls -al
   - cd ${GOPATH}/src/github.com/apache/rocketmq-client-go/v2
+  - ls -al
   - go fmt ./... && [[ -z `git status -s` ]]
   - go mod vendor && go test ./... -coverprofile=coverage.txt -covermode=atomic
 
diff --git a/api.go b/api.go
index 0f9a0bd..0e149e9 100644
--- a/api.go
+++ b/api.go
@@ -19,6 +19,7 @@ package rocketmq
 
 import (
 	"context"
+
 	"github.com/pkg/errors"
 
 	"github.com/apache/rocketmq-client-go/v2/consumer"
diff --git a/consumer/consumer.go b/consumer/consumer.go
index 2a1f9af..3129b8d 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -27,7 +27,8 @@ import (
 	"sync/atomic"
 	"time"
 
-	"github.com/json-iterator/go"
+	jsoniter "github.com/json-iterator/go"
+
 	"github.com/pkg/errors"
 	"github.com/tidwall/gjson"
 
@@ -273,9 +274,6 @@ type defaultConsumer struct {
 }
 
 func (dc *defaultConsumer) start() error {
-	if len(dc.option.NameServerAddrs) == 0 {
-		dc.namesrv.UpdateNameServerAddress(dc.option.NameServerDomain, dc.option.InstanceName)
-	}
 	if dc.model == Clustering {
 		// set retry topic
 		retryTopic := internal.GetRetryTopic(dc.consumerGroup)
diff --git a/consumer/mock_offset_store.go b/consumer/mock_offset_store.go
index a78a698..bac1cdb 100644
--- a/consumer/mock_offset_store.go
+++ b/consumer/mock_offset_store.go
@@ -22,9 +22,10 @@ limitations under the License.
 package consumer
 
 import (
+	reflect "reflect"
+
 	primitive "github.com/apache/rocketmq-client-go/v2/primitive"
 	gomock "github.com/golang/mock/gomock"
-	reflect "reflect"
 )
 
 // MockOffsetStore is a mock of OffsetStore interface
diff --git a/consumer/offset_store.go b/consumer/offset_store.go
index e5f3bab..44a0597 100644
--- a/consumer/offset_store.go
+++ b/consumer/offset_store.go
@@ -26,7 +26,7 @@ import (
 	"sync"
 	"time"
 
-	"github.com/json-iterator/go"
+	jsoniter "github.com/json-iterator/go"
 
 	"github.com/apache/rocketmq-client-go/v2/internal"
 	"github.com/apache/rocketmq-client-go/v2/internal/remote"
@@ -255,12 +255,6 @@ func (r *remoteBrokerOffsetStore) persist(mqs []*primitive.MessageQueue) {
 				rlog.LogKeyUnderlayError: err.Error(),
 				"offset":                 off,
 			})
-		} else {
-			rlog.Info("update offset to broker success", map[string]interface{}{
-				rlog.LogKeyConsumerGroup: r.group,
-				rlog.LogKeyMessageQueue:  mq.String(),
-				"offset":                 off,
-			})
 		}
 	}
 }
diff --git a/consumer/option.go b/consumer/option.go
index 2537554..07b4246 100644
--- a/consumer/option.go
+++ b/consumer/option.go
@@ -104,6 +104,8 @@ type consumerOptions struct {
 	//
 	AutoCommit            bool
 	RebalanceLockInterval time.Duration
+
+	Resolver primitive.NsResolver
 }
 
 func defaultPushConsumerOptions() consumerOptions {
@@ -115,6 +117,7 @@ func defaultPushConsumerOptions() consumerOptions {
 		MaxReconsumeTimes:          -1,
 		ConsumerModel:              Clustering,
 		AutoCommit:                 true,
+		Resolver:                   primitive.NewHttpResolver("DEFAULT"),
 	}
 	opts.ClientOptions.GroupName = "DEFAULT_CONSUMER"
 	return opts
@@ -125,6 +128,7 @@ type Option func(*consumerOptions)
 func defaultPullConsumerOptions() consumerOptions {
 	opts := consumerOptions{
 		ClientOptions: internal.DefaultClientOptions(),
+		Resolver:      primitive.NewHttpResolver("DEFAULT"),
 	}
 	opts.ClientOptions.GroupName = "DEFAULT_CONSUMER"
 	return opts
@@ -179,20 +183,6 @@ func WithInstance(name string) Option {
 	}
 }
 
-// WithNameServer set NameServer address, only support one NameServer cluster in alpha2
-func WithNameServer(nameServers primitive.NamesrvAddr) Option {
-	return func(opts *consumerOptions) {
-		opts.NameServerAddrs = nameServers
-	}
-}
-
-// WithNameServerDomain set NameServer domain
-func WithNameServerDomain(nameServerUrl string) Option {
-	return func(opts *consumerOptions) {
-		opts.NameServerDomain = nameServerUrl
-	}
-}
-
 // WithNamespace set the namespace of consumer
 func WithNamespace(namespace string) Option {
 	return func(opts *consumerOptions) {
@@ -263,3 +253,24 @@ func WithPullInterval(interval time.Duration) Option {
 		options.PullInterval = interval
 	}
 }
+
+// WithNsResovler set nameserver resolver to fetch nameserver addr
+func WithNsResovler(resolver primitive.NsResolver) Option {
+	return func(options *consumerOptions) {
+		options.Resolver = resolver
+	}
+}
+
+// WithNameServer set NameServer address, only support one NameServer cluster in alpha2
+func WithNameServer(nameServers primitive.NamesrvAddr) Option {
+	return func(options *consumerOptions) {
+		options.Resolver = primitive.NewPassthroughResolver(nameServers)
+	}
+}
+
+// WithNameServerDomain set NameServer domain
+func WithNameServerDomain(nameServerUrl string) Option {
+	return func(opts *consumerOptions) {
+		opts.Resolver = primitive.NewHttpResolver("DEFAULT", nameServerUrl)
+	}
+}
diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
index 83369d1..0523f08 100644
--- a/consumer/pull_consumer.go
+++ b/consumer/pull_consumer.go
@@ -76,7 +76,7 @@ func NewPullConsumer(options ...Option) (*defaultPullConsumer, error) {
 		apply(&defaultOpts)
 	}
 
-	srvs, err := internal.NewNamesrv(defaultOpts.NameServerAddrs)
+	srvs, err := internal.NewNamesrv(defaultOpts.Resolver)
 	if err != nil {
 		return nil, errors.Wrap(err, "new Namesrv failed.")
 	}
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index d14f0a5..d2aee5b 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -76,7 +76,7 @@ func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
 	for _, apply := range opts {
 		apply(&defaultOpts)
 	}
-	srvs, err := internal.NewNamesrv(defaultOpts.NameServerAddrs)
+	srvs, err := internal.NewNamesrv(defaultOpts.Resolver)
 	if err != nil {
 		return nil, errors.Wrap(err, "new Namesrv failed.")
 	}
diff --git a/consumer/push_consumer_test.go b/consumer/push_consumer_test.go
index 3115807..7cb5fca 100644
--- a/consumer/push_consumer_test.go
+++ b/consumer/push_consumer_test.go
@@ -20,11 +20,12 @@ package consumer
 import (
 	"context"
 	"fmt"
+	"testing"
+
 	"github.com/apache/rocketmq-client-go/v2/internal"
 	"github.com/apache/rocketmq-client-go/v2/primitive"
 	"github.com/golang/mock/gomock"
 	. "github.com/smartystreets/goconvey/convey"
-	"testing"
 )
 
 func mockB4Start(c *pushConsumer) {
@@ -35,7 +36,7 @@ func TestStart(t *testing.T) {
 	Convey("test Start method", t, func() {
 		c, _ := NewPushConsumer(
 			WithGroupName("testGroup"),
-			WithNameServer([]string{"127.0.0.1:9876"}),
+			WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
 			WithConsumerModel(BroadCasting),
 		)
 
diff --git a/consumer/strategy_test.go b/consumer/strategy_test.go
index 3cd79cc..e66b15c 100644
--- a/consumer/strategy_test.go
+++ b/consumer/strategy_test.go
@@ -19,9 +19,10 @@ package consumer
 
 import (
 	"fmt"
+	"testing"
+
 	"github.com/apache/rocketmq-client-go/v2/primitive"
 	. "github.com/smartystreets/goconvey/convey"
-	"testing"
 )
 
 func TestAllocateByAveragely(t *testing.T) {
diff --git a/examples/consumer/acl/main.go b/examples/consumer/acl/main.go
index 583f8f0..a6535fd 100644
--- a/examples/consumer/acl/main.go
+++ b/examples/consumer/acl/main.go
@@ -31,7 +31,7 @@ import (
 func main() {
 	c, err := rocketmq.NewPushConsumer(
 		consumer.WithGroupName("testGroup"),
-		consumer.WithNameServer([]string{"127.0.0.1:9876"}),
+		consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
 		consumer.WithCredentials(primitive.Credentials{
 			AccessKey: "RocketMQ",
 			SecretKey: "12345678",
diff --git a/examples/consumer/broadcast/main.go b/examples/consumer/broadcast/main.go
index 114aa87..29b0b12 100644
--- a/examples/consumer/broadcast/main.go
+++ b/examples/consumer/broadcast/main.go
@@ -31,7 +31,7 @@ import (
 func main() {
 	c, _ := rocketmq.NewPushConsumer(
 		consumer.WithGroupName("testGroup"),
-		consumer.WithNameServer([]string{"127.0.0.1:9876"}),
+		consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
 		consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),
 		consumer.WithConsumerModel(consumer.BroadCasting),
 	)
diff --git a/examples/consumer/delay/main.go b/examples/consumer/delay/main.go
index d14c620..8cc5c04 100644
--- a/examples/consumer/delay/main.go
+++ b/examples/consumer/delay/main.go
@@ -31,7 +31,7 @@ import (
 func main() {
 	c, _ := rocketmq.NewPushConsumer(
 		consumer.WithGroupName("testGroup"),
-		consumer.WithNameServer([]string{"127.0.0.1:9876"}),
+		consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
 	)
 	err := c.Subscribe("TopicTest", consumer.MessageSelector{}, func(ctx context.Context,
 		msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
diff --git a/examples/consumer/interceptor/main.go b/examples/consumer/interceptor/main.go
index 37a5979..1036c6a 100644
--- a/examples/consumer/interceptor/main.go
+++ b/examples/consumer/interceptor/main.go
@@ -31,7 +31,7 @@ import (
 func main() {
 	c, _ := rocketmq.NewPushConsumer(
 		consumer.WithGroupName("testGroup"),
-		consumer.WithNameServer([]string{"127.0.0.1:9876"}),
+		consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
 		consumer.WithConsumerModel(consumer.Clustering),
 		consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),
 		consumer.WithInterceptor(UserFistInterceptor(), UserSecondInterceptor()))
diff --git a/examples/consumer/namespace/main.go b/examples/consumer/namespace/main.go
index 815e152..e1b9dea 100644
--- a/examples/consumer/namespace/main.go
+++ b/examples/consumer/namespace/main.go
@@ -31,7 +31,7 @@ import (
 func main() {
 	c, err := rocketmq.NewPushConsumer(
 		consumer.WithGroupName("testGroup"),
-		consumer.WithNameServer([]string{"127.0.0.1:9876"}),
+		consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
 		consumer.WithCredentials(primitive.Credentials{
 			AccessKey: "RocketMQ",
 			SecretKey: "12345678",
diff --git a/examples/consumer/orderly/main.go b/examples/consumer/orderly/main.go
index 3e51896..9e7a810 100644
--- a/examples/consumer/orderly/main.go
+++ b/examples/consumer/orderly/main.go
@@ -31,7 +31,7 @@ import (
 func main() {
 	c, _ := rocketmq.NewPushConsumer(
 		consumer.WithGroupName("testGroup"),
-		consumer.WithNameServer([]string{"127.0.0.1:9876"}),
+		consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
 		consumer.WithConsumerModel(consumer.Clustering),
 		consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),
 		consumer.WithConsumerOrder(true),
diff --git a/examples/consumer/pull/main.go b/examples/consumer/pull/main.go
index 3c75272..d740b15 100644
--- a/examples/consumer/pull/main.go
+++ b/examples/consumer/pull/main.go
@@ -31,7 +31,7 @@ import (
 func main() {
 	c, err := rocketmq.NewPullConsumer(
 		consumer.WithGroupName("testGroup"),
-		consumer.WithNameServer([]string{"127.0.0.1:9876"}),
+		consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
 	)
 	if err != nil {
 		rlog.Fatal(fmt.Sprintf("fail to new pullConsumer: %s", err), nil)
diff --git a/examples/consumer/retry/concurrent/main.go b/examples/consumer/retry/concurrent/main.go
index acfe749..5fcf489 100644
--- a/examples/consumer/retry/concurrent/main.go
+++ b/examples/consumer/retry/concurrent/main.go
@@ -37,7 +37,7 @@ import (
 func main() {
 	c, _ := rocketmq.NewPushConsumer(
 		consumer.WithGroupName("testGroup"),
-		consumer.WithNameServer([]string{"127.0.0.1:9876"}),
+		consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
 		consumer.WithConsumerModel(consumer.Clustering),
 	)
 
diff --git a/examples/consumer/retry/order/main.go b/examples/consumer/retry/order/main.go
index ee726eb..4ec05e7 100644
--- a/examples/consumer/retry/order/main.go
+++ b/examples/consumer/retry/order/main.go
@@ -36,7 +36,7 @@ import (
 func main() {
 	c, _ := rocketmq.NewPushConsumer(
 		consumer.WithGroupName("testGroup"),
-		consumer.WithNameServer([]string{"127.0.0.1:9876"}),
+		consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
 		consumer.WithConsumerModel(consumer.Clustering),
 		consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),
 		consumer.WithConsumerOrder(true),
diff --git a/examples/consumer/simple/main.go b/examples/consumer/simple/main.go
index 41091c4..7d1a0b7 100644
--- a/examples/consumer/simple/main.go
+++ b/examples/consumer/simple/main.go
@@ -31,7 +31,7 @@ import (
 func main() {
 	c, _ := rocketmq.NewPushConsumer(
 		consumer.WithGroupName("testGroup"),
-		consumer.WithNameServer([]string{"127.0.0.1:9876"}),
+		consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
 	)
 	err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context,
 		msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
diff --git a/examples/consumer/strategy/main.go b/examples/consumer/strategy/main.go
index 5ff639e..502524c 100644
--- a/examples/consumer/strategy/main.go
+++ b/examples/consumer/strategy/main.go
@@ -31,7 +31,7 @@ import (
 func main() {
 	c, _ := rocketmq.NewPushConsumer(
 		consumer.WithGroupName("testGroup"),
-		consumer.WithNameServer([]string{"127.0.0.1:9876"}),
+		consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
 		consumer.WithStrategy(consumer.AllocateByAveragely),
 	)
 	err := c.Subscribe("TopicTest", consumer.MessageSelector{}, func(ctx context.Context,
diff --git a/examples/consumer/tag/main.go b/examples/consumer/tag/main.go
index a2e811f..0532971 100644
--- a/examples/consumer/tag/main.go
+++ b/examples/consumer/tag/main.go
@@ -31,7 +31,7 @@ import (
 func main() {
 	c, _ := rocketmq.NewPushConsumer(
 		consumer.WithGroupName("testGroup"),
-		consumer.WithNameServer([]string{"127.0.0.1:9876"}),
+		consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
 	)
 	selector := consumer.MessageSelector{
 		Type:       consumer.TAG,
diff --git a/examples/consumer/trace/main.go b/examples/consumer/trace/main.go
index d25a635..35c4926 100644
--- a/examples/consumer/trace/main.go
+++ b/examples/consumer/trace/main.go
@@ -37,7 +37,7 @@ func main() {
 
 	c, _ := rocketmq.NewPushConsumer(
 		consumer.WithGroupName("testGroup"),
-		consumer.WithNameServer(namesrvs),
+		consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
 		consumer.WithTrace(traceCfg),
 	)
 	err := c.Subscribe("TopicTest", consumer.MessageSelector{}, func(ctx context.Context,
diff --git a/examples/producer/acl/main.go b/examples/producer/acl/main.go
index 73d4fcb..38d61dc 100644
--- a/examples/producer/acl/main.go
+++ b/examples/producer/acl/main.go
@@ -30,7 +30,7 @@ import (
 
 func main() {
 	p, err := rocketmq.NewProducer(
-		producer.WithNameServer([]string{"127.0.0.1:9876"}),
+		producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
 		producer.WithRetry(2),
 		producer.WithCredentials(primitive.Credentials{
 			AccessKey: "RocketMQ",
diff --git a/examples/producer/async/main.go b/examples/producer/async/main.go
index 699a70e..aa73881 100644
--- a/examples/producer/async/main.go
+++ b/examples/producer/async/main.go
@@ -31,7 +31,7 @@ import (
 // Package main implements a async producer to send message.
 func main() {
 	p, _ := rocketmq.NewProducer(
-		producer.WithNameServer([]string{"127.0.0.1:9876"}),
+		producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
 		producer.WithRetry(2),
 		producer.WithQueueSelector(producer.NewManualQueueSelector()))
 
diff --git a/examples/producer/batch/main.go b/examples/producer/batch/main.go
index 6260536..d807daf 100644
--- a/examples/producer/batch/main.go
+++ b/examples/producer/batch/main.go
@@ -30,7 +30,7 @@ import (
 
 func main() {
 	p, _ := rocketmq.NewProducer(
-		producer.WithNameServer([]string{"127.0.0.1:9876"}),
+		producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
 		producer.WithRetry(2),
 	)
 	err := p.Start()
diff --git a/examples/producer/delay/main.go b/examples/producer/delay/main.go
index 6d5cb5d..465e97c 100644
--- a/examples/producer/delay/main.go
+++ b/examples/producer/delay/main.go
@@ -29,7 +29,7 @@ import (
 
 func main() {
 	p, _ := rocketmq.NewProducer(
-		producer.WithNameServer([]string{"127.0.0.1:9876"}),
+		producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
 		producer.WithRetry(2),
 	)
 	err := p.Start()
diff --git a/examples/producer/interceptor/main.go b/examples/producer/interceptor/main.go
index 8dafe3d..47ea1fb 100644
--- a/examples/producer/interceptor/main.go
+++ b/examples/producer/interceptor/main.go
@@ -30,7 +30,7 @@ import (
 
 func main() {
 	p, _ := rocketmq.NewProducer(
-		producer.WithNameServer([]string{"127.0.0.1:9876"}),
+		producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
 		producer.WithRetry(2),
 		producer.WithInterceptor(UserFirstInterceptor(), UserSecondInterceptor()),
 	)
diff --git a/examples/producer/namespace/main.go b/examples/producer/namespace/main.go
index 5a82b72..524bd32 100644
--- a/examples/producer/namespace/main.go
+++ b/examples/producer/namespace/main.go
@@ -30,7 +30,7 @@ import (
 
 func main() {
 	p, err := rocketmq.NewProducer(
-		producer.WithNameServer([]string{"127.0.0.1:9876"}),
+		producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
 		producer.WithRetry(2),
 		producer.WithCredentials(primitive.Credentials{
 			AccessKey: "RocketMQ",
diff --git a/examples/producer/simple/main.go b/examples/producer/simple/main.go
index 08b7682..8ac4421 100644
--- a/examples/producer/simple/main.go
+++ b/examples/producer/simple/main.go
@@ -31,7 +31,7 @@ import (
 // Package main implements a simple producer to send message.
 func main() {
 	p, _ := rocketmq.NewProducer(
-		producer.WithNameServer([]string{"127.0.0.1:9876"}),
+		producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
 		producer.WithRetry(2),
 	)
 	err := p.Start()
diff --git a/examples/producer/tag/main.go b/examples/producer/tag/main.go
index 02c402e..2bcd51b 100644
--- a/examples/producer/tag/main.go
+++ b/examples/producer/tag/main.go
@@ -29,7 +29,7 @@ import (
 
 func main() {
 	p, _ := rocketmq.NewProducer(
-		producer.WithNameServer([]string{"127.0.0.1:9876"}),
+		producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
 		producer.WithRetry(2),
 	)
 	err := p.Start()
diff --git a/examples/producer/trace/main.go b/examples/producer/trace/main.go
index 33d86d5..5f2c9b3 100644
--- a/examples/producer/trace/main.go
+++ b/examples/producer/trace/main.go
@@ -36,7 +36,7 @@ func main() {
 	}
 
 	p, _ := rocketmq.NewProducer(
-		producer.WithNameServer(namesrvs),
+		producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
 		producer.WithRetry(2),
 		producer.WithTrace(traceCfg))
 	err := p.Start()
diff --git a/examples/producer/transaction/main.go b/examples/producer/transaction/main.go
index 8536501..dde39a9 100644
--- a/examples/producer/transaction/main.go
+++ b/examples/producer/transaction/main.go
@@ -79,7 +79,7 @@ func (dl *DemoListener) CheckLocalTransaction(msg *primitive.MessageExt) primiti
 func main() {
 	p, _ := rocketmq.NewTransactionProducer(
 		NewDemoListener(),
-		producer.WithNameServer([]string{"127.0.0.1:9876"}),
+		producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
 		producer.WithRetry(1),
 	)
 	err := p.Start()
diff --git a/internal/client.go b/internal/client.go
index 954cc57..37d7ea8 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -101,7 +101,6 @@ func DefaultClientOptions() ClientOptions {
 type ClientOptions struct {
 	GroupName         string
 	NameServerAddrs   primitive.NamesrvAddr
-	NameServerDomain  string
 	Namesrv           *namesrvs
 	ClientIP          string
 	InstanceName      string
@@ -112,6 +111,7 @@ type ClientOptions struct {
 	Interceptors      []primitive.Interceptor
 	Credentials       primitive.Credentials
 	Namespace         string
+	Resolver          primitive.NsResolver
 }
 
 func (opt *ClientOptions) ChangeInstanceNameToPID() {
@@ -263,31 +263,27 @@ func (c *rmqClient) Start() {
 		if !c.option.Credentials.IsEmpty() {
 			c.remoteClient.RegisterInterceptor(remote.ACLInterceptor(c.option.Credentials))
 		}
-		// fetchNameServerAddr
-		if len(c.option.NameServerAddrs) == 0 {
-			c.namesrvs.UpdateNameServerAddress(c.option.NameServerDomain, c.option.InstanceName)
-			go primitive.WithRecover(func() {
-				op := func() {
-					c.namesrvs.UpdateNameServerAddress(c.option.NameServerDomain, c.option.InstanceName)
-				}
-				time.Sleep(10 * time.Second)
-				op()
-
-				ticker := time.NewTicker(2 * time.Minute)
-				defer ticker.Stop()
-				for {
-					select {
-					case <-ticker.C:
-						op()
-					case <-c.done:
-						rlog.Info("The RMQClient stopping update name server domain info.", map[string]interface{}{
-							"clientID": c.ClientID(),
-						})
-						return
-					}
+		go primitive.WithRecover(func() {
+			op := func() {
+				c.namesrvs.UpdateNameServerAddress()
+			}
+			time.Sleep(10 * time.Second)
+			op()
+
+			ticker := time.NewTicker(2 * time.Minute)
+			defer ticker.Stop()
+			for {
+				select {
+				case <-ticker.C:
+					op()
+				case <-c.done:
+					rlog.Info("The RMQClient stopping update name server domain info.", map[string]interface{}{
+						"clientID": c.ClientID(),
+					})
+					return
 				}
-			})
-		}
+			}
+		})
 
 		// schedule update route info
 		go primitive.WithRecover(func() {
diff --git a/internal/mock_namesrv.go b/internal/mock_namesrv.go
index 7f9bfd6..f87d174 100644
--- a/internal/mock_namesrv.go
+++ b/internal/mock_namesrv.go
@@ -51,15 +51,15 @@ func (m *MockNamesrvs) EXPECT() *MockNamesrvsMockRecorder {
 }
 
 // UpdateNameServerAddress mocks base method
-func (m *MockNamesrvs) UpdateNameServerAddress(nameServerDomain, instanceName string) {
+func (m *MockNamesrvs) UpdateNameServerAddress() {
 	m.ctrl.T.Helper()
-	m.ctrl.Call(m, "UpdateNameServerAddress", nameServerDomain, instanceName)
+	m.ctrl.Call(m, "UpdateNameServerAddress")
 }
 
 // UpdateNameServerAddress indicates an expected call of UpdateNameServerAddress
-func (mr *MockNamesrvsMockRecorder) UpdateNameServerAddress(nameServerDomain, instanceName interface{}) *gomock.Call {
+func (mr *MockNamesrvsMockRecorder) UpdateNameServerAddress() *gomock.Call {
 	mr.mock.ctrl.T.Helper()
-	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateNameServerAddress", reflect.TypeOf((*MockNamesrvs)(nil).UpdateNameServerAddress), nameServerDomain, instanceName)
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateNameServerAddress", reflect.TypeOf((*MockNamesrvs)(nil).UpdateNameServerAddress))
 }
 
 // AddBroker mocks base method
diff --git a/internal/model.go b/internal/model.go
index 0c8259f..0ee9ccc 100644
--- a/internal/model.go
+++ b/internal/model.go
@@ -21,13 +21,13 @@ import (
 	"bytes"
 	"encoding/json"
 	"fmt"
-	"github.com/json-iterator/go"
 	"sort"
 	"strings"
 
 	"github.com/apache/rocketmq-client-go/v2/internal/utils"
 	"github.com/apache/rocketmq-client-go/v2/primitive"
 	"github.com/apache/rocketmq-client-go/v2/rlog"
+	jsoniter "github.com/json-iterator/go"
 )
 
 type FindBrokerResult struct {
diff --git a/internal/namesrv.go b/internal/namesrv.go
index a06a09e..6d8ff2b 100644
--- a/internal/namesrv.go
+++ b/internal/namesrv.go
@@ -19,19 +19,12 @@ package internal
 
 import (
 	"errors"
-	"fmt"
-	"github.com/apache/rocketmq-client-go/v2/internal/remote"
-	"github.com/apache/rocketmq-client-go/v2/primitive"
-	"github.com/apache/rocketmq-client-go/v2/rlog"
-	"io/ioutil"
-	"net/http"
-	"os"
-	"os/user"
-	"path"
 	"regexp"
 	"strings"
 	"sync"
-	"time"
+
+	"github.com/apache/rocketmq-client-go/v2/internal/remote"
+	"github.com/apache/rocketmq-client-go/v2/primitive"
 )
 
 const (
@@ -48,7 +41,7 @@ var (
 
 //go:generate mockgen -source namesrv.go -destination mock_namesrv.go -self_package github.com/apache/rocketmq-client-go/v2/internal  --package internal Namesrvs
 type Namesrvs interface {
-	UpdateNameServerAddress(nameServerDomain, instanceName string)
+	UpdateNameServerAddress()
 
 	AddBroker(routeData *TopicRouteData)
 
@@ -94,13 +87,21 @@ type namesrvs struct {
 	lockNamesrv sync.Mutex
 
 	nameSrvClient remote.RemotingClient
+
+	resolver primitive.NsResolver
 }
 
-var _ Namesrvs = &namesrvs{}
+var _ Namesrvs = (*namesrvs)(nil)
 
 // NewNamesrv init Namesrv from namesrv addr string.
-func NewNamesrv(addr primitive.NamesrvAddr) (*namesrvs, error) {
-	if err := addr.Check(); err != nil {
+// addr primitive.NamesrvAddr
+func NewNamesrv(resolver primitive.NsResolver) (*namesrvs, error) {
+	addr := resolver.Resolve()
+	if len(addr) == 0 {
+		return nil, errors.New("no name server addr found with resolver: " + resolver.Description())
+	}
+
+	if err := primitive.NamesrvAddr(addr).Check(); err != nil {
 		return nil, err
 	}
 	nameSrvClient := remote.NewRemotingClient()
@@ -110,6 +111,7 @@ func NewNamesrv(addr primitive.NamesrvAddr) (*namesrvs, error) {
 		nameSrvClient:    nameSrvClient,
 		brokerVersionMap: make(map[string]map[string]int32, 0),
 		brokerLock:       new(sync.RWMutex),
+		resolver:         resolver,
 	}, nil
 }
 
@@ -143,99 +145,21 @@ func (s *namesrvs) AddrList() []string {
 	return s.srvs
 }
 
-func getSnapshotFilePath(instanceName string) string {
-	homeDir := ""
-	if usr, err := user.Current(); err == nil {
-		homeDir = usr.HomeDir
-	} else {
-		rlog.Error("name server domain, can't get user home directory", map[string]interface{}{
-			"err": err,
-		})
-	}
-	storePath := path.Join(homeDir, "/logs/rocketmq-go/snapshot")
-	if _, err := os.Stat(storePath); os.IsNotExist(err) {
-		if err = os.MkdirAll(storePath, 0755); err != nil {
-			rlog.Fatal("can't create name server snapshot directory", map[string]interface{}{
-				"path": storePath,
-				"err":  err,
-			})
-		}
-	}
-	filePath := path.Join(storePath, fmt.Sprintf("nameserver_addr-%s", instanceName))
-	return filePath
-}
-
 // UpdateNameServerAddress will update srvs.
 // docs: https://rocketmq.apache.org/docs/best-practice-namesvr/
-func (s *namesrvs) UpdateNameServerAddress(nameServerDomain, instanceName string) {
+func (s *namesrvs) UpdateNameServerAddress() {
 	s.lock.Lock()
 	defer s.lock.Unlock()
 
-	if nameServerDomain == "" {
-		// try to get from environment variable
-		if v := os.Getenv("NAMESRV_ADDR"); v != "" {
-			s.srvs = strings.Split(v, ";")
-			return
-		}
-		// use default domain
-		nameServerDomain = DEFAULT_NAMESRV_ADDR
+	srvs := s.resolver.Resolve()
+	if len(srvs) == 0 {
+		return
 	}
 
-	client := http.Client{Timeout: 10 * time.Second}
-	resp, err := client.Get(nameServerDomain)
-	if err == nil {
-		defer resp.Body.Close()
-		body, err := ioutil.ReadAll(resp.Body)
-		if err == nil {
-			oldBodyStr := strings.Join(s.srvs, ";")
-			bodyStr := string(body)
-			if bodyStr != "" && oldBodyStr != bodyStr {
-				s.srvs = strings.Split(string(body), ";")
-
-				rlog.Info("name server address changed", map[string]interface{}{
-					"old": oldBodyStr,
-					"new": bodyStr,
-				})
-				// save to local snapshot
-				filePath := getSnapshotFilePath(instanceName)
-				if err := ioutil.WriteFile(filePath, body, 0644); err == nil {
-					rlog.Info("name server snapshot save successfully", map[string]interface{}{
-						"filePath": filePath,
-					})
-				} else {
-					rlog.Error("name server snapshot save failed", map[string]interface{}{
-						"filePath": filePath,
-						"err":      err,
-					})
-				}
-			}
-			rlog.Info("name server http fetch successfully", map[string]interface{}{
-				"addrs": bodyStr,
-			})
-			return
-		} else {
-			rlog.Error("name server http fetch failed", map[string]interface{}{
-				"NameServerDomain": nameServerDomain,
-				"err":              err,
-			})
-		}
+	updated := primitive.Diff(s.srvs, srvs)
+	if !updated {
+		return
 	}
 
-	// load local snapshot if need when name server domain request failed
-	if len(s.srvs) == 0 {
-		filePath := getSnapshotFilePath(instanceName)
-		if _, err := os.Stat(filePath); !os.IsNotExist(err) {
-			if bs, err := ioutil.ReadFile(filePath); err == nil {
-				rlog.Info("load the name server snapshot local file", map[string]interface{}{
-					"filePath": filePath,
-				})
-				s.srvs = strings.Split(string(bs), ";")
-				return
-			}
-		} else {
-			rlog.Warning("name server snapshot local file not exists", map[string]interface{}{
-				"filePath": filePath,
-			})
-		}
-	}
+	s.srvs = srvs
 }
diff --git a/internal/namesrv_test.go b/internal/namesrv_test.go
index ede14fc..e58dc29 100644
--- a/internal/namesrv_test.go
+++ b/internal/namesrv_test.go
@@ -19,7 +19,6 @@ package internal
 
 import (
 	"fmt"
-	"io/ioutil"
 	"net"
 	"net/http"
 	"os"
@@ -27,6 +26,8 @@ import (
 	"sync"
 	"testing"
 
+	"github.com/apache/rocketmq-client-go/v2/primitive"
+
 	. "github.com/smartystreets/goconvey/convey"
 	"github.com/stretchr/testify/assert"
 )
@@ -34,7 +35,7 @@ import (
 // TestSelector test roundrobin selector in namesrv
 func TestSelector(t *testing.T) {
 	srvs := []string{"127.0.0.1:9876", "127.0.0.1:9879", "12.24.123.243:10911", "12.24.123.243:10915"}
-	namesrv, err := NewNamesrv(srvs)
+	namesrv, err := NewNamesrv(primitive.NewPassthroughResolver(srvs))
 	assert.Nil(t, err)
 
 	assert.Equal(t, srvs[0], namesrv.getNameServerAddress())
@@ -92,11 +93,14 @@ func TestUpdateNameServerAddress(t *testing.T) {
 		nameServerDommain := fmt.Sprintf("http://127.0.0.1:%d/nameserver/addrs", port)
 		fmt.Println("temporary name server domain: ", nameServerDommain)
 
+		resolver := primitive.NewHttpResolver("DEFAULT", nameServerDommain)
 		ns := &namesrvs{
-			srvs: []string{},
-			lock: new(sync.Mutex),
+			srvs:     []string{},
+			lock:     new(sync.Mutex),
+			resolver: resolver,
 		}
-		ns.UpdateNameServerAddress(nameServerDommain, "DEFAULT")
+
+		ns.UpdateNameServerAddress()
 
 		index1 := ns.index
 		IP1 := ns.getNameServerAddress()
@@ -110,39 +114,6 @@ func TestUpdateNameServerAddress(t *testing.T) {
 	})
 }
 
-func TestUpdateNameServerAddressSaveLocalSnapshot(t *testing.T) {
-	Convey("Test UpdateNameServerAddress Save Local Snapshot", t, func() {
-		srvs := []string{
-			"192.168.100.1",
-			"192.168.100.2",
-			"192.168.100.3",
-			"192.168.100.4",
-			"192.168.100.5",
-		}
-		http.HandleFunc("/nameserver/addrs2", func(w http.ResponseWriter, r *http.Request) {
-			fmt.Fprintf(w, strings.Join(srvs, ";"))
-		})
-		server := &http.Server{Addr: ":0", Handler: nil}
-		listener, _ := net.Listen("tcp", ":0")
-		go server.Serve(listener)
-
-		port := listener.Addr().(*net.TCPAddr).Port
-		nameServerDommain := fmt.Sprintf("http://127.0.0.1:%d/nameserver/addrs2", port)
-		fmt.Println("temporary name server domain: ", nameServerDommain)
-
-		ns := &namesrvs{
-			srvs: []string{},
-			lock: new(sync.Mutex),
-		}
-		ns.UpdateNameServerAddress(nameServerDommain, "DEFAULT")
-		// check snapshot saved
-		filePath := getSnapshotFilePath("DEFAULT")
-		body := strings.Join(srvs, ";")
-		bs, _ := ioutil.ReadFile(filePath)
-		So(string(bs), ShouldEqual, body)
-	})
-}
-
 func TestUpdateNameServerAddressUseEnv(t *testing.T) {
 	Convey("Test UpdateNameServerAddress Use Env", t, func() {
 		srvs := []string{
@@ -153,86 +124,14 @@ func TestUpdateNameServerAddressUseEnv(t *testing.T) {
 			"192.168.100.5",
 		}
 
+		resolver := primitive.NewEnvResolver()
 		ns := &namesrvs{
-			srvs: []string{},
-			lock: new(sync.Mutex),
+			srvs:     []string{},
+			lock:     new(sync.Mutex),
+			resolver: resolver,
 		}
 		os.Setenv("NAMESRV_ADDR", strings.Join(srvs, ";"))
-		ns.UpdateNameServerAddress("", "DEFAULT")
-
-		index1 := ns.index
-		IP1 := ns.getNameServerAddress()
-
-		index2 := ns.index
-		IP2 := ns.getNameServerAddress()
-
-		So(index1+1, ShouldEqual, index2)
-		So(IP1, ShouldEqual, srvs[index1])
-		So(IP2, ShouldEqual, srvs[index2])
-	})
-}
-
-func TestUpdateNameServerAddressUseSnapshotFile(t *testing.T) {
-	Convey("Test UpdateNameServerAddress Use Local Snapshot", t, func() {
-		srvs := []string{
-			"192.168.100.1",
-			"192.168.100.2",
-			"192.168.100.3",
-			"192.168.100.4",
-			"192.168.100.5",
-		}
-
-		ns := &namesrvs{
-			srvs: []string{},
-			lock: new(sync.Mutex),
-		}
-
-		os.Setenv("NAMESRV_ADDR", "") // clear env
-		// setup local snapshot file
-		filePath := getSnapshotFilePath("DEFAULT")
-		body := strings.Join(srvs, ";")
-		_ = ioutil.WriteFile(filePath, []byte(body), 0644)
-
-		ns.UpdateNameServerAddress("http://127.0.0.1:80/error/nsaddrs", "DEFAULT")
-
-		index1 := ns.index
-		IP1 := ns.getNameServerAddress()
-
-		index2 := ns.index
-		IP2 := ns.getNameServerAddress()
-
-		So(index1+1, ShouldEqual, index2)
-		So(IP1, ShouldEqual, srvs[index1])
-		So(IP2, ShouldEqual, srvs[index2])
-	})
-}
-
-func TestUpdateNameServerAddressLoadSnapshotFileOnce(t *testing.T) {
-	Convey("Test UpdateNameServerAddress Load Local Snapshot Once", t, func() {
-		srvs := []string{
-			"192.168.100.1",
-			"192.168.100.2",
-			"192.168.100.3",
-			"192.168.100.4",
-			"192.168.100.5",
-		}
-
-		ns := &namesrvs{
-			srvs: []string{},
-			lock: new(sync.Mutex),
-		}
-
-		os.Setenv("NAMESRV_ADDR", "") // clear env
-		// setup local snapshot file
-		filePath := getSnapshotFilePath("DEFAULT")
-		body := strings.Join(srvs, ";")
-		_ = ioutil.WriteFile(filePath, []byte(body), 0644)
-		// load local snapshot file first time
-		ns.UpdateNameServerAddress("http://127.0.0.1:80/error/nsaddrs", "DEFAULT")
-
-		// change the local snapshot file to check load once
-		_ = ioutil.WriteFile(filePath, []byte("127.0.0.1;127.0.0.2"), 0644)
-		ns.UpdateNameServerAddress("http://127.0.0.1:80/error/nsaddrs", "DEFAULT")
+		ns.UpdateNameServerAddress()
 
 		index1 := ns.index
 		IP1 := ns.getNameServerAddress()
diff --git a/internal/remote/codec.go b/internal/remote/codec.go
index 3ac13e9..f756c11 100644
--- a/internal/remote/codec.go
+++ b/internal/remote/codec.go
@@ -22,7 +22,7 @@ import (
 	"fmt"
 	"sync/atomic"
 
-	"github.com/json-iterator/go"
+	jsoniter "github.com/json-iterator/go"
 )
 
 var opaque int32
diff --git a/internal/remote/codec_test.go b/internal/remote/codec_test.go
index ed854bd..8fb8a60 100644
--- a/internal/remote/codec_test.go
+++ b/internal/remote/codec_test.go
@@ -24,7 +24,8 @@ import (
 	"testing"
 	"unsafe"
 
-	"github.com/json-iterator/go"
+	jsoniter "github.com/json-iterator/go"
+
 	"github.com/stretchr/testify/assert"
 )
 
diff --git a/internal/remote/mock_remote_client.go b/internal/remote/mock_remote_client.go
index bc1a0da..7d7b41c 100644
--- a/internal/remote/mock_remote_client.go
+++ b/internal/remote/mock_remote_client.go
@@ -22,9 +22,10 @@ package remote
 
 import (
 	context "context"
+	reflect "reflect"
+
 	primitive "github.com/apache/rocketmq-client-go/v2/primitive"
 	gomock "github.com/golang/mock/gomock"
-	reflect "reflect"
 )
 
 // MockRemotingClient is a mock of RemotingClient interface
diff --git a/internal/route_test.go b/internal/route_test.go
index 068f285..c9b65f0 100644
--- a/internal/route_test.go
+++ b/internal/route_test.go
@@ -42,7 +42,7 @@ func TestQueryTopicRouteInfoFromServer(t *testing.T) {
 		addr, err := primitive.NewNamesrvAddr("1.1.1.1:8880", "1.1.1.2:8880", "1.1.1.3:8880")
 		assert.Nil(t, err)
 
-		namesrv, err := NewNamesrv(addr)
+		namesrv, err := NewNamesrv(primitive.NewPassthroughResolver(addr))
 		assert.Nil(t, err)
 		namesrv.nameSrvClient = remotingCli
 
diff --git a/internal/trace.go b/internal/trace.go
index 48b257f..f0c643c 100644
--- a/internal/trace.go
+++ b/internal/trace.go
@@ -241,7 +241,7 @@ func NewTraceDispatcher(traceCfg *primitive.TraceConfig) *traceDispatcher {
 		t = TraceTopicPrefix + traceCfg.TraceTopic
 	}
 
-	srvs, err := NewNamesrv(traceCfg.NamesrvAddrs)
+	srvs, err := NewNamesrv(primitive.NewPassthroughResolver(traceCfg.NamesrvAddrs))
 	if err != nil {
 		panic(errors.Wrap(err, "new Namesrv failed."))
 	}
diff --git a/primitive/base.go b/primitive/base.go
index efc48ef..35b6268 100644
--- a/primitive/base.go
+++ b/primitive/base.go
@@ -95,3 +95,31 @@ func WithRecover(fn func()) {
 
 	fn()
 }
+
+func Diff(origin, latest []string) bool {
+	if len(origin) != len(latest) {
+		return true
+	}
+
+	// check added
+	originFilter := make(map[string]struct{}, len(origin))
+	for _, srv := range origin {
+		originFilter[srv] = struct{}{}
+	}
+
+	latestFilter := make(map[string]struct{}, len(latest))
+	for _, srv := range latest {
+		if _, ok := originFilter[srv]; !ok {
+			return true // added
+		}
+		latestFilter[srv] = struct{}{}
+	}
+
+	// check delete
+	for _, srv := range origin {
+		if _, ok := latestFilter[srv]; !ok {
+			return true // deleted
+		}
+	}
+	return false
+}
diff --git a/primitive/base_test.go b/primitive/base_test.go
index 09d55c3..03a978d 100644
--- a/primitive/base_test.go
+++ b/primitive/base_test.go
@@ -44,3 +44,21 @@ func TestVerifyIP(t *testing.T) {
 	err = verifyIP(IPs)
 	assert.Equal(t, "multiple IP addr does not support", err.Error())
 }
+
+func TestBase(t *testing.T) {
+	a := []string{}
+	b := []string{}
+	assert.False(t, Diff(a, b))
+
+	a = []string{"a"}
+	b = []string{"a", "b"}
+	assert.True(t, Diff(a, b))
+
+	a = []string{"a", "b", "c"}
+	b = []string{"c", "a", "b"}
+	assert.False(t, Diff(a, b))
+
+	a = []string{"b", "a"}
+	b = []string{"a", "c"}
+	assert.True(t, Diff(a, b))
+}
diff --git a/primitive/nsresolver.go b/primitive/nsresolver.go
new file mode 100644
index 0000000..d844373
--- /dev/null
+++ b/primitive/nsresolver.go
@@ -0,0 +1,219 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+package primitive
+
+import (
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"os"
+	"os/user"
+	"path"
+	"strings"
+	"time"
+
+	"github.com/apache/rocketmq-client-go/v2/rlog"
+)
+
+// resolver for nameserver, monitor change of nameserver and notify client
+// consul or domain is common
+type NsResolver interface {
+	Resolve() []string
+	Description() string
+}
+
+type StaticResolver struct {
+}
+
+var _ NsResolver = (*EnvResolver)(nil)
+
+func NewEnvResolver() *EnvResolver {
+	return &EnvResolver{}
+}
+
+type EnvResolver struct {
+}
+
+func (e *EnvResolver) Resolve() []string {
+	if v := os.Getenv("NAMESRV_ADDR"); v != "" {
+		return strings.Split(v, ";")
+	}
+	return nil
+}
+
+func (e *EnvResolver) Description() string {
+	return "env resolver of var NAMESRV_ADDR"
+}
+
+type passthroughResolver struct {
+	addr     []string
+	failback NsResolver
+}
+
+func NewPassthroughResolver(addr []string) *passthroughResolver {
+	return &passthroughResolver{
+		addr:     addr,
+		failback: NewEnvResolver(),
+	}
+}
+
+func (p *passthroughResolver) Resolve() []string {
+	if p.addr != nil {
+		return p.addr
+	}
+	return p.failback.Resolve()
+}
+
+func (p *passthroughResolver) Description() string {
+	return fmt.Sprintf("passthrough resolver of %v", p.addr)
+}
+
+const (
+	DEFAULT_NAMESRV_ADDR = "http://jmenv.tbsite.net:8080/rocketmq/nsaddr"
+)
+
+var _ NsResolver = (*HttpResolver)(nil)
+
+type HttpResolver struct {
+	domain   string
+	instance string
+	cli      http.Client
+	failback NsResolver
+}
+
+func NewHttpResolver(instance string, domain ...string) *HttpResolver {
+	d := DEFAULT_NAMESRV_ADDR
+	if len(domain) > 0 {
+		d = domain[0]
+	}
+	client := http.Client{Timeout: 10 * time.Second}
+
+	h := &HttpResolver{
+		domain:   d,
+		instance: instance,
+		cli:      client,
+		failback: NewEnvResolver(),
+	}
+	return h
+}
+
+func (h *HttpResolver) Resolve() []string {
+	addrs := h.get()
+	if len(addrs) > 0 {
+		return addrs
+	}
+
+	addrs = h.loadSnapshot()
+	if len(addrs) > 0 {
+		return addrs
+	}
+	return h.failback.Resolve()
+}
+
+func (h *HttpResolver) Description() string {
+	return fmt.Sprintf("passthrough resolver of domain:%v instance:%v", h.domain, h.instance)
+}
+
+func (h *HttpResolver) get() []string {
+	resp, err := h.cli.Get(h.domain)
+	if err != nil {
+		rlog.Error("name server http fetch failed", map[string]interface{}{
+			"NameServerDomain": h.domain,
+			"err":              err,
+		})
+		return nil
+	}
+
+	defer resp.Body.Close()
+	body, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		rlog.Error("name server read http response failed", map[string]interface{}{
+			"NameServerDomain": h.domain,
+			"err":              err,
+		})
+		return nil
+	}
+
+	bodyStr := string(body)
+	if bodyStr == "" {
+		return nil
+	}
+
+	h.saveSnapshot(body)
+
+	return strings.Split(string(body), ";")
+}
+
+func (h *HttpResolver) saveSnapshot(body []byte) error {
+	filePath := h.getSnapshotFilePath(h.instance)
+	err := ioutil.WriteFile(filePath, body, 0644)
+	if err != nil {
+		rlog.Error("name server snapshot save failed", map[string]interface{}{
+			"filePath": filePath,
+			"err":      err,
+		})
+		return err
+	}
+
+	rlog.Info("name server snapshot save successfully", map[string]interface{}{
+		"filePath": filePath,
+	})
+	return nil
+}
+
+func (h *HttpResolver) loadSnapshot() []string {
+	filePath := h.getSnapshotFilePath(h.instance)
+	_, err := os.Stat(filePath)
+	if os.IsNotExist(err) {
+		rlog.Warning("name server snapshot local file not exists", map[string]interface{}{
+			"filePath": filePath,
+		})
+		return nil
+	}
+
+	bs, err := ioutil.ReadFile(filePath)
+	if err != nil {
+		return nil
+	}
+
+	rlog.Info("load the name server snapshot local file", map[string]interface{}{
+		"filePath": filePath,
+	})
+	return strings.Split(string(bs), ";")
+}
+
+func (h *HttpResolver) getSnapshotFilePath(instanceName string) string {
+	homeDir := ""
+	if usr, err := user.Current(); err == nil {
+		homeDir = usr.HomeDir
+	} else {
+		rlog.Error("name server domain, can't get user home directory", map[string]interface{}{
+			"err": err,
+		})
+	}
+	storePath := path.Join(homeDir, "/logs/rocketmq-go/snapshot")
+	if _, err := os.Stat(storePath); os.IsNotExist(err) {
+		if err = os.MkdirAll(storePath, 0755); err != nil {
+			rlog.Fatal("can't create name server snapshot directory", map[string]interface{}{
+				"path": storePath,
+				"err":  err,
+			})
+		}
+	}
+	filePath := path.Join(storePath, fmt.Sprintf("nameserver_addr-%s", instanceName))
+	return filePath
+}
diff --git a/primitive/nsresolver_test.go b/primitive/nsresolver_test.go
new file mode 100644
index 0000000..98d839a
--- /dev/null
+++ b/primitive/nsresolver_test.go
@@ -0,0 +1,133 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+package primitive
+
+import (
+	"fmt"
+	"io/ioutil"
+	"net"
+	"net/http"
+	"os"
+	"strings"
+	"testing"
+
+	. "github.com/smartystreets/goconvey/convey"
+)
+
+func TestEnvResolver(t *testing.T) {
+	Convey("Test UpdateNameServerAddress Use Env", t, func() {
+		srvs := []string{
+			"192.168.100.1",
+			"192.168.100.2",
+			"192.168.100.3",
+			"192.168.100.4",
+			"192.168.100.5",
+		}
+
+		resolver := NewEnvResolver()
+		os.Setenv("NAMESRV_ADDR", strings.Join(srvs, ";"))
+
+		addrs := resolver.Resolve()
+
+		So(Diff(srvs, addrs), ShouldBeFalse)
+	})
+}
+
+func TestHttpResolverWithGet(t *testing.T) {
+	Convey("Test UpdateNameServerAddress Save Local Snapshot", t, func() {
+		srvs := []string{
+			"192.168.100.1",
+			"192.168.100.2",
+			"192.168.100.3",
+			"192.168.100.4",
+			"192.168.100.5",
+		}
+		http.HandleFunc("/nameserver/addrs2", func(w http.ResponseWriter, r *http.Request) {
+			fmt.Fprintf(w, strings.Join(srvs, ";"))
+		})
+		server := &http.Server{Addr: ":0", Handler: nil}
+		listener, _ := net.Listen("tcp", ":0")
+		go server.Serve(listener)
+
+		port := listener.Addr().(*net.TCPAddr).Port
+		nameServerDommain := fmt.Sprintf("http://127.0.0.1:%d/nameserver/addrs2", port)
+		fmt.Println("temporary name server domain: ", nameServerDommain)
+
+		resolver := NewHttpResolver("DEFAULT", nameServerDommain)
+		resolver.Resolve()
+
+		// check snapshot saved
+		filePath := resolver.getSnapshotFilePath("DEFAULT")
+		body := strings.Join(srvs, ";")
+		bs, _ := ioutil.ReadFile(filePath)
+		So(string(bs), ShouldEqual, body)
+	})
+}
+
+func TestHttpResolverWithSnapshotFile(t *testing.T) {
+	Convey("Test UpdateNameServerAddress Use Local Snapshot", t, func() {
+		srvs := []string{
+			"192.168.100.1",
+			"192.168.100.2",
+			"192.168.100.3",
+			"192.168.100.4",
+			"192.168.100.5",
+		}
+
+		resolver := NewHttpResolver("DEFAULT", "http://127.0.0.1:80/error/nsaddrs")
+
+		os.Setenv("NAMESRV_ADDR", "") // clear env
+		// setup local snapshot file
+		filePath := resolver.getSnapshotFilePath("DEFAULT")
+		body := strings.Join(srvs, ";")
+		_ = ioutil.WriteFile(filePath, []byte(body), 0644)
+
+		addrs := resolver.Resolve()
+
+		So(Diff(addrs, srvs), ShouldBeFalse)
+	})
+}
+
+func TesHttpReslverWithSnapshotFileOnce(t *testing.T) {
+	Convey("Test UpdateNameServerAddress Load Local Snapshot Once", t, func() {
+		srvs := []string{
+			"192.168.100.1",
+			"192.168.100.2",
+			"192.168.100.3",
+			"192.168.100.4",
+			"192.168.100.5",
+		}
+
+		resolver := NewHttpResolver("DEFAULT", "http://127.0.0.1:80/error/nsaddrs")
+
+		os.Setenv("NAMESRV_ADDR", "") // clear env
+		// setup local snapshot file
+		filePath := resolver.getSnapshotFilePath("DEFAULT")
+		body := strings.Join(srvs, ";")
+		_ = ioutil.WriteFile(filePath, []byte(body), 0644)
+		// load local snapshot file first time
+		addrs1 := resolver.Resolve()
+
+		// change the local snapshot file to check load once
+		_ = ioutil.WriteFile(filePath, []byte("127.0.0.1;127.0.0.2"), 0644)
+
+		addrs2 := resolver.Resolve()
+
+		So(Diff(addrs1, addrs2), ShouldBeFalse)
+		So(Diff(addrs1, srvs), ShouldBeFalse)
+	})
+}
diff --git a/producer/option.go b/producer/option.go
index 97a480e..76e9a31 100644
--- a/producer/option.go
+++ b/producer/option.go
@@ -31,6 +31,7 @@ func defaultProducerOptions() producerOptions {
 		SendMsgTimeout:        3 * time.Second,
 		DefaultTopicQueueNums: 4,
 		CreateTopicKey:        "TBW102",
+		Resolver:              primitive.NewHttpResolver("DEFAULT"),
 	}
 	opts.ClientOptions.GroupName = "DEFAULT_CONSUMER"
 	return opts
@@ -43,6 +44,7 @@ type producerOptions struct {
 	DefaultTopicQueueNums int
 	CreateTopicKey        string // "TBW102" Will be created at broker when isAutoCreateTopicEnable. when topic is not created,
 	// and broker open isAutoCreateTopicEnable, topic will use "TBW102" config to create topic
+	Resolver primitive.NsResolver
 }
 
 type Option func(*producerOptions)
@@ -63,20 +65,6 @@ func WithInstanceName(name string) Option {
 	}
 }
 
-// WithNameServer set NameServer address, only support one NameServer cluster in alpha2
-func WithNameServer(nameServers primitive.NamesrvAddr) Option {
-	return func(opts *producerOptions) {
-		opts.NameServerAddrs = nameServers
-	}
-}
-
-// WithNameServerDomain set NameServer domain
-func WithNameServerDomain(nameServerUrl string) Option {
-	return func(opts *producerOptions) {
-		opts.NameServerDomain = nameServerUrl
-	}
-}
-
 // WithNamespace set the namespace of producer
 func WithNamespace(namespace string) Option {
 	return func(opts *producerOptions) {
@@ -133,3 +121,24 @@ func WithCreateTopicKey(topic string) Option {
 		options.CreateTopicKey = topic
 	}
 }
+
+// WithNsResovler set nameserver resolver to fetch nameserver addr
+func WithNsResovler(resolver primitive.NsResolver) Option {
+	return func(options *producerOptions) {
+		options.Resolver = resolver
+	}
+}
+
+// WithNameServer set NameServer address, only support one NameServer cluster in alpha2
+func WithNameServer(nameServers primitive.NamesrvAddr) Option {
+	return func(options *producerOptions) {
+		options.Resolver = primitive.NewPassthroughResolver(nameServers)
+	}
+}
+
+// WithNameServerDomain set NameServer domain
+func WithNameServerDomain(nameServerUrl string) Option {
+	return func(opts *producerOptions) {
+		opts.Resolver = primitive.NewHttpResolver("DEFAULT", nameServerUrl)
+	}
+}
diff --git a/producer/producer.go b/producer/producer.go
index e22cee9..7c5e0eb 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -57,7 +57,7 @@ func NewDefaultProducer(opts ...Option) (*defaultProducer, error) {
 	for _, apply := range opts {
 		apply(&defaultOpts)
 	}
-	srvs, err := internal.NewNamesrv(defaultOpts.NameServerAddrs)
+	srvs, err := internal.NewNamesrv(defaultOpts.Resolver)
 	if err != nil {
 		return nil, errors.Wrap(err, "new Namesrv failed.")
 	}
@@ -80,9 +80,6 @@ func NewDefaultProducer(opts ...Option) (*defaultProducer, error) {
 
 func (p *defaultProducer) Start() error {
 	atomic.StoreInt32(&p.state, int32(internal.StateRunning))
-	if len(p.options.NameServerAddrs) == 0 {
-		p.options.Namesrv.UpdateNameServerAddress(p.options.NameServerDomain, p.options.InstanceName)
-	}
 
 	p.client.RegisterProducer(p.group, p)
 	p.client.Start()
diff --git a/producer/producer_test.go b/producer/producer_test.go
index 41bf80a..508acf8 100644
--- a/producer/producer_test.go
+++ b/producer/producer_test.go
@@ -35,7 +35,7 @@ const (
 
 func TestShutdown(t *testing.T) {
 	p, _ := NewDefaultProducer(
-		WithNameServer([]string{"127.0.0.1:9876"}),
+		WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
 		WithRetry(2),
 		WithQueueSelector(NewManualQueueSelector()),
 	)
@@ -98,7 +98,7 @@ func mockB4Send(p *defaultProducer) {
 
 func TestSync(t *testing.T) {
 	p, _ := NewDefaultProducer(
-		WithNameServer([]string{"127.0.0.1:9876"}),
+		WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
 		WithRetry(2),
 		WithQueueSelector(NewManualQueueSelector()),
 	)
@@ -149,7 +149,7 @@ func TestSync(t *testing.T) {
 
 func TestASync(t *testing.T) {
 	p, _ := NewDefaultProducer(
-		WithNameServer([]string{"127.0.0.1:9876"}),
+		WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
 		WithRetry(2),
 		WithQueueSelector(NewManualQueueSelector()),
 	)
@@ -211,7 +211,7 @@ func TestASync(t *testing.T) {
 
 func TestOneway(t *testing.T) {
 	p, _ := NewDefaultProducer(
-		WithNameServer([]string{"127.0.0.1:9876"}),
+		WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
 		WithRetry(2),
 		WithQueueSelector(NewManualQueueSelector()),
 	)


Mime
View raw message