This is an automated email from the ASF dual-hosted git repository. dinglei pushed a commit to branch native in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git The following commit(s) were added to refs/heads/native by this push: new 3a34ed8 complete remote/client.go interface and unit test (#38) 3a34ed8 is described below commit 3a34ed85cad756b2c538f37e8654435235f29202 Author: 高峰 AuthorDate: Mon Mar 11 21:20:58 2019 +0800 complete remote/client.go interface and unit test (#38) --- common/init.go | 26 +++ common/manager.go | 4 +- common/route.go | 2 +- remote/client.go | 391 ++++++++++++++++++++++------------------ remote/client_test.go | 162 +++++++++++++++++ remote/codec_test.go | 61 ++++--- remote/{request.go => codes.go} | 36 ++++ remote/response.go | 66 ------- 8 files changed, 474 insertions(+), 274 deletions(-) diff --git a/common/init.go b/common/init.go new file mode 100644 index 0000000..1285295 --- /dev/null +++ b/common/init.go @@ -0,0 +1,26 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package common + +import "github.com/apache/rocketmq-client-go/remote" + +var client remote.RemotingClient + + +func init(){ + client = remote.NewDefaultRemotingClient() +} \ No newline at end of file diff --git a/common/manager.go b/common/manager.go index 63c1896..83e2934 100644 --- a/common/manager.go +++ b/common/manager.go @@ -51,7 +51,7 @@ type InnerConsumer interface { func SendMessageSync(ctx context.Context, brokerAddrs, brokerName string, request *SendMessageRequest, msgs []*Message) (*SendResult, error) { cmd := remote.NewRemotingCommand(SendBatchMessage, request, encodeMessages(msgs)) - response, err := remote.InvokeSync(brokerAddrs, cmd, 3*time.Second) + response, err := client.InvokeSync(brokerAddrs, cmd, 3*time.Second) if err != nil { return nil, err } @@ -68,7 +68,7 @@ func SendMessageAsync(ctx context.Context, brokerAddrs, brokerName string, reque func SendMessageOneWay(ctx context.Context, brokerAddrs string, request *SendMessageRequest, msgs []*Message) (*SendResult, error) { cmd := remote.NewRemotingCommand(SendBatchMessage, request, encodeMessages(msgs)) - err := remote.InvokeOneWay(brokerAddrs, cmd) + err := client.InvokeOneWay(brokerAddrs, cmd) return nil, err } diff --git a/common/route.go b/common/route.go index 33f25e1..4ad3218 100644 --- a/common/route.go +++ b/common/route.go @@ -157,7 +157,7 @@ func queryTopicRouteInfoFromServer(topic string, timeout time.Duration) (*topicR } rc := remote.NewRemotingCommand(GetRouteInfoByTopic, request, nil) - response, err := remote.InvokeSync(getNameServerAddress(), rc, timeout) + response, err := client.InvokeSync(getNameServerAddress(), rc, timeout) if err != nil { return nil, err diff --git a/remote/client.go b/remote/client.go index 2c6805c..7b1c528 100644 --- a/remote/client.go +++ b/remote/client.go @@ -17,244 +17,275 @@ package remote import ( + "bufio" + "bytes" + "context" "encoding/binary" "errors" + "io" "net" "sync" "time" - - "github.com/apache/rocketmq-client-go/utils" - log "github.com/sirupsen/logrus" ) var ( + //ErrRequestTimeout for request timeout error ErrRequestTimeout = errors.New("request timeout") ) -func InvokeSync(addr string, request *RemotingCommand, timeout time.Duration) (*RemotingCommand, error) { - return nil, nil +//ResponseFuture for +type ResponseFuture struct { + ResponseCommand *RemotingCommand + SendRequestOK bool + Err error + Opaque int32 + TimeoutMillis time.Duration + callback func(*ResponseFuture) + BeginTimestamp int64 + Done chan bool + callbackOnce sync.Once } -func InvokeAsync(addr string, request *RemotingCommand, timeout time.Duration, f func(*RemotingCommand)) error { - return nil +//NewResponseFuture create ResponseFuture with opaque, timeout and callback +func NewResponseFuture(opaque int32, timeoutMillis time.Duration, callback func(*ResponseFuture)) *ResponseFuture { + return &ResponseFuture{ + Opaque: opaque, + Done: make(chan bool), + TimeoutMillis: timeoutMillis, + callback: callback, + BeginTimestamp: time.Now().Unix() * 1000, + } } -func InvokeOneWay(addr string, request *RemotingCommand) error { - return nil +func (r *ResponseFuture) executeInvokeCallback() { + r.callbackOnce.Do(func() { + if r.callback != nil { + r.callback(r) + } + }) } -// ClientConfig common config -type ClientConfig struct { - // NameServer or Broker address - RemotingAddress string - - ClientIP string - InstanceName string - - // Heartbeat interval in microseconds with message broker, default is 30 - HeartbeatBrokerInterval time.Duration - - // request timeout time - RequestTimeout time.Duration - CType byte +func (r *ResponseFuture) isTimeout() bool { + diff := time.Now().Unix()*1000 - r.BeginTimestamp + return diff > int64(r.TimeoutMillis) +} - UnitMode bool - UnitName string - VipChannelEnabled bool +func (r *ResponseFuture) waitResponse() (*RemotingCommand, error) { + for { + select { + case <-r.Done: + if r.Err != nil { + return nil, r.Err + } + return r.ResponseCommand, nil + case <-time.After(r.TimeoutMillis * time.Millisecond): + return nil, ErrRequestTimeout + } + } } -type defaultClient struct { - //clientId string - config ClientConfig - conn net.Conn - // requestId - opaque int32 +//RemotingClient includes basic operations for remote +type RemotingClient interface { + Start() + Shutdown() + InvokeSync(string, *RemotingCommand, time.Duration) (*RemotingCommand, error) + InvokeAsync(string, *RemotingCommand, time.Duration, func(*ResponseFuture)) error + InvokeOneWay(string, *RemotingCommand) error +} - // int32 -> ResponseFuture - responseTable sync.Map - codec serializer - exitCh chan interface{} +//defaultRemotingClient for default RemotingClient implementation +type defaultRemotingClient struct { + responseTable map[int32]*ResponseFuture + responseLock sync.RWMutex + connectionsTable map[string]net.Conn + connectionLock sync.RWMutex + ctx context.Context + cancel context.CancelFunc } -//func newRemotingClient(config ClientConfig) error { -// client := &defaultClient{ -// config: config, -// } -// -// switch config.CType { -// case Json: -// client.codec = &jsonCodec{} -// case RocketMQ: -// client.codec = &rmqCodec{} -// default: -// return errors.New("unknow codec") -// } -// -// conn, err := net.Dial("tcp", config.RemotingAddress) -// if err != nil { -// log.Error(err) -// return nil, err -// } -// client.conn = conn -// go client.listen() -// go client.clearExpiredRequest() -// return client, nil -//} +//NewDefaultRemotingClient for +func NewDefaultRemotingClient() RemotingClient { + client := &defaultRemotingClient{ + responseTable: make(map[int32]*ResponseFuture, 0), + connectionsTable: make(map[string]net.Conn, 0), + } + ctx, cancel := context.WithCancel(context.Background()) + client.ctx = ctx + client.cancel = cancel + return client +} -func (client *defaultClient) invokeSync(request *RemotingCommand) (*RemotingCommand, error) { +//Start begin sca +func (client *defaultRemotingClient) Start() { + ticker := time.NewTicker(1 * time.Second) + go func() { + for { + select { + case <-ticker.C: + client.scanResponseTable() + case <-client.ctx.Done(): + ticker.Stop() + return + } + } + }() +} - response := &ResponseFuture{ - SendRequestOK: false, - Opaque: request.Opaque, - TimeoutMillis: client.config.RequestTimeout, - BeginTimestamp: time.Now().Unix(), - Done: make(chan bool), +// Shutdown for call client.cancel +func (client *defaultRemotingClient) Shutdown() { + client.cancel() + client.connectionLock.Lock() + for addr, conn := range client.connectionsTable { + conn.Close() + delete(client.connectionsTable, addr) } - header, err := encode(request) - body := request.Body - client.responseTable.Store(request.Opaque, response) - err = client.doRequest(header, body) + client.connectionLock.Unlock() +} +// InvokeSync sends request synchronously +func (client *defaultRemotingClient) InvokeSync(addr string, request *RemotingCommand, timeoutMillis time.Duration) (*RemotingCommand, error) { + conn, err := client.connect(addr) if err != nil { - log.Error(err) return nil, err } - select { - case <-response.Done: - rmd := response.ResponseCommand - return rmd, nil - case <-time.After(client.config.RequestTimeout): - return nil, ErrRequestTimeout + resp := NewResponseFuture(request.Opaque, timeoutMillis, nil) + client.responseLock.Lock() + client.responseTable[resp.Opaque] = resp + client.responseLock.Unlock() + err = client.sendRequest(conn, request) + if err != nil { + return nil, err } + resp.SendRequestOK = true + return resp.waitResponse() } -func (client *defaultClient) invokeAsync(request *RemotingCommand, f func(*RemotingCommand)) error { - - response := &ResponseFuture{ - SendRequestOK: false, - Opaque: request.Opaque, - TimeoutMillis: client.config.RequestTimeout, - BeginTimestamp: time.Now().Unix(), - callback: f, +//InvokeAsync send request asynchronously +func (client *defaultRemotingClient) InvokeAsync(addr string, request *RemotingCommand, timeoutMillis time.Duration, callback func(*ResponseFuture)) error { + conn, err := client.connect(addr) + if err != nil { + return err } - client.responseTable.Store(request.Opaque, response) - header, err := encode(request) + resp := NewResponseFuture(request.Opaque, timeoutMillis, callback) + client.responseLock.Lock() + client.responseTable[resp.Opaque] = resp + client.responseLock.Unlock() + err = client.sendRequest(conn, request) if err != nil { return err } - - body := request.Body - return client.doRequest(header, body) + resp.SendRequestOK = true + return nil } -func (client *defaultClient) invokeOneWay(request *RemotingCommand) error { - header, err := encode(request) +//InvokeOneWay send one-way request +func (client *defaultRemotingClient) InvokeOneWay(addr string, request *RemotingCommand) error { + conn, err := client.connect(addr) if err != nil { return err } - - body := request.Body - return client.doRequest(header, body) + return client.sendRequest(conn, request) } -func (client *defaultClient) doRequest(header, body []byte) error { - var requestBytes = make([]byte, len(header)+len(body)) - copy(requestBytes, header) - if len(body) > 0 { - copy(requestBytes[len(header):], body) +func (client *defaultRemotingClient) scanResponseTable() { + rfs := make([]*ResponseFuture, 0) + client.responseLock.Lock() + for opaque, resp := range client.responseTable { + if (resp.BeginTimestamp + int64(resp.TimeoutMillis) + 1000) <= time.Now().Unix()*1000 { + delete(client.responseTable, opaque) + rfs = append(rfs, resp) + } + } + client.responseLock.Unlock() + for _, rf := range rfs { + rf.Err = ErrRequestTimeout + rf.executeInvokeCallback() } - - _, err := client.conn.Write(requestBytes) - return err } -func (client *defaultClient) close() { - // TODO process response - client.conn.Close() +func (client *defaultRemotingClient) connect(addr string) (net.Conn, error) { + client.connectionLock.Lock() + defer client.connectionLock.Unlock() + conn, ok := client.connectionsTable[addr] + if ok { + return conn.(net.Conn), nil + } + tcpConn, err := net.Dial("tcp", addr) + if err != nil { + return nil, err + } + client.connectionsTable[addr] = tcpConn + go client.receiveResponse(tcpConn) + return tcpConn, nil } -func (client *defaultClient) listen() { - rb := utils.NewRingBuffer(4096) - - var frameSize int32 - go func() { - for { - err := binary.Read(rb, binary.BigEndian, &frameSize) - if err != nil { - // TODO - } - data := make([]byte, frameSize) - - _, err = rb.Read(data) - - if err != nil { - // TODO - } - - cmd, err := decode(data) - if cmd.isResponseType() { - client.handleResponse(cmd) - } else { - client.handleRequestFromServer(cmd) - } - } - }() - - buf := make([]byte, 4096) - for { - n, err := client.conn.Read(buf) +func (client *defaultRemotingClient) receiveResponse(conn net.Conn) { + scanner := createScanner(conn) + for scanner.Scan() { + receivedRemotingCommand, err := decode(scanner.Bytes()) if err != nil { - log.Errorf("read data from connection errors: %v", err) - return + client.closeConnection(conn) + break } - err = rb.Write(buf[:n]) - if err != nil { - // just log - log.Errorf("write data to buffer errors: %v", err) + if receivedRemotingCommand.isResponseType() { + client.responseLock.Lock() + if resp, ok := client.responseTable[receivedRemotingCommand.Opaque]; ok { + delete(client.responseTable, receivedRemotingCommand.Opaque) + resp.ResponseCommand = receivedRemotingCommand + resp.executeInvokeCallback() + if resp.Done != nil { + resp.Done <- true + } + } + client.responseLock.Unlock() + } else { + // todo handler request from peer } - } } -func (client *defaultClient) handleRequestFromServer(cmd *RemotingCommand) { - //responseCommand := client.clientRequestProcessor(cmd) - //if responseCommand == nil { - // return - //} - //responseCommand.Opaque = cmd.Opaque - //responseCommand.markResponseType() - //header, err := encode(responseCommand) - //body := responseCommand.Body - //err = client.doRequest(header, body) - //if err != nil { - // log.Error(err) - //} +func createScanner(r io.Reader) *bufio.Scanner { + scanner := bufio.NewScanner(r) + scanner.Split(func(data []byte, atEOF bool) (int, []byte, error) { + if !atEOF { + if len(data) >= 4 { + var length int32 + binary.Read(bytes.NewReader(data[0:4]), binary.BigEndian, &length) + if int(length)+4 <= len(data) { + return int(length) + 4, data[:int(length)+4], nil + } + } + } + return 0, nil, nil + }) + return scanner } -func (client *defaultClient) handleResponse(cmd *RemotingCommand) error { - //response, err := client.getResponse(cmd.Opaque) - ////client.removeResponse(cmd.Opaque) - //if err != nil { - // return err - //} - // - //response.ResponseCommand = cmd - //response.callback(cmd) - // - //if response.Done != nil { - // response.Done <- true - //} +func (client *defaultRemotingClient) sendRequest(conn net.Conn, request *RemotingCommand) error { + content, err := encode(request) + if err != nil { + return err + } + _, err = conn.Write(content) + if err != nil { + client.closeConnection(conn) + return err + } return nil } -func (client *defaultClient) clearExpiredRequest() { - //for seq, responseObj := range client.responseTable.Items() { - // response := responseObj.(*ResponseFuture) - // if (response.BeginTimestamp + 30) <= time.Now().Unix() { - // //30 minutes expired - // client.responseTable.Remove(seq) - // response.callback(nil) - // log.Warningf("remove time out request %v", response) - // } - //} +func (client *defaultRemotingClient) closeConnection(toCloseConn net.Conn) { + client.connectionLock.Lock() + var toCloseAddr string + for addr, con := range client.connectionsTable { + if con == toCloseConn { + toCloseAddr = addr + break + } + } + if conn, ok := client.connectionsTable[toCloseAddr]; ok { + delete(client.connectionsTable, toCloseAddr) + conn.Close() + } + client.connectionLock.Unlock() } diff --git a/remote/client_test.go b/remote/client_test.go index 7909612..55a26fc 100644 --- a/remote/client_test.go +++ b/remote/client_test.go @@ -15,3 +15,165 @@ * limitations under the License. */ package remote + +import ( + "bytes" + "errors" + "reflect" + "sync" + "testing" + "time" +) + +func TestNewResponseFuture(t *testing.T) { + future := NewResponseFuture(10, time.Duration(1000), nil) + if future.Opaque != 10 { + t.Errorf("wrong ResponseFuture's Opaque. want=%d, got=%d", 10, future.Opaque) + } + if future.SendRequestOK != false { + t.Errorf("wrong ResposneFutrue's SendRequestOK. want=%t, got=%t", false, future.SendRequestOK) + } + if future.Err != nil { + t.Errorf("wrong RespnseFuture's Err. want=, got=%v", future.Err) + } + if future.TimeoutMillis != time.Duration(1000) { + t.Errorf("wrong ResponseFuture's TimeoutMills. want=%d, got=%d", + future.TimeoutMillis, time.Duration(1000)) + } + if future.callback != nil { + t.Errorf("wrong ResponseFuture's callback. want=, got=%v", future.callback) + } + if future.Done == nil { + t.Errorf("wrong ResponseFuture's Done. want=, got=") + } +} + +func TestResponseFutureTimeout(t *testing.T) { + callback := func(r *ResponseFuture) { + if r.ResponseCommand.Remark == "" { + r.ResponseCommand.Remark = "Hello RocketMQ." + } else { + r.ResponseCommand.Remark = r.ResponseCommand.Remark + "Go Client" + } + } + future := NewResponseFuture(10, time.Duration(1000), callback) + future.ResponseCommand = NewRemotingCommand(200, + nil, nil) + + var wg sync.WaitGroup + wg.Add(10) + for i := 0; i < 10; i++ { + go func() { + future.executeInvokeCallback() + wg.Done() + }() + } + wg.Wait() + if future.ResponseCommand.Remark != "Hello RocketMQ." { + t.Errorf("wrong ResponseFuture.ResponseCommand.Remark. want=%s, got=%s", + "Hello RocketMQ.", future.ResponseCommand.Remark) + } + +} + +func TestResponseFutureIsTimeout(t *testing.T) { + future := NewResponseFuture(10, time.Duration(500), nil) + if future.isTimeout() != false { + t.Errorf("wrong ResponseFuture's istimeout. want=%t, got=%t", false, future.isTimeout()) + } + time.Sleep(time.Duration(2000) * time.Millisecond) + if future.isTimeout() != true { + t.Errorf("wrong ResponseFuture's istimeout. want=%t, got=%t", true, future.isTimeout()) + } + +} + +func TestResponseFutureWaitResponse(t *testing.T) { + future := NewResponseFuture(10, time.Duration(500), nil) + if _, err := future.waitResponse(); err != ErrRequestTimeout { + t.Errorf("wrong ResponseFuture waitResponse. want=%v, got=%v", + ErrRequestTimeout, err) + } + future = NewResponseFuture(10, time.Duration(500), nil) + responseError := errors.New("response error") + go func() { + time.Sleep(100 * time.Millisecond) + future.Err = responseError + future.Done <- true + }() + if _, err := future.waitResponse(); err != responseError { + t.Errorf("wrong ResponseFuture waitResponse. want=%v. got=%v", + responseError, err) + } + future = NewResponseFuture(10, time.Duration(500), nil) + responseRemotingCommand := NewRemotingCommand(202, nil, nil) + go func() { + time.Sleep(100 * time.Millisecond) + future.ResponseCommand = responseRemotingCommand + future.Done <- true + }() + if r, err := future.waitResponse(); err != nil { + t.Errorf("wrong ResponseFuture waitResponse error: %v", err) + } else { + if r != responseRemotingCommand { + t.Errorf("wrong ResponseFuture waitResposne result. want=%v, got=%v", + responseRemotingCommand, r) + } + } +} + +func TestNewDefaultRemotingClient(t *testing.T) { + r := NewDefaultRemotingClient() + d, ok := r.(*defaultRemotingClient) + if !ok { + t.Errorf("defaultRemotingClient does not implement RemotingClient interface") + } + if len(d.responseTable) != 0 { + t.Errorf("wrong responseTable size. want=%d, got=%d", + 0, len(d.responseTable)) + } + if len(d.connectionsTable) != 0 { + t.Errorf("wrong connectionsTable size. want=%d, got=%d", + 0, len(d.connectionsTable)) + } + if d.ctx == nil { + t.Errorf("wrong ctx. want=%v, got=", d.ctx) + } + if d.cancel == nil { + t.Errorf("wrong cancel. want=%v, got=", d.cancel) + } +} + +func TestDefaultRemotingClient_Start_ShutDown(t *testing.T) { + r := NewDefaultRemotingClient() + d, ok := r.(*defaultRemotingClient) + if !ok { + t.Errorf("defaultRemotingClient does not implement RemotingClient interface") + } + d.Start() + time.Sleep(2 * time.Second) + d.Shutdown() + if len(d.connectionsTable) != 0 { + t.Errorf("wrong connectionTable. want=%d, got=%d", + 0, len(d.connectionsTable)) + } +} + +func TestCreateScanner(t *testing.T) { + r := randomNewRemotingCommand() + content, err := encode(r) + if err != nil { + t.Fatalf("failed to encode RemotingCommand. %s", err) + } + reader := bytes.NewReader(content) + scanner := createScanner(reader) + for scanner.Scan() { + rcr, err := decode(scanner.Bytes()) + if err != nil { + t.Fatalf("failedd to decode RemotingCommand from scanner") + } + if !reflect.DeepEqual(*r, *rcr) { + t.Fatal("decoded RemotingCommand not equal to the original one") + } + } +} \ No newline at end of file diff --git a/remote/codec_test.go b/remote/codec_test.go index 62dd528..5e63e4a 100644 --- a/remote/codec_test.go +++ b/remote/codec_test.go @@ -22,6 +22,18 @@ import ( "testing" ) +type testHeader struct { + +} + +func (t testHeader) Encode() map[string]string { + properties := make(map[string]string) + for i := 0; i < 10; i++ { + properties[randomString(rand.Intn(20))] = randomString(rand.Intn(20)) + } + return properties +} + func randomBytes(length int) []byte { bs := make([]byte, length) if _, err := rand.Read(bs); err != nil { @@ -39,12 +51,9 @@ func randomString(length int) string { } func randomNewRemotingCommand() *RemotingCommand { - properties := make(map[string]string) - for i := 0; i < 10; i++ { - properties[randomString(rand.Intn(20))] = randomString(rand.Intn(20)) - } + var h testHeader body := randomBytes(rand.Intn(100)) - return NewRemotingCommand(int16(rand.Intn(1000)), properties, body) + return NewRemotingCommand(int16(rand.Intn(1000)), h, body) } func Test_encode(t *testing.T) { @@ -227,7 +236,8 @@ func Benchmark_rmqCodec_decodeHeader(b *testing.B) { } func TestCommandJsonEncodeDecode(t *testing.T) { - cmd := NewRemotingCommand(192, map[string]string{"brokers": "127.0.0.1"}, []byte("Hello RocketMQCodecs")) + var h testHeader + cmd := NewRemotingCommand(192, h, []byte("Hello RocketMQCodecs")) codecType = JsonCodecs cmdData, err := encode(cmd) if err != nil { @@ -259,19 +269,20 @@ func TestCommandJsonEncodeDecode(t *testing.T) { if newCmd.Remark != cmd.Remark { t.Errorf("wrong command remakr. want=%s, got=%s", cmd.Remark, newCmd.Remark) } - for k, v := range cmd.ExtFields { - if vv, ok := newCmd.ExtFields[k]; !ok { - t.Errorf("key: %s not exists in newCommand.", k) - } else { - if v != vv { - t.Errorf("wrong value. want=%s, got=%s", v, vv) - } - } - } + //for k, v := range cmd.ExtFields { + // if vv, ok := newCmd.ExtFields[k]; !ok { + // t.Errorf("key: %s not exists in newCommand.", k) + // } else { + // if v != vv { + // t.Errorf("wrong value. want=%s, got=%s", v, vv) + // } + // } + //} } func TestCommandRocketMQEncodeDecode(t *testing.T) { - cmd := NewRemotingCommand(192, map[string]string{"brokers": "127.0.0.1"}, []byte("Hello RocketMQCodecs")) + var h testHeader + cmd := NewRemotingCommand(192, h, []byte("Hello RocketMQCodecs")) codecType = RocketMQCodecs cmdData, err := encode(cmd) if err != nil { @@ -303,13 +314,13 @@ func TestCommandRocketMQEncodeDecode(t *testing.T) { if newCmd.Remark != cmd.Remark { t.Errorf("wrong command remakr. want=%s, got=%s", cmd.Remark, newCmd.Remark) } - for k, v := range cmd.ExtFields { - if vv, ok := newCmd.ExtFields[k]; !ok { - t.Errorf("key: %s not exists in newCommand.", k) - } else { - if v != vv { - t.Errorf("wrong value. want=%s, got=%s", v, vv) - } - } - } + //for k, v := range cmd.ExtFields { + // if vv, ok := newCmd.ExtFields[k]; !ok { + // t.Errorf("key: %s not exists in newCommand.", k) + // } else { + // if v != vv { + // t.Errorf("wrong value. want=%s, got=%s", v, vv) + // } + // } + //} } diff --git a/remote/request.go b/remote/codes.go similarity index 78% rename from remote/request.go rename to remote/codes.go index c37eac5..b94d2e2 100644 --- a/remote/request.go +++ b/remote/codes.go @@ -109,3 +109,39 @@ const ( VIEW_BROKER_STATS_DATA = 315 ) + +const ( + SUCCESS = 0 + SYSTEM_ERROR = 1 + SYSTEM_BUSY = 2 + REQUEST_CODE_NOT_SUPPORTED = 3 + TRANSACTION_FAILED = 4 + FLUSH_DISK_TIMEOUT = 10 + SLAVE_NOT_AVAILABLE = 11 + FLUSH_SLAVE_TIMEOUT = 12 + MESSAGE_ILLEGAL = 13 + SERVICE_NOT_AVAILABLE = 14 + VERSION_NOT_SUPPORTED = 15 + NO_PERMISSION = 16 + TOPIC_NOT_EXIST = 17 + TOPIC_EXIST_ALREADY = 18 + PULL_NOT_FOUND = 19 + PULL_RETRY_IMMEDIATELY = 20 + PULL_OFFSET_MOVED = 21 + QUERY_NOT_FOUND = 22 + SUBSCRIPTION_PARSE_FAILED = 23 + SUBSCRIPTION_NOT_EXIST = 24 + SUBSCRIPTION_NOT_LATEST = 25 + SUBSCRIPTION_GROUP_NOT_EXIST = 26 + TRANSACTION_SHOULD_COMMIT = 200 + TRANSACTION_SHOULD_ROLLBACK = 201 + TRANSACTION_STATE_UNKNOW = 202 + TRANSACTION_STATE_GROUP_WRONG = 203 + NO_BUYER_ID = 204 + + NOT_IN_CURRENT_UNIT = 205 + + CONSUMER_NOT_ONLINE = 206 + + CONSUME_MSG_TIMEOUT = 207 +) diff --git a/remote/response.go b/remote/response.go deleted file mode 100644 index d7954bc..0000000 --- a/remote/response.go +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package remote - -import "time" - -const ( - SUCCESS = 0 - SYSTEM_ERROR = 1 - SYSTEM_BUSY = 2 - REQUEST_CODE_NOT_SUPPORTED = 3 - TRANSACTION_FAILED = 4 - FLUSH_DISK_TIMEOUT = 10 - SLAVE_NOT_AVAILABLE = 11 - FLUSH_SLAVE_TIMEOUT = 12 - MESSAGE_ILLEGAL = 13 - SERVICE_NOT_AVAILABLE = 14 - VERSION_NOT_SUPPORTED = 15 - NO_PERMISSION = 16 - TOPIC_NOT_EXIST = 17 - TOPIC_EXIST_ALREADY = 18 - PULL_NOT_FOUND = 19 - PULL_RETRY_IMMEDIATELY = 20 - PULL_OFFSET_MOVED = 21 - QUERY_NOT_FOUND = 22 - SUBSCRIPTION_PARSE_FAILED = 23 - SUBSCRIPTION_NOT_EXIST = 24 - SUBSCRIPTION_NOT_LATEST = 25 - SUBSCRIPTION_GROUP_NOT_EXIST = 26 - TRANSACTION_SHOULD_COMMIT = 200 - TRANSACTION_SHOULD_ROLLBACK = 201 - TRANSACTION_STATE_UNKNOW = 202 - TRANSACTION_STATE_GROUP_WRONG = 203 - NO_BUYER_ID = 204 - - NOT_IN_CURRENT_UNIT = 205 - - CONSUMER_NOT_ONLINE = 206 - - CONSUME_MSG_TIMEOUT = 207 -) - -type ResponseFuture struct { - ResponseCommand *RemotingCommand - SendRequestOK bool - Rrr error - Opaque int32 - TimeoutMillis time.Duration - callback func(*RemotingCommand) - BeginTimestamp int64 - Done chan bool -}