rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yu...@apache.org
Subject [39/43] incubator-rocketmq git commit: Finish code dump. Reviewed by: @yukon @vongosling @stevenschew @vintagewang @lollipop @zander
Date Mon, 19 Dec 2016 09:40:56 GMT
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManager.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManager.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManager.java
new file mode 100644
index 0000000..269918a
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManager.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.broker.offset;
+
+import com.alibaba.rocketmq.broker.BrokerController;
+import com.alibaba.rocketmq.broker.BrokerPathConfigHelper;
+import com.alibaba.rocketmq.common.ConfigManager;
+import com.alibaba.rocketmq.common.UtilAll;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class ConsumerOffsetManager extends ConfigManager {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    private static final String TOPIC_GROUP_SEPARATOR = "@";
+
+    private ConcurrentHashMap<String/* topic@group */, ConcurrentHashMap<Integer, Long>> offsetTable =
+            new ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>>(512);
+
+    private transient BrokerController brokerController;
+
+
+    public ConsumerOffsetManager() {
+    }
+
+
+    public ConsumerOffsetManager(BrokerController brokerController) {
+        this.brokerController = brokerController;
+    }
+
+
+    public void scanUnsubscribedTopic() {
+        Iterator<Entry<String, ConcurrentHashMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<String, ConcurrentHashMap<Integer, Long>> next = it.next();
+            String topicAtGroup = next.getKey();
+            String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
+            if (arrays != null && arrays.length == 2) {
+                String topic = arrays[0];
+                String group = arrays[1];
+
+                if (null == brokerController.getConsumerManager().findSubscriptionData(group, topic)
+                        && this.offsetBehindMuchThanData(topic, next.getValue())) {
+                    it.remove();
+                    log.warn("remove topic offset, {}", topicAtGroup);
+                }
+            }
+        }
+    }
+
+
+    private boolean offsetBehindMuchThanData(final String topic, ConcurrentHashMap<Integer, Long> table) {
+        Iterator<Entry<Integer, Long>> it = table.entrySet().iterator();
+        boolean result = !table.isEmpty();
+
+        while (it.hasNext() && result) {
+            Entry<Integer, Long> next = it.next();
+            long minOffsetInStore = this.brokerController.getMessageStore().getMinOffsetInQuque(topic, next.getKey());
+            long offsetInPersist = next.getValue();
+            if (offsetInPersist > minOffsetInStore) {
+                result = false;
+            } else {
+                result = true;
+            }
+        }
+
+        return result;
+    }
+
+
+    public Set<String> whichTopicByConsumer(final String group) {
+        Set<String> topics = new HashSet<String>();
+
+        Iterator<Entry<String, ConcurrentHashMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<String, ConcurrentHashMap<Integer, Long>> next = it.next();
+            String topicAtGroup = next.getKey();
+            String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
+            if (arrays != null && arrays.length == 2) {
+                if (group.equals(arrays[1])) {
+                    topics.add(arrays[0]);
+                }
+            }
+        }
+
+        return topics;
+    }
+
+
+    public Set<String> whichGroupByTopic(final String topic) {
+        Set<String> groups = new HashSet<String>();
+
+        Iterator<Entry<String, ConcurrentHashMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<String, ConcurrentHashMap<Integer, Long>> next = it.next();
+            String topicAtGroup = next.getKey();
+            String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
+            if (arrays != null && arrays.length == 2) {
+                if (topic.equals(arrays[0])) {
+                    groups.add(arrays[1]);
+                }
+            }
+        }
+
+        return groups;
+    }
+
+
+    public void commitOffset(final String clientHost, final String group, final String topic, final int queueId, final long offset) {
+        // topic@group
+        String key = topic + TOPIC_GROUP_SEPARATOR + group;
+        this.commitOffset(clientHost, key, queueId, offset);
+    }
+
+    private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
+        ConcurrentHashMap<Integer, Long> map = this.offsetTable.get(key);
+        if (null == map) {
+            map = new ConcurrentHashMap<Integer, Long>(32);
+            map.put(queueId, offset);
+            this.offsetTable.put(key, map);
+        } else {
+            Long storeOffset = map.put(queueId, offset);
+            if (storeOffset != null && offset < storeOffset) {
+                log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);
+            }
+        }
+    }
+
+    public long queryOffset(final String group, final String topic, final int queueId) {
+        // topic@group
+        String key = topic + TOPIC_GROUP_SEPARATOR + group;
+        ConcurrentHashMap<Integer, Long> map = this.offsetTable.get(key);
+        if (null != map) {
+            Long offset = map.get(queueId);
+            if (offset != null)
+                return offset;
+        }
+
+        return -1;
+    }
+
+    public String encode() {
+        return this.encode(false);
+    }
+
+    @Override
+    public String configFilePath() {
+        return BrokerPathConfigHelper.getConsumerOffsetPath(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
+    }
+
+    @Override
+    public void decode(String jsonString) {
+        if (jsonString != null) {
+            ConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, ConsumerOffsetManager.class);
+            if (obj != null) {
+                this.offsetTable = obj.offsetTable;
+            }
+        }
+    }
+
+    public String encode(final boolean prettyFormat) {
+        return RemotingSerializable.toJson(this, prettyFormat);
+    }
+
+    public ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> getOffsetTable() {
+        return offsetTable;
+    }
+
+
+    public void setOffsetTable(ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> offsetTable) {
+        this.offsetTable = offsetTable;
+    }
+
+
+    public Map<Integer, Long> queryMinOffsetInAllGroup(final String topic, final String filterGroups) {
+
+        Map<Integer, Long> queueMinOffset = new HashMap<Integer, Long>();
+        Set<String> topicGroups = this.offsetTable.keySet();
+        if (!UtilAll.isBlank(filterGroups)) {
+            for (String group : filterGroups.split(",")) {
+                Iterator<String> it = topicGroups.iterator();
+                while (it.hasNext()) {
+                    if (group.equals(it.next().split(TOPIC_GROUP_SEPARATOR)[1])) {
+                        it.remove();
+                    }
+                }
+            }
+        }
+
+        for (Map.Entry<String, ConcurrentHashMap<Integer, Long>> offSetEntry : this.offsetTable.entrySet()) {
+            String topicGroup = offSetEntry.getKey();
+            String[] topicGroupArr = topicGroup.split(TOPIC_GROUP_SEPARATOR);
+            if (topic.equals(topicGroupArr[0])) {
+                for (Entry<Integer, Long> entry : offSetEntry.getValue().entrySet()) {
+                    long minOffset = this.brokerController.getMessageStore().getMinOffsetInQuque(topic, entry.getKey());
+                    if (entry.getValue() >= minOffset) {
+                        Long offset = queueMinOffset.get(entry.getKey());
+                        if (offset == null) {
+                            queueMinOffset.put(entry.getKey(), Math.min(Long.MAX_VALUE, entry.getValue()));
+                        } else {
+                            queueMinOffset.put(entry.getKey(), Math.min(entry.getValue(), offset));
+                        }
+                    }
+                }
+            }
+
+        }
+        return queueMinOffset;
+    }
+
+
+    public Map<Integer, Long> queryOffset(final String group, final String topic) {
+        // topic@group
+        String key = topic + TOPIC_GROUP_SEPARATOR + group;
+        return this.offsetTable.get(key);
+    }
+
+
+    public void cloneOffset(final String srcGroup, final String destGroup, final String topic) {
+        ConcurrentHashMap<Integer, Long> offsets = this.offsetTable.get(topic + TOPIC_GROUP_SEPARATOR + srcGroup);
+        if (offsets != null) {
+            this.offsetTable.put(topic + TOPIC_GROUP_SEPARATOR + destGroup, new ConcurrentHashMap<Integer, Long>(offsets));
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/out/BrokerOuterAPI.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/out/BrokerOuterAPI.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/out/BrokerOuterAPI.java
new file mode 100644
index 0000000..f051d29
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/out/BrokerOuterAPI.java
@@ -0,0 +1,302 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.broker.out;
+
+import com.alibaba.rocketmq.client.exception.MQBrokerException;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.namesrv.RegisterBrokerResult;
+import com.alibaba.rocketmq.common.namesrv.TopAddressing;
+import com.alibaba.rocketmq.common.protocol.RequestCode;
+import com.alibaba.rocketmq.common.protocol.ResponseCode;
+import com.alibaba.rocketmq.common.protocol.body.*;
+import com.alibaba.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
+import com.alibaba.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader;
+import com.alibaba.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.remoting.RemotingClient;
+import com.alibaba.rocketmq.remoting.exception.*;
+import com.alibaba.rocketmq.remoting.netty.NettyClientConfig;
+import com.alibaba.rocketmq.remoting.netty.NettyRemotingClient;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * @author shijia.wxr
+ * @author manhong.yqd
+ */
+public class BrokerOuterAPI {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    private final RemotingClient remotingClient;
+    private final TopAddressing topAddressing = new TopAddressing(MixAll.WS_ADDR);
+    private String nameSrvAddr = null;
+
+    public BrokerOuterAPI(final NettyClientConfig nettyClientConfig) {
+        this(nettyClientConfig, null);
+    }
+
+    public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook rpcHook) {
+        this.remotingClient = new NettyRemotingClient(nettyClientConfig);
+        this.remotingClient.registerRPCHook(rpcHook);
+    }
+
+    public void start() {
+        this.remotingClient.start();
+    }
+
+    public void shutdown() {
+        this.remotingClient.shutdown();
+    }
+
+    public String fetchNameServerAddr() {
+        try {
+            String addrs = this.topAddressing.fetchNSAddr();
+            if (addrs != null) {
+                if (!addrs.equals(this.nameSrvAddr)) {
+                    log.info("name server address changed, old: " + this.nameSrvAddr + " new: " + addrs);
+                    this.updateNameServerAddressList(addrs);
+                    this.nameSrvAddr = addrs;
+                    return nameSrvAddr;
+                }
+            }
+        } catch (Exception e) {
+            log.error("fetchNameServerAddr Exception", e);
+        }
+        return nameSrvAddr;
+    }
+
+    public void updateNameServerAddressList(final String addrs) {
+        List<String> lst = new ArrayList<String>();
+        String[] addrArray = addrs.split(";");
+        if (addrArray != null) {
+            for (String addr : addrArray) {
+                lst.add(addr);
+            }
+
+            this.remotingClient.updateNameServerAddressList(lst);
+        }
+    }
+
+    public RegisterBrokerResult registerBrokerAll(
+            final String clusterName,
+            final String brokerAddr,
+            final String brokerName,
+            final long brokerId,
+            final String haServerAddr,
+            final TopicConfigSerializeWrapper topicConfigWrapper,
+            final List<String> filterServerList,
+            final boolean oneway,
+            final int timeoutMills) {
+        RegisterBrokerResult registerBrokerResult = null;
+
+        List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
+        if (nameServerAddressList != null) {
+            for (String namesrvAddr : nameServerAddressList) {
+                try {
+                    RegisterBrokerResult result = this.registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId,
+                            haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills);
+                    if (result != null) {
+                        registerBrokerResult = result;
+                    }
+
+                    log.info("register broker to name server {} OK", namesrvAddr);
+                } catch (Exception e) {
+                    log.warn("registerBroker Exception, " + namesrvAddr, e);
+                }
+            }
+        }
+
+        return registerBrokerResult;
+    }
+
+    private RegisterBrokerResult registerBroker(
+            final String namesrvAddr,
+            final String clusterName,
+            final String brokerAddr,
+            final String brokerName,
+            final long brokerId,
+            final String haServerAddr,
+            final TopicConfigSerializeWrapper topicConfigWrapper,
+            final List<String> filterServerList,
+            final boolean oneway,
+            final int timeoutMills
+    ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
+            InterruptedException {
+        RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
+        requestHeader.setBrokerAddr(brokerAddr);
+        requestHeader.setBrokerId(brokerId);
+        requestHeader.setBrokerName(brokerName);
+        requestHeader.setClusterName(clusterName);
+        requestHeader.setHaServerAddr(haServerAddr);
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
+
+        RegisterBrokerBody requestBody = new RegisterBrokerBody();
+        requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
+        requestBody.setFilterServerList(filterServerList);
+        request.setBody(requestBody.encode());
+
+        if (oneway) {
+            try {
+                this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
+            } catch (RemotingTooMuchRequestException e) {
+            }
+            return null;
+        }
+
+        RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                RegisterBrokerResponseHeader responseHeader =
+                        (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
+                RegisterBrokerResult result = new RegisterBrokerResult();
+                result.setMasterAddr(responseHeader.getMasterAddr());
+                result.setHaServerAddr(responseHeader.getHaServerAddr());
+                result.setHaServerAddr(responseHeader.getHaServerAddr());
+                if (response.getBody() != null) {
+                    result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
+                }
+                return result;
+            }
+            default:
+                break;
+        }
+
+        throw new MQBrokerException(response.getCode(), response.getRemark());
+    }
+
+    public void unregisterBrokerAll(
+            final String clusterName,
+            final String brokerAddr,
+            final String brokerName,
+            final long brokerId
+    ) {
+        List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
+        if (nameServerAddressList != null) {
+            for (String namesrvAddr : nameServerAddressList) {
+                try {
+                    this.unregisterBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId);
+                    log.info("unregisterBroker OK, NamesrvAddr: {}", namesrvAddr);
+                } catch (Exception e) {
+                    log.warn("unregisterBroker Exception, " + namesrvAddr, e);
+                }
+            }
+        }
+    }
+
+    public void unregisterBroker(
+            final String namesrvAddr,
+            final String clusterName,
+            final String brokerAddr,
+            final String brokerName,
+            final long brokerId
+    ) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
+        UnRegisterBrokerRequestHeader requestHeader = new UnRegisterBrokerRequestHeader();
+        requestHeader.setBrokerAddr(brokerAddr);
+        requestHeader.setBrokerId(brokerId);
+        requestHeader.setBrokerName(brokerName);
+        requestHeader.setClusterName(clusterName);
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_BROKER, requestHeader);
+
+        RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, 3000);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                return;
+            }
+            default:
+                break;
+        }
+
+        throw new MQBrokerException(response.getCode(), response.getRemark());
+    }
+
+    public TopicConfigSerializeWrapper getAllTopicConfig(final String addr) throws RemotingConnectException, RemotingSendRequestException,
+            RemotingTimeoutException, InterruptedException, MQBrokerException {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
+
+        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(true, addr), request, 3000);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                return TopicConfigSerializeWrapper.decode(response.getBody(), TopicConfigSerializeWrapper.class);
+            }
+            default:
+                break;
+        }
+
+        throw new MQBrokerException(response.getCode(), response.getRemark());
+    }
+
+    public ConsumerOffsetSerializeWrapper getAllConsumerOffset(final String addr) throws InterruptedException, RemotingTimeoutException,
+            RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null);
+        RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                return ConsumerOffsetSerializeWrapper.decode(response.getBody(), ConsumerOffsetSerializeWrapper.class);
+            }
+            default:
+                break;
+        }
+
+        throw new MQBrokerException(response.getCode(), response.getRemark());
+    }
+
+    public String getAllDelayOffset(final String addr) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
+            RemotingConnectException, MQBrokerException, UnsupportedEncodingException {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_DELAY_OFFSET, null);
+        RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                return new String(response.getBody(), MixAll.DEFAULT_CHARSET);
+            }
+            default:
+                break;
+        }
+
+        throw new MQBrokerException(response.getCode(), response.getRemark());
+    }
+
+    public SubscriptionGroupWrapper getAllSubscriptionGroupConfig(final String addr) throws InterruptedException, RemotingTimeoutException,
+            RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null);
+        RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                return SubscriptionGroupWrapper.decode(response.getBody(), SubscriptionGroupWrapper.class);
+            }
+            default:
+                break;
+        }
+
+        throw new MQBrokerException(response.getCode(), response.getRemark());
+    }
+
+    public void registerRPCHook(RPCHook rpcHook) {
+        remotingClient.registerRPCHook(rpcHook);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/ManyMessageTransfer.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/ManyMessageTransfer.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/ManyMessageTransfer.java
new file mode 100644
index 0000000..8050bc1
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/ManyMessageTransfer.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.broker.pagecache;
+
+import com.alibaba.rocketmq.store.GetMessageResult;
+import io.netty.channel.FileRegion;
+import io.netty.util.AbstractReferenceCounted;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.List;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class ManyMessageTransfer extends AbstractReferenceCounted implements FileRegion {
+    private final ByteBuffer byteBufferHeader;
+    private final GetMessageResult getMessageResult;
+    private long transfered; // the bytes which was transfered already
+
+
+    public ManyMessageTransfer(ByteBuffer byteBufferHeader, GetMessageResult getMessageResult) {
+        this.byteBufferHeader = byteBufferHeader;
+        this.getMessageResult = getMessageResult;
+    }
+
+
+    @Override
+    public long position() {
+        int pos = byteBufferHeader.position();
+        List<ByteBuffer> messageBufferList = this.getMessageResult.getMessageBufferList();
+        for (ByteBuffer bb : messageBufferList) {
+            pos += bb.position();
+        }
+        return pos;
+    }
+
+    @Override
+    public long transfered() {
+        return transfered;
+    }
+
+    @Override
+    public long count() {
+        return byteBufferHeader.limit() + this.getMessageResult.getBufferTotalSize();
+    }
+
+    @Override
+    public long transferTo(WritableByteChannel target, long position) throws IOException {
+        if (this.byteBufferHeader.hasRemaining()) {
+            transfered += target.write(this.byteBufferHeader);
+            return transfered;
+        } else {
+            List<ByteBuffer> messageBufferList = this.getMessageResult.getMessageBufferList();
+            for (ByteBuffer bb : messageBufferList) {
+                if (bb.hasRemaining()) {
+                    transfered += target.write(bb);
+                    return transfered;
+                }
+            }
+        }
+
+        return 0;
+    }
+
+    public void close() {
+        this.deallocate();
+    }
+
+    @Override
+    protected void deallocate() {
+        this.getMessageResult.release();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/OneMessageTransfer.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/OneMessageTransfer.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/OneMessageTransfer.java
new file mode 100644
index 0000000..df742c5
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/OneMessageTransfer.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.broker.pagecache;
+
+import com.alibaba.rocketmq.store.SelectMappedBufferResult;
+import io.netty.channel.FileRegion;
+import io.netty.util.AbstractReferenceCounted;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class OneMessageTransfer extends AbstractReferenceCounted implements FileRegion {
+    private final ByteBuffer byteBufferHeader;
+    private final SelectMappedBufferResult selectMappedBufferResult;
+    private long transfered; // the bytes which was transfered already
+
+
+    public OneMessageTransfer(ByteBuffer byteBufferHeader, SelectMappedBufferResult selectMappedBufferResult) {
+        this.byteBufferHeader = byteBufferHeader;
+        this.selectMappedBufferResult = selectMappedBufferResult;
+    }
+
+
+    @Override
+    public long position() {
+        return this.byteBufferHeader.position() + this.selectMappedBufferResult.getByteBuffer().position();
+    }
+
+    @Override
+    public long transfered() {
+        return transfered;
+    }
+
+    @Override
+    public long count() {
+        return this.byteBufferHeader.limit() + this.selectMappedBufferResult.getSize();
+    }
+
+    @Override
+    public long transferTo(WritableByteChannel target, long position) throws IOException {
+        if (this.byteBufferHeader.hasRemaining()) {
+            transfered += target.write(this.byteBufferHeader);
+            return transfered;
+        } else if (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
+            transfered += target.write(this.selectMappedBufferResult.getByteBuffer());
+            return transfered;
+        }
+
+        return 0;
+    }
+
+    public void close() {
+        this.deallocate();
+    }
+
+    @Override
+    protected void deallocate() {
+        this.selectMappedBufferResult.release();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/QueryMessageTransfer.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/QueryMessageTransfer.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/QueryMessageTransfer.java
new file mode 100644
index 0000000..cbcbc74
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/QueryMessageTransfer.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.broker.pagecache;
+
+import com.alibaba.rocketmq.store.QueryMessageResult;
+import io.netty.channel.FileRegion;
+import io.netty.util.AbstractReferenceCounted;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.List;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class QueryMessageTransfer extends AbstractReferenceCounted implements FileRegion {
+    private final ByteBuffer byteBufferHeader;
+    private final QueryMessageResult queryMessageResult;
+    private long transfered; // the bytes which was transfered already
+
+
+    public QueryMessageTransfer(ByteBuffer byteBufferHeader, QueryMessageResult queryMessageResult) {
+        this.byteBufferHeader = byteBufferHeader;
+        this.queryMessageResult = queryMessageResult;
+    }
+
+
+    @Override
+    public long position() {
+        int pos = byteBufferHeader.position();
+        List<ByteBuffer> messageBufferList = this.queryMessageResult.getMessageBufferList();
+        for (ByteBuffer bb : messageBufferList) {
+            pos += bb.position();
+        }
+        return pos;
+    }
+
+    @Override
+    public long transfered() {
+        return transfered;
+    }
+
+    @Override
+    public long count() {
+        return byteBufferHeader.limit() + this.queryMessageResult.getBufferTotalSize();
+    }
+
+    @Override
+    public long transferTo(WritableByteChannel target, long position) throws IOException {
+        if (this.byteBufferHeader.hasRemaining()) {
+            transfered += target.write(this.byteBufferHeader);
+            return transfered;
+        } else {
+            List<ByteBuffer> messageBufferList = this.queryMessageResult.getMessageBufferList();
+            for (ByteBuffer bb : messageBufferList) {
+                if (bb.hasRemaining()) {
+                    transfered += target.write(bb);
+                    return transfered;
+                }
+            }
+        }
+
+        return 0;
+    }
+
+    public void close() {
+        this.deallocate();
+    }
+
+    @Override
+    protected void deallocate() {
+        this.queryMessageResult.release();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/plugin/AbstractPluginMessageStore.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/plugin/AbstractPluginMessageStore.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/plugin/AbstractPluginMessageStore.java
new file mode 100644
index 0000000..141ba69
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/plugin/AbstractPluginMessageStore.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ *
+ */
+package com.alibaba.rocketmq.broker.plugin;
+
+import com.alibaba.rocketmq.common.message.MessageExt;
+import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import com.alibaba.rocketmq.store.*;
+
+import java.util.HashMap;
+import java.util.Set;
+
+public abstract class AbstractPluginMessageStore implements MessageStore {
+    protected MessageStore next = null;
+    protected MessageStorePluginContext context;
+
+    public AbstractPluginMessageStore(MessageStorePluginContext context, MessageStore next) {
+        this.next = next;
+        this.context = context;
+    }
+
+    @Override
+    public long getEarliestMessageTime() {
+        return next.getEarliestMessageTime();
+    }
+
+    @Override
+    public long lockTimeMills() {
+        return next.lockTimeMills();
+    }
+
+    @Override
+    public boolean isOSPageCacheBusy() {
+        return next.isOSPageCacheBusy();
+    }
+
+    @Override
+    public boolean isTransientStorePoolDeficient() {
+        return next.isTransientStorePoolDeficient();
+    }
+
+    @Override
+    public boolean load() {
+        return next.load();
+    }
+
+    @Override
+    public void start() throws Exception {
+        next.start();
+    }
+
+    @Override
+    public void shutdown() {
+        next.shutdown();
+    }
+
+    @Override
+    public void destroy() {
+        next.destroy();
+    }
+
+    @Override
+    public PutMessageResult putMessage(MessageExtBrokerInner msg) {
+        return next.putMessage(msg);
+    }
+
+    @Override
+    public GetMessageResult getMessage(String group, String topic, int queueId, long offset,
+                                       int maxMsgNums, SubscriptionData subscriptionData) {
+        return next.getMessage(group, topic, queueId, offset, maxMsgNums, subscriptionData);
+    }
+
+    @Override
+    public long getMaxOffsetInQuque(String topic, int queueId) {
+        return next.getMaxOffsetInQuque(topic, queueId);
+    }
+
+    @Override
+    public long getMinOffsetInQuque(String topic, int queueId) {
+        return next.getMinOffsetInQuque(topic, queueId);
+    }
+
+    @Override
+    public long getCommitLogOffsetInQueue(String topic, int queueId, long cqOffset) {
+        return next.getCommitLogOffsetInQueue(topic, queueId, cqOffset);
+    }
+
+    @Override
+    public long getOffsetInQueueByTime(String topic, int queueId, long timestamp) {
+        return next.getOffsetInQueueByTime(topic, queueId, timestamp);
+    }
+
+    @Override
+    public MessageExt lookMessageByOffset(long commitLogOffset) {
+        return next.lookMessageByOffset(commitLogOffset);
+    }
+
+    @Override
+    public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset) {
+        return next.selectOneMessageByOffset(commitLogOffset);
+    }
+
+    @Override
+    public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset, int msgSize) {
+        return next.selectOneMessageByOffset(commitLogOffset, msgSize);
+    }
+
+    @Override
+    public String getRunningDataInfo() {
+        return next.getRunningDataInfo();
+    }
+
+    @Override
+    public HashMap<String, String> getRuntimeInfo() {
+        return next.getRuntimeInfo();
+    }
+
+    @Override
+    public long getMaxPhyOffset() {
+        return next.getMaxPhyOffset();
+    }
+
+    @Override
+    public long getMinPhyOffset() {
+        return next.getMinPhyOffset();
+    }
+
+    @Override
+    public long getEarliestMessageTime(String topic, int queueId) {
+        return next.getEarliestMessageTime(topic, queueId);
+    }
+
+    @Override
+    public long getMessageStoreTimeStamp(String topic, int queueId, long offset) {
+        return next.getMessageStoreTimeStamp(topic, queueId, offset);
+    }
+
+    @Override
+    public long getMessageTotalInQueue(String topic, int queueId) {
+        return next.getMessageTotalInQueue(topic, queueId);
+    }
+
+    @Override
+    public SelectMappedBufferResult getCommitLogData(long offset) {
+        return next.getCommitLogData(offset);
+    }
+
+    @Override
+    public boolean appendToCommitLog(long startOffset, byte[] data) {
+        return next.appendToCommitLog(startOffset, data);
+    }
+
+    @Override
+    public void excuteDeleteFilesManualy() {
+        next.excuteDeleteFilesManualy();
+    }
+
+    @Override
+    public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin,
+                                           long end) {
+        return next.queryMessage(topic, key, maxNum, begin, end);
+    }
+
+    @Override
+    public void updateHaMasterAddress(String newAddr) {
+        next.updateHaMasterAddress(newAddr);
+    }
+
+    @Override
+    public long slaveFallBehindMuch() {
+        return next.slaveFallBehindMuch();
+    }
+
+    @Override
+    public long now() {
+        return next.now();
+    }
+
+    @Override
+    public int cleanUnusedTopic(Set<String> topics) {
+        return next.cleanUnusedTopic(topics);
+    }
+
+    @Override
+    public void cleanExpiredConsumerQueue() {
+        next.cleanExpiredConsumerQueue();
+    }
+
+    @Override
+    public boolean checkInDiskByConsumeOffset(String topic, int queueId, long consumeOffset) {
+        return next.checkInDiskByConsumeOffset(topic, queueId, consumeOffset);
+    }
+
+    @Override
+    public long dispatchBehindBytes() {
+        return next.dispatchBehindBytes();
+    }
+
+    @Override
+    public long flush() {
+        return next.flush();
+    }
+
+    @Override
+    public boolean resetWriteOffset(long phyOffset) {
+        return next.resetWriteOffset(phyOffset);
+    }
+
+    @Override
+    public long getConfirmOffset() {
+        return next.getConfirmOffset();
+    }
+
+    @Override
+    public void setConfirmOffset(long phyOffset) {
+        next.setConfirmOffset(phyOffset);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/plugin/MessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/plugin/MessageStoreFactory.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/plugin/MessageStoreFactory.java
new file mode 100644
index 0000000..84f5be7
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/plugin/MessageStoreFactory.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ *
+ */
+package com.alibaba.rocketmq.broker.plugin;
+
+import com.alibaba.rocketmq.store.MessageStore;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+
+public final class MessageStoreFactory {
+    public final static MessageStore build(MessageStorePluginContext context, MessageStore messageStore)
+            throws IOException {
+        String plugin = context.getBrokerConfig().getMessageStorePlugIn();
+        if (plugin != null && plugin.trim().length() != 0) {
+            String[] pluginClasses = plugin.split(",");
+            for (int i = pluginClasses.length - 1; i >= 0; --i) {
+                String pluginClass = pluginClasses[i];
+                try {
+                    @SuppressWarnings("unchecked")
+                    Class<AbstractPluginMessageStore> clazz = (Class<AbstractPluginMessageStore>) Class.forName(pluginClass);
+                    Constructor<AbstractPluginMessageStore> construct = clazz.getConstructor(MessageStorePluginContext.class, MessageStore.class);
+                    AbstractPluginMessageStore pluginMessageStore = (AbstractPluginMessageStore) construct.newInstance(context, messageStore);
+                    messageStore = pluginMessageStore;
+                } catch (Throwable e) {
+                    throw new RuntimeException(String.format(
+                            "Initialize plugin's class %s not found!", pluginClass), e);
+                }
+            }
+        }
+        return messageStore;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/plugin/MessageStorePluginContext.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/plugin/MessageStorePluginContext.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/plugin/MessageStorePluginContext.java
new file mode 100644
index 0000000..15e8b07
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/plugin/MessageStorePluginContext.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ *
+ */
+package com.alibaba.rocketmq.broker.plugin;
+
+import com.alibaba.rocketmq.common.BrokerConfig;
+import com.alibaba.rocketmq.store.MessageArrivingListener;
+import com.alibaba.rocketmq.store.config.MessageStoreConfig;
+import com.alibaba.rocketmq.store.stats.BrokerStatsManager;
+
+public class MessageStorePluginContext {
+    private MessageStoreConfig messageStoreConfig;
+    private BrokerStatsManager brokerStatsManager;
+    private MessageArrivingListener messageArrivingListener;
+    private BrokerConfig brokerConfig;
+
+    public MessageStorePluginContext(MessageStoreConfig messageStoreConfig,
+                                     BrokerStatsManager brokerStatsManager, MessageArrivingListener messageArrivingListener,
+                                     BrokerConfig brokerConfig) {
+        super();
+        this.messageStoreConfig = messageStoreConfig;
+        this.brokerStatsManager = brokerStatsManager;
+        this.messageArrivingListener = messageArrivingListener;
+        this.brokerConfig = brokerConfig;
+    }
+
+    public MessageStoreConfig getMessageStoreConfig() {
+        return messageStoreConfig;
+    }
+
+    public BrokerStatsManager getBrokerStatsManager() {
+        return brokerStatsManager;
+    }
+
+    public MessageArrivingListener getMessageArrivingListener() {
+        return messageArrivingListener;
+    }
+
+    public BrokerConfig getBrokerConfig() {
+        return brokerConfig;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/AbstractSendMessageProcessor.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/AbstractSendMessageProcessor.java
new file mode 100644
index 0000000..95db52d
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/AbstractSendMessageProcessor.java
@@ -0,0 +1,332 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.broker.processor;
+
+import com.alibaba.rocketmq.broker.BrokerController;
+import com.alibaba.rocketmq.broker.mqtrace.SendMessageContext;
+import com.alibaba.rocketmq.broker.mqtrace.SendMessageHook;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.TopicConfig;
+import com.alibaba.rocketmq.common.TopicFilterType;
+import com.alibaba.rocketmq.common.constant.DBMsgConstants;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.constant.PermName;
+import com.alibaba.rocketmq.common.help.FAQUrl;
+import com.alibaba.rocketmq.common.message.MessageAccessor;
+import com.alibaba.rocketmq.common.message.MessageConst;
+import com.alibaba.rocketmq.common.message.MessageDecoder;
+import com.alibaba.rocketmq.common.protocol.RequestCode;
+import com.alibaba.rocketmq.common.protocol.ResponseCode;
+import com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
+import com.alibaba.rocketmq.common.protocol.header.SendMessageResponseHeader;
+import com.alibaba.rocketmq.common.sysflag.MessageSysFlag;
+import com.alibaba.rocketmq.common.sysflag.TopicSysFlag;
+import com.alibaba.rocketmq.common.utils.ChannelUtil;
+import com.alibaba.rocketmq.remoting.common.RemotingHelper;
+import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
+import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import com.alibaba.rocketmq.store.MessageExtBrokerInner;
+import io.netty.channel.ChannelHandlerContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+
+/**
+ * @author shijia.wxr
+ */
+public abstract class AbstractSendMessageProcessor implements NettyRequestProcessor {
+    protected static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+
+    protected final static int DLQ_NUMS_PER_GROUP = 1;
+    protected final BrokerController brokerController;
+    protected final Random random = new Random(System.currentTimeMillis());
+    protected final SocketAddress storeHost;
+    private List<SendMessageHook> sendMessageHookList;
+
+
+    public AbstractSendMessageProcessor(final BrokerController brokerController) {
+        this.brokerController = brokerController;
+        this.storeHost =
+                new InetSocketAddress(brokerController.getBrokerConfig().getBrokerIP1(), brokerController
+                        .getNettyServerConfig().getListenPort());
+    }
+
+    protected SendMessageContext buildMsgContext(ChannelHandlerContext ctx,
+                                                 SendMessageRequestHeader requestHeader) {
+        if (!this.hasSendMessageHook()) {
+            return null;
+        }
+        SendMessageContext mqtraceContext;
+        mqtraceContext = new SendMessageContext();
+        mqtraceContext.setProducerGroup(requestHeader.getProducerGroup());
+        mqtraceContext.setTopic(requestHeader.getTopic());
+        mqtraceContext.setMsgProps(requestHeader.getProperties());
+        mqtraceContext.setBornHost(RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+        mqtraceContext.setBrokerAddr(this.brokerController.getBrokerAddr());
+        mqtraceContext.setBrokerRegionId(this.brokerController.getBrokerConfig().getRegionId());
+        mqtraceContext.setBornTimeStamp(requestHeader.getBornTimestamp());
+
+        Map<String, String> properties = MessageDecoder.string2messageProperties(requestHeader.getProperties());
+        String uniqueKey = properties.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
+        properties.put(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
+        properties.put(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
+        requestHeader.setProperties(MessageDecoder.messageProperties2String(properties));
+
+
+        if (uniqueKey == null) {
+            uniqueKey = "";
+        }
+        mqtraceContext.setMsgUniqueKey(uniqueKey);
+        return mqtraceContext;
+    }
+
+    public boolean hasSendMessageHook() {
+        return sendMessageHookList != null && !this.sendMessageHookList.isEmpty();
+    }
+
+    protected MessageExtBrokerInner buildInnerMsg(final ChannelHandlerContext ctx,
+                                                  final SendMessageRequestHeader requestHeader, final byte[] body, TopicConfig topicConfig) {
+        int queueIdInt = requestHeader.getQueueId();
+        if (queueIdInt < 0) {
+            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
+        }
+        int sysFlag = requestHeader.getSysFlag();
+
+        if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
+            sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
+        }
+
+        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
+        msgInner.setTopic(requestHeader.getTopic());
+        msgInner.setBody(body);
+        msgInner.setFlag(requestHeader.getFlag());
+        MessageAccessor.setProperties(msgInner,
+                MessageDecoder.string2messageProperties(requestHeader.getProperties()));
+        msgInner.setPropertiesString(requestHeader.getProperties());
+        msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(),
+                msgInner.getTags()));
+
+        msgInner.setQueueId(queueIdInt);
+        msgInner.setSysFlag(sysFlag);
+        msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
+        msgInner.setBornHost(ctx.channel().remoteAddress());
+        msgInner.setStoreHost(this.getStoreHost());
+        msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader
+                .getReconsumeTimes());
+        return msgInner;
+    }
+
+    public SocketAddress getStoreHost() {
+        return storeHost;
+    }
+
+    protected RemotingCommand msgContentCheck(final ChannelHandlerContext ctx,
+                                              final SendMessageRequestHeader requestHeader, RemotingCommand request,
+                                              final RemotingCommand response) {
+        if (requestHeader.getTopic().length() > Byte.MAX_VALUE) {
+            log.warn("putMessage message topic length too long " + requestHeader.getTopic().length());
+            response.setCode(ResponseCode.MESSAGE_ILLEGAL);
+            return response;
+        }
+        if (requestHeader.getProperties() != null && requestHeader.getProperties().length() > Short.MAX_VALUE) {
+            log.warn("putMessage message properties length too long "
+                    + requestHeader.getProperties().length());
+            response.setCode(ResponseCode.MESSAGE_ILLEGAL);
+            return response;
+        }
+        if (request.getBody().length > DBMsgConstants.MAX_BODY_SIZE) {
+            log.warn(" topic {}  msg body size {}  from {}", requestHeader.getTopic(),
+                    request.getBody().length, ChannelUtil.getRemoteIp(ctx.channel()));
+            response.setRemark("msg body must be less 64KB");
+            response.setCode(ResponseCode.MESSAGE_ILLEGAL);
+            return response;
+        }
+        return response;
+    }
+
+    protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,
+                                       final SendMessageRequestHeader requestHeader, final RemotingCommand response) {
+        if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())
+                && this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) {
+            response.setCode(ResponseCode.NO_PERMISSION);
+            response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+                    + "] sending message is forbidden");
+            return response;
+        }
+        if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) {
+            String errorMsg =
+                    "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";
+            log.warn(errorMsg);
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark(errorMsg);
+            return response;
+        }
+
+        TopicConfig topicConfig =
+                this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
+        if (null == topicConfig) {
+            int topicSysFlag = 0;
+            if (requestHeader.isUnitMode()) {
+                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+                    topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
+                } else {
+                    topicSysFlag = TopicSysFlag.buildSysFlag(true, false);
+                }
+            }
+
+            log.warn("the topic " + requestHeader.getTopic() + " not exist, producer: "
+                    + ctx.channel().remoteAddress());
+            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(//
+                    requestHeader.getTopic(), //
+                    requestHeader.getDefaultTopic(), //
+                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()), //
+                    requestHeader.getDefaultTopicQueueNums(), topicSysFlag);
+
+            if (null == topicConfig) {
+                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+                    topicConfig =
+                            this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
+                                    requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ,
+                                    topicSysFlag);
+                }
+            }
+
+            if (null == topicConfig) {
+                response.setCode(ResponseCode.TOPIC_NOT_EXIST);
+                response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!"
+                        + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
+                return response;
+            }
+        }
+
+        int queueIdInt = requestHeader.getQueueId();
+        int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());
+        if (queueIdInt >= idValid) {
+            String errorInfo = String.format("request queueId[%d] is illagal, %s Producer: %s",
+                    queueIdInt,
+                    topicConfig.toString(),
+                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+
+            log.warn(errorInfo);
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark(errorInfo);
+
+            return response;
+        }
+        return response;
+    }
+
+    public void registerSendMessageHook(List<SendMessageHook> sendMessageHookList) {
+        this.sendMessageHookList = sendMessageHookList;
+    }
+
+    protected void doResponse(ChannelHandlerContext ctx, RemotingCommand request,
+                              final RemotingCommand response) {
+        if (!request.isOnewayRPC()) {
+            try {
+                ctx.writeAndFlush(response);
+            } catch (Throwable e) {
+                log.error("SendMessageProcessor process request over, but response failed", e);
+                log.error(request.toString());
+                log.error(response.toString());
+            }
+        }
+    }
+
+    public void executeSendMessageHookBefore(final ChannelHandlerContext ctx, final RemotingCommand request,
+                                             SendMessageContext context) {
+        if (hasSendMessageHook()) {
+            for (SendMessageHook hook : this.sendMessageHookList) {
+                try {
+                    final SendMessageRequestHeader requestHeader = parseRequestHeader(request);
+
+                    if (null != requestHeader) {
+                        context.setProducerGroup(requestHeader.getProducerGroup());
+                        context.setTopic(requestHeader.getTopic());
+                        context.setBodyLength(request.getBody().length);
+                        context.setMsgProps(requestHeader.getProperties());
+                        context.setBornHost(RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+                        context.setBrokerAddr(this.brokerController.getBrokerAddr());
+                        context.setQueueId(requestHeader.getQueueId());
+                    }
+
+                    hook.sendMessageBefore(context);
+                    requestHeader.setProperties(context.getMsgProps());
+                } catch (Throwable e) {
+                }
+            }
+        }
+    }
+
+    protected SendMessageRequestHeader parseRequestHeader(RemotingCommand request)
+            throws RemotingCommandException {
+
+        SendMessageRequestHeaderV2 requestHeaderV2 = null;
+        SendMessageRequestHeader requestHeader = null;
+        switch (request.getCode()) {
+            case RequestCode.SEND_MESSAGE_V2:
+                requestHeaderV2 =
+                        (SendMessageRequestHeaderV2) request
+                                .decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
+            case RequestCode.SEND_MESSAGE:
+                if (null == requestHeaderV2) {
+                    requestHeader =
+                            (SendMessageRequestHeader) request
+                                    .decodeCommandCustomHeader(SendMessageRequestHeader.class);
+                } else {
+                    requestHeader = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV1(requestHeaderV2);
+                }
+            default:
+                break;
+        }
+        return requestHeader;
+    }
+
+    public void executeSendMessageHookAfter(final RemotingCommand response, final SendMessageContext context) {
+        if (hasSendMessageHook()) {
+            for (SendMessageHook hook : this.sendMessageHookList) {
+                try {
+                    if (response != null) {
+                        final SendMessageResponseHeader responseHeader =
+                                (SendMessageResponseHeader) response.readCustomHeader();
+                        context.setMsgId(responseHeader.getMsgId());
+                        context.setQueueId(responseHeader.getQueueId());
+                        context.setQueueOffset(responseHeader.getQueueOffset());
+                        context.setCode(response.getCode());
+                        context.setErrorMsg(response.getRemark());
+                    }
+                    hook.sendMessageAfter(context);
+                } catch (Throwable e) {
+
+                }
+            }
+        }
+    }
+
+    @Override
+    public boolean rejectRequest() {
+        return false;
+    }
+}


Mime
View raw message