rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ding...@apache.org
Subject [rocketmq-client-go] branch native updated: make all remote unit tests pass (#76)
Date Mon, 01 Jul 2019 06:01:08 GMT
This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new d3b6435  make all remote unit tests pass (#76)
d3b6435 is described below

commit d3b6435e80126dc9cfd0b2406ea32716085a1aaa
Author: 高峰 <gaufung@foxmail.com>
AuthorDate: Mon Jul 1 14:01:04 2019 +0800

    make all remote unit tests pass (#76)
---
 remote/codec.go              |  9 ++---
 remote/codec_test.go         | 80 +++++++++++++++++++++++++++++++-------------
 remote/remote_client.go      | 24 ++++++++-----
 remote/remote_client_test.go | 48 +++++++++++++++++++-------
 4 files changed, 113 insertions(+), 48 deletions(-)

diff --git a/remote/codec.go b/remote/codec.go
index 8e499fe..aebf75b 100644
--- a/remote/codec.go
+++ b/remote/codec.go
@@ -58,10 +58,11 @@ type CustomHeader interface {
 
 func NewRemotingCommand(code int16, header CustomHeader, body []byte) *RemotingCommand {
 	cmd := &RemotingCommand{
-		Code:    code,
-		Version: _Version,
-		Opaque:  atomic.AddInt32(&opaque, 1),
-		Body:    body,
+		Code:     code,
+		Version:  _Version,
+		Opaque:   atomic.AddInt32(&opaque, 1),
+		Body:     body,
+		Language: _LanguageCode,
 	}
 
 	if header != nil {
diff --git a/remote/codec_test.go b/remote/codec_test.go
index e48b0bb..2e836c7 100644
--- a/remote/codec_test.go
+++ b/remote/codec_test.go
@@ -83,14 +83,29 @@ func Test_decode(t *testing.T) {
 		if err != nil {
 			t.Fatalf("encode RemotingCommand to bytes fail: %v", err)
 		}
-
+		bs = bs[4:]
 		decodedRc, err := decode(bs)
 		if err != nil {
 			t.Fatalf("decode bytes to RemotingCommand fail: %v", err)
 		}
 
-		if !reflect.DeepEqual(*rc, *decodedRc) {
-			t.Fatal("decoded RemotingCommand not equal to the original one")
+		if rc.Code != decodedRc.Code {
+			t.Fatalf("wrong Code. want=%d, got=%d", rc.Code, decodedRc.Code)
+		}
+		if rc.Version != decodedRc.Version {
+			t.Fatalf("wrong Version. want=%d, got=%d", rc.Version, decodedRc.Version)
+		}
+		if rc.Opaque != decodedRc.Opaque {
+			t.Fatalf("wrong Opaque. want=%d, got=%d", rc.Opaque, decodedRc.Opaque)
+		}
+		if rc.Remark != decodedRc.Remark {
+			t.Fatalf("wrong remark. want=%s, got=%s", rc.Remark, decodedRc.Remark)
+		}
+		if rc.Flag != decodedRc.Flag {
+			t.Fatalf("wrong flag. want=%d, got=%d", rc.Flag, decodedRc.Flag)
+		}
+		if !reflect.DeepEqual(rc.ExtFields, decodedRc.ExtFields) {
+			t.Fatalf("wrong extFields, want=%v, got=%v", rc.ExtFields, decodedRc.ExtFields)
 		}
 	}
 }
@@ -102,7 +117,7 @@ func Benchmark_decode(b *testing.B) {
 		b.Fatalf("encode RemotingCommand to bytes fail: %v", err)
 	}
 	b.ResetTimer()
-
+	bs = bs[4:]
 	for i := 0; i < b.N; i++ {
 		if _, err := decode(bs); err != nil {
 			b.Fatalf("decode bytes to RemotingCommand fail: %v", err)
@@ -145,14 +160,23 @@ func Test_jsonCodec_decodeHeader(t *testing.T) {
 			t.Fatalf("decode header with jsonCodec fail: %v", err)
 		}
 
-		if rc.Code != decodedRc.Code ||
-			rc.Language != decodedRc.Language ||
-			rc.Version != decodedRc.Version ||
-			rc.Opaque != rc.Opaque ||
-			rc.Flag != rc.Flag ||
-			rc.Remark != rc.Remark ||
-			!reflect.DeepEqual(rc.ExtFields, decodedRc.ExtFields) {
-			t.Fatal("decoded RemotingCommand not equal to the original one")
+		if rc.Code != decodedRc.Code {
+			t.Fatalf("wrong Code. want=%d, got=%d", rc.Code, decodedRc.Code)
+		}
+		if rc.Version != decodedRc.Version {
+			t.Fatalf("wrong Version. want=%d, got=%d", rc.Version, decodedRc.Version)
+		}
+		if rc.Opaque != decodedRc.Opaque {
+			t.Fatalf("wrong Opaque. want=%d, got=%d", rc.Opaque, decodedRc.Opaque)
+		}
+		if rc.Remark != decodedRc.Remark {
+			t.Fatalf("wrong remark. want=%s, got=%s", rc.Remark, decodedRc.Remark)
+		}
+		if rc.Flag != decodedRc.Flag {
+			t.Fatalf("wrong flag. want=%d, got=%d", rc.Flag, decodedRc.Flag)
+		}
+		if !reflect.DeepEqual(rc.ExtFields, decodedRc.ExtFields) {
+			t.Fatalf("wrong extFields, want=%v, got=%v", rc.ExtFields, decodedRc.ExtFields)
 		}
 	}
 }
@@ -206,16 +230,25 @@ func Test_rmqCodec_decodeHeader(t *testing.T) {
 		if err != nil {
 			t.Fatalf("decode header with rmqCodec fail: %v", err)
 		}
-
-		if rc.Code != decodedRc.Code ||
-			rc.Language != decodedRc.Language ||
-			rc.Version != decodedRc.Version ||
-			rc.Opaque != rc.Opaque ||
-			rc.Flag != rc.Flag ||
-			rc.Remark != rc.Remark ||
-			!reflect.DeepEqual(rc.ExtFields, decodedRc.ExtFields) {
-			t.Fatal("decoded RemotingCommand not equal to the original one")
+		if rc.Code != decodedRc.Code {
+			t.Fatalf("wrong Code. want=%d, got=%d", rc.Code, decodedRc.Code)
+		}
+		if rc.Version != decodedRc.Version {
+			t.Fatalf("wrong Version. want=%d, got=%d", rc.Version, decodedRc.Version)
+		}
+		if rc.Opaque != decodedRc.Opaque {
+			t.Fatalf("wrong Opaque. want=%d, got=%d", rc.Opaque, decodedRc.Opaque)
+		}
+		if rc.Remark != decodedRc.Remark {
+			t.Fatalf("wrong remark. want=%s, got=%s", rc.Remark, decodedRc.Remark)
+		}
+		if rc.Flag != decodedRc.Flag {
+			t.Fatalf("wrong flag. want=%d, got=%d", rc.Flag, decodedRc.Flag)
 		}
+		if !reflect.DeepEqual(rc.ExtFields, decodedRc.ExtFields) {
+			t.Fatalf("wrong extFields, want=%v, got=%v", rc.ExtFields, decodedRc.ExtFields)
+		}
+
 	}
 }
 
@@ -246,6 +279,7 @@ func TestCommandJsonEncodeDecode(t *testing.T) {
 			t.Errorf("failed to encode remotingCommand, result is empty.")
 		}
 	}
+	cmdData = cmdData[4:]
 	newCmd, err := decode(cmdData)
 	if err != nil {
 		t.Errorf("failed to decode remoting in JSON. %s", err)
@@ -253,9 +287,6 @@ func TestCommandJsonEncodeDecode(t *testing.T) {
 	if newCmd.Code != cmd.Code {
 		t.Errorf("wrong command code. want=%d, got=%d", cmd.Code, newCmd.Code)
 	}
-	if newCmd.Language != cmd.Language {
-		t.Errorf("wrong command language. want=%d, got=%d", cmd.Language, newCmd.Language)
-	}
 	if newCmd.Version != cmd.Version {
 		t.Errorf("wrong command version. want=%d, got=%d", cmd.Version, newCmd.Version)
 	}
@@ -282,6 +313,7 @@ func TestCommandRocketMQEncodeDecode(t *testing.T) {
 			t.Errorf("failed to encode remotingCommand, result is empty.")
 		}
 	}
+	cmdData = cmdData[4:]
 	newCmd, err := decode(cmdData)
 	if err != nil {
 		t.Errorf("failed to decode remoting in JSON. %s", err)
diff --git a/remote/remote_client.go b/remote/remote_client.go
index 93f81ce..40c10f0 100644
--- a/remote/remote_client.go
+++ b/remote/remote_client.go
@@ -72,20 +72,27 @@ func (r *ResponseFuture) isTimeout() bool {
 }
 
 func (r *ResponseFuture) waitResponse() (*RemotingCommand, error) {
+	var (
+		cmd *RemotingCommand
+		err error
+	)
+	timer := time.NewTimer(r.TimeoutMillis * time.Millisecond)
 	for {
 		select {
 		case <-r.Done:
-			if r.Err != nil {
-				return nil, r.Err
-			}
-			return r.ResponseCommand, nil
-		case <-time.After(r.TimeoutMillis * time.Millisecond):
-			return nil, ErrRequestTimeout
+			cmd, err = r.ResponseCommand, r.Err
+			goto done
+		case <-timer.C:
+			err = ErrRequestTimeout
+			goto done
 		}
 	}
+done:
+	timer.Stop()
+	return cmd, err
 }
 
-type ClientRequestFunc func(remotingCommand *RemotingCommand) (responseCommand *RemotingCommand)
+type ClientRequestFunc func(*RemotingCommand) *RemotingCommand
 
 type TcpOption struct {
 	// TODO
@@ -116,6 +123,7 @@ func (c *RemotingClient) InvokeSync(addr string, request *RemotingCommand,
timeo
 	resp := NewResponseFuture(request.Opaque, timeoutMillis, nil)
 	c.responseTable.Store(resp.Opaque, resp)
 	err = c.sendRequest(conn, request)
+	defer c.responseTable.Delete(request.Opaque)
 	if err != nil {
 		return nil, err
 	}
@@ -209,7 +217,7 @@ func (c *RemotingClient) receiveResponse(r net.Conn) {
 				go func() { // 单个goroutine会造成死锁
 					res := f(cmd)
 					if res != nil {
-						err := c.sendRequest(r, cmd)
+						err := c.sendRequest(r, res)
 						if err != nil {
 							rlog.Warnf("send response to broker error: %s, type is: %d", err, res.Code)
 						}
diff --git a/remote/remote_client_test.go b/remote/remote_client_test.go
index acd0d26..c999c0e 100644
--- a/remote/remote_client_test.go
+++ b/remote/remote_client_test.go
@@ -42,7 +42,7 @@ func TestNewResponseFuture(t *testing.T) {
 			future.TimeoutMillis, time.Duration(1000))
 	}
 	if future.callback != nil {
-		t.Errorf("wrong ResponseFuture's callback. want=<nil>, got=%v", future.callback)
+		t.Errorf("wrong ResponseFuture's callback. want=<nil>, got!=<nil>")
 	}
 	if future.Done == nil {
 		t.Errorf("wrong ResponseFuture's Done. want=<channel>, got=<nil>")
@@ -129,15 +129,28 @@ func TestCreateScanner(t *testing.T) {
 	if err != nil {
 		t.Fatalf("failed to encode RemotingCommand. %s", err)
 	}
+	client := NewRemotingClient()
 	reader := bytes.NewReader(content)
-	scanner := createScanner(reader)
+	scanner := client.createScanner(reader)
 	for scanner.Scan() {
 		rcr, err := decode(scanner.Bytes())
 		if err != nil {
 			t.Fatalf("failedd to decode RemotingCommand from scanner")
 		}
-		if !reflect.DeepEqual(*r, *rcr) {
-			t.Fatal("decoded RemotingCommand not equal to the original one")
+		if r.Code != rcr.Code {
+			t.Fatalf("wrong Code. want=%d, got=%d", r.Code, rcr.Code)
+		}
+		if r.Version != rcr.Version {
+			t.Fatalf("wrong Version. want=%d, got=%d", r.Version, rcr.Version)
+		}
+		if r.Opaque != rcr.Opaque {
+			t.Fatalf("wrong Opaque. want=%d, got=%d", r.Opaque, rcr.Opaque)
+		}
+		if r.Flag != rcr.Flag {
+			t.Fatalf("wrong flag. want=%d, got=%d", r.Opaque, rcr.Opaque)
+		}
+		if !reflect.DeepEqual(r.ExtFields, rcr.ExtFields) {
+			t.Fatalf("wrong extFields. want=%v, got=%v", r.ExtFields, rcr.ExtFields)
 		}
 	}
 }
@@ -149,8 +162,9 @@ func TestInvokeSync(t *testing.T) {
 	serverSendRemotingCommand.Flag = ResponseType
 	var wg sync.WaitGroup
 	wg.Add(1)
+	client := NewRemotingClient()
 	go func() {
-		receiveCommand, err := InvokeSync(":3000",
+		receiveCommand, err := client.InvokeSync(":3000",
 			clientSendRemtingCommand, time.Duration(1000))
 		if err != nil {
 			t.Fatalf("failed to invoke synchronous. %s", err)
@@ -173,7 +187,7 @@ func TestInvokeSync(t *testing.T) {
 			return
 		}
 		defer conn.Close()
-		scanner := createScanner(conn)
+		scanner := client.createScanner(conn)
 		for scanner.Scan() {
 			receivedRemotingCommand, err := decode(scanner.Bytes())
 			if err != nil {
@@ -205,9 +219,13 @@ func TestInvokeAsync(t *testing.T) {
 
 	var wg sync.WaitGroup
 	wg.Add(1)
+	client := NewRemotingClient()
 	go func() {
-		err := InvokeAsync(":3000", clientSendRemtingCommand,
+		time.Sleep(1 * time.Second)
+		t.Logf("invoke async method")
+		err := client.InvokeAsync(":3000", clientSendRemtingCommand,
 			time.Duration(1000), func(r *ResponseFuture) {
+				t.Logf("invoke async callback")
 				if string(r.ResponseCommand.Body) != "Welcome native" {
 					t.Errorf("wrong responseCommand.Body. want=%s, got=%s",
 						"Welcome native", string(r.ResponseCommand.Body))
@@ -231,8 +249,9 @@ func TestInvokeAsync(t *testing.T) {
 			return
 		}
 		defer conn.Close()
-		scanner := createScanner(conn)
+		scanner := client.createScanner(conn)
 		for scanner.Scan() {
+			t.Logf("receive request.")
 			receivedRemotingCommand, err := decode(scanner.Bytes())
 			if err != nil {
 				t.Errorf("failed to decode RemotingCommnad. %s", err)
@@ -241,18 +260,22 @@ func TestInvokeAsync(t *testing.T) {
 				t.Errorf("wrong code. want=%d, got=%d", receivedRemotingCommand.Code,
 					clientSendRemtingCommand.Code)
 			}
+			t.Logf("encoding response")
 			body, err := encode(serverSendRemotingCommand)
 			if err != nil {
 				t.Fatalf("failed to encode RemotingCommand")
 			}
 			_, err = conn.Write(body)
+			t.Logf("sent response")
 			if err != nil {
 				t.Fatalf("failed to write body to conneciton.")
 			}
-			return
+			goto done
 		}
 	}
-	wg.Done()
+done:
+
+	wg.Wait()
 }
 
 func TestInvokeOneWay(t *testing.T) {
@@ -260,8 +283,9 @@ func TestInvokeOneWay(t *testing.T) {
 
 	var wg sync.WaitGroup
 	wg.Add(1)
+	client := NewRemotingClient()
 	go func() {
-		err := InvokeOneWay(":3000", clientSendRemtingCommand, 3*time.Second)
+		err := client.InvokeOneWay(":3000", clientSendRemtingCommand, 3*time.Second)
 		if err != nil {
 			t.Fatalf("failed to invoke synchronous. %s", err)
 		}
@@ -279,7 +303,7 @@ func TestInvokeOneWay(t *testing.T) {
 			return
 		}
 		defer conn.Close()
-		scanner := createScanner(conn)
+		scanner := client.createScanner(conn)
 		for scanner.Scan() {
 			receivedRemotingCommand, err := decode(scanner.Bytes())
 			if err != nil {


Mime
View raw message