rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dongefore...@apache.org
Subject [02/51] [abbrv] incubator-rocketmq git commit: ROCKETMQ-80 Add batch feature closes apache/incubator-rocketmq#53
Date Tue, 06 Jun 2017 03:38:22 GMT
ROCKETMQ-80 Add batch feature  closes apache/incubator-rocketmq#53


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/47fad3c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/47fad3c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/47fad3c1

Branch: refs/heads/master
Commit: 47fad3c17ab2d161743d4e52efc6258b7bcafde9
Parents: 0d6c56b
Author: dongeforever <dongeforever@apache.org>
Authored: Fri Mar 17 18:59:43 2017 +0800
Committer: dongeforever <zhendongliu92@yeah.net>
Committed: Tue Jun 6 11:37:29 2017 +0800

----------------------------------------------------------------------
 .../rocketmq/broker/BrokerController.java       |   2 +
 .../processor/AbstractSendMessageProcessor.java |  12 +-
 .../broker/processor/SendMessageProcessor.java  | 350 ++++++++++++-------
 .../org/apache/rocketmq/client/Validators.java  |   1 +
 .../rocketmq/client/impl/MQClientAPIImpl.java   |  56 +--
 .../impl/producer/DefaultMQProducerImpl.java    |  30 +-
 .../client/producer/DefaultMQProducer.java      |  38 ++
 .../rocketmq/client/producer/MQProducer.java    |  14 +
 .../apache/rocketmq/common/TopicFilterType.java |   1 +
 .../rocketmq/common/message/MessageBatch.java   |  73 ++++
 .../rocketmq/common/message/MessageDecoder.java | 103 ++++++
 .../rocketmq/common/message/MessageExt.java     |   2 +-
 .../common/message/MessageExtBatch.java         |  42 +++
 .../rocketmq/common/protocol/RequestCode.java   |   3 +
 .../header/SendMessageRequestHeader.java        |  10 +
 .../header/SendMessageRequestHeaderV2.java      |  14 +
 .../rocketmq/common/MessageBatchTest.java       |  70 ++++
 .../common/MessageEncodeDecodeTest.java         |  81 +++++
 .../rocketmq/store/AppendMessageCallback.java   |  15 +-
 .../rocketmq/store/AppendMessageResult.java     |  11 +
 .../org/apache/rocketmq/store/CommitLog.java    | 344 ++++++++++++++++--
 .../org/apache/rocketmq/store/ConsumeQueue.java |   2 +-
 .../rocketmq/store/DefaultMessageStore.java     |  57 +++
 .../org/apache/rocketmq/store/MappedFile.java   |  32 +-
 .../org/apache/rocketmq/store/MessageStore.java |   3 +
 .../org/apache/rocketmq/store/RunningFlags.java |  11 +
 .../store/config/MessageStoreConfig.java        |   2 +
 .../store/stats/BrokerStatsManager.java         |   8 +-
 .../rocketmq/store/AppendCallbackTest.java      | 150 ++++++++
 .../test/client/producer/batch/BatchSendIT.java | 131 +++++++
 .../exception/msg/MessageExceptionIT.java       |   2 +-
 31 files changed, 1464 insertions(+), 206 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index b656870..7e9e7ac 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -374,9 +374,11 @@ public class BrokerController {
 
         this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
         this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
+        this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
         this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
         this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
         this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
         this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
         /**
          * PullMessageProcessor

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
index 9f23bad..3faa7ae 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
@@ -17,11 +17,6 @@
 package org.apache.rocketmq.broker.processor;
 
 import io.netty.channel.ChannelHandlerContext;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
 import org.apache.rocketmq.broker.mqtrace.SendMessageHook;
@@ -51,6 +46,12 @@ import org.apache.rocketmq.store.MessageExtBrokerInner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
 public abstract class AbstractSendMessageProcessor implements NettyRequestProcessor {
     protected static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
 
@@ -279,6 +280,7 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
         SendMessageRequestHeaderV2 requestHeaderV2 = null;
         SendMessageRequestHeader requestHeader = null;
         switch (request.getCode()) {
+            case RequestCode.SEND_BATCH_MESSAGE:
             case RequestCode.SEND_MESSAGE_V2:
                 requestHeaderV2 =
                     (SendMessageRequestHeaderV2) request

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index a440462..56a0b99 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -34,6 +34,7 @@ import org.apache.rocketmq.common.message.MessageAccessor;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageExtBatch;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
@@ -72,7 +73,13 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
 
                 mqtraceContext = buildMsgContext(ctx, requestHeader);
                 this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
-                final RemotingCommand response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
+
+                RemotingCommand response;
+                if (requestHeader.isBatch()) {
+                    response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);
+                } else {
+                    response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
+                }
 
                 this.executeSendMessageHookAfter(response, mqtraceContext);
                 return response;
@@ -238,6 +245,50 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
         return response;
     }
 
+
+    private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response, RemotingCommand request,
+        MessageExt msg, TopicConfig topicConfig) {
+        String newTopic = requestHeader.getTopic();
+        if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+            String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
+            SubscriptionGroupConfig subscriptionGroupConfig =
+                this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
+            if (null == subscriptionGroupConfig) {
+                response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
+                response.setRemark(
+                    "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
+                return false;
+            }
+
+            int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
+            if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
+                maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
+            }
+            int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();
+            if (reconsumeTimes >= maxReconsumeTimes) {
+                newTopic = MixAll.getDLQTopic(groupName);
+                int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
+                topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, //
+                    DLQ_NUMS_PER_GROUP, //
+                    PermName.PERM_WRITE, 0
+                );
+                msg.setTopic(newTopic);
+                msg.setQueueId(queueIdInt);
+                if (null == topicConfig) {
+                    response.setCode(ResponseCode.SYSTEM_ERROR);
+                    response.setRemark("topic[" + newTopic + "] not exist");
+                    return false;
+                }
+            }
+        }
+        int sysFlag = requestHeader.getSysFlag();
+        if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
+            sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
+        }
+        msg.setSysFlag(sysFlag);
+        return true;
+    }
+
     private RemotingCommand sendMessage(final ChannelHandlerContext ctx, //
         final RemotingCommand request, //
         final SendMessageContext sendMessageContext, //
@@ -251,9 +302,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
         response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
         response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
 
-        if (log.isDebugEnabled()) {
-            log.debug("receive SendMessage request command, {}", request);
-        }
+        log.debug("receive SendMessage request command, {}", request);
 
         final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
         if (this.brokerController.getMessageStore().now() < startTimstamp) {
@@ -270,6 +319,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
 
         final byte[] body = request.getBody();
 
+
+
         int queueIdInt = requestHeader.getQueueId();
         TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
 
@@ -277,53 +328,18 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
             queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
         }
 
-        int sysFlag = requestHeader.getSysFlag();
+        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
+        msgInner.setTopic(requestHeader.getTopic());
+        msgInner.setQueueId(queueIdInt);
 
-        if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
-            sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
+        if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
+            return response;
         }
 
-        String newTopic = requestHeader.getTopic();
-        if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
-            String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
-            SubscriptionGroupConfig subscriptionGroupConfig =
-                this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
-            if (null == subscriptionGroupConfig) {
-                response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
-                response.setRemark(
-                    "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
-                return response;
-            }
-
-            int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
-            if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
-                maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
-            }
-            int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();
-            if (reconsumeTimes >= maxReconsumeTimes) {
-                newTopic = MixAll.getDLQTopic(groupName);
-                queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
-                topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, //
-                    DLQ_NUMS_PER_GROUP, //
-                    PermName.PERM_WRITE, 0
-                );
-                if (null == topicConfig) {
-                    response.setCode(ResponseCode.SYSTEM_ERROR);
-                    response.setRemark("topic[" + newTopic + "] not exist");
-                    return response;
-                }
-            }
-        }
-        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
-        msgInner.setTopic(newTopic);
         msgInner.setBody(body);
         msgInner.setFlag(requestHeader.getFlag());
         MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
         msgInner.setPropertiesString(requestHeader.getProperties());
-        msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(), msgInner.getTags()));
-
-        msgInner.setQueueId(queueIdInt);
-        msgInner.setSysFlag(sysFlag);
         msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
         msgInner.setBornHost(ctx.channel().remoteAddress());
         msgInner.setStoreHost(this.getStoreHost());
@@ -340,105 +356,183 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
         }
 
         PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
-        if (putMessageResult != null) {
-            boolean sendOK = false;
 
-            switch (putMessageResult.getPutMessageStatus()) {
-                // Success
-                case PUT_OK:
-                    sendOK = true;
-                    response.setCode(ResponseCode.SUCCESS);
-                    break;
-                case FLUSH_DISK_TIMEOUT:
-                    response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT);
-                    sendOK = true;
-                    break;
-                case FLUSH_SLAVE_TIMEOUT:
-                    response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT);
-                    sendOK = true;
-                    break;
-                case SLAVE_NOT_AVAILABLE:
-                    response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
-                    sendOK = true;
-                    break;
+        return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);
 
-                // Failed
-                case CREATE_MAPEDFILE_FAILED:
-                    response.setCode(ResponseCode.SYSTEM_ERROR);
-                    response.setRemark("create mapped file failed, server is busy or broken.");
-                    break;
-                case MESSAGE_ILLEGAL:
-                case PROPERTIES_SIZE_EXCEEDED:
-                    response.setCode(ResponseCode.MESSAGE_ILLEGAL);
-                    response.setRemark(
-                        "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
-                    break;
-                case SERVICE_NOT_AVAILABLE:
-                    response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
-                    response.setRemark(
-                        "service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small.");
-                    break;
-                case OS_PAGECACHE_BUSY:
-                    response.setCode(ResponseCode.SYSTEM_ERROR);
-                    response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while");
-                    break;
-                case UNKNOWN_ERROR:
-                    response.setCode(ResponseCode.SYSTEM_ERROR);
-                    response.setRemark("UNKNOWN_ERROR");
-                    break;
-                default:
-                    response.setCode(ResponseCode.SYSTEM_ERROR);
-                    response.setRemark("UNKNOWN_ERROR DEFAULT");
-                    break;
-            }
+    }
 
-            String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
-            if (sendOK) {
 
-                this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic());
-                this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(),
-                    putMessageResult.getAppendMessageResult().getWroteBytes());
-                this.brokerController.getBrokerStatsManager().incBrokerPutNums();
+    private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response, RemotingCommand request, MessageExt msg,
+        SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx, int queueIdInt) {
+        if (putMessageResult == null) {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("store putMessage return null");
+            return  response;
+        }
+        boolean sendOK = false;
+
+        switch (putMessageResult.getPutMessageStatus()) {
+            // Success
+            case PUT_OK:
+                sendOK = true;
+                response.setCode(ResponseCode.SUCCESS);
+                break;
+            case FLUSH_DISK_TIMEOUT:
+                response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT);
+                sendOK = true;
+                break;
+            case FLUSH_SLAVE_TIMEOUT:
+                response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT);
+                sendOK = true;
+                break;
+            case SLAVE_NOT_AVAILABLE:
+                response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
+                sendOK = true;
+                break;
+
+            // Failed
+            case CREATE_MAPEDFILE_FAILED:
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark("create mapped file failed, server is busy or broken.");
+                break;
+            case MESSAGE_ILLEGAL:
+            case PROPERTIES_SIZE_EXCEEDED:
+                response.setCode(ResponseCode.MESSAGE_ILLEGAL);
+                response.setRemark(
+                    "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
+                break;
+            case SERVICE_NOT_AVAILABLE:
+                response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
+                response.setRemark(
+                    "service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small.");
+                break;
+            case OS_PAGECACHE_BUSY:
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while");
+                break;
+            case UNKNOWN_ERROR:
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark("UNKNOWN_ERROR");
+                break;
+            default:
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark("UNKNOWN_ERROR DEFAULT");
+                break;
+        }
+
+        String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
+        if (sendOK) {
 
-                response.setRemark(null);
+            this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
+            this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(),
+                putMessageResult.getAppendMessageResult().getWroteBytes());
+            this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());
 
-                responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
-                responseHeader.setQueueId(queueIdInt);
-                responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
+            response.setRemark(null);
 
-                doResponse(ctx, request, response);
+            responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
+            responseHeader.setQueueId(queueIdInt);
+            responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
 
-                if (hasSendMessageHook()) {
-                    sendMessageContext.setMsgId(responseHeader.getMsgId());
-                    sendMessageContext.setQueueId(responseHeader.getQueueId());
-                    sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());
+            doResponse(ctx, request, response);
 
-                    int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
-                    int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
-                    int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;
+            if (hasSendMessageHook()) {
+                sendMessageContext.setMsgId(responseHeader.getMsgId());
+                sendMessageContext.setQueueId(responseHeader.getQueueId());
+                sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());
 
-                    sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
-                    sendMessageContext.setCommercialSendTimes(incValue);
-                    sendMessageContext.setCommercialSendSize(wroteSize);
-                    sendMessageContext.setCommercialOwner(owner);
-                }
-                return null;
-            } else {
-                if (hasSendMessageHook()) {
-                    int wroteSize = request.getBody().length;
-                    int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);
-
-                    sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
-                    sendMessageContext.setCommercialSendTimes(incValue);
-                    sendMessageContext.setCommercialSendSize(wroteSize);
-                    sendMessageContext.setCommercialOwner(owner);
-                }
+                int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
+                int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
+                int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;
+
+                sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
+                sendMessageContext.setCommercialSendTimes(incValue);
+                sendMessageContext.setCommercialSendSize(wroteSize);
+                sendMessageContext.setCommercialOwner(owner);
             }
+            return null;
         } else {
+            if (hasSendMessageHook()) {
+                int wroteSize = request.getBody().length;
+                int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);
+
+                sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
+                sendMessageContext.setCommercialSendTimes(incValue);
+                sendMessageContext.setCommercialSendSize(wroteSize);
+                sendMessageContext.setCommercialOwner(owner);
+            }
+        }
+        return  response;
+    }
+    private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx, //
+                                        final RemotingCommand request, //
+                                        final SendMessageContext sendMessageContext, //
+                                        final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
+
+        final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
+        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
+
+
+        response.setOpaque(request.getOpaque());
+
+        response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
+        response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
+
+        log.debug("Receive SendMessage request command {}", request);
+
+        final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
+        if (this.brokerController.getMessageStore().now() < startTimstamp) {
             response.setCode(ResponseCode.SYSTEM_ERROR);
-            response.setRemark("store putMessage return null");
+            response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
+            return response;
+        }
+
+        response.setCode(-1);
+        super.msgCheck(ctx, requestHeader, response);
+        if (response.getCode() != -1) {
+            return response;
+        }
+
+
+        int queueIdInt = requestHeader.getQueueId();
+        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
+
+        if (queueIdInt < 0) {
+            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
+        }
+
+        if (requestHeader.getTopic().length() > Byte.MAX_VALUE) {
+            response.setCode(ResponseCode.MESSAGE_ILLEGAL);
+            response.setRemark("message topic length too long " + requestHeader.getTopic().length());
+            return response;
         }
 
+        if (requestHeader.getTopic() != null && requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+            response.setCode(ResponseCode.MESSAGE_ILLEGAL);
+            response.setRemark("batch request does not support retry group "  + requestHeader.getTopic());
+            return response;
+        }
+        MessageExtBatch messageExtBatch = new MessageExtBatch();
+        messageExtBatch.setTopic(requestHeader.getTopic());
+        messageExtBatch.setQueueId(queueIdInt);
+
+        int sysFlag = requestHeader.getSysFlag();
+        if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
+            sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
+        }
+        messageExtBatch.setSysFlag(sysFlag);
+
+        messageExtBatch.setFlag(requestHeader.getFlag());
+        MessageAccessor.setProperties(messageExtBatch, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
+        messageExtBatch.setBody(request.getBody());
+        messageExtBatch.setBornTimestamp(requestHeader.getBornTimestamp());
+        messageExtBatch.setBornHost(ctx.channel().remoteAddress());
+        messageExtBatch.setStoreHost(this.getStoreHost());
+        messageExtBatch.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
+
+        PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessages(messageExtBatch);
+
+        handlePutMessageResult(putMessageResult, response, request, messageExtBatch, responseHeader, sendMessageContext, ctx, queueIdInt);
         return response;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/client/src/main/java/org/apache/rocketmq/client/Validators.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/Validators.java b/client/src/main/java/org/apache/rocketmq/client/Validators.java
index 899efa6..b49537f 100644
--- a/client/src/main/java/org/apache/rocketmq/client/Validators.java
+++ b/client/src/main/java/org/apache/rocketmq/client/Validators.java
@@ -95,6 +95,7 @@ public class Validators {
         }
         // topic
         Validators.checkTopic(msg.getTopic());
+
         // body
         if (null == msg.getBody()) {
             throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 12580c1..bdce883 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -18,14 +18,14 @@ package org.apache.rocketmq.client.impl;
 
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.Iterator;
+import java.util.Collections;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.consumer.PullCallback;
@@ -50,10 +50,11 @@ import org.apache.rocketmq.common.admin.ConsumeStats;
 import org.apache.rocketmq.common.admin.TopicStatsTable;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageClientIDSetter;
-import org.apache.rocketmq.common.message.MessageConst;
-import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageBatch;
 import org.apache.rocketmq.common.namesrv.TopAddressing;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.ResponseCode;
@@ -147,6 +148,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 import org.slf4j.Logger;
 
+
 public class MQClientAPIImpl {
 
     private final static Logger log = ClientLogger.getLog();
@@ -278,14 +280,14 @@ public class MQClientAPIImpl {
     }
 
     public SendResult sendMessage(//
-        final String addr, // 1
-        final String brokerName, // 2
-        final Message msg, // 3
-        final SendMessageRequestHeader requestHeader, // 4
-        final long timeoutMillis, // 5
-        final CommunicationMode communicationMode, // 6
-        final SendMessageContext context, // 7
-        final DefaultMQProducerImpl producer // 8
+                                  final String addr, // 1
+                                  final String brokerName, // 2
+                                  final Message msg, // 3
+                                  final SendMessageRequestHeader requestHeader, // 4
+                                  final long timeoutMillis, // 5
+                                  final CommunicationMode communicationMode, // 6
+                                  final SendMessageContext context, // 7
+                                  final DefaultMQProducerImpl producer // 8
     ) throws RemotingException, MQBrokerException, InterruptedException {
         return sendMessage(addr, brokerName, msg, requestHeader, timeoutMillis, communicationMode, null, null, null, 0, context, producer);
     }
@@ -305,9 +307,9 @@ public class MQClientAPIImpl {
         final DefaultMQProducerImpl producer // 12
     ) throws RemotingException, MQBrokerException, InterruptedException {
         RemotingCommand request = null;
-        if (sendSmartMsg) {
+        if (sendSmartMsg || msg instanceof MessageBatch) {
             SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
-            request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
+            request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
         } else {
             request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
         }
@@ -334,11 +336,11 @@ public class MQClientAPIImpl {
     }
 
     private SendResult sendMessageSync(//
-        final String addr, //
-        final String brokerName, //
-        final Message msg, //
-        final long timeoutMillis, //
-        final RemotingCommand request//
+                                       final String addr, //
+                                       final String brokerName, //
+                                       final Message msg, //
+                                       final long timeoutMillis, //
+                                       final RemotingCommand request//
     ) throws RemotingException, MQBrokerException, InterruptedException {
         RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
         assert response != null;
@@ -507,8 +509,16 @@ public class MQClientAPIImpl {
 
                 MessageQueue messageQueue = new MessageQueue(msg.getTopic(), brokerName, responseHeader.getQueueId());
 
+                String uniqMsgId = MessageClientIDSetter.getUniqID(msg);
+                if (msg instanceof MessageBatch) {
+                    StringBuilder sb = new StringBuilder();
+                    for (Message message : (MessageBatch) msg) {
+                        sb.append(sb.length() == 0 ? "" : ",").append(MessageClientIDSetter.getUniqID(message));
+                    }
+                    uniqMsgId = sb.toString();
+                }
                 SendResult sendResult = new SendResult(sendStatus,
-                    MessageClientIDSetter.getUniqID(msg),
+                    uniqMsgId,
                     responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset());
                 sendResult.setTransactionId(responseHeader.getTransactionId());
                 String regionId = response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION);
@@ -1452,7 +1462,7 @@ public class MQClientAPIImpl {
     }
 
     public Map<String, Map<MessageQueue, Long>> invokeBrokerToGetConsumerStatus(final String addr, final String topic, final String group,
-        final String clientAddr, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
+                                                                                final String clientAddr, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
         GetConsumerStatusRequestHeader requestHeader = new GetConsumerStatusRequestHeader();
         requestHeader.setTopic(topic);
         requestHeader.setGroup(group);

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 8e81979..d828875 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -30,6 +30,16 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageClientIDSetter;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageBatch;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageType;
+import org.apache.rocketmq.common.message.MessageId;
 import org.apache.rocketmq.client.QueryResult;
 import org.apache.rocketmq.client.Validators;
 import org.apache.rocketmq.client.common.ClientErrorCode;
@@ -58,15 +68,6 @@ import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ServiceState;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.help.FAQUrl;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.common.message.MessageAccessor;
-import org.apache.rocketmq.common.message.MessageClientIDSetter;
-import org.apache.rocketmq.common.message.MessageConst;
-import org.apache.rocketmq.common.message.MessageDecoder;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageId;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.message.MessageType;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
 import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
@@ -595,8 +596,10 @@ public class DefaultMQProducerImpl implements MQProducerInner {
 
             byte[] prevBody = msg.getBody();
             try {
-
-                MessageClientIDSetter.setUniqID(msg);
+                //for MessageBatch,ID has been set in the generating process
+                if (!(msg instanceof MessageBatch)) {
+                    MessageClientIDSetter.setUniqID(msg);
+                }
 
                 int sysFlag = 0;
                 if (this.tryToCompressMessage(msg)) {
@@ -652,6 +655,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
                 requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
                 requestHeader.setReconsumeTimes(0);
                 requestHeader.setUnitMode(this.isUnitMode());
+                requestHeader.setBatch(msg instanceof MessageBatch);
                 if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                     String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                     if (reconsumeTimes != null) {
@@ -737,6 +741,10 @@ public class DefaultMQProducerImpl implements MQProducerInner {
     }
 
     private boolean tryToCompressMessage(final Message msg) {
+        if (msg instanceof MessageBatch) {
+            //batch dose not support compressing right now
+            return false;
+        }
         byte[] body = msg.getBody();
         if (body != null) {
             if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index 3480c92..135a447 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -16,14 +16,18 @@
  */
 package org.apache.rocketmq.client.producer;
 
+import java.util.Collection;
 import java.util.List;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.QueryResult;
+import org.apache.rocketmq.client.Validators;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageBatch;
+import org.apache.rocketmq.common.message.MessageClientIDSetter;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageId;
@@ -577,6 +581,40 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
         return this.defaultMQProducerImpl.queryMessageByUniqKey(topic, msgId);
     }
 
+    @Override
+    public SendResult send(Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        return this.defaultMQProducerImpl.send(batch(msgs));
+    }
+
+    @Override
+    public SendResult send(Collection<Message> msgs, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        return this.defaultMQProducerImpl.send(batch(msgs), timeout);
+    }
+
+    @Override
+    public SendResult send(Collection<Message> msgs, MessageQueue messageQueue) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        return this.defaultMQProducerImpl.send(batch(msgs), messageQueue);
+    }
+
+    @Override
+    public SendResult send(Collection<Message> msgs, MessageQueue messageQueue, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        return this.defaultMQProducerImpl.send(batch(msgs), messageQueue, timeout);
+    }
+
+    private MessageBatch batch(Collection<Message> msgs) throws MQClientException {
+        MessageBatch msgBatch;
+        try {
+            msgBatch = MessageBatch.generateFromList(msgs);
+            for (Message message : msgBatch) {
+                Validators.checkMessage(message, this);
+                MessageClientIDSetter.setUniqID(message);
+            }
+            msgBatch.setBody(msgBatch.encode());
+        } catch (Exception e) {
+            throw new MQClientException("Failed to initiate the MessageBatch", e);
+        }
+        return msgBatch;
+    }
     public String getProducerGroup() {
         return producerGroup;
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
index 9fc7586..14caf6f 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.client.producer;
 
+import java.util.Collection;
 import java.util.List;
 import org.apache.rocketmq.client.MQAdmin;
 import org.apache.rocketmq.client.exception.MQBrokerException;
@@ -81,4 +82,17 @@ public interface MQProducer extends MQAdmin {
 
     TransactionSendResult sendMessageInTransaction(final Message msg,
         final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException;
+
+    //for batch
+    SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException,
+        InterruptedException;
+
+    SendResult send(final Collection<Message> msgs, final long timeout) throws MQClientException,
+        RemotingException, MQBrokerException, InterruptedException;
+
+    SendResult send(final Collection<Message> msgs, final MessageQueue mq) throws MQClientException,
+        RemotingException, MQBrokerException, InterruptedException;
+
+    SendResult send(final Collection<Message> msgs, final MessageQueue mq, final long timeout)
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/common/src/main/java/org/apache/rocketmq/common/TopicFilterType.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicFilterType.java b/common/src/main/java/org/apache/rocketmq/common/TopicFilterType.java
index 58459e0..8dde5d8 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicFilterType.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicFilterType.java
@@ -19,4 +19,5 @@ package org.apache.rocketmq.common;
 public enum TopicFilterType {
     SINGLE_TAG,
     MULTI_TAG
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java
new file mode 100644
index 0000000..ca2ce88
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.message;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.rocketmq.common.MixAll;
+
+public class MessageBatch extends Message implements Iterable<Message> {
+
+    private static final long serialVersionUID = 621335151046335557L;
+    private final List<Message> messages;
+
+    private MessageBatch(List<Message> messages) {
+        this.messages = messages;
+    }
+
+    public byte[] encode() {
+        return MessageDecoder.encodeMessages(messages);
+    }
+
+    public Iterator<Message> iterator() {
+        return messages.iterator();
+    }
+
+    public static MessageBatch generateFromList(Collection<Message> messages) {
+        assert messages != null;
+        assert messages.size() > 0;
+        List<Message> messageList = new ArrayList<Message>(messages.size());
+        Message first = null;
+        for (Message message : messages) {
+            if (message.getDelayTimeLevel() > 0) {
+                throw new UnsupportedOperationException("TimeDelayLevel in not supported for batching");
+            }
+            if (message.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+                throw new UnsupportedOperationException("Retry Group is not supported for batching");
+            }
+            if (first == null) {
+                first = message;
+            } else {
+                if (!first.getTopic().equals(message.getTopic())) {
+                    throw new UnsupportedOperationException("The topic of the messages in one batch should be the same");
+                }
+                if (first.isWaitStoreMsgOK() != message.isWaitStoreMsgOK()) {
+                    throw new UnsupportedOperationException("The waitStoreMsgOK of the messages in one batch should the same");
+                }
+            }
+            messageList.add(message);
+        }
+        MessageBatch messageBatch = new MessageBatch(messageList);
+
+        messageBatch.setTopic(first.getTopic());
+        messageBatch.setWaitStoreMsgOK(first.isWaitStoreMsgOK());
+        return messageBatch;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
index 4f4e158..90b837a 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
@@ -200,6 +200,8 @@ public class MessageDecoder {
         return byteBuffer.array();
     }
 
+
+
     public static MessageExt decode(
         java.nio.ByteBuffer byteBuffer, final boolean readBody, final boolean deCompressBody) {
         return decode(byteBuffer, readBody, deCompressBody, false);
@@ -372,4 +374,105 @@ public class MessageDecoder {
 
         return map;
     }
+
+
+    public static byte[] encodeMessage(Message message) {
+        //only need flag, body, properties
+        byte[] body = message.getBody();
+        int bodyLen = body.length;
+        String properties = messageProperties2String(message.getProperties());
+        byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8);
+        //note properties length must not more than Short.MAX
+        short propertiesLength = (short) propertiesBytes.length;
+        int sysFlag = message.getFlag();
+        int storeSize = 4 // 1 TOTALSIZE
+            + 4 // 2 MAGICCOD
+            + 4 // 3 BODYCRC
+            + 4 // 4 FLAG
+            + 4 + bodyLen // 4 BODY
+            + 2 + propertiesLength;
+        ByteBuffer byteBuffer = ByteBuffer.allocate(storeSize);
+        // 1 TOTALSIZE
+        byteBuffer.putInt(storeSize);
+
+        // 2 MAGICCODE
+        byteBuffer.putInt(0);
+
+        // 3 BODYCRC
+        byteBuffer.putInt(0);
+
+        // 4 FLAG
+        int flag = message.getFlag();
+        byteBuffer.putInt(flag);
+
+        // 5 BODY
+        byteBuffer.putInt(bodyLen);
+        byteBuffer.put(body);
+
+        // 6 properties
+        byteBuffer.putShort(propertiesLength);
+        byteBuffer.put(propertiesBytes);
+
+        return byteBuffer.array();
+    }
+
+    public static Message decodeMessage(ByteBuffer byteBuffer) throws Exception {
+        Message message = new Message();
+
+        // 1 TOTALSIZE
+        byteBuffer.getInt();
+
+        // 2 MAGICCODE
+        byteBuffer.getInt();
+
+        // 3 BODYCRC
+        byteBuffer.getInt();
+
+        // 4 FLAG
+        int flag = byteBuffer.getInt();
+        message.setFlag(flag);
+
+        // 5 BODY
+        int bodyLen = byteBuffer.getInt();
+        byte[] body = new byte[bodyLen];
+        byteBuffer.get(body);
+        message.setBody(body);
+
+        // 6 properties
+        short propertiesLen = byteBuffer.getShort();
+        byte[] propertiesBytes = new byte[propertiesLen];
+        byteBuffer.get(propertiesBytes);
+        message.setProperties(string2messageProperties(new String(propertiesBytes, CHARSET_UTF8)));
+
+        return message;
+    }
+
+    public static byte[] encodeMessages(List<Message> messages) {
+        //TO DO refactor, accumulate in one buffer, avoid copies
+        List<byte[]>  encodedMessages = new ArrayList<byte[]>(messages.size());
+        int allSize = 0;
+        for (Message message: messages) {
+            byte[] tmp = encodeMessage(message);
+            encodedMessages.add(tmp);
+            allSize += tmp.length;
+        }
+        byte[] allBytes = new byte[allSize];
+        int pos = 0;
+        for (byte[] bytes : encodedMessages) {
+            System.arraycopy(bytes, 0, allBytes, pos, bytes.length);
+            pos += bytes.length;
+        }
+        return allBytes;
+    }
+
+
+    public static List<Message> decodeMessages(ByteBuffer byteBuffer) throws Exception {
+        //TO DO add a callback for processing,  avoid creating lists
+        List<Message> msgs = new ArrayList<Message>();
+        while (byteBuffer.hasRemaining()) {
+            Message msg = decodeMessage(byteBuffer);
+            msgs.add(msg);
+        }
+        return msgs;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/common/src/main/java/org/apache/rocketmq/common/message/MessageExt.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageExt.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageExt.java
index d11069f..3f77767 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageExt.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageExt.java
@@ -64,7 +64,7 @@ public class MessageExt extends Message {
         return TopicFilterType.SINGLE_TAG;
     }
 
-    private static ByteBuffer socketAddress2ByteBuffer(final SocketAddress socketAddress, final ByteBuffer byteBuffer) {
+    public static ByteBuffer socketAddress2ByteBuffer(final SocketAddress socketAddress, final ByteBuffer byteBuffer) {
         InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
         byteBuffer.put(inetSocketAddress.getAddress().getAddress(), 0, 4);
         byteBuffer.putInt(inetSocketAddress.getPort());

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBatch.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBatch.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBatch.java
new file mode 100644
index 0000000..352ab37
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBatch.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.common.message;
+
+import java.nio.ByteBuffer;
+
+public class MessageExtBatch extends MessageExt {
+
+    private static final long serialVersionUID = -2353110995348498537L;
+
+
+    public ByteBuffer wrap() {
+        assert getBody() != null;
+        return  ByteBuffer.wrap(getBody(), 0, getBody().length);
+    }
+
+
+    private ByteBuffer encodedBuff;
+
+    public ByteBuffer getEncodedBuff() {
+        return encodedBuff;
+    }
+
+    public void setEncodedBuff(ByteBuffer encodedBuff) {
+        this.encodedBuff = encodedBuff;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index 217e8df..c6b0925 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -159,4 +159,7 @@ public class RequestCode {
      * get config from name server
      */
     public static final int GET_NAMESRV_CONFIG = 319;
+
+
+    public static final int SEND_BATCH_MESSAGE = 320;
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
index 38b6589..2df31e6 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
@@ -48,6 +48,8 @@ public class SendMessageRequestHeader implements CommandCustomHeader {
     private Integer reconsumeTimes;
     @CFNullable
     private boolean unitMode = false;
+    @CFNullable
+    private boolean batch = false;
     private Integer maxReconsumeTimes;
 
     @Override
@@ -149,4 +151,12 @@ public class SendMessageRequestHeader implements CommandCustomHeader {
     public void setMaxReconsumeTimes(final Integer maxReconsumeTimes) {
         this.maxReconsumeTimes = maxReconsumeTimes;
     }
+
+    public boolean isBatch() {
+        return batch;
+    }
+
+    public void setBatch(boolean batch) {
+        this.batch = batch;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
index 34c83cb..757ef0c 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
@@ -51,6 +51,10 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
 
     private Integer l; // consumeRetryTimes
 
+    @CFNullable
+    private boolean m; //batch
+
+
     public static SendMessageRequestHeader createSendMessageRequestHeaderV1(final SendMessageRequestHeaderV2 v2) {
         SendMessageRequestHeader v1 = new SendMessageRequestHeader();
         v1.setProducerGroup(v2.a);
@@ -65,6 +69,7 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
         v1.setReconsumeTimes(v2.j);
         v1.setUnitMode(v2.k);
         v1.setMaxReconsumeTimes(v2.l);
+        v1.setBatch(v2.m);
         return v1;
     }
 
@@ -82,6 +87,7 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
         v2.j = v1.getReconsumeTimes();
         v2.k = v1.isUnitMode();
         v2.l = v1.getMaxReconsumeTimes();
+        v2.m = v1.isBatch();
         return v2;
     }
 
@@ -184,4 +190,12 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
     public void setL(final Integer l) {
         this.l = l;
     }
+
+    public boolean isM() {
+        return m;
+    }
+
+    public void setM(boolean m) {
+        this.m = m;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/common/src/test/java/org/apache/rocketmq/common/MessageBatchTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/rocketmq/common/MessageBatchTest.java b/common/src/test/java/org/apache/rocketmq/common/MessageBatchTest.java
new file mode 100644
index 0000000..1e406d2
--- /dev/null
+++ b/common/src/test/java/org/apache/rocketmq/common/MessageBatchTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.common;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageBatch;
+import org.junit.Test;
+
+public class MessageBatchTest {
+
+
+    public List<Message> generateMessages() {
+        List<Message> messages = new ArrayList<Message>();
+        Message message1 = new Message("topic1", "body".getBytes());
+        Message message2 = new Message("topic1", "body".getBytes());
+
+        messages.add(message1);
+        messages.add(message2);
+        return messages;
+    }
+
+    @Test
+    public void testGenerate_OK() throws Exception{
+        List<Message> messages = generateMessages();
+        MessageBatch.generateFromList(messages);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testGenerate_DiffTopic() throws Exception{
+        List<Message> messages = generateMessages();
+        messages.get(1).setTopic("topic2");
+        MessageBatch.generateFromList(messages);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testGenerate_DiffWaitOK() throws Exception{
+        List<Message> messages = generateMessages();
+        messages.get(1).setWaitStoreMsgOK(false);
+        MessageBatch.generateFromList(messages);
+    }
+    @Test(expected = UnsupportedOperationException.class)
+    public void testGenerate_Delay() throws Exception{
+        List<Message> messages = generateMessages();
+        messages.get(1).setDelayTimeLevel(1);
+        MessageBatch.generateFromList(messages);
+    }
+    @Test(expected = UnsupportedOperationException.class)
+    public void testGenerate_Retry() throws Exception{
+        List<Message> messages = generateMessages();
+        messages.get(1).setTopic(MixAll.RETRY_GROUP_TOPIC_PREFIX + "topic");
+        MessageBatch.generateFromList(messages);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/common/src/test/java/org/apache/rocketmq/common/MessageEncodeDecodeTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/rocketmq/common/MessageEncodeDecodeTest.java b/common/src/test/java/org/apache/rocketmq/common/MessageEncodeDecodeTest.java
new file mode 100644
index 0000000..a219eda
--- /dev/null
+++ b/common/src/test/java/org/apache/rocketmq/common/MessageEncodeDecodeTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.common;
+
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Created by liuzhendong on 16/12/21.
+ */
+public class MessageEncodeDecodeTest {
+
+
+    @Test
+    public void testEncodeDecodeSingle() throws Exception{
+        Message message = new Message("topic", "body".getBytes());
+        message.setFlag(12);
+        message.putUserProperty("key","value");
+        byte[] bytes = MessageDecoder.encodeMessage(message);
+        ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
+        buffer.put(bytes);
+        buffer.flip();
+        Message newMessage = MessageDecoder.decodeMessage(buffer);
+
+        assertTrue(message.getFlag() == newMessage.getFlag());
+        assertTrue(newMessage.getProperty("key").equals(newMessage.getProperty("key")));
+        assertTrue(Arrays.equals(newMessage.getBody(), message.getBody()));
+    }
+
+    @Test
+    public void testEncodeDecodeList() throws Exception {
+        List<Message> messages = new ArrayList<Message>(128);
+        for (int i = 0; i < 100; i++) {
+            Message message = new Message("topic", ("body" + i).getBytes());
+            message.setFlag(i);
+            message.putUserProperty("key", "value" + i);
+            messages.add(message);
+        }
+        byte[]  bytes = MessageDecoder.encodeMessages(messages);
+
+        ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
+        buffer.put(bytes);
+        buffer.flip();
+
+        List<Message> newMsgs = MessageDecoder.decodeMessages(buffer);
+
+        assertTrue(newMsgs.size() == messages.size());
+
+        for (int i = 0; i < newMsgs.size(); i++) {
+            Message message = messages.get(i);
+            Message newMessage = newMsgs.get(i);
+            assertTrue(message.getFlag() == newMessage.getFlag());
+            assertTrue(newMessage.getProperty("key").equals(newMessage.getProperty("key")));
+            assertTrue(Arrays.equals(newMessage.getBody(), message.getBody()));
+
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java b/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java
index 70b702e..16a62fa 100644
--- a/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java
+++ b/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.store;
 
 import java.nio.ByteBuffer;
+import org.apache.rocketmq.common.message.MessageExtBatch;
 
 /**
  * Write messages callback interface
@@ -32,5 +33,17 @@ public interface AppendMessageCallback {
      * @return How many bytes to write
      */
     AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer,
-        final int maxBlank, final MessageExtBrokerInner msg);
+                                 final int maxBlank, final MessageExtBrokerInner msg);
+
+    /**
+     * After batched message serialization, write MapedByteBuffer
+     *
+     * @param byteBuffer
+     * @param maxBlank
+     * @param messageExtBatch, backed up by a byte array
+     *
+     * @return How many bytes to write
+     */
+    AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer,
+                                 final int maxBlank, final MessageExtBatch messageExtBatch);
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java b/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java
index 5182dc4..d6d1aa6 100644
--- a/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java
+++ b/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java
@@ -34,6 +34,8 @@ public class AppendMessageResult {
     private long logicsOffset;
     private long pagecacheRT = 0;
 
+    private int msgNum = 1;
+
     public AppendMessageResult(AppendMessageStatus status) {
         this(status, 0, 0, "", 0, 0, 0);
     }
@@ -109,6 +111,14 @@ public class AppendMessageResult {
         this.logicsOffset = logicsOffset;
     }
 
+    public int getMsgNum() {
+        return msgNum;
+    }
+
+    public void setMsgNum(int msgNum) {
+        this.msgNum = msgNum;
+    }
+
     @Override
     public String toString() {
         return "AppendMessageResult{" +
@@ -119,6 +129,7 @@ public class AppendMessageResult {
             ", storeTimestamp=" + storeTimestamp +
             ", logicsOffset=" + logicsOffset +
             ", pagecacheRT=" + pagecacheRT +
+            ", msgNum=" + msgNum +
             '}';
     }
 }


Mime
View raw message