This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch test-release
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit a8aea48134ef98e94310a013e6455c8a2c1c467e
Author: huangli <areyouok@gmail.com>
AuthorDate: Fri Dec 6 10:35:01 2019 +0800
Optimise tx benchmark producer (#1628)
---
.../example/benchmark/TransactionProducer.java | 364 +++++++++++++++------
.../org/apache/rocketmq/srvutil/ServerUtil.java | 3 +-
2 files changed, 266 insertions(+), 101 deletions(-)
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
index 04707e1..3531eb5 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
@@ -18,11 +18,19 @@
package org.apache.rocketmq.example.benchmark;
import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.LinkedHashMap;
import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.cli.CommandLine;
@@ -30,35 +38,44 @@ import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
-import org.apache.rocketmq.client.producer.TransactionCheckListener;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.srvutil.ServerUtil;
public class TransactionProducer {
+ private static final long START_TIME = System.currentTimeMillis();
+ private static final AtomicLong MSG_COUNT = new AtomicLong(0);
+
+ // broker max check times should be less than this value
+ static final int MAX_CHECK_RESULT_IN_MSG = 20;
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException
{
Options options = ServerUtil.buildCommandlineOptions(new Options());
CommandLine commandLine = ServerUtil.parseCmdLine("TransactionProducer", args, buildCommandlineOptions(options),
new PosixParser());
-
- final String topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim()
: "BenchmarkTest";
- final int threadCount = commandLine.hasOption('w') ? Integer.parseInt(commandLine.getOptionValue('w'))
: 32;
- final int messageSize = commandLine.hasOption('s') ? Integer.parseInt(commandLine.getOptionValue('s'))
: 2048;
- final boolean ischeck = commandLine.hasOption('c') ? Boolean.parseBoolean(commandLine.getOptionValue('c'))
: false;
- final boolean ischeckffalse = commandLine.hasOption('r') ? Boolean.parseBoolean(commandLine.getOptionValue('r'))
: true;
-
- final ExecutorService sendThreadPool = Executors.newFixedThreadPool(threadCount);
+ TxSendConfig config = new TxSendConfig();
+ config.topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim()
: "BenchmarkTest";
+ config.threadCount = commandLine.hasOption('w') ? Integer.parseInt(commandLine.getOptionValue('w'))
: 32;
+ config.messageSize = commandLine.hasOption('s') ? Integer.parseInt(commandLine.getOptionValue('s'))
: 2048;
+ config.sendRollbackRate = commandLine.hasOption("sr") ? Double.parseDouble(commandLine.getOptionValue("sr"))
: 0.0;
+ config.sendUnknownRate = commandLine.hasOption("su") ? Double.parseDouble(commandLine.getOptionValue("su"))
: 0.0;
+ config.checkRollbackRate = commandLine.hasOption("cr") ? Double.parseDouble(commandLine.getOptionValue("cr"))
: 0.0;
+ config.checkUnknownRate = commandLine.hasOption("cu") ? Double.parseDouble(commandLine.getOptionValue("cu"))
: 0.0;
+ config.batchId = commandLine.hasOption("b") ? Long.parseLong(commandLine.getOptionValue("b"))
: System.currentTimeMillis();
+ config.sendInterval = commandLine.hasOption("i") ? Integer.parseInt(commandLine.getOptionValue("i"))
: 0;
+
+ final ExecutorService sendThreadPool = Executors.newFixedThreadPool(config.threadCount);
final StatsBenchmarkTProducer statsBenchmark = new StatsBenchmarkTProducer();
final Timer timer = new Timer("BenchmarkTimerThread", true);
- final LinkedList<Long[]> snapshotList = new LinkedList<Long[]>();
+ final LinkedList<Snapshot> snapshotList = new LinkedList<>();
timer.scheduleAtFixedRate(new TimerTask() {
@Override
@@ -73,16 +90,24 @@ public class TransactionProducer {
timer.scheduleAtFixedRate(new TimerTask() {
+ // Prints send and check statistics for the window between the oldest and the newest retained snapshot.
private void printStats() {
if (snapshotList.size() >= 10) {
- Long[] begin = snapshotList.getFirst();
- Long[] end = snapshotList.getLast();
+ Snapshot begin = snapshotList.getFirst();
+ Snapshot end = snapshotList.getLast();
- final long sendTps =
- (long) (((end[3] - begin[3]) / (double) (end[0] - begin[0])) * 1000L);
- final double averageRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]);
+ final long sendCount = (end.sendRequestSuccessCount - begin.sendRequestSuccessCount)
+ + (end.sendRequestFailedCount - begin.sendRequestFailedCount);
+ final long sendTps = (sendCount * 1000L) / (end.endTime - begin.endTime);
+ // sendMessageTimeTotal accumulates the elapsed time of every send, failed ones included,
+ // so average over all sends; report 0 instead of NaN/Infinity when nothing was sent.
+ final double averageRT = sendCount == 0 ? 0.0
+ : (end.sendMessageTimeTotal - begin.sendMessageTimeTotal) / (double) sendCount;
+
+ final long failCount = end.sendRequestFailedCount - begin.sendRequestFailedCount;
+ final long checkCount = end.checkCount - begin.checkCount;
+ final long unexpectedCheck = end.unexpectedCheckCount - begin.unexpectedCheckCount;
+ final long dupCheck = end.duplicatedCheck - begin.duplicatedCheck;
System.out.printf(
- "Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response
Failed: %d transaction checkCount: %d %n",
- sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2],
end[4], end[6]);
+ "Send TPS:%5d Max RT:%5d AVG RT:%3.1f Send Failed: %d check: %d unexpectedCheck:
%d duplicatedCheck: %d %n",
+ sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT,
failCount, checkCount,
+ unexpectedCheck, dupCheck);
+ // the max RT is reported per print interval, so start the next interval from zero
+ statsBenchmark.getSendMessageMaxRT().set(0);
}
}
@@ -96,11 +121,10 @@ public class TransactionProducer {
}
}, 10000, 10000);
- final TransactionCheckListener transactionCheckListener =
- new TransactionCheckListenerBImpl(ischeckffalse, statsBenchmark);
+ final TransactionListener transactionCheckListener = new TransactionListenerImpl(statsBenchmark,
config);
final TransactionMQProducer producer = new TransactionMQProducer("benchmark_transaction_producer");
producer.setInstanceName(Long.toString(System.currentTimeMillis()));
- producer.setTransactionCheckListener(transactionCheckListener);
+ producer.setTransactionListener(transactionCheckListener);
producer.setDefaultTopicQueueNums(1000);
if (commandLine.hasOption('n')) {
String ns = commandLine.getOptionValue('n');
@@ -108,37 +132,42 @@ public class TransactionProducer {
}
producer.start();
- final TransactionExecuterBImpl tranExecuter = new TransactionExecuterBImpl(ischeck);
-
- for (int i = 0; i < threadCount; i++) {
+ for (int i = 0; i < config.threadCount; i++) {
sendThreadPool.execute(new Runnable() {
@Override
public void run() {
while (true) {
+ boolean success = false;
+ final long beginTimestamp = System.currentTimeMillis();
try {
- // Thread.sleep(1000);
- final long beginTimestamp = System.currentTimeMillis();
SendResult sendResult =
- producer.sendMessageInTransaction(buildMessage(messageSize,
topic), tranExecuter, null);
- if (sendResult != null) {
- statsBenchmark.getSendRequestSuccessCount().incrementAndGet();
- statsBenchmark.getReceiveResponseSuccessCount().incrementAndGet();
- }
-
+ producer.sendMessageInTransaction(buildMessage(config),
null);
+ success = sendResult != null && sendResult.getSendStatus()
== SendStatus.SEND_OK;
+ } catch (Throwable e) {
+ success = false;
+ } finally {
final long currentRT = System.currentTimeMillis() - beginTimestamp;
- statsBenchmark.getSendMessageSuccessTimeTotal().addAndGet(currentRT);
+ statsBenchmark.getSendMessageTimeTotal().addAndGet(currentRT);
long prevMaxRT = statsBenchmark.getSendMessageMaxRT().get();
while (currentRT > prevMaxRT) {
- boolean updated =
- statsBenchmark.getSendMessageMaxRT().compareAndSet(prevMaxRT,
- currentRT);
+ boolean updated = statsBenchmark.getSendMessageMaxRT()
+ .compareAndSet(prevMaxRT, currentRT);
if (updated)
break;
prevMaxRT = statsBenchmark.getSendMessageMaxRT().get();
}
- } catch (MQClientException e) {
- statsBenchmark.getSendRequestFailedCount().incrementAndGet();
+ if (success) {
+ statsBenchmark.getSendRequestSuccessCount().incrementAndGet();
+ } else {
+ statsBenchmark.getSendRequestFailedCount().incrementAndGet();
+ }
+ if (config.sendInterval > 0) {
+ try {
+ Thread.sleep(config.sendInterval);
+ } catch (InterruptedException e) {
+ }
+ }
}
}
}
@@ -146,20 +175,42 @@ public class TransactionProducer {
}
}
- private static Message buildMessage(final int messageSize, String topic) {
- try {
- Message msg = new Message();
- msg.setTopic(topic);
+ /**
+ * Builds one benchmark message. The body starts with a metadata header written in this order:
+ * batch id (8 bytes), message id (8 bytes), the local transaction state to report on send
+ * (1 byte, enum ordinal) and MAX_CHECK_RESULT_IN_MSG states to report on successive checks
+ * (1 byte each). All remaining bytes are random padding.
+ */
+ private static Message buildMessage(TxSendConfig config) {
+ // The header must always fit: a smaller body makes ByteBuffer.put throw
+ // BufferOverflowException, which the send loop only counts as a failed send.
+ int headerSize = 8 + 8 + 1 + MAX_CHECK_RESULT_IN_MSG;
+ byte[] bs = new byte[Math.max(config.messageSize, headerSize)];
+ ThreadLocalRandom r = ThreadLocalRandom.current();
+ r.nextBytes(bs);
+
+ ByteBuffer buf = ByteBuffer.wrap(bs);
+ buf.putLong(config.batchId);
+ // high 32 bits come from the process start time, low bits from a per-process counter
+ long sendMachineId = START_TIME << 32;
+ long msgId = sendMachineId | MSG_COUNT.getAndIncrement();
+ buf.putLong(msgId);
+
+ // save send tx result in message
+ // (the unknown rate is tested with a second random draw, only when rollback was not chosen)
+ if (r.nextDouble() < config.sendRollbackRate) {
+ buf.put((byte) LocalTransactionState.ROLLBACK_MESSAGE.ordinal());
+ } else if (r.nextDouble() < config.sendUnknownRate) {
+ buf.put((byte) LocalTransactionState.UNKNOW.ordinal());
+ } else {
+ buf.put((byte) LocalTransactionState.COMMIT_MESSAGE.ordinal());
+ }
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < messageSize; i += 10) {
- sb.append("hello baby");
+ // save check tx result in message
+ for (int i = 0; i < MAX_CHECK_RESULT_IN_MSG; i++) {
+ if (r.nextDouble() < config.checkRollbackRate) {
+ buf.put((byte) LocalTransactionState.ROLLBACK_MESSAGE.ordinal());
+ } else if (r.nextDouble() < config.checkUnknownRate) {
+ buf.put((byte) LocalTransactionState.UNKNOW.ordinal());
+ } else {
+ buf.put((byte) LocalTransactionState.COMMIT_MESSAGE.ordinal());
}
- msg.setBody(sb.toString().getBytes(RemotingHelper.DEFAULT_CHARSET));
- return msg;
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException(e);
}
+
+ Message msg = new Message();
+ msg.setTopic(config.topic);
+
+ msg.setBody(bs);
+ return msg;
}
public static Options buildCommandlineOptions(final Options options) {
@@ -175,84 +226,171 @@ public class TransactionProducer {
opt.setRequired(false);
options.addOption(opt);
- opt = new Option("c", "check", true, "Check the message, Default: false");
+ opt = new Option("sr", "send rollback rate", true, "Send rollback rate, Default:
0.0");
opt.setRequired(false);
options.addOption(opt);
- opt = new Option("r", "checkResult", true, "Message check result, Default: true");
+ opt = new Option("su", "send unknown rate", true, "Send unknown rate, Default: 0.0");
opt.setRequired(false);
options.addOption(opt);
+ opt = new Option("cr", "check rollback rate", true, "Check rollback rate, Default:
0.0");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("cu", "check unknown rate", true, "Check unknown rate, Default:
0.0");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("b", "test batch id", true, "test batch id, Default: System.currentMillis()");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("i", "send interval", true, "sleep interval in millis between messages,
Default: 0");
+ opt.setRequired(false);
+ options.addOption(opt);
return options;
}
}
-class TransactionExecuterBImpl implements LocalTransactionExecuter {
+/**
+ * Transaction listener used by the benchmark. The state returned for the local transaction and
+ * for each subsequent check is read back from the metadata header at the start of the message
+ * body, and check callbacks are counted in the shared statistics object.
+ */
+class TransactionListenerImpl implements TransactionListener {
+ private final StatsBenchmarkTProducer statBenchmark;
+ private final TxSendConfig sendConfig;
+ // msgId -> bitmask of check rounds already seen (bit n-1 is set for check number n);
+ // every access is made while holding the monitor of the map itself
+ private final LRUMap<Long, Integer> cache = new LRUMap<>(200000);
- private boolean ischeck;
+ // decoded metadata header of one benchmark message
+ private static class MsgMeta {
+ long batchId;
+ long msgId;
+ LocalTransactionState sendResult;
+ List<LocalTransactionState> checkResult;
+ }
- public TransactionExecuterBImpl(boolean ischeck) {
- this.ischeck = ischeck;
+ public TransactionListenerImpl(StatsBenchmarkTProducer statsBenchmark, TxSendConfig sendConfig)
{
+ this.statBenchmark = statsBenchmark;
+ this.sendConfig = sendConfig;
}
@Override
- public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object
arg) {
- if (ischeck) {
- return LocalTransactionState.UNKNOW;
- }
- return LocalTransactionState.COMMIT_MESSAGE;
+ public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
+ return parseFromMsg(msg).sendResult;
}
-}
-class TransactionCheckListenerBImpl implements TransactionCheckListener {
- private boolean ischeckffalse;
- private StatsBenchmarkTProducer statsBenchmarkTProducer;
-
- public TransactionCheckListenerBImpl(boolean ischeckffalse,
- StatsBenchmarkTProducer statsBenchmarkTProducer) {
- this.ischeckffalse = ischeckffalse;
- this.statsBenchmarkTProducer = statsBenchmarkTProducer;
+ // Decodes the header written at the start of the body: batch id, message id, send state,
+ // then MAX_CHECK_RESULT_IN_MSG check states (one ordinal byte each).
+ private MsgMeta parseFromMsg(Message msg) {
+ byte[] bs = msg.getBody();
+ ByteBuffer buf = ByteBuffer.wrap(bs);
+ MsgMeta msgMeta = new MsgMeta();
+ msgMeta.batchId = buf.getLong();
+ msgMeta.msgId = buf.getLong();
+ msgMeta.sendResult = LocalTransactionState.values()[buf.get()];
+ msgMeta.checkResult = new ArrayList<>();
+ for (int i = 0; i < TransactionProducer.MAX_CHECK_RESULT_IN_MSG; i++) {
+ msgMeta.checkResult.add(LocalTransactionState.values()[buf.get()]);
+ }
+ return msgMeta;
}
@Override
- public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
- statsBenchmarkTProducer.getCheckRequestSuccessCount().incrementAndGet();
- if (ischeckffalse) {
+ public LocalTransactionState checkLocalTransaction(MessageExt msg) {
+ MsgMeta msgMeta = parseFromMsg(msg);
+ if (msgMeta.batchId != sendConfig.batchId) {
+ // message not generated in this test
+ return LocalTransactionState.ROLLBACK_MESSAGE;
+ }
+ statBenchmark.getCheckCount().incrementAndGet();
+ int times = 0;
+ try {
+ String checkTimes = msg.getUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES);
+ times = Integer.parseInt(checkTimes);
+ } catch (Exception e) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
+ times = times <= 0 ? 1 : times;
+
+ boolean dup;
+ synchronized (cache) {
+ Integer oldCheckLog = cache.get(msgMeta.msgId);
+ Integer newCheckLog;
+ if (oldCheckLog == null) {
+ newCheckLog = 1 << (times - 1);
+ } else {
+ newCheckLog = oldCheckLog | (1 << (times - 1));
+ }
+ dup = newCheckLog.equals(oldCheckLog);
+ // remember this round; if it is not stored, every lookup misses and no duplicate is ever detected
+ cache.put(msgMeta.msgId, newCheckLog);
+ }
+ if (dup) {
+ statBenchmark.getDuplicatedCheckCount().incrementAndGet();
+ }
+ // a message whose send-time state was already final should not be checked at all
+ if (msgMeta.sendResult != LocalTransactionState.UNKNOW) {
+ System.out.printf("%s unexpected check: msgId=%s,txId=%s,checkTimes=%s,sendResult=%s\n",
+ new SimpleDateFormat("HH:mm:ss,SSS").format(new Date()),
+ msg.getMsgId(), msg.getTransactionId(),
+ msg.getUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES),
+ msgMeta.sendResult.toString());
+ statBenchmark.getUnexpectedCheckCount().incrementAndGet();
+ return msgMeta.sendResult;
+ }
- return LocalTransactionState.COMMIT_MESSAGE;
+ // an earlier check round that already answered with a final state makes this check unexpected
+ for (int i = 0; i < times - 1; i++) {
+ LocalTransactionState s = msgMeta.checkResult.get(i);
+ if (s != LocalTransactionState.UNKNOW) {
+ System.out.printf("%s unexpected check: msgId=%s,txId=%s,checkTimes=%s,sendResult,lastCheckResult=%s\n",
+ new SimpleDateFormat("HH:mm:ss,SSS").format(new Date()),
+ msg.getMsgId(), msg.getTransactionId(),
+ msg.getUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES),
s);
+ statBenchmark.getUnexpectedCheckCount().incrementAndGet();
+ return s;
+ }
+ }
+ // only MAX_CHECK_RESULT_IN_MSG states are stored, so times must not exceed that value here
+ return msgMeta.checkResult.get(times - 1);
}
}
+// Point-in-time copy of the StatsBenchmarkTProducer counters, filled by createSnapshot();
+// printStats() subtracts two snapshots to get the figures for one window.
+class Snapshot {
+ // System.currentTimeMillis() at the moment the snapshot was taken
+ long endTime;
+
+ // copy of the sendRequestSuccessCount counter
+ long sendRequestSuccessCount;
+
+ // copy of the sendRequestFailedCount counter
+ long sendRequestFailedCount;
+
+ // copy of the sendMessageTimeTotal counter (sum of per-send elapsed milliseconds)
+ long sendMessageTimeTotal;
+
+ // copy of the sendMessageMaxRT value at snapshot time
+ long sendMessageMaxRT;
+
+ // copy of the checkCount counter
+ long checkCount;
+
+ // copy of the unexpectedCheckCount counter
+ long unexpectedCheckCount;
+
+ // copy of the duplicatedCheckCount counter
+ long duplicatedCheck;
+}
+
class StatsBenchmarkTProducer {
private final AtomicLong sendRequestSuccessCount = new AtomicLong(0L);
private final AtomicLong sendRequestFailedCount = new AtomicLong(0L);
- private final AtomicLong receiveResponseSuccessCount = new AtomicLong(0L);
-
- private final AtomicLong receiveResponseFailedCount = new AtomicLong(0L);
-
- private final AtomicLong sendMessageSuccessTimeTotal = new AtomicLong(0L);
+ private final AtomicLong sendMessageTimeTotal = new AtomicLong(0L);
private final AtomicLong sendMessageMaxRT = new AtomicLong(0L);
- private final AtomicLong checkRequestSuccessCount = new AtomicLong(0L);
+ private final AtomicLong checkCount = new AtomicLong(0L);
+
+ private final AtomicLong unexpectedCheckCount = new AtomicLong(0L);
- public Long[] createSnapshot() {
- Long[] snap = new Long[] {
- System.currentTimeMillis(),
- this.sendRequestSuccessCount.get(),
- this.sendRequestFailedCount.get(),
- this.receiveResponseSuccessCount.get(),
- this.receiveResponseFailedCount.get(),
- this.sendMessageSuccessTimeTotal.get(),
- this.checkRequestSuccessCount.get()};
+ private final AtomicLong duplicatedCheckCount = new AtomicLong(0);
- return snap;
+ public Snapshot createSnapshot() {
+ Snapshot s = new Snapshot();
+ s.endTime = System.currentTimeMillis();
+ s.sendRequestSuccessCount = sendRequestSuccessCount.get();
+ s.sendRequestFailedCount = sendRequestFailedCount.get();
+ s.sendMessageTimeTotal = sendMessageTimeTotal.get();
+ s.sendMessageMaxRT = sendMessageMaxRT.get();
+ s.checkCount = checkCount.get();
+ s.unexpectedCheckCount = unexpectedCheckCount.get();
+ s.duplicatedCheck = duplicatedCheckCount.get();
+ return s;
}
public AtomicLong getSendRequestSuccessCount() {
@@ -263,23 +401,49 @@ class StatsBenchmarkTProducer {
return sendRequestFailedCount;
}
- public AtomicLong getReceiveResponseSuccessCount() {
- return receiveResponseSuccessCount;
+ public AtomicLong getSendMessageTimeTotal() {
+ return sendMessageTimeTotal;
+ }
+
+ public AtomicLong getSendMessageMaxRT() {
+ return sendMessageMaxRT;
}
- public AtomicLong getReceiveResponseFailedCount() {
- return receiveResponseFailedCount;
+ public AtomicLong getCheckCount() {
+ return checkCount;
}
- public AtomicLong getSendMessageSuccessTimeTotal() {
- return sendMessageSuccessTimeTotal;
+ public AtomicLong getUnexpectedCheckCount() {
+ return unexpectedCheckCount;
}
- public AtomicLong getSendMessageMaxRT() {
- return sendMessageMaxRT;
+ public AtomicLong getDuplicatedCheckCount() {
+ return duplicatedCheckCount;
}
+}
+
+// Settings of one benchmark run, filled from the command line in TransactionProducer.main().
+class TxSendConfig {
+ // topic the messages are sent to (option t, default "BenchmarkTest")
+ String topic;
+ // number of sender threads (option w, default 32)
+ int threadCount;
+ // requested size in bytes of each message body (option s, default 2048)
+ int messageSize;
+ // probability that the state stored for the send phase is ROLLBACK_MESSAGE (option sr, default 0.0)
+ double sendRollbackRate;
+ // probability, tested only when rollback was not chosen, that the send-phase state is UNKNOW (option su, default 0.0)
+ double sendUnknownRate;
+ // probability that the state stored for one check round is ROLLBACK_MESSAGE (option cr, default 0.0)
+ double checkRollbackRate;
+ // probability, tested only when rollback was not chosen, that a check-round state is UNKNOW (option cu, default 0.0)
+ double checkUnknownRate;
+ // id written into every message of this run; checks for messages carrying another id are rolled back
+ // (option b, default System.currentTimeMillis())
+ long batchId;
+ // milliseconds each sender thread sleeps after a send; values <= 0 disable the sleep (option i, default 0)
+ int sendInterval;
+}
+
+/**
+ * Size-bounded map that evicts the least recently used entry once more than maxSize entries
+ * are held. Not thread-safe: in access order even get() modifies the map, so callers must
+ * synchronize externally.
+ */
+class LRUMap<K, V> extends LinkedHashMap<K, V> {
- public AtomicLong getCheckRequestSuccessCount() {
- return checkRequestSuccessCount;
+ private final int maxSize;
+
+ public LRUMap(int maxSize) {
+ // accessOrder = true: the default constructor keeps insertion order, which evicts FIFO, not LRU
+ super(16, 0.75f, true);
+ this.maxSize = maxSize;
+ }
+
+ @Override
+ protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
+ return size() > maxSize;
}
}
diff --git a/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java b/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java
index 066d36c..421eedb 100644
--- a/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java
+++ b/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java
@@ -49,10 +49,11 @@ public class ServerUtil {
commandLine = parser.parse(options, args);
if (commandLine.hasOption('h')) {
hf.printHelp(appName, options, true);
- return null;
+ System.exit(0);
}
} catch (ParseException e) {
hf.printHelp(appName, options, true);
+ System.exit(1);
}
return commandLine;
|