rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jinrongt...@apache.org
Subject [rocketmq-spring] branch master updated: [ISSUE #306] Support real LitePullMessage in RocketMQ-Spring (#307)
Date Fri, 04 Dec 2020 06:07:00 GMT
This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git


The following commit(s) were added to refs/heads/master by this push:
     new af9f20e  [ISSUE #306] Support real LitePullMessage in RocketMQ-Spring (#307)
af9f20e is described below

commit af9f20eabd2c4ae1245bed2c0b816592128b1388
Author: CharliePu <heihaozi2006@163.com>
AuthorDate: Fri Dec 4 14:06:53 2020 +0800

    [ISSUE #306] Support real LitePullMessage in RocketMQ-Spring (#307)
    
    * Support LitePullMessage in RocketMQ-Spring
    
    * Add a property in litePullConsumer named pullBatchSize.
    
    * Add necessary comments.
---
 .../samples/springboot/ConsumerACLApplication.java |  17 ++-
 .../src/main/resources/application.properties      |   2 +
 .../samples/springboot/ConsumerApplication.java    |  24 +++-
 ...erApplication.java => ExtRocketMQTemplate.java} |  19 +--
 .../src/main/resources/application.properties      |   2 +
 .../ExtRocketMQConsumerConfiguration.java          |  97 ++++++++++++++
 .../ExtConsumerResetConfiguration.java             | 149 +++++++++++++++++++++
 .../autoconfigure/RocketMQAutoConfiguration.java   |  80 +++++++++--
 .../spring/autoconfigure/RocketMQProperties.java   | 105 +++++++++++++++
 .../rocketmq/spring/core/RocketMQTemplate.java     |  88 ++++++++++--
 .../rocketmq/spring/support/RocketMQUtil.java      |  56 +++++++-
 .../RocketMQAutoConfigurationTest.java             |  66 +++++++++
 .../spring/core/ExtRocketMQTemplateTest.java       |  18 ++-
 .../rocketmq/spring/core/RocketMQTemplateTest.java |  16 ++-
 14 files changed, 699 insertions(+), 40 deletions(-)

diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ConsumerACLApplication.java b/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ConsumerACLApplication.java
index 3bf266b..f3d7578 100644
--- a/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ConsumerACLApplication.java
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ConsumerACLApplication.java
@@ -17,17 +17,32 @@
 
 package org.apache.rocketmq.samples.springboot;
 
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
+import org.springframework.boot.CommandLineRunner;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 
+import javax.annotation.Resource;
+import java.util.List;
+
 /**
  * ConsumerApplication
  */
 @SpringBootApplication
-public class ConsumerACLApplication {
+public class ConsumerACLApplication implements CommandLineRunner {
+
+    @Resource
+    private RocketMQTemplate rocketMQTemplate;
 
     public static void main(String[] args) {
         SpringApplication.run(ConsumerACLApplication.class, args);
     }
+
+    @Override
+    public void run(String... args) throws Exception {
+        //This is an example of pull consumer with access-key and secret-key.
+        List<String> messages = rocketMQTemplate.receive(String.class);
+        System.out.printf("receive from rocketMQTemplate, messages=%s %n", messages);
+    }
 }
 
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/resources/application.properties b/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/resources/application.properties
index 057edeb..3fe6abb 100644
--- a/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/resources/application.properties
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/resources/application.properties
@@ -17,6 +17,8 @@
 spring.application.name=rocketmq-consume-acl-demo
 
 rocketmq.name-server=Endpoint_of_Aliware_MQ
+rocketmq.consumer.group=my-group1
+rocketmq.consumer.topic=test
 rocketmq.topic=normal_topic_define_in_Aliware_MQ
 
 # properties used in application code
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ConsumerApplication.java b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ConsumerApplication.java
index e4fbc8c..6435d47 100644
--- a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ConsumerApplication.java
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ConsumerApplication.java
@@ -17,17 +17,39 @@
 
 package org.apache.rocketmq.samples.springboot;
 
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
+import org.springframework.boot.CommandLineRunner;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 
+import javax.annotation.Resource;
+import java.util.List;
+
 /**
  * ConsumerApplication
  */
 @SpringBootApplication
-public class ConsumerApplication {
+public class ConsumerApplication implements CommandLineRunner {
+
+    @Resource
+    private RocketMQTemplate rocketMQTemplate;
+
+    @Resource(name = "extRocketMQTemplate")
+    private RocketMQTemplate extRocketMQTemplate;
 
     public static void main(String[] args) {
         SpringApplication.run(ConsumerApplication.class, args);
     }
+
+    @Override
+    public void run(String... args) throws Exception {
+        //This is an example of pull consumer using rocketMQTemplate.
+        List<String> messages = rocketMQTemplate.receive(String.class);
+        System.out.printf("receive from rocketMQTemplate, messages=%s %n", messages);
+
+        //This is an example of pull consumer using extRocketMQTemplate.
+        messages = extRocketMQTemplate.receive(String.class);
+        System.out.printf("receive from extRocketMQTemplate, messages=%s %n", messages);
+    }
 }
 
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ConsumerApplication.java b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ExtRocketMQTemplate.java
similarity index 72%
copy from rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ConsumerApplication.java
copy to rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ExtRocketMQTemplate.java
index e4fbc8c..5499ac1 100644
--- a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ConsumerApplication.java
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ExtRocketMQTemplate.java
@@ -14,20 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.rocketmq.samples.springboot;
 
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-
-/**
- * ConsumerApplication
- */
-@SpringBootApplication
-public class ConsumerApplication {
-
-    public static void main(String[] args) {
-        SpringApplication.run(ConsumerApplication.class, args);
-    }
-}
+import org.apache.rocketmq.spring.annotation.ExtRocketMQConsumerConfiguration;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
 
+@ExtRocketMQConsumerConfiguration(topic = "${demo.rocketmq.topic}", group = "string_consumer")
+public class ExtRocketMQTemplate extends RocketMQTemplate {
+}
\ No newline at end of file
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/resources/application.properties b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/resources/application.properties
index 096cbb5..5953e48 100644
--- a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/resources/application.properties
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/resources/application.properties
@@ -16,6 +16,8 @@
 spring.application.name=rocketmq-consume-demo
 
 rocketmq.name-server=localhost:9876
+rocketmq.consumer.group=my-group1
+rocketmq.consumer.topic=test
 # properties used in application code
 demo.rocketmq.topic=string-topic
 demo.rocketmq.bytesRequestTopic=bytesRequestTopic
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQConsumerConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQConsumerConfiguration.java
new file mode 100644
index 0000000..63e4e37
--- /dev/null
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQConsumerConfiguration.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.spring.annotation;
+
+import org.springframework.stereotype.Component;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+@Component
+public @interface ExtRocketMQConsumerConfiguration {
+
+    String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
+    String GROUP_PLACEHOLDER = "${rocketmq.consumer.group:}";
+    String TOPIC_PLACEHOLDER = "${rocketmq.consumer.topic:}";
+    String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";
+    String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
+    String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
+
+    /**
+     * The component name of the Consumer configuration.
+     */
+    String value() default "";
+
+    /**
+     * The property of "name-server".
+     */
+    String nameServer() default NAME_SERVER_PLACEHOLDER;
+
+    /**
+     * The property of "access-channel".
+     */
+    String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER;
+
+    /**
+     * Group name of consumer.
+     */
+    String group() default GROUP_PLACEHOLDER;
+
+    /**
+     * Topic name of consumer.
+     */
+    String topic() default TOPIC_PLACEHOLDER;
+
+    /**
+     * Control message mode; if you want every subscriber to receive all messages, broadcasting is a good choice.
+     */
+    MessageModel messageModel() default MessageModel.CLUSTERING;
+
+    /**
+     * Control how messages are selected.
+     *
+     * @see SelectorType
+     */
+    SelectorType selectorType() default SelectorType.TAG;
+
+    /**
+     * Control which messages can be selected. For the grammar, see {@link SelectorType#TAG} and {@link SelectorType#SQL92}.
+     */
+    String selectorExpression() default "*";
+
+    /**
+     * The property of "access-key".
+     */
+    String accessKey() default ACCESS_KEY_PLACEHOLDER;
+
+    /**
+     * The property of "secret-key".
+     */
+    String secretKey() default SECRET_KEY_PLACEHOLDER;
+
+    /**
+     * Maximum number of messages pulled each time.
+     */
+    int pullBatchSize() default 10;
+}
\ No newline at end of file
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtConsumerResetConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtConsumerResetConfiguration.java
new file mode 100644
index 0000000..7e81b5b
--- /dev/null
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtConsumerResetConfiguration.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.spring.autoconfigure;
+
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.spring.annotation.ExtRocketMQConsumerConfiguration;
+import org.apache.rocketmq.spring.annotation.MessageModel;
+import org.apache.rocketmq.spring.annotation.SelectorType;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
+import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
+import org.apache.rocketmq.spring.support.RocketMQUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.aop.framework.AopProxyUtils;
+import org.springframework.aop.scope.ScopedProxyUtils;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.SmartInitializingSingleton;
+import org.springframework.beans.factory.support.BeanDefinitionValidationException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.support.GenericApplicationContext;
+import org.springframework.core.env.StandardEnvironment;
+import org.springframework.util.Assert;
+import org.springframework.util.StringUtils;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+
+@Configuration
+public class ExtConsumerResetConfiguration implements ApplicationContextAware, SmartInitializingSingleton {
+
+    private final static Logger log = LoggerFactory.getLogger(ExtConsumerResetConfiguration.class);
+
+    private ConfigurableApplicationContext applicationContext;
+
+    private StandardEnvironment environment;
+
+    private RocketMQProperties rocketMQProperties;
+
+    private RocketMQMessageConverter rocketMQMessageConverter;
+
+    public ExtConsumerResetConfiguration(RocketMQMessageConverter rocketMQMessageConverter,
+            StandardEnvironment environment, RocketMQProperties rocketMQProperties) {
+        this.rocketMQMessageConverter = rocketMQMessageConverter;
+        this.environment = environment;
+        this.rocketMQProperties = rocketMQProperties;
+    }
+
+    @Override
+    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+        this.applicationContext = (ConfigurableApplicationContext) applicationContext;
+    }
+
+    @Override
+    public void afterSingletonsInstantiated() {
+        Map<String, Object> beans = this.applicationContext
+                .getBeansWithAnnotation(ExtRocketMQConsumerConfiguration.class)
+                .entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey()))
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+        beans.forEach(this::registerTemplate);
+    }
+
+    private void registerTemplate(String beanName, Object bean) {
+        Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
+
+        if (!RocketMQTemplate.class.isAssignableFrom(bean.getClass())) {
+            throw new IllegalStateException(clazz + " is not instance of " + RocketMQTemplate.class.getName());
+        }
+
+        ExtRocketMQConsumerConfiguration annotation = clazz.getAnnotation(ExtRocketMQConsumerConfiguration.class);
+        GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
+        validate(annotation, genericApplicationContext);
+
+        DefaultLitePullConsumer consumer = null;
+        try {
+            consumer = createConsumer(annotation);
+            // Set instanceName same as the beanName
+            consumer.setInstanceName(beanName);
+            consumer.start();
+        } catch (Exception e) {
+            log.error("Failed to startup PullConsumer for RocketMQTemplate {}", beanName, e);
+        }
+        RocketMQTemplate rocketMQTemplate = (RocketMQTemplate) bean;
+        rocketMQTemplate.setConsumer(consumer);
+        rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
+        log.info("Set real consumer to :{} {}", beanName, annotation.value());
+    }
+
+    private DefaultLitePullConsumer createConsumer(ExtRocketMQConsumerConfiguration annotation)
+            throws MQClientException {
+
+        RocketMQProperties.Consumer consumerConfig = rocketMQProperties.getConsumer();
+        if (consumerConfig == null) {
+            consumerConfig = new RocketMQProperties.Consumer();
+        }
+        String nameServer = resolvePlaceholders(annotation.nameServer(), rocketMQProperties.getNameServer());
+        String groupName = resolvePlaceholders(annotation.group(), consumerConfig.getGroup());
+        String topicName = resolvePlaceholders(annotation.topic(), consumerConfig.getTopic());
+        Assert.hasText(nameServer, "[nameServer] must not be null");
+        Assert.hasText(groupName, "[group] must not be null");
+        Assert.hasText(topicName, "[topic] must not be null");
+
+        String accessChannel = resolvePlaceholders(annotation.accessChannel(), rocketMQProperties.getAccessChannel());
+        MessageModel messageModel = annotation.messageModel();
+        SelectorType selectorType = annotation.selectorType();
+        String selectorExpression = annotation.selectorExpression();
+        String ak = resolvePlaceholders(annotation.accessKey(), consumerConfig.getAccessKey());
+        String sk = resolvePlaceholders(annotation.secretKey(), consumerConfig.getSecretKey());
+        int pullBatchSize = annotation.pullBatchSize();
+
+        DefaultLitePullConsumer litePullConsumer = RocketMQUtil.createDefaultLitePullConsumer(nameServer, accessChannel,
+                groupName, topicName, messageModel, selectorType, selectorExpression, ak, sk, pullBatchSize);
+        return litePullConsumer;
+    }
+
+    private String resolvePlaceholders(String text, String defaultValue) {
+        String value = environment.resolvePlaceholders(text);
+        return StringUtils.isEmpty(value) ? defaultValue : value;
+    }
+
+    // Fails when value() is already a bean name. Uses %s: String.format does not expand SLF4J-style {}.
+    private void validate(ExtRocketMQConsumerConfiguration annotation,
+            GenericApplicationContext genericApplicationContext) {
+        if (genericApplicationContext.isBeanNameInUse(annotation.value())) {
+            throw new BeanDefinitionValidationException(String.format(
+                    "Bean %s has been used in Spring Application Context, " +
+                            "please check the @ExtRocketMQConsumerConfiguration", annotation.value()));
+        }
+    }
+}
\ No newline at end of file
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
index e6131e7..b9ec643 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
@@ -17,47 +17,66 @@
 
 package org.apache.rocketmq.spring.autoconfigure;
 
-import javax.annotation.PostConstruct;
 import org.apache.rocketmq.client.AccessChannel;
 import org.apache.rocketmq.client.MQAdmin;
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.spring.annotation.MessageModel;
+import org.apache.rocketmq.spring.annotation.SelectorType;
 import org.apache.rocketmq.spring.core.RocketMQTemplate;
 import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
 import org.apache.rocketmq.spring.support.RocketMQUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.BeansException;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.AutoConfigureAfter;
 import org.springframework.boot.autoconfigure.AutoConfigureBefore;
+import org.springframework.boot.autoconfigure.condition.AnyNestedCondition;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
 import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Conditional;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Import;
 import org.springframework.core.env.Environment;
 import org.springframework.util.Assert;
 import org.springframework.util.StringUtils;
 
+import javax.annotation.PostConstruct;
+
 @Configuration
 @EnableConfigurationProperties(RocketMQProperties.class)
 @ConditionalOnClass({MQAdmin.class})
 @ConditionalOnProperty(prefix = "rocketmq", value = "name-server", matchIfMissing = true)
-@Import({MessageConverterConfiguration.class, ListenerContainerConfiguration.class, ExtProducerResetConfiguration.class, RocketMQTransactionConfiguration.class})
+@Import({MessageConverterConfiguration.class, ListenerContainerConfiguration.class, ExtProducerResetConfiguration.class, ExtConsumerResetConfiguration.class, RocketMQTransactionConfiguration.class})
 @AutoConfigureAfter({MessageConverterConfiguration.class})
 @AutoConfigureBefore({RocketMQTransactionConfiguration.class})
 
-public class RocketMQAutoConfiguration {
+public class RocketMQAutoConfiguration implements ApplicationContextAware {
     private static final Logger log = LoggerFactory.getLogger(RocketMQAutoConfiguration.class);
 
     public static final String ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME =
         "rocketMQTemplate";
+    public static final String PRODUCER_BEAN_NAME = "defaultMQProducer";
+    public static final String CONSUMER_BEAN_NAME = "defaultLitePullConsumer";
 
     @Autowired
     private Environment environment;
 
+    private ApplicationContext applicationContext;
+
+    @Override
+    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+        this.applicationContext = applicationContext;
+    }
+
     @PostConstruct
     public void checkProperties() {
         String nameServer = environment.getProperty("rocketmq.name-server", String.class);
@@ -67,7 +86,7 @@ public class RocketMQAutoConfiguration {
         }
     }
 
-    @Bean
+    @Bean(PRODUCER_BEAN_NAME)
     @ConditionalOnMissingBean(DefaultMQProducer.class)
     @ConditionalOnProperty(prefix = "rocketmq", value = {"name-server", "producer.group"})
     public DefaultMQProducer defaultMQProducer(RocketMQProperties rocketMQProperties) {
@@ -100,14 +119,59 @@ public class RocketMQAutoConfiguration {
         return producer;
     }
 
+    @Bean(CONSUMER_BEAN_NAME)
+    @ConditionalOnMissingBean(DefaultLitePullConsumer.class)
+    @ConditionalOnProperty(prefix = "rocketmq", value = {"name-server", "consumer.group", "consumer.topic"})
+    public DefaultLitePullConsumer defaultLitePullConsumer(RocketMQProperties rocketMQProperties)
+            throws MQClientException {
+        RocketMQProperties.Consumer consumerConfig = rocketMQProperties.getConsumer();
+        String nameServer = rocketMQProperties.getNameServer();
+        String groupName = consumerConfig.getGroup();
+        String topicName = consumerConfig.getTopic();
+        Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
+        Assert.hasText(groupName, "[rocketmq.consumer.group] must not be null");
+        Assert.hasText(topicName, "[rocketmq.consumer.topic] must not be null");
+
+        String accessChannel = rocketMQProperties.getAccessChannel();
+        MessageModel messageModel = MessageModel.valueOf(consumerConfig.getMessageModel());
+        SelectorType selectorType = SelectorType.valueOf(consumerConfig.getSelectorType());
+        String selectorExpression = consumerConfig.getSelectorExpression();
+        String ak = consumerConfig.getAccessKey();
+        String sk = consumerConfig.getSecretKey();
+        int pullBatchSize = consumerConfig.getPullBatchSize();
+
+        DefaultLitePullConsumer litePullConsumer = RocketMQUtil.createDefaultLitePullConsumer(nameServer, accessChannel,
+                groupName, topicName, messageModel, selectorType, selectorExpression, ak, sk, pullBatchSize);
+        return litePullConsumer;
+    }
+
     @Bean(destroyMethod = "destroy")
-    @ConditionalOnBean(DefaultMQProducer.class)
+    @Conditional(ProducerOrConsumerPropertyCondition.class)
     @ConditionalOnMissingBean(name = ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME)
-    public RocketMQTemplate rocketMQTemplate(DefaultMQProducer mqProducer,
-        RocketMQMessageConverter rocketMQMessageConverter) {
+    public RocketMQTemplate rocketMQTemplate(RocketMQMessageConverter rocketMQMessageConverter) {
         RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
-        rocketMQTemplate.setProducer(mqProducer);
+        if (applicationContext.containsBean(PRODUCER_BEAN_NAME)) {
+            rocketMQTemplate.setProducer((DefaultMQProducer) applicationContext.getBean(PRODUCER_BEAN_NAME));
+        }
+        if (applicationContext.containsBean(CONSUMER_BEAN_NAME)) {
+            rocketMQTemplate.setConsumer((DefaultLitePullConsumer) applicationContext.getBean(CONSUMER_BEAN_NAME));
+        }
         rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
         return rocketMQTemplate;
     }
+
+    static class ProducerOrConsumerPropertyCondition extends AnyNestedCondition {
+
+        public ProducerOrConsumerPropertyCondition() {
+            super(ConfigurationPhase.REGISTER_BEAN);
+        }
+
+        @ConditionalOnBean(DefaultMQProducer.class)
+        static class DefaultMQProducerExistsCondition {
+        }
+
+        @ConditionalOnBean(DefaultLitePullConsumer.class)
+        static class DefaultLitePullConsumerExistsCondition {
+        }
+    }
 }
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
index c08b377..98a88f7 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
@@ -231,6 +231,47 @@ public class RocketMQProperties {
 
     public static final class Consumer {
         /**
+         * Group name of consumer.
+         */
+        private String group;
+
+        /**
+         * Topic name of consumer.
+         */
+        private String topic;
+
+        /**
+         * Control message mode; if you want every subscriber to receive all messages, broadcasting is a good choice.
+         */
+        private String  messageModel = "CLUSTERING";
+
+        /**
+         * Control how messages are selected.
+         *
+         */
+        private String selectorType = "TAG";
+
+        /**
+         * Control which messages can be selected.
+         */
+        private String selectorExpression = "*";
+
+        /**
+         * The property of "access-key".
+         */
+        private String accessKey;
+
+        /**
+         * The property of "secret-key".
+         */
+        private String secretKey;
+
+        /**
+         * Maximum number of messages pulled each time.
+         */
+        private int pullBatchSize = 10;
+
+        /**
          * listener configuration container
          * the pattern is like this:
          * group1.topic1 = false
@@ -239,6 +280,70 @@ public class RocketMQProperties {
          */
         private Map<String, Map<String, Boolean>> listeners = new HashMap<>();
 
+        public String getGroup() {
+            return group;
+        }
+
+        public void setGroup(String group) {
+            this.group = group;
+        }
+
+        public String getTopic() {
+            return topic;
+        }
+
+        public void setTopic(String topic) {
+            this.topic = topic;
+        }
+
+        public String getMessageModel() {
+            return messageModel;
+        }
+
+        public void setMessageModel(String messageModel) {
+            this.messageModel = messageModel;
+        }
+
+        public String getSelectorType() {
+            return selectorType;
+        }
+
+        public void setSelectorType(String selectorType) {
+            this.selectorType = selectorType;
+        }
+
+        public String getSelectorExpression() {
+            return selectorExpression;
+        }
+
+        public void setSelectorExpression(String selectorExpression) {
+            this.selectorExpression = selectorExpression;
+        }
+
+        public String getAccessKey() {
+            return accessKey;
+        }
+
+        public void setAccessKey(String accessKey) {
+            this.accessKey = accessKey;
+        }
+
+        public String getSecretKey() {
+            return secretKey;
+        }
+
+        public void setSecretKey(String secretKey) {
+            this.secretKey = secretKey;
+        }
+
+        public int getPullBatchSize() {
+            return pullBatchSize;
+        }
+
+        public void setPullBatchSize(int pullBatchSize) {
+            this.pullBatchSize = pullBatchSize;
+        }
+
         public Map<String, Map<String, Boolean>> getListeners() {
             return listeners;
         }
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
index 78f0b2d..a18e781 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
@@ -17,15 +17,7 @@
 
 package org.apache.rocketmq.spring.core;
 
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ExecutorService;
-
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.MessageQueueSelector;
@@ -52,12 +44,24 @@ import org.springframework.messaging.core.MessagePostProcessor;
 import org.springframework.messaging.support.MessageBuilder;
 import org.springframework.util.MimeTypeUtils;
 
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+
 @SuppressWarnings({"WeakerAccess", "unused"})
 public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> implements InitializingBean, DisposableBean {
     private static final Logger log = LoggerFactory.getLogger(RocketMQTemplate.class);
 
     private DefaultMQProducer producer;
 
+    private DefaultLitePullConsumer consumer;
+
     private String charset = "UTF-8";
 
     private MessageQueueSelector messageQueueSelector = new SelectMessageQueueByHash();
@@ -72,6 +76,14 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
         this.producer = producer;
     }
 
+    public DefaultLitePullConsumer getConsumer() {
+        return consumer;
+    }
+
+    public void setConsumer(DefaultLitePullConsumer consumer) {
+        this.consumer = consumer;
+    }
+
     public String getCharset() {
         return charset;
     }
@@ -860,6 +872,13 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
         if (producer != null) {
             producer.start();
         }
+        if (Objects.nonNull(consumer)) {
+            try {
+                consumer.start();
+            } catch (Exception e) {
+                log.error("Failed to startup PullConsumer for RocketMQTemplate", e);
+            }
+        }
     }
 
     @Override
@@ -883,6 +902,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
         if (Objects.nonNull(producer)) {
             producer.shutdown();
         }
+        if (Objects.nonNull(consumer)) {
+            consumer.shutdown();
+        }
     }
 
     /**
@@ -967,4 +989,52 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
         }
         return Object.class;
     }
+
+    /**
+     * Receive messages in pull mode.
+     *
+     * @param clazz message object type
+     * @param <T>
+     * @return message list
+     */
+    public <T> List<T> receive(Class<T> clazz) {
+        return receive(clazz, this.consumer.getPollTimeoutMillis());
+    }
+
+    /**
+     * Same as {@link #receive(Class)}, with a receive timeout specified in addition.
+     *
+     * @param clazz   message object type
+     * @param timeout receive timeout in milliseconds
+     * @param <T>
+     * @return message list
+     */
+    public <T> List<T> receive(Class<T> clazz, long timeout) {
+        List<MessageExt> messageExts = this.consumer.poll(timeout);
+        List<T> list = new ArrayList<>(messageExts.size());
+        for (MessageExt messageExt : messageExts) {
+            list.add(doConvertMessage(messageExt, clazz));
+        }
+        return list;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <T> T doConvertMessage(MessageExt messageExt, Class<T> messageType) {
+        if (Objects.equals(messageType, MessageExt.class)) {
+            return (T) messageExt;
+        } else {
+            String str = new String(messageExt.getBody(), Charset.forName(charset));
+            if (Objects.equals(messageType, String.class)) {
+                return (T) str;
+            } else {
+                // If messageType is not String, use the message converter to convert the payload.
+                try {
+                    return (T) this.getMessageConverter().fromMessage(MessageBuilder.withPayload(str).build(), messageType);
+                } catch (Exception e) {
+                    log.info("convert failed. str:{}, msgType:{}", str, messageType);
+                    throw new RuntimeException("cannot convert message to " + messageType, e);
+                }
+            }
+        }
+    }
 }
\ No newline at end of file
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
index 082b0f6..a891fa7 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
@@ -17,12 +17,11 @@
 package org.apache.rocketmq.spring.support;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import java.lang.reflect.Field;
-import java.nio.charset.Charset;
-import java.util.Map;
-import java.util.Objects;
 import org.apache.rocketmq.acl.common.AclClientRPCHook;
 import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.client.AccessChannel;
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.client.consumer.MessageSelector;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.LocalTransactionState;
@@ -36,6 +35,8 @@ import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.spring.annotation.MessageModel;
+import org.apache.rocketmq.spring.annotation.SelectorType;
 import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
 import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
 import org.slf4j.Logger;
@@ -48,6 +49,11 @@ import org.springframework.messaging.support.MessageBuilder;
 import org.springframework.util.CollectionUtils;
 import org.springframework.util.StringUtils;
 
+import java.lang.reflect.Field;
+import java.nio.charset.Charset;
+import java.util.Map;
+import java.util.Objects;
+
 public class RocketMQUtil {
     private final static Logger log = LoggerFactory.getLogger(RocketMQUtil.class);
 
@@ -285,4 +291,46 @@ public class RocketMQUtil {
         return instanceName.toString();
     }
 
+    public static DefaultLitePullConsumer createDefaultLitePullConsumer(String nameServer, String accessChannel,
+            String groupName, String topicName, MessageModel messageModel, SelectorType selectorType,
+            String selectorExpression, String ak, String sk, int pullBatchSize)
+            throws MQClientException {
+        DefaultLitePullConsumer litePullConsumer = null;
+        if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
+            litePullConsumer = new DefaultLitePullConsumer(groupName, new AclClientRPCHook(new SessionCredentials(ak, sk)));
+            litePullConsumer.setVipChannelEnabled(false);
+        } else {
+            litePullConsumer = new DefaultLitePullConsumer(groupName);
+        }
+        litePullConsumer.setNamesrvAddr(nameServer);
+        litePullConsumer.setInstanceName(RocketMQUtil.getInstanceName(nameServer));
+        litePullConsumer.setPullBatchSize(pullBatchSize);
+        if (accessChannel != null) {
+            litePullConsumer.setAccessChannel(AccessChannel.valueOf(accessChannel));
+        }
+
+        switch (messageModel) {
+            case BROADCASTING:
+                litePullConsumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
+                break;
+            case CLUSTERING:
+                litePullConsumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
+                break;
+            default:
+                throw new IllegalArgumentException("Property 'messageModel' was wrong.");
+        }
+
+        switch (selectorType) {
+            case SQL92:
+                litePullConsumer.subscribe(topicName, MessageSelector.bySql(selectorExpression));
+                break;
+            case TAG:
+                litePullConsumer.subscribe(topicName, selectorExpression);
+                break;
+            default:
+                throw new IllegalArgumentException("Property 'selectorType' was wrong.");
+        }
+
+        return litePullConsumer;
+    }
 }
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
index 514c138..9f31b09 100644
--- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
@@ -19,8 +19,11 @@ package org.apache.rocketmq.spring.autoconfigure;
 
 import java.util.ArrayList;
 import java.util.List;
+
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.spring.annotation.ExtRocketMQConsumerConfiguration;
 import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
 import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
 import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
@@ -52,6 +55,12 @@ public class RocketMQAutoConfigurationTest {
         runner.run(context -> context.getBean(DefaultMQProducer.class));
     }
 
+    @Test(expected = NoSuchBeanDefinitionException.class)
+    public void testDefaultLitePullConsumerNotCreatedByDefault() {
+        // You will see the WARN log message about missing rocketmq.name-server spring property when running this test case.
+        runner.run(context -> context.getBean(DefaultLitePullConsumer.class));
+    }
+
     @Test
     public void testDefaultMQProducerWithRelaxPropertyName() {
         runner.withPropertyValues("rocketmq.nameServer=127.0.0.1:9876",
@@ -65,6 +74,19 @@ public class RocketMQAutoConfigurationTest {
     }
 
     @Test
+    public void testDefaultLitePullConsumerWithRelaxPropertyName() {
+        runner.withPropertyValues("rocketmq.nameServer=127.0.0.1:9876",
+                "rocketmq.consumer.group=spring_rocketmq",
+                "rocketmq.consumer.topic=test",
+                "rocketmq.accessChannel=LOCAL").
+                run((context) -> {
+                    assertThat(context).hasSingleBean(DefaultLitePullConsumer.class);
+                    assertThat(context).hasSingleBean(RocketMQProperties.class);
+                });
+
+    }
+
+    @Test
     public void testBadAccessChannelProperty() {
         runner.withPropertyValues("rocketmq.nameServer=127.0.0.1:9876",
             "rocketmq.producer.group=spring_rocketmq",
@@ -73,6 +95,15 @@ public class RocketMQAutoConfigurationTest {
                 //Should throw exception for bad accessChannel property
                 assertThat(context).getFailure();
             });
+
+        runner.withPropertyValues("rocketmq.nameServer=127.0.0.1:9876",
+                "rocketmq.consumer.group=spring_rocketmq",
+                "rocketmq.consumer.topic=test",
+                "rocketmq.accessChannel=LOCAL123").
+                run((context) -> {
+                    //Should throw exception for bad accessChannel property
+                    assertThat(context).getFailure();
+                });
     }
 
     @Test
@@ -85,6 +116,16 @@ public class RocketMQAutoConfigurationTest {
     }
 
     @Test
+    public void testDefaultLitePullConsumer() {
+        runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876",
+                "rocketmq.consumer.group=spring_rocketmq",
+                "rocketmq.consumer.topic=test").
+                run((context) -> {
+                    assertThat(context).hasSingleBean(DefaultLitePullConsumer.class);
+                });
+    }
+
+    @Test
     public void testExtRocketMQTemplate() {
         runner.withPropertyValues("rocketmq.name-server=127.0.1.1:9876").
             withUserConfiguration(TestExtRocketMQTemplateConfig.class, CustomObjectMappersConfig.class).
@@ -95,6 +136,16 @@ public class RocketMQAutoConfigurationTest {
             });
     }
 
+
+    @Test
+    public void testExtRocketMQConsumer() {
+        runner.withPropertyValues("rocketmq.name-server=127.0.1.1:9876").
+                withUserConfiguration(TestExtRocketMQConsumerConfig.class, CustomObjectMappersConfig.class).
+                run((context) -> {
+                    assertThat(context).getBean("extRocketMQTemplate").hasFieldOrProperty("consumer");
+                });
+    }
+
     @Test
     public void testConsumerListener() {
         runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876",
@@ -347,5 +398,20 @@ public class RocketMQAutoConfigurationTest {
 
         }
     }
+
+    @Configuration
+    static class TestExtRocketMQConsumerConfig {
+
+        @Bean
+        public RocketMQTemplate extRocketMQTemplate() {
+            return new TestExtRocketMQConsumer();
+        }
+
+    }
+
+    @ExtRocketMQConsumerConfiguration(topic = "test", group = "test", nameServer = "127.0.0.1:9876")
+    static class TestExtRocketMQConsumer extends RocketMQTemplate {
+
+    }
 }
 
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/ExtRocketMQTemplateTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/ExtRocketMQTemplateTest.java
index 7430ff5..dc88253 100644
--- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/ExtRocketMQTemplateTest.java
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/ExtRocketMQTemplateTest.java
@@ -16,8 +16,8 @@
  */
 package org.apache.rocketmq.spring.core;
 
-import javax.annotation.Resource;
 import org.apache.rocketmq.client.producer.TransactionMQProducer;
+import org.apache.rocketmq.spring.annotation.ExtRocketMQConsumerConfiguration;
 import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
 import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
 import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
@@ -29,17 +29,22 @@ import org.springframework.messaging.MessagingException;
 import org.springframework.messaging.support.MessageBuilder;
 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 
+import javax.annotation.Resource;
+
 import static org.assertj.core.api.Assertions.assertThat;
 
 @RunWith(SpringJUnit4ClassRunner.class)
 
 @SpringBootTest(properties = {
-    "rocketmq.nameServer=127.0.0.1:9876", "rocketmq.producer.group=extRocketMQTemplate-test-producer_group"}, classes = {RocketMQAutoConfiguration.class, ExtRocketMQTemplate.class, ExtTransactionListenerImpl.class})
+    "rocketmq.nameServer=127.0.0.1:9876", "rocketmq.producer.group=extRocketMQTemplate-test-producer_group"}, classes = {RocketMQAutoConfiguration.class, ExtRocketMQTemplate.class, ExtTransactionListenerImpl.class, ExtRocketMQConsumer.class})
 public class ExtRocketMQTemplateTest {
 
     @Resource(name = "extRocketMQTemplate")
     private RocketMQTemplate extRocketMQTemplate;
 
+    @Resource(name = "extRocketMQConsumer")
+    private ExtRocketMQConsumer extRocketMQConsumer;
+
     @Resource
     private RocketMQTemplate rocketMQTemplate;
 
@@ -49,6 +54,10 @@ public class ExtRocketMQTemplateTest {
         assertThat(extRocketMQTemplate.getProducer().getProducerGroup()).isEqualTo("extRocketMQTemplate-test-group");
         assertThat(extRocketMQTemplate.getProducer().getSendMsgTimeout()).isEqualTo(3000);
         assertThat(extRocketMQTemplate.getProducer().getMaxMessageSize()).isEqualTo(4 * 1024);
+
+        assertThat(extRocketMQConsumer.getConsumer().getNamesrvAddr()).isEqualTo("172.0.0.1:9876");
+        assertThat(extRocketMQConsumer.getConsumer().getConsumerGroup()).isEqualTo("extRocketMQTemplate-test-group");
+        assertThat(extRocketMQConsumer.getConsumer().getPullBatchSize()).isEqualTo(3);
     }
 
     @Test
@@ -93,6 +102,11 @@ class ExtTransactionListenerImpl implements RocketMQLocalTransactionListener {
     }
 }
 
