rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From duhengfore...@apache.org
Subject [rocketmq] branch mqtt updated: [RIP-11] Add persistence interface for mqtt session (#1297)
Date Mon, 08 Jul 2019 02:36:43 GMT
This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch mqtt
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/mqtt by this push:
     new 5813a8b  [RIP-11] Add persistence interface for mqtt session (#1297)
5813a8b is described below

commit 5813a8ba0e7fbb674bc5a3e41249c3343cb65c7c
Author: Aaron-He <Aaron-He@users.noreply.github.com>
AuthorDate: Mon Jul 8 10:36:35 2019 +0800

    [RIP-11] Add persistence interface for mqtt session (#1297)
    
    * implements mqtt subscription persistence
    
    * Refactor code to follow RocketMQ coding style
    
    * Add querying mqtt client interface & reformat code
---
 broker/pom.xml                                     |   5 +
 .../apache/rocketmq/broker/BrokerController.java   |  58 ++++
 .../rocketmq/broker/processor/MQTTProcessor.java   | 325 +++++++++++++++++++++
 .../org/apache/rocketmq/common/BrokerConfig.java   |  24 +-
 .../rocketmq/common/protocol/RequestCode.java      |  21 ++
 ...ddOrUpdateClient2SubscriptionRequestHeader.java |  50 ++++
 ...dOrUpdateClient2SubscriptionResponseHeader.java |  26 +-
 .../AddOrUpdateRootTopic2ClientsRequestHeader.java |  38 ++-
 ...AddOrUpdateRootTopic2ClientsResponseHeader.java |  26 +-
 .../mqtt/ClientUnsubscribeRequestHeader.java       |  39 ++-
 .../mqtt/ClientUnsubscribeResponseHeader.java      |  55 ++++
 .../header/mqtt/DeleteClientRequestHeader.java     |  28 +-
 .../header/mqtt/DeleteClientResponseHeader.java    |  47 +++
 .../mqtt/DeleteRootTopic2ClientRequestHeader.java  |  38 ++-
 .../mqtt/DeleteRootTopic2ClientResponseHeader.java |  26 +-
 .../mqtt/GetClientByClientIdRequestHeader.java     |  29 +-
 .../mqtt/GetClientByClientIdResponseHeader.java    |  28 +-
 .../mqtt/GetRootTopic2ClientsRequestHeader.java    |  28 +-
 .../mqtt/GetRootTopic2ClientsResponseHeader.java   |  36 ++-
 .../mqtt/GetSnodeAddress2ClientsRequestHeader.java |  39 ++-
 .../GetSnodeAddress2ClientsResponseHeader.java     |  31 +-
 .../GetSubscriptionByClientIdRequestHeader.java    |  28 +-
 .../GetSubscriptionByClientIdResponseHeader.java   |  27 +-
 ...sClient2SubscriptionPersistedRequestHeader.java |  37 ++-
 ...Client2SubscriptionPersistedResponseHeader.java |  26 +-
 .../rocketmq/common/service/EnodeService.java      |   6 +
 mqtt/pom.xml                                       |   4 -
 .../rocketmq/mqtt/constant/MqttConstant.java       |   4 +
 .../impl/MqttSubscribeMessageHandler.java          |   2 +
 .../mqtt/persistence/PersistServiceFactory.java    |  45 +++
 .../AllocatePersistentDataConsistentHash.java      |  84 ++++++
 .../rebalance/AllocatePersistentDataStrategy.java  |  21 +-
 .../persistence/service/DefaultPersistService.java | 319 ++++++++++++++++++++
 .../mqtt/persistence/service/PersistService.java   |  47 +++
 .../processor/DefaultMqttMessageProcessor.java     |   9 +
 .../org.apache.rocketmq.mqtt.PersistService        |   1 +
 .../mqtt/MqttPublishMessageHandlerTest.java        |   4 +-
 .../remoting/util/MqttEncodeDecodeUtil.java        |   1 +
 .../snode/service/impl/LocalEnodeServiceImpl.java  |  29 +-
 .../snode/service/impl/RemoteEnodeServiceImpl.java |  13 +
 store/pom.xml                                      |   5 +
 .../rocketmq/store/DefaultMQTTInfoStore.java       |  90 ++++++
 .../org/apache/rocketmq/store/MQTTInfoStore.java   |  25 +-
 43 files changed, 1546 insertions(+), 278 deletions(-)

diff --git a/broker/pom.xml b/broker/pom.xml
index 01390fd..ff7eb6b 100644
--- a/broker/pom.xml
+++ b/broker/pom.xml
@@ -34,6 +34,7 @@
             <groupId>${project.groupId}</groupId>
             <artifactId>rocketmq-remoting</artifactId>
         </dependency>
+
         <dependency>
             <groupId>${project.groupId}</groupId>
             <artifactId>rocketmq-client</artifactId>
@@ -51,6 +52,10 @@
             <artifactId>rocketmq-acl</artifactId>
         </dependency>
         <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>rocketmq-mqtt</artifactId>
+        </dependency>
+        <dependency>
             <groupId>ch.qos.logback</groupId>
             <artifactId>logback-classic</artifactId>
         </dependency>
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index d9a618b..53b1357 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -58,6 +58,7 @@ import org.apache.rocketmq.broker.processor.AdminBrokerProcessor;
 import org.apache.rocketmq.broker.processor.ClientManageProcessor;
 import org.apache.rocketmq.broker.processor.ConsumerManageProcessor;
 import org.apache.rocketmq.broker.processor.EndTransactionProcessor;
+import org.apache.rocketmq.broker.processor.MQTTProcessor;
 import org.apache.rocketmq.broker.processor.PullMessageProcessor;
 import org.apache.rocketmq.broker.processor.QueryMessageProcessor;
 import org.apache.rocketmq.broker.processor.SendMessageProcessor;
@@ -97,7 +98,9 @@ import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
 import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingServer;
 import org.apache.rocketmq.remoting.util.ServiceProvider;
 import org.apache.rocketmq.srvutil.FileWatchService;
+import org.apache.rocketmq.store.DefaultMQTTInfoStore;
 import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.MQTTInfoStore;
 import org.apache.rocketmq.store.MessageArrivingListener;
 import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.config.BrokerRole;
@@ -139,11 +142,14 @@ public class BrokerController {
     private final BlockingQueue<Runnable> heartbeatThreadPoolQueue;
     private final BlockingQueue<Runnable> consumerManagerThreadPoolQueue;
     private final BlockingQueue<Runnable> endTransactionThreadPoolQueue;
+    private final BlockingQueue<Runnable> mqttThreadPoolQueue;
     private final FilterServerManager filterServerManager;
     private final BrokerStatsManager brokerStatsManager;
     private final List<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
     private final List<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
     private MessageStore messageStore;
+    private MQTTProcessor mqttProcessor;
+    private MQTTInfoStore mqttInfoStore;
     private RemotingServer remotingServer;
     private RemotingServer fastRemotingServer;
     private TopicConfigManager topicConfigManager;
@@ -155,6 +161,7 @@ public class BrokerController {
     private ExecutorService heartbeatExecutor;
     private ExecutorService consumerManageExecutor;
     private ExecutorService endTransactionExecutor;
+    private ExecutorService mqttMessageExecutor;
     private boolean updateMasterHAServerAddrPeriodically = false;
     private BrokerStats brokerStats;
     private InetSocketAddress storeHost;
@@ -204,6 +211,7 @@ public class BrokerController {
         this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
         this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
         this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());
+        this.mqttThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getMqttThreadPoolQueueCapacity());
 
         this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());
         this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));
@@ -319,6 +327,15 @@ public class BrokerController {
                 this.endTransactionThreadPoolQueue,
                 new ThreadFactoryImpl("EndTransactionThread_"));
 
+            this.mqttMessageExecutor = new BrokerFixedThreadPoolExecutor(
+                this.brokerConfig.getMqttMessageThreadPoolNums(),
+                this.brokerConfig.getMqttMessageThreadPoolNums(),
+                1000 * 60,
+                TimeUnit.MILLISECONDS,
+                this.mqttThreadPoolQueue,
+                new ThreadFactoryImpl("MQTTThread_")
+            );
+
             this.consumerManageExecutor =
                 Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
                     "ConsumerManageThread_"));
@@ -475,6 +492,14 @@ public class BrokerController {
             }
             initialTransaction();
         }
+        try {
+            this.mqttInfoStore = new DefaultMQTTInfoStore();
+            mqttInfoStore.load();
+            mqttInfoStore.start();
+        } catch (Exception e) {
+            log.error("Open MQTT Database failed, error: {}", e);
+        }
+
         return result;
     }
 
@@ -609,6 +634,30 @@ public class BrokerController {
         this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);
 
         /**
+         *  MQTTProcessor
+         */
+        mqttProcessor = new MQTTProcessor(this);
+        this.remotingServer.registerProcessor(RequestCode.MQTT_ADD_OR_UPDATE_CLIENT2SUBSCRIPTION, mqttProcessor, this.mqttMessageExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.MQTT_ADD_OR_UPDATE_CLIENT2SUBSCRIPTION, mqttProcessor, this.mqttMessageExecutor);
+        this.remotingServer.registerProcessor(RequestCode.MQTT_IS_CLIENT2SUBSCRIPTION_PERSISTED, mqttProcessor, this.mqttMessageExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.MQTT_IS_CLIENT2SUBSCRIPTION_PERSISTED, mqttProcessor, this.mqttMessageExecutor);
+        this.remotingServer.registerProcessor(RequestCode.MQTT_DELETE_CLIENT2SUBSCRIPTION, mqttProcessor, this.mqttMessageExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.MQTT_DELETE_CLIENT2SUBSCRIPTION, mqttProcessor, this.mqttMessageExecutor);
+        this.remotingServer.registerProcessor(RequestCode.MQTT_GET_SNODEADDRESS2CLIENT, mqttProcessor, this.mqttMessageExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.MQTT_GET_SNODEADDRESS2CLIENT, mqttProcessor, this.mqttMessageExecutor);
+        this.remotingServer.registerProcessor(RequestCode.MQTT_CLIENT_UNSUBSRIBE, mqttProcessor, this.mqttMessageExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.MQTT_CLIENT_UNSUBSRIBE, mqttProcessor, this.mqttMessageExecutor);
+        this.remotingServer.registerProcessor(RequestCode.MQTT_ADD_OR_UPDATE_ROOTTOPIC2CLIENTS, mqttProcessor, this.mqttMessageExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.MQTT_ADD_OR_UPDATE_ROOTTOPIC2CLIENTS, mqttProcessor, this.mqttMessageExecutor);
+        this.remotingServer.registerProcessor(RequestCode.MQTT_DELETE_ROOTTOPIC2CLIENT, mqttProcessor, this.mqttMessageExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.MQTT_DELETE_ROOTTOPIC2CLIENT, mqttProcessor, this.mqttMessageExecutor);
+        this.remotingServer.registerProcessor(RequestCode.MQTT_GET_ROOTTOPIC2CLIENTS, mqttProcessor, this.mqttMessageExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.MQTT_GET_ROOTTOPIC2CLIENTS, mqttProcessor, this.mqttMessageExecutor);
+        this.remotingServer.registerProcessor(RequestCode.MQTT_GET_SUBSCRIPTION_BY_CLIENT_ID, mqttProcessor, this.mqttMessageExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.MQTT_GET_SUBSCRIPTION_BY_CLIENT_ID, mqttProcessor, this.mqttMessageExecutor);
+        this.remotingServer.registerProcessor(RequestCode.MQTT_GET_CLIENT_BY_CLIENTID_ID, mqttProcessor, this.mqttMessageExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.MQTT_GET_CLIENT_BY_CLIENTID_ID, mqttProcessor, this.mqttMessageExecutor);
+        /**
          * Default
          */
         this.adminProcessor = new AdminBrokerProcessor(this);
