rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From duhengfore...@apache.org
Subject [rocketmq] branch snode updated: add new module rocketmq-mqtt
Date Fri, 15 Mar 2019 14:05:42 GMT
This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/snode by this push:
     new 3629a0a  add new module rocketmq-mqtt
     new a3c17c6  Merge pull request #1067 from xiangwangcheng/snode
3629a0a is described below

commit 3629a0a7171a2aad381f715e6250feec1ee3b779
Author: chengxiangwang <chengxiangwang@cmss.chinamobile.com>
AuthorDate: Thu Feb 28 15:54:19 2019 +0800

    add new module rocketmq-mqtt
---
 .../org/apache/rocketmq/common/MqttConfig.java     | 10 +++
 .../org/apache/rocketmq/common}/client/Client.java |  3 +-
 .../rocketmq/common}/client/ClientManager.java     |  2 +-
 .../rocketmq/common/client}/ClientManagerImpl.java | 10 +--
 .../apache/rocketmq/common/client}/ClientRole.java | 19 ++++-
 .../rocketmq/common/client}/Subscription.java      |  4 +-
 .../rocketmq/common/constant/LoggerName.java       |  1 +
 distribution/conf/snode.conf                       |  2 +-
 distribution/release.xml                           |  1 +
 .../rocketmq/example/mqtt/MqttSampleConsumer.java  |  6 +-
 {snode => mqtt}/pom.xml                            | 55 ++++++-------
 .../mqtt/client}/IOTClientManagerImpl.java         | 21 +++--
 .../mqtt/client/MqttClientHousekeepingService.java | 31 +++-----
 .../rocketmq/mqtt}/constant/MqttConstant.java      |  7 +-
 .../mqtt}/exception/MqttConnectException.java      |  2 +-
 .../mqtt}/exception/WrongMessageTypeException.java |  2 +-
 .../rocketmq/mqtt}/mqtthandler/MessageHandler.java |  2 +-
 .../impl}/MqttConnectMessageHandler.java           | 42 +++++-----
 .../impl}/MqttDisconnectMessageHandler.java        | 22 +++---
 .../mqtthandler/impl}/MqttMessageForwarder.java    | 21 +++--
 .../mqtt/mqtthandler/impl}/MqttMessageSender.java  | 17 ++--
 .../impl}/MqttPingreqMessageHandler.java           | 25 +++---
 .../impl}/MqttPubackMessageHandler.java            | 24 +++---
 .../impl}/MqttPubcompMessageHandler.java           | 18 +++--
 .../impl}/MqttPublishMessageHandler.java           | 21 +++--
 .../impl}/MqttPubrecMessageHandler.java            | 18 +++--
 .../impl}/MqttPubrelMessageHandler.java            | 17 ++--
 .../impl}/MqttSubscribeMessageHandler.java         | 29 +++----
 .../impl}/MqttUnsubscribeMessagHandler.java        | 28 +++----
 .../processor/DefaultMqttMessageProcessor.java     | 92 +++++++++++++++-------
 .../rocketmq/mqtt}/service/WillMessageService.java |  2 +-
 .../mqtt}/service/impl/MqttPushServiceImpl.java    | 32 ++++----
 .../mqtt}/service/impl/WillMessageServiceImpl.java |  9 +--
 .../org/apache/rocketmq/mqtt}/util/MqttUtil.java   |  4 +-
 .../mqtt}/DefaultMqttMessageProcessorTest.java     | 15 ++--
 .../mqtt}/MqttConnectMessageHandlerTest.java       | 13 +--
 .../mqtt}/MqttDisconnectMessageHandlerTest.java    | 22 +++---
 .../rocketmq/mqtt}/WillMessageServiceImplTest.java | 17 ++--
 pom.xml                                            |  6 ++
 snode/pom.xml                                      |  4 +
 .../org/apache/rocketmq/snode/SnodeController.java | 45 +++++------
 .../snode/client/ClientHousekeepingService.java    | 10 ++-
 .../rocketmq/snode/client/SubscriptionManager.java |  2 +-
 .../snode/client/impl/ConsumerManagerImpl.java     |  1 +
 .../snode/client/impl/ProducerManagerImpl.java     |  1 +
 .../snode/client/impl/SubscriptionManagerImpl.java |  1 +
 .../rocketmq/snode/constant/SnodeConstant.java     |  4 +-
 .../snode/processor/HeartbeatProcessor.java        |  4 +-
 .../snode/processor/PullMessageProcessor.java      |  2 +-
 .../snode/service/impl/PushServiceImpl.java        |  4 +-
 50 files changed, 410 insertions(+), 340 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/MqttConfig.java b/common/src/main/java/org/apache/rocketmq/common/MqttConfig.java
