This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git
The following commit(s) were added to refs/heads/master by this push:
new 9dcbd93 [ISSUE-46] Support multiple RocketMQTemplate & name-server overridden
Consumer Listener (#56)
9dcbd93 is described below
commit 9dcbd93424e75407b3674ae9f675e372daffec9c
Author: Kevin Wang <wiseking.wq@gmail.com>
AuthorDate: Wed Apr 17 09:56:31 2019 +0800
[ISSUE-46] Support multiple RocketMQTemplate & name-server overridden Consumer Listener
(#56)
* [ISSUE-46] Support multiple RocketMQTemplate & name-server overridden Consumer Listener
---
README.md | 42 +++++-
README_zh_CN.md | 36 +++++
rocketmq-spring-boot-samples/pom.xml | 2 +-
.../springboot/consumer/StringConsumerNewNS.java | 25 ++--
.../src/main/resources/application.properties | 5 +-
.../samples/springboot/ExtRocketMQTemplate.java | 17 +--
.../samples/springboot/ProducerApplication.java | 6 +
.../src/main/resources/application.properties | 5 +-
....java => ExtRocketMQTemplateConfiguration.java} | 71 +++++----
.../spring/annotation/RocketMQMessageListener.java | 10 ++
.../ExtProducerResetConfiguration.java | 158 +++++++++++++++++++++
.../ListenerContainerConfiguration.java | 10 +-
.../autoconfigure/RocketMQAutoConfiguration.java | 10 +-
.../spring/config/RocketMQConfigUtils.java | 3 +
.../rocketmq/spring/core/RocketMQTemplate.java | 5 +-
.../support/DefaultRocketMQListenerContainer.java | 20 ++-
.../RocketMQAutoConfigurationTest.java | 94 ++++++++++--
17 files changed, 430 insertions(+), 89 deletions(-)
diff --git a/README.md b/README.md
index b1f4178..780c5ce 100644
--- a/README.md
+++ b/README.md
@@ -361,5 +361,45 @@ public class MyConsumer implements RocketMQListener<String> {
1. How do I send transactional messages?
- It needs two steps on client side: a) Define a class which is annotated with @RocketMQTransactionListener
and implements RocketMQLocalTransactionListener interface, in which, the executeLocalTransaction()
and checkLocalTransaction() methods are implemented;
+ It needs two steps on client side:
+
+ a) Define a class which is annotated with @RocketMQTransactionListener and implements
RocketMQLocalTransactionListener interface, in which, the executeLocalTransaction() and checkLocalTransaction()
methods are implemented;
+
b) Invoke the sendMessageInTransaction() method with the RocketMQTemplate API. Note: The
first parameter of this method is correlated with the txProducerGroup attribute of @RocketMQTransactionListener.
It can be null if using the default transaction producer group.
+
+1. How do I create more than one RocketMQTemplate with a different name-server or other specific
properties?
+ ```java
+ // Step 1. Define an extra RocketMQTemplate with the required properties. Note that the 'nameServer'
+ // property must differ from the value of the global Spring configuration 'rocketmq.name-server'.
+ // Other properties are optional and fall back to the global configuration by default.
+
+ // The RocketMQTemplate's Spring bean name is 'extRocketMQTemplate', i.e. the simple class name with its first letter lowercased
+ @ExtRocketMQTemplateConfiguration(nameServer="127.0.0.1:9876"
+ , ... // override other specific properties if needed
+ )
+ public class ExtRocketMQTemplate extends RocketMQTemplate {
+ // keep the body empty
+ }
+
+
+ // Step2. Use the extra RocketMQTemplate. e.g.
+ @Resource(name = "extRocketMQTemplate") // The name is required to select the extra RocketMQTemplate bean.
+ private RocketMQTemplate extRocketMQTemplate;
+ // you can use the template as normal.
+
+ ```
+
+1. How do I create a consumer listener with a name-server different from the global Spring configuration 'rocketmq.name-server'?
+ ```java
+ @Service
+ @RocketMQMessageListener(
+ nameServer = "NEW-NAMESERVER-LIST", // define new nameServer list
+ topic = "test-topic-1",
+ consumerGroup = "my-consumer_test-topic-1",
+ enableMsgTrace = true,
+ customizedTraceTopic = "my-trace-topic"
+ )
+ public class MyNameServerConsumer implements RocketMQListener<String> {
+ ...
+ }
+ ```
\ No newline at end of file
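The FAQ entry above depends on the extra template's bean name being the simple class name with its first letter lowercased. The following is a minimal sketch of that naming rule, assuming Spring's default annotation-based bean naming, which follows the `java.beans.Introspector.decapitalize` convention. The class `BeanNameSketch` and the helper `defaultBeanName` are hypothetical and are not part of this commit.

```java
import java.beans.Introspector;

class BeanNameSketch {
    // Hypothetical helper: derives the default bean name from a simple class name
    // using the JavaBeans decapitalization rule.
    static String defaultBeanName(String simpleClassName) {
        return Introspector.decapitalize(simpleClassName);
    }

    public static void main(String[] args) {
        // The sample class from the README maps to the name used in @Resource(name = ...).
        System.out.println(defaultBeanName("ExtRocketMQTemplate"));
    }
}
```

Under this rule a class name that starts with two uppercase letters (for example `MQTemplate`) is left unchanged, so it may be safer to pick a class name with a single leading capital or to check the resulting bean name explicitly.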
diff --git a/README_zh_CN.md b/README_zh_CN.md
index c285c52..b75a617 100644
--- a/README_zh_CN.md
+++ b/README_zh_CN.md
@@ -352,3 +352,39 @@ public class MyConsumer implements RocketMQListener<String> {
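On the client side, first implement the RocketMQLocalTransactionListener interface and annotate the implementing class with @RocketMQTransactionListener, implementing the execute and check methods; then use the RocketMQTemplate
and call sendMessageInTransaction() to publish the message. Note: this method is associated with the TransactionListener that declares txProducerGroup through the specified producer group name; you may also omit this value to use the default transaction producer group.
+1. How do I define a non-default RocketMQTemplate with a different name-server or other specific properties?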
+ ```java
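+ // Step 1: Define a non-default RocketMQTemplate with the properties you need. Note that the 'nameServer' property must be defined here, and its value must not be the same as the global configuration property 'rocketmq.name-server'.
+ // Other properties may also be defined; if they are not, they take the global configuration values or the default values.
+
+ // The Spring bean name of this RocketMQTemplate is 'extRocketMQTemplate', the same as the defined class name (but with the first letter lowercased)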
+ @ExtRocketMQTemplateConfiguration(nameServer="127.0.0.1:9876"
+ , ... // define other properties if necessary.
+ )
+ public class ExtRocketMQTemplate extends RocketMQTemplate {
+ // nothing needs to be changed inside the class
+ }
+
+
+ // Step 2: Use this non-default RocketMQTemplate
+ @Resource(name = "extRocketMQTemplate") // The name attribute must be defined here to point to the specific Spring bean.
+ private RocketMQTemplate extRocketMQTemplate;
+ // From here on, extRocketMQTemplate can be used as normal.
+
+ ```
+
+1. On the MessageListener consumer side, can a different name-server be specified instead of the globally defined 'rocketmq.name-server' property value?
+
+ ```java
+ @Service
+ @RocketMQMessageListener(
+ nameServer = "NEW-NAMESERVER-LIST", // this optional attribute can be used to specify a different name-server
+ topic = "test-topic-1",
+ consumerGroup = "my-consumer_test-topic-1",
+ enableMsgTrace = true,
+ customizedTraceTopic = "my-trace-topic"
+ )
+ public class MyNameServerConsumer implements RocketMQListener<String> {
+ ...
+ }
+ ```
\ No newline at end of file
diff --git a/rocketmq-spring-boot-samples/pom.xml b/rocketmq-spring-boot-samples/pom.xml
index c1699b6..63a080f 100644
--- a/rocketmq-spring-boot-samples/pom.xml
+++ b/rocketmq-spring-boot-samples/pom.xml
@@ -36,7 +36,7 @@
</modules>
<properties>
- <rocketmq-spring-boot-starter-version>2.0.1</rocketmq-spring-boot-starter-version>
+ <rocketmq-spring-boot-starter-version>2.0.3-SNAPSHOT</rocketmq-spring-boot-starter-version>
</properties>
<dependencies>
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/RocketMQConfigUtils.java
b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/StringConsumerNewNS.java
similarity index 55%
copy from rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/RocketMQConfigUtils.java
copy to rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/StringConsumerNewNS.java
index a9c9f80..5d2c872 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/RocketMQConfigUtils.java
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/StringConsumerNewNS.java
@@ -15,15 +15,20 @@
* limitations under the License.
*/
-package org.apache.rocketmq.spring.config;
+package org.apache.rocketmq.samples.springboot.consumer;
-public class RocketMQConfigUtils {
- /**
- * The bean name of the internally managed RocketMQ transaction annotation processor.
- */
- public static final String ROCKETMQ_TRANSACTION_ANNOTATION_PROCESSOR_BEAN_NAME =
- "org.springframework.rocketmq.spring.starter.internalRocketMQTransAnnotationProcessor";
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.springframework.stereotype.Service;
- public static final String ROCKETMQ_TRANSACTION_DEFAULT_GLOBAL_NAME =
- "rocketmq_transaction_default_global_name";
-}
+/**
+ * RocketMQMessageListener
+ */
+@Service
+@RocketMQMessageListener(nameServer = "${demo.rocketmq.myNameServer}", topic = "${demo.rocketmq.topic}",
consumerGroup = "string_consumer")
+public class StringConsumerNewNS implements RocketMQListener<String> {
+ @Override
+ public void onMessage(String message) {
+ System.out.printf("------- StringConsumerNewNS received: %s \n", message);
+ }
+}
\ No newline at end of file
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/resources/application.properties
b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/resources/application.properties
index 5f74d5d..404cb10 100644
--- a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/resources/application.properties
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/resources/application.properties
@@ -1,9 +1,12 @@
spring.application.name=rocketmq-consume-demo
rocketmq.name-server=localhost:9876
-rocketmq.topic=string-topic
# properties used in application code
+demo.rocketmq.topic=string-topic
demo.rocketmq.orderTopic=order-paid-topic
demo.rocketmq.msgExtTopic=message-ext-topic
demo.rocketmq.transTopic=spring-transaction-topic
+
+# another name-server, different from the global one
+demo.rocketmq.myNameServer=127.0.0.1:9876
\ No newline at end of file
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/RocketMQConfigUtils.java
b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ExtRocketMQTemplate.java
similarity index 61%
copy from rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/RocketMQConfigUtils.java
copy to rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ExtRocketMQTemplate.java
index a9c9f80..7a78552 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/RocketMQConfigUtils.java
+++ b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ExtRocketMQTemplate.java
@@ -14,16 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.rocketmq.samples.springboot;
-package org.apache.rocketmq.spring.config;
+import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
-public class RocketMQConfigUtils {
- /**
- * The bean name of the internally managed RocketMQ transaction annotation processor.
- */
- public static final String ROCKETMQ_TRANSACTION_ANNOTATION_PROCESSOR_BEAN_NAME =
- "org.springframework.rocketmq.spring.starter.internalRocketMQTransAnnotationProcessor";
-
- public static final String ROCKETMQ_TRANSACTION_DEFAULT_GLOBAL_NAME =
- "rocketmq_transaction_default_global_name";
-}
+@ExtRocketMQTemplateConfiguration(nameServer = "${demo.rocketmq.extNameServer}")
+public class ExtRocketMQTemplate extends RocketMQTemplate {
+}
\ No newline at end of file
diff --git a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
index d7ca208..465658a 100644
--- a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
+++ b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
@@ -54,6 +54,8 @@ public class ProducerApplication implements CommandLineRunner {
private String orderPaidTopic;
@Value("${demo.rocketmq.msgExtTopic}")
private String msgExtTopic;
+ @Resource(name = "extRocketMQTemplate")
+ private RocketMQTemplate extRocketMQTemplate;
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
@@ -65,6 +67,10 @@ public class ProducerApplication implements CommandLineRunner {
SendResult sendResult = rocketMQTemplate.syncSend(springTopic, "Hello, World!");
System.out.printf("syncSend1 to topic %s sendResult=%s %n", springTopic, sendResult);
+ // Use the extRocketMQTemplate
+ sendResult = extRocketMQTemplate.syncSend(springTopic, "Hello, World!");
+ System.out.printf("extRocketMQTemplate.syncSend1 to topic %s sendResult=%s %n", springTopic,
sendResult);
+
// Send string with spring Message
sendResult = rocketMQTemplate.syncSend(springTopic, MessageBuilder.withPayload("Hello,
World! I'm from spring message").build());
System.out.printf("syncSend2 to topic %s sendResult=%s %n", springTopic, sendResult);
diff --git a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/resources/application.properties
b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/resources/application.properties
index 7965b28..3a68505 100644
--- a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/resources/application.properties
+++ b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/resources/application.properties
@@ -1,8 +1,11 @@
rocketmq.name-server=localhost:9876
rocketmq.producer.group=my-group1
+rocketmq.producer.sendMessageTimeout=300000
# properties used in the application
demo.rocketmq.topic=string-topic
demo.rocketmq.orderTopic=order-paid-topic
demo.rocketmq.msgExtTopic=message-ext-topic
-demo.rocketmq.transTopic=spring-transaction-topic
\ No newline at end of file
+demo.rocketmq.transTopic=spring-transaction-topic
+
+demo.rocketmq.extNameServer=127.0.0.1:9876
\ No newline at end of file
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQTemplateConfiguration.java
similarity index 50%
copy from rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
copy to rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQTemplateConfiguration.java
index 608be82..c268b57 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQTemplateConfiguration.java
@@ -17,6 +17,8 @@
package org.apache.rocketmq.spring.annotation;
+import org.springframework.stereotype.Component;
+
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
@@ -26,71 +28,62 @@ import java.lang.annotation.Target;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
-public @interface RocketMQMessageListener {
-
- String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
- String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
- String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
-
+@Component
+public @interface ExtRocketMQTemplateConfiguration {
/**
- * Consumers of the same role is required to have exactly same subscriptions and consumerGroup
to correctly achieve
- * load balance. It's required and needs to be globally unique.
- *
- *
- * See <a href="http://rocketmq.apache.org/docs/core-concept/">here</a> for
further discussion.
+ * The component name of the Producer configuration.
*/
- String consumerGroup();
+ String value() default "";
/**
- * Topic name.
+ * The property of "name-server".
*/
- String topic();
+ String nameServer();
/**
- * Control how to selector message.
- *
- * @see SelectorType
+ * Name of the producer group.
*/
- SelectorType selectorType() default SelectorType.TAG;
-
+ String group() default "${rocketmq.producer.group:}";
/**
- * Control which message can be select. Grammar please see {@link SelectorType#TAG} and
{@link SelectorType#SQL92}
+ * Send message timeout, in milliseconds.
*/
- String selectorExpression() default "*";
-
+ int sendMessageTimeout() default -1;
/**
- * Control consume mode, you can choice receive message concurrently or orderly.
+ * Threshold for compressing the message body; by default, message bodies larger than 4k are compressed.
*/
- ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;
-
+ int compressMessageBodyThreshold() default -1;
/**
- * Control message mode, if you want all subscribers receive message all message, broadcasting
is a good choice.
+ * Maximum number of retries to perform internally before reporting a send failure in synchronous mode.
+ * This may cause message duplication, which is up to application developers to resolve.
*/
- MessageModel messageModel() default MessageModel.CLUSTERING;
-
+ int retryTimesWhenSendFailed() default -1;
/**
- * Max consumer thread number.
+ * <p> Maximum number of retries to perform internally before reporting a send failure in asynchronous mode. </p>
+ * This may cause message duplication, which is up to application developers to resolve.
*/
- int consumeThreadMax() default 64;
-
+ int retryTimesWhenSendAsyncFailed() default -1;
+ /**
+ * Indicate whether to retry another broker on sending failure internally.
+ */
+ boolean retryNextServer() default false;
+ /**
+ * Maximum allowed message size in bytes.
+ */
+ int maxMessageSize() default -1;
/**
* The property of "access-key".
*/
- String accessKey() default ACCESS_KEY_PLACEHOLDER;
-
+ String accessKey() default "${rocketmq.producer.accessKey:}";
/**
* The property of "secret-key".
*/
- String secretKey() default SECRET_KEY_PLACEHOLDER;
-
+ String secretKey() default "${rocketmq.producer.secretKey:}";
/**
* Switch flag instance for message trace.
*/
boolean enableMsgTrace() default true;
-
/**
* The name value of message trace topic.If you don't config,you can use the default
trace topic name.
*/
- String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER;
-
-}
+ String customizedTraceTopic() default "${rocketmq.producer.customized-trace-topic:}";
+}
\ No newline at end of file
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
index 608be82..1d48817 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
@@ -28,6 +28,7 @@ import java.lang.annotation.Target;
@Documented
public @interface RocketMQMessageListener {
+ String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
@@ -74,6 +75,11 @@ public @interface RocketMQMessageListener {
int consumeThreadMax() default 64;
/**
+ * Max consumer timeout, default 30s.
+ */
+ long consumeTimeout() default 30000L;
+
+ /**
* The property of "access-key".
*/
String accessKey() default ACCESS_KEY_PLACEHOLDER;
@@ -93,4 +99,8 @@ public @interface RocketMQMessageListener {
*/
String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER;
+ /**
+ * The property of "name-server".
+ */
+ String nameServer() default NAME_SERVER_PLACEHOLDER;
}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java
new file mode 100644
index 0000000..192bfc9
--- /dev/null
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.spring.autoconfigure;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.aop.framework.AopProxyUtils;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.SmartInitializingSingleton;
+import org.springframework.beans.factory.support.BeanDefinitionValidationException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.support.GenericApplicationContext;
+import org.springframework.core.env.StandardEnvironment;
+import org.springframework.util.StringUtils;
+
+import java.util.Map;
+import java.util.Objects;
+
+
+@Configuration
+public class ExtProducerResetConfiguration implements ApplicationContextAware, SmartInitializingSingleton
{
+ private final static Logger log = LoggerFactory.getLogger(ExtProducerResetConfiguration.class);
+
+ private ConfigurableApplicationContext applicationContext;
+
+ private StandardEnvironment environment;
+
+ private RocketMQProperties rocketMQProperties;
+
+ private ObjectMapper objectMapper;
+
+ public ExtProducerResetConfiguration(ObjectMapper rocketMQMessageObjectMapper,
+ StandardEnvironment environment,
+ RocketMQProperties rocketMQProperties) {
+ this.objectMapper = rocketMQMessageObjectMapper;
+ this.environment = environment;
+ this.rocketMQProperties = rocketMQProperties;
+ }
+
+ @Override
+ public void setApplicationContext(ApplicationContext applicationContext) throws BeansException
{
+ this.applicationContext = (ConfigurableApplicationContext) applicationContext;
+ }
+
+ @Override
+ public void afterSingletonsInstantiated() {
+ Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(ExtRocketMQTemplateConfiguration.class);
+
+ if (Objects.nonNull(beans)) {
+ beans.forEach(this::registerTemplate);
+ }
+ }
+
+ private void registerTemplate(String beanName, Object bean) {
+ Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
+
+ if (!RocketMQTemplate.class.isAssignableFrom(bean.getClass())) {
+ throw new IllegalStateException(clazz + " is not instance of " + RocketMQTemplate.class.getName());
+ }
+
+ ExtRocketMQTemplateConfiguration annotation = clazz.getAnnotation(ExtRocketMQTemplateConfiguration.class);
+ GenericApplicationContext genericApplicationContext = (GenericApplicationContext)
applicationContext;
+ validate(annotation, genericApplicationContext);
+
+ DefaultMQProducer mqProducer = createProducer(annotation);
+ // Set instanceName same as the beanName
+ mqProducer.setInstanceName(beanName);
+ try {
+ mqProducer.start();
+ } catch (MQClientException e) {
+ throw new BeanDefinitionValidationException(String.format("Failed to startup MQProducer for RocketMQTemplate %s",
+ beanName), e);
+ }
+ RocketMQTemplate rocketMQTemplate = (RocketMQTemplate) bean;
+ rocketMQTemplate.setProducer(mqProducer);
+ rocketMQTemplate.setObjectMapper(objectMapper);
+
+
+ log.info("Set real producer to :{} {}", beanName, annotation.value());
+ }
+
+ private DefaultMQProducer createProducer(ExtRocketMQTemplateConfiguration annotation)
{
+ DefaultMQProducer producer = null;
+
+ RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer();
+ if (producerConfig == null) {
+ producerConfig = new RocketMQProperties.Producer();
+ }
+ String nameServer = environment.resolvePlaceholders(annotation.nameServer());
+ String groupName = environment.resolvePlaceholders(annotation.group());
+ groupName = StringUtils.isEmpty(groupName) ? producerConfig.getGroup() : groupName;
+
+ String ak = environment.resolvePlaceholders(annotation.accessKey());
+ ak = StringUtils.isEmpty(ak) ? producerConfig.getAccessKey() : ak;
+ String sk = environment.resolvePlaceholders(annotation.secretKey());
+ sk = StringUtils.isEmpty(sk) ? producerConfig.getSecretKey() : sk;
+ String customizedTraceTopic = environment.resolvePlaceholders(annotation.customizedTraceTopic());
+ customizedTraceTopic = StringUtils.isEmpty(customizedTraceTopic) ? producerConfig.getCustomizedTraceTopic()
: customizedTraceTopic;
+
+ if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
+ producer = new DefaultMQProducer(groupName, new AclClientRPCHook(new SessionCredentials(ak,
sk)),
+ annotation.enableMsgTrace(), customizedTraceTopic);
+ producer.setVipChannelEnabled(false);
+ } else {
+ producer = new DefaultMQProducer(groupName, annotation.enableMsgTrace(), customizedTraceTopic);
+ }
+
+ producer.setNamesrvAddr(nameServer);
+ producer.setSendMsgTimeout(annotation.sendMessageTimeout() == -1 ? producerConfig.getSendMessageTimeout()
: annotation.sendMessageTimeout());
+ producer.setRetryTimesWhenSendFailed(annotation.retryTimesWhenSendFailed() == -1 ? producerConfig.getRetryTimesWhenSendFailed() : annotation.retryTimesWhenSendFailed());
+ producer.setRetryTimesWhenSendAsyncFailed(annotation.retryTimesWhenSendAsyncFailed()
== -1 ? producerConfig.getRetryTimesWhenSendAsyncFailed() : annotation.retryTimesWhenSendAsyncFailed());
+ producer.setMaxMessageSize(annotation.maxMessageSize() == -1 ? producerConfig.getMaxMessageSize()
: annotation.maxMessageSize());
+ producer.setCompressMsgBodyOverHowmuch(annotation.compressMessageBodyThreshold()
== -1 ? producerConfig.getCompressMessageBodyThreshold() : annotation.compressMessageBodyThreshold());
+ producer.setRetryAnotherBrokerWhenNotStoreOK(annotation.retryNextServer());
+
+ return producer;
+ }
+
+ private void validate(ExtRocketMQTemplateConfiguration annotation, GenericApplicationContext
genericApplicationContext) {
+ if (genericApplicationContext.isBeanNameInUse(annotation.value())) {
+ throw new BeanDefinitionValidationException(String.format("Bean %s has been used in Spring Application Context, " +
+ "please check the @ExtRocketMQTemplateConfiguration",
+ annotation.value()));
+ }
+
+ if (rocketMQProperties.getNameServer() == null ||
+ rocketMQProperties.getNameServer().equals(environment.resolvePlaceholders(annotation.nameServer())))
{
+ throw new BeanDefinitionValidationException(
+ "Bad annotation definition in @ExtRocketMQTemplateConfiguration, nameServer property is the same as the " +
+ "global property, please use the default RocketMQTemplate!");
+ }
+ }
+}
\ No newline at end of file
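createProducer() in the file above treats `-1` on the numeric annotation attributes, and an empty string on the text attributes, as "not set" and then uses the global producer configuration instead. A minimal sketch of that override rule follows, assuming plain values rather than the real annotation and `RocketMQProperties` types. The class `ProducerOverrideSketch`, the constant `UNSET` and both helpers are hypothetical names used only for illustration.

```java
class ProducerOverrideSketch {
    // Sentinel used by the numeric annotation attributes to mean "not set".
    static final int UNSET = -1;

    // Hypothetical helper: annotation value wins unless it is the sentinel.
    static int intOrGlobal(int annotationValue, int globalValue) {
        return annotationValue == UNSET ? globalValue : annotationValue;
    }

    // Hypothetical helper: resolved text wins unless it is null or empty.
    static String textOrGlobal(String resolvedValue, String globalValue) {
        boolean empty = resolvedValue == null || resolvedValue.isEmpty();
        return empty ? globalValue : resolvedValue;
    }

    public static void main(String[] args) {
        // sendMessageTimeout left at -1 on the annotation: the global 300000 applies.
        System.out.println(intOrGlobal(UNSET, 300000));
        // sendMessageTimeout set to 5000 on the annotation: the override applies.
        System.out.println(intOrGlobal(5000, 300000));
        // group placeholder resolved to empty: the global group applies.
        System.out.println(textOrGlobal("", "my-group1"));
    }
}
```

One consequence of the sentinel approach is that `-1` itself can never be passed through as a real setting, which seems acceptable for timeouts, sizes and retry counts.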
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
index 3a9e070..f409198 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
@@ -35,6 +35,7 @@ import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.core.env.StandardEnvironment;
+import org.springframework.util.StringUtils;
import java.util.Map;
import java.util.Objects;
@@ -92,7 +93,7 @@ public class ListenerContainerConfiguration implements ApplicationContextAware,
GenericApplicationContext genericApplicationContext = (GenericApplicationContext)
applicationContext;
genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
- () -> createRocketMQListenerContainer(bean, annotation));
+ () -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
DefaultRocketMQListenerContainer.class);
if (!container.isRunning()) {
@@ -107,15 +108,18 @@ public class ListenerContainerConfiguration implements ApplicationContextAware,
log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}",
beanName, containerBeanName);
}
- private DefaultRocketMQListenerContainer createRocketMQListenerContainer(Object bean,
RocketMQMessageListener annotation) {
+ private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name,
Object bean, RocketMQMessageListener annotation) {
DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();
- container.setNameServer(rocketMQProperties.getNameServer());
+ String nameServer = environment.resolvePlaceholders(annotation.nameServer());
+ nameServer = StringUtils.isEmpty(nameServer) ? rocketMQProperties.getNameServer()
: nameServer;
+ container.setNameServer(nameServer);
container.setTopic(environment.resolvePlaceholders(annotation.topic()));
container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));
container.setRocketMQMessageListener(annotation);
container.setRocketMQListener((RocketMQListener) bean);
container.setObjectMapper(objectMapper);
+ container.setName(name); // REVIEW ME, use the same clientId or multiple?
return container;
}
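The change to createRocketMQListenerContainer() above resolves the listener's `nameServer` attribute and falls back to the global `rocketmq.name-server` when the resolved value is empty. A minimal sketch of that selection follows, assuming the placeholder has already been resolved to a plain string. The class `NameServerFallbackSketch` and the helper `effectiveNameServer` are hypothetical and do not appear in the commit.

```java
class NameServerFallbackSketch {
    // Hypothetical helper: an empty resolved annotation value means
    // "use the global rocketmq.name-server".
    static String effectiveNameServer(String resolvedAnnotationValue, String globalNameServer) {
        boolean empty = resolvedAnnotationValue == null || resolvedAnnotationValue.isEmpty();
        return empty ? globalNameServer : resolvedAnnotationValue;
    }

    public static void main(String[] args) {
        // Listener without an explicit nameServer: the global value is used.
        System.out.println(effectiveNameServer("", "localhost:9876"));
        // Listener with its own nameServer: the override is used.
        System.out.println(effectiveNameServer("127.0.0.1:9876", "localhost:9876"));
    }
}
```

Because the annotation default is the placeholder `${rocketmq.name-server:}`, a listener that does not set `nameServer` resolves to the global value, or to an empty string if the property is absent, so existing listeners keep their previous behaviour.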
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
index 2fc034c..f1a2aba 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
@@ -26,6 +26,7 @@ import org.apache.rocketmq.spring.config.RocketMQConfigUtils;
import org.apache.rocketmq.spring.config.RocketMQTransactionAnnotationProcessor;
import org.apache.rocketmq.spring.config.TransactionHandlerRegistry;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
@@ -45,7 +46,7 @@ import org.springframework.util.StringUtils;
@EnableConfigurationProperties(RocketMQProperties.class)
@ConditionalOnClass({ MQAdmin.class, ObjectMapper.class })
@ConditionalOnProperty(prefix = "rocketmq", value = "name-server")
-@Import({ JacksonFallbackConfiguration.class, ListenerContainerConfiguration.class })
+@Import({ JacksonFallbackConfiguration.class, ListenerContainerConfiguration.class, ExtProducerResetConfiguration.class
})
@AutoConfigureAfter(JacksonAutoConfiguration.class)
public class RocketMQAutoConfiguration {
@@ -85,7 +86,7 @@ public class RocketMQAutoConfiguration {
@Bean(destroyMethod = "destroy")
@ConditionalOnBean(DefaultMQProducer.class)
- @ConditionalOnMissingBean(RocketMQTemplate.class)
+ @ConditionalOnMissingBean(name = RocketMQConfigUtils.ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME)
public RocketMQTemplate rocketMQTemplate(DefaultMQProducer mqProducer, ObjectMapper rocketMQMessageObjectMapper) {
RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
rocketMQTemplate.setProducer(mqProducer);
@@ -94,9 +95,10 @@ public class RocketMQAutoConfiguration {
}
@Bean
- @ConditionalOnBean(RocketMQTemplate.class)
+ @ConditionalOnBean(name = RocketMQConfigUtils.ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME)
@ConditionalOnMissingBean(TransactionHandlerRegistry.class)
- public TransactionHandlerRegistry transactionHandlerRegistry(RocketMQTemplate template) {
+ public TransactionHandlerRegistry transactionHandlerRegistry(@Qualifier(RocketMQConfigUtils.ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME)
+ RocketMQTemplate template) {
return new TransactionHandlerRegistry(template);
}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/RocketMQConfigUtils.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/RocketMQConfigUtils.java
index a9c9f80..3e1c573 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/RocketMQConfigUtils.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/RocketMQConfigUtils.java
@@ -26,4 +26,7 @@ public class RocketMQConfigUtils {
public static final String ROCKETMQ_TRANSACTION_DEFAULT_GLOBAL_NAME =
"rocketmq_transaction_default_global_name";
+
+ public static final String ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME =
+ "rocketMQTemplate";
}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
index 348962e..0803a7e 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
@@ -464,8 +464,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
@Override
public void afterPropertiesSet() throws Exception {
- Assert.notNull(producer, "Property 'producer' is required");
- producer.start();
+ if (producer != null) {
+ producer.start();
+ }
}
@Override
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
index 50cd3a0..b56e3c3 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
@@ -59,6 +59,11 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,
private ApplicationContext applicationContext;
+ /**
+ * The name of the DefaultRocketMQListenerContainer instance
+ */
+ private String name;
+
private long suspendCurrentQueueTimeMillis = 1000;
/**
@@ -94,6 +99,7 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,
private SelectorType selectorType;
private String selectorExpression;
private MessageModel messageModel;
+ private long consumeTimeout;
public long getSuspendCurrentQueueTimeMillis() {
return suspendCurrentQueueTimeMillis;
@@ -176,6 +182,7 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,
this.messageModel = anno.messageModel();
this.selectorExpression = anno.selectorExpression();
this.selectorType = anno.selectorType();
+ this.consumeTimeout = anno.consumeTimeout();
}
public ConsumeMode getConsumeMode() {
@@ -296,6 +303,10 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,
'}';
}
+ public void setName(String name) {
+ this.name = name;
+ }
+
public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {
@SuppressWarnings("unchecked")
@@ -414,11 +425,18 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,
resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
}
- consumer.setNamesrvAddr(nameServer);
+ String customizedNameServer = this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.nameServer());
+ if (customizedNameServer != null) {
+ consumer.setNamesrvAddr(customizedNameServer);
+ } else {
+ consumer.setNamesrvAddr(nameServer);
+ }
consumer.setConsumeThreadMax(consumeThreadMax);
if (consumeThreadMax < consumer.getConsumeThreadMin()) {
consumer.setConsumeThreadMin(consumeThreadMax);
}
+ consumer.setConsumeTimeout(consumeTimeout);
+ consumer.setInstanceName(this.name);
switch (messageModel) {
case BROADCASTING:
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
index f78251e..e6113a8 100644
--- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
@@ -19,13 +19,19 @@ package org.apache.rocketmq.spring.autoconfigure;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
+import org.junit.Assert;
import org.junit.Test;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
+import org.springframework.beans.factory.support.BeanDefinitionValidationException;
import org.springframework.boot.autoconfigure.AutoConfigurations;
+import org.springframework.boot.test.context.assertj.AssertableApplicationContext;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
+import org.springframework.boot.test.context.runner.ContextConsumer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -62,23 +68,28 @@ public class RocketMQAutoConfigurationTest {
@Test
public void testDefaultMQProducer() {
runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876",
- "rocketmq.producer.group=spring_rocketmq").
- run((context) -> {
- assertThat(context).hasSingleBean(DefaultMQProducer.class);
- });
+ "rocketmq.producer.group=spring_rocketmq").
+ run((context) -> {
+ assertThat(context).hasSingleBean(DefaultMQProducer.class);
+ });
}
@Test
public void testRocketMQListenerContainer() {
runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876").
- withUserConfiguration(TestConfig.class).
- run((context) -> {
- // No producer on consume side
- assertThat(context).doesNotHaveBean(DefaultMQProducer.class);
- // Auto-create consume container if existing Bean annotated with @RocketMQMessageListener
- assertThat(context).hasSingleBean(DefaultRocketMQListenerContainer.class);
- });
+ withUserConfiguration(TestConfig.class).
+ run((context) -> {
+ // No producer on consume side
+ assertThat(context).doesNotHaveBean(DefaultMQProducer.class);
+ // Auto-create consume container if existing Bean annotated with @RocketMQMessageListener
+ assertThat(context).hasBean("org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_1");
+ assertThat(context).hasBean("org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_2");
+ assertThat(context).getBean("org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_1").
+ hasFieldOrPropertyWithValue("nameServer", "127.0.0.1:9876");
+ assertThat(context).getBean("org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_2").
+ hasFieldOrPropertyWithValue("nameServer", "127.0.1.1:9876");
+ });
}
@@ -87,8 +98,8 @@ public class RocketMQAutoConfigurationTest {
runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876").
withUserConfiguration(TestConfig.class, CustomObjectMapperConfig.class).
run((context) -> {
- assertThat(context).hasSingleBean(DefaultRocketMQListenerContainer.class);
- assertThat(context.getBean(DefaultRocketMQListenerContainer.class).getObjectMapper())
+ assertThat(context.getBean("org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_1",
+ DefaultRocketMQListenerContainer.class).getObjectMapper())
.isSameAs(context.getBean(CustomObjectMapperConfig.class).testObjectMapper());
});
}
@@ -98,12 +109,35 @@ public class RocketMQAutoConfigurationTest {
runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876").
withUserConfiguration(TestConfig.class, CustomObjectMappersConfig.class).
run((context) -> {
- assertThat(context).hasSingleBean(DefaultRocketMQListenerContainer.class);
- assertThat(context.getBean(DefaultRocketMQListenerContainer.class).getObjectMapper())
+ assertThat(context.getBean("org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_1",
+ DefaultRocketMQListenerContainer.class).getObjectMapper())
.isSameAs(context.getBean(CustomObjectMappersConfig.class).rocketMQMessageObjectMapper());
});
}
+
+ @Test
+ public void testExtRocketMQTemplate() {
+ runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876").
+ withUserConfiguration(ExtRocketMQTemplateConfig.class, CustomObjectMappersConfig.class).
+ run(new ContextConsumer<AssertableApplicationContext>() {
+ @Override
+ public void accept(AssertableApplicationContext context) throws Throwable {
+ Throwable th = context.getStartupFailure();
+ System.out.printf("th==" + th + "\n");
+ Assert.assertTrue(th instanceof BeanDefinitionValidationException);
+ }
+ });
+
+ runner.withPropertyValues("rocketmq.name-server=127.0.1.1:9876").
+ withUserConfiguration(ExtRocketMQTemplateConfig.class, CustomObjectMappersConfig.class).
+ run((context) -> {
+ // No producer on consume side
+ assertThat(context).getBean("extRocketMQTemplate").hasFieldOrProperty("producer");
+ // Auto-create consume container if existing Bean annotated with @RocketMQMessageListener
+ });
+ }
+
@Configuration
static class TestConfig {
@@ -111,6 +145,12 @@ public class RocketMQAutoConfigurationTest {
public Object consumeListener() {
return new MyMessageListener();
}
+
+ @Bean
+ public Object consumeListener1() {
+ return new MyMessageListener1();
+ }
+
}
@Configuration
@@ -146,5 +186,29 @@ public class RocketMQAutoConfigurationTest {
}
}
+
+ @RocketMQMessageListener(nameServer = "127.0.1.1:9876", consumerGroup = "abc1", topic = "test")
+ static class MyMessageListener1 implements RocketMQListener {
+
+ @Override
+ public void onMessage(Object message) {
+
+ }
+ }
+
+ @Configuration
+ static class ExtRocketMQTemplateConfig {
+
+ @Bean
+ public RocketMQTemplate extRocketMQTemplate() {
+ return new MyExtRocketMQTemplate();
+ }
+
+ }
+
+ @ExtRocketMQTemplateConfiguration(group = "test", nameServer = "127.0.0.1:9876")
+ static class MyExtRocketMQTemplate extends RocketMQTemplate {
+
+ }
}
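
Note on the name-server override in DefaultRocketMQListenerContainer above: the hunk prefers the listener-level nameServer over the global rocketmq.name-server value. The sketch below illustrates that fallback in isolation. It is not code from this commit: the class NameServerChoice and the method choose are hypothetical, and the empty-string check is an assumption, added because Spring's resolveRequiredPlaceholders returns a String rather than null, so an unset override may plausibly arrive as empty text.

```java
public class NameServerChoice {

    // Hypothetical helper, not part of rocketmq-spring: prefer the listener-level
    // override when it carries text, otherwise fall back to the global value.
    // Treating blank text as "no override" is an assumption of this sketch.
    static String choose(String customized, String global) {
        if (customized != null && !customized.trim().isEmpty()) {
            return customized;
        }
        return global;
    }

    public static void main(String[] args) {
        // A listener that sets its own name server keeps it.
        System.out.println(choose("127.0.1.1:9876", "127.0.0.1:9876"));
        // A listener with no override falls back to the global setting.
        System.out.println(choose("", "127.0.0.1:9876"));
    }
}
```

This mirrors what testRocketMQListenerContainer asserts: one container keeps the global address 127.0.0.1:9876 while the listener annotated with nameServer = "127.0.1.1:9876" uses its own.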