rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ding...@apache.org
Subject [rocketmq-client-go] branch native updated: fix tcp scanner bug (#41)
Date Tue, 12 Mar 2019 12:51:03 GMT
This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new ca0b2a8  fix tcp scanner bug (#41)
ca0b2a8 is described below

commit ca0b2a802015681f9ec3c00d6b6e6f6add9d87ed
Author: 高峰 <gaufung@foxmail.com>
AuthorDate: Tue Mar 12 20:50:58 2019 +0800

    fix tcp scanner bug (#41)
    
    * complete remote/client.go interface and unit test
    
    * [bug-fix]
    - fix remote/client.go `createScanner` method
    - add `InvokeSync`, `InvokeAsync` and `InvokeOneWay` unit test cases
---
 remote/client.go      |   4 +-
 remote/client_test.go | 165 ++++++++++++++++++++++++++++++++++++++++++++++++++
 remote/codec_test.go  |  18 ------
 3 files changed, 167 insertions(+), 20 deletions(-)

diff --git a/remote/client.go b/remote/client.go
index 94f5ed2..095c304 100644
--- a/remote/client.go
+++ b/remote/client.go
@@ -263,8 +263,8 @@ func createScanner(r io.Reader) *bufio.Scanner {
 			if len(data) >= 4 {
 				var length int32
 				binary.Read(bytes.NewReader(data[0:4]), binary.BigEndian, &length)
-				if int(length)+4 <= len(data) {
-					return int(length) + 4, data[:int(length)+4], nil
+				if int(length) <= len(data) {
+					return int(length), data[:length], nil
 				}
 			}
 		}
diff --git a/remote/client_test.go b/remote/client_test.go
index 55a26fc..40f6e50 100644
--- a/remote/client_test.go
+++ b/remote/client_test.go
@@ -19,6 +19,7 @@ package remote
 import (
 	"bytes"
 	"errors"
+	"net"
 	"reflect"
 	"sync"
 	"testing"
@@ -176,4 +177,168 @@ func TestCreateScanner(t *testing.T) {
 			t.Fatal("decoded RemotingCommand not equal to the original one")
 		}
 	}
+}
+
+func TestDefaultRemotingClient_InvokeSync(t *testing.T) {
+	clientSendRemtingCommand := NewRemotingCommand(10, nil, []byte("Hello RocketMQ"))
+	serverSendRemotingCommand := NewRemotingCommand(20, nil, []byte("Welcome native"))
+	serverSendRemotingCommand.Opaque = clientSendRemtingCommand.Opaque
+	serverSendRemotingCommand.Flag = ResponseType
+
+	client := NewDefaultRemotingClient()
+	client.Start()
+	defer client.Shutdown()
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		receiveCommand, err := client.InvokeSync(":3000",
+			clientSendRemtingCommand, time.Duration(1000))
+		if err != nil {
+			t.Fatalf("failed to invoke synchronous. %s", err)
+		} else {
+			if !reflect.DeepEqual(&receiveCommand, &serverSendRemotingCommand) {
+				t.Errorf("remotingCommand prased in client is different from server. ")
+			}
+		}
+		wg.Done()
+	}()
+
+	l, err := net.Listen("tcp", ":3000")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer l.Close()
+	for {
+		conn, err := l.Accept()
+		if err != nil {
+			return
+		}
+		defer conn.Close()
+		scanner := createScanner(conn)
+		for scanner.Scan() {
+			receivedRemotingCommand, err := decode(scanner.Bytes())
+			if err != nil {
+				t.Errorf("failed to decode RemotingCommnad. %s", err)
+			}
+			if clientSendRemtingCommand.Code != receivedRemotingCommand.Code {
+				t.Errorf("wrong code. want=%d, got=%d",receivedRemotingCommand.Code,
+					clientSendRemtingCommand.Code)
+			}
+			body, err := encode(serverSendRemotingCommand)
+			if err != nil {
+				t.Fatalf("failed to encode RemotingCommand")
+			}
+			_, err = conn.Write(body)
+			if err != nil {
+				t.Fatalf("failed to write body to conneciton.")
+			}
+			return
+		}
+	}
+	wg.Done()
+}
+
+func TestDefaultRemotingClient_InvokeAsync(t *testing.T) {
+	clientSendRemtingCommand := NewRemotingCommand(10, nil, []byte("Hello RocketMQ"))
+	serverSendRemotingCommand := NewRemotingCommand(20, nil, []byte("Welcome native"))
+	serverSendRemotingCommand.Opaque = clientSendRemtingCommand.Opaque
+	serverSendRemotingCommand.Flag = ResponseType
+
+	client := NewDefaultRemotingClient()
+	client.Start()
+	defer client.Shutdown()
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		err := client.InvokeAsync(":3000", clientSendRemtingCommand,
+			time.Duration(1000), func(r *ResponseFuture) {
+			if string(r.ResponseCommand.Body) != "Welcome native" {
+				t.Errorf("wrong responseCommand.Body. want=%s, got=%s",
+					"Welcome native",  string(r.ResponseCommand.Body))
+			}
+			wg.Done()
+		})
+		if err != nil {
+			t.Errorf("failed to invokeSync. %s", err)
+		}
+
+	}()
+
+	l, err := net.Listen("tcp", ":3000")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer l.Close()
+	for {
+		conn, err := l.Accept()
+		if err != nil {
+			return
+		}
+		defer conn.Close()
+		scanner := createScanner(conn)
+		for scanner.Scan() {
+			receivedRemotingCommand, err := decode(scanner.Bytes())
+			if err != nil {
+				t.Errorf("failed to decode RemotingCommnad. %s", err)
+			}
+			if clientSendRemtingCommand.Code != receivedRemotingCommand.Code {
+				t.Errorf("wrong code. want=%d, got=%d",receivedRemotingCommand.Code,
+					clientSendRemtingCommand.Code)
+			}
+			body, err := encode(serverSendRemotingCommand)
+			if err != nil {
+				t.Fatalf("failed to encode RemotingCommand")
+			}
+			_, err = conn.Write(body)
+			if err != nil {
+				t.Fatalf("failed to write body to conneciton.")
+			}
+			return
+		}
+	}
+	wg.Done()
+}
+
+
+func TestDefaultRemotingClient_InvokeOneWay(t *testing.T) {
+	clientSendRemtingCommand := NewRemotingCommand(10, nil, []byte("Hello RocketMQ"))
+
+	client := NewDefaultRemotingClient()
+	client.Start()
+	defer client.Shutdown()
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		err := client.InvokeOneWay(":3000", clientSendRemtingCommand)
+		if err != nil {
+			t.Fatalf("failed to invoke synchronous. %s", err)
+		}
+		wg.Done()
+	}()
+
+	l, err := net.Listen("tcp", ":3000")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer l.Close()
+	for {
+		conn, err := l.Accept()
+		if err != nil {
+			return
+		}
+		defer conn.Close()
+		scanner := createScanner(conn)
+		for scanner.Scan() {
+			receivedRemotingCommand, err := decode(scanner.Bytes())
+			if err != nil {
+				t.Errorf("failed to decode RemotingCommnad. %s", err)
+			}
+			if clientSendRemtingCommand.Code != receivedRemotingCommand.Code {
+				t.Errorf("wrong code. want=%d, got=%d",receivedRemotingCommand.Code,
+					clientSendRemtingCommand.Code)
+			}
+			return
+		}
+	}
+	wg.Done()
 }
