This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new ca0b2a8 fix tcp scanner bug (#41)
ca0b2a8 is described below
commit ca0b2a802015681f9ec3c00d6b6e6f6add9d87ed
Author: 高峰 <gaufung@foxmail.com>
AuthorDate: Tue Mar 12 20:50:58 2019 +0800
fix tcp scanner bug (#41)
* complete remote/client.go interface and unit test
* [bug-fix]
- fix remote/client.go `createScanner` method
- add `InvokeSync`. `InvokeAsync` and `InvokeOneWay` unit test cases
---
remote/client.go | 4 +-
remote/client_test.go | 165 ++++++++++++++++++++++++++++++++++++++++++++++++++
remote/codec_test.go | 18 ------
3 files changed, 167 insertions(+), 20 deletions(-)
diff --git a/remote/client.go b/remote/client.go
index 94f5ed2..095c304 100644
--- a/remote/client.go
+++ b/remote/client.go
@@ -263,8 +263,8 @@ func createScanner(r io.Reader) *bufio.Scanner {
if len(data) >= 4 {
var length int32
binary.Read(bytes.NewReader(data[0:4]), binary.BigEndian, &length)
- if int(length)+4 <= len(data) {
- return int(length) + 4, data[:int(length)+4], nil
+ if int(length) <= len(data) {
+ return int(length), data[:length], nil
}
}
}
diff --git a/remote/client_test.go b/remote/client_test.go
index 55a26fc..40f6e50 100644
--- a/remote/client_test.go
+++ b/remote/client_test.go
@@ -19,6 +19,7 @@ package remote
import (
"bytes"
"errors"
+ "net"
"reflect"
"sync"
"testing"
@@ -176,4 +177,168 @@ func TestCreateScanner(t *testing.T) {
t.Fatal("decoded RemotingCommand not equal to the original one")
}
}
+}
+
+func TestDefaultRemotingClient_InvokeSync(t *testing.T) {
+ clientSendRemtingCommand := NewRemotingCommand(10, nil, []byte("Hello RocketMQ"))
+ serverSendRemotingCommand := NewRemotingCommand(20, nil, []byte("Welcome native"))
+ serverSendRemotingCommand.Opaque = clientSendRemtingCommand.Opaque
+ serverSendRemotingCommand.Flag = ResponseType
+
+ client := NewDefaultRemotingClient()
+ client.Start()
+ defer client.Shutdown()
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ receiveCommand, err := client.InvokeSync(":3000",
+ clientSendRemtingCommand, time.Duration(1000))
+ if err != nil {
+ t.Fatalf("failed to invoke synchronous. %s", err)
+ } else {
+ if !reflect.DeepEqual(&receiveCommand, &serverSendRemotingCommand) {
+ t.Errorf("remotingCommand prased in client is different from server. ")
+ }
+ }
+ wg.Done()
+ }()
+
+ l, err := net.Listen("tcp", ":3000")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer l.Close()
+ for {
+ conn, err := l.Accept()
+ if err != nil {
+ return
+ }
+ defer conn.Close()
+ scanner := createScanner(conn)
+ for scanner.Scan() {
+ receivedRemotingCommand, err := decode(scanner.Bytes())
+ if err != nil {
+ t.Errorf("failed to decode RemotingCommnad. %s", err)
+ }
+ if clientSendRemtingCommand.Code != receivedRemotingCommand.Code {
+ t.Errorf("wrong code. want=%d, got=%d",receivedRemotingCommand.Code,
+ clientSendRemtingCommand.Code)
+ }
+ body, err := encode(serverSendRemotingCommand)
+ if err != nil {
+ t.Fatalf("failed to encode RemotingCommand")
+ }
+ _, err = conn.Write(body)
+ if err != nil {
+ t.Fatalf("failed to write body to conneciton.")
+ }
+ return
+ }
+ }
+ wg.Done()
+}
+
+func TestDefaultRemotingClient_InvokeAsync(t *testing.T) {
+ clientSendRemtingCommand := NewRemotingCommand(10, nil, []byte("Hello RocketMQ"))
+ serverSendRemotingCommand := NewRemotingCommand(20, nil, []byte("Welcome native"))
+ serverSendRemotingCommand.Opaque = clientSendRemtingCommand.Opaque
+ serverSendRemotingCommand.Flag = ResponseType
+
+ client := NewDefaultRemotingClient()
+ client.Start()
+ defer client.Shutdown()
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ err := client.InvokeAsync(":3000", clientSendRemtingCommand,
+ time.Duration(1000), func(r *ResponseFuture) {
+ if string(r.ResponseCommand.Body) != "Welcome native" {
+ t.Errorf("wrong responseCommand.Body. want=%s, got=%s",
+ "Welcome native", string(r.ResponseCommand.Body))
+ }
+ wg.Done()
+ })
+ if err != nil {
+ t.Errorf("failed to invokeSync. %s", err)
+ }
+
+ }()
+
+ l, err := net.Listen("tcp", ":3000")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer l.Close()
+ for {
+ conn, err := l.Accept()
+ if err != nil {
+ return
+ }
+ defer conn.Close()
+ scanner := createScanner(conn)
+ for scanner.Scan() {
+ receivedRemotingCommand, err := decode(scanner.Bytes())
+ if err != nil {
+ t.Errorf("failed to decode RemotingCommnad. %s", err)
+ }
+ if clientSendRemtingCommand.Code != receivedRemotingCommand.Code {
+ t.Errorf("wrong code. want=%d, got=%d",receivedRemotingCommand.Code,
+ clientSendRemtingCommand.Code)
+ }
+ body, err := encode(serverSendRemotingCommand)
+ if err != nil {
+ t.Fatalf("failed to encode RemotingCommand")
+ }
+ _, err = conn.Write(body)
+ if err != nil {
+ t.Fatalf("failed to write body to conneciton.")
+ }
+ return
+ }
+ }
+ wg.Done()
+}
+
+
+func TestDefaultRemotingClient_InvokeOneWay(t *testing.T) {
+ clientSendRemtingCommand := NewRemotingCommand(10, nil, []byte("Hello RocketMQ"))
+
+ client := NewDefaultRemotingClient()
+ client.Start()
+ defer client.Shutdown()
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ err := client.InvokeOneWay(":3000", clientSendRemtingCommand)
+ if err != nil {
+ t.Fatalf("failed to invoke synchronous. %s", err)
+ }
+ wg.Done()
+ }()
+
+ l, err := net.Listen("tcp", ":3000")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer l.Close()
+ for {
+ conn, err := l.Accept()
+ if err != nil {
+ return
+ }
+ defer conn.Close()
+ scanner := createScanner(conn)
+ for scanner.Scan() {
+ receivedRemotingCommand, err := decode(scanner.Bytes())
+ if err != nil {
+ t.Errorf("failed to decode RemotingCommnad. %s", err)
+ }
+ if clientSendRemtingCommand.Code != receivedRemotingCommand.Code {
+ t.Errorf("wrong code. want=%d, got=%d",receivedRemotingCommand.Code,
+ clientSendRemtingCommand.Code)
+ }
+ return
+ }
+ }
+ wg.Done()
}
\ No newline at end of file
diff --git a/remote/codec_test.go b/remote/codec_test.go
index 5e63e4a..ab1a61e 100644
--- a/remote/codec_test.go
+++ b/remote/codec_test.go
@@ -269,15 +269,6 @@ func TestCommandJsonEncodeDecode(t *testing.T) {
if newCmd.Remark != cmd.Remark {
t.Errorf("wrong command remakr. want=%s, got=%s", cmd.Remark, newCmd.Remark)
}
- //for k, v := range cmd.ExtFields {
- // if vv, ok := newCmd.ExtFields[k]; !ok {
- // t.Errorf("key: %s not exists in newCommand.", k)
- // } else {
- // if v != vv {
- // t.Errorf("wrong value. want=%s, got=%s", v, vv)
- // }
- // }
- //}
}
func TestCommandRocketMQEncodeDecode(t *testing.T) {
@@ -314,13 +305,4 @@ func TestCommandRocketMQEncodeDecode(t *testing.T) {
if newCmd.Remark != cmd.Remark {
t.Errorf("wrong command remakr. want=%s, got=%s", cmd.Remark, newCmd.Remark)
}
- //for k, v := range cmd.ExtFields {
- // if vv, ok := newCmd.ExtFields[k]; !ok {
- // t.Errorf("key: %s not exists in newCommand.", k)
- // } else {
- // if v != vv {
- // t.Errorf("wrong value. want=%s, got=%s", v, vv)
- // }
- // }
- //}
}
|