index fc9128d..2966a3b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/MqttConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MqttConfig.java
@@ -50,6 +50,8 @@ public class MqttConfig {
     @ImportantField
     private boolean aclEnable = false;
 
+    private long houseKeepingInterval = 10 * 1000;
+
     public int getListenPort() {
         return listenPort;
     }
@@ -130,4 +132,12 @@ public class MqttConfig {
     public void setPushMqttMessageThreadPoolQueueCapacity(int pushMqttMessageThreadPoolQueueCapacity) {
         this.pushMqttMessageThreadPoolQueueCapacity = pushMqttMessageThreadPoolQueueCapacity;
     }
+
+    public long getHouseKeepingInterval() {
+        return houseKeepingInterval;
+    }
+
+    public void setHouseKeepingInterval(long houseKeepingInterval) {
+        this.houseKeepingInterval = houseKeepingInterval;
+    }
 }
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/Client.java b/common/src/main/java/org/apache/rocketmq/common/client/Client.java
similarity index 98%
rename from snode/src/main/java/org/apache/rocketmq/snode/client/Client.java
rename to common/src/main/java/org/apache/rocketmq/common/client/Client.java
index abaf30d..1719b6b 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/Client.java
+++ b/common/src/main/java/org/apache/rocketmq/common/client/Client.java
@@ -14,13 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.snode.client;
+package org.apache.rocketmq.common.client;
 
 import java.util.Objects;
 import java.util.Set;
 import org.apache.rocketmq.remoting.RemotingChannel;
 import org.apache.rocketmq.remoting.serialize.LanguageCode;
-import org.apache.rocketmq.snode.client.impl.ClientRole;
 
 public class Client {
 
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/ClientManager.java b/common/src/main/java/org/apache/rocketmq/common/client/ClientManager.java
similarity index 96%
rename from snode/src/main/java/org/apache/rocketmq/snode/client/ClientManager.java
rename to common/src/main/java/org/apache/rocketmq/common/client/ClientManager.java
index 600dd16..c42244b 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/ClientManager.java
+++ b/common/src/main/java/org/apache/rocketmq/common/client/ClientManager.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.snode.client;
+package org.apache.rocketmq.common.client;
 
 import java.util.List;
 import java.util.Set;
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ClientManagerImpl.java b/common/src/main/java/org/apache/rocketmq/common/client/ClientManagerImpl.java
similarity index 96%
rename from snode/src/main/java/org/apache/rocketmq/snode/client/impl/ClientManagerImpl.java
rename to common/src/main/java/org/apache/rocketmq/common/client/ClientManagerImpl.java
index d0cddce..e39ca70 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ClientManagerImpl.java
+++ b/common/src/main/java/org/apache/rocketmq/common/client/ClientManagerImpl.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.snode.client.impl;
+package org.apache.rocketmq.common.client;
 
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -33,8 +33,6 @@ import org.apache.rocketmq.remoting.RemotingChannel;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
 import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
-import org.apache.rocketmq.snode.client.Client;
-import org.apache.rocketmq.snode.client.ClientManager;
 
 public abstract class ClientManagerImpl implements ClientManager {
 
@@ -45,7 +43,7 @@ public abstract class ClientManagerImpl implements ClientManager {
         .newSingleThreadScheduledExecutor(
             new ThreadFactoryImpl("ClientHousekeepingScheduledThread"));
 
-    private final ConcurrentHashMap<String/*Producer or Consumer Group*/, ConcurrentHashMap<RemotingChannel, Client>> groupClientTable = new ConcurrentHashMap<>(
+    private final ConcurrentHashMap<String/*Producer or Consumer Group*/, ConcurrentHashMap<RemotingChannel, Client>> groupClientTable = new ConcurrentHashMap<String, ConcurrentHashMap<RemotingChannel, Client>>(
         1024);
 
     public abstract void onClosed(String group, RemotingChannel remotingChannel);
@@ -176,7 +174,7 @@ public abstract class ClientManagerImpl implements ClientManager {
 
     public List<RemotingChannel> getChannels(String groupId) {
         if (groupId != null) {
-            List<RemotingChannel> result = new ArrayList<>();
+            List<RemotingChannel> result = new ArrayList<RemotingChannel>();
             ConcurrentHashMap channelsMap = this.groupClientTable.get(groupId);
             if (channelsMap != null) {
                 result.addAll(this.groupClientTable.get(groupId).keySet());
@@ -189,7 +187,7 @@ public abstract class ClientManagerImpl implements ClientManager {
 
     @Override
     public List<String> getAllClientId(String groupId) {
-        List<String> result = new ArrayList<>();
+        List<String> result = new ArrayList<String>();
         Map<RemotingChannel, Client> channelClientMap = this.groupClientTable.get(groupId);
         if (channelClientMap != null) {
             Iterator<Map.Entry<RemotingChannel, Client>> it = channelClientMap.entrySet()
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ClientRole.java b/common/src/main/java/org/apache/rocketmq/common/client/ClientRole.java
similarity index 50%
rename from snode/src/main/java/org/apache/rocketmq/snode/client/impl/ClientRole.java
rename to common/src/main/java/org/apache/rocketmq/common/client/ClientRole.java
index 9967976..5abe69d 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ClientRole.java
+++ b/common/src/main/java/org/apache/rocketmq/common/client/ClientRole.java
@@ -1,4 +1,21 @@
-package org.apache.rocketmq.snode.client.impl;/*
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.client;/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/Subscription.java b/common/src/main/java/org/apache/rocketmq/common/client/Subscription.java
similarity index 96%
rename from snode/src/main/java/org/apache/rocketmq/snode/client/impl/Subscription.java
rename to common/src/main/java/org/apache/rocketmq/common/client/Subscription.java
index 895d4c9..3c524b3 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/Subscription.java
+++ b/common/src/main/java/org/apache/rocketmq/common/client/Subscription.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.snode.client.impl;
+package org.apache.rocketmq.common.client;
 
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
@@ -27,7 +27,7 @@ public class Subscription {
     private volatile MessageModel messageModel;
     private volatile ConsumeFromWhere consumeFromWhere;
     private volatile boolean cleanSession;
-    ConcurrentHashMap<String/*Topic*/, SubscriptionData> subscriptionTable = new ConcurrentHashMap<>();
+    ConcurrentHashMap<String/*Topic*/, SubscriptionData> subscriptionTable = new ConcurrentHashMap<String, SubscriptionData>();
     private volatile long lastUpdateTimestamp = System.currentTimeMillis();
 
     public SubscriptionData getSubscriptionData(String topic) {
diff --git a/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java b/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
index 48295a3..637e112 100644
--- a/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
+++ b/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
@@ -38,4 +38,5 @@ public class LoggerName {
     public static final String WATER_MARK_LOGGER_NAME = "RocketmqWaterMark";
     public static final String FILTER_LOGGER_NAME = "RocketmqFilter";
     public static final String SNODE_LOGGER_NAME = "RocketmqSnode";
+    public static final String MQTT_LOGGER_NAME = "RocketmqMQTT";
 }
diff --git a/distribution/conf/snode.conf b/distribution/conf/snode.conf
index 649057d..c710b8c 100644
--- a/distribution/conf/snode.conf
+++ b/distribution/conf/snode.conf
@@ -15,4 +15,4 @@
 namesrvAddr=localhost:9876
 clusterName = DefaultCluster
 snodeName = snode-a
-
+embeddedModeEnable = false
diff --git a/distribution/release.xml b/distribution/release.xml
index 181a99e..60ca331 100644
--- a/distribution/release.xml
+++ b/distribution/release.xml
@@ -67,6 +67,7 @@
                 <include>org.apache.rocketmq:rocketmq-namesrv</include>
                 <include>org.apache.rocketmq:rocketmq-example</include>
                 <include>org.apache.rocketmq:rocketmq-openmessaging</include>
+                <include>org.apache.rocketmq:rocketmq-mqtt</include>
                 <include>org.apache.rocketmq:rocketmq-snode</include>
             </includes>
             <binaries>
diff --git a/example/src/main/java/org/apache/rocketmq/example/mqtt/MqttSampleConsumer.java b/example/src/main/java/org/apache/rocketmq/example/mqtt/MqttSampleConsumer.java
index 8c52a8f..b2c6bb3 100644
--- a/example/src/main/java/org/apache/rocketmq/example/mqtt/MqttSampleConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/mqtt/MqttSampleConsumer.java
@@ -50,17 +50,17 @@ public class MqttSampleConsumer {
                 log.info("Connected");
                 sampleClient.setCallback(new MqttCallback() {
                     @Override public void connectionLost(Throwable throwable) {
-                        System.out.println("connection lost." + throwable.getLocalizedMessage());
+                        log.info("connection lost." + throwable.getLocalizedMessage());
                     }
 
                     @Override public void messageArrived(String s, MqttMessage message) throws Exception {
-                        System.out.println(message.toString());
+                        log.info(message.toString());
 //                        System.exit(0);
                     }
 
                     @Override public void deliveryComplete(IMqttDeliveryToken token) {
                         try {
-                            System.out.println("delivery complete." + token.getMessage().toString());
+                            log.info("delivery complete." + token.getMessage().toString());
                         } catch (MqttException e) {
                             e.printStackTrace();
                         }
diff --git a/snode/pom.xml b/mqtt/pom.xml
similarity index 69%
copy from snode/pom.xml
copy to mqtt/pom.xml
index da1e3f3..01b19b8 100644
--- a/snode/pom.xml
+++ b/mqtt/pom.xml
@@ -1,33 +1,35 @@
 <!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
   -->
-
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.rocketmq</groupId>
         <artifactId>rocketmq-all</artifactId>
         <version>4.4.1-SNAPSHOT</version>
     </parent>
+    <artifactId>rocketmq-mqtt</artifactId>
+    <name>rocketmq-mqtt ${project.version}</name>
 
-    <modelVersion>4.0.0</modelVersion>
-    <packaging>jar</packaging>
-    <artifactId>rocketmq-snode</artifactId>
-    <name>rocketmq-snode ${project.version}</name>
-
+    <url>http://maven.apache.org</url>
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
     <dependencies>
         <dependency>
             <groupId>${project.groupId}</groupId>
@@ -98,17 +100,4 @@
             <artifactId>rocketmq-broker</artifactId>
         </dependency>
     </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <artifactId>maven-surefire-plugin</artifactId>
-                <version>2.19.1</version>
-                <configuration>
-                    <forkCount>1</forkCount>
-                    <reuseForks>false</reuseForks>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
 </project>
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/IOTClientManagerImpl.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/IOTClientManagerImpl.java
similarity index 90%
rename from snode/src/main/java/org/apache/rocketmq/snode/client/impl/IOTClientManagerImpl.java
rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/client/IOTClientManagerImpl.java
index 66ec2f7..5f56d16 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/IOTClientManagerImpl.java
+++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/IOTClientManagerImpl.java
@@ -14,34 +14,33 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.snode.client.impl;
+package org.apache.rocketmq.mqtt.client;
 
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import org.apache.rocketmq.common.client.Client;
+import org.apache.rocketmq.common.client.ClientManagerImpl;
+import org.apache.rocketmq.common.client.Subscription;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.RemotingChannel;
-import org.apache.rocketmq.snode.SnodeController;
-import org.apache.rocketmq.snode.client.Client;
 
 public class IOTClientManagerImpl extends ClientManagerImpl {
 
-    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME);
 
     public static final String IOT_GROUP = "IOT_GROUP";
-    private final SnodeController snodeController;
 
     private final ConcurrentHashMap<String/*root topic*/, ConcurrentHashMap<Client, Set<SubscriptionData>>> topic2SubscriptionTable = new ConcurrentHashMap<>(
         1024);
     private final ConcurrentHashMap<String/*clientId*/, Subscription> clientId2Subscription = new ConcurrentHashMap<>(1024);
 
-    public IOTClientManagerImpl(SnodeController snodeController) {
-        this.snodeController = snodeController;
+    public IOTClientManagerImpl() {
     }
 
     public void onUnsubscribe(Client client, List<String> topics) {
@@ -89,6 +88,9 @@ public class IOTClientManagerImpl extends ClientManagerImpl {
                 }
                 iterator1.remove();
             }
+            if (next.getValue() == null || next.getValue().size() == 0) {
+                iterator.remove();
+            }
         }
         //remove offline messages
     }
@@ -97,10 +99,6 @@ public class IOTClientManagerImpl extends ClientManagerImpl {
         return clientId2Subscription.get(clientId);
     }
 
-    public SnodeController getSnodeController() {
-        return snodeController;
-    }
-
     public ConcurrentHashMap<String, ConcurrentHashMap<Client, Set<SubscriptionData>>> getTopic2SubscriptionTable() {
         return topic2SubscriptionTable;
     }
@@ -112,5 +110,4 @@ public class IOTClientManagerImpl extends ClientManagerImpl {
     public void initSubscription(String clientId, Subscription subscription) {
         clientId2Subscription.put(clientId, subscription);
     }
-
 }
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/MqttClientHousekeepingService.java
similarity index 74%
copy from snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java
copy to mqtt/src/main/java/org/apache/rocketmq/mqtt/client/MqttClientHousekeepingService.java
index 899a9ef..fd2fe0f 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java
+++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/MqttClientHousekeepingService.java
@@ -14,48 +14,41 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.snode.client;
+package org.apache.rocketmq.mqtt.client;
 
 import io.netty.channel.Channel;
 import io.netty.util.Attribute;
+import org.apache.rocketmq.common.client.Client;
+import org.apache.rocketmq.common.client.ClientManager;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.mqtt.constant.MqttConstant;
 import org.apache.rocketmq.remoting.ChannelEventListener;
 import org.apache.rocketmq.remoting.RemotingChannel;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
-import org.apache.rocketmq.snode.constant.SnodeConstant;
 
-public class ClientHousekeepingService implements ChannelEventListener {
-    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
-    private final ClientManager producerManager;
-    private final ClientManager consumerManager;
+public class MqttClientHousekeepingService implements ChannelEventListener {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME);
     private final ClientManager iotClientManager;
 
-    public ClientHousekeepingService(final ClientManager producerManager,
-        final ClientManager consumerManager, final ClientManager iotClientManager) {
-        this.producerManager = producerManager;
-        this.consumerManager = consumerManager;
+    public MqttClientHousekeepingService(final ClientManager iotClientManager) {
         this.iotClientManager = iotClientManager;
     }
 
     public void start(long interval) {
-        this.producerManager.startScan(interval);
-        this.consumerManager.startScan(interval);
-//        this.iotClientManager.startScan(interval);
+        this.iotClientManager.startScan(interval);
     }
 
     public void shutdown() {
-        this.producerManager.shutdown();
-        this.consumerManager.shutdown();
         this.iotClientManager.shutdown();
     }
 
     private Client getClient(RemotingChannel remotingChannel) {
         if (remotingChannel instanceof NettyChannelImpl) {
             Channel channel = ((NettyChannelImpl) remotingChannel).getChannel();
-            Attribute<Client> clientAttribute = channel.attr(SnodeConstant.NETTY_CLIENT_ATTRIBUTE_KEY);
+            Attribute<Client> clientAttribute = channel.attr(MqttConstant.MQTT_CLIENT_ATTRIBUTE_KEY);
             if (clientAttribute != null) {
                 Client client = clientAttribute.get();
                 return client;
@@ -69,12 +62,6 @@ public class ClientHousekeepingService implements ChannelEventListener {
         Client client = getClient(remotingChannel);
         if (client != null) {
             switch (client.getClientRole()) {
-                case Consumer:
-                    this.consumerManager.onClose(client.getGroups(), remotingChannel);
-                    return;
-                case Producer:
-                    this.producerManager.onClose(client.getGroups(), remotingChannel);
-                    return;
                 case IOTCLIENT:
                     this.iotClientManager.onClose(client.getGroups(), remotingChannel);
                     return;
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/constant/MqttConstant.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/constant/MqttConstant.java
similarity index 78%
rename from snode/src/main/java/org/apache/rocketmq/snode/constant/MqttConstant.java
rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/constant/MqttConstant.java
index 2d2ecaa..9e803a3 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/constant/MqttConstant.java
+++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/constant/MqttConstant.java
@@ -15,11 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.snode.constant;
+package org.apache.rocketmq.mqtt.constant;
+
+import io.netty.util.AttributeKey;
+import org.apache.rocketmq.common.client.Client;
 
 public class MqttConstant {
     public static final int MAX_SUPPORTED_QOS = 0;
     public static final String SUBSCRIPTION_FLAG_PLUS = "+";
     public static final String SUBSCRIPTION_FLAG_SHARP = "#";
     public static final String SUBSCRIPTION_SEPARATOR = "/";
+    public static final long DEFAULT_TIMEOUT_MILLS = 3000L;
+    public static final AttributeKey<Client> MQTT_CLIENT_ATTRIBUTE_KEY = AttributeKey.valueOf("mqtt.client");
 }
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/exception/MqttConnectException.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/exception/MqttConnectException.java
similarity index 95%
rename from snode/src/main/java/org/apache/rocketmq/snode/exception/MqttConnectException.java
rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/exception/MqttConnectException.java
index f636842..93eb733 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/exception/MqttConnectException.java
+++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/exception/MqttConnectException.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.snode.exception;
+package org.apache.rocketmq.mqtt.exception;
 
 public class MqttConnectException extends RuntimeException {
 
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/exception/WrongMessageTypeException.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/exception/WrongMessageTypeException.java
similarity index 95%
rename from snode/src/main/java/org/apache/rocketmq/snode/exception/WrongMessageTypeException.java
rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/exception/WrongMessageTypeException.java
index 355c7f7..d008d78 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/exception/WrongMessageTypeException.java
+++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/exception/WrongMessageTypeException.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.snode.exception;
+package org.apache.rocketmq.mqtt.exception;
 
 public class WrongMessageTypeException extends RuntimeException {
 
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/MessageHandler.java
similarity index 95%
rename from snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MessageHandler.java
rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/MessageHandler.java
index 2415316..fa0be8b 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MessageHandler.java
+++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/MessageHandler.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.snode.processor.mqtthandler;
+package org.apache.rocketmq.mqtt.mqtthandler;
 
 import io.netty.handler.codec.mqtt.MqttMessage;
 import org.apache.rocketmq.remoting.RemotingChannel;
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttConnectMessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttConnectMessageHandler.java
similarity index 86%
rename from snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttConnectMessageHandler.java
rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttConnectMessageHandler.java
index 96f2843..ecbb124 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttConnectMessageHandler.java
+++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttConnectMessageHandler.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.snode.processor.mqtthandler;
+package org.apache.rocketmq.mqtt.mqtthandler.impl;
 
 import io.netty.handler.codec.mqtt.MqttConnectMessage;
 import io.netty.handler.codec.mqtt.MqttConnectPayload;
@@ -25,32 +25,32 @@ import io.netty.handler.codec.mqtt.MqttMessageType;
 import io.netty.handler.codec.mqtt.MqttQoS;
 import java.util.HashSet;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.client.Client;
+import org.apache.rocketmq.common.client.ClientManager;
+import org.apache.rocketmq.common.client.ClientRole;
+import org.apache.rocketmq.common.client.Subscription;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.mqtt.WillMessage;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl;
+import org.apache.rocketmq.mqtt.exception.MqttConnectException;
+import org.apache.rocketmq.mqtt.exception.WrongMessageTypeException;
+import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler;
+import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
 import org.apache.rocketmq.remoting.RemotingChannel;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
-import org.apache.rocketmq.snode.SnodeController;
-import org.apache.rocketmq.snode.client.Client;
-import org.apache.rocketmq.snode.client.ClientManager;
-import org.apache.rocketmq.snode.client.impl.ClientRole;
-import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl;
-import org.apache.rocketmq.snode.client.impl.Subscription;
-import org.apache.rocketmq.snode.exception.MqttConnectException;
-import org.apache.rocketmq.snode.exception.WrongMessageTypeException;
 
 public class MqttConnectMessageHandler implements MessageHandler {
-
-    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
-    private final SnodeController snodeController;
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME);
     private static final int MIN_AVAILABLE_VERSION = 3;
     private static final int MAX_AVAILABLE_VERSION = 4;
+    private final DefaultMqttMessageProcessor defaultMqttMessageProcessor;
 
-    public MqttConnectMessageHandler(SnodeController snodeController) {
-        this.snodeController = snodeController;
+    public MqttConnectMessageHandler(DefaultMqttMessageProcessor defaultMqttMessageProcessor) {
+        this.defaultMqttMessageProcessor = defaultMqttMessageProcessor;
     }
 
     @Override
@@ -97,7 +97,7 @@ public class MqttConnectMessageHandler implements MessageHandler {
             remotingChannel.close();
             return null;
         }
-        IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) snodeController.getIotClientManager();
+        IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) defaultMqttMessageProcessor.getIotClientManager();
         //set Session Present according to whether the server has already stored Session State for the clientId
         if (mqttConnectMessage.variableHeader().isCleanSession()) {
             mqttHeader.setSessionPresent(false);
@@ -120,7 +120,11 @@ public class MqttConnectMessageHandler implements MessageHandler {
         Client client = new Client();
         client.setClientId(payload.clientIdentifier());
         client.setClientRole(ClientRole.IOTCLIENT);
-        client.setGroups(new HashSet<String>(){{add("IOT_GROUP");}});
+        client.setGroups(new HashSet<String>() {
+            {
+                add("IOT_GROUP");
+            }
+        });
         client.setConnected(true);
         client.setRemotingChannel(remotingChannel);
         client.setLastUpdateTimestamp(System.currentTimeMillis());
@@ -138,7 +142,7 @@ public class MqttConnectMessageHandler implements MessageHandler {
             willMessage.setWillTopic(payload.willTopic());
             willMessage.setRetain(mqttConnectMessage.variableHeader().isWillRetain());
             willMessage.setBody(payload.willMessageInBytes());
-            snodeController.getWillMessageService().saveWillMessage(client.getClientId(), willMessage);
+            defaultMqttMessageProcessor.getWillMessageService().saveWillMessage(client.getClientId(), willMessage);
         }
 
         mqttHeader.setConnectReturnCode(MqttConnectReturnCode.CONNECTION_ACCEPTED.name());
@@ -148,7 +152,7 @@ public class MqttConnectMessageHandler implements MessageHandler {
     }
 
     private boolean alreadyStoredSession(String clientId) {
-        IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) snodeController.getIotClientManager();
+        IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) defaultMqttMessageProcessor.getIotClientManager();
         Subscription subscription = iotClientManager.getSubscriptionByClientId(clientId);
         if (subscription == null) {
             return false;
@@ -164,7 +168,7 @@ public class MqttConnectMessageHandler implements MessageHandler {
     }
 
     private boolean isConnected(RemotingChannel remotingChannel, String clientId) {
-        ClientManager iotClientManager = snodeController.getIotClientManager();
+        ClientManager iotClientManager = defaultMqttMessageProcessor.getIotClientManager();
         Client client = iotClientManager.getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel);
         if (client != null && client.getClientId().equals(clientId) && client.isConnected()) {
             return true;
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttDisconnectMessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttDisconnectMessageHandler.java
similarity index 78%
rename from snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttDisconnectMessageHandler.java
rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttDisconnectMessageHandler.java
index 66affac..5f21a4b 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttDisconnectMessageHandler.java
+++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttDisconnectMessageHandler.java
@@ -15,27 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.snode.processor.mqtthandler;
+package org.apache.rocketmq.mqtt.mqtthandler.impl;
 
 import io.netty.handler.codec.mqtt.MqttFixedHeader;
 import io.netty.handler.codec.mqtt.MqttMessage;
 import io.netty.handler.codec.mqtt.MqttQoS;
+import org.apache.rocketmq.common.client.Client;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl;
+import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler;
+import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
 import org.apache.rocketmq.remoting.RemotingChannel;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.apache.rocketmq.snode.SnodeController;
-import org.apache.rocketmq.snode.client.Client;
-import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl;
 
 public class MqttDisconnectMessageHandler implements MessageHandler {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME);
+    private final DefaultMqttMessageProcessor defaultMqttMessageProcessor;
 
-    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
-    private final SnodeController snodeController;
-
-    public MqttDisconnectMessageHandler(SnodeController snodeController) {
-        this.snodeController = snodeController;
+    public MqttDisconnectMessageHandler(DefaultMqttMessageProcessor processor) {
+        this.defaultMqttMessageProcessor = processor;
     }
 
     /**
@@ -56,10 +56,10 @@ public class MqttDisconnectMessageHandler implements MessageHandler {
         }
 
         //discard will message associated with the current connection(client)
-        Client client = snodeController.getIotClientManager()
+        Client client = defaultMqttMessageProcessor.getIotClientManager()
             .getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel);
         if (client != null) {
-            snodeController.getWillMessageService().deleteWillMessage(client.getClientId());
+            defaultMqttMessageProcessor.getWillMessageService().deleteWillMessage(client.getClientId());
         }
         client.setConnected(false);
         if (remotingChannel.isActive()) {
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttMessageForwarder.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttMessageForwarder.java
similarity index 66%
rename from snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttMessageForwarder.java
rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttMessageForwarder.java
index eaba1a4..7cc9d02 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttMessageForwarder.java
+++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttMessageForwarder.java
@@ -15,24 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.snode.processor.mqtthandler;
+package org.apache.rocketmq.mqtt.mqtthandler.impl;
 
 import io.netty.handler.codec.mqtt.MqttMessage;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler;
+import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
 import org.apache.rocketmq.remoting.RemotingChannel;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.apache.rocketmq.snode.SnodeController;
 
 public class MqttMessageForwarder implements MessageHandler {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME);
+    private final DefaultMqttMessageProcessor defaultMqttMessageProcessor;
 
-    private final SnodeController snodeController;
-/*    private SubscriptionStore subscriptionStore;
-
-    public MqttMessageForwarder(SubscriptionStore subscriptionStore) {
-        this.subscriptionStore = subscriptionStore;
-    }*/
-
-    public MqttMessageForwarder(SnodeController snodeController) {
-        this.snodeController = snodeController;
+    public MqttMessageForwarder(DefaultMqttMessageProcessor processor) {
+        this.defaultMqttMessageProcessor = processor;
     }
 
     /**
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttMessageSender.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttMessageSender.java
similarity index 66%
rename from snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttMessageSender.java
rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttMessageSender.java
index 624a4bf..e0e2ddf 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttMessageSender.java
+++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttMessageSender.java
@@ -15,20 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.snode.processor.mqtthandler;
+package org.apache.rocketmq.mqtt.mqtthandler.impl;
 
 import io.netty.handler.codec.mqtt.MqttMessage;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler;
+import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
 import org.apache.rocketmq.remoting.RemotingChannel;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.apache.rocketmq.snode.SnodeController;
 
 public class MqttMessageSender implements MessageHandler {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME);
+    private final DefaultMqttMessageProcessor defaultMqttMessageProcessor;
 
-    private final SnodeController snodeController;
-
-    public MqttMessageSender(SnodeController snodeController) {
-        this.snodeController = snodeController;
+    public MqttMessageSender(DefaultMqttMessageProcessor processor) {
+        this.defaultMqttMessageProcessor = processor;
     }
+
     /**
      * send the PUBLISH message to client
      *
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPingreqMessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPingreqMessageHandler.java
similarity index 58%
rename from snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPingreqMessageHandler.java
rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPingreqMessageHandler.java
index 1f75fcc..d867476 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPingreqMessageHandler.java
+++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPingreqMessageHandler.java
@@ -15,29 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.snode.processor.mqtthandler;
+package org.apache.rocketmq.mqtt.mqtthandler.impl;
 
 import io.netty.handler.codec.mqtt.MqttMessage;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler;
+import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
 import org.apache.rocketmq.remoting.RemotingChannel;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.apache.rocketmq.snode.SnodeController;
 
 public class MqttPingreqMessageHandler implements MessageHandler {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME);
+    private final DefaultMqttMessageProcessor defaultMqttMessageProcessor;
 
-    private final SnodeController snodeController;
-
-    public MqttPingreqMessageHandler(SnodeController snodeController) {
-        this.snodeController = snodeController;
+    public MqttPingreqMessageHandler(DefaultMqttMessageProcessor processor) {
+        this.defaultMqttMessageProcessor = processor;
     }
 
     /**
-     * handle the PINGREQ message from client
-     * <ol>
-     * <li>check client exists</li>
-     * <li>check client is connected</li>
-     * <li>generate the PINGRESP message</li>
-     * <li>send the PINGRESP message to the client</li>
-     * </ol>
+     * handle the PINGREQ message from client <ol> <li>check client exists</li> <li>check client is connected</li>
+     * <li>generate the PINGRESP message</li> <li>send the PINGRESP message to the client</li> </ol>
      *
      * @param message
      * @return
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPubackMessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPubackMessageHandler.java
similarity index 59%
rename from snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPubackMessageHandler.java
rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPubackMessageHandler.java
index 69dee26..71d9458 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPubackMessageHandler.java
+++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPubackMessageHandler.java
@@ -15,26 +15,30 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.snode.processor.mqtthandler;
+package org.apache.rocketmq.mqtt.mqtthandler.impl;
 
 import io.netty.handler.codec.mqtt.MqttMessage;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler;
+import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
 import org.apache.rocketmq.remoting.RemotingChannel;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.apache.rocketmq.snode.SnodeController;
 
 public class MqttPubackMessageHandler implements MessageHandler {
 
-    private final SnodeController snodeController;
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME);
+    private final DefaultMqttMessageProcessor defaultMqttMessageProcessor;
 
-    public MqttPubackMessageHandler(SnodeController snodeController) {
-        this.snodeController = snodeController;
+    public MqttPubackMessageHandler(DefaultMqttMessageProcessor processor) {
+        this.defaultMqttMessageProcessor = processor;
     }
+
     /**
-     * handle the PUBACK message from the client
-     * <ol>
-     *     <li>remove the message from the published in-flight messages</li>
-     *     <li>ack the message in the MessageStore</li>
-     * </ol>
+     * handle the PUBACK message from the client <ol> <li>remove the message from the published in-flight messages</li>
+     * <li>ack the message in the MessageStore</li> </ol>
+     *
      * @param
      * @return
      */
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPubcompMessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPubcompMessageHandler.java
similarity index 65%
rename from snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPubcompMessageHandler.java
rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPubcompMessageHandler.java
index e191f5a..2b1109a 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPubcompMessageHandler.java
+++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPubcompMessageHandler.java
@@ -15,22 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.snode.processor.mqtthandler;
+package org.apache.rocketmq.mqtt.mqtthandler.impl;
 
 import io.netty.handler.codec.mqtt.MqttMessage;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler;
+import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
 import org.apache.rocketmq.remoting.RemotingChannel;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.apache.rocketmq.snode.SnodeController;
 
 public class MqttPubcompMessageHandler implements MessageHandler {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME);
+    private final DefaultMqttMessageProcessor defaultMqttMessageProcessor;
 
-    private final SnodeController snodeController;
-
-    public MqttPubcompMessageHandler(SnodeController snodeController) {
-        this.snodeController = snodeController;
+    public MqttPubcompMessageHandler(DefaultMqttMessageProcessor processor) {
+        this.defaultMqttMessageProcessor = processor;
     }
+
     /**
      * handle the PUBCOMP message from the client
+     *
      * @param message
      * @return
      */
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPublishMessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPublishMessageHandler.java
similarity index 84%
rename from snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPublishMessageHandler.java
rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPublishMessageHandler.java
index 17776ea..15552b0 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPublishMessageHandler.java
+++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPublishMessageHandler.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.snode.processor.mqtthandler;
+package org.apache.rocketmq.mqtt.mqtthandler.impl;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.handler.codec.mqtt.MqttFixedHeader;
@@ -27,21 +27,20 @@ import io.netty.handler.codec.mqtt.MqttQoS;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler;
+import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
 import org.apache.rocketmq.remoting.RemotingChannel;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
-import org.apache.rocketmq.snode.SnodeController;
-import org.apache.rocketmq.snode.exception.WrongMessageTypeException;
-import org.apache.rocketmq.snode.util.MqttUtil;
+import org.apache.rocketmq.mqtt.exception.WrongMessageTypeException;
+import org.apache.rocketmq.mqtt.util.MqttUtil;
 
 public class MqttPublishMessageHandler implements MessageHandler {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME);
+    private final DefaultMqttMessageProcessor defaultMqttMessageProcessor;
 
-    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
-
-    private final SnodeController snodeController;
-
-    public MqttPublishMessageHandler(SnodeController snodeController) {
-        this.snodeController = snodeController;
+    public MqttPublishMessageHandler(DefaultMqttMessageProcessor processor) {
+        this.defaultMqttMessageProcessor = processor;
     }
 
     @Override
@@ -61,7 +60,7 @@ public class MqttPublishMessageHandler implements MessageHandler {
 
         ByteBuf payload = mqttPublishMessage.payload();
         if (fixedHeader.qosLevel().equals(MqttQoS.AT_MOST_ONCE)) {
-            snodeController.getMqttPushService().pushMessageQos0(variableHeader.topicName(), payload);
+            defaultMqttMessageProcessor.getMqttPushService().pushMessageQos0(variableHeader.topicName(), payload);
         } else if (fixedHeader.qosLevel().equals(MqttQoS.AT_LEAST_ONCE)) {
             //Push messages to subscribers and add it to IN-FLIGHT messages
         }
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPubrecMessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPubrecMessageHandler.java
similarity index 65%
rename from snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPubrecMessageHandler.java
rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPubrecMessageHandler.java
index edd0d55..e2f1029 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPubrecMessageHandler.java
+++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPubrecMessageHandler.java
@@ -15,22 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.snode.processor.mqtthandler;
+package org.apache.rocketmq.mqtt.mqtthandler.impl;
 
 import io.netty.handler.codec.mqtt.MqttMessage;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler;
+import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
 import org.apache.rocketmq.remoting.RemotingChannel;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.apache.rocketmq.snode.SnodeController;
 
 public class MqttPubrecMessageHandler implements MessageHandler {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME);
+    private final DefaultMqttMessageProcessor defaultMqttMessageProcessor;
 
-    private final SnodeController snodeController;
-
-    public MqttPubrecMessageHandler(SnodeController snodeController) {
-        this.snodeController = snodeController;
+    public MqttPubrecMessageHandler(DefaultMqttMessageProcessor processor) {
+        this.defaultMqttMessageProcessor = processor;
     }
+
     /**
      * handle the PUBREC message from the client
+     *
      * @param message
      * @return
      */
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPubrelMessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPubrelMessageHandler.java
similarity index 65%
rename from snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPubrelMessageHandler.java
rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPubrelMessageHandler.java
index c1061e4..284a460 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPubrelMessageHandler.java
+++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttPubrelMessageHandler.java
@@ -15,23 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.snode.processor.mqtthandler;
+package org.apache.rocketmq.mqtt.mqtthandler.impl;
 
 import io.netty.handler.codec.mqtt.MqttMessage;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler;
+import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
 import org.apache.rocketmq.remoting.RemotingChannel;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.apache.rocketmq.snode.SnodeController;
 
 public class MqttPubrelMessageHandler implements MessageHandler {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME);
+    private final DefaultMqttMessageProcessor defaultMqttMessageProcessor;
 
-    private final SnodeController snodeController;
-
-    public MqttPubrelMessageHandler(SnodeController snodeController) {
-        this.snodeController = snodeController;
+    public MqttPubrelMessageHandler(DefaultMqttMessageProcessor processor) {
+        this.defaultMqttMessageProcessor = processor;
     }
 
     /**
      * handle the PUBREL message from the client
+     *
      * @param message
      * @return
      */
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttSubscribeMessageHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttSubscribeMessageHandler.java
similarity index 90%
rename from snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttSubscribeMessageHandler.java
rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttSubscribeMessageHandler.java
index 34f91d3..31cfd0c 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttSubscribeMessageHandler.java
+++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttSubscribeMessageHandler.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.snode.processor.mqtthandler;
+package org.apache.rocketmq.mqtt.mqtthandler.impl;
 
 import io.netty.handler.codec.mqtt.MqttMessage;
 import io.netty.handler.codec.mqtt.MqttMessageType;
@@ -29,31 +29,32 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import org.apache.rocketmq.common.client.Client;
+import org.apache.rocketmq.common.client.Subscription;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.heartbeat.MqttSubscriptionData;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl;
+import org.apache.rocketmq.mqtt.constant.MqttConstant;
+import org.apache.rocketmq.mqtt.exception.WrongMessageTypeException;
+import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler;
+import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
+import org.apache.rocketmq.mqtt.util.MqttUtil;
 import org.apache.rocketmq.remoting.RemotingChannel;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
 import org.apache.rocketmq.remoting.util.MqttEncodeDecodeUtil;
-import org.apache.rocketmq.snode.SnodeController;
-import org.apache.rocketmq.snode.client.Client;
-import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl;
-import org.apache.rocketmq.snode.client.impl.Subscription;
-import org.apache.rocketmq.snode.constant.MqttConstant;
-import org.apache.rocketmq.snode.exception.WrongMessageTypeException;
-import org.apache.rocketmq.snode.util.MqttUtil;
 
 public class MqttSubscribeMessageHandler implements MessageHandler {
 
-    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
-    private final SnodeController snodeController;
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME);
+    private final DefaultMqttMessageProcessor defaultMqttMessageProcessor;
 
-    public MqttSubscribeMessageHandler(SnodeController snodeController) {
-        this.snodeController = snodeController;
+    public MqttSubscribeMessageHandler(DefaultMqttMessageProcessor processor) {
+        this.defaultMqttMessageProcessor = processor;
     }
 
     /**
@@ -72,7 +73,7 @@ public class MqttSubscribeMessageHandler implements MessageHandler {
         }
         MqttSubscribeMessage mqttSubscribeMessage = (MqttSubscribeMessage) message;
         MqttSubscribePayload payload = mqttSubscribeMessage.payload();
-        IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) snodeController.getIotClientManager();
+        IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) defaultMqttMessageProcessor.getIotClientManager();
         Client client = iotClientManager.getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel);
         if (client == null) {
             log.error("Can't find associated client, the connection will be closed. remotingChannel={}, MqttMessage={}", remotingChannel.toString(), message.toString());
@@ -135,7 +136,7 @@ public class MqttSubscribeMessageHandler implements MessageHandler {
             //2.update topic2SubscriptionTable
             String rootTopic = MqttUtil.getRootTopic(mqttTopicSubscription.topicName());
             ConcurrentHashMap<Client, Set<SubscriptionData>> client2SubscriptionData = topic2SubscriptionTable.get(rootTopic);
-            if (client2SubscriptionData == null) {
+            if (client2SubscriptionData == null || client2SubscriptionData.size() == 0) {
                 client2SubscriptionData = new ConcurrentHashMap<>();
                 ConcurrentHashMap<Client, Set<SubscriptionData>> prev = topic2SubscriptionTable.putIfAbsent(rootTopic, client2SubscriptionData);
                 if (prev != null) {
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttUnsubscribeMessagHandler.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttUnsubscribeMessagHandler.java
similarity index 86%
rename from snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttUnsubscribeMessagHandler.java
rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttUnsubscribeMessagHandler.java
index 38f6519..34c7fda 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttUnsubscribeMessagHandler.java
+++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/mqtthandler/impl/MqttUnsubscribeMessagHandler.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.snode.processor.mqtthandler;
+package org.apache.rocketmq.mqtt.mqtthandler.impl;
 
 import io.netty.handler.codec.mqtt.MqttFixedHeader;
 import io.netty.handler.codec.mqtt.MqttMessage;
@@ -27,31 +27,31 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import org.apache.rocketmq.common.client.Client;
+import org.apache.rocketmq.common.client.Subscription;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl;
+import org.apache.rocketmq.mqtt.exception.WrongMessageTypeException;
+import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler;
+import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
+import org.apache.rocketmq.mqtt.util.MqttUtil;
 import org.apache.rocketmq.remoting.RemotingChannel;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
-import org.apache.rocketmq.snode.SnodeController;
-import org.apache.rocketmq.snode.client.Client;
-import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl;
-import org.apache.rocketmq.snode.client.impl.Subscription;
-import org.apache.rocketmq.snode.exception.WrongMessageTypeException;
-import org.apache.rocketmq.snode.util.MqttUtil;
 
 /**
  * handle the UNSUBSCRIBE message from the client
  */
 public class MqttUnsubscribeMessagHandler implements MessageHandler {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME);
+    private final DefaultMqttMessageProcessor defaultMqttMessageProcessor;
 
-    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
-    private final SnodeController snodeController;
-
-    public MqttUnsubscribeMessagHandler(SnodeController snodeController) {
-        this.snodeController = snodeController;
+    public MqttUnsubscribeMessagHandler(DefaultMqttMessageProcessor processor) {
+        this.defaultMqttMessageProcessor = processor;
     }
 
     @Override
@@ -73,7 +73,7 @@ public class MqttUnsubscribeMessagHandler implements MessageHandler {
             remotingChannel.close();
             return null;
         }
-        IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) snodeController.getIotClientManager();
+        IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) defaultMqttMessageProcessor.getIotClientManager();
         Client client = iotClientManager.getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel);
         if (client == null) {
             log.error("Can't find associated client, the connection will be closed. remotingChannel={}, MqttMessage={}", remotingChannel.toString(), message.toString());
@@ -98,7 +98,7 @@ public class MqttUnsubscribeMessagHandler implements MessageHandler {
     }
 
     private void doUnsubscribe(Client client, List<String> topics, IOTClientManagerImpl iotClientManager) {
-            ConcurrentHashMap<String, Subscription> clientId2Subscription = iotClientManager.getClientId2Subscription();
+        ConcurrentHashMap<String, Subscription> clientId2Subscription = iotClientManager.getClientId2Subscription();
         ConcurrentHashMap<String, ConcurrentHashMap<Client, Set<SubscriptionData>>> topic2SubscriptionTable = iotClientManager.getTopic2SubscriptionTable();
 
         for (String topicFilter : topics) {
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/DefaultMqttMessageProcessor.java
similarity index 64%
rename from snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java
rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/DefaultMqttMessageProcessor.java
index 9eb62c2..563fdac 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessor.java
+++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/processor/DefaultMqttMessageProcessor.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.snode.processor;
+package org.apache.rocketmq.mqtt.processor;
 
 import io.netty.buffer.Unpooled;
 import io.netty.handler.codec.mqtt.MqttConnectMessage;
@@ -33,55 +33,73 @@ import io.netty.handler.codec.mqtt.MqttSubscribePayload;
 import java.io.UnsupportedEncodingException;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.rocketmq.common.MqttConfig;
+import org.apache.rocketmq.common.client.ClientManager;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl;
+import org.apache.rocketmq.mqtt.client.MqttClientHousekeepingService;
+import org.apache.rocketmq.mqtt.mqtthandler.MessageHandler;
+import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttConnectMessageHandler;
+import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttDisconnectMessageHandler;
+import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttPingreqMessageHandler;
+import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttPubackMessageHandler;
+import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttPubcompMessageHandler;
+import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttPublishMessageHandler;
+import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttPubrecMessageHandler;
+import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttPubrelMessageHandler;
+import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttSubscribeMessageHandler;
+import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttUnsubscribeMessagHandler;
+import org.apache.rocketmq.mqtt.service.WillMessageService;
+import org.apache.rocketmq.mqtt.service.impl.MqttPushServiceImpl;
+import org.apache.rocketmq.mqtt.service.impl.WillMessageServiceImpl;
 import org.apache.rocketmq.remoting.RemotingChannel;
+import org.apache.rocketmq.remoting.RemotingServer;
 import org.apache.rocketmq.remoting.RequestProcessor;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
 import org.apache.rocketmq.remoting.util.MqttEncodeDecodeUtil;
-import org.apache.rocketmq.snode.SnodeController;
-import org.apache.rocketmq.snode.processor.mqtthandler.MessageHandler;
-import org.apache.rocketmq.snode.processor.mqtthandler.MqttConnectMessageHandler;
-import org.apache.rocketmq.snode.processor.mqtthandler.MqttDisconnectMessageHandler;
-import org.apache.rocketmq.snode.processor.mqtthandler.MqttPingreqMessageHandler;
-import org.apache.rocketmq.snode.processor.mqtthandler.MqttPubackMessageHandler;
-import org.apache.rocketmq.snode.processor.mqtthandler.MqttPubcompMessageHandler;
-import org.apache.rocketmq.snode.processor.mqtthandler.MqttPublishMessageHandler;
-import org.apache.rocketmq.snode.processor.mqtthandler.MqttPubrecMessageHandler;
-import org.apache.rocketmq.snode.processor.mqtthandler.MqttPubrelMessageHandler;
-import org.apache.rocketmq.snode.processor.mqtthandler.MqttSubscribeMessageHandler;
-import org.apache.rocketmq.snode.processor.mqtthandler.MqttUnsubscribeMessagHandler;
 
 public class DefaultMqttMessageProcessor implements RequestProcessor {
-    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME);
 
     private Map<MqttMessageType, MessageHandler> type2handler = new HashMap<>();
-    private final SnodeController snodeController;
     private static final int MIN_AVAILABLE_VERSION = 3;
     private static final int MAX_AVAILABLE_VERSION = 4;
+    private WillMessageService willMessageService;
+    private MqttPushServiceImpl mqttPushService;
+    private ClientManager iotClientManager;
+    private RemotingServer mqttRemotingServer;
+    private MqttClientHousekeepingService mqttClientHousekeepingService;
+    private MqttConfig mqttConfig;
+
+    public DefaultMqttMessageProcessor(MqttConfig mqttConfig, RemotingServer mqttRemotingServer) {
+        this.willMessageService = new WillMessageServiceImpl();
+        this.mqttPushService = new MqttPushServiceImpl(this, mqttConfig);
+        this.iotClientManager = new IOTClientManagerImpl();
+        this.mqttRemotingServer = mqttRemotingServer;
+        this.mqttClientHousekeepingService = new MqttClientHousekeepingService(iotClientManager);
+        this.mqttClientHousekeepingService.start(mqttConfig.getHouseKeepingInterval());
 
-    public DefaultMqttMessageProcessor(SnodeController snodeController) {
-        this.snodeController = snodeController;
         registerMessageHandler(MqttMessageType.CONNECT,
-            new MqttConnectMessageHandler(this.snodeController));
+            new MqttConnectMessageHandler(this));
         registerMessageHandler(MqttMessageType.DISCONNECT,
-            new MqttDisconnectMessageHandler(this.snodeController));
+            new MqttDisconnectMessageHandler(this));
         registerMessageHandler(MqttMessageType.PINGREQ,
-            new MqttPingreqMessageHandler(this.snodeController));
+            new MqttPingreqMessageHandler(this));
         registerMessageHandler(MqttMessageType.PUBLISH,
-            new MqttPublishMessageHandler(this.snodeController));
-        registerMessageHandler(MqttMessageType.PUBACK, new MqttPubackMessageHandler(this.snodeController));
+            new MqttPublishMessageHandler(this));
+        registerMessageHandler(MqttMessageType.PUBACK, new MqttPubackMessageHandler(this));
         registerMessageHandler(MqttMessageType.PUBCOMP,
-            new MqttPubcompMessageHandler(this.snodeController));
-        registerMessageHandler(MqttMessageType.PUBREC, new MqttPubrecMessageHandler(this.snodeController));
-        registerMessageHandler(MqttMessageType.PUBREL, new MqttPubrelMessageHandler(this.snodeController));
+            new MqttPubcompMessageHandler(this));
+        registerMessageHandler(MqttMessageType.PUBREC, new MqttPubrecMessageHandler(this));
+        registerMessageHandler(MqttMessageType.PUBREL, new MqttPubrelMessageHandler(this));
         registerMessageHandler(MqttMessageType.SUBSCRIBE,
-            new MqttSubscribeMessageHandler(this.snodeController));
+            new MqttSubscribeMessageHandler(this));
         registerMessageHandler(MqttMessageType.UNSUBSCRIBE,
-            new MqttUnsubscribeMessagHandler(this.snodeController));
+            new MqttUnsubscribeMessagHandler(this));
     }
 
     @Override
@@ -127,4 +145,24 @@ public class DefaultMqttMessageProcessor implements RequestProcessor {
     private void registerMessageHandler(MqttMessageType type, MessageHandler handler) {
         type2handler.put(type, handler);
     }
+
+    public WillMessageService getWillMessageService() {
+        return willMessageService;
+    }
+
+    public MqttPushServiceImpl getMqttPushService() {
+        return mqttPushService;
+    }
+
+    public ClientManager getIotClientManager() {
+        return iotClientManager;
+    }
+
+    public MqttConfig getMqttConfig() {
+        return mqttConfig;
+    }
+
+    public RemotingServer getMqttRemotingServer() {
+        return mqttRemotingServer;
+    }
 }
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/WillMessageService.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/service/WillMessageService.java
similarity index 96%
rename from snode/src/main/java/org/apache/rocketmq/snode/service/WillMessageService.java
rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/service/WillMessageService.java
index 516bde5..f88d453 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/WillMessageService.java
+++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/service/WillMessageService.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.snode.service;
+package org.apache.rocketmq.mqtt.service;
 
 import org.apache.rocketmq.common.message.mqtt.WillMessage;
 
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/MqttPushServiceImpl.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/service/impl/MqttPushServiceImpl.java
similarity index 86%
rename from snode/src/main/java/org/apache/rocketmq/snode/service/impl/MqttPushServiceImpl.java
rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/service/impl/MqttPushServiceImpl.java
index e853025..9b9b17e 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/MqttPushServiceImpl.java
+++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/service/impl/MqttPushServiceImpl.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.snode.service.impl;
+package org.apache.rocketmq.mqtt.service.impl;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.handler.codec.mqtt.MqttMessageType;
@@ -27,37 +27,38 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.rocketmq.common.MqttConfig;
+import org.apache.rocketmq.common.client.Client;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl;
+import org.apache.rocketmq.mqtt.constant.MqttConstant;
+import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
+import org.apache.rocketmq.mqtt.util.MqttUtil;
 import org.apache.rocketmq.remoting.RemotingChannel;
 import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
 import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
-import org.apache.rocketmq.snode.SnodeController;
-import org.apache.rocketmq.snode.client.Client;
-import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl;
-import org.apache.rocketmq.snode.constant.SnodeConstant;
-import org.apache.rocketmq.snode.util.MqttUtil;
 
 public class MqttPushServiceImpl {
-    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME);
 
-    private SnodeController snodeController;
     private ExecutorService pushMqttMessageExecutorService;
+    private DefaultMqttMessageProcessor defaultMqttMessageProcessor;
 
-    public MqttPushServiceImpl(final SnodeController snodeController) {
-        this.snodeController = snodeController;
+    public MqttPushServiceImpl(DefaultMqttMessageProcessor defaultMqttMessageProcessor, MqttConfig mqttConfig) {
+        this.defaultMqttMessageProcessor = defaultMqttMessageProcessor;
         pushMqttMessageExecutorService = ThreadUtils.newThreadPoolExecutor(
-            this.snodeController.getMqttConfig().getPushMqttMessageMinPoolSize(),
-            this.snodeController.getMqttConfig().getPushMqttMessageMaxPoolSize(),
+            mqttConfig.getPushMqttMessageMinPoolSize(),
+            mqttConfig.getPushMqttMessageMaxPoolSize(),
             3000,
             TimeUnit.MILLISECONDS,
-            new ArrayBlockingQueue<>(this.snodeController.getMqttConfig().getPushMqttMessageThreadPoolQueueCapacity()),
+            new ArrayBlockingQueue<>(mqttConfig.getPushMqttMessageThreadPoolQueueCapacity()),
             "pushMqttMessageThread",
             false);
     }
@@ -86,7 +87,7 @@ public class MqttPushServiceImpl {
                     RemotingCommand requestCommand = buildRequestCommand(topic, qos, retain, packetId);
 
                     //find those clients publishing the message to
-                    IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) snodeController.getIotClientManager();
+                    IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) defaultMqttMessageProcessor.getIotClientManager();
                     ConcurrentHashMap<String, ConcurrentHashMap<Client, Set<SubscriptionData>>> topic2SubscriptionTable = iotClientManager.getTopic2SubscriptionTable();
                     Set<Client> clients = new HashSet<>();
                     if (topic2SubscriptionTable.containsKey(MqttUtil.getRootTopic(topic))) {
@@ -109,12 +110,11 @@ public class MqttPushServiceImpl {
                         byte[] body = new byte[message.readableBytes()];
                         message.readBytes(body);
                         requestCommand.setBody(body);
-                        snodeController.getMqttRemotingServer().push(remotingChannel, requestCommand, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
+                        defaultMqttMessageProcessor.getMqttRemotingServer().push(remotingChannel, requestCommand, MqttConstant.DEFAULT_TIMEOUT_MILLS);
                     }
                 } catch (Exception ex) {
                     log.warn("Exception was thrown when pushing MQTT message to topic: {}, exception={}", topic, ex.getMessage());
                 } finally {
-                    System.out.println("Release Bytebuf");
                     ReferenceCountUtil.release(message);
                 }
             } else {
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/WillMessageServiceImpl.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/service/impl/WillMessageServiceImpl.java
similarity index 81%
rename from snode/src/main/java/org/apache/rocketmq/snode/service/impl/WillMessageServiceImpl.java
rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/service/impl/WillMessageServiceImpl.java
index 78b4c1e..a769d90 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/WillMessageServiceImpl.java
+++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/service/impl/WillMessageServiceImpl.java
@@ -15,20 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.snode.service.impl;
+package org.apache.rocketmq.mqtt.service.impl;
 
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.rocketmq.common.message.mqtt.WillMessage;
-import org.apache.rocketmq.snode.SnodeController;
-import org.apache.rocketmq.snode.service.WillMessageService;
+import org.apache.rocketmq.mqtt.service.WillMessageService;
 
 public class WillMessageServiceImpl implements WillMessageService {
 
     private static ConcurrentHashMap<String/*clientId*/, WillMessage> willMessageTable = new ConcurrentHashMap<>();
-    private final SnodeController snodeController;
 
-    public WillMessageServiceImpl(SnodeController snodeController) {
-        this.snodeController = snodeController;
+    public WillMessageServiceImpl() {
     }
 
     @Override
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/util/MqttUtil.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/util/MqttUtil.java
similarity index 96%
rename from snode/src/main/java/org/apache/rocketmq/snode/util/MqttUtil.java
rename to mqtt/src/main/java/org/apache/rocketmq/mqtt/util/MqttUtil.java
index ef44a7a..3eefc48 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/util/MqttUtil.java
+++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/util/MqttUtil.java
@@ -15,11 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.snode.util;
+package org.apache.rocketmq.mqtt.util;
 
 import io.netty.handler.codec.mqtt.MqttQoS;
 import java.util.UUID;
-import org.apache.rocketmq.snode.constant.MqttConstant;
+import org.apache.rocketmq.mqtt.constant.MqttConstant;
 
 public class MqttUtil {
 
diff --git a/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java b/mqtt/src/test/java/org/apache/rocketmq/mqtt/DefaultMqttMessageProcessorTest.java
similarity index 92%
rename from snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java
rename to mqtt/src/test/java/org/apache/rocketmq/mqtt/DefaultMqttMessageProcessorTest.java
index 076a005..570a864 100644
--- a/snode/src/test/java/org/apache/rocketmq/snode/processor/DefaultMqttMessageProcessorTest.java
+++ b/mqtt/src/test/java/org/apache/rocketmq/mqtt/DefaultMqttMessageProcessorTest.java
@@ -14,38 +14,37 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.snode.processor;
+package org.apache.rocketmq.mqtt;
 
 import io.netty.handler.codec.mqtt.MqttConnectPayload;
 import io.netty.handler.codec.mqtt.MqttMessageType;
 import io.netty.handler.codec.mqtt.MqttQoS;
 import java.io.UnsupportedEncodingException;
 import org.apache.rocketmq.common.MqttConfig;
-import org.apache.rocketmq.common.SnodeConfig;
 import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
 import org.apache.rocketmq.remoting.RemotingChannel;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
+import org.apache.rocketmq.remoting.transport.mqtt.MqttRemotingServer;
 import org.apache.rocketmq.remoting.util.MqttEncodeDecodeUtil;
-import org.apache.rocketmq.snode.SnodeController;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
-import org.mockito.Spy;
 import org.mockito.junit.MockitoJUnitRunner;
 
 @RunWith(MockitoJUnitRunner.class)
 public class DefaultMqttMessageProcessorTest {
     private DefaultMqttMessageProcessor defaultMqttMessageProcessor;
 
-    @Spy
-    private SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig());
-
     @Mock
     private RemotingChannel remotingChannel;
 
+    @Mock
+    private MqttRemotingServer mqttRemotingServer;
+
     private String topic = "SnodeTopic";
 
     private String group = "SnodeGroup";
@@ -54,7 +53,7 @@ public class DefaultMqttMessageProcessorTest {
 
     @Before
     public void init() {
-        defaultMqttMessageProcessor = new DefaultMqttMessageProcessor(snodeController);
+        defaultMqttMessageProcessor = new DefaultMqttMessageProcessor(new MqttConfig(), mqttRemotingServer);
     }
 
     @Test
diff --git a/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttConnectMessageHandlerTest.java b/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttConnectMessageHandlerTest.java
similarity index 85%
rename from snode/src/test/java/org/apache/rocketmq/snode/processor/MqttConnectMessageHandlerTest.java
rename to mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttConnectMessageHandlerTest.java
index b0301e7..0bc2909 100644
--- a/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttConnectMessageHandlerTest.java
+++ b/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttConnectMessageHandlerTest.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.snode.processor;
+package org.apache.rocketmq.mqtt;
 
 import io.netty.handler.codec.mqtt.MqttConnectMessage;
 import io.netty.handler.codec.mqtt.MqttConnectPayload;
@@ -22,11 +22,9 @@ import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
 import io.netty.handler.codec.mqtt.MqttFixedHeader;
 import io.netty.handler.codec.mqtt.MqttMessageType;
 import io.netty.handler.codec.mqtt.MqttQoS;
-import org.apache.rocketmq.common.MqttConfig;
-import org.apache.rocketmq.common.SnodeConfig;
+import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttConnectMessageHandler;
+import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
 import org.apache.rocketmq.remoting.RemotingChannel;
-import org.apache.rocketmq.snode.SnodeController;
-import org.apache.rocketmq.snode.processor.mqtthandler.MqttConnectMessageHandler;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
@@ -38,10 +36,13 @@ public class MqttConnectMessageHandlerTest {
     @Mock
     private RemotingChannel remotingChannel;
 
+    @Mock
+    private DefaultMqttMessageProcessor defaultMqttMessageProcessor;
+
     @Test
     public void testHandlerMessage() throws Exception {
 
-        MqttConnectMessageHandler mqttConnectMessageHandler = new MqttConnectMessageHandler(new SnodeController(new SnodeConfig(), new MqttConfig()));
+        MqttConnectMessageHandler mqttConnectMessageHandler = new MqttConnectMessageHandler(defaultMqttMessageProcessor);
         MqttConnectPayload payload = new MqttConnectPayload("1234567", "testTopic", "willMessage".getBytes(), null, "1234567".getBytes());
         MqttConnectMessage mqttConnectMessage = new MqttConnectMessage(new MqttFixedHeader(
             MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 200), new MqttConnectVariableHeader(null, 4, false, false, false, 0, false, false, 50), new MqttConnectPayload("abcd", "ttest", "message".getBytes(), "user", "password".getBytes()));
diff --git a/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttDisconnectMessageHandlerTest.java b/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttDisconnectMessageHandlerTest.java
similarity index 73%
rename from snode/src/test/java/org/apache/rocketmq/snode/processor/MqttDisconnectMessageHandlerTest.java
rename to mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttDisconnectMessageHandlerTest.java
index 0f474b1..088b623 100644
--- a/snode/src/test/java/org/apache/rocketmq/snode/processor/MqttDisconnectMessageHandlerTest.java
+++ b/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttDisconnectMessageHandlerTest.java
@@ -14,20 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.snode.processor;
+package org.apache.rocketmq.mqtt;
 
 import io.netty.handler.codec.mqtt.MqttFixedHeader;
 import io.netty.handler.codec.mqtt.MqttMessage;
 import io.netty.handler.codec.mqtt.MqttMessageType;
 import io.netty.handler.codec.mqtt.MqttQoS;
-import org.apache.rocketmq.common.MqttConfig;
-import org.apache.rocketmq.common.SnodeConfig;
+import org.apache.rocketmq.common.client.Client;
 import org.apache.rocketmq.common.message.mqtt.WillMessage;
+import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl;
+import org.apache.rocketmq.mqtt.mqtthandler.impl.MqttDisconnectMessageHandler;
+import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
 import org.apache.rocketmq.remoting.RemotingChannel;
-import org.apache.rocketmq.snode.SnodeController;
-import org.apache.rocketmq.snode.client.Client;
-import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl;
-import org.apache.rocketmq.snode.processor.mqtthandler.MqttDisconnectMessageHandler;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
@@ -39,17 +37,19 @@ public class MqttDisconnectMessageHandlerTest {
     @Mock
     private RemotingChannel remotingChannel;
 
+    @Mock
+    private DefaultMqttMessageProcessor defaultMqttMessageProcessor;
+
     @Test
     public void testHandlerMessage() throws Exception {
 
-        SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig());
         MqttDisconnectMessageHandler mqttDisconnectMessageHandler = new MqttDisconnectMessageHandler(
-            snodeController);
+            defaultMqttMessageProcessor);
         Client client = new Client();
         client.setRemotingChannel(remotingChannel);
         client.setClientId("123456");
-        snodeController.getIotClientManager().register(IOTClientManagerImpl.IOT_GROUP, client);
-        snodeController.getWillMessageService().saveWillMessage("123456", new WillMessage());
+        defaultMqttMessageProcessor.getIotClientManager().register(IOTClientManagerImpl.IOT_GROUP, client);
+        defaultMqttMessageProcessor.getWillMessageService().saveWillMessage("123456", new WillMessage());
         MqttMessage mqttDisconnectMessage = new MqttMessage(new MqttFixedHeader(
             MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 200));
 
diff --git a/snode/src/test/java/org/apache/rocketmq/snode/service/WillMessageServiceImplTest.java b/mqtt/src/test/java/org/apache/rocketmq/mqtt/WillMessageServiceImplTest.java
similarity index 71%
rename from snode/src/test/java/org/apache/rocketmq/snode/service/WillMessageServiceImplTest.java
rename to mqtt/src/test/java/org/apache/rocketmq/mqtt/WillMessageServiceImplTest.java
index 57f7c7a..dfce114 100644
--- a/snode/src/test/java/org/apache/rocketmq/snode/service/WillMessageServiceImplTest.java
+++ b/mqtt/src/test/java/org/apache/rocketmq/mqtt/WillMessageServiceImplTest.java
@@ -14,31 +14,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.snode.service;
+package org.apache.rocketmq.mqtt;
 
-import org.apache.rocketmq.common.MqttConfig;
-import org.apache.rocketmq.common.SnodeConfig;
 import org.apache.rocketmq.common.message.mqtt.WillMessage;
-import org.apache.rocketmq.snode.SnodeController;
-import org.apache.rocketmq.snode.SnodeTestBase;
-import org.apache.rocketmq.snode.service.impl.WillMessageServiceImpl;
+import org.apache.rocketmq.mqtt.service.WillMessageService;
+import org.apache.rocketmq.mqtt.service.impl.WillMessageServiceImpl;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.mockito.Spy;
 import org.mockito.junit.MockitoJUnitRunner;
 
 @RunWith(MockitoJUnitRunner.class)
-public class WillMessageServiceImplTest extends SnodeTestBase {
-
-    @Spy
-    private SnodeController snodeController = new SnodeController(new SnodeConfig(), new MqttConfig());
+public class WillMessageServiceImplTest {
 
     private WillMessageService willMessageService;
 
     @Before
     public void init() {
-        willMessageService = new WillMessageServiceImpl(snodeController);
+        willMessageService = new WillMessageServiceImpl();
     }
 
     @Test
diff --git a/pom.xml b/pom.xml
index 3899247..ed34c04 100644
--- a/pom.xml
+++ b/pom.xml
@@ -126,6 +126,7 @@
         <module>distribution</module>
         <module>openmessaging</module>
         <module>logging</module>
+        <module>mqtt</module>
         <module>snode</module>
         <module>acl</module>
     </modules>
@@ -534,6 +535,11 @@
                 <version>${project.version}</version>
             </dependency>
             <dependency>
+                <groupId>${project.groupId}</groupId>
+                <artifactId>rocketmq-mqtt</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
                 <groupId>org.slf4j</groupId>
                 <artifactId>slf4j-api</artifactId>
                 <version>1.7.7</version>
diff --git a/snode/pom.xml b/snode/pom.xml
index da1e3f3..17158c0 100644
--- a/snode/pom.xml
+++ b/snode/pom.xml
@@ -97,6 +97,10 @@
             <groupId>${project.groupId}</groupId>
             <artifactId>rocketmq-broker</artifactId>
         </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>rocketmq-mqtt</artifactId>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
index 6f54ae0..d3a48b7 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
@@ -27,11 +27,13 @@ import org.apache.rocketmq.broker.BrokerStartup;
 import org.apache.rocketmq.common.MqttConfig;
 import org.apache.rocketmq.common.SnodeConfig;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.client.ClientManager;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor;
 import org.apache.rocketmq.remoting.ClientConfig;
 import org.apache.rocketmq.remoting.RemotingClient;
 import org.apache.rocketmq.remoting.RemotingClientFactory;
@@ -48,18 +50,15 @@ import org.apache.rocketmq.remoting.interceptor.ResponseContext;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.util.ServiceProvider;
 import org.apache.rocketmq.snode.client.ClientHousekeepingService;
-import org.apache.rocketmq.snode.client.ClientManager;
 import org.apache.rocketmq.snode.client.SlowConsumerService;
 import org.apache.rocketmq.snode.client.SubscriptionGroupManager;
 import org.apache.rocketmq.snode.client.SubscriptionManager;
 import org.apache.rocketmq.snode.client.impl.ConsumerManagerImpl;
-import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl;
 import org.apache.rocketmq.snode.client.impl.ProducerManagerImpl;
 import org.apache.rocketmq.snode.client.impl.SlowConsumerServiceImpl;
 import org.apache.rocketmq.snode.client.impl.SubscriptionManagerImpl;
 import org.apache.rocketmq.snode.offset.ConsumerOffsetManager;
 import org.apache.rocketmq.snode.processor.ConsumerManageProcessor;
-import org.apache.rocketmq.snode.processor.DefaultMqttMessageProcessor;
 import org.apache.rocketmq.snode.processor.HeartbeatProcessor;
 import org.apache.rocketmq.snode.processor.PullMessageProcessor;
 import org.apache.rocketmq.snode.processor.SendMessageProcessor;
@@ -69,16 +68,13 @@ import org.apache.rocketmq.snode.service.MetricsService;
 import org.apache.rocketmq.snode.service.NnodeService;
 import org.apache.rocketmq.snode.service.PushService;
 import org.apache.rocketmq.snode.service.ScheduledService;
-import org.apache.rocketmq.snode.service.WillMessageService;
 import org.apache.rocketmq.snode.service.impl.ClientServiceImpl;
 import org.apache.rocketmq.snode.service.impl.LocalEnodeServiceImpl;
 import org.apache.rocketmq.snode.service.impl.MetricsServiceImpl;
-import org.apache.rocketmq.snode.service.impl.MqttPushServiceImpl;
 import org.apache.rocketmq.snode.service.impl.NnodeServiceImpl;
 import org.apache.rocketmq.snode.service.impl.PushServiceImpl;
 import org.apache.rocketmq.snode.service.impl.RemoteEnodeServiceImpl;
 import org.apache.rocketmq.snode.service.impl.ScheduledServiceImpl;
-import org.apache.rocketmq.snode.service.impl.WillMessageServiceImpl;
 
 public class SnodeController {
 
@@ -105,7 +101,7 @@ public class SnodeController {
     private ScheduledService scheduledService;
     private ClientManager producerManager;
     private ClientManager consumerManager;
-    private ClientManager iotClientManager;
+//    private ClientManager iotClientManager;
     private SubscriptionManager subscriptionManager;
     private ClientHousekeepingService clientHousekeepingService;
     private SubscriptionGroupManager subscriptionGroupManager;
@@ -122,8 +118,8 @@ public class SnodeController {
     private ClientService clientService;
     private SlowConsumerService slowConsumerService;
     private MetricsService metricsService;
-    private WillMessageService willMessageService;
-    private MqttPushServiceImpl mqttPushService;
+//    private WillMessageService willMessageService;
+//    private MqttPushServiceImpl mqttPushService;
 
     private final ScheduledExecutorService scheduledExecutorService = Executors
         .newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
@@ -153,7 +149,12 @@ public class SnodeController {
         if (this.mqttRemotingClient != null) {
             this.mqttRemotingClient.init(this.mqttClientConfig, null);
         }
-
+        this.mqttRemotingServer = RemotingServerFactory.getInstance().createRemotingServer(
+            RemotingUtil.MQTT_PROTOCOL);
+        if (this.mqttRemotingServer != null) {
+            this.mqttRemotingServer.init(this.mqttServerConfig, this.clientHousekeepingService);
+            this.mqttRemotingServer.registerInterceptorGroup(this.remotingServerInterceptorGroup);
+        }
         this.sendMessageExecutor = ThreadUtils.newThreadPoolExecutor(
             snodeConfig.getSnodeSendMessageMinPoolSize(),
             snodeConfig.getSnodeSendMessageMaxPoolSize(),
@@ -211,19 +212,19 @@ public class SnodeController {
         this.sendMessageProcessor = new SendMessageProcessor(this);
         this.heartbeatProcessor = new HeartbeatProcessor(this);
         this.pullMessageProcessor = new PullMessageProcessor(this);
-        this.defaultMqttMessageProcessor = new DefaultMqttMessageProcessor(this);
+        this.defaultMqttMessageProcessor = new DefaultMqttMessageProcessor(this.mqttConfig, mqttRemotingServer);
         this.pushService = new PushServiceImpl(this);
         this.clientService = new ClientServiceImpl(this);
         this.subscriptionManager = new SubscriptionManagerImpl();
         this.producerManager = new ProducerManagerImpl();
         this.consumerManager = new ConsumerManagerImpl(this);
-        this.iotClientManager = new IOTClientManagerImpl(this);
+//        this.iotClientManager = new IOTClientManagerImpl(this);
         this.clientHousekeepingService = new ClientHousekeepingService(this.producerManager,
-            this.consumerManager, this.iotClientManager);
+            this.consumerManager, null);
         this.slowConsumerService = new SlowConsumerServiceImpl(this);
         this.metricsService = new MetricsServiceImpl();
-        this.willMessageService = new WillMessageServiceImpl(this);
-        this.mqttPushService = new MqttPushServiceImpl(this);
+//        this.willMessageService = new WillMessageServiceImpl(this);
+//        this.mqttPushService = new MqttPushServiceImpl(this);
     }
 
     public SnodeConfig getSnodeConfig() {
@@ -258,12 +259,6 @@ public class SnodeController {
             this.snodeServer.init(this.nettyServerConfig, this.clientHousekeepingService);
             this.snodeServer.registerInterceptorGroup(this.remotingServerInterceptorGroup);
         }
-        this.mqttRemotingServer = RemotingServerFactory.getInstance().createRemotingServer(
-            RemotingUtil.MQTT_PROTOCOL);
-        if (this.mqttRemotingServer != null) {
-            this.mqttRemotingServer.init(this.mqttServerConfig, this.clientHousekeepingService);
-            this.mqttRemotingServer.registerInterceptorGroup(this.remotingServerInterceptorGroup);
-        }
         registerProcessor();
         return true;
     }
@@ -507,13 +502,13 @@ public class SnodeController {
         this.consumerManager = consumerManager;
     }
 
-    public ClientManager getIotClientManager() {
+/*    public ClientManager getIotClientManager() {
         return iotClientManager;
     }
 
     public void setIotClientManager(ClientManager iotClientManager) {
         this.iotClientManager = iotClientManager;
-    }
+    }*/
 
     public SubscriptionManager getSubscriptionManager() {
         return subscriptionManager;
@@ -551,7 +546,7 @@ public class SnodeController {
         this.metricsService = metricsService;
     }
 
-    public WillMessageService getWillMessageService() {
+/*    public WillMessageService getWillMessageService() {
         return willMessageService;
     }
 
@@ -566,5 +561,5 @@ public class SnodeController {
 
     public void setMqttPushService(MqttPushServiceImpl mqttPushService) {
         this.mqttPushService = mqttPushService;
-    }
+    }*/
 }
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java b/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java
index 899a9ef..590f479 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java
@@ -18,6 +18,8 @@ package org.apache.rocketmq.snode.client;
 
 import io.netty.channel.Channel;
 import io.netty.util.Attribute;
+import org.apache.rocketmq.common.client.Client;
+import org.apache.rocketmq.common.client.ClientManager;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
@@ -49,7 +51,7 @@ public class ClientHousekeepingService implements ChannelEventListener {
     public void shutdown() {
         this.producerManager.shutdown();
         this.consumerManager.shutdown();
-        this.iotClientManager.shutdown();
+//        this.iotClientManager.shutdown();
     }
 
     private Client getClient(RemotingChannel remotingChannel) {
@@ -75,9 +77,9 @@ public class ClientHousekeepingService implements ChannelEventListener {
                 case Producer:
                     this.producerManager.onClose(client.getGroups(), remotingChannel);
                     return;
-                case IOTCLIENT:
-                    this.iotClientManager.onClose(client.getGroups(), remotingChannel);
-                    return;
+//                case IOTCLIENT:
+//                    this.iotClientManager.onClose(client.getGroups(), remotingChannel);
+//                    return;
                 default:
             }
         }
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionManager.java b/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionManager.java
index a648ba8..4247a4d 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionManager.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionManager.java
@@ -17,13 +17,13 @@
 package org.apache.rocketmq.snode.client;
 
 import java.util.Set;
+import org.apache.rocketmq.common.client.Subscription;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.remoting.RemotingChannel;
-import org.apache.rocketmq.snode.client.impl.Subscription;
 
 public interface SubscriptionManager {
     boolean subscribe(String groupId, Set<SubscriptionData> subscriptionDataSet, ConsumeType consumeType,
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ConsumerManagerImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ConsumerManagerImpl.java
index fb6693c..6e6d8da 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ConsumerManagerImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ConsumerManagerImpl.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.snode.client.impl;
 
+import org.apache.rocketmq.common.client.ClientManagerImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ProducerManagerImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ProducerManagerImpl.java
index 150bc42..03ee085 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ProducerManagerImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ProducerManagerImpl.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.snode.client.impl;
 
+import org.apache.rocketmq.common.client.ClientManagerImpl;
 import org.apache.rocketmq.remoting.RemotingChannel;
 
 public class ProducerManagerImpl extends ClientManagerImpl {
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java
index 4a4a35e..dffd210 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java
@@ -21,6 +21,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import org.apache.rocketmq.common.client.Subscription;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.common.message.MessageQueue;
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java b/snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java
index 7798c0d..789599f 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java
@@ -17,8 +17,8 @@
 package org.apache.rocketmq.snode.constant;
 
 import io.netty.util.AttributeKey;
-import org.apache.rocketmq.snode.client.Client;
-import org.apache.rocketmq.snode.client.impl.ClientRole;
+import org.apache.rocketmq.common.client.Client;
+import org.apache.rocketmq.common.client.ClientRole;
 
 public class SnodeConstant {
     public static final long HEARTBEAT_TIME_OUT = 3000;
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java
index 3a7d822..4d9c3fe 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java
@@ -20,6 +20,8 @@ import io.netty.channel.Channel;
 import io.netty.util.Attribute;
 import java.util.HashSet;
 import java.util.Set;
+import org.apache.rocketmq.common.client.Client;
+import org.apache.rocketmq.common.client.ClientRole;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.filter.ExpressionType;
 import org.apache.rocketmq.common.protocol.RequestCode;
@@ -41,8 +43,6 @@ import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
 import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.snode.SnodeController;
-import org.apache.rocketmq.snode.client.Client;
-import org.apache.rocketmq.snode.client.impl.ClientRole;
 import org.apache.rocketmq.snode.constant.SnodeConstant;
 
 public class HeartbeatProcessor implements RequestProcessor {
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
index 95982bc..9b0f272 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.snode.processor;
 
 import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.common.client.Subscription;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.help.FAQUrl;
 import org.apache.rocketmq.common.protocol.ResponseCode;
@@ -36,7 +37,6 @@ import org.apache.rocketmq.remoting.interceptor.RequestContext;
 import org.apache.rocketmq.remoting.interceptor.ResponseContext;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.snode.SnodeController;
-import org.apache.rocketmq.snode.client.impl.Subscription;
 
 public class PullMessageProcessor implements RequestProcessor {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java
index 24a5a9c..4a5326c 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java
@@ -24,6 +24,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.client.Client;
+import org.apache.rocketmq.common.client.Subscription;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
@@ -40,8 +42,6 @@ import org.apache.rocketmq.remoting.common.RemotingUtil;
 import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.snode.SnodeController;
-import org.apache.rocketmq.snode.client.Client;
-import org.apache.rocketmq.snode.client.impl.Subscription;
 import org.apache.rocketmq.snode.constant.SnodeConstant;
 import org.apache.rocketmq.snode.service.PushService;
 


Mime
View raw message