rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vongosl...@apache.org
Subject [rocketmq] branch develop updated: [ISSUE #1857] Refactor the duplicated code
Date Mon, 16 Mar 2020 12:05:42 GMT
This is an automated email from the ASF dual-hosted git repository.

vongosling pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 1558e05  [ISSUE #1857] Refactor the duplicated code
1558e05 is described below

commit 1558e052018ae12d273e175dd3c6ba004e9e0995
Author: xu <774590465@qq.com>
AuthorDate: Mon Mar 16 20:05:32 2020 +0800

    [ISSUE #1857] Refactor the duplicated code
    
    close #1857
---
 .../impl/producer/DefaultMQProducerImpl.java       | 46 ++++++++--------------
 1 file changed, 16 insertions(+), 30 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index fca50cc..6ca4b72 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -1367,16 +1367,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
                 }
             }, timeout - cost);
 
-            Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost);
-            if (responseMessage == null) {
-                if (requestResponseFuture.isSendRequestOk()) {
-                    throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION,
-                        "send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms.");
-                } else {
-                    throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause());
-                }
-            }
-            return responseMessage;
+            return waitResponse(msg, timeout, requestResponseFuture, cost);
         } finally {
             RequestFutureTable.getRequestFutureTable().remove(correlationId);
         }
@@ -1432,16 +1423,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
                 }
             }, timeout - cost);
 
-            Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost);
-            if (responseMessage == null) {
-                if (requestResponseFuture.isSendRequestOk()) {
-                    throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION,
-                        "send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms.");
-                } else {
-                    throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause());
-                }
-            }
-            return responseMessage;
+            return waitResponse(msg, timeout, requestResponseFuture, cost);
         } finally {
             RequestFutureTable.getRequestFutureTable().remove(correlationId);
         }
@@ -1498,21 +1480,25 @@ public class DefaultMQProducerImpl implements MQProducerInner {
                 }
             }, null, timeout - cost);
 
-            Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost);
-            if (responseMessage == null) {
-                if (requestResponseFuture.isSendRequestOk()) {
-                    throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION,
-                        "send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms.");
-                } else {
-                    throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause());
-                }
-            }
-            return responseMessage;
+            return waitResponse(msg, timeout, requestResponseFuture, cost);
         } finally {
             RequestFutureTable.getRequestFutureTable().remove(correlationId);
         }
     }
 
+    private Message waitResponse(Message msg, long timeout, RequestResponseFuture requestResponseFuture, long cost) throws InterruptedException, RequestTimeoutException, MQClientException {
+        Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost);
+        if (responseMessage == null) {
+            if (requestResponseFuture.isSendRequestOk()) {
+                throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION,
+                    "send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms.");
+            } else {
+                throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause());
+            }
+        }
+        return responseMessage;
+    }
+
     public void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout)
         throws RemotingException, InterruptedException, MQClientException, MQBrokerException {
         long beginTimestamp = System.currentTimeMillis();


Mime
View raw message