rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yu...@apache.org
Subject [3/6] incubator-rocketmq git commit: Add PushConsumer related implementation for OpenMessaging.
Date Wed, 19 Apr 2017 09:50:04 GMT
Add PushConsumer related implementation for OpenMessaging.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/a5ea4e45
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/a5ea4e45
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/a5ea4e45

Branch: refs/heads/openmessaging-impl
Commit: a5ea4e45bfa0b877bfc4476c2e8f23186d82a177
Parents: ce14693
Author: yukon <yukon@apache.org>
Authored: Mon Apr 17 18:05:04 2017 +0800
Committer: yukon <yukon@apache.org>
Committed: Mon Apr 17 21:13:39 2017 +0800

----------------------------------------------------------------------
 .../example/openmessaging/SimpleProducer.java   |   6 +-
 .../openmessaging/SimplePushConsumer.java       |  58 ++++++
 .../rocketmq/MessagingAccessPointImpl.java      |  15 +-
 .../java/io/openmessaging/rocketmq/OMSUtil.java |  60 +++++-
 .../rocketmq/consumer/PullConsumerImpl.java     |  62 ++++++
 .../rocketmq/consumer/PushConsumerImpl.java     | 188 +++++++++++++++++++
 .../rocketmq/domain/BytesMessageImpl.java       |   6 +
 .../rocketmq/domain/NonStandardKeys.java        |  10 +-
 .../rocketmq/producer/AbstractOMSProducer.java  |  15 +-
 pom.xml                                         |   2 +-
 10 files changed, 402 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/a5ea4e45/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