\ No newline at end of file
diff --git a/remote/codec_test.go b/remote/codec_test.go
index 5e63e4a..ab1a61e 100644
--- a/remote/codec_test.go
+++ b/remote/codec_test.go
@@ -269,15 +269,6 @@ func TestCommandJsonEncodeDecode(t *testing.T) {
 	if newCmd.Remark != cmd.Remark {
 		t.Errorf("wrong command remakr. want=%s, got=%s", cmd.Remark, newCmd.Remark)
 	}
-	//for k, v := range cmd.ExtFields {
-	//	if vv, ok := newCmd.ExtFields[k]; !ok {
-	//		t.Errorf("key: %s not exists in newCommand.", k)
-	//	} else {
-	//		if v != vv {
-	//			t.Errorf("wrong value. want=%s, got=%s", v, vv)
-	//		}
-	//	}
-	//}
 }
 
 func TestCommandRocketMQEncodeDecode(t *testing.T) {
@@ -314,13 +305,4 @@ func TestCommandRocketMQEncodeDecode(t *testing.T) {
 	if newCmd.Remark != cmd.Remark {
 		t.Errorf("wrong command remakr. want=%s, got=%s", cmd.Remark, newCmd.Remark)
 	}
-	//for k, v := range cmd.ExtFields {
-	//	if vv, ok := newCmd.ExtFields[k]; !ok {
-	//		t.Errorf("key: %s not exists in newCommand.", k)
-	//	} else {
-	//		if v != vv {
-	//			t.Errorf("wrong value. want=%s, got=%s", v, vv)
-	//		}
-	//	}
-	//}
 }


Mime
View raw message