rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yu...@apache.org
Subject [02/43] incubator-rocketmq git commit: Finish code dump. Reviewed by: @yukon @vongosling @stevenschew @vintagewang @lollipop @zander
Date Mon, 19 Dec 2016 09:40:19 GMT
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/namesrv/UpdateNamesrvConfigCommand.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/namesrv/UpdateNamesrvConfigCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/namesrv/UpdateNamesrvConfigCommand.java
new file mode 100644
index 0000000..f6ee1f7
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/namesrv/UpdateNamesrvConfigCommand.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.tools.command.namesrv;
+
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * @author xigu.lx
+ */
+public class UpdateNamesrvConfigCommand implements SubCommand {
+    @Override
+    public String commandName() {
+        return "updateNamesrvConfig";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "Update configs of name server.";
+    }
+
+    @Override
+    public Options buildCommandlineOptions(final Options options) {
+        Option opt = new Option("k", "key", true, "config key");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("v", "value", true, "config value");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    @Override
+    public void execute(final CommandLine commandLine, final Options options, final RPCHook rpcHook) {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+        try {
+            // key name
+            String key = commandLine.getOptionValue('k').trim();
+            // key name
+            String value = commandLine.getOptionValue('v').trim();
+            Properties properties = new Properties();
+            properties.put(key, value);
+
+            // servers
+            String servers = commandLine.getOptionValue('n');
+            List<String> serverList = null;
+            if (servers != null && servers.length() > 0) {
+                String[] serverArray = servers.trim().split(";");
+
+                if (serverArray != null && serverArray.length > 0) {
+                    serverList = Arrays.asList(serverArray);
+                }
+            }
+
+            defaultMQAdminExt.start();
+
+            defaultMQAdminExt.updateNameServerConfig(properties, serverList);
+
+            System.out.printf("update name server config success!%s\n%s : %s\n",
+                    serverList == null ? "" : serverList, key, value);
+            return;
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/namesrv/WipeWritePermSubCommand.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/namesrv/WipeWritePermSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/namesrv/WipeWritePermSubCommand.java
new file mode 100644
index 0000000..053ac7e
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/namesrv/WipeWritePermSubCommand.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.tools.command.namesrv;
+
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import java.util.List;
+
+public class WipeWritePermSubCommand implements SubCommand {
+
+    @Override
+    public String commandName() {
+        return "wipeWritePerm";
+    }
+
+
+    @Override
+    public String commandDesc() {
+        return "Wipe write perm of broker in all name server";
+    }
+
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("b", "brokerName", true, "broker name");
+        opt.setRequired(true);
+        options.addOption(opt);
+        return options;
+    }
+
+
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        try {
+            defaultMQAdminExt.start();
+            String brokerName = commandLine.getOptionValue('b').trim();
+            List<String> namesrvList = defaultMQAdminExt.getNameServerAddressList();
+            if (namesrvList != null) {
+                for (String namesrvAddr : namesrvList) {
+                    try {
+                        int wipeTopicCount = defaultMQAdminExt.wipeWritePermOfBroker(namesrvAddr, brokerName);
+                        System.out.printf("wipe write perm of broker[%s] in name server[%s] OK, %d%n",
+                                brokerName,
+                                namesrvAddr,
+                                wipeTopicCount
+                        );
+                    } catch (Exception e) {
+                        System.out.printf("wipe write perm of broker[%s] in name server[%s] Failed%n",
+                                brokerName,
+                                namesrvAddr
+                        );
+
+                        e.printStackTrace();
+                    }
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/offset/CloneGroupOffsetCommand.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/offset/CloneGroupOffsetCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/offset/CloneGroupOffsetCommand.java
new file mode 100644
index 0000000..b72aeae
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/offset/CloneGroupOffsetCommand.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.tools.command.offset;
+
+import com.alibaba.rocketmq.common.admin.ConsumeStats;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.common.protocol.route.BrokerData;
+import com.alibaba.rocketmq.common.protocol.route.TopicRouteData;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import java.util.Set;
+
+public class CloneGroupOffsetCommand implements SubCommand {
+    @Override
+    public String commandName() {
+        return "cloneGroupOffset";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "clone offset from other group.";
+    }
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("s", "srcGroup", true, "set source consumer group");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("d", "destGroup", true, "set destination consumer group");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("t", "topic", true, "set the topic");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("o", "offline", true, "the group or the topic is offline");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        options.addOption(opt);
+
+        return options;
+    }
+
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
+        String srcGroup = commandLine.getOptionValue("s").trim();
+        String destGroup = commandLine.getOptionValue("d").trim();
+        String topic = commandLine.getOptionValue("t").trim();
+
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        defaultMQAdminExt.setInstanceName("admin-" + Long.toString(System.currentTimeMillis()));
+
+        try {
+            defaultMQAdminExt.start();
+            ConsumeStats consumeStats = defaultMQAdminExt.examineConsumeStats(srcGroup);
+            Set<MessageQueue> mqs = consumeStats.getOffsetTable().keySet();
+            if (mqs != null && !mqs.isEmpty()) {
+                TopicRouteData topicRoute = defaultMQAdminExt.examineTopicRouteInfo(topic);
+                for (MessageQueue mq : mqs) {
+                    String addr = null;
+                    for (BrokerData brokerData : topicRoute.getBrokerDatas()) {
+                        if (brokerData.getBrokerName().equals(mq.getBrokerName())) {
+                            addr = brokerData.selectBrokerAddr();
+                            break;
+                        }
+                    }
+                    long offset = consumeStats.getOffsetTable().get(mq).getBrokerOffset();
+                    if (offset >= 0) {
+                        defaultMQAdminExt.updateConsumeOffset(addr, destGroup, mq, offset);
+                    }
+                }
+            }
+            System.out.printf("clone group offset success. srcGroup[%s], destGroup=[%s], topic[%s]",
+                    srcGroup, destGroup, topic);
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/offset/GetConsumerStatusCommand.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/offset/GetConsumerStatusCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/offset/GetConsumerStatusCommand.java
new file mode 100644
index 0000000..af79512
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/offset/GetConsumerStatusCommand.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.tools.command.offset;
+
+import com.alibaba.rocketmq.common.UtilAll;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import java.util.Map;
+
+public class GetConsumerStatusCommand implements SubCommand {
+    @Override
+    public String commandName() {
+        return "getConsumerStatus";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "get consumer status from client.";
+    }
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("g", "group", true, "set the consumer group");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("t", "topic", true, "set the topic");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("i", "originClientId", true, "set the consumer clientId");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+        try {
+            String group = commandLine.getOptionValue("g").trim();
+            String topic = commandLine.getOptionValue("t").trim();
+            String originClientId = "";
+            if (commandLine.hasOption("i")) {
+                originClientId = commandLine.getOptionValue("i").trim();
+            }
+            defaultMQAdminExt.start();
+
+            Map<String, Map<MessageQueue, Long>> consumerStatusTable =
+                    defaultMQAdminExt.getConsumeStatus(topic, group, originClientId);
+            System.out.printf("get consumer status from client. group=%s, topic=%s, originClientId=%s%n",
+                    group, topic, originClientId);
+
+            System.out.printf("%-50s  %-15s  %-15s  %-20s%n",
+                    "#clientId",
+                    "#brokerName",
+                    "#queueId",
+                    "#offset");
+
+            for (Map.Entry<String, Map<MessageQueue, Long>> entry : consumerStatusTable.entrySet()) {
+                String clientId = entry.getKey();
+                Map<MessageQueue, Long> mqTable = entry.getValue();
+                for (Map.Entry<MessageQueue, Long> entry1 : mqTable.entrySet()) {
+                    MessageQueue mq = entry1.getKey();
+                    System.out.printf("%-50s  %-15s  %-15d  %-20d%n",
+                            UtilAll.frontStringAtLeast(clientId, 50),
+                            mq.getBrokerName(),
+                            mq.getQueueId(),
+                            mqTable.get(mq));
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java
new file mode 100644
index 0000000..e2bbbff
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.tools.command.offset;
+
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.common.UtilAll;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.common.protocol.ResponseCode;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.srvutil.ServerUtil;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+
+import java.util.Iterator;
+import java.util.Map;
+
+
+/**
+ * @author manhong.yqd
+ */
+public class ResetOffsetByTimeCommand implements SubCommand {
+    public static void main(String[] args) {
+        ResetOffsetByTimeCommand cmd = new ResetOffsetByTimeCommand();
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        String[] subargs = new String[]{"-t Jodie_rest_test", "-g CID_Jodie_rest_test", "-s -1", "-f true"};
+        final CommandLine commandLine =
+                ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
+        cmd.execute(commandLine, options, null);
+    }
+
+    @Override
+    public String commandName() {
+        return "resetOffsetByTime";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "Reset consumer offset by timestamp(without client restart).";
+    }
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("g", "group", true, "set the consumer group");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("t", "topic", true, "set the topic");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("s", "timestamp", true, "set the timestamp[now|currentTimeMillis|yyyy-MM-dd#HH:mm:ss:SSS]");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("f", "force", true, "set the force rollback by timestamp switch[true|false]");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("c", "cplus", false, "reset c++ client offset");
+        opt.setRequired(false);
+        options.addOption(opt);
+        return options;
+    }
+
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+        try {
+            String group = commandLine.getOptionValue("g").trim();
+            String topic = commandLine.getOptionValue("t").trim();
+            String timeStampStr = commandLine.getOptionValue("s").trim();
+            long timestamp = timeStampStr.equals("now") ? System.currentTimeMillis() : 0;
+
+            try {
+                if (timestamp == 0) {
+                    timestamp = Long.parseLong(timeStampStr);
+                }
+            } catch (NumberFormatException e) {
+
+                timestamp = UtilAll.parseDate(timeStampStr, UtilAll.YYYY_MM_DD_HH_MM_SS_SSS).getTime();
+            }
+
+            boolean force = true;
+            if (commandLine.hasOption('f')) {
+                force = Boolean.valueOf(commandLine.getOptionValue("f").trim());
+            }
+
+            boolean isC = false;
+            if (commandLine.hasOption('c')) {
+                isC = true;
+            }
+
+            defaultMQAdminExt.start();
+            Map<MessageQueue, Long> offsetTable;
+            try {
+                offsetTable = defaultMQAdminExt.resetOffsetByTimestamp(topic, group, timestamp, force, isC);
+            } catch (MQClientException e) {
+                if (ResponseCode.CONSUMER_NOT_ONLINE == e.getResponseCode()) {
+                    ResetOffsetByTimeOldCommand.resetOffset(defaultMQAdminExt, group, topic, timestamp, force, timeStampStr);
+                    return;
+                }
+                throw e;
+            }
+
+            System.out.printf("rollback consumer offset by specified group[%s], topic[%s], force[%s], timestamp(string)[%s], timestamp(long)[%s]%n",
+                    group, topic, force, timeStampStr, timestamp);
+
+            System.out.printf("%-40s  %-40s  %-40s%n",
+                    "#brokerName",
+                    "#queueId",
+                    "#offset");
+
+            Iterator<Map.Entry<MessageQueue, Long>> iterator = offsetTable.entrySet().iterator();
+            while (iterator.hasNext()) {
+                Map.Entry<MessageQueue, Long> entry = iterator.next();
+                System.out.printf("%-40s  %-40d  %-40d%n",
+                        UtilAll.frontStringAtLeast(entry.getKey().getBrokerName(), 32),
+                        entry.getKey().getQueueId(),
+                        entry.getValue());
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommand.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommand.java
new file mode 100644
index 0000000..3420795
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommand.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.tools.command.offset;
+
+import com.alibaba.rocketmq.client.exception.MQBrokerException;
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.common.UtilAll;
+import com.alibaba.rocketmq.common.admin.RollbackStats;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.remoting.exception.RemotingException;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import java.util.Date;
+import java.util.List;
+
+
+/**
+ *
+ * @author manhong.yqd
+ *
+ */
+public class ResetOffsetByTimeOldCommand implements SubCommand {
+    @Override
+    public String commandName() {
+        return "resetOffsetByTimeOld";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "Reset consumer offset by timestamp(execute this command required client restart).";
+    }
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("g", "group", true, "set the consumer group");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("t", "topic", true, "set the topic");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("s", "timestamp", true, "set the timestamp[currentTimeMillis|yyyy-MM-dd#HH:mm:ss:SSS]");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("f", "force", true, "set the force rollback by timestamp switch[true|false]");
+        opt.setRequired(false);
+        options.addOption(opt);
+        return options;
+    }
+
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+        try {
+            String consumerGroup = commandLine.getOptionValue("g").trim();
+            String topic = commandLine.getOptionValue("t").trim();
+            String timeStampStr = commandLine.getOptionValue("s").trim();
+            long timestamp = 0;
+            try {
+                timestamp = Long.parseLong(timeStampStr);
+            } catch (NumberFormatException e) {
+
+                Date date = UtilAll.parseDate(timeStampStr, UtilAll.YYYY_MM_DD_HH_MM_SS_SSS);
+                if (date != null) {
+                    timestamp = UtilAll.parseDate(timeStampStr, UtilAll.YYYY_MM_DD_HH_MM_SS_SSS).getTime();
+                } else {
+                    System.out.printf("specified timestamp invalid.%n");
+                    return;
+                }
+
+                boolean force = true;
+                if (commandLine.hasOption('f')) {
+                    force = Boolean.valueOf(commandLine.getOptionValue("f").trim());
+                }
+
+                defaultMQAdminExt.start();
+                resetOffset(defaultMQAdminExt, consumerGroup, topic, timestamp, force, timeStampStr);
+            }
+
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+
+    public static void resetOffset(DefaultMQAdminExt defaultMQAdminExt, String consumerGroup, String topic, long timestamp, boolean force,
+                                   String timeStampStr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        List<RollbackStats> rollbackStatsList = defaultMQAdminExt.resetOffsetByTimestampOld(consumerGroup, topic, timestamp, force);
+        System.out.printf(
+                "rollback consumer offset by specified consumerGroup[%s], topic[%s], force[%s], timestamp(string)[%s], timestamp(long)[%s]%n",
+                consumerGroup, topic, force, timeStampStr, timestamp);
+
+        System.out.printf("%-20s  %-20s  %-20s  %-20s  %-20s  %-20s%n",
+                "#brokerName",
+                "#queueId",
+                "#brokerOffset",
+                "#consumerOffset",
+                "#timestampOffset",
+                "#rollbackOffset"
+        );
+
+        for (RollbackStats rollbackStats : rollbackStatsList) {
+            System.out.printf("%-20s  %-20d  %-20d  %-20d  %-20d  %-20d%n",
+                    UtilAll.frontStringAtLeast(rollbackStats.getBrokerName(), 32),
+                    rollbackStats.getQueueId(),
+                    rollbackStats.getBrokerOffset(),
+                    rollbackStats.getConsumerOffset(),
+                    rollbackStats.getTimestampOffset(),
+                    rollbackStats.getRollbackOffset()
+            );
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/stats/StatsAllSubCommand.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/stats/StatsAllSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/stats/StatsAllSubCommand.java
new file mode 100644
index 0000000..a57f04a
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/stats/StatsAllSubCommand.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.tools.command.stats;
+
+import com.alibaba.rocketmq.client.exception.MQBrokerException;
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.UtilAll;
+import com.alibaba.rocketmq.common.admin.ConsumeStats;
+import com.alibaba.rocketmq.common.protocol.body.BrokerStatsData;
+import com.alibaba.rocketmq.common.protocol.body.GroupList;
+import com.alibaba.rocketmq.common.protocol.body.TopicList;
+import com.alibaba.rocketmq.common.protocol.route.BrokerData;
+import com.alibaba.rocketmq.common.protocol.route.TopicRouteData;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.remoting.exception.RemotingException;
+import com.alibaba.rocketmq.store.stats.BrokerStatsManager;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+
+public class StatsAllSubCommand implements SubCommand {
+    @Override
+    public String commandName() {
+        return "statsAll";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "Topic and Consumer tps stats";
+    }
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("a", "activeTopic", false, "print active topic only");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("t", "topic", true, "print select topic only");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        try {
+            defaultMQAdminExt.start();
+
+            TopicList topicList = defaultMQAdminExt.fetchAllTopicList();
+
+            System.out.printf("%-32s  %-32s %12s %11s %11s %14s %14s%n",
+                    "#Topic",
+                    "#Consumer Group",
+                    "#Accumulation",
+                    "#InTPS",
+                    "#OutTPS",
+                    "#InMsg24Hour",
+                    "#OutMsg24Hour"
+            );
+
+            boolean activeTopic = commandLine.hasOption('a');
+            String selectTopic = commandLine.getOptionValue('t');
+
+            for (String topic : topicList.getTopicList()) {
+                if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) {
+                    continue;
+                }
+
+                if (selectTopic != null && selectTopic != "" && !topic.equals(selectTopic)) {
+                    continue;
+                }
+
+                try {
+                    printTopicDetail(defaultMQAdminExt, topic, activeTopic);
+                } catch (Exception e) {
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+
+    public static void printTopicDetail(final DefaultMQAdminExt admin, final String topic, final boolean activeTopic)
+            throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+        TopicRouteData topicRouteData = admin.examineTopicRouteInfo(topic);
+
+        GroupList groupList = admin.queryTopicConsumeByWho(topic);
+
+        double inTPS = 0;
+
+        long inMsgCntToday = 0;
+
+
+        for (BrokerData bd : topicRouteData.getBrokerDatas()) {
+            String masterAddr = bd.getBrokerAddrs().get(MixAll.MASTER_ID);
+            if (masterAddr != null) {
+                try {
+                    BrokerStatsData bsd = admin.viewBrokerStatsData(masterAddr, BrokerStatsManager.TOPIC_PUT_NUMS, topic);
+                    inTPS += bsd.getStatsMinute().getTps();
+                    inMsgCntToday += compute24HourSum(bsd);
+                } catch (Exception e) {
+                }
+            }
+        }
+
+        if (groupList != null && !groupList.getGroupList().isEmpty()) {
+
+            for (String group : groupList.getGroupList()) {
+                double outTPS = 0;
+                long outMsgCntToday = 0;
+
+                for (BrokerData bd : topicRouteData.getBrokerDatas()) {
+                    String masterAddr = bd.getBrokerAddrs().get(MixAll.MASTER_ID);
+                    if (masterAddr != null) {
+                        try {
+                            String statsKey = String.format("%s@%s", topic, group);
+                            BrokerStatsData bsd = admin.viewBrokerStatsData(masterAddr, BrokerStatsManager.GROUP_GET_NUMS, statsKey);
+                            outTPS += bsd.getStatsMinute().getTps();
+                            outMsgCntToday += compute24HourSum(bsd);
+                        } catch (Exception e) {
+                        }
+                    }
+                }
+
+                long accumulate = 0;
+                try {
+                    ConsumeStats consumeStats = admin.examineConsumeStats(group, topic);
+                    if (consumeStats != null) {
+                        accumulate = consumeStats.computeTotalDiff();
+                        if (accumulate < 0) {
+                            accumulate = 0;
+                        }
+                    }
+                } catch (Exception e) {
+                }
+
+                if (!activeTopic || (inMsgCntToday > 0) ||
+                        (outMsgCntToday > 0)) {
+
+                    System.out.printf("%-32s  %-32s %12d %11.2f %11.2f %14d %14d%n",
+                            UtilAll.frontStringAtLeast(topic, 32),
+                            UtilAll.frontStringAtLeast(group, 32),
+                            accumulate,
+                            inTPS,
+                            outTPS,
+                            inMsgCntToday,
+                            outMsgCntToday
+                    );
+                }
+            }
+        } else {
+            if (!activeTopic || (inMsgCntToday > 0)) {
+
+                System.out.printf("%-32s  %-32s %12d %11.2f %11s %14d %14s%n",
+                        UtilAll.frontStringAtLeast(topic, 32),
+                        "",
+                        0,
+                        inTPS,
+                        "",
+                        inMsgCntToday,
+                        "NO_CONSUMER"
+                );
+            }
+        }
+    }
+
+    public static long compute24HourSum(BrokerStatsData bsd) {
+        if (bsd.getStatsDay().getSum() != 0) {
+            return bsd.getStatsDay().getSum();
+        }
+
+        if (bsd.getStatsHour().getSum() != 0) {
+            return bsd.getStatsHour().getSum();
+        }
+
+        if (bsd.getStatsMinute().getSum() != 0) {
+            return bsd.getStatsMinute().getSum();
+        }
+
+        return 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/AllocateMQSubCommand.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/AllocateMQSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/AllocateMQSubCommand.java
new file mode 100644
index 0000000..35e5d3b
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/AllocateMQSubCommand.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.tools.command.topic;
+
+import com.alibaba.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
+import com.alibaba.rocketmq.client.impl.factory.MQClientInstance;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.common.protocol.route.TopicRouteData;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+
+public class AllocateMQSubCommand implements SubCommand {
+    @Override
+    public String commandName() {
+        return "allocateMQ";
+    }
+
+
+    @Override
+    public String commandDesc() {
+        return "Allocate MQ";
+    }
+
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("t", "topic", true, "topic name");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("i", "ipList", true, "ipList");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        return options;
+    }
+
+
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
+        DefaultMQAdminExt adminExt = new DefaultMQAdminExt(rpcHook);
+        adminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+        try {
+            adminExt.start();
+
+            String topic = commandLine.getOptionValue('t').trim();
+            String ips = commandLine.getOptionValue('i').trim();
+            final String[] split = ips.split(",");
+            final List<String> ipList = new LinkedList<String>();
+            for (String ip : split) {
+                ipList.add(ip);
+            }
+
+            final TopicRouteData topicRouteData = adminExt.examineTopicRouteInfo(topic);
+            final Set<MessageQueue> mqs = MQClientInstance.topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
+
+            final AllocateMessageQueueAveragely averagely = new AllocateMessageQueueAveragely();
+
+
+            RebalanceResult rr = new RebalanceResult();
+
+            for (String i : ipList) {
+                final List<MessageQueue> mqResult = averagely.allocate("aa", i, new ArrayList<MessageQueue>(mqs), ipList);
+                rr.getResult().put(i, mqResult);
+            }
+
+            final String json = RemotingSerializable.toJson(rr, false);
+            System.out.printf("%s%n", json);
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            adminExt.shutdown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/DeleteTopicSubCommand.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/DeleteTopicSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/DeleteTopicSubCommand.java
new file mode 100644
index 0000000..1832de6
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/DeleteTopicSubCommand.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.tools.command.topic;
+
+import com.alibaba.rocketmq.client.exception.MQBrokerException;
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.remoting.exception.RemotingException;
+import com.alibaba.rocketmq.srvutil.ServerUtil;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.command.CommandUtil;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+
+/**
+ *
+ * @author lansheng.zj
+ *
+ */
+public class DeleteTopicSubCommand implements SubCommand {
+    @Override
+    public String commandName() {
+        return "deleteTopic";
+    }
+
+
+    @Override
+    public String commandDesc() {
+        return "Delete topic from broker and NameServer.";
+    }
+
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("t", "topic", true, "topic name");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("c", "clusterName", true, "delete topic from which cluster");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        return options;
+    }
+
+
+    public static void deleteTopic(final DefaultMQAdminExt adminExt,
+                                   final String clusterName,
+                                   final String topic
+    ) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
+
+        Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(adminExt, clusterName);
+        adminExt.deleteTopicInBroker(masterSet, topic);
+        System.out.printf("delete topic [%s] from cluster [%s] success.%n", topic, clusterName);
+
+
+        Set<String> nameServerSet = null;
+        if (adminExt.getNamesrvAddr() != null) {
+            String[] ns = adminExt.getNamesrvAddr().trim().split(";");
+            nameServerSet = new HashSet(Arrays.asList(ns));
+        }
+
+
+        adminExt.deleteTopicInNameServer(nameServerSet, topic);
+        System.out.printf("delete topic [%s] from NameServer success.%n", topic);
+    }
+
+
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
+        DefaultMQAdminExt adminExt = new DefaultMQAdminExt(rpcHook);
+        adminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+        try {
+            String topic = commandLine.getOptionValue('t').trim();
+
+            if (commandLine.hasOption('c')) {
+                String clusterName = commandLine.getOptionValue('c').trim();
+
+                adminExt.start();
+                deleteTopic(adminExt, clusterName, topic);
+                return;
+            }
+
+            ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            adminExt.shutdown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/RebalanceResult.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/RebalanceResult.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/RebalanceResult.java
new file mode 100644
index 0000000..478413e
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/RebalanceResult.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.tools.command.topic;
+
+import com.alibaba.rocketmq.common.message.MessageQueue;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class RebalanceResult {
+    private Map<String/*ip*/, List<MessageQueue>> result = new HashMap<String, List<MessageQueue>>();
+
+    public Map<String, List<MessageQueue>> getResult() {
+        return result;
+    }
+
+    public void setResult(final Map<String, List<MessageQueue>> result) {
+        this.result = result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/TopicClusterSubCommand.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/TopicClusterSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/TopicClusterSubCommand.java
new file mode 100644
index 0000000..9954305
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/TopicClusterSubCommand.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.tools.command.topic;
+
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import java.util.Set;
+
+
+/**
+ *
+ * @author zhouli
+ *
+ */
+public class TopicClusterSubCommand implements SubCommand {
+
+    @Override
+    public String commandName() {
+        return "topicClusterList";
+    }
+
+
+    @Override
+    public String commandDesc() {
+        return "get cluster info for topic";
+    }
+
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("t", "topic", true, "topic name");
+        opt.setRequired(true);
+        options.addOption(opt);
+        return options;
+    }
+
+    @Override
+    public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+        String topic = commandLine.getOptionValue('t').trim();
+        try {
+            defaultMQAdminExt.start();
+            Set<String> clusters = defaultMQAdminExt.getTopicClusterList(topic);
+            for (String value : clusters) {
+                System.out.printf("%s%n", value);
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/TopicListSubCommand.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/TopicListSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/TopicListSubCommand.java
new file mode 100644
index 0000000..9224d65
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/TopicListSubCommand.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.tools.command.topic;
+
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.UtilAll;
+import com.alibaba.rocketmq.common.protocol.body.ClusterInfo;
+import com.alibaba.rocketmq.common.protocol.body.GroupList;
+import com.alibaba.rocketmq.common.protocol.body.TopicList;
+import com.alibaba.rocketmq.common.protocol.route.BrokerData;
+import com.alibaba.rocketmq.common.protocol.route.TopicRouteData;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.remoting.exception.RemotingException;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Set;
+
+
+/**
+ *
+ * @author shijia.wxr
+ *
+ */
+public class TopicListSubCommand implements SubCommand {
+
+    @Override
+    public String commandName() {
+        return "topicList";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "Fetch all topic list from name server";
+    }
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("c", "clusterModel", false, "clusterModel");
+        opt.setRequired(false);
+        options.addOption(opt);
+        return options;
+    }
+
+    @Override
+    public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        try {
+            defaultMQAdminExt.start();
+            if (commandLine.hasOption('c')) {
+                ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
+
+                System.out.printf("%-20s  %-48s  %-48s%n",
+                        "#Cluster Name",
+                        "#Topic",
+                        "#Consumer Group"
+                );
+
+                TopicList topicList = defaultMQAdminExt.fetchAllTopicList();
+                for (String topic : topicList.getTopicList()) {
+                    if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)
+                            || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) {
+                        continue;
+                    }
+
+                    String clusterName = "";
+                    GroupList groupList = new GroupList();
+
+                    try {
+                        clusterName =
+                                this.findTopicBelongToWhichCluster(topic, clusterInfo, defaultMQAdminExt);
+                        groupList = defaultMQAdminExt.queryTopicConsumeByWho(topic);
+                    } catch (Exception e) {
+                    }
+
+                    if (null == groupList || groupList.getGroupList().isEmpty()) {
+                        groupList = new GroupList();
+                        groupList.getGroupList().add("");
+                    }
+
+                    for (String group : groupList.getGroupList()) {
+                        System.out.printf("%-20s  %-48s  %-48s%n",
+                                UtilAll.frontStringAtLeast(clusterName, 20),
+                                UtilAll.frontStringAtLeast(topic, 48),
+                                UtilAll.frontStringAtLeast(group, 48)
+                        );
+                    }
+                }
+            } else {
+                TopicList topicList = defaultMQAdminExt.fetchAllTopicList();
+                for (String topic : topicList.getTopicList()) {
+                    System.out.printf("%s%n", topic);
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+
+    private String findTopicBelongToWhichCluster(final String topic, final ClusterInfo clusterInfo,
+                                                 final DefaultMQAdminExt defaultMQAdminExt) throws RemotingException, MQClientException,
+            InterruptedException {
+        TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
+
+        BrokerData brokerData = topicRouteData.getBrokerDatas().get(0);
+
+        String brokerName = brokerData.getBrokerName();
+
+        Iterator<Entry<String, Set<String>>> it = clusterInfo.getClusterAddrTable().entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<String, Set<String>> next = it.next();
+            if (next.getValue().contains(brokerName)) {
+                return next.getKey();
+            }
+        }
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/TopicRouteSubCommand.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/TopicRouteSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/TopicRouteSubCommand.java
new file mode 100644
index 0000000..d1d6d28
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/TopicRouteSubCommand.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.tools.command.topic;
+
+import com.alibaba.rocketmq.common.protocol.route.TopicRouteData;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+
+/**
+ *
+ * @author shijia.wxr
+ *
+ */
+public class TopicRouteSubCommand implements SubCommand {
+
+    @Override
+    public String commandName() {
+        return "topicRoute";
+    }
+
+
+    @Override
+    public String commandDesc() {
+        return "Examine topic route info";
+    }
+
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("t", "topic", true, "topic name");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        return options;
+    }
+
+
+    @Override
+    public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        try {
+            defaultMQAdminExt.start();
+
+            String topic = commandLine.getOptionValue('t').trim();
+            TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
+            String json = topicRouteData.toJson(true);
+            System.out.printf("%s%n", json);
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/TopicStatusSubCommand.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/TopicStatusSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/TopicStatusSubCommand.java
new file mode 100644
index 0000000..685dbea
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/TopicStatusSubCommand.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.tools.command.topic;
+
+import com.alibaba.rocketmq.common.UtilAll;
+import com.alibaba.rocketmq.common.admin.TopicOffset;
+import com.alibaba.rocketmq.common.admin.TopicStatsTable;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+
+/**
+ *
+ * @author shijia.wxr
+ *
+ */
+public class TopicStatusSubCommand implements SubCommand {
+
+    @Override
+    public String commandName() {
+        return "topicStatus";
+    }
+
+
+    @Override
+    public String commandDesc() {
+        return "Examine topic Status info";
+    }
+
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("t", "topic", true, "topic name");
+        opt.setRequired(true);
+        options.addOption(opt);
+        return options;
+    }
+
+
+    @Override
+    public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        try {
+            defaultMQAdminExt.start();
+            String topic = commandLine.getOptionValue('t').trim();
+            TopicStatsTable topicStatsTable = defaultMQAdminExt.examineTopicStats(topic);
+
+            List<MessageQueue> mqList = new LinkedList<MessageQueue>();
+            mqList.addAll(topicStatsTable.getOffsetTable().keySet());
+            Collections.sort(mqList);
+
+            System.out.printf("%-32s  %-4s  %-20s  %-20s    %s%n",
+                    "#Broker Name",
+                    "#QID",
+                    "#Min Offset",
+                    "#Max Offset",
+                    "#Last Updated"
+            );
+
+            for (MessageQueue mq : mqList) {
+                TopicOffset topicOffset = topicStatsTable.getOffsetTable().get(mq);
+
+                String humanTimestamp = "";
+                if (topicOffset.getLastUpdateTimestamp() > 0) {
+                    humanTimestamp = UtilAll.timeMillisToHumanString2(topicOffset.getLastUpdateTimestamp());
+                }
+
+                System.out.printf("%-32s  %-4d  %-20d  %-20d    %s%n",
+                        UtilAll.frontStringAtLeast(mq.getBrokerName(), 32),
+                        mq.getQueueId(),
+                        topicOffset.getMinOffset(),
+                        topicOffset.getMaxOffset(),
+                        humanTimestamp
+                );
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/UpdateOrderConfCommand.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/UpdateOrderConfCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/UpdateOrderConfCommand.java
new file mode 100644
index 0000000..164579f
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/UpdateOrderConfCommand.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.tools.command.topic;
+
+import com.alibaba.rocketmq.common.UtilAll;
+import com.alibaba.rocketmq.common.namesrv.NamesrvUtil;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.srvutil.ServerUtil;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+
+/**
+ *
+ * @author manhong.yqd
+ *
+ */
+public class UpdateOrderConfCommand implements SubCommand {
+
+    @Override
+    public String commandName() {
+        return "updateOrderConf";
+    }
+
+
+    @Override
+    public String commandDesc() {
+        return "Create or update or delete order conf";
+    }
+
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("t", "topic", true, "topic name");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("v", "orderConf", true, "set order conf [eg. brokerName1:num;brokerName2:num]");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("m", "method", true, "option type [eg. put|get|delete");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        return options;
+    }
+
+
+    @Override
+    public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        try {
+            String topic = commandLine.getOptionValue('t').trim();
+            String type = commandLine.getOptionValue('m').trim();
+
+            if ("get".equals(type)) {
+
+                defaultMQAdminExt.start();
+                String orderConf =
+                        defaultMQAdminExt.getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, topic);
+                System.out.printf("get orderConf success. topic=[%s], orderConf=[%s] ", topic, orderConf);
+
+                return;
+            } else if ("put".equals(type)) {
+
+                defaultMQAdminExt.start();
+                String orderConf = "";
+                if (commandLine.hasOption('v')) {
+                    orderConf = commandLine.getOptionValue('v').trim();
+                }
+                if (UtilAll.isBlank(orderConf)) {
+                    throw new Exception("please set orderConf with option -v.");
+                }
+
+                defaultMQAdminExt.createOrUpdateOrderConf(topic, orderConf, true);
+                System.out.printf("update orderConf success. topic=[%s], orderConf=[%s]", topic,
+                        orderConf.toString());
+                return;
+            } else if ("delete".equals(type)) {
+
+                defaultMQAdminExt.start();
+                defaultMQAdminExt.deleteKvConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, topic);
+                System.out.printf("delete orderConf success. topic=[%s]", topic);
+
+                return;
+            }
+
+            ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/UpdateTopicPermSubCommand.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/UpdateTopicPermSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/UpdateTopicPermSubCommand.java
new file mode 100644
index 0000000..1938934
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/UpdateTopicPermSubCommand.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.tools.command.topic;
+
+import com.alibaba.rocketmq.common.TopicConfig;
+import com.alibaba.rocketmq.common.protocol.route.QueueData;
+import com.alibaba.rocketmq.common.protocol.route.TopicRouteData;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.srvutil.ServerUtil;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.command.CommandUtil;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import java.util.List;
+import java.util.Set;
+
+
+public class UpdateTopicPermSubCommand implements SubCommand {
+
+    @Override
+    public String commandName() {
+        return "updateTopicPerm";
+    }
+
+
+    @Override
+    public String commandDesc() {
+        return "Update topic perm";
+    }
+
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("b", "brokerAddr", true, "create topic to which broker");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("c", "clusterName", true, "create topic to which cluster");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("t", "topic", true, "topic name");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("p", "perm", true, "set topic's permission(2|4|6), intro[2:R; 4:W; 6:RW]");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        return options;
+    }
+
+
+    @Override
+    public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+        try {
+            defaultMQAdminExt.start();
+            TopicConfig topicConfig = new TopicConfig();
+
+            String topic = commandLine.getOptionValue('t').trim();
+            TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
+            assert topicRouteData != null;
+            List<QueueData> queueDatas = topicRouteData.getQueueDatas();
+            assert queueDatas != null && queueDatas.size() > 0;
+
+            QueueData queueData = queueDatas.get(0);
+            topicConfig.setTopicName(topic);
+            topicConfig.setWriteQueueNums(queueData.getWriteQueueNums());
+            topicConfig.setReadQueueNums(queueData.getReadQueueNums());
+            topicConfig.setPerm(queueData.getPerm());
+            topicConfig.setTopicSysFlag(queueData.getTopicSynFlag());
+
+            //new perm
+            int perm = Integer.parseInt(commandLine.getOptionValue('p').trim());
+            int oldPerm = topicConfig.getPerm();
+            if (perm == oldPerm) {
+                System.out.printf("new perm equals to the old one!%n");
+                return;
+            }
+            topicConfig.setPerm(perm);
+            if (commandLine.hasOption('b')) {
+                String addr = commandLine.getOptionValue('b').trim();
+                defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
+                System.out.printf("update topic perm from %s to %s in %s success.%n", oldPerm, perm, addr);
+                System.out.printf("%s%n", topicConfig);
+                return;
+            } else if (commandLine.hasOption('c')) {
+                String clusterName = commandLine.getOptionValue('c').trim();
+                Set<String> masterSet =
+                        CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
+                for (String addr : masterSet) {
+                    defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
+                    System.out.printf("update topic perm from %s to %s in %s success.%n", oldPerm, perm, addr);
+                }
+                return;
+            }
+            ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/UpdateTopicSubCommand.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/UpdateTopicSubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/UpdateTopicSubCommand.java
new file mode 100644
index 0000000..c33018f
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/topic/UpdateTopicSubCommand.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.tools.command.topic;
+
+import com.alibaba.rocketmq.common.TopicConfig;
+import com.alibaba.rocketmq.common.sysflag.TopicSysFlag;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.srvutil.ServerUtil;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.command.CommandUtil;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import java.util.Set;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class UpdateTopicSubCommand implements SubCommand {
+
+    @Override
+    public String commandName() {
+        return "updateTopic";
+    }
+
+
+    @Override
+    public String commandDesc() {
+        return "Update or create topic";
+    }
+
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("b", "brokerAddr", true, "create topic to which broker");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("c", "clusterName", true, "create topic to which cluster");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("t", "topic", true, "topic name");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("r", "readQueueNums", true, "set read queue nums");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("w", "writeQueueNums", true, "set write queue nums");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("p", "perm", true, "set topic's permission(2|4|6), intro[2:W 4:R; 6:RW]");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("o", "order", true, "set topic's order(true|false");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("u", "unit", true, "is unit topic (true|false");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("s", "hasUnitSub", true, "has unit sub (true|false");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+
+
+    @Override
+    public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        try {
+            TopicConfig topicConfig = new TopicConfig();
+            topicConfig.setReadQueueNums(8);
+            topicConfig.setWriteQueueNums(8);
+            topicConfig.setTopicName(commandLine.getOptionValue('t').trim());
+
+            // readQueueNums
+            if (commandLine.hasOption('r')) {
+                topicConfig.setReadQueueNums(Integer.parseInt(commandLine.getOptionValue('r').trim()));
+            }
+
+            // writeQueueNums
+            if (commandLine.hasOption('w')) {
+                topicConfig.setWriteQueueNums(Integer.parseInt(commandLine.getOptionValue('w').trim()));
+            }
+
+            // perm
+            if (commandLine.hasOption('p')) {
+                topicConfig.setPerm(Integer.parseInt(commandLine.getOptionValue('p').trim()));
+            }
+
+            boolean isUnit = false;
+            if (commandLine.hasOption('u')) {
+                isUnit = Boolean.parseBoolean(commandLine.getOptionValue('u').trim());
+            }
+
+            boolean isCenterSync = false;
+            if (commandLine.hasOption('s')) {
+                isCenterSync = Boolean.parseBoolean(commandLine.getOptionValue('s').trim());
+            }
+
+            int topicCenterSync = TopicSysFlag.buildSysFlag(isUnit, isCenterSync);
+            topicConfig.setTopicSysFlag(topicCenterSync);
+
+            boolean isOrder = false;
+            if (commandLine.hasOption('o')) {
+                isOrder = Boolean.parseBoolean(commandLine.getOptionValue('o').trim());
+            }
+            topicConfig.setOrder(isOrder);
+
+            if (commandLine.hasOption('b')) {
+                String addr = commandLine.getOptionValue('b').trim();
+
+                defaultMQAdminExt.start();
+                defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
+
+                if (isOrder) {
+                    String brokerName = CommandUtil.fetchBrokerNameByAddr(defaultMQAdminExt, addr);
+                    String orderConf = brokerName + ":" + topicConfig.getWriteQueueNums();
+                    defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(), orderConf, false);
+                    System.out.printf(String.format("set broker orderConf. isOrder=%s, orderConf=[%s]",
+                            isOrder, orderConf.toString()));
+                }
+                System.out.printf("create topic to %s success.%n", addr);
+                System.out.printf("%s", topicConfig);
+                return;
+
+            } else if (commandLine.hasOption('c')) {
+                String clusterName = commandLine.getOptionValue('c').trim();
+
+                defaultMQAdminExt.start();
+
+                Set<String> masterSet =
+                        CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
+                for (String addr : masterSet) {
+                    defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
+                    System.out.printf("create topic to %s success.%n", addr);
+                }
+
+                if (isOrder) {
+                    Set<String> brokerNameSet =
+                            CommandUtil.fetchBrokerNameByClusterName(defaultMQAdminExt, clusterName);
+                    StringBuilder orderConf = new StringBuilder();
+                    String splitor = "";
+                    for (String s : brokerNameSet) {
+                        orderConf.append(splitor).append(s).append(":")
+                                .append(topicConfig.getWriteQueueNums());
+                        splitor = ";";
+                    }
+                    defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(),
+                            orderConf.toString(), true);
+                    System.out.printf(String.format("set cluster orderConf. isOrder=%s, orderConf=[%s]",
+                            isOrder, orderConf.toString()));
+                }
+
+                System.out.printf("%s", topicConfig);
+                return;
+            }
+
+            ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+}


Mime
View raw message