From commits-return-5020-apmail-rocketmq-commits-archive=rocketmq.apache.org@rocketmq.apache.org Mon Feb 10 06:49:43 2020 Return-Path: X-Original-To: apmail-rocketmq-commits-archive@minotaur.apache.org Delivered-To: apmail-rocketmq-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by minotaur.apache.org (Postfix) with SMTP id DF88B1028E for ; Mon, 10 Feb 2020 06:49:42 +0000 (UTC) Received: (qmail 39166 invoked by uid 500); 10 Feb 2020 06:49:42 -0000 Delivered-To: apmail-rocketmq-commits-archive@rocketmq.apache.org Received: (qmail 39144 invoked by uid 500); 10 Feb 2020 06:49:42 -0000 Mailing-List: contact commits-help@rocketmq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@rocketmq.apache.org Delivered-To: mailing list commits@rocketmq.apache.org Received: (qmail 39135 invoked by uid 99); 10 Feb 2020 06:49:42 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 10 Feb 2020 06:49:42 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id E37218B690; Mon, 10 Feb 2020 06:49:41 +0000 (UTC) Date: Mon, 10 Feb 2020 06:49:41 +0000 To: "commits@rocketmq.apache.org" Subject: [rocketmq-spring] branch master updated: [ISSUE #218] Fix spring scopeTarget will repeat consumer instance (#210) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <158131738187.8910.756086609155408347@gitbox.apache.org> From: jinrongtong@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: rocketmq-spring X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: c7f230f98a5285efed9456c64ad2d02974a0e2dd X-Git-Newrev: 2853384030397fe9453316c05b68b6d570d67019 X-Git-Rev: 2853384030397fe9453316c05b68b6d570d67019 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: 
auto-generated This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git The following commit(s) were added to refs/heads/master by this push: new 2853384 [ISSUE #218] Fix spring scopeTarget will repeat consumer instance (#210) 2853384 is described below commit 2853384030397fe9453316c05b68b6d570d67019 Author: 爱因斯唐 AuthorDate: Mon Feb 10 14:49:33 2020 +0800 [ISSUE #218] Fix spring scopeTarget will repeat consumer instance (#210) * change clientId algorithm * code format * develop * optimize on 2.0.5.EINSITANG * revert pom version * change note * change note * revert demo.rocketmq.myNameServer * remove clientInstaceName * remove unuse method * pass ci-check * remove pass annotation * correct variable word * optimize annotation * merge Co-authored-by: von gosling --- .../ExtProducerResetConfiguration.java | 13 +++--- .../ListenerContainerConfiguration.java | 21 +++++---- .../RocketMQTransactionConfiguration.java | 10 +++-- .../rocketmq/spring/support/SpringBeanUtil.java | 52 ++++++++++++++++++++++ 4 files changed, 79 insertions(+), 17 deletions(-) diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java index e5e7433..9ea7699 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java @@ -25,6 +25,7 @@ import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQMessageConverter; import org.apache.rocketmq.spring.support.RocketMQUtil; +import 
org.apache.rocketmq.spring.support.SpringBeanUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.aop.framework.AopProxyUtils; @@ -52,7 +53,7 @@ public class ExtProducerResetConfiguration implements ApplicationContextAware, S private RocketMQMessageConverter rocketMQMessageConverter; public ExtProducerResetConfiguration(RocketMQMessageConverter rocketMQMessageConverter, - StandardEnvironment environment, RocketMQProperties rocketMQProperties) { + StandardEnvironment environment, RocketMQProperties rocketMQProperties) { this.rocketMQMessageConverter = rocketMQMessageConverter; this.environment = environment; this.rocketMQProperties = rocketMQProperties; @@ -60,12 +61,12 @@ public class ExtProducerResetConfiguration implements ApplicationContextAware, S @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { - this.applicationContext = (ConfigurableApplicationContext)applicationContext; + this.applicationContext = (ConfigurableApplicationContext) applicationContext; } @Override public void afterSingletonsInstantiated() { - Map beans = this.applicationContext.getBeansWithAnnotation(ExtRocketMQTemplateConfiguration.class); + Map beans = SpringBeanUtil.getBeansWithAnnotation(this.applicationContext, ExtRocketMQTemplateConfiguration.class); if (Objects.nonNull(beans)) { beans.forEach(this::registerTemplate); @@ -80,7 +81,7 @@ public class ExtProducerResetConfiguration implements ApplicationContextAware, S } ExtRocketMQTemplateConfiguration annotation = clazz.getAnnotation(ExtRocketMQTemplateConfiguration.class); - GenericApplicationContext genericApplicationContext = (GenericApplicationContext)applicationContext; + GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext; validate(annotation, genericApplicationContext); DefaultMQProducer mqProducer = createProducer(annotation); @@ -92,7 +93,7 @@ public class ExtProducerResetConfiguration 
implements ApplicationContextAware, S throw new BeanDefinitionValidationException(String.format("Failed to startup MQProducer for RocketMQTemplate {}", beanName), e); } - RocketMQTemplate rocketMQTemplate = (RocketMQTemplate)bean; + RocketMQTemplate rocketMQTemplate = (RocketMQTemplate) bean; rocketMQTemplate.setProducer(mqProducer); rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter()); log.info("Set real producer to :{} {}", beanName, annotation.value()); @@ -130,7 +131,7 @@ public class ExtProducerResetConfiguration implements ApplicationContextAware, S } private void validate(ExtRocketMQTemplateConfiguration annotation, - GenericApplicationContext genericApplicationContext) { + GenericApplicationContext genericApplicationContext) { if (genericApplicationContext.isBeanNameInUse(annotation.value())) { throw new BeanDefinitionValidationException(String.format("Bean {} has been used in Spring Application Context, " + "please check the @ExtRocketMQTemplateConfiguration", diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java index 699474d..008a4db 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java @@ -17,10 +17,6 @@ package org.apache.rocketmq.spring.autoconfigure; -import java.util.Collections; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.MessageModel; @@ -29,6 +25,7 @@ import org.apache.rocketmq.spring.core.RocketMQListener; import 
org.apache.rocketmq.spring.core.RocketMQReplyListener; import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer; import org.apache.rocketmq.spring.support.RocketMQMessageConverter; +import org.apache.rocketmq.spring.support.SpringBeanUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.aop.framework.AopProxyUtils; @@ -43,6 +40,11 @@ import org.springframework.context.support.GenericApplicationContext; import org.springframework.core.env.StandardEnvironment; import org.springframework.util.StringUtils; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicLong; + @Configuration public class ListenerContainerConfiguration implements ApplicationContextAware, SmartInitializingSingleton { private final static Logger log = LoggerFactory.getLogger(ListenerContainerConfiguration.class); @@ -58,7 +60,7 @@ public class ListenerContainerConfiguration implements ApplicationContextAware, private RocketMQMessageConverter rocketMQMessageConverter; public ListenerContainerConfiguration(RocketMQMessageConverter rocketMQMessageConverter, - StandardEnvironment environment, RocketMQProperties rocketMQProperties) { + StandardEnvironment environment, RocketMQProperties rocketMQProperties) { this.rocketMQMessageConverter = rocketMQMessageConverter; this.environment = environment; this.rocketMQProperties = rocketMQProperties; @@ -71,7 +73,8 @@ public class ListenerContainerConfiguration implements ApplicationContextAware, @Override public void afterSingletonsInstantiated() { - Map beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class); + Map beans = SpringBeanUtil.getBeansWithAnnotation(this.applicationContext, RocketMQMessageListener. 
+ class); if (Objects.nonNull(beans)) { beans.forEach(this::registerContainer); @@ -127,7 +130,7 @@ public class ListenerContainerConfiguration implements ApplicationContextAware, } private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean, - RocketMQMessageListener annotation) { + RocketMQMessageListener annotation) { DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer(); container.setRocketMQMessageListener(annotation); @@ -145,13 +148,15 @@ public class ListenerContainerConfiguration implements ApplicationContextAware, container.setSelectorExpression(tags); } container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup())); + if (RocketMQListener.class.isAssignableFrom(bean.getClass())) { container.setRocketMQListener((RocketMQListener) bean); } else if (RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) { container.setRocketMQReplyListener((RocketMQReplyListener) bean); } + container.setMessageConverter(rocketMQMessageConverter.getMessageConverter()); - container.setName(name); // REVIEW ME, use the same clientId or multiple? 
+ container.setName(name); return container; } diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQTransactionConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQTransactionConfiguration.java index 1a897e5..2daefcf 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQTransactionConfiguration.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQTransactionConfiguration.java @@ -22,11 +22,13 @@ import java.util.Objects; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; + import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQUtil; +import org.apache.rocketmq.spring.support.SpringBeanUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.aop.framework.AopProxyUtils; @@ -44,12 +46,14 @@ public class RocketMQTransactionConfiguration implements ApplicationContextAware private ConfigurableApplicationContext applicationContext; - @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = (ConfigurableApplicationContext) applicationContext; } - @Override public void afterSingletonsInstantiated() { - Map beans = this.applicationContext.getBeansWithAnnotation(RocketMQTransactionListener.class); + @Override + public void afterSingletonsInstantiated() { + Map beans = SpringBeanUtil.getBeansWithAnnotation(this.applicationContext, 
RocketMQTransactionListener.class); if (Objects.nonNull(beans)) { beans.forEach(this::registerTransactionListener); diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/SpringBeanUtil.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/SpringBeanUtil.java new file mode 100644 index 0000000..b5d1161 --- /dev/null +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/SpringBeanUtil.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.spring.support; + +import org.springframework.aop.scope.ScopedProxyUtils; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.lang.NonNull; + +import java.lang.annotation.Annotation; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class SpringBeanUtil { + + /** + * Override applicationContext.getBeansWithAnnotation method to make sure without same ProxyTarget beans + * + * @param applicationContext spring Application Context + * @param clazz annotation class + * @return beans map without proxyTarget bean + */ + public static Map<String, Object> getBeansWithAnnotation(@NonNull ConfigurableApplicationContext applicationContext, Class<? extends Annotation> clazz) { + Map<String, Object> beans = applicationContext.getBeansWithAnnotation(clazz); + Map<String, Object> filterBeans = new HashMap<>(beans.size()); + // remove proxy target + Set<Map.Entry<String, Object>> entrySet = beans.entrySet(); + entrySet.forEach((entry) -> { + final String beanName = entry.getKey(); + if (!ScopedProxyUtils.isScopedTarget(beanName)) { + filterBeans.put(beanName, entry.getValue()); + } + }); + return filterBeans; + } + +}