rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vongosl...@apache.org
Subject [rocketmq] branch develop updated: [ISSUE #411] Fixed ClassCastException when get the instance of the store (#423)
Date Fri, 19 Oct 2018 06:19:02 GMT
This is an automated email from the ASF dual-hosted git repository.

vongosling pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 4a22c0c  [ISSUE #411] Fixed ClassCastException when get the instance of the store
(#423)
4a22c0c is described below

commit 4a22c0c0c1cef3b6370205651cb098a3d36927f7
Author: jungle <353187194@qq.com>
AuthorDate: Fri Oct 19 14:18:56 2018 +0800

    [ISSUE #411] Fixed ClassCastException when get the instance of the store (#423)
    
    * Fixed issue #411
    
    * fix cast in getAllDelayOffset
    
    * Update AdminBrokerProcessor.java
---
 .../rocketmq/broker/plugin/AbstractPluginMessageStore.java   |  6 ++++++
 .../rocketmq/broker/processor/AdminBrokerProcessor.java      | 12 ++++++++++--
 .../java/org/apache/rocketmq/store/DefaultMessageStore.java  |  1 +
 .../main/java/org/apache/rocketmq/store/MessageStore.java    |  8 ++++++++
 4 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
index f6f8a80..e66cead 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
@@ -30,6 +30,7 @@ import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.QueryMessageResult;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
 
 public abstract class AbstractPluginMessageStore implements MessageStore {
     protected MessageStore next = null;
@@ -246,4 +247,9 @@ public abstract class AbstractPluginMessageStore implements MessageStore
{
     public ConsumeQueue getConsumeQueue(String topic, int queueId) {
         return next.getConsumeQueue(topic, queueId);
     }
+
+    @Override
+    public BrokerStatsManager getBrokerStatsManager() {
+        return next.getBrokerStatsManager();
+    };
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 356aafc..73fe439 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -114,6 +114,7 @@ import org.apache.rocketmq.store.ConsumeQueue;
 import org.apache.rocketmq.store.ConsumeQueueExt;
 import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.MessageFilter;
+import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
 
 public class AdminBrokerProcessor implements NettyRequestProcessor {
@@ -760,12 +761,19 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
     private RemotingCommand getAllDelayOffset(ChannelHandlerContext ctx, RemotingCommand
request) {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
 
+        if (!(this.brokerController.getMessageStore() instanceof DefaultMessageStore)) {
+            log.error("Delay offset not supported in this messagetore, client: {} ", ctx.channel().remoteAddress());
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("Delay offset not supported in this messagetore");
+            return response;
+        }
+
         String content = ((DefaultMessageStore) this.brokerController.getMessageStore()).getScheduleMessageService().encode();
         if (content != null && content.length() > 0) {
             try {
                 response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET));
             } catch (UnsupportedEncodingException e) {
-                log.error("get all delay offset from master error.", e);
+                log.error("Get all delay offset from master error.", e);
 
                 response.setCode(ResponseCode.SYSTEM_ERROR);
                 response.setRemark("UnsupportedEncodingException " + e);
@@ -1051,7 +1059,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
         final ViewBrokerStatsDataRequestHeader requestHeader =
             (ViewBrokerStatsDataRequestHeader) request.decodeCommandCustomHeader(ViewBrokerStatsDataRequestHeader.class);
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
-        DefaultMessageStore messageStore = (DefaultMessageStore) this.brokerController.getMessageStore();
+        MessageStore messageStore = this.brokerController.getMessageStore();
 
         StatsItem statsItem = messageStore.getBrokerStatsManager().getStatsItem(requestHeader.getStatsName(),
requestHeader.getStatsKey());
         if (null == statsItem) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 1ade7c2..ff431ed 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -1371,6 +1371,7 @@ public class DefaultMessageStore implements MessageStore {
         cq.putMessagePositionInfoWrapper(dispatchRequest);
     }
 
+    @Override
     public BrokerStatsManager getBrokerStatsManager() {
         return brokerStatsManager;
     }
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index 907dfe2..0f9b4f0 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -21,6 +21,7 @@ import java.util.LinkedList;
 import java.util.Set;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageExtBatch;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
 
 /**
  * This class defines contracting interfaces to implement, allowing third-party vendor to
use customized message store.
@@ -358,4 +359,11 @@ public interface MessageStore {
      * @return Consume queue.
      */
     ConsumeQueue getConsumeQueue(String topic, int queueId);
+
+    /**
+     * Get BrokerStatsManager of the messageStore.
+     *
+     * @return BrokerStatsManager.
+     */
+    BrokerStatsManager getBrokerStatsManager();
 }


Mime
View raw message