rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yu...@apache.org
Subject [16/43] incubator-rocketmq git commit: Finish code dump. Reviewed by: @yukon @vongosling @stevenschew @vintagewang @lollipop @zander
Date Mon, 19 Dec 2016 09:40:33 GMT
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/processor/DefaultRequestProcessor.java
----------------------------------------------------------------------
diff --git a/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/processor/DefaultRequestProcessor.java b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/processor/DefaultRequestProcessor.java
new file mode 100644
index 0000000..105cfff
--- /dev/null
+++ b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/processor/DefaultRequestProcessor.java
@@ -0,0 +1,355 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.filtersrv.processor;
+
+import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
+import com.alibaba.rocketmq.client.consumer.PullCallback;
+import com.alibaba.rocketmq.client.consumer.PullResult;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.UtilAll;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.filter.FilterContext;
+import com.alibaba.rocketmq.common.message.MessageDecoder;
+import com.alibaba.rocketmq.common.message.MessageExt;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.common.protocol.RequestCode;
+import com.alibaba.rocketmq.common.protocol.ResponseCode;
+import com.alibaba.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import com.alibaba.rocketmq.common.protocol.header.PullMessageResponseHeader;
+import com.alibaba.rocketmq.common.protocol.header.filtersrv.RegisterMessageFilterClassRequestHeader;
+import com.alibaba.rocketmq.common.sysflag.MessageSysFlag;
+import com.alibaba.rocketmq.filtersrv.FiltersrvController;
+import com.alibaba.rocketmq.filtersrv.filter.FilterClassInfo;
+import com.alibaba.rocketmq.remoting.common.RemotingHelper;
+import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
+import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import com.alibaba.rocketmq.store.CommitLog;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class DefaultRequestProcessor implements NettyRequestProcessor {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);
+
+    private final FiltersrvController filtersrvController;
+
+    /**
+     * Creates a processor bound to the given controller.
+     *
+     * @param filtersrvController controller that supplies the filter class manager, pull consumer,
+     *                            configuration and stats manager used by this class
+     */
+    public DefaultRequestProcessor(FiltersrvController filtersrvController) {
+        this.filtersrvController = filtersrvController;
+    }
+
+
+    /**
+     * Routes an incoming command to the handler for its request code.
+     *
+     * @param ctx     channel context the request arrived on
+     * @param request the remoting command to handle
+     * @return the selected handler's result, or {@code null} when the request code is not handled here
+     * @throws Exception propagated from the selected handler
+     */
+    @Override
+    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
+        final int requestCode = request.getCode();
+
+        if (log.isDebugEnabled()) {
+            final String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+            log.debug("receive request, {} {} {}", requestCode, remoteAddr, request);
+        }
+
+        if (requestCode == RequestCode.REGISTER_MESSAGE_FILTER_CLASS) {
+            return registerMessageFilterClass(ctx, request);
+        }
+        if (requestCode == RequestCode.PULL_MESSAGE) {
+            return pullMessageForward(ctx, request);
+        }
+
+        // Any other request code is left without a response command.
+        return null;
+    }
+
+    @Override
+    public boolean rejectRequest() {
+        return false; // unconditionally false; presumably tells the remoting layer never to reject up front — confirm against NettyRequestProcessor
+    }
+
+    /**
+     * Handles a filter-class registration request by passing the consumer group, topic,
+     * class name, class CRC and the request body to the filter class manager.
+     *
+     * @param ctx     channel context of the request (not used by this method)
+     * @param request command carrying a {@link RegisterMessageFilterClassRequestHeader} and a body
+     * @return a response with {@code SUCCESS}, or with {@code SYSTEM_ERROR} and a remark when the
+     *         registration returns {@code false} or throws
+     * @throws RemotingCommandException declared by the header decoding call
+     */
+    private RemotingCommand registerMessageFilterClass(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        final RegisterMessageFilterClassRequestHeader header =
+                (RegisterMessageFilterClassRequestHeader) request.decodeCommandCustomHeader(RegisterMessageFilterClassRequestHeader.class);
+
+        try {
+            final boolean registered = this.filtersrvController.getFilterClassManager().registerFilterClass(
+                    header.getConsumerGroup(), header.getTopic(), header.getClassName(), header.getClassCRC(), request.getBody());
+            if (!registered) {
+                // A refused registration goes through the same error path as a thrown one.
+                throw new Exception("registerFilterClass error");
+            }
+        } catch (Exception e) {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark(RemotingHelper.exceptionSimpleDesc(e));
+            return response;
+        }
+
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+    /**
+     * Handles a pull request by pulling from the broker through the controller's pull consumer
+     * and passing every found message through the filter class registered for the request's
+     * consumer group and topic.
+     * <p>
+     * If no filter class is registered, or the registered entry has no filter instance, a
+     * {@code SYSTEM_ERROR} response is returned directly. Otherwise the pull is started with a
+     * callback, this method returns {@code null}, and the callback writes the response through
+     * {@code returnResponse}.
+     *
+     * @param ctx     channel context the response is written to
+     * @param request command carrying a {@link PullMessageRequestHeader}
+     * @return an error response when the filter class lookup fails, otherwise {@code null}
+     * @throws Exception from header decoding or from starting the pull
+     */
+    private RemotingCommand pullMessageForward(final ChannelHandlerContext ctx, final RemotingCommand request) throws Exception {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
+        final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
+        final PullMessageRequestHeader requestHeader =
+                (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
+
+        final FilterContext filterContext = new FilterContext();
+        filterContext.setConsumerGroup(requestHeader.getConsumerGroup());
+
+        // Carry the request's opaque value over to the response.
+        response.setOpaque(request.getOpaque());
+
+        DefaultMQPullConsumer pullConsumer = this.filtersrvController.getDefaultMQPullConsumer();
+        final FilterClassInfo findFilterClass =
+                this.filtersrvController.getFilterClassManager()
+                        .findFilterClass(requestHeader.getConsumerGroup(), requestHeader.getTopic());
+        if (null == findFilterClass) {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("Find Filter class failed, not registered");
+            return response;
+        }
+
+        if (null == findFilterClass.getMessageFilter()) {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("Find Filter class failed, registered but no class");
+            return response;
+        }
+
+        responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
+
+        // Queue to pull from: topic and queue id come from the request, broker name from the controller.
+        MessageQueue mq = new MessageQueue();
+        mq.setTopic(requestHeader.getTopic());
+        mq.setQueueId(requestHeader.getQueueId());
+        mq.setBrokerName(this.filtersrvController.getBrokerName());
+        long offset = requestHeader.getQueueOffset();
+        int maxNums = requestHeader.getMaxMsgNums();
+        // The callback filters the pulled messages and writes the response itself.
+        final PullCallback pullCallback = new PullCallback() {
+
+            @Override
+            public void onSuccess(PullResult pullResult) {
+                responseHeader.setMaxOffset(pullResult.getMaxOffset());
+                responseHeader.setMinOffset(pullResult.getMinOffset());
+                responseHeader.setNextBeginOffset(pullResult.getNextBeginOffset());
+                response.setRemark(null);
+
+                switch (pullResult.getPullStatus()) {
+                    case FOUND:
+                        response.setCode(ResponseCode.SUCCESS);
+                        // Keep only the messages accepted by the registered filter.
+                        List<MessageExt> msgListOK = new ArrayList<MessageExt>();
+                        try {
+                            for (MessageExt msg : pullResult.getMsgFoundList()) {
+                                boolean match = findFilterClass.getMessageFilter().match(msg, filterContext);
+                                if (match) {
+                                    msgListOK.add(msg);
+                                }
+                            }
+
+
+                            if (!msgListOK.isEmpty()) {
+                                returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, msgListOK);
+                                return;
+                            } else {
+                                response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY); // messages were found but none matched
+                            }
+                        } catch (Throwable e) {
+                            final String error =
+                                    String.format("do Message Filter Exception, ConsumerGroup: %s Topic: %s ",
+                                            requestHeader.getConsumerGroup(), requestHeader.getTopic());
+                            log.error(error, e);
+
+                            response.setCode(ResponseCode.SYSTEM_ERROR);
+                            response.setRemark(error + RemotingHelper.exceptionSimpleDesc(e));
+                            returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null);
+                            return;
+                        }
+
+                        break;
+                    case NO_MATCHED_MSG:
+                        response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
+                        break;
+                    case NO_NEW_MSG:
+                        response.setCode(ResponseCode.PULL_NOT_FOUND);
+                        break;
+                    case OFFSET_ILLEGAL:
+                        response.setCode(ResponseCode.PULL_OFFSET_MOVED);
+                        break;
+                    default:
+                        break; // any other status leaves the response code as it is
+                }
+                // Every path that reaches this point sends the response without a message body.
+                returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null);
+            }
+
+
+            @Override
+            public void onException(Throwable e) {
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark("Pull Callback Exception, " + RemotingHelper.exceptionSimpleDesc(e));
+                returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null);
+                return;
+            }
+        };
+        // NOTE(review): the null second argument is presumably the subscription expression — confirm.
+        pullConsumer.pullBlockIfNotFound(mq, null, offset, maxNums, pullCallback);
+
+        return null;
+    }
+
+    /**
+     * Writes {@code response} to the client channel, attaching the encoded messages as the
+     * response body when a message list is supplied.
+     * <p>
+     * If any message cannot be encoded, no body is attached and no stats are recorded; the
+     * response is turned into a {@code SYSTEM_ERROR} with a remark describing the failure and is
+     * still written. Previously the failed slot stayed {@code null} and the following loop threw
+     * a {@code NullPointerException} out of this method.
+     *
+     * @param group    consumer group, used for logging and as part of the stats key
+     * @param topic    topic, used for logging and as part of the stats key
+     * @param ctx      channel context the response is written to
+     * @param response response command to send; its code, remark and body may be modified here
+     * @param msgList  messages to encode into the body, or {@code null} to send the response as is
+     */
+    private void returnResponse(final String group, final String topic, ChannelHandlerContext ctx, final RemotingCommand response,
+                                final List<MessageExt> msgList) {
+        if (null != msgList) {
+            final ByteBuffer[] msgBufferList = new ByteBuffer[msgList.size()];
+            int bodyTotalSize = 0;
+            boolean allEncoded = true;
+            for (int i = 0; i < msgList.size(); i++) {
+                try {
+                    msgBufferList[i] = messageToByteBuffer(msgList.get(i));
+                    bodyTotalSize += msgBufferList[i].capacity();
+                } catch (Exception e) {
+                    log.error("messageToByteBuffer failed, ConsumerGroup: " + group + " Topic: " + topic, e);
+
+                    // Report the failure to the client instead of sending a partial body.
+                    response.setCode(ResponseCode.SYSTEM_ERROR);
+                    response.setRemark("encode message failed, " + RemotingHelper.exceptionSimpleDesc(e));
+                    allEncoded = false;
+                    break;
+                }
+            }
+
+            if (allEncoded) {
+                ByteBuffer body = ByteBuffer.allocate(bodyTotalSize);
+                for (ByteBuffer bb : msgBufferList) {
+                    // messageToByteBuffer returns the buffer positioned after the written bytes.
+                    bb.flip();
+                    body.put(bb);
+                }
+
+                response.setBody(body.array());
+
+                this.filtersrvController.getFilterServerStatsManager().incGroupGetNums(group, topic, msgList.size());
+
+                this.filtersrvController.getFilterServerStatsManager().incGroupGetSize(group, topic, bodyTotalSize);
+            }
+        }
+
+        try {
+            ctx.writeAndFlush(response).addListener(new ChannelFutureListener() {
+                @Override
+                public void operationComplete(ChannelFuture future) throws Exception {
+                    if (!future.isSuccess()) {
+                        log.error("FilterServer response to " + future.channel().remoteAddress() + " failed", future.cause());
+                        log.error(response.toString());
+                    }
+                }
+            });
+        } catch (Throwable e) {
+            log.error("FilterServer process request over, but response failed", e);
+            log.error(response.toString());
+        }
+    }
+    /**
+     * Encodes a message into a newly allocated buffer, writing the fields in the numbered order
+     * shown in the body.
+     * <p>
+     * Side effect: when the body is at least {@code compressMsgBodyOverHowmuch} bytes long and
+     * compression returns a non-null result, the body of {@code msg} itself is replaced by the
+     * compressed bytes and the compressed flag is set in the encoded sysFlag.
+     * <p>
+     * The returned buffer is not flipped; its position is left after the last written byte.
+     * NOTE(review): the layout presumably has to match what the pulling client decodes — confirm
+     * before changing field order or widths.
+     *
+     * @param msg message to encode; its body may be replaced as described above
+     * @return buffer allocated with the computed total message length
+     * @throws IOException declared by this method; presumably raised by compression — TODO confirm
+     */
+    private ByteBuffer messageToByteBuffer(final MessageExt msg) throws IOException {
+        int sysFlag = MessageSysFlag.clearCompressedFlag(msg.getSysFlag());
+        if (msg.getBody() != null) {
+            if (msg.getBody().length >= this.filtersrvController.getFiltersrvConfig().getCompressMsgBodyOverHowmuch()) {
+                byte[] data = UtilAll.compress(msg.getBody(), this.filtersrvController.getFiltersrvConfig().getZipCompressLevel());
+                if (data != null) {
+                    msg.setBody(data);
+                    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
+                }
+            }
+        }
+
+        final int bodyLength = msg.getBody() != null ? msg.getBody().length : 0;
+        byte[] topicData = msg.getTopic().getBytes(MixAll.DEFAULT_CHARSET);
+        final int topicLength = topicData.length;
+        String properties = MessageDecoder.messageProperties2String(msg.getProperties());
+        byte[] propertiesData = properties.getBytes(MixAll.DEFAULT_CHARSET);
+        final int propertiesLength = propertiesData.length;
+        final int msgLen = 4 // 1 TOTALSIZE
+                + 4 // 2 MAGICCODE
+                + 4 // 3 BODYCRC
+                + 4 // 4 QUEUEID
+                + 4 // 5 FLAG
+                + 8 // 6 QUEUEOFFSET
+                + 8 // 7 PHYSICALOFFSET
+                + 4 // 8 SYSFLAG
+                + 8 // 9 BORNTIMESTAMP
+                + 8 // 10 BORNHOST
+                + 8 // 11 STORETIMESTAMP
+                + 8 // 12 STOREHOSTADDRESS
+                + 4 // 13 RECONSUMETIMES
+                + 8 // 14 Prepared Transaction Offset
+                + 4 + bodyLength // 15 BODY (4-byte length prefix + body)
+                + 1 + topicLength // 16 TOPIC (1-byte length prefix + topic)
+                + 2 + propertiesLength // 17 PROPERTIES (2-byte length prefix + properties)
+                + 0;
+
+        ByteBuffer msgStoreItemMemory = ByteBuffer.allocate(msgLen);
+
+        final MessageExt msgInner = msg;
+
+        // 1 TOTALSIZE
+        msgStoreItemMemory.putInt(msgLen);
+        // 2 MAGICCODE
+        msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
+        // 3 BODYCRC
+        msgStoreItemMemory.putInt(UtilAll.crc32(msgInner.getBody()));
+        // 4 QUEUEID
+        msgStoreItemMemory.putInt(msgInner.getQueueId());
+        // 5 FLAG
+        msgStoreItemMemory.putInt(msgInner.getFlag());
+        // 6 QUEUEOFFSET
+        msgStoreItemMemory.putLong(msgInner.getQueueOffset());
+        // 7 PHYSICALOFFSET
+        msgStoreItemMemory.putLong(msgInner.getCommitLogOffset());
+        // 8 SYSFLAG
+        msgStoreItemMemory.putInt(sysFlag);
+        // 9 BORNTIMESTAMP
+        msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
+        // 10 BORNHOST
+        msgStoreItemMemory.put(msgInner.getBornHostBytes());
+        // 11 STORETIMESTAMP
+        msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
+        // 12 STOREHOSTADDRESS
+        msgStoreItemMemory.put(msgInner.getStoreHostBytes());
+        // 13 RECONSUMETIMES
+        msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
+        // 14 Prepared Transaction Offset
+        msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
+        // 15 BODY
+        msgStoreItemMemory.putInt(bodyLength);
+        if (bodyLength > 0)
+            msgStoreItemMemory.put(msgInner.getBody());
+        // 16 TOPIC
+        msgStoreItemMemory.put((byte) topicLength); // the topic length is narrowed to a single byte here
+        msgStoreItemMemory.put(topicData);
+        // 17 PROPERTIES
+        msgStoreItemMemory.putShort((short) propertiesLength); // the properties length is narrowed to two bytes here
+        if (propertiesLength > 0)
+            msgStoreItemMemory.put(propertiesData);
+
+        return msgStoreItemMemory;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/stats/FilterServerStatsManager.java
----------------------------------------------------------------------
diff --git a/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/stats/FilterServerStatsManager.java b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/stats/FilterServerStatsManager.java
new file mode 100644
index 0000000..3921c92
--- /dev/null
+++ b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/stats/FilterServerStatsManager.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.filtersrv.stats;
+
+import com.alibaba.rocketmq.common.ThreadFactoryImpl;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.stats.StatsItemSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+
+public class FilterServerStatsManager {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);
+
+    /** Single-thread scheduler handed to both stats item sets. */
+    private final ScheduledExecutorService scheduledExecutorService;
+
+    /** "GROUP_GET_NUMS" values, keyed by {@code topic@group}. */
+    private final StatsItemSet groupGetNums;
+
+    /** "GROUP_GET_SIZE" values, keyed by {@code topic@group}. */
+    private final StatsItemSet groupGetSize;
+
+
+    /**
+     * Creates the scheduler first and then the two stats item sets that share it.
+     */
+    public FilterServerStatsManager() {
+        scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSStatsThread"));
+        groupGetNums = new StatsItemSet("GROUP_GET_NUMS", scheduledExecutorService, log);
+        groupGetSize = new StatsItemSet("GROUP_GET_SIZE", scheduledExecutorService, log);
+    }
+
+
+    /** Intentionally empty: nothing is started here. */
+    public void start() {
+    }
+
+
+    /** Shuts down the scheduler shared by the stats item sets. */
+    public void shutdown() {
+        scheduledExecutorService.shutdown();
+    }
+
+
+    /**
+     * Adds {@code incValue} to the "GROUP_GET_NUMS" entry for the given topic and group.
+     */
+    public void incGroupGetNums(final String group, final String topic, final int incValue) {
+        groupGetNums.addValue(statsKey(group, topic), incValue, 1);
+    }
+
+
+    /**
+     * Adds {@code incValue} to the "GROUP_GET_SIZE" entry for the given topic and group.
+     */
+    public void incGroupGetSize(final String group, final String topic, final int incValue) {
+        groupGetSize.addValue(statsKey(group, topic), incValue, 1);
+    }
+
+
+    /** Builds the stats key in the form {@code topic@group}. */
+    private static String statsKey(final String group, final String topic) {
+        return topic + "@" + group;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-namesrv/pom.xml
----------------------------------------------------------------------
diff --git a/rocketmq-namesrv/pom.xml b/rocketmq-namesrv/pom.xml
new file mode 100644
index 0000000..3494f8f
--- /dev/null
+++ b/rocketmq-namesrv/pom.xml
@@ -0,0 +1,58 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>com.alibaba.rocketmq</groupId>
+        <artifactId>rocketmq-all</artifactId>
+        <version>4.0.0-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <packaging>jar</packaging>
+    <artifactId>rocketmq-namesrv</artifactId>
+    <name>rocketmq-namesrv ${project.version}</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba.rocketmq</groupId>
+            <artifactId>rocketmq-client</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba.rocketmq</groupId>
+            <artifactId>rocketmq-tools</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>rocketmq-srvutil</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/NamesrvController.java
----------------------------------------------------------------------
diff --git a/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/NamesrvController.java b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/NamesrvController.java
new file mode 100644
index 0000000..82f2622
--- /dev/null
+++ b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/NamesrvController.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.namesrv;
+
+import com.alibaba.rocketmq.common.Configuration;
+import com.alibaba.rocketmq.common.ThreadFactoryImpl;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.namesrv.NamesrvConfig;
+import com.alibaba.rocketmq.namesrv.kvconfig.KVConfigManager;
+import com.alibaba.rocketmq.namesrv.processor.ClusterTestRequestProcessor;
+import com.alibaba.rocketmq.namesrv.processor.DefaultRequestProcessor;
+import com.alibaba.rocketmq.namesrv.routeinfo.BrokerHousekeepingService;
+import com.alibaba.rocketmq.namesrv.routeinfo.RouteInfoManager;
+import com.alibaba.rocketmq.remoting.RemotingServer;
+import com.alibaba.rocketmq.remoting.netty.NettyRemotingServer;
+import com.alibaba.rocketmq.remoting.netty.NettyServerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class NamesrvController {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
+
+    private final NamesrvConfig namesrvConfig;
+
+    private final NettyServerConfig nettyServerConfig;
+
+    // Runs the two periodic tasks scheduled in initialize().
+    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
+            "NSScheduledThread"));
+    private final KVConfigManager kvConfigManager;
+    private final RouteInfoManager routeInfoManager;
+
+    // Assigned in initialize() (or through setRemotingServer); null before that.
+    private RemotingServer remotingServer;
+
+    private BrokerHousekeepingService brokerHousekeepingService;
+
+    // Assigned in initialize(); null before that.
+    private ExecutorService remotingExecutor;
+
+    private Configuration configuration;
+
+
+    /**
+     * Builds the controller and the components that need no network resources: the KV config
+     * manager, the route info manager, the broker housekeeping service and the configuration
+     * holder, whose store path is taken from the {@code configStorePath} field of the name
+     * server config.
+     *
+     * @param namesrvConfig     name server settings
+     * @param nettyServerConfig settings for the remoting server created in {@link #initialize()}
+     */
+    public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
+        this.namesrvConfig = namesrvConfig;
+        this.nettyServerConfig = nettyServerConfig;
+        this.kvConfigManager = new KVConfigManager(this);
+        this.routeInfoManager = new RouteInfoManager();
+        this.brokerHousekeepingService = new BrokerHousekeepingService(this);
+        this.configuration = new Configuration(
+                log,
+                this.namesrvConfig, this.nettyServerConfig
+        );
+        this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
+    }
+
+
+    /**
+     * Loads the KV config, creates the remoting server and its worker pool, registers the
+     * default request processor and schedules two periodic tasks: a scan for inactive brokers
+     * (first run after 5 seconds, then every 10 seconds) and a print of the KV config (first
+     * run after 1 minute, then every 10 minutes).
+     *
+     * @return always {@code true}
+     */
+    public boolean initialize() {
+
+        this.kvConfigManager.load();
+
+        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
+
+
+        this.remotingExecutor =
+                Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
+
+        this.registerProcessor();
+
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+
+            @Override
+            public void run() {
+                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
+            }
+        }, 5, 10, TimeUnit.SECONDS);
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+
+            @Override
+            public void run() {
+                NamesrvController.this.kvConfigManager.printAllPeriodically();
+            }
+        }, 1, 10, TimeUnit.MINUTES);
+
+        return true;
+    }
+
+
+    /**
+     * Registers the default processor on the remoting server: the cluster-test processor when
+     * the config enables cluster test, otherwise the regular default processor. Both run on the
+     * remoting executor.
+     */
+    private void registerProcessor() {
+        if (namesrvConfig.isClusterTest()) {
+
+            this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
+                    this.remotingExecutor);
+        } else {
+
+            this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
+        }
+    }
+
+
+    /**
+     * Starts the remoting server. Requires a remoting server to have been set, normally by
+     * {@link #initialize()}.
+     *
+     * @throws Exception declared for failures while starting the remoting server
+     */
+    public void start() throws Exception {
+        this.remotingServer.start();
+    }
+
+
+    /**
+     * Shuts down the remoting server, the remoting executor and the scheduler.
+     * <p>
+     * The server and the executor are only created in {@link #initialize()}, so they are skipped
+     * when still {@code null}. This keeps cleanup after a failed or skipped initialization from
+     * throwing and ensures the scheduler is still stopped.
+     */
+    public void shutdown() {
+        if (this.remotingServer != null) {
+            this.remotingServer.shutdown();
+        }
+        if (this.remotingExecutor != null) {
+            this.remotingExecutor.shutdown();
+        }
+        this.scheduledExecutorService.shutdown();
+    }
+
+
+    public NamesrvConfig getNamesrvConfig() {
+        return namesrvConfig;
+    }
+
+
+    public NettyServerConfig getNettyServerConfig() {
+        return nettyServerConfig;
+    }
+
+
+    public KVConfigManager getKvConfigManager() {
+        return kvConfigManager;
+    }
+
+
+    public RouteInfoManager getRouteInfoManager() {
+        return routeInfoManager;
+    }
+
+
+    /**
+     * @return the remoting server, or {@code null} if none has been created or set yet
+     */
+    public RemotingServer getRemotingServer() {
+        return remotingServer;
+    }
+
+
+    public void setRemotingServer(RemotingServer remotingServer) {
+        this.remotingServer = remotingServer;
+    }
+
+    public Configuration getConfiguration() {
+        return configuration;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/NamesrvStartup.java
----------------------------------------------------------------------
diff --git a/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/NamesrvStartup.java b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/NamesrvStartup.java
new file mode 100644
index 0000000..286de3a
--- /dev/null
+++ b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/NamesrvStartup.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.namesrv;
+
+import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.classic.joran.JoranConfigurator;
+import com.alibaba.rocketmq.common.MQVersion;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.namesrv.NamesrvConfig;
+import com.alibaba.rocketmq.remoting.netty.NettyServerConfig;
+import com.alibaba.rocketmq.remoting.netty.NettySystemConfig;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import com.alibaba.rocketmq.srvutil.ServerUtil;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+/**
+ * Command-line bootstrap for the name server.
+ * <p>
+ * {@link #main0(String[])} parses the command line, fills a {@code NamesrvConfig} and a
+ * {@code NettyServerConfig} (optionally from the properties file given with {@code -c}),
+ * configures logback from {@code <rocketmqHome>/conf/logback_namesrv.xml}, and then creates,
+ * initializes and starts a {@code NamesrvController}.
+ * <p>
+ * Process exit codes used here: {@code 0} after printing the configuration ({@code -p}),
+ * {@code -1} when the command line cannot be parsed or startup throws, {@code -2} when the
+ * RocketMQ home is not configured, and {@code -3} when controller initialization fails.
+ *
+ * @author shijia.wxr
+ */
+public class NamesrvStartup {
+    /** Properties loaded from the {@code -c} config file; stays {@code null} when none was given. */
+    public static Properties properties = null;
+    /** The parsed command line of the last {@link #main0(String[])} call. */
+    public static CommandLine commandLine = null;
+
+    /**
+     * JVM entry point; delegates to {@link #main0(String[])} and ignores the returned controller.
+     *
+     * @param args command-line arguments
+     */
+    public static void main(String[] args) {
+        main0(args);
+    }
+
+    /**
+     * Boots a name server from the given command-line arguments.
+     *
+     * @param args command-line arguments ({@code -c <file>} and {@code -p} are handled here,
+     *             the rest by {@code ServerUtil})
+     * @return the started controller, or {@code null} when startup did not complete
+     *         (in those paths {@code System.exit} has been requested first)
+     */
+    public static NamesrvController main0(String[] args) {
+        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
+
+        // Use 4096-byte socket buffers unless the corresponding system property is set.
+        if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
+            NettySystemConfig.socketSndbufSize = 4096;
+        }
+
+        if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
+            NettySystemConfig.socketRcvbufSize = 4096;
+        }
+
+        try {
+            //PackageConflictDetect.detectFastjson();
+
+            Options options = ServerUtil.buildCommandlineOptions(new Options());
+            commandLine =
+                    ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options),
+                            new PosixParser());
+            if (null == commandLine) {
+                System.exit(-1);
+                return null;
+            }
+
+            final NamesrvConfig namesrvConfig = new NamesrvConfig();
+            final NettyServerConfig nettyServerConfig = new NettyServerConfig();
+            nettyServerConfig.setListenPort(9876);
+            if (commandLine.hasOption('c')) {
+                String file = commandLine.getOptionValue('c');
+                if (file != null) {
+                    InputStream in = new BufferedInputStream(new FileInputStream(file));
+                    try {
+                        properties = new Properties();
+                        properties.load(in);
+                    } finally {
+                        // Always release the file handle, even when loading fails.
+                        in.close();
+                    }
+                    MixAll.properties2Object(properties, namesrvConfig);
+                    MixAll.properties2Object(properties, nettyServerConfig);
+
+                    namesrvConfig.setConfigStorePath(file);
+
+                    // The path is passed as an argument so a '%' in it is not parsed as a format specifier.
+                    System.out.printf("load config properties file OK, %s%n", file);
+                }
+            }
+
+            // -p: dump the effective configuration and stop.
+            if (commandLine.hasOption('p')) {
+                MixAll.printObjectProperties(null, namesrvConfig);
+                MixAll.printObjectProperties(null, nettyServerConfig);
+                System.exit(0);
+            }
+
+            // Command-line options are applied after the config file.
+            MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
+
+            if (null == namesrvConfig.getRocketmqHome()) {
+                System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n",
+                        MixAll.ROCKETMQ_HOME_ENV);
+                System.exit(-2);
+            }
+
+            LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
+            JoranConfigurator configurator = new JoranConfigurator();
+            configurator.setContext(lc);
+            lc.reset();
+            configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
+            final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
+
+            MixAll.printObjectProperties(log, namesrvConfig);
+            MixAll.printObjectProperties(log, nettyServerConfig);
+
+            final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
+
+            // remember all configs to prevent discard
+            // NOTE(review): properties is still null when no -c file was given; confirm registerConfig tolerates null.
+            controller.getConfiguration().registerConfig(properties);
+
+            boolean initResult = controller.initialize();
+            if (!initResult) {
+                controller.shutdown();
+                System.exit(-3);
+            }
+
+            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+                private volatile boolean hasShutdown = false;
+                private AtomicInteger shutdownTimes = new AtomicInteger(0);
+
+                @Override
+                public void run() {
+                    synchronized (this) {
+                        log.info("shutdown hook was invoked, " + this.shutdownTimes.incrementAndGet());
+                        // Shut the controller down only on the first invocation.
+                        if (!this.hasShutdown) {
+                            this.hasShutdown = true;
+                            long beginTime = System.currentTimeMillis();
+                            controller.shutdown();
+                            long consumingTimeTotal = System.currentTimeMillis() - beginTime;
+                            log.info("shutdown hook over, consuming time total(ms): " + consumingTimeTotal);
+                        }
+                    }
+                }
+            }, "ShutdownHook"));
+
+            controller.start();
+
+            String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
+            log.info(tip);
+            System.out.printf("%s%n", tip);
+
+            return controller;
+        } catch (Throwable e) {
+            // The logger may not be configured yet at this point, so report on stderr.
+            e.printStackTrace();
+            System.exit(-1);
+        }
+
+        return null;
+    }
+
+    /**
+     * Adds the name-server-specific options to {@code options}: {@code -c/--configFile <file>}
+     * and {@code -p/--printConfigItem}; both are optional.
+     *
+     * @param options the option set to extend
+     * @return the same {@code options} instance
+     */
+    public static Options buildCommandlineOptions(final Options options) {
+        Option opt = new Option("c", "configFile", true, "Name server config properties file");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("p", "printConfigItem", false, "Print all config item");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/kvconfig/KVConfigManager.java
----------------------------------------------------------------------
diff --git a/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/kvconfig/KVConfigManager.java b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/kvconfig/KVConfigManager.java
new file mode 100644
index 0000000..a83586c
--- /dev/null
+++ b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/kvconfig/KVConfigManager.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.namesrv.kvconfig;
+
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.protocol.body.KVTable;
+import com.alibaba.rocketmq.namesrv.NamesrvController;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+
+/**
+ * In-memory two-level key/value configuration table (namespace -> key -> value) that is
+ * loaded from and persisted as JSON to the path given by {@code NamesrvConfig.getKvConfigPath()}.
+ * <p>
+ * All accessors except {@link #load()} guard the table with a read/write lock acquired
+ * interruptibly. When the calling thread is interrupted while waiting for the lock, the
+ * operation is abandoned, the error is logged and the thread's interrupt status is restored.
+ *
+ * @author shijia.wxr
+ */
+public class KVConfigManager {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
+
+    private final NamesrvController namesrvController;
+
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+    private final HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable =
+            new HashMap<String, HashMap<String, String>>();
+
+
+    /**
+     * @param namesrvController controller used to look up the name server configuration
+     */
+    public KVConfigManager(NamesrvController namesrvController) {
+        this.namesrvController = namesrvController;
+    }
+
+
+    /**
+     * Reads the KV config file and merges its content into the in-memory table. Nothing
+     * happens when the file content is unavailable, cannot be decoded, or contains no table.
+     * <p>
+     * NOTE(review): this method does not take the lock; presumably it is only called during
+     * start-up before other threads use the manager - confirm against the caller.
+     */
+    public void load() {
+        String content = MixAll.file2String(this.namesrvController.getNamesrvConfig().getKvConfigPath());
+        if (content != null) {
+            KVConfigSerializeWrapper kvConfigSerializeWrapper =
+                    KVConfigSerializeWrapper.fromJson(content, KVConfigSerializeWrapper.class);
+            if (null != kvConfigSerializeWrapper) {
+                HashMap<String, HashMap<String, String>> loaded = kvConfigSerializeWrapper.getConfigTable();
+                // A wrapper decoded from JSON without a table would otherwise make putAll throw.
+                if (null != loaded) {
+                    this.configTable.putAll(loaded);
+                    log.info("load KV config table OK");
+                }
+            }
+        }
+    }
+
+
+    /**
+     * Stores {@code value} under {@code namespace}/{@code key}, creating the namespace when
+     * needed, and then persists the whole table.
+     *
+     * @param namespace namespace of the item
+     * @param key key of the item
+     * @param value value to store
+     */
+    public void putKVConfig(final String namespace, final String key, final String value) {
+        try {
+            this.lock.writeLock().lockInterruptibly();
+            try {
+                HashMap<String, String> kvTable = this.configTable.get(namespace);
+                if (null == kvTable) {
+                    kvTable = new HashMap<String, String>();
+                    this.configTable.put(namespace, kvTable);
+                    log.info("putKVConfig create new Namespace {}", namespace);
+                }
+
+                final String prev = kvTable.put(key, value);
+                if (null != prev) {
+                    log.info("putKVConfig update config item, Namespace: {} Key: {} Value: {}", //
+                            namespace, key, value);
+                } else {
+                    log.info("putKVConfig create new config item, Namespace: {} Key: {} Value: {}", //
+                            namespace, key, value);
+                }
+            } finally {
+                this.lock.writeLock().unlock();
+            }
+        } catch (InterruptedException e) {
+            log.error("putKVConfig InterruptedException", e);
+            // Keep the interrupt visible to the caller; the table was not modified, so
+            // there is nothing new to persist.
+            Thread.currentThread().interrupt();
+            return;
+        }
+
+        this.persist();
+    }
+
+    /**
+     * Serializes the whole table to JSON under the read lock and writes it to the KV config
+     * path. I/O failures are logged, not propagated.
+     */
+    public void persist() {
+        try {
+            this.lock.readLock().lockInterruptibly();
+            try {
+                KVConfigSerializeWrapper kvConfigSerializeWrapper = new KVConfigSerializeWrapper();
+                kvConfigSerializeWrapper.setConfigTable(this.configTable);
+
+                String content = kvConfigSerializeWrapper.toJson();
+
+                if (null != content) {
+                    MixAll.string2File(content, this.namesrvController.getNamesrvConfig().getKvConfigPath());
+                }
+            } catch (IOException e) {
+                log.error("persist kvconfig Exception, "
+                        + this.namesrvController.getNamesrvConfig().getKvConfigPath(), e);
+            } finally {
+                this.lock.readLock().unlock();
+            }
+        } catch (InterruptedException e) {
+            log.error("persist InterruptedException", e);
+            Thread.currentThread().interrupt();
+        }
+
+    }
+
+    /**
+     * Removes {@code key} from {@code namespace} (if the namespace exists) and then persists
+     * the whole table.
+     *
+     * @param namespace namespace of the item
+     * @param key key of the item to remove
+     */
+    public void deleteKVConfig(final String namespace, final String key) {
+        try {
+            this.lock.writeLock().lockInterruptibly();
+            try {
+                HashMap<String, String> kvTable = this.configTable.get(namespace);
+                if (null != kvTable) {
+                    String value = kvTable.remove(key);
+                    log.info("deleteKVConfig delete a config item, Namespace: {} Key: {} Value: {}", //
+                            namespace, key, value);
+                }
+            } finally {
+                this.lock.writeLock().unlock();
+            }
+        } catch (InterruptedException e) {
+            log.error("deleteKVConfig InterruptedException", e);
+            // Keep the interrupt visible to the caller; the table was not modified, so
+            // there is nothing new to persist.
+            Thread.currentThread().interrupt();
+            return;
+        }
+
+        this.persist();
+    }
+
+    /**
+     * Encodes all items of one namespace as a {@code KVTable}.
+     *
+     * @param namespace namespace to export
+     * @return the encoded table, or {@code null} when the namespace does not exist or the
+     *         calling thread was interrupted
+     */
+    public byte[] getKVListByNamespace(final String namespace) {
+        try {
+            this.lock.readLock().lockInterruptibly();
+            try {
+                HashMap<String, String> kvTable = this.configTable.get(namespace);
+                if (null != kvTable) {
+                    KVTable table = new KVTable();
+                    table.setTable(kvTable);
+                    return table.encode();
+                }
+            } finally {
+                this.lock.readLock().unlock();
+            }
+        } catch (InterruptedException e) {
+            log.error("getKVListByNamespace InterruptedException", e);
+            Thread.currentThread().interrupt();
+        }
+
+        return null;
+    }
+
+    /**
+     * Looks up a single item.
+     *
+     * @param namespace namespace of the item
+     * @param key key of the item
+     * @return the stored value, or {@code null} when the namespace or key does not exist or
+     *         the calling thread was interrupted
+     */
+    public String getKVConfig(final String namespace, final String key) {
+        try {
+            this.lock.readLock().lockInterruptibly();
+            try {
+                HashMap<String, String> kvTable = this.configTable.get(namespace);
+                if (null != kvTable) {
+                    return kvTable.get(key);
+                }
+            } finally {
+                this.lock.readLock().unlock();
+            }
+        } catch (InterruptedException e) {
+            log.error("getKVConfig InterruptedException", e);
+            Thread.currentThread().interrupt();
+        }
+
+        return null;
+    }
+
+    /**
+     * Logs the number of namespaces followed by every namespace/key/value triple, under the
+     * read lock.
+     */
+    public void printAllPeriodically() {
+        try {
+            this.lock.readLock().lockInterruptibly();
+            try {
+                log.info("--------------------------------------------------------");
+
+                log.info("configTable SIZE: {}", this.configTable.size());
+                for (Entry<String, HashMap<String, String>> namespaceEntry : this.configTable.entrySet()) {
+                    for (Entry<String, String> item : namespaceEntry.getValue().entrySet()) {
+                        log.info("configTable NS: {} Key: {} Value: {}", namespaceEntry.getKey(), item.getKey(),
+                                item.getValue());
+                    }
+                }
+            } finally {
+                this.lock.readLock().unlock();
+            }
+        } catch (InterruptedException e) {
+            log.error("printAllPeriodically InterruptedException", e);
+            Thread.currentThread().interrupt();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/kvconfig/KVConfigSerializeWrapper.java
----------------------------------------------------------------------
diff --git a/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/kvconfig/KVConfigSerializeWrapper.java b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/kvconfig/KVConfigSerializeWrapper.java
new file mode 100644
index 0000000..3a91028
--- /dev/null
+++ b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/kvconfig/KVConfigSerializeWrapper.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.namesrv.kvconfig;
+
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.HashMap;
+
+
+/**
+ * Serializable holder for the whole KV configuration table
+ * (namespace -> key -> value).
+ *
+ * @author shijia.wxr
+ */
+public class KVConfigSerializeWrapper extends RemotingSerializable {
+    /** The wrapped table; {@code null} until set. */
+    private HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable;
+
+
+    /**
+     * @return the wrapped table, possibly {@code null}
+     */
+    public HashMap<String, HashMap<String, String>> getConfigTable() {
+        return this.configTable;
+    }
+
+
+    /**
+     * @param configTable the table to wrap; the reference is stored as is, not copied
+     */
+    public void setConfigTable(HashMap<String, HashMap<String, String>> configTable) {
+        this.configTable = configTable;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java
----------------------------------------------------------------------
diff --git a/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java
new file mode 100644
index 0000000..b0b158d
--- /dev/null
+++ b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.namesrv.processor;
+
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.help.FAQUrl;
+import com.alibaba.rocketmq.common.namesrv.NamesrvUtil;
+import com.alibaba.rocketmq.common.protocol.ResponseCode;
+import com.alibaba.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader;
+import com.alibaba.rocketmq.common.protocol.route.TopicRouteData;
+import com.alibaba.rocketmq.namesrv.NamesrvController;
+import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import io.netty.channel.ChannelHandlerContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Request processor used in cluster-test mode. It behaves like
+ * {@link DefaultRequestProcessor} except for route lookups: when this name server has no
+ * route data for a topic, the route is fetched through an admin client whose unit name is
+ * the product environment name.
+ *
+ * @author manhong.yqd
+ */
+public class ClusterTestRequestProcessor extends DefaultRequestProcessor {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
+    private final DefaultMQAdminExt adminExt;
+    private final String productEnvName;
+
+
+    /**
+     * Creates the processor and starts its admin client. A start failure is logged and does
+     * not abort construction.
+     *
+     * @param namesrvController the owning controller
+     * @param productEnvName name of the product environment, used as the admin client's unit
+     *                       name and as suffix of its instance name
+     */
+    public ClusterTestRequestProcessor(NamesrvController namesrvController, String productEnvName) {
+        super(namesrvController);
+        this.productEnvName = productEnvName;
+        adminExt = new DefaultMQAdminExt();
+        adminExt.setInstanceName("CLUSTER_TEST_NS_INS_" + productEnvName);
+        adminExt.setUnitName(productEnvName);
+        try {
+            adminExt.start();
+        } catch (MQClientException e) {
+            // Report through the name server log instead of stderr so the failure is not lost.
+            log.error("start admin client for product environment failed. envName=" + productEnvName, e);
+        }
+    }
+
+
+    /**
+     * Answers a route query for one topic. Local route data is preferred and is completed
+     * with the order-topic config from the KV store; when there is none, the route is
+     * requested through the admin client (in that case no order-topic config is added).
+     *
+     * @param ctx channel context of the request
+     * @param request the route query; its custom header must be a {@code GetRouteInfoRequestHeader}
+     * @return {@code SUCCESS} with the encoded route data as body, or {@code TOPIC_NOT_EXIST}
+     *         with an explanatory remark
+     * @throws RemotingCommandException when the request header cannot be decoded
+     */
+    @Override
+    public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        final GetRouteInfoRequestHeader requestHeader =
+                (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
+
+        TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
+        if (topicRouteData != null) {
+            String orderTopicConf =
+                    this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
+                            requestHeader.getTopic());
+            topicRouteData.setOrderTopicConf(orderTopicConf);
+        } else {
+            try {
+                topicRouteData = adminExt.examineTopicRouteInfo(requestHeader.getTopic());
+            } catch (Exception e) {
+                // Best effort: fall through to TOPIC_NOT_EXIST, but keep the cause in the log.
+                log.info("get route info by topic from product environment failed. envName={}, topic={}",
+                        productEnvName, requestHeader.getTopic(), e);
+            }
+        }
+
+        if (topicRouteData != null) {
+            byte[] content = topicRouteData.encode();
+            response.setBody(content);
+            response.setCode(ResponseCode.SUCCESS);
+            response.setRemark(null);
+            return response;
+        }
+
+        response.setCode(ResponseCode.TOPIC_NOT_EXIST);
+        response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
+                + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
+        return response;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/processor/DefaultRequestProcessor.java
----------------------------------------------------------------------
diff --git a/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/processor/DefaultRequestProcessor.java
new file mode 100644
index 0000000..118198e
--- /dev/null
+++ b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/processor/DefaultRequestProcessor.java
@@ -0,0 +1,491 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.namesrv.processor;
+
+import com.alibaba.rocketmq.common.MQVersion;
+import com.alibaba.rocketmq.common.MQVersion.Version;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.help.FAQUrl;
+import com.alibaba.rocketmq.common.namesrv.NamesrvUtil;
+import com.alibaba.rocketmq.common.namesrv.RegisterBrokerResult;
+import com.alibaba.rocketmq.common.protocol.RequestCode;
+import com.alibaba.rocketmq.common.protocol.ResponseCode;
+import com.alibaba.rocketmq.common.protocol.body.RegisterBrokerBody;
+import com.alibaba.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
+import com.alibaba.rocketmq.common.protocol.header.GetTopicsByClusterRequestHeader;
+import com.alibaba.rocketmq.common.protocol.header.namesrv.*;
+import com.alibaba.rocketmq.common.protocol.route.TopicRouteData;
+import com.alibaba.rocketmq.namesrv.NamesrvController;
+import com.alibaba.rocketmq.remoting.common.RemotingHelper;
+import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
+import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import io.netty.channel.ChannelHandlerContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class DefaultRequestProcessor implements NettyRequestProcessor {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
+
+    protected final NamesrvController namesrvController;
+
+
+    /**
+     * Creates a processor bound to the given controller.
+     *
+     * @param namesrvController controller whose managers serve the requests
+     */
+    public DefaultRequestProcessor(NamesrvController namesrvController) {
+        this.namesrvController = namesrvController;
+    }
+
+
+    @Override
+    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        if (log.isDebugEnabled()) {
+            log.debug("receive request, {} {} {}",
+                    request.getCode(),
+                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
+                    request);
+        }
+
+        switch (request.getCode()) {
+            case RequestCode.PUT_KV_CONFIG:
+                return this.putKVConfig(ctx, request);
+            case RequestCode.GET_KV_CONFIG:
+                return this.getKVConfig(ctx, request);
+            case RequestCode.DELETE_KV_CONFIG:
+                return this.deleteKVConfig(ctx, request);
+            case RequestCode.REGISTER_BROKER:
+                Version brokerVersion = MQVersion.value2Version(request.getVersion());
+                if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
+                    return this.registerBrokerWithFilterServer(ctx, request);
+                }
+                else {
+                    return this.registerBroker(ctx, request);
+                }
+            case RequestCode.UNREGISTER_BROKER:
+                return this.unregisterBroker(ctx, request);
+            case RequestCode.GET_ROUTEINTO_BY_TOPIC:
+                return this.getRouteInfoByTopic(ctx, request);
+            case RequestCode.GET_BROKER_CLUSTER_INFO:
+                return this.getBrokerClusterInfo(ctx, request);
+            case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
+                return this.wipeWritePermOfBroker(ctx, request);
+            case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
+                return getAllTopicListFromNameserver(ctx, request);
+            case RequestCode.DELETE_TOPIC_IN_NAMESRV:
+                return deleteTopicInNamesrv(ctx, request);
+            case RequestCode.GET_KVLIST_BY_NAMESPACE:
+                return this.getKVListByNamespace(ctx, request);
+            case RequestCode.GET_TOPICS_BY_CLUSTER:
+                return this.getTopicsByCluster(ctx, request);
+            case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
+                return this.getSystemTopicListFromNs(ctx, request);
+            case RequestCode.GET_UNIT_TOPIC_LIST:
+                return this.getUnitTopicList(ctx, request);
+            case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
+                return this.getHasUnitSubTopicList(ctx, request);
+            case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
+                return this.getHasUnitSubUnUnitTopicList(ctx, request);
+            case RequestCode.UPDATE_NAMESRV_CONFIG:
+                return this.updateConfig(ctx, request);
+            case RequestCode.GET_NAMESRV_CONFIG:
+                return this.getConfig(ctx, request);
+            default:
+                break;
+        }
+        return null;
+    }
+
+    /**
+     * @return always {@code false}
+     */
+    @Override
+    public boolean rejectRequest() {
+        return false;
+    }
+
+    /**
+     * Stores the namespace/key/value carried in the request header in the KV config manager.
+     *
+     * @param ctx channel context of the request
+     * @param request command whose custom header is a {@code PutKVConfigRequestHeader}
+     * @return a {@code SUCCESS} response without remark
+     * @throws RemotingCommandException when the request header cannot be decoded
+     */
+    public RemotingCommand putKVConfig(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        final PutKVConfigRequestHeader header =
+                (PutKVConfigRequestHeader) request.decodeCommandCustomHeader(PutKVConfigRequestHeader.class);
+
+        final String namespace = header.getNamespace();
+        final String key = header.getKey();
+        final String value = header.getValue();
+        this.namesrvController.getKvConfigManager().putKVConfig(namespace, key, value);
+
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+    /**
+     * Looks up the value stored under the namespace/key carried in the request header.
+     *
+     * @param ctx channel context of the request
+     * @param request command whose custom header is a {@code GetKVConfigRequestHeader}
+     * @return {@code SUCCESS} with the value in the response header, or
+     *         {@code QUERY_NOT_FOUND} with a remark naming the namespace and key
+     * @throws RemotingCommandException when the request header cannot be decoded
+     */
+    public RemotingCommand getKVConfig(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(GetKVConfigResponseHeader.class);
+        final GetKVConfigResponseHeader replyHeader = (GetKVConfigResponseHeader) response.readCustomHeader();
+        final GetKVConfigRequestHeader header =
+                (GetKVConfigRequestHeader) request.decodeCommandCustomHeader(GetKVConfigRequestHeader.class);
+
+        final String found =
+                this.namesrvController.getKvConfigManager().getKVConfig(header.getNamespace(), header.getKey());
+
+        if (found == null) {
+            response.setCode(ResponseCode.QUERY_NOT_FOUND);
+            response.setRemark("No config item, Namespace: " + header.getNamespace() + " Key: " + header.getKey());
+        } else {
+            replyHeader.setValue(found);
+            response.setCode(ResponseCode.SUCCESS);
+            response.setRemark(null);
+        }
+        return response;
+    }
+
+    /**
+     * Removes the item identified by the namespace/key carried in the request header from
+     * the KV config manager.
+     *
+     * @param ctx channel context of the request
+     * @param request command whose custom header is a {@code DeleteKVConfigRequestHeader}
+     * @return a {@code SUCCESS} response without remark
+     * @throws RemotingCommandException when the request header cannot be decoded
+     */
+    public RemotingCommand deleteKVConfig(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        final DeleteKVConfigRequestHeader header =
+                (DeleteKVConfigRequestHeader) request.decodeCommandCustomHeader(DeleteKVConfigRequestHeader.class);
+
+        final String namespace = header.getNamespace();
+        final String key = header.getKey();
+        this.namesrvController.getKvConfigManager().deleteKVConfig(namespace, key);
+
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+    /**
+     * Registers a broker together with its filter server list. The request body, when
+     * present, is decoded as a {@code RegisterBrokerBody}; without a body an empty one is
+     * used whose data version has counter 0 and timestamp 0.
+     *
+     * @param ctx channel context of the request; its channel is passed to the route manager
+     * @param request command whose custom header is a {@code RegisterBrokerRequestHeader}
+     * @return a {@code SUCCESS} response carrying the HA server and master addresses in its
+     *         header and the encoded order-topic KV list as body
+     * @throws RemotingCommandException when the request header cannot be decoded
+     */
+    public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request)
+            throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
+        final RegisterBrokerResponseHeader replyHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
+        final RegisterBrokerRequestHeader header =
+                (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
+
+        RegisterBrokerBody brokerBody = new RegisterBrokerBody();
+        if (request.getBody() == null) {
+            // No body sent: register with a zeroed data version.
+            brokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0));
+            brokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestatmp(0);
+        } else {
+            brokerBody = RegisterBrokerBody.decode(request.getBody(), RegisterBrokerBody.class);
+        }
+
+        final RegisterBrokerResult registerResult = this.namesrvController.getRouteInfoManager().registerBroker(
+                header.getClusterName(),
+                header.getBrokerAddr(),
+                header.getBrokerName(),
+                header.getBrokerId(),
+                header.getHaServerAddr(),
+                brokerBody.getTopicConfigSerializeWrapper(),
+                brokerBody.getFilterServerList(),
+                ctx.channel());
+
+        replyHeader.setHaServerAddr(registerResult.getHaServerAddr());
+        replyHeader.setMasterAddr(registerResult.getMasterAddr());
+
+        // The order-topic configuration is returned to the broker as the response body.
+        final byte[] orderTopicKvList =
+                this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
+        response.setBody(orderTopicKvList);
+
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+    /**
+     * Registers a broker without a filter-server list ({@code null} is passed in its place).
+     *
+     * <p>The topic config wrapper is decoded from the request body when one is present;
+     * otherwise a new wrapper is created with its data version counter and timestamp set to zero.
+     * The response carries the HA server and master addresses returned by the route manager,
+     * and its body is the KV list of the order-topic-config namespace.
+     *
+     * @param ctx     channel context; its channel is handed to the route manager
+     * @param request command whose custom header is decoded as {@link RegisterBrokerRequestHeader}
+     * @return a response marked SUCCESS
+     * @throws RemotingCommandException propagated from the calls made here; presumably raised
+     *                                  when the request header cannot be decoded
+     */
+    public RemotingCommand registerBroker(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
+        final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
+        final RegisterBrokerRequestHeader requestHeader =
+                (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
+
+        final TopicConfigSerializeWrapper topicConfig;
+        final byte[] payload = request.getBody();
+        if (payload == null) {
+            // No body supplied: use an empty wrapper with a zeroed data version.
+            topicConfig = new TopicConfigSerializeWrapper();
+            topicConfig.getDataVersion().setCounter(new AtomicLong(0));
+            topicConfig.getDataVersion().setTimestatmp(0);
+        } else {
+            topicConfig = TopicConfigSerializeWrapper.decode(payload, TopicConfigSerializeWrapper.class);
+        }
+
+        final RegisterBrokerResult registered = this.namesrvController.getRouteInfoManager().registerBroker(
+                requestHeader.getClusterName(),
+                requestHeader.getBrokerAddr(),
+                requestHeader.getBrokerName(),
+                requestHeader.getBrokerId(),
+                requestHeader.getHaServerAddr(),
+                topicConfig,
+                null,
+                ctx.channel()
+        );
+
+        responseHeader.setHaServerAddr(registered.getHaServerAddr());
+        responseHeader.setMasterAddr(registered.getMasterAddr());
+
+        response.setBody(
+                this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG));
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+    /**
+     * Unregisters the broker described by the request header (cluster name, address, name, id).
+     *
+     * @param ctx     channel context of the caller; not referenced by this handler
+     * @param request command whose custom header is decoded as {@link UnRegisterBrokerRequestHeader}
+     * @return a response marked SUCCESS once the unregister call has returned
+     * @throws RemotingCommandException propagated from the calls made here; presumably raised
+     *                                  when the request header cannot be decoded
+     */
+    public RemotingCommand unregisterBroker(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        final UnRegisterBrokerRequestHeader header =
+                (UnRegisterBrokerRequestHeader) request.decodeCommandCustomHeader(UnRegisterBrokerRequestHeader.class);
+
+        this.namesrvController.getRouteInfoManager().unregisterBroker(
+                header.getClusterName(), header.getBrokerAddr(), header.getBrokerName(), header.getBrokerId());
+
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+    /**
+     * Returns the encoded route data for the topic named in the request header.
+     *
+     * <p>When the name server config has order messages enabled, the topic's entry from the
+     * order-topic-config KV namespace is attached to the route data before it is encoded.
+     *
+     * @param ctx     channel context of the caller; not referenced by this handler
+     * @param request command whose custom header is decoded as {@link GetRouteInfoRequestHeader}
+     * @return SUCCESS with the encoded route data as body, or TOPIC_NOT_EXIST with a remark
+     *         when the route manager has no data for the topic
+     * @throws RemotingCommandException propagated from the calls made here; presumably raised
+     *                                  when the request header cannot be decoded
+     */
+    public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        final GetRouteInfoRequestHeader requestHeader =
+                (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
+
+        final TopicRouteData routeData =
+                this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
+
+        // Unknown topic: answer immediately with the not-exist code.
+        if (routeData == null) {
+            response.setCode(ResponseCode.TOPIC_NOT_EXIST);
+            response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
+                    + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
+            return response;
+        }
+
+        if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
+            final String orderConf = this.namesrvController.getKvConfigManager()
+                    .getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, requestHeader.getTopic());
+            routeData.setOrderTopicConf(orderConf);
+        }
+
+        response.setBody(routeData.encode());
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+    /**
+     * Returns the route manager's cluster information as the response body.
+     *
+     * @param ctx     channel context of the caller; not referenced by this handler
+     * @param request incoming command; not referenced by this handler
+     * @return a response marked SUCCESS whose body is the result of {@code getAllClusterInfo()}
+     */
+    private RemotingCommand getBrokerClusterInfo(ChannelHandlerContext ctx, RemotingCommand request) {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+
+        response.setBody(this.namesrvController.getRouteInfoManager().getAllClusterInfo());
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+    /**
+     * Asks the route manager to wipe the write permission of the broker named in the request
+     * header, logs the outcome, and reports the returned count to the caller.
+     *
+     * @param ctx     channel context; its channel's remote address is included in the log line
+     * @param request command whose custom header is decoded as {@link WipeWritePermOfBrokerRequestHeader}
+     * @return a response marked SUCCESS whose header carries the wipe count
+     * @throws RemotingCommandException propagated from the calls made here; presumably raised
+     *                                  when the request header cannot be decoded
+     */
+    private RemotingCommand wipeWritePermOfBroker(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(WipeWritePermOfBrokerResponseHeader.class);
+        final WipeWritePermOfBrokerResponseHeader responseHeader = (WipeWritePermOfBrokerResponseHeader) response.readCustomHeader();
+        final WipeWritePermOfBrokerRequestHeader requestHeader =
+                (WipeWritePermOfBrokerRequestHeader) request.decodeCommandCustomHeader(WipeWritePermOfBrokerRequestHeader.class);
+
+        final int wipedTopics =
+                this.namesrvController.getRouteInfoManager().wipeWritePermOfBrokerByLock(requestHeader.getBrokerName());
+
+        log.info("wipe write perm of broker[{}], client: {}, {}",
+                requestHeader.getBrokerName(), RemotingHelper.parseChannelRemoteAddr(ctx.channel()), wipedTopics);
+
+        responseHeader.setWipeTopicCount(wipedTopics);
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+    /**
+     * Returns the route manager's full topic list as the response body.
+     *
+     * @param ctx     channel context of the caller; not referenced by this handler
+     * @param request incoming command; not referenced by this handler
+     * @return a response marked SUCCESS whose body is the result of {@code getAllTopicList()}
+     */
+    private RemotingCommand getAllTopicListFromNameserver(ChannelHandlerContext ctx, RemotingCommand request) {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+
+        response.setBody(this.namesrvController.getRouteInfoManager().getAllTopicList());
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+    /**
+     * Deletes the topic named in the request header from the route manager.
+     *
+     * @param ctx     channel context of the caller; not referenced by this handler
+     * @param request command whose custom header is decoded as {@link DeleteTopicInNamesrvRequestHeader}
+     * @return a response marked SUCCESS once the delete call has returned
+     * @throws RemotingCommandException propagated from the calls made here; presumably raised
+     *                                  when the request header cannot be decoded
+     */
+    private RemotingCommand deleteTopicInNamesrv(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        final DeleteTopicInNamesrvRequestHeader header =
+                (DeleteTopicInNamesrvRequestHeader) request.decodeCommandCustomHeader(DeleteTopicInNamesrvRequestHeader.class);
+
+        this.namesrvController.getRouteInfoManager().deleteTopic(header.getTopic());
+
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+    /**
+     * Returns the KV list of the namespace named in the request header.
+     *
+     * @param ctx     channel context of the caller; not referenced by this handler
+     * @param request command whose custom header is decoded as {@link GetKVListByNamespaceRequestHeader}
+     * @return SUCCESS with the KV list bytes as body, or QUERY_NOT_FOUND with a remark when
+     *         the KV config manager returns {@code null}
+     * @throws RemotingCommandException propagated from the calls made here; presumably raised
+     *                                  when the request header cannot be decoded
+     */
+    private RemotingCommand getKVListByNamespace(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        final GetKVListByNamespaceRequestHeader requestHeader =
+                (GetKVListByNamespaceRequestHeader) request.decodeCommandCustomHeader(GetKVListByNamespaceRequestHeader.class);
+
+        final byte[] kvList =
+                this.namesrvController.getKvConfigManager().getKVListByNamespace(requestHeader.getNamespace());
+
+        // Nothing stored for this namespace: report not-found.
+        if (kvList == null) {
+            response.setCode(ResponseCode.QUERY_NOT_FOUND);
+            response.setRemark("No config item, Namespace: " + requestHeader.getNamespace());
+            return response;
+        }
+
+        response.setBody(kvList);
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+    /**
+     * Returns the topics of the cluster named in the request header.
+     *
+     * @param ctx     channel context of the caller; not referenced by this handler
+     * @param request command whose custom header is decoded as {@link GetTopicsByClusterRequestHeader}
+     * @return a response marked SUCCESS whose body is the result of {@code getTopicsByCluster(...)}
+     * @throws RemotingCommandException propagated from the calls made here; presumably raised
+     *                                  when the request header cannot be decoded
+     */
+    private RemotingCommand getTopicsByCluster(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        final GetTopicsByClusterRequestHeader header =
+                (GetTopicsByClusterRequestHeader) request.decodeCommandCustomHeader(GetTopicsByClusterRequestHeader.class);
+
+        response.setBody(this.namesrvController.getRouteInfoManager().getTopicsByCluster(header.getCluster()));
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+
+    /**
+     * Returns the route manager's system topic list as the response body.
+     *
+     * @param ctx     channel context of the caller; not referenced by this handler
+     * @param request incoming command; not referenced by this handler
+     * @return a response marked SUCCESS whose body is the result of {@code getSystemTopicList()}
+     * @throws RemotingCommandException declared by the signature; no visible statement here throws it explicitly
+     */
+    private RemotingCommand getSystemTopicListFromNs(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+
+        response.setBody(this.namesrvController.getRouteInfoManager().getSystemTopicList());
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+
+    /**
+     * Returns the route manager's unit topics as the response body.
+     *
+     * @param ctx     channel context of the caller; not referenced by this handler
+     * @param request incoming command; not referenced by this handler
+     * @return a response marked SUCCESS whose body is the result of {@code getUnitTopics()}
+     * @throws RemotingCommandException declared by the signature; no visible statement here throws it explicitly
+     */
+    private RemotingCommand getUnitTopicList(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+
+        response.setBody(this.namesrvController.getRouteInfoManager().getUnitTopics());
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+
+    /**
+     * Returns the route manager's "has unit sub" topic list as the response body.
+     *
+     * @param ctx     channel context of the caller; not referenced by this handler
+     * @param request incoming command; not referenced by this handler
+     * @return a response marked SUCCESS whose body is the result of {@code getHasUnitSubTopicList()}
+     * @throws RemotingCommandException declared by the signature; no visible statement here throws it explicitly
+     */
+    private RemotingCommand getHasUnitSubTopicList(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+
+        response.setBody(this.namesrvController.getRouteInfoManager().getHasUnitSubTopicList());
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+
+    /**
+     * Returns the route manager's "has unit sub, un-unit" topic list as the response body.
+     *
+     * @param ctx     channel context of the caller; not referenced by this handler
+     * @param request incoming command; not referenced by this handler
+     * @return a response marked SUCCESS whose body is the result of {@code getHasUnitSubUnUnitTopicList()}
+     * @throws RemotingCommandException declared by the signature; no visible statement here throws it explicitly
+     */
+    private RemotingCommand getHasUnitSubUnUnitTopicList(ChannelHandlerContext ctx, RemotingCommand request)
+            throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+
+        response.setBody(this.namesrvController.getRouteInfoManager().getHasUnitSubUnUnitTopicList());
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+    /**
+     * Applies configuration updates carried in the request body.
+     *
+     * <p>The body, when present, is decoded to text with {@code MixAll.DEFAULT_CHARSET}, parsed
+     * into {@link Properties} via {@code MixAll.string2Properties}, and passed to the
+     * controller's configuration. A request without a body is answered with SUCCESS and
+     * changes nothing.
+     *
+     * @param ctx     channel context; its channel's remote address is logged
+     * @param request command whose body holds the properties text (may be {@code null})
+     * @return SUCCESS, or SYSTEM_ERROR with a remark when the charset is unsupported or the
+     *         text cannot be turned into properties
+     */
+    private RemotingCommand updateConfig(ChannelHandlerContext ctx, RemotingCommand request) {
+        log.info("updateConfig called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+
+        byte[] body = request.getBody();
+        if (body != null) {
+            // A `new String(...)` expression never evaluates to null, so no null check is
+            // needed on bodyStr; only the charset lookup can fail here.
+            String bodyStr;
+            try {
+                bodyStr = new String(body, MixAll.DEFAULT_CHARSET);
+            } catch (UnsupportedEncodingException e) {
+                log.error("updateConfig byte array to string error: ", e);
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark("UnsupportedEncodingException " + e);
+                return response;
+            }
+
+            Properties properties = MixAll.string2Properties(bodyStr);
+            if (properties == null) {
+                log.error("updateConfig MixAll.string2Properties error {}", bodyStr);
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark("string2Properties error");
+                return response;
+            }
+
+            this.namesrvController.getConfiguration().update(properties);
+        }
+
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+    /**
+     * Returns the controller's configuration, formatted as text, in the response body.
+     *
+     * <p>The body is only set when the formatted text is non-null and non-empty; it is encoded
+     * with {@code MixAll.DEFAULT_CHARSET}.
+     *
+     * @param ctx     channel context of the caller; not referenced by this handler
+     * @param request incoming command; not referenced by this handler
+     * @return SUCCESS, or SYSTEM_ERROR with a remark when the charset is unsupported
+     */
+    private RemotingCommand getConfig(ChannelHandlerContext ctx, RemotingCommand request) {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+
+        final String allConfigs = this.namesrvController.getConfiguration().getAllConfigsFormatString();
+        final boolean hasContent = allConfigs != null && allConfigs.length() > 0;
+        if (hasContent) {
+            final byte[] encoded;
+            try {
+                encoded = allConfigs.getBytes(MixAll.DEFAULT_CHARSET);
+            } catch (UnsupportedEncodingException e) {
+                log.error("getConfig error, ", e);
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark("UnsupportedEncodingException " + e);
+                return response;
+            }
+            response.setBody(encoded);
+        }
+
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java
----------------------------------------------------------------------
diff --git a/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java
new file mode 100644
index 0000000..2f123fb
--- /dev/null
+++ b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.namesrv.routeinfo;
+
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.namesrv.NamesrvController;
+import com.alibaba.rocketmq.remoting.ChannelEventListener;
+import io.netty.channel.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Channel event listener that forwards channel close, exception and idle events to the
+ * route info manager's {@code onChannelDestroy}; connect events are ignored.
+ *
+ * @author shijia.wxr
+ */
+public class BrokerHousekeepingService implements ChannelEventListener {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
+    private final NamesrvController namesrvController;
+
+    /**
+     * @param namesrvController controller whose route info manager receives the destroy notifications
+     */
+    public BrokerHousekeepingService(NamesrvController namesrvController) {
+        this.namesrvController = namesrvController;
+    }
+
+    /** Intentionally does nothing when a channel connects. */
+    @Override
+    public void onChannelConnect(String remoteAddr, Channel channel) {
+    }
+
+    /** Forwards a closed channel to the route info manager. */
+    @Override
+    public void onChannelClose(String remoteAddr, Channel channel) {
+        notifyChannelDestroyed(remoteAddr, channel);
+    }
+
+    /** Forwards a channel that raised an exception to the route info manager. */
+    @Override
+    public void onChannelException(String remoteAddr, Channel channel) {
+        notifyChannelDestroyed(remoteAddr, channel);
+    }
+
+    /** Forwards an idle channel to the route info manager. */
+    @Override
+    public void onChannelIdle(String remoteAddr, Channel channel) {
+        notifyChannelDestroyed(remoteAddr, channel);
+    }
+
+    /** Single delegation point shared by the close, exception and idle callbacks. */
+    private void notifyChannelDestroyed(String remoteAddr, Channel channel) {
+        this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
+    }
+}


Mime
View raw message