dubbo-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [dubbo-go] hxmhlt commented on a change in pull request #495: Imp: refactor the network transport layer
Date Mon, 01 Jun 2020 11:53:14 GMT

hxmhlt commented on a change in pull request #495:
URL: https://github.com/apache/dubbo-go/pull/495#discussion_r433185716



##########
File path: protocol/dubbo/dubbo_codec.go
##########
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package dubbo
+
+import (
+	"bufio"
+	"bytes"
+	"fmt"
+	"strconv"
+	"time"
+)
+
+import (
+	hessian "github.com/apache/dubbo-go-hessian2"
+	perrors "github.com/pkg/errors"
+)
+
+import (
+	"github.com/apache/dubbo-go/common/constant"
+	"github.com/apache/dubbo-go/common/logger"
+	"github.com/apache/dubbo-go/protocol"
+	"github.com/apache/dubbo-go/protocol/invocation"
+	"github.com/apache/dubbo-go/remoting"
+)
+
+//SerialID serial ID
+type SerialID byte
+
+const (
+	// S_Dubbo dubbo serial id
+	S_Dubbo SerialID = 2
+)
+
+func init() {
+	codec := &DubboCodec{}
+	// this is for registry dubboCodec of dubbo protocol
+	remoting.RegistryCodec("dubbo", codec)
+}
+
+// DubboPackage.  this is for hessian encode/decode. If we refactor hessian, it will also
be refactored.
+type DubboPackage struct {
+	Header  hessian.DubboHeader
+	Service hessian.Service
+	Body    interface{}
+	Err     error
+}
+
+// String of DubboPackage
+func (p DubboPackage) String() string {
+	return fmt.Sprintf("DubboPackage: Header-%v, Path-%v, Body-%v", p.Header, p.Service, p.Body)
+}
+
+// Marshal ...
+func (p *DubboPackage) Marshal() (*bytes.Buffer, error) {
+	codec := hessian.NewHessianCodec(nil)
+
+	pkg, err := codec.Write(p.Service, p.Header, p.Body)
+	if err != nil {
+		return nil, perrors.WithStack(err)
+	}
+
+	return bytes.NewBuffer(pkg), nil
+}
+
+// Unmarshal ...
+func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, resp *remoting.Response) error {
+	// fix issue https://github.com/apache/dubbo-go/issues/380
+	bufLen := buf.Len()
+	if bufLen < hessian.HEADER_LENGTH {
+		return perrors.WithStack(hessian.ErrHeaderNotEnough)
+	}
+
+	codec := hessian.NewHessianCodec(bufio.NewReaderSize(buf, bufLen))
+
+	// read header
+	err := codec.ReadHeader(&p.Header)
+	if err != nil {
+		return perrors.WithStack(err)
+	}
+
+	if resp != nil { // for client
+		if (p.Header.Type & hessian.PackageRequest) != 0x00 {
+			// size of this array must be '7'
+			// https://github.com/apache/dubbo-go-hessian2/blob/master/request.go#L272
+			p.Body = make([]interface{}, 7)
+		} else {
+			pendingRsp := remoting.GetPendingResponse(remoting.SequenceType(p.Header.ID))
+			if pendingRsp == nil {
+				return perrors.Errorf("client.GetPendingResponse(%v) = nil", p.Header.ID)
+			}
+			p.Body = &hessian.Response{RspObj: pendingRsp.Reply}
+		}
+	}
+	// read body
+	err = codec.ReadBody(p.Body)
+	return perrors.WithStack(err)
+}
+
+// DubboCodec. It is implements remoting.Codec
+type DubboCodec struct {
+}
+
+// encode request for transport
+func (c *DubboCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer, error) {
+	if request.Event {
+		return c.encodeHeartbeartReqeust(request)
+	}
+
+	invoc, ok := request.Data.(*protocol.Invocation)
+	if !ok {
+		logger.Errorf("encode request failed for parameter type :%+v", request)
+		return nil, perrors.Errorf("encode request failed for parameter type :%+v", request)
+	}
+	invocation := *invoc
+
+	p := &DubboPackage{}
+	p.Service.Path = invocation.AttachmentsByKey(constant.PATH_KEY, "")
+	p.Service.Interface = invocation.AttachmentsByKey(constant.INTERFACE_KEY, "")
+	p.Service.Version = invocation.AttachmentsByKey(constant.VERSION_KEY, "")
+	p.Service.Group = invocation.AttachmentsByKey(constant.GROUP_KEY, "")
+	p.Service.Method = invocation.MethodName()
+
+	timeout, err := strconv.Atoi(invocation.AttachmentsByKey(constant.TIMEOUT_KEY, "3000"))
+	if err != nil {
+		// it will be wrapped in readwrite.Write .
+		return nil, err
+	}

Review comment:
       Please return perrors.WithStack(err) here instead of the bare err.

##########
File path: protocol/dubbo/dubbo_codec.go
##########
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package dubbo
+
+import (
+	"bufio"
+	"bytes"
+	"fmt"
+	"strconv"
+	"time"
+)
+
+import (
+	hessian "github.com/apache/dubbo-go-hessian2"
+	perrors "github.com/pkg/errors"
+)
+
+import (
+	"github.com/apache/dubbo-go/common/constant"
+	"github.com/apache/dubbo-go/common/logger"
+	"github.com/apache/dubbo-go/protocol"
+	"github.com/apache/dubbo-go/protocol/invocation"
+	"github.com/apache/dubbo-go/remoting"
+)
+
+//SerialID serial ID
+type SerialID byte
+
+const (
+	// S_Dubbo dubbo serial id
+	S_Dubbo SerialID = 2
+)
+
+func init() {
+	codec := &DubboCodec{}
+	// this is for registry dubboCodec of dubbo protocol
+	remoting.RegistryCodec("dubbo", codec)
+}
+
+// DubboPackage.  this is for hessian encode/decode. If we refactor hessian, it will also
be refactored.
+type DubboPackage struct {
+	Header  hessian.DubboHeader
+	Service hessian.Service
+	Body    interface{}
+	Err     error
+}
+
+// String of DubboPackage
+func (p DubboPackage) String() string {
+	return fmt.Sprintf("DubboPackage: Header-%v, Path-%v, Body-%v", p.Header, p.Service, p.Body)
+}
+
+// Marshal ...
+func (p *DubboPackage) Marshal() (*bytes.Buffer, error) {
+	codec := hessian.NewHessianCodec(nil)
+
+	pkg, err := codec.Write(p.Service, p.Header, p.Body)
+	if err != nil {
+		return nil, perrors.WithStack(err)
+	}
+
+	return bytes.NewBuffer(pkg), nil
+}
+
+// Unmarshal ...
+func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, resp *remoting.Response) error {
+	// fix issue https://github.com/apache/dubbo-go/issues/380
+	bufLen := buf.Len()
+	if bufLen < hessian.HEADER_LENGTH {
+		return perrors.WithStack(hessian.ErrHeaderNotEnough)
+	}
+
+	codec := hessian.NewHessianCodec(bufio.NewReaderSize(buf, bufLen))
+
+	// read header
+	err := codec.ReadHeader(&p.Header)
+	if err != nil {
+		return perrors.WithStack(err)
+	}
+
+	if resp != nil { // for client
+		if (p.Header.Type & hessian.PackageRequest) != 0x00 {
+			// size of this array must be '7'
+			// https://github.com/apache/dubbo-go-hessian2/blob/master/request.go#L272
+			p.Body = make([]interface{}, 7)
+		} else {
+			pendingRsp := remoting.GetPendingResponse(remoting.SequenceType(p.Header.ID))
+			if pendingRsp == nil {
+				return perrors.Errorf("client.GetPendingResponse(%v) = nil", p.Header.ID)
+			}
+			p.Body = &hessian.Response{RspObj: pendingRsp.Reply}
+		}
+	}
+	// read body
+	err = codec.ReadBody(p.Body)
+	return perrors.WithStack(err)
+}
+
+// DubboCodec. It is implements remoting.Codec
+type DubboCodec struct {
+}
+
+// encode request for transport
+func (c *DubboCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer, error) {
+	if request.Event {
+		return c.encodeHeartbeartReqeust(request)
+	}
+
+	invoc, ok := request.Data.(*protocol.Invocation)
+	if !ok {
+		logger.Errorf("encode request failed for parameter type :%+v", request)
+		return nil, perrors.Errorf("encode request failed for parameter type :%+v", request)
+	}
+	invocation := *invoc
+
+	p := &DubboPackage{}
+	p.Service.Path = invocation.AttachmentsByKey(constant.PATH_KEY, "")
+	p.Service.Interface = invocation.AttachmentsByKey(constant.INTERFACE_KEY, "")
+	p.Service.Version = invocation.AttachmentsByKey(constant.VERSION_KEY, "")
+	p.Service.Group = invocation.AttachmentsByKey(constant.GROUP_KEY, "")
+	p.Service.Method = invocation.MethodName()
+
+	timeout, err := strconv.Atoi(invocation.AttachmentsByKey(constant.TIMEOUT_KEY, "3000"))
+	if err != nil {

Review comment:
       You had better define a constant, such as a default timeout, instead of hard-coding 3000.

##########
File path: protocol/dubbo/dubbo_codec.go
##########
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package dubbo
+
+import (
+	"bufio"
+	"bytes"
+	"fmt"
+	"strconv"
+	"time"
+)
+
+import (
+	hessian "github.com/apache/dubbo-go-hessian2"
+	perrors "github.com/pkg/errors"
+)
+
+import (
+	"github.com/apache/dubbo-go/common/constant"
+	"github.com/apache/dubbo-go/common/logger"
+	"github.com/apache/dubbo-go/protocol"
+	"github.com/apache/dubbo-go/protocol/invocation"
+	"github.com/apache/dubbo-go/remoting"
+)
+
+//SerialID serial ID
+type SerialID byte
+
+const (
+	// S_Dubbo dubbo serial id
+	S_Dubbo SerialID = 2
+)
+
+func init() {
+	codec := &DubboCodec{}
+	// this is for registry dubboCodec of dubbo protocol
+	remoting.RegistryCodec("dubbo", codec)
+}
+
+// DubboPackage.  this is for hessian encode/decode. If we refactor hessian, it will also
be refactored.
+type DubboPackage struct {
+	Header  hessian.DubboHeader
+	Service hessian.Service
+	Body    interface{}
+	Err     error
+}
+
+// String of DubboPackage
+func (p DubboPackage) String() string {
+	return fmt.Sprintf("DubboPackage: Header-%v, Path-%v, Body-%v", p.Header, p.Service, p.Body)
+}
+
+// Marshal ...
+func (p *DubboPackage) Marshal() (*bytes.Buffer, error) {
+	codec := hessian.NewHessianCodec(nil)
+
+	pkg, err := codec.Write(p.Service, p.Header, p.Body)
+	if err != nil {
+		return nil, perrors.WithStack(err)
+	}
+
+	return bytes.NewBuffer(pkg), nil
+}
+
+// Unmarshal ...
+func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, resp *remoting.Response) error {
+	// fix issue https://github.com/apache/dubbo-go/issues/380
+	bufLen := buf.Len()
+	if bufLen < hessian.HEADER_LENGTH {
+		return perrors.WithStack(hessian.ErrHeaderNotEnough)
+	}
+
+	codec := hessian.NewHessianCodec(bufio.NewReaderSize(buf, bufLen))
+
+	// read header
+	err := codec.ReadHeader(&p.Header)
+	if err != nil {
+		return perrors.WithStack(err)
+	}
+
+	if resp != nil { // for client
+		if (p.Header.Type & hessian.PackageRequest) != 0x00 {
+			// size of this array must be '7'
+			// https://github.com/apache/dubbo-go-hessian2/blob/master/request.go#L272
+			p.Body = make([]interface{}, 7)
+		} else {
+			pendingRsp := remoting.GetPendingResponse(remoting.SequenceType(p.Header.ID))
+			if pendingRsp == nil {
+				return perrors.Errorf("client.GetPendingResponse(%v) = nil", p.Header.ID)
+			}
+			p.Body = &hessian.Response{RspObj: pendingRsp.Reply}
+		}
+	}
+	// read body
+	err = codec.ReadBody(p.Body)
+	return perrors.WithStack(err)
+}
+
+// DubboCodec. It is implements remoting.Codec
+type DubboCodec struct {
+}
+
+// encode request for transport
+func (c *DubboCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer, error) {
+	if request.Event {
+		return c.encodeHeartbeartReqeust(request)
+	}
+
+	invoc, ok := request.Data.(*protocol.Invocation)
+	if !ok {
+		logger.Errorf("encode request failed for parameter type :%+v", request)
+		return nil, perrors.Errorf("encode request failed for parameter type :%+v", request)
+	}
+	invocation := *invoc
+
+	p := &DubboPackage{}
+	p.Service.Path = invocation.AttachmentsByKey(constant.PATH_KEY, "")
+	p.Service.Interface = invocation.AttachmentsByKey(constant.INTERFACE_KEY, "")
+	p.Service.Version = invocation.AttachmentsByKey(constant.VERSION_KEY, "")
+	p.Service.Group = invocation.AttachmentsByKey(constant.GROUP_KEY, "")
+	p.Service.Method = invocation.MethodName()
+
+	timeout, err := strconv.Atoi(invocation.AttachmentsByKey(constant.TIMEOUT_KEY, "3000"))
+	if err != nil {
+		// it will be wrapped in readwrite.Write .
+		return nil, err
+	}
+	p.Service.Timeout = time.Duration(timeout)
+
+	p.Header.SerialID = byte(S_Dubbo)
+	p.Header.ID = request.ID
+	if request.TwoWay {
+		p.Header.Type = hessian.PackageRequest_TwoWay
+	} else {
+		p.Header.Type = hessian.PackageRequest
+	}
+
+	p.Body = hessian.NewRequest(invocation.Arguments(), invocation.Attachments())
+
+	codec := hessian.NewHessianCodec(nil)
+
+	pkg, err := codec.Write(p.Service, p.Header, p.Body)
+	if err != nil {
+		return nil, perrors.WithStack(err)
+	}
+
+	return bytes.NewBuffer(pkg), nil
+}
+
+// encode heartbeart request
+func (c *DubboCodec) encodeHeartbeartReqeust(request *remoting.Request) (*bytes.Buffer, error)
{
+	pkg := &DubboPackage{}
+	pkg.Body = []interface{}{}
+	pkg.Header.ID = request.ID
+	pkg.Header.Type = hessian.PackageHeartbeat
+	pkg.Header.SerialID = byte(S_Dubbo)
+
+	codec := hessian.NewHessianCodec(nil)
+
+	byt, err := codec.Write(pkg.Service, pkg.Header, pkg.Body)
+	if err != nil {
+		return nil, perrors.WithStack(err)
+	}
+
+	return bytes.NewBuffer(byt), nil
+}
+
+// encode response
+func (c *DubboCodec) EncodeResponse(response *remoting.Response) (*bytes.Buffer, error) {
+	var ptype = hessian.PackageResponse
+	if response.IsHeartbeat() {
+		ptype = hessian.PackageHeartbeat
+	}
+	resp := &DubboPackage{
+		Header: hessian.DubboHeader{
+			SerialID:       response.SerialID,
+			Type:           ptype,
+			ID:             response.ID,
+			ResponseStatus: response.Status,
+		},
+	}
+	if !response.IsHeartbeat() {
+		resp.Body = &hessian.Response{
+			RspObj:      response.Result.(protocol.RPCResult).Rest,
+			Exception:   response.Result.(protocol.RPCResult).Err,
+			Attachments: response.Result.(protocol.RPCResult).Attrs,
+		}
+	}
+
+	codec := hessian.NewHessianCodec(nil)
+
+	pkg, err := codec.Write(resp.Service, resp.Header, resp.Body)
+	if err != nil {
+		return nil, perrors.WithStack(err)
+	}
+
+	return bytes.NewBuffer(pkg), nil
+}
+
+// Decode data, including request and response.
+func (c *DubboCodec) Decode(data []byte) (remoting.DecodeResult, int, error) {
+	if c.isRequest(data) {
+		req, len, err := c.decodeRequest(data)
+		if err != nil {
+			return remoting.DecodeResult{}, len, perrors.WithStack(err)
+		}
+		return remoting.DecodeResult{IsRequest: true, Result: req}, len, perrors.WithStack(err)
+	} else {
+		resp, len, err := c.decodeResponse(data)
+		if err != nil {
+			return remoting.DecodeResult{}, len, perrors.WithStack(err)
+		}
+		return remoting.DecodeResult{IsRequest: false, Result: resp}, len, perrors.WithStack(err)
+	}
+}
+func (c *DubboCodec) isRequest(data []byte) bool {
+	if data[2]&byte(0x80) == 0x00 {
+		return false
+	}
+	return true
+}
+
+// decode request
+func (c *DubboCodec) decodeRequest(data []byte) (*remoting.Request, int, error) {
+	pkg := &DubboPackage{
+		Body: make([]interface{}, 7),
+	}
+	var request *remoting.Request = nil
+	buf := bytes.NewBuffer(data)
+	err := pkg.Unmarshal(buf, nil)
+	if err != nil {
+		originErr := perrors.Cause(err)
+		if originErr == hessian.ErrHeaderNotEnough || originErr == hessian.ErrBodyNotEnough {
+			//FIXME
+			return nil, 0, originErr
+		}
+		logger.Errorf("pkg.Unmarshal(len(@data):%d) = error:%+v", buf.Len(), err)
+
+		return request, 0, perrors.WithStack(err)
+	}
+	request = &remoting.Request{
+		ID:       pkg.Header.ID,
+		SerialID: pkg.Header.SerialID,
+		TwoWay:   pkg.Header.Type&hessian.PackageRequest_TwoWay != 0x00,
+		Event:    pkg.Header.Type&hessian.PackageHeartbeat != 0x00,
+	}
+	if pkg.Header.Type&hessian.PackageHeartbeat == 0x00 {
+		// convert params of request

Review comment:
       No go fmt? There should be spaces before and after '&'. I suggest configuring your IDE to format automatically.

##########
File path: remoting/exchange_client.go
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package remoting
+
+import (
+	"errors"
+	"sync"
+	"time"
+)
+
+import (
+	"github.com/apache/dubbo-go/common"
+	"github.com/apache/dubbo-go/common/logger"
+	"github.com/apache/dubbo-go/protocol"
+)
+
+var (
+	// store requestID and response
+	pendingResponses *sync.Map = new(sync.Map)
+)
+
+type SequenceType int64
+
+// It is interface of client for network communication.
+// If you use getty as network communication, you should define GettyClient that implements
this interface.
+type Client interface {
+	SetExchangeClient(client *ExchangeClient)
+	// responseHandler is used to deal with msg
+	SetResponseHandler(responseHandler ResponseHandler)
+	// connect url
+	Connect(url common.URL) error
+	// close
+	Close()
+	// send request to server.
+	Request(request *Request, timeout time.Duration, response *PendingResponse) error
+}
+
+// This is abstraction level. it is like facade.
+type ExchangeClient struct {
+	ConnectTimeout time.Duration
+	address        string
+	client         Client
+	init           bool
+}
+
+// handle the message from server
+type ResponseHandler interface {
+	Handler(response *Response)
+}
+
+// create ExchangeClient
+func NewExchangeClient(url common.URL, client Client, connectTimeout time.Duration, lazyInit
bool) *ExchangeClient {
+	exchangeClient := &ExchangeClient{
+		ConnectTimeout: connectTimeout,
+		address:        url.Location,
+		client:         client,
+	}
+	client.SetExchangeClient(exchangeClient)
+	if !lazyInit {
+		if err := exchangeClient.doInit(url); err != nil {
+			return nil
+		}
+	}
+
+	client.SetResponseHandler(exchangeClient)
+	return exchangeClient
+}
+
+func (cl *ExchangeClient) doInit(url common.URL) error {
+	if cl.init {
+		return nil
+	}
+	if cl.client.Connect(url) != nil {
+		//retry for a while
+		time.Sleep(100 * time.Millisecond)
+		if cl.client.Connect(url) != nil {
+			logger.Errorf("Failed to connect server %+v " + url.Location)
+			return errors.New("Failed to connect server " + url.Location)
+		}
+	}
+	//FIXME atomic operation
+	cl.init = true
+	return nil
+}
+
+// two way request
+func (client *ExchangeClient) Request(invocation *protocol.Invocation, url common.URL, timeout
time.Duration,
+	result *protocol.RPCResult) error {
+	if er := client.doInit(url); er != nil {
+		return er
+	}
+	request := NewRequest("2.0.2")
+	request.Data = invocation

Review comment:
       What does 2.0.2 mean?

##########
File path: protocol/dubbo/dubbo_invoker_test.go
##########
@@ -92,3 +92,142 @@ func TestDubboInvoker_Invoke(t *testing.T) {
 	proto.Destroy()
 	lock.Unlock()
 }
+
+func InitTest(t *testing.T) (protocol.Protocol, common.URL) {
+
+	hessian.RegisterPOJO(&User{})
+
+	methods, err := common.ServiceMap.Register("dubbo", &UserProvider{})
+	assert.NoError(t, err)
+	assert.Equal(t, "GetBigPkg,GetUser,GetUser0,GetUser1,GetUser2,GetUser3,GetUser4,GetUser5,GetUser6",
methods)
+
+	// config
+	getty.SetClientConf(getty.ClientConfig{
+		ConnectionNum:   2,
+		HeartbeatPeriod: "5s",
+		SessionTimeout:  "20s",
+		PoolTTL:         600,
+		PoolSize:        64,
+		GettySessionParam: getty.GettySessionParam{
+			CompressEncoding: false,
+			TcpNoDelay:       true,
+			TcpKeepAlive:     true,
+			KeepAlivePeriod:  "120s",
+			TcpRBufSize:      262144,
+			TcpWBufSize:      65536,
+			PkgWQSize:        512,
+			TcpReadTimeout:   "4s",
+			TcpWriteTimeout:  "5s",
+			WaitTimeout:      "1s",
+			MaxMsgLen:        10240000000,
+			SessionName:      "client",
+		},
+	})
+	getty.SetServerConfig(getty.ServerConfig{
+		SessionNumber:  700,
+		SessionTimeout: "20s",
+		GettySessionParam: getty.GettySessionParam{
+			CompressEncoding: false,
+			TcpNoDelay:       true,
+			TcpKeepAlive:     true,
+			KeepAlivePeriod:  "120s",
+			TcpRBufSize:      262144,
+			TcpWBufSize:      65536,
+			PkgWQSize:        512,
+			TcpReadTimeout:   "1s",
+			TcpWriteTimeout:  "5s",
+			WaitTimeout:      "1s",
+			MaxMsgLen:        10240000000,
+			SessionName:      "server",
+		}})
+
+	// Export
+	proto := GetProtocol()
+	url, err := common.NewURL("dubbo://127.0.0.1:20702/UserProvider?anyhost=true&" +
+		"application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&"
+
+		"environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&"
+
+		"module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&"
+
+		"side=provider&timeout=3000&timestamp=1556509797245&bean.name=UserProvider")
+	assert.NoError(t, err)
+	proto.Export(&proxy_factory.ProxyInvoker{
+		BaseInvoker: *protocol.NewBaseInvoker(url),
+	})
+
+	time.Sleep(time.Second * 2)
+
+	return proto, url
+}
+
+//////////////////////////////////
+// provider
+//////////////////////////////////
+
+type (
+	User struct {
+		Id   string `json:"id"`
+		Name string `json:"name"`
+	}
+
+	UserProvider struct {
+		user map[string]User
+	}
+)
+
+// size:4801228
+func (u *UserProvider) GetBigPkg(ctx context.Context, req []interface{}, rsp *User) error
{
+	argBuf := new(bytes.Buffer)
+	for i := 0; i < 400; i++ {
+		argBuf.WriteString("击鼓其镗,踊跃用兵。土国城漕,我独南行。从孙子仲,平陈与宋。不我以归,忧心有忡。爰居爰处?爰丧其马?于以求之?于林之下。死生契阔,与子成说。执子之手,与子偕老。于嗟阔兮,不我活兮。于嗟洵兮,不我信兮。")
+		argBuf.WriteString("击鼓其镗,踊跃用兵。土国城漕,我独南行。从孙子仲,平陈与宋。不我以归,忧心有忡。爰居爰处?爰丧其马?于以求之?于林之下。死生契阔,与子成说。执子之手,与子偕老。于嗟阔兮,不我活兮。于嗟洵兮,不我信兮。")

Review comment:
       I do not think Chinese text is OK here. Please change it to English.

##########
File path: remoting/exchange_client.go
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package remoting
+
+import (
+	"errors"
+	"sync"
+	"time"
+)
+
+import (
+	"github.com/apache/dubbo-go/common"
+	"github.com/apache/dubbo-go/common/logger"
+	"github.com/apache/dubbo-go/protocol"
+)
+
+var (
+	// store requestID and response
+	pendingResponses *sync.Map = new(sync.Map)
+)
+
+type SequenceType int64
+
+// It is interface of client for network communication.
+// If you use getty as network communication, you should define GettyClient that implements
this interface.
+type Client interface {
+	SetExchangeClient(client *ExchangeClient)
+	// responseHandler is used to deal with msg
+	SetResponseHandler(responseHandler ResponseHandler)
+	// connect url
+	Connect(url common.URL) error
+	// close
+	Close()
+	// send request to server.
+	Request(request *Request, timeout time.Duration, response *PendingResponse) error
+}
+
+// This is abstraction level. it is like facade.
+type ExchangeClient struct {
+	ConnectTimeout time.Duration
+	address        string
+	client         Client
+	init           bool
+}
+
+// handle the message from server
+type ResponseHandler interface {
+	Handler(response *Response)
+}
+
+// create ExchangeClient
+func NewExchangeClient(url common.URL, client Client, connectTimeout time.Duration, lazyInit
bool) *ExchangeClient {
+	exchangeClient := &ExchangeClient{
+		ConnectTimeout: connectTimeout,
+		address:        url.Location,
+		client:         client,
+	}
+	client.SetExchangeClient(exchangeClient)
+	if !lazyInit {
+		if err := exchangeClient.doInit(url); err != nil {
+			return nil
+		}
+	}
+
+	client.SetResponseHandler(exchangeClient)
+	return exchangeClient
+}
+
+func (cl *ExchangeClient) doInit(url common.URL) error {
+	if cl.init {
+		return nil
+	}
+	if cl.client.Connect(url) != nil {
+		//retry for a while
+		time.Sleep(100 * time.Millisecond)
+		if cl.client.Connect(url) != nil {
+			logger.Errorf("Failed to connect server %+v " + url.Location)
+			return errors.New("Failed to connect server " + url.Location)
+		}
+	}
+	//FIXME atomic operation
+	cl.init = true

Review comment:
       This needs an atomic.Bool.

##########
File path: protocol/dubbo/dubbo_protocol.go
##########
@@ -131,3 +141,74 @@ func GetProtocol() protocol.Protocol {
 	}
 	return dubboProtocol
 }
+
+func doHandleRequest(rpcInvocation *invocation.RPCInvocation) protocol.RPCResult {
+	exporter, _ := dubboProtocol.ExporterMap().Load(rpcInvocation.ServiceKey())
+	result := protocol.RPCResult{}

Review comment:
       Could dubboProtocol.ExporterMap() be nil?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org
For additional commands, e-mail: notifications-help@dubbo.apache.org


Mime
View raw message