rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wenf...@apache.org
Subject [rocketmq-client-go] branch native updated: change transaction producer about struct, interface, chan type from struct to struct pointer (#373)
Date Tue, 07 Jan 2020 03:55:25 GMT
This is an automated email from the ASF dual-hosted git repository.

wenfeng pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 088ab71  change transaction producer about struct, interface, chan type from struct to struct pointer (#373)
088ab71 is described below

commit 088ab716498a0efab35942be8918b02c21cc600a
Author: zZ <320314319@qq.com>
AuthorDate: Tue Jan 7 11:55:15 2020 +0800

    change transaction producer about struct, interface, chan type from struct to struct pointer (#373)
---
 examples/producer/transaction/main.go |  8 +++-----
 internal/callback.go                  |  2 +-
 internal/client.go                    |  4 ++--
 primitive/message.go                  |  4 ++--
 producer/producer.go                  | 16 +++++++++-------
 5 files changed, 17 insertions(+), 17 deletions(-)

diff --git a/examples/producer/transaction/main.go b/examples/producer/transaction/main.go
index 8b017db..1b5181e 100644
--- a/examples/producer/transaction/main.go
+++ b/examples/producer/transaction/main.go
@@ -42,7 +42,7 @@ func NewDemoListener() *DemoListener {
 	}
 }
 
-func (dl *DemoListener) ExecuteLocalTransaction(msg primitive.Message) primitive.LocalTransactionState {
+func (dl *DemoListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {
 	nextIndex := atomic.AddInt32(&dl.transactionIndex, 1)
 	fmt.Printf("nextIndex: %v for transactionID: %v\n", nextIndex, msg.TransactionId)
 	status := nextIndex % 3
@@ -52,8 +52,8 @@ func (dl *DemoListener) ExecuteLocalTransaction(msg primitive.Message) primitive
 	return primitive.UnknowState
 }
 
-func (dl *DemoListener) CheckLocalTransaction(msg primitive.MessageExt) primitive.LocalTransactionState {
-	fmt.Printf("msg transactionID : %v\n", msg.TransactionId)
+func (dl *DemoListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {
+	fmt.Printf("%v msg transactionID : %v\n", time.Now(), msg.TransactionId)
 	v, existed := dl.localTrans.Load(msg.TransactionId)
 	if !existed {
 		fmt.Printf("unknow msg: %v, return Commit", msg)
@@ -74,8 +74,6 @@ func (dl *DemoListener) CheckLocalTransaction(msg primitive.MessageExt) primitiv
 		fmt.Printf("checkLocalTransaction default COMMIT_MESSAGE: %v\n", msg)
 		return primitive.CommitMessageState
 	}
-
-	return primitive.UnknowState
 }
 
 func main() {
diff --git a/internal/callback.go b/internal/callback.go
index 2ff182c..761fe36 100644
--- a/internal/callback.go
+++ b/internal/callback.go
@@ -26,6 +26,6 @@ import (
 // remotingClient callback TransactionProducer
 type CheckTransactionStateCallback struct {
 	Addr   net.Addr
-	Msg    primitive.MessageExt
+	Msg    *primitive.MessageExt
 	Header CheckTransactionStateRequestHeader
 }
diff --git a/internal/client.go b/internal/client.go
index 50fba22..acc25ea 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -212,9 +212,9 @@ func GetOrNewRocketMQClient(option ClientOptions, callbackCh chan interface{}) R
 				rlog.Warning("producer group is not equal", nil)
 				return nil
 			}
-			callback := CheckTransactionStateCallback{
+			callback := &CheckTransactionStateCallback{
 				Addr:   addr,
-				Msg:    *msgExt,
+				Msg:    msgExt,
 				Header: *header,
 			}
 			callbackCh <- callback
diff --git a/primitive/message.go b/primitive/message.go
index 8633dae..4693aa5 100644
--- a/primitive/message.go
+++ b/primitive/message.go
@@ -423,11 +423,11 @@ const (
 
 type TransactionListener interface {
 	//  When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
-	ExecuteLocalTransaction(Message) LocalTransactionState
+	ExecuteLocalTransaction(*Message) LocalTransactionState
 
 	// When no response to prepare(half) message. broker will send check message to check the transaction status, and this
 	// method will be invoked to get local transaction status.
-	CheckLocalTransaction(MessageExt) LocalTransactionState
+	CheckLocalTransaction(*MessageExt) LocalTransactionState
 }
 
 type MessageID struct {
diff --git a/producer/producer.go b/producer/producer.go
index f82104f..8e0661f 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -421,7 +421,7 @@ func (tp *transactionProducer) Shutdown() error {
 func (tp *transactionProducer) checkTransactionState() {
 	for ch := range tp.producer.callbackCh {
 		switch callback := ch.(type) {
-		case internal.CheckTransactionStateCallback:
+		case *internal.CheckTransactionStateCallback:
 			localTransactionState := tp.listener.CheckLocalTransaction(callback.Msg)
 			uniqueKey := callback.Msg.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex)
 			if uniqueKey == "" {
@@ -442,11 +442,13 @@ func (tp *transactionProducer) checkTransactionState() {
 
 			err := tp.producer.client.InvokeOneWay(context.Background(), callback.Addr.String(), req,
 				tp.producer.options.SendMsgTimeout)
-			rlog.Error("send ReqENDTransaction to broker error", map[string]interface{}{
-				"callback":               callback.Addr.String(),
-				"request":                req.String(),
-				rlog.LogKeyUnderlayError: err,
-			})
+			if err != nil {
+				rlog.Error("send ReqENDTransaction to broker error", map[string]interface{}{
+					"callback":               callback.Addr.String(),
+					"request":                req.String(),
+					rlog.LogKeyUnderlayError: err,
+				})
+			}
 		default:
 			rlog.Error(fmt.Sprintf("unknown type %v", ch), nil)
 		}
@@ -471,7 +473,7 @@ func (tp *transactionProducer) SendMessageInTransaction(ctx context.Context, msg
 		if len(transactionId) > 0 {
 			msg.TransactionId = transactionId
 		}
-		localTransactionState = tp.listener.ExecuteLocalTransaction(*msg)
+		localTransactionState = tp.listener.ExecuteLocalTransaction(msg)
 		if localTransactionState != primitive.CommitMessageState {
 			rlog.Error("executeLocalTransaction but state unexpected", map[string]interface{}{
 				"localState": localTransactionState,


Mime
View raw message