index 5a27f5a..f89ae4c 100644
--- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
@@ -47,14 +47,14 @@ public class SimpleProducer {
         }));
 
         {
-            Message message = producer.createBytesMessageToTopic("HELLO_TOPIC", "HELLO_BODY".getBytes(Charset.forName("UTF-8")));
+            Message message = producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
             SendResult sendResult = producer.send(message);
             //final Void aVoid = result.get(3000L);
             System.out.println("send async message OK, msgId: " + sendResult.messageId());
         }
 
         {
-            final Promise<SendResult> result = producer.sendAsync(producer.createBytesMessageToTopic("HELLO_TOPIC",
"HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
+            final Promise<SendResult> result = producer.sendAsync(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC",
"OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
             result.addListener(new PromiseListener<SendResult>() {
                 @Override public void operationCompleted(Promise<SendResult> promise)
{
                     System.out.println("Send async message OK, msgId: " + promise.get().messageId());
@@ -67,7 +67,7 @@ public class SimpleProducer {
         }
 
         {
-            producer.sendOneway(producer.createBytesMessageToTopic("HELLO_TOPIC", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
+            producer.sendOneway(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
             System.out.println("Send oneway message OK");
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/a5ea4e45/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java
b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java
new file mode 100644
index 0000000..6fc8e39
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.example.openmessaging;
+
+import io.openmessaging.Message;
+import io.openmessaging.MessageListener;
+import io.openmessaging.MessagingAccessPoint;
+import io.openmessaging.MessagingAccessPointFactory;
+import io.openmessaging.OMS;
+import io.openmessaging.PushConsumer;
+import io.openmessaging.ReceivedMessageContext;
+import io.openmessaging.rocketmq.domain.NonStandardKeys;
+
+public class SimplePushConsumer {
+    public static void main(String[] args) {
+        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
+            .getMessagingAccessPoint("openmessaging:rocketmq://10.125.3.140:9876,10.189.232.59:9876/namespace");
+
+        final PushConsumer consumer = messagingAccessPoint.
+            createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));
+
+        messagingAccessPoint.startup();
+        System.out.println("messagingAccessPoint startup OK");
+
+        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+            @Override
+            public void run() {
+                consumer.shutdown();
+                messagingAccessPoint.shutdown();
+            }
+        }));
+
+        consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() {
+            @Override
+            public void onMessage(final Message message, final ReceivedMessageContext context)
{
+                System.out.println("Received one message: " + message);
+                context.ack();
+            }
+        });
+
+        consumer.startup();
+        System.out.println("consumer startup OK");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/a5ea4e45/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
index c30b7d4..fecd69f 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
@@ -25,7 +25,10 @@ import io.openmessaging.PushConsumer;
 import io.openmessaging.ResourceManager;
 import io.openmessaging.SequenceProducer;
 import io.openmessaging.ServiceEndPoint;
+import io.openmessaging.exception.OMSNotSupportedException;
 import io.openmessaging.observer.Observer;
+import io.openmessaging.rocketmq.consumer.PullConsumerImpl;
+import io.openmessaging.rocketmq.consumer.PushConsumerImpl;
 import io.openmessaging.rocketmq.producer.ProducerImpl;
 import io.openmessaging.rocketmq.producer.SequenceProducerImpl;
 
@@ -63,32 +66,32 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint
{
 
     @Override
     public PushConsumer createPushConsumer() {
-        return null;
+        return new PushConsumerImpl(accessPointProperties);
     }
 
     @Override
     public PushConsumer createPushConsumer(KeyValue properties) {
-        return null;
+        return new PushConsumerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties));
     }
 
     @Override
     public PullConsumer createPullConsumer(String queueName) {
-        return null;
+        return new PullConsumerImpl(accessPointProperties);
     }
 
     @Override
     public PullConsumer createPullConsumer(String queueName, KeyValue properties) {
-        return null;
+        return new PullConsumerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties));
     }
 
     @Override
     public IterableConsumer createIterableConsumer(String queueName) {
-        return null;
+        throw new OMSNotSupportedException("-1", "IterableConsumer is not supported in RocketMQ");
     }
 
     @Override
     public IterableConsumer createIterableConsumer(String queueName, KeyValue properties)
{
-        return null;
+        throw new OMSNotSupportedException("-1", "IterableConsumer is not supported in RocketMQ");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/a5ea4e45/openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java
index 061ee6b..dd591a6 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java
@@ -21,7 +21,12 @@ import io.openmessaging.KeyValue;
 import io.openmessaging.MessageHeader;
 import io.openmessaging.OMS;
 import io.openmessaging.SendResult;
+import io.openmessaging.rocketmq.domain.BytesMessageImpl;
+import io.openmessaging.rocketmq.domain.NonStandardKeys;
 import io.openmessaging.rocketmq.domain.SendResultImpl;
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.Set;
 import org.apache.rocketmq.client.producer.SendStatus;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.message.MessageAccessor;
@@ -45,8 +50,13 @@ public class OMSUtil {
         KeyValue properties = omsMessage.properties();
 
         //All destinations in RocketMQ use Topic
-        rmqMessage.setTopic(headers.containsKey(MessageHeader.TOPIC)
-            ? headers.getString(MessageHeader.TOPIC) : headers.getString(MessageHeader.QUEUE));
+        if (headers.containsKey(MessageHeader.TOPIC)) {
+            rmqMessage.setTopic(headers.getString(MessageHeader.TOPIC));
+            rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
+        } else {
+            rmqMessage.setTopic(headers.getString(MessageHeader.QUEUE));
+            rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "QUEUE");
+        }
 
         for (String key : properties.keySet()) {
             MessageAccessor.putProperty(rmqMessage, key, properties.getString(key));
@@ -60,6 +70,50 @@ public class OMSUtil {
         return rmqMessage;
     }
 
+    public static BytesMessage msgConvert(org.apache.rocketmq.common.message.MessageExt rmqMsg)
{
+        BytesMessage omsMsg = new BytesMessageImpl();
+        omsMsg.setBody(rmqMsg.getBody());
+
+        KeyValue headers = omsMsg.headers();
+        KeyValue properties = omsMsg.properties();
+
+        final Set<Map.Entry<String, String>> entries = rmqMsg.getProperties().entrySet();
+
+        for (final Map.Entry<String, String> entry : entries) {
+            if (isOMSHeader(entry.getKey())) {
+                headers.put(entry.getKey(), entry.getValue());
+            } else {
+                properties.put(entry.getKey(), entry.getValue());
+            }
+        }
+
+        omsMsg.putHeaders(MessageHeader.MESSAGE_ID, rmqMsg.getMsgId());
+        if (rmqMsg.getProperties().get(NonStandardKeys.MESSAGE_DESTINATION).equals("TOPIC"))
{
+            omsMsg.putHeaders(MessageHeader.TOPIC, rmqMsg.getTopic());
+        } else {
+            omsMsg.putHeaders(MessageHeader.QUEUE, rmqMsg.getTopic());
+        }
+        omsMsg.putHeaders(MessageHeader.SEARCH_KEY, rmqMsg.getKeys());
+        omsMsg.putHeaders(MessageHeader.BORN_HOST, String.valueOf(rmqMsg.getBornHost()));
+        omsMsg.putHeaders(MessageHeader.BORN_TIMESTAMP, rmqMsg.getBornTimestamp());
+        omsMsg.putHeaders(MessageHeader.STORE_HOST, String.valueOf(rmqMsg.getStoreHost()));
+        omsMsg.putHeaders(MessageHeader.STORE_TIMESTAMP, rmqMsg.getStoreTimestamp());
+        return omsMsg;
+    }
+
+    public static boolean isOMSHeader(String value) {
+        for (Field field : MessageHeader.class.getDeclaredFields()) {
+            try {
+                if (field.get(MessageHeader.class).equals(value)) {
+                    return true;
+                }
+            } catch (IllegalAccessException e) {
+                return false;
+            }
+        }
+        return false;
+    }
+
     /**
      * Convert a RocketMQ SEND_OK SendResult instance to a OMS SendResult.
      */
@@ -68,7 +122,7 @@ public class OMSUtil {
         return new SendResultImpl(rmqResult.getMsgId(), OMS.newKeyValue());
     }
 
-    public static KeyValue buildKeyValue(KeyValue ... keyValues) {
+    public static KeyValue buildKeyValue(KeyValue... keyValues) {
         KeyValue keyValue = OMS.newKeyValue();
         for (KeyValue properties : keyValues) {
             for (String key : properties.keySet()) {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/a5ea4e45/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
new file mode 100644
index 0000000..6730b1f
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.consumer;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.Message;
+import io.openmessaging.PullConsumer;
+
+public class PullConsumerImpl implements PullConsumer {
+    public PullConsumerImpl(final KeyValue properties) {
+
+    }
+
+    @Override
+    public KeyValue properties() {
+        return null;
+    }
+
+    @Override
+    public Message poll() {
+        return null;
+    }
+
+    @Override
+    public Message poll(final KeyValue properties) {
+        return null;
+    }
+
+    @Override
+    public void ack(final String messageId) {
+
+    }
+
+    @Override
+    public void ack(final String messageId, final KeyValue properties) {
+
+    }
+
+    @Override
+    public void startup() {
+
+    }
+
+    @Override
+    public void shutdown() {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/a5ea4e45/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
new file mode 100644
index 0000000..cd83212
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.consumer;
+
+import io.openmessaging.BytesMessage;
+import io.openmessaging.KeyValue;
+import io.openmessaging.MessageListener;
+import io.openmessaging.OMS;
+import io.openmessaging.PropertyKeys;
+import io.openmessaging.PushConsumer;
+import io.openmessaging.ReceivedMessageContext;
+import io.openmessaging.exception.OMSRuntimeException;
+import io.openmessaging.rocketmq.OMSUtil;
+import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+
+public class PushConsumerImpl implements PushConsumer {
+    private final DefaultMQPushConsumer rocketmqPushConsumer;
+    private final KeyValue properties;
+    private boolean started = false;
+    private final Map<String, MessageListener> subscribeTable = new ConcurrentHashMap<>();
+
+
+    public PushConsumerImpl(final KeyValue properties) {
+        this.rocketmqPushConsumer = new DefaultMQPushConsumer();
+        this.properties = properties;
+
+        String accessPoints = properties.getString(PropertyKeys.ACCESS_POINTS);
+        if (accessPoints == null || accessPoints.isEmpty()) {
+            throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty.");
+        }
+        this.rocketmqPushConsumer.setNamesrvAddr(accessPoints.replace(',', ';'));
+
+        String consumerGroup = properties.getString(NonStandardKeys.CONSUMER_GROUP);
+        if (null == consumerGroup || consumerGroup.isEmpty()) {
+            throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ,
please set it.");
+        }
+        this.rocketmqPushConsumer.setConsumerGroup(consumerGroup);
+
+        int maxReDeliveryTimes = properties.getInt(NonStandardKeys.MAX_REDELIVERY_TIMES);
+        if (maxReDeliveryTimes != 0) {
+            this.rocketmqPushConsumer.setMaxReconsumeTimes(maxReDeliveryTimes);
+        }
+
+        int messageConsumeTimeout = properties.getInt(NonStandardKeys.MESSAGE_CONSUME_TIMEOUT);
+        if (messageConsumeTimeout != 0) {
+            this.rocketmqPushConsumer.setConsumeTimeout(messageConsumeTimeout);
+        }
+
+        int maxConsumeThreadNums = properties.getInt(NonStandardKeys.MAX_CONSUME_THREAD_NUMS);
+        if (maxConsumeThreadNums != 0) {
+            this.rocketmqPushConsumer.setConsumeThreadMax(maxConsumeThreadNums);
+        }
+
+        int minConsumeThreadNums = properties.getInt(NonStandardKeys.MIN_CONSUME_THREAD_NUMS);
+        if (minConsumeThreadNums != 0) {
+            this.rocketmqPushConsumer.setConsumeThreadMin(minConsumeThreadNums);
+        }
+
+        String consumerId = OMSUtil.buildInstanceName();
+        this.rocketmqPushConsumer.setInstanceName(consumerId);
+        properties.put(PropertyKeys.CONSUMER_ID, consumerId);
+
+        this.rocketmqPushConsumer.registerMessageListener(new MessageListenerImpl());
+    }
+
+    @Override
+    public KeyValue properties() {
+        return properties;
+    }
+
+    @Override
+    public void resume() {
+        this.rocketmqPushConsumer.resume();
+    }
+
+    @Override
+    public void suspend() {
+        this.rocketmqPushConsumer.suspend();
+    }
+
+    @Override
+    public boolean isSuspended() {
+        return this.rocketmqPushConsumer.getDefaultMQPushConsumerImpl().isPause();
+    }
+
+    @Override
+    public PushConsumer attachQueue(final String queueName, final MessageListener listener)
{
+        this.subscribeTable.put(queueName, listener);
+        try {
+            this.rocketmqPushConsumer.subscribe(queueName, "*");
+        } catch (MQClientException e) {
+            throw new OMSRuntimeException("-1", String.format("RocketMQ push consumer can't
attach to %s.", queueName));
+        }
+        return this;
+    }
+
+    @Override
+    public synchronized void startup() {
+        if (!started) {
+            try {
+                this.rocketmqPushConsumer.start();
+            } catch (MQClientException e) {
+                throw new OMSRuntimeException("-1", e);
+            }
+        }
+        this.started = true;
+    }
+
+    @Override
+    public synchronized void shutdown() {
+        if (this.started) {
+            this.rocketmqPushConsumer.shutdown();
+        }
+        this.started = false;
+    }
+
+    class MessageListenerImpl implements MessageListenerConcurrently {
+
+        @Override
+        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> rmqMsgList,
ConsumeConcurrentlyContext contextRMQ) {
+            MessageExt rmqMsg = rmqMsgList.get(0);
+            BytesMessage omsMsg = OMSUtil.msgConvert(rmqMsg);
+
+            MessageListener listener = PushConsumerImpl.this.subscribeTable.get(rmqMsg.getTopic());
+
+            if (listener == null) {
+                throw new OMSRuntimeException("-1",
+                    String.format("The topic/queue %s isn't attached to this consumer", rmqMsg.getTopic()));
+            }
+
+            final KeyValue contextProperties = OMS.newKeyValue();
+            final CountDownLatch sync = new CountDownLatch(1);
+
+            ReceivedMessageContext context = new ReceivedMessageContext() {
+                @Override
+                public KeyValue properties() {
+                    return contextProperties;
+                }
+
+                @Override
+                public void ack() {
+                    sync.countDown();
+                    contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
+                        ConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
+                }
+
+                @Override
+                public void ack(final KeyValue properties) {
+                    sync.countDown();
+                    contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
+                        properties.getString(NonStandardKeys.MESSAGE_CONSUME_STATUS));
+                }
+            };
+            listener.onMessage(omsMsg, context);
+            try {
+                sync.await(PushConsumerImpl.this.rocketmqPushConsumer.getConsumeTimeout(),
TimeUnit.MILLISECONDS);
+            } catch (InterruptedException ignore) {
+            }
+
+            return ConsumeConcurrentlyStatus.valueOf(contextProperties.getString(NonStandardKeys.MESSAGE_CONSUME_STATUS));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/a5ea4e45/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java
b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java
index 8140fe2..43f80ae 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java
@@ -20,6 +20,7 @@ import io.openmessaging.BytesMessage;
 import io.openmessaging.KeyValue;
 import io.openmessaging.Message;
 import io.openmessaging.OMS;
+import org.apache.commons.lang3.builder.ToStringBuilder;
 
 public class BytesMessageImpl implements BytesMessage {
     private KeyValue headers;
@@ -99,4 +100,9 @@ public class BytesMessageImpl implements BytesMessage {
         properties.put(key, value);
         return this;
     }
+
+    @Override
+    public String toString() {
+        return ToStringBuilder.reflectionToString(this);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/a5ea4e45/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java
b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java
index cf83cfd..566a17d 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java
@@ -16,5 +16,13 @@
  */
 package io.openmessaging.rocketmq.domain;
 
-public class NonStandardKeys {
+public interface NonStandardKeys {
+    String CONSUMER_GROUP = "rmq.consumer.group";
+    String PRODUCER_GROUP = "rmq.producer.group";
+    String MAX_REDELIVERY_TIMES = "rmq.max.redelivery.times";
+    String MESSAGE_CONSUME_TIMEOUT = "rmq.message.consume.timeout";
+    String MAX_CONSUME_THREAD_NUMS = "rmq.max.consume.thread.nums";
+    String MIN_CONSUME_THREAD_NUMS = "rmq.min.consume.thread.nums";
+    String MESSAGE_CONSUME_STATUS = "rmq.message.consume.status";
+    String MESSAGE_DESTINATION = "rmq.message.destination";
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/a5ea4e45/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
index 9eb735d..32d65cd 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
@@ -28,6 +28,7 @@ import io.openmessaging.exception.OMSNotSupportedException;
 import io.openmessaging.exception.OMSRuntimeException;
 import io.openmessaging.exception.OMSTimeOutException;
 import io.openmessaging.rocketmq.domain.BytesMessageImpl;
+import io.openmessaging.rocketmq.domain.NonStandardKeys;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.log.ClientLogger;
@@ -50,20 +51,22 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory{
         this.rocketmqProducer = new DefaultMQProducer();
 
         String accessPoints = properties.getString(PropertyKeys.ACCESS_POINTS);
-
         if (accessPoints == null || accessPoints.isEmpty()) {
             throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty.");
         }
-        String producerId = buildInstanceName();
+        this.rocketmqProducer.setNamesrvAddr(accessPoints.replace(',', ';'));
 
-        int operationTimeout = properties.getInt(PropertyKeys.OPERATION_TIMEOUT);
+        String producerGroup = properties.getString(NonStandardKeys.PRODUCER_GROUP);
+        if (producerGroup == null || producerGroup.isEmpty()) {
+            producerGroup = "__OMS_PRODUCER_DEFAULT_GROUP";
+        }
+        this.rocketmqProducer.setProducerGroup(producerGroup);
 
-        this.rocketmqProducer.setProducerGroup(producerId);
+        String producerId = buildInstanceName();
+        int operationTimeout = properties.getInt(PropertyKeys.OPERATION_TIMEOUT);
         this.rocketmqProducer.setSendMsgTimeout(operationTimeout == 0 ? 5000 : operationTimeout);
         this.rocketmqProducer.setInstanceName(producerId);
-        this.rocketmqProducer.setNamesrvAddr(accessPoints.replace(',', ';'));
         this.rocketmqProducer.setMaxMessageSize(1024 * 1024 * 4);
-
         properties.put(PropertyKeys.PRODUCER_ID, producerId);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/a5ea4e45/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 865e9f9..a69b2c6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -607,7 +607,7 @@
             <dependency>
                 <groupId>io.openmessaging</groupId>
                 <artifactId>openmessaging-api</artifactId>
-                <version>0.1.0-beta</version>
+                <version>0.1.0-alpha</version>
             </dependency>
         </dependencies>
     </dependencyManagement>


Mime
View raw message