+@ExtRocketMQConsumerConfiguration(nameServer = "172.0.0.1:9876", group = "extRocketMQTemplate-test-group", topic = "test", pullBatchSize = 3)
+class ExtRocketMQConsumer extends RocketMQTemplate {
+
+}
+
 
 
 
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java
index 56a7de6..2103da2 100644
--- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java
@@ -50,7 +50,8 @@ import static org.mockito.ArgumentMatchers.any;
     "test.rocketmq.topic=test", "rocketmq.producer.access-key=test-ak",
     "rocketmq.producer.secret-key=test-sk", "rocketmq.accessChannel=LOCAL",
     "rocketmq.producer.sendMessageTimeout= 3500", "rocketmq.producer.retryTimesWhenSendFailed=3",
-    "rocketmq.producer.retryTimesWhenSendAsyncFailed=3"}, classes = {RocketMQAutoConfiguration.class, TransactionListenerImpl.class})
+    "rocketmq.producer.retryTimesWhenSendAsyncFailed=3",
+    "rocketmq.consumer.group=spring_rocketmq", "rocketmq.consumer.topic=test"}, classes = {RocketMQAutoConfiguration.class, TransactionListenerImpl.class})
 
 public class RocketMQTemplateTest {
     @Resource
@@ -95,6 +96,15 @@ public class RocketMQTemplateTest {
     }
 
     @Test
+    public void testReceiveMessage() {
+        try {
+            rocketMQTemplate.receive(String.class);
+        } catch (MessagingException e) {
+            assertThat(e).hasMessageContaining("org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [127.0.0.1:9876] failed");
+        }
+    }
+
+    @Test
     public void testSendMessage_withCustomAsyncSenderExecutor() {
         ExecutorService executorService = new ThreadPoolExecutor(
             2,
@@ -234,6 +244,10 @@ public class RocketMQTemplateTest {
         assertThat(rocketMQTemplate.getProducer().getRetryTimesWhenSendAsyncFailed()).isEqualTo(3);
         assertThat(rocketMQTemplate.getProducer().getRetryTimesWhenSendFailed()).isEqualTo(3);
         assertThat(rocketMQTemplate.getProducer().getCompressMsgBodyOverHowmuch()).isEqualTo(1024 * 4);
+
+        assertThat(rocketMQTemplate.getConsumer().getNamesrvAddr()).isEqualTo("127.0.0.1:9876");
+        assertThat(rocketMQTemplate.getConsumer().getConsumerGroup()).isEqualTo("spring_rocketmq");
+        assertThat(rocketMQTemplate.getConsumer().getAccessChannel()).isEqualTo(AccessChannel.LOCAL);
     }
 
     @Test


Mime
View raw message