From commits-return-19-apmail-rocketmq-commits-archive=rocketmq.apache.org@rocketmq.incubator.apache.org Mon Dec 19 09:40:41 2016 Return-Path: X-Original-To: apmail-rocketmq-commits-archive@minotaur.apache.org Delivered-To: apmail-rocketmq-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1F5C0198E6 for ; Mon, 19 Dec 2016 09:40:41 +0000 (UTC) Received: (qmail 9046 invoked by uid 500); 19 Dec 2016 09:40:41 -0000 Delivered-To: apmail-rocketmq-commits-archive@rocketmq.apache.org Received: (qmail 9017 invoked by uid 500); 19 Dec 2016 09:40:41 -0000 Mailing-List: contact commits-help@rocketmq.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@rocketmq.incubator.apache.org Delivered-To: mailing list commits@rocketmq.incubator.apache.org Received: (qmail 9004 invoked by uid 99); 19 Dec 2016 09:40:41 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 19 Dec 2016 09:40:40 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id 5AC9DC03A0 for ; Mon, 19 Dec 2016 09:40:40 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.219 X-Spam-Level: X-Spam-Status: No, score=-6.219 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id fR-QjdvyYD2G for ; Mon, 19 Dec 2016 09:40:31 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 52D0E5FDC7 for ; Mon, 19 Dec 2016 09:40:20 +0000 (UTC) Received: (qmail 7687 invoked by uid 99); 19 Dec 2016 09:40:19 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 19 Dec 2016 09:40:19 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5DCC5DFBAD; Mon, 19 Dec 2016 09:40:19 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: yukon@apache.org To: commits@rocketmq.incubator.apache.org Date: Mon, 19 Dec 2016 09:40:35 -0000 Message-Id: <20473ca806624c678af31b01bcc9c30f@git.apache.org> In-Reply-To: <3e2dd9def9aa4fd7a6ce213370431713@git.apache.org> References: <3e2dd9def9aa4fd7a6ce213370431713@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [18/43] incubator-rocketmq git commit: Finish code dump. Reviewed by: @yukon @vongosling @stevenschew @vintagewang @lollipop @zander http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/UtilAllTest.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/UtilAllTest.java b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/UtilAllTest.java new file mode 100644 index 0000000..7764fcc --- /dev/null +++ b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/UtilAllTest.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.common; + +import org.junit.Test; + +import java.net.URL; +import java.util.Properties; + +import static org.junit.Assert.assertTrue; + + +public class UtilAllTest { + + @Test + public void test_currentStackTrace() { + System.out.println(UtilAll.currentStackTrace()); + } + + + @Test + public void test_a() { + URL url = this.getClass().getProtectionDomain().getCodeSource().getLocation(); + System.out.println(url); + System.out.println(url.getPath()); + } + + + @Test + public void test_resetClassProperties() { + DemoConfig demoConfig = new DemoConfig(); + MixAll.properties2Object(new Properties(), demoConfig); + } + + + @Test + public void test_properties2String() { + DemoConfig demoConfig = new DemoConfig(); + Properties properties = MixAll.object2Properties(demoConfig); + System.out.println(MixAll.properties2String(properties)); + } + + + @Test + public void test_timeMillisToHumanString() { + System.out.println(UtilAll.timeMillisToHumanString()); + } + + + @Test + public void test_isPropertiesEqual() { + final Properties p1 = new Properties(); + final Properties p2 = new Properties(); + + p1.setProperty("a", "1"); + p1.setProperty("b", "2"); + + p2.setProperty("a", "1"); + p2.setProperty("b", "2"); + // p2.setProperty("c", "3"); + + assertTrue(MixAll.isPropertiesEqual(p1, p2)); + } + + + @Test + public void test_getpid() { + int pid = UtilAll.getPid(); + + System.out.println("PID = " + pid); + assertTrue(pid > 0); + } + + + @Test + public void test_isBlank() { + { + boolean result = UtilAll.isBlank("Hello "); + assertTrue(!result); + } + + { + boolean result = UtilAll.isBlank(" Hello"); + assertTrue(!result); + } + + { + boolean result = UtilAll.isBlank("He llo"); + assertTrue(!result); + } + + { + boolean result = UtilAll.isBlank(" "); + assertTrue(result); + } + + { + boolean result = UtilAll.isBlank("Hello"); + assertTrue(!result); + } + } + + static class DemoConfig { + private int demoWidth = 0; + private int demoLength = 0; + private boolean demoOK = false; + private String demoName = "haha"; + + + public int getDemoWidth() { + return demoWidth; + } + + + public void setDemoWidth(int demoWidth) { + this.demoWidth = demoWidth; + } + + + public int getDemoLength() { + return demoLength; + } + + + public void setDemoLength(int demoLength) { + this.demoLength = demoLength; + } + + + public boolean isDemoOK() { + return demoOK; + } + + + public void setDemoOK(boolean demoOK) { + this.demoOK = demoOK; + } + + + public String getDemoName() { + return demoName; + } + + + public void setDemoNfieldame(String demoName) { + this.demoName = demoName; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/filter/FilterAPITest.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/filter/FilterAPITest.java b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/filter/FilterAPITest.java new file mode 100644 index 0000000..e45873b --- /dev/null +++ b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/filter/FilterAPITest.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.common.filter; + +import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData; +import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable; +import org.junit.Test; + + +/** + * @author shijia.wxr + * + */ +public class FilterAPITest { + + @Test + public void testBuildSubscriptionData() throws Exception { + SubscriptionData subscriptionData = + FilterAPI.buildSubscriptionData("ConsumerGroup1", "TestTopic", "TAG1 || Tag2 || tag3"); + System.out.println(subscriptionData); + } + + @Test + public void testSubscriptionData() throws Exception { + SubscriptionData subscriptionData = + FilterAPI.buildSubscriptionData("ConsumerGroup1", "TestTopic", "TAG1 || Tag2 || tag3"); + subscriptionData.setFilterClassSource("java hello"); + String json = RemotingSerializable.toJson(subscriptionData, true); + System.out.println(json); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/filter/PolishExprTest.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/filter/PolishExprTest.java b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/filter/PolishExprTest.java new file mode 100644 index 0000000..612df69 --- /dev/null +++ b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/filter/PolishExprTest.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.common.filter; + +import com.alibaba.rocketmq.common.filter.impl.Op; +import com.alibaba.rocketmq.common.filter.impl.PolishExpr; +import junit.framework.Assert; +import org.junit.Test; + +import java.util.List; + + +/** + * @author lansheng.zj + */ +public class PolishExprTest { + + private String expression = "tag1||(tag2&&tag3)&&tag4||tag5&&(tag6 && tag7)|| tag8 && tag9"; + private PolishExpr polishExpr; + + + public void init() { + polishExpr = new PolishExpr(); + } + + + @Test + public void testReversePolish() { + List antiPolishExpression = polishExpr.reversePolish(expression); + System.out.println(antiPolishExpression); + } + + + @Test + public void testReversePolish_Performance() { + // prepare + for (int i = 0; i < 100000; i++) { + polishExpr.reversePolish(expression); + } + + long start = System.currentTimeMillis(); + for (int i = 0; i < 100000; i++) { + polishExpr.reversePolish(expression); + } + long cost = System.currentTimeMillis() - start; + System.out.println(cost); + // System.out.println(cost / 100000F); + + Assert.assertTrue(cost < 500); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/protocol/ConsumeStatusTest.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/protocol/ConsumeStatusTest.java b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/protocol/ConsumeStatusTest.java new file mode 100644 index 0000000..32e3d98 --- /dev/null +++ b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/protocol/ConsumeStatusTest.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.common.protocol; + +import com.alibaba.rocketmq.common.protocol.body.ConsumeStatus; +import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable; +import org.junit.Test; + + +public class ConsumeStatusTest { + + @Test + public void decode_test() throws Exception { + ConsumeStatus cs = new ConsumeStatus(); + cs.setConsumeFailedTPS(0L); + String json = RemotingSerializable.toJson(cs, true); + System.out.println(json); + ConsumeStatus fromJson = RemotingSerializable.fromJson(json, ConsumeStatus.class); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/protocol/MQProtosHelperTest.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/protocol/MQProtosHelperTest.java b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/protocol/MQProtosHelperTest.java new file mode 100644 index 0000000..749e7df --- /dev/null +++ b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/protocol/MQProtosHelperTest.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.common.protocol; + +/** + * @author shijia.wxr + */ +public class MQProtosHelperTest { + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/pom.xml ---------------------------------------------------------------------- diff --git a/rocketmq-example/pom.xml b/rocketmq-example/pom.xml new file mode 100644 index 0000000..8e68a58 --- /dev/null +++ b/rocketmq-example/pom.xml @@ -0,0 +1,58 @@ + + + + + com.alibaba.rocketmq + rocketmq-all + 4.0.0-SNAPSHOT + + + 4.0.0 + jar + rocketmq-example + rocketmq-example ${project.version} + + + + junit + junit + test + + + ${project.groupId} + rocketmq-client + + + ${project.groupId} + rocketmq-srvutil + + + ch.qos.logback + logback-classic + + + ch.qos.logback + logback-core + + + org.javassist + javassist + + + http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/benchmark/Consumer.java ---------------------------------------------------------------------- diff --git a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/benchmark/Consumer.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/benchmark/Consumer.java new file mode 100644 index 0000000..7150513 --- /dev/null +++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/benchmark/Consumer.java @@ -0,0 +1,214 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.example.benchmark; + +import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; +import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import com.alibaba.rocketmq.client.exception.MQClientException; +import com.alibaba.rocketmq.common.message.MessageExt; +import com.alibaba.rocketmq.srvutil.ServerUtil; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; + +import java.util.LinkedList; +import java.util.List; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.atomic.AtomicLong; + +public class Consumer { + + public static void main(String[] args) throws MQClientException { + Options options = ServerUtil.buildCommandlineOptions(new Options()); + CommandLine commandLine = ServerUtil.parseCmdLine("benchmarkConsumer", args, buildCommandlineOptions(options), new PosixParser()); + if (null == commandLine) { + System.exit(-1); + } + + final String topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : "BenchmarkTest"; + final String groupPrefix = commandLine.hasOption('g') ? commandLine.getOptionValue('g').trim() : "benchmark_consumer"; + final String isPrefixEnable = commandLine.hasOption('p') ? commandLine.getOptionValue('p').trim() : "true"; + String group = groupPrefix; + if (Boolean.parseBoolean(isPrefixEnable)) { + group = groupPrefix + "_" + Long.toString(System.currentTimeMillis() % 100); + } + + System.out.printf("topic %s group %s prefix %s%n", topic, group, isPrefixEnable); + + final StatsBenchmarkConsumer statsBenchmarkConsumer = new StatsBenchmarkConsumer(); + + final Timer timer = new Timer("BenchmarkTimerThread", true); + + final LinkedList snapshotList = new LinkedList(); + + timer.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + snapshotList.addLast(statsBenchmarkConsumer.createSnapshot()); + if (snapshotList.size() > 10) { + snapshotList.removeFirst(); + } + } + }, 1000, 1000); + + timer.scheduleAtFixedRate(new TimerTask() { + private void printStats() { + if (snapshotList.size() >= 10) { + Long[] begin = snapshotList.getFirst(); + Long[] end = snapshotList.getLast(); + + final long consumeTps = + (long) (((end[1] - begin[1]) / (double) (end[0] - begin[0])) * 1000L); + final double averageB2CRT = (end[2] - begin[2]) / (double) (end[1] - begin[1]); + final double averageS2CRT = (end[3] - begin[3]) / (double) (end[1] - begin[1]); + + System.out.printf("Consume TPS: %d Average(B2C) RT: %7.3f Average(S2C) RT: %7.3f MAX(B2C) RT: %d MAX(S2C) RT: %d%n", + consumeTps, averageB2CRT, averageS2CRT, end[4], end[5] + ); + } + } + + + @Override + public void run() { + try { + this.printStats(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }, 10000, 10000); + + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group); + consumer.setInstanceName(Long.toString(System.currentTimeMillis())); + + consumer.subscribe(topic, "*"); + + consumer.registerMessageListener(new MessageListenerConcurrently() { + @Override + public ConsumeConcurrentlyStatus consumeMessage(List msgs, + ConsumeConcurrentlyContext context) { + MessageExt msg = msgs.get(0); + long now = System.currentTimeMillis(); + + statsBenchmarkConsumer.getReceiveMessageTotalCount().incrementAndGet(); + + long born2ConsumerRT = now - msg.getBornTimestamp(); + statsBenchmarkConsumer.getBorn2ConsumerTotalRT().addAndGet(born2ConsumerRT); + + long store2ConsumerRT = now - msg.getStoreTimestamp(); + statsBenchmarkConsumer.getStore2ConsumerTotalRT().addAndGet(store2ConsumerRT); + + compareAndSetMax(statsBenchmarkConsumer.getBorn2ConsumerMaxRT(), born2ConsumerRT); + + compareAndSetMax(statsBenchmarkConsumer.getStore2ConsumerMaxRT(), store2ConsumerRT); + + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + }); + + consumer.start(); + + System.out.printf("Consumer Started.%n"); + } + + public static Options buildCommandlineOptions(final Options options) { + Option opt = new Option("t", "topic", true, "Topic name, Default: BenchmarkTest"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("g", "group", true, "Consumer group name, Default: benchmark_consumer"); + opt.setRequired(false); + options.addOption(opt); + + + opt = new Option("p", "group prefix enable", true, "Consumer group name, Default: false"); + opt.setRequired(false); + options.addOption(opt); + + return options; + } + + + public static void compareAndSetMax(final AtomicLong target, final long value) { + long prev = target.get(); + while (value > prev) { + boolean updated = target.compareAndSet(prev, value); + if (updated) + break; + + prev = target.get(); + } + } +} + + +class StatsBenchmarkConsumer { + private final AtomicLong receiveMessageTotalCount = new AtomicLong(0L); + + private final AtomicLong born2ConsumerTotalRT = new AtomicLong(0L); + + private final AtomicLong store2ConsumerTotalRT = new AtomicLong(0L); + + private final AtomicLong born2ConsumerMaxRT = new AtomicLong(0L); + + private final AtomicLong store2ConsumerMaxRT = new AtomicLong(0L); + + + public Long[] createSnapshot() { + Long[] snap = new Long[]{ + System.currentTimeMillis(), + this.receiveMessageTotalCount.get(), + this.born2ConsumerTotalRT.get(), + this.store2ConsumerTotalRT.get(), + this.born2ConsumerMaxRT.get(), + this.store2ConsumerMaxRT.get(), + }; + + return snap; + } + + + public AtomicLong getReceiveMessageTotalCount() { + return receiveMessageTotalCount; + } + + + public AtomicLong getBorn2ConsumerTotalRT() { + return born2ConsumerTotalRT; + } + + + public AtomicLong getStore2ConsumerTotalRT() { + return store2ConsumerTotalRT; + } + + + public AtomicLong getBorn2ConsumerMaxRT() { + return born2ConsumerMaxRT; + } + + + public AtomicLong getStore2ConsumerMaxRT() { + return store2ConsumerMaxRT; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/benchmark/Producer.java ---------------------------------------------------------------------- diff --git a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/benchmark/Producer.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/benchmark/Producer.java new file mode 100644 index 0000000..b0351c6 --- /dev/null +++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/benchmark/Producer.java @@ -0,0 +1,262 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.example.benchmark; + +import com.alibaba.rocketmq.client.exception.MQBrokerException; +import com.alibaba.rocketmq.client.exception.MQClientException; +import com.alibaba.rocketmq.client.log.ClientLogger; +import com.alibaba.rocketmq.client.producer.DefaultMQProducer; +import com.alibaba.rocketmq.common.message.Message; +import com.alibaba.rocketmq.remoting.common.RemotingHelper; +import com.alibaba.rocketmq.remoting.exception.RemotingException; +import com.alibaba.rocketmq.srvutil.ServerUtil; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.slf4j.Logger; + +import java.io.UnsupportedEncodingException; +import java.util.LinkedList; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; + +public class Producer { + public static void main(String[] args) throws MQClientException, UnsupportedEncodingException { + + Options options = ServerUtil.buildCommandlineOptions(new Options()); + CommandLine commandLine = ServerUtil.parseCmdLine("benchmarkProducer", args, buildCommandlineOptions(options), new PosixParser()); + if (null == commandLine) { + System.exit(-1); + } + + final String topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : "BenchmarkTest"; + final int threadCount = commandLine.hasOption('w') ? Integer.parseInt(commandLine.getOptionValue('w')) : 64; + final int messageSize = commandLine.hasOption('s') ? Integer.parseInt(commandLine.getOptionValue('s')) : 128; + final boolean keyEnable = commandLine.hasOption('k') ? Boolean.parseBoolean(commandLine.getOptionValue('k')) : false; + + System.out.printf("topic %s threadCount %d messageSize %d keyEnable %s%n", topic, threadCount, messageSize, keyEnable); + + final Logger log = ClientLogger.getLog(); + + final Message msg = buildMessage(messageSize, topic); + + final ExecutorService sendThreadPool = Executors.newFixedThreadPool(threadCount); + + final StatsBenchmarkProducer statsBenchmark = new StatsBenchmarkProducer(); + + final Timer timer = new Timer("BenchmarkTimerThread", true); + + final LinkedList snapshotList = new LinkedList(); + + timer.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + snapshotList.addLast(statsBenchmark.createSnapshot()); + if (snapshotList.size() > 10) { + snapshotList.removeFirst(); + } + } + }, 1000, 1000); + + timer.scheduleAtFixedRate(new TimerTask() { + private void printStats() { + if (snapshotList.size() >= 10) { + Long[] begin = snapshotList.getFirst(); + Long[] end = snapshotList.getLast(); + + final long sendTps = (long) (((end[3] - begin[3]) / (double) (end[0] - begin[0])) * 1000L); + final double averageRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]); + + System.out.printf("Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d%n", + sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4]); + } + } + + + @Override + public void run() { + try { + this.printStats(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }, 10000, 10000); + + final DefaultMQProducer producer = new DefaultMQProducer("benchmark_producer"); + producer.setInstanceName(Long.toString(System.currentTimeMillis())); + + if (commandLine.hasOption('n')) { + String ns = commandLine.getOptionValue('n'); + producer.setNamesrvAddr(ns); + } + + producer.setCompressMsgBodyOverHowmuch(Integer.MAX_VALUE); + + producer.start(); + + for (int i = 0; i < threadCount; i++) { + sendThreadPool.execute(new Runnable() { + @Override + public void run() { + while (true) { + try { + final long beginTimestamp = System.currentTimeMillis(); + if (keyEnable) { + msg.setKeys(String.valueOf(beginTimestamp / 1000)); + } + producer.send(msg); + statsBenchmark.getSendRequestSuccessCount().incrementAndGet(); + statsBenchmark.getReceiveResponseSuccessCount().incrementAndGet(); + final long currentRT = System.currentTimeMillis() - beginTimestamp; + statsBenchmark.getSendMessageSuccessTimeTotal().addAndGet(currentRT); + long prevMaxRT = statsBenchmark.getSendMessageMaxRT().get(); + while (currentRT > prevMaxRT) { + boolean updated = statsBenchmark.getSendMessageMaxRT().compareAndSet(prevMaxRT, currentRT); + if (updated) + break; + + prevMaxRT = statsBenchmark.getSendMessageMaxRT().get(); + } + } catch (RemotingException e) { + statsBenchmark.getSendRequestFailedCount().incrementAndGet(); + log.error("[BENCHMARK_PRODUCER] Send Exception", e); + + try { + Thread.sleep(3000); + } catch (InterruptedException e1) { + } + } catch (InterruptedException e) { + statsBenchmark.getSendRequestFailedCount().incrementAndGet(); + try { + Thread.sleep(3000); + } catch (InterruptedException e1) { + } + } catch (MQClientException e) { + statsBenchmark.getSendRequestFailedCount().incrementAndGet(); + log.error("[BENCHMARK_PRODUCER] Send Exception", e); + } catch (MQBrokerException e) { + statsBenchmark.getReceiveResponseFailedCount().incrementAndGet(); + log.error("[BENCHMARK_PRODUCER] Send Exception", e); + try { + Thread.sleep(3000); + } catch (InterruptedException e1) { + } + } + } + } + }); + } + } + + public static Options buildCommandlineOptions(final Options options) { + Option opt = new Option("w", "threadCount", true, "Thread count, Default: 64"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("s", "messageSize", true, "Message Size, Default: 128"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("k", "keyEnable", true, "Message Key Enable, Default: false"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("t", "topic", true, "Topic name, Default: BenchmarkTest"); + opt.setRequired(false); + options.addOption(opt); + + return options; + } + + private static Message buildMessage(final int messageSize, final String topic) throws UnsupportedEncodingException { + Message msg = new Message(); + msg.setTopic(topic); + + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < messageSize; i += 10) { + sb.append("hello baby"); + } + + msg.setBody(sb.toString().getBytes(RemotingHelper.DEFAULT_CHARSET)); + + return msg; + } +} + + +class StatsBenchmarkProducer { + private final AtomicLong sendRequestSuccessCount = new AtomicLong(0L); + + private final AtomicLong sendRequestFailedCount = new AtomicLong(0L); + + private final AtomicLong receiveResponseSuccessCount = new AtomicLong(0L); + + private final AtomicLong receiveResponseFailedCount = new AtomicLong(0L); + + private final AtomicLong sendMessageSuccessTimeTotal = new AtomicLong(0L); + + private final AtomicLong sendMessageMaxRT = new AtomicLong(0L); + + + public Long[] createSnapshot() { + Long[] snap = new Long[]{ + System.currentTimeMillis(), + this.sendRequestSuccessCount.get(), + this.sendRequestFailedCount.get(), + this.receiveResponseSuccessCount.get(), + this.receiveResponseFailedCount.get(), + this.sendMessageSuccessTimeTotal.get(), + }; + + return snap; + } + + + public AtomicLong getSendRequestSuccessCount() { + return sendRequestSuccessCount; + } + + + public AtomicLong getSendRequestFailedCount() { + return sendRequestFailedCount; + } + + + public AtomicLong getReceiveResponseSuccessCount() { + return receiveResponseSuccessCount; + } + + + public AtomicLong getReceiveResponseFailedCount() { + return receiveResponseFailedCount; + } + + + public AtomicLong getSendMessageSuccessTimeTotal() { + return sendMessageSuccessTimeTotal; + } + + + public AtomicLong getSendMessageMaxRT() { + return sendMessageMaxRT; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/benchmark/TransactionProducer.java ---------------------------------------------------------------------- diff --git a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/benchmark/TransactionProducer.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/benchmark/TransactionProducer.java new file mode 100644 index 0000000..3dffd2f --- /dev/null +++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/benchmark/TransactionProducer.java @@ -0,0 +1,265 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.example.benchmark; + +import com.alibaba.rocketmq.client.exception.MQClientException; +import com.alibaba.rocketmq.client.producer.*; +import com.alibaba.rocketmq.common.message.Message; +import com.alibaba.rocketmq.common.message.MessageExt; +import com.alibaba.rocketmq.remoting.common.RemotingHelper; + +import java.io.UnsupportedEncodingException; +import java.util.LinkedList; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; + +public class TransactionProducer { + private static int threadCount; + private static int messageSize; + private static boolean ischeck; + private static boolean ischeckffalse; + + + public static void main(String[] args) throws MQClientException, UnsupportedEncodingException { + threadCount = args.length >= 1 ? Integer.parseInt(args[0]) : 32; + messageSize = args.length >= 2 ? Integer.parseInt(args[1]) : 1024 * 2; + ischeck = args.length >= 3 ? Boolean.parseBoolean(args[2]) : false; + ischeckffalse = args.length >= 4 ? Boolean.parseBoolean(args[3]) : false; + + final Message msg = buildMessage(messageSize); + + final ExecutorService sendThreadPool = Executors.newFixedThreadPool(threadCount); + + final StatsBenchmarkTProducer statsBenchmark = new StatsBenchmarkTProducer(); + + final Timer timer = new Timer("BenchmarkTimerThread", true); + + final LinkedList snapshotList = new LinkedList(); + + timer.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + snapshotList.addLast(statsBenchmark.createSnapshot()); + while (snapshotList.size() > 10) { + snapshotList.removeFirst(); + } + } + }, 1000, 1000); + + timer.scheduleAtFixedRate(new TimerTask() { + private void printStats() { + if (snapshotList.size() >= 10) { + Long[] begin = snapshotList.getFirst(); + Long[] end = snapshotList.getLast(); + + final long sendTps = + (long) (((end[3] - begin[3]) / (double) (end[0] - begin[0])) * 1000L); + final double averageRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]); + + System.out.printf( + "Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d transaction checkCount: %d %n", + sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4], end[6]); + } + } + + + @Override + public void run() { + try { + this.printStats(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }, 10000, 10000); + + final TransactionCheckListener transactionCheckListener = + new TransactionCheckListenerBImpl(ischeckffalse, statsBenchmark); + final TransactionMQProducer producer = new TransactionMQProducer("benchmark_transaction_producer"); + producer.setInstanceName(Long.toString(System.currentTimeMillis())); + producer.setTransactionCheckListener(transactionCheckListener); + producer.setDefaultTopicQueueNums(1000); + producer.start(); + + final TransactionExecuterBImpl tranExecuter = new TransactionExecuterBImpl(ischeck); + + for (int i = 0; i < threadCount; i++) { + sendThreadPool.execute(new Runnable() { + @Override + public void run() { + while (true) { + try { + // Thread.sleep(1000); + final long beginTimestamp = System.currentTimeMillis(); + SendResult sendResult = + producer.sendMessageInTransaction(msg, tranExecuter, null); + if (sendResult != null) { + statsBenchmark.getSendRequestSuccessCount().incrementAndGet(); + statsBenchmark.getReceiveResponseSuccessCount().incrementAndGet(); + } + + final long currentRT = System.currentTimeMillis() - beginTimestamp; + statsBenchmark.getSendMessageSuccessTimeTotal().addAndGet(currentRT); + long prevMaxRT = statsBenchmark.getSendMessageMaxRT().get(); + while (currentRT > prevMaxRT) { + boolean updated = + statsBenchmark.getSendMessageMaxRT().compareAndSet(prevMaxRT, + currentRT); + if (updated) + break; + + prevMaxRT = statsBenchmark.getSendMessageMaxRT().get(); + } + } catch (MQClientException e) { + statsBenchmark.getSendRequestFailedCount().incrementAndGet(); + } + } + } + }); + } + } + + + private static Message buildMessage(final int messageSize) throws UnsupportedEncodingException { + Message msg = new Message(); + msg.setTopic("BenchmarkTest"); + + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < messageSize; i += 10) { + sb.append("hello baby"); + } + + msg.setBody(sb.toString().getBytes(RemotingHelper.DEFAULT_CHARSET)); + + return msg; + } +} + + +class TransactionExecuterBImpl implements LocalTransactionExecuter { + + private boolean ischeck; + + + public TransactionExecuterBImpl(boolean ischeck) { + this.ischeck = ischeck; + } + + + @Override + public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) { + if (ischeck) { + return LocalTransactionState.UNKNOW; + } + return LocalTransactionState.COMMIT_MESSAGE; + } +} + + +class TransactionCheckListenerBImpl implements TransactionCheckListener { + private boolean ischeckffalse; + private StatsBenchmarkTProducer statsBenchmarkTProducer; + + + public TransactionCheckListenerBImpl(boolean ischeckffalse, + StatsBenchmarkTProducer statsBenchmarkTProducer) { + this.ischeckffalse = ischeckffalse; + this.statsBenchmarkTProducer = statsBenchmarkTProducer; + } + + + @Override + public LocalTransactionState checkLocalTransactionState(MessageExt msg) { + statsBenchmarkTProducer.getCheckRequestSuccessCount().incrementAndGet(); + if (ischeckffalse) { + + return LocalTransactionState.ROLLBACK_MESSAGE; + } + + return LocalTransactionState.COMMIT_MESSAGE; + } +} + + +class StatsBenchmarkTProducer { + private final AtomicLong sendRequestSuccessCount = new AtomicLong(0L); + + private final AtomicLong sendRequestFailedCount = new AtomicLong(0L); + + private final AtomicLong receiveResponseSuccessCount = new AtomicLong(0L); + + private final AtomicLong receiveResponseFailedCount = new AtomicLong(0L); + + private final AtomicLong sendMessageSuccessTimeTotal = new AtomicLong(0L); + + private final AtomicLong sendMessageMaxRT = new AtomicLong(0L); + + private final AtomicLong checkRequestSuccessCount = new AtomicLong(0L); + + + public Long[] createSnapshot() { + Long[] snap = new Long[]{ + System.currentTimeMillis(), + this.sendRequestSuccessCount.get(), + this.sendRequestFailedCount.get(), + this.receiveResponseSuccessCount.get(), + this.receiveResponseFailedCount.get(), + this.sendMessageSuccessTimeTotal.get(), + this.checkRequestSuccessCount.get()}; + + return snap; + } + + + public AtomicLong getSendRequestSuccessCount() { + return sendRequestSuccessCount; + } + + + public AtomicLong getSendRequestFailedCount() { + return sendRequestFailedCount; + } + + + public AtomicLong getReceiveResponseSuccessCount() { + return receiveResponseSuccessCount; + } + + + public AtomicLong getReceiveResponseFailedCount() { + return receiveResponseFailedCount; + } + + + public AtomicLong getSendMessageSuccessTimeTotal() { + return sendMessageSuccessTimeTotal; + } + + + public AtomicLong getSendMessageMaxRT() { + return sendMessageMaxRT; + } + + + public AtomicLong getCheckRequestSuccessCount() { + return checkRequestSuccessCount; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/broadcast/PushConsumer.java ---------------------------------------------------------------------- diff --git a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/broadcast/PushConsumer.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/broadcast/PushConsumer.java new file mode 100644 index 0000000..6cc6238 --- /dev/null +++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/broadcast/PushConsumer.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.example.broadcast; + +import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; +import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import com.alibaba.rocketmq.client.exception.MQClientException; +import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; +import com.alibaba.rocketmq.common.message.MessageExt; +import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; + +import java.util.List; + +public class PushConsumer { + + public static void main(String[] args) throws InterruptedException, MQClientException { + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1"); + + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + + consumer.setMessageModel(MessageModel.BROADCASTING); + + consumer.subscribe("TopicTest", "TagA || TagC || TagD"); + + consumer.registerMessageListener(new MessageListenerConcurrently() { + + @Override + public ConsumeConcurrentlyStatus consumeMessage(List msgs, + ConsumeConcurrentlyContext context) { + System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n"); + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + }); + + consumer.start(); + System.out.printf("Broadcast Consumer Started.%n"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/filter/Consumer.java ---------------------------------------------------------------------- diff --git a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/filter/Consumer.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/filter/Consumer.java new file mode 100644 index 0000000..104e6d9 --- /dev/null +++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/filter/Consumer.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.example.filter; + +import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; +import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import com.alibaba.rocketmq.client.exception.MQClientException; +import com.alibaba.rocketmq.common.MixAll; +import com.alibaba.rocketmq.common.message.MessageExt; + +import java.util.List; + + +public class Consumer { + + public static void main(String[] args) throws InterruptedException, MQClientException { + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupNamecc4"); + + String filterCode = MixAll.file2String("/home/admin/MessageFilterImpl.java"); + consumer.subscribe("TopicFilter7", "com.alibaba.rocketmq.example.filter.MessageFilterImpl", + filterCode); + + consumer.registerMessageListener(new MessageListenerConcurrently() { + + @Override + public ConsumeConcurrentlyStatus consumeMessage(List msgs, + ConsumeConcurrentlyContext context) { + System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n"); + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + }); + + consumer.start(); + + System.out.printf("Consumer Started.%n"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/filter/Producer.java ---------------------------------------------------------------------- diff --git a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/filter/Producer.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/filter/Producer.java new file mode 100644 index 0000000..04251fa --- /dev/null +++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/filter/Producer.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.example.filter; + +import com.alibaba.rocketmq.client.exception.MQClientException; +import com.alibaba.rocketmq.client.producer.DefaultMQProducer; +import com.alibaba.rocketmq.client.producer.SendResult; +import com.alibaba.rocketmq.common.message.Message; +import com.alibaba.rocketmq.remoting.common.RemotingHelper; + +public class Producer { + public static void main(String[] args) throws MQClientException, InterruptedException { + DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); + producer.start(); + + try { + for (int i = 0; i < 6000000; i++) { + Message msg = new Message("TopicFilter7", + "TagA", + "OrderID001", + "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); + + msg.putUserProperty("SequenceId", String.valueOf(i)); + SendResult sendResult = producer.send(msg); + System.out.printf("%s%n", sendResult); + } + } catch (Exception e) { + e.printStackTrace(); + } + producer.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/operation/Consumer.java ---------------------------------------------------------------------- diff --git a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/operation/Consumer.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/operation/Consumer.java new file mode 100644 index 0000000..f6ba067 --- /dev/null +++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/operation/Consumer.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.example.operation; + +import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; +import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import com.alibaba.rocketmq.client.exception.MQClientException; +import com.alibaba.rocketmq.common.message.MessageExt; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.cli.PosixParser; + +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + + +public class Consumer { + + public static void main(String[] args) throws InterruptedException, MQClientException { + CommandLine commandLine = buildCommandline(args); + if (commandLine != null) { + String group = commandLine.getOptionValue('g'); + String topic = commandLine.getOptionValue('t'); + String subscription = commandLine.getOptionValue('s'); + final String returnFailedHalf = commandLine.getOptionValue('f'); + + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group); + consumer.setInstanceName(Long.toString(System.currentTimeMillis())); + + consumer.subscribe(topic, subscription); + + consumer.registerMessageListener(new MessageListenerConcurrently() { + AtomicLong consumeTimes = new AtomicLong(0); + + + @Override + public ConsumeConcurrentlyStatus consumeMessage(List msgs, + ConsumeConcurrentlyContext context) { + long currentTimes = this.consumeTimes.incrementAndGet(); + System.out.printf("%-8d %s%n", currentTimes, msgs); + if (Boolean.parseBoolean(returnFailedHalf)) { + if ((currentTimes % 2) == 0) { + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + } + } + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + }); + + consumer.start(); + + System.out.printf("Consumer Started.%n"); + } + } + + public static CommandLine buildCommandline(String[] args) { + final Options options = new Options(); + Option opt = new Option("h", "help", false, "Print help"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("g", "consumerGroup", true, "Consumer Group Name"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("t", "topic", true, "Topic Name"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("s", "subscription", true, "subscription"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("f", "returnFailedHalf", true, "return failed result, for half message"); + opt.setRequired(true); + options.addOption(opt); + + PosixParser parser = new PosixParser(); + HelpFormatter hf = new HelpFormatter(); + hf.setWidth(110); + CommandLine commandLine = null; + try { + commandLine = parser.parse(options, args); + if (commandLine.hasOption('h')) { + hf.printHelp("producer", options, true); + return null; + } + } catch (ParseException e) { + hf.printHelp("producer", options, true); + return null; + } + + return commandLine; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/operation/Producer.java ---------------------------------------------------------------------- diff --git a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/operation/Producer.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/operation/Producer.java new file mode 100644 index 0000000..816e3e8 --- /dev/null +++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/operation/Producer.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.example.operation; + +import com.alibaba.rocketmq.client.exception.MQClientException; +import com.alibaba.rocketmq.client.producer.DefaultMQProducer; +import com.alibaba.rocketmq.client.producer.SendResult; +import com.alibaba.rocketmq.common.message.Message; +import com.alibaba.rocketmq.remoting.common.RemotingHelper; +import org.apache.commons.cli.*; + +public class Producer { + + public static void main(String[] args) throws MQClientException, InterruptedException { + CommandLine commandLine = buildCommandline(args); + if (commandLine != null) { + String group = commandLine.getOptionValue('g'); + String topic = commandLine.getOptionValue('t'); + String tags = commandLine.getOptionValue('a'); + String keys = commandLine.getOptionValue('k'); + String msgCount = commandLine.getOptionValue('c'); + + DefaultMQProducer producer = new DefaultMQProducer(group); + producer.setInstanceName(Long.toString(System.currentTimeMillis())); + + producer.start(); + + for (int i = 0; i < Integer.parseInt(msgCount); i++) { + try { + Message msg = new Message( + topic, + tags, + keys, + ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); + SendResult sendResult = producer.send(msg); + System.out.printf("%-8d %s%n", i, sendResult); + } catch (Exception e) { + e.printStackTrace(); + Thread.sleep(1000); + } + } + + producer.shutdown(); + } + } + + public static CommandLine buildCommandline(String[] args) { + final Options options = new Options(); + Option opt = new Option("h", "help", false, "Print help"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("g", "producerGroup", true, "Producer Group Name"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("t", "topic", true, "Topic Name"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("a", "tags", true, "Tags Name"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("k", "keys", true, "Keys Name"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("c", "msgCount", true, "Message Count"); + opt.setRequired(true); + options.addOption(opt); + + PosixParser parser = new PosixParser(); + HelpFormatter hf = new HelpFormatter(); + hf.setWidth(110); + CommandLine commandLine = null; + try { + commandLine = parser.parse(options, args); + if (commandLine.hasOption('h')) { + hf.printHelp("producer", options, true); + return null; + } + } catch (ParseException e) { + hf.printHelp("producer", options, true); + return null; + } + + return commandLine; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/ordermessage/Consumer.java ---------------------------------------------------------------------- diff --git a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/ordermessage/Consumer.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/ordermessage/Consumer.java new file mode 100644 index 0000000..7b5f657 --- /dev/null +++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/ordermessage/Consumer.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.example.ordermessage; + +import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; +import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext; +import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; +import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly; +import com.alibaba.rocketmq.client.exception.MQClientException; +import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; +import com.alibaba.rocketmq.common.message.MessageExt; + +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + + +public class Consumer { + + public static void main(String[] args) throws MQClientException { + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3"); + + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + + consumer.subscribe("TopicTest", "TagA || TagC || TagD"); + + consumer.registerMessageListener(new MessageListenerOrderly() { + AtomicLong consumeTimes = new AtomicLong(0); + + @Override + public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) { + context.setAutoCommit(false); + System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n"); + this.consumeTimes.incrementAndGet(); + if ((this.consumeTimes.get() % 2) == 0) { + return ConsumeOrderlyStatus.SUCCESS; + } else if ((this.consumeTimes.get() % 3) == 0) { + return ConsumeOrderlyStatus.ROLLBACK; + } else if ((this.consumeTimes.get() % 4) == 0) { + return ConsumeOrderlyStatus.COMMIT; + } else if ((this.consumeTimes.get() % 5) == 0) { + context.setSuspendCurrentQueueTimeMillis(3000); + return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; + } + + return ConsumeOrderlyStatus.SUCCESS; + } + }); + + consumer.start(); + System.out.printf("Consumer Started.%n"); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/ordermessage/Producer.java ---------------------------------------------------------------------- diff --git a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/ordermessage/Producer.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/ordermessage/Producer.java new file mode 100644 index 0000000..609aa62 --- /dev/null +++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/ordermessage/Producer.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.example.ordermessage; + +import com.alibaba.rocketmq.client.exception.MQBrokerException; +import com.alibaba.rocketmq.client.exception.MQClientException; +import com.alibaba.rocketmq.client.producer.DefaultMQProducer; +import com.alibaba.rocketmq.client.producer.MQProducer; +import com.alibaba.rocketmq.client.producer.MessageQueueSelector; +import com.alibaba.rocketmq.client.producer.SendResult; +import com.alibaba.rocketmq.common.message.Message; +import com.alibaba.rocketmq.common.message.MessageQueue; +import com.alibaba.rocketmq.remoting.common.RemotingHelper; +import com.alibaba.rocketmq.remoting.exception.RemotingException; + +import java.io.UnsupportedEncodingException; +import java.util.List; + +public class Producer { + public static void main(String[] args) throws UnsupportedEncodingException { + try { + MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); + producer.start(); + + String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"}; + for (int i = 0; i < 100; i++) { + int orderId = i % 10; + Message msg = + new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i, + ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); + SendResult sendResult = producer.send(msg, new MessageQueueSelector() { + @Override + public MessageQueue select(List mqs, Message msg, Object arg) { + Integer id = (Integer) arg; + int index = id % mqs.size(); + return mqs.get(index); + } + }, orderId); + + System.out.printf("%s%n", sendResult); + } + + producer.shutdown(); + } catch (MQClientException e) { + e.printStackTrace(); + } catch (RemotingException e) { + e.printStackTrace(); + } catch (MQBrokerException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/quickstart/Consumer.java ---------------------------------------------------------------------- diff --git a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/quickstart/Consumer.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/quickstart/Consumer.java new file mode 100644 index 0000000..adac497 --- /dev/null +++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/quickstart/Consumer.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.example.quickstart; + +import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; +import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import com.alibaba.rocketmq.client.exception.MQClientException; +import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; +import com.alibaba.rocketmq.common.message.MessageExt; + +import java.util.List; + +public class Consumer { + + public static void main(String[] args) throws InterruptedException, MQClientException { + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4"); + + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + + consumer.subscribe("TopicTest", "*"); + + consumer.registerMessageListener(new MessageListenerConcurrently() { + + @Override + public ConsumeConcurrentlyStatus consumeMessage(List msgs, + ConsumeConcurrentlyContext context) { + System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n"); + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + }); + + consumer.start(); + System.out.printf("Consumer Started.%n"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/quickstart/Producer.java ---------------------------------------------------------------------- diff --git a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/quickstart/Producer.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/quickstart/Producer.java new file mode 100644 index 0000000..fb5dbea --- /dev/null +++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/quickstart/Producer.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.example.quickstart; + +import com.alibaba.rocketmq.client.exception.MQClientException; +import com.alibaba.rocketmq.client.producer.DefaultMQProducer; +import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter; +import com.alibaba.rocketmq.client.producer.LocalTransactionState; +import com.alibaba.rocketmq.client.producer.SendResult; +import com.alibaba.rocketmq.common.message.Message; +import com.alibaba.rocketmq.remoting.common.RemotingHelper; + +public class Producer { + public static void main(String[] args) throws MQClientException, InterruptedException { + DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); + producer.start(); + + for (int i = 0; i < 1000; i++) { + try { + Message msg = new Message("TopicTest", + "TagA", + ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) + ); + SendResult sendResult = producer.send(msg); + LocalTransactionExecuter tranExecuter = new LocalTransactionExecuter() { + @Override + public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) { + return null; + } + }; + System.out.printf("%s%n", sendResult); + } catch (Exception e) { + e.printStackTrace(); + Thread.sleep(1000); + } + } + producer.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/AsyncProducer.java ---------------------------------------------------------------------- diff --git a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/AsyncProducer.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/AsyncProducer.java new file mode 100644 index 0000000..1a8f07e --- /dev/null +++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/AsyncProducer.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.example.simple; + +import com.alibaba.rocketmq.client.exception.MQClientException; +import com.alibaba.rocketmq.client.producer.DefaultMQProducer; +import com.alibaba.rocketmq.client.producer.SendCallback; +import com.alibaba.rocketmq.client.producer.SendResult; +import com.alibaba.rocketmq.common.message.Message; +import com.alibaba.rocketmq.remoting.common.RemotingHelper; + +import java.io.UnsupportedEncodingException; + + +public class AsyncProducer { + public static void main(String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException { + + DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test"); + producer.start(); + producer.setRetryTimesWhenSendAsyncFailed(0); + + for (int i = 0; i < 10000000; i++) { + try { + final int index = i; + Message msg = new Message("Jodie_topic_1023", + "TagA", + "OrderID188", + "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); + producer.send(msg, new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId()); + } + + @Override + public void onException(Throwable e) { + System.out.printf("%-10d Exception %s %n", index, e); + e.printStackTrace(); + } + }); + } catch (Exception e) { + e.printStackTrace(); + } + } + producer.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/CachedQueue.java ---------------------------------------------------------------------- diff --git a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/CachedQueue.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/CachedQueue.java new file mode 100644 index 0000000..7beb064 --- /dev/null +++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/CachedQueue.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.example.simple; + +import com.alibaba.rocketmq.common.message.MessageExt; + +import java.util.TreeMap; + + +public class CachedQueue { + private final TreeMap msgCachedTable = new TreeMap(); + + + public TreeMap getMsgCachedTable() { + return msgCachedTable; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/Producer.java ---------------------------------------------------------------------- diff --git a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/Producer.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/Producer.java new file mode 100644 index 0000000..e0010d4 --- /dev/null +++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/Producer.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.example.simple; + +import com.alibaba.rocketmq.client.exception.MQClientException; +import com.alibaba.rocketmq.client.producer.DefaultMQProducer; +import com.alibaba.rocketmq.client.producer.SendResult; +import com.alibaba.rocketmq.common.message.Message; +import com.alibaba.rocketmq.remoting.common.RemotingHelper; + + +public class Producer { + public static void main(String[] args) throws MQClientException, InterruptedException { + + DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); + + producer.start(); + + for (int i = 0; i < 10000000; i++) + try { + { + Message msg = new Message("TopicTest", + "TagA", + "OrderID188", + "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); + SendResult sendResult = producer.send(msg); + System.out.printf("%s%n", sendResult); + } + + } catch (Exception e) { + e.printStackTrace(); + } + + producer.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/PullConsumer.java ---------------------------------------------------------------------- diff --git a/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/PullConsumer.java b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/PullConsumer.java new file mode 100644 index 0000000..6245769 --- /dev/null +++ b/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/PullConsumer.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.example.simple; + +import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer; +import com.alibaba.rocketmq.client.consumer.PullResult; +import com.alibaba.rocketmq.client.exception.MQClientException; +import com.alibaba.rocketmq.common.message.MessageQueue; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class PullConsumer { + private static final Map OFFSE_TABLE = new HashMap(); + + + public static void main(String[] args) throws MQClientException { + DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5"); + + consumer.start(); + + Set mqs = consumer.fetchSubscribeMessageQueues("TopicTest1"); + for (MessageQueue mq : mqs) { + System.out.printf("Consume from the queue: " + mq + "%n"); + SINGLE_MQ: + while (true) { + try { + PullResult pullResult = + consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); + System.out.printf("%s%n", pullResult); + putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); + switch (pullResult.getPullStatus()) { + case FOUND: + break; + case NO_MATCHED_MSG: + break; + case NO_NEW_MSG: + break SINGLE_MQ; + case OFFSET_ILLEGAL: + break; + default: + break; + } + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + consumer.shutdown(); + } + + private static long getMessageQueueOffset(MessageQueue mq) { + Long offset = OFFSE_TABLE.get(mq); + if (offset != null) + return offset; + + return 0; + } + + private static void putMessageQueueOffset(MessageQueue mq, long offset) { + OFFSE_TABLE.put(mq, offset); + } + +}