rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wenf...@apache.org
Subject [rocketmq-client-go] branch native updated: [ISSUE #354] feat: Support PanicHandler (#355)
Date Tue, 07 Jan 2020 12:25:50 GMT
This is an automated email from the ASF dual-hosted git repository.

wenfeng pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 7308bc9  [ISSUE #354] feat: Support PanicHandler (#355)
7308bc9 is described below

commit 7308bc94369320195652243059f63c71bfafc74b
Author: xujianhai666 <52450794+xujianhai666@users.noreply.github.com>
AuthorDate: Tue Jan 7 20:25:44 2020 +0800

    [ISSUE #354] feat: Support PanicHandler (#355)
    
    * feat: Support PanicHandler
    
    Closes #354
---
 consumer/push_consumer.go        | 23 +++++++++++------------
 consumer/statistics.go           | 25 +++++++++++++------------
 errors.go                        |  1 -
 internal/client.go               | 17 ++++++++---------
 internal/remote/remote_client.go | 16 ++++++++++------
 internal/trace.go                | 16 ++++++++++++----
 internal/utils/fun.go            | 18 ------------------
 primitive/base.go                | 15 +++++++++++++++
 producer/producer.go             |  4 +++-
 9 files changed, 72 insertions(+), 63 deletions(-)

diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 0c7f224..731eb9d 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -66,7 +66,6 @@ type pushConsumer struct {
 	subscribedTopic              map[string]string
 	interceptor                  primitive.Interceptor
 	queueLock                    *QueueLock
-	lockTicker                   *time.Ticker
 	done                         chan struct{}
 	closeOnce                    sync.Once
 }
@@ -107,7 +106,6 @@ func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
 		defaultConsumer: dc,
 		subscribedTopic: make(map[string]string, 0),
 		queueLock:       newQueueLock(),
-		lockTicker:      time.NewTicker(dc.option.RebalanceLockInterval),
 		done:            make(chan struct{}, 1),
 		consumeFunc:     utils.NewSet(),
 	}
@@ -168,14 +166,16 @@ func (pc *pushConsumer) Start() error {
 		pc.Rebalance()
 		time.Sleep(1 * time.Second)
 
-		go func() {
+		go primitive.WithRecover(func() {
 			// initial lock.
 			time.Sleep(1000 * time.Millisecond)
 			pc.lockAll()
 
+			lockTicker := time.NewTicker(pc.option.RebalanceLockInterval)
+			defer lockTicker.Stop()
 			for {
 				select {
-				case <-pc.lockTicker.C:
+				case <-lockTicker.C:
 					pc.lockAll()
 				case <-pc.done:
 					rlog.Info("push consumer close tick.", map[string]interface{}{
@@ -184,7 +184,7 @@ func (pc *pushConsumer) Start() error {
 					return
 				}
 			}
-		}()
+		})
 	})
 
 	if err != nil {
@@ -209,7 +209,6 @@ func (pc *pushConsumer) Start() error {
 func (pc *pushConsumer) Shutdown() error {
 	var err error
 	pc.closeOnce.Do(func() {
-		pc.lockTicker.Stop()
 		close(pc.done)
 
 		pc.client.UnregisterConsumer(pc.consumerGroup)
@@ -438,7 +437,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
 	})
 	var sleepTime time.Duration
 	pq := request.pq
-	go func() {
+	go primitive.WithRecover(func() {
 		for {
 			select {
 			case <-pc.done:
@@ -450,7 +449,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
 				pc.submitToConsume(request.pq, request.mq)
 			}
 		}
-	}()
+	})
 
 	for {
 	NEXT:
@@ -683,13 +682,13 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
 			})
 			request.nextOffset = result.NextBeginOffset
 			pq.WithDropped(true)
-			go func() {
+			go primitive.WithRecover(func() {
 				time.Sleep(10 * time.Second)
 				pc.storage.update(request.mq, request.nextOffset, false)
 				pc.storage.persist([]*primitive.MessageQueue{request.mq})
 				pc.storage.remove(request.mq)
 				rlog.Warning(fmt.Sprintf("fix the pull request offset: %s", request.String()), nil)
-			}()
+			})
 		default:
 			rlog.Warning(fmt.Sprintf("unknown pull status: %v", result.Status), nil)
 			sleepTime = _PullDelayTimeWhenError
@@ -866,7 +865,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
 			subMsgs = msgs[count:next]
 			count = next - 1
 		}
-		go func() {
+		go primitive.WithRecover(func() {
 		RETRY:
 			if pq.IsDroppd() {
 				rlog.Info("the message queue not be able to consume, because it was dropped", map[string]interface{}{
@@ -948,7 +947,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
 					"message":               msgs,
 				})
 			}
-		}()
+		})
 	}
 }
 
diff --git a/consumer/statistics.go b/consumer/statistics.go
index a58ff9e..fdc6379 100644
--- a/consumer/statistics.go
+++ b/consumer/statistics.go
@@ -24,6 +24,7 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/apache/rocketmq-client-go/primitive"
 	"github.com/apache/rocketmq-client-go/rlog"
 )
 
@@ -148,7 +149,7 @@ func newStatsItemSet(statsName string) *statsItemSet {
 }
 
 func (sis *statsItemSet) init() {
-	go func() {
+	go primitive.WithRecover(func() {
 		ticker := time.NewTicker(10 * time.Second)
 		defer ticker.Stop()
 		for {
@@ -160,9 +161,9 @@ func (sis *statsItemSet) init() {
 
 			}
 		}
-	}()
+	})
 
-	go func() {
+	go primitive.WithRecover(func() {
 		ticker := time.NewTicker(10 * time.Minute)
 		defer ticker.Stop()
 		for {
@@ -173,9 +174,9 @@ func (sis *statsItemSet) init() {
 				sis.samplingInMinutes()
 			}
 		}
-	}()
+	})
 
-	go func() {
+	go primitive.WithRecover(func() {
 		ticker := time.NewTicker(time.Hour)
 		defer ticker.Stop()
 		for {
@@ -186,9 +187,9 @@ func (sis *statsItemSet) init() {
 				sis.samplingInHour()
 			}
 		}
-	}()
+	})
 
-	go func() {
+	go primitive.WithRecover(func() {
 		time.Sleep(nextMinutesTime().Sub(time.Now()))
 		ticker := time.NewTicker(time.Minute)
 		defer ticker.Stop()
@@ -200,9 +201,9 @@ func (sis *statsItemSet) init() {
 				sis.printAtMinutes()
 			}
 		}
-	}()
+	})
 
-	go func() {
+	go primitive.WithRecover(func() {
 		time.Sleep(nextHourTime().Sub(time.Now()))
 		ticker := time.NewTicker(time.Hour)
 		defer ticker.Stop()
@@ -214,9 +215,9 @@ func (sis *statsItemSet) init() {
 				sis.printAtHour()
 			}
 		}
-	}()
+	})
 
-	go func() {
+	go primitive.WithRecover(func() {
 		time.Sleep(nextMonthTime().Sub(time.Now()))
 		ticker := time.NewTicker(24 * time.Hour)
 		defer ticker.Stop()
@@ -228,7 +229,7 @@ func (sis *statsItemSet) init() {
 				sis.printAtDay()
 			}
 		}
-	}()
+	})
 }
 
 func (sis *statsItemSet) samplingInSeconds() {
diff --git a/errors.go b/errors.go
index 6a774aa..fe9ba33 100644
--- a/errors.go
+++ b/errors.go
@@ -22,7 +22,6 @@ import (
 )
 
 var (
-	// ErrRequestTimeout for request timeout error
 	ErrRequestTimeout = errors.New("request timeout")
 	ErrMQEmpty        = errors.New("MessageQueue is nil")
 	ErrOffset         = errors.New("offset < 0")
diff --git a/internal/client.go b/internal/client.go
index ca8cc88..86a02cb 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -282,7 +282,7 @@ func (c *rmqClient) Start() {
 		}
 
 		// schedule update route info
-		go func() {
+		go primitive.WithRecover(func() {
 			// delay
 			ticker := time.NewTicker(_PullNameServerInterval)
 			defer ticker.Stop()
@@ -298,10 +298,9 @@ func (c *rmqClient) Start() {
 					return
 				}
 			}
-		}()
+		})
 
-		// TODO cleanOfflineBroker & sendHeartbeatToAllBrokerWithLock
-		go func() {
+		go primitive.WithRecover(func() {
 			ticker := time.NewTicker(_HeartbeatBrokerInterval)
 			defer ticker.Stop()
 			for {
@@ -316,10 +315,10 @@ func (c *rmqClient) Start() {
 					return
 				}
 			}
-		}()
+		})
 
 		// schedule persist offset
-		go func() {
+		go primitive.WithRecover(func() {
 			ticker := time.NewTicker(_PersistOffsetInterval)
 			defer ticker.Stop()
 			for {
@@ -342,9 +341,9 @@ func (c *rmqClient) Start() {
 					return
 				}
 			}
-		}()
+		})
 
-		go func() {
+		go primitive.WithRecover(func() {
 			ticker := time.NewTicker(_RebalanceInterval)
 			defer ticker.Stop()
 			for {
@@ -358,7 +357,7 @@ func (c *rmqClient) Start() {
 					return
 				}
 			}
-		}()
+		})
 	})
 }
 
diff --git a/internal/remote/remote_client.go b/internal/remote/remote_client.go
index 3d2bf7f..58abd3a 100644
--- a/internal/remote/remote_client.go
+++ b/internal/remote/remote_client.go
@@ -95,7 +95,9 @@ func (c *remotingClient) InvokeAsync(ctx context.Context, addr string, request *
 	if err != nil {
 		return err
 	}
-	go c.receiveAsync(resp)
+	go primitive.WithRecover(func() {
+		c.receiveAsync(resp)
+	})
 	return nil
 }
 
@@ -127,7 +129,9 @@ func (c *remotingClient) connect(ctx context.Context, addr string) (*tcpConnWrap
 		return nil, err
 	}
 	c.connectionTable.Store(addr, tcpConn)
-	go c.receiveResponse(tcpConn)
+	go primitive.WithRecover(func() {
+		c.receiveResponse(tcpConn)
+	})
 	return tcpConn, nil
 }
 
@@ -196,20 +200,20 @@ func (c *remotingClient) processCMD(cmd *RemotingCommand, r *tcpConnWrapper) {
 		if exist {
 			c.responseTable.Delete(cmd.Opaque)
 			responseFuture := resp.(*ResponseFuture)
-			go func() {
+			go primitive.WithRecover(func() {
 				responseFuture.ResponseCommand = cmd
 				responseFuture.executeInvokeCallback()
 				if responseFuture.Done != nil {
 					responseFuture.Done <- true
 				}
-			}()
+			})
 		}
 	} else {
 		f := c.processors[cmd.Code]
 		if f != nil {
 			// single goroutine will be deadlock
 			// TODO: optimize with goroutine pool, https://github.com/apache/rocketmq-client-go/issues/307
-			go func() {
+			go primitive.WithRecover(func() {
 				res := f(cmd, r.RemoteAddr())
 				if res != nil {
 					res.Opaque = cmd.Opaque
@@ -222,7 +226,7 @@ func (c *remotingClient) processCMD(cmd *RemotingCommand, r *tcpConnWrapper) {
 						})
 					}
 				}
-			}()
+			})
 		} else {
 			rlog.Warning("receive broker's requests, but no func to handle", map[string]interface{}{
 				"responseCode": cmd.Code,
diff --git a/internal/trace.go b/internal/trace.go
index 212ee60..9549bba 100644
--- a/internal/trace.go
+++ b/internal/trace.go
@@ -257,7 +257,9 @@ func (td *traceDispatcher) GetTraceTopicName() string {
 func (td *traceDispatcher) Start() {
 	td.running = true
 	td.cli.Start()
-	go td.process()
+	go primitive.WithRecover(func() {
+		td.process()
+	})
 }
 
 func (td *traceDispatcher) Close() {
@@ -299,7 +301,9 @@ func (td *traceDispatcher) process() {
 			batch = append(batch, ctx)
 			if count == batchSize {
 				count = 0
-				go td.batchCommit(batch)
+				go primitive.WithRecover(func() {
+					td.batchCommit(batch)
+				})
 				batch = make([]TraceContext, 0)
 			}
 		case <-td.ticker.C:
@@ -308,12 +312,16 @@ func (td *traceDispatcher) process() {
 				count++
 				lastput = time.Now()
 				if len(batch) > 0 {
-					go td.batchCommit(batch)
+					go primitive.WithRecover(func() {
+						td.batchCommit(batch)
+					})
 					batch = make([]TraceContext, 0)
 				}
 			}
 		case <-td.ctx.Done():
-			go td.batchCommit(batch)
+			go primitive.WithRecover(func() {
+				td.batchCommit(batch)
+			})
 			batch = make([]TraceContext, 0)
 
 			now := time.Now().UnixNano() / int64(time.Millisecond)
diff --git a/internal/utils/fun.go b/internal/utils/fun.go
deleted file mode 100644
index 4e9e4ea..0000000
--- a/internal/utils/fun.go
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package utils
diff --git a/primitive/base.go b/primitive/base.go
index ae0f06f..efc48ef 100644
--- a/primitive/base.go
+++ b/primitive/base.go
@@ -80,3 +80,18 @@ func verifyIP(ip string) error {
 	}
 	return nil
 }
+
+var PanicHandler func(interface{})
+
+func WithRecover(fn func()) {
+	defer func() {
+		handler := PanicHandler
+		if handler != nil {
+			if err := recover(); err != nil {
+				handler(err)
+			}
+		}
+	}()
+
+	fn()
+}
diff --git a/producer/producer.go b/producer/producer.go
index a47d76f..0762762 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -411,7 +411,9 @@ func NewTransactionProducer(listener primitive.TransactionListener, opts ...Opti
 }
 
 func (tp *transactionProducer) Start() error {
-	go tp.checkTransactionState()
+	go primitive.WithRecover(func() {
+		tp.checkTransactionState()
+	})
 	return tp.producer.Start()
 }
 func (tp *transactionProducer) Shutdown() error {


Mime
View raw message