rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yu...@apache.org
Subject [36/43] incubator-rocketmq git commit: Finish code dump. Reviewed by: @yukon @vongosling @stevenschew @vintagewang @lollipop @zander
Date Mon, 19 Dec 2016 09:40:53 GMT
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/subscription/SubscriptionGroupManager.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/subscription/SubscriptionGroupManager.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/subscription/SubscriptionGroupManager.java
new file mode 100644
index 0000000..d954a46
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/subscription/SubscriptionGroupManager.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.broker.subscription;
+
+import com.alibaba.rocketmq.broker.BrokerController;
+import com.alibaba.rocketmq.broker.BrokerPathConfigHelper;
+import com.alibaba.rocketmq.common.ConfigManager;
+import com.alibaba.rocketmq.common.DataVersion;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.subscription.SubscriptionGroupConfig;
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class SubscriptionGroupManager extends ConfigManager {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+
+    private final ConcurrentHashMap<String, SubscriptionGroupConfig> subscriptionGroupTable =
+            new ConcurrentHashMap<String, SubscriptionGroupConfig>(1024);
+    private final DataVersion dataVersion = new DataVersion();
+    private transient BrokerController brokerController;
+
+
+    /**
+     * Creates a manager that is not bound to a broker and pre-registers the built-in groups.
+     * <p>
+     * {@code brokerController} stays {@code null} on this path, and
+     * {@link #findSubscriptionGroupConfig(String)} dereferences it whenever the requested group
+     * is not in the table.
+     */
+    public SubscriptionGroupManager() {
+        init();
+    }
+
+    /**
+     * Pre-registers the built-in consumer groups in {@code subscriptionGroupTable}.
+     * <p>
+     * The four ONS groups additionally get {@code setConsumeBroadcastEnable(true)}; the tools,
+     * filter-server and self-test groups keep whatever a fresh {@link SubscriptionGroupConfig}
+     * provides.
+     */
+    private void init() {
+        this.registerBuiltinGroup(MixAll.TOOLS_CONSUMER_GROUP, false);
+        this.registerBuiltinGroup(MixAll.FILTERSRV_CONSUMER_GROUP, false);
+        this.registerBuiltinGroup(MixAll.SELF_TEST_CONSUMER_GROUP, false);
+        this.registerBuiltinGroup(MixAll.ONS_HTTP_PROXY_GROUP, true);
+        this.registerBuiltinGroup(MixAll.CID_ONSAPI_PULL_GROUP, true);
+        this.registerBuiltinGroup(MixAll.CID_ONSAPI_PERMISSION_GROUP, true);
+        this.registerBuiltinGroup(MixAll.CID_ONSAPI_OWNER_GROUP, true);
+    }
+
+    /**
+     * Builds a config for {@code groupName} and stores it under that name, replacing any
+     * previous entry.
+     *
+     * @param groupName       name of the built-in group, used both as config name and table key
+     * @param enableBroadcast when {@code true}, broadcast consumption is explicitly switched on;
+     *                        when {@code false}, the setter is not called at all
+     */
+    private void registerBuiltinGroup(final String groupName, final boolean enableBroadcast) {
+        SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
+        subscriptionGroupConfig.setGroupName(groupName);
+        if (enableBroadcast) {
+            subscriptionGroupConfig.setConsumeBroadcastEnable(true);
+        }
+        this.subscriptionGroupTable.put(groupName, subscriptionGroupConfig);
+    }
+
+
+    /**
+     * Creates a manager bound to the given broker and pre-registers the built-in groups.
+     *
+     * @param brokerController controller whose broker config is consulted when an unknown
+     *                         group is looked up
+     */
+    public SubscriptionGroupManager(BrokerController brokerController) {
+        this.brokerController = brokerController;
+        init();
+    }
+
+
+    /**
+     * Stores {@code config} under its group name, replacing any existing entry, then bumps the
+     * data version and persists.
+     *
+     * @param config the configuration to create or overwrite
+     */
+    public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) {
+        final String groupName = config.getGroupName();
+        final SubscriptionGroupConfig previous = this.subscriptionGroupTable.put(groupName, config);
+
+        if (null == previous) {
+            log.info("create new subscription group, " + config);
+        } else {
+            log.info("update subscription group config, old: " + previous + " new: " + config);
+        }
+
+        this.dataVersion.nextVersion();
+        this.persist();
+    }
+
+    /**
+     * Switches consumption off for an existing group and bumps the data version.
+     * Unknown groups are ignored. Note that {@code persist()} is not called here.
+     *
+     * @param groupName name of the group to disable
+     */
+    public void disableConsume(final String groupName) {
+        final SubscriptionGroupConfig target = this.subscriptionGroupTable.get(groupName);
+        if (null == target) {
+            return;
+        }
+
+        target.setConsumeEnable(false);
+        this.dataVersion.nextVersion();
+    }
+
+
+    /**
+     * Looks up the configuration for {@code group}, auto-creating it when the broker allows
+     * auto-creation or when {@code group} is a system consumer group.
+     * <p>
+     * The returned object is always the instance stored in the table. If another thread
+     * registers the same group concurrently, that thread's instance is returned and this call
+     * neither bumps the data version nor persists.
+     *
+     * @param group consumer group name
+     * @return the stored configuration, or {@code null} if the group is unknown and may not be
+     *         auto-created
+     */
+    public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) {
+        SubscriptionGroupConfig subscriptionGroupConfig = this.subscriptionGroupTable.get(group);
+        if (subscriptionGroupConfig != null) {
+            return subscriptionGroupConfig;
+        }
+
+        if (!brokerController.getBrokerConfig().isAutoCreateSubscriptionGroup() && !MixAll.isSysConsumerGroup(group)) {
+            return null;
+        }
+
+        SubscriptionGroupConfig created = new SubscriptionGroupConfig();
+        created.setGroupName(group);
+        SubscriptionGroupConfig preConfig = this.subscriptionGroupTable.putIfAbsent(group, created);
+        if (preConfig != null) {
+            // Lost the race: hand back the instance that is actually in the table rather than
+            // a detached copy; the writer that inserted it owns the version bump and persist.
+            return preConfig;
+        }
+
+        log.info("auto create a subscription group, {}", created.toString());
+        this.dataVersion.nextVersion();
+        this.persist();
+        return created;
+    }
+
+
+    /**
+     * Serializes this manager in compact (non-pretty) form.
+     *
+     * @return the result of {@link #encode(boolean)} with {@code false}
+     */
+    @Override
+    public String encode() {
+        final boolean prettyFormat = false;
+        return encode(prettyFormat);
+    }
+
+    /**
+     * Returns the path of the subscription-group config file below {@code <user.home>/store}.
+     */
+    @Override
+    public String configFilePath() {
+        // NOTE(review): the store root is fixed to <user.home>/store; the variant based on
+        // this.brokerController.getMessageStoreConfig().getStorePathRootDir() is disabled.
+        final String storeRootDir = System.getProperty("user.home") + File.separator + "store";
+        return BrokerPathConfigHelper.getSubscriptionGroupPath(storeRootDir);
+    }
+
+    /**
+     * Merges a previously encoded manager into this one: all decoded groups are put into the
+     * table (overwriting same-named entries) and the decoded data version is adopted.
+     * A {@code null} input, or one that decodes to {@code null}, is ignored.
+     *
+     * @param jsonString JSON text as produced by {@link #encode()}, may be {@code null}
+     */
+    @Override
+    public void decode(String jsonString) {
+        if (null == jsonString) {
+            return;
+        }
+
+        final SubscriptionGroupManager loaded =
+                RemotingSerializable.fromJson(jsonString, SubscriptionGroupManager.class);
+        if (null == loaded) {
+            return;
+        }
+
+        this.subscriptionGroupTable.putAll(loaded.subscriptionGroupTable);
+        this.dataVersion.assignNewOne(loaded.dataVersion);
+        this.printLoadDataWhenFirstBoot(loaded);
+    }
+
+    /**
+     * Serializes this manager to JSON via {@code RemotingSerializable.toJson}.
+     *
+     * @param prettyFormat passed through to the serializer
+     * @return the JSON text
+     */
+    public String encode(final boolean prettyFormat) {
+        final String json = RemotingSerializable.toJson(this, prettyFormat);
+        return json;
+    }
+
+    /**
+     * Logs every subscription group held by {@code sgm} at info level.
+     *
+     * @param sgm the manager whose table is dumped to the log
+     */
+    private void printLoadDataWhenFirstBoot(final SubscriptionGroupManager sgm) {
+        for (Entry<String, SubscriptionGroupConfig> entry : sgm.getSubscriptionGroupTable().entrySet()) {
+            log.info("load exist subscription group, {}", entry.getValue().toString());
+        }
+    }
+
+    /**
+     * @return the live group table (not a copy); changes made through it affect this manager
+     */
+    public ConcurrentHashMap<String, SubscriptionGroupConfig> getSubscriptionGroupTable() {
+        return this.subscriptionGroupTable;
+    }
+
+
+    /**
+     * @return the version object that is advanced whenever the group table is modified
+     */
+    public DataVersion getDataVersion() {
+        return this.dataVersion;
+    }
+
+
+    /**
+     * Removes the named group. On success the data version is bumped and the table is
+     * persisted; if the group does not exist only a warning is logged.
+     *
+     * @param groupName name of the group to remove
+     */
+    public void deleteSubscriptionGroupConfig(final String groupName) {
+        SubscriptionGroupConfig old = this.subscriptionGroupTable.remove(groupName);
+        if (old != null) {
+            log.info("delete subscription group OK, subscription group: " + old);
+            this.dataVersion.nextVersion();
+            this.persist();
+        } else {
+            // "old" is always null on this branch, so report the requested name instead.
+            log.warn("delete subscription group failed, subscription group: " + groupName + " not exist");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/topic/TopicConfigManager.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/topic/TopicConfigManager.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/topic/TopicConfigManager.java
new file mode 100644
index 0000000..94d7e9f
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/topic/TopicConfigManager.java
@@ -0,0 +1,440 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.broker.topic;
+
+import com.alibaba.rocketmq.broker.BrokerController;
+import com.alibaba.rocketmq.broker.BrokerPathConfigHelper;
+import com.alibaba.rocketmq.common.ConfigManager;
+import com.alibaba.rocketmq.common.DataVersion;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.TopicConfig;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.constant.PermName;
+import com.alibaba.rocketmq.common.protocol.body.KVTable;
+import com.alibaba.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
+import com.alibaba.rocketmq.common.sysflag.TopicSysFlag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class TopicConfigManager extends ConfigManager {
+    private static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    private static final long LOCK_TIMEOUT_MILLIS = 3000;
+    private transient final Lock lockTopicConfigTable = new ReentrantLock();
+
+    private final ConcurrentHashMap<String, TopicConfig> topicConfigTable =
+            new ConcurrentHashMap<String, TopicConfig>(1024);
+    private final DataVersion dataVersion = new DataVersion();
+    private final Set<String> systemTopicList = new HashSet<String>();
+    private transient BrokerController brokerController;
+
+
+    /**
+     * Creates an empty manager: no system topics are registered and {@code brokerController}
+     * is left unset.
+     */
+    public TopicConfigManager() {
+    }
+
+
+    /**
+     * Creates a manager bound to {@code brokerController} and registers the built-in system
+     * topics in both {@code systemTopicList} and {@code topicConfigTable}.
+     * <p>
+     * Topics are put into the table in the order below, so if two of them share a name the
+     * later registration replaces the earlier one.
+     *
+     * @param brokerController controller supplying the broker configuration read here
+     */
+    public TopicConfigManager(BrokerController brokerController) {
+        this.brokerController = brokerController;
+        {
+            // Self-test topic: one read queue and one write queue.
+            String topic = MixAll.SELF_TEST_TOPIC;
+            TopicConfig topicConfig = new TopicConfig(topic);
+            this.systemTopicList.add(topic);
+            topicConfig.setReadQueueNums(1);
+            topicConfig.setWriteQueueNums(1);
+            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+        }
+        {
+            // Default (template) topic: only registered when auto topic creation is enabled.
+            if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
+                String topic = MixAll.DEFAULT_TOPIC;
+                TopicConfig topicConfig = new TopicConfig(topic);
+                this.systemTopicList.add(topic);
+                topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
+                        .getDefaultTopicQueueNums());
+                topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
+                        .getDefaultTopicQueueNums());
+                // Inheritable plus read/write; createTopicInSendMessageMethod checks the inherit bit.
+                int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
+                topicConfig.setPerm(perm);
+                this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+            }
+        }
+        {
+            // Benchmark topic: 1024 read queues and 1024 write queues.
+            String topic = MixAll.BENCHMARK_TOPIC;
+            TopicConfig topicConfig = new TopicConfig(topic);
+            this.systemTopicList.add(topic);
+            topicConfig.setReadQueueNums(1024);
+            topicConfig.setWriteQueueNums(1024);
+            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+        }
+        {
+            // Topic named after the broker cluster; queue counts are not set here.
+            String topic = this.brokerController.getBrokerConfig().getBrokerClusterName();
+            TopicConfig topicConfig = new TopicConfig(topic);
+            this.systemTopicList.add(topic);
+            int perm = PermName.PERM_INHERIT;
+            // Read/write is granted only when the cluster topic is enabled in the broker config.
+            if (this.brokerController.getBrokerConfig().isClusterTopicEnable()) {
+                perm |= PermName.PERM_READ | PermName.PERM_WRITE;
+            }
+            topicConfig.setPerm(perm);
+            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+        }
+        {
+            // Topic named after this broker: one read queue and one write queue.
+            String topic = this.brokerController.getBrokerConfig().getBrokerName();
+            TopicConfig topicConfig = new TopicConfig(topic);
+            this.systemTopicList.add(topic);
+            int perm = PermName.PERM_INHERIT;
+            // Read/write is granted only when the broker topic is enabled in the broker config.
+            if (this.brokerController.getBrokerConfig().isBrokerTopicEnable()) {
+                perm |= PermName.PERM_READ | PermName.PERM_WRITE;
+            }
+            topicConfig.setReadQueueNums(1);
+            topicConfig.setWriteQueueNums(1);
+            topicConfig.setPerm(perm);
+            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+        }
+        {
+            // Offset-moved event topic: one read queue and one write queue.
+            String topic = MixAll.OFFSET_MOVED_EVENT;
+            TopicConfig topicConfig = new TopicConfig(topic);
+            this.systemTopicList.add(topic);
+            topicConfig.setReadQueueNums(1);
+            topicConfig.setWriteQueueNums(1);
+            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+        }
+    }
+
+
+    /**
+     * @param topic topic name to test
+     * @return {@code true} if {@code topic} was registered as a system topic by the constructor
+     */
+    public boolean isSystemTopic(final String topic) {
+        final boolean registered = this.systemTopicList.contains(topic);
+        return registered;
+    }
+
+
+    /**
+     * @return the live set of system topic names (not a copy)
+     */
+    public Set<String> getSystemTopic() {
+        return systemTopicList;
+    }
+
+
+    /**
+     * @param topic topic name, must not be {@code null}
+     * @return {@code false} only for {@code MixAll.DEFAULT_TOPIC}; {@code true} for any other name
+     */
+    public boolean isTopicCanSendMessage(final String topic) {
+        final boolean isDefaultTopic = topic.equals(MixAll.DEFAULT_TOPIC);
+        return !isDefaultTopic;
+    }
+
+
+    /**
+     * @param topic topic name
+     * @return the stored configuration for {@code topic}, or {@code null} if there is none
+     */
+    public TopicConfig selectTopicConfig(final String topic) {
+        final TopicConfig found = this.topicConfigTable.get(topic);
+        return found;
+    }
+
+
+    /**
+     * Returns the configuration of {@code topic}, creating it from the {@code defaultTopic}
+     * template when it does not exist yet.
+     * <p>
+     * Creation happens under {@code lockTopicConfigTable}, acquired with a
+     * {@code LOCK_TIMEOUT_MILLIS} timeout. A new topic is only derived when the template exists
+     * and its permission is inheritable; the new topic takes the template's permission without
+     * the inherit bit, the template's filter type, and a queue count of
+     * {@code min(clientDefaultTopicQueueNums, template write queues)}, floored at 0. After a
+     * successful creation the table is persisted and the broker is re-registered (outside the
+     * lock).
+     *
+     * @param topic                       topic to look up or create
+     * @param defaultTopic                name of the template topic
+     * @param remoteAddress               producer address, used for logging only
+     * @param clientDefaultTopicQueueNums queue count requested by the client
+     * @param topicSysFlag                system flag stored on a newly created topic
+     * @return the existing or newly created configuration; {@code null} if the topic could not
+     *         be created, the lock was not obtained in time, or the thread was interrupted
+     */
+    public TopicConfig createTopicInSendMessageMethod(final String topic, final String defaultTopic,
+                                                      final String remoteAddress, final int clientDefaultTopicQueueNums, final int topicSysFlag) {
+        TopicConfig topicConfig = null;
+        boolean createNew = false;
+
+        try {
+            if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+                try {
+                    topicConfig = this.topicConfigTable.get(topic);
+                    if (topicConfig != null) {
+                        return topicConfig;
+                    }
+
+                    TopicConfig defaultTopicConfig = this.topicConfigTable.get(defaultTopic);
+                    if (defaultTopicConfig != null) {
+                        if (defaultTopic.equals(MixAll.DEFAULT_TOPIC)
+                                && !this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
+                            // Auto-creation is off: overwrite the template's permission with plain
+                            // read/write (presumably so the inheritance check below rejects it).
+                            defaultTopicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);
+                        }
+
+                        if (PermName.isInherited(defaultTopicConfig.getPerm())) {
+                            topicConfig = new TopicConfig(topic);
+
+                            // Never exceed the template's write queues, never go below zero.
+                            int queueNums = Math.max(0,
+                                    Math.min(clientDefaultTopicQueueNums, defaultTopicConfig.getWriteQueueNums()));
+
+                            topicConfig.setReadQueueNums(queueNums);
+                            topicConfig.setWriteQueueNums(queueNums);
+                            // The derived topic must not itself act as a template.
+                            int perm = defaultTopicConfig.getPerm();
+                            perm &= ~PermName.PERM_INHERIT;
+                            topicConfig.setPerm(perm);
+                            topicConfig.setTopicSysFlag(topicSysFlag);
+                            topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType());
+                        } else {
+                            LOG.warn("create new topic failed, because the default topic[" + defaultTopic
+                                    + "] no perm, " + defaultTopicConfig.getPerm() + " producer: "
+                                    + remoteAddress);
+                        }
+                    } else {
+                        LOG.warn("create new topic failed, because the default topic[" + defaultTopic
+                                + "] not exist." + " producer: " + remoteAddress);
+                    }
+
+                    if (topicConfig != null) {
+                        LOG.info("create new topic by default topic[" + defaultTopic + "], " + topicConfig
+                                + " producer: " + remoteAddress);
+
+                        this.topicConfigTable.put(topic, topicConfig);
+
+                        this.dataVersion.nextVersion();
+
+                        createNew = true;
+
+                        this.persist();
+                    }
+                } finally {
+                    this.lockTopicConfigTable.unlock();
+                }
+            }
+        } catch (InterruptedException e) {
+            LOG.error("createTopicInSendMessageMethod exception", e);
+            // Restore the interrupt status so callers further up can still observe it.
+            Thread.currentThread().interrupt();
+        }
+
+        if (createNew) {
+            this.brokerController.registerBrokerAll(false, true);
+        }
+
+        return topicConfig;
+    }
+
+    /**
+     * Returns the configuration of {@code topic}, creating it with the given settings when it
+     * does not exist yet.
+     * <p>
+     * The table is checked once without the lock and again after acquiring
+     * {@code lockTopicConfigTable} (with a {@code LOCK_TIMEOUT_MILLIS} timeout). After a
+     * successful creation the table is persisted and the broker is re-registered (outside the
+     * lock).
+     *
+     * @param topic                       topic to look up or create
+     * @param clientDefaultTopicQueueNums read and write queue count for a new topic
+     * @param perm                        permission bits for a new topic
+     * @param topicSysFlag                system flag for a new topic
+     * @return the existing or newly created configuration; {@code null} if the lock was not
+     *         obtained in time or the thread was interrupted while waiting for it
+     */
+    public TopicConfig createTopicInSendMessageBackMethod(
+            final String topic,
+            final int clientDefaultTopicQueueNums,
+            final int perm,
+            final int topicSysFlag) {
+        TopicConfig topicConfig = this.topicConfigTable.get(topic);
+        if (topicConfig != null) {
+            return topicConfig;
+        }
+
+        boolean createNew = false;
+
+        try {
+            if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+                try {
+                    // Re-check under the lock: another thread may have created it meanwhile.
+                    topicConfig = this.topicConfigTable.get(topic);
+                    if (topicConfig != null) {
+                        return topicConfig;
+                    }
+
+                    topicConfig = new TopicConfig(topic);
+                    topicConfig.setReadQueueNums(clientDefaultTopicQueueNums);
+                    topicConfig.setWriteQueueNums(clientDefaultTopicQueueNums);
+                    topicConfig.setPerm(perm);
+                    topicConfig.setTopicSysFlag(topicSysFlag);
+
+                    LOG.info("create new topic {}", topicConfig);
+                    this.topicConfigTable.put(topic, topicConfig);
+                    createNew = true;
+                    this.dataVersion.nextVersion();
+                    this.persist();
+                } finally {
+                    this.lockTopicConfigTable.unlock();
+                }
+            }
+        } catch (InterruptedException e) {
+            LOG.error("createTopicInSendMessageBackMethod exception", e);
+            // Restore the interrupt status so callers further up can still observe it.
+            Thread.currentThread().interrupt();
+        }
+
+        if (createNew) {
+            this.brokerController.registerBrokerAll(false, true);
+        }
+
+        return topicConfig;
+    }
+
+    /**
+     * Sets or clears the unit flag in the system flags of an existing topic, then bumps the
+     * data version, persists and re-registers the broker. Unknown topics are ignored.
+     *
+     * @param topic topic to update
+     * @param unit  {@code true} to set the unit flag, {@code false} to clear it
+     */
+    public void updateTopicUnitFlag(final String topic, final boolean unit) {
+        TopicConfig topicConfig = this.topicConfigTable.get(topic);
+        if (topicConfig == null) {
+            return;
+        }
+
+        int oldTopicSysFlag = topicConfig.getTopicSysFlag();
+        if (unit) {
+            topicConfig.setTopicSysFlag(TopicSysFlag.setUnitFlag(oldTopicSysFlag));
+        } else {
+            topicConfig.setTopicSysFlag(TopicSysFlag.clearUnitFlag(oldTopicSysFlag));
+        }
+
+        // Both values need a placeholder, otherwise the new flag is silently dropped from the log.
+        LOG.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag={}", oldTopicSysFlag,
+                topicConfig.getTopicSysFlag());
+
+        this.topicConfigTable.put(topic, topicConfig);
+
+        this.dataVersion.nextVersion();
+
+        this.persist();
+        this.brokerController.registerBrokerAll(false, true);
+    }
+
+    /**
+     * Sets the unit-subscription flag in the system flags of an existing topic when
+     * {@code hasUnitSub} is {@code true}, then bumps the data version, persists and
+     * re-registers the broker. Unknown topics are ignored.
+     *
+     * @param topic      topic to update
+     * @param hasUnitSub {@code true} to set the flag; {@code false} leaves the flags untouched
+     */
+    public void updateTopicUnitSubFlag(final String topic, final boolean hasUnitSub) {
+        TopicConfig topicConfig = this.topicConfigTable.get(topic);
+        if (topicConfig == null) {
+            return;
+        }
+
+        int oldTopicSysFlag = topicConfig.getTopicSysFlag();
+        // NOTE(review): there is no clearing branch for hasUnitSub == false, unlike
+        // updateTopicUnitFlag; confirm whether that asymmetry is intended.
+        if (hasUnitSub) {
+            topicConfig.setTopicSysFlag(TopicSysFlag.setUnitSubFlag(oldTopicSysFlag));
+        }
+
+        // Both values need a placeholder, otherwise the new flag is silently dropped from the log.
+        LOG.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag={}", oldTopicSysFlag,
+                topicConfig.getTopicSysFlag());
+
+        this.topicConfigTable.put(topic, topicConfig);
+
+        this.dataVersion.nextVersion();
+
+        this.persist();
+        this.brokerController.registerBrokerAll(false, true);
+    }
+
+    /**
+     * Stores {@code topicConfig} under its topic name, replacing any existing entry, then
+     * bumps the data version and persists.
+     *
+     * @param topicConfig the configuration to create or overwrite
+     */
+    public void updateTopicConfig(final TopicConfig topicConfig) {
+        final String topicName = topicConfig.getTopicName();
+        final TopicConfig previous = this.topicConfigTable.put(topicName, topicConfig);
+
+        if (null == previous) {
+            LOG.info("create new topic, " + topicConfig);
+        } else {
+            LOG.info("update topic config, old: " + previous + " new: " + topicConfig);
+        }
+
+        this.dataVersion.nextVersion();
+        this.persist();
+    }
+
+
+    /**
+     * Synchronizes the {@code order} attribute of all known topics with the key set of
+     * {@code orderKVTableFromNs}: topics named in the table become ordered, all others become
+     * unordered. The data version is bumped and the table persisted only if something changed.
+     * A {@code null} argument or a {@code null} inner table is ignored.
+     *
+     * @param orderKVTableFromNs table whose keys are the names of the ordered topics
+     */
+    public void updateOrderTopicConfig(final KVTable orderKVTableFromNs) {
+        if (null == orderKVTableFromNs || null == orderKVTableFromNs.getTable()) {
+            return;
+        }
+
+        final Set<String> orderTopics = orderKVTableFromNs.getTable().keySet();
+        boolean changed = false;
+
+        // Pass 1: switch ordering on for every listed topic that this broker knows.
+        for (final String orderTopic : orderTopics) {
+            final TopicConfig listed = this.topicConfigTable.get(orderTopic);
+            if (null == listed || listed.isOrder()) {
+                continue;
+            }
+            listed.setOrder(true);
+            changed = true;
+            LOG.info("update order topic config, topic={}, order={}", orderTopic, true);
+        }
+
+        // Pass 2: switch ordering off for every known topic that is no longer listed.
+        for (Map.Entry<String, TopicConfig> entry : this.topicConfigTable.entrySet()) {
+            final String knownTopic = entry.getKey();
+            if (orderTopics.contains(knownTopic)) {
+                continue;
+            }
+            final TopicConfig unlisted = entry.getValue();
+            if (!unlisted.isOrder()) {
+                continue;
+            }
+            unlisted.setOrder(false);
+            changed = true;
+            LOG.info("update order topic config, topic={}, order={}", knownTopic, false);
+        }
+
+        if (changed) {
+            this.dataVersion.nextVersion();
+            this.persist();
+        }
+    }
+
+    /**
+     * @param topic topic name
+     * @return {@code true} if the topic is known and marked as ordered; {@code false} otherwise
+     */
+    public boolean isOrderTopic(final String topic) {
+        final TopicConfig found = this.topicConfigTable.get(topic);
+        return found != null && found.isOrder();
+    }
+
+    /**
+     * Removes the named topic. On success the data version is bumped and the table is
+     * persisted; if the topic does not exist only a warning is logged.
+     *
+     * @param topic name of the topic to remove
+     */
+    public void deleteTopicConfig(final String topic) {
+        final TopicConfig removed = this.topicConfigTable.remove(topic);
+        if (null == removed) {
+            LOG.warn("delete topic config failed, topic: " + topic + " not exist");
+            return;
+        }
+
+        LOG.info("delete topic config OK, topic: " + removed);
+        this.dataVersion.nextVersion();
+        this.persist();
+    }
+
+    /**
+     * Wraps the live topic table and data version (no copies are made) for serialization.
+     *
+     * @return a new wrapper referencing this manager's table and data version
+     */
+    public TopicConfigSerializeWrapper buildTopicConfigSerializeWrapper() {
+        final TopicConfigSerializeWrapper wrapper = new TopicConfigSerializeWrapper();
+        wrapper.setTopicConfigTable(topicConfigTable);
+        wrapper.setDataVersion(dataVersion);
+        return wrapper;
+    }
+
+    /**
+     * Serializes the topic table in compact (non-pretty) form.
+     *
+     * @return the result of {@link #encode(boolean)} with {@code false}
+     */
+    @Override
+    public String encode() {
+        final boolean prettyFormat = false;
+        return this.encode(prettyFormat);
+    }
+
+    /**
+     * Returns the path of the topic config file below {@code <user.home>/store}.
+     */
+    @Override
+    public String configFilePath() {
+        // NOTE(review): the store root is fixed to <user.home>/store; the variant based on
+        // this.brokerController.getMessageStoreConfig().getStorePathRootDir() is disabled.
+        final String storeRootDir = System.getProperty("user.home") + File.separator + "store";
+        return BrokerPathConfigHelper.getTopicConfigPath(storeRootDir);
+    }
+
+    /**
+     * Merges a previously encoded topic table into this manager: all decoded topics are put
+     * into the table (overwriting same-named entries) and the decoded data version is adopted.
+     * A {@code null} input, or one that decodes to {@code null}, is ignored.
+     *
+     * @param jsonString JSON text as produced by {@link #encode()}, may be {@code null}
+     */
+    @Override
+    public void decode(String jsonString) {
+        if (null == jsonString) {
+            return;
+        }
+
+        final TopicConfigSerializeWrapper loaded =
+                TopicConfigSerializeWrapper.fromJson(jsonString, TopicConfigSerializeWrapper.class);
+        if (null == loaded) {
+            return;
+        }
+
+        this.topicConfigTable.putAll(loaded.getTopicConfigTable());
+        this.dataVersion.assignNewOne(loaded.getDataVersion());
+        this.printLoadDataWhenFirstBoot(loaded);
+    }
+
+    /**
+     * Serializes the topic table and data version to JSON.
+     *
+     * @param prettyFormat passed through to the wrapper's {@code toJson}
+     * @return the JSON text
+     */
+    public String encode(final boolean prettyFormat) {
+        // Reuse the shared wrapper factory instead of duplicating its three lines here.
+        return this.buildTopicConfigSerializeWrapper().toJson(prettyFormat);
+    }
+
+    /**
+     * Logs every topic configuration held by {@code tcs} at info level.
+     *
+     * @param tcs the decoded wrapper whose table is dumped to the log
+     */
+    private void printLoadDataWhenFirstBoot(final TopicConfigSerializeWrapper tcs) {
+        for (Entry<String, TopicConfig> entry : tcs.getTopicConfigTable().entrySet()) {
+            LOG.info("load exist local topic, {}", entry.getValue().toString());
+        }
+    }
+
+    /**
+     * @return the version object that is advanced whenever the topic table is modified
+     */
+    public DataVersion getDataVersion() {
+        return this.dataVersion;
+    }
+
+    /**
+     * @return the live topic table (not a copy); changes made through it affect this manager
+     */
+    public ConcurrentHashMap<String, TopicConfig> getTopicConfigTable() {
+        return this.topicConfigTable;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/TransactionRecord.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/TransactionRecord.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/TransactionRecord.java
new file mode 100644
index 0000000..4328cf8
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/TransactionRecord.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.broker.transaction;
+
+/**
+ * Mutable value holder pairing a commit log offset with a producer group name.
+ */
+public class TransactionRecord {
+    /** Commit log offset. */
+    private long offset;
+    /** Producer group name; {@code null} until set. */
+    private String producerGroup;
+
+    /** @return the commit log offset, {@code 0} until set */
+    public long getOffset() {
+        return this.offset;
+    }
+
+    /** @param offset the commit log offset to store */
+    public void setOffset(long offset) {
+        this.offset = offset;
+    }
+
+    /** @return the producer group name, or {@code null} if none was set */
+    public String getProducerGroup() {
+        return this.producerGroup;
+    }
+
+    /** @param producerGroup the producer group name to store */
+    public void setProducerGroup(String producerGroup) {
+        this.producerGroup = producerGroup;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/TransactionStore.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/TransactionStore.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/TransactionStore.java
new file mode 100644
index 0000000..9d977ab
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/TransactionStore.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.broker.transaction;
+
+import java.util.List;
+
+
+/**
+ * Storage abstraction for {@link TransactionRecord}s.
+ * <p>
+ * NOTE(review): the semantics below are inferred from method names only; confirm against an
+ * implementation before relying on them.
+ */
+public interface TransactionStore {
+    /** Opens the store; presumably returns {@code true} on success. */
+    boolean open();
+
+    /** Closes the store. */
+    void close();
+
+    /**
+     * Adds the given records.
+     *
+     * @param trs records to add
+     * @return presumably {@code true} when the records were stored
+     */
+    boolean put(List<TransactionRecord> trs);
+
+    /**
+     * Removes records by key.
+     *
+     * @param pks keys of the records to remove (presumably primary keys)
+     */
+    void remove(List<Long> pks);
+
+    /**
+     * Reads a batch of records.
+     *
+     * @param pk   key to traverse from (presumably a primary key)
+     * @param nums presumably the maximum number of records to return
+     * @return the records read
+     */
+    List<TransactionRecord> traverse(long pk, int nums);
+
+    /** @return the total number of records */
+    long totalRecords();
+
+    /** @return the minimum key (presumably primary key) in the store */
+    long minPK();
+
+    /** @return the maximum key (presumably primary key) in the store */
+    long maxPK();
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java
new file mode 100644
index 0000000..47de33b
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.broker.transaction.jdbc;
+
+import com.alibaba.rocketmq.broker.transaction.TransactionRecord;
+import com.alibaba.rocketmq.broker.transaction.TransactionStore;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.sql.*;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+public class JDBCTransactionStore implements TransactionStore {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
+    private final JDBCTransactionStoreConfig jdbcTransactionStoreConfig;
+    private Connection connection;
+    private AtomicLong totalRecordsValue = new AtomicLong(0);
+
+    public JDBCTransactionStore(JDBCTransactionStoreConfig jdbcTransactionStoreConfig) {
+        this.jdbcTransactionStoreConfig = jdbcTransactionStoreConfig;
+    }
+
+    @Override
+    public boolean open() {
+        if (this.loadDriver()) {
+            Properties props = new Properties();
+            props.put("user", jdbcTransactionStoreConfig.getJdbcUser());
+            props.put("password", jdbcTransactionStoreConfig.getJdbcPassword());
+
+            try {
+                this.connection =
+                        DriverManager.getConnection(this.jdbcTransactionStoreConfig.getJdbcURL(), props);
+
+                this.connection.setAutoCommit(false);
+
+
+                if (!this.computeTotalRecords()) {
+                    return this.createDB();
+                }
+
+                return true;
+            } catch (SQLException e) {
+                log.info("Create JDBC Connection Exeption", e);
+            }
+        }
+
+        return false;
+    }
+
+    private boolean loadDriver() {
+        try {
+            Class.forName(this.jdbcTransactionStoreConfig.getJdbcDriverClass()).newInstance();
+            log.info("Loaded the appropriate driver, {}",
+                    this.jdbcTransactionStoreConfig.getJdbcDriverClass());
+            return true;
+        } catch (Exception e) {
+            log.info("Loaded the appropriate driver Exception", e);
+        }
+
+        return false;
+    }
+
+    private boolean computeTotalRecords() {
+        Statement statement = null;
+        ResultSet resultSet = null;
+        try {
+            statement = this.connection.createStatement();
+
+            resultSet = statement.executeQuery("select count(offset) as total from t_transaction");
+            if (!resultSet.next()) {
+                log.warn("computeTotalRecords ResultSet is empty");
+                return false;
+            }
+
+            this.totalRecordsValue.set(resultSet.getLong(1));
+        } catch (Exception e) {
+            log.warn("computeTotalRecords Exception", e);
+            return false;
+        } finally {
+            if (null != statement) {
+                try {
+                    statement.close();
+                } catch (SQLException e) {
+                }
+            }
+
+            if (null != resultSet) {
+                try {
+                    resultSet.close();
+                } catch (SQLException e) {
+                }
+            }
+        }
+
+        return true;
+    }
+
+    private boolean createDB() {
+        Statement statement = null;
+        try {
+            statement = this.connection.createStatement();
+
+            String sql = this.createTableSql();
+            log.info("createDB SQL:\n {}", sql);
+            statement.execute(sql);
+            this.connection.commit();
+            return true;
+        } catch (Exception e) {
+            log.warn("createDB Exception", e);
+            return false;
+        } finally {
+            if (null != statement) {
+                try {
+                    statement.close();
+                } catch (SQLException e) {
+                    log.warn("Close statement exception", e);
+                }
+            }
+        }
+    }
+
+    private String createTableSql() {
+        URL resource = JDBCTransactionStore.class.getClassLoader().getResource("transaction.sql");
+        String fileContent = MixAll.file2String(resource);
+        return fileContent;
+    }
+
+    @Override
+    public void close() {
+        try {
+            if (this.connection != null) {
+                this.connection.close();
+            }
+        } catch (SQLException e) {
+        }
+    }
+
+    @Override
+    public boolean put(List<TransactionRecord> trs) {
+        PreparedStatement statement = null;
+        try {
+            this.connection.setAutoCommit(false);
+            statement = this.connection.prepareStatement("insert into t_transaction values (?, ?)");
+            for (TransactionRecord tr : trs) {
+                statement.setLong(1, tr.getOffset());
+                statement.setString(2, tr.getProducerGroup());
+                statement.addBatch();
+            }
+            int[] executeBatch = statement.executeBatch();
+            this.connection.commit();
+            this.totalRecordsValue.addAndGet(updatedRows(executeBatch));
+            return true;
+        } catch (Exception e) {
+            log.warn("createDB Exception", e);
+            return false;
+        } finally {
+            if (null != statement) {
+                try {
+                    statement.close();
+                } catch (SQLException e) {
+                    log.warn("Close statement exception", e);
+                }
+            }
+        }
+    }
+
+    private long updatedRows(int[] rows) {
+        long res = 0;
+        for (int i : rows) {
+            res += i;
+        }
+
+        return res;
+    }
+
+    @Override
+    public void remove(List<Long> pks) {
+        PreparedStatement statement = null;
+        try {
+            this.connection.setAutoCommit(false);
+            statement = this.connection.prepareStatement("DELETE FROM t_transaction WHERE offset = ?");
+            for (long pk : pks) {
+                statement.setLong(1, pk);
+                statement.addBatch();
+            }
+            int[] executeBatch = statement.executeBatch();
+            this.connection.commit();
+        } catch (Exception e) {
+            log.warn("createDB Exception", e);
+        } finally {
+            if (null != statement) {
+                try {
+                    statement.close();
+                } catch (SQLException e) {
+                }
+            }
+        }
+    }
+
+    @Override
+    public List<TransactionRecord> traverse(long pk, int nums) {
+        return null;
+    }
+
+    @Override
+    public long totalRecords() {
+        return this.totalRecordsValue.get();
+    }
+
+    @Override
+    public long minPK() {
+        return 0;
+    }
+
+    @Override
+    public long maxPK() {
+        return 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java
new file mode 100644
index 0000000..1244cfc
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.broker.transaction.jdbc;
+
+/**
+ * Connection settings for the JDBC transaction store: driver class, JDBC URL and
+ * credentials. The defaults are placeholders and are expected to be overridden through
+ * the setters.
+ */
+public class JDBCTransactionStoreConfig {
+    /** Fully qualified name of the JDBC driver class. */
+    private String jdbcDriverClass = "com.mysql.jdbc.Driver";
+    /** JDBC URL of the database. */
+    private String jdbcURL = "jdbc:mysql://xxx.xxx.xxx.xxx:1000/xxx?useUnicode=true&characterEncoding=UTF-8";
+    /** Database user name. */
+    private String jdbcUser = "xxx";
+    /** Database password. */
+    private String jdbcPassword = "xxx";
+
+
+    /** @return fully qualified name of the JDBC driver class */
+    public String getJdbcDriverClass() {
+        return this.jdbcDriverClass;
+    }
+
+
+    /** @param jdbcDriverClass fully qualified name of the JDBC driver class */
+    public void setJdbcDriverClass(final String jdbcDriverClass) {
+        this.jdbcDriverClass = jdbcDriverClass;
+    }
+
+
+    /** @return JDBC URL of the database */
+    public String getJdbcURL() {
+        return this.jdbcURL;
+    }
+
+
+    /** @param jdbcURL JDBC URL of the database */
+    public void setJdbcURL(final String jdbcURL) {
+        this.jdbcURL = jdbcURL;
+    }
+
+
+    /** @return database user name */
+    public String getJdbcUser() {
+        return this.jdbcUser;
+    }
+
+
+    /** @param jdbcUser database user name */
+    public void setJdbcUser(final String jdbcUser) {
+        this.jdbcUser = jdbcUser;
+    }
+
+
+    /** @return database password */
+    public String getJdbcPassword() {
+        return this.jdbcPassword;
+    }
+
+
+    /** @param jdbcPassword database password */
+    public void setJdbcPassword(final String jdbcPassword) {
+        this.jdbcPassword = jdbcPassword;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/resources/transaction.sql
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/resources/transaction.sql b/rocketmq-broker/src/main/resources/transaction.sql
new file mode 100644
index 0000000..aaefe43
--- /dev/null
+++ b/rocketmq-broker/src/main/resources/transaction.sql
@@ -0,0 +1,4 @@
+CREATE TABLE t_transaction(
+	offset				NUMERIC(20) PRIMARY KEY,
+	producerGroup		VARCHAR(64)
+)

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/api/SendMessageTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/api/SendMessageTest.java b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/api/SendMessageTest.java
new file mode 100644
index 0000000..34ebfa5
--- /dev/null
+++ b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/api/SendMessageTest.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: SendMessageTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $
+ */
+package com.alibaba.rocketmq.broker.api;
+
+import com.alibaba.rocketmq.broker.BrokerController;
+import com.alibaba.rocketmq.client.hook.SendMessageContext;
+import com.alibaba.rocketmq.client.impl.CommunicationMode;
+import com.alibaba.rocketmq.client.impl.MQClientAPIImpl;
+import com.alibaba.rocketmq.client.producer.SendResult;
+import com.alibaba.rocketmq.common.BrokerConfig;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.message.Message;
+import com.alibaba.rocketmq.common.message.MessageDecoder;
+import com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import com.alibaba.rocketmq.remoting.netty.NettyClientConfig;
+import com.alibaba.rocketmq.remoting.netty.NettyServerConfig;
+import com.alibaba.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Test;
+
+
+/**
+ * Starts an in-process broker with default configuration and sends 100 delayed messages
+ * to it through {@link MQClientAPIImpl}.
+ *
+ * @author shijia.wxr
+ */
+public class SendMessageTest {
+    /**
+     * Sends messages to three topics in turn, using delay levels 1 to 3 and queue ids 0 to 3.
+     * The client and the broker are shut down even when the test body throws, so a failure
+     * does not leave their threads and ports behind for later tests.
+     */
+    @Test
+    public void test_sendMessage() throws Exception {
+        BrokerController brokerController = new BrokerController(//
+                new BrokerConfig(), //
+                new NettyServerConfig(), //
+                new NettyClientConfig(), //
+                new MessageStoreConfig());
+        boolean initResult = brokerController.initialize();
+        System.out.println("initialize " + initResult);
+
+        brokerController.start();
+        try {
+            MQClientAPIImpl client = new MQClientAPIImpl(new NettyClientConfig(), null, null, null);
+            client.start();
+            try {
+                for (int i = 0; i < 100; i++) {
+                    String topic = "UnitTestTopic_" + i % 3;
+                    Message msg = new Message(topic, "TAG1 TAG2", "100200300", ("Hello, Nice world\t" + i).getBytes());
+                    msg.setDelayTimeLevel(i % 3 + 1);
+
+                    try {
+                        SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
+                        requestHeader.setProducerGroup("abc");
+                        requestHeader.setTopic(msg.getTopic());
+                        requestHeader.setDefaultTopic(MixAll.DEFAULT_TOPIC);
+                        requestHeader.setDefaultTopicQueueNums(4);
+                        requestHeader.setQueueId(i % 4);
+                        requestHeader.setSysFlag(0);
+                        requestHeader.setBornTimestamp(System.currentTimeMillis());
+                        requestHeader.setFlag(msg.getFlag());
+                        requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
+
+                        SendResult result = client.sendMessage("127.0.0.1:10911", "brokerName", msg, requestHeader, 1000 * 5,
+                                CommunicationMode.SYNC, new SendMessageContext(), null);
+                        System.out.println(i + "\t" + result);
+                    } catch (Exception e) {
+                        // NOTE(review): a failed send is only printed, so this test cannot fail
+                        // because of it -- kept as in the original best-effort loop.
+                        e.printStackTrace();
+                    }
+                }
+            } finally {
+                client.shutdown();
+            }
+        } finally {
+            brokerController.shutdown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManagerTest.java b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
new file mode 100644
index 0000000..55844eb
--- /dev/null
+++ b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: ConsumerOffsetManagerTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $
+ */
+package com.alibaba.rocketmq.broker.offset;
+
+import com.alibaba.rocketmq.broker.BrokerController;
+import com.alibaba.rocketmq.common.BrokerConfig;
+import com.alibaba.rocketmq.remoting.netty.NettyClientConfig;
+import com.alibaba.rocketmq.remoting.netty.NettyServerConfig;
+import com.alibaba.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Test;
+
+import java.util.Random;
+
+
+/**
+ * Commits random consumer offsets for many groups and topics and persists them.
+ *
+ * @author shijia.wxr
+ */
+public class ConsumerOffsetManagerTest {
+    /**
+     * Commits one random offset per queue (16 queues) for three topics and 100 consumer
+     * groups, then persists the offset table. The broker is shut down even when the test
+     * body throws.
+     */
+    @Test
+    public void test_flushConsumerOffset() throws Exception {
+        BrokerController brokerController = new BrokerController(//
+                new BrokerConfig(), //
+                new NettyServerConfig(), //
+                new NettyClientConfig(), //
+                new MessageStoreConfig());
+        boolean initResult = brokerController.initialize();
+        System.out.println("initialize " + initResult);
+        brokerController.start();
+
+        try {
+            ConsumerOffsetManager consumerOffsetManager = new ConsumerOffsetManager(brokerController);
+
+            Random random = new Random();
+            // Offsets are drawn from [0, 1 GiB). The modulus has to be parenthesized:
+            // "x % 1024 * 1024 * 1024" is evaluated as "(x % 1024) * 1024 * 1024", which
+            // yields only multiples of 1 MiB and may be negative.
+            final long offsetBound = 1024L * 1024 * 1024;
+
+            for (int i = 0; i < 100; i++) {
+                String group = "DIANPU_GROUP_" + i;
+                for (int id = 0; id < 16; id++) {
+                    consumerOffsetManager.commitOffset(null, group, "TOPIC_A", id,
+                            Math.abs(random.nextLong() % offsetBound));
+                    consumerOffsetManager.commitOffset(null, group, "TOPIC_B", id,
+                            Math.abs(random.nextLong() % offsetBound));
+                    consumerOffsetManager.commitOffset(null, group, "TOPIC_C", id,
+                            Math.abs(random.nextLong() % offsetBound));
+                }
+            }
+
+            consumerOffsetManager.persist();
+        } finally {
+            brokerController.shutdown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/topic/TopicConfigManagerTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/topic/TopicConfigManagerTest.java b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/topic/TopicConfigManagerTest.java
new file mode 100644
index 0000000..9edd02e
--- /dev/null
+++ b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/topic/TopicConfigManagerTest.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: TopicConfigManagerTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $
+ */
+package com.alibaba.rocketmq.broker.topic;
+
+import com.alibaba.rocketmq.broker.BrokerController;
+import com.alibaba.rocketmq.common.BrokerConfig;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.TopicConfig;
+import com.alibaba.rocketmq.remoting.netty.NettyClientConfig;
+import com.alibaba.rocketmq.remoting.netty.NettyServerConfig;
+import com.alibaba.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class TopicConfigManagerTest {
+    /**
+     * Creates topics through {@code createTopicInSendMessageMethod}, checks that each call
+     * returns a configuration, and persists the topic table. The broker is shut down even
+     * when an assertion fails, so a failure does not leave its threads and ports behind.
+     */
+    @Test
+    public void test_flushTopicConfig() throws Exception {
+        BrokerController brokerController = new BrokerController(//
+                new BrokerConfig(), //
+                new NettyServerConfig(), //
+                new NettyClientConfig(), //
+                new MessageStoreConfig());
+        boolean initResult = brokerController.initialize();
+        System.out.println("initialize " + initResult);
+        brokerController.start();
+
+        try {
+            TopicConfigManager topicConfigManager = new TopicConfigManager(brokerController);
+
+            TopicConfig topicConfig =
+                    topicConfigManager.createTopicInSendMessageMethod("TestTopic_SEND", MixAll.DEFAULT_TOPIC,
+                            null, 4, 0);
+            assertTrue(topicConfig != null);
+
+            System.out.println(topicConfig);
+
+            for (int i = 0; i < 10; i++) {
+                String topic = "UNITTEST-" + i;
+                topicConfig =
+                        topicConfigManager
+                                .createTopicInSendMessageMethod(topic, MixAll.DEFAULT_TOPIC, null, 4, 0);
+                assertTrue(topicConfig != null);
+            }
+
+            topicConfigManager.persist();
+        } finally {
+            brokerController.shutdown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/deploy.bat
----------------------------------------------------------------------
diff --git a/rocketmq-client/deploy.bat b/rocketmq-client/deploy.bat
new file mode 100644
index 0000000..f778070
--- /dev/null
+++ b/rocketmq-client/deploy.bat
@@ -0,0 +1 @@
+mvn -Dmaven.test.skip=true deploy -Pclient-shade
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/install.bat
----------------------------------------------------------------------
diff --git a/rocketmq-client/install.bat b/rocketmq-client/install.bat
new file mode 100644
index 0000000..87bf456
--- /dev/null
+++ b/rocketmq-client/install.bat
@@ -0,0 +1,2 @@
+mvn -Dmaven.test.skip=true clean package install -Pclient-shade -U
+

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/pom.xml
----------------------------------------------------------------------
diff --git a/rocketmq-client/pom.xml b/rocketmq-client/pom.xml
new file mode 100644
index 0000000..63a6114
--- /dev/null
+++ b/rocketmq-client/pom.xml
@@ -0,0 +1,97 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>com.alibaba.rocketmq</groupId>
+        <artifactId>rocketmq-all</artifactId>
+        <version>4.0.0-SNAPSHOT</version>
+    </parent>
+
+
+    <modelVersion>4.0.0</modelVersion>
+    <packaging>jar</packaging>
+    <artifactId>rocketmq-client</artifactId>
+    <name>rocketmq-client ${project.version}</name>
+
+    <profiles>
+        <profile>
+            <id>client-shade</id>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-shade-plugin</artifactId>
+                        <version>2.4.3</version>
+                        <executions>
+                            <execution>
+                                <phase>package</phase>
+                                <goals>
+                                    <goal>shade</goal>
+                                </goals>
+                            </execution>
+                        </executions>
+                        <configuration>
+                            <keepDependenciesWithProvidedScope>false</keepDependenciesWithProvidedScope>
+                            <promoteTransitiveDependencies>false</promoteTransitiveDependencies>
+                            <createDependencyReducedPom>true</createDependencyReducedPom>
+                            <minimizeJar>false</minimizeJar>
+                            <createSourcesJar>true</createSourcesJar>
+                            <artifactSet>
+                                <includes>
+                                    <include>com.alibaba:fastjson</include>
+                                    <include>io.netty:netty-all</include>
+                                    <include>com.alibaba.rocketmq:rocketmq-client</include>
+                                    <include>com.alibaba.rocketmq:rocketmq-common</include>
+                                    <include>com.alibaba.rocketmq:rocketmq-remoting</include>
+                                </includes>
+                            </artifactSet>
+                            <relocations>
+                                <relocation>
+                                    <pattern>io.netty</pattern>
+                                    <shadedPattern>com.alibaba.rocketmq.shade.io.netty</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.alibaba.fastjson</pattern>
+                                    <shadedPattern>com.alibaba.rocketmq.shade.com.alibaba.fastjson</shadedPattern>
+                                </relocation>
+                            </relocations>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
+
+
+    <dependencies>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>rocketmq-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/ClientConfig.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/ClientConfig.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/ClientConfig.java
new file mode 100644
index 0000000..4d80564
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/ClientConfig.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.client;
+
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.UtilAll;
+import com.alibaba.rocketmq.remoting.common.RemotingUtil;
+
+
+/**
+ * Client Common configuration
+ *
+ * @author shijia.wxr
+ * @author vongosling
+ */
+public class ClientConfig {
+    public static final String SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY = "com.rocketmq.sendMessageWithVIPChannel";
+    private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
+    private String clientIP = RemotingUtil.getLocalAddress();
+    private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
+    private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors();
+    /**
+     * Interval for pulling topic information from the name server
+     */
+    private int pollNameServerInteval = 1000 * 30;
+    /**
+     * Heartbeat interval in milliseconds with message broker
+     */
+    private int heartbeatBrokerInterval = 1000 * 30;
+    /**
+     * Offset persistent interval for consumer
+     */
+    private int persistConsumerOffsetInterval = 1000 * 5;
+    private boolean unitMode = false;
+    private String unitName;
+    private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true"));
+
+    public String buildMQClientId() {
+        StringBuilder sb = new StringBuilder();
+        sb.append(this.getClientIP());
+
+        sb.append("@");
+        sb.append(this.getInstanceName());
+        if (!UtilAll.isBlank(this.unitName)) {
+            sb.append("@");
+            sb.append(this.unitName);
+        }
+
+        return sb.toString();
+    }
+
+    public String getClientIP() {
+        return clientIP;
+    }
+
+    public void setClientIP(String clientIP) {
+        this.clientIP = clientIP;
+    }
+
+    public String getInstanceName() {
+        return instanceName;
+    }
+
+    public void setInstanceName(String instanceName) {
+        this.instanceName = instanceName;
+    }
+
+    public void changeInstanceNameToPID() {
+        if (this.instanceName.equals("DEFAULT")) {
+            this.instanceName = String.valueOf(UtilAll.getPid());
+        }
+    }
+
+    public void resetClientConfig(final ClientConfig cc) {
+        this.namesrvAddr = cc.namesrvAddr;
+        this.clientIP = cc.clientIP;
+        this.instanceName = cc.instanceName;
+        this.clientCallbackExecutorThreads = cc.clientCallbackExecutorThreads;
+        this.pollNameServerInteval = cc.pollNameServerInteval;
+        this.heartbeatBrokerInterval = cc.heartbeatBrokerInterval;
+        this.persistConsumerOffsetInterval = cc.persistConsumerOffsetInterval;
+        this.unitMode = cc.unitMode;
+        this.unitName = cc.unitName;
+        this.vipChannelEnabled = cc.vipChannelEnabled;
+    }
+
+    public ClientConfig cloneClientConfig() {
+        ClientConfig cc = new ClientConfig();
+        cc.namesrvAddr = namesrvAddr;
+        cc.clientIP = clientIP;
+        cc.instanceName = instanceName;
+        cc.clientCallbackExecutorThreads = clientCallbackExecutorThreads;
+        cc.pollNameServerInteval = pollNameServerInteval;
+        cc.heartbeatBrokerInterval = heartbeatBrokerInterval;
+        cc.persistConsumerOffsetInterval = persistConsumerOffsetInterval;
+        cc.unitMode = unitMode;
+        cc.unitName = unitName;
+        cc.vipChannelEnabled = vipChannelEnabled;
+        return cc;
+    }
+
+    public String getNamesrvAddr() {
+        return namesrvAddr;
+    }
+
+    public void setNamesrvAddr(String namesrvAddr) {
+        this.namesrvAddr = namesrvAddr;
+    }
+
+    public int getClientCallbackExecutorThreads() {
+        return clientCallbackExecutorThreads;
+    }
+
+
+    public void setClientCallbackExecutorThreads(int clientCallbackExecutorThreads) {
+        this.clientCallbackExecutorThreads = clientCallbackExecutorThreads;
+    }
+
+
+    public int getPollNameServerInteval() {
+        return pollNameServerInteval;
+    }
+
+
+    public void setPollNameServerInteval(int pollNameServerInteval) {
+        this.pollNameServerInteval = pollNameServerInteval;
+    }
+
+
+    public int getHeartbeatBrokerInterval() {
+        return heartbeatBrokerInterval;
+    }
+
+
+    public void setHeartbeatBrokerInterval(int heartbeatBrokerInterval) {
+        this.heartbeatBrokerInterval = heartbeatBrokerInterval;
+    }
+
+
+    public int getPersistConsumerOffsetInterval() {
+        return persistConsumerOffsetInterval;
+    }
+
+
+    public void setPersistConsumerOffsetInterval(int persistConsumerOffsetInterval) {
+        this.persistConsumerOffsetInterval = persistConsumerOffsetInterval;
+    }
+
+
+    public String getUnitName() {
+        return unitName;
+    }
+
+
+    public void setUnitName(String unitName) {
+        this.unitName = unitName;
+    }
+
+
+    public boolean isUnitMode() {
+        return unitMode;
+    }
+
+
+    public void setUnitMode(boolean unitMode) {
+        this.unitMode = unitMode;
+    }
+
+
+    public boolean isVipChannelEnabled() {
+        return vipChannelEnabled;
+    }
+
+
+    public void setVipChannelEnabled(final boolean vipChannelEnabled) {
+        this.vipChannelEnabled = vipChannelEnabled;
+    }
+
+
+    @Override
+    public String toString() {
+        return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName
+                + ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInteval=" + pollNameServerInteval
+                + ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval="
+                + persistConsumerOffsetInterval + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled="
+                + vipChannelEnabled + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/MQAdmin.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/MQAdmin.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/MQAdmin.java
new file mode 100644
index 0000000..4e202e9
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/MQAdmin.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.client;
+
+import com.alibaba.rocketmq.client.exception.MQBrokerException;
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.common.message.MessageExt;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.remoting.exception.RemotingException;
+
+
+/**
+ * Base interface for MQ management
+ *
+ * @author shijia.wxr
+ */
+public interface MQAdmin {
+    /**
+     * Creates a topic
+     *
+     * @param key
+     *         accesskey
+     * @param newTopic
+     *         topic name
+     * @param queueNum
+     *         topic's queue number
+     *
+     * @throws MQClientException
+     */
+    void createTopic(final String key, final String newTopic, final int queueNum)
+            throws MQClientException;
+
+
+    /**
+     * Creates a topic
+     *
+     * @param key
+     *         accesskey
+     * @param newTopic
+     *         topic name
+     * @param queueNum
+     *         topic's queue number
+     * @param topicSysFlag
+     *         topic system flag
+     *
+     * @throws MQClientException
+     */
+    void createTopic(String key, String newTopic, int queueNum, int topicSysFlag)
+            throws MQClientException;
+
+
+    /**
+     * Gets the message queue offset for the given time in milliseconds.<br>
+     * Call with caution: this lookup incurs extra IO overhead.
+     *
+     * @param mq
+     *         Instance of MessageQueue
+     * @param timestamp
+     *         from when in milliseconds.
+     *
+     * @return offset
+     *
+     * @throws MQClientException
+     */
+    long searchOffset(final MessageQueue mq, final long timestamp) throws MQClientException;
+
+
+    /**
+     * Gets the max offset
+     *
+     * @param mq
+     *         Instance of MessageQueue
+     *
+     * @return the max offset
+     *
+     * @throws MQClientException
+     */
+    long maxOffset(final MessageQueue mq) throws MQClientException;
+
+
+    /**
+     * Gets the minimum offset
+     *
+     * @param mq
+     *         Instance of MessageQueue
+     *
+     * @return the minimum offset
+     *
+     * @throws MQClientException
+     */
+    long minOffset(final MessageQueue mq) throws MQClientException;
+
+
+    /**
+     * Gets the earliest stored message time
+     *
+     * @param mq
+     *         Instance of MessageQueue
+     *
+     * @return the time in milliseconds
+     *
+     * @throws MQClientException
+     */
+    long earliestMsgStoreTime(final MessageQueue mq) throws MQClientException;
+
+
+    /**
+     * Query message according to message id
+     *
+     * @param offsetMsgId
+     *         message id
+     *
+     * @return message
+     *
+     * @throws InterruptedException
+     * @throws MQBrokerException
+     * @throws RemotingException
+     * @throws MQClientException
+     */
+    MessageExt viewMessage(final String offsetMsgId) throws RemotingException, MQBrokerException,
+            InterruptedException, MQClientException;
+
+
+    /**
+     * Query messages
+     *
+     * @param topic
+     *         message topic
+     * @param key
+     *         message key index word
+     * @param maxNum
+     *         max message number
+     * @param begin
+     *         from when
+     * @param end
+     *         to when
+     *
+     * @return Instance of QueryResult
+     *
+     * @throws MQClientException
+     * @throws InterruptedException
+     */
+    QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin,
+                             final long end) throws MQClientException, InterruptedException;
+    
+    /**
+     * Query message according to topic and message id
+     * @param topic
+     * @param msgId
+     * @return The {@code MessageExt} of given msgId
+     * @throws RemotingException
+     * @throws MQBrokerException
+     * @throws InterruptedException
+     * @throws MQClientException
+     */
+    MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;        
+
+    
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/MQHelper.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/MQHelper.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/MQHelper.java
new file mode 100644
index 0000000..5934b49
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/MQHelper.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.client;
+
+import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
+import com.alibaba.rocketmq.client.log.ClientLogger;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.slf4j.Logger;
+
+import java.util.Set;
+import java.util.TreeSet;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class MQHelper {
+    public static void resetOffsetByTimestamp(
+            final MessageModel messageModel,
+            final String consumerGroup,
+            final String topic,
+            final long timestamp) throws Exception {
+        resetOffsetByTimestamp(messageModel, "DEFAULT", consumerGroup, topic, timestamp);
+    }
+
+    /**
+     * Reset consumer topic offset according to time
+     *
+     * @param messageModel
+     *         which model
+     * @param instanceName
+     *         which instance
+     * @param consumerGroup
+     *         consumer group
+     * @param topic
+     *         topic
+     * @param timestamp
+     *         time
+     *
+     * @throws Exception
+     */
+    public static void resetOffsetByTimestamp(
+            final MessageModel messageModel,
+            final String instanceName,
+            final String consumerGroup,
+            final String topic,
+            final long timestamp) throws Exception {
+        final Logger log = ClientLogger.getLog();
+
+        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(consumerGroup);
+        consumer.setInstanceName(instanceName);
+        consumer.setMessageModel(messageModel);
+        consumer.start();
+
+        Set<MessageQueue> mqs = null;
+        try {
+            mqs = consumer.fetchSubscribeMessageQueues(topic);
+            if (mqs != null && !mqs.isEmpty()) {
+                TreeSet<MessageQueue> mqsNew = new TreeSet<MessageQueue>(mqs);
+                for (MessageQueue mq : mqsNew) {
+                    long offset = consumer.searchOffset(mq, timestamp);
+                    if (offset >= 0) {
+                        consumer.updateConsumeOffset(mq, offset);
+                        log.info("resetOffsetByTimestamp updateConsumeOffset success, {} {} {}",
+                                consumerGroup, offset, mq);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            log.warn("resetOffsetByTimestamp Exception", e);
+            throw e;
+        } finally {
+            if (mqs != null) {
+                consumer.getDefaultMQPullConsumerImpl().getOffsetStore().persistAll(mqs);
+            }
+            consumer.shutdown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/QueryResult.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/QueryResult.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/QueryResult.java
new file mode 100644
index 0000000..43c8106
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/QueryResult.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.client;
+
+import com.alibaba.rocketmq.common.message.MessageExt;
+
+import java.util.List;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class QueryResult {
+    private final long indexLastUpdateTimestamp;
+    private final List<MessageExt> messageList;
+
+
+    public QueryResult(long indexLastUpdateTimestamp, List<MessageExt> messageList) {
+        this.indexLastUpdateTimestamp = indexLastUpdateTimestamp;
+        this.messageList = messageList;
+    }
+
+
+    public long getIndexLastUpdateTimestamp() {
+        return indexLastUpdateTimestamp;
+    }
+
+
+    public List<MessageExt> getMessageList() {
+        return messageList;
+    }
+
+
+    @Override
+    public String toString() {
+        return "QueryResult [indexLastUpdateTimestamp=" + indexLastUpdateTimestamp + ", messageList="
+                + messageList + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/Validators.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/Validators.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/Validators.java
new file mode 100644
index 0000000..203aae0
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/Validators.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.client;
+
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.UtilAll;
+import com.alibaba.rocketmq.common.message.Message;
+import com.alibaba.rocketmq.common.protocol.ResponseCode;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+/**
+ * Common Validator
+ *
+ * @author manhong.yqd
+ */
+public class Validators {
+    public static final String VALID_PATTERN_STR = "^[%|a-zA-Z0-9_-]+$";
+    public static final Pattern PATTERN = Pattern.compile(VALID_PATTERN_STR);
+    public static final int CHARACTER_MAX_LENGTH = 255;
+
+    /**
+     * @param origin
+     * @param patternStr
+     *
+     * @return The resulting {@code String}
+     */
+    public static String getGroupWithRegularExpression(String origin, String patternStr) {
+        Pattern pattern = Pattern.compile(patternStr);
+        Matcher matcher = pattern.matcher(origin);
+        while (matcher.find()) {
+            return matcher.group(0);
+        }
+        return null;
+    }
+
+    /**
+     * Validate group
+     *
+     * @param group
+     *
+     * @throws com.alibaba.rocketmq.client.exception.MQClientException
+     */
+    public static void checkGroup(String group) throws MQClientException {
+        if (UtilAll.isBlank(group)) {
+            throw new MQClientException("the specified group is blank", null);
+        }
+        if (!regularExpressionMatcher(group, PATTERN)) {
+            throw new MQClientException(String.format(
+                    "the specified group[%s] contains illegal characters, allowing only %s", group,
+                    VALID_PATTERN_STR), null);
+        }
+        if (group.length() > CHARACTER_MAX_LENGTH) {
+            throw new MQClientException("the specified group is longer than group max length 255.", null);
+        }
+    }
+
+    /**
+     * @param origin
+     * @param pattern
+     *
+     * @return <tt>true</tt> if, and only if, the entire origin sequence
+     *          matches this matcher's pattern
+     */
+    public static boolean regularExpressionMatcher(String origin, Pattern pattern) {
+        if (pattern == null) {
+            return true;
+        }
+        Matcher matcher = pattern.matcher(origin);
+        return matcher.matches();
+    }
+
+    /**
+     * Validate message
+     *
+     * @param msg
+     * @param defaultMQProducer
+     *
+     * @throws com.alibaba.rocketmq.client.exception.MQClientException
+     */
+    public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
+            throws MQClientException {
+        if (null == msg) {
+            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
+        }
+        // topic
+        Validators.checkTopic(msg.getTopic());
+        // body
+        if (null == msg.getBody()) {
+            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
+        }
+
+        if (0 == msg.getBody().length) {
+            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
+        }
+
+        if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
+            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
+                    "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
+        }
+    }
+
+    /**
+     * Validate topic
+     *
+     * @param topic
+     *
+     * @throws com.alibaba.rocketmq.client.exception.MQClientException
+     */
+    public static void checkTopic(String topic) throws MQClientException {
+        if (UtilAll.isBlank(topic)) {
+            throw new MQClientException("the specified topic is blank", null);
+        }
+
+        if (!regularExpressionMatcher(topic, PATTERN)) {
+            throw new MQClientException(String.format(
+                    "the specified topic[%s] contains illegal characters, allowing only %s", topic,
+                    VALID_PATTERN_STR), null);
+        }
+
+        if (topic.length() > CHARACTER_MAX_LENGTH) {
+            throw new MQClientException("the specified topic is longer than topic max length 255.", null);
+        }
+
+        // whether the topic is the same as the system reserved keyword
+        if (topic.equals(MixAll.DEFAULT_TOPIC)) {
+            throw new MQClientException(
+                    String.format("the topic[%s] is conflict with default topic.", topic), null);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/admin/MQAdminExtInner.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/admin/MQAdminExtInner.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/admin/MQAdminExtInner.java
new file mode 100644
index 0000000..071a872
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/admin/MQAdminExtInner.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.client.admin;
+
+/**
+ * @author shijia.wxr
+ */
+public interface MQAdminExtInner {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/common/ClientErrorCode.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/common/ClientErrorCode.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/common/ClientErrorCode.java
new file mode 100644
index 0000000..88d0eea
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/common/ClientErrorCode.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.client.common;
+
+public class ClientErrorCode {
+    public static final int CONNECT_BROKER_EXCEPTION = 10001;
+    public static final int ACCESS_BROKER_TIMEOUT = 10002;
+    public static final int BROKER_NOT_EXIST_EXCEPTION = 10003;
+    public static final int NO_NAME_SERVER_EXCEPTION = 10004;
+    public static final int NOT_FOUND_TOPIC_EXCEPTION = 10005;
+}
\ No newline at end of file



Mime
View raw message