rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From duhengfore...@apache.org
Subject [rocketmq-client-go] branch master updated: feat(produce) : reduce memory copy for on-wire protocol
Date Mon, 01 Mar 2021 09:25:36 GMT
This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 9736ba8  feat(produce) : reduce memory copy for on-wire protocol
     new 6cd3181  Merge pull request #607 from shenhui0509/master
9736ba8 is described below

commit 9736ba88b63320baebdac0396171bcc7b51c70b8
Author: shenhui.backend <shenhui.backend@bytedance.com>
AuthorDate: Thu Feb 11 11:21:21 2021 +0800

    feat(produce) : reduce memory copy for on-wire protocol
---
 internal/remote/codec.go         | 38 ++++++++++++++++++++++++++++++++++++++
 internal/remote/remote_client.go |  8 +++-----
 internal/remote/tcp_conn.go      |  2 ++
 3 files changed, 43 insertions(+), 5 deletions(-)

diff --git a/internal/remote/codec.go b/internal/remote/codec.go
index f756c11..1c6e0a5 100644
--- a/internal/remote/codec.go
+++ b/internal/remote/codec.go
@@ -20,6 +20,7 @@ import (
 	"bytes"
 	"encoding/binary"
 	"fmt"
+	"io"
 	"sync/atomic"
 
 	jsoniter "github.com/json-iterator/go"
@@ -132,6 +133,43 @@ var (
 // +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 // + len  |   4bytes   |     4bytes    | (21 + r_len + e_len) bytes | remain bytes +
 // +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+func (command *RemotingCommand) WriteTo(w io.Writer) error {
+	var (
+		header []byte
+		err    error
+	)
+
+	switch codecType {
+	case JsonCodecs:
+		header, err = jsonSerializer.encodeHeader(command)
+	case RocketMQCodecs:
+		header, err = rocketMqSerializer.encodeHeader(command)
+	}
+
+	if err != nil {
+		return err
+	}
+
+	frameSize := 4 + len(header) + len(command.Body)
+	err = binary.Write(w, binary.BigEndian, int32(frameSize))
+	if err != nil {
+		return err
+	}
+
+	err = binary.Write(w, binary.BigEndian, markProtocolType(int32(len(header))))
+	if err != nil {
+		return err
+	}
+
+	_, err = w.Write(header)
+	if err != nil {
+		return err
+	}
+
+	_, err = w.Write(command.Body)
+	return err
+}
+
 func encode(command *RemotingCommand) ([]byte, error) {
 	var (
 		header []byte
diff --git a/internal/remote/remote_client.go b/internal/remote/remote_client.go
index dae364e..5e2aa83 100644
--- a/internal/remote/remote_client.go
+++ b/internal/remote/remote_client.go
@@ -272,11 +272,9 @@ func (c *remotingClient) sendRequest(conn *tcpConnWrapper, request *RemotingComm
 }
 
 func (c *remotingClient) doRequest(conn *tcpConnWrapper, request *RemotingCommand) error
{
-	content, err := encode(request)
-	if err != nil {
-		return err
-	}
-	_, err = conn.Write(content)
+	conn.Lock()
+	defer conn.Unlock()
+	err := request.WriteTo(conn)
 	if err != nil {
 		c.closeConnection(conn)
 		return err
diff --git a/internal/remote/tcp_conn.go b/internal/remote/tcp_conn.go
index dda7dcf..ae340c6 100644
--- a/internal/remote/tcp_conn.go
+++ b/internal/remote/tcp_conn.go
@@ -19,6 +19,7 @@ package remote
 import (
 	"context"
 	"net"
+	"sync"
 
 	"go.uber.org/atomic"
 )
@@ -26,6 +27,7 @@ import (
 // TODO: Adding TCP Connections Pool, https://github.com/apache/rocketmq-client-go/v2/issues/298
 type tcpConnWrapper struct {
 	net.Conn
+	sync.Mutex
 	closed atomic.Bool
 }
 


Mime
View raw message