rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yu...@apache.org
Subject [18/43] incubator-rocketmq git commit: Finish code dump. Reviewed by: @yukon @vongosling @stevenschew @vintagewang @lollipop @zander
Date Mon, 19 Dec 2016 09:40:35 GMT
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/UtilAllTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/UtilAllTest.java b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/UtilAllTest.java
new file mode 100644
index 0000000..7764fcc
--- /dev/null
+++ b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/UtilAllTest.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common;
+
+import org.junit.Test;
+
+import java.net.URL;
+import java.util.Properties;
+
+import static org.junit.Assert.assertTrue;
+
+
+public class UtilAllTest {
+
+    @Test
+    public void test_currentStackTrace() {
+        System.out.println(UtilAll.currentStackTrace());
+    }
+
+
+    @Test
+    public void test_a() {
+        URL url = this.getClass().getProtectionDomain().getCodeSource().getLocation();
+        System.out.println(url);
+        System.out.println(url.getPath());
+    }
+
+
+    @Test
+    public void test_resetClassProperties() {
+        DemoConfig demoConfig = new DemoConfig();
+        MixAll.properties2Object(new Properties(), demoConfig);
+    }
+
+
+    @Test
+    public void test_properties2String() {
+        DemoConfig demoConfig = new DemoConfig();
+        Properties properties = MixAll.object2Properties(demoConfig);
+        System.out.println(MixAll.properties2String(properties));
+    }
+
+
+    @Test
+    public void test_timeMillisToHumanString() {
+        System.out.println(UtilAll.timeMillisToHumanString());
+    }
+
+
+    @Test
+    public void test_isPropertiesEqual() {
+        final Properties p1 = new Properties();
+        final Properties p2 = new Properties();
+
+        p1.setProperty("a", "1");
+        p1.setProperty("b", "2");
+
+        p2.setProperty("a", "1");
+        p2.setProperty("b", "2");
+        // p2.setProperty("c", "3");
+
+        assertTrue(MixAll.isPropertiesEqual(p1, p2));
+    }
+
+
+    @Test
+    public void test_getpid() {
+        int pid = UtilAll.getPid();
+
+        System.out.println("PID = " + pid);
+        assertTrue(pid > 0);
+    }
+
+
+    @Test
+    public void test_isBlank() {
+        {
+            boolean result = UtilAll.isBlank("Hello ");
+            assertTrue(!result);
+        }
+
+        {
+            boolean result = UtilAll.isBlank(" Hello");
+            assertTrue(!result);
+        }
+
+        {
+            boolean result = UtilAll.isBlank("He llo");
+            assertTrue(!result);
+        }
+
+        {
+            boolean result = UtilAll.isBlank("  ");
+            assertTrue(result);
+        }
+
+        {
+            boolean result = UtilAll.isBlank("Hello");
+            assertTrue(!result);
+        }
+    }
+
+    static class DemoConfig {
+        private int demoWidth = 0;
+        private int demoLength = 0;
+        private boolean demoOK = false;
+        private String demoName = "haha";
+
+
+        public int getDemoWidth() {
+            return demoWidth;
+        }
+
+
+        public void setDemoWidth(int demoWidth) {
+            this.demoWidth = demoWidth;
+        }
+
+
+        public int getDemoLength() {
+            return demoLength;
+        }
+
+
+        public void setDemoLength(int demoLength) {
+            this.demoLength = demoLength;
+        }
+
+
+        public boolean isDemoOK() {
+            return demoOK;
+        }
+
+
+        public void setDemoOK(boolean demoOK) {
+            this.demoOK = demoOK;
+        }
+
+
+        public String getDemoName() {
+            return demoName;
+        }
+
+
+        public void setDemoNfieldame(String demoName) {
+            this.demoName = demoName;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/filter/FilterAPITest.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/filter/FilterAPITest.java b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/filter/FilterAPITest.java
new file mode 100644
index 0000000..e45873b
--- /dev/null
+++ b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/filter/FilterAPITest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.filter;
+
+import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+import org.junit.Test;
+
+
+/**
+ * @author shijia.wxr
+ *
+ */
+public class FilterAPITest {
+
+    @Test
+    public void testBuildSubscriptionData() throws Exception {
+        SubscriptionData subscriptionData =
+                FilterAPI.buildSubscriptionData("ConsumerGroup1", "TestTopic", "TAG1 || Tag2 || tag3");
+        System.out.println(subscriptionData);
+    }
+
+    @Test
+    public void testSubscriptionData() throws Exception {
+        SubscriptionData subscriptionData =
+                FilterAPI.buildSubscriptionData("ConsumerGroup1", "TestTopic", "TAG1 || Tag2 || tag3");
+        subscriptionData.setFilterClassSource("java hello");
+        String json = RemotingSerializable.toJson(subscriptionData, true);
+        System.out.println(json);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/filter/PolishExprTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/filter/PolishExprTest.java b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/filter/PolishExprTest.java
new file mode 100644
index 0000000..612df69
--- /dev/null
+++ b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/filter/PolishExprTest.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.filter;
+
+import com.alibaba.rocketmq.common.filter.impl.Op;
+import com.alibaba.rocketmq.common.filter.impl.PolishExpr;
+import junit.framework.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+
+/**
+ * @author lansheng.zj
+ */
+public class PolishExprTest {
+
+    private String expression = "tag1||(tag2&&tag3)&&tag4||tag5&&(tag6 && tag7)|| tag8    && tag9";
+    private PolishExpr polishExpr;
+
+
+    public void init() {
+        polishExpr = new PolishExpr();
+    }
+
+
+    @Test
+    public void testReversePolish() {
+        List<Op> antiPolishExpression = polishExpr.reversePolish(expression);
+        System.out.println(antiPolishExpression);
+    }
+
+
+    @Test
+    public void testReversePolish_Performance() {
+        // prepare
+        for (int i = 0; i < 100000; i++) {
+            polishExpr.reversePolish(expression);
+        }
+
+        long start = System.currentTimeMillis();
+        for (int i = 0; i < 100000; i++) {
+            polishExpr.reversePolish(expression);
+        }
+        long cost = System.currentTimeMillis() - start;
+        System.out.println(cost);
+        // System.out.println(cost / 100000F);
+
+        Assert.assertTrue(cost < 500);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/protocol/ConsumeStatusTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/protocol/ConsumeStatusTest.java b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/protocol/ConsumeStatusTest.java
new file mode 100644
index 0000000..32e3d98
--- /dev/null
+++ b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/protocol/ConsumeStatusTest.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol;
+
+import com.alibaba.rocketmq.common.protocol.body.ConsumeStatus;
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+import org.junit.Test;
+
+
+public class ConsumeStatusTest {
+
+    @Test
+    public void decode_test() throws Exception {
+        ConsumeStatus cs = new ConsumeStatus();
+        cs.setConsumeFailedTPS(0L);
+        String json = RemotingSerializable.toJson(cs, true);
+        System.out.println(json);
+        ConsumeStatus fromJson = RemotingSerializable.fromJson(json, ConsumeStatus.class);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/protocol/MQProtosHelperTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/protocol/MQProtosHelperTest.java b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/protocol/MQProtosHelperTest.java
new file mode 100644
index 0000000..749e7df
--- /dev/null
+++ b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/protocol/MQProtosHelperTest.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol;
+
+/**
+ * @author shijia.wxr
+ */
+public class MQProtosHelperTest {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/pom.xml
----------------------------------------------------------------------
diff --git a/rocketmq-example/pom.xml b/rocketmq-example/pom.xml
new file mode 100644
index 0000000..8e68a58
--- /dev/null
+++ b/rocketmq-example/pom.xml
@@ -0,0 +1,58 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain producerGroup copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>com.alibaba.rocketmq</groupId>
+        <artifactId>rocketmq-all</artifactId>
+        <version>4.0.0-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <packaging>jar</packaging>
+    <artifactId>rocketmq-example</artifactId>
+    <name>rocketmq-example ${project.version}</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>rocketmq-client</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>rocketmq-srvutil</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.javassist</groupId>
+            <artifactId>javassist</artifactId>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/benchmark/Consumer.java
----------------------------------------------------------------------
diff --git a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/benchmark/Consumer.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/benchmark/Consumer.java
new file mode 100644
index 0000000..7150513
--- /dev/null
+++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/benchmark/Consumer.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.example.benchmark;
+
+import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
+import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.common.message.MessageExt;
+import com.alibaba.rocketmq.srvutil.ServerUtil;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class Consumer {
+
+    public static void main(String[] args) throws MQClientException {
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        CommandLine commandLine = ServerUtil.parseCmdLine("benchmarkConsumer", args, buildCommandlineOptions(options), new PosixParser());
+        if (null == commandLine) {
+            System.exit(-1);
+        }
+
+        final String topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : "BenchmarkTest";
+        final String groupPrefix = commandLine.hasOption('g') ? commandLine.getOptionValue('g').trim() : "benchmark_consumer";
+        final String isPrefixEnable = commandLine.hasOption('p') ? commandLine.getOptionValue('p').trim() : "true";
+        String group = groupPrefix;
+        if (Boolean.parseBoolean(isPrefixEnable)) {
+            group = groupPrefix + "_" + Long.toString(System.currentTimeMillis() % 100);
+        }
+
+        System.out.printf("topic %s group %s prefix %s%n", topic, group, isPrefixEnable);
+
+        final StatsBenchmarkConsumer statsBenchmarkConsumer = new StatsBenchmarkConsumer();
+
+        final Timer timer = new Timer("BenchmarkTimerThread", true);
+
+        final LinkedList<Long[]> snapshotList = new LinkedList<Long[]>();
+
+        timer.scheduleAtFixedRate(new TimerTask() {
+            @Override
+            public void run() {
+                snapshotList.addLast(statsBenchmarkConsumer.createSnapshot());
+                if (snapshotList.size() > 10) {
+                    snapshotList.removeFirst();
+                }
+            }
+        }, 1000, 1000);
+
+        timer.scheduleAtFixedRate(new TimerTask() {
+            private void printStats() {
+                if (snapshotList.size() >= 10) {
+                    Long[] begin = snapshotList.getFirst();
+                    Long[] end = snapshotList.getLast();
+
+                    final long consumeTps =
+                            (long) (((end[1] - begin[1]) / (double) (end[0] - begin[0])) * 1000L);
+                    final double averageB2CRT = (end[2] - begin[2]) / (double) (end[1] - begin[1]);
+                    final double averageS2CRT = (end[3] - begin[3]) / (double) (end[1] - begin[1]);
+
+                    System.out.printf("Consume TPS: %d Average(B2C) RT: %7.3f Average(S2C) RT: %7.3f MAX(B2C) RT: %d MAX(S2C) RT: %d%n",
+                            consumeTps, averageB2CRT, averageS2CRT, end[4], end[5]
+                    );
+                }
+            }
+
+
+            @Override
+            public void run() {
+                try {
+                    this.printStats();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }, 10000, 10000);
+
+        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
+        consumer.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        consumer.subscribe(topic, "*");
+
+        consumer.registerMessageListener(new MessageListenerConcurrently() {
+            @Override
+            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+                                                            ConsumeConcurrentlyContext context) {
+                MessageExt msg = msgs.get(0);
+                long now = System.currentTimeMillis();
+
+                statsBenchmarkConsumer.getReceiveMessageTotalCount().incrementAndGet();
+
+                long born2ConsumerRT = now - msg.getBornTimestamp();
+                statsBenchmarkConsumer.getBorn2ConsumerTotalRT().addAndGet(born2ConsumerRT);
+
+                long store2ConsumerRT = now - msg.getStoreTimestamp();
+                statsBenchmarkConsumer.getStore2ConsumerTotalRT().addAndGet(store2ConsumerRT);
+
+                compareAndSetMax(statsBenchmarkConsumer.getBorn2ConsumerMaxRT(), born2ConsumerRT);
+
+                compareAndSetMax(statsBenchmarkConsumer.getStore2ConsumerMaxRT(), store2ConsumerRT);
+
+                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+            }
+        });
+
+        consumer.start();
+
+        System.out.printf("Consumer Started.%n");
+    }
+
+    public static Options buildCommandlineOptions(final Options options) {
+        Option opt = new Option("t", "topic", true, "Topic name, Default: BenchmarkTest");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("g", "group", true, "Consumer group name, Default: benchmark_consumer");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+
+        opt = new Option("p", "group prefix enable", true, "Consumer group name, Default: false");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+
+
+    public static void compareAndSetMax(final AtomicLong target, final long value) {
+        long prev = target.get();
+        while (value > prev) {
+            boolean updated = target.compareAndSet(prev, value);
+            if (updated)
+                break;
+
+            prev = target.get();
+        }
+    }
+}
+
+
+class StatsBenchmarkConsumer {
+    private final AtomicLong receiveMessageTotalCount = new AtomicLong(0L);
+
+    private final AtomicLong born2ConsumerTotalRT = new AtomicLong(0L);
+
+    private final AtomicLong store2ConsumerTotalRT = new AtomicLong(0L);
+
+    private final AtomicLong born2ConsumerMaxRT = new AtomicLong(0L);
+
+    private final AtomicLong store2ConsumerMaxRT = new AtomicLong(0L);
+
+
+    public Long[] createSnapshot() {
+        Long[] snap = new Long[]{
+                System.currentTimeMillis(),
+                this.receiveMessageTotalCount.get(),
+                this.born2ConsumerTotalRT.get(),
+                this.store2ConsumerTotalRT.get(),
+                this.born2ConsumerMaxRT.get(),
+                this.store2ConsumerMaxRT.get(),
+        };
+
+        return snap;
+    }
+
+
+    public AtomicLong getReceiveMessageTotalCount() {
+        return receiveMessageTotalCount;
+    }
+
+
+    public AtomicLong getBorn2ConsumerTotalRT() {
+        return born2ConsumerTotalRT;
+    }
+
+
+    public AtomicLong getStore2ConsumerTotalRT() {
+        return store2ConsumerTotalRT;
+    }
+
+
+    public AtomicLong getBorn2ConsumerMaxRT() {
+        return born2ConsumerMaxRT;
+    }
+
+
+    public AtomicLong getStore2ConsumerMaxRT() {
+        return store2ConsumerMaxRT;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/benchmark/Producer.java
----------------------------------------------------------------------
diff --git a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/benchmark/Producer.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/benchmark/Producer.java
new file mode 100644
index 0000000..b0351c6
--- /dev/null
+++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/benchmark/Producer.java
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.example.benchmark;
+
+import com.alibaba.rocketmq.client.exception.MQBrokerException;
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.client.log.ClientLogger;
+import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
+import com.alibaba.rocketmq.common.message.Message;
+import com.alibaba.rocketmq.remoting.common.RemotingHelper;
+import com.alibaba.rocketmq.remoting.exception.RemotingException;
+import com.alibaba.rocketmq.srvutil.ServerUtil;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.slf4j.Logger;
+
+import java.io.UnsupportedEncodingException;
+import java.util.LinkedList;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class Producer {
+    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {
+
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        CommandLine commandLine = ServerUtil.parseCmdLine("benchmarkProducer", args, buildCommandlineOptions(options), new PosixParser());
+        if (null == commandLine) {
+            System.exit(-1);
+        }
+
+        final String topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : "BenchmarkTest";
+        final int threadCount = commandLine.hasOption('w') ? Integer.parseInt(commandLine.getOptionValue('w')) : 64;
+        final int messageSize = commandLine.hasOption('s') ? Integer.parseInt(commandLine.getOptionValue('s')) : 128;
+        final boolean keyEnable = commandLine.hasOption('k') ? Boolean.parseBoolean(commandLine.getOptionValue('k')) : false;
+
+        System.out.printf("topic %s threadCount %d messageSize %d keyEnable %s%n", topic, threadCount, messageSize, keyEnable);
+
+        final Logger log = ClientLogger.getLog();
+
+        final Message msg = buildMessage(messageSize, topic);
+
+        final ExecutorService sendThreadPool = Executors.newFixedThreadPool(threadCount);
+
+        final StatsBenchmarkProducer statsBenchmark = new StatsBenchmarkProducer();
+
+        final Timer timer = new Timer("BenchmarkTimerThread", true);
+
+        final LinkedList<Long[]> snapshotList = new LinkedList<Long[]>();
+
+        timer.scheduleAtFixedRate(new TimerTask() {
+            @Override
+            public void run() {
+                snapshotList.addLast(statsBenchmark.createSnapshot());
+                if (snapshotList.size() > 10) {
+                    snapshotList.removeFirst();
+                }
+            }
+        }, 1000, 1000);
+
+        timer.scheduleAtFixedRate(new TimerTask() {
+            private void printStats() {
+                if (snapshotList.size() >= 10) {
+                    Long[] begin = snapshotList.getFirst();
+                    Long[] end = snapshotList.getLast();
+
+                    final long sendTps = (long) (((end[3] - begin[3]) / (double) (end[0] - begin[0])) * 1000L);
+                    final double averageRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]);
+
+                    System.out.printf("Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d%n",
+                            sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4]);
+                }
+            }
+
+
+            @Override
+            public void run() {
+                try {
+                    this.printStats();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }, 10000, 10000);
+
+        final DefaultMQProducer producer = new DefaultMQProducer("benchmark_producer");
+        producer.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        if (commandLine.hasOption('n')) {
+            String ns = commandLine.getOptionValue('n');
+            producer.setNamesrvAddr(ns);
+        }
+
+        producer.setCompressMsgBodyOverHowmuch(Integer.MAX_VALUE);
+
+        producer.start();
+
+        for (int i = 0; i < threadCount; i++) {
+            sendThreadPool.execute(new Runnable() {
+                @Override
+                public void run() {
+                    while (true) {
+                        try {
+                            final long beginTimestamp = System.currentTimeMillis();
+                            if (keyEnable) {
+                                msg.setKeys(String.valueOf(beginTimestamp / 1000));
+                            }
+                            producer.send(msg);
+                            statsBenchmark.getSendRequestSuccessCount().incrementAndGet();
+                            statsBenchmark.getReceiveResponseSuccessCount().incrementAndGet();
+                            final long currentRT = System.currentTimeMillis() - beginTimestamp;
+                            statsBenchmark.getSendMessageSuccessTimeTotal().addAndGet(currentRT);
+                            long prevMaxRT = statsBenchmark.getSendMessageMaxRT().get();
+                            while (currentRT > prevMaxRT) {
+                                boolean updated = statsBenchmark.getSendMessageMaxRT().compareAndSet(prevMaxRT, currentRT);
+                                if (updated)
+                                    break;
+
+                                prevMaxRT = statsBenchmark.getSendMessageMaxRT().get();
+                            }
+                        } catch (RemotingException e) {
+                            statsBenchmark.getSendRequestFailedCount().incrementAndGet();
+                            log.error("[BENCHMARK_PRODUCER] Send Exception", e);
+
+                            try {
+                                Thread.sleep(3000);
+                            } catch (InterruptedException e1) {
+                            }
+                        } catch (InterruptedException e) {
+                            statsBenchmark.getSendRequestFailedCount().incrementAndGet();
+                            try {
+                                Thread.sleep(3000);
+                            } catch (InterruptedException e1) {
+                            }
+                        } catch (MQClientException e) {
+                            statsBenchmark.getSendRequestFailedCount().incrementAndGet();
+                            log.error("[BENCHMARK_PRODUCER] Send Exception", e);
+                        } catch (MQBrokerException e) {
+                            statsBenchmark.getReceiveResponseFailedCount().incrementAndGet();
+                            log.error("[BENCHMARK_PRODUCER] Send Exception", e);
+                            try {
+                                Thread.sleep(3000);
+                            } catch (InterruptedException e1) {
+                            }
+                        }
+                    }
+                }
+            });
+        }
+    }
+
+    public static Options buildCommandlineOptions(final Options options) {
+        Option opt = new Option("w", "threadCount", true, "Thread count, Default: 64");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("s", "messageSize", true, "Message Size, Default: 128");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("k", "keyEnable", true, "Message Key Enable, Default: false");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("t", "topic", true, "Topic name, Default: BenchmarkTest");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    private static Message buildMessage(final int messageSize, final String topic) throws UnsupportedEncodingException {
+        Message msg = new Message();
+        msg.setTopic(topic);
+
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < messageSize; i += 10) {
+            sb.append("hello baby");
+        }
+
+        msg.setBody(sb.toString().getBytes(RemotingHelper.DEFAULT_CHARSET));
+
+        return msg;
+    }
+}
+
+
+class StatsBenchmarkProducer {
+    private final AtomicLong sendRequestSuccessCount = new AtomicLong(0L);
+
+    private final AtomicLong sendRequestFailedCount = new AtomicLong(0L);
+
+    private final AtomicLong receiveResponseSuccessCount = new AtomicLong(0L);
+
+    private final AtomicLong receiveResponseFailedCount = new AtomicLong(0L);
+
+    private final AtomicLong sendMessageSuccessTimeTotal = new AtomicLong(0L);
+
+    private final AtomicLong sendMessageMaxRT = new AtomicLong(0L);
+
+
+    public Long[] createSnapshot() {
+        Long[] snap = new Long[]{
+                System.currentTimeMillis(),
+                this.sendRequestSuccessCount.get(),
+                this.sendRequestFailedCount.get(),
+                this.receiveResponseSuccessCount.get(),
+                this.receiveResponseFailedCount.get(),
+                this.sendMessageSuccessTimeTotal.get(),
+        };
+
+        return snap;
+    }
+
+
+    public AtomicLong getSendRequestSuccessCount() {
+        return sendRequestSuccessCount;
+    }
+
+
+    public AtomicLong getSendRequestFailedCount() {
+        return sendRequestFailedCount;
+    }
+
+
+    public AtomicLong getReceiveResponseSuccessCount() {
+        return receiveResponseSuccessCount;
+    }
+
+
+    public AtomicLong getReceiveResponseFailedCount() {
+        return receiveResponseFailedCount;
+    }
+
+
+    public AtomicLong getSendMessageSuccessTimeTotal() {
+        return sendMessageSuccessTimeTotal;
+    }
+
+
+    public AtomicLong getSendMessageMaxRT() {
+        return sendMessageMaxRT;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/benchmark/TransactionProducer.java
----------------------------------------------------------------------
diff --git a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/benchmark/TransactionProducer.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/benchmark/TransactionProducer.java
new file mode 100644
index 0000000..3dffd2f
--- /dev/null
+++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/benchmark/TransactionProducer.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.example.benchmark;
+
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.client.producer.*;
+import com.alibaba.rocketmq.common.message.Message;
+import com.alibaba.rocketmq.common.message.MessageExt;
+import com.alibaba.rocketmq.remoting.common.RemotingHelper;
+
+import java.io.UnsupportedEncodingException;
+import java.util.LinkedList;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class TransactionProducer {
+    private static int threadCount;
+    private static int messageSize;
+    private static boolean ischeck;
+    private static boolean ischeckffalse;
+
+
+    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {
+        threadCount = args.length >= 1 ? Integer.parseInt(args[0]) : 32;
+        messageSize = args.length >= 2 ? Integer.parseInt(args[1]) : 1024 * 2;
+        ischeck = args.length >= 3 ? Boolean.parseBoolean(args[2]) : false;
+        ischeckffalse = args.length >= 4 ? Boolean.parseBoolean(args[3]) : false;
+
+        final Message msg = buildMessage(messageSize);
+
+        final ExecutorService sendThreadPool = Executors.newFixedThreadPool(threadCount);
+
+        final StatsBenchmarkTProducer statsBenchmark = new StatsBenchmarkTProducer();
+
+        final Timer timer = new Timer("BenchmarkTimerThread", true);
+
+        final LinkedList<Long[]> snapshotList = new LinkedList<Long[]>();
+
+        timer.scheduleAtFixedRate(new TimerTask() {
+            @Override
+            public void run() {
+                snapshotList.addLast(statsBenchmark.createSnapshot());
+                while (snapshotList.size() > 10) {
+                    snapshotList.removeFirst();
+                }
+            }
+        }, 1000, 1000);
+
+        timer.scheduleAtFixedRate(new TimerTask() {
+            private void printStats() {
+                if (snapshotList.size() >= 10) {
+                    Long[] begin = snapshotList.getFirst();
+                    Long[] end = snapshotList.getLast();
+
+                    final long sendTps =
+                            (long) (((end[3] - begin[3]) / (double) (end[0] - begin[0])) * 1000L);
+                    final double averageRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]);
+
+                    System.out.printf(
+                            "Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d transaction checkCount: %d %n",
+                            sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4], end[6]);
+                }
+            }
+
+
+            @Override
+            public void run() {
+                try {
+                    this.printStats();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }, 10000, 10000);
+
+        final TransactionCheckListener transactionCheckListener =
+                new TransactionCheckListenerBImpl(ischeckffalse, statsBenchmark);
+        final TransactionMQProducer producer = new TransactionMQProducer("benchmark_transaction_producer");
+        producer.setInstanceName(Long.toString(System.currentTimeMillis()));
+        producer.setTransactionCheckListener(transactionCheckListener);
+        producer.setDefaultTopicQueueNums(1000);
+        producer.start();
+
+        final TransactionExecuterBImpl tranExecuter = new TransactionExecuterBImpl(ischeck);
+
+        for (int i = 0; i < threadCount; i++) {
+            sendThreadPool.execute(new Runnable() {
+                @Override
+                public void run() {
+                    while (true) {
+                        try {
+                            // Thread.sleep(1000);
+                            final long beginTimestamp = System.currentTimeMillis();
+                            SendResult sendResult =
+                                    producer.sendMessageInTransaction(msg, tranExecuter, null);
+                            if (sendResult != null) {
+                                statsBenchmark.getSendRequestSuccessCount().incrementAndGet();
+                                statsBenchmark.getReceiveResponseSuccessCount().incrementAndGet();
+                            }
+
+                            final long currentRT = System.currentTimeMillis() - beginTimestamp;
+                            statsBenchmark.getSendMessageSuccessTimeTotal().addAndGet(currentRT);
+                            long prevMaxRT = statsBenchmark.getSendMessageMaxRT().get();
+                            while (currentRT > prevMaxRT) {
+                                boolean updated =
+                                        statsBenchmark.getSendMessageMaxRT().compareAndSet(prevMaxRT,
+                                                currentRT);
+                                if (updated)
+                                    break;
+
+                                prevMaxRT = statsBenchmark.getSendMessageMaxRT().get();
+                            }
+                        } catch (MQClientException e) {
+                            statsBenchmark.getSendRequestFailedCount().incrementAndGet();
+                        }
+                    }
+                }
+            });
+        }
+    }
+
+
+    private static Message buildMessage(final int messageSize) throws UnsupportedEncodingException {
+        Message msg = new Message();
+        msg.setTopic("BenchmarkTest");
+
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < messageSize; i += 10) {
+            sb.append("hello baby");
+        }
+
+        msg.setBody(sb.toString().getBytes(RemotingHelper.DEFAULT_CHARSET));
+
+        return msg;
+    }
+}
+
+
+class TransactionExecuterBImpl implements LocalTransactionExecuter {
+
+    private boolean ischeck;
+
+
+    public TransactionExecuterBImpl(boolean ischeck) {
+        this.ischeck = ischeck;
+    }
+
+
+    @Override
+    public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {
+        if (ischeck) {
+            return LocalTransactionState.UNKNOW;
+        }
+        return LocalTransactionState.COMMIT_MESSAGE;
+    }
+}
+
+
+class TransactionCheckListenerBImpl implements TransactionCheckListener {
+    private boolean ischeckffalse;
+    private StatsBenchmarkTProducer statsBenchmarkTProducer;
+
+
+    public TransactionCheckListenerBImpl(boolean ischeckffalse,
+                                         StatsBenchmarkTProducer statsBenchmarkTProducer) {
+        this.ischeckffalse = ischeckffalse;
+        this.statsBenchmarkTProducer = statsBenchmarkTProducer;
+    }
+
+
+    @Override
+    public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
+        statsBenchmarkTProducer.getCheckRequestSuccessCount().incrementAndGet();
+        if (ischeckffalse) {
+
+            return LocalTransactionState.ROLLBACK_MESSAGE;
+        }
+
+        return LocalTransactionState.COMMIT_MESSAGE;
+    }
+}
+
+
+class StatsBenchmarkTProducer {
+    private final AtomicLong sendRequestSuccessCount = new AtomicLong(0L);
+
+    private final AtomicLong sendRequestFailedCount = new AtomicLong(0L);
+
+    private final AtomicLong receiveResponseSuccessCount = new AtomicLong(0L);
+
+    private final AtomicLong receiveResponseFailedCount = new AtomicLong(0L);
+
+    private final AtomicLong sendMessageSuccessTimeTotal = new AtomicLong(0L);
+
+    private final AtomicLong sendMessageMaxRT = new AtomicLong(0L);
+
+    private final AtomicLong checkRequestSuccessCount = new AtomicLong(0L);
+
+
+    public Long[] createSnapshot() {
+        Long[] snap = new Long[]{
+                System.currentTimeMillis(),
+                this.sendRequestSuccessCount.get(),
+                this.sendRequestFailedCount.get(),
+                this.receiveResponseSuccessCount.get(),
+                this.receiveResponseFailedCount.get(),
+                this.sendMessageSuccessTimeTotal.get(),
+                this.checkRequestSuccessCount.get()};
+
+        return snap;
+    }
+
+
+    public AtomicLong getSendRequestSuccessCount() {
+        return sendRequestSuccessCount;
+    }
+
+
+    public AtomicLong getSendRequestFailedCount() {
+        return sendRequestFailedCount;
+    }
+
+
+    public AtomicLong getReceiveResponseSuccessCount() {
+        return receiveResponseSuccessCount;
+    }
+
+
+    public AtomicLong getReceiveResponseFailedCount() {
+        return receiveResponseFailedCount;
+    }
+
+
+    public AtomicLong getSendMessageSuccessTimeTotal() {
+        return sendMessageSuccessTimeTotal;
+    }
+
+
+    public AtomicLong getSendMessageMaxRT() {
+        return sendMessageMaxRT;
+    }
+
+
+    public AtomicLong getCheckRequestSuccessCount() {
+        return checkRequestSuccessCount;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/broadcast/PushConsumer.java
----------------------------------------------------------------------
diff --git a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/broadcast/PushConsumer.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/broadcast/PushConsumer.java
new file mode 100644
index 0000000..6cc6238
--- /dev/null
+++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/broadcast/PushConsumer.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.example.broadcast;
+
+import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
+import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
+import com.alibaba.rocketmq.common.message.MessageExt;
+import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
+
+import java.util.List;
+
+public class PushConsumer {
+
+    public static void main(String[] args) throws InterruptedException, MQClientException {
+        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1");
+
+        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+
+        consumer.setMessageModel(MessageModel.BROADCASTING);
+
+        consumer.subscribe("TopicTest", "TagA || TagC || TagD");
+
+        consumer.registerMessageListener(new MessageListenerConcurrently() {
+
+            @Override
+            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+                                                            ConsumeConcurrentlyContext context) {
+                System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
+                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+            }
+        });
+
+        consumer.start();
+        System.out.printf("Broadcast Consumer Started.%n");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/filter/Consumer.java
----------------------------------------------------------------------
diff --git a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/filter/Consumer.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/filter/Consumer.java
new file mode 100644
index 0000000..104e6d9
--- /dev/null
+++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/filter/Consumer.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.example.filter;
+
+import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
+import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.message.MessageExt;
+
+import java.util.List;
+
+
+public class Consumer {
+
+    public static void main(String[] args) throws InterruptedException, MQClientException {
+        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupNamecc4");
+
+        String filterCode = MixAll.file2String("/home/admin/MessageFilterImpl.java");
+        consumer.subscribe("TopicFilter7", "com.alibaba.rocketmq.example.filter.MessageFilterImpl",
+                filterCode);
+
+        consumer.registerMessageListener(new MessageListenerConcurrently() {
+
+            @Override
+            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+                                                            ConsumeConcurrentlyContext context) {
+                System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
+                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+            }
+        });
+
+        consumer.start();
+
+        System.out.printf("Consumer Started.%n");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/filter/Producer.java
----------------------------------------------------------------------
diff --git a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/filter/Producer.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/filter/Producer.java
new file mode 100644
index 0000000..04251fa
--- /dev/null
+++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/filter/Producer.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.example.filter;
+
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
+import com.alibaba.rocketmq.client.producer.SendResult;
+import com.alibaba.rocketmq.common.message.Message;
+import com.alibaba.rocketmq.remoting.common.RemotingHelper;
+
+public class Producer {
+    public static void main(String[] args) throws MQClientException, InterruptedException {
+        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
+        producer.start();
+
+        try {
+            for (int i = 0; i < 6000000; i++) {
+                Message msg = new Message("TopicFilter7",
+                        "TagA",
+                        "OrderID001",
+                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
+
+                msg.putUserProperty("SequenceId", String.valueOf(i));
+                SendResult sendResult = producer.send(msg);
+                System.out.printf("%s%n", sendResult);
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        producer.shutdown();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/operation/Consumer.java
----------------------------------------------------------------------
diff --git a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/operation/Consumer.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/operation/Consumer.java
new file mode 100644
index 0000000..f6ba067
--- /dev/null
+++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/operation/Consumer.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.example.operation;
+
+import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
+import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.common.message.MessageExt;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+public class Consumer {
+
+    public static void main(String[] args) throws InterruptedException, MQClientException {
+        CommandLine commandLine = buildCommandline(args);
+        if (commandLine != null) {
+            String group = commandLine.getOptionValue('g');
+            String topic = commandLine.getOptionValue('t');
+            String subscription = commandLine.getOptionValue('s');
+            final String returnFailedHalf = commandLine.getOptionValue('f');
+
+            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
+            consumer.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+            consumer.subscribe(topic, subscription);
+
+            consumer.registerMessageListener(new MessageListenerConcurrently() {
+                AtomicLong consumeTimes = new AtomicLong(0);
+
+
+                @Override
+                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+                                                                ConsumeConcurrentlyContext context) {
+                    long currentTimes = this.consumeTimes.incrementAndGet();
+                    System.out.printf("%-8d %s%n", currentTimes, msgs);
+                    if (Boolean.parseBoolean(returnFailedHalf)) {
+                        if ((currentTimes % 2) == 0) {
+                            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+                        }
+                    }
+                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                }
+            });
+
+            consumer.start();
+
+            System.out.printf("Consumer Started.%n");
+        }
+    }
+
+    public static CommandLine buildCommandline(String[] args) {
+        final Options options = new Options();
+        Option opt = new Option("h", "help", false, "Print help");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("g", "consumerGroup", true, "Consumer Group Name");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("t", "topic", true, "Topic Name");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("s", "subscription", true, "subscription");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("f", "returnFailedHalf", true, "return failed result, for half message");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        PosixParser parser = new PosixParser();
+        HelpFormatter hf = new HelpFormatter();
+        hf.setWidth(110);
+        CommandLine commandLine = null;
+        try {
+            commandLine = parser.parse(options, args);
+            if (commandLine.hasOption('h')) {
+                hf.printHelp("producer", options, true);
+                return null;
+            }
+        } catch (ParseException e) {
+            hf.printHelp("producer", options, true);
+            return null;
+        }
+
+        return commandLine;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/operation/Producer.java
----------------------------------------------------------------------
diff --git a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/operation/Producer.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/operation/Producer.java
new file mode 100644
index 0000000..816e3e8
--- /dev/null
+++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/operation/Producer.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.example.operation;
+
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
+import com.alibaba.rocketmq.client.producer.SendResult;
+import com.alibaba.rocketmq.common.message.Message;
+import com.alibaba.rocketmq.remoting.common.RemotingHelper;
+import org.apache.commons.cli.*;
+
+public class Producer {
+
+    public static void main(String[] args) throws MQClientException, InterruptedException {
+        CommandLine commandLine = buildCommandline(args);
+        if (commandLine != null) {
+            String group = commandLine.getOptionValue('g');
+            String topic = commandLine.getOptionValue('t');
+            String tags = commandLine.getOptionValue('a');
+            String keys = commandLine.getOptionValue('k');
+            String msgCount = commandLine.getOptionValue('c');
+
+            DefaultMQProducer producer = new DefaultMQProducer(group);
+            producer.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+            producer.start();
+
+            for (int i = 0; i < Integer.parseInt(msgCount); i++) {
+                try {
+                    Message msg = new Message(
+                            topic,
+                            tags,
+                            keys,
+                            ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
+                    SendResult sendResult = producer.send(msg);
+                    System.out.printf("%-8d %s%n", i, sendResult);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    Thread.sleep(1000);
+                }
+            }
+
+            producer.shutdown();
+        }
+    }
+
+    public static CommandLine buildCommandline(String[] args) {
+        final Options options = new Options();
+        Option opt = new Option("h", "help", false, "Print help");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("g", "producerGroup", true, "Producer Group Name");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("t", "topic", true, "Topic Name");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("a", "tags", true, "Tags Name");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("k", "keys", true, "Keys Name");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("c", "msgCount", true, "Message Count");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        PosixParser parser = new PosixParser();
+        HelpFormatter hf = new HelpFormatter();
+        hf.setWidth(110);
+        CommandLine commandLine = null;
+        try {
+            commandLine = parser.parse(options, args);
+            if (commandLine.hasOption('h')) {
+                hf.printHelp("producer", options, true);
+                return null;
+            }
+        } catch (ParseException e) {
+            hf.printHelp("producer", options, true);
+            return null;
+        }
+
+        return commandLine;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/ordermessage/Consumer.java
----------------------------------------------------------------------
diff --git a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/ordermessage/Consumer.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/ordermessage/Consumer.java
new file mode 100644
index 0000000..7b5f657
--- /dev/null
+++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/ordermessage/Consumer.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.example.ordermessage;
+
+import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
+import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
+import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
+import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
+import com.alibaba.rocketmq.common.message.MessageExt;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+public class Consumer {
+
+    public static void main(String[] args) throws MQClientException {
+        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
+
+        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+
+        consumer.subscribe("TopicTest", "TagA || TagC || TagD");
+
+        consumer.registerMessageListener(new MessageListenerOrderly() {
+            AtomicLong consumeTimes = new AtomicLong(0);
+
+            @Override
+            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
+                context.setAutoCommit(false);
+                System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
+                this.consumeTimes.incrementAndGet();
+                if ((this.consumeTimes.get() % 2) == 0) {
+                    return ConsumeOrderlyStatus.SUCCESS;
+                } else if ((this.consumeTimes.get() % 3) == 0) {
+                    return ConsumeOrderlyStatus.ROLLBACK;
+                } else if ((this.consumeTimes.get() % 4) == 0) {
+                    return ConsumeOrderlyStatus.COMMIT;
+                } else if ((this.consumeTimes.get() % 5) == 0) {
+                    context.setSuspendCurrentQueueTimeMillis(3000);
+                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
+                }
+
+                return ConsumeOrderlyStatus.SUCCESS;
+            }
+        });
+
+        consumer.start();
+        System.out.printf("Consumer Started.%n");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/ordermessage/Producer.java
----------------------------------------------------------------------
diff --git a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/ordermessage/Producer.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/ordermessage/Producer.java
new file mode 100644
index 0000000..609aa62
--- /dev/null
+++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/ordermessage/Producer.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.example.ordermessage;
+
+import com.alibaba.rocketmq.client.exception.MQBrokerException;
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
+import com.alibaba.rocketmq.client.producer.MQProducer;
+import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
+import com.alibaba.rocketmq.client.producer.SendResult;
+import com.alibaba.rocketmq.common.message.Message;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.remoting.common.RemotingHelper;
+import com.alibaba.rocketmq.remoting.exception.RemotingException;
+
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+
+public class Producer {
+    public static void main(String[] args) throws UnsupportedEncodingException {
+        try {
+            MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
+            producer.start();
+
+            String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
+            for (int i = 0; i < 100; i++) {
+                int orderId = i % 10;
+                Message msg =
+                        new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
+                                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
+                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
+                    @Override
+                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+                        Integer id = (Integer) arg;
+                        int index = id % mqs.size();
+                        return mqs.get(index);
+                    }
+                }, orderId);
+
+                System.out.printf("%s%n", sendResult);
+            }
+
+            producer.shutdown();
+        } catch (MQClientException e) {
+            e.printStackTrace();
+        } catch (RemotingException e) {
+            e.printStackTrace();
+        } catch (MQBrokerException e) {
+            e.printStackTrace();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/quickstart/Consumer.java
----------------------------------------------------------------------
diff --git a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/quickstart/Consumer.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/quickstart/Consumer.java
new file mode 100644
index 0000000..adac497
--- /dev/null
+++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/quickstart/Consumer.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.example.quickstart;
+
+import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
+import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
+import com.alibaba.rocketmq.common.message.MessageExt;
+
+import java.util.List;
+
+public class Consumer {
+
+    public static void main(String[] args) throws InterruptedException, MQClientException {
+        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
+
+        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+
+        consumer.subscribe("TopicTest", "*");
+
+        consumer.registerMessageListener(new MessageListenerConcurrently() {
+
+            @Override
+            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+                                                            ConsumeConcurrentlyContext context) {
+                System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
+                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+            }
+        });
+
+        consumer.start();
+        System.out.printf("Consumer Started.%n");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/quickstart/Producer.java
----------------------------------------------------------------------
diff --git a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/quickstart/Producer.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/quickstart/Producer.java
new file mode 100644
index 0000000..fb5dbea
--- /dev/null
+++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/quickstart/Producer.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.example.quickstart;
+
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
+import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter;
+import com.alibaba.rocketmq.client.producer.LocalTransactionState;
+import com.alibaba.rocketmq.client.producer.SendResult;
+import com.alibaba.rocketmq.common.message.Message;
+import com.alibaba.rocketmq.remoting.common.RemotingHelper;
+
+public class Producer {
+    public static void main(String[] args) throws MQClientException, InterruptedException {
+        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
+        producer.start();
+
+        for (int i = 0; i < 1000; i++) {
+            try {
+                Message msg = new Message("TopicTest",
+                        "TagA",
+                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
+                );
+                SendResult sendResult = producer.send(msg);
+                LocalTransactionExecuter tranExecuter = new LocalTransactionExecuter() {
+                    @Override
+                    public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
+                        return null;
+                    }
+                };
+                System.out.printf("%s%n", sendResult);
+            } catch (Exception e) {
+                e.printStackTrace();
+                Thread.sleep(1000);
+            }
+        }
+        producer.shutdown();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/AsyncProducer.java
----------------------------------------------------------------------
diff --git a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/AsyncProducer.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/AsyncProducer.java
new file mode 100644
index 0000000..1a8f07e
--- /dev/null
+++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/AsyncProducer.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.example.simple;
+
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
+import com.alibaba.rocketmq.client.producer.SendCallback;
+import com.alibaba.rocketmq.client.producer.SendResult;
+import com.alibaba.rocketmq.common.message.Message;
+import com.alibaba.rocketmq.remoting.common.RemotingHelper;
+
+import java.io.UnsupportedEncodingException;
+
+
+public class AsyncProducer {
+    public static void main(String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {
+
+        DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test");
+        producer.start();
+        producer.setRetryTimesWhenSendAsyncFailed(0);
+
+        for (int i = 0; i < 10000000; i++) {
+            try {
+                final int index = i;
+                Message msg = new Message("Jodie_topic_1023",
+                        "TagA",
+                        "OrderID188",
+                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
+                producer.send(msg, new SendCallback() {
+                    @Override
+                    public void onSuccess(SendResult sendResult) {
+                        System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
+                    }
+
+                    @Override
+                    public void onException(Throwable e) {
+                        System.out.printf("%-10d Exception %s %n", index, e);
+                        e.printStackTrace();
+                    }
+                });
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+        producer.shutdown();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/CachedQueue.java
----------------------------------------------------------------------
diff --git a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/CachedQueue.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/CachedQueue.java
new file mode 100644
index 0000000..7beb064
--- /dev/null
+++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/CachedQueue.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.example.simple;
+
+import com.alibaba.rocketmq.common.message.MessageExt;
+
+import java.util.TreeMap;
+
+
+public class CachedQueue {
+    private final TreeMap<Long, MessageExt> msgCachedTable = new TreeMap<Long, MessageExt>();
+
+
+    public TreeMap<Long, MessageExt> getMsgCachedTable() {
+        return msgCachedTable;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/Producer.java
----------------------------------------------------------------------
diff --git a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/Producer.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/Producer.java
new file mode 100644
index 0000000..e0010d4
--- /dev/null
+++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/Producer.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.example.simple;
+
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
+import com.alibaba.rocketmq.client.producer.SendResult;
+import com.alibaba.rocketmq.common.message.Message;
+import com.alibaba.rocketmq.remoting.common.RemotingHelper;
+
+
+public class Producer {
+    public static void main(String[] args) throws MQClientException, InterruptedException {
+
+        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
+
+        producer.start();
+
+        for (int i = 0; i < 10000000; i++)
+            try {
+                {
+                    Message msg = new Message("TopicTest",
+                            "TagA",
+                            "OrderID188",
+                            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
+                    SendResult sendResult = producer.send(msg);
+                    System.out.printf("%s%n", sendResult);
+                }
+
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+
+        producer.shutdown();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/PullConsumer.java
----------------------------------------------------------------------
diff --git a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/PullConsumer.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/PullConsumer.java
new file mode 100644
index 0000000..6245769
--- /dev/null
+++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/PullConsumer.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.example.simple;
+
+import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
+import com.alibaba.rocketmq.client.consumer.PullResult;
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class PullConsumer {
+    private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();
+
+
+    public static void main(String[] args) throws MQClientException {
+        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
+
+        consumer.start();
+
+        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1");
+        for (MessageQueue mq : mqs) {
+            System.out.printf("Consume from the queue: " + mq + "%n");
+            SINGLE_MQ:
+            while (true) {
+                try {
+                    PullResult pullResult =
+                            consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
+                    System.out.printf("%s%n", pullResult);
+                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
+                    switch (pullResult.getPullStatus()) {
+                        case FOUND:
+                            break;
+                        case NO_MATCHED_MSG:
+                            break;
+                        case NO_NEW_MSG:
+                            break SINGLE_MQ;
+                        case OFFSET_ILLEGAL:
+                            break;
+                        default:
+                            break;
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+
+        consumer.shutdown();
+    }
+
+    private static long getMessageQueueOffset(MessageQueue mq) {
+        Long offset = OFFSE_TABLE.get(mq);
+        if (offset != null)
+            return offset;
+
+        return 0;
+    }
+
+    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
+        OFFSE_TABLE.put(mq, offset);
+    }
+
+}



Mime
View raw message