@@ -1243,4 +1292,13 @@ public class BrokerController {
     public ConsumerManageProcessor getConsumerManageProcessor() {
         return consumerManageProcessor;
     }
+
+    /**
+     * Returns the MQTT info store currently held by this controller.
+     *
+     * @return the value of the {@code mqttInfoStore} field
+     */
+    public MQTTInfoStore getMqttInfoStore() {
+        return mqttInfoStore;
+    }
+
+    /**
+     * Returns the MQTT request processor currently held by this controller.
+     *
+     * @return the value of the {@code mqttProcessor} field
+     */
+    public MQTTProcessor getMqttProcessor() {
+        return mqttProcessor;
+    }
+
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/MQTTProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/MQTTProcessor.java
new file mode 100644
index 0000000..0ebc062
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/MQTTProcessor.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.processor;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import java.lang.reflect.Type;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.client.Client;
+import org.apache.rocketmq.common.client.Subscription;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.header.mqtt.AddOrUpdateClient2SubscriptionRequestHeader;
+import org.apache.rocketmq.common.protocol.header.mqtt.AddOrUpdateClient2SubscriptionResponseHeader;
+import org.apache.rocketmq.common.protocol.header.mqtt.AddOrUpdateRootTopic2ClientsRequestHeader;
+import org.apache.rocketmq.common.protocol.header.mqtt.AddOrUpdateRootTopic2ClientsResponseHeader;
+import org.apache.rocketmq.common.protocol.header.mqtt.ClientUnsubscribeRequestHeader;
+import org.apache.rocketmq.common.protocol.header.mqtt.ClientUnsubscribeResponseHeader;
+import org.apache.rocketmq.common.protocol.header.mqtt.DeleteClientRequestHeader;
+import org.apache.rocketmq.common.protocol.header.mqtt.DeleteClientResponseHeader;
+import org.apache.rocketmq.common.protocol.header.mqtt.DeleteRootTopic2ClientRequestHeader;
+import org.apache.rocketmq.common.protocol.header.mqtt.DeleteRootTopic2ClientResponseHeader;
+import org.apache.rocketmq.common.protocol.header.mqtt.GetClientByClientIdRequestHeader;
+import org.apache.rocketmq.common.protocol.header.mqtt.GetClientByClientIdResponseHeader;
+import org.apache.rocketmq.common.protocol.header.mqtt.GetRootTopic2ClientsRequestHeader;
+import org.apache.rocketmq.common.protocol.header.mqtt.GetRootTopic2ClientsResponseHeader;
+import org.apache.rocketmq.common.protocol.header.mqtt.GetSnodeAddress2ClientsRequestHeader;
+import org.apache.rocketmq.common.protocol.header.mqtt.GetSnodeAddress2ClientsResponseHeader;
+import org.apache.rocketmq.common.protocol.header.mqtt.GetSubscriptionByClientIdRequestHeader;
+import org.apache.rocketmq.common.protocol.header.mqtt.GetSubscriptionByClientIdResponseHeader;
+import org.apache.rocketmq.common.protocol.header.mqtt.IsClient2SubscriptionPersistedRequestHeader;
+import org.apache.rocketmq.common.protocol.header.mqtt.IsClient2SubscriptionPersistedResponseHeader;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.mqtt.client.MQTTSession;
+import org.apache.rocketmq.mqtt.constant.MqttConstant;
+import org.apache.rocketmq.remoting.RemotingChannel;
+import org.apache.rocketmq.remoting.RequestProcessor;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.store.MQTTInfoStore;
+
+public class MQTTProcessor implements RequestProcessor {
+    private final BrokerController brokerController;
+    private static final Gson GSON = new Gson();
+    private final MQTTInfoStore mqttInfoStore;
+
+    /**
+     * Creates a processor bound to the given broker controller.
+     *
+     * NOTE(review): the MQTT info store reference is read from the controller once, here, and kept in a final
+     * field. If the controller has not yet created its store when this constructor runs, the handlers will
+     * work with a null store -- confirm the initialization order in BrokerController.
+     *
+     * @param brokerController controller that owns this processor and supplies the MQTT info store
+     */
+    public MQTTProcessor(final BrokerController brokerController) {
+        this.brokerController = brokerController;
+        this.mqttInfoStore = this.brokerController.getMqttInfoStore();
+    }
+
+    /**
+     * Always answers {@code false}.
+     *
+     * @return {@code false}, unconditionally
+     */
+    @Override public boolean rejectRequest() {
+        return false;
+    }
+
+    /**
+     * Routes an incoming request to the handler that matches its request code.
+     *
+     * @param remotingChannel channel the request arrived on; forwarded to the handlers that record the remote address
+     * @param request command to handle
+     * @return the selected handler's response, or {@code null} when the request code is not handled here
+     * @throws RemotingCommandException propagated from the selected handler
+     */
+    @Override
+    public RemotingCommand processRequest(RemotingChannel remotingChannel,
+        RemotingCommand request) throws RemotingCommandException {
+        RemotingCommand response = null;
+
+        switch (request.getCode()) {
+            case RequestCode.MQTT_IS_CLIENT2SUBSCRIPTION_PERSISTED:
+                response = this.isClient2SubscriptionPersistedHandler(remotingChannel, request);
+                break;
+            case RequestCode.MQTT_ADD_OR_UPDATE_CLIENT2SUBSCRIPTION:
+                response = this.addOrUpdateClient2Subscription(remotingChannel, request);
+                break;
+            case RequestCode.MQTT_DELETE_CLIENT2SUBSCRIPTION:
+                response = this.deleteClient2Subscription(request);
+                break;
+            case RequestCode.MQTT_GET_SNODEADDRESS2CLIENT:
+                response = this.getSnodeAddress2Clients(request);
+                break;
+            case RequestCode.MQTT_CLIENT_UNSUBSRIBE:
+                response = this.clientUnsubscribe(request);
+                break;
+            case RequestCode.MQTT_ADD_OR_UPDATE_ROOTTOPIC2CLIENTS:
+                response = this.addorUpdateRootTopic2Clients(request);
+                break;
+            case RequestCode.MQTT_GET_ROOTTOPIC2CLIENTS:
+                response = this.getRootTopic2Clients(request);
+                break;
+            case RequestCode.MQTT_DELETE_ROOTTOPIC2CLIENT:
+                response = this.deleteRootTopic2Client(request);
+                break;
+            case RequestCode.MQTT_GET_SUBSCRIPTION_BY_CLIENT_ID:
+                response = this.getSubscriptionByClientId(request);
+                break;
+            case RequestCode.MQTT_GET_CLIENT_BY_CLIENTID_ID:
+                response = this.getClientByClientId(request);
+                break;
+            default:
+                // Unknown codes fall through and yield null, exactly as before.
+                break;
+        }
+
+        return response;
+    }
+
+    /**
+     * Looks up the session record persisted for a client id and returns it in the response header.
+     *
+     * @param request command carrying a {@link GetClientByClientIdRequestHeader}
+     * @return response whose header holds the deserialized client
+     * @throws RemotingCommandException propagated from decoding the request header
+     */
+    private RemotingCommand getClientByClientId(RemotingCommand request) throws RemotingCommandException {
+        GetClientByClientIdRequestHeader header = (GetClientByClientIdRequestHeader) request.decodeCommandCustomHeader(GetClientByClientIdRequestHeader.class);
+        String storeKey = header.getClientId() + MqttConstant.PERSIST_CLIENT_SUFFIX;
+        String storedJson = this.mqttInfoStore.getValue(storeKey);
+        // The stored session is deserialized as MQTTSession and handed out through the Client type.
+        Client storedClient = GSON.fromJson(storedJson, MQTTSession.class);
+
+        RemotingCommand response = RemotingCommand.createResponseCommand(GetClientByClientIdResponseHeader.class);
+        GetClientByClientIdResponseHeader replyHeader = (GetClientByClientIdResponseHeader) response.readCustomHeader();
+        replyHeader.setClient(storedClient);
+        return response;
+    }
+
+    /**
+     * Looks up the subscription persisted for a client id and returns it in the response header.
+     *
+     * @param request command carrying a {@link GetSubscriptionByClientIdRequestHeader}
+     * @return response whose header holds the deserialized subscription
+     * @throws RemotingCommandException propagated from decoding the request header
+     */
+    private RemotingCommand getSubscriptionByClientId(RemotingCommand request) throws RemotingCommandException {
+        GetSubscriptionByClientIdRequestHeader header = (GetSubscriptionByClientIdRequestHeader) request.decodeCommandCustomHeader(GetSubscriptionByClientIdRequestHeader.class);
+        String storeKey = header.getClientId() + MqttConstant.PERSIST_SUBSCRIPTION_SUFFIX;
+        String storedJson = this.mqttInfoStore.getValue(storeKey);
+        Subscription storedSubscription = GSON.fromJson(storedJson, Subscription.class);
+
+        RemotingCommand response = RemotingCommand.createResponseCommand(GetSubscriptionByClientIdResponseHeader.class);
+        GetSubscriptionByClientIdResponseHeader replyHeader = (GetSubscriptionByClientIdResponseHeader) response.readCustomHeader();
+        replyHeader.setSubscription(storedSubscription);
+        return response;
+    }
+
+    /**
+     * Reports whether a session record is persisted for the client and refreshes what is stored about it.
+     *
+     * When a record exists and its clean-session flag differs from the requested one, the record is rewritten
+     * with the requested flag. In every case the remote address of the calling channel is stored under the
+     * client's snode-address key.
+     *
+     * @param remotingChannel channel whose remote address is recorded for the client
+     * @param request command carrying an {@link IsClient2SubscriptionPersistedRequestHeader}
+     * @return response whose header tells whether a session record was found
+     * @throws RemotingCommandException propagated from decoding the request header
+     */
+    private RemotingCommand isClient2SubscriptionPersistedHandler(final RemotingChannel remotingChannel,
+        final RemotingCommand request) throws RemotingCommandException {
+        RemotingCommand response = RemotingCommand.createResponseCommand(IsClient2SubscriptionPersistedResponseHeader.class);
+        IsClient2SubscriptionPersistedResponseHeader replyHeader = (IsClient2SubscriptionPersistedResponseHeader) response.readCustomHeader();
+        IsClient2SubscriptionPersistedRequestHeader header = (IsClient2SubscriptionPersistedRequestHeader) request.decodeCommandCustomHeader(IsClient2SubscriptionPersistedRequestHeader.class);
+
+        String clientId = header.getClientId();
+        boolean requestedCleanSession = header.isCleanSession();
+        String sessionKey = clientId + MqttConstant.PERSIST_CLIENT_SUFFIX;
+
+        String sessionJson = mqttInfoStore.getValue(sessionKey);
+        boolean persisted = sessionJson != null;
+        if (persisted) {
+            MQTTSession session = GSON.fromJson(sessionJson, MQTTSession.class);
+            // Rewrite the record only when the flag actually changed.
+            if (session.isCleanSession() != requestedCleanSession) {
+                session.setCleanSession(requestedCleanSession);
+                mqttInfoStore.putData(sessionKey, GSON.toJson(session));
+            }
+        }
+        replyHeader.setPersisted(persisted);
+
+        mqttInfoStore.putData(clientId + MqttConstant.PERSIST_SNODEADDRESS_SUFFIX, remotingChannel.remoteAddress().toString());
+
+        return response;
+    }
+
+    /**
+     * Persists a client's subscription, the remote address of the calling channel and the client itself.
+     *
+     * All three writes are always attempted; the response reports success only when every one of them succeeded.
+     *
+     * @param remotingChannel channel whose remote address is recorded for the client
+     * @param request command carrying an {@link AddOrUpdateClient2SubscriptionRequestHeader}
+     * @return response whose header holds the combined result of the three writes
+     * @throws RemotingCommandException propagated from decoding the request header
+     */
+    private RemotingCommand addOrUpdateClient2Subscription(final RemotingChannel remotingChannel,
+        final RemotingCommand request) throws RemotingCommandException {
+        AddOrUpdateClient2SubscriptionRequestHeader header = (AddOrUpdateClient2SubscriptionRequestHeader) request.decodeCommandCustomHeader(AddOrUpdateClient2SubscriptionRequestHeader.class);
+        Client client = header.getClient();
+        Subscription subscription = header.getSubscription();
+
+        boolean subscriptionStored = this.mqttInfoStore.putData(client.getClientId() + MqttConstant.PERSIST_SUBSCRIPTION_SUFFIX, GSON.toJson(subscription));
+        boolean addressStored = this.mqttInfoStore.putData(client.getClientId() + MqttConstant.PERSIST_SNODEADDRESS_SUFFIX, remotingChannel.remoteAddress().toString());
+        boolean clientStored = this.mqttInfoStore.putData(client.getClientId() + MqttConstant.PERSIST_CLIENT_SUFFIX, GSON.toJson(client));
+        boolean everythingStored = subscriptionStored && addressStored && clientStored;
+
+        RemotingCommand response = RemotingCommand.createResponseCommand(AddOrUpdateClient2SubscriptionResponseHeader.class);
+        AddOrUpdateClient2SubscriptionResponseHeader replyHeader = (AddOrUpdateClient2SubscriptionResponseHeader) response.readCustomHeader();
+        replyHeader.setResult(everythingStored);
+        return response;
+    }
+
+    /**
+     * Deletes everything persisted for a client: its subscription, its snode address and its session record.
+     *
+     * The subscription is read before it is deleted so that it can be returned to the caller.
+     *
+     * @param request command carrying a {@link DeleteClientRequestHeader}
+     * @return response whose header holds the subscription that was stored and whether all three deletes succeeded
+     * @throws RemotingCommandException propagated from decoding the request header
+     */
+    private RemotingCommand deleteClient2Subscription(final RemotingCommand request) throws RemotingCommandException {
+        DeleteClientRequestHeader requestHeader = (DeleteClientRequestHeader) request.decodeCommandCustomHeader(DeleteClientRequestHeader.class);
+        String clientId = requestHeader.getClientId();
+        String subscriptionJson = this.mqttInfoStore.getValue(clientId + MqttConstant.PERSIST_SUBSCRIPTION_SUFFIX);
+        Subscription subscription = GSON.fromJson(subscriptionJson, Subscription.class);
+
+        // Run every delete before combining the results: chaining the calls with && stopped at the first
+        // failure and left the remaining records behind.
+        boolean subscriptionDeleted = this.mqttInfoStore.deleteData(clientId + MqttConstant.PERSIST_SUBSCRIPTION_SUFFIX);
+        boolean snodeAddressDeleted = this.mqttInfoStore.deleteData(clientId + MqttConstant.PERSIST_SNODEADDRESS_SUFFIX);
+        boolean clientDeleted = this.mqttInfoStore.deleteData(clientId + MqttConstant.PERSIST_CLIENT_SUFFIX);
+        boolean operationSuccess = subscriptionDeleted && snodeAddressDeleted && clientDeleted;
+
+        RemotingCommand response = RemotingCommand.createResponseCommand(DeleteClientResponseHeader.class);
+        DeleteClientResponseHeader responseHeader = (DeleteClientResponseHeader) response.readCustomHeader();
+        responseHeader.setOperationSuccess(operationSuccess);
+        responseHeader.setSubscription(subscription);
+        return response;
+    }
+
+    /**
+     * Groups the clients whose persisted subscriptions match a topic by the snode address stored for each client.
+     *
+     * For every client id in the request the persisted subscription is loaded and its topic filters are tested
+     * against the requested topic with {@code isMatch}. Each matching client is added to the set keyed by its
+     * persisted snode address. Client ids with no persisted subscription, session record or address are skipped.
+     *
+     * @param request command carrying a {@link GetSnodeAddress2ClientsRequestHeader}
+     * @return response whose header holds the snode-address-to-clients map (empty when nothing matches)
+     * @throws RemotingCommandException propagated from decoding the request header
+     */
+    private RemotingCommand getSnodeAddress2Clients(final RemotingCommand request) throws RemotingCommandException {
+        GetSnodeAddress2ClientsRequestHeader requestHeader = (GetSnodeAddress2ClientsRequestHeader) request.decodeCommandCustomHeader(GetSnodeAddress2ClientsRequestHeader.class);
+        String topic = requestHeader.getTopic();
+        Set<String> clientsId = requestHeader.getClientsId();
+
+        Map<String, Set<Client>> snodeAddress2Clients = new ConcurrentHashMap<>();
+        if (clientsId != null) {
+            for (String clientId : clientsId) {
+                Subscription subscription = GSON.fromJson(this.mqttInfoStore.getValue(clientId + MqttConstant.PERSIST_SUBSCRIPTION_SUFFIX), Subscription.class);
+                // Gson returns null when nothing is stored for this client; such a client cannot match.
+                if (subscription == null || subscription.getSubscriptionTable() == null) {
+                    continue;
+                }
+
+                // One matching filter is enough; stop scanning so the session is read at most once per client.
+                boolean matched = false;
+                for (String topicFilter : subscription.getSubscriptionTable().keySet()) {
+                    if (isMatch(topicFilter, topic)) {
+                        matched = true;
+                        break;
+                    }
+                }
+                if (!matched) {
+                    continue;
+                }
+
+                Client client = GSON.fromJson(this.mqttInfoStore.getValue(clientId + MqttConstant.PERSIST_CLIENT_SUFFIX), MQTTSession.class);
+                String snodeAddress = this.mqttInfoStore.getValue(clientId + MqttConstant.PERSIST_SNODEADDRESS_SUFFIX);
+                // ConcurrentHashMap rejects null keys, and a missing session record leaves nothing to report.
+                if (client == null || snodeAddress == null) {
+                    continue;
+                }
+
+                // computeIfAbsent stores the new set in the map. getOrDefault only handed back a detached set,
+                // so the map sent to the caller was always empty.
+                snodeAddress2Clients.computeIfAbsent(snodeAddress, address -> new HashSet<>()).add(client);
+            }
+        }
+
+        RemotingCommand response = RemotingCommand.createResponseCommand(GetSnodeAddress2ClientsResponseHeader.class);
+        GetSnodeAddress2ClientsResponseHeader responseHeader = (GetSnodeAddress2ClientsResponseHeader) response.readCustomHeader();
+        responseHeader.setSnodeAddress2Clients(snodeAddress2Clients);
+
+        return response;
+    }
+
+    /**
+     * Removes the given topics from a client's persisted subscription and reports which root topics vanished.
+     *
+     * The root topic of a subscription key is the part before the first
+     * {@code MqttConstant.SUBSCRIPTION_SEPARATOR}. Root topics present before the removal but absent afterwards
+     * are returned to the caller.
+     *
+     * @param request command carrying a {@link ClientUnsubscribeRequestHeader}
+     * @return response whose header holds the write result and, when any exist, the root topics no longer subscribed
+     * @throws RemotingCommandException propagated from decoding the request header
+     */
+    private RemotingCommand clientUnsubscribe(final RemotingCommand request) throws RemotingCommandException {
+        ClientUnsubscribeRequestHeader header = (ClientUnsubscribeRequestHeader) request.decodeCommandCustomHeader(ClientUnsubscribeRequestHeader.class);
+        String clientId = header.getClientId();
+        List<String> topicsToRemove = header.getTopics();
+        String subscriptionKey = clientId + MqttConstant.PERSIST_SUBSCRIPTION_SUFFIX;
+
+        Subscription subscription = GSON.fromJson(this.mqttInfoStore.getValue(subscriptionKey), Subscription.class);
+        ConcurrentHashMap<String, SubscriptionData> table = subscription.getSubscriptionTable();
+
+        Set<String> rootsBefore = table.keySet().stream()
+            .map(key -> key.split(MqttConstant.SUBSCRIPTION_SEPARATOR)[0])
+            .collect(Collectors.toSet());
+        for (String topicToRemove : topicsToRemove) {
+            table.remove(topicToRemove);
+        }
+        Set<String> rootsAfter = table.keySet().stream()
+            .map(key -> key.split(MqttConstant.SUBSCRIPTION_SEPARATOR)[0])
+            .collect(Collectors.toSet());
+
+        // Roots that were subscribed before and are no longer covered by any remaining subscription.
+        Set<String> vanishedRoots = new HashSet<>(rootsBefore);
+        vanishedRoots.removeAll(rootsAfter);
+
+        subscription.setSubscriptionTable(table);
+        boolean stored = this.mqttInfoStore.putData(subscriptionKey, GSON.toJson(subscription));
+
+        RemotingCommand response = RemotingCommand.createResponseCommand(ClientUnsubscribeResponseHeader.class);
+        ClientUnsubscribeResponseHeader replyHeader = (ClientUnsubscribeResponseHeader) response.readCustomHeader();
+        replyHeader.setOperationSuccess(stored);
+        if (!vanishedRoots.isEmpty()) {
+            replyHeader.setRootTopicDiffExists(true);
+            replyHeader.setRootTopicsDiff(vanishedRoots);
+        }
+        return response;
+    }
+
+    /**
+     * Adds a client id to the set of client ids persisted under a root topic, creating the set when none exists.
+     *
+     * @param request command carrying an {@link AddOrUpdateRootTopic2ClientsRequestHeader}
+     * @return response whose header holds the result of writing the updated set
+     * @throws RemotingCommandException propagated from decoding the request header
+     */
+    private RemotingCommand addorUpdateRootTopic2Clients(
+        final RemotingCommand request) throws RemotingCommandException {
+        AddOrUpdateRootTopic2ClientsRequestHeader header = (AddOrUpdateRootTopic2ClientsRequestHeader) request.decodeCommandCustomHeader(AddOrUpdateRootTopic2ClientsRequestHeader.class);
+        String rootTopic = header.getRootTopic();
+        String clientId = header.getClientId();
+
+        String storedJson = this.mqttInfoStore.getValue(rootTopic);
+        Set<String> members = new HashSet<>();
+        if (storedJson != null) {
+            Type setOfStrings = new TypeToken<Set<String>>() {
+            }.getType();
+            members = GSON.fromJson(storedJson, setOfStrings);
+        }
+        members.add(clientId);
+
+        RemotingCommand response = RemotingCommand.createResponseCommand(AddOrUpdateRootTopic2ClientsResponseHeader.class);
+        AddOrUpdateRootTopic2ClientsResponseHeader replyHeader = (AddOrUpdateRootTopic2ClientsResponseHeader) response.readCustomHeader();
+        boolean stored = this.mqttInfoStore.putData(rootTopic, GSON.toJson(members));
+        replyHeader.setOperationSuccess(stored);
+
+        return response;
+    }
+
+    /**
+     * Returns the set of client ids persisted under a root topic.
+     *
+     * @param request command carrying a {@link GetRootTopic2ClientsRequestHeader}
+     * @return response whose header holds the client ids and a success flag, or only a failure flag when
+     *         nothing is stored under the root topic
+     * @throws RemotingCommandException propagated from decoding the request header
+     */
+    private RemotingCommand getRootTopic2Clients(final RemotingCommand request) throws RemotingCommandException {
+        GetRootTopic2ClientsRequestHeader header = (GetRootTopic2ClientsRequestHeader) request.decodeCommandCustomHeader(GetRootTopic2ClientsRequestHeader.class);
+        String storedJson = this.mqttInfoStore.getValue(header.getRootTopic());
+
+        RemotingCommand response = RemotingCommand.createResponseCommand(GetRootTopic2ClientsResponseHeader.class);
+        GetRootTopic2ClientsResponseHeader replyHeader = (GetRootTopic2ClientsResponseHeader) response.readCustomHeader();
+
+        // Nothing stored under this root topic: report failure and leave the client ids unset.
+        if (storedJson == null) {
+            replyHeader.setOperationSuccess(false);
+            return response;
+        }
+
+        Type setOfStrings = new TypeToken<Set<String>>() {
+        }.getType();
+        Set<String> members = GSON.fromJson(storedJson, setOfStrings);
+        replyHeader.setOperationSuccess(true);
+        replyHeader.setClientsId(members);
+        return response;
+    }
+
+    /**
+     * Removes a client id from the set persisted under a root topic.
+     *
+     * When the set becomes empty (or nothing was stored in the first place) the root topic entry itself is
+     * deleted; otherwise the reduced set is written back.
+     *
+     * @param request command carrying a {@link DeleteRootTopic2ClientRequestHeader}
+     * @return response whose header holds the result of the delete or write performed on the store
+     * @throws RemotingCommandException propagated from decoding the request header
+     */
+    private RemotingCommand deleteRootTopic2Client(final RemotingCommand request) throws RemotingCommandException {
+        DeleteRootTopic2ClientRequestHeader requestHeader = (DeleteRootTopic2ClientRequestHeader) request.decodeCommandCustomHeader(DeleteRootTopic2ClientRequestHeader.class);
+        String rootTopic = requestHeader.getRootTopic();
+        String clientId = requestHeader.getClientId();
+        Set<String> clientsId = GSON.fromJson(this.mqttInfoStore.getValue(rootTopic), new TypeToken<Set<String>>() {
+        }.getType());
+        // Gson returns null when nothing is stored under the root topic; treat that as an empty set.
+        if (clientsId == null) {
+            clientsId = new HashSet<>();
+        }
+        // Remove by value. The previous filter compared String references with !=, which is always true
+        // for ids freshly deserialized from JSON, so the client was never actually removed.
+        clientsId.remove(clientId);
+
+        boolean result;
+        if (clientsId.isEmpty()) {
+            result = this.mqttInfoStore.deleteData(rootTopic);
+        } else {
+            result = this.mqttInfoStore.putData(rootTopic, GSON.toJson(clientsId));
+        }
+        RemotingCommand response = RemotingCommand.createResponseCommand(DeleteRootTopic2ClientResponseHeader.class);
+        DeleteRootTopic2ClientResponseHeader responseHeader = (DeleteRootTopic2ClientResponseHeader) response.readCustomHeader();
+        responseHeader.setOperationSuccess(result);
+        return response;
+    }
+
+    /**
+     * Deserializes a JSON array of clients into a set.
+     *
+     * NOTE(review): the handlers in this class deserialize stored clients as {@code MQTTSession}; confirm that
+     * Gson can instantiate {@code Client} directly before relying on this helper.
+     *
+     * @param clientsString JSON text describing a set of clients
+     * @return the set produced by Gson for {@code clientsString}
+     */
+    private Set<Client> clientsStringToClientsSet(String clientsString) {
+        // The HashSet that used to be allocated here was overwritten immediately and never used.
+        Type type = new TypeToken<Set<Client>>() {
+        }.getType();
+        return GSON.fromJson(clientsString, type);
+    }
+
+    /**
+     * Tests whether a topic name is selected by a topic filter that may contain the {@code +} (single level)
+     * and {@code #} (all remaining levels) wildcards.
+     *
+     * A filter without wildcards matches only the identical topic name.
+     *
+     * @param topicFiter topic filter taken from a subscription
+     * @param topic concrete topic name to test
+     * @return {@code true} if the filter selects {@code topic}
+     */
+    private boolean isMatch(String topicFiter, String topic) {
+        if (!topicFiter.contains("+") && !topicFiter.contains("#")) {
+            return topicFiter.equals(topic);
+        }
+        // A negative limit keeps trailing empty levels, so "a/" is not silently collapsed into "a".
+        String[] filterTopics = topicFiter.split("/", -1);
+        String[] actualTopics = topic.split("/", -1);
+
+        int i = 0;
+        for (; i < filterTopics.length && i < actualTopics.length; i++) {
+            if ("#".equals(filterTopics[i])) {
+                return true;
+            }
+            if (!"+".equals(filterTopics[i]) && !filterTopics[i].equals(actualTopics[i])) {
+                return false;
+            }
+        }
+        if (i < actualTopics.length) {
+            // The filter ran out of levels before the topic did.
+            return false;
+        }
+        if (i == filterTopics.length) {
+            return true;
+        }
+        // The topic ran out first. Only a single trailing "#" may remain ("a/#" still selects "a");
+        // any other leftover level, such as the "b" in "a/+/b" against "a/x", means no match.
+        return i == filterTopics.length - 1 && "#".equals(filterTopics[i]);
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 32a008c..bb27df6 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -25,8 +25,6 @@ import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
 
-import static org.apache.rocketmq.common.SnodeConfig.localHostName;
-
 public class BrokerConfig {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
 
@@ -71,6 +69,11 @@ public class BrokerConfig {
     private int heartbeatThreadPoolNums = Math.min(32, Runtime.getRuntime().availableProcessors());
 
     /**
+     * Thread numbers for the MQTT message thread pool.
+     */
+    private int mqttMessageThreadPoolNums = 1;
+
+    /**
      * Thread numbers for EndTransactionProcessor
      */
     private int endTransactionThreadPoolNums = 8 + Runtime.getRuntime().availableProcessors() * 2;
@@ -90,6 +93,7 @@ public class BrokerConfig {
     private int consumerManagerThreadPoolQueueCapacity = 1000000;
     private int heartbeatThreadPoolQueueCapacity = 50000;
     private int endTransactionPoolQueueCapacity = 100000;
+    private int mqttThreadPoolQueueCapacity = 10000;
 
     private int filterServerNums = 0;
 
@@ -777,4 +781,20 @@ public class BrokerConfig {
     public void setAclEnable(boolean aclEnable) {
         this.aclEnable = aclEnable;
     }
+
+    public int getMqttMessageThreadPoolNums() {
+        return mqttMessageThreadPoolNums;
+    }
+
+    public void setMqttMessageThreadPoolNums(int mqttMessageThreadPoolNums) {
+        this.mqttMessageThreadPoolNums = mqttMessageThreadPoolNums;
+    }
+
+    public int getMqttThreadPoolQueueCapacity() {
+        return mqttThreadPoolQueueCapacity;
+    }
+
+    public void setMqttThreadPoolQueueCapacity(int mqttThreadPoolQueueCapacity) {
+        this.mqttThreadPoolQueueCapacity = mqttThreadPoolQueueCapacity;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index 66c4a40..67f0919 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -181,4 +181,25 @@ public class RequestCode {
     public static final int CREATE_RETRY_TOPIC = 355;
 
     public static final int MQTT_MESSAGE = 1000;
+
+    public static final int MQTT_IS_CLIENT2SUBSCRIPTION_PERSISTED = 1001;
+
+    public static final int MQTT_ADD_OR_UPDATE_CLIENT2SUBSCRIPTION = 1002;
+
+    public static final int MQTT_DELETE_CLIENT2SUBSCRIPTION = 1003;
+
+    public static final int MQTT_GET_SNODEADDRESS2CLIENT = 1004;
+
+    public static final int MQTT_CLIENT_UNSUBSRIBE = 1005;
+
+    public static final int MQTT_ADD_OR_UPDATE_ROOTTOPIC2CLIENTS = 1006;
+
+    public static final int MQTT_GET_ROOTTOPIC2CLIENTS = 1007;
+
+    public static final int MQTT_DELETE_ROOTTOPIC2CLIENT = 1008;
+
+    public static final int MQTT_GET_SUBSCRIPTION_BY_CLIENT_ID = 1009;
+
+    public static final int MQTT_GET_CLIENT_BY_CLIENTID_ID = 1010;
+
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/AddOrUpdateClient2SubscriptionRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/AddOrUpdateClient2SubscriptionRequestHeader.java
new file mode 100644
index 0000000..340fa7a
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/AddOrUpdateClient2SubscriptionRequestHeader.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.header.mqtt;
+
+import org.apache.rocketmq.common.client.Client;
+import org.apache.rocketmq.common.client.Subscription;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class AddOrUpdateClient2SubscriptionRequestHeader implements CommandCustomHeader {
+    @CFNotNull
+    private Client client;
+    @CFNotNull
+    private Subscription subscription;
+
+    public Client getClient() {
+        return client;
+    }
+
+    public void setClient(Client client) {
+        this.client = client;
+    }
+
+    public Subscription getSubscription() {
+        return subscription;
+    }
+
+    public void setSubscription(Subscription subscription) {
+        this.subscription = subscription;
+    }
+
+    @Override public void checkFields() throws RemotingCommandException {
+    }
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/AddOrUpdateClient2SubscriptionResponseHeader.java
similarity index 57%
copy from remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java
copy to common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/AddOrUpdateClient2SubscriptionResponseHeader.java
index 0eb1730..f0c5962 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/AddOrUpdateClient2SubscriptionResponseHeader.java
@@ -15,24 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.remoting.util;
+package org.apache.rocketmq.common.protocol.header.mqtt;
 
-import com.google.gson.Gson;
-import java.nio.charset.Charset;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class MqttEncodeDecodeUtil {
-    private static final Gson GSON = new Gson();
+public class AddOrUpdateClient2SubscriptionResponseHeader implements CommandCustomHeader {
+    private boolean result;
 
-    public static byte[] encode(Object object) {
-        final String json = GSON.toJson(object);
-        if (json != null) {
-            return json.getBytes(Charset.forName("UTF-8"));
-        }
-        return null;
+    public boolean isResult() {
+        return result;
     }
 
-    public static <T> Object decode(byte[] body, Class<T> classOfT) {
-        final String json = new String(body, Charset.forName("UTF-8"));
-        return GSON.fromJson(json, classOfT);
+    public void setResult(boolean result) {
+        this.result = result;
+    }
+
+    @Override public void checkFields() throws RemotingCommandException {
     }
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/AddOrUpdateRootTopic2ClientsRequestHeader.java
similarity index 50%
copy from remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java
copy to common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/AddOrUpdateRootTopic2ClientsRequestHeader.java
index 0eb1730..f1425c7 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/AddOrUpdateRootTopic2ClientsRequestHeader.java
@@ -15,24 +15,34 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.remoting.util;
+package org.apache.rocketmq.common.protocol.header.mqtt;
 
-import com.google.gson.Gson;
-import java.nio.charset.Charset;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class MqttEncodeDecodeUtil {
-    private static final Gson GSON = new Gson();
+public class AddOrUpdateRootTopic2ClientsRequestHeader implements CommandCustomHeader {
+    @CFNotNull
+    private String rootTopic;
+    @CFNotNull
+    private String clientId;
 
-    public static byte[] encode(Object object) {
-        final String json = GSON.toJson(object);
-        if (json != null) {
-            return json.getBytes(Charset.forName("UTF-8"));
-        }
-        return null;
+    public String getRootTopic() {
+        return rootTopic;
     }
 
-    public static <T> Object decode(byte[] body, Class<T> classOfT) {
-        final String json = new String(body, Charset.forName("UTF-8"));
-        return GSON.fromJson(json, classOfT);
+    public void setRootTopic(String rootTopic) {
+        this.rootTopic = rootTopic;
+    }
+
+    public String getClientId() {
+        return clientId;
+    }
+
+    public void setClientId(String clientId) {
+        this.clientId = clientId;
+    }
+
+    @Override public void checkFields() throws RemotingCommandException {
     }
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/AddOrUpdateRootTopic2ClientsResponseHeader.java
similarity index 57%
copy from remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java
copy to common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/AddOrUpdateRootTopic2ClientsResponseHeader.java
index 0eb1730..ab5fc67 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/AddOrUpdateRootTopic2ClientsResponseHeader.java
@@ -15,24 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.remoting.util;
+package org.apache.rocketmq.common.protocol.header.mqtt;
 
-import com.google.gson.Gson;
-import java.nio.charset.Charset;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class MqttEncodeDecodeUtil {
-    private static final Gson GSON = new Gson();
+public class AddOrUpdateRootTopic2ClientsResponseHeader implements CommandCustomHeader {
+    private boolean operationSuccess;
 
-    public static byte[] encode(Object object) {
-        final String json = GSON.toJson(object);
-        if (json != null) {
-            return json.getBytes(Charset.forName("UTF-8"));
-        }
-        return null;
+    public boolean isOperationSuccess() {
+        return operationSuccess;
     }
 
-    public static <T> Object decode(byte[] body, Class<T> classOfT) {
-        final String json = new String(body, Charset.forName("UTF-8"));
-        return GSON.fromJson(json, classOfT);
+    public void setOperationSuccess(boolean operationSuccess) {
+        this.operationSuccess = operationSuccess;
+    }
+
+    @Override public void checkFields() throws RemotingCommandException {
     }
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/ClientUnsubscribeRequestHeader.java
similarity index 50%
copy from remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java
copy to common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/ClientUnsubscribeRequestHeader.java
index 0eb1730..74d56fb 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/ClientUnsubscribeRequestHeader.java
@@ -15,24 +15,35 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.remoting.util;
+package org.apache.rocketmq.common.protocol.header.mqtt;
 
-import com.google.gson.Gson;
-import java.nio.charset.Charset;
+import java.util.List;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class MqttEncodeDecodeUtil {
-    private static final Gson GSON = new Gson();
+public class ClientUnsubscribeRequestHeader implements CommandCustomHeader {
+    @CFNotNull
+    private String clientId;
+    @CFNotNull
+    private List<String> topics;
 
-    public static byte[] encode(Object object) {
-        final String json = GSON.toJson(object);
-        if (json != null) {
-            return json.getBytes(Charset.forName("UTF-8"));
-        }
-        return null;
+    public String getClientId() {
+        return clientId;
     }
 
-    public static <T> Object decode(byte[] body, Class<T> classOfT) {
-        final String json = new String(body, Charset.forName("UTF-8"));
-        return GSON.fromJson(json, classOfT);
+    public void setClientId(String clientId) {
+        this.clientId = clientId;
+    }
+
+    public List<String> getTopics() {
+        return topics;
+    }
+
+    public void setTopics(List<String> topics) {
+        this.topics = topics;
+    }
+
+    @Override public void checkFields() throws RemotingCommandException {
     }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/ClientUnsubscribeResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/ClientUnsubscribeResponseHeader.java
new file mode 100644
index 0000000..39b3572
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/ClientUnsubscribeResponseHeader.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.header.mqtt;
+
+import java.util.Set;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class ClientUnsubscribeResponseHeader implements CommandCustomHeader {
+    private boolean operationSuccess;
+    private boolean rootTopicDiffExists;
+    private Set<String> rootTopicsDiff;
+
+    public boolean isOperationSuccess() {
+        return operationSuccess;
+    }
+
+    public void setOperationSuccess(boolean operationSuccess) {
+        this.operationSuccess = operationSuccess;
+    }
+
+    public boolean isRootTopicDiffExists() {
+        return rootTopicDiffExists;
+    }
+
+    public void setRootTopicDiffExists(boolean rootTopicDiffExists) {
+        this.rootTopicDiffExists = rootTopicDiffExists;
+    }
+
+    public Set<String> getRootTopicsDiff() {
+        return rootTopicsDiff;
+    }
+
+    public void setRootTopicsDiff(Set<String> rootTopicsDiff) {
+        this.rootTopicsDiff = rootTopicsDiff;
+    }
+
+    @Override public void checkFields() throws RemotingCommandException {
+    }
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/DeleteClientRequestHeader.java
similarity index 57%
copy from remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java
copy to common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/DeleteClientRequestHeader.java
index 0eb1730..2136667 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/DeleteClientRequestHeader.java
@@ -15,24 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.remoting.util;
+package org.apache.rocketmq.common.protocol.header.mqtt;
 
-import com.google.gson.Gson;
-import java.nio.charset.Charset;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class MqttEncodeDecodeUtil {
-    private static final Gson GSON = new Gson();
+public class DeleteClientRequestHeader implements CommandCustomHeader {
+    @CFNotNull
+    private String clientId;
 
-    public static byte[] encode(Object object) {
-        final String json = GSON.toJson(object);
-        if (json != null) {
-            return json.getBytes(Charset.forName("UTF-8"));
-        }
-        return null;
+    public String getClientId() {
+        return clientId;
     }
 
-    public static <T> Object decode(byte[] body, Class<T> classOfT) {
-        final String json = new String(body, Charset.forName("UTF-8"));
-        return GSON.fromJson(json, classOfT);
+    public void setClientId(String clientId) {
+        this.clientId = clientId;
+    }
+
+    @Override public void checkFields() throws RemotingCommandException {
     }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/DeleteClientResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/DeleteClientResponseHeader.java
new file mode 100644
index 0000000..ecaa95b
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/DeleteClientResponseHeader.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.header.mqtt;
+
+import org.apache.rocketmq.common.client.Subscription;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class DeleteClientResponseHeader implements CommandCustomHeader {
+    private Subscription subscription;
+    private boolean operationSuccess;
+
+    public Subscription getSubscription() {
+        return subscription;
+    }
+
+    public void setSubscription(Subscription subscription) {
+        this.subscription = subscription;
+    }
+
+    public boolean isOperationSuccess() {
+        return operationSuccess;
+    }
+
+    public void setOperationSuccess(boolean operationSuccess) {
+        this.operationSuccess = operationSuccess;
+    }
+
+    @Override public void checkFields() throws RemotingCommandException {
+
+    }
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/DeleteRootTopic2ClientRequestHeader.java
similarity index 50%
copy from remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java
copy to common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/DeleteRootTopic2ClientRequestHeader.java
index 0eb1730..a94ef5c 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/DeleteRootTopic2ClientRequestHeader.java
@@ -15,24 +15,34 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.remoting.util;
+package org.apache.rocketmq.common.protocol.header.mqtt;
 
-import com.google.gson.Gson;
-import java.nio.charset.Charset;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class MqttEncodeDecodeUtil {
-    private static final Gson GSON = new Gson();
+public class DeleteRootTopic2ClientRequestHeader implements CommandCustomHeader {
+    @CFNotNull
+    private String rootTopic;
+    @CFNotNull
+    private String clientId;
 
-    public static byte[] encode(Object object) {
-        final String json = GSON.toJson(object);
-        if (json != null) {
-            return json.getBytes(Charset.forName("UTF-8"));
-        }
-        return null;
+    public String getRootTopic() {
+        return rootTopic;
     }
 
-    public static <T> Object decode(byte[] body, Class<T> classOfT) {
-        final String json = new String(body, Charset.forName("UTF-8"));
-        return GSON.fromJson(json, classOfT);
+    public void setRootTopic(String rootTopic) {
+        this.rootTopic = rootTopic;
+    }
+
+    public String getClientId() {
+        return clientId;
+    }
+
+    public void setClientId(String clientId) {
+        this.clientId = clientId;
+    }
+
+    @Override public void checkFields() throws RemotingCommandException {
     }
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/DeleteRootTopic2ClientResponseHeader.java
similarity index 57%
copy from remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java
copy to common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/DeleteRootTopic2ClientResponseHeader.java
index 0eb1730..8ee4dc9 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/DeleteRootTopic2ClientResponseHeader.java
@@ -15,24 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.remoting.util;
+package org.apache.rocketmq.common.protocol.header.mqtt;
 
-import com.google.gson.Gson;
-import java.nio.charset.Charset;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class MqttEncodeDecodeUtil {
-    private static final Gson GSON = new Gson();
+public class DeleteRootTopic2ClientResponseHeader implements CommandCustomHeader {
+    private boolean operationSuccess;
 
-    public static byte[] encode(Object object) {
-        final String json = GSON.toJson(object);
-        if (json != null) {
-            return json.getBytes(Charset.forName("UTF-8"));
-        }
-        return null;
+    public boolean isOperationSuccess() {
+        return operationSuccess;
     }
 
-    public static <T> Object decode(byte[] body, Class<T> classOfT) {
-        final String json = new String(body, Charset.forName("UTF-8"));
-        return GSON.fromJson(json, classOfT);
+    public void setOperationSuccess(boolean operationSuccess) {
+        this.operationSuccess = operationSuccess;
+    }
+
+    @Override public void checkFields() throws RemotingCommandException {
     }
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetClientByClientIdRequestHeader.java
similarity index 57%
copy from remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java
copy to common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetClientByClientIdRequestHeader.java
index 0eb1730..cf97242 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetClientByClientIdRequestHeader.java
@@ -15,24 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.remoting.util;
+package org.apache.rocketmq.common.protocol.header.mqtt;
 
-import com.google.gson.Gson;
-import java.nio.charset.Charset;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class MqttEncodeDecodeUtil {
-    private static final Gson GSON = new Gson();
+public class GetClientByClientIdRequestHeader implements CommandCustomHeader {
+    @CFNotNull
+    private String clientId;
 
-    public static byte[] encode(Object object) {
-        final String json = GSON.toJson(object);
-        if (json != null) {
-            return json.getBytes(Charset.forName("UTF-8"));
-        }
-        return null;
+    public String getClientId() {
+        return clientId;
     }
 
-    public static <T> Object decode(byte[] body, Class<T> classOfT) {
-        final String json = new String(body, Charset.forName("UTF-8"));
-        return GSON.fromJson(json, classOfT);
+    public void setClientId(String clientId) {
+        this.clientId = clientId;
+    }
+
+    @Override public void checkFields() throws RemotingCommandException {
+
     }
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetClientByClientIdResponseHeader.java
similarity index 57%
copy from remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java
copy to common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetClientByClientIdResponseHeader.java
index 0eb1730..9326bf6 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetClientByClientIdResponseHeader.java
@@ -15,24 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.remoting.util;
+package org.apache.rocketmq.common.protocol.header.mqtt;
 
-import com.google.gson.Gson;
-import java.nio.charset.Charset;
+import org.apache.rocketmq.common.client.Client;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class MqttEncodeDecodeUtil {
-    private static final Gson GSON = new Gson();
+public class GetClientByClientIdResponseHeader implements CommandCustomHeader {
+    private Client client;
 
-    public static byte[] encode(Object object) {
-        final String json = GSON.toJson(object);
-        if (json != null) {
-            return json.getBytes(Charset.forName("UTF-8"));
-        }
-        return null;
+    public Client getClient() {
+        return client;
     }
 
-    public static <T> Object decode(byte[] body, Class<T> classOfT) {
-        final String json = new String(body, Charset.forName("UTF-8"));
-        return GSON.fromJson(json, classOfT);
+    public void setClient(Client client) {
+        this.client = client;
+    }
+
+    @Override public void checkFields() throws RemotingCommandException {
+
     }
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetRootTopic2ClientsRequestHeader.java
similarity index 57%
copy from remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java
copy to common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetRootTopic2ClientsRequestHeader.java
index 0eb1730..ed708c6 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetRootTopic2ClientsRequestHeader.java
@@ -15,24 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.remoting.util;
+package org.apache.rocketmq.common.protocol.header.mqtt;
 
-import com.google.gson.Gson;
-import java.nio.charset.Charset;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class MqttEncodeDecodeUtil {
-    private static final Gson GSON = new Gson();
+public class GetRootTopic2ClientsRequestHeader implements CommandCustomHeader {
+    @CFNotNull
+    private String rootTopic;
 
-    public static byte[] encode(Object object) {
-        final String json = GSON.toJson(object);
-        if (json != null) {
-            return json.getBytes(Charset.forName("UTF-8"));
-        }
-        return null;
+    public String getRootTopic() {
+        return rootTopic;
     }
 
-    public static <T> Object decode(byte[] body, Class<T> classOfT) {
-        final String json = new String(body, Charset.forName("UTF-8"));
-        return GSON.fromJson(json, classOfT);
+    public void setRootTopic(String rootTopic) {
+        this.rootTopic = rootTopic;
+    }
+
+    @Override public void checkFields() throws RemotingCommandException {
     }
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetRootTopic2ClientsResponseHeader.java
similarity index 50%
copy from remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java
copy to common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetRootTopic2ClientsResponseHeader.java
index 0eb1730..96ca54d 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetRootTopic2ClientsResponseHeader.java
@@ -15,24 +15,32 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.remoting.util;
+package org.apache.rocketmq.common.protocol.header.mqtt;
 
-import com.google.gson.Gson;
-import java.nio.charset.Charset;
+import java.util.Set;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class MqttEncodeDecodeUtil {
-    private static final Gson GSON = new Gson();
+public class GetRootTopic2ClientsResponseHeader implements CommandCustomHeader {
+    private boolean operationSuccess;
+    private Set<String> clientsId;
 
-    public static byte[] encode(Object object) {
-        final String json = GSON.toJson(object);
-        if (json != null) {
-            return json.getBytes(Charset.forName("UTF-8"));
-        }
-        return null;
+    public boolean isOperationSuccess() {
+        return operationSuccess;
     }
 
-    public static <T> Object decode(byte[] body, Class<T> classOfT) {
-        final String json = new String(body, Charset.forName("UTF-8"));
-        return GSON.fromJson(json, classOfT);
+    public Set<String> getClientsId() {
+        return clientsId;
+    }
+
+    public void setClientsId(Set<String> clientsId) {
+        this.clientsId = clientsId;
+    }
+
+    public void setOperationSuccess(boolean operationSuccess) {
+        this.operationSuccess = operationSuccess;
+    }
+
+    @Override public void checkFields() throws RemotingCommandException {
     }
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetSnodeAddress2ClientsRequestHeader.java
similarity index 50%
copy from remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java
copy to common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetSnodeAddress2ClientsRequestHeader.java
index 0eb1730..3f00c08 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetSnodeAddress2ClientsRequestHeader.java
@@ -15,24 +15,35 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.remoting.util;
+package org.apache.rocketmq.common.protocol.header.mqtt;
 
-import com.google.gson.Gson;
-import java.nio.charset.Charset;
+import java.util.Set;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class MqttEncodeDecodeUtil {
-    private static final Gson GSON = new Gson();
+public class GetSnodeAddress2ClientsRequestHeader implements CommandCustomHeader {
+    @CFNotNull
+    private Set<String> clientsId;
+    @CFNotNull
+    private String topic;
 
-    public static byte[] encode(Object object) {
-        final String json = GSON.toJson(object);
-        if (json != null) {
-            return json.getBytes(Charset.forName("UTF-8"));
-        }
-        return null;
+    public Set<String> getClientsId() {
+        return clientsId;
     }
 
-    public static <T> Object decode(byte[] body, Class<T> classOfT) {
-        final String json = new String(body, Charset.forName("UTF-8"));
-        return GSON.fromJson(json, classOfT);
+    public void setClientsId(Set<String> clientsId) {
+        this.clientsId = clientsId;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    @Override public void checkFields() throws RemotingCommandException {
     }
 }
diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/constant/MqttConstant.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetSnodeAddress2ClientsResponseHeader.java
similarity index 54%
copy from mqtt/src/main/java/org/apache/rocketmq/mqtt/constant/MqttConstant.java
copy to common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetSnodeAddress2ClientsResponseHeader.java
index 3d53f5f..63ee2c2 100644
--- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/constant/MqttConstant.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetSnodeAddress2ClientsResponseHeader.java
@@ -15,19 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.mqtt.constant;
+package org.apache.rocketmq.common.protocol.header.mqtt;
 
-import io.netty.util.AttributeKey;
+import java.util.Map;
+import java.util.Set;
 import org.apache.rocketmq.common.client.Client;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class MqttConstant {
-    public static final int MAX_SUPPORTED_QOS = 0;
-    public static final String SUBSCRIPTION_FLAG_PLUS = "+";
-    public static final String SUBSCRIPTION_FLAG_SHARP = "#";
-    public static final String SUBSCRIPTION_SEPARATOR = "/";
-    public static final String TOPIC_CLIENTID_SEPARATOR = "@";
-    public static final long DEFAULT_TIMEOUT_MILLS = 3000L;
-    public static final int FLIGHT_BEFORE_RESEND_MS = 5_000;
-    public static final String PROPERTY_MQTT_QOS = "PROPERTY_MQTT_QOS";
-    public static final AttributeKey<Client> MQTT_CLIENT_ATTRIBUTE_KEY = AttributeKey.valueOf("mqtt.client");
+public class GetSnodeAddress2ClientsResponseHeader implements CommandCustomHeader {
+    private Map<String, Set<Client>> snodeAddress2Clients;
+
+    public Map<String, Set<Client>> getSnodeAddress2Clients() {
+        return snodeAddress2Clients;
+    }
+
+    public void setSnodeAddress2Clients(
+        Map<String, Set<Client>> snodeAddress2Clients) {
+        this.snodeAddress2Clients = snodeAddress2Clients;
+    }
+
+    @Override public void checkFields() throws RemotingCommandException {
+    }
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetSubscriptionByClientIdRequestHeader.java
similarity index 57%
copy from remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java
copy to common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetSubscriptionByClientIdRequestHeader.java
index 0eb1730..dc0ecf6 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetSubscriptionByClientIdRequestHeader.java
@@ -15,24 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.remoting.util;
+package org.apache.rocketmq.common.protocol.header.mqtt;
 
-import com.google.gson.Gson;
-import java.nio.charset.Charset;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class MqttEncodeDecodeUtil {
-    private static final Gson GSON = new Gson();
+public class GetSubscriptionByClientIdRequestHeader implements CommandCustomHeader {
+    @CFNotNull
+    private String clientId;
 
-    public static byte[] encode(Object object) {
-        final String json = GSON.toJson(object);
-        if (json != null) {
-            return json.getBytes(Charset.forName("UTF-8"));
-        }
-        return null;
+    public String getClientId() {
+        return clientId;
     }
 
-    public static <T> Object decode(byte[] body, Class<T> classOfT) {
-        final String json = new String(body, Charset.forName("UTF-8"));
-        return GSON.fromJson(json, classOfT);
+    public void setClientId(String clientId) {
+        this.clientId = clientId;
+    }
+
+    @Override public void checkFields() throws RemotingCommandException {
     }
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetSubscriptionByClientIdResponseHeader.java
similarity index 56%
copy from remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java
copy to common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetSubscriptionByClientIdResponseHeader.java
index 0eb1730..4e11472 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/GetSubscriptionByClientIdResponseHeader.java
@@ -15,24 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.remoting.util;
+package org.apache.rocketmq.common.protocol.header.mqtt;
 
-import com.google.gson.Gson;
-import java.nio.charset.Charset;
+import org.apache.rocketmq.common.client.Subscription;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class MqttEncodeDecodeUtil {
-    private static final Gson GSON = new Gson();
+public class GetSubscriptionByClientIdResponseHeader implements CommandCustomHeader {
+    private Subscription subscription;
 
-    public static byte[] encode(Object object) {
-        final String json = GSON.toJson(object);
-        if (json != null) {
-            return json.getBytes(Charset.forName("UTF-8"));
-        }
-        return null;
+    public Subscription getSubscription() {
+        return subscription;
     }
 
-    public static <T> Object decode(byte[] body, Class<T> classOfT) {
-        final String json = new String(body, Charset.forName("UTF-8"));
-        return GSON.fromJson(json, classOfT);
+    public void setSubscription(Subscription subscription) {
+        this.subscription = subscription;
+    }
+
+    @Override public void checkFields() throws RemotingCommandException {
     }
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/IsClient2SubscriptionPersistedRequestHeader.java
similarity index 50%
copy from remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java
copy to common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/IsClient2SubscriptionPersistedRequestHeader.java
index 0eb1730..c835f4b 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/IsClient2SubscriptionPersistedRequestHeader.java
@@ -15,24 +15,33 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.remoting.util;
+package org.apache.rocketmq.common.protocol.header.mqtt;
 
-import com.google.gson.Gson;
-import java.nio.charset.Charset;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class MqttEncodeDecodeUtil {
-    private static final Gson GSON = new Gson();
+public class IsClient2SubscriptionPersistedRequestHeader implements CommandCustomHeader {
+    @CFNotNull
+    private String clientId;
+    private boolean cleanSession;
 
-    public static byte[] encode(Object object) {
-        final String json = GSON.toJson(object);
-        if (json != null) {
-            return json.getBytes(Charset.forName("UTF-8"));
-        }
-        return null;
+    public String getClientId() {
+        return clientId;
     }
 
-    public static <T> Object decode(byte[] body, Class<T> classOfT) {
-        final String json = new String(body, Charset.forName("UTF-8"));
-        return GSON.fromJson(json, classOfT);
+    public void setClientId(String clientId) {
+        this.clientId = clientId;
+    }
+
+    public boolean isCleanSession() {
+        return cleanSession;
+    }
+
+    public void setCleanSession(boolean cleanSession) {
+        this.cleanSession = cleanSession;
+    }
+
+    @Override public void checkFields() throws RemotingCommandException {
     }
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/IsClient2SubscriptionPersistedResponseHeader.java
similarity index 57%
copy from remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java
copy to common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/IsClient2SubscriptionPersistedResponseHeader.java
index 0eb1730..2586bd0 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/mqtt/IsClient2SubscriptionPersistedResponseHeader.java
@@ -15,24 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.remoting.util;
+package org.apache.rocketmq.common.protocol.header.mqtt;
 
-import com.google.gson.Gson;
-import java.nio.charset.Charset;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class MqttEncodeDecodeUtil {
-    private static final Gson GSON = new Gson();
+public class IsClient2SubscriptionPersistedResponseHeader implements CommandCustomHeader {
+    private boolean isPersisted;
 
-    public static byte[] encode(Object object) {
-        final String json = GSON.toJson(object);
-        if (json != null) {
-            return json.getBytes(Charset.forName("UTF-8"));
-        }
-        return null;
+    public boolean isPersisted() {
+        return isPersisted;
     }
 
-    public static <T> Object decode(byte[] body, Class<T> classOfT) {
-        final String json = new String(body, Charset.forName("UTF-8"));
-        return GSON.fromJson(json, classOfT);
+    public void setPersisted(boolean persisted) {
+        isPersisted = persisted;
+    }
+
+    @Override public void checkFields() throws RemotingCommandException {
     }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/service/EnodeService.java b/common/src/main/java/org/apache/rocketmq/common/service/EnodeService.java
index 259ca95..9575580 100644
--- a/common/src/main/java/org/apache/rocketmq/common/service/EnodeService.java
+++ b/common/src/main/java/org/apache/rocketmq/common/service/EnodeService.java
@@ -137,4 +137,10 @@ public interface EnodeService {
 
     RemotingCommand unlockBatchMQ(final RemotingChannel remotingChannel,
         final RemotingCommand remotingCommand) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
+
+    RemotingCommand requestMQTTInfoSync(final RemotingCommand request)
+        throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
+
+    CompletableFuture<RemotingCommand> requestMQTTInfoAsync(final RemotingCommand request)
+        throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
 }
diff --git a/mqtt/pom.xml b/mqtt/pom.xml
index eee314e..17a986c 100644
--- a/mqtt/pom.xml
+++ b/mqtt/pom.xml
@@ -96,10 +96,6 @@
             <artifactId>simpleclient_hotspot</artifactId>
         </dependency>
         <dependency>
-            <groupId>${project.groupId}</groupId>
-            <artifactId>rocketmq-broker</artifactId>
-        </dependency>
-        <dependency>
             <groupId>org.eclipse.paho</groupId>
             <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
         </dependency>
diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/constant/MqttConstant.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/constant/MqttConstant.java
index 3d53f5f..d564c8c 100644
--- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/constant/MqttConstant.java
+++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/constant/MqttConstant.java
@@ -30,4 +30,8 @@ public class MqttConstant {
     public static final int FLIGHT_BEFORE_RESEND_MS = 5_000;
     public static final String PROPERTY_MQTT_QOS = "PROPERTY_MQTT_QOS";
     public static final AttributeKey<Client> MQTT_CLIENT_ATTRIBUTE_KEY = AttributeKey.valueOf("mqtt.client");
+    public static final String ENODE_NAME = "enodeName";
+    public static final String PERSIST_SUBSCRIPTION_SUFFIX = "-sub";
+    public static final String PERSIST_SNODEADDRESS_SUFFIX = "-sno";
+    public static final String PERSIST_CLIENT_SUFFIX = "-cli";
 }
diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttSubscribeMessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttSubscribeMessageHandler.java
index d55400a..9a4252a 100644
--- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttSubscribeMessageHandler.java
+++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttSubscribeMessageHandler.java
@@ -150,6 +150,8 @@ public class MqttSubscribeMessageHandler implements MessageHandler {
             }
         }
         //TODO update persistent store of topic2Clients and clientId2Subscription
+        this.defaultMqttMessageProcessor.getPersistService().addOrUpdateClient2Susbscription(client, subscription);
+
         return grantQoss;
     }
 
diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/persistence/PersistServiceFactory.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/persistence/PersistServiceFactory.java
new file mode 100644
index 0000000..907d295
--- /dev/null
+++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/persistence/PersistServiceFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.persistence;
+
+import java.util.Map;
+import org.apache.rocketmq.mqtt.persistence.service.PersistService;
+import org.apache.rocketmq.remoting.util.ServiceProvider;
+
+/** Singleton factory for the PersistService implementation listed under the "persistService" key. */
+public class PersistServiceFactory {
+    private static final String SERVICE_LOCATION = "META-INF/service/org.apache.rocketmq.mqtt.PersistService";
+
+    /** Entries obtained once, at class initialization, from ServiceProvider.loadPath(SERVICE_LOCATION). */
+    private static final Map<String, String> PATHS = ServiceProvider.loadPath(SERVICE_LOCATION);
+
+    private static final PersistServiceFactory INSTANCE = new PersistServiceFactory();
+
+    private PersistServiceFactory() {
+    }
+
+    /** Returns the shared factory instance. */
+    public static PersistServiceFactory getInstance() {
+        return INSTANCE;
+    }
+
+    /** Asks ServiceProvider to instantiate the class name stored under the "persistService" key. */
+    public PersistService createPersistService() {
+        return ServiceProvider.createInstance(PATHS.get("persistService"), PersistService.class);
+    }
+}
diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/persistence/rebalance/AllocatePersistentDataConsistentHash.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/persistence/rebalance/AllocatePersistentDataConsistentHash.java
new file mode 100644
index 0000000..a432642
--- /dev/null
+++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/persistence/rebalance/AllocatePersistentDataConsistentHash.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.persistence.rebalance;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Set;
+import org.apache.rocketmq.common.consistenthash.ConsistentHashRouter;
+import org.apache.rocketmq.common.consistenthash.HashFunction;
+import org.apache.rocketmq.common.consistenthash.Node;
+
+/** Chooses an enode name for a data key by routing the key through a ConsistentHashRouter of candidates. */
+public class AllocatePersistentDataConsistentHash implements AllocatePersistentDataStrategy {
+    private final int virtualNodeCnt;
+    private final HashFunction customHashFunction;
+
+    public AllocatePersistentDataConsistentHash() {
+        this(10);
+    }
+
+    public AllocatePersistentDataConsistentHash(int virtualNodeCnt) {
+        this(virtualNodeCnt, null);
+    }
+
+    public AllocatePersistentDataConsistentHash(int virtualNodeCnt, HashFunction customHashFunction) {
+        if (virtualNodeCnt < 0) {
+            throw new IllegalArgumentException("illegal virtualNodeCnt :" + virtualNodeCnt);
+        }
+        this.virtualNodeCnt = virtualNodeCnt;
+        this.customHashFunction = customHashFunction;
+    }
+
+    /**
+     * Routes dataKey over a ring rebuilt from enodeNames on every call.
+     * @return the routed enode name, or null when enodeNames is null/empty or the router returns no node
+     */
+    @Override
+    public String allocate(String dataKey, Set<String> enodeNames) {
+        if (enodeNames == null || enodeNames.isEmpty()) {
+            return null;
+        }
+        Collection<EnodeNode> ring = new ArrayList<EnodeNode>(enodeNames.size());
+        for (String enodeName : enodeNames) {
+            ring.add(new EnodeNode(enodeName));
+        }
+        ConsistentHashRouter<EnodeNode> router = customHashFunction != null
+            ? new ConsistentHashRouter<EnodeNode>(ring, virtualNodeCnt, customHashFunction)
+            : new ConsistentHashRouter<EnodeNode>(ring, virtualNodeCnt);
+        EnodeNode owner = router.routeNode(dataKey);
+        return owner != null ? owner.getKey() : null;
+    }
+
+    @Override
+    public String getName() {
+        return "CONSISTENT_HASH";
+    }
+
+    /** Ring entry whose getKey() is the enode name it wraps. */
+    private static class EnodeNode implements Node {
+        private final String enodeName;
+        EnodeNode(String enodeName) {
+            this.enodeName = enodeName;
+        }
+        @Override
+        public String getKey() {
+            return enodeName;
+        }
+    }
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/persistence/rebalance/AllocatePersistentDataStrategy.java
similarity index 57%
copy from remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java
copy to mqtt/src/main/java/org/apache/rocketmq/mqtt/persistence/rebalance/AllocatePersistentDataStrategy.java
index 0eb1730..9ebc922 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java
+++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/persistence/rebalance/AllocatePersistentDataStrategy.java
@@ -15,24 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.remoting.util;
+package org.apache.rocketmq.mqtt.persistence.rebalance;
 
-import com.google.gson.Gson;
-import java.nio.charset.Charset;
+import java.util.Set;
 
-public class MqttEncodeDecodeUtil {
-    private static final Gson GSON = new Gson();
+public interface AllocatePersistentDataStrategy {
+    String allocate(final String dataKey, final Set<String> enodeNames);
 
-    public static byte[] encode(Object object) {
-        final String json = GSON.toJson(object);
-        if (json != null) {
-            return json.getBytes(Charset.forName("UTF-8"));
-        }
-        return null;
-    }
+    String getName();
 
-    public static <T> Object decode(byte[] body, Class<T> classOfT) {
-        final String json = new String(body, Charset.forName("UTF-8"));
-        return GSON.fromJson(json, classOfT);
-    }
 }
diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/persistence/service/DefaultPersistService.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/persistence/service/DefaultPersistService.java
new file mode 100644
index 0000000..bca9750
--- /dev/null
+++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/persistence/service/DefaultPersistService.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.persistence.service;
+
+import com.google.gson.Gson;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.stream.Collectors;
+import org.apache.rocketmq.common.client.Client;
+import org.apache.rocketmq.common.client.Subscription;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.header.mqtt.AddOrUpdateClient2SubscriptionRequestHeader;
+import org.apache.rocketmq.common.protocol.header.mqtt.AddOrUpdateClient2SubscriptionResponseHeader;
+import org.apache.rocketmq.common.protocol.header.mqtt.AddOrUpdateRootTopic2ClientsRequestHeader;
+import org.apache.rocketmq.common.protocol.header.mqtt.AddOrUpdateRootTopic2ClientsResponseHeader;
+import org.apache.rocketmq.common.protocol.header.mqtt.ClientUnsubscribeRequestHeader;
+import org.apache.rocketmq.common.protocol.header.mqtt.ClientUnsubscribeResponseHeader;
+import org.apache.rocketmq.common.protocol.header.mqtt.DeleteClientRequestHeader;
+import org.apache.rocketmq.common.protocol.header.mqtt.DeleteClientResponseHeader;
+import org.apache.rocketmq.common.protocol.header.mqtt.DeleteRootTopic2ClientRequestHeader;
+import org.apache.rocketmq.common.protocol.header.mqtt.DeleteRootTopic2ClientResponseHeader;
+import org.apache.rocketmq.common.protocol.header.mqtt.GetClientByClientIdRequestHeader;
+import org.apache.rocketmq.common.protocol.header.mqtt.GetClientByClientIdResponseHeader;
+import org.apache.rocketmq.common.protocol.header.mqtt.GetRootTopic2ClientsRequestHeader;
+import org.apache.rocketmq.common.protocol.header.mqtt.GetRootTopic2ClientsResponseHeader;
+import org.apache.rocketmq.common.protocol.header.mqtt.GetSnodeAddress2ClientsRequestHeader;
+import org.apache.rocketmq.common.protocol.header.mqtt.GetSnodeAddress2ClientsResponseHeader;
+import org.apache.rocketmq.common.protocol.header.mqtt.GetSubscriptionByClientIdRequestHeader;
+import org.apache.rocketmq.common.protocol.header.mqtt.GetSubscriptionByClientIdResponseHeader;
+import org.apache.rocketmq.common.protocol.header.mqtt.IsClient2SubscriptionPersistedRequestHeader;
+import org.apache.rocketmq.common.protocol.header.mqtt.IsClient2SubscriptionPersistedResponseHeader;
+import org.apache.rocketmq.common.service.EnodeService;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.mqtt.client.MQTTSession;
+import org.apache.rocketmq.mqtt.constant.MqttConstant;
+import org.apache.rocketmq.mqtt.persistence.rebalance.AllocatePersistentDataConsistentHash;
+import org.apache.rocketmq.mqtt.persistence.rebalance.AllocatePersistentDataStrategy;
+import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+public class DefaultPersistService implements PersistService {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME);
+    private DefaultMqttMessageProcessor defaultMqttMessageProcessor;
+    private EnodeService enodeService;
+    private AllocatePersistentDataStrategy allocatePersistentDataStrategy;
+
+    private final Gson GSON = new Gson();
+
+    public DefaultPersistService() {
+    }
+
+    /** Keeps the processor, takes its EnodeService, and selects consistent-hash allocation of data keys. */
+    @Override public void init(DefaultMqttMessageProcessor defaultMqttMessageProcessor) {
+        this.defaultMqttMessageProcessor = defaultMqttMessageProcessor;
+        this.enodeService = defaultMqttMessageProcessor.getEnodeService();
+        this.allocatePersistentDataStrategy = new AllocatePersistentDataConsistentHash();
+    }
+
+    /**
+     * Asks the enode allocated for this client id and returns the isPersisted flag of its response.
+     * @return the enode's answer, or false when sending the request or decoding the response throws
+     */
+    @Override public boolean isClient2SubsriptionPersisted(Client client) {
+        final String id = client.getClientId();
+        final String targetEnode = this.getAllocateEnodeName(id);
+
+        IsClient2SubscriptionPersistedRequestHeader header = new IsClient2SubscriptionPersistedRequestHeader();
+        header.setClientId(id);
+        header.setCleanSession(((MQTTSession) client).isCleanSession());
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.MQTT_IS_CLIENT2SUBSCRIPTION_PERSISTED, header);
+        request.addExtField(MqttConstant.ENODE_NAME, targetEnode);
+        try {
+            RemotingCommand response = enodeService.requestMQTTInfoSync(request);
+            return ((IsClient2SubscriptionPersistedResponseHeader) response.decodeCommandCustomHeader(IsClient2SubscriptionPersistedResponseHeader.class)).isPersisted();
+        } catch (Exception e) {
+            log.error("Transfer MQTT info to Enode: {} failed, Err: {} ", targetEnode, e);
+            return false;
+        }
+    }
+
+    /**
+     * Sends the subscription to the client's enode, then the client id to the enode of each distinct root topic.
+     * @return true only when the subscription request and every root-topic request reported success
+     */
+    @Override public boolean addOrUpdateClient2Susbscription(Client client, Subscription subscription) {
+        boolean client2SubscriptionResult = false;
+        String enodeName = this.getAllocateEnodeName(client.getClientId());
+        AddOrUpdateClient2SubscriptionRequestHeader requestHeader = new AddOrUpdateClient2SubscriptionRequestHeader();
+        requestHeader.setClient(client);
+        requestHeader.setSubscription(subscription);
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.MQTT_ADD_OR_UPDATE_CLIENT2SUBSCRIPTION, requestHeader);
+        request.addExtField(MqttConstant.ENODE_NAME, enodeName);
+        try {
+            AddOrUpdateClient2SubscriptionResponseHeader responseHeader = (AddOrUpdateClient2SubscriptionResponseHeader) enodeService.requestMQTTInfoSync(request).decodeCommandCustomHeader(AddOrUpdateClient2SubscriptionResponseHeader.class);
+            client2SubscriptionResult = responseHeader.isResult();
+        } catch (Exception e) {
+            log.error("Transfer MQTT info to Enode: {} failed, Err: {} ", enodeName, e);
+        }
+        // Root topic = text before the first SUBSCRIPTION_SEPARATOR; each one is routed to its own enode.
+        boolean rootTopic2ClientsResult = true;
+        for (String rootTopic : subscription.getSubscriptionTable().keySet().stream().map(t -> t.split(MqttConstant.SUBSCRIPTION_SEPARATOR)[0]).distinct().collect(Collectors.toList())) {
+            String enodeNameForRootTopic = this.getAllocateEnodeName(rootTopic);
+            AddOrUpdateRootTopic2ClientsRequestHeader addOrUpdateRootTopic2ClientsRequestHeader = new AddOrUpdateRootTopic2ClientsRequestHeader();
+            addOrUpdateRootTopic2ClientsRequestHeader.setRootTopic(rootTopic);
+            addOrUpdateRootTopic2ClientsRequestHeader.setClientId(client.getClientId());
+            RemotingCommand requestForRootTopic = RemotingCommand.createRequestCommand(RequestCode.MQTT_ADD_OR_UPDATE_ROOTTOPIC2CLIENTS, addOrUpdateRootTopic2ClientsRequestHeader);
+            requestForRootTopic.addExtField(MqttConstant.ENODE_NAME, enodeNameForRootTopic);
+            try {
+                AddOrUpdateRootTopic2ClientsResponseHeader responseHeader = (AddOrUpdateRootTopic2ClientsResponseHeader) enodeService.requestMQTTInfoSync(requestForRootTopic).decodeCommandCustomHeader(AddOrUpdateRootTopic2ClientsResponseHeader.class);
+                rootTopic2ClientsResult = rootTopic2ClientsResult && responseHeader.isOperationSuccess();
+            } catch (Exception ex) {
+                rootTopic2ClientsResult = false; // a request that threw must not be reported as success
+                log.error("Transfer MQTT rootTopic2Clients info to Enode: {} failed, Err: {} ", enodeNameForRootTopic, ex);
+            }
+        }
+        return rootTopic2ClientsResult && client2SubscriptionResult;
+    }
+
+    @Override public boolean deleteClient(Client client) {
+        // delete client2subscription and client2snodeAddress
+        DeleteClientRequestHeader deleteClientRequestHeader = new DeleteClientRequestHeader();
+        deleteClientRequestHeader.setClientId(client.getClientId());
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.MQTT_DELETE_CLIENT2SUBSCRIPTION, deleteClientRequestHeader);
+        String enodeName = this.getAllocateEnodeName(client.getClientId());
+        request.addExtField(MqttConstant.ENODE_NAME, enodeName);
+        RemotingCommand response = null;
+        try {
+            response = this.enodeService.requestMQTTInfoSync(request);
+        } catch (Exception e) {
+            log.error("Transfer MQTT rootTopic2Clients info to Enode: {} failed, Err: {} ", enodeName, e);
+        }
+
+        // delete rootTopic2Clients
+        if (response != null) {
+            boolean client2SubResult;
+            try {
+                DeleteClientResponseHeader deleteClientResponseHeader = (DeleteClientResponseHeader) response.decodeCommandCustomHeader(DeleteClientResponseHeader.class);
+                client2SubResult = deleteClientResponseHeader.isOperationSuccess();
+                Subscription subscription = deleteClientResponseHeader.getSubscription();
+                boolean rootTopic2ClientsResult = true;
+                for (String rootTopic : subscription.getSubscriptionTable().keySet().stream().map(t -> t.split(MqttConstant.SUBSCRIPTION_SEPARATOR)[0]).distinct().collect(Collectors.toList())) {
+                    String enodeNameForRootTopic = this.getAllocateEnodeName(rootTopic);
+                    DeleteRootTopic2ClientRequestHeader deleteRootTopic2ClientRequestHeader = new DeleteRootTopic2ClientRequestHeader();
+                    deleteRootTopic2ClientRequestHeader.setClientId(client.getClientId());
+                    deleteRootTopic2ClientRequestHeader.setRootTopic(rootTopic);
+                    request = RemotingCommand.createRequestCommand(RequestCode.MQTT_DELETE_ROOTTOPIC2CLIENT, deleteRootTopic2ClientRequestHeader);
+                    request.addExtField(MqttConstant.ENODE_NAME, enodeNameForRootTopic);
+                    try {
+                        DeleteRootTopic2ClientResponseHeader deleteRootTopic2ClientResponseHeader = (DeleteRootTopic2ClientResponseHeader) enodeService.requestMQTTInfoSync(request).decodeCommandCustomHeader(DeleteClientResponseHeader.class);
+                        rootTopic2ClientsResult = rootTopic2ClientsResult && deleteRootTopic2ClientResponseHeader.isOperationSuccess();
+                    } catch (Exception ex) {
+                        log.error("Transfer MQTT rootTopic2Clients info to Enode: {} failed, Err: {} ", enodeName, ex);
+                    }
+                }
+                return client2SubResult && rootTopic2ClientsResult;
+            } catch (Exception e) {
+                log.error("Decode deleteClient response header failed, error:{}", e);
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Finds the clients registered under the root topic of {@code topic}, grouped by snode address.
+     * The client ids come from the enode allocated for the root topic; every enode allocated for one of those ids is then
+     * queried asynchronously, and the method blocks until all of those queries have completed before returning the merged result.
+     *
+     * @param topic topic to look up; the part before the first {@code MqttConstant.SUBSCRIPTION_SEPARATOR} match is the root topic
+     * @return snode address to clients; empty if the root-topic lookup fails, incomplete if single enode queries fail
+     */
+    @Override
+    public Map<String, Set<Client>> getSnodeAddress2Clients(String topic) {
+        final Map<String, Set<Client>> snodeAddress2Clients = new ConcurrentHashMap<>();
+        // step1: get rootTopic2Clients
+        String rootTopic = topic.split(MqttConstant.SUBSCRIPTION_SEPARATOR)[0];
+        GetRootTopic2ClientsRequestHeader requestHeader = new GetRootTopic2ClientsRequestHeader();
+        requestHeader.setRootTopic(rootTopic);
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.MQTT_GET_ROOTTOPIC2CLIENTS, requestHeader);
+        String enodeName = this.getAllocateEnodeName(rootTopic);
+        request.addExtField(MqttConstant.ENODE_NAME, enodeName);
+        try {
+            RemotingCommand response = this.enodeService.requestMQTTInfoSync(request);
+            GetRootTopic2ClientsResponseHeader responseHeader = (GetRootTopic2ClientsResponseHeader) response.decodeCommandCustomHeader(GetRootTopic2ClientsResponseHeader.class);
+            if (responseHeader.isOperationSuccess()) {
+                // group the client ids by the enode allocated for each of them
+                Map<String, Set<String>> enodeName2ClientsIdSet = new HashMap<>();
+                for (String clientId : responseHeader.getClientsId()) {
+                    enodeName2ClientsIdSet.computeIfAbsent(this.getAllocateEnodeName(clientId), k -> new HashSet<>()).add(clientId);
+                }
+                // step2: get snodeAddress2ClientsId
+                final CountDownLatch countDownLatch = new CountDownLatch(enodeName2ClientsIdSet.size());
+                for (Map.Entry<String, Set<String>> entry : enodeName2ClientsIdSet.entrySet()) {
+                    final String enodeNameToSend = entry.getKey();
+                    GetSnodeAddress2ClientsRequestHeader getSnodeAddress2ClientsRequestHeader = new GetSnodeAddress2ClientsRequestHeader();
+                    getSnodeAddress2ClientsRequestHeader.setClientsId(entry.getValue());
+                    getSnodeAddress2ClientsRequestHeader.setTopic(topic);
+                    RemotingCommand requestToSend = RemotingCommand.createRequestCommand(RequestCode.MQTT_GET_SNODEADDRESS2CLIENT, getSnodeAddress2ClientsRequestHeader);
+                    requestToSend.addExtField(MqttConstant.ENODE_NAME, enodeNameToSend);
+                    CompletableFuture<RemotingCommand> responseFuture = this.enodeService.requestMQTTInfoAsync(requestToSend);
+                    responseFuture.whenComplete((data, ex) -> {
+                        if (ex == null) {
+                            try {
+                                GetSnodeAddress2ClientsResponseHeader getSnodeAddress2ClientsResponseHeader = (GetSnodeAddress2ClientsResponseHeader) data.decodeCommandCustomHeader(GetSnodeAddress2ClientsResponseHeader.class);
+                                // computeIfAbsent stores a new set in the result (getOrDefault would fill a set that is never put into the map);
+                                // the set is a concurrent one because callbacks for different enodes may add to the same address.
+                                for (Map.Entry<String, Set<Client>> found : getSnodeAddress2ClientsResponseHeader.getSnodeAddress2Clients().entrySet()) {
+                                    snodeAddress2Clients.computeIfAbsent(found.getKey(), k -> ConcurrentHashMap.newKeySet()).addAll(found.getValue());
+                                }
+                            } catch (Exception e) {
+                                log.error("Transfer MQTT snodeAddress2Clients info to Enode: {} failed, Err: {} ", enodeNameToSend, e);
+                            }
+                        }
+                        countDownLatch.countDown();
+                    });
+                }
+                countDownLatch.await();
+            }
+        } catch (Exception e) {
+            log.error("Transfer MQTT rootTopic2Clients info to Enode: {} failed, Err: {} ", enodeName, e);
+        }
+        return snodeAddress2Clients;
+    }
+
+    /**
+     * Removes {@code topics} from the persisted subscription of {@code client}, then deletes the client from every root topic
+     * listed in the response's root-topic diff. Failures of those follow-up deletions are only logged.
+     * @return the operation flag of the unsubscribe response, or {@code false} if that request or its decoding throws
+     */
+    @Override public boolean clientUnsubscribe(Client client, List<String> topics) {
+        boolean result = false;
+        ClientUnsubscribeRequestHeader unsubscribeHeader = new ClientUnsubscribeRequestHeader();
+        unsubscribeHeader.setClientId(client.getClientId());
+        unsubscribeHeader.setTopics(topics);
+        RemotingCommand unsubscribeRequest = RemotingCommand.createRequestCommand(RequestCode.MQTT_CLIENT_UNSUBSRIBE, unsubscribeHeader);
+        unsubscribeRequest.addExtField(MqttConstant.ENODE_NAME, this.getAllocateEnodeName(client.getClientId()));
+        try {
+            ClientUnsubscribeResponseHeader unsubscribeResponse = (ClientUnsubscribeResponseHeader) this.enodeService.requestMQTTInfoSync(unsubscribeRequest).decodeCommandCustomHeader(ClientUnsubscribeResponseHeader.class);
+            result = unsubscribeResponse.isOperationSuccess();
+            if (unsubscribeResponse.isRootTopicDiffExists()) {
+                for (String rootTopic : unsubscribeResponse.getRootTopicsDiff()) {
+                    DeleteRootTopic2ClientRequestHeader rootTopicHeader = new DeleteRootTopic2ClientRequestHeader();
+                    rootTopicHeader.setRootTopic(rootTopic);
+                    rootTopicHeader.setClientId(client.getClientId());
+                    RemotingCommand rootTopicRequest = RemotingCommand.createRequestCommand(RequestCode.MQTT_DELETE_ROOTTOPIC2CLIENT, rootTopicHeader);
+                    rootTopicRequest.addExtField(MqttConstant.ENODE_NAME, this.getAllocateEnodeName(rootTopic));
+                    try {
+                        this.enodeService.requestMQTTInfoSync(rootTopicRequest);
+                    } catch (Exception ex) {
+                        log.error("Transfer MQTT rootTopic2Clients info  failed, Err: {} ", ex);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            log.error("Transfer MQTT rootTopic2Clients info  failed, Err: {} ", e);
+        }
+        return result;
+    }
+
+    /** Fetches the subscription persisted for {@code clientId} from the enode allocated for that id; logs and returns {@code null} if the request or decoding throws. */
+    @Override public Subscription getSubscriptionByClientId(String clientId) {
+        GetSubscriptionByClientIdRequestHeader header = new GetSubscriptionByClientIdRequestHeader();
+        header.setClientId(clientId);
+        RemotingCommand command = RemotingCommand.createRequestCommand(RequestCode.MQTT_GET_SUBSCRIPTION_BY_CLIENT_ID, header);
+        command.addExtField(MqttConstant.ENODE_NAME, this.getAllocateEnodeName(clientId));
+        try {
+            RemotingCommand reply = this.enodeService.requestMQTTInfoSync(command);
+            return ((GetSubscriptionByClientIdResponseHeader) reply.decodeCommandCustomHeader(GetSubscriptionByClientIdResponseHeader.class)).getSubscription();
+        } catch (Exception e) {
+            log.error("Get Subscription failed, error: {}", e);
+            return null;
+        }
+    }
+
+    /** Fetches the client persisted under {@code clientId} from the enode allocated for that id; logs and returns {@code null} if the request or decoding throws. */
+    @Override public Client getClientByClientId(String clientId) {
+        GetClientByClientIdRequestHeader header = new GetClientByClientIdRequestHeader();
+        header.setClientId(clientId);
+        RemotingCommand command = RemotingCommand.createRequestCommand(RequestCode.MQTT_GET_CLIENT_BY_CLIENTID_ID, header);
+        command.addExtField(MqttConstant.ENODE_NAME, this.getAllocateEnodeName(clientId));
+        try {
+            RemotingCommand reply = this.enodeService.requestMQTTInfoSync(command);
+            return ((GetClientByClientIdResponseHeader) reply.decodeCommandCustomHeader(GetClientByClientIdResponseHeader.class)).getClient();
+        } catch (Exception e) {
+            log.error("Get Client failed, error: {}", e);
+            return null;
+        }
+    }
+
+    /** Lets the allocation strategy choose, among the enodes reported for this snode's cluster, the one responsible for {@code key}. */
+    private String getAllocateEnodeName(String key) {
+        String cluster = defaultMqttMessageProcessor.getSnodeConfig().getClusterName();
+        Set<String> candidates = defaultMqttMessageProcessor.getNnodeService().getEnodeClusterInfo(cluster);
+        return allocatePersistentDataStrategy.allocate(key, candidates);
+    }
+}
diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/persistence/service/PersistService.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/persistence/service/PersistService.java
new file mode 100644
index 0000000..f25a9a2
--- /dev/null
+++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/persistence/service/PersistService.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.persistence.service;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.rocketmq.common.client.Client;
+import org.apache.rocketmq.common.client.Subscription;
+import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
+/** Contract for persisting and querying MQTT client and subscription data. */
+public interface PersistService {
+    /**
+     * Init Persist Service
+     * @param processor MQTT messages processor
+     */
+    void init(DefaultMqttMessageProcessor processor);
+    /** Presumably tells whether a subscription is already persisted for {@code client} - TODO confirm against the implementation. */
+    boolean isClient2SubsriptionPersisted(Client client);
+    /** Persists {@code subscription} for {@code client}; returns whether the operation succeeded. */
+    boolean addOrUpdateClient2Susbscription(Client client, Subscription subscription);
+    /** Deletes the persisted data of {@code client}; returns whether the deletion succeeded. */
+    boolean deleteClient(Client client);
+    /** Returns, keyed by snode address, the clients found for {@code topic}. */
+    Map<String, Set<Client>> getSnodeAddress2Clients(String topic);
+    /** Removes {@code topics} from the persisted subscription of {@code client}; returns whether the operation succeeded. */
+    boolean clientUnsubscribe(Client client, List<String> topics);
+    /** Returns the subscription persisted for {@code clientId}; DefaultPersistService returns {@code null} when the lookup fails. */
+    Subscription getSubscriptionByClientId(String clientId);
+    /** Returns the client persisted under {@code clientId}; DefaultPersistService returns {@code null} when the lookup fails. */
+    Client getClientByClientId(String clientId);
+}
diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/DefaultMqttMessageProcessor.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/DefaultMqttMessageProcessor.java
index 0ccb354..17d968b 100644
--- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/DefaultMqttMessageProcessor.java
+++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/DefaultMqttMessageProcessor.java
@@ -55,6 +55,8 @@ import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttPubrecMessageHandler;
 import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttPubrelMessageHandler;
 import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttSubscribeMessageHandler;
 import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttUnsubscribeMessagHandler;
+import org.apache.rocketmq.mqtt.persistence.PersistServiceFactory;
+import org.apache.rocketmq.mqtt.persistence.service.PersistService;
 import org.apache.rocketmq.mqtt.service.WillMessageService;
 import org.apache.rocketmq.mqtt.service.impl.MqttScheduledServiceImpl;
 import org.apache.rocketmq.mqtt.service.impl.WillMessageServiceImpl;
@@ -84,6 +86,7 @@ public class DefaultMqttMessageProcessor implements RequestProcessor {
     private EnodeService enodeService;
     private NnodeService nnodeService;
     private ScheduledService mqttScheduledService;
+    private PersistService persistService;
 
     private final OrderedExecutor orderedExecutor;
 
@@ -97,6 +100,8 @@ public class DefaultMqttMessageProcessor implements RequestProcessor {
         this.mqttRemotingServer = mqttRemotingServer;
         this.enodeService = enodeService;
         this.nnodeService = nnodeService;
+        this.persistService = PersistServiceFactory.getInstance().createPersistService();
+        this.persistService.init(this);
         this.mqttClientHousekeepingService = new MqttClientHousekeepingService(iotClientManager);
         this.mqttClientHousekeepingService.start(mqttConfig.getHouseKeepingInterval());
 
@@ -214,4 +219,8 @@ public class DefaultMqttMessageProcessor implements RequestProcessor {
     public OrderedExecutor getOrderedExecutor() {
         return orderedExecutor;
     }
+    /** Returns the persist service this processor obtained from PersistServiceFactory. */
+    public PersistService getPersistService() {
+        return persistService;
+    }
 }
diff --git a/mqtt/src/main/resources/META-INF/service/org.apache.rocketmq.mqtt.PersistService b/mqtt/src/main/resources/META-INF/service/org.apache.rocketmq.mqtt.PersistService
new file mode 100644
index 0000000..f4a4481
--- /dev/null
+++ b/mqtt/src/main/resources/META-INF/service/org.apache.rocketmq.mqtt.PersistService
@@ -0,0 +1 @@
+persistService=org.apache.rocketmq.mqtt.persistence.service.DefaultPersistService
\ No newline at end of file
diff --git a/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttPublishMessageHandlerTest.java b/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttPublishMessageHandlerTest.java
index 2103d31..4255f84 100644
--- a/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttPublishMessageHandlerTest.java
+++ b/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttPublishMessageHandlerTest.java
@@ -127,9 +127,7 @@ public class MqttPublishMessageHandlerTest {
         TopicRouteData topicRouteData = buildTopicRouteData();
         Mockito.when(nnodeService.getTopicRouteDataByTopic(anyString(), anyBoolean())).thenReturn(topicRouteData);
         CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
-//        RemotingCommand response = Mockito.mock(RemotingCommand.class);
-        Mockito.when(this.defaultMqttMessageProcessor.getEnodeService().sendMessage(any(RemotingChannel.class), anyString(), any(RemotingCommand.class))).thenReturn(future);
-//        doAnswer(mock -> future.complete(response)).when(this.defaultMqttMessageProcessor.getEnodeService().sendMessage(any(RemotingChannel.class), anyString(), any(RemotingCommand.class)));
+        Mockito.when(this.defaultMqttMessageProcessor.getEnodeService().sendMessage(any(), anyString(), any(RemotingCommand.class))).thenReturn(future);
         RemotingCommand remotingCommand = mqttPublishMessageHandler.handleMessage(mqttPublishMessage, remotingChannel);
         assert remotingCommand != null;
         MqttHeader mqttHeader = (MqttHeader) remotingCommand.readCustomHeader();
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java b/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java
index 0eb1730..3c2add0 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java
@@ -35,4 +35,5 @@ public class MqttEncodeDecodeUtil {
         final String json = new String(body, Charset.forName("UTF-8"));
         return GSON.fromJson(json, classOfT);
     }
+
 }
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/LocalEnodeServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/LocalEnodeServiceImpl.java
index 726251d..47166d1 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/LocalEnodeServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/LocalEnodeServiceImpl.java
@@ -20,14 +20,17 @@ import io.netty.channel.ChannelHandlerContext;
 import java.util.concurrent.CompletableFuture;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.service.EnodeService;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.RemotingChannel;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.netty.CodecHelper;
 import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.apache.rocketmq.common.service.EnodeService;
 
 public class LocalEnodeServiceImpl implements EnodeService {
 
@@ -147,4 +150,28 @@ public class LocalEnodeServiceImpl implements EnodeService {
         log.info("un");
         return this.brokerController.getAdminProcessor().lockBatchMQ(ctx, request);
     }
+
+    /** Handles the request in-process through the broker's MQTT processor; logs and returns {@code null} if processing throws. NOTE(review): unlike requestMQTTInfoAsync, the response is not passed through CodecHelper.encodeHeader - confirm that is intended. */
+    @Override public RemotingCommand requestMQTTInfoSync(RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+        try {
+            return this.brokerController.getMqttProcessor().processRequest(null, request);
+        } catch (Exception e) {
+            log.error("[Local]RequestMQTTInfo failed, error: {}", e);
+            return null;
+        }
+    }
+
+    /** Handles the request in-process and returns an already completed future: the header-encoded response, or the exception thrown while producing it. */
+    @Override public CompletableFuture<RemotingCommand> requestMQTTInfoAsync(RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+        CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
+        try {
+            RemotingCommand response = this.brokerController.getMqttProcessor().processRequest(null, request);
+            CodecHelper.encodeHeader(response);
+            future.complete(response);
+        } catch (Exception ex) {
+            log.error("[Local]RequestMQTTInfo failed, error: {}", ex);
+            future.completeExceptionally(ex);
+        }
+        return future;
+    }
 }
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/RemoteEnodeServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/RemoteEnodeServiceImpl.java
index 66ee7e2..2e9c9d1 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/RemoteEnodeServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/RemoteEnodeServiceImpl.java
@@ -38,6 +38,7 @@ import org.apache.rocketmq.common.service.EnodeService;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.mqtt.constant.MqttConstant;
 import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.RemotingChannel;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
@@ -295,4 +296,16 @@ public class RemoteEnodeServiceImpl implements EnodeService {
         return this.snodeController.getRemotingClient().invokeSync(address,
             request, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
     }
+
+    @Override
+    public RemotingCommand requestMQTTInfoSync(
+        final RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+        return this.transferToEnode(request);
+    }
+
+    /** Delegates to {@code sendMessage} with a null channel, addressing the enode named by the request's {@code MqttConstant.ENODE_NAME} ext field. */
+    @Override public CompletableFuture<RemotingCommand> requestMQTTInfoAsync(RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+        return this.sendMessage(null, request.getExtFields().get(MqttConstant.ENODE_NAME), request);
+    }
+
 }
diff --git a/store/pom.xml b/store/pom.xml
index d90e7fb..4af4e68 100644
--- a/store/pom.xml
+++ b/store/pom.xml
@@ -52,6 +52,11 @@
             <artifactId>jna</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.rocksdb</groupId>
+            <artifactId>rocksdbjni</artifactId>
+            <version>5.5.1</version>
+        </dependency>
+        <dependency>
             <groupId>ch.qos.logback</groupId>
             <artifactId>logback-classic</artifactId>
             <scope>test</scope>
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMQTTInfoStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMQTTInfoStore.java
new file mode 100644
index 0000000..6861f5f
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMQTTInfoStore.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store;
+
+import java.io.File;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+/** {@link MQTTInfoStore} that keeps its key/value data in a RocksDB database located in {@code store/RocksDB} below the user's home directory. */
+public class DefaultMQTTInfoStore implements MQTTInfoStore {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    /** Single charset for every String/byte[] conversion, so that what is written reads back unchanged whatever the platform default is. */
+    private static final Charset UTF_8 = Charset.forName("UTF-8");
+    private RocksDB db;
+    private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";
+    private String storePathRocksDB = storePathRootDir + File.separator + "RocksDB";
+
+    /** Loads the RocksDB native library. */
+    @Override public void load() {
+        RocksDB.loadLibrary();
+    }
+
+    /** Creates the store directory (unless the path is a symbolic link) and opens the database; a {@link RocksDBException} is logged and rethrown. */
+    @Override public void start() throws Exception {
+        try (final Options options = new Options().setCreateIfMissing(true)) {
+            if (!Files.isSymbolicLink(Paths.get(storePathRocksDB))) {
+                Files.createDirectories(Paths.get(storePathRocksDB));
+            }
+            db = RocksDB.open(options, storePathRocksDB);
+        } catch (RocksDBException e) {
+            log.error("Open RocksDb failed. Error:{}", e);
+            throw e;
+        }
+    }
+
+    /** Stores {@code value} under {@code key}; logs and returns {@code false} when the write fails. */
+    @Override public boolean putData(String key, String value) {
+        try {
+            db.put(key.getBytes(UTF_8), value.getBytes(UTF_8));
+            return true;
+        } catch (Exception e) {
+            log.error("RocksDB put data failed. Error:{}", e);
+            return false;
+        }
+    }
+
+    /** Returns the value stored under {@code key}, or {@code null} when the key is absent or the read fails (failures are logged). */
+    @Override public String getValue(String key) {
+        try {
+            byte[] value = db.get(key.getBytes(UTF_8));
+            return value == null ? null : new String(value, UTF_8);
+        } catch (Exception e) {
+            log.error("RocksDB get value failed. Error:{}", e);
+            return null;
+        }
+    }
+
+    /** Deletes the entry stored under {@code key}; logs and returns {@code false} when the delete fails. */
+    @Override public boolean deleteData(String key) {
+        try {
+            db.delete(key.getBytes(UTF_8));
+            return true;
+        } catch (Exception e) {
+            log.error("RocksDB delete data failed. Error:{}", e);
+            return false;
+        }
+    }
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java b/store/src/main/java/org/apache/rocketmq/store/MQTTInfoStore.java
similarity index 57%
copy from remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java
copy to store/src/main/java/org/apache/rocketmq/store/MQTTInfoStore.java
index 0eb1730..6603e61 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/util/MqttEncodeDecodeUtil.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MQTTInfoStore.java
@@ -15,24 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.remoting.util;
+package org.apache.rocketmq.store;
 
-import com.google.gson.Gson;
-import java.nio.charset.Charset;
+public interface MQTTInfoStore {
 
-public class MqttEncodeDecodeUtil {
-    private static final Gson GSON = new Gson();
+    void load(); // DefaultMQTTInfoStore loads the RocksDB native library here
 
-    public static byte[] encode(Object object) {
-        final String json = GSON.toJson(object);
-        if (json != null) {
-            return json.getBytes(Charset.forName("UTF-8"));
-        }
-        return null;
-    }
+    void start() throws Exception; // DefaultMQTTInfoStore creates its directory and opens the database here
 
-    public static <T> Object decode(byte[] body, Class<T> classOfT) {
-        final String json = new String(body, Charset.forName("UTF-8"));
-        return GSON.fromJson(json, classOfT);
-    }
+    boolean putData(String key, String value); // stores value under key; DefaultMQTTInfoStore returns false when the write fails
+
+    String getValue(String key); // value stored under key; DefaultMQTTInfoStore returns null when absent or when the read fails
+
+    boolean deleteData(String key); // removes the entry for key; DefaultMQTTInfoStore returns false when the delete fails
 }


Mime
View raw message