rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From duhengfore...@apache.org
Subject [rocketmq-ons] 32/43: feat(spring) add spring bean support
Date Fri, 06 Dec 2019 04:22:54 GMT
This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch OpenMessaging
in repository https://gitbox.apache.org/repos/asf/rocketmq-ons.git

commit efcc721462d78a6ce3633d1e0a4869c4111b429a
Author: 翊名 <duheng.dh@alibaba-inc.com>
AuthorDate: Tue Nov 19 20:17:30 2019 +0800

    feat(spring) add spring bean support
---
 .../rocketmq/ons/api/bean/BatchConsumerBean.java   | 130 ++++++++++++++++
 .../apache/rocketmq/ons/api/bean/ConsumerBean.java | 173 +++++++++++++++++++++
 .../rocketmq/ons/api/bean/OrderConsumerBean.java   | 131 ++++++++++++++++
 .../rocketmq/ons/api/bean/OrderProducerBean.java   |  93 +++++++++++
 .../apache/rocketmq/ons/api/bean/ProducerBean.java | 108 +++++++++++++
 .../ons/api/bean/TransactionProducerBean.java      | 111 +++++++++++++
 .../org/apache/rocketmq/ons/sample/MQConfig.java   |   2 +-
 .../ons/sample/consumer/SimpleMQConsumer.java      |  14 +-
 .../ons/sample/consumer/SimpleOrderConsumer.java   |  16 +-
 .../ons/sample/producer/MQTimerProducer.java       |  13 +-
 .../ons/sample/producer/SimpleMQProducer.java      |  23 ++-
 .../ons/sample/producer/SimpleOrderProducer.java   |  17 +-
 .../sample/producer/SimpleTransactionProducer.java |  16 +-
 13 files changed, 826 insertions(+), 21 deletions(-)

diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/bean/BatchConsumerBean.java
b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/bean/BatchConsumerBean.java
new file mode 100644
index 0000000..34017fe
--- /dev/null
+++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/bean/BatchConsumerBean.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.ons.api.bean;
+
+import io.openmessaging.api.batch.BatchConsumer;
+import io.openmessaging.api.batch.BatchMessageListener;
+import io.openmessaging.api.bean.Subscription;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.rocketmq.ons.api.ONSFactory;
+import org.apache.rocketmq.ons.api.PropertyKeyConst;
+import org.apache.rocketmq.ons.api.exception.ONSClientException;
+
+/**
+ * {@code BatchConsumerBean} Used to integrate {@link BatchConsumer} into Spring.
+ */
+public class BatchConsumerBean implements BatchConsumer {
+    /**
+     * Need to inject this field, specify the properties of the construct {@code BatchConsumer}
instance, see the
+     * specific supported properties{@link PropertyKeyConst}
+     *
+     * @see BatchConsumerBean#setProperties(Properties)
+     */
+    private Properties properties;
+
+    /**
+     * By injecting this field, complete the Topic subscription when starting {@code BatchConsumer}
+     *
+     * @see BatchConsumerBean#setSubscriptionTable(Map)
+     */
+    private Map<Subscription, BatchMessageListener> subscriptionTable;
+
+    private BatchConsumer batchConsumer;
+
+    @Override
+    public boolean isStarted() {
+        return this.batchConsumer.isStarted();
+    }
+
+    @Override
+    public boolean isClosed() {
+        return this.batchConsumer.isClosed();
+    }
+
+    /**
+     * Start the {@code BatchConsumer} instance, it is recommended to configure the init-method
of the bean.
+     */
+    @Override
+    public void start() {
+        if (null == this.properties) {
+            throw new ONSClientException("properties not set");
+        }
+
+        if (null == this.subscriptionTable) {
+            throw new ONSClientException("subscriptionTable not set");
+        }
+
+        this.batchConsumer = ONSFactory.createBatchConsumer(this.properties);
+
+        for (final Map.Entry<Subscription, BatchMessageListener> next : this.subscriptionTable.entrySet())
{
+            this.subscribe(next.getKey().getTopic(), next.getKey().getExpression(), next.getValue());
+        }
+
+        this.batchConsumer.start();
+    }
+
+    @Override
+    public void updateCredential(Properties credentialProperties) {
+        if (this.batchConsumer != null) {
+            this.batchConsumer.updateCredential(credentialProperties);
+        }
+    }
+
+    /**
+     * Close the {@code BatchConsumer} instance, it is recommended to configure the destroy-method
of the bean.
+     */
+    @Override
+    public void shutdown() {
+        if (this.batchConsumer != null) {
+            this.batchConsumer.shutdown();
+        }
+    }
+
+    @Override
+    public void subscribe(final String topic, final String subExpression, final BatchMessageListener
listener) {
+        if (null == this.batchConsumer) {
+            throw new ONSClientException("subscribe must be called after BatchConsumerBean
started");
+        }
+        this.batchConsumer.subscribe(topic, subExpression, listener);
+    }
+
+    @Override
+    public void unsubscribe(final String topic) {
+        if (null == this.batchConsumer) {
+            throw new ONSClientException("unsubscribe must be called after BatchConsumerBean
started");
+        }
+        this.batchConsumer.unsubscribe(topic);
+    }
+
+    public Properties getProperties() {
+        return properties;
+    }
+
+    public void setProperties(final Properties properties) {
+        this.properties = properties;
+    }
+
+    public Map<Subscription, BatchMessageListener> getSubscriptionTable() {
+        return subscriptionTable;
+    }
+
+    public void setSubscriptionTable(
+        final Map<Subscription, BatchMessageListener> subscriptionTable) {
+        this.subscriptionTable = subscriptionTable;
+    }
+}
diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/bean/ConsumerBean.java
b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/bean/ConsumerBean.java
new file mode 100644
index 0000000..1555be9
--- /dev/null
+++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/bean/ConsumerBean.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.ons.api.bean;
+
+import io.openmessaging.api.Consumer;
+import io.openmessaging.api.ExpressionType;
+import io.openmessaging.api.MessageListener;
+import io.openmessaging.api.MessageSelector;
+import io.openmessaging.api.bean.Subscription;
+import io.openmessaging.api.bean.SubscriptionExt;
+import java.lang.reflect.Method;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import org.apache.rocketmq.ons.api.ONSFactory;
+import org.apache.rocketmq.ons.api.exception.ONSClientException;
+
+/**
+ * {@code ConsumerBean} is used to integrate {@link Consumer} into Spring Bean
+ */
+public class ConsumerBean implements Consumer {
+    /**
+     * You need to inject this field to specify the properties of the {@code Consumer} instance.
For details, see {@link
+     * PropertyKeyConst}.
+     *
+     * @see ConsumerBean#setProperties(Properties)
+     */
+    private Properties properties;
+
+    /**
+     * By injecting this field, complete the Topic subscription when launching {@code Consumer}
+     *
+     * @see ConsumerBean#setSubscriptionTable(Map)
+     */
+    private Map<Subscription, MessageListener> subscriptionTable;
+
+    private Consumer consumer;
+
+    /**
+     * Start the {@code Consumer} instance, it is recommended to configure the init-method
of the bean.
+     */
+    @Override
+    public void start() {
+        if (null == this.properties) {
+            throw new ONSClientException("properties not set");
+        }
+
+        if (null == this.subscriptionTable) {
+            throw new ONSClientException("subscriptionTable not set");
+        }
+
+        this.consumer = ONSFactory.createConsumer(this.properties);
+
+        Iterator<Entry<Subscription, MessageListener>> it = this.subscriptionTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<Subscription, MessageListener> next = it.next();
+            if ("com.aliyun.openservices.ons.api.impl.notify.ConsumerImpl".equals(this.consumer.getClass().getCanonicalName())
+                && (next.getKey() instanceof SubscriptionExt)) {
+                SubscriptionExt subscription = (SubscriptionExt) next.getKey();
+                for (Method method : this.consumer.getClass().getMethods()) {
+                    if ("subscribeNotify".equals(method.getName())) {
+                        try {
+                            method.invoke(consumer, subscription.getTopic(), subscription.getExpression(),
+                                subscription.isPersistence(), next.getValue());
+                        } catch (Exception e) {
+                            throw new ONSClientException("subscribeNotify invoke exception",
e);
+                        }
+                        break;
+                    }
+                }
+
+            } else {
+                Subscription subscription = next.getKey();
+                if (subscription.getType() == null || ExpressionType.TAG.name().equals(subscription.getType()))
{
+
+                    this.subscribe(subscription.getTopic(), subscription.getExpression(),
next.getValue());
+
+                } else if (ExpressionType.SQL92.name().equals(subscription.getType())) {
+
+                    this.subscribe(subscription.getTopic(), MessageSelector.bySql(subscription.getExpression()),
next.getValue());
+                } else {
+
+                    throw new ONSClientException(String.format("Expression type %s is unknown!",
subscription.getType()));
+                }
+            }
+
+        }
+
+        this.consumer.start();
+    }
+
+    @Override
+    public void updateCredential(Properties credentialProperties) {
+        if (this.consumer != null) {
+            this.consumer.updateCredential(credentialProperties);
+        }
+    }
+
+    /**
+     * Close the {@code Consumer} instance, it is recommended to configure the destroy-method
of the bean.
+     */
+    @Override
+    public void shutdown() {
+        if (this.consumer != null) {
+            this.consumer.shutdown();
+        }
+    }
+
+    @Override
+    public void subscribe(String topic, String subExpression, MessageListener listener) {
+        if (null == this.consumer) {
+            throw new ONSClientException("subscribe must be called after consumerBean started");
+        }
+        this.consumer.subscribe(topic, subExpression, listener);
+    }
+
+    @Override
+    public void subscribe(final String topic, final MessageSelector selector, final MessageListener
listener) {
+        if (null == this.consumer) {
+            throw new ONSClientException("subscribe must be called after consumerBean started");
+        }
+        this.consumer.subscribe(topic, selector, listener);
+    }
+
+    @Override
+    public void unsubscribe(String topic) {
+        if (null == this.consumer) {
+            throw new ONSClientException("unsubscribe must be called after consumerBean started");
+        }
+        this.consumer.unsubscribe(topic);
+    }
+
+    public Properties getProperties() {
+        return properties;
+    }
+
+    public void setProperties(Properties properties) {
+        this.properties = properties;
+    }
+
+    public Map<Subscription, MessageListener> getSubscriptionTable() {
+        return subscriptionTable;
+    }
+
+    public void setSubscriptionTable(Map<Subscription, MessageListener> subscriptionTable)
{
+        this.subscriptionTable = subscriptionTable;
+    }
+
+    @Override
+    public boolean isStarted() {
+        return this.consumer.isStarted();
+    }
+
+    @Override
+    public boolean isClosed() {
+        return this.consumer.isClosed();
+    }
+}
diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/bean/OrderConsumerBean.java
b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/bean/OrderConsumerBean.java
new file mode 100644
index 0000000..dda41e6
--- /dev/null
+++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/bean/OrderConsumerBean.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.ons.api.bean;
+
+import io.openmessaging.api.MessageSelector;
+import io.openmessaging.api.bean.Subscription;
+import io.openmessaging.api.order.MessageOrderListener;
+import io.openmessaging.api.order.OrderConsumer;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.rocketmq.ons.api.ONSFactory;
+import org.apache.rocketmq.ons.api.PropertyKeyConst;
+import org.apache.rocketmq.ons.api.exception.ONSClientException;
+
+/**
+ * {@code OrderConsumerBean} is used to integrate {@link OrderConsumer} into Spring Bean
+ */
+public class OrderConsumerBean implements OrderConsumer {
+    /**
+     * Need to inject this field, specify the properties of the construct {@code OrderConsumer}
instance, the specific
+     * supported properties are detailed in {@link PropertyKeyConst}
+     *
+     * @see OrderConsumerBean#setProperties(Properties)
+     */
+    private Properties properties;
+
+    /**
+     * By injecting this field, complete the subscription for Topic when launching {@code
OrderConsumer}
+     *
+     * @see OrderConsumerBean#setSubscriptionTable(Map)
+     */
+    private Map<Subscription, MessageOrderListener> subscriptionTable;
+
+    private OrderConsumer orderConsumer;
+
+    @Override
+    public boolean isStarted() {
+        return this.orderConsumer.isStarted();
+    }
+
+    @Override
+    public boolean isClosed() {
+        return this.orderConsumer.isClosed();
+    }
+
+    /**
+     * Start the {@code OrderConsumer} instance, it is recommended to configure the init-method
of the bean.
+     */
+    @Override
+    public void start() {
+        if (null == this.properties) {
+            throw new ONSClientException("properties not set");
+        }
+
+        if (null == this.subscriptionTable) {
+            throw new ONSClientException("subscriptionTable not set");
+        }
+
+        this.orderConsumer = ONSFactory.createOrderedConsumer(this.properties);
+
+        for (final Map.Entry<Subscription, MessageOrderListener> next : this.subscriptionTable.entrySet())
{
+            this.subscribe(next.getKey().getTopic(), next.getKey().getExpression(), next.getValue());
+        }
+
+        this.orderConsumer.start();
+    }
+
+    @Override
+    public void updateCredential(Properties credentialProperties) {
+        if (this.orderConsumer != null) {
+            this.orderConsumer.updateCredential(credentialProperties);
+        }
+    }
+
+    /**
+     * Close the {@code OrderConsumer} instance, it is recommended to configure the destroy-method
of the bean.
+     */
+    @Override
+    public void shutdown() {
+        if (this.orderConsumer != null) {
+            this.orderConsumer.shutdown();
+        }
+    }
+
+    @Override
+    public void subscribe(final String topic, final String subExpression, final MessageOrderListener
listener) {
+        if (null == this.orderConsumer) {
+            throw new ONSClientException("subscribe must be called after OrderConsumerBean
started");
+        }
+        this.orderConsumer.subscribe(topic, subExpression, listener);
+    }
+
+    @Override
+    public void subscribe(String topic, MessageSelector selector, MessageOrderListener listener)
{
+        if (null == this.orderConsumer) {
+            throw new ONSClientException("subscribe must be called after OrderConsumerBean
started");
+        }
+        this.orderConsumer.subscribe(topic, selector, listener);
+    }
+
+    public Properties getProperties() {
+        return properties;
+    }
+
+    public void setProperties(final Properties properties) {
+        this.properties = properties;
+    }
+
+    public Map<Subscription, MessageOrderListener> getSubscriptionTable() {
+        return subscriptionTable;
+    }
+
+    public void setSubscriptionTable(
+        final Map<Subscription, MessageOrderListener> subscriptionTable) {
+        this.subscriptionTable = subscriptionTable;
+    }
+}
diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/bean/OrderProducerBean.java
b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/bean/OrderProducerBean.java
new file mode 100644
index 0000000..8f57487
--- /dev/null
+++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/bean/OrderProducerBean.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.ons.api.bean;
+
+import io.openmessaging.api.Message;
+import io.openmessaging.api.SendResult;
+import io.openmessaging.api.order.OrderProducer;
+import java.util.Properties;
+import org.apache.rocketmq.ons.api.ONSFactory;
+import org.apache.rocketmq.ons.api.PropertyKeyConst;
+import org.apache.rocketmq.ons.api.exception.ONSClientException;
+
+/**
+ * {@code OrderProducerBean} is used to integrate {@link OrderProducer} into Spring Bean.
+ */
+public class OrderProducerBean implements OrderProducer {
+    /**
+     * Need to inject this field, specify the properties of the construct {@code OrderProducer}
instance, the specific
+     * supported properties are detailed in {@link PropertyKeyConst}
+     *
+     * @see OrderProducerBean#setProperties(Properties)
+     */
+    private Properties properties;
+
+    private OrderProducer orderProducer;
+
+    /**
+     * Start the {@code OrderProducer} instance, it is recommended to configure the init-method
of the bean.
+     */
+    @Override
+    public void start() {
+        if (null == this.properties) {
+            throw new ONSClientException("properties not set");
+        }
+
+        this.orderProducer = ONSFactory.createOrderProducer(this.properties);
+        this.orderProducer.start();
+    }
+
+    @Override
+    public void updateCredential(Properties credentialProperties) {
+        if (this.orderProducer != null) {
+            this.orderProducer.updateCredential(credentialProperties);
+        }
+    }
+
+    /**
+     * Close the {@code OrderProducer} instance, it is recommended to configure the destroy-method
of the bean.
+     */
+    @Override
+    public void shutdown() {
+        if (this.orderProducer != null) {
+            this.orderProducer.shutdown();
+        }
+    }
+
+    @Override
+    public boolean isStarted() {
+        return this.orderProducer.isStarted();
+    }
+
+    @Override
+    public boolean isClosed() {
+        return this.orderProducer.isClosed();
+    }
+
+    @Override
+    public SendResult send(final Message message, final String shardingKey) {
+        return this.orderProducer.send(message, shardingKey);
+    }
+
+    public Properties getProperties() {
+        return properties;
+    }
+
+    public void setProperties(final Properties properties) {
+        this.properties = properties;
+    }
+}
diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/bean/ProducerBean.java
b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/bean/ProducerBean.java
new file mode 100644
index 0000000..26d9922
--- /dev/null
+++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/bean/ProducerBean.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.ons.api.bean;
+
+import io.openmessaging.api.Message;
+import io.openmessaging.api.Producer;
+import io.openmessaging.api.SendCallback;
+import io.openmessaging.api.SendResult;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import org.apache.rocketmq.ons.api.ONSFactory;
+import org.apache.rocketmq.ons.api.exception.ONSClientException;
+
+/**
+ * {@code ProducerBean} for integrating {@link Producer} into Spring Bean
+ */
+public class ProducerBean implements Producer {
+    /**
+     * You need to inject this field to specify the properties of the {@code Producer} instance.
For details, see {@link
+     * PropertyKeyConst}
+     *
+     * @see ProducerBean#setProperties(Properties)
+     */
+    private Properties properties;
+    private Producer producer;
+
+    /**
+     * Start the {@code Producer} instance, it is recommended to configure the init-method
of the bean.
+     */
+    @Override
+    public void start() {
+        if (null == this.properties) {
+            throw new ONSClientException("properties not set");
+        }
+
+        this.producer = ONSFactory.createProducer(this.properties);
+        this.producer.start();
+    }
+
+    @Override
+    public void updateCredential(Properties credentialProperties) {
+        if (this.producer != null) {
+            this.producer.updateCredential(credentialProperties);
+        }
+    }
+
+    /**
+     * Close the {@code Producer} instance, it is recommended to configure the destroy-method
of the bean
+     */
+    @Override
+    public void shutdown() {
+        if (this.producer != null) {
+            this.producer.shutdown();
+        }
+    }
+
+    @Override
+    public SendResult send(Message message) {
+        return this.producer.send(message);
+    }
+
+    @Override
+    public void sendOneway(Message message) {
+        this.producer.sendOneway(message);
+    }
+
+    @Override
+    public void sendAsync(Message message, SendCallback sendCallback) {
+        this.producer.sendAsync(message, sendCallback);
+    }
+
+    @Override
+    public void setCallbackExecutor(final ExecutorService callbackExecutor) {
+        this.producer.setCallbackExecutor(callbackExecutor);
+    }
+
+    public Properties getProperties() {
+        return properties;
+    }
+
+    public void setProperties(Properties properties) {
+        this.properties = properties;
+    }
+
+    @Override
+    public boolean isStarted() {
+        return this.producer.isStarted();
+    }
+
+    @Override
+    public boolean isClosed() {
+        return this.producer.isClosed();
+    }
+}
diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/bean/TransactionProducerBean.java
b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/bean/TransactionProducerBean.java
new file mode 100644
index 0000000..c39e732
--- /dev/null
+++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/bean/TransactionProducerBean.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.ons.api.bean;
+
+import io.openmessaging.api.Message;
+import io.openmessaging.api.SendResult;
+import io.openmessaging.api.transaction.LocalTransactionChecker;
+import io.openmessaging.api.transaction.LocalTransactionExecuter;
+import io.openmessaging.api.transaction.TransactionProducer;
+import java.util.Properties;
+import org.apache.rocketmq.ons.api.ONSFactory;
+import org.apache.rocketmq.ons.api.PropertyKeyConst;
+import org.apache.rocketmq.ons.api.exception.ONSClientException;
+
+/**
+ * {@code TransactionProducerBean} is used to integrate {@link TransactionProducer} into
Spring Bean
+ */
+public class TransactionProducerBean implements TransactionProducer {
+    /**
+     * Need to inject this field, specify the properties of the construct {@code TransactionProducer}
instance, the
+     * specific supported properties are detailed in {@link PropertyKeyConst}
+     *
+     * @see TransactionProducerBean#setProperties(Properties)
+     */
+    private Properties properties;
+
+    /**
+     * Need to inject this field, {@code TransactionProducer} will send the transaction message
will rely on the object
+     * for transaction status checkback
+     *
+     * @see TransactionProducerBean#setLocalTransactionChecker(LocalTransactionChecker)
+     */
+    private LocalTransactionChecker localTransactionChecker;
+
+    private TransactionProducer transactionProducer;
+
+    /**
+     *
+     */
+    @Override
+    public void start() {
+        if (null == this.properties) {
+            throw new ONSClientException("properties not set");
+        }
+
+        this.transactionProducer = ONSFactory.createTransactionProducer(properties, localTransactionChecker);
+        this.transactionProducer.start();
+    }
+
+    @Override
+    public void updateCredential(Properties credentialProperties) {
+        if (this.transactionProducer != null) {
+            this.transactionProducer.updateCredential(credentialProperties);
+        }
+    }
+
+    /**
+     * Close the {@code TransactionProducer} instance, it is recommended to configure the
destroy-method of the bean.
+     */
+    @Override
+    public void shutdown() {
+        if (this.transactionProducer != null) {
+            this.transactionProducer.shutdown();
+        }
+    }
+
+    @Override
+    public SendResult send(Message message, LocalTransactionExecuter executer, Object arg)
{
+        return this.transactionProducer.send(message, executer, arg);
+    }
+
+    public Properties getProperties() {
+        return properties;
+    }
+
+    public void setProperties(Properties properties) {
+        this.properties = properties;
+    }
+
+    public LocalTransactionChecker getLocalTransactionChecker() {
+        return localTransactionChecker;
+    }
+
+    public void setLocalTransactionChecker(LocalTransactionChecker localTransactionChecker)
{
+        this.localTransactionChecker = localTransactionChecker;
+    }
+
+    @Override
+    public boolean isStarted() {
+        return this.transactionProducer.isStarted();
+    }
+
+    @Override
+    public boolean isClosed() {
+        return this.transactionProducer.isClosed();
+    }
+}
diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/MQConfig.java b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/MQConfig.java
index 8a59dc3..c8d300e 100644
--- a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/MQConfig.java
+++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/MQConfig.java
@@ -31,6 +31,6 @@ public class MQConfig {
     /**
      * NAMESRV_ADDR
      */
-    public static final String NAMESRV_ADDR = "47.107.167.190:9876";
+    public static final String NAMESRV_ADDR = "127.0.0.1:9876";
 
 }
diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/SimpleMQConsumer.java
b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/SimpleMQConsumer.java
index 34fe915..51f558b 100644
--- a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/SimpleMQConsumer.java
+++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/SimpleMQConsumer.java
@@ -27,14 +27,23 @@ import org.apache.rocketmq.ons.sample.MQConfig;
 public class SimpleMQConsumer {
 
     public static void main(String[] args) {
-        MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east");
+        MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://127.0.0.1:9876");
 
         Properties consumerProperties = new Properties();
         consumerProperties.setProperty(PropertyKeyConst.GROUP_ID, MQConfig.GROUP_ID);
         consumerProperties.setProperty(PropertyKeyConst.AccessKey, MQConfig.ACCESS_KEY);
         consumerProperties.setProperty(PropertyKeyConst.SecretKey, MQConfig.SECRET_KEY);
-        consumerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MQConfig.NAMESRV_ADDR);
+
         Consumer consumer = messagingAccessPoint.createConsumer(consumerProperties);
+        /*
+         * Alternatively, you can use the ONSFactory to create instance directly.
+         * <pre>
+         * {@code
+         * consumerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MQConfig.NAMESRV_ADDR);
+         * OrderConsumer consumer  = ONSFactory.createOrderedConsumer(consumerProperties);
+         * }
+         * </pre>
+         */
         consumer.subscribe(MQConfig.TOPIC, MQConfig.TAG, new MessageListenerImpl());
         consumer.start();
         System.out.printf("Consumer start success. %n");
@@ -44,5 +53,6 @@ public class SimpleMQConsumer {
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
+        consumer.shutdown();
     }
 }
diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/SimpleOrderConsumer.java
b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/SimpleOrderConsumer.java
index d3aaa07..d991474 100644
--- a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/SimpleOrderConsumer.java
+++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/SimpleOrderConsumer.java
@@ -16,7 +16,6 @@
  */
 package org.apache.rocketmq.ons.sample.consumer;
 
-
 import io.openmessaging.api.Message;
 import io.openmessaging.api.MessagingAccessPoint;
 import io.openmessaging.api.OMS;
@@ -31,13 +30,23 @@ import org.apache.rocketmq.ons.sample.MQConfig;
 public class SimpleOrderConsumer {
 
     public static void main(String[] args) {
-        MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east");
+        MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://127.0.0.1:9876");
         Properties consumerProperties = new Properties();
         consumerProperties.setProperty(PropertyKeyConst.GROUP_ID, MQConfig.ORDER_GROUP_ID);
         consumerProperties.setProperty(PropertyKeyConst.AccessKey, MQConfig.ACCESS_KEY);
         consumerProperties.setProperty(PropertyKeyConst.SecretKey, MQConfig.SECRET_KEY);
-        consumerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MQConfig.NAMESRV_ADDR);
         OrderConsumer consumer = messagingAccessPoint.createOrderedConsumer(consumerProperties);
+
+        /*
+         * Alternatively, you can use the ONSFactory to create instance directly.
+         * <pre>
+         * {@code
+         * consumerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MQConfig.NAMESRV_ADDR);
+         * OrderConsumer consumer  = ONSFactory.createOrderedConsumer(consumerProperties);
+         * }
+         * </pre>
+         */
+
         consumer.subscribe(MQConfig.ORDER_TOPIC, MQConfig.TAG, new MessageOrderListener()
{
 
             @Override
@@ -54,5 +63,6 @@ public class SimpleOrderConsumer {
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
+        consumer.shutdown();
     }
 }
diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/MQTimerProducer.java
b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/MQTimerProducer.java
index 7cc5682..fcf7554 100644
--- a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/MQTimerProducer.java
+++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/MQTimerProducer.java
@@ -29,13 +29,24 @@ import org.apache.rocketmq.ons.sample.MQConfig;
 
 public class MQTimerProducer {
     public static void main(String[] args) {
-        MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://47.107.167.190:9876");
+        MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://127.0.0.1:9876");
 
         Properties producerProperties = new Properties();
         producerProperties.setProperty(PropertyKeyConst.GROUP_ID, MQConfig.GROUP_ID);
         producerProperties.setProperty(PropertyKeyConst.AccessKey, MQConfig.ACCESS_KEY);
         producerProperties.setProperty(PropertyKeyConst.SecretKey, MQConfig.SECRET_KEY);
         Producer producer = messagingAccessPoint.createProducer(producerProperties);
+
+        /*
+         * Alternatively, you can use the ONSFactory to create instance directly.
+         * <pre>
+         * {@code
+         * producerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MQConfig.NAMESRV_ADDR);
+         * OrderProducer producer = ONSFactory.createOrderProducer(producerProperties);
+         * }
+         * </pre>
+         */
+
         producer.start();
         System.out.printf("Producer Started. %n");
 
diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleMQProducer.java
b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleMQProducer.java
index 2b53968..5bb7f9e 100644
--- a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleMQProducer.java
+++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleMQProducer.java
@@ -17,30 +17,37 @@
 package org.apache.rocketmq.ons.sample.producer;
 
 import io.openmessaging.api.Message;
+import io.openmessaging.api.MessagingAccessPoint;
+import io.openmessaging.api.OMS;
 import io.openmessaging.api.Producer;
 import io.openmessaging.api.SendResult;
 import io.openmessaging.api.exception.OMSRuntimeException;
 import java.util.Properties;
-import org.apache.rocketmq.ons.api.ONSFactory;
 import org.apache.rocketmq.ons.api.PropertyKeyConst;
 import org.apache.rocketmq.ons.sample.MQConfig;
 
-
 //    io.openmessaging.api.xxx => com.aliyun.openservices.ons.api.xxxx
 
-
-
 public class SimpleMQProducer {
 
-
     public static void main(String[] args) {
+        MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://127.0.0.1:9876");
 
         Properties producerProperties = new Properties();
         producerProperties.setProperty(PropertyKeyConst.GROUP_ID, MQConfig.GROUP_ID);
         producerProperties.setProperty(PropertyKeyConst.AccessKey, MQConfig.ACCESS_KEY);
         producerProperties.setProperty(PropertyKeyConst.SecretKey, MQConfig.SECRET_KEY);
-        producerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MQConfig.NAMESRV_ADDR);
-        Producer producer = ONSFactory.createProducer(producerProperties);
+
+        Producer producer = messagingAccessPoint.createProducer(producerProperties);
+        /*
+         * Alternatively, you can use the ONSFactory to create instance directly.
+         * <pre>
+         * {@code
+         * producerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MQConfig.NAMESRV_ADDR);
+         * Producer producer = ONSFactory.createProducer(producerProperties);
+         * }
+         * </pre>
+         */
 
         producer.start();
         System.out.printf("Producer Started %n");
@@ -56,5 +63,7 @@ public class SimpleMQProducer {
                 e.printStackTrace();
             }
         }
+
+        producer.shutdown();
     }
 }
diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleOrderProducer.java
b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleOrderProducer.java
index 2d49a48..a454297 100644
--- a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleOrderProducer.java
+++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleOrderProducer.java
@@ -16,7 +16,6 @@
  */
 package org.apache.rocketmq.ons.sample.producer;
 
-
 import io.openmessaging.api.Message;
 import io.openmessaging.api.MessagingAccessPoint;
 import io.openmessaging.api.OMS;
@@ -30,14 +29,25 @@ import org.apache.rocketmq.ons.sample.MQConfig;
 public class SimpleOrderProducer {
 
     public static void main(String[] args) {
-        MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://alice@47.107.167.190/us-east");
+        MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://127.0.0.1:9876");
 
         Properties producerProperties = new Properties();
         producerProperties.setProperty(PropertyKeyConst.GROUP_ID, MQConfig.ORDER_GROUP_ID);
         producerProperties.setProperty(PropertyKeyConst.AccessKey, MQConfig.ACCESS_KEY);
         producerProperties.setProperty(PropertyKeyConst.SecretKey, MQConfig.SECRET_KEY);
-        producerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MQConfig.NAMESRV_ADDR);
         OrderProducer producer = messagingAccessPoint.createOrderProducer(producerProperties);
+
+
+        /*
+         * Alternatively, you can use the ONSFactory to create instance directly.
+         * <pre>
+         * {@code
+         * producerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MQConfig.NAMESRV_ADDR);
+         * OrderProducer producer = ONSFactory.createOrderProducer(producerProperties);
+         * }
+         * </pre>
+         */
+
         producer.start();
         System.out.printf("Producer Started. %n");
 
@@ -55,5 +65,6 @@ public class SimpleOrderProducer {
                 e.printStackTrace();
             }
         }
+        producer.shutdown();
     }
 }
diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleTransactionProducer.java
b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleTransactionProducer.java
index e5834f7..44da502 100644
--- a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleTransactionProducer.java
+++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleTransactionProducer.java
@@ -16,12 +16,10 @@
  */
 package org.apache.rocketmq.ons.sample.producer;
 
-
 import io.openmessaging.api.Message;
 import io.openmessaging.api.MessagingAccessPoint;
 import io.openmessaging.api.OMS;
 import io.openmessaging.api.SendResult;
-import io.openmessaging.api.exception.OMSRuntimeException;
 import io.openmessaging.api.transaction.LocalTransactionExecuter;
 import io.openmessaging.api.transaction.TransactionProducer;
 import io.openmessaging.api.transaction.TransactionStatus;
@@ -34,15 +32,25 @@ import org.apache.rocketmq.ons.sample.MQConfig;
 public class SimpleTransactionProducer {
 
     public static void main(String[] args) {
-        MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east");
+        MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://127.0.0.1:9876");
 
         Properties tranProducerProperties = new Properties();
         tranProducerProperties.setProperty(PropertyKeyConst.GROUP_ID, MQConfig.GROUP_ID);
         tranProducerProperties.setProperty(PropertyKeyConst.AccessKey, MQConfig.ACCESS_KEY);
         tranProducerProperties.setProperty(PropertyKeyConst.SecretKey, MQConfig.SECRET_KEY);
-        tranProducerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MQConfig.NAMESRV_ADDR);
         LocalTransactionCheckerImpl localTransactionChecker = new LocalTransactionCheckerImpl();
         TransactionProducer transactionProducer = messagingAccessPoint.createTransactionProducer(tranProducerProperties,
localTransactionChecker);
+
+        /*
+         * Alternatively, you can use the ONSFactory to create instance directly.
+         * <pre>
+         * {@code
+         * producerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MQConfig.NAMESRV_ADDR);
+         * TransactionProducer producer = ONSFactory.createTransactionProducer(tranProducerProperties,
localTransactionChecker);
+         * }
+         * </pre>
+         */
+
         transactionProducer.start();
 
         Message message = new Message(MQConfig.TOPIC, MQConfig.TAG, "MQ send transaction
message test".getBytes());


Mime
View raw message