From commits-return-3553-apmail-rocketmq-commits-archive=rocketmq.apache.org@rocketmq.apache.org Fri Jun 14 15:06:54 2019 Return-Path: X-Original-To: apmail-rocketmq-commits-archive@minotaur.apache.org Delivered-To: apmail-rocketmq-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by minotaur.apache.org (Postfix) with SMTP id D53CD190D3 for ; Fri, 14 Jun 2019 15:06:52 +0000 (UTC) Received: (qmail 2184 invoked by uid 500); 14 Jun 2019 15:06:52 -0000 Delivered-To: apmail-rocketmq-commits-archive@rocketmq.apache.org Received: (qmail 2060 invoked by uid 500); 14 Jun 2019 15:06:52 -0000 Mailing-List: contact commits-help@rocketmq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@rocketmq.apache.org Delivered-To: mailing list commits@rocketmq.apache.org Received: (qmail 2008 invoked by uid 99); 14 Jun 2019 15:06:52 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 14 Jun 2019 15:06:52 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 7255D87AD8; Fri, 14 Jun 2019 15:06:48 +0000 (UTC) Date: Fri, 14 Jun 2019 15:06:49 +0000 To: "commits@rocketmq.apache.org" Subject: [rocketmq-ons] 01/03: Init commit. 
MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit From: duhengforever@apache.org In-Reply-To: <156052480837.29470.13736054091599834519@gitbox.apache.org> References: <156052480837.29470.13736054091599834519@gitbox.apache.org> X-Git-Host: gitbox.apache.org X-Git-Repo: rocketmq-ons X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Rev: 3dbc38a9e4e742c604540b5887453bc0219e2817 X-Git-NotificationType: diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated Message-Id: <20190614150648.7255D87AD8@gitbox.apache.org> This is an automated email from the ASF dual-hosted git repository. duhengforever pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-ons.git commit 3dbc38a9e4e742c604540b5887453bc0219e2817 Author: duheng.dh AuthorDate: Wed Jun 5 11:16:34 2019 +0800 Init commit. --- .gitignore | 22 ++ .gitmodules | 0 README.md | 2 + ons-core/.idea/codeStyleSettings.xml | 38 ++ ons-core/.idea/copyright/profiles_settings.xml | 7 + ons-core/.idea/encodings.xml | 11 + ons-core/README.md | 0 ons-core/ons-api/pom.xml | 28 ++ .../java/org/apache/rocketmq/ons/api/Action.java | 26 ++ .../java/org/apache/rocketmq/ons/api/Admin.java | 37 ++ .../org/apache/rocketmq/ons/api/Constants.java | 21 ++ .../apache/rocketmq/ons/api/ConsumeContext.java | 21 ++ .../java/org/apache/rocketmq/ons/api/Consumer.java | 29 ++ .../apache/rocketmq/ons/api/ExpressionType.java | 24 ++ .../java/org/apache/rocketmq/ons/api/MQType.java | 38 ++ .../java/org/apache/rocketmq/ons/api/Message.java | 245 +++++++++++++ .../apache/rocketmq/ons/api/MessageAccessor.java | 41 +++ .../apache/rocketmq/ons/api/MessageListener.java | 23 ++ .../apache/rocketmq/ons/api/MessageSelector.java | 49 +++ .../org/apache/rocketmq/ons/api/ONSFactory.java | 88 +++++ .../org/apache/rocketmq/ons/api/ONSFactoryAPI.java | 40 +++ .../rocketmq/ons/api/OnExceptionContext.java | 56 +++ .../java/org/apache/rocketmq/ons/api/Producer.java | 37 ++ 
.../apache/rocketmq/ons/api/PropertyKeyConst.java | 102 ++++++ .../rocketmq/ons/api/PropertyValueConst.java | 27 ++ .../org/apache/rocketmq/ons/api/SendCallback.java | 24 ++ .../org/apache/rocketmq/ons/api/SendResult.java | 47 +++ .../rocketmq/ons/api/batch/BatchConsumer.java | 27 ++ .../ons/api/batch/BatchMessageListener.java | 29 ++ .../rocketmq/ons/api/bean/BatchConsumerBean.java | 114 ++++++ .../apache/rocketmq/ons/api/bean/ConsumerBean.java | 157 +++++++++ .../rocketmq/ons/api/bean/OrderConsumerBean.java | 111 ++++++ .../rocketmq/ons/api/bean/OrderProducerBean.java | 82 +++++ .../apache/rocketmq/ons/api/bean/ProducerBean.java | 98 ++++++ .../apache/rocketmq/ons/api/bean/Subscription.java | 93 +++++ .../rocketmq/ons/api/bean/SubscriptionExt.java | 47 +++ .../ons/api/bean/TransactionProducerBean.java | 95 +++++ .../ons/api/exception/ONSClientException.java | 42 +++ .../ons/api/order/ConsumeOrderContext.java | 21 ++ .../ons/api/order/MessageOrderListener.java | 25 ++ .../ons/api/order/MessageQueueSelector.java | 25 ++ .../apache/rocketmq/ons/api/order/OrderAction.java | 26 ++ .../rocketmq/ons/api/order/OrderConsumer.java | 35 ++ .../rocketmq/ons/api/order/OrderProducer.java | 28 ++ .../api/transaction/LocalTransactionChecker.java | 24 ++ .../api/transaction/LocalTransactionExecuter.java | 26 ++ .../ons/api/transaction/TransactionProducer.java | 38 ++ .../ons/api/transaction/TransactionStatus.java | 28 ++ ons-core/ons-auth4client/pom.xml | 37 ++ .../rocketmq/ons/api/impl/authority/AuthUtil.java | 58 ++++ .../ons/api/impl/authority/OnsAuthSigner.java | 90 +++++ .../ons/api/impl/authority/SessionCredentials.java | 214 ++++++++++++ .../ons/api/impl/authority/SigningAlgorithm.java | 24 ++ .../exception/AuthenticationException.java | 69 ++++ .../authority/exception/FlowControlException.java | 44 +++ .../authority/exception/SignatureException.java | 48 +++ .../ons/api/impl/rocketmq/AbstractRPCHook.java | 75 ++++ .../ons/api/impl/rocketmq/ClientRPCHook.java | 57 +++ 
.../rocketmq/ons/api/impl/rocketmq/ONSChannel.java | 24 ++ ons-core/ons-client/pom.xml | 74 ++++ .../apache/rocketmq/ons/api/impl/MQClientInfo.java | 39 +++ .../rocketmq/ons/api/impl/ONSFactoryImpl.java | 90 +++++ .../ons/api/impl/rocketmq/BatchConsumerImpl.java | 131 +++++++ .../ons/api/impl/rocketmq/ConsumerImpl.java | 133 +++++++ .../apache/rocketmq/ons/api/impl/rocketmq/FAQ.java | 46 +++ .../ons/api/impl/rocketmq/ONSClientAbstract.java | 268 +++++++++++++++ .../ons/api/impl/rocketmq/ONSConsumerAbstract.java | 198 +++++++++++ .../rocketmq/ons/api/impl/rocketmq/ONSUtil.java | 185 ++++++++++ .../ons/api/impl/rocketmq/OnsClientRPCHook.java | 42 +++ .../ons/api/impl/rocketmq/OrderConsumerImpl.java | 114 ++++++ .../ons/api/impl/rocketmq/OrderProducerImpl.java | 149 ++++++++ .../ons/api/impl/rocketmq/ProducerImpl.java | 246 +++++++++++++ .../api/impl/rocketmq/TransactionProducerImpl.java | 155 +++++++++ .../tracehook/OnsClientSendMessageHookImpl.java | 98 ++++++ .../impl/tracehook/OnsConsumeMessageHookImpl.java | 123 +++++++ .../ons/api/impl/util/ClientLoggerUtil.java | 55 +++ .../rocketmq/ons/api/impl/util/MsgConvertUtil.java | 89 +++++ .../rocketmq/ons/api/impl/util/NameAddrUtils.java | 47 +++ .../src/main/resources/ons_client_info.properties | 16 + .../impl/rocketmq/NameServerAutoUpdateTest.java | 95 +++++ .../impl/rocketmq/ONSClientTokenUpdateTest.java | 189 ++++++++++ .../ons/api/impl/rocketmq/ONSUtilTest.java | 59 ++++ ons-core/ons-trace-core/pom.xml | 41 +++ .../ons/open/trace/core/common/OnsTraceBean.java | 145 ++++++++ .../open/trace/core/common/OnsTraceConstants.java | 57 +++ .../open/trace/core/common/OnsTraceContext.java | 168 +++++++++ .../trace/core/common/OnsTraceDataEncoder.java | 162 +++++++++ .../trace/core/common/OnsTraceDispatcherType.java | 25 ++ .../trace/core/common/OnsTraceTransferBean.java | 46 +++ .../ons/open/trace/core/common/OnsTraceType.java | 27 ++ .../open/trace/core/dispatch/AsyncDispatcher.java | 33 ++ 
.../core/dispatch/impl/AsyncArrayDispatcher.java | 382 +++++++++++++++++++++ .../core/dispatch/impl/TraceProducerFactory.java | 97 ++++++ .../ons/open/trace/core/hook/AbstractRPCHook.java | 72 ++++ .../ons/open/trace/core/utils/MixUtils.java | 111 ++++++ ons-core/pom.xml | 271 +++++++++++++++ pom.xml | 277 +++++++++++++++ style/copyright/Apache.xml | 23 ++ style/copyright/profiles_settings.xml | 64 ++++ style/ons_checkstyle.xml | 137 ++++++++ style/ons_codeStyle.xml | 143 ++++++++ 101 files changed, 7916 insertions(+) diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..7a1b3a8 --- /dev/null +++ b/.gitignore @@ -0,0 +1,22 @@ +*.iml +.idea/ +target/ +logs/ +.classpath +.project +.settings/ +.amateras +src/main/resources/hibernate.cfg.xml +src/main/resources/hibernate.reveng.xml +/target* +*/bin/* +*.log* +*.versionsBackup +test-output* +runtest.sh +/ons-top/frontend +*dependency-reduced-pom.xml +local +frontend +/ons-top/ons-frontend +.DS_Store/ diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..e69de29 diff --git a/README.md b/README.md new file mode 100644 index 0000000..868fecc --- /dev/null +++ b/README.md @@ -0,0 +1,2 @@ +## Overview +### Apache RocketMQ lightweight client. 
diff --git a/ons-core/.idea/codeStyleSettings.xml b/ons-core/.idea/codeStyleSettings.xml new file mode 100644 index 0000000..3319e57 --- /dev/null +++ b/ons-core/.idea/codeStyleSettings.xml @@ -0,0 +1,38 @@ + + + + + + \ No newline at end of file diff --git a/ons-core/.idea/copyright/profiles_settings.xml b/ons-core/.idea/copyright/profiles_settings.xml new file mode 100644 index 0000000..74200de --- /dev/null +++ b/ons-core/.idea/copyright/profiles_settings.xml @@ -0,0 +1,7 @@ + + + + + + \ No newline at end of file diff --git a/ons-core/.idea/encodings.xml b/ons-core/.idea/encodings.xml new file mode 100644 index 0000000..3446947 --- /dev/null +++ b/ons-core/.idea/encodings.xml @@ -0,0 +1,11 @@ + + + + + + + + + + + \ No newline at end of file diff --git a/ons-core/README.md b/ons-core/README.md new file mode 100644 index 0000000..e69de29 diff --git a/ons-core/ons-api/pom.xml b/ons-core/ons-api/pom.xml new file mode 100644 index 0000000..374e460 --- /dev/null +++ b/ons-core/ons-api/pom.xml @@ -0,0 +1,28 @@ + + + + + org.apache.rocketmq + ons-all + 1.8.1-SNAPSHOT + + 4.0.0 + jar + ons-api + ons-api ${project.version} + diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Action.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Action.java new file mode 100644 index 0000000..2c0a2c9 --- /dev/null +++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Action.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.ons.api; + + +public enum Action { + + CommitMessage, + + ReconsumeLater, +} diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Admin.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Admin.java new file mode 100644 index 0000000..702d9a5 --- /dev/null +++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Admin.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.ons.api; + +import java.util.Properties; + + +public interface Admin { + + boolean isStarted(); + + + boolean isClosed(); + + + void start(); + + + void updateCredential(Properties credentialProperties); + + + void shutdown(); +} diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Constants.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Constants.java new file mode 100644 index 0000000..a51ba20 --- /dev/null +++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Constants.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.ons.api; + +public class Constants { + public static final String TRANSACTION_ID = "__transactionId__"; +} diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/ConsumeContext.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/ConsumeContext.java new file mode 100644 index 0000000..5973428 --- /dev/null +++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/ConsumeContext.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.ons.api; + +public class ConsumeContext { + +} diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Consumer.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Consumer.java new file mode 100644 index 0000000..a592559 --- /dev/null +++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Consumer.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.ons.api; + + +public interface Consumer extends Admin { + + void subscribe(final String topic, final String subExpression, final MessageListener listener); + + + void subscribe(final String topic, final MessageSelector selector, final MessageListener listener); + + + void unsubscribe(final String topic); +} diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/ExpressionType.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/ExpressionType.java new file mode 100644 index 0000000..1980ac6 --- /dev/null +++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/ExpressionType.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.ons.api; + +public enum ExpressionType { + + SQL92, + + TAG +} diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/MQType.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/MQType.java new file mode 100644 index 0000000..17d4afa --- /dev/null +++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/MQType.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.ons.api; + +public enum MQType { + + NOTIFY("NOTIFY"), + + METAQ("METAQ"); + + private String name; + + MQType(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } +} diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Message.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Message.java new file mode 100644 index 0000000..5d1f999 --- /dev/null +++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Message.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.ons.api; + +import java.io.Serializable; +import java.util.Properties; + + +public class Message implements Serializable { + + private static final long serialVersionUID = -1385924226856188094L; + + + Properties systemProperties; + + + private String topic; + + + private Properties userProperties; + + + private byte[] body; + + + public Message() { + this(null, null, "", null); + } + + + public Message(String topic, String tag, String key, byte[] body) { + this.topic = topic; + this.body = body; + + this.putSystemProperties(SystemPropKey.TAG, tag); + this.putSystemProperties(SystemPropKey.KEY, key); + } + + + void putSystemProperties(final String key, final String value) { + if (null == this.systemProperties) { + this.systemProperties = new Properties(); + } + + if (key != null && value != null) { + this.systemProperties.put(key, value); + } + } + + + public Message(String topic, String tags, byte[] body) { + this(topic, tags, "", body); + } + + + public void putUserProperties(final String key, final String value) { + if (null == this.userProperties) { + this.userProperties = new Properties(); + } + + if (key != null && value != null) { + this.userProperties.put(key, value); + } + } + + + public String getUserProperties(final String key) { + if (null != this.userProperties) { + return (String) this.userProperties.get(key); + } + + return null; + } + + + public String getTopic() { + return topic; + } + + + public void setTopic(String topic) { + this.topic = topic; + } + + + public String getTag() { + return 
this.getSystemProperties(SystemPropKey.TAG); + } + + + String getSystemProperties(final String key) { + if (null != this.systemProperties) { + return this.systemProperties.getProperty(key); + } + + return null; + } + + + public void setTag(String tag) { + this.putSystemProperties(SystemPropKey.TAG, tag); + } + + + public String getKey() { + return this.getSystemProperties(SystemPropKey.KEY); + } + + + public void setKey(String key) { + this.putSystemProperties(SystemPropKey.KEY, key); + } + + + public String getMsgID() { + return this.getSystemProperties(SystemPropKey.MSGID); + } + + + public void setMsgID(String msgid) { + this.putSystemProperties(SystemPropKey.MSGID, msgid); + } + + Properties getSystemProperties() { + return systemProperties; + } + + void setSystemProperties(Properties systemProperties) { + this.systemProperties = systemProperties; + } + + public Properties getUserProperties() { + return userProperties; + } + + public void setUserProperties(Properties userProperties) { + this.userProperties = userProperties; + } + + public byte[] getBody() { + return body; + } + + public void setBody(byte[] body) { + this.body = body; + } + + + public int getReconsumeTimes() { + String pro = this.getSystemProperties(SystemPropKey.RECONSUMETIMES); + if (pro != null) { + return Integer.parseInt(pro); + } + + return 0; + } + + + public void setReconsumeTimes(final int value) { + putSystemProperties(SystemPropKey.RECONSUMETIMES, String.valueOf(value)); + } + + + public long getBornTimestamp() { + String pro = this.getSystemProperties(SystemPropKey.BORNTIMESTAMP); + if (pro != null) { + return Long.parseLong(pro); + } + + return 0L; + } + + + public void setBornTimestamp(final long value) { + putSystemProperties(SystemPropKey.BORNTIMESTAMP, String.valueOf(value)); + } + + + public String getBornHost() { + String pro = this.getSystemProperties(SystemPropKey.BORNHOST); + return pro == null ? 
"" : pro; + } + + + public void setBornHost(final String value) { + putSystemProperties(SystemPropKey.BORNHOST, value); + } + + + public long getStartDeliverTime() { + String pro = this.getSystemProperties(SystemPropKey.STARTDELIVERTIME); + if (pro != null) { + return Long.parseLong(pro); + } + + return 0L; + } + + public String getShardingKey() { + String pro = this.getSystemProperties(SystemPropKey.SHARDINGKEY); + return pro == null ? "" : pro; + } + + public void setShardingKey(final String value) { + putSystemProperties(SystemPropKey.SHARDINGKEY, value); + } + + + public void setStartDeliverTime(final long value) { + putSystemProperties(SystemPropKey.STARTDELIVERTIME, String.valueOf(value)); + } + + @Override + public String toString() { + return "Message [topic=" + topic + ", systemProperties=" + systemProperties + ", userProperties=" + userProperties + ", body=" + + (body != null ? body.length : 0) + "]"; + } + + + static public class SystemPropKey { + public static final String TAG = "__TAG"; + public static final String KEY = "__KEY"; + public static final String MSGID = "__MSGID"; + public static final String SHARDINGKEY = "__SHARDINGKEY"; + public static final String RECONSUMETIMES = "__RECONSUMETIMES"; + public static final String BORNTIMESTAMP = "__BORNTIMESTAMP"; + public static final String BORNHOST = "__BORNHOST"; + + public static final String STARTDELIVERTIME = "__STARTDELIVERTIME"; + } +} diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/MessageAccessor.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/MessageAccessor.java new file mode 100644 index 0000000..61f23b1 --- /dev/null +++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/MessageAccessor.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.ons.api; + +import java.util.Properties; + + +public class MessageAccessor { + public static Properties getSystemProperties(final Message msg) { + return msg.systemProperties; + } + + + public static void setSystemProperties(final Message msg, Properties systemProperties) { + msg.systemProperties = systemProperties; + } + + + public static void putSystemProperties(final Message msg, final String key, final String value) { + msg.putSystemProperties(key, value); + } + + + public static String getSystemProperties(final Message msg, final String key) { + return msg.getSystemProperties(key); + } +} diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/MessageListener.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/MessageListener.java new file mode 100644 index 0000000..22b56c7 --- /dev/null +++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/MessageListener.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.ons.api; + + +public interface MessageListener { + + Action consume(final Message message, final ConsumeContext context); +} diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/MessageSelector.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/MessageSelector.java new file mode 100644 index 0000000..f28bdae --- /dev/null +++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/MessageSelector.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.ons.api; + +public class MessageSelector { + + private ExpressionType type; + + private String subExpression; + + public static MessageSelector bySql(String subExpression) { + return new MessageSelector(ExpressionType.SQL92, subExpression); + } + + public static MessageSelector byTag(String subExpression) { + return new MessageSelector(ExpressionType.TAG, subExpression); + } + + private MessageSelector() { + } + + private MessageSelector(ExpressionType type, String subExpression) { + this.type = type; + this.subExpression = subExpression; + } + + public ExpressionType getType() { + return type; + } + + public String getSubExpression() { + return subExpression; + } +} \ No newline at end of file diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/ONSFactory.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/ONSFactory.java new file mode 100644 index 0000000..eaff503 --- /dev/null +++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/ONSFactory.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.ons.api; + +import org.apache.rocketmq.ons.api.batch.BatchConsumer; +import org.apache.rocketmq.ons.api.order.OrderConsumer; +import org.apache.rocketmq.ons.api.order.OrderProducer; +import org.apache.rocketmq.ons.api.transaction.LocalTransactionChecker; +import org.apache.rocketmq.ons.api.transaction.TransactionProducer; +import java.util.Properties; + + +public class ONSFactory { + + + private static ONSFactoryAPI onsFactory = null; + + static { + try { + + Class factoryClass = + ONSFactory.class.getClassLoader().loadClass( + "org.apache.rocketmq.ons.api.impl.ONSFactoryNotifyAndMetaQImpl"); + onsFactory = (ONSFactoryAPI) factoryClass.newInstance(); + } catch (Throwable e) { + try { + Class factoryClass = + ONSFactory.class.getClassLoader().loadClass( + "org.apache.rocketmq.ons.api.impl.ONSFactoryImpl"); + onsFactory = (ONSFactoryAPI) factoryClass.newInstance(); + } catch (Throwable e1) { + e.printStackTrace(); + e1.printStackTrace(); + } + } + } + + + + public static Producer createProducer(final Properties properties) { + return onsFactory.createProducer(properties); + } + + + + public static OrderProducer createOrderProducer(final Properties properties) { + return onsFactory.createOrderProducer(properties); + } + + + + public static TransactionProducer createTransactionProducer(final Properties properties, + final LocalTransactionChecker checker) { + return onsFactory.createTransactionProducer(properties, checker); + } + + + + public static Consumer createConsumer(final Properties properties) { + return onsFactory.createConsumer(properties); + } + + + public static BatchConsumer createBatchConsumer(final Properties properties) { + return onsFactory.createBatchConsumer(properties); + } + + + + public static OrderConsumer createOrderedConsumer(final Properties properties) { + return onsFactory.createOrderedConsumer(properties); + } + +} diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/ONSFactoryAPI.java 
b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/ONSFactoryAPI.java new file mode 100644 index 0000000..b4bdd8a --- /dev/null +++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/ONSFactoryAPI.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.ons.api; + +import org.apache.rocketmq.ons.api.batch.BatchConsumer; +import org.apache.rocketmq.ons.api.order.OrderConsumer; +import org.apache.rocketmq.ons.api.order.OrderProducer; +import org.apache.rocketmq.ons.api.transaction.LocalTransactionChecker; +import org.apache.rocketmq.ons.api.transaction.TransactionProducer; +import java.util.Properties; + +public interface ONSFactoryAPI { + + Producer createProducer(final Properties properties); + + Consumer createConsumer(final Properties properties); + + BatchConsumer createBatchConsumer(final Properties properties); + + OrderProducer createOrderProducer(final Properties properties); + + OrderConsumer createOrderedConsumer(final Properties properties); + + TransactionProducer createTransactionProducer(final Properties properties, + final LocalTransactionChecker checker); +} diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/OnExceptionContext.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/OnExceptionContext.java new file mode 100644 index 0000000..3eb7620 --- /dev/null +++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/OnExceptionContext.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.ons.api; + +import org.apache.rocketmq.ons.api.exception.ONSClientException; + + +public class OnExceptionContext { + + + private String messageId; + + + private String topic; + + + private ONSClientException exception; + + public String getMessageId() { + return messageId; + } + + public void setMessageId(String messageId) { + this.messageId = messageId; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public ONSClientException getException() { + return exception; + } + + public void setException(ONSClientException exception) { + this.exception = exception; + } +} diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Producer.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Producer.java new file mode 100644 index 0000000..e45e613 --- /dev/null +++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Producer.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.ons.api; + +import java.util.concurrent.ExecutorService; + +public interface Producer extends Admin { + + + @Override + void start(); + + @Override + void shutdown(); + + SendResult send(final Message message); + + void sendOneway(final Message message); + + void sendAsync(final Message message, final SendCallback sendCallback); + + void setCallbackExecutor(final ExecutorService callbackExecutor); +} diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/PropertyKeyConst.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/PropertyKeyConst.java new file mode 100644 index 0000000..d98d49c --- /dev/null +++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/PropertyKeyConst.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.ons.api; + +import javax.annotation.Generated; + +@Generated("ons-api") +public class PropertyKeyConst { + + public static final String MessageModel = "MessageModel"; + + /** + * Deprecated, replaced with GROUP_ID + */ + @Deprecated + public static final String ProducerId = "ProducerId"; + + /** + * Deprecated, replaced with GROUP_ID + */ + @Deprecated + public static final String ConsumerId = "ConsumerId"; + + public static final String GROUP_ID = "GROUP_ID"; + + public static final String AccessKey = "AccessKey"; + + public static final String SecretKey = "SecretKey"; + + public static final String SecurityToken = "SecurityToken"; + + public static final String SendMsgTimeoutMillis = "SendMsgTimeoutMillis"; + + public static final String ONSAddr = "ONSAddr"; + + public static final String NAMESRV_ADDR = "NAMESRV_ADDR"; + + public static final String ConsumeThreadNums = "ConsumeThreadNums"; + + public static final String OnsChannel = "OnsChannel"; + + public static final String MQType = "MQType"; + + public static final String isVipChannelEnabled = "isVipChannelEnabled"; + + public static final String SuspendTimeMillis = "suspendTimeMillis"; + + public static final String MaxReconsumeTimes = "maxReconsumeTimes"; + + public static final String ConsumeTimeout = "consumeTimeout"; + + public static final String CheckImmunityTimeInSeconds = "CheckImmunityTimeInSeconds"; + + public static final String PostSubscriptionWhenPull = "PostSubscriptionWhenPull"; + + public static final String ConsumeMessageBatchMaxSize = "ConsumeMessageBatchMaxSize"; + + public static final String MaxCachedMessageAmount = "maxCachedMessageAmount"; + + public static final String MaxCachedMessageSizeInMiB = "maxCachedMessageSizeInMiB"; + + public static final String InstanceName = "InstanceName"; + + public static final String MsgTraceSwitch = "MsgTraceSwitch"; + + public static final String MqttMessageId = "mqttMessageId"; + + public static final String 
MqttMessage = "mqttMessage"; + + public static final String MqttPublishRetain = "mqttRetain"; + + public static final String MqttPublishDubFlag = "mqttPublishDubFlag"; + + public static final String MqttSecondTopic = "mqttSecondTopic"; + + public static final String MqttClientId = "clientId"; + + public static final String MqttQOS = "qoslevel"; + + public static final String INSTANCE_ID = "INSTANCE_ID"; + + public static final String EXACTLYONCE_DELIVERY = "exactlyOnceDelivery"; + + public static final String EXACTLYONCE_RM_REFRESHINTERVAL = "exactlyOnceRmRefreshInterval"; + + public static final String MAX_BATCH_MESSAGE_COUNT = "maxBatchMessageCount"; + +} diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/PropertyValueConst.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/PropertyValueConst.java new file mode 100644 index 0000000..c748447 --- /dev/null +++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/PropertyValueConst.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.ons.api; + + +public class PropertyValueConst { + + + public static final String BROADCASTING = "BROADCASTING"; + + + public static final String CLUSTERING = "CLUSTERING"; +} diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/SendCallback.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/SendCallback.java new file mode 100644 index 0000000..ade3f70 --- /dev/null +++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/SendCallback.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.ons.api; + +public interface SendCallback { + + void onSuccess(final SendResult sendResult); + + void onException(final OnExceptionContext context); +} diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/SendResult.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/SendResult.java new file mode 100644 index 0000000..2dd0bb5 --- /dev/null +++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/SendResult.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.ons.api; + +public class SendResult { + + private String messageId; + + private String topic; + + + public String getMessageId() { + return messageId; + } + + + public void setMessageId(String messageId) { + this.messageId = messageId; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + @Override + public String toString() { + return "SendResult[topic=" + topic + ", messageId=" + messageId + ']'; + } +} diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/batch/BatchConsumer.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/batch/BatchConsumer.java new file mode 100644 index 0000000..848a139 --- /dev/null +++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/batch/BatchConsumer.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.ons.api.batch; + +import org.apache.rocketmq.ons.api.Admin; + +public interface BatchConsumer extends Admin { + + void subscribe(final String topic, final String subExpression, final BatchMessageListener listener); + + void unsubscribe(final String topic); +} diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/batch/BatchMessageListener.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/batch/BatchMessageListener.java new file mode 100644 index 0000000..3716a90 --- /dev/null +++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/batch/BatchMessageListener.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.ons.api.batch; + +import org.apache.rocketmq.ons.api.Action; +import org.apache.rocketmq.ons.api.ConsumeContext; +import org.apache.rocketmq.ons.api.Message; +import java.util.List; + + +public interface BatchMessageListener { + + Action consume(final List messages, final ConsumeContext context); +} diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/BatchConsumerBean.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/BatchConsumerBean.java new file mode 100644 index 0000000..1cfedde --- /dev/null +++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/BatchConsumerBean.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.ons.api.bean; + +import java.util.Map; +import java.util.Properties; +import org.apache.rocketmq.ons.api.ONSFactory; +import org.apache.rocketmq.ons.api.batch.BatchConsumer; +import org.apache.rocketmq.ons.api.batch.BatchMessageListener; +import org.apache.rocketmq.ons.api.exception.ONSClientException; + + +public class BatchConsumerBean implements BatchConsumer { + + private Properties properties; + + + private Map subscriptionTable; + + private BatchConsumer batchConsumer; + + @Override + public boolean isStarted() { + return this.batchConsumer.isStarted(); + } + + @Override + public boolean isClosed() { + return this.batchConsumer.isClosed(); + } + + + @Override + public void start() { + if (null == this.properties) { + throw new ONSClientException("properties not set"); + } + + if (null == this.subscriptionTable) { + throw new ONSClientException("subscriptionTable not set"); + } + + this.batchConsumer = ONSFactory.createBatchConsumer(this.properties); + + for (final Map.Entry next : this.subscriptionTable.entrySet()) { + this.subscribe(next.getKey().getTopic(), next.getKey().getExpression(), next.getValue()); + } + + this.batchConsumer.start(); + } + + @Override + public void updateCredential(Properties credentialProperties) { + if (this.batchConsumer != null) { + this.batchConsumer.updateCredential(credentialProperties); + } + } + + + @Override + public void shutdown() { + if (this.batchConsumer != null) { + this.batchConsumer.shutdown(); + } + } + + @Override + public void subscribe(final String topic, final String subExpression, final BatchMessageListener listener) { + if (null == this.batchConsumer) { + throw new ONSClientException("subscribe must be called after BatchConsumerBean started"); + } + this.batchConsumer.subscribe(topic, subExpression, listener); + } + + @Override + public void unsubscribe(final String topic) { + if (null == this.batchConsumer) { + throw new ONSClientException("unsubscribe must be called after 
BatchConsumerBean started"); + } + this.batchConsumer.unsubscribe(topic); + } + + public Properties getProperties() { + return properties; + } + + public void setProperties(final Properties properties) { + this.properties = properties; + } + + public Map getSubscriptionTable() { + return subscriptionTable; + } + + public void setSubscriptionTable( + final Map subscriptionTable) { + this.subscriptionTable = subscriptionTable; + } +} diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/ConsumerBean.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/ConsumerBean.java new file mode 100644 index 0000000..839ad62 --- /dev/null +++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/ConsumerBean.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.ons.api.bean; + +import java.lang.reflect.Method; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import org.apache.rocketmq.ons.api.Consumer; +import org.apache.rocketmq.ons.api.ExpressionType; +import org.apache.rocketmq.ons.api.MessageListener; +import org.apache.rocketmq.ons.api.MessageSelector; +import org.apache.rocketmq.ons.api.ONSFactory; +import org.apache.rocketmq.ons.api.exception.ONSClientException; + + +public class ConsumerBean implements Consumer { + + private Properties properties; + + + private Map subscriptionTable; + + private Consumer consumer; + + + @Override + public void start() { + if (null == this.properties) { + throw new ONSClientException("properties not set"); + } + + if (null == this.subscriptionTable) { + throw new ONSClientException("subscriptionTable not set"); + } + + this.consumer = ONSFactory.createConsumer(this.properties); + + Iterator> it = this.subscriptionTable.entrySet().iterator(); + while (it.hasNext()) { + Entry next = it.next(); + if ("com.aliyun.openservices.ons.api.impl.notify.ConsumerImpl".equals(this.consumer.getClass().getCanonicalName()) + && (next.getKey() instanceof SubscriptionExt)) { + SubscriptionExt subscription = (SubscriptionExt) next.getKey(); + for (Method method : this.consumer.getClass().getMethods()) { + if ("subscribeNotify".equals(method.getName())) { + try { + method.invoke(consumer, subscription.getTopic(), subscription.getExpression(), + subscription.isPersistence(), next.getValue()); + } catch (Exception e) { + throw new ONSClientException("subscribeNotify invoke exception", e); + } + break; + } + } + + } else { + Subscription subscription = next.getKey(); + if (subscription.getType() == null || ExpressionType.TAG.name().equals(subscription.getType())) { + + this.subscribe(subscription.getTopic(), subscription.getExpression(), next.getValue()); + + } else if 
(ExpressionType.SQL92.name().equals(subscription.getType())) { + + this.subscribe(subscription.getTopic(), MessageSelector.bySql(subscription.getExpression()), next.getValue()); + } else { + + throw new ONSClientException(String.format("Expression type %s is unknown!", subscription.getType())); + } + } + + } + + this.consumer.start(); + } + + @Override + public void updateCredential(Properties credentialProperties) { + if (this.consumer != null) { + this.consumer.updateCredential(credentialProperties); + } + } + + + @Override + public void shutdown() { + if (this.consumer != null) { + this.consumer.shutdown(); + } + } + + @Override + public void subscribe(String topic, String subExpression, MessageListener listener) { + if (null == this.consumer) { + throw new ONSClientException("subscribe must be called after consumerBean started"); + } + this.consumer.subscribe(topic, subExpression, listener); + } + + @Override + public void subscribe(final String topic, final MessageSelector selector, final MessageListener listener) { + if (null == this.consumer) { + throw new ONSClientException("subscribe must be called after consumerBean started"); + } + this.consumer.subscribe(topic, selector, listener); + } + + @Override + public void unsubscribe(String topic) { + if (null == this.consumer) { + throw new ONSClientException("unsubscribe must be called after consumerBean started"); + } + this.consumer.unsubscribe(topic); + } + + public Properties getProperties() { + return properties; + } + + public void setProperties(Properties properties) { + this.properties = properties; + } + + public Map getSubscriptionTable() { + return subscriptionTable; + } + + public void setSubscriptionTable(Map subscriptionTable) { + this.subscriptionTable = subscriptionTable; + } + + @Override + public boolean isStarted() { + return this.consumer.isStarted(); + } + + @Override + public boolean isClosed() { + return this.consumer.isClosed(); + } +} diff --git 
a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/OrderConsumerBean.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/OrderConsumerBean.java new file mode 100644 index 0000000..e5b6402 --- /dev/null +++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/OrderConsumerBean.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.ons.api.bean; + +import java.util.Map; +import java.util.Properties; +import org.apache.rocketmq.ons.api.MessageSelector; +import org.apache.rocketmq.ons.api.ONSFactory; +import org.apache.rocketmq.ons.api.exception.ONSClientException; +import org.apache.rocketmq.ons.api.order.MessageOrderListener; +import org.apache.rocketmq.ons.api.order.OrderConsumer; + +public class OrderConsumerBean implements OrderConsumer { + + private Properties properties; + + private Map subscriptionTable; + + private OrderConsumer orderConsumer; + + @Override + public boolean isStarted() { + return this.orderConsumer.isStarted(); + } + + @Override + public boolean isClosed() { + return this.orderConsumer.isClosed(); + } + + @Override + public void start() { + if (null == this.properties) { + throw new ONSClientException("properties not set"); + } + + if (null == this.subscriptionTable) { + throw new ONSClientException("subscriptionTable not set"); + } + + this.orderConsumer = ONSFactory.createOrderedConsumer(this.properties); + + for (final Map.Entry next : this.subscriptionTable.entrySet()) { + this.subscribe(next.getKey().getTopic(), next.getKey().getExpression(), next.getValue()); + } + + this.orderConsumer.start(); + } + + @Override + public void updateCredential(Properties credentialProperties) { + if (this.orderConsumer != null) { + this.orderConsumer.updateCredential(credentialProperties); + } + } + + @Override + public void shutdown() { + if (this.orderConsumer != null) { + this.orderConsumer.shutdown(); + } + } + + @Override + public void subscribe(final String topic, final String subExpression, final MessageOrderListener listener) { + if (null == this.orderConsumer) { + throw new ONSClientException("subscribe must be called after OrderConsumerBean started"); + } + this.orderConsumer.subscribe(topic, subExpression, listener); + } + + @Override + public void subscribe(String topic, MessageSelector selector, MessageOrderListener listener) { + if 
(null == this.orderConsumer) { + throw new ONSClientException("subscribe must be called after OrderConsumerBean started"); + } + this.orderConsumer.subscribe(topic, selector, listener); + } + + public Properties getProperties() { + return properties; + } + + public void setProperties(final Properties properties) { + this.properties = properties; + } + + public Map getSubscriptionTable() { + return subscriptionTable; + } + + public void setSubscriptionTable( + final Map subscriptionTable) { + this.subscriptionTable = subscriptionTable; + } +} diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/OrderProducerBean.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/OrderProducerBean.java new file mode 100644 index 0000000..64cf25b --- /dev/null +++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/OrderProducerBean.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.ons.api.bean; + +import org.apache.rocketmq.ons.api.Message; +import org.apache.rocketmq.ons.api.ONSFactory; +import org.apache.rocketmq.ons.api.SendResult; +import org.apache.rocketmq.ons.api.exception.ONSClientException; +import org.apache.rocketmq.ons.api.order.OrderProducer; +import java.util.Properties; + + +public class OrderProducerBean implements OrderProducer { + + private Properties properties; + + private OrderProducer orderProducer; + + + @Override + public void start() { + if (null == this.properties) { + throw new ONSClientException("properties not set"); + } + + this.orderProducer = ONSFactory.createOrderProducer(this.properties); + this.orderProducer.start(); + } + + @Override + public void updateCredential(Properties credentialProperties) { + if (this.orderProducer != null) { + this.orderProducer.updateCredential(credentialProperties); + } + } + + + @Override + public void shutdown() { + if (this.orderProducer != null) { + this.orderProducer.shutdown(); + } + } + + @Override + public boolean isStarted() { + return this.orderProducer.isStarted(); + } + + @Override + public boolean isClosed() { + return this.orderProducer.isClosed(); + } + + @Override + public SendResult send(final Message message, final String shardingKey) { + return this.orderProducer.send(message, shardingKey); + } + + public Properties getProperties() { + return properties; + } + + public void setProperties(final Properties properties) { + this.properties = properties; + } +} diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/ProducerBean.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/ProducerBean.java new file mode 100644 index 0000000..2094158 --- /dev/null +++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/ProducerBean.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.ons.api.bean; + +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import org.apache.rocketmq.ons.api.Message; +import org.apache.rocketmq.ons.api.ONSFactory; +import org.apache.rocketmq.ons.api.Producer; +import org.apache.rocketmq.ons.api.SendCallback; +import org.apache.rocketmq.ons.api.SendResult; +import org.apache.rocketmq.ons.api.exception.ONSClientException; + + +public class ProducerBean implements Producer { + + private Properties properties; + private Producer producer; + + @Override + public void start() { + if (null == this.properties) { + throw new ONSClientException("properties not set"); + } + + this.producer = ONSFactory.createProducer(this.properties); + this.producer.start(); + } + + @Override + public void updateCredential(Properties credentialProperties) { + if (this.producer != null) { + this.producer.updateCredential(credentialProperties); + } + } + + @Override + public void shutdown() { + if (this.producer != null) { + this.producer.shutdown(); + } + } + + + @Override + public SendResult send(Message message) { + return this.producer.send(message); + } + + + @Override + public void sendOneway(Message message) { + this.producer.sendOneway(message); + } + + @Override + public void sendAsync(Message 
message, SendCallback sendCallback) { + this.producer.sendAsync(message, sendCallback); + } + + @Override + public void setCallbackExecutor(final ExecutorService callbackExecutor) { + this.producer.setCallbackExecutor(callbackExecutor); + } + + public Properties getProperties() { + return properties; + } + + public void setProperties(Properties properties) { + this.properties = properties; + } + + @Override + public boolean isStarted() { + return this.producer.isStarted(); + } + + @Override + public boolean isClosed() { + return this.producer.isClosed(); + } +} diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/Subscription.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/Subscription.java new file mode 100644 index 0000000..1840d4d --- /dev/null +++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/Subscription.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.ons.api.bean; + +import org.apache.rocketmq.ons.api.ExpressionType; + +public class Subscription { + private String topic; + private String expression; + + /** + * TAG or SQL92 + *
if null, equals to TAG + * + * @see ExpressionType#TAG + * @see ExpressionType#SQL92 + */ + private String type; + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public String getExpression() { + return expression; + } + + public void setExpression(String expression) { + this.expression = expression; + } + + public String getType() { + return type; + } + + public void setType(final String type) { + this.type = type; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((topic == null) ? 0 : topic.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + Subscription other = (Subscription) obj; + if (topic == null) { + if (other.topic != null) { + return false; + } + } else if (!topic.equals(other.topic)) { + return false; + } + return true; + } + + @Override + public String toString() { + return "Subscription [topic=" + topic + ", expression=" + expression + ", type=" + type + "]"; + } +} diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/SubscriptionExt.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/SubscriptionExt.java new file mode 100644 index 0000000..7413f90 --- /dev/null +++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/SubscriptionExt.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.ons.api.bean; + + +public class SubscriptionExt extends Subscription { + private boolean persistence = true; + + public boolean isPersistence() { + return persistence; + } + + public void setPersistence(boolean persistence) { + this.persistence = persistence; + } + + @Override + public int hashCode() { + return super.hashCode(); + } + + @Override + public boolean equals(Object obj) { + return super.equals(obj); + } + + @Override + public String toString() { + return "Subscription [topic=" + super.getTopic() + ", expression=" + super.getExpression() + + ", persistence=" + persistence + "]"; + } +} diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/TransactionProducerBean.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/TransactionProducerBean.java new file mode 100644 index 0000000..cb55e14 --- /dev/null +++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/TransactionProducerBean.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.ons.api.bean; + +import java.util.Properties; +import org.apache.rocketmq.ons.api.Message; +import org.apache.rocketmq.ons.api.ONSFactory; +import org.apache.rocketmq.ons.api.SendResult; +import org.apache.rocketmq.ons.api.exception.ONSClientException; +import org.apache.rocketmq.ons.api.transaction.LocalTransactionChecker; +import org.apache.rocketmq.ons.api.transaction.LocalTransactionExecuter; +import org.apache.rocketmq.ons.api.transaction.TransactionProducer; + + +public class TransactionProducerBean implements TransactionProducer { + + private Properties properties; + + + private LocalTransactionChecker localTransactionChecker; + + private TransactionProducer transactionProducer; + + + @Override + public void start() { + if (null == this.properties) { + throw new ONSClientException("properties not set"); + } + + this.transactionProducer = ONSFactory.createTransactionProducer(properties, localTransactionChecker); + this.transactionProducer.start(); + } + + @Override + public void updateCredential(Properties credentialProperties) { + if (this.transactionProducer != null) { + this.transactionProducer.updateCredential(credentialProperties); + } + } + + + @Override + public void shutdown() { + if (this.transactionProducer != null) { + this.transactionProducer.shutdown(); + } + } + + @Override + public SendResult send(Message message, LocalTransactionExecuter executer, Object arg) { + return this.transactionProducer.send(message, executer, arg); + } + + public Properties getProperties() { + return properties; + } + + 
public void setProperties(Properties properties) { + this.properties = properties; + } + + public LocalTransactionChecker getLocalTransactionChecker() { + return localTransactionChecker; + } + + public void setLocalTransactionChecker(LocalTransactionChecker localTransactionChecker) { + this.localTransactionChecker = localTransactionChecker; + } + + @Override + public boolean isStarted() { + return this.transactionProducer.isStarted(); + } + + @Override + public boolean isClosed() { + return this.transactionProducer.isClosed(); + } +} diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/exception/ONSClientException.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/exception/ONSClientException.java new file mode 100644 index 0000000..a761e3b --- /dev/null +++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/exception/ONSClientException.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.ons.api.exception; + + +public class ONSClientException extends RuntimeException { + private static final long serialVersionUID = 5755356574640041094L; + + + public ONSClientException() { + } + + + public ONSClientException(String message) { + super(message); + } + + + public ONSClientException(Throwable cause) { + super(cause); + } + + + public ONSClientException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/order/ConsumeOrderContext.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/order/ConsumeOrderContext.java new file mode 100644 index 0000000..f7102c3 --- /dev/null +++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/order/ConsumeOrderContext.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.ons.api.order; + +public class ConsumeOrderContext { + +} diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/order/MessageOrderListener.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/order/MessageOrderListener.java new file mode 100644 index 0000000..20a4003 --- /dev/null +++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/order/MessageOrderListener.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.ons.api.order; + +import org.apache.rocketmq.ons.api.Message; + +public interface MessageOrderListener { + + OrderAction consume(final Message message, final ConsumeOrderContext context); +} diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/order/MessageQueueSelector.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/order/MessageQueueSelector.java new file mode 100644 index 0000000..bee8d64 --- /dev/null +++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/order/MessageQueueSelector.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.ons.api.order; + +import org.apache.rocketmq.ons.api.Message; + +public interface MessageQueueSelector { + + int select(final int queueTotal, final Message msg, final Object arg); +} diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/order/OrderAction.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/order/OrderAction.java new file mode 100644 index 0000000..e2dc190 --- /dev/null +++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/order/OrderAction.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.ons.api.order; + + +public enum OrderAction { + + Success, + + Suspend, +} diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/order/OrderConsumer.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/order/OrderConsumer.java new file mode 100644 index 0000000..4e73c1b --- /dev/null +++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/order/OrderConsumer.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.ons.api.order; + +import org.apache.rocketmq.ons.api.Admin; +import org.apache.rocketmq.ons.api.MessageSelector; + + +public interface OrderConsumer extends Admin { + + @Override + void start(); + + @Override + void shutdown(); + + void subscribe(final String topic, final String subExpression, final MessageOrderListener listener); + + void subscribe(final String topic, final MessageSelector selector, final MessageOrderListener listener); +} diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/order/OrderProducer.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/order/OrderProducer.java new file mode 100644 index 0000000..4e9a0bb --- /dev/null +++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/order/OrderProducer.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.ons.api.order; + +import org.apache.rocketmq.ons.api.Admin; +import org.apache.rocketmq.ons.api.Message; +import org.apache.rocketmq.ons.api.SendResult; + + +public interface OrderProducer extends Admin { + + SendResult send(final Message message, final String shardingKey); +} diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionChecker.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionChecker.java new file mode 100644 index 0000000..eb46593 --- /dev/null +++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionChecker.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.ons.api.transaction; + +import org.apache.rocketmq.ons.api.Message; + +public interface LocalTransactionChecker { + + TransactionStatus check(final Message msg); +} diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionExecuter.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionExecuter.java new file mode 100644 index 0000000..f3fe785 --- /dev/null +++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionExecuter.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.ons.api.transaction; + +import org.apache.rocketmq.ons.api.Message; + + +public interface LocalTransactionExecuter { + + TransactionStatus execute(final Message msg, final Object arg); +} diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/TransactionProducer.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/TransactionProducer.java new file mode 100644 index 0000000..c139db1 --- /dev/null +++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/TransactionProducer.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.ons.api.transaction; + +import org.apache.rocketmq.ons.api.Admin; +import org.apache.rocketmq.ons.api.Message; +import org.apache.rocketmq.ons.api.SendResult; + + +public interface TransactionProducer extends Admin { + + @Override + void start(); + + + @Override + void shutdown(); + + + SendResult send(final Message message, + final LocalTransactionExecuter executer, + final Object arg); +} diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/TransactionStatus.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/TransactionStatus.java new file mode 100644 index 0000000..c98c9ef --- /dev/null +++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/TransactionStatus.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.ons.api.transaction; + + +public enum TransactionStatus { + + CommitTransaction, + + RollbackTransaction, + + Unknow +} diff --git a/ons-core/ons-auth4client/pom.xml b/ons-core/ons-auth4client/pom.xml new file mode 100644 index 0000000..643f7c7 --- /dev/null +++ b/ons-core/ons-auth4client/pom.xml @@ -0,0 +1,37 @@ + + + + + ons-all + org.apache.rocketmq + 1.8.1-SNAPSHOT + + 4.0.0 + ons-auth4client + ons-auth4client ${project.version} + + + org.apache.rocketmq + rocketmq-client + + + commons-codec + commons-codec + + + diff --git a/ons-core/ons-auth4client/src/main/java/org/apache/rocketmq/ons/api/impl/authority/AuthUtil.java b/ons-core/ons-auth4client/src/main/java/org/apache/rocketmq/ons/api/impl/authority/AuthUtil.java new file mode 100644 index 0000000..c8dd4f9 --- /dev/null +++ b/ons-core/ons-auth4client/src/main/java/org/apache/rocketmq/ons/api/impl/authority/AuthUtil.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.ons.api.impl.authority; + +import java.util.Map; +import java.util.SortedMap; + +import org.apache.rocketmq.remoting.protocol.RemotingCommand; + + +public class AuthUtil { + public static byte[] combineRequestContent(RemotingCommand request, SortedMap fieldsMap) { + try { + StringBuilder sb = new StringBuilder(""); + for (Map.Entry entry : fieldsMap.entrySet()) { + if (!SessionCredentials.Signature.equals(entry.getKey())) { + sb.append(entry.getValue()); + } + } + + return AuthUtil.combineBytes(sb.toString().getBytes(SessionCredentials.CHARSET), request.getBody()); + } catch (Exception e) { + throw new RuntimeException("incompatible exception.", e); + } + } + + public static byte[] combineBytes(byte[] b1, byte[] b2) { + int size = (null != b1 ? b1.length : 0) + (null != b2 ? b2.length : 0); + byte[] total = new byte[size]; + if (null != b1) { + System.arraycopy(b1, 0, total, 0, b1.length); + } + if (null != b2) { + System.arraycopy(b2, 0, total, b1.length, b2.length); + } + return total; + } + + + public static String calSignature(byte[] data, String secretKey) { + String signature = OnsAuthSigner.calSignature(data, secretKey); + return signature; + } +} diff --git a/ons-core/ons-auth4client/src/main/java/org/apache/rocketmq/ons/api/impl/authority/OnsAuthSigner.java b/ons-core/ons-auth4client/src/main/java/org/apache/rocketmq/ons/api/impl/authority/OnsAuthSigner.java new file mode 100644 index 0000000..b948779 --- /dev/null +++ b/ons-core/ons-auth4client/src/main/java/org/apache/rocketmq/ons/api/impl/authority/OnsAuthSigner.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.ons.api.impl.authority; + +import java.nio.charset.Charset; +import javax.crypto.Mac; +import javax.crypto.spec.SecretKeySpec; +import org.apache.commons.codec.binary.Base64; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.ons.api.impl.authority.exception.AuthenticationException; + +public class OnsAuthSigner { + private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.ROCKETMQ_AUTHORIZE_LOGGER_NAME); + public static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8"); + public static final SigningAlgorithm DEFAULT_ALGORITHM = SigningAlgorithm.HmacSHA1; + private static final int CAL_SIGNATURE_FAILED = 10015; + private static final String CAL_SIGNATURE_FAILED_MSG = "[%s:signature-failed] unable to calculate a request signature. 
error=%s"; + + public static String calSignature(String data, String key) throws AuthenticationException { + return calSignature(data, key, DEFAULT_ALGORITHM, DEFAULT_CHARSET); + } + + public static String calSignature(String data, String key, SigningAlgorithm algorithm, + Charset charset) throws AuthenticationException { + return signAndBase64Encode(data, key, algorithm, charset); + } + + private static String signAndBase64Encode(String data, String key, SigningAlgorithm algorithm, Charset charset) + throws AuthenticationException { + try { + byte[] signature = sign(data.getBytes(charset), key.getBytes(charset), algorithm); + return new String(Base64.encodeBase64(signature), DEFAULT_CHARSET); + } catch (Exception e) { + String message = String.format(CAL_SIGNATURE_FAILED_MSG, CAL_SIGNATURE_FAILED, e.getMessage()); + LOGGER.error(message, e); + throw new AuthenticationException("CAL_SIGNATURE_FAILED", CAL_SIGNATURE_FAILED, message, e); + } + } + + private static byte[] sign(byte[] data, byte[] key, SigningAlgorithm algorithm) throws AuthenticationException { + try { + Mac mac = Mac.getInstance(algorithm.toString()); + mac.init(new SecretKeySpec(key, algorithm.toString())); + return mac.doFinal(data); + } catch (Exception e) { + String message = String.format(CAL_SIGNATURE_FAILED_MSG, CAL_SIGNATURE_FAILED, e.getMessage()); + LOGGER.error(message, e); + throw new AuthenticationException("CAL_SIGNATURE_FAILED", CAL_SIGNATURE_FAILED, message, e); + } + } + + public static String calSignature(byte[] data, String key) throws AuthenticationException { + return calSignature(data, key, DEFAULT_ALGORITHM, DEFAULT_CHARSET); + } + + public static String calSignature(byte[] data, String key, SigningAlgorithm algorithm, + Charset charset) throws AuthenticationException { + return signAndBase64Encode(data, key, algorithm, charset); + } + + private static String signAndBase64Encode(byte[] data, String key, SigningAlgorithm algorithm, Charset charset) + throws 
AuthenticationException { + try { + byte[] signature = sign(data, key.getBytes(charset), algorithm); + return new String(Base64.encodeBase64(signature), DEFAULT_CHARSET); + } catch (Exception e) { + String message = String.format(CAL_SIGNATURE_FAILED_MSG, CAL_SIGNATURE_FAILED, e.getMessage()); + LOGGER.error(message, e); + throw new AuthenticationException("CAL_SIGNATURE_FAILED", CAL_SIGNATURE_FAILED, message, e); + } + } + +} diff --git a/ons-core/ons-auth4client/src/main/java/org/apache/rocketmq/ons/api/impl/authority/SessionCredentials.java b/ons-core/ons-auth4client/src/main/java/org/apache/rocketmq/ons/api/impl/authority/SessionCredentials.java new file mode 100644 index 0000000..fdbac11 --- /dev/null +++ b/ons-core/ons-auth4client/src/main/java/org/apache/rocketmq/ons/api/impl/authority/SessionCredentials.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.ons.api.impl.authority; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.Properties; + +import javax.annotation.Generated; + +import org.apache.rocketmq.common.MixAll; + +import org.apache.rocketmq.ons.api.impl.rocketmq.ONSChannel; + +@Generated("ons-auth4client") +public class SessionCredentials { + public static final Charset CHARSET = Charset.forName("UTF-8"); + public static final String AccessKey = "AccessKey"; + public static final String SecretKey = "SecretKey"; + public static final String Signature = "Signature"; + public static final String SecurityToken = "SecurityToken"; + public static final String SignatureMethod = "SignatureMethod"; + public static final String ONSChannelKey = "OnsChannel"; + + public static final String KeyFile = System.getProperty("rocketmq.client.keyFile", + System.getProperty("user.home") + File.separator + "onskey"); + + private String accessKey; + private String secretKey; + private String securityToken; + private String signature; + private String signatureMethod; + private ONSChannel onsChannel = ONSChannel.ALIYUN; + + public SessionCredentials() { + String keyContent = null; + try { + keyContent = MixAll.file2String(KeyFile); + } catch (IOException ignore) { + } + if (keyContent != null) { + Properties prop = MixAll.string2Properties(keyContent); + if (prop != null) { + this.updateContent(prop); + } + } + } + + public void updateContent(Properties prop) { + { + String value = prop.getProperty(AccessKey); + if (value != null) { + this.accessKey = value.trim(); + } + } + { + String value = prop.getProperty(SecretKey); + if (value != null) { + this.secretKey = value.trim(); + } + } + { + Object value = prop.get(ONSChannelKey); + if (value != null) { + this.onsChannel = ONSChannel.valueOf(value.toString()); + } + } + + { + String value = prop.getProperty(SecurityToken); + if (value != null) { + this.securityToken = value.trim(); + } + } + 
} + + public String getAccessKey() { + return accessKey; + } + + public void setAccessKey(String accessKey) { + this.accessKey = accessKey; + } + + public String getSecretKey() { + return secretKey; + } + + public void setSecretKey(String secretKey) { + this.secretKey = secretKey; + } + + public String getSignature() { + return signature; + } + + public void setSignature(String signature) { + this.signature = signature; + } + + public String getSecurityToken() { + return securityToken; + } + + public void setSecurityToken(final String securityToken) { + this.securityToken = securityToken; + } + + public String getSignatureMethod() { + return signatureMethod; + } + + public void setSignatureMethod(String signatureMethod) { + this.signatureMethod = signatureMethod; + } + + public ONSChannel getOnsChannel() { + return onsChannel; + } + + public void setOnsChannel(ONSChannel onsChannel) { + this.onsChannel = onsChannel; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((accessKey == null) ? 0 : accessKey.hashCode()); + result = prime * result + ((secretKey == null) ? 0 : secretKey.hashCode()); + result = prime * result + ((signature == null) ? 0 : signature.hashCode()); + result = prime * result + ((signatureMethod == null) ? 0 : signatureMethod.hashCode()); + result = prime * result + ((onsChannel == null) ? 
0 : onsChannel.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + + SessionCredentials other = (SessionCredentials) obj; + if (accessKey == null) { + if (other.accessKey != null) { + return false; + } + } else if (!accessKey.equals(other.accessKey)) { + return false; + } + + if (secretKey == null) { + if (other.secretKey != null) { + return false; + } + } else if (!secretKey.equals(other.secretKey)) { + return false; + } + + if (signature == null) { + if (other.signature != null) { + return false; + } + } else if (!signature.equals(other.signature)) { + return false; + } + + if (signatureMethod == null) { + if (other.signatureMethod != null) { + return false; + } + } else if (!signatureMethod.equals(other.signatureMethod)) { + return false; + } + + if (onsChannel == null) { + if (other.onsChannel != null) { + return false; + } + } else if (!onsChannel.equals(other.onsChannel)) { + return false; + } + + return true; + } + + @Override + public String toString() { + return "SessionCredentials [accessKey=" + accessKey + ", secretKey=" + secretKey + ", signature=" + + signature + ", signatureMethod=" + signatureMethod + ", onsChannel=" + onsChannel + "]"; + } +} \ No newline at end of file diff --git a/ons-core/ons-auth4client/src/main/java/org/apache/rocketmq/ons/api/impl/authority/SigningAlgorithm.java b/ons-core/ons-auth4client/src/main/java/org/apache/rocketmq/ons/api/impl/authority/SigningAlgorithm.java new file mode 100644 index 0000000..663b887 --- /dev/null +++ b/ons-core/ons-auth4client/src/main/java/org/apache/rocketmq/ons/api/impl/authority/SigningAlgorithm.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
/**
 * Algorithms available for signing request content.
 *
 * <p>The constant names are significant: {@code AuthUtil} passes {@code toString()} of the
 * chosen constant to {@code Mac.getInstance(...)}, so they must not be renamed.
 */
public enum SigningAlgorithm {
    HmacSHA1,
    HmacSHA256,
    HmacMD5
}
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.ons.api.impl.authority.exception; + +public class AuthenticationException extends RuntimeException { + private static final long serialVersionUID = 1L; + + private String status; + private int code; + + + public AuthenticationException(String status, int code) { + super(); + this.status = status; + this.code = code; + } + + + public AuthenticationException(String status, int code, String message) { + super(message); + this.status = status; + this.code = code; + } + + + public AuthenticationException(String status, int code, Throwable throwable) { + super(throwable); + this.status = status; + this.code = code; + } + + + public AuthenticationException(String status, int code, String message, Throwable throwable) { + super(message, throwable); + this.status = status; + this.code = code; + } + + public String getStatus() { + return status; + } + + public void setStatus(String status) { + this.status = status; + } + + public int getCode() { + return code; + } + + public void setCode(int code) { + this.code = code; + } +} diff --git a/ons-core/ons-auth4client/src/main/java/org/apache/rocketmq/ons/api/impl/authority/exception/FlowControlException.java b/ons-core/ons-auth4client/src/main/java/org/apache/rocketmq/ons/api/impl/authority/exception/FlowControlException.java new file mode 100644 index 0000000..17d872c --- /dev/null +++ b/ons-core/ons-auth4client/src/main/java/org/apache/rocketmq/ons/api/impl/authority/exception/FlowControlException.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
/**
 * Unchecked exception carrying a numeric flow-control code alongside the usual
 * message and cause.
 */
public class FlowControlException extends RuntimeException {

    private static final long serialVersionUID = -3662598055526208602L;
    private final int code;

    /**
     * @param code numeric error code
     */
    public FlowControlException(int code) {
        super();
        this.code = code;
    }

    /**
     * @param code    numeric error code
     * @param message detail message
     * @param cause   underlying failure
     */
    public FlowControlException(int code, String message, Throwable cause) {
        super(message, cause);
        this.code = code;
    }

    /**
     * @param code    numeric error code
     * @param message detail message
     */
    public FlowControlException(int code, String message) {
        super(message);
        this.code = code;
    }

    /**
     * @param code  numeric error code
     * @param cause underlying failure
     */
    public FlowControlException(int code, Throwable cause) {
        super(cause);
        this.code = code;
    }

    /**
     * Exposes the code passed at construction; without this accessor the stored
     * value could never be read by a handler.
     *
     * @return the numeric error code
     */
    public int getCode() {
        return code;
    }

}
/**
 * Unchecked exception carrying a numeric code alongside the usual message and cause,
 * for failures related to request signatures.
 */
public class SignatureException extends RuntimeException {

    private static final long serialVersionUID = -3662598055526208602L;
    private final int code;

    /**
     * @param code numeric error code
     */
    public SignatureException(int code) {
        super();
        this.code = code;
    }

    /**
     * @param code    numeric error code
     * @param message detail message
     * @param cause   underlying failure
     */
    public SignatureException(int code, String message, Throwable cause) {
        super(message, cause);
        this.code = code;
    }

    /**
     * @param code    numeric error code
     * @param message detail message
     */
    public SignatureException(int code, String message) {
        super(message);
        this.code = code;
    }

    /**
     * @param code  numeric error code
     * @param cause underlying failure
     */
    public SignatureException(int code, Throwable cause) {
        super(cause);
        this.code = code;
    }

    /**
     * Exposes the code passed at construction; without this accessor the stored
     * value could never be read by a handler.
     *
     * @return the numeric error code
     */
    public int getCode() {
        return code;
    }
}
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.ons.api.impl.rocketmq; + +import java.lang.reflect.Field; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; + +import static org.apache.rocketmq.ons.api.impl.authority.SessionCredentials.AccessKey; +import static org.apache.rocketmq.ons.api.impl.authority.SessionCredentials.ONSChannelKey; +import static org.apache.rocketmq.ons.api.impl.authority.SessionCredentials.SecurityToken; + +public abstract class AbstractRPCHook implements RPCHook { + protected ConcurrentHashMap, Field[]> fieldCache = + new ConcurrentHashMap, Field[]>(); + + + protected SortedMap parseRequestContent(RemotingCommand request, String ak, String securityToken, String onsChannel) { + CommandCustomHeader header = request.readCustomHeader(); + // sort property + SortedMap map = new TreeMap(); + map.put(AccessKey, ak); + map.put(ONSChannelKey, onsChannel); + if (securityToken != null) { + map.put(SecurityToken, securityToken); + } + try { + // add header properties + if (null != header) { + Field[] fields = fieldCache.get(header.getClass()); + if (null == fields) { + fields = header.getClass().getDeclaredFields(); + for (Field field : fields) { + 
field.setAccessible(true); + } + Field[] tmp = fieldCache.putIfAbsent(header.getClass(), fields); + if (null != tmp) { + fields = tmp; + } + } + + for (Field field : fields) { + Object value = field.get(header); + if (null != value && !field.isSynthetic()) { + map.put(field.getName(), value.toString()); + } + } + } + return map; + } catch (Exception e) { + throw new RuntimeException("incompatible exception.", e); + } + } + +} diff --git a/ons-core/ons-auth4client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ClientRPCHook.java b/ons-core/ons-auth4client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ClientRPCHook.java new file mode 100644 index 0000000..72e36b9 --- /dev/null +++ b/ons-core/ons-auth4client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ClientRPCHook.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.ons.api.impl.rocketmq; + +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.ons.api.impl.authority.AuthUtil; +import org.apache.rocketmq.ons.api.impl.authority.SessionCredentials; + +import static org.apache.rocketmq.ons.api.impl.authority.SessionCredentials.AccessKey; +import static org.apache.rocketmq.ons.api.impl.authority.SessionCredentials.ONSChannelKey; +import static org.apache.rocketmq.ons.api.impl.authority.SessionCredentials.SecurityToken; +import static org.apache.rocketmq.ons.api.impl.authority.SessionCredentials.Signature; + +public class ClientRPCHook extends AbstractRPCHook { + private SessionCredentials sessionCredentials; + + public ClientRPCHook(SessionCredentials sessionCredentials) { + this.sessionCredentials = sessionCredentials; + } + + @Override + public void doBeforeRequest(String remoteAddr, RemotingCommand request) { + byte[] total = AuthUtil.combineRequestContent(request, + parseRequestContent(request, sessionCredentials.getAccessKey(), + sessionCredentials.getSecurityToken(), sessionCredentials.getOnsChannel().name())); + String signature = AuthUtil.calSignature(total, sessionCredentials.getSecretKey()); + request.addExtField(Signature, signature); + request.addExtField(AccessKey, sessionCredentials.getAccessKey()); + request.addExtField(ONSChannelKey, sessionCredentials.getOnsChannel().name()); + + if (sessionCredentials.getSecurityToken() != null) { + request.addExtField(SecurityToken, sessionCredentials.getSecurityToken()); + } + } + + + @Override + public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) { + + } + +} diff --git a/ons-core/ons-auth4client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSChannel.java b/ons-core/ons-auth4client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSChannel.java new file mode 100644 index 0000000..93fdaf1 --- /dev/null +++ 
/**
 * Channels a session's credentials can be associated with.
 *
 * <p>The constant name is what travels in the {@code OnsChannel} request field and what
 * {@code SessionCredentials} parses with {@code valueOf}, so names must stay stable.
 * {@code ALIYUN} is the default used by {@code SessionCredentials}.
 */
