From commits-return-9-apmail-rocketmq-commits-archive=rocketmq.apache.org@rocketmq.incubator.apache.org Mon Dec 19 09:40:33 2016 Return-Path: X-Original-To: apmail-rocketmq-commits-archive@minotaur.apache.org Delivered-To: apmail-rocketmq-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 6B95D198DC for ; Mon, 19 Dec 2016 09:40:32 +0000 (UTC) Received: (qmail 8488 invoked by uid 500); 19 Dec 2016 09:40:32 -0000 Delivered-To: apmail-rocketmq-commits-archive@rocketmq.apache.org Received: (qmail 8424 invoked by uid 500); 19 Dec 2016 09:40:32 -0000 Mailing-List: contact commits-help@rocketmq.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@rocketmq.incubator.apache.org Delivered-To: mailing list commits@rocketmq.incubator.apache.org Received: (qmail 8408 invoked by uid 99); 19 Dec 2016 09:40:32 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 19 Dec 2016 09:40:32 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id B67791A044C for ; Mon, 19 Dec 2016 09:40:31 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.219 X-Spam-Level: X-Spam-Status: No, score=-6.219 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id WilBwKW6eaMP for ; Mon, 19 Dec 2016 09:40:22 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server 
at mx1-lw-eu.apache.org) with SMTP id E1A475FC21 for ; Mon, 19 Dec 2016 09:40:19 +0000 (UTC) Received: (qmail 7662 invoked by uid 99); 19 Dec 2016 09:40:19 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 19 Dec 2016 09:40:19 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E17F8DFB02; Mon, 19 Dec 2016 09:40:18 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: yukon@apache.org To: commits@rocketmq.incubator.apache.org Date: Mon, 19 Dec 2016 09:40:25 -0000 Message-Id: <84ad23f34bfc41a5a10330f2189b8de1@git.apache.org> In-Reply-To: <3e2dd9def9aa4fd7a6ce213370431713@git.apache.org> References: <3e2dd9def9aa4fd7a6ce213370431713@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [08/43] incubator-rocketmq git commit: Finish code dump. Reviewed by: @yukon @vongosling @stevenschew @vintagewang @lollipop @zander http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/StoreStatsService.java ---------------------------------------------------------------------- diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/StoreStatsService.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/StoreStatsService.java new file mode 100644 index 0000000..b5ed3f3 --- /dev/null +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/StoreStatsService.java @@ -0,0 +1,615 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package com.alibaba.rocketmq.store;

import com.alibaba.rocketmq.common.ServiceThread;
import com.alibaba.rocketmq.common.constant.LoggerName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.MessageFormat;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;


/**
 * Collects runtime statistics of the message store: per-topic put counters, get counters,
 * a put-latency histogram, observed maxima, and periodically sampled snapshots from which
 * throughput (TPS) over 10, 60 and 600 sampling intervals is derived.
 *
 * <p>The service loop in {@link #run()} takes one sample per iteration and, once every
 * {@code printTPSInterval} seconds, logs the throughput and the latency histogram and then
 * starts a fresh histogram.
 *
 * @author shijia.wxr
 */
public class StoreStatsService extends ServiceThread {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

    /** Argument handed to {@code waitForRunning} between two samples (presumably milliseconds - confirm in ServiceThread). */
    private static final int FREQUENCY_OF_SAMPLING = 1000;

    /** Every sample list is trimmed to at most {@code MAX_RECORDS_OF_SAMPLING + 1} snapshots. */
    private static final int MAX_RECORDS_OF_SAMPLING = 60 * 10;

    /** Labels of the put-latency histogram buckets, index-aligned with {@link #putMessageDistributeTime}. */
    private static final String[] PUT_MESSAGE_ENTIRE_TIME_MAX_DESC = new String[]{
        "[<=0ms]", "[0~10ms]", "[10~50ms]", "[50~100ms]", "[100~200ms]", "[200~500ms]", "[500ms~1s]", "[1~2s]", "[2~3s]", "[3~4s]", "[4~5s]", "[5~10s]", "[10s~]",
    };

    /**
     * Exclusive upper bounds of histogram buckets 1..11, in the unit used by the labels above
     * (milliseconds). Bucket 0 takes values {@code <= 0}; bucket 12 takes everything else.
     */
    private static final long[] PUT_MESSAGE_BUCKET_UPPER_BOUNDS = {
        10, 50, 100, 200, 500, 1000, 2000, 3000, 4000, 5000, 10000,
    };

    /** Seconds between two log reports; also the TPS window (in samples) used for those reports. */
    private static int printTPSInterval = 60 * 1;

    private final AtomicLong putMessageFailedTimes = new AtomicLong(0);

    // Per-topic counters. Declared as ConcurrentHashMap so that putIfAbsent is available
    // for the race-free get-or-create in counterFor(...).
    private final ConcurrentHashMap<String, AtomicLong> putMessageTopicTimesTotal =
        new ConcurrentHashMap<String, AtomicLong>(128);
    private final ConcurrentHashMap<String, AtomicLong> putMessageTopicSizeTotal =
        new ConcurrentHashMap<String, AtomicLong>(128);

    private final AtomicLong getMessageTimesTotalFound = new AtomicLong(0);
    private final AtomicLong getMessageTransferedMsgCount = new AtomicLong(0);
    private final AtomicLong getMessageTimesTotalMiss = new AtomicLong(0);

    // Sample histories; all four are guarded by lockSampling.
    private final LinkedList<CallSnapshot> putTimesList = new LinkedList<CallSnapshot>();
    private final LinkedList<CallSnapshot> getTimesFoundList = new LinkedList<CallSnapshot>();
    private final LinkedList<CallSnapshot> getTimesMissList = new LinkedList<CallSnapshot>();
    private final LinkedList<CallSnapshot> transferedMsgCountList = new LinkedList<CallSnapshot>();

    /** Current histogram; replaced wholesale by {@link #initPutMessageDistributeTime()}. */
    private volatile AtomicLong[] putMessageDistributeTime;
    private long messageStoreBootTimestamp = System.currentTimeMillis();
    private volatile long putMessageEntireTimeMax = 0;
    private volatile long getMessageEntireTimeMax = 0;
    // for putMessageEntireTimeMax
    private final ReentrantLock lockPut = new ReentrantLock();
    // for getMessageEntireTimeMax
    private final ReentrantLock lockGet = new ReentrantLock();

    private volatile long dispatchMaxBuffer = 0;

    private final ReentrantLock lockSampling = new ReentrantLock();
    private long lastPrintTimestamp = System.currentTimeMillis();


    public StoreStatsService() {
        this.initPutMessageDistributeTime();
    }

    /**
     * Installs a new, zeroed histogram.
     *
     * @return the histogram that was in place before the call ({@code null} on the first call)
     */
    private AtomicLong[] initPutMessageDistributeTime() {
        AtomicLong[] next = new AtomicLong[PUT_MESSAGE_ENTIRE_TIME_MAX_DESC.length];
        for (int i = 0; i < next.length; i++) {
            next[i] = new AtomicLong(0);
        }

        AtomicLong[] old = this.putMessageDistributeTime;

        this.putMessageDistributeTime = next;

        return old;
    }

    /**
     * Maps a put duration to its histogram bucket.
     *
     * @param value the duration, in the unit of the bucket labels
     * @return an index into {@link #PUT_MESSAGE_ENTIRE_TIME_MAX_DESC}
     */
    private static int bucketIndexOf(long value) {
        if (value <= 0) {
            return 0;
        }
        for (int i = 0; i < PUT_MESSAGE_BUCKET_UPPER_BOUNDS.length; i++) {
            if (value < PUT_MESSAGE_BUCKET_UPPER_BOUNDS[i]) {
                return i + 1;
            }
        }
        return PUT_MESSAGE_BUCKET_UPPER_BOUNDS.length + 1;
    }

    public long getPutMessageEntireTimeMax() {
        return putMessageEntireTimeMax;
    }

    /**
     * Records one put duration: increments the matching histogram bucket and raises the
     * observed maximum if {@code value} exceeds it. Despite the setter name, the maximum
     * is never lowered.
     *
     * @param value the put duration
     */
    public void setPutMessageEntireTimeMax(long value) {
        final AtomicLong[] times = this.putMessageDistributeTime;

        if (null == times) return;

        times[bucketIndexOf(value)].incrementAndGet();

        // Cheap unlocked pre-check; the comparison is repeated under the lock.
        if (value > this.putMessageEntireTimeMax) {
            this.lockPut.lock();
            try {
                if (value > this.putMessageEntireTimeMax) {
                    this.putMessageEntireTimeMax = value;
                }
            } finally {
                // Always release, even if something above throws.
                this.lockPut.unlock();
            }
        }
    }


    public long getGetMessageEntireTimeMax() {
        return getMessageEntireTimeMax;
    }


    /**
     * Raises the observed maximum get duration if {@code value} exceeds it; never lowers it.
     *
     * @param value the get duration
     */
    public void setGetMessageEntireTimeMax(long value) {
        if (value > this.getMessageEntireTimeMax) {
            this.lockGet.lock();
            try {
                if (value > this.getMessageEntireTimeMax) {
                    this.getMessageEntireTimeMax = value;
                }
            } finally {
                this.lockGet.unlock();
            }
        }
    }


    public long getDispatchMaxBuffer() {
        return dispatchMaxBuffer;
    }


    /**
     * Keeps the larger of the stored value and {@code value}.
     * NOTE(review): this is an unlocked read-compare-write on a volatile field, so concurrent
     * callers can lose an update; left as in the original.
     */
    public void setDispatchMaxBuffer(long value) {
        this.dispatchMaxBuffer = value > this.dispatchMaxBuffer ? value : this.dispatchMaxBuffer;
    }


    /**
     * Renders the same figures as {@link #getRuntimeInfo()} (except {@code bootTimestamp}) as
     * tab-indented, CRLF-terminated {@code name: value} lines.
     */
    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder(1024);
        // A total of 0 is reported as 1 so that the average below never divides by zero.
        long totalTimes = getPutMessageTimesTotal();
        if (0 == totalTimes) {
            totalTimes = 1L;
        }

        appendLine(sb, "runtime", this.getFormatRuntime());
        appendLine(sb, "putMessageEntireTimeMax", this.putMessageEntireTimeMax);
        appendLine(sb, "putMessageTimesTotal", totalTimes);
        appendLine(sb, "putMessageSizeTotal", this.getPutMessageSizeTotal());
        appendLine(sb, "putMessageDistributeTime", this.putMessageDistributeTimeToString());
        appendLine(sb, "putMessageAverageSize", this.getPutMessageSizeTotal() / (double) totalTimes);
        appendLine(sb, "dispatchMaxBuffer", this.dispatchMaxBuffer);
        appendLine(sb, "getMessageEntireTimeMax", this.getMessageEntireTimeMax);
        appendLine(sb, "putTps", this.getPutTps());
        appendLine(sb, "getFoundTps", this.getGetFoundTps());
        appendLine(sb, "getMissTps", this.getGetMissTps());
        appendLine(sb, "getTotalTps", this.getGetTotalTps());
        appendLine(sb, "getTransferedTps", this.getGetTransferedTps());
        return sb.toString();
    }

    /** Appends {@code "\t<name>: <value>\r\n"}. */
    private static void appendLine(StringBuilder sb, String name, Object value) {
        sb.append('\t').append(name).append(": ").append(value).append("\r\n");
    }

    /** @return the sum of all per-topic put counters */
    public long getPutMessageTimesTotal() {
        long rs = 0;
        for (AtomicLong data : putMessageTopicTimesTotal.values()) {
            rs += data.get();
        }
        return rs;
    }

    /** Formats the time elapsed since this object was created as days/hours/minutes/seconds. */
    private String getFormatRuntime() {
        final long millisecond = 1;
        final long second = 1000 * millisecond;
        final long minute = 60 * second;
        final long hour = 60 * minute;
        final long day = 24 * hour;
        final MessageFormat messageFormat = new MessageFormat("[ {0} days, {1} hours, {2} minutes, {3} seconds ]");

        long time = System.currentTimeMillis() - this.messageStoreBootTimestamp;
        long days = time / day;
        long hours = (time % day) / hour;
        long minutes = (time % hour) / minute;
        long seconds = (time % minute) / second;
        return messageFormat.format(new Long[]{days, hours, minutes, seconds});
    }

    /** @return the sum of all per-topic put size counters */
    public long getPutMessageSizeTotal() {
        long rs = 0;
        for (AtomicLong data : putMessageTopicSizeTotal.values()) {
            rs += data.get();
        }
        return rs;
    }

    /** Joins the three window results with single spaces, as in "tps10 tps60 tps600". */
    private static String joinWindows(String w10, String w60, String w600) {
        return w10 + " " + w60 + " " + w600;
    }

    private String getPutTps() {
        return joinWindows(this.getPutTps(10), this.getPutTps(60), this.getPutTps(600));
    }

    private String getGetFoundTps() {
        return joinWindows(this.getGetFoundTps(10), this.getGetFoundTps(60), this.getGetFoundTps(600));
    }

    private String getGetMissTps() {
        return joinWindows(this.getGetMissTps(10), this.getGetMissTps(60), this.getGetMissTps(600));
    }

    private String getGetTotalTps() {
        return joinWindows(this.getGetTotalTps(10), this.getGetTotalTps(60), this.getGetTotalTps(600));
    }

    private String getGetTransferedTps() {
        return joinWindows(this.getGetTransferedTps(10), this.getGetTransferedTps(60),
            this.getGetTransferedTps(600));
    }

    /** @return the current histogram as space-terminated {@code label:count} pairs, or {@code null} if none is installed */
    private String putMessageDistributeTimeToString() {
        final AtomicLong[] times = this.putMessageDistributeTime;
        if (null == times) return null;

        final StringBuilder sb = new StringBuilder();
        for (int i = 0; i < times.length; i++) {
            long value = times[i].get();
            sb.append(String.format("%s:%d", PUT_MESSAGE_ENTIRE_TIME_MAX_DESC[i], value));
            sb.append(" ");
        }

        return sb.toString();
    }

    /** @return whether {@code samples} holds more than {@code time} snapshots; caller holds {@code lockSampling} */
    private static boolean hasWindow(LinkedList<CallSnapshot> samples, int time) {
        // Checked BEFORE any element access: the lists are empty until the first sample is
        // taken, and LinkedList.getLast() throws NoSuchElementException on an empty list.
        return samples.size() > time;
    }

    /** TPS between the newest snapshot and the one {@code time} samples earlier; requires {@link #hasWindow}. */
    private static double windowTps(LinkedList<CallSnapshot> samples, int time) {
        CallSnapshot last = samples.getLast();
        CallSnapshot lastBefore = samples.get(samples.size() - (time + 1));
        return CallSnapshot.getTPS(lastBefore, last);
    }

    /**
     * @return the TPS over the last {@code time} samples as a decimal string, or {@code ""}
     *         when fewer than {@code time + 1} snapshots exist
     */
    private String sampledTps(LinkedList<CallSnapshot> samples, int time) {
        this.lockSampling.lock();
        try {
            return hasWindow(samples, time) ? String.valueOf(windowTps(samples, time)) : "";
        } finally {
            this.lockSampling.unlock();
        }
    }

    private String getPutTps(int time) {
        return sampledTps(this.putTimesList, time);
    }

    private String getGetFoundTps(int time) {
        return sampledTps(this.getTimesFoundList, time);
    }

    private String getGetMissTps(int time) {
        return sampledTps(this.getTimesMissList, time);
    }

    /** Sum of found and miss TPS; a window without enough samples contributes 0. */
    private String getGetTotalTps(int time) {
        double found = 0;
        double miss = 0;
        this.lockSampling.lock();
        try {
            if (hasWindow(this.getTimesFoundList, time)) {
                found = windowTps(this.getTimesFoundList, time);
            }
            if (hasWindow(this.getTimesMissList, time)) {
                miss = windowTps(this.getTimesMissList, time);
            }
        } finally {
            this.lockSampling.unlock();
        }

        return Double.toString(found + miss);
    }

    private String getGetTransferedTps(int time) {
        return sampledTps(this.transferedMsgCountList, time);
    }

    /**
     * @return a new map of statistic name to its string rendering; safe to call before the
     *         first sample has been taken (TPS entries are then blank)
     */
    public HashMap<String, String> getRuntimeInfo() {
        HashMap<String, String> result = new HashMap<String, String>(64);

        // Same zero-to-one substitution as in toString().
        long totalTimes = getPutMessageTimesTotal();
        if (0 == totalTimes) {
            totalTimes = 1L;
        }

        result.put("bootTimestamp", String.valueOf(this.messageStoreBootTimestamp));
        result.put("runtime", this.getFormatRuntime());
        result.put("putMessageEntireTimeMax", String.valueOf(this.putMessageEntireTimeMax));
        result.put("putMessageTimesTotal", String.valueOf(totalTimes));
        result.put("putMessageSizeTotal", String.valueOf(this.getPutMessageSizeTotal()));
        result.put("putMessageDistributeTime", String.valueOf(this.putMessageDistributeTimeToString()));
        result.put("putMessageAverageSize",
            String.valueOf(this.getPutMessageSizeTotal() / (double) totalTimes));
        result.put("dispatchMaxBuffer", String.valueOf(this.dispatchMaxBuffer));
        result.put("getMessageEntireTimeMax", String.valueOf(this.getMessageEntireTimeMax));
        result.put("putTps", String.valueOf(this.getPutTps()));
        result.put("getFoundTps", String.valueOf(this.getGetFoundTps()));
        result.put("getMissTps", String.valueOf(this.getGetMissTps()));
        result.put("getTotalTps", String.valueOf(this.getGetTotalTps()));
        result.put("getTransferedTps", String.valueOf(this.getGetTransferedTps()));

        return result;
    }

    /** Service loop: wait, sample, report - until {@code isStopped()} returns true. */
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                this.waitForRunning(FREQUENCY_OF_SAMPLING);

                this.sampling();

                this.printTps();
            } catch (Exception e) {
                // Keep the loop alive; a failed iteration only costs one sample.
                log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }

        log.info(this.getServiceName() + " service end");
    }

    @Override
    public String getServiceName() {
        return StoreStatsService.class.getSimpleName();
    }

    /** Appends {@code snapshot} and drops the oldest entry once the history exceeds its cap; caller holds {@code lockSampling}. */
    private static void appendSample(LinkedList<CallSnapshot> samples, CallSnapshot snapshot) {
        samples.add(snapshot);
        if (samples.size() > (MAX_RECORDS_OF_SAMPLING + 1)) {
            samples.removeFirst();
        }
    }

    /** Takes one timestamped snapshot of each of the four sampled counters. */
    private void sampling() {
        this.lockSampling.lock();
        try {
            appendSample(this.putTimesList,
                new CallSnapshot(System.currentTimeMillis(), getPutMessageTimesTotal()));
            appendSample(this.getTimesFoundList,
                new CallSnapshot(System.currentTimeMillis(), this.getMessageTimesTotalFound.get()));
            appendSample(this.getTimesMissList,
                new CallSnapshot(System.currentTimeMillis(), this.getMessageTimesTotalMiss.get()));
            appendSample(this.transferedMsgCountList,
                new CallSnapshot(System.currentTimeMillis(), this.getMessageTransferedMsgCount.get()));
        } finally {
            this.lockSampling.unlock();
        }
    }

    /**
     * Once per {@code printTPSInterval} seconds: logs the TPS figures, then swaps in a fresh
     * histogram and logs the contents of the one just retired.
     */
    private void printTps() {
        if (System.currentTimeMillis() > (this.lastPrintTimestamp + printTPSInterval * 1000)) {
            this.lastPrintTimestamp = System.currentTimeMillis();

            log.info("[STORETPS] put_tps {} get_found_tps {} get_miss_tps {} get_transfered_tps {}",
                this.getPutTps(printTPSInterval),
                this.getGetFoundTps(printTPSInterval),
                this.getGetMissTps(printTPSInterval),
                this.getGetTransferedTps(printTPSInterval)
            );

            final AtomicLong[] times = this.initPutMessageDistributeTime();
            if (null == times) return;

            final StringBuilder sb = new StringBuilder();
            long totalPut = 0;
            for (int i = 0; i < times.length; i++) {
                long value = times[i].get();
                totalPut += value;
                sb.append(String.format("%s:%d", PUT_MESSAGE_ENTIRE_TIME_MAX_DESC[i], value));
                sb.append(" ");
            }

            log.info("[PAGECACHERT] TotalPut {}, PutMessageDistributeTime {}", totalPut, sb.toString());
        }
    }

    public AtomicLong getGetMessageTimesTotalFound() {
        return getMessageTimesTotalFound;
    }


    public AtomicLong getGetMessageTimesTotalMiss() {
        return getMessageTimesTotalMiss;
    }


    public AtomicLong getGetMessageTransferedMsgCount() {
        return getMessageTransferedMsgCount;
    }


    public AtomicLong getPutMessageFailedTimes() {
        return putMessageFailedTimes;
    }

    /**
     * Returns the counter registered for {@code topic}, creating it on first use.
     * {@code putIfAbsent} guarantees that all callers racing on the same new topic receive the
     * same instance; a plain get-then-put could hand out a counter that is overwritten later,
     * silently losing its increments.
     */
    private static AtomicLong counterFor(ConcurrentHashMap<String, AtomicLong> counters, String topic) {
        AtomicLong rs = counters.get(topic);
        if (null == rs) {
            AtomicLong fresh = new AtomicLong(0);
            AtomicLong raced = counters.putIfAbsent(topic, fresh);
            rs = (null != raced) ? raced : fresh;
        }
        return rs;
    }


    /** @return the live put-size counter of {@code topic}, created on first use */
    public AtomicLong getSinglePutMessageTopicSizeTotal(String topic) {
        return counterFor(putMessageTopicSizeTotal, topic);
    }


    /** @return the live put-count counter of {@code topic}, created on first use */
    public AtomicLong getSinglePutMessageTopicTimesTotal(String topic) {
        return counterFor(putMessageTopicTimesTotal, topic);
    }


    /** @return the live (not copied) topic-to-put-count map */
    public Map<String, AtomicLong> getPutMessageTopicTimesTotal() {
        return putMessageTopicTimesTotal;
    }


    /** @return the live (not copied) topic-to-put-size map */
    public Map<String, AtomicLong> getPutMessageTopicSizeTotal() {
        return putMessageTopicSizeTotal;
    }

    /** Immutable pair of a wall-clock timestamp (ms) and the counter value read at that time. */
    static class CallSnapshot {
        public final long timestamp;
        public final long callTimesTotal;


        public CallSnapshot(long timestamp, long callTimesTotal) {
            this.timestamp = timestamp;
            this.callTimesTotal = callTimesTotal;
        }


        /**
         * @return calls per second between the two snapshots. The division is done in
         *         floating point, so equal timestamps yield NaN or an infinity rather than
         *         an exception.
         */
        public static double getTPS(final CallSnapshot begin, final CallSnapshot end) {
            long total = end.callTimesTotal - begin.callTimesTotal;
            long elapsedMillis = end.timestamp - begin.timestamp;

            double perMilli = total / (double) elapsedMillis;

            return perMilli * 1000;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/StoreUtil.java ---------------------------------------------------------------------- diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/StoreUtil.java 
b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/StoreUtil.java new file mode 100644 index 0000000..9e0b565 --- /dev/null +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/StoreUtil.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package com.alibaba.rocketmq.store;

import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;


/**
 * Static helpers for the store module; currently only a physical-memory probe.
 *
 * @author shijia.wxr
 */
public class StoreUtil {
    /**
     * Value reported when the JVM does not expose
     * {@code com.sun.management.OperatingSystemMXBean}: 24 GiB.
     * The leading {@code 24L} forces {@code long} arithmetic; written as
     * {@code 1024 * 1024 * 1024 * 24} the product is computed in {@code int} and wraps to 0.
     */
    public static final long DEFAULT_TOTAL_PHYSICAL_MEMORY_SIZE = 24L * 1024 * 1024 * 1024;

    /** Result of {@link #getTotalPhysicalMemorySize()}, captured once when this class is initialised. */
    public static final long TOTAL_PHYSICAL_MEMORY_SIZE = getTotalPhysicalMemorySize();


    /**
     * Asks the platform MXBean for the machine's total physical memory.
     *
     * @return the size in bytes reported by {@code com.sun.management.OperatingSystemMXBean},
     *         or {@link #DEFAULT_TOTAL_PHYSICAL_MEMORY_SIZE} when the running JVM's
     *         operating-system bean is not of that type
     */
    @SuppressWarnings("restriction")
    public static long getTotalPhysicalMemorySize() {
        OperatingSystemMXBean osmxb = ManagementFactory.getOperatingSystemMXBean();
        if (osmxb instanceof com.sun.management.OperatingSystemMXBean) {
            return ((com.sun.management.OperatingSystemMXBean) osmxb).getTotalPhysicalMemorySize();
        }

        return DEFAULT_TOTAL_PHYSICAL_MEMORY_SIZE;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/TransientStorePool.java ---------------------------------------------------------------------- diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/TransientStorePool.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/TransientStorePool.java new file mode 100644 index 0000000..8abe7e9 --- /dev/null +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/TransientStorePool.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */
package com.alibaba.rocketmq.store;

import com.alibaba.rocketmq.common.constant.LoggerName;
import com.alibaba.rocketmq.store.config.MessageStoreConfig;
import com.alibaba.rocketmq.store.util.LibC;
import com.sun.jna.NativeLong;
import com.sun.jna.Pointer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.nio.ch.DirectBuffer;

import java.nio.ByteBuffer;
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;

/**
 * A fixed-size pool of direct {@link ByteBuffer}s. The pool holds
 * {@code transientStorePoolSize} buffers, each as large as one commit-log file
 * ({@code mapedFileSizeCommitLog} bytes); both values are read from the supplied
 * {@link MessageStoreConfig}. Buffers are handed out and taken back at the head of a
 * concurrent deque.
 *
 * @author xinyuzhou.zxy
 */
public class TransientStorePool {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

    private final int poolSize;
    private final int fileSize;
    // Element type restored: the code below iterates and polls this deque as ByteBuffer.
    private final Deque<ByteBuffer> availableBuffers;
    private final MessageStoreConfig storeConfig;

    /**
     * Captures the pool dimensions from {@code storeConfig}; no buffer is allocated until
     * {@link #init()} is called.
     *
     * @param storeConfig source of the pool size, buffer size and enable flag
     */
    public TransientStorePool(final MessageStoreConfig storeConfig) {
        this.storeConfig = storeConfig;
        this.poolSize = storeConfig.getTransientStorePoolSize();
        this.fileSize = storeConfig.getMapedFileSizeCommitLog();
        this.availableBuffers = new ConcurrentLinkedDeque<>();
    }

    /**
     * It's a heavy init method: allocates {@code poolSize} direct buffers of {@code fileSize}
     * bytes each, passes every buffer's native address to {@code LibC.mlock}
     * (presumably libc's mlock(2), pinning the pages in RAM - confirm in LibC), and adds the
     * buffer to the pool.
     * NOTE(review): the value returned by {@code mlock} is not inspected.
     */
    public void init() {
        for (int i = 0; i < poolSize; i++) {
            ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);

            final long address = ((DirectBuffer) byteBuffer).address();
            Pointer pointer = new Pointer(address);
            LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));

            availableBuffers.offer(byteBuffer);
        }
    }

    /**
     * Calls {@code LibC.munlock} on every buffer currently sitting in the pool.
     * Buffers that are borrowed at the time of the call are not visited.
     */
    public void destroy() {
        for (ByteBuffer byteBuffer : availableBuffers) {
            final long address = ((DirectBuffer) byteBuffer).address();
            Pointer pointer = new Pointer(address);
            LibC.INSTANCE.munlock(pointer, new NativeLong(fileSize));
        }
    }

    /**
     * Puts a buffer back at the head of the pool after resetting its position to 0 and its
     * limit to {@code fileSize}. The buffer's content is not cleared.
     *
     * @param byteBuffer the buffer to return
     */
    public void returnBuffer(ByteBuffer byteBuffer) {
        byteBuffer.position(0);
        byteBuffer.limit(fileSize);
        this.availableBuffers.offerFirst(byteBuffer);
    }

    /**
     * Takes the buffer at the head of the pool and logs a warning when fewer than 40% of
     * the buffers remain afterwards.
     *
     * @return a pooled buffer, or {@code null} when the pool is empty
     */
    public ByteBuffer borrowBuffer() {
        ByteBuffer buffer = availableBuffers.pollFirst();
        if (availableBuffers.size() < poolSize * 0.4) {
            log.warn("TransientStorePool only remain {} sheets.", availableBuffers.size());
        }
        return buffer;
    }

    /**
     * @return the number of buffers currently available, or {@link Integer#MAX_VALUE} when
     *         the configuration reports the transient store pool as disabled
     */
    public int remainBufferNumbs() {
        if (storeConfig.isTransientStorePoolEnable()) {
            return availableBuffers.size();
        }
        return Integer.MAX_VALUE;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/BrokerRole.java ---------------------------------------------------------------------- diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/BrokerRole.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/BrokerRole.java new file mode 100644 index 0000000..06714b2 --- /dev/null +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/BrokerRole.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package com.alibaba.rocketmq.store.config;

/**
 * The roles a broker can be configured with. {@code MessageStoreConfig} initialises its
 * {@code brokerRole} field to {@link #ASYNC_MASTER}.
 * NOTE(review): what each role implies is decided by the code that reads this enum, which
 * is not part of this file - confirm there before relying on the constant names.
 *
 * @author shijia.wxr
 */
public enum BrokerRole {
    /** Ordinal 0; the default role in {@code MessageStoreConfig}. */
    ASYNC_MASTER,

    /** Ordinal 1. */
    SYNC_MASTER,

    /** Ordinal 2. */
    SLAVE
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/FlushDiskType.java ---------------------------------------------------------------------- diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/FlushDiskType.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/FlushDiskType.java new file mode 100644 index 0000000..48ae4b2 --- /dev/null +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/FlushDiskType.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package com.alibaba.rocketmq.store.config;

/**
 * The disk-flush modes selectable in the store configuration. {@code MessageStoreConfig}
 * initialises its {@code flushDiskType} field to {@link #ASYNC_FLUSH}.
 * NOTE(review): the behavioural difference between the two modes is implemented by the
 * code that reads this enum, which is not part of this file - confirm there.
 *
 * @author shijia.wxr
 */
public enum FlushDiskType {
    /** Ordinal 0. */
    SYNC_FLUSH,

    /** Ordinal 1; the default mode in {@code MessageStoreConfig}. */
    ASYNC_FLUSH
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/MessageStoreConfig.java ---------------------------------------------------------------------- diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/MessageStoreConfig.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/MessageStoreConfig.java new file mode 100644 index 0000000..138bc2e --- /dev/null +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/MessageStoreConfig.java @@ -0,0 +1,727 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.rocketmq.store.config; + +import com.alibaba.rocketmq.common.annotation.ImportantField; +import com.alibaba.rocketmq.store.ConsumeQueue; + +import java.io.File; + + +/** + * @author vongosling + * @author shijia.wxr + */ +public class MessageStoreConfig { + //The root directory in which the log data is kept + @ImportantField + private String storePathRootDir = System.getProperty("user.home") + File.separator + "store"; + + //The directory in which the commitlog is kept + @ImportantField + private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store" + + File.separator + "commitlog"; + + // CommitLog file size,default is 1G + private int mapedFileSizeCommitLog = 1024 * 1024 * 1024; + // ConsumeQueue file size,default is 30W + private int mapedFileSizeConsumeQueue = 300000 * ConsumeQueue.CQ_STORE_UNIT_SIZE; + + // CommitLog flush interval + // flush data to disk + @ImportantField + private int flushIntervalCommitLog = 500; + + // Only used if TransientStorePool enabled + // flush data to FileChannel + @ImportantField + private int commitIntervalCommitLog = 200; + + private boolean useReentrantLockWhenPutMessage = false; + + // Whether schedule flush,default is real-time + @ImportantField + private boolean flushCommitLogTimed = false; + // ConsumeQueue flush interval + private int flushIntervalConsumeQueue = 1000; + // Resource reclaim interval + private int cleanResourceInterval = 10000; + // CommitLog removal interval + private int deleteCommitLogFilesInterval = 100; + // ConsumeQueue removal interval + private int deleteConsumeQueueFilesInterval = 100; + private int destroyMapedFileIntervalForcibly = 1000 * 120; + private int redeleteHangedFileInterval = 1000 * 120; + // When to delete,default is at 4 am + @ImportantField + private String deleteWhen = "04"; + private int diskMaxUsedSpaceRatio = 75; + // The number of hours to keep a log file before deleting it (in hours) + @ImportantField + private int 
fileReservedTime = 72; + // Flow control for ConsumeQueue + private int putMsgIndexHightWater = 600000; + // The maximum size of a single log file,default is 512K + private int maxMessageSize = 1024 * 1024 * 4; + // Whether check the CRC32 of the records consumed. + // This ensures no on-the-wire or on-disk corruption to the messages occurred. + // This check adds some overhead,so it may be disabled in cases seeking extreme performance. + private boolean checkCRCOnRecover = true; + // How many pages are to be flushed when flush CommitLog + private int flushCommitLogLeastPages = 4; + // How many pages are to be committed when commit data to file + private int commitCommitLogLeastPages = 4; + // Flush page size when the disk in warming state + private int flushLeastPagesWhenWarmMapedFile = 1024 / 4 * 16; + // How many pages are to be flushed when flush ConsumeQueue + private int flushConsumeQueueLeastPages = 2; + private int flushCommitLogThoroughInterval = 1000 * 10; + private int commitCommitLogThoroughInterval = 200; + private int flushConsumeQueueThoroughInterval = 1000 * 60; + @ImportantField + private int maxTransferBytesOnMessageInMemory = 1024 * 256; + @ImportantField + private int maxTransferCountOnMessageInMemory = 32; + @ImportantField + private int maxTransferBytesOnMessageInDisk = 1024 * 64; + @ImportantField + private int maxTransferCountOnMessageInDisk = 8; + @ImportantField + private int accessMessageInMemoryMaxRatio = 40; + @ImportantField + private boolean messageIndexEnable = true; + private int maxHashSlotNum = 5000000; + private int maxIndexNum = 5000000 * 4; + private int maxMsgsNumBatch = 64; + @ImportantField + private boolean messageIndexSafe = false; + private int haListenPort = 10912; + private int haSendHeartbeatInterval = 1000 * 5; + private int haHousekeepingInterval = 1000 * 20; + private int haTransferBatchSize = 1024 * 32; + @ImportantField + private String haMasterAddress = null; + private int haSlaveFallbehindMax = 1024 * 1024 * 
256; + @ImportantField + private BrokerRole brokerRole = BrokerRole.ASYNC_MASTER; + @ImportantField + private FlushDiskType flushDiskType = FlushDiskType.ASYNC_FLUSH; + private int syncFlushTimeout = 1000 * 5; + private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; + private long flushDelayOffsetInterval = 1000 * 10; + @ImportantField + private boolean cleanFileForciblyEnable = true; + private boolean warmMapedFileEnable = false; + private boolean offsetCheckInSlave = false; + private boolean debugLockEnable = false; + private boolean duplicationEnable = false; + private boolean diskFallRecorded = true; + private long osPageCacheBusyTimeOutMills = 1000; + private int defaultQueryMaxNum = 32; + + @ImportantField + private boolean transientStorePoolEnable = false; + private int transientStorePoolSize = 5; + private boolean fastFailIfNoBufferInStorePool = false; + + public boolean isDebugLockEnable() { + return debugLockEnable; + } + + public void setDebugLockEnable(final boolean debugLockEnable) { + this.debugLockEnable = debugLockEnable; + } + + public boolean isDuplicationEnable() { + return duplicationEnable; + } + + public void setDuplicationEnable(final boolean duplicationEnable) { + this.duplicationEnable = duplicationEnable; + } + + public long getOsPageCacheBusyTimeOutMills() { + return osPageCacheBusyTimeOutMills; + } + + public void setOsPageCacheBusyTimeOutMills(final long osPageCacheBusyTimeOutMills) { + this.osPageCacheBusyTimeOutMills = osPageCacheBusyTimeOutMills; + } + + public boolean isDiskFallRecorded() { + return diskFallRecorded; + } + + public void setDiskFallRecorded(final boolean diskFallRecorded) { + this.diskFallRecorded = diskFallRecorded; + } + + public boolean isWarmMapedFileEnable() { + return warmMapedFileEnable; + } + + + public void setWarmMapedFileEnable(boolean warmMapedFileEnable) { + this.warmMapedFileEnable = warmMapedFileEnable; + } + + + public int getMapedFileSizeCommitLog() { + 
return mapedFileSizeCommitLog; + } + + + public void setMapedFileSizeCommitLog(int mapedFileSizeCommitLog) { + this.mapedFileSizeCommitLog = mapedFileSizeCommitLog; + } + + + public int getMapedFileSizeConsumeQueue() { + + int factor = (int) Math.ceil(this.mapedFileSizeConsumeQueue / (ConsumeQueue.CQ_STORE_UNIT_SIZE * 1.0)); + return (int) (factor * ConsumeQueue.CQ_STORE_UNIT_SIZE); + } + + + public void setMapedFileSizeConsumeQueue(int mapedFileSizeConsumeQueue) { + this.mapedFileSizeConsumeQueue = mapedFileSizeConsumeQueue; + } + + + public int getFlushIntervalCommitLog() { + return flushIntervalCommitLog; + } + + + public void setFlushIntervalCommitLog(int flushIntervalCommitLog) { + this.flushIntervalCommitLog = flushIntervalCommitLog; + } + + + public int getFlushIntervalConsumeQueue() { + return flushIntervalConsumeQueue; + } + + + public void setFlushIntervalConsumeQueue(int flushIntervalConsumeQueue) { + this.flushIntervalConsumeQueue = flushIntervalConsumeQueue; + } + + + public int getPutMsgIndexHightWater() { + return putMsgIndexHightWater; + } + + + public void setPutMsgIndexHightWater(int putMsgIndexHightWater) { + this.putMsgIndexHightWater = putMsgIndexHightWater; + } + + + public int getCleanResourceInterval() { + return cleanResourceInterval; + } + + + public void setCleanResourceInterval(int cleanResourceInterval) { + this.cleanResourceInterval = cleanResourceInterval; + } + + + public int getMaxMessageSize() { + return maxMessageSize; + } + + + public void setMaxMessageSize(int maxMessageSize) { + this.maxMessageSize = maxMessageSize; + } + + + public boolean isCheckCRCOnRecover() { + return checkCRCOnRecover; + } + + + public boolean getCheckCRCOnRecover() { + return checkCRCOnRecover; + } + + + public void setCheckCRCOnRecover(boolean checkCRCOnRecover) { + this.checkCRCOnRecover = checkCRCOnRecover; + } + + + public String getStorePathCommitLog() { + return storePathCommitLog; + } + + + public void setStorePathCommitLog(String 
storePathCommitLog) { + this.storePathCommitLog = storePathCommitLog; + } + + + public String getDeleteWhen() { + return deleteWhen; + } + + + public void setDeleteWhen(String deleteWhen) { + this.deleteWhen = deleteWhen; + } + + + public int getDiskMaxUsedSpaceRatio() { + if (this.diskMaxUsedSpaceRatio < 10) + return 10; + + if (this.diskMaxUsedSpaceRatio > 95) + return 95; + + return diskMaxUsedSpaceRatio; + } + + + public void setDiskMaxUsedSpaceRatio(int diskMaxUsedSpaceRatio) { + this.diskMaxUsedSpaceRatio = diskMaxUsedSpaceRatio; + } + + + public int getDeleteCommitLogFilesInterval() { + return deleteCommitLogFilesInterval; + } + + + public void setDeleteCommitLogFilesInterval(int deleteCommitLogFilesInterval) { + this.deleteCommitLogFilesInterval = deleteCommitLogFilesInterval; + } + + + public int getDeleteConsumeQueueFilesInterval() { + return deleteConsumeQueueFilesInterval; + } + + + public void setDeleteConsumeQueueFilesInterval(int deleteConsumeQueueFilesInterval) { + this.deleteConsumeQueueFilesInterval = deleteConsumeQueueFilesInterval; + } + + + public int getMaxTransferBytesOnMessageInMemory() { + return maxTransferBytesOnMessageInMemory; + } + + + public void setMaxTransferBytesOnMessageInMemory(int maxTransferBytesOnMessageInMemory) { + this.maxTransferBytesOnMessageInMemory = maxTransferBytesOnMessageInMemory; + } + + + public int getMaxTransferCountOnMessageInMemory() { + return maxTransferCountOnMessageInMemory; + } + + + public void setMaxTransferCountOnMessageInMemory(int maxTransferCountOnMessageInMemory) { + this.maxTransferCountOnMessageInMemory = maxTransferCountOnMessageInMemory; + } + + + public int getMaxTransferBytesOnMessageInDisk() { + return maxTransferBytesOnMessageInDisk; + } + + + public void setMaxTransferBytesOnMessageInDisk(int maxTransferBytesOnMessageInDisk) { + this.maxTransferBytesOnMessageInDisk = maxTransferBytesOnMessageInDisk; + } + + + public int getMaxTransferCountOnMessageInDisk() { + return 
maxTransferCountOnMessageInDisk; + } + + + public void setMaxTransferCountOnMessageInDisk(int maxTransferCountOnMessageInDisk) { + this.maxTransferCountOnMessageInDisk = maxTransferCountOnMessageInDisk; + } + + + public int getFlushCommitLogLeastPages() { + return flushCommitLogLeastPages; + } + + + public void setFlushCommitLogLeastPages(int flushCommitLogLeastPages) { + this.flushCommitLogLeastPages = flushCommitLogLeastPages; + } + + + public int getFlushConsumeQueueLeastPages() { + return flushConsumeQueueLeastPages; + } + + + public void setFlushConsumeQueueLeastPages(int flushConsumeQueueLeastPages) { + this.flushConsumeQueueLeastPages = flushConsumeQueueLeastPages; + } + + + public int getFlushCommitLogThoroughInterval() { + return flushCommitLogThoroughInterval; + } + + + public void setFlushCommitLogThoroughInterval(int flushCommitLogThoroughInterval) { + this.flushCommitLogThoroughInterval = flushCommitLogThoroughInterval; + } + + + public int getFlushConsumeQueueThoroughInterval() { + return flushConsumeQueueThoroughInterval; + } + + + public void setFlushConsumeQueueThoroughInterval(int flushConsumeQueueThoroughInterval) { + this.flushConsumeQueueThoroughInterval = flushConsumeQueueThoroughInterval; + } + + + public int getDestroyMapedFileIntervalForcibly() { + return destroyMapedFileIntervalForcibly; + } + + + public void setDestroyMapedFileIntervalForcibly(int destroyMapedFileIntervalForcibly) { + this.destroyMapedFileIntervalForcibly = destroyMapedFileIntervalForcibly; + } + + + public int getFileReservedTime() { + return fileReservedTime; + } + + + public void setFileReservedTime(int fileReservedTime) { + this.fileReservedTime = fileReservedTime; + } + + + public int getRedeleteHangedFileInterval() { + return redeleteHangedFileInterval; + } + + + public void setRedeleteHangedFileInterval(int redeleteHangedFileInterval) { + this.redeleteHangedFileInterval = redeleteHangedFileInterval; + } + + + public int getAccessMessageInMemoryMaxRatio() { + return 
accessMessageInMemoryMaxRatio; + } + + + public void setAccessMessageInMemoryMaxRatio(int accessMessageInMemoryMaxRatio) { + this.accessMessageInMemoryMaxRatio = accessMessageInMemoryMaxRatio; + } + + + public boolean isMessageIndexEnable() { + return messageIndexEnable; + } + + + public void setMessageIndexEnable(boolean messageIndexEnable) { + this.messageIndexEnable = messageIndexEnable; + } + + + public int getMaxHashSlotNum() { + return maxHashSlotNum; + } + + + public void setMaxHashSlotNum(int maxHashSlotNum) { + this.maxHashSlotNum = maxHashSlotNum; + } + + + public int getMaxIndexNum() { + return maxIndexNum; + } + + + public void setMaxIndexNum(int maxIndexNum) { + this.maxIndexNum = maxIndexNum; + } + + + public int getMaxMsgsNumBatch() { + return maxMsgsNumBatch; + } + + + public void setMaxMsgsNumBatch(int maxMsgsNumBatch) { + this.maxMsgsNumBatch = maxMsgsNumBatch; + } + + + public int getHaListenPort() { + return haListenPort; + } + + + public void setHaListenPort(int haListenPort) { + this.haListenPort = haListenPort; + } + + + public int getHaSendHeartbeatInterval() { + return haSendHeartbeatInterval; + } + + + public void setHaSendHeartbeatInterval(int haSendHeartbeatInterval) { + this.haSendHeartbeatInterval = haSendHeartbeatInterval; + } + + + public int getHaHousekeepingInterval() { + return haHousekeepingInterval; + } + + + public void setHaHousekeepingInterval(int haHousekeepingInterval) { + this.haHousekeepingInterval = haHousekeepingInterval; + } + + + public BrokerRole getBrokerRole() { + return brokerRole; + } + + public void setBrokerRole(BrokerRole brokerRole) { + this.brokerRole = brokerRole; + } + + public void setBrokerRole(String brokerRole) { + this.brokerRole = BrokerRole.valueOf(brokerRole); + } + + public int getHaTransferBatchSize() { + return haTransferBatchSize; + } + + + public void setHaTransferBatchSize(int haTransferBatchSize) { + this.haTransferBatchSize = haTransferBatchSize; + } + + + public int 
getHaSlaveFallbehindMax() { + return haSlaveFallbehindMax; + } + + + public void setHaSlaveFallbehindMax(int haSlaveFallbehindMax) { + this.haSlaveFallbehindMax = haSlaveFallbehindMax; + } + + + public FlushDiskType getFlushDiskType() { + return flushDiskType; + } + + public void setFlushDiskType(FlushDiskType flushDiskType) { + this.flushDiskType = flushDiskType; + } + + public void setFlushDiskType(String type) { + this.flushDiskType = FlushDiskType.valueOf(type); + } + + public int getSyncFlushTimeout() { + return syncFlushTimeout; + } + + + public void setSyncFlushTimeout(int syncFlushTimeout) { + this.syncFlushTimeout = syncFlushTimeout; + } + + + public String getHaMasterAddress() { + return haMasterAddress; + } + + + public void setHaMasterAddress(String haMasterAddress) { + this.haMasterAddress = haMasterAddress; + } + + + public String getMessageDelayLevel() { + return messageDelayLevel; + } + + + public void setMessageDelayLevel(String messageDelayLevel) { + this.messageDelayLevel = messageDelayLevel; + } + + + public long getFlushDelayOffsetInterval() { + return flushDelayOffsetInterval; + } + + + public void setFlushDelayOffsetInterval(long flushDelayOffsetInterval) { + this.flushDelayOffsetInterval = flushDelayOffsetInterval; + } + + + public boolean isCleanFileForciblyEnable() { + return cleanFileForciblyEnable; + } + + + public void setCleanFileForciblyEnable(boolean cleanFileForciblyEnable) { + this.cleanFileForciblyEnable = cleanFileForciblyEnable; + } + + + public boolean isMessageIndexSafe() { + return messageIndexSafe; + } + + + public void setMessageIndexSafe(boolean messageIndexSafe) { + this.messageIndexSafe = messageIndexSafe; + } + + + public boolean isFlushCommitLogTimed() { + return flushCommitLogTimed; + } + + + public void setFlushCommitLogTimed(boolean flushCommitLogTimed) { + this.flushCommitLogTimed = flushCommitLogTimed; + } + + + public String getStorePathRootDir() { + return storePathRootDir; + } + + + public void 
setStorePathRootDir(String storePathRootDir) { + this.storePathRootDir = storePathRootDir; + } + + + public int getFlushLeastPagesWhenWarmMapedFile() { + return flushLeastPagesWhenWarmMapedFile; + } + + + public void setFlushLeastPagesWhenWarmMapedFile(int flushLeastPagesWhenWarmMapedFile) { + this.flushLeastPagesWhenWarmMapedFile = flushLeastPagesWhenWarmMapedFile; + } + + + public boolean isOffsetCheckInSlave() { + return offsetCheckInSlave; + } + + + public void setOffsetCheckInSlave(boolean offsetCheckInSlave) { + this.offsetCheckInSlave = offsetCheckInSlave; + } + + public int getDefaultQueryMaxNum() { + return defaultQueryMaxNum; + } + + public void setDefaultQueryMaxNum(int defaultQueryMaxNum) { + this.defaultQueryMaxNum = defaultQueryMaxNum; + } + + /** + * Enable transient commitLog store poll only if transientStorePoolEnable is true and the FlushDiskType is ASYNC_FLUSH + * @return true or false + */ + public boolean isTransientStorePoolEnable() { + return transientStorePoolEnable && FlushDiskType.ASYNC_FLUSH == getFlushDiskType() + && BrokerRole.SLAVE != getBrokerRole(); + } + + public void setTransientStorePoolEnable(final boolean transientStorePoolEnable) { + this.transientStorePoolEnable = transientStorePoolEnable; + } + + public int getTransientStorePoolSize() { + return transientStorePoolSize; + } + + public void setTransientStorePoolSize(final int transientStorePoolSize) { + this.transientStorePoolSize = transientStorePoolSize; + } + + public int getCommitIntervalCommitLog() { + return commitIntervalCommitLog; + } + + public void setCommitIntervalCommitLog(final int commitIntervalCommitLog) { + this.commitIntervalCommitLog = commitIntervalCommitLog; + } + + public boolean isFastFailIfNoBufferInStorePool() { + return fastFailIfNoBufferInStorePool; + } + + public void setFastFailIfNoBufferInStorePool(final boolean fastFailIfNoBufferInStorePool) { + this.fastFailIfNoBufferInStorePool = fastFailIfNoBufferInStorePool; + } + + public boolean 
isUseReentrantLockWhenPutMessage() { + return useReentrantLockWhenPutMessage; + } + + public void setUseReentrantLockWhenPutMessage(final boolean useReentrantLockWhenPutMessage) { + this.useReentrantLockWhenPutMessage = useReentrantLockWhenPutMessage; + } + + public int getCommitCommitLogLeastPages() { + return commitCommitLogLeastPages; + } + + public void setCommitCommitLogLeastPages(final int commitCommitLogLeastPages) { + this.commitCommitLogLeastPages = commitCommitLogLeastPages; + } + + public int getCommitCommitLogThoroughInterval() { + return commitCommitLogThoroughInterval; + } + + public void setCommitCommitLogThoroughInterval(final int commitCommitLogThoroughInterval) { + this.commitCommitLogThoroughInterval = commitCommitLogThoroughInterval; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/StorePathConfigHelper.java ---------------------------------------------------------------------- diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/StorePathConfigHelper.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/StorePathConfigHelper.java new file mode 100644 index 0000000..d3d77c0 --- /dev/null +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/StorePathConfigHelper.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
/**
 * Derives the locations of the store's files and sub-directories from a root directory.
 * Every method is a pure string concatenation using {@code File.separator}; nothing is
 * created, checked or normalised on disk.
 */
public class StorePathConfigHelper {

    /**
     * Appends each segment to {@code rootDir}, putting {@code File.separator} in front of it.
     * A null {@code rootDir} is rendered as the text "null", exactly as string
     * concatenation would.
     */
    private static String under(final String rootDir, final String... segments) {
        final StringBuilder path = new StringBuilder().append(rootDir);
        for (final String segment : segments) {
            path.append(java.io.File.separator).append(segment);
        }
        return path.toString();
    }

    /** @return {@code rootDir/consumequeue} */
    public static String getStorePathConsumeQueue(final String rootDir) {
        return under(rootDir, "consumequeue");
    }

    /** @return {@code rootDir/index} */
    public static String getStorePathIndex(final String rootDir) {
        return under(rootDir, "index");
    }

    /** @return {@code rootDir/checkpoint} */
    public static String getStoreCheckpoint(final String rootDir) {
        return under(rootDir, "checkpoint");
    }

    /** @return {@code rootDir/abort} */
    public static String getAbortFile(final String rootDir) {
        return under(rootDir, "abort");
    }

    /** @return {@code rootDir/config/delayOffset.json} */
    public static String getDelayOffsetStorePath(final String rootDir) {
        return under(rootDir, "config", "delayOffset.json");
    }

    /** @return {@code rootDir/transaction/statetable} */
    public static String getTranStateTableStorePath(final String rootDir) {
        return under(rootDir, "transaction", "statetable");
    }

    /** @return {@code rootDir/transaction/redolog} */
    public static String getTranRedoLogStorePath(final String rootDir) {
        return under(rootDir, "transaction", "redolog");
    }

}
b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAConnection.java @@ -0,0 +1,429 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.store.ha; + +import com.alibaba.rocketmq.common.ServiceThread; +import com.alibaba.rocketmq.common.constant.LoggerName; +import com.alibaba.rocketmq.remoting.common.RemotingUtil; +import com.alibaba.rocketmq.store.SelectMappedBufferResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; + + +/** + * @author shijia.wxr + */ +public class HAConnection { + private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); + private final HAService haService; + private final SocketChannel socketChannel; + private final String clientAddr; + private WriteSocketService writeSocketService; + private ReadSocketService readSocketService; + + private volatile long slaveRequestOffset = -1; + private volatile long slaveAckOffset = -1; + + + public HAConnection(final HAService haService, final SocketChannel socketChannel) throws IOException { + 
this.haService = haService; + this.socketChannel = socketChannel; + this.clientAddr = this.socketChannel.socket().getRemoteSocketAddress().toString(); + this.socketChannel.configureBlocking(false); + this.socketChannel.socket().setSoLinger(false, -1); + this.socketChannel.socket().setTcpNoDelay(true); + this.socketChannel.socket().setReceiveBufferSize(1024 * 64); + this.socketChannel.socket().setSendBufferSize(1024 * 64); + this.writeSocketService = new WriteSocketService(this.socketChannel); + this.readSocketService = new ReadSocketService(this.socketChannel); + this.haService.getConnectionCount().incrementAndGet(); + } + + + public void start() { + this.readSocketService.start(); + this.writeSocketService.start(); + } + + + public void shutdown() { + this.writeSocketService.shutdown(true); + this.readSocketService.shutdown(true); + this.close(); + } + + + public void close() { + if (this.socketChannel != null) { + try { + this.socketChannel.close(); + } catch (IOException e) { + HAConnection.log.error("", e); + } + } + } + + + public SocketChannel getSocketChannel() { + return socketChannel; + } + + /** + + * + * @author shijia.wxr + */ + class ReadSocketService extends ServiceThread { + private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024; + private final Selector selector; + private final SocketChannel socketChannel; + private final ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE); + private int processPostion = 0; + private volatile long lastReadTimestamp = System.currentTimeMillis(); + + + public ReadSocketService(final SocketChannel socketChannel) throws IOException { + this.selector = RemotingUtil.openSelector(); + this.socketChannel = socketChannel; + this.socketChannel.register(this.selector, SelectionKey.OP_READ); + this.thread.setDaemon(true); + } + + + @Override + public void run() { + HAConnection.log.info(this.getServiceName() + " service started"); + + while (!this.isStopped()) { + try { + this.selector.select(1000); + 
boolean ok = this.processReadEvent(); + if (!ok) { + HAConnection.log.error("processReadEvent error"); + break; + } + + + long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp; + if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) { + log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval); + break; + } + } catch (Exception e) { + HAConnection.log.error(this.getServiceName() + " service has exception.", e); + break; + } + } + + this.makeStop(); + + writeSocketService.makeStop(); + + + haService.removeConnection(HAConnection.this); + + + HAConnection.this.haService.getConnectionCount().decrementAndGet(); + + SelectionKey sk = this.socketChannel.keyFor(this.selector); + if (sk != null) { + sk.cancel(); + } + + try { + this.selector.close(); + this.socketChannel.close(); + } catch (IOException e) { + HAConnection.log.error("", e); + } + + HAConnection.log.info(this.getServiceName() + " service end"); + } + + @Override + public String getServiceName() { + return ReadSocketService.class.getSimpleName(); + } + + private boolean processReadEvent() { + int readSizeZeroTimes = 0; + + if (!this.byteBufferRead.hasRemaining()) { + this.byteBufferRead.flip(); + this.processPostion = 0; + } + + while (this.byteBufferRead.hasRemaining()) { + try { + int readSize = this.socketChannel.read(this.byteBufferRead); + if (readSize > 0) { + readSizeZeroTimes = 0; + this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now(); + if ((this.byteBufferRead.position() - this.processPostion) >= 8) { + int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8); + long readOffset = this.byteBufferRead.getLong(pos - 8); + this.processPostion = pos; + + + HAConnection.this.slaveAckOffset = readOffset; + if (HAConnection.this.slaveRequestOffset < 0) { + 
HAConnection.this.slaveRequestOffset = readOffset; + log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset); + } + + + HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset); + } + } else if (readSize == 0) { + if (++readSizeZeroTimes >= 3) { + break; + } + } else { + log.error("read socket[" + HAConnection.this.clientAddr + "] < 0"); + return false; + } + } catch (IOException e) { + log.error("processReadEvent exception", e); + return false; + } + } + + return true; + } + } + + /** + + * + * @author shijia.wxr + */ + class WriteSocketService extends ServiceThread { + private final Selector selector; + private final SocketChannel socketChannel; + + private final int headerSize = 8 + 4; + private final ByteBuffer byteBufferHeader = ByteBuffer.allocate(headerSize); + private long nextTransferFromWhere = -1; + private SelectMappedBufferResult selectMappedBufferResult; + private boolean lastWriteOver = true; + private long lastWriteTimestamp = System.currentTimeMillis(); + + + public WriteSocketService(final SocketChannel socketChannel) throws IOException { + this.selector = RemotingUtil.openSelector(); + this.socketChannel = socketChannel; + this.socketChannel.register(this.selector, SelectionKey.OP_WRITE); + this.thread.setDaemon(true); + } + + + @Override + public void run() { + HAConnection.log.info(this.getServiceName() + " service started"); + + while (!this.isStopped()) { + try { + this.selector.select(1000); + + if (-1 == HAConnection.this.slaveRequestOffset) { + Thread.sleep(10); + continue; + } + + + + if (-1 == this.nextTransferFromWhere) { + if (0 == HAConnection.this.slaveRequestOffset) { + long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset(); + masterOffset = + masterOffset + - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig() + .getMapedFileSizeCommitLog()); + + if (masterOffset < 0) { + masterOffset = 0; + 
} + + this.nextTransferFromWhere = masterOffset; + } else { + this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset; + } + + log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr + + "], and slave request " + HAConnection.this.slaveRequestOffset); + } + + if (this.lastWriteOver) { + + long interval = + HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp; + + if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig() + .getHaSendHeartbeatInterval()) { + + // Build Header + this.byteBufferHeader.position(0); + this.byteBufferHeader.limit(headerSize); + this.byteBufferHeader.putLong(this.nextTransferFromWhere); + this.byteBufferHeader.putInt(0); + this.byteBufferHeader.flip(); + + this.lastWriteOver = this.transferData(); + if (!this.lastWriteOver) + continue; + } + } + + else { + this.lastWriteOver = this.transferData(); + if (!this.lastWriteOver) + continue; + } + + SelectMappedBufferResult selectResult = + HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere); + if (selectResult != null) { + int size = selectResult.getSize(); + if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) { + size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize(); + } + + long thisOffset = this.nextTransferFromWhere; + this.nextTransferFromWhere += size; + + selectResult.getByteBuffer().limit(size); + this.selectMappedBufferResult = selectResult; + + // Build Header + this.byteBufferHeader.position(0); + this.byteBufferHeader.limit(headerSize); + this.byteBufferHeader.putLong(thisOffset); + this.byteBufferHeader.putInt(size); + this.byteBufferHeader.flip(); + + this.lastWriteOver = this.transferData(); + } else { + + 
HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100); + } + } catch (Exception e) { + + HAConnection.log.error(this.getServiceName() + " service has exception.", e); + break; + } + } + + + if (this.selectMappedBufferResult != null) { + this.selectMappedBufferResult.release(); + } + + this.makeStop(); + + readSocketService.makeStop(); + + + haService.removeConnection(HAConnection.this); + + SelectionKey sk = this.socketChannel.keyFor(this.selector); + if (sk != null) { + sk.cancel(); + } + + try { + this.selector.close(); + this.socketChannel.close(); + } catch (IOException e) { + HAConnection.log.error("", e); + } + + HAConnection.log.info(this.getServiceName() + " service end"); + } + + + /** + + */ + private boolean transferData() throws Exception { + int writeSizeZeroTimes = 0; + // Write Header + while (this.byteBufferHeader.hasRemaining()) { + int writeSize = this.socketChannel.write(this.byteBufferHeader); + if (writeSize > 0) { + writeSizeZeroTimes = 0; + this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now(); + } else if (writeSize == 0) { + if (++writeSizeZeroTimes >= 3) { + break; + } + } else { + throw new Exception("ha master write header error < 0"); + } + } + + if (null == this.selectMappedBufferResult) { + return !this.byteBufferHeader.hasRemaining(); + } + + writeSizeZeroTimes = 0; + + // Write Body + if (!this.byteBufferHeader.hasRemaining()) { + while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) { + int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer()); + if (writeSize > 0) { + writeSizeZeroTimes = 0; + this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now(); + } else if (writeSize == 0) { + if (++writeSizeZeroTimes >= 3) { + break; + } + } else { + throw new Exception("ha master write body error < 0"); + } + } + } + + boolean result = !this.byteBufferHeader.hasRemaining() && 
!this.selectMappedBufferResult.getByteBuffer().hasRemaining(); + + if (!this.selectMappedBufferResult.getByteBuffer().hasRemaining()) { + this.selectMappedBufferResult.release(); + this.selectMappedBufferResult = null; + } + + return result; + } + + + @Override + public String getServiceName() { + return WriteSocketService.class.getSimpleName(); + } + + + @Override + public void shutdown() { + super.shutdown(); + } + } +}