rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From huzongt...@apache.org
Subject [rocketmq-client-go] branch native updated: [ISSUE #104] Support ACL (#117)
Date Wed, 17 Jul 2019 05:53:00 GMT
This is an automated email from the ASF dual-hosted git repository.

huzongtang pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new a133666  [ISSUE #104] Support ACL (#117)
a133666 is described below

commit a133666071467b72ec6c6021372d8377486dd3f0
Author: wenfeng <sxian.wang@gmail.com>
AuthorDate: Wed Jul 17 13:52:56 2019 +0800

    [ISSUE #104] Support ACL (#117)
    
    * move utils to internal package
    
    * add ACL
    
    * fix options
    
    * fix PR
---
 consumer/consumer.go                               |  4 +-
 consumer/offset_store.go                           |  2 +-
 consumer/option.go                                 | 14 ++--
 consumer/pull_consumer.go                          |  2 +-
 consumer/statistics.go                             |  1 -
 consumer/strategy.go                               |  2 +-
 utils/errors.go => errors.go                       |  4 +-
 examples/consumer/acl/main.go                      | 55 ++++++++++++++
 examples/consumer/pull/main.go                     |  6 +-
 examples/producer/acl/main.go                      | 63 ++++++++++++++++
 internal/client.go                                 | 17 +++--
 internal/remote/codec.go                           | 11 +--
 internal/remote/future.go                          |  2 +-
 internal/remote/interceptor.go                     | 83 ++++++++++++++++++++++
 .../math.go => internal/remote/interceptor_test.go | 21 +++---
 internal/remote/remote_client.go                   | 43 +++++++++++
 internal/remote/remote_client_test.go              | 19 +++--
 internal/route.go                                  |  2 +-
 internal/route_test.go                             |  2 +-
 {utils => internal/utils}/errors.go                |  2 +-
 {utils => internal/utils}/files.go                 |  0
 {utils => internal/utils}/fun.go                   |  0
 {utils => internal/utils}/helper.go                |  0
 {utils => internal/utils}/helper_test.go           |  0
 {utils => internal/utils}/math.go                  |  0
 {utils => internal/utils}/messagesysflag.go        |  0
 {utils => internal/utils}/net.go                   |  0
 {utils => internal/utils}/net_test.go              |  0
 {utils => internal/utils}/ring_buffer.go           |  0
 {utils => internal/utils}/ring_buffer_test.go      |  0
 {utils => internal/utils}/string.go                |  0
 {utils => internal/utils}/string_test.go           |  0
 utils/math.go => primitive/auth.go                 | 18 ++---
 primitive/message.go                               |  2 +-
 primitive/result.go                                |  3 +-
 producer/option.go                                 | 14 ++--
 producer/producer.go                               |  7 +-
 37 files changed, 318 insertions(+), 81 deletions(-)

diff --git a/consumer/consumer.go b/consumer/consumer.go
index 393e094..7f80802 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -29,9 +29,9 @@ import (
 
 	"github.com/apache/rocketmq-client-go/internal"
 	"github.com/apache/rocketmq-client-go/internal/remote"
+	"github.com/apache/rocketmq-client-go/internal/utils"
 	"github.com/apache/rocketmq-client-go/primitive"
 	"github.com/apache/rocketmq-client-go/rlog"
-	"github.com/apache/rocketmq-client-go/utils"
 	"github.com/pkg/errors"
 	"github.com/tidwall/gjson"
 )
@@ -879,7 +879,7 @@ func (dc *defaultConsumer) queryMaxOffset(mq *primitive.MessageQueue)
(int64, er
 	return strconv.ParseInt(response.ExtFields["offset"], 10, 64)
 }
 
-func (dc *defaultConsumer) queryOffset(mq *primitive.MessageQueue) (int64) {
+func (dc *defaultConsumer) queryOffset(mq *primitive.MessageQueue) int64 {
 	return dc.storage.read(mq, _ReadMemoryThenStore)
 }
 
diff --git a/consumer/offset_store.go b/consumer/offset_store.go
index 45d80f2..8bc5b5a 100644
--- a/consumer/offset_store.go
+++ b/consumer/offset_store.go
@@ -28,9 +28,9 @@ import (
 
 	"github.com/apache/rocketmq-client-go/internal"
 	"github.com/apache/rocketmq-client-go/internal/remote"
+	"github.com/apache/rocketmq-client-go/internal/utils"
 	"github.com/apache/rocketmq-client-go/primitive"
 	"github.com/apache/rocketmq-client-go/rlog"
-	"github.com/apache/rocketmq-client-go/utils"
 )
 
 type readType int
diff --git a/consumer/option.go b/consumer/option.go
index 0973797..f967e1e 100644
--- a/consumer/option.go
+++ b/consumer/option.go
@@ -175,20 +175,12 @@ func WithNameServer(nameServers []string) Option {
 	}
 }
 
-// WithACL on/off ACL
 func WithVIPChannel(enable bool) Option {
 	return func(opts *consumerOptions) {
 		opts.VIPChannelEnabled = enable
 	}
 }
 
-// WithACL on/off ACL
-func WithACL(enable bool) Option {
-	return func(opts *consumerOptions) {
-		opts.ACLEnabled = enable
-	}
-}
-
 // WithRetry return a Option that specifies the retry times when send failed.
 // TODO: use retry middleware instead
 func WithRetry(retries int) Option {
@@ -196,3 +188,9 @@ func WithRetry(retries int) Option {
 		opts.RetryTimes = retries
 	}
 }
+
+func WithCredentials(c primitive.Credentials) Option {
+	return func(options *consumerOptions) {
+		options.ClientOptions.Credentials = c
+	}
+}
diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
index 19e2369..742cf16 100644
--- a/consumer/pull_consumer.go
+++ b/consumer/pull_consumer.go
@@ -24,9 +24,9 @@ import (
 	"sync/atomic"
 
 	"github.com/apache/rocketmq-client-go/internal"
+	"github.com/apache/rocketmq-client-go/internal/utils"
 	"github.com/apache/rocketmq-client-go/primitive"
 	"github.com/apache/rocketmq-client-go/rlog"
-	"github.com/apache/rocketmq-client-go/utils"
 	"github.com/pkg/errors"
 )
 
diff --git a/consumer/statistics.go b/consumer/statistics.go
index adc7b38..f01364a 100644
--- a/consumer/statistics.go
+++ b/consumer/statistics.go
@@ -188,7 +188,6 @@ func (sis *statsItemSet) init() {
 	}()
 }
 
-
 func (sis *statsItemSet) samplingInSeconds() {
 	sis.statsItemTable.Range(func(key, value interface{}) bool {
 		si := value.(*statsItem)
diff --git a/consumer/strategy.go b/consumer/strategy.go
index 2990d90..c5b6a25 100644
--- a/consumer/strategy.go
+++ b/consumer/strategy.go
@@ -18,9 +18,9 @@ limitations under the License.
 package consumer
 
 import (
+	"github.com/apache/rocketmq-client-go/internal/utils"
 	"github.com/apache/rocketmq-client-go/primitive"
 	"github.com/apache/rocketmq-client-go/rlog"
-	"github.com/apache/rocketmq-client-go/utils"
 )
 
 // Strategy Algorithm for message allocating between consumers
diff --git a/utils/errors.go b/errors.go
similarity index 98%
copy from utils/errors.go
copy to errors.go
index 507d7bb..6b98475 100644
--- a/utils/errors.go
+++ b/errors.go
@@ -15,14 +15,14 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package utils
+package rocketmq
 
 import (
 	"github.com/apache/rocketmq-client-go/rlog"
 	"github.com/pkg/errors"
 )
 
-var(
+var (
 	// ErrRequestTimeout for request timeout error
 	ErrRequestTimeout = errors.New("request timeout")
 
diff --git a/examples/consumer/acl/main.go b/examples/consumer/acl/main.go
new file mode 100644
index 0000000..4582cde
--- /dev/null
+++ b/examples/consumer/acl/main.go
@@ -0,0 +1,55 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"time"
+
+	"github.com/apache/rocketmq-client-go"
+	"github.com/apache/rocketmq-client-go/consumer"
+	"github.com/apache/rocketmq-client-go/primitive"
+)
+
+func main() {
+	c, _ := rocketmq.NewPushConsumer(
+		consumer.WithGroupName("testGroup"),
+		consumer.WithNameServer([]string{"127.0.0.1:9876"}),
+		consumer.WithCredentials(primitive.Credentials{
+			AccessKey: "RocketMQ",
+			SecretKey: "12345678",
+		}),
+	)
+	err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context,
+		msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
+		fmt.Printf("subscribe callback: %v \n", msgs)
+		return consumer.ConsumeSuccess, nil
+	})
+	if err != nil {
+		fmt.Println(err.Error())
+	}
+	// Note: start after subscribe
+	err = c.Start()
+	if err != nil {
+		fmt.Println(err.Error())
+		os.Exit(-1)
+	}
+	time.Sleep(time.Hour)
+}
diff --git a/examples/consumer/pull/main.go b/examples/consumer/pull/main.go
index 4076450..c2915f9 100644
--- a/examples/consumer/pull/main.go
+++ b/examples/consumer/pull/main.go
@@ -23,14 +23,14 @@ import (
 	"time"
 
 	"github.com/apache/rocketmq-client-go/consumer"
+	"github.com/apache/rocketmq-client-go/internal/utils"
 	"github.com/apache/rocketmq-client-go/primitive"
 	"github.com/apache/rocketmq-client-go/rlog"
-	"github.com/apache/rocketmq-client-go/utils"
 )
 
 func main() {
 	c, err := consumer.NewPullConsumer(consumer.WithGroupName("testGroup"), consumer.WithNameServer([]string{"127.0.0.1:9876"}))
-	if err != nil{
+	if err != nil {
 		rlog.Fatal("fail to new pullConsumer: ", err)
 	}
 	c.Start()
@@ -48,7 +48,7 @@ func main() {
 		if err != nil {
 			if err == utils.ErrRequestTimeout {
 				fmt.Printf("timeout \n")
-				time.Sleep(1 *time.Second)
+				time.Sleep(1 * time.Second)
 				continue
 			}
 			fmt.Printf("unexpectable err: %v \n", err)
diff --git a/examples/producer/acl/main.go b/examples/producer/acl/main.go
new file mode 100644
index 0000000..cee23db
--- /dev/null
+++ b/examples/producer/acl/main.go
@@ -0,0 +1,63 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package main implements a producer with user custom interceptor.
+package main
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"strconv"
+
+	"github.com/apache/rocketmq-client-go"
+	"github.com/apache/rocketmq-client-go/primitive"
+	"github.com/apache/rocketmq-client-go/producer"
+)
+
+func main() {
+	p, _ := rocketmq.NewProducer(
+		producer.WithNameServer([]string{"127.0.0.1:9876"}),
+		producer.WithRetry(2),
+		producer.WithCredentials(primitive.Credentials{
+			AccessKey: "RocketMQ",
+			SecretKey: "12345678",
+		}),
+	)
+	err := p.Start()
+	if err != nil {
+		fmt.Printf("start producer error: %s", err.Error())
+		os.Exit(1)
+	}
+	for i := 0; i < 100000; i++ {
+		res, err := p.SendSync(context.Background(), &primitive.Message{
+			Topic:      "test",
+			Body:       []byte("Hello RocketMQ Go Client!"),
+			Properties: map[string]string{"order": strconv.Itoa(i)},
+		})
+
+		if err != nil {
+			fmt.Printf("send message error: %s\n", err)
+		} else {
+			fmt.Printf("send message success: result=%s\n", res.String())
+		}
+	}
+	err = p.Shutdown()
+	if err != nil {
+		fmt.Printf("shundown producer error: %s", err.Error())
+	}
+}
diff --git a/internal/client.go b/internal/client.go
index c826154..34870b3 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -29,9 +29,9 @@ import (
 	"time"
 
 	"github.com/apache/rocketmq-client-go/internal/remote"
+	"github.com/apache/rocketmq-client-go/internal/utils"
 	"github.com/apache/rocketmq-client-go/primitive"
 	"github.com/apache/rocketmq-client-go/rlog"
-	"github.com/apache/rocketmq-client-go/utils"
 )
 
 const (
@@ -104,6 +104,7 @@ type ClientOptions struct {
 	ACLEnabled        bool
 	RetryTimes        int
 	Interceptors      []primitive.Interceptor
+	Credentials       primitive.Credentials
 }
 
 func (opt *ClientOptions) ChangeInstanceNameToPID() {
@@ -140,7 +141,7 @@ type RMQClient interface {
 	SendMessageOneWay(ctx context.Context, brokerAddrs string, request *SendMessageRequest,
 		msgs []*primitive.Message) (*primitive.SendResult, error)
 
-	ProcessSendResponse(brokerName string, cmd *remote.RemotingCommand, resp *primitive.SendResult,
msgs ...*primitive.Message)
+	ProcessSendResponse(brokerName string, cmd *remote.RemotingCommand, resp *primitive.SendResult,
msgs ...*primitive.Message) error
 
 	RegisterConsumer(group string, consumer InnerConsumer) error
 	UnregisterConsumer(group string)
@@ -190,6 +191,9 @@ func (c *rmqClient) Start() {
 	c.close = false
 	c.once.Do(func() {
 		// TODO fetchNameServerAddr
+		if !c.option.Credentials.IsEmpty() {
+			c.remoteClient.RegisterInterceptor(remote.ACLInterceptor(c.option.Credentials))
+		}
 		go func() {}()
 
 		// schedule update route info
@@ -385,7 +389,7 @@ func (c *rmqClient) SendMessageOneWay(ctx context.Context, brokerAddrs
string, r
 	return nil, err
 }
 
-func (c *rmqClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingCommand, resp
*primitive.SendResult, msgs ...*primitive.Message) {
+func (c *rmqClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingCommand, resp
*primitive.SendResult, msgs ...*primitive.Message) error {
 	var status primitive.SendStatus
 	switch cmd.Code {
 	case ResFlushDiskTimeout:
@@ -397,7 +401,8 @@ func (c *rmqClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingC
 	case ResSuccess:
 		status = primitive.SendOK
 	default:
-		// TODO process unknown code
+		status = primitive.SendUnknownError
+		return errors.New(cmd.Remark)
 	}
 
 	msgIDs := make([]string, 0)
@@ -427,7 +432,7 @@ func (c *rmqClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingC
 	//TransactionID: sendResponse.TransactionId,
 	resp.RegionID = regionId
 	resp.TraceOn = trace != "" && trace != _TranceOff
-
+	return nil
 }
 
 // PullMessage with sync
@@ -600,7 +605,7 @@ func encodeMessages(message []*primitive.Message) []byte {
 	index := 0
 	for index < len(message) {
 		buffer.Write(message[index].Body)
-		index ++
+		index++
 	}
 	return buffer.Bytes()
 }
diff --git a/internal/remote/codec.go b/internal/remote/codec.go
index 434678e..3093514 100644
--- a/internal/remote/codec.go
+++ b/internal/remote/codec.go
@@ -58,11 +58,12 @@ type CustomHeader interface {
 
 func NewRemotingCommand(code int16, header CustomHeader, body []byte) *RemotingCommand {
 	cmd := &RemotingCommand{
-		Code:     code,
-		Version:  _Version,
-		Opaque:   atomic.AddInt32(&opaque, 1),
-		Body:     body,
-		Language: _LanguageCode,
+		Code:      code,
+		Version:   _Version,
+		Opaque:    atomic.AddInt32(&opaque, 1),
+		Body:      body,
+		Language:  _LanguageCode,
+		ExtFields: make(map[string]string),
 	}
 
 	if header != nil {
diff --git a/internal/remote/future.go b/internal/remote/future.go
index 5a1c724..8690644 100644
--- a/internal/remote/future.go
+++ b/internal/remote/future.go
@@ -21,7 +21,7 @@ import (
 	"sync"
 	"time"
 
-	"github.com/apache/rocketmq-client-go/utils"
+	"github.com/apache/rocketmq-client-go/internal/utils"
 )
 
 // ResponseFuture
diff --git a/internal/remote/interceptor.go b/internal/remote/interceptor.go
new file mode 100644
index 0000000..0197f27
--- /dev/null
+++ b/internal/remote/interceptor.go
@@ -0,0 +1,83 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package remote
+
+import (
+	"context"
+	"crypto/hmac"
+	"crypto/sha1"
+	"encoding/base64"
+	"hash"
+	"sort"
+	"strings"
+
+	"github.com/apache/rocketmq-client-go/primitive"
+)
+
+const (
+	signature     = "Signature"
+	accessKey     = "AccessKey"
+	securityToken = "SecurityToken"
+	keyFile       = "KEY_FILE"
+	// System.getProperty("rocketmq.client.keyFile", System.getProperty("user.home") + File.separator
+ "key");
+)
+
+func ACLInterceptor(credentials primitive.Credentials) primitive.Interceptor {
+	return func(ctx context.Context, req, reply interface{}, next primitive.Invoker) error {
+		cmd := req.(*RemotingCommand)
+		m := make(map[string]string)
+		order := make([]string, 1)
+		m[accessKey] = credentials.AccessKey
+		order[0] = accessKey
+		if credentials.SecurityToken != "" {
+			m[securityToken] = credentials.SecurityToken
+		}
+		for k, v := range cmd.ExtFields {
+			m[k] = v
+			order = append(order, k)
+		}
+		sort.Slice(order, func(i, j int) bool {
+			return strings.Compare(order[i], order[j]) < 0
+		})
+		content := ""
+		for idx := range order {
+			content += m[order[idx]]
+		}
+		buf := make([]byte, len(content)+len(cmd.Body))
+		copy(buf, []byte(content))
+		copy(buf[len(content):], cmd.Body)
+
+		cmd.ExtFields[signature] = calculateSignature(buf, []byte(credentials.SecretKey))
+		cmd.ExtFields[accessKey] = credentials.AccessKey
+
+		// The SecurityToken value is unnecessary, user can choose this one.
+		if credentials.SecurityToken != "" {
+			cmd.ExtFields[securityToken] = credentials.SecurityToken
+		}
+		err := next(ctx, req, reply)
+		return err
+	}
+}
+
+func calculateSignature(data, sk []byte) string {
+	mac := hmac.New(func() hash.Hash {
+		return sha1.New()
+	}, sk)
+	mac.Write(data)
+	return base64.StdEncoding.EncodeToString(mac.Sum(nil))
+}
diff --git a/utils/math.go b/internal/remote/interceptor_test.go
similarity index 74%
copy from utils/math.go
copy to internal/remote/interceptor_test.go
index 816631e..c2cc6ca 100644
--- a/utils/math.go
+++ b/internal/remote/interceptor_test.go
@@ -15,18 +15,15 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package utils
+package remote
 
-func AbsInt(i int) int {
-	if i >= 0 {
-		return i
-	}
-	return -i
-}
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
 
-func MinInt(a, b int) int {
-	if a < b {
-		return a
-	}
-	return b
+func Test_CalculateSignature(t *testing.T) {
+	assert.Equal(t, "tAb/54Rwwcq+pbH8Loi7FWX4QSQ=",
+		calculateSignature([]byte("Hello RocketMQ Client ACL Feature"), []byte("adiaushdiaushd")))
 }
diff --git a/internal/remote/remote_client.go b/internal/remote/remote_client.go
index 8e32216..297aaa3 100644
--- a/internal/remote/remote_client.go
+++ b/internal/remote/remote_client.go
@@ -19,12 +19,15 @@ package remote
 import (
 	"bufio"
 	"bytes"
+	"context"
 	"encoding/binary"
 	"io"
 	"net"
 	"sync"
 	"time"
 
+	"github.com/apache/rocketmq-client-go/primitive"
+
 	"github.com/apache/rocketmq-client-go/rlog"
 )
 
@@ -40,6 +43,7 @@ type RemotingClient struct {
 	option           TcpOption
 	processors       map[int16]ClientRequestFunc
 	connectionLocker sync.Mutex
+	interceptor      primitive.Interceptor
 }
 
 func NewRemotingClient() *RemotingClient {
@@ -192,6 +196,18 @@ func (c *RemotingClient) createScanner(r io.Reader) *bufio.Scanner {
 }
 
 func (c *RemotingClient) sendRequest(conn net.Conn, request *RemotingCommand) error {
+	var err error
+	if c.interceptor != nil {
+		err = c.interceptor(context.Background(), request, nil, func(ctx context.Context, req,
reply interface{}) error {
+			return c.doRequest(conn, request)
+		})
+	} else {
+		err = c.doRequest(conn, request)
+	}
+	return err
+}
+
+func (c *RemotingClient) doRequest(conn net.Conn, request *RemotingCommand) error {
 	content, err := encode(request)
 	if err != nil {
 		return err
@@ -226,3 +242,30 @@ func (c *RemotingClient) ShutDown() {
 		return true
 	})
 }
+
+func (c *RemotingClient) RegisterInterceptor(interceptors ...primitive.Interceptor) {
+	if len(interceptors) == 0 {
+		return
+	}
+	idx := 0
+	if c.interceptor == nil {
+		c.interceptor = interceptors[0]
+		idx = 1
+	}
+	for ; idx < len(interceptors); idx++ {
+		c.interceptor = func(ctx context.Context, req, reply interface{}, invoker primitive.Invoker)
error {
+			return interceptors[0](ctx, req, reply, getChainedInterceptor(interceptors, idx, invoker))
+		}
+	}
+}
+
+// TODO
+// getChainedInterceptor recursively generate the chained invoker.
+func getChainedInterceptor(interceptors []primitive.Interceptor, cur int, finalInvoker primitive.Invoker)
primitive.Invoker {
+	if cur == len(interceptors)-1 {
+		return finalInvoker
+	}
+	return func(ctx context.Context, req, reply interface{}) error {
+		return interceptors[cur+1](ctx, req, reply, getChainedInterceptor(interceptors, cur+1,
finalInvoker))
+	}
+}
diff --git a/internal/remote/remote_client_test.go b/internal/remote/remote_client_test.go
index 4efb990..06dcedc 100644
--- a/internal/remote/remote_client_test.go
+++ b/internal/remote/remote_client_test.go
@@ -19,6 +19,7 @@ package remote
 import (
 	"bytes"
 	"errors"
+	"github.com/apache/rocketmq-client-go/internal/utils"
 	"math/rand"
 	"net"
 	"reflect"
@@ -81,7 +82,7 @@ func TestResponseFutureTimeout(t *testing.T) {
 }
 
 func TestResponseFutureIsTimeout(t *testing.T) {
-	future := NewResponseFuture(10, 500 * time.Millisecond, nil)
+	future := NewResponseFuture(10, 500*time.Millisecond, nil)
 	if future.isTimeout() != false {
 		t.Errorf("wrong ResponseFuture's istimeout. want=%t, got=%t", false, future.isTimeout())
 	}
@@ -93,12 +94,12 @@ func TestResponseFutureIsTimeout(t *testing.T) {
 }
 
 func TestResponseFutureWaitResponse(t *testing.T) {
-	future := NewResponseFuture(10, 500 * time.Millisecond, nil)
-	if _, err := future.waitResponse(); err != ErrRequestTimeout {
+	future := NewResponseFuture(10, 500*time.Millisecond, nil)
+	if _, err := future.waitResponse(); err != utils.ErrRequestTimeout {
 		t.Errorf("wrong ResponseFuture waitResponse. want=%v, got=%v",
-			ErrRequestTimeout, err)
+			utils.ErrRequestTimeout, err)
 	}
-	future = NewResponseFuture(10, 500 * time.Millisecond, nil)
+	future = NewResponseFuture(10, 500*time.Millisecond, nil)
 	responseError := errors.New("response error")
 	go func() {
 		time.Sleep(100 * time.Millisecond)
@@ -109,7 +110,7 @@ func TestResponseFutureWaitResponse(t *testing.T) {
 		t.Errorf("wrong ResponseFuture waitResponse. want=%v. got=%v",
 			responseError, err)
 	}
-	future = NewResponseFuture(10, 500 * time.Millisecond, nil)
+	future = NewResponseFuture(10, 500*time.Millisecond, nil)
 	responseRemotingCommand := NewRemotingCommand(202, nil, nil)
 	go func() {
 		time.Sleep(100 * time.Millisecond)
@@ -219,7 +220,7 @@ func TestInvokeAsync(t *testing.T) {
 	cnt := 50
 	wg.Add(cnt)
 	client := NewRemotingClient()
-	for i:=0; i < cnt; i++ {
+	for i := 0; i < cnt; i++ {
 		go func(index int) {
 			time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
 			t.Logf("[Send: %d] asychronous message", index)
@@ -291,7 +292,7 @@ func TestInvokeAsyncTimeout(t *testing.T) {
 		err := client.InvokeAsync(":3000", clientSendRemtingCommand,
 			time.Duration(1000), func(r *ResponseFuture) {
 				assert.NotNil(t, r.Err)
-				assert.Equal(t, ErrRequestTimeout, r.Err)
+				assert.Equal(t, utils.ErrRequestTimeout, r.Err)
 				wg.Done()
 			})
 		assert.Nil(t, err, "failed to invokeSync.")
@@ -361,5 +362,3 @@ func TestInvokeOneWay(t *testing.T) {
 	}
 	wg.Done()
 }
-
-
diff --git a/internal/route.go b/internal/route.go
index dbb39c1..94fd5c0 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -29,9 +29,9 @@ import (
 	"time"
 
 	"github.com/apache/rocketmq-client-go/internal/remote"
+	"github.com/apache/rocketmq-client-go/internal/utils"
 	"github.com/apache/rocketmq-client-go/primitive"
 	"github.com/apache/rocketmq-client-go/rlog"
-	"github.com/apache/rocketmq-client-go/utils"
 	"github.com/tidwall/gjson"
 )
 
diff --git a/internal/route_test.go b/internal/route_test.go
index ee95755..9c3241e 100644
--- a/internal/route_test.go
+++ b/internal/route_test.go
@@ -132,4 +132,4 @@ func TestFindBrokerAddressInSubscribe(t *testing.T) {
 			}
 		})
 	})
-}
\ No newline at end of file
+}
diff --git a/utils/errors.go b/internal/utils/errors.go
similarity index 99%
rename from utils/errors.go
rename to internal/utils/errors.go
index 507d7bb..1d83d58 100644
--- a/utils/errors.go
+++ b/internal/utils/errors.go
@@ -22,7 +22,7 @@ import (
 	"github.com/pkg/errors"
 )
 
-var(
+var (
 	// ErrRequestTimeout for request timeout error
 	ErrRequestTimeout = errors.New("request timeout")
 
diff --git a/utils/files.go b/internal/utils/files.go
similarity index 100%
rename from utils/files.go
rename to internal/utils/files.go
diff --git a/utils/fun.go b/internal/utils/fun.go
similarity index 100%
rename from utils/fun.go
rename to internal/utils/fun.go
diff --git a/utils/helper.go b/internal/utils/helper.go
similarity index 100%
rename from utils/helper.go
rename to internal/utils/helper.go
diff --git a/utils/helper_test.go b/internal/utils/helper_test.go
similarity index 100%
rename from utils/helper_test.go
rename to internal/utils/helper_test.go
diff --git a/utils/math.go b/internal/utils/math.go
similarity index 100%
copy from utils/math.go
copy to internal/utils/math.go
diff --git a/utils/messagesysflag.go b/internal/utils/messagesysflag.go
similarity index 100%
rename from utils/messagesysflag.go
rename to internal/utils/messagesysflag.go
diff --git a/utils/net.go b/internal/utils/net.go
similarity index 100%
rename from utils/net.go
rename to internal/utils/net.go
diff --git a/utils/net_test.go b/internal/utils/net_test.go
similarity index 100%
rename from utils/net_test.go
rename to internal/utils/net_test.go
diff --git a/utils/ring_buffer.go b/internal/utils/ring_buffer.go
similarity index 100%
rename from utils/ring_buffer.go
rename to internal/utils/ring_buffer.go
diff --git a/utils/ring_buffer_test.go b/internal/utils/ring_buffer_test.go
similarity index 100%
rename from utils/ring_buffer_test.go
rename to internal/utils/ring_buffer_test.go
diff --git a/utils/string.go b/internal/utils/string.go
similarity index 100%
rename from utils/string.go
rename to internal/utils/string.go
diff --git a/utils/string_test.go b/internal/utils/string_test.go
similarity index 100%
rename from utils/string_test.go
rename to internal/utils/string_test.go
diff --git a/utils/math.go b/primitive/auth.go
similarity index 79%
rename from utils/math.go
rename to primitive/auth.go
index 816631e..772bc4d 100644
--- a/utils/math.go
+++ b/primitive/auth.go
@@ -15,18 +15,14 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package utils
+package primitive
 
-func AbsInt(i int) int {
-	if i >= 0 {
-		return i
-	}
-	return -i
+type Credentials struct {
+	AccessKey     string
+	SecretKey     string
+	SecurityToken string
 }
 
-func MinInt(a, b int) int {
-	if a < b {
-		return a
-	}
-	return b
+func (c Credentials) IsEmpty() bool {
+	return c.AccessKey == "" || c.SecretKey == ""
 }
diff --git a/primitive/message.go b/primitive/message.go
index 52ab2bf..f191c92 100644
--- a/primitive/message.go
+++ b/primitive/message.go
@@ -20,7 +20,7 @@ package primitive
 import (
 	"fmt"
 
-	"github.com/apache/rocketmq-client-go/utils"
+	"github.com/apache/rocketmq-client-go/internal/utils"
 )
 
 const (
diff --git a/primitive/result.go b/primitive/result.go
index f734043..f21cc2e 100644
--- a/primitive/result.go
+++ b/primitive/result.go
@@ -22,7 +22,7 @@ import (
 	"encoding/binary"
 	"fmt"
 
-	"github.com/apache/rocketmq-client-go/utils"
+	"github.com/apache/rocketmq-client-go/internal/utils"
 )
 
 // SendStatus of message
@@ -33,6 +33,7 @@ const (
 	SendFlushDiskTimeout
 	SendFlushSlaveTimeout
 	SendSlaveNotAvailable
+	SendUnknownError
 
 	FlagCompressed = 0x1
 	MsgIdLength    = 8 + 8
diff --git a/producer/option.go b/producer/option.go
index 7b6c18b..ad6c98e 100644
--- a/producer/option.go
+++ b/producer/option.go
@@ -57,20 +57,12 @@ func WithNameServer(nameServers []string) Option {
 	}
 }
 
-// WithACL on/off ACL
 func WithVIPChannel(enable bool) Option {
 	return func(opts *producerOptions) {
 		opts.VIPChannelEnabled = enable
 	}
 }
 
-// WithACL on/off ACL
-func WithACL(enable bool) Option {
-	return func(opts *producerOptions) {
-		opts.ACLEnabled = enable
-	}
-}
-
 // WithRetry return a Option that specifies the retry times when send failed.
 // TODO: use retry middleware instead
 func WithRetry(retries int) Option {
@@ -90,3 +82,9 @@ func WithQueueSelector(s QueueSelector) Option {
 		options.Selector = s
 	}
 }
+
+func WithCredentials(c primitive.Credentials) Option {
+	return func(options *producerOptions) {
+		options.ClientOptions.Credentials = c
+	}
+}
diff --git a/producer/producer.go b/producer/producer.go
index ab501de..12a72b6 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -139,8 +139,8 @@ func (p *defaultProducer) SendSync(ctx context.Context, msg *primitive.Message)
 		return resp, err
 	}
 
-	p.sendSync(ctx, msg, resp)
-	return resp, nil
+	err := p.sendSync(ctx, msg, resp)
+	return resp, err
 }
 
 func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message, resp *primitive.SendResult)
error {
@@ -168,8 +168,7 @@ func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message,
 			err = _err
 			continue
 		}
-		p.client.ProcessSendResponse(mq.BrokerName, res, resp, msg)
-		return nil
+		return p.client.ProcessSendResponse(mq.BrokerName, res, resp, msg)
 	}
 	return err
 }


Mime
View raw message