This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new d3b6435 make all remote unit tests pass (#76)
d3b6435 is described below
commit d3b6435e80126dc9cfd0b2406ea32716085a1aaa
Author: 高峰 <gaufung@foxmail.com>
AuthorDate: Mon Jul 1 14:01:04 2019 +0800
make all remote unit tests pass (#76)
---
remote/codec.go | 9 ++---
remote/codec_test.go | 80 +++++++++++++++++++++++++++++++-------------
remote/remote_client.go | 24 ++++++++-----
remote/remote_client_test.go | 48 +++++++++++++++++++-------
4 files changed, 113 insertions(+), 48 deletions(-)
diff --git a/remote/codec.go b/remote/codec.go
index 8e499fe..aebf75b 100644
--- a/remote/codec.go
+++ b/remote/codec.go
@@ -58,10 +58,11 @@ type CustomHeader interface {
func NewRemotingCommand(code int16, header CustomHeader, body []byte) *RemotingCommand {
cmd := &RemotingCommand{
- Code: code,
- Version: _Version,
- Opaque: atomic.AddInt32(&opaque, 1),
- Body: body,
+ Code: code,
+ Version: _Version,
+ Opaque: atomic.AddInt32(&opaque, 1),
+ Body: body,
+ Language: _LanguageCode,
}
if header != nil {
diff --git a/remote/codec_test.go b/remote/codec_test.go
index e48b0bb..2e836c7 100644
--- a/remote/codec_test.go
+++ b/remote/codec_test.go
@@ -83,14 +83,29 @@ func Test_decode(t *testing.T) {
if err != nil {
t.Fatalf("encode RemotingCommand to bytes fail: %v", err)
}
-
+ bs = bs[4:]
decodedRc, err := decode(bs)
if err != nil {
t.Fatalf("decode bytes to RemotingCommand fail: %v", err)
}
- if !reflect.DeepEqual(*rc, *decodedRc) {
- t.Fatal("decoded RemotingCommand not equal to the original one")
+ if rc.Code != decodedRc.Code {
+ t.Fatalf("wrong Code. want=%d, got=%d", rc.Code, decodedRc.Code)
+ }
+ if rc.Version != decodedRc.Version {
+ t.Fatalf("wrong Version. want=%d, got=%d", rc.Version, decodedRc.Version)
+ }
+ if rc.Opaque != decodedRc.Opaque {
+ t.Fatalf("wrong Opaque. want=%d, got=%d", rc.Opaque, decodedRc.Opaque)
+ }
+ if rc.Remark != decodedRc.Remark {
+ t.Fatalf("wrong remark. want=%s, got=%s", rc.Remark, decodedRc.Remark)
+ }
+ if rc.Flag != decodedRc.Flag {
+ t.Fatalf("wrong flag. want=%d, got=%d", rc.Flag, decodedRc.Flag)
+ }
+ if !reflect.DeepEqual(rc.ExtFields, decodedRc.ExtFields) {
+ t.Fatalf("wrong extFields, want=%v, got=%v", rc.ExtFields, decodedRc.ExtFields)
}
}
}
@@ -102,7 +117,7 @@ func Benchmark_decode(b *testing.B) {
b.Fatalf("encode RemotingCommand to bytes fail: %v", err)
}
b.ResetTimer()
-
+ bs = bs[4:]
for i := 0; i < b.N; i++ {
if _, err := decode(bs); err != nil {
b.Fatalf("decode bytes to RemotingCommand fail: %v", err)
@@ -145,14 +160,23 @@ func Test_jsonCodec_decodeHeader(t *testing.T) {
t.Fatalf("decode header with jsonCodec fail: %v", err)
}
- if rc.Code != decodedRc.Code ||
- rc.Language != decodedRc.Language ||
- rc.Version != decodedRc.Version ||
- rc.Opaque != rc.Opaque ||
- rc.Flag != rc.Flag ||
- rc.Remark != rc.Remark ||
- !reflect.DeepEqual(rc.ExtFields, decodedRc.ExtFields) {
- t.Fatal("decoded RemotingCommand not equal to the original one")
+ if rc.Code != decodedRc.Code {
+ t.Fatalf("wrong Code. want=%d, got=%d", rc.Code, decodedRc.Code)
+ }
+ if rc.Version != decodedRc.Version {
+ t.Fatalf("wrong Version. want=%d, got=%d", rc.Version, decodedRc.Version)
+ }
+ if rc.Opaque != decodedRc.Opaque {
+ t.Fatalf("wrong Opaque. want=%d, got=%d", rc.Opaque, decodedRc.Opaque)
+ }
+ if rc.Remark != decodedRc.Remark {
+ t.Fatalf("wrong remark. want=%s, got=%s", rc.Remark, decodedRc.Remark)
+ }
+ if rc.Flag != decodedRc.Flag {
+ t.Fatalf("wrong flag. want=%d, got=%d", rc.Flag, decodedRc.Flag)
+ }
+ if !reflect.DeepEqual(rc.ExtFields, decodedRc.ExtFields) {
+ t.Fatalf("wrong extFields, want=%v, got=%v", rc.ExtFields, decodedRc.ExtFields)
}
}
}
@@ -206,16 +230,25 @@ func Test_rmqCodec_decodeHeader(t *testing.T) {
if err != nil {
t.Fatalf("decode header with rmqCodec fail: %v", err)
}
-
- if rc.Code != decodedRc.Code ||
- rc.Language != decodedRc.Language ||
- rc.Version != decodedRc.Version ||
- rc.Opaque != rc.Opaque ||
- rc.Flag != rc.Flag ||
- rc.Remark != rc.Remark ||
- !reflect.DeepEqual(rc.ExtFields, decodedRc.ExtFields) {
- t.Fatal("decoded RemotingCommand not equal to the original one")
+ if rc.Code != decodedRc.Code {
+ t.Fatalf("wrong Code. want=%d, got=%d", rc.Code, decodedRc.Code)
+ }
+ if rc.Version != decodedRc.Version {
+ t.Fatalf("wrong Version. want=%d, got=%d", rc.Version, decodedRc.Version)
+ }
+ if rc.Opaque != decodedRc.Opaque {
+ t.Fatalf("wrong Opaque. want=%d, got=%d", rc.Opaque, decodedRc.Opaque)
+ }
+ if rc.Remark != decodedRc.Remark {
+ t.Fatalf("wrong remark. want=%s, got=%s", rc.Remark, decodedRc.Remark)
+ }
+ if rc.Flag != decodedRc.Flag {
+ t.Fatalf("wrong flag. want=%d, got=%d", rc.Flag, decodedRc.Flag)
}
+ if !reflect.DeepEqual(rc.ExtFields, decodedRc.ExtFields) {
+ t.Fatalf("wrong extFields, want=%v, got=%v", rc.ExtFields, decodedRc.ExtFields)
+ }
+
}
}
@@ -246,6 +279,7 @@ func TestCommandJsonEncodeDecode(t *testing.T) {
t.Errorf("failed to encode remotingCommand, result is empty.")
}
}
+ cmdData = cmdData[4:]
newCmd, err := decode(cmdData)
if err != nil {
t.Errorf("failed to decode remoting in JSON. %s", err)
@@ -253,9 +287,6 @@ func TestCommandJsonEncodeDecode(t *testing.T) {
if newCmd.Code != cmd.Code {
t.Errorf("wrong command code. want=%d, got=%d", cmd.Code, newCmd.Code)
}
- if newCmd.Language != cmd.Language {
- t.Errorf("wrong command language. want=%d, got=%d", cmd.Language, newCmd.Language)
- }
if newCmd.Version != cmd.Version {
t.Errorf("wrong command version. want=%d, got=%d", cmd.Version, newCmd.Version)
}
@@ -282,6 +313,7 @@ func TestCommandRocketMQEncodeDecode(t *testing.T) {
t.Errorf("failed to encode remotingCommand, result is empty.")
}
}
+ cmdData = cmdData[4:]
newCmd, err := decode(cmdData)
if err != nil {
t.Errorf("failed to decode remoting in JSON. %s", err)
diff --git a/remote/remote_client.go b/remote/remote_client.go
index 93f81ce..40c10f0 100644
--- a/remote/remote_client.go
+++ b/remote/remote_client.go
@@ -72,20 +72,27 @@ func (r *ResponseFuture) isTimeout() bool {
}
func (r *ResponseFuture) waitResponse() (*RemotingCommand, error) {
+ var (
+ cmd *RemotingCommand
+ err error
+ )
+ timer := time.NewTimer(r.TimeoutMillis * time.Millisecond)
for {
select {
case <-r.Done:
- if r.Err != nil {
- return nil, r.Err
- }
- return r.ResponseCommand, nil
- case <-time.After(r.TimeoutMillis * time.Millisecond):
- return nil, ErrRequestTimeout
+ cmd, err = r.ResponseCommand, r.Err
+ goto done
+ case <-timer.C:
+ err = ErrRequestTimeout
+ goto done
}
}
+done:
+ timer.Stop()
+ return cmd, err
}
-type ClientRequestFunc func(remotingCommand *RemotingCommand) (responseCommand *RemotingCommand)
+type ClientRequestFunc func(*RemotingCommand) *RemotingCommand
type TcpOption struct {
// TODO
@@ -116,6 +123,7 @@ func (c *RemotingClient) InvokeSync(addr string, request *RemotingCommand, timeo
resp := NewResponseFuture(request.Opaque, timeoutMillis, nil)
c.responseTable.Store(resp.Opaque, resp)
err = c.sendRequest(conn, request)
+ defer c.responseTable.Delete(request.Opaque)
if err != nil {
return nil, err
}
@@ -209,7 +217,7 @@ func (c *RemotingClient) receiveResponse(r net.Conn) {
go func() { // 单个goroutine会造成死锁
res := f(cmd)
if res != nil {
- err := c.sendRequest(r, cmd)
+ err := c.sendRequest(r, res)
if err != nil {
rlog.Warnf("send response to broker error: %s, type is: %d", err, res.Code)
}
diff --git a/remote/remote_client_test.go b/remote/remote_client_test.go
index acd0d26..c999c0e 100644
--- a/remote/remote_client_test.go
+++ b/remote/remote_client_test.go
@@ -42,7 +42,7 @@ func TestNewResponseFuture(t *testing.T) {
future.TimeoutMillis, time.Duration(1000))
}
if future.callback != nil {
- t.Errorf("wrong ResponseFuture's callback. want=<nil>, got=%v", future.callback)
+ t.Errorf("wrong ResponseFuture's callback. want=<nil>, got!=<nil>")
}
if future.Done == nil {
t.Errorf("wrong ResponseFuture's Done. want=<channel>, got=<nil>")
@@ -129,15 +129,28 @@ func TestCreateScanner(t *testing.T) {
if err != nil {
t.Fatalf("failed to encode RemotingCommand. %s", err)
}
+ client := NewRemotingClient()
reader := bytes.NewReader(content)
- scanner := createScanner(reader)
+ scanner := client.createScanner(reader)
for scanner.Scan() {
rcr, err := decode(scanner.Bytes())
if err != nil {
t.Fatalf("failedd to decode RemotingCommand from scanner")
}
- if !reflect.DeepEqual(*r, *rcr) {
- t.Fatal("decoded RemotingCommand not equal to the original one")
+ if r.Code != rcr.Code {
+ t.Fatalf("wrong Code. want=%d, got=%d", r.Code, rcr.Code)
+ }
+ if r.Version != rcr.Version {
+ t.Fatalf("wrong Version. want=%d, got=%d", r.Version, rcr.Version)
+ }
+ if r.Opaque != rcr.Opaque {
+ t.Fatalf("wrong Opaque. want=%d, got=%d", r.Opaque, rcr.Opaque)
+ }
+ if r.Flag != rcr.Flag {
+ t.Fatalf("wrong flag. want=%d, got=%d", r.Opaque, rcr.Opaque)
+ }
+ if !reflect.DeepEqual(r.ExtFields, rcr.ExtFields) {
+ t.Fatalf("wrong extFields. want=%v, got=%v", r.ExtFields, rcr.ExtFields)
}
}
}
@@ -149,8 +162,9 @@ func TestInvokeSync(t *testing.T) {
serverSendRemotingCommand.Flag = ResponseType
var wg sync.WaitGroup
wg.Add(1)
+ client := NewRemotingClient()
go func() {
- receiveCommand, err := InvokeSync(":3000",
+ receiveCommand, err := client.InvokeSync(":3000",
clientSendRemtingCommand, time.Duration(1000))
if err != nil {
t.Fatalf("failed to invoke synchronous. %s", err)
@@ -173,7 +187,7 @@ func TestInvokeSync(t *testing.T) {
return
}
defer conn.Close()
- scanner := createScanner(conn)
+ scanner := client.createScanner(conn)
for scanner.Scan() {
receivedRemotingCommand, err := decode(scanner.Bytes())
if err != nil {
@@ -205,9 +219,13 @@ func TestInvokeAsync(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
+ client := NewRemotingClient()
go func() {
- err := InvokeAsync(":3000", clientSendRemtingCommand,
+ time.Sleep(1 * time.Second)
+ t.Logf("invoke async method")
+ err := client.InvokeAsync(":3000", clientSendRemtingCommand,
time.Duration(1000), func(r *ResponseFuture) {
+ t.Logf("invoke async callback")
if string(r.ResponseCommand.Body) != "Welcome native" {
t.Errorf("wrong responseCommand.Body. want=%s, got=%s",
"Welcome native", string(r.ResponseCommand.Body))
@@ -231,8 +249,9 @@ func TestInvokeAsync(t *testing.T) {
return
}
defer conn.Close()
- scanner := createScanner(conn)
+ scanner := client.createScanner(conn)
for scanner.Scan() {
+ t.Logf("receive request.")
receivedRemotingCommand, err := decode(scanner.Bytes())
if err != nil {
t.Errorf("failed to decode RemotingCommnad. %s", err)
@@ -241,18 +260,22 @@ func TestInvokeAsync(t *testing.T) {
t.Errorf("wrong code. want=%d, got=%d", receivedRemotingCommand.Code,
clientSendRemtingCommand.Code)
}
+ t.Logf("encoding response")
body, err := encode(serverSendRemotingCommand)
if err != nil {
t.Fatalf("failed to encode RemotingCommand")
}
_, err = conn.Write(body)
+ t.Logf("sent response")
if err != nil {
t.Fatalf("failed to write body to conneciton.")
}
- return
+ goto done
}
}
- wg.Done()
+done:
+
+ wg.Wait()
}
func TestInvokeOneWay(t *testing.T) {
@@ -260,8 +283,9 @@ func TestInvokeOneWay(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
+ client := NewRemotingClient()
go func() {
- err := InvokeOneWay(":3000", clientSendRemtingCommand, 3*time.Second)
+ err := client.InvokeOneWay(":3000", clientSendRemtingCommand, 3*time.Second)
if err != nil {
t.Fatalf("failed to invoke synchronous. %s", err)
}
@@ -279,7 +303,7 @@ func TestInvokeOneWay(t *testing.T) {
return
}
defer conn.Close()
- scanner := createScanner(conn)
+ scanner := client.createScanner(conn)
for scanner.Scan() {
receivedRemotingCommand, err := decode(scanner.Bytes())
if err != nil {
|