rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vongosl...@apache.org
Subject [rocketmq-client-python] branch master updated: Add a function which shows how to use rocketmq in multi-threaded scenarios properly to handle exceptions such as Name Server Cluster and Broker Cluster restart
Date Mon, 19 Oct 2020 01:27:38 GMT
This is an automated email from the ASF dual-hosted git repository.

vongosling pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-python.git


The following commit(s) were added to refs/heads/master by this push:
     new 89ad6bd  Add a function which shows how to use rocketmq in multi-threaded scenarios
properly to handle exceptions such as Name Server Cluster and Broker Cluster restart
     new 48f7cf9  Merge pull request #100 from tom0392/multi-threaded
89ad6bd is described below

commit 89ad6bde4351ecafa582770a3826ac9c660134b9
Author: tangzhongrui <tangzhongrui_yewu@cmss.chinamobile.com>
AuthorDate: Sat Oct 17 10:02:49 2020 +0800

    Add a function which shows how to use rocketmq in multi-threaded scenarios properly to
handle exceptions such as Name Server Cluster and Broker Cluster restart
---
 samples/producer.py | 35 ++++++++++++++++++++++++++++++++++-
 1 file changed, 34 insertions(+), 1 deletion(-)

diff --git a/samples/producer.py b/samples/producer.py
index fb90b7b..f69534c 100644
--- a/samples/producer.py
+++ b/samples/producer.py
@@ -19,11 +19,12 @@
 from rocketmq.client import Producer, Message, TransactionMQProducer, TransactionStatus
 
 import time
+import threading
 
 topic = 'TopicTest'
 gid = 'test'
 name_srv = '127.0.0.1:9876'
-
+MUTEX = threading.Lock()
 
 def create_message():
     msg = Message(topic)
@@ -46,6 +47,38 @@ def send_message_sync(count):
     producer.shutdown()
 
 
+def send_message_multi_threaded(retry_time):
+    producer = Producer(gid)
+    producer.set_name_server_address(name_srv)
+    msg = create_message()
+
+    global MUTEX
+    MUTEX.acquire()
+    try:
+        producer.start()
+    except Exception as e:
+        print('ProducerStartFailed:', e)
+        MUTEX.release()
+        return
+
+    try:
+        for i in range(retry_time):
+            ret = producer.send_sync(msg)
+            if ret.status == 0:
+                print('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id)
+                break
+            else:
+                print('send message to MQ failed.')
+            if i == (retry_time - 1):
+                print('send message to MQ failed after retries.')
+    except Exception as e:
+        print('ProducerSendSyncFailed:', e)
+    finally:
+        producer.shutdown()
+        MUTEX.release()
+        return
+
+
 def send_orderly_with_sharding_key(count):
     producer = Producer(gid, True)
     producer.set_name_server_address(name_srv)


Mime
View raw message