rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yu...@apache.org
Subject [5/6] incubator-rocketmq git commit: Polish the OMS config loading mechanism.
Date Wed, 19 Apr 2017 09:50:06 GMT
Polish the OMS config loading mechanism.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/2e3c1b00
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/2e3c1b00
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/2e3c1b00

Branch: refs/heads/openmessaging-impl
Commit: 2e3c1b00b7641634bd4a8e22f78544abd3d2d3dd
Parents: 625ba07
Author: yukon <yukon@apache.org>
Authored: Wed Apr 19 15:31:17 2017 +0800
Committer: yukon <yukon@apache.org>
Committed: Wed Apr 19 15:31:17 2017 +0800

----------------------------------------------------------------------
 .../openmessaging/SimplePullConsumer.java       |   8 +-
 .../io/openmessaging/rocketmq/ClientConfig.java | 194 +++++++++++++++++++
 .../rocketmq/MessagingAccessPointImpl.java      |   1 +
 .../java/io/openmessaging/rocketmq/OMSUtil.java | 182 -----------------
 .../rocketmq/consumer/LocalMessageCache.java    |  53 ++---
 .../rocketmq/consumer/PullConsumerImpl.java     |  26 +--
 .../rocketmq/consumer/PushConsumerImpl.java     |  39 ++--
 .../rocketmq/producer/AbstractOMSProducer.java  |  19 +-
 .../rocketmq/producer/ProducerImpl.java         |   4 +-
 .../rocketmq/producer/SequenceProducerImpl.java |   2 +-
 .../openmessaging/rocketmq/utils/BeanUtils.java | 185 ++++++++++++++++++
 .../openmessaging/rocketmq/utils/OMSUtil.java   | 182 +++++++++++++++++
 12 files changed, 623 insertions(+), 272 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/2e3c1b00/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java
index 8dd7b23..86cb696 100644
--- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java
@@ -48,9 +48,11 @@ public class SimplePullConsumer {
 
         while (true) {
             Message message = consumer.poll();
-            String msgId = message.headers().getString(MessageHeader.MESSAGE_ID);
-            System.out.println("Received one message: " + msgId);
-            consumer.ack(msgId);
+            if (message != null) {
+                String msgId = message.headers().getString(MessageHeader.MESSAGE_ID);
+                System.out.println("Received one message: " + msgId);
+                consumer.ack(msgId);
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/2e3c1b00/openmessaging/src/main/java/io/openmessaging/rocketmq/ClientConfig.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/ClientConfig.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/ClientConfig.java
new file mode 100644
index 0000000..fbca21a
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/ClientConfig.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq;
+
+import io.openmessaging.PropertyKeys;
+import io.openmessaging.rocketmq.domain.NonStandardKeys;
+
+public class ClientConfig implements PropertyKeys, NonStandardKeys {
+    private String omsDriverImpl;
+    private String omsAccessPoints;
+    private String omsNamespace;
+    private String omsProducerId;
+    private String omsConsumerId;
+    private int omsOperationTimeout = 5000;
+    private String omsRoutingName;
+    private String omsOperatorName;
+    private String omsDstQueue;
+    private String omsSrcTopic;
+    private String rmqConsumerGroup;
+    private String rmqProducerGroup = "__OMS_PRODUCER_DEFAULT_GROUP";
+    private int rmqMaxRedeliveryTimes = 16;
+    private int rmqMessageConsumeTimeout = 15; //In minutes
+    private int rmqMaxConsumeThreadNums = 64;
+    private int rmqMinConsumeThreadNums = 20;
+    private String rmqMessageDestination;
+    private int rmqPullMessageBatchNums = 32;
+    private int rmqPullMessageCacheCapacity = 1000;
+
+    public String getOmsDriverImpl() {
+        return omsDriverImpl;
+    }
+
+    public void setOmsDriverImpl(final String omsDriverImpl) {
+        this.omsDriverImpl = omsDriverImpl;
+    }
+
+    public String getOmsAccessPoints() {
+        return omsAccessPoints;
+    }
+
+    public void setOmsAccessPoints(final String omsAccessPoints) {
+        this.omsAccessPoints = omsAccessPoints;
+    }
+
+    public String getOmsNamespace() {
+        return omsNamespace;
+    }
+
+    public void setOmsNamespace(final String omsNamespace) {
+        this.omsNamespace = omsNamespace;
+    }
+
+    public String getOmsProducerId() {
+        return omsProducerId;
+    }
+
+    public void setOmsProducerId(final String omsProducerId) {
+        this.omsProducerId = omsProducerId;
+    }
+
+    public String getOmsConsumerId() {
+        return omsConsumerId;
+    }
+
+    public void setOmsConsumerId(final String omsConsumerId) {
+        this.omsConsumerId = omsConsumerId;
+    }
+
+    public int getOmsOperationTimeout() {
+        return omsOperationTimeout;
+    }
+
+    public void setOmsOperationTimeout(final int omsOperationTimeout) {
+        this.omsOperationTimeout = omsOperationTimeout;
+    }
+
+    public String getOmsRoutingName() {
+        return omsRoutingName;
+    }
+
+    public void setOmsRoutingName(final String omsRoutingName) {
+        this.omsRoutingName = omsRoutingName;
+    }
+
+    public String getOmsOperatorName() {
+        return omsOperatorName;
+    }
+
+    public void setOmsOperatorName(final String omsOperatorName) {
+        this.omsOperatorName = omsOperatorName;
+    }
+
+    public String getOmsDstQueue() {
+        return omsDstQueue;
+    }
+
+    public void setOmsDstQueue(final String omsDstQueue) {
+        this.omsDstQueue = omsDstQueue;
+    }
+
+    public String getOmsSrcTopic() {
+        return omsSrcTopic;
+    }
+
+    public void setOmsSrcTopic(final String omsSrcTopic) {
+        this.omsSrcTopic = omsSrcTopic;
+    }
+
+    public String getRmqConsumerGroup() {
+        return rmqConsumerGroup;
+    }
+
+    public void setRmqConsumerGroup(final String rmqConsumerGroup) {
+        this.rmqConsumerGroup = rmqConsumerGroup;
+    }
+
+    public String getRmqProducerGroup() {
+        return rmqProducerGroup;
+    }
+
+    public void setRmqProducerGroup(final String rmqProducerGroup) {
+        this.rmqProducerGroup = rmqProducerGroup;
+    }
+
+    public int getRmqMaxRedeliveryTimes() {
+        return rmqMaxRedeliveryTimes;
+    }
+
+    public void setRmqMaxRedeliveryTimes(final int rmqMaxRedeliveryTimes) {
+        this.rmqMaxRedeliveryTimes = rmqMaxRedeliveryTimes;
+    }
+
+    public int getRmqMessageConsumeTimeout() {
+        return rmqMessageConsumeTimeout;
+    }
+
+    public void setRmqMessageConsumeTimeout(final int rmqMessageConsumeTimeout) {
+        this.rmqMessageConsumeTimeout = rmqMessageConsumeTimeout;
+    }
+
+    public int getRmqMaxConsumeThreadNums() {
+        return rmqMaxConsumeThreadNums;
+    }
+
+    public void setRmqMaxConsumeThreadNums(final int rmqMaxConsumeThreadNums) {
+        this.rmqMaxConsumeThreadNums = rmqMaxConsumeThreadNums;
+    }
+
+    public int getRmqMinConsumeThreadNums() {
+        return rmqMinConsumeThreadNums;
+    }
+
+    public void setRmqMinConsumeThreadNums(final int rmqMinConsumeThreadNums) {
+        this.rmqMinConsumeThreadNums = rmqMinConsumeThreadNums;
+    }
+
+    public String getRmqMessageDestination() {
+        return rmqMessageDestination;
+    }
+
+    public void setRmqMessageDestination(final String rmqMessageDestination) {
+        this.rmqMessageDestination = rmqMessageDestination;
+    }
+
+    public int getRmqPullMessageBatchNums() {
+        return rmqPullMessageBatchNums;
+    }
+
+    public void setRmqPullMessageBatchNums(final int rmqPullMessageBatchNums) {
+        this.rmqPullMessageBatchNums = rmqPullMessageBatchNums;
+    }
+
+    public int getRmqPullMessageCacheCapacity() {
+        return rmqPullMessageCacheCapacity;
+    }
+
+    public void setRmqPullMessageCacheCapacity(final int rmqPullMessageCacheCapacity) {
+        this.rmqPullMessageCacheCapacity = rmqPullMessageCacheCapacity;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/2e3c1b00/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
index af1695b..a897da5 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
@@ -31,6 +31,7 @@ import io.openmessaging.rocketmq.consumer.PullConsumerImpl;
 import io.openmessaging.rocketmq.consumer.PushConsumerImpl;
 import io.openmessaging.rocketmq.producer.ProducerImpl;
 import io.openmessaging.rocketmq.producer.SequenceProducerImpl;
+import io.openmessaging.rocketmq.utils.OMSUtil;
 
 public class MessagingAccessPointImpl implements MessagingAccessPoint {
     private final KeyValue accessPointProperties;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/2e3c1b00/openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java
deleted file mode 100644
index 87037ee..0000000
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.openmessaging.rocketmq;
-
-import io.openmessaging.BytesMessage;
-import io.openmessaging.KeyValue;
-import io.openmessaging.MessageHeader;
-import io.openmessaging.OMS;
-import io.openmessaging.SendResult;
-import io.openmessaging.rocketmq.domain.BytesMessageImpl;
-import io.openmessaging.rocketmq.domain.NonStandardKeys;
-import io.openmessaging.rocketmq.domain.SendResultImpl;
-import java.lang.reflect.Field;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Set;
-import org.apache.rocketmq.client.producer.SendStatus;
-import org.apache.rocketmq.common.UtilAll;
-import org.apache.rocketmq.common.message.MessageAccessor;
-
-public class OMSUtil {
-
-    /**
-     * Builds a OMS client instance name.
-     *
-     * @return a unique instance name
-     */
-    public static String buildInstanceName() {
-        return Integer.toString(UtilAll.getPid()) + "%OpenMessaging" + "%" + System.nanoTime();
-    }
-
-    public static org.apache.rocketmq.common.message.Message msgConvert(BytesMessage omsMessage) {
-        org.apache.rocketmq.common.message.Message rmqMessage = new org.apache.rocketmq.common.message.Message();
-        rmqMessage.setBody(omsMessage.getBody());
-
-        KeyValue headers = omsMessage.headers();
-        KeyValue properties = omsMessage.properties();
-
-        //All destinations in RocketMQ use Topic
-        if (headers.containsKey(MessageHeader.TOPIC)) {
-            rmqMessage.setTopic(headers.getString(MessageHeader.TOPIC));
-            rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
-        } else {
-            rmqMessage.setTopic(headers.getString(MessageHeader.QUEUE));
-            rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "QUEUE");
-        }
-
-        for (String key : properties.keySet()) {
-            MessageAccessor.putProperty(rmqMessage, key, properties.getString(key));
-        }
-
-        //Headers has a high priority
-        for (String key : headers.keySet()) {
-            MessageAccessor.putProperty(rmqMessage, key, headers.getString(key));
-        }
-
-        return rmqMessage;
-    }
-
-    public static BytesMessage msgConvert(org.apache.rocketmq.common.message.MessageExt rmqMsg) {
-        BytesMessage omsMsg = new BytesMessageImpl();
-        omsMsg.setBody(rmqMsg.getBody());
-
-        KeyValue headers = omsMsg.headers();
-        KeyValue properties = omsMsg.properties();
-
-        final Set<Map.Entry<String, String>> entries = rmqMsg.getProperties().entrySet();
-
-        for (final Map.Entry<String, String> entry : entries) {
-            if (isOMSHeader(entry.getKey())) {
-                headers.put(entry.getKey(), entry.getValue());
-            } else {
-                properties.put(entry.getKey(), entry.getValue());
-            }
-        }
-
-        omsMsg.putHeaders(MessageHeader.MESSAGE_ID, rmqMsg.getMsgId());
-        if (!rmqMsg.getProperties().containsKey(NonStandardKeys.MESSAGE_DESTINATION) ||
-            rmqMsg.getProperties().get(NonStandardKeys.MESSAGE_DESTINATION).equals("TOPIC")) {
-            omsMsg.putHeaders(MessageHeader.TOPIC, rmqMsg.getTopic());
-        } else {
-            omsMsg.putHeaders(MessageHeader.QUEUE, rmqMsg.getTopic());
-        }
-        omsMsg.putHeaders(MessageHeader.SEARCH_KEY, rmqMsg.getKeys());
-        omsMsg.putHeaders(MessageHeader.BORN_HOST, String.valueOf(rmqMsg.getBornHost()));
-        omsMsg.putHeaders(MessageHeader.BORN_TIMESTAMP, rmqMsg.getBornTimestamp());
-        omsMsg.putHeaders(MessageHeader.STORE_HOST, String.valueOf(rmqMsg.getStoreHost()));
-        omsMsg.putHeaders(MessageHeader.STORE_TIMESTAMP, rmqMsg.getStoreTimestamp());
-        return omsMsg;
-    }
-
-    public static boolean isOMSHeader(String value) {
-        for (Field field : MessageHeader.class.getDeclaredFields()) {
-            try {
-                if (field.get(MessageHeader.class).equals(value)) {
-                    return true;
-                }
-            } catch (IllegalAccessException e) {
-                return false;
-            }
-        }
-        return false;
-    }
-
-    /**
-     * Convert a RocketMQ SEND_OK SendResult instance to a OMS SendResult.
-     */
-    public static SendResult sendResultConvert(org.apache.rocketmq.client.producer.SendResult rmqResult) {
-        assert rmqResult.getSendStatus().equals(SendStatus.SEND_OK);
-        return new SendResultImpl(rmqResult.getMsgId(), OMS.newKeyValue());
-    }
-
-    public static KeyValue buildKeyValue(KeyValue... keyValues) {
-        KeyValue keyValue = OMS.newKeyValue();
-        for (KeyValue properties : keyValues) {
-            for (String key : properties.keySet()) {
-                keyValue.put(key, properties.getString(key));
-            }
-        }
-        return keyValue;
-    }
-
-    /**
-     * Returns an iterator that cycles indefinitely over the elements of {@code Iterable}.
-     */
-    public static <T> Iterator<T> cycle(final Iterable<T> iterable) {
-        return new Iterator<T>() {
-            Iterator<T> iterator = new Iterator<T>() {
-                @Override
-                public synchronized boolean hasNext() {
-                    return false;
-                }
-
-                @Override
-                public synchronized T next() {
-                    throw new NoSuchElementException();
-                }
-
-                @Override
-                public synchronized void remove() {
-                    //Ignore
-                }
-            };
-
-            @Override
-            public synchronized boolean hasNext() {
-                return iterator.hasNext() || iterable.iterator().hasNext();
-            }
-
-            @Override
-            public synchronized T next() {
-                if (!iterator.hasNext()) {
-                    iterator = iterable.iterator();
-                    if (!iterator.hasNext()) {
-                        throw new NoSuchElementException();
-                    }
-                }
-                return iterator.next();
-            }
-
-            @Override
-            public synchronized void remove() {
-                iterator.remove();
-            }
-        };
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/2e3c1b00/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java
index 968229a..0ffd36c 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java
@@ -18,8 +18,8 @@ package io.openmessaging.rocketmq.consumer;
 
 import io.openmessaging.KeyValue;
 import io.openmessaging.PropertyKeys;
+import io.openmessaging.rocketmq.ClientConfig;
 import io.openmessaging.rocketmq.domain.ConsumeRequest;
-import io.openmessaging.rocketmq.domain.NonStandardKeys;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
@@ -38,32 +38,19 @@ class LocalMessageCache {
     private final Map<String, ConsumeRequest> consumedRequest;
     private final ConcurrentHashMap<MessageQueue, Long> pullOffsetTable;
     private final DefaultMQPullConsumer rocketmqPullConsumer;
-    private int pullBatchNums = 32;
-    private int pollTimeout = -1;
+    private final ClientConfig clientConfig;
     private final static Logger log = ClientLogger.getLog();
 
-    LocalMessageCache(final DefaultMQPullConsumer rocketmqPullConsumer, final KeyValue properties) {
-        int cacheCapacity = 1000;
-        if (properties.containsKey(NonStandardKeys.PULL_MESSAGE_CACHE_CAPACITY)) {
-            cacheCapacity = properties.getInt(NonStandardKeys.PULL_MESSAGE_CACHE_CAPACITY);
-        }
-        consumeRequestCache = new LinkedBlockingQueue<>(cacheCapacity);
-
-        if (properties.containsKey(NonStandardKeys.PULL_MESSAGE_BATCH_NUMS)) {
-            pullBatchNums = properties.getInt(NonStandardKeys.PULL_MESSAGE_BATCH_NUMS);
-        }
-
-        if (properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)) {
-            pollTimeout = properties.getInt(PropertyKeys.OPERATION_TIMEOUT);
-        }
-
+    LocalMessageCache(final DefaultMQPullConsumer rocketmqPullConsumer, final ClientConfig clientConfig) {
+        consumeRequestCache = new LinkedBlockingQueue<>(clientConfig.getRmqPullMessageCacheCapacity());
         this.consumedRequest = new ConcurrentHashMap<>();
         this.pullOffsetTable = new ConcurrentHashMap<>();
         this.rocketmqPullConsumer = rocketmqPullConsumer;
+        this.clientConfig = clientConfig;
     }
 
     int nextPullBatchNums() {
-        return Math.min(pullBatchNums, consumeRequestCache.remainingCapacity());
+        return Math.min(clientConfig.getRmqPullMessageBatchNums(), consumeRequestCache.remainingCapacity());
     }
 
     long nextPullOffset(MessageQueue remoteQueue) {
@@ -90,31 +77,25 @@ class LocalMessageCache {
     }
 
     MessageExt poll() {
-        try {
-            ConsumeRequest consumeRequest = consumeRequestCache.take();
-            consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis());
-            consumedRequest.put(consumeRequest.getMessageExt().getMsgId(), consumeRequest);
-            return consumeRequest.getMessageExt();
-        } catch (InterruptedException ignore) {
-        }
-        return null;
+        return poll(clientConfig.getOmsOperationTimeout());
     }
 
     MessageExt poll(final KeyValue properties) {
-        int currentPollTimeout = pollTimeout;
+        int currentPollTimeout = clientConfig.getOmsOperationTimeout();
         if (properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)) {
             currentPollTimeout = properties.getInt(PropertyKeys.OPERATION_TIMEOUT);
         }
+        return poll(currentPollTimeout);
+    }
 
-        if (currentPollTimeout == -1) {
-            return poll();
-        }
-
+    private MessageExt poll(long timeout) {
         try {
-            ConsumeRequest consumeRequest = consumeRequestCache.poll(currentPollTimeout, TimeUnit.MILLISECONDS);
-            consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis());
-            consumedRequest.put(consumeRequest.getMessageExt().getMsgId(), consumeRequest);
-            return consumeRequest.getMessageExt();
+            ConsumeRequest consumeRequest = consumeRequestCache.poll(timeout, TimeUnit.MILLISECONDS);
+            if (consumeRequest != null) {
+                consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis());
+                consumedRequest.put(consumeRequest.getMessageExt().getMsgId(), consumeRequest);
+                return consumeRequest.getMessageExt();
+            }
         } catch (InterruptedException ignore) {
         }
         return null;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/2e3c1b00/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
index bd33d78..56a49a4 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
@@ -21,9 +21,10 @@ import io.openmessaging.Message;
 import io.openmessaging.PropertyKeys;
 import io.openmessaging.PullConsumer;
 import io.openmessaging.exception.OMSRuntimeException;
-import io.openmessaging.rocketmq.OMSUtil;
+import io.openmessaging.rocketmq.ClientConfig;
 import io.openmessaging.rocketmq.domain.ConsumeRequest;
-import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import io.openmessaging.rocketmq.utils.BeanUtils;
+import io.openmessaging.rocketmq.utils.OMSUtil;
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
 import org.apache.rocketmq.client.consumer.MQPullConsumer;
 import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService;
@@ -44,6 +45,7 @@ public class PullConsumerImpl implements PullConsumer {
     private String targetQueueName;
     private final MQPullConsumerScheduleService pullConsumerScheduleService;
     private final LocalMessageCache localMessageCache;
+    private final ClientConfig clientConfig;
 
     final static Logger log = ClientLogger.getLog();
 
@@ -51,7 +53,9 @@ public class PullConsumerImpl implements PullConsumer {
         this.properties = properties;
         this.targetQueueName = queueName;
 
-        String consumerGroup = properties.getString(NonStandardKeys.CONSUMER_GROUP);
+        this.clientConfig = BeanUtils.populate(properties, ClientConfig.class);
+
+        String consumerGroup = clientConfig.getRmqConsumerGroup();
         if (null == consumerGroup || consumerGroup.isEmpty()) {
             throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it.");
         }
@@ -59,7 +63,7 @@ public class PullConsumerImpl implements PullConsumer {
 
         this.rocketmqPullConsumer = pullConsumerScheduleService.getDefaultMQPullConsumer();
 
-        String accessPoints = properties.getString(PropertyKeys.ACCESS_POINTS);
+        String accessPoints = clientConfig.getOmsAccessPoints();
         if (accessPoints == null || accessPoints.isEmpty()) {
             throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty.");
         }
@@ -67,16 +71,14 @@ public class PullConsumerImpl implements PullConsumer {
 
         this.rocketmqPullConsumer.setConsumerGroup(consumerGroup);
 
-        int maxReDeliveryTimes = properties.getInt(NonStandardKeys.MAX_REDELIVERY_TIMES);
-        if (maxReDeliveryTimes != 0) {
-            this.rocketmqPullConsumer.setMaxReconsumeTimes(maxReDeliveryTimes);
-        }
+        int maxReDeliveryTimes = clientConfig.getRmqMaxRedeliveryTimes();
+        this.rocketmqPullConsumer.setMaxReconsumeTimes(maxReDeliveryTimes);
 
         String consumerId = OMSUtil.buildInstanceName();
         this.rocketmqPullConsumer.setInstanceName(consumerId);
         properties.put(PropertyKeys.CONSUMER_ID, consumerId);
 
-        this.localMessageCache = new LocalMessageCache(this.rocketmqPullConsumer, properties);
+        this.localMessageCache = new LocalMessageCache(this.rocketmqPullConsumer, clientConfig);
     }
 
     @Override
@@ -86,12 +88,14 @@ public class PullConsumerImpl implements PullConsumer {
 
     @Override
     public Message poll() {
-        return OMSUtil.msgConvert(localMessageCache.poll());
+        MessageExt rmqMsg = localMessageCache.poll();
+        return rmqMsg == null ? null : OMSUtil.msgConvert(rmqMsg);
     }
 
     @Override
     public Message poll(final KeyValue properties) {
-        return OMSUtil.msgConvert(localMessageCache.poll(properties));
+        MessageExt rmqMsg = localMessageCache.poll(properties);
+        return rmqMsg == null ? null : OMSUtil.msgConvert(rmqMsg);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/2e3c1b00/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
index 9c3b6a9..65c8ee0 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
@@ -24,7 +24,9 @@ import io.openmessaging.PropertyKeys;
 import io.openmessaging.PushConsumer;
 import io.openmessaging.ReceivedMessageContext;
 import io.openmessaging.exception.OMSRuntimeException;
-import io.openmessaging.rocketmq.OMSUtil;
+import io.openmessaging.rocketmq.ClientConfig;
+import io.openmessaging.rocketmq.utils.BeanUtils;
+import io.openmessaging.rocketmq.utils.OMSUtil;
 import io.openmessaging.rocketmq.domain.NonStandardKeys;
 import java.util.List;
 import java.util.Map;
@@ -43,43 +45,29 @@ public class PushConsumerImpl implements PushConsumer {
     private final KeyValue properties;
     private boolean started = false;
     private final Map<String, MessageListener> subscribeTable = new ConcurrentHashMap<>();
+    private final ClientConfig clientConfig;
 
 
     public PushConsumerImpl(final KeyValue properties) {
         this.rocketmqPushConsumer = new DefaultMQPushConsumer();
         this.properties = properties;
+        this.clientConfig = BeanUtils.populate(properties, ClientConfig.class);
 
-        String accessPoints = properties.getString(PropertyKeys.ACCESS_POINTS);
+        String accessPoints = clientConfig.getOmsAccessPoints();
         if (accessPoints == null || accessPoints.isEmpty()) {
             throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty.");
         }
         this.rocketmqPushConsumer.setNamesrvAddr(accessPoints.replace(',', ';'));
 
-        String consumerGroup = properties.getString(NonStandardKeys.CONSUMER_GROUP);
+        String consumerGroup = clientConfig.getRmqConsumerGroup();
         if (null == consumerGroup || consumerGroup.isEmpty()) {
             throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it.");
         }
         this.rocketmqPushConsumer.setConsumerGroup(consumerGroup);
-
-        int maxReDeliveryTimes = properties.getInt(NonStandardKeys.MAX_REDELIVERY_TIMES);
-        if (maxReDeliveryTimes != 0) {
-            this.rocketmqPushConsumer.setMaxReconsumeTimes(maxReDeliveryTimes);
-        }
-
-        int messageConsumeTimeout = properties.getInt(NonStandardKeys.MESSAGE_CONSUME_TIMEOUT);
-        if (messageConsumeTimeout != 0) {
-            this.rocketmqPushConsumer.setConsumeTimeout(messageConsumeTimeout);
-        }
-
-        int maxConsumeThreadNums = properties.getInt(NonStandardKeys.MAX_CONSUME_THREAD_NUMS);
-        if (maxConsumeThreadNums != 0) {
-            this.rocketmqPushConsumer.setConsumeThreadMax(maxConsumeThreadNums);
-        }
-
-        int minConsumeThreadNums = properties.getInt(NonStandardKeys.MIN_CONSUME_THREAD_NUMS);
-        if (minConsumeThreadNums != 0) {
-            this.rocketmqPushConsumer.setConsumeThreadMin(minConsumeThreadNums);
-        }
+        this.rocketmqPushConsumer.setMaxReconsumeTimes(clientConfig.getRmqMaxRedeliveryTimes());
+        this.rocketmqPushConsumer.setConsumeTimeout(clientConfig.getRmqMessageConsumeTimeout());
+        this.rocketmqPushConsumer.setConsumeThreadMax(clientConfig.getRmqMaxConsumeThreadNums());
+        this.rocketmqPushConsumer.setConsumeThreadMin(clientConfig.getRmqMinConsumeThreadNums());
 
         String consumerId = OMSUtil.buildInstanceName();
         this.rocketmqPushConsumer.setInstanceName(consumerId);
@@ -181,10 +169,9 @@ public class PushConsumerImpl implements PushConsumer {
             long begin = System.currentTimeMillis();
             listener.onMessage(omsMsg, context);
             long costs = System.currentTimeMillis() - begin;
-
+            long timeoutMills = clientConfig.getRmqMessageConsumeTimeout() * 60 * 1000;
             try {
-                sync.await(Math.max(0, PushConsumerImpl.this.rocketmqPushConsumer.getConsumeTimeout() - costs)
-                    , TimeUnit.MILLISECONDS);
+                sync.await(Math.max(0, timeoutMills - costs), TimeUnit.MILLISECONDS);
             } catch (InterruptedException ignore) {
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/2e3c1b00/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
index 32d65cd..7de7888 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
@@ -27,8 +27,9 @@ import io.openmessaging.exception.OMSMessageFormatException;
 import io.openmessaging.exception.OMSNotSupportedException;
 import io.openmessaging.exception.OMSRuntimeException;
 import io.openmessaging.exception.OMSTimeOutException;
+import io.openmessaging.rocketmq.ClientConfig;
 import io.openmessaging.rocketmq.domain.BytesMessageImpl;
-import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import io.openmessaging.rocketmq.utils.BeanUtils;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.log.ClientLogger;
@@ -38,33 +39,29 @@ import org.apache.rocketmq.remoting.exception.RemotingConnectException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.slf4j.Logger;
 
-import static io.openmessaging.rocketmq.OMSUtil.buildInstanceName;
+import static io.openmessaging.rocketmq.utils.OMSUtil.buildInstanceName;
 
 abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory{
     final static Logger log = ClientLogger.getLog();
     final KeyValue properties;
     final DefaultMQProducer rocketmqProducer;
     private boolean started = false;
+    final ClientConfig clientConfig;
 
     AbstractOMSProducer(final KeyValue properties) {
         this.properties = properties;
         this.rocketmqProducer = new DefaultMQProducer();
+        this.clientConfig = BeanUtils.populate(properties, ClientConfig.class);
 
-        String accessPoints = properties.getString(PropertyKeys.ACCESS_POINTS);
+        String accessPoints = clientConfig.getOmsAccessPoints();
         if (accessPoints == null || accessPoints.isEmpty()) {
             throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty.");
         }
         this.rocketmqProducer.setNamesrvAddr(accessPoints.replace(',', ';'));
-
-        String producerGroup = properties.getString(NonStandardKeys.PRODUCER_GROUP);
-        if (producerGroup == null || producerGroup.isEmpty()) {
-            producerGroup = "__OMS_PRODUCER_DEFAULT_GROUP";
-        }
-        this.rocketmqProducer.setProducerGroup(producerGroup);
+        this.rocketmqProducer.setProducerGroup(clientConfig.getRmqProducerGroup());
 
         String producerId = buildInstanceName();
-        int operationTimeout = properties.getInt(PropertyKeys.OPERATION_TIMEOUT);
-        this.rocketmqProducer.setSendMsgTimeout(operationTimeout == 0 ? 5000 : operationTimeout);
+        this.rocketmqProducer.setSendMsgTimeout(clientConfig.getOmsOperationTimeout());
         this.rocketmqProducer.setInstanceName(producerId);
         this.rocketmqProducer.setMaxMessageSize(1024 * 1024 * 4);
         properties.put(PropertyKeys.PRODUCER_ID, producerId);

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/2e3c1b00/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java
index f5d2f25..8b2ddd2 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java
@@ -25,12 +25,12 @@ import io.openmessaging.Promise;
 import io.openmessaging.PropertyKeys;
 import io.openmessaging.SendResult;
 import io.openmessaging.exception.OMSRuntimeException;
-import io.openmessaging.rocketmq.OMSUtil;
+import io.openmessaging.rocketmq.utils.OMSUtil;
 import io.openmessaging.rocketmq.promise.DefaultPromise;
 import org.apache.rocketmq.client.producer.SendCallback;
 import org.apache.rocketmq.client.producer.SendStatus;
 
-import static io.openmessaging.rocketmq.OMSUtil.msgConvert;
+import static io.openmessaging.rocketmq.utils.OMSUtil.msgConvert;
 
 public class ProducerImpl extends AbstractOMSProducer implements Producer {
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/2e3c1b00/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java
index 89ece2b..58b1a12 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java
@@ -21,7 +21,7 @@ import io.openmessaging.KeyValue;
 import io.openmessaging.Message;
 import io.openmessaging.MessageHeader;
 import io.openmessaging.SequenceProducer;
-import io.openmessaging.rocketmq.OMSUtil;
+import io.openmessaging.rocketmq.utils.OMSUtil;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/2e3c1b00/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java
new file mode 100644
index 0000000..d8eed84
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.utils;
+
+import io.openmessaging.KeyValue;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.slf4j.Logger;
+
+public final class BeanUtils {
+    final static Logger log = ClientLogger.getLog();
+
+    /**
+     * Creates an instance of {@code clazz} through its public no-arg constructor and populates it
+     * from {@code properties}, following the rules of {@link #populate(Properties, Object)}.
+     *
+     * @param properties name/value pairs keyed by dotted property name
+     * @param clazz JavaBean class to instantiate and populate
+     * @param <T> Class type
+     * @return the populated instance, or {@code null} if {@code clazz} could not be instantiated
+     */
+    public static <T> T populate(final Properties properties, final Class<T> clazz) {
+        T obj = null;
+        try {
+            obj = clazz.newInstance();
+            return populate(properties, obj);
+        } catch (Throwable e) {
+            log.warn("Error occurs !", e);
+        }
+        return obj;
+    }
+
+    /**
+     * Same as {@link #populate(Properties, Class)}, but reads the values from an OMS {@code KeyValue}.
+     *
+     * @return the populated instance, or {@code null} if {@code clazz} could not be instantiated
+     */
+    public static <T> T populate(final KeyValue properties, final Class<T> clazz) {
+        T obj = null;
+        try {
+            obj = clazz.newInstance();
+            return populate(properties, obj);
+        } catch (Throwable e) {
+            log.warn("Error occurs !", e);
+        }
+        return obj;
+    }
+
+    /**
+     * Returns the parameter type of a public single-argument method of {@code clazz} whose name
+     * equals {@code methodName} ignoring case, or {@code null} if there is no such method.
+     */
+    public static Class<?> getMethodClass(Class<?> clazz, String methodName) {
+        Method setter = findSetter(clazz, methodName);
+        return setter == null ? null : setter.getParameterTypes()[0];
+    }
+
+    /** Finds a public single-argument method named {@code methodName} (ignoring case), or null. */
+    private static Method findSetter(Class<?> clazz, String methodName) {
+        for (Method method : clazz.getMethods()) {
+            // Exactly one parameter is required: a same-named zero-arg method has no parameter type.
+            if (method.getParameterTypes().length == 1 && method.getName().equalsIgnoreCase(methodName)) {
+                return method;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Invokes the setter {@code methodName} on {@code obj}. When the setter takes a {@code boolean},
+     * {@code int}, {@code long}, {@code float} or {@code double}, {@code value} is parsed from its
+     * string form; otherwise it is passed through unchanged.
+     *
+     * @param clazz class in which the setter is looked up
+     * @param obj bean instance to modify
+     * @param methodName setter name, matched ignoring case
+     * @param value value to set; {@code null} is skipped when the setter takes a primitive
+     * @throws NoSuchMethodException if {@code clazz} has no matching single-argument method
+     * @throws InvocationTargetException if the setter itself throws
+     * @throws IllegalAccessException if the setter cannot be accessed
+     */
+    public static void setProperties(Class<?> clazz, Object obj, String methodName,
+        Object value) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+        // Use the matched method directly so the case-insensitive name match also covers the call.
+        Method setterMethod = findSetter(clazz, methodName);
+        if (setterMethod == null) {
+            throw new NoSuchMethodException(clazz.getName() + "." + methodName);
+        }
+        Class<?> parameterClass = setterMethod.getParameterTypes()[0];
+        if (value == null && parameterClass.isPrimitive()) {
+            return; // a primitive parameter cannot hold null, so the bean keeps its current value
+        }
+        String text = String.valueOf(value);
+        if (parameterClass == Boolean.TYPE) {
+            setterMethod.invoke(obj, Boolean.valueOf(text));
+        } else if (parameterClass == Integer.TYPE) {
+            setterMethod.invoke(obj, Integer.valueOf(text));
+        } else if (parameterClass == Double.TYPE) {
+            setterMethod.invoke(obj, Double.valueOf(text));
+        } else if (parameterClass == Float.TYPE) {
+            setterMethod.invoke(obj, Float.valueOf(text));
+        } else if (parameterClass == Long.TYPE) {
+            setterMethod.invoke(obj, Long.valueOf(text));
+        } else {
+            setterMethod.invoke(obj, value);
+        }
+    }
+
+    /**
+     * Populates {@code obj} from {@code properties}. Each key is split on {@code '.'}, every part
+     * is lower-cased and then capitalized, and the parts are joined after {@code "set"} to form
+     * the setter name. Keys without a matching setter are skipped silently; a value that cannot
+     * be applied is logged and skipped without affecting the remaining keys.
+     *
+     * @param properties name/value pairs keyed by dotted property name
+     * @param obj bean to populate
+     * @param <T> bean type
+     * @return {@code obj}
+     */
+    public static <T> T populate(final Properties properties, final T obj) {
+        Class<?> clazz = obj.getClass();
+        try {
+            for (Map.Entry<Object, Object> entry : properties.entrySet()) {
+                applyProperty(clazz, obj, entry.getKey().toString(), entry.getValue());
+            }
+        } catch (RuntimeException e) {
+            log.warn("Error occurs !", e);
+        }
+        return obj;
+    }
+
+    /**
+     * Same as {@link #populate(Properties, Object)}, but reads each value with {@code getString}
+     * from an OMS {@code KeyValue}.
+     */
+    public static <T> T populate(final KeyValue properties, final T obj) {
+        Class<?> clazz = obj.getClass();
+        try {
+            for (String key : properties.keySet()) {
+                applyProperty(clazz, obj, key, properties.getString(key));
+            }
+        } catch (RuntimeException e) {
+            log.warn("Error occurs !", e);
+        }
+        return obj;
+    }
+
+    /** Applies a single key/value pair to {@code obj}; a failure affects only this key. */
+    private static void applyProperty(Class<?> clazz, Object obj, String key, Object value) {
+        String[] keyGroup = key.split("\\.");
+        for (int i = 0; i < keyGroup.length; i++) {
+            // Locale.ROOT keeps the key-to-setter mapping independent of the default locale.
+            keyGroup[i] = StringUtils.capitalize(keyGroup[i].toLowerCase(java.util.Locale.ROOT));
+        }
+        try {
+            setProperties(clazz, obj, "set" + StringUtils.join(keyGroup), value);
+        } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException ignored) {
+            //ignored...
+        } catch (RuntimeException e) {
+            // e.g. NumberFormatException: log it and continue with the remaining keys.
+            log.warn("Error occurs !", e);
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/2e3c1b00/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java
new file mode 100644
index 0000000..60c8408
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.utils;
+
+import io.openmessaging.BytesMessage;
+import io.openmessaging.KeyValue;
+import io.openmessaging.MessageHeader;
+import io.openmessaging.OMS;
+import io.openmessaging.SendResult;
+import io.openmessaging.rocketmq.domain.BytesMessageImpl;
+import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import io.openmessaging.rocketmq.domain.SendResultImpl;
+import java.lang.reflect.Field;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageAccessor;
+
+public class OMSUtil {
+
+    /**
+     * Builds an OMS client instance name of the form {@code <pid>%OpenMessaging%<nanoTime>}.
+     */
+    public static String buildInstanceName() {
+        return Integer.toString(UtilAll.getPid()) + "%OpenMessaging" + "%" + System.nanoTime();
+    }
+
+    /**
+     * Converts an OMS message to a RocketMQ message: the {@code TOPIC} header (or, failing that,
+     * the {@code QUEUE} header) becomes the topic, and all properties and headers are copied over.
+     */
+    public static org.apache.rocketmq.common.message.Message msgConvert(BytesMessage omsMessage) {
+        org.apache.rocketmq.common.message.Message rmqMessage = new org.apache.rocketmq.common.message.Message();
+        rmqMessage.setBody(omsMessage.getBody());
+
+        KeyValue headers = omsMessage.headers();
+        KeyValue properties = omsMessage.properties();
+
+        //All destinations in RocketMQ use Topic
+        if (headers.containsKey(MessageHeader.TOPIC)) {
+            rmqMessage.setTopic(headers.getString(MessageHeader.TOPIC));
+            rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
+        } else {
+            rmqMessage.setTopic(headers.getString(MessageHeader.QUEUE));
+            rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "QUEUE");
+        }
+
+        for (String key : properties.keySet()) {
+            MessageAccessor.putProperty(rmqMessage, key, properties.getString(key));
+        }
+
+        //Headers has a high priority
+        for (String key : headers.keySet()) {
+            MessageAccessor.putProperty(rmqMessage, key, headers.getString(key));
+        }
+
+        return rmqMessage;
+    }
+
+    /**
+     * Converts a RocketMQ message to an OMS message: properties whose key is an OMS header name
+     * become headers, the rest become properties, then the id, destination, search key and
+     * born/store host and timestamp headers are filled in from {@code rmqMsg}.
+     */
+    public static BytesMessage msgConvert(org.apache.rocketmq.common.message.MessageExt rmqMsg) {
+        BytesMessage omsMsg = new BytesMessageImpl();
+        omsMsg.setBody(rmqMsg.getBody());
+
+        KeyValue headers = omsMsg.headers();
+        KeyValue properties = omsMsg.properties();
+
+        for (final Map.Entry<String, String> entry : rmqMsg.getProperties().entrySet()) {
+            if (isOMSHeader(entry.getKey())) {
+                headers.put(entry.getKey(), entry.getValue());
+            } else {
+                properties.put(entry.getKey(), entry.getValue());
+            }
+        }
+
+        omsMsg.putHeaders(MessageHeader.MESSAGE_ID, rmqMsg.getMsgId());
+        // A message with no recorded destination kind is treated as a topic message.
+        String destination = rmqMsg.getProperties().get(NonStandardKeys.MESSAGE_DESTINATION);
+        if (destination == null || destination.equals("TOPIC")) {
+            omsMsg.putHeaders(MessageHeader.TOPIC, rmqMsg.getTopic());
+        } else {
+            omsMsg.putHeaders(MessageHeader.QUEUE, rmqMsg.getTopic());
+        }
+        omsMsg.putHeaders(MessageHeader.SEARCH_KEY, rmqMsg.getKeys());
+        omsMsg.putHeaders(MessageHeader.BORN_HOST, String.valueOf(rmqMsg.getBornHost()));
+        omsMsg.putHeaders(MessageHeader.BORN_TIMESTAMP, rmqMsg.getBornTimestamp());
+        omsMsg.putHeaders(MessageHeader.STORE_HOST, String.valueOf(rmqMsg.getStoreHost()));
+        omsMsg.putHeaders(MessageHeader.STORE_TIMESTAMP, rmqMsg.getStoreTimestamp());
+        return omsMsg;
+    }
+
+    /**
+     * Returns {@code true} if {@code value} equals the value of a field declared by
+     * {@code MessageHeader}; a {@code null} value never matches.
+     */
+    public static boolean isOMSHeader(String value) {
+        for (Field field : MessageHeader.class.getDeclaredFields()) {
+            try {
+                if (value != null && value.equals(field.get(MessageHeader.class))) {
+                    return true;
+                }
+            } catch (IllegalAccessException ignored) {
+                // One unreadable field must not hide a match in the fields that follow it.
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Converts a RocketMQ SEND_OK SendResult instance to an OMS SendResult with the same message id.
+     */
+    public static SendResult sendResultConvert(org.apache.rocketmq.client.producer.SendResult rmqResult) {
+        assert rmqResult.getSendStatus().equals(SendStatus.SEND_OK);
+        return new SendResultImpl(rmqResult.getMsgId(), OMS.newKeyValue());
+    }
+
+    /**
+     * Copies the entries of all given key/value sets, in argument order, into a new {@code KeyValue}.
+     */
+    public static KeyValue buildKeyValue(KeyValue... keyValues) {
+        KeyValue keyValue = OMS.newKeyValue();
+        for (KeyValue properties : keyValues) {
+            for (String key : properties.keySet()) {
+                keyValue.put(key, properties.getString(key));
+            }
+        }
+        return keyValue;
+    }
+
+    /**
+     * Returns an iterator that cycles indefinitely over the elements of {@code Iterable}.
+     */
+    public static <T> Iterator<T> cycle(final Iterable<T> iterable) {
+        return new Iterator<T>() {
+            // Iterator of the current pass; null until next() has been called for the first time.
+            private Iterator<T> current;
+
+            @Override
+            public synchronized boolean hasNext() {
+                return (current != null && current.hasNext()) || iterable.iterator().hasNext();
+            }
+
+            @Override
+            public synchronized T next() {
+                if (current == null || !current.hasNext()) {
+                    current = iterable.iterator();
+                    if (!current.hasNext()) {
+                        throw new NoSuchElementException();
+                    }
+                }
+                return current.next();
+            }
+
+            @Override
+            public synchronized void remove() {
+                if (current != null) {
+                    current.remove();
+                }
+            }
+        };
+    }
+}


Mime
View raw message