public enum ONSChannel {
    CLOUD, ALIYUN, ALL
}
--- /dev/null +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/MQClientInfo.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.ons.api.impl; + +import java.io.InputStream; +import java.util.Properties; +import org.apache.rocketmq.common.MQVersion; + +public class MQClientInfo { + + public static int versionCode = MQVersion.CURRENT_VERSION; + + static { + try { + InputStream stream = MQClientInfo.class.getClassLoader().getResourceAsStream("ons_client_info.properties"); + Properties properties = new Properties(); + properties.load(stream); + String pkgVersion = String.valueOf(properties.get("version")); + versionCode = Integer.MAX_VALUE - Integer.valueOf(pkgVersion.replaceAll("[^0-9]", "")); + } catch (Exception ignore) { + } + } + +} diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/ONSFactoryImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/ONSFactoryImpl.java new file mode 100644 index 0000000..4fca00e --- /dev/null +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/ONSFactoryImpl.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.ons.api.impl; + +import org.apache.rocketmq.client.producer.LocalTransactionState; +import org.apache.rocketmq.client.producer.TransactionCheckListener; +import org.apache.rocketmq.common.message.MessageExt; +import java.util.Properties; +import org.apache.rocketmq.ons.api.Constants; +import org.apache.rocketmq.ons.api.Consumer; +import org.apache.rocketmq.ons.api.Message; +import org.apache.rocketmq.ons.api.ONSFactoryAPI; +import org.apache.rocketmq.ons.api.Producer; +import org.apache.rocketmq.ons.api.batch.BatchConsumer; +import org.apache.rocketmq.ons.api.impl.rocketmq.BatchConsumerImpl; +import org.apache.rocketmq.ons.api.impl.rocketmq.ConsumerImpl; +import org.apache.rocketmq.ons.api.impl.rocketmq.ONSUtil; +import org.apache.rocketmq.ons.api.impl.rocketmq.OrderConsumerImpl; +import org.apache.rocketmq.ons.api.impl.rocketmq.OrderProducerImpl; +import org.apache.rocketmq.ons.api.impl.rocketmq.ProducerImpl; +import org.apache.rocketmq.ons.api.impl.rocketmq.TransactionProducerImpl; +import org.apache.rocketmq.ons.api.order.OrderConsumer; +import org.apache.rocketmq.ons.api.order.OrderProducer; +import org.apache.rocketmq.ons.api.transaction.LocalTransactionChecker; +import 
org.apache.rocketmq.ons.api.transaction.TransactionProducer; +import org.apache.rocketmq.ons.api.transaction.TransactionStatus; + +public class ONSFactoryImpl implements ONSFactoryAPI { + @Override + public Producer createProducer(final Properties properties) { + return new ProducerImpl(ONSUtil.extractProperties(properties)); + } + + + @Override + public Consumer createConsumer(final Properties properties) { + return new ConsumerImpl(ONSUtil.extractProperties(properties)); + } + + @Override + public BatchConsumer createBatchConsumer(final Properties properties) { + return new BatchConsumerImpl(ONSUtil.extractProperties(properties)); + } + + @Override + public OrderProducer createOrderProducer(final Properties properties) { + return new OrderProducerImpl(ONSUtil.extractProperties(properties)); + } + + + @Override + public OrderConsumer createOrderedConsumer(final Properties properties) { + return new OrderConsumerImpl(ONSUtil.extractProperties(properties)); + } + + @Override + public TransactionProducer createTransactionProducer(Properties properties, + final LocalTransactionChecker checker) { + return new TransactionProducerImpl(ONSUtil.extractProperties(properties), new TransactionCheckListener() { + @Override + public LocalTransactionState checkLocalTransactionState(MessageExt msg) { + String msgId = msg.getProperty(Constants.TRANSACTION_ID); + Message message = ONSUtil.msgConvert(msg); + message.setMsgID(msgId); + TransactionStatus check = checker.check(message); + if (TransactionStatus.CommitTransaction == check) { + return LocalTransactionState.COMMIT_MESSAGE; + } else if (TransactionStatus.RollbackTransaction == check) { + return LocalTransactionState.ROLLBACK_MESSAGE; + } + return LocalTransactionState.UNKNOW; + } + }); + } +} diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/BatchConsumerImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/BatchConsumerImpl.java new file mode 100644 
index 0000000..592a40c --- /dev/null +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/BatchConsumerImpl.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.ons.api.impl.rocketmq; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; + +import javax.annotation.Generated; + +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; + +import org.apache.rocketmq.ons.api.Action; +import org.apache.rocketmq.ons.api.Constants; +import org.apache.rocketmq.ons.api.ConsumeContext; +import org.apache.rocketmq.ons.api.Message; +import org.apache.rocketmq.ons.api.PropertyKeyConst; +import org.apache.rocketmq.ons.api.PropertyValueConst; +import org.apache.rocketmq.ons.api.batch.BatchConsumer; +import 
org.apache.rocketmq.ons.api.batch.BatchMessageListener; +import org.apache.rocketmq.ons.api.exception.ONSClientException; + +@Generated("ons-client") +public class BatchConsumerImpl extends ONSConsumerAbstract implements BatchConsumer { + private final static int MAX_BATCH_SIZE = 32; + private final static int MIN_BATCH_SIZE = 1; + private final ConcurrentHashMap subscribeTable = new ConcurrentHashMap(); + + public BatchConsumerImpl(final Properties properties) { + super(properties); + + boolean postSubscriptionWhenPull = Boolean.parseBoolean(properties.getProperty(PropertyKeyConst.PostSubscriptionWhenPull, "false")); + this.defaultMQPushConsumer.setPostSubscriptionWhenPull(postSubscriptionWhenPull); + + String messageModel = properties.getProperty(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING); + this.defaultMQPushConsumer.setMessageModel(MessageModel.valueOf(messageModel)); + + String consumeBatchSize = properties.getProperty(PropertyKeyConst.ConsumeMessageBatchMaxSize); + if (!UtilAll.isBlank(consumeBatchSize)) { + int batchSize = Math.min(MAX_BATCH_SIZE, Integer.valueOf(consumeBatchSize)); + batchSize = Math.max(MIN_BATCH_SIZE, batchSize); + this.defaultMQPushConsumer.setConsumeMessageBatchMaxSize(batchSize); + } + } + + @Override + public void start() { + this.defaultMQPushConsumer.registerMessageListener(new BatchMessageListenerImpl()); + super.start(); + } + + @Override + public void subscribe(String topic, String subExpression, BatchMessageListener listener) { + if (null == topic) { + throw new ONSClientException("topic is null"); + } + + if (null == listener) { + throw new ONSClientException("listener is null"); + } + this.subscribeTable.put(topic, listener); + super.subscribe(topic, subExpression); + } + + @Override + public void unsubscribe(String topic) { + if (null != topic) { + this.subscribeTable.remove(topic); + super.unsubscribe(topic); + } + } + + class BatchMessageListenerImpl implements MessageListenerConcurrently { + + @Override 
+ public ConsumeConcurrentlyStatus consumeMessage(List rmqMsgList, + ConsumeConcurrentlyContext contextRMQ) { + List msgList = new ArrayList(); + for (MessageExt rmqMsg : rmqMsgList) { + Message msg = ONSUtil.msgConvert(rmqMsg); + Map propertiesMap = rmqMsg.getProperties(); + msg.setMsgID(rmqMsg.getMsgId()); + if (propertiesMap != null && propertiesMap.get(Constants.TRANSACTION_ID) != null) { + msg.setMsgID(propertiesMap.get(Constants.TRANSACTION_ID)); + } + msgList.add(msg); + } + + BatchMessageListener listener = BatchConsumerImpl.this.subscribeTable.get(msgList.get(0).getTopic()); + if (null == listener) { + throw new ONSClientException("BatchMessageListener is null"); + } + + final ConsumeContext context = new ConsumeContext(); + Action action = listener.consume(msgList, context); + if (action != null) { + switch (action) { + case CommitMessage: + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + case ReconsumeLater: + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + default: + break; + } + } + return null; + } + } +} diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ConsumerImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ConsumerImpl.java new file mode 100644 index 0000000..415482f --- /dev/null +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ConsumerImpl.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.ons.api.impl.rocketmq; + +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; + +import javax.annotation.Generated; + +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; + +import org.apache.rocketmq.ons.api.Action; +import org.apache.rocketmq.ons.api.Constants; +import org.apache.rocketmq.ons.api.ConsumeContext; +import org.apache.rocketmq.ons.api.Consumer; +import org.apache.rocketmq.ons.api.Message; +import org.apache.rocketmq.ons.api.MessageListener; +import org.apache.rocketmq.ons.api.MessageSelector; +import org.apache.rocketmq.ons.api.PropertyKeyConst; +import org.apache.rocketmq.ons.api.PropertyValueConst; +import org.apache.rocketmq.ons.api.exception.ONSClientException; + +@Generated("ons-client") +public class ConsumerImpl extends ONSConsumerAbstract implements Consumer { + private final ConcurrentHashMap subscribeTable = new ConcurrentHashMap(); + + public ConsumerImpl(final Properties properties) { + super(properties); + boolean postSubscriptionWhenPull = Boolean.parseBoolean(properties.getProperty(PropertyKeyConst.PostSubscriptionWhenPull, "false")); + 
this.defaultMQPushConsumer.setPostSubscriptionWhenPull(postSubscriptionWhenPull); + + String messageModel = properties.getProperty(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING); + this.defaultMQPushConsumer.setMessageModel(MessageModel.valueOf(messageModel)); + } + + @Override + public void start() { + this.defaultMQPushConsumer.registerMessageListener(new MessageListenerImpl()); + super.start(); + } + + + @Override + public void subscribe(String topic, String subExpression, MessageListener listener) { + if (null == topic) { + throw new ONSClientException("topic is null"); + } + + if (null == listener) { + throw new ONSClientException("listener is null"); + } + this.subscribeTable.put(topic, listener); + super.subscribe(topic, subExpression); + } + + @Override + public void subscribe(final String topic, final MessageSelector selector, final MessageListener listener) { + if (null == topic) { + throw new ONSClientException("topic is null"); + } + + if (null == listener) { + throw new ONSClientException("listener is null"); + } + this.subscribeTable.put(topic, listener); + super.subscribe(topic, selector); + } + + + @Override + public void unsubscribe(String topic) { + if (null != topic) { + this.subscribeTable.remove(topic); + super.unsubscribe(topic); + } + } + + + class MessageListenerImpl implements MessageListenerConcurrently { + + @Override + public ConsumeConcurrentlyStatus consumeMessage(List msgsRMQList, + ConsumeConcurrentlyContext contextRMQ) { + MessageExt msgRMQ = msgsRMQList.get(0); + Message msg = ONSUtil.msgConvert(msgRMQ); + Map stringStringMap = msgRMQ.getProperties(); + msg.setMsgID(msgRMQ.getMsgId()); + if (stringStringMap != null && stringStringMap.get(Constants.TRANSACTION_ID) != null) { + msg.setMsgID(stringStringMap.get(Constants.TRANSACTION_ID)); + } + MessageListener listener = ConsumerImpl.this.subscribeTable.get(msg.getTopic()); + if (null == listener) { + throw new ONSClientException("MessageListener is null"); + } + + 
final ConsumeContext context = new ConsumeContext(); + Action action = listener.consume(msg, context); + if (action != null) { + switch (action) { + case CommitMessage: + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + case ReconsumeLater: + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + default: + break; + } + } + + return null; + } + } +} diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/FAQ.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/FAQ.java new file mode 100644 index 0000000..523184d --- /dev/null +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/FAQ.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.ons.api.impl.rocketmq; + +public class FAQ { + public static final String FIND_NS_FAILED = + "http://rocketmq.apache.org/docs/faq/exceptions&namesrv_not_exist"; + + public static final String CONNECT_BROKER_FAILED = + "http://rocketmq.apache.org/docs/faq/exceptions&connect_broker_failed"; + + public static final String SEND_MSG_TO_BROKER_TIMEOUT = + "http://rocketmq.apache.org/docs/faq/exceptions&send_msg_failed"; + + public static final String SERVICE_STATE_WRONG = + "http://rocketmq.apache.org/docs/faq/exceptions&service_not_ok"; + + public static final String BROKER_RESPONSE_EXCEPTION = + "http://rocketmq.apache.org/docs/faq/exceptions&broker_response_exception"; + + public static final String CLIENT_CHECK_MSG_EXCEPTION = + "http://rocketmq.apache.org/docs/faq/exceptions&msg_check_failed"; + + public static final String TOPIC_ROUTE_NOT_EXIST = + "http://rocketmq.apache.org/docs/faq/exceptions&topic_not_exist"; + + + public static String errorMessage(final String msg, final String url) { + return String.format("%s\nSee %s for further details.", msg, url); + } +} diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSClientAbstract.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSClientAbstract.java new file mode 100644 index 0000000..2cc4d9c --- /dev/null +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSClientAbstract.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.ons.api.impl.rocketmq; + +import java.util.Properties; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.annotation.Generated; + +import org.apache.rocketmq.ons.open.trace.core.dispatch.AsyncDispatcher; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.namesrv.TopAddressing; +import org.apache.rocketmq.logging.InternalLogger; + +import org.apache.rocketmq.ons.api.Admin; +import org.apache.rocketmq.ons.api.PropertyKeyConst; +import org.apache.rocketmq.ons.api.exception.ONSClientException; +import org.apache.rocketmq.ons.api.impl.authority.SessionCredentials; +import org.apache.rocketmq.ons.api.impl.util.ClientLoggerUtil; +import org.apache.rocketmq.ons.api.impl.util.NameAddrUtils; +import org.apache.commons.lang3.StringUtils; + +import static org.apache.rocketmq.common.UtilAll.getPid; + +@Generated("ons-client") +public abstract class ONSClientAbstract implements Admin { + + protected static final String WSADDR_INTERNAL = System.getProperty("com.aliyun.openservices.ons.addr.internal", + "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal"); + + protected static final String WSADDR_INTERNET = 
System.getProperty("com.aliyun.openservices.ons.addr.internet", + "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet"); + protected static final long WSADDR_INTERNAL_TIMEOUTMILLS = + Long.parseLong(System.getProperty("com.aliyun.openservices.ons.addr.internal.timeoutmills", "3000")); + protected static final long WSADDR_INTERNET_TIMEOUTMILLS = + Long.parseLong(System.getProperty("com.aliyun.openservices.ons.addr.internet.timeoutmills", "5000")); + private final static InternalLogger LOGGER = ClientLoggerUtil.getClientLogger(); + protected final Properties properties; + protected final SessionCredentials sessionCredentials = new SessionCredentials(); + protected String nameServerAddr = NameAddrUtils.getNameAdd(); + + protected AsyncDispatcher traceDispatcher = null; + + protected final AtomicBoolean started = new AtomicBoolean(false); + + private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1, + new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "ONSClient-UpdateNameServerThread"); + } + }); + + public ONSClientAbstract(Properties properties) { + this.properties = properties; + this.sessionCredentials.updateContent(properties); + if (null == this.sessionCredentials.getAccessKey() || "".equals(this.sessionCredentials.getAccessKey())) { + throw new ONSClientException("please set access key"); + } + + if (null == this.sessionCredentials.getSecretKey() || "".equals(this.sessionCredentials.getSecretKey())) { + throw new ONSClientException("please set secret key"); + } + + if (null == this.sessionCredentials.getOnsChannel()) { + throw new ONSClientException("please set ons channel"); + } + + + + this.nameServerAddr = getNameSrvAddrFromProperties(); + if (nameServerAddr != null) { + return; + } + this.nameServerAddr = fetchNameServerAddr(); + if (null == nameServerAddr) { + throw new ONSClientException(FAQ.errorMessage("Can not find name server, May be your network 
problem.", FAQ.FIND_NS_FAILED)); + } + + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + String nsAddrs = fetchNameServerAddr(); + if (nsAddrs != null && !ONSClientAbstract.this.nameServerAddr.equals(nsAddrs)) { + ONSClientAbstract.this.nameServerAddr = nsAddrs; + if (isStarted()) { + updateNameServerAddr(nsAddrs); + } + } + } catch (Exception e) { + LOGGER.error("update name server periodically failed.", e); + } + } + }, 10 * 1000L, 90 * 1000L, TimeUnit.MILLISECONDS); + + } + + protected abstract void updateNameServerAddr(String newAddrs); + + private String getNameSrvAddrFromProperties() { + String nameserverAddrs = this.properties.getProperty(PropertyKeyConst.NAMESRV_ADDR); + if (StringUtils.isNotEmpty(nameserverAddrs) && NameAddrUtils.NAMESRV_ENDPOINT_PATTERN.matcher(nameserverAddrs.trim()).matches()) { + return nameserverAddrs.substring(NameAddrUtils.ENDPOINT_PREFIX.length()); + } + + return nameserverAddrs; + } + + private String fetchNameServerAddr() { + String nsAddrs; + + + { + String property = this.properties.getProperty(PropertyKeyConst.ONSAddr); + if (property != null) { + nsAddrs = new TopAddressing(property).fetchNSAddr(); + if (nsAddrs != null) { + LOGGER.info("connected to user-defined ons addr server, {} success, {}", property, nsAddrs); + return nsAddrs; + } else { + throw new ONSClientException(FAQ.errorMessage("Can not find name server with onsAddr " + property, FAQ.FIND_NS_FAILED)); + } + } + } + + + { + TopAddressing top = new TopAddressing(WSADDR_INTERNAL); + nsAddrs = top.fetchNSAddr(false, WSADDR_INTERNAL_TIMEOUTMILLS); + if (nsAddrs != null) { + LOGGER.info("connected to internal server, {} success, {}", WSADDR_INTERNAL, nsAddrs); + return nsAddrs; + } + } + + + { + TopAddressing top = new TopAddressing(WSADDR_INTERNET); + nsAddrs = top.fetchNSAddr(false, WSADDR_INTERNET_TIMEOUTMILLS); + if (nsAddrs != null) { + LOGGER.info("connected to internet server, {} success, {}", 
WSADDR_INTERNET, nsAddrs); + } + } + + return nsAddrs; + } + + public String getNameServerAddr() { + return this.nameServerAddr; + } + + protected String buildIntanceName() { + return Integer.toString(UtilAll.getPid()) + + "#" + this.nameServerAddr.hashCode() + + "#" + this.sessionCredentials.getAccessKey().hashCode() + + "#" + System.nanoTime(); + } + + protected String getNamespace() { + String namespace = null; + + + { + String nameserverAddr = this.properties.getProperty(PropertyKeyConst.NAMESRV_ADDR); + if (StringUtils.isNotEmpty(nameserverAddr)) { + if (NameAddrUtils.validateInstanceEndpoint(nameserverAddr)) { + namespace = NameAddrUtils.parseInstanceIdFromEndpoint(nameserverAddr); + LOGGER.info("User specify namespace by endpoint: {}.", namespace); + } + } + } + + + { + String namespaceFromProperty = this.properties.getProperty(PropertyKeyConst.INSTANCE_ID, null); + if (StringUtils.isNotEmpty(namespaceFromProperty)) { + namespace = namespaceFromProperty; + LOGGER.info("User specify namespace by property: {}.", namespace); + } + } + + return namespace; + } + + protected void checkONSProducerServiceState(DefaultMQProducerImpl producer) { + switch (producer.getServiceState()) { + case CREATE_JUST: + throw new ONSClientException( + FAQ.errorMessage(String.format("You do not have start the producer[" + getPid() + "], %s", producer.getServiceState()), + FAQ.SERVICE_STATE_WRONG)); + case SHUTDOWN_ALREADY: + throw new ONSClientException(FAQ.errorMessage(String.format("Your producer has been shut down, %s", producer.getServiceState()), + FAQ.SERVICE_STATE_WRONG)); + case START_FAILED: + throw new ONSClientException(FAQ.errorMessage( + String.format("When you start your service throws an exception, %s", producer.getServiceState()), FAQ.SERVICE_STATE_WRONG)); + case RUNNING: + break; + default: + break; + } + } + + @Override + public void start() { + if (null != traceDispatcher) { + try { + traceDispatcher.start(); + } catch (MQClientException e) { + LOGGER.warn("trace 
dispatcher start failed ", e); + } + } + } + + @Override + public void updateCredential(Properties credentialProperties) { + if (null == credentialProperties.getProperty(SessionCredentials.AccessKey) + || "".equals(credentialProperties.getProperty(SessionCredentials.AccessKey))) { + throw new ONSClientException("update credential failed. please set access key."); + } + + if (null == credentialProperties.getProperty(SessionCredentials.SecretKey) + || "".equals(credentialProperties.getProperty(SessionCredentials.SecretKey))) { + throw new ONSClientException("update credential failed. please set secret key"); + } + this.sessionCredentials.updateContent(credentialProperties); + } + + @Override + public void shutdown() { + if (null != traceDispatcher) { + traceDispatcher.shutdown(); + } + scheduledExecutorService.shutdown(); + } + + @Override + public boolean isStarted() { + return started.get(); + } + + @Override + public boolean isClosed() { + return !isStarted(); + } +} diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSConsumerAbstract.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSConsumerAbstract.java new file mode 100644 index 0000000..d2bb38b --- /dev/null +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSConsumerAbstract.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.ons.api.impl.rocketmq; + +import java.util.Properties; + +import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants; +import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceDispatcherType; +import org.apache.rocketmq.ons.open.trace.core.dispatch.impl.AsyncArrayDispatcher; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.logging.InternalLogger; + +import org.apache.rocketmq.ons.api.MessageSelector; +import org.apache.rocketmq.ons.api.PropertyKeyConst; +import org.apache.rocketmq.ons.api.exception.ONSClientException; +import org.apache.rocketmq.ons.api.impl.tracehook.OnsConsumeMessageHookImpl; +import org.apache.rocketmq.ons.api.impl.util.ClientLoggerUtil; +import org.apache.commons.lang3.StringUtils; + +public class ONSConsumerAbstract extends ONSClientAbstract { + final static InternalLogger LOGGER = ClientLoggerUtil.getClientLogger(); + protected final DefaultMQPushConsumer defaultMQPushConsumer; + private final static int MAX_CACHED_MESSAGE_SIZE_IN_MIB = 2048; + private final static int MIN_CACHED_MESSAGE_SIZE_IN_MIB = 16; + private final static int MAX_CACHED_MESSAGE_AMOUNT = 50000; + private final static int MIN_CACHED_MESSAGE_AMOUNT = 100; + + private int maxCachedMessageSizeInMiB = 512; + + private int maxCachedMessageAmount = 5000; + + public ONSConsumerAbstract(final Properties properties) { + super(properties); + + String consumerGroup 
= properties.getProperty(PropertyKeyConst.GROUP_ID, properties.getProperty(PropertyKeyConst.ConsumerId)); + if (StringUtils.isEmpty(consumerGroup)) { + throw new ONSClientException("ConsumerId property is null"); + } + + this.defaultMQPushConsumer = + new DefaultMQPushConsumer(this.getNamespace(), consumerGroup, new OnsClientRPCHook(sessionCredentials)); + + + String maxReconsumeTimes = properties.getProperty(PropertyKeyConst.MaxReconsumeTimes); + if (!UtilAll.isBlank(maxReconsumeTimes)) { + try { + this.defaultMQPushConsumer.setMaxReconsumeTimes(Integer.parseInt(maxReconsumeTimes)); + } catch (NumberFormatException ignored) { + } + } + + String maxBatchMessageCount = properties.getProperty(PropertyKeyConst.MAX_BATCH_MESSAGE_COUNT); + if (!UtilAll.isBlank(maxBatchMessageCount)) { + this.defaultMQPushConsumer.setPullBatchSize(Integer.valueOf(maxBatchMessageCount)); + } + + String consumeTimeout = properties.getProperty(PropertyKeyConst.ConsumeTimeout); + if (!UtilAll.isBlank(consumeTimeout)) { + try { + this.defaultMQPushConsumer.setConsumeTimeout(Integer.parseInt(consumeTimeout)); + } catch (NumberFormatException ignored) { + } + } + + boolean isVipChannelEnabled = Boolean.parseBoolean(properties.getProperty(PropertyKeyConst.isVipChannelEnabled, "false")); + this.defaultMQPushConsumer.setVipChannelEnabled(isVipChannelEnabled); + + String instanceName = properties.getProperty(PropertyKeyConst.InstanceName, this.buildIntanceName()); + this.defaultMQPushConsumer.setInstanceName(instanceName); + this.defaultMQPushConsumer.setNamesrvAddr(this.getNameServerAddr()); + + String consumeThreadNums = properties.getProperty(PropertyKeyConst.ConsumeThreadNums); + if (!UtilAll.isBlank(consumeThreadNums)) { + this.defaultMQPushConsumer.setConsumeThreadMin(Integer.valueOf(consumeThreadNums)); + this.defaultMQPushConsumer.setConsumeThreadMax(Integer.valueOf(consumeThreadNums)); + } + + String configuredCachedMessageAmount = 
properties.getProperty(PropertyKeyConst.MaxCachedMessageAmount); + if (!UtilAll.isBlank(configuredCachedMessageAmount)) { + maxCachedMessageAmount = Math.min(MAX_CACHED_MESSAGE_AMOUNT, Integer.valueOf(configuredCachedMessageAmount)); + maxCachedMessageAmount = Math.max(MIN_CACHED_MESSAGE_AMOUNT, maxCachedMessageAmount); + this.defaultMQPushConsumer.setPullThresholdForTopic(maxCachedMessageAmount); + + } + + String configuredCachedMessageSizeInMiB = properties.getProperty(PropertyKeyConst.MaxCachedMessageSizeInMiB); + if (!UtilAll.isBlank(configuredCachedMessageSizeInMiB)) { + maxCachedMessageSizeInMiB = Math.min(MAX_CACHED_MESSAGE_SIZE_IN_MIB, Integer.valueOf(configuredCachedMessageSizeInMiB)); + maxCachedMessageSizeInMiB = Math.max(MIN_CACHED_MESSAGE_SIZE_IN_MIB, maxCachedMessageSizeInMiB); + this.defaultMQPushConsumer.setPullThresholdSizeForTopic(maxCachedMessageSizeInMiB); + } + + String msgTraceSwitch = properties.getProperty(PropertyKeyConst.MsgTraceSwitch); + if (!UtilAll.isBlank(msgTraceSwitch) && (!Boolean.parseBoolean(msgTraceSwitch))) { + LOGGER.info("MQ Client Disable the Trace Hook!"); + } else { + try { + Properties tempProperties = new Properties(); + tempProperties.put(OnsTraceConstants.AccessKey, sessionCredentials.getAccessKey()); + tempProperties.put(OnsTraceConstants.SecretKey, sessionCredentials.getSecretKey()); + tempProperties.put(OnsTraceConstants.MaxMsgSize, "128000"); + tempProperties.put(OnsTraceConstants.AsyncBufferSize, "2048"); + tempProperties.put(OnsTraceConstants.MaxBatchNum, "100"); + tempProperties.put(OnsTraceConstants.NAMESRV_ADDR, this.getNameServerAddr()); + tempProperties.put(OnsTraceConstants.InstanceName, "PID_CLIENT_INNER_TRACE_PRODUCER"); + tempProperties.put(OnsTraceConstants.TraceDispatcherType, OnsTraceDispatcherType.CONSUMER.name()); + AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties, sessionCredentials); + dispatcher.setHostConsumer(defaultMQPushConsumer.getDefaultMQPushConsumerImpl()); + 
traceDispatcher = dispatcher; + this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook( + new OnsConsumeMessageHookImpl(traceDispatcher)); + } catch (Throwable e) { + LOGGER.error("system mqtrace hook init failed ,maybe can't send msg trace data", e); + } + } + } + + @Override + protected void updateNameServerAddr(String newAddrs) { + this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getMQClientAPIImpl().updateNameServerAddressList(newAddrs); + } + + protected void subscribe(String topic, String subExpression) { + try { + this.defaultMQPushConsumer.subscribe(topic, subExpression); + } catch (MQClientException e) { + throw new ONSClientException("defaultMQPushConsumer subscribe exception", e); + } + } + + protected void subscribe(final String topic, final MessageSelector selector) { + String subExpression = "*"; + String type = org.apache.rocketmq.common.filter.ExpressionType.TAG; + if (selector != null) { + if (selector.getType() == null) { + throw new ONSClientException("Expression type is null!"); + } + subExpression = selector.getSubExpression(); + type = selector.getType().name(); + } + + org.apache.rocketmq.client.consumer.MessageSelector messageSelector; + if (org.apache.rocketmq.common.filter.ExpressionType.SQL92.equals(type)) { + messageSelector = org.apache.rocketmq.client.consumer.MessageSelector.bySql(subExpression); + } else if (org.apache.rocketmq.common.filter.ExpressionType.TAG.equals(type)) { + messageSelector = org.apache.rocketmq.client.consumer.MessageSelector.byTag(subExpression); + } else { + throw new ONSClientException(String.format("Expression type %s is unknown!", type)); + } + + try { + this.defaultMQPushConsumer.subscribe(topic, messageSelector); + } catch (MQClientException e) { + throw new ONSClientException("Consumer subscribe exception", e); + } + } + + protected void unsubscribe(String topic) { + this.defaultMQPushConsumer.unsubscribe(topic); + } + + @Override + public void 
start() { + try { + if (this.started.compareAndSet(false, true)) { + this.defaultMQPushConsumer.start(); + super.start(); + } + } catch (Exception e) { + throw new ONSClientException(e.getMessage()); + } + } + + @Override + public void shutdown() { + if (this.started.compareAndSet(true, false)) { + this.defaultMQPushConsumer.shutdown(); + } + super.shutdown(); + } +} diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSUtil.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSUtil.java new file mode 100644 index 0000000..e6fac51 --- /dev/null +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSUtil.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.ons.api.impl.rocketmq; + +import java.lang.reflect.Field; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.Set; + +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageExt; + +import org.apache.rocketmq.ons.api.Message; +import org.apache.rocketmq.ons.api.MessageAccessor; +import org.apache.rocketmq.ons.api.exception.ONSClientException; + +public class ONSUtil { + private static final Set RESERVED_KEY_SET_RMQ = new HashSet(); + + private static final Set RESERVED_KEY_SET_ONS = new HashSet(); + + + static { + + /** + * RMQ + */ + RESERVED_KEY_SET_RMQ.add(MessageConst.PROPERTY_KEYS); + RESERVED_KEY_SET_RMQ.add(MessageConst.PROPERTY_TAGS); + RESERVED_KEY_SET_RMQ.add(MessageConst.PROPERTY_WAIT_STORE_MSG_OK); + RESERVED_KEY_SET_RMQ.add(MessageConst.PROPERTY_DELAY_TIME_LEVEL); + RESERVED_KEY_SET_RMQ.add(MessageConst.PROPERTY_RETRY_TOPIC); + RESERVED_KEY_SET_RMQ.add(MessageConst.PROPERTY_REAL_TOPIC); + RESERVED_KEY_SET_RMQ.add(MessageConst.PROPERTY_REAL_QUEUE_ID); + RESERVED_KEY_SET_RMQ.add(MessageConst.PROPERTY_TRANSACTION_PREPARED); + RESERVED_KEY_SET_RMQ.add(MessageConst.PROPERTY_PRODUCER_GROUP); + RESERVED_KEY_SET_RMQ.add(MessageConst.PROPERTY_MIN_OFFSET); + RESERVED_KEY_SET_RMQ.add(MessageConst.PROPERTY_MAX_OFFSET); + + /** + * ONS + */ + RESERVED_KEY_SET_ONS.add(Message.SystemPropKey.TAG); + RESERVED_KEY_SET_ONS.add(Message.SystemPropKey.KEY); + RESERVED_KEY_SET_ONS.add(Message.SystemPropKey.MSGID); + RESERVED_KEY_SET_ONS.add(Message.SystemPropKey.RECONSUMETIMES); + RESERVED_KEY_SET_ONS.add(Message.SystemPropKey.STARTDELIVERTIME); + RESERVED_KEY_SET_ONS.add(Message.SystemPropKey.BORNHOST); + RESERVED_KEY_SET_ONS.add(Message.SystemPropKey.BORNTIMESTAMP); + RESERVED_KEY_SET_ONS.add(Message.SystemPropKey.SHARDINGKEY); + } + + + public static 
org.apache.rocketmq.common.message.Message msgConvert(Message message) { + org.apache.rocketmq.common.message.Message msgRMQ = new org.apache.rocketmq.common.message.Message(); + if (message == null) { + throw new ONSClientException("\'message\' is null"); + } + + if (message.getTopic() != null) { + msgRMQ.setTopic(message.getTopic()); + } + if (message.getKey() != null) { + msgRMQ.setKeys(message.getKey()); + } + if (message.getTag() != null) { + msgRMQ.setTags(message.getTag()); + } + if (message.getStartDeliverTime() > 0) { + msgRMQ.putUserProperty(Message.SystemPropKey.STARTDELIVERTIME, String.valueOf(message.getStartDeliverTime())); + } + if (message.getBody() != null) { + msgRMQ.setBody(message.getBody()); + } + + if (message.getShardingKey() != null && !message.getShardingKey().isEmpty()) { + msgRMQ.putUserProperty(Message.SystemPropKey.SHARDINGKEY, message.getShardingKey()); + } + + Properties systemProperties = MessageAccessor.getSystemProperties(message); + if (systemProperties != null) { + Iterator> it = systemProperties.entrySet().iterator(); + while (it.hasNext()) { + Entry next = it.next(); + if (!RESERVED_KEY_SET_ONS.contains(next.getKey().toString())) { + org.apache.rocketmq.common.message.MessageAccessor.putProperty(msgRMQ, next.getKey().toString(), + next.getValue().toString()); + } + } + } + + Properties userProperties = message.getUserProperties(); + if (userProperties != null) { + Iterator> it = userProperties.entrySet().iterator(); + while (it.hasNext()) { + Entry next = it.next(); + if (!RESERVED_KEY_SET_RMQ.contains(next.getKey().toString())) { + org.apache.rocketmq.common.message.MessageAccessor.putProperty(msgRMQ, next.getKey().toString(), + next.getValue().toString()); + } + } + } + + return msgRMQ; + } + + public static Message msgConvert(org.apache.rocketmq.common.message.Message msgRMQ) { + Message message = new Message(); + if (msgRMQ.getTopic() != null) { + message.setTopic(msgRMQ.getTopic()); + } + if (msgRMQ.getKeys() != null) { + 
message.setKey(msgRMQ.getKeys()); + } + if (msgRMQ.getTags() != null) { + message.setTag(msgRMQ.getTags()); + } + if (msgRMQ.getBody() != null) { + message.setBody(msgRMQ.getBody()); + } + + message.setReconsumeTimes(((MessageExt) msgRMQ).getReconsumeTimes()); + message.setBornTimestamp(((MessageExt) msgRMQ).getBornTimestamp()); + message.setBornHost(String.valueOf(((MessageExt) msgRMQ).getBornHost())); + + Map properties = msgRMQ.getProperties(); + if (properties != null) { + Iterator> it = properties.entrySet().iterator(); + while (it.hasNext()) { + Entry next = it.next(); + // System + if (RESERVED_KEY_SET_RMQ.contains(next.getKey()) || RESERVED_KEY_SET_ONS.contains(next.getKey())) { + MessageAccessor.putSystemProperties(message, next.getKey(), next.getValue()); + } + // User + else { + message.putUserProperties(next.getKey(), next.getValue()); + } + } + } + + return message; + } + + public static Properties extractProperties(final Properties properties) { + Properties newPro = new Properties(); + Properties inner = null; + try { + Field field = Properties.class.getDeclaredField("defaults"); + field.setAccessible(true); + inner = (Properties) field.get(properties); + } catch (Exception ignore) { + } + + if (inner != null) { + for (final Entry entry : inner.entrySet()) { + newPro.setProperty(String.valueOf(entry.getKey()), String.valueOf(entry.getValue())); + } + } + + for (final Entry entry : properties.entrySet()) { + newPro.setProperty(String.valueOf(entry.getKey()), String.valueOf(entry.getValue())); + } + + return newPro; + } +} diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OnsClientRPCHook.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OnsClientRPCHook.java new file mode 100644 index 0000000..a145c25 --- /dev/null +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OnsClientRPCHook.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.ons.api.impl.rocketmq; + +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.ons.api.impl.MQClientInfo; +import org.apache.rocketmq.ons.api.impl.authority.SessionCredentials; + +public class OnsClientRPCHook extends ClientRPCHook { + + public OnsClientRPCHook(SessionCredentials sessionCredentials) { + super(sessionCredentials); + } + + @Override + public void doBeforeRequest(String remoteAddr, RemotingCommand request) { + super.doBeforeRequest(remoteAddr, request); + request.setVersion(MQClientInfo.versionCode); + } + + + @Override + public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) { + super.doAfterResponse(remoteAddr, request, response); + } + +} diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OrderConsumerImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OrderConsumerImpl.java new file mode 100644 index 0000000..4004f3e --- /dev/null +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OrderConsumerImpl.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.ons.api.impl.rocketmq; + +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.message.MessageExt; + +import org.apache.rocketmq.ons.api.Message; +import org.apache.rocketmq.ons.api.MessageSelector; +import org.apache.rocketmq.ons.api.PropertyKeyConst; +import org.apache.rocketmq.ons.api.exception.ONSClientException; +import org.apache.rocketmq.ons.api.order.ConsumeOrderContext; +import org.apache.rocketmq.ons.api.order.MessageOrderListener; +import org.apache.rocketmq.ons.api.order.OrderAction; +import org.apache.rocketmq.ons.api.order.OrderConsumer; + +public class OrderConsumerImpl extends ONSConsumerAbstract implements OrderConsumer { + private final ConcurrentHashMap subscribeTable = new ConcurrentHashMap(); + + public OrderConsumerImpl(final Properties properties) { + super(properties); + String suspendTimeMillis = 
properties.getProperty(PropertyKeyConst.SuspendTimeMillis); + if (!UtilAll.isBlank(suspendTimeMillis)) { + try { + this.defaultMQPushConsumer.setSuspendCurrentQueueTimeMillis(Long.parseLong(suspendTimeMillis)); + } catch (NumberFormatException ignored) { + } + } + } + + @Override + public void start() { + this.defaultMQPushConsumer.registerMessageListener(new MessageListenerOrderlyImpl()); + super.start(); + } + + @Override + public void subscribe(String topic, String subExpression, MessageOrderListener listener) { + if (null == topic) { + throw new ONSClientException("topic is null"); + } + + if (null == listener) { + throw new ONSClientException("listener is null"); + } + this.subscribeTable.put(topic, listener); + super.subscribe(topic, subExpression); + } + + @Override + public void subscribe(final String topic, final MessageSelector selector, final MessageOrderListener listener) { + if (null == topic) { + throw new ONSClientException("topic is null"); + } + + if (null == listener) { + throw new ONSClientException("listener is null"); + } + this.subscribeTable.put(topic, listener); + super.subscribe(topic, selector); + } + + class MessageListenerOrderlyImpl implements MessageListenerOrderly { + + @Override + public ConsumeOrderlyStatus consumeMessage(List arg0, ConsumeOrderlyContext arg1) { + MessageExt msgRMQ = arg0.get(0); + Message msg = ONSUtil.msgConvert(msgRMQ); + msg.setMsgID(msgRMQ.getMsgId()); + + MessageOrderListener listener = OrderConsumerImpl.this.subscribeTable.get(msg.getTopic()); + if (null == listener) { + throw new ONSClientException("MessageOrderListener is null"); + } + + final ConsumeOrderContext context = new ConsumeOrderContext(); + OrderAction action = listener.consume(msg, context); + if (action != null) { + switch (action) { + case Success: + return ConsumeOrderlyStatus.SUCCESS; + case Suspend: + return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; + default: + break; + } + } + + return 
ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; + } + } +} diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OrderProducerImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OrderProducerImpl.java new file mode 100644 index 0000000..218a8c2 --- /dev/null +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OrderProducerImpl.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.ons.api.impl.rocketmq; + +import java.util.List; +import java.util.Properties; + +import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants; +import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceDispatcherType; +import org.apache.rocketmq.ons.open.trace.core.dispatch.impl.AsyncArrayDispatcher; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.logging.InternalLogger; + +import org.apache.rocketmq.ons.api.Message; +import org.apache.rocketmq.ons.api.PropertyKeyConst; +import org.apache.rocketmq.ons.api.SendResult; +import org.apache.rocketmq.ons.api.exception.ONSClientException; +import org.apache.rocketmq.ons.api.impl.tracehook.OnsClientSendMessageHookImpl; +import org.apache.rocketmq.ons.api.impl.util.ClientLoggerUtil; +import org.apache.rocketmq.ons.api.order.OrderProducer; +import org.apache.commons.lang3.StringUtils; + +public class OrderProducerImpl extends ONSClientAbstract implements OrderProducer { + private final static InternalLogger LOGGER = ClientLoggerUtil.getClientLogger(); + private final DefaultMQProducer defaultMQProducer; + + public OrderProducerImpl(final Properties properties) { + super(properties); + String producerGroup = properties.getProperty(PropertyKeyConst.GROUP_ID, properties.getProperty(PropertyKeyConst.ProducerId)); + if (StringUtils.isEmpty(producerGroup)) { + producerGroup = "__ONS_PRODUCER_DEFAULT_GROUP"; + } + + this.defaultMQProducer = + new DefaultMQProducer(this.getNamespace(), producerGroup, new OnsClientRPCHook(sessionCredentials)); + + + this.defaultMQProducer.setProducerGroup(producerGroup); + + boolean isVipChannelEnabled = Boolean.parseBoolean(properties.getProperty(PropertyKeyConst.isVipChannelEnabled, "false")); + this.defaultMQProducer.setVipChannelEnabled(isVipChannelEnabled); + + String sendMsgTimeoutMillis = 
properties.getProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000"); + this.defaultMQProducer.setSendMsgTimeout(Integer.parseInt(sendMsgTimeoutMillis)); + +// boolean addExtendUniqInfo = Boolean.parseBoolean(properties.getProperty(PropertyKeyConst.EXACTLYONCE_DELIVERY, "false")); +// this.defaultMQProducer.setAddExtendUniqInfo(addExtendUniqInfo); + + String instanceName = properties.getProperty(PropertyKeyConst.InstanceName, this.buildIntanceName()); + this.defaultMQProducer.setInstanceName(instanceName); + this.defaultMQProducer.setNamesrvAddr(this.getNameServerAddr()); + String msgTraceSwitch = properties.getProperty(PropertyKeyConst.MsgTraceSwitch); + if (!UtilAll.isBlank(msgTraceSwitch) && (!Boolean.parseBoolean(msgTraceSwitch))) { + LOGGER.info("MQ Client Disable the Trace Hook!"); + } else { + try { + Properties tempProperties = new Properties(); + tempProperties.put(OnsTraceConstants.AccessKey, sessionCredentials.getAccessKey()); + tempProperties.put(OnsTraceConstants.SecretKey, sessionCredentials.getSecretKey()); + tempProperties.put(OnsTraceConstants.MaxMsgSize, "128000"); + tempProperties.put(OnsTraceConstants.AsyncBufferSize, "2048"); + tempProperties.put(OnsTraceConstants.MaxBatchNum, "100"); + tempProperties.put(OnsTraceConstants.NAMESRV_ADDR, this.getNameServerAddr()); + tempProperties.put(OnsTraceConstants.InstanceName, "PID_CLIENT_INNER_TRACE_PRODUCER"); + tempProperties.put(OnsTraceConstants.TraceDispatcherType, OnsTraceDispatcherType.PRODUCER.name()); + AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties, sessionCredentials); + dispatcher.setHostProducer(defaultMQProducer.getDefaultMQProducerImpl()); + traceDispatcher = dispatcher; + this.defaultMQProducer.getDefaultMQProducerImpl().registerSendMessageHook( + new OnsClientSendMessageHookImpl(traceDispatcher)); + } catch (Throwable e) { + LOGGER.error("system mqtrace hook init failed ,maybe can't send msg trace data", e); + } + } + } + + @Override + protected void 
updateNameServerAddr(String newAddrs) { + this.defaultMQProducer.getDefaultMQProducerImpl().getmQClientFactory().getMQClientAPIImpl().updateNameServerAddressList(newAddrs); + } + + @Override + public void start() { + try { + if (started.compareAndSet(false, true)) { + this.defaultMQProducer.start(); + super.start(); + } + } catch (Exception e) { + throw new ONSClientException(e.getMessage()); + } + } + + @Override + public void shutdown() { + if (started.compareAndSet(true, false)) { + this.defaultMQProducer.shutdown(); + } + super.shutdown(); + } + + @Override + public SendResult send(final Message message, final String shardingKey) { + if (UtilAll.isBlank(shardingKey)) { + throw new ONSClientException("\'shardingKey\' is blank."); + } + message.setShardingKey(shardingKey); + this.checkONSProducerServiceState(this.defaultMQProducer.getDefaultMQProducerImpl()); + final org.apache.rocketmq.common.message.Message msgRMQ = ONSUtil.msgConvert(message); + try { + org.apache.rocketmq.client.producer.SendResult sendResultRMQ = + this.defaultMQProducer.send(msgRMQ, new org.apache.rocketmq.client.producer.MessageQueueSelector() { + @Override + public MessageQueue select(List mqs, org.apache.rocketmq.common.message.Message msg, + Object shardingKey) { + int select = Math.abs(shardingKey.hashCode()); + if (select < 0) { + select = 0; + } + return mqs.get(select % mqs.size()); + } + }, shardingKey); + message.setMsgID(sendResultRMQ.getMsgId()); + SendResult sendResult = new SendResult(); + sendResult.setTopic(message.getTopic()); + sendResult.setMessageId(sendResultRMQ.getMsgId()); + return sendResult; + } catch (Exception e) { + throw new ONSClientException("defaultMQProducer send order exception", e); + } + } +} diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ProducerImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ProducerImpl.java new file mode 100644 index 0000000..bd1dede --- /dev/null +++ 
b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ProducerImpl.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.ons.api.impl.rocketmq; + +import java.util.Properties; +import java.util.concurrent.ExecutorService; + +import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants; +import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceDispatcherType; +import org.apache.rocketmq.ons.open.trace.core.dispatch.impl.AsyncArrayDispatcher; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.message.MessageClientIDSetter; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; + +import org.apache.rocketmq.ons.api.Message; +import org.apache.rocketmq.ons.api.OnExceptionContext; +import 
org.apache.rocketmq.ons.api.Producer; +import org.apache.rocketmq.ons.api.PropertyKeyConst; +import org.apache.rocketmq.ons.api.SendCallback; +import org.apache.rocketmq.ons.api.SendResult; +import org.apache.rocketmq.ons.api.exception.ONSClientException; +import org.apache.rocketmq.ons.api.impl.tracehook.OnsClientSendMessageHookImpl; +import org.apache.rocketmq.ons.api.impl.util.ClientLoggerUtil; +import org.apache.commons.lang3.StringUtils; + +public class ProducerImpl extends ONSClientAbstract implements Producer { + private final static InternalLogger LOGGER = ClientLoggerUtil.getClientLogger(); + private final DefaultMQProducer defaultMQProducer; + + public ProducerImpl(final Properties properties) { + super(properties); + + String producerGroup = properties.getProperty(PropertyKeyConst.GROUP_ID, properties.getProperty(PropertyKeyConst.ProducerId)); + if (StringUtils.isEmpty(producerGroup)) { + producerGroup = "__ONS_PRODUCER_DEFAULT_GROUP"; + } + + this.defaultMQProducer = + new DefaultMQProducer(this.getNamespace(), producerGroup, new OnsClientRPCHook(sessionCredentials)); + + this.defaultMQProducer.setProducerGroup(producerGroup); + + boolean isVipChannelEnabled = Boolean.parseBoolean(properties.getProperty(PropertyKeyConst.isVipChannelEnabled, "false")); + this.defaultMQProducer.setVipChannelEnabled(isVipChannelEnabled); + + if (properties.containsKey(PropertyKeyConst.SendMsgTimeoutMillis)) { + this.defaultMQProducer.setSendMsgTimeout(Integer.valueOf(properties.get(PropertyKeyConst.SendMsgTimeoutMillis).toString())); + } else { + this.defaultMQProducer.setSendMsgTimeout(5000); + } + +// if (properties.containsKey(PropertyKeyConst.EXACTLYONCE_DELIVERY)) { +// this.defaultMQProducer.setAddExtendUniqInfo(Boolean.valueOf(properties.get(PropertyKeyConst.EXACTLYONCE_DELIVERY).toString())); +// } + + String instanceName = properties.getProperty(PropertyKeyConst.InstanceName, this.buildIntanceName()); + this.defaultMQProducer.setInstanceName(instanceName); + 
this.defaultMQProducer.setNamesrvAddr(this.getNameServerAddr()); + this.defaultMQProducer.setMaxMessageSize(1024 * 1024 * 4); + String msgTraceSwitch = properties.getProperty(PropertyKeyConst.MsgTraceSwitch); + if (!UtilAll.isBlank(msgTraceSwitch) && (!Boolean.parseBoolean(msgTraceSwitch))) { + LOGGER.info("MQ Client Disable the Trace Hook!"); + } else { + try { + Properties tempProperties = new Properties(); + tempProperties.put(OnsTraceConstants.AccessKey, sessionCredentials.getAccessKey()); + tempProperties.put(OnsTraceConstants.SecretKey, sessionCredentials.getSecretKey()); + tempProperties.put(OnsTraceConstants.MaxMsgSize, "128000"); + tempProperties.put(OnsTraceConstants.AsyncBufferSize, "2048"); + tempProperties.put(OnsTraceConstants.MaxBatchNum, "100"); + tempProperties.put(OnsTraceConstants.NAMESRV_ADDR, this.getNameServerAddr()); + tempProperties.put(OnsTraceConstants.InstanceName, "PID_CLIENT_INNER_TRACE_PRODUCER"); + tempProperties.put(OnsTraceConstants.TraceDispatcherType, OnsTraceDispatcherType.PRODUCER.name()); + AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties, sessionCredentials); + dispatcher.setHostProducer(defaultMQProducer.getDefaultMQProducerImpl()); + traceDispatcher = dispatcher; + this.defaultMQProducer.getDefaultMQProducerImpl().registerSendMessageHook( + new OnsClientSendMessageHookImpl(traceDispatcher)); + } catch (Throwable e) { + LOGGER.error("system mqtrace hook init failed ,maybe can't send msg trace data.", e); + } + } + } + + @Override + protected void updateNameServerAddr(String newAddrs) { + this.defaultMQProducer.getDefaultMQProducerImpl().getmQClientFactory().getMQClientAPIImpl().updateNameServerAddressList(newAddrs); + } + + @Override + public void start() { + try { + if (this.started.compareAndSet(false, true)) { + this.defaultMQProducer.start(); + super.start(); + } + } catch (Exception e) { + throw new ONSClientException(e.getMessage()); + } + } + + @Override + public void shutdown() { + if 
(this.started.compareAndSet(true, false)) { + this.defaultMQProducer.shutdown(); + } + super.shutdown(); + } + + @Override + public SendResult send(Message message) { + this.checkONSProducerServiceState(this.defaultMQProducer.getDefaultMQProducerImpl()); + org.apache.rocketmq.common.message.Message msgRMQ = ONSUtil.msgConvert(message); + + try { + org.apache.rocketmq.client.producer.SendResult sendResultRMQ = this.defaultMQProducer.send(msgRMQ); + + message.setMsgID(sendResultRMQ.getMsgId()); + SendResult sendResult = new SendResult(); + sendResult.setTopic(sendResultRMQ.getMessageQueue().getTopic()); + sendResult.setMessageId(sendResultRMQ.getMsgId()); + return sendResult; + } catch (Exception e) { + LOGGER.error(String.format("Send message Exception, %s", message), e); + throw checkProducerException(message.getTopic(), message.getMsgID(), e); + } + } + + @Override + public void sendOneway(Message message) { + this.checkONSProducerServiceState(this.defaultMQProducer.getDefaultMQProducerImpl()); + org.apache.rocketmq.common.message.Message msgRMQ = ONSUtil.msgConvert(message); + try { + this.defaultMQProducer.sendOneway(msgRMQ); + message.setMsgID(MessageClientIDSetter.getUniqID(msgRMQ)); + } catch (Exception e) { + LOGGER.error(String.format("Send message oneway Exception, %s", message), e); + throw checkProducerException(message.getTopic(), message.getMsgID(), e); + } + } + + @Override + public void sendAsync(Message message, SendCallback sendCallback) { + this.checkONSProducerServiceState(this.defaultMQProducer.getDefaultMQProducerImpl()); + org.apache.rocketmq.common.message.Message msgRMQ = ONSUtil.msgConvert(message); + try { + this.defaultMQProducer.send(msgRMQ, sendCallbackConvert(message, sendCallback)); + message.setMsgID(MessageClientIDSetter.getUniqID(msgRMQ)); + } catch (Exception e) { + LOGGER.error(String.format("Send message async Exception, %s", message), e); + throw checkProducerException(message.getTopic(), message.getMsgID(), e); + } + } + + 
@Override + public void setCallbackExecutor(final ExecutorService callbackExecutor) { + this.defaultMQProducer.setCallbackExecutor(callbackExecutor); + } + + public DefaultMQProducer getDefaultMQProducer() { + return defaultMQProducer; + } + + private org.apache.rocketmq.client.producer.SendCallback sendCallbackConvert(final Message message, + final SendCallback sendCallback) { + org.apache.rocketmq.client.producer.SendCallback rmqSendCallback = new org.apache.rocketmq.client.producer.SendCallback() { + @Override + public void onSuccess(org.apache.rocketmq.client.producer.SendResult sendResult) { + sendCallback.onSuccess(sendResultConvert(sendResult)); + } + + @Override + public void onException(Throwable e) { + String topic = new String(message.getTopic()); + String msgId = new String(message.getMsgID()); + ONSClientException onsEx = checkProducerException(topic, msgId, e); + OnExceptionContext context = new OnExceptionContext(); + context.setTopic(topic); + context.setMessageId(msgId); + context.setException(onsEx); + sendCallback.onException(context); + } + }; + return rmqSendCallback; + } + + private SendResult sendResultConvert( + final org.apache.rocketmq.client.producer.SendResult rmqSendResult) { + SendResult sendResult = new SendResult(); + sendResult.setTopic(rmqSendResult.getMessageQueue().getTopic()); + sendResult.setMessageId(rmqSendResult.getMsgId()); + return sendResult; + } + + private ONSClientException checkProducerException(String topic, String msgId, Throwable e) { + if (e instanceof MQClientException) { + if (e.getCause() != null) { + if (e.getCause() instanceof RemotingConnectException) { + return new ONSClientException( + FAQ.errorMessage(String.format("Connect broker failed, Topic=%s, msgId=%s", topic, msgId), FAQ.CONNECT_BROKER_FAILED)); + } else if (e.getCause() instanceof RemotingTimeoutException) { + return new ONSClientException(FAQ.errorMessage(String.format("Send message to broker timeout, %dms, Topic=%s, msgId=%s", + 
this.defaultMQProducer.getSendMsgTimeout(), topic, msgId), FAQ.SEND_MSG_TO_BROKER_TIMEOUT)); + } else if (e.getCause() instanceof MQBrokerException) { + MQBrokerException excep = (MQBrokerException) e.getCause(); + return new ONSClientException(FAQ.errorMessage( + String.format("Receive a broker exception, Topi=%s, msgId=%s, %s", topic, msgId, excep.getErrorMessage()), + FAQ.BROKER_RESPONSE_EXCEPTION)); + } + } else { + MQClientException excep = (MQClientException) e; + if (-1 == excep.getResponseCode()) { + return new ONSClientException( + FAQ.errorMessage(String.format("Topic does not exist, Topic=%s, msgId=%s", topic, msgId), FAQ.TOPIC_ROUTE_NOT_EXIST)); + } else if (ResponseCode.MESSAGE_ILLEGAL == excep.getResponseCode()) { + return new ONSClientException( + FAQ.errorMessage(String.format("ONS Client check message exception, Topic=%s, msgId=%s", topic, msgId), + FAQ.CLIENT_CHECK_MSG_EXCEPTION)); + } + } + } + + return new ONSClientException("defaultMQProducer send exception", e); + } +} diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/TransactionProducerImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/TransactionProducerImpl.java new file mode 100644 index 0000000..bc03867 --- /dev/null +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/TransactionProducerImpl.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.ons.api.impl.rocketmq; + +import java.util.Properties; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.LocalTransactionState; +import org.apache.rocketmq.client.producer.TransactionCheckListener; +import org.apache.rocketmq.client.producer.TransactionMQProducer; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.ons.api.Constants; +import org.apache.rocketmq.ons.api.Message; +import org.apache.rocketmq.ons.api.PropertyKeyConst; +import org.apache.rocketmq.ons.api.SendResult; +import org.apache.rocketmq.ons.api.impl.tracehook.OnsClientSendMessageHookImpl; +import org.apache.rocketmq.ons.api.impl.util.ClientLoggerUtil; +import org.apache.rocketmq.ons.api.transaction.LocalTransactionExecuter; +import org.apache.rocketmq.ons.api.transaction.TransactionProducer; +import org.apache.rocketmq.ons.api.transaction.TransactionStatus; +import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants; +import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceDispatcherType; +import org.apache.rocketmq.ons.open.trace.core.dispatch.impl.AsyncArrayDispatcher; + +public class TransactionProducerImpl extends ONSClientAbstract implements TransactionProducer { + private final static InternalLogger LOGGER = ClientLoggerUtil.getClientLogger(); + TransactionMQProducer transactionMQProducer = null; + private Properties properties; + + public 
TransactionProducerImpl(Properties properties, TransactionCheckListener transactionCheckListener) { + super(properties); + this.properties = properties; + String producerGroup = properties.getProperty(PropertyKeyConst.GROUP_ID, properties.getProperty(PropertyKeyConst.ProducerId)); + if (StringUtils.isEmpty(producerGroup)) { + producerGroup = "__ONS_PRODUCER_DEFAULT_GROUP"; + } + transactionMQProducer = + new TransactionMQProducer(this.getNamespace(), producerGroup, new OnsClientRPCHook(sessionCredentials)); + + boolean isVipChannelEnabled = Boolean.parseBoolean(properties.getProperty(PropertyKeyConst.isVipChannelEnabled, "false")); + transactionMQProducer.setVipChannelEnabled(isVipChannelEnabled); + + String instanceName = properties.getProperty(PropertyKeyConst.InstanceName, this.buildIntanceName()); + this.transactionMQProducer.setInstanceName(instanceName); + +// boolean addExtendUniqInfo = Boolean.parseBoolean(properties.getProperty(PropertyKeyConst.EXACTLYONCE_DELIVERY, "false")); +// transactionMQProducer.setAddExtendUniqInfo(addExtendUniqInfo); + + transactionMQProducer.setTransactionCheckListener(transactionCheckListener); + String msgTraceSwitch = properties.getProperty(PropertyKeyConst.MsgTraceSwitch); + if (!UtilAll.isBlank(msgTraceSwitch) && (!Boolean.parseBoolean(msgTraceSwitch))) { + LOGGER.info("MQ Client Disable the Trace Hook!"); + } else { + try { + Properties tempProperties = new Properties(); + tempProperties.put(OnsTraceConstants.AccessKey, sessionCredentials.getAccessKey()); + tempProperties.put(OnsTraceConstants.SecretKey, sessionCredentials.getSecretKey()); + tempProperties.put(OnsTraceConstants.MaxMsgSize, "128000"); + tempProperties.put(OnsTraceConstants.AsyncBufferSize, "2048"); + tempProperties.put(OnsTraceConstants.MaxBatchNum, "100"); + tempProperties.put(OnsTraceConstants.NAMESRV_ADDR, this.getNameServerAddr()); + tempProperties.put(OnsTraceConstants.InstanceName, "PID_CLIENT_INNER_TRACE_PRODUCER"); + 
tempProperties.put(OnsTraceConstants.TraceDispatcherType, OnsTraceDispatcherType.PRODUCER.name()); + AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties, sessionCredentials); + dispatcher.setHostProducer(transactionMQProducer.getDefaultMQProducerImpl()); + traceDispatcher = dispatcher; + this.transactionMQProducer.getDefaultMQProducerImpl().registerSendMessageHook( + new OnsClientSendMessageHookImpl(traceDispatcher)); + } catch (Throwable e) { + LOGGER.error("system mqtrace hook init failed ,maybe can't send msg trace data", e); + } + } + } + + @Override + public void start() { + if (started.compareAndSet(false, true)) { + if (transactionMQProducer.getTransactionCheckListener() == null) { + throw new IllegalArgumentException("TransactionCheckListener is null"); + } + transactionMQProducer.setNamesrvAddr(this.nameServerAddr); + try { + transactionMQProducer.start(); + super.start(); + } catch (MQClientException e) { + throw new RuntimeException(e); + } + } + } + + @Override + protected void updateNameServerAddr(String newAddrs) { + this.transactionMQProducer.getDefaultMQProducerImpl().getmQClientFactory().getMQClientAPIImpl().updateNameServerAddressList(newAddrs); + } + + @Override + public void shutdown() { + if (started.compareAndSet(true, false)) { + transactionMQProducer.shutdown(); + } + super.shutdown(); + } + + @Override + public SendResult send(final Message message, final LocalTransactionExecuter executer, Object arg) { + this.checkONSProducerServiceState(this.transactionMQProducer.getDefaultMQProducerImpl()); + org.apache.rocketmq.common.message.Message msgRMQ = ONSUtil.msgConvert(message); + org.apache.rocketmq.client.producer.TransactionSendResult sendResultRMQ = null; + try { + sendResultRMQ = transactionMQProducer.sendMessageInTransaction(msgRMQ, + new org.apache.rocketmq.client.producer.LocalTransactionExecuter() { + @Override + public LocalTransactionState executeLocalTransactionBranch( + 
org.apache.rocketmq.common.message.Message msg, + Object arg) { + String msgId = msg.getProperty(Constants.TRANSACTION_ID); + message.setMsgID(msgId); + TransactionStatus transactionStatus = executer.execute(message, arg); + if (TransactionStatus.CommitTransaction == transactionStatus) { + return LocalTransactionState.COMMIT_MESSAGE; + } else if (TransactionStatus.RollbackTransaction == transactionStatus) { + return LocalTransactionState.ROLLBACK_MESSAGE; + } + return LocalTransactionState.UNKNOW; + } + }, arg); + } catch (Exception e) { + throw new RuntimeException(e); + } + if (sendResultRMQ.getLocalTransactionState() == LocalTransactionState.ROLLBACK_MESSAGE) { + throw new RuntimeException("local transaction branch failed ,so transaction rollback"); + } + SendResult sendResult = new SendResult(); + sendResult.setMessageId(sendResultRMQ.getMsgId()); + sendResult.setTopic(sendResultRMQ.getMessageQueue().getTopic()); + return sendResult; + } + +} diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/tracehook/OnsClientSendMessageHookImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/tracehook/OnsClientSendMessageHookImpl.java new file mode 100644 index 0000000..c6fbe10 --- /dev/null +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/tracehook/OnsClientSendMessageHookImpl.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.ons.api.impl.tracehook; + +import java.util.ArrayList; + +import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceBean; +import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants; +import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceContext; +import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceType; +import org.apache.rocketmq.ons.open.trace.core.dispatch.AsyncDispatcher; +import org.apache.rocketmq.client.hook.SendMessageContext; +import org.apache.rocketmq.client.hook.SendMessageHook; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.protocol.NamespaceUtil; + +public class OnsClientSendMessageHookImpl implements SendMessageHook { + + private AsyncDispatcher localDispatcher; + + public OnsClientSendMessageHookImpl(AsyncDispatcher localDispatcher) { + this.localDispatcher = localDispatcher; + } + + @Override + public String hookName() { + return "OnsClientSendMessageHook"; + } + + @Override + public void sendMessageBefore(SendMessageContext context) { + + if (context == null || context.getMessage().getTopic().startsWith(MixAll.SYSTEM_TOPIC_PREFIX)) { + return; + } + OnsTraceContext onsContext = new OnsTraceContext(); + onsContext.setTraceBeans(new ArrayList(1)); + context.setMqTraceContext(onsContext); + onsContext.setTraceType(OnsTraceType.Pub); + String userGroup = NamespaceUtil.withoutNamespace(context.getProducerGroup(), context.getNamespace()); + onsContext.setGroupName(userGroup); + 
OnsTraceBean traceBean = new OnsTraceBean(); + String userTopic = NamespaceUtil.withoutNamespace(context.getMessage().getTopic(), context.getNamespace()); + traceBean.setTopic(userTopic); + traceBean.setTags(context.getMessage().getTags()); + traceBean.setKeys(context.getMessage().getKeys()); + traceBean.setStoreHost(context.getBrokerAddr()); + traceBean.setBodyLength(context.getMessage().getBody().length); + traceBean.setMsgType(context.getMsgType()); + onsContext.getTraceBeans().add(traceBean); + } + + @Override + public void sendMessageAfter(SendMessageContext context) { + + if (context == null || context.getMessage().getTopic().startsWith(OnsTraceConstants.traceTopic) || context.getMqTraceContext() == null) { + return; + } + if (context.getSendResult() == null) { + return; + } + if (context.getSendResult().getRegionId() == null + || context.getSendResult().getRegionId().equals(OnsTraceConstants.default_region) + || !context.getSendResult().isTraceOn()) { + // if regionId is default or switch is false,skip it + return; + } + OnsTraceContext onsContext = (OnsTraceContext) context.getMqTraceContext(); + OnsTraceBean traceBean = onsContext.getTraceBeans().get(0); + int costTime = (int) ((System.currentTimeMillis() - onsContext.getTimeStamp()) / onsContext.getTraceBeans().size()); + onsContext.setCostTime(costTime); + if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) { + onsContext.setSuccess(true); + } else { + onsContext.setSuccess(false); + } + onsContext.setRegionId(context.getSendResult().getRegionId()); + traceBean.setMsgId(context.getSendResult().getMsgId()); + traceBean.setOffsetMsgId(context.getSendResult().getOffsetMsgId()); + traceBean.setStoreTime(onsContext.getTimeStamp() + costTime / 2); + localDispatcher.append(onsContext); + } +} diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/tracehook/OnsConsumeMessageHookImpl.java 
b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/tracehook/OnsConsumeMessageHookImpl.java new file mode 100644 index 0000000..d5a0782 --- /dev/null +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/tracehook/OnsConsumeMessageHookImpl.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.ons.api.impl.tracehook; + +import java.util.ArrayList; +import java.util.List; +import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType; +import org.apache.rocketmq.client.hook.ConsumeMessageContext; +import org.apache.rocketmq.client.hook.ConsumeMessageHook; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.protocol.NamespaceUtil; +import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceBean; +import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants; +import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceContext; +import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceType; +import org.apache.rocketmq.ons.open.trace.core.dispatch.AsyncDispatcher; + +public class OnsConsumeMessageHookImpl implements ConsumeMessageHook { + + private AsyncDispatcher localDispatcher; + + public OnsConsumeMessageHookImpl(AsyncDispatcher localDispatcher) { + this.localDispatcher = localDispatcher; + } + + @Override + public String hookName() { + return "OnsConsumeMessageHook"; + } + + @Override + public void consumeMessageBefore(ConsumeMessageContext context) { + if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) { + return; + } + OnsTraceContext onsTraceContext = new OnsTraceContext(); + context.setMqTraceContext(onsTraceContext); + onsTraceContext.setTraceType(OnsTraceType.SubBefore); + String userGroup = NamespaceUtil.withoutNamespace(context.getConsumerGroup(), context.getNamespace()); + onsTraceContext.setGroupName(userGroup); + List beans = new ArrayList(); + for (MessageExt msg : context.getMsgList()) { + if (msg == null) { + continue; + } + String regionId = msg.getProperty(MessageConst.PROPERTY_MSG_REGION); + String traceOn = msg.getProperty(MessageConst.PROPERTY_TRACE_SWITCH); + if (regionId == null || 
regionId.equals(OnsTraceConstants.default_region)) { + // if regionId is default ,skip it + continue; + } + if (traceOn != null && "false".equals(traceOn)) { + // if trace switch is false ,skip it + continue; + } + OnsTraceBean traceBean = new OnsTraceBean(); + + String userTopic = NamespaceUtil.withoutNamespace(msg.getTopic(), context.getNamespace()); + traceBean.setTopic(userTopic); + traceBean.setMsgId(msg.getMsgId()); + traceBean.setTags(msg.getTags()); + traceBean.setKeys(msg.getKeys()); + traceBean.setStoreTime(msg.getStoreTimestamp()); + traceBean.setBodyLength(msg.getStoreSize()); + traceBean.setRetryTimes(msg.getReconsumeTimes()); + onsTraceContext.setRegionId(regionId); + beans.add(traceBean); + } + if (beans.size() > 0) { + onsTraceContext.setTraceBeans(beans); + onsTraceContext.setTimeStamp(System.currentTimeMillis()); + localDispatcher.append(onsTraceContext); + } + } + + @Override + public void consumeMessageAfter(ConsumeMessageContext context) { + if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) { + return; + } + OnsTraceContext subBeforeContext = (OnsTraceContext) context.getMqTraceContext(); + if (subBeforeContext.getRegionId().equals(OnsTraceConstants.default_region)) { + // if regionId is default ,skip it + return; + } + if (subBeforeContext.getTraceBeans() == null || subBeforeContext.getTraceBeans().size() < 1) { + // if subbefore bean is null ,skip it + return; + } + OnsTraceContext subAfterContext = new OnsTraceContext(); + subAfterContext.setTraceType(OnsTraceType.SubAfter); + subAfterContext.setRegionId(subBeforeContext.getRegionId()); + subAfterContext.setGroupName(subBeforeContext.getGroupName()); + subAfterContext.setRequestId(subBeforeContext.getRequestId()); + subAfterContext.setSuccess(context.isSuccess()); + + int costTime = (int) ((System.currentTimeMillis() - subBeforeContext.getTimeStamp()) / context.getMsgList().size()); + subAfterContext.setCostTime(costTime);// + 
subAfterContext.setTraceBeans(subBeforeContext.getTraceBeans()); + String contextType = context.getProps().get(MixAll.CONSUME_CONTEXT_TYPE); + if (contextType != null) { + subAfterContext.setContextCode(ConsumeReturnType.valueOf(contextType).ordinal()); + } + localDispatcher.append(subAfterContext); + } +} diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/util/ClientLoggerUtil.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/util/ClientLoggerUtil.java new file mode 100644 index 0000000..a7e8391 --- /dev/null +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/util/ClientLoggerUtil.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.ons.api.impl.util; + +import java.util.Arrays; + +import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.logging.InternalLogger; + +public class ClientLoggerUtil { + private static final String CLIENT_LOG_ROOT = "ons.client.logRoot"; + private static final String CLIENT_LOG_FILEMAXINDEX = "ons.client.logFileMaxIndex"; + private static final int CLIENT_LOG_FILE_MAX_INDEX = 100; + private static final String CLIENT_LOG_LEVEL = "ons.client.logLevel"; + private static final String[] LEVEL_ARRAY = {"ERROR", "WARN", "INFO", "DEBUG"}; + private static final long CLIENT_LOG_FILESIZE = 64 * 1024 * 1024L; + + public static InternalLogger getClientLogger() { + //Make sure + String onsClientLogRoot = System.getProperty(CLIENT_LOG_ROOT, System.getProperty("user.home") + "/logs"); + System.setProperty(ClientLogger.CLIENT_LOG_ROOT, onsClientLogRoot); + String onsClientLogLevel = System.getProperty(CLIENT_LOG_LEVEL, "INFO").trim().toUpperCase(); + if (!Arrays.asList(LEVEL_ARRAY).contains(onsClientLogLevel)) { + onsClientLogLevel = "INFO"; + } + System.setProperty(ClientLogger.CLIENT_LOG_LEVEL, onsClientLogLevel); + String onsClientLogMaxIndex = System.getProperty(CLIENT_LOG_FILEMAXINDEX, "10").trim(); + try { + int maxIndex = Integer.parseInt(onsClientLogMaxIndex); + if (maxIndex <= 0 || maxIndex > CLIENT_LOG_FILE_MAX_INDEX) { + throw new NumberFormatException(); + } + } catch (NumberFormatException e) { + onsClientLogMaxIndex = "10"; + } + System.setProperty(ClientLogger.CLIENT_LOG_MAXINDEX, onsClientLogMaxIndex); + System.setProperty(ClientLogger.CLIENT_LOG_FILENAME, "ons.log"); + System.setProperty(ClientLogger.CLIENT_LOG_FILESIZE, String.valueOf(CLIENT_LOG_FILESIZE)); + return ClientLogger.getLog(); + } +} diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/util/MsgConvertUtil.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/util/MsgConvertUtil.java new file mode 
/**
 * Helpers for converting message payloads between objects, strings and byte arrays.
 */
public class MsgConvertUtil {

    public static final byte[] EMPTY_BYTES = new byte[0];
    public static final String EMPTY_STRING = "";

    public static final String JMS_MSGMODEL = "jmsMsgModel";
    /**
     * To adapt this scene: "Notify client try to receive ObjectMessage sent by JMS client"
     * Set notify out message model, value can be textMessage OR objectMessage
     */
    public static final String COMPATIBLE_FIELD_MSGMODEL = "notifyOutMsgModel";

    public static final String MSGMODEL_TEXT = "textMessage";
    public static final String MSGMODEL_BYTES = "bytesMessage";
    public static final String MSGMODEL_OBJ = "objectMessage";

    public static final String MSG_TOPIC = "msgTopic";
    public static final String MSG_TYPE = "msgType";

    /**
     * Serializes {@code object} with Java native serialization.
     *
     * @param object the object to serialize
     * @return the serialized bytes
     * @throws IOException if the object cannot be written
     */
    public static byte[] objectSerialize(Object object) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(baos);
        try {
            oos.writeObject(object);
        } finally {
            // Closing flushes the object stream into baos; done even when writeObject fails.
            oos.close();
        }
        return baos.toByteArray();
    }

    /**
     * Deserializes an object previously produced by {@link #objectSerialize(Object)}.
     *
     * <p>SECURITY: this uses Java native deserialization. Do not pass bytes that come from an
     * untrusted source without additional filtering.
     *
     * @param bytes the serialized form
     * @return the deserialized object
     * @throws IOException if the bytes are not a valid serialization stream
     * @throws ClassNotFoundException if the class of the serialized object cannot be found
     */
    public static Serializable objectDeserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
        try {
            // Read first, close afterwards: the stream must still be open while reading.
            return (Serializable) ois.readObject();
        } finally {
            ois.close();
        }
    }

    /**
     * Encodes {@code s} with the named charset.
     *
     * @param s the string to encode, may be {@code null}
     * @param charset the charset name
     * @return {@link #EMPTY_BYTES} when {@code s} is {@code null}; {@code null} when encoding fails
     *     (for example an unknown charset name); otherwise the encoded bytes
     */
    public static final byte[] string2Bytes(String s, String charset) {
        if (null == s) {
            return EMPTY_BYTES;
        }
        try {
            return s.getBytes(charset);
        } catch (Exception ignored) {
            // Deliberately best-effort: callers receive null when the charset cannot be used.
            return null;
        }
    }

    /**
     * Decodes {@code bs} with the named charset.
     *
     * @param bs the bytes to decode, may be {@code null}
     * @param charset the charset name
     * @return {@link #EMPTY_STRING} when {@code bs} is {@code null}; {@code null} when decoding
     *     fails (for example an unknown charset name); otherwise the decoded string
     */
    public static final String bytes2String(byte[] bs, String charset) {
        if (null == bs) {
            return EMPTY_STRING;
        }
        try {
            return new String(bs, charset);
        } catch (Exception ignored) {
            // Deliberately best-effort: callers receive null when the charset cannot be used.
            return null;
        }
    }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.ons.api.impl.util; + +import java.util.regex.Pattern; + +import org.apache.rocketmq.common.MixAll; + +import org.apache.commons.lang3.StringUtils; + +public class NameAddrUtils { + public static final String INSTANCE_PREFIX = "MQ_INST_"; + public static final String INSTANCE_REGEX = INSTANCE_PREFIX + "\\w+_\\w+"; + public static final String ENDPOINT_PREFIX = "http://"; + public static final Pattern NAMESRV_ENDPOINT_PATTERN = Pattern.compile("^" + ENDPOINT_PREFIX + ".*"); + public static final Pattern INST_ENDPOINT_PATTERN = Pattern.compile("^" + ENDPOINT_PREFIX + INSTANCE_REGEX + "\\..*"); + + public static String getNameAdd() { + return System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV)); + } + + public static boolean validateInstanceEndpoint(String endpoint) { + return INST_ENDPOINT_PATTERN.matcher(endpoint).matches(); + } + + public static String parseInstanceIdFromEndpoint(String endpoint) { + if (StringUtils.isEmpty(endpoint)) { + return null; + } + return endpoint.substring(ENDPOINT_PREFIX.length(), endpoint.indexOf('.')); + } +} diff --git a/ons-core/ons-client/src/main/resources/ons_client_info.properties b/ons-core/ons-client/src/main/resources/ons_client_info.properties new file mode 100644 index 0000000..0ba07b4 --- /dev/null +++ b/ons-core/ons-client/src/main/resources/ons_client_info.properties @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +version=${project.version} diff --git a/ons-core/ons-client/src/test/java/org/apache/rocketmq/ons/api/impl/rocketmq/NameServerAutoUpdateTest.java b/ons-core/ons-client/src/test/java/org/apache/rocketmq/ons/api/impl/rocketmq/NameServerAutoUpdateTest.java new file mode 100644 index 0000000..e04d890 --- /dev/null +++ b/ons-core/ons-client/src/test/java/org/apache/rocketmq/ons/api/impl/rocketmq/NameServerAutoUpdateTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.ons.api.impl.rocketmq; + +import org.apache.rocketmq.ons.api.Consumer; +import org.apache.rocketmq.ons.api.ONSFactory; +import org.apache.rocketmq.ons.api.Producer; +import org.apache.rocketmq.ons.api.PropertyKeyConst; +import org.apache.rocketmq.ons.api.exception.ONSClientException; +import org.apache.rocketmq.ons.api.order.OrderProducer; +import org.junit.Rule; +import org.junit.rules.ExpectedException; + +import java.util.Properties; + +public class NameServerAutoUpdateTest { + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @org.junit.Test + public void testNamesrv_setNsAddr() { + Properties prop = buildProps(); + prop.setProperty(PropertyKeyConst.NAMESRV_ADDR, "xxx-whatever"); + Consumer consumer = ONSFactory.createConsumer(prop); + consumer.start(); + } + + @org.junit.Test + public void testNamesrv_setOnsAddr_invalid() { + expectedException.expect(ONSClientException.class); + expectedException.expectMessage("onsAddr " + "xxx"); + + Properties prop = buildProps(); + prop.setProperty(PropertyKeyConst.ONSAddr, "xxx"); + Consumer consumer = ONSFactory.createConsumer(prop); + consumer.start(); + } + + @org.junit.Test + public void testNamesrv_setOnsAddr_valid() throws InterruptedException { + Properties prop = buildProps(); + prop.setProperty(PropertyKeyConst.ONSAddr, "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet"); + Consumer consumer = ONSFactory.createConsumer(prop); + consumer.start(); + } + + @org.junit.Test + public void testNamesrv_notSetOnsAddr_useInternet_default() throws InterruptedException { + Properties prop = buildProps(); + Consumer consumer = ONSFactory.createConsumer(prop); + consumer.start(); + } + + @org.junit.Test + public void testNamesrv_notSetOnsAddr_useInternet_default_Producer() throws InterruptedException { + Properties prop = buildProps(); + Producer producer = ONSFactory.createProducer(prop); + producer.start(); + } + + @org.junit.Test + 
public void testNamesrv_notSetOnsAddr_useInternet_default_OrderProcucer() throws InterruptedException { + Properties prop = buildProps(); + OrderProducer producer = ONSFactory.createOrderProducer(prop); + producer.start(); + } + + private static Properties buildProps() { + Properties properties = new Properties(); + + properties.put(PropertyKeyConst.ConsumerId, "metaq-consumer-01_SELF"); + // 鉴权用 AccessKey,在阿里云服务器管理控制台创建 + properties.put(PropertyKeyConst.AccessKey, "XXX"); + // 鉴权用 SecretKey,在阿里云服务器管理控制台创建 + properties.put(PropertyKeyConst.SecretKey, "XXX"); + return properties; + } + +} diff --git a/ons-core/ons-client/src/test/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSClientTokenUpdateTest.java b/ons-core/ons-client/src/test/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSClientTokenUpdateTest.java new file mode 100644 index 0000000..245fddd --- /dev/null +++ b/ons-core/ons-client/src/test/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSClientTokenUpdateTest.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.ons.api.impl.rocketmq; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import org.apache.rocketmq.ons.api.Consumer; +import org.apache.rocketmq.ons.api.Message; +import org.apache.rocketmq.ons.api.ONSFactory; +import org.apache.rocketmq.ons.api.Producer; +import org.apache.rocketmq.ons.api.PropertyKeyConst; +import org.apache.rocketmq.ons.api.SendResult; +import org.apache.rocketmq.ons.api.exception.ONSClientException; +import org.apache.rocketmq.ons.api.impl.authority.SessionCredentials; +import org.apache.rocketmq.remoting.netty.NettyRemotingClient; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; + +public class ONSClientTokenUpdateTest { + + private static final String TOPIC = "STS_TOPIC_TEST_MOLING"; + + @Ignore + public void testSend() throws InterruptedException { + + List> credentials = new ArrayList>() { + { + add(new ArrayList() { + { + add("ak"); + add("sk"); + add("token"); + } + }); + + add(new ArrayList() { + { + add("ak"); + add("sk"); + add("token"); + } + }); + + } + }; + + Producer producer = ONSFactory.createProducer(buildProps( + "ak", + "sk", + "token", + ONSChannel.ALIYUN.name() + )); + producer.start(); + + for (int i = 0; i < 100; i++) { + List credential = credentials.get(i % credentials.size()); + producer.updateCredential(buildProps(credential.get(0), credential.get(1), credential.get(2), ONSChannel.ALIYUN.name())); + try { + Message msg = new Message(TOPIC, "tag", "key" + i, ("content." 
+ i).getBytes()); + SendResult result = producer.send(msg); + System.out.println(i + " use ak " + credential.get(0) + " send " + result.getMessageId()); + } catch (Exception e) { + System.out.println(i + " use ak " + credential.get(0) + " send failed."); + } + } + + Thread.sleep(10 * 1000L); + producer.shutdown(); + } + + @Test + public void test_ConsumerImpl() throws NoSuchFieldException, IllegalAccessException { + Consumer consumer = ONSFactory.createConsumer(buildProps("ak", "sk", "token", ONSChannel.ALIYUN.name())); + ONSConsumerAbstract subImpl = (ONSConsumerAbstract) consumer; + consumer.start(); + + Assert.assertTrue(subImpl.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getMQClientAPIImpl().getRemotingClient() instanceof NettyRemotingClient); + NettyRemotingClient remotingClient = + (NettyRemotingClient) subImpl.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getMQClientAPIImpl().getRemotingClient(); + Assert.assertTrue(remotingClient.getRPCHooks() instanceof ClientRPCHook); + ClientRPCHook clientRPCHook = (ClientRPCHook) remotingClient.getRPCHooks(); + Field field = ClientRPCHook.class.getDeclaredField("sessionCredentials"); + field.setAccessible(true); + SessionCredentials credentials = (SessionCredentials) field.get(clientRPCHook); + + Assert.assertEquals("ak", credentials.getAccessKey()); + Assert.assertEquals("sk", credentials.getSecretKey()); + Assert.assertEquals("token", credentials.getSecurityToken()); + Assert.assertEquals(ONSChannel.ALIYUN, credentials.getOnsChannel()); + + consumer.updateCredential(buildProps("nak", "nsk", "ntoken", ONSChannel.CLOUD.name())); + + Assert.assertEquals("nak", credentials.getAccessKey()); + Assert.assertEquals("nsk", credentials.getSecretKey()); + Assert.assertEquals("ntoken", credentials.getSecurityToken()); + Assert.assertEquals(ONSChannel.CLOUD, credentials.getOnsChannel()); + } + + @Test + public void test_ProducerImpl() throws NoSuchFieldException, 
IllegalAccessException { + Producer producer = ONSFactory.createProducer(buildProps("ak", "sk", "token", ONSChannel.ALIYUN.name())); + ProducerImpl subImpl = (ProducerImpl) producer; + producer.start(); + + Assert.assertTrue(subImpl.getDefaultMQProducer().getDefaultMQProducerImpl().getmQClientFactory().getMQClientAPIImpl().getRemotingClient() instanceof NettyRemotingClient); + NettyRemotingClient remotingClient = + (NettyRemotingClient) subImpl.getDefaultMQProducer().getDefaultMQProducerImpl().getmQClientFactory().getMQClientAPIImpl().getRemotingClient(); + Assert.assertTrue(remotingClient.getRPCHooks() instanceof ClientRPCHook); + ClientRPCHook clientRPCHook = (ClientRPCHook) remotingClient.getRPCHooks(); + Field field = ClientRPCHook.class.getDeclaredField("sessionCredentials"); + field.setAccessible(true); + SessionCredentials credentials = (SessionCredentials) field.get(clientRPCHook); + + Assert.assertEquals("ak", credentials.getAccessKey()); + Assert.assertEquals("sk", credentials.getSecretKey()); + Assert.assertEquals("token", credentials.getSecurityToken()); + Assert.assertEquals(ONSChannel.ALIYUN, credentials.getOnsChannel()); + + producer.updateCredential(buildProps("nak", "nsk", "ntoken", ONSChannel.CLOUD.name())); + + Assert.assertEquals("nak", credentials.getAccessKey()); + Assert.assertEquals("nsk", credentials.getSecretKey()); + Assert.assertEquals("ntoken", credentials.getSecurityToken()); + Assert.assertEquals(ONSChannel.CLOUD, credentials.getOnsChannel()); + } + + @Test + public void test_ConsumerImpl_updateNull() throws NoSuchFieldException, IllegalAccessException { + Consumer consumer = ONSFactory.createConsumer(buildProps("ak", "sk", "token", ONSChannel.ALIYUN.name())); + ONSConsumerAbstract subImpl = (ONSConsumerAbstract) consumer; + consumer.start(); + + Assert.assertTrue(subImpl.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getMQClientAPIImpl().getRemotingClient() instanceof NettyRemotingClient); + 
NettyRemotingClient remotingClient = + (NettyRemotingClient) subImpl.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getMQClientAPIImpl().getRemotingClient(); + Assert.assertTrue(remotingClient.getRPCHooks() instanceof ClientRPCHook); + ClientRPCHook clientRPCHook = (ClientRPCHook) remotingClient.getRPCHooks(); + Field field = ClientRPCHook.class.getDeclaredField("sessionCredentials"); + field.setAccessible(true); + SessionCredentials credentials = (SessionCredentials) field.get(clientRPCHook); + + Assert.assertEquals("ak", credentials.getAccessKey()); + Assert.assertEquals("sk", credentials.getSecretKey()); + Assert.assertEquals("token", credentials.getSecurityToken()); + Assert.assertEquals(ONSChannel.ALIYUN, credentials.getOnsChannel()); + + try { + consumer.updateCredential(buildProps("nak", "", "ntoken", ONSChannel.CLOUD.name())); + } catch (Exception e) { + Assert.assertTrue(e instanceof ONSClientException); + } + + Assert.assertEquals("ak", credentials.getAccessKey()); + Assert.assertEquals("sk", credentials.getSecretKey()); + Assert.assertEquals("token", credentials.getSecurityToken()); + Assert.assertEquals(ONSChannel.ALIYUN, credentials.getOnsChannel()); + } + + private static Properties buildProps(String ak, String sk, String token, String channel) { + Properties properties = new Properties(); + properties.put(PropertyKeyConst.ConsumerId, "CID_STS_TEST_MOLING"); + properties.put(PropertyKeyConst.ProducerId, "PID_STS_TEST_MOLING"); + properties.put(PropertyKeyConst.AccessKey, ak); + properties.put(PropertyKeyConst.SecretKey, sk); + properties.put(PropertyKeyConst.SecurityToken, token); + properties.put(PropertyKeyConst.OnsChannel, channel); + + return properties; + } + +} diff --git a/ons-core/ons-client/src/test/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSUtilTest.java b/ons-core/ons-client/src/test/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSUtilTest.java new file mode 100644 index 0000000..b0b0a1f --- /dev/null +++ 
b/ons-core/ons-client/src/test/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSUtilTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.ons.api.impl.rocketmq; + +import java.util.Properties; + +import static org.assertj.core.api.Assertions.assertThat; + +public class ONSUtilTest { + @org.junit.Test + public void extractProperties() throws Exception { + Properties properties = new Properties(); + properties.put("Integer", 123); + + assertThat(properties.get("Integer")).isEqualTo(123); + assertThat(properties.getProperty("Integer")).isNull(); + + Properties newPro = ONSUtil.extractProperties(properties); + assertThat(newPro.getProperty("Integer")).isEqualTo("123"); + assertThat(newPro.get("Integer")).isEqualTo("123"); + } + + @org.junit.Test + public void extractProperties_WithInner() throws Exception { + Properties inner = new Properties(); + inner.put("Integer", 123); + + Properties properties = new Properties(inner); + + assertThat(properties.get("Integer")).isNull(); + assertThat(properties.getProperty("Integer")).isNull(); + + inner.put("String", "String"); + assertThat(properties.get("String")).isNull(); + 
assertThat(properties.getProperty("String")).isEqualTo("String"); + + + Properties newPro = ONSUtil.extractProperties(properties); + assertThat(newPro.getProperty("Integer")).isEqualTo("123"); + assertThat(newPro.get("Integer")).isEqualTo("123"); + assertThat(newPro.getProperty("String")).isEqualTo("String"); + assertThat(newPro.get("String")).isEqualTo("String"); + } +} \ No newline at end of file diff --git a/ons-core/ons-trace-core/pom.xml b/ons-core/ons-trace-core/pom.xml new file mode 100644 index 0000000..4ba3f7d --- /dev/null +++ b/ons-core/ons-trace-core/pom.xml @@ -0,0 +1,41 @@ + + + + + ons-all + org.apache.rocketmq + 1.8.1-SNAPSHOT + + 4.0.0 + ons-trace-core + jar + ons-trace-core ${project.version} + + + ${project.groupId} + ons-auth4client + ${project.version} + + + org.apache.rocketmq + rocketmq-client + + + diff --git a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/common/OnsTraceBean.java b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/common/OnsTraceBean.java new file mode 100644 index 0000000..211958e --- /dev/null +++ b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/common/OnsTraceBean.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.ons.open.trace.core.common; + +import org.apache.rocketmq.ons.open.trace.core.utils.MixUtils; +import org.apache.rocketmq.common.message.MessageType; + +public class OnsTraceBean { + private static final String LOCAL_ADDRESS = MixUtils.getLocalAddress(); + private String topic = ""; + private String msgId = ""; + private String offsetMsgId = ""; + private String tags = ""; + private String keys = ""; + private String storeHost = LOCAL_ADDRESS; + private String clientHost = LOCAL_ADDRESS; + private long storeTime; + private int retryTimes; + private int bodyLength; + private MessageType msgType; + + + public MessageType getMsgType() { + return msgType; + } + + + public void setMsgType(final MessageType msgType) { + this.msgType = msgType; + } + + + public String getOffsetMsgId() { + return offsetMsgId; + } + + + public void setOffsetMsgId(final String offsetMsgId) { + this.offsetMsgId = offsetMsgId; + } + + public String getTopic() { + return topic; + } + + + public void setTopic(String topic) { + this.topic = topic; + } + + + public String getMsgId() { + return msgId; + } + + + public void setMsgId(String msgId) { + this.msgId = msgId; + } + + + public String getTags() { + return tags; + } + + + public void setTags(String tags) { + this.tags = tags; + } + + + public String getKeys() { + return keys; + } + + + public void setKeys(String keys) { + this.keys = keys; + } + + + public String getStoreHost() { + return storeHost; + } + + + public void setStoreHost(String storeHost) { + this.storeHost = storeHost; + } + + + public String getClientHost() { + return clientHost; + } + + + public void setClientHost(String clientHost) { + this.clientHost = clientHost; + } + + + public long getStoreTime() { + return storeTime; + } + + + public void setStoreTime(long storeTime) { + this.storeTime = storeTime; + } + + + public int 
getRetryTimes() { + return retryTimes; + } + + + public void setRetryTimes(int retryTimes) { + this.retryTimes = retryTimes; + } + + + public int getBodyLength() { + return bodyLength; + } + + + public void setBodyLength(int bodyLength) { + this.bodyLength = bodyLength; + } +} diff --git a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/common/OnsTraceConstants.java b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/common/OnsTraceConstants.java new file mode 100644 index 0000000..0877f84 --- /dev/null +++ b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/common/OnsTraceConstants.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.ons.open.trace.core.common; + +import javax.annotation.Generated; + +import org.apache.rocketmq.common.MixAll; + +@Generated("ons-client") +public class OnsTraceConstants { + + public static final String NAMESRV_ADDR = "NAMESRV_ADDR"; + + public static final String ADDRSRV_URL = "ADDRSRV_URL"; + + public static final String AccessKey = "AccessKey"; + + public static final String SecretKey = "SecretKey"; + + public static final String InstanceName = "InstanceName"; + + public static final String AsyncBufferSize = "AsyncBufferSize"; + + public static final String MaxBatchNum = "MaxBatchNum"; + + public static final String WakeUpNum = "WakeUpNum"; + + public static final String MaxMsgSize = "MaxMsgSize"; + + + public static final String groupName = "_INNER_TRACE_PRODUCER"; + + public static final String traceTopic = MixAll.SYSTEM_TOPIC_PREFIX + "TRACE_DATA_"; + + + public static final String default_region = MixAll.DEFAULT_TRACE_REGION_ID; + + public static final char CONTENT_SPLITOR = (char)1; + public static final char FIELD_SPLITOR = (char)2; + + public static final String TraceDispatcherType = "DispatcherType"; +} diff --git a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/common/OnsTraceContext.java b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/common/OnsTraceContext.java new file mode 100644 index 0000000..f6aa191 --- /dev/null +++ b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/common/OnsTraceContext.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.ons.open.trace.core.common; + +import java.util.List; + +import org.apache.rocketmq.common.message.MessageClientIDSetter; + +public class OnsTraceContext implements Comparable { + + private OnsTraceType traceType; + + private long timeStamp = System.currentTimeMillis(); + + private String regionId = ""; + private String regionName = ""; + + private String groupName = ""; + + private int costTime = 0; + + private boolean isSuccess = true; + + private String requestId = MessageClientIDSetter.createUniqID(); + + private int contextCode = 0; + + private int exactlyOnceStatus = 0; + + private List traceBeans; + + public int getContextCode() { + return contextCode; + } + + public void setContextCode(final int contextCode) { + this.contextCode = contextCode; + } + + public int getExactlyOnceStatus() { + return exactlyOnceStatus; + } + + public void setExactlyOnceStatus(int exactlyOnceStatus) { + this.exactlyOnceStatus = exactlyOnceStatus; + } + + public List getTraceBeans() { + return traceBeans; + } + + + public void setTraceBeans(List traceBeans) { + this.traceBeans = traceBeans; + } + + + public String getRegionId() { + return regionId; + } + + + public void setRegionId(String regionId) { + this.regionId = regionId; + } + + + public OnsTraceType getTraceType() { + return traceType; + } + + + public void setTraceType(OnsTraceType traceType) { + this.traceType = traceType; + } + + + public long getTimeStamp() { + return timeStamp; + } + + + public void setTimeStamp(long timeStamp) { + this.timeStamp = timeStamp; + } + + + 
public String getGroupName() { + return groupName; + } + + + public void setGroupName(String groupName) { + this.groupName = groupName; + } + + + public int getCostTime() { + return costTime; + } + + + public void setCostTime(int costTime) { + this.costTime = costTime; + } + + + public boolean isSuccess() { + return isSuccess; + } + + + public void setSuccess(boolean success) { + isSuccess = success; + } + + + public String getRequestId() { + return requestId; + } + + + public void setRequestId(String requestId) { + this.requestId = requestId; + } + + public String getRegionName() { + return regionName; + } + + public void setRegionName(String regionName) { + this.regionName = regionName; + } + + @Override + public int compareTo(OnsTraceContext o) { + return (int) (this.timeStamp - o.getTimeStamp()); + } + + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(1024); + sb.append(traceType).append("_").append(groupName) + .append("_").append(regionId).append("_").append(isSuccess).append("_"); + if (traceBeans != null && traceBeans.size() > 0) { + for (OnsTraceBean bean : traceBeans) { + sb.append(bean.getMsgId() + "_" + bean.getTopic() + "_"); + } + } + return "OnsTraceContext{" + sb.toString() + '}'; + } +} diff --git a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/common/OnsTraceDataEncoder.java b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/common/OnsTraceDataEncoder.java new file mode 100644 index 0000000..7b6ad6e --- /dev/null +++ b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/common/OnsTraceDataEncoder.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.ons.open.trace.core.common; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.rocketmq.common.message.MessageType; + +public class OnsTraceDataEncoder { + + + public static List decoderFromTraceDataString(String traceData) { + List resList = new ArrayList(); + if (traceData == null || traceData.length() <= 0) { + return resList; + } + String[] contextList = traceData.split(String.valueOf(OnsTraceConstants.FIELD_SPLITOR)); + for (String context : contextList) { + String[] line = context.split(String.valueOf(OnsTraceConstants.CONTENT_SPLITOR)); + if (line[0].equals(OnsTraceType.Pub.name())) { + OnsTraceContext pubContext = new OnsTraceContext(); + pubContext.setTraceType(OnsTraceType.Pub); + pubContext.setTimeStamp(Long.parseLong(line[1])); + pubContext.setRegionId(line[2]); + pubContext.setGroupName(line[3]); + OnsTraceBean bean = new OnsTraceBean(); + bean.setTopic(line[4]); + bean.setMsgId(line[5]); + bean.setTags(line[6]); + bean.setKeys(line[7]); + bean.setStoreHost(line[8]); + bean.setBodyLength(Integer.parseInt(line[9])); + pubContext.setCostTime(Integer.parseInt(line[10])); + bean.setMsgType(MessageType.values()[Integer.parseInt(line[11])]); + if (line.length == 13) { + pubContext.setSuccess(Boolean.parseBoolean(line[12])); + } else if (line.length == 14) { + bean.setOffsetMsgId(line[12]); + 
pubContext.setSuccess(Boolean.parseBoolean(line[13])); + } + pubContext.setTraceBeans(new ArrayList(1)); + pubContext.getTraceBeans().add(bean); + resList.add(pubContext); + } else if (line[0].equals(OnsTraceType.SubBefore.name())) { + OnsTraceContext subBeforeContext = new OnsTraceContext(); + subBeforeContext.setTraceType(OnsTraceType.SubBefore); + subBeforeContext.setTimeStamp(Long.parseLong(line[1])); + subBeforeContext.setRegionId(line[2]); + subBeforeContext.setGroupName(line[3]); + subBeforeContext.setRequestId(line[4]); + OnsTraceBean bean = new OnsTraceBean(); + bean.setMsgId(line[5]); + bean.setRetryTimes(Integer.parseInt(line[6])); + bean.setKeys(line[7]); + subBeforeContext.setTraceBeans(new ArrayList(1)); + subBeforeContext.getTraceBeans().add(bean); + resList.add(subBeforeContext); + } else if (line[0].equals(OnsTraceType.SubAfter.name())) { + OnsTraceContext subAfterContext = new OnsTraceContext(); + subAfterContext.setTraceType(OnsTraceType.SubAfter); + subAfterContext.setRequestId(line[1]); + OnsTraceBean bean = new OnsTraceBean(); + bean.setMsgId(line[2]); + bean.setKeys(line[5]); + subAfterContext.setTraceBeans(new ArrayList(1)); + subAfterContext.getTraceBeans().add(bean); + subAfterContext.setCostTime(Integer.parseInt(line[3])); + subAfterContext.setSuccess(Boolean.parseBoolean(line[4])); + if (line.length >= 7) { + // add the context type + subAfterContext.setContextCode(Integer.parseInt(line[6])); + } + if (line.length >= 8) { + subAfterContext.setExactlyOnceStatus(Integer.parseInt(line[7])); + } + resList.add(subAfterContext); + } + } + return resList; + } + + + public static OnsTraceTransferBean encoderFromContextBean(OnsTraceContext ctx) { + if (ctx == null) { + return null; + } + OnsTraceTransferBean transferBean = new OnsTraceTransferBean(); + StringBuilder sb = new StringBuilder(256); + switch (ctx.getTraceType()) { + case Pub: { + OnsTraceBean bean = ctx.getTraceBeans().get(0); + 
sb.append(ctx.getTraceType()).append(OnsTraceConstants.CONTENT_SPLITOR)// + .append(ctx.getTimeStamp()).append(OnsTraceConstants.CONTENT_SPLITOR)// + .append(ctx.getRegionId()).append(OnsTraceConstants.CONTENT_SPLITOR)// + .append(ctx.getGroupName()).append(OnsTraceConstants.CONTENT_SPLITOR)// + .append(bean.getTopic()).append(OnsTraceConstants.CONTENT_SPLITOR)// + .append(bean.getMsgId()).append(OnsTraceConstants.CONTENT_SPLITOR)// + .append(bean.getTags()).append(OnsTraceConstants.CONTENT_SPLITOR)// + .append(bean.getKeys()).append(OnsTraceConstants.CONTENT_SPLITOR)// + .append(bean.getStoreHost()).append(OnsTraceConstants.CONTENT_SPLITOR)// + .append(bean.getBodyLength()).append(OnsTraceConstants.CONTENT_SPLITOR)// + .append(ctx.getCostTime()).append(OnsTraceConstants.CONTENT_SPLITOR)// + .append(bean.getMsgType().ordinal()).append(OnsTraceConstants.CONTENT_SPLITOR)// + .append(bean.getOffsetMsgId()).append(OnsTraceConstants.CONTENT_SPLITOR)// + .append(ctx.isSuccess()).append(OnsTraceConstants.FIELD_SPLITOR); + } + break; + case SubBefore: { + for (OnsTraceBean bean : ctx.getTraceBeans()) { + sb.append(ctx.getTraceType()).append(OnsTraceConstants.CONTENT_SPLITOR)// + .append(ctx.getTimeStamp()).append(OnsTraceConstants.CONTENT_SPLITOR)// + .append(ctx.getRegionId()).append(OnsTraceConstants.CONTENT_SPLITOR)// + .append(ctx.getGroupName()).append(OnsTraceConstants.CONTENT_SPLITOR)// + .append(ctx.getRequestId()).append(OnsTraceConstants.CONTENT_SPLITOR)// + .append(bean.getMsgId()).append(OnsTraceConstants.CONTENT_SPLITOR)// + .append(bean.getRetryTimes()).append(OnsTraceConstants.CONTENT_SPLITOR)// + .append(bean.getKeys()).append(OnsTraceConstants.FIELD_SPLITOR);// + } + } + break; + case SubAfter: { + for (OnsTraceBean bean : ctx.getTraceBeans()) { + sb.append(ctx.getTraceType()).append(OnsTraceConstants.CONTENT_SPLITOR)// + // .append(ctx.getTimeStamp()).append(OnsTraceConstants.CONTENT_SPLITOR)// + 
.append(ctx.getRequestId()).append(OnsTraceConstants.CONTENT_SPLITOR)// + .append(bean.getMsgId()).append(OnsTraceConstants.CONTENT_SPLITOR)// + .append(ctx.getCostTime()).append(OnsTraceConstants.CONTENT_SPLITOR)// + .append(ctx.isSuccess()).append(OnsTraceConstants.CONTENT_SPLITOR)// + .append(bean.getKeys()).append(OnsTraceConstants.CONTENT_SPLITOR)// + .append(ctx.getContextCode()).append(OnsTraceConstants.CONTENT_SPLITOR)// + .append(ctx.getExactlyOnceStatus()).append(OnsTraceConstants.FIELD_SPLITOR); + } + } + break; + default: + } + transferBean.setTransData(sb.toString()); + for (OnsTraceBean bean : ctx.getTraceBeans()) { + transferBean.getTransKey().add(bean.getMsgId()); + if (bean.getKeys() != null && bean.getKeys().length() > 0) { + transferBean.getTransKey().add(bean.getKeys()); + } + } + return transferBean; + } +} diff --git a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/common/OnsTraceDispatcherType.java b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/common/OnsTraceDispatcherType.java new file mode 100644 index 0000000..88dc595 --- /dev/null +++ b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/common/OnsTraceDispatcherType.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.ons.open.trace.core.common; + +public enum OnsTraceDispatcherType { + + PRODUCER, + + CONSUMER +} diff --git a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/common/OnsTraceTransferBean.java b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/common/OnsTraceTransferBean.java new file mode 100644 index 0000000..71d79e4 --- /dev/null +++ b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/common/OnsTraceTransferBean.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.ons.open.trace.core.common; + +import java.util.HashSet; +import java.util.Set; + +public class OnsTraceTransferBean { + private String transData; + private Set transKey = new HashSet(); + + + public String getTransData() { + return transData; + } + + + public void setTransData(String transData) { + this.transData = transData; + } + + + public Set getTransKey() { + return transKey; + } + + + public void setTransKey(Set transKey) { + this.transKey = transKey; + } +} diff --git a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/common/OnsTraceType.java b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/common/OnsTraceType.java new file mode 100644 index 0000000..5684584 --- /dev/null +++ b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/common/OnsTraceType.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.ons.open.trace.core.common; + +public enum OnsTraceType { + + Pub, + + SubBefore, + + SubAfter, +} diff --git a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/AsyncDispatcher.java b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/AsyncDispatcher.java new file mode 100644 index 0000000..87ead88 --- /dev/null +++ b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/AsyncDispatcher.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.ons.open.trace.core.dispatch; + +import java.io.IOException; + +import org.apache.rocketmq.client.exception.MQClientException; + +public interface AsyncDispatcher { + + void start() throws MQClientException; + + boolean append(Object ctx); + + void flush() throws IOException; + + void shutdown(); +} diff --git a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/AsyncArrayDispatcher.java b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/AsyncArrayDispatcher.java new file mode 100644 index 0000000..4f02bbd --- /dev/null +++ b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/AsyncArrayDispatcher.java @@ -0,0 +1,382 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.ons.open.trace.core.dispatch.impl; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.rocketmq.client.common.ThreadLocalIndex; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; +import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; +import org.apache.rocketmq.client.impl.producer.TopicPublishInfo; +import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.MessageQueueSelector; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.ThreadFactoryImpl; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.NamespaceUtil; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.ons.api.impl.authority.SessionCredentials; +import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants; +import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceContext; +import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceDataEncoder; +import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceDispatcherType; +import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceTransferBean; +import org.apache.rocketmq.ons.open.trace.core.dispatch.AsyncDispatcher; + +public class AsyncArrayDispatcher implements AsyncDispatcher { + 
private final static InternalLogger CLIENT_LOG = ClientLogger.getLog(); + private final int queueSize; + private final int batchSize; + private final DefaultMQProducer traceProducer; + private final ThreadPoolExecutor traceExecuter; + + private AtomicLong discardCount; + private Thread worker; + private ArrayBlockingQueue traceContextQueue; + private ArrayBlockingQueue appenderQueue; + private volatile Thread shutDownHook; + private volatile boolean stopped = false; + private String dispatcherType; + private DefaultMQProducerImpl hostProducer; + private DefaultMQPushConsumerImpl hostConsumer; + private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); + private String dispatcherId = UUID.randomUUID().toString(); + + public AsyncArrayDispatcher(Properties properties) throws MQClientException { + dispatcherType = properties.getProperty(OnsTraceConstants.TraceDispatcherType); + int queueSize = Integer.parseInt(properties.getProperty(OnsTraceConstants.AsyncBufferSize, "2048")); + queueSize = 1 << (32 - Integer.numberOfLeadingZeros(queueSize - 1)); + this.queueSize = queueSize; + batchSize = Integer.parseInt(properties.getProperty(OnsTraceConstants.MaxBatchNum, "1")); + this.discardCount = new AtomicLong(0L); + traceContextQueue = new ArrayBlockingQueue(1024); + appenderQueue = new ArrayBlockingQueue(queueSize); + + this.traceExecuter = new ThreadPoolExecutor(// + 10, // + 20, // + 1000 * 60, // + TimeUnit.MILLISECONDS, // + this.appenderQueue, // + new ThreadFactoryImpl("MQTraceSendThread_")); + traceProducer = TraceProducerFactory.getTraceDispatcherProducer(properties); + } + + public AsyncArrayDispatcher(Properties properties, SessionCredentials sessionCredentials) throws MQClientException { + dispatcherType = properties.getProperty(OnsTraceConstants.TraceDispatcherType); + int queueSize = Integer.parseInt(properties.getProperty(OnsTraceConstants.AsyncBufferSize, "2048")); + queueSize = 1 << (32 - Integer.numberOfLeadingZeros(queueSize - 1)); + 
this.queueSize = queueSize; + batchSize = Integer.parseInt(properties.getProperty(OnsTraceConstants.MaxBatchNum, "1")); + this.discardCount = new AtomicLong(0L); + traceContextQueue = new ArrayBlockingQueue(1024); + appenderQueue = new ArrayBlockingQueue(queueSize); + + this.traceExecuter = new ThreadPoolExecutor( + 10, + 20, + 1000 * 60, + TimeUnit.MILLISECONDS, + this.appenderQueue, + new ThreadFactoryImpl("MQTraceSendThread_")); + traceProducer = TraceProducerFactory.getTraceDispatcherProducer(properties, sessionCredentials); + } + + public DefaultMQProducerImpl getHostProducer() { + return hostProducer; + } + + public void setHostProducer(DefaultMQProducerImpl hostProducer) { + this.hostProducer = hostProducer; + } + + public DefaultMQPushConsumerImpl getHostConsumer() { + return hostConsumer; + } + + public void setHostConsumer(DefaultMQPushConsumerImpl hostConsumer) { + this.hostConsumer = hostConsumer; + } + + @Override + public void start() throws MQClientException { + TraceProducerFactory.registerTraceDispatcher(dispatcherId); + this.worker = new ThreadFactoryImpl("MQ-AsyncArrayDispatcher-Thread-" + dispatcherId, true) + .newThread(new AsyncRunnable()); + this.worker.start(); + this.registerShutDownHook(); + } + + @Override + public boolean append(final Object ctx) { + boolean result = traceContextQueue.offer((OnsTraceContext) ctx); + if (!result) { + CLIENT_LOG.info("buffer full" + discardCount.incrementAndGet() + " ,context is " + ctx); + } + return result; + } + + @Override + public void flush() throws IOException { + long end = System.currentTimeMillis() + 500; + while (traceContextQueue.size() > 0 || appenderQueue.size() > 0 && System.currentTimeMillis() <= end) { + try { + Thread.sleep(1); + } catch (InterruptedException e) { + break; + } + } + CLIENT_LOG.info("------end trace send " + traceContextQueue.size() + " " + appenderQueue.size()); + } + + @Override + public void shutdown() { + this.stopped = true; + this.traceExecuter.shutdown(); + 
TraceProducerFactory.unregisterTraceDispatcher(dispatcherId); + this.removeShutdownHook(); + } + + public void registerShutDownHook() { + if (shutDownHook == null) { + shutDownHook = new ThreadFactoryImpl("ShutdownHookMQTrace").newThread(new Runnable() { + private volatile boolean hasShutdown = false; + @Override + public void run() { + synchronized (this) { + if (!this.hasShutdown) { + try { + flush(); + } catch (IOException e) { + CLIENT_LOG.error("system mqtrace hook shutdown failed ,maybe loss some trace data"); + } + } + } + } + }); + Runtime.getRuntime().addShutdownHook(shutDownHook); + } + } + + public void removeShutdownHook() { + if (shutDownHook != null) { + Runtime.getRuntime().removeShutdownHook(shutDownHook); + } + } + + class AsyncRunnable implements Runnable { + private boolean stopped; + + @Override + public void run() { + while (!stopped) { + List contexts = new ArrayList(batchSize); + for (int i = 0; i < batchSize; i++) { + OnsTraceContext context = null; + try { + context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + } + if (context != null) { + contexts.add(context); + } else { + break; + } + } + if (contexts.size() > 0) { + AsyncAppenderRequest request = new AsyncAppenderRequest(contexts); + traceExecuter.submit(request); + } else if (AsyncArrayDispatcher.this.stopped) { + this.stopped = true; + } + } + + } + } + + class AsyncAppenderRequest implements Runnable { + List contextList; + + public AsyncAppenderRequest(final List contextList) { + if (contextList != null) { + this.contextList = contextList; + } else { + this.contextList = new ArrayList(1); + } + } + + @Override + public void run() { + sendTraceData(contextList); + } + + public void sendTraceData(List contextList) { + Map> transBeanMap = new HashMap>(16); + String currentRegionId = null; + for (OnsTraceContext context : contextList) { + currentRegionId = context.getRegionId(); + if (currentRegionId == null || 
context.getTraceBeans().isEmpty()) { + continue; + } + String topic = context.getTraceBeans().get(0).getTopic(); + String key = topic + OnsTraceConstants.CONTENT_SPLITOR + currentRegionId; + List transBeanList = transBeanMap.get(key); + if (transBeanList == null) { + transBeanList = new ArrayList(); + transBeanMap.put(key, transBeanList); + } + OnsTraceTransferBean traceData = OnsTraceDataEncoder.encoderFromContextBean(context); + transBeanList.add(traceData); + } + for (Map.Entry> entry : transBeanMap.entrySet()) { + String[] key = entry.getKey().split(String.valueOf(OnsTraceConstants.CONTENT_SPLITOR)); + flushData(entry.getValue(), key[0], key[1]); + } + } + + private void flushData(List transBeanList, String topic, String currentRegionId) { + if (transBeanList.size() == 0) { + return; + } + StringBuilder buffer = new StringBuilder(1024); + int count = 0; + Set keySet = new HashSet(); + + for (OnsTraceTransferBean bean : transBeanList) { + keySet.addAll(bean.getTransKey()); + buffer.append(bean.getTransData()); + count++; + if (buffer.length() >= traceProducer.getMaxMessageSize()) { + sendTraceDataByMQ(keySet, buffer.toString(), topic, currentRegionId); + buffer.delete(0, buffer.length()); + keySet.clear(); + count = 0; + } + } + if (count > 0) { + sendTraceDataByMQ(keySet, buffer.toString(), topic, currentRegionId); + } + transBeanList.clear(); + } + + private void sendTraceDataByMQ(Set keySet, final String data, String dataTopic, + String currentRegionId) { + String topic = OnsTraceConstants.traceTopic + currentRegionId; + final Message message = new Message(topic, data.getBytes()); + message.setKeys(keySet); + try { + Set dataBrokerSet = getBrokerSetByTopic(dataTopic); + Set traceBrokerSet = tryGetMessageQueueBrokerSet(traceProducer.getDefaultMQProducerImpl(), topic); + dataBrokerSet.retainAll(traceBrokerSet); + SendCallback callback = new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + } + + @Override + public void 
onException(Throwable e) { + CLIENT_LOG.info("send trace data ,the traceData is " + data); + } + }; + if (dataBrokerSet.isEmpty()) { + //no cross set + traceProducer.send(message, callback, 5000); + } else { + traceProducer.send(message, new MessageQueueSelector() { + @Override + public MessageQueue select(List mqs, Message msg, Object arg) { + Set brokerSet = (Set) arg; + List filterMqs = new ArrayList(); + for (MessageQueue queue : mqs) { + if (brokerSet.contains(queue.getBrokerName())) { + filterMqs.add(queue); + } + } + int index = sendWhichQueue.getAndIncrement(); + int pos = Math.abs(index) % filterMqs.size(); + if (pos < 0) { + pos = 0; + } + return filterMqs.get(pos); + } + }, dataBrokerSet, callback); + } + + } catch (Exception e) { + CLIENT_LOG.info("send trace data,the traceData is" + data); + } + } + + private Set getBrokerSetByTopic(String topic) { + Set brokerSet = new HashSet(); + if (dispatcherType != null && dispatcherType.equals(OnsTraceDispatcherType.PRODUCER.name()) && hostProducer != null) { + brokerSet = tryGetMessageQueueBrokerSet(hostProducer, topic); + } + if (dispatcherType != null && dispatcherType.equals(OnsTraceDispatcherType.CONSUMER.name()) && hostConsumer != null) { + brokerSet = tryGetMessageQueueBrokerSet(hostConsumer, topic); + } + return brokerSet; + } + + private Set tryGetMessageQueueBrokerSet(DefaultMQProducerImpl producer, String topic) { + Set brokerSet = new HashSet(); + String realTopic = NamespaceUtil.wrapNamespace(producer.getDefaultMQProducer().getNamespace(), topic); + TopicPublishInfo topicPublishInfo = producer.getTopicPublishInfoTable().get(realTopic); + if (null == topicPublishInfo || !topicPublishInfo.ok()) { + producer.getTopicPublishInfoTable().putIfAbsent(realTopic, new TopicPublishInfo()); + producer.getmQClientFactory().updateTopicRouteInfoFromNameServer(realTopic); + topicPublishInfo = producer.getTopicPublishInfoTable().get(realTopic); + } + if (topicPublishInfo.isHaveTopicRouterInfo() || 
topicPublishInfo.ok()) { + for (MessageQueue queue : topicPublishInfo.getMessageQueueList()) { + brokerSet.add(queue.getBrokerName()); + } + } + return brokerSet; + } + + private Set tryGetMessageQueueBrokerSet(DefaultMQPushConsumerImpl consumer, String topic) { + Set brokerSet = new HashSet(); + try { + String realTopic = NamespaceUtil.wrapNamespace(consumer.getDefaultMQPushConsumer().getNamespace(), topic); + Set messageQueues = consumer.fetchSubscribeMessageQueues(realTopic); + for (MessageQueue queue : messageQueues) { + brokerSet.add(queue.getBrokerName()); + } + } catch (MQClientException e) { + CLIENT_LOG.info("fetch message queue failed, the topic is {}", topic); + } + return brokerSet; + } + } +} diff --git a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/TraceProducerFactory.java b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/TraceProducerFactory.java new file mode 100644 index 0000000..60e65cb --- /dev/null +++ b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/TraceProducerFactory.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.ons.open.trace.core.dispatch.impl; + +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.common.namesrv.TopAddressing; + +import org.apache.rocketmq.ons.api.impl.authority.SessionCredentials; +import org.apache.rocketmq.ons.api.impl.rocketmq.ClientRPCHook; + +public class TraceProducerFactory { + private static Map dispatcherTable = new ConcurrentHashMap(); + private static AtomicBoolean isStarted = new AtomicBoolean(false); + private static DefaultMQProducer traceProducer; + + public static DefaultMQProducer getTraceDispatcherProducer(Properties properties) { + if (traceProducer == null) { + SessionCredentials sessionCredentials = new SessionCredentials(); + Properties sessionProperties = new Properties(); + String accessKey = properties.getProperty(OnsTraceConstants.AccessKey); + String secretKey = properties.getProperty(OnsTraceConstants.SecretKey); + sessionProperties.put(OnsTraceConstants.AccessKey, accessKey); + sessionProperties.put(OnsTraceConstants.SecretKey, secretKey); + sessionCredentials.updateContent(sessionProperties); + traceProducer = new DefaultMQProducer(new ClientRPCHook(sessionCredentials)); + traceProducer.setProducerGroup(accessKey + OnsTraceConstants.groupName); + traceProducer.setSendMsgTimeout(5000); + traceProducer.setInstanceName(properties.getProperty(OnsTraceConstants.InstanceName, String.valueOf(System.currentTimeMillis()))); + String nameSrv = properties.getProperty(OnsTraceConstants.NAMESRV_ADDR); + if (nameSrv == null) { + TopAddressing topAddressing = new TopAddressing(properties.getProperty(OnsTraceConstants.ADDRSRV_URL)); + nameSrv = topAddressing.fetchNSAddr(); + } + 
traceProducer.setNamesrvAddr(nameSrv); + traceProducer.setVipChannelEnabled(false); + int maxSize = Integer.parseInt(properties.getProperty(OnsTraceConstants.MaxMsgSize, "128000")); + traceProducer.setMaxMessageSize(maxSize - 10 * 1000); + } + return traceProducer; + } + + public static DefaultMQProducer getTraceDispatcherProducer(Properties properties, SessionCredentials sessionCredentials) { + if (traceProducer == null) { + String accessKey = properties.getProperty(OnsTraceConstants.AccessKey); + traceProducer = new DefaultMQProducer(new ClientRPCHook(sessionCredentials)); + traceProducer.setProducerGroup(accessKey.replace('.', '-') + OnsTraceConstants.groupName); + traceProducer.setSendMsgTimeout(5000); + traceProducer.setInstanceName(properties.getProperty(OnsTraceConstants.InstanceName, String.valueOf(System.currentTimeMillis()))); + String nameSrv = properties.getProperty(OnsTraceConstants.NAMESRV_ADDR); + if (nameSrv == null) { + TopAddressing topAddressing = new TopAddressing(properties.getProperty(OnsTraceConstants.ADDRSRV_URL)); + nameSrv = topAddressing.fetchNSAddr(); + } + traceProducer.setNamesrvAddr(nameSrv); + traceProducer.setVipChannelEnabled(false); + int maxSize = Integer.parseInt(properties.getProperty(OnsTraceConstants.MaxMsgSize, "128000")); + traceProducer.setMaxMessageSize(maxSize - 10 * 1000); + } + return traceProducer; + } + + public static void registerTraceDispatcher(String dispatcherId) throws MQClientException { + dispatcherTable.put(dispatcherId, new Object()); + if (traceProducer != null && isStarted.compareAndSet(false, true)) { + traceProducer.start(); + } + } + + public static void unregisterTraceDispatcher(String dispatcherId) { + dispatcherTable.remove(dispatcherId); + if (dispatcherTable.isEmpty() && traceProducer != null && isStarted.get()) { + traceProducer.shutdown(); + } + } +} diff --git a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/hook/AbstractRPCHook.java 
b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/hook/AbstractRPCHook.java new file mode 100644 index 0000000..9e78880 --- /dev/null +++ b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/hook/AbstractRPCHook.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.ons.open.trace.core.hook; + +import java.lang.reflect.Field; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; + +import static org.apache.rocketmq.ons.api.impl.authority.SessionCredentials.AccessKey; +import static org.apache.rocketmq.ons.api.impl.authority.SessionCredentials.ONSChannelKey; + +public abstract class AbstractRPCHook implements RPCHook { + protected ConcurrentHashMap, Field[]> fieldCache = + new ConcurrentHashMap, Field[]>(); + + + protected SortedMap parseRequestContent(RemotingCommand request, String ak, String onsChannel) { + CommandCustomHeader header = request.readCustomHeader(); + // sort property + SortedMap map = new TreeMap(); + map.put(AccessKey, ak); + map.put(ONSChannelKey, onsChannel); + try { + // add header properties + if (null != header) { + Field[] fields = fieldCache.get(header.getClass()); + if (null == fields) { + fields = header.getClass().getDeclaredFields(); + for (Field field : fields) { + field.setAccessible(true); + } + Field[] tmp = fieldCache.putIfAbsent(header.getClass(), fields); + if (null != tmp) { + fields = tmp; + } + } + + for (Field field : fields) { + Object value = field.get(header); + if (null != value && !field.isSynthetic()) { + map.put(field.getName(), value.toString()); + } + } + } + return map; + } + catch (Exception e) { + throw new RuntimeException("incompatible exception.", e); + } + } + +} diff --git a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/utils/MixUtils.java b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/utils/MixUtils.java new file mode 100644 index 0000000..9eb5be1 --- /dev/null +++ 
b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/utils/MixUtils.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.rocketmq.ons.open.trace.core.utils; + +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; +import java.net.Inet6Address; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Enumeration; + +public class MixUtils { + public static String getLocalAddress() { + try { + + Enumeration enumeration = NetworkInterface.getNetworkInterfaces(); + ArrayList ipv4Result = new ArrayList(); + ArrayList ipv6Result = new ArrayList(); + while (enumeration.hasMoreElements()) { + final NetworkInterface networkInterface = enumeration.nextElement(); + final Enumeration en = networkInterface.getInetAddresses(); + while (en.hasMoreElements()) { + final InetAddress address = en.nextElement(); + if (!address.isLoopbackAddress()) { + if (address instanceof Inet6Address) { + ipv6Result.add(normalizeHostAddress(address)); + } + else { + ipv4Result.add(normalizeHostAddress(address)); + } + } + } + } + + + if 
(!ipv4Result.isEmpty()) { + for (String ip : ipv4Result) { + if (ip.startsWith("127.0") || ip.startsWith("192.168")) { + continue; + } + + return ip; + } + + + return ipv4Result.get(ipv4Result.size() - 1); + } + + else if (!ipv6Result.isEmpty()) { + return ipv6Result.get(0); + } + + final InetAddress localHost = InetAddress.getLocalHost(); + return normalizeHostAddress(localHost); + } + catch (SocketException e) { + e.printStackTrace(); + } + catch (UnknownHostException e) { + e.printStackTrace(); + } + finally { + + } + + return null; + } + + + public static String normalizeHostAddress(final InetAddress localHost) { + if (localHost instanceof Inet6Address) { + return "[" + localHost.getHostAddress() + "]"; + } + else { + return localHost.getHostAddress(); + } + } + + + public static String toJson(final Object obj, boolean prettyFormat) { + return RemotingSerializable.toJson(obj, prettyFormat); + } + + + public static T fromJson(String json, Class classOfT) { + return RemotingSerializable.fromJson(json, classOfT); + } + + + public static String replaceNull(String ori) { + return ori == null ? 
"" : ori; + } +} diff --git a/ons-core/pom.xml b/ons-core/pom.xml new file mode 100644 index 0000000..edadb69 --- /dev/null +++ b/ons-core/pom.xml @@ -0,0 +1,271 @@ + + + + + org.apache.rocketmq + ons-parent + 1.8.1-SNAPSHOT + + 4.0.0 + 2012 + ons-all + pom + ons-all ${project.version} + https://github.org.apache.rocketmq + https://github.com/alibaba/RocketMQ/blob/develop/README.md + + ons-client + ons-api + ons-trace-core + ons-auth4client + + + UTF-8 + + true + true + true + + 1.6 + 1.6 + UTF-8 + + 4.5.1 + ${project.version} + 4.1.2.RELEASE + 3.7.4 + + + + + org.apache.maven.plugins + maven-compiler-plugin + 2.3.2 + + ${java_source_version} + ${java_target_version} + ${file_encoding} + true + true + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.3 + + ${maven.test.skip} + -Xms512m -Xmx1024m + once + + **/*Test.java + + + org.apache.rocketmq/remoting/ExceptionTest.java + org.apache.rocketmq/remoting/SyncInvokeTest.java + org.apache.rocketmq/remoting/NettyIdleTest.java + org.apache.rocketmq/remoting/NettyConnectionTest.java + org.apache.rocketmq/common/filter/PolishExprTest.java + org.apache.rocketmq/common/protocol/MQProtosHelperTest.java + + org.apache.rocketmq/client/consumer/loadbalance/AllocateMessageQueueAveragelyTest.java + + org.apache.rocketmq/store/RecoverTest.java + org.apache.rocketmq/broker/api/SendMessageTest.java + org.apache.rocketmq/test/integration/*/*.java + org.apache.rocketmq/test/integration/BaseTest.java + org.apache.rocketmq/test/*/*.java + org.apache.rocketmq/test/BaseTest.java + + + + + org.apache.maven.plugins + maven-javadoc-plugin + 2.10.4 + + + attach-javadocs + + jar + + + + + ${maven.jdoc.skip} + ${file_encoding} + ${file_encoding} + org.jboss.apiviz.APIviz + + org.jboss.apiviz + apiviz + 1.3.2.GA + + true + true + true + true + true + + + + org.apache.maven.plugins + maven-source-plugin + 2.1.2 + + + attach-sources + + jar + + + + + + + + src/main/resources + false + + + + + + release-sign-artifacts + + + performRelease 
+ true + + + + + + org.apache.maven.plugins + maven-gpg-plugin + 1.1 + + + sign-artifacts + verify + + sign + + + + + + + + + rmq + + true + + + + + maven-assembly-plugin + + rmq + + release.xml + + + + + + + + + ons-client + + + + maven-assembly-plugin + + rocketmq-ons-client + + release-client.xml + + + + + + + + + + + org.slf4j + slf4j-api + 1.7.7 + + + ${project.groupId} + ons-api + ${project.version} + + + ${project.groupId} + ons-client + ${project.version} + + + ${project.groupId} + ons-trace-core + ${project.version} + + + org.apache.rocketmq + rocketmq-client + ${rocketmq.version} + + + org.apache.rocketmq + rocketmq-remoting + ${rocketmq.version} + + + org.apache.rocketmq + rocketmq-common + ${rocketmq.version} + + + org.apache.rocketmq + rocketmq-tools + ${rocketmq.version} + + + ${project.groupId} + ons-auth4client + ${project.version} + + + ch.qos.logback + logback-classic + 1.1.11 + + + ch.qos.logback + logback-core + 1.1.11 + + + + diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..9718937 --- /dev/null +++ b/pom.xml @@ -0,0 +1,277 @@ + + + + + + + org.apache + apache + 18 + + 4.0.0 + org.apache.rocketmq + ons-parent + 1.8.1-SNAPSHOT + pom + ons-parent ${project.version} + + Apache RocketMQ lightweight client + + 2019 + + + The Apache Software License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + repo + + + + UTF-8 + UTF-8 + UTF-8 + + 1.7 + 1.7 + 1.4.8 + + + ons-core + + + + + org.codehaus.mojo + versions-maven-plugin + 2.3 + + + org.codehaus.mojo + clirr-maven-plugin + 2.7 + + + maven-compiler-plugin + 3.6.1 + + ${maven.compiler.source} + ${maven.compiler.target} + ${maven.compiler.source} + true + true + ${file_encoding} + + + + maven-surefire-plugin + 2.19.1 + + -Xms512m -Xmx1024m + always + + **/*Test.java + + + + + maven-site-plugin + 3.6 + + en_US + UTF-8 + UTF-8 + + + + maven-source-plugin + 3.0.1 + + + attach-sources + + jar + + + + + + maven-javadoc-plugin + 2.10.4 + + UTF-8 + en_US + + + + aggregate + + 
aggregate + + site + + + + + maven-resources-plugin + 3.0.2 + + ${project.build.sourceEncoding} + + + + org.codehaus.mojo + findbugs-maven-plugin + 3.0.4 + + + maven-checkstyle-plugin + 2.17 + + + verify + verify + + style/ons_checkstyle.xml + UTF-8 + true + true + false + + + check + + + + + + pl.project13.maven + git-commit-id-plugin + 2.2.3 + + + + revision + + + + + git + yyyy-MM-dd HH:mm:ss z + false + ${project.basedir}/.git + false + false + false + + + + maven-jar-plugin + 3.0.2 + + + + true + true + false + + + ${project.version} + + + + + + + + + + org.slf4j + slf4j-api + 1.7.7 + + + ch.qos.logback + logback-classic + 1.1.11 + + + ch.qos.logback + logback-core + 1.1.11 + + + commons-codec + commons-codec + 1.9 + + + com.google.guava + guava + 18.0 + + + org.apache.commons + commons-lang3 + 3.6 + + + org.apache.commons + commons-csv + 1.5 + + + commons-beanutils + commons-beanutils + 1.9.3 + + + com.alibaba + fastjson + 1.2.50 + + + junit + junit + 4.11 + test + + + org.assertj + assertj-core + 2.6.0 + test + + + org.mockito + mockito-core + 2.6.3 + test + + + org.mockito + mockito-core + 2.6.3 + test + + + + + git@gitlab.alibaba-inc.com:middleware/ons.git + scm:git:git@gitlab.alibaba-inc.com:middleware/ons.git + scm:git:git@gitlab.alibaba-inc.com:middleware/ons.git + + diff --git a/style/copyright/Apache.xml b/style/copyright/Apache.xml new file mode 100644 index 0000000..e3e3dec --- /dev/null +++ b/style/copyright/Apache.xml @@ -0,0 +1,23 @@ + + + + +