rocketmq-commits mailing list archives

From ding...@apache.org
Subject [rocketmq-client-go] branch native updated: fix(retry): can not consume the messages in the retry topic (#437)
Date Tue, 03 Mar 2020 06:49:31 GMT
This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 2c78dae  fix(retry): can not consume the messages in the retry topic (#437)
2c78dae is described below

commit 2c78dae5b5e00d20b376dc39b49364c8d90bf49d
Author: 秦宁 <q383175101@163.com>
AuthorDate: Tue Mar 3 14:49:22 2020 +0800

    fix(retry): can not consume the messages in the retry topic (#437)
---
 consumer/push_consumer.go | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index cb14688..321c59f 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -22,6 +22,7 @@ import (
 	"fmt"
 	"math"
 	"strconv"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -803,6 +804,12 @@ func (pc *pushConsumer) consumeInner(ctx context.Context, subMsgs []*primitive.M
 	}
 
 	f, exist := pc.consumeFunc.Contains(subMsgs[0].Topic)
+
+	// fix lost retry message
+	if !exist && strings.HasPrefix(subMsgs[0].Topic, internal.RetryGroupTopicPrefix) {
+		f, exist = pc.consumeFunc.Contains(subMsgs[0].GetProperty(primitive.PropertyRetryTopic))
+	}
+
 	if !exist {
 		return ConsumeRetryLater, fmt.Errorf("the consume callback missing for topic: %s", subMsgs[0].Topic)
 	}
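
The fallback this hunk adds can be sketched in isolation, outside the client. The block below is a minimal illustration under stated assumptions, not the client's actual code: `message`, `handler`, and `lookupHandler` are hypothetical stand-ins for `primitive.MessageExt`, the registered consume callback, and `pc.consumeFunc.Contains`, and the two constants are assumed values for `internal.RetryGroupTopicPrefix` and `primitive.PropertyRetryTopic`.

```go
package main

import (
	"fmt"
	"strings"
)

// Assumed values for illustration; the real constants are
// internal.RetryGroupTopicPrefix and primitive.PropertyRetryTopic.
const (
	retryGroupTopicPrefix = "%RETRY%"
	propertyRetryTopic    = "RETRY_TOPIC"
)

// message is a hypothetical stand-in for primitive.MessageExt.
type message struct {
	Topic      string
	Properties map[string]string
}

// handler is a hypothetical stand-in for the registered consume callback.
type handler func(msgs []*message) error

// lookupHandler first looks up the callback by the message's topic. If none
// is registered and the topic carries the retry prefix, it falls back to the
// original topic recorded in the message's retry-topic property.
func lookupHandler(handlers map[string]handler, msg *message) (handler, bool) {
	f, ok := handlers[msg.Topic]
	if !ok && strings.HasPrefix(msg.Topic, retryGroupTopicPrefix) {
		f, ok = handlers[msg.Properties[propertyRetryTopic]]
	}
	return f, ok
}

func main() {
	// Only the original topic has a callback registered.
	handlers := map[string]handler{
		"orders": func(msgs []*message) error { return nil },
	}

	// A redelivered message arrives on the group's retry topic and keeps
	// its original topic in a property.
	retry := &message{
		Topic:      retryGroupTopicPrefix + "group-a",
		Properties: map[string]string{propertyRetryTopic: "orders"},
	}

	_, found := lookupHandler(handlers, retry)
	fmt.Println("callback found for retry message:", found)
}
```

Without the fallback, the lookup for the retry message would miss and the caller would return `ConsumeRetryLater` with the "consume callback missing" error, so the message would be retried again rather than consumed.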

