rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lizhan...@apache.org
Subject [rocketmq] branch develop updated: Make code compatible to OMS 0.3.0
Date Tue, 24 Apr 2018 03:45:15 GMT
This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 2169e3c  Make code compatible to OMS 0.3.0
2169e3c is described below

commit 2169e3c075e48f8f016337c9b81eafc258b9b9b1
Author: shutian.lzh <shutian.lzh@alibaba-inc.com>
AuthorDate: Sun Apr 15 16:13:35 2018 +0800

    Make code compatible to OMS 0.3.0
---
 .../org/apache/rocketmq/broker/BrokerStartup.java  |   1 +
 .../rocketmq/client/log/ClientLoggerTest.java      |   5 +-
 .../example/openmessaging/SimpleProducer.java      |  54 ++++-----
 .../example/openmessaging/SimplePullConsumer.java  |  54 ++++++---
 .../example/openmessaging/SimplePushConsumer.java  |  19 ++-
 .../apache/rocketmq/example/simple/Producer.java   |   2 +-
 .../example/simple/PullScheduleService.java        |   2 +-
 .../rocketmq/MessagingAccessPointImpl.java         |  68 ++++-------
 .../rocketmq/config/ClientConfig.java              | 128 ++++++++++-----------
 .../rocketmq/consumer/LocalMessageCache.java       |  12 +-
 .../rocketmq/consumer/PullConsumerImpl.java        |  44 ++++---
 .../rocketmq/consumer/PushConsumerImpl.java        |  60 +++++++---
 .../rocketmq/domain/BytesMessageImpl.java          |  48 ++++----
 .../rocketmq/domain/RocketMQConstants.java         |   7 ++
 .../rocketmq/domain/SendResultImpl.java            |   3 +-
 .../rocketmq/producer/AbstractOMSProducer.java     |  27 ++---
 .../rocketmq/producer/ProducerImpl.java            |  45 ++++++--
 .../rocketmq/producer/SequenceProducerImpl.java    |  95 ---------------
 .../rocketmq/promise/DefaultPromise.java           |  15 +--
 .../io/openmessaging/rocketmq/utils/BeanUtils.java |   2 +-
 .../io/openmessaging/rocketmq/utils/OMSUtil.java   |  62 +++++-----
 .../rocketmq/consumer/PullConsumerImplTest.java    |  24 ++--
 .../rocketmq/consumer/PushConsumerImplTest.java    |  18 ++-
 .../rocketmq/producer/ProducerImplTest.java        |  16 +--
 .../producer/SequenceProducerImplTest.java         |  86 --------------
 .../rocketmq/promise/DefaultPromiseTest.java       |  38 ++----
 .../rocketmq/utils/BeanUtilsTest.java              |   4 +-
 pom.xml                                            |   2 +-
 28 files changed, 392 insertions(+), 549 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
index f0a1150..1fc1b3b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
@@ -70,6 +70,7 @@ public class BrokerStartup {
             }
 
             log.info(tip);
+            System.out.printf("%s%n", tip);
             return controller;
         } catch (Throwable e) {
             e.printStackTrace();
diff --git a/client/src/test/java/org/apache/rocketmq/client/log/ClientLoggerTest.java b/client/src/test/java/org/apache/rocketmq/client/log/ClientLoggerTest.java
index 9fe0d8b..4888186 100644
--- a/client/src/test/java/org/apache/rocketmq/client/log/ClientLoggerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/log/ClientLoggerTest.java
@@ -49,7 +49,10 @@ public class ClientLoggerTest {
             rocketmqCommon.info("common message {}", i, new RuntimeException());
             rocketmqRemoting.info("remoting message {}", i, new RuntimeException());
         }
-
+        try {
+            Thread.sleep(10);
+        } catch (InterruptedException ignore) {
+        }
         String content = MixAll.file2String(LOG_DIR + "/rocketmq_client.log");
         Assert.assertTrue(content.contains("testClientlog"));
         Assert.assertTrue(content.contains("RocketmqClient"));
diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
index 9d162ac..dbe1d10 100644
--- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
@@ -16,19 +16,20 @@
  */
 package org.apache.rocketmq.example.openmessaging;
 
+import io.openmessaging.Future;
+import io.openmessaging.FutureListener;
 import io.openmessaging.Message;
 import io.openmessaging.MessagingAccessPoint;
-import io.openmessaging.MessagingAccessPointFactory;
-import io.openmessaging.Producer;
-import io.openmessaging.Promise;
-import io.openmessaging.PromiseListener;
-import io.openmessaging.SendResult;
+import io.openmessaging.OMS;
+import io.openmessaging.producer.Producer;
+import io.openmessaging.producer.SendResult;
 import java.nio.charset.Charset;
+import java.util.concurrent.CountDownLatch;
 
 public class SimpleProducer {
     public static void main(String[] args) {
-        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
-            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+        final MessagingAccessPoint messagingAccessPoint =
+            OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");
 
         final Producer producer = messagingAccessPoint.createProducer();
 
@@ -38,39 +39,40 @@ public class SimpleProducer {
         producer.startup();
         System.out.printf("Producer startup OK%n");
 
-        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
-            @Override
-            public void run() {
-                producer.shutdown();
-                messagingAccessPoint.shutdown();
-            }
-        }));
-
         {
-            Message message = producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
+            Message message = producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
             SendResult sendResult = producer.send(message);
             //final Void aVoid = result.get(3000L);
             System.out.printf("Send async message OK, msgId: %s%n", sendResult.messageId());
         }
 
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
         {
-            final Promise<SendResult> result = producer.sendAsync(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
-            result.addListener(new PromiseListener<SendResult>() {
-                @Override
-                public void operationCompleted(Promise<SendResult> promise) {
-                    System.out.printf("Send async message OK, msgId: %s%n", promise.get().messageId());
-                }
-
+            final Future<SendResult> result = producer.sendAsync(producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
+            result.addListener(new FutureListener<SendResult>() {
                 @Override
-                public void operationFailed(Promise<SendResult> promise) {
-                    System.out.printf("Send async message Failed, error: %s%n", promise.getThrowable().getMessage());
+                public void operationComplete(Future<SendResult> future) {
+                    if (future.getThrowable() != null) {
+                        System.out.printf("Send async message Failed, error: %s%n", future.getThrowable().getMessage());
+                    } else {
+                        System.out.printf("Send async message OK, msgId: %s%n", future.get().messageId());
+                    }
+                    countDownLatch.countDown();
                 }
             });
         }
 
         {
-            producer.sendOneway(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
+            producer.sendOneway(producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
             System.out.printf("Send oneway message OK%n");
         }
+
+        try {
+            countDownLatch.await();
+            Thread.sleep(500); // Wait some time for one-way delivery.
+        } catch (InterruptedException ignore) {
+        }
+
+        producer.shutdown();
     }
 }
diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java
index 8e06772..86aba41 100644
--- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java
@@ -17,42 +17,60 @@
 package org.apache.rocketmq.example.openmessaging;
 
 import io.openmessaging.Message;
-import io.openmessaging.MessageHeader;
 import io.openmessaging.MessagingAccessPoint;
-import io.openmessaging.MessagingAccessPointFactory;
 import io.openmessaging.OMS;
-import io.openmessaging.PullConsumer;
-import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import io.openmessaging.OMSBuiltinKeys;
+import io.openmessaging.consumer.PullConsumer;
+import io.openmessaging.producer.Producer;
+import io.openmessaging.producer.SendResult;
 
 public class SimplePullConsumer {
     public static void main(String[] args) {
-        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
-            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+        final MessagingAccessPoint messagingAccessPoint =
+            OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");
 
-        final PullConsumer consumer = messagingAccessPoint.createPullConsumer("OMS_HELLO_TOPIC",
-            OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));
+        messagingAccessPoint.startup();
+
+        final Producer producer = messagingAccessPoint.createProducer();
+
+        final PullConsumer consumer = messagingAccessPoint.createPullConsumer(
+            OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "OMS_CONSUMER"));
 
         messagingAccessPoint.startup();
         System.out.printf("MessagingAccessPoint startup OK%n");
 
-        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
-            @Override
-            public void run() {
-                consumer.shutdown();
-                messagingAccessPoint.shutdown();
-            }
-        }));
+        final String queueName = "TopicTest";
+
+        producer.startup();
+        Message msg = producer.createBytesMessage(queueName, "Hello Open Messaging".getBytes());
+        SendResult sendResult = producer.send(msg);
+        System.out.printf("Send Message OK. MsgId: %s%n", sendResult.messageId());
+        producer.shutdown();
+
+        consumer.attachQueue(queueName);
 
         consumer.startup();
         System.out.printf("Consumer startup OK%n");
 
-        while (true) {
-            Message message = consumer.poll();
+        // Keep running until we find the one that has just been sent
+        boolean stop = false;
+        while (!stop) {
+            Message message = consumer.receive();
             if (message != null) {
-                String msgId = message.headers().getString(MessageHeader.MESSAGE_ID);
+                String msgId = message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID);
                 System.out.printf("Received one message: %s%n", msgId);
                 consumer.ack(msgId);
+
+                if (!stop) {
+                    stop = msgId.equalsIgnoreCase(sendResult.messageId());
+                }
+
+            } else {
+                System.out.printf("Return without any message%n");
             }
         }
+
+        consumer.shutdown();
+        messagingAccessPoint.shutdown();
     }
 }
diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java
index b0935d4..220c132 100644
--- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java
@@ -17,22 +17,19 @@
 package org.apache.rocketmq.example.openmessaging;
 
 import io.openmessaging.Message;
-import io.openmessaging.MessageHeader;
-import io.openmessaging.MessageListener;
 import io.openmessaging.MessagingAccessPoint;
-import io.openmessaging.MessagingAccessPointFactory;
 import io.openmessaging.OMS;
-import io.openmessaging.PushConsumer;
-import io.openmessaging.ReceivedMessageContext;
-import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import io.openmessaging.OMSBuiltinKeys;
+import io.openmessaging.consumer.MessageListener;
+import io.openmessaging.consumer.PushConsumer;
 
 public class SimplePushConsumer {
     public static void main(String[] args) {
-        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
-            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+        final MessagingAccessPoint messagingAccessPoint = OMS
+            .getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");
 
         final PushConsumer consumer = messagingAccessPoint.
-            createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));
+            createPushConsumer(OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "OMS_CONSUMER"));
 
         messagingAccessPoint.startup();
         System.out.printf("MessagingAccessPoint startup OK%n");
@@ -47,8 +44,8 @@ public class SimplePushConsumer {
 
         consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() {
             @Override
-            public void onMessage(final Message message, final ReceivedMessageContext context) {
-                System.out.printf("Received one message: %s%n", message.headers().getString(MessageHeader.MESSAGE_ID));
+            public void onReceived(Message message, Context context) {
+                System.out.printf("Received one message: %s%n", message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID));
                 context.ack();
             }
         });
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java b/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java
index 5751d22..7b504dd 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java
@@ -29,7 +29,7 @@ public class Producer {
 
         producer.start();
 
-        for (int i = 0; i < 10000000; i++)
+        for (int i = 0; i < 128; i++)
             try {
                 {
                     Message msg = new Message("TopicTest",
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PullScheduleService.java b/example/src/main/java/org/apache/rocketmq/example/simple/PullScheduleService.java
index 151628f..8cfdd9b 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/PullScheduleService.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/PullScheduleService.java
@@ -32,7 +32,7 @@ public class PullScheduleService {
         final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("GroupName1");
 
         scheduleService.setMessageModel(MessageModel.CLUSTERING);
-        scheduleService.registerPullTaskCallback("TopicTest1", new PullTaskCallback() {
+        scheduleService.registerPullTaskCallback("TopicTest", new PullTaskCallback() {
 
             @Override
             public void doPullTask(MessageQueue mq, PullTaskContext context) {
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
index 65caf84..51388f9 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
@@ -16,24 +16,21 @@
  */
 package io.openmessaging.rocketmq;
 
-import io.openmessaging.IterableConsumer;
 import io.openmessaging.KeyValue;
 import io.openmessaging.MessagingAccessPoint;
-import io.openmessaging.Producer;
-import io.openmessaging.PullConsumer;
-import io.openmessaging.PushConsumer;
 import io.openmessaging.ResourceManager;
-import io.openmessaging.SequenceProducer;
-import io.openmessaging.ServiceEndPoint;
+import io.openmessaging.consumer.PullConsumer;
+import io.openmessaging.consumer.PushConsumer;
+import io.openmessaging.consumer.StreamingConsumer;
 import io.openmessaging.exception.OMSNotSupportedException;
-import io.openmessaging.observer.Observer;
+import io.openmessaging.producer.Producer;
 import io.openmessaging.rocketmq.consumer.PullConsumerImpl;
 import io.openmessaging.rocketmq.consumer.PushConsumerImpl;
 import io.openmessaging.rocketmq.producer.ProducerImpl;
-import io.openmessaging.rocketmq.producer.SequenceProducerImpl;
 import io.openmessaging.rocketmq.utils.OMSUtil;
 
 public class MessagingAccessPointImpl implements MessagingAccessPoint {
+
     private final KeyValue accessPointProperties;
 
     public MessagingAccessPointImpl(final KeyValue accessPointProperties) {
@@ -41,11 +38,16 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint {
     }
 
     @Override
-    public KeyValue properties() {
+    public KeyValue attributes() {
         return accessPointProperties;
     }
 
     @Override
+    public String implVersion() {
+        return "0.3.0";
+    }
+
+    @Override
     public Producer createProducer() {
         return new ProducerImpl(this.accessPointProperties);
     }
@@ -56,16 +58,6 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint {
     }
 
     @Override
-    public SequenceProducer createSequenceProducer() {
-        return new SequenceProducerImpl(this.accessPointProperties);
-    }
-
-    @Override
-    public SequenceProducer createSequenceProducer(KeyValue properties) {
-        return new SequenceProducerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties));
-    }
-
-    @Override
     public PushConsumer createPushConsumer() {
         return new PushConsumerImpl(accessPointProperties);
     }
@@ -76,51 +68,31 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint {
     }
 
     @Override
-    public PullConsumer createPullConsumer(String queueName) {
-        return new PullConsumerImpl(queueName, accessPointProperties);
+    public PullConsumer createPullConsumer() {
+        return new PullConsumerImpl(accessPointProperties);
     }
 
     @Override
-    public PullConsumer createPullConsumer(String queueName, KeyValue properties) {
-        return new PullConsumerImpl(queueName, OMSUtil.buildKeyValue(this.accessPointProperties, properties));
+    public PullConsumer createPullConsumer(KeyValue attributes) {
+        return new PullConsumerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, attributes));
     }
 
     @Override
-    public IterableConsumer createIterableConsumer(String queueName) {
-        throw new OMSNotSupportedException("-1", "IterableConsumer is not supported in current version");
+    public StreamingConsumer createStreamingConsumer() {
+        return null;
     }
 
     @Override
-    public IterableConsumer createIterableConsumer(String queueName, KeyValue properties) {
-        throw new OMSNotSupportedException("-1", "IterableConsumer is not supported in current version");
+    public StreamingConsumer createStreamingConsumer(KeyValue attributes) {
+        return null;
     }
 
     @Override
-    public ResourceManager getResourceManager() {
+    public ResourceManager resourceManager() {
         throw new OMSNotSupportedException("-1", "ResourceManager is not supported in current version.");
     }
 
     @Override
-    public ServiceEndPoint createServiceEndPoint() {
-        throw new OMSNotSupportedException("-1", "ServiceEndPoint is not supported in current version.");
-    }
-
-    @Override
-    public ServiceEndPoint createServiceEndPoint(KeyValue properties) {
-        throw new OMSNotSupportedException("-1", "ServiceEndPoint is not supported in current version.");
-    }
-
-    @Override
-    public void addObserver(Observer observer) {
-        //Ignore
-    }
-
-    @Override
-    public void deleteObserver(Observer observer) {
-        //Ignore
-    }
-
-    @Override
     public void startup() {
         //Ignore
     }
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/config/ClientConfig.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/config/ClientConfig.java
index 7077c6d..a5dfe49 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/config/ClientConfig.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/config/ClientConfig.java
@@ -16,20 +16,20 @@
  */
 package io.openmessaging.rocketmq.config;
 
-import io.openmessaging.PropertyKeys;
+import io.openmessaging.OMSBuiltinKeys;
 import io.openmessaging.rocketmq.domain.NonStandardKeys;
 
-public class ClientConfig implements PropertyKeys, NonStandardKeys {
-    private String omsDriverImpl;
-    private String omsAccessPoints;
-    private String omsNamespace;
-    private String omsProducerId;
-    private String omsConsumerId;
-    private int omsOperationTimeout = 5000;
-    private String omsRoutingName;
-    private String omsOperatorName;
-    private String omsDstQueue;
-    private String omsSrcTopic;
+public class ClientConfig implements OMSBuiltinKeys, NonStandardKeys {
+    private String driverImpl;
+    private String accessPoints;
+    private String namespace;
+    private String producerId;
+    private String consumerId;
+    private int operationTimeout = 5000;
+    private String region;
+    private String routingSource;
+    private String routingDestination;
+    private String routingExpression;
     private String rmqConsumerGroup;
     private String rmqProducerGroup = "__OMS_PRODUCER_DEFAULT_GROUP";
     private int rmqMaxRedeliveryTimes = 16;
@@ -40,84 +40,60 @@ public class ClientConfig implements PropertyKeys, NonStandardKeys {
     private int rmqPullMessageBatchNums = 32;
     private int rmqPullMessageCacheCapacity = 1000;
 
-    public String getOmsDriverImpl() {
-        return omsDriverImpl;
+    public String getDriverImpl() {
+        return driverImpl;
     }
 
-    public void setOmsDriverImpl(final String omsDriverImpl) {
-        this.omsDriverImpl = omsDriverImpl;
+    public void setDriverImpl(final String driverImpl) {
+        this.driverImpl = driverImpl;
     }
 
-    public String getOmsAccessPoints() {
-        return omsAccessPoints;
+    public String getAccessPoints() {
+        return accessPoints;
     }
 
-    public void setOmsAccessPoints(final String omsAccessPoints) {
-        this.omsAccessPoints = omsAccessPoints;
+    public void setAccessPoints(final String accessPoints) {
+        this.accessPoints = accessPoints;
     }
 
-    public String getOmsNamespace() {
-        return omsNamespace;
+    public String getNamespace() {
+        return namespace;
     }
 
-    public void setOmsNamespace(final String omsNamespace) {
-        this.omsNamespace = omsNamespace;
+    public void setNamespace(final String namespace) {
+        this.namespace = namespace;
     }
 
-    public String getOmsProducerId() {
-        return omsProducerId;
+    public String getProducerId() {
+        return producerId;
     }
 
-    public void setOmsProducerId(final String omsProducerId) {
-        this.omsProducerId = omsProducerId;
+    public void setProducerId(final String producerId) {
+        this.producerId = producerId;
     }
 
-    public String getOmsConsumerId() {
-        return omsConsumerId;
+    public String getConsumerId() {
+        return consumerId;
     }
 
-    public void setOmsConsumerId(final String omsConsumerId) {
-        this.omsConsumerId = omsConsumerId;
+    public void setConsumerId(final String consumerId) {
+        this.consumerId = consumerId;
     }
 
-    public int getOmsOperationTimeout() {
-        return omsOperationTimeout;
+    public int getOperationTimeout() {
+        return operationTimeout;
     }
 
-    public void setOmsOperationTimeout(final int omsOperationTimeout) {
-        this.omsOperationTimeout = omsOperationTimeout;
+    public void setOperationTimeout(final int operationTimeout) {
+        this.operationTimeout = operationTimeout;
     }
 
-    public String getOmsRoutingName() {
-        return omsRoutingName;
+    public String getRoutingSource() {
+        return routingSource;
     }
 
-    public void setOmsRoutingName(final String omsRoutingName) {
-        this.omsRoutingName = omsRoutingName;
-    }
-
-    public String getOmsOperatorName() {
-        return omsOperatorName;
-    }
-
-    public void setOmsOperatorName(final String omsOperatorName) {
-        this.omsOperatorName = omsOperatorName;
-    }
-
-    public String getOmsDstQueue() {
-        return omsDstQueue;
-    }
-
-    public void setOmsDstQueue(final String omsDstQueue) {
-        this.omsDstQueue = omsDstQueue;
-    }
-
-    public String getOmsSrcTopic() {
-        return omsSrcTopic;
-    }
-
-    public void setOmsSrcTopic(final String omsSrcTopic) {
-        this.omsSrcTopic = omsSrcTopic;
+    public void setRoutingSource(final String routingSource) {
+        this.routingSource = routingSource;
     }
 
     public String getRmqConsumerGroup() {
@@ -191,4 +167,28 @@ public class ClientConfig implements PropertyKeys, NonStandardKeys {
     public void setRmqPullMessageCacheCapacity(final int rmqPullMessageCacheCapacity) {
         this.rmqPullMessageCacheCapacity = rmqPullMessageCacheCapacity;
     }
+
+    public String getRegion() {
+        return region;
+    }
+
+    public void setRegion(String region) {
+        this.region = region;
+    }
+
+    public String getRoutingDestination() {
+        return routingDestination;
+    }
+
+    public void setRoutingDestination(String routingDestination) {
+        this.routingDestination = routingDestination;
+    }
+
+    public String getRoutingExpression() {
+        return routingExpression;
+    }
+
+    public void setRoutingExpression(String routingExpression) {
+        this.routingExpression = routingExpression;
+    }
 }
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java
index cc1a515..93e60a7 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java
@@ -17,7 +17,7 @@
 package io.openmessaging.rocketmq.consumer;
 
 import io.openmessaging.KeyValue;
-import io.openmessaging.PropertyKeys;
+import io.openmessaging.Message;
 import io.openmessaging.ServiceLifecycle;
 import io.openmessaging.rocketmq.config.ClientConfig;
 import io.openmessaging.rocketmq.domain.ConsumeRequest;
@@ -37,11 +37,11 @@ import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
 import org.apache.rocketmq.client.log.ClientLogger;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
-import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.common.message.MessageAccessor;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.utils.ThreadUtils;
+import org.apache.rocketmq.logging.InternalLogger;
 
 class LocalMessageCache implements ServiceLifecycle {
     private final BlockingQueue<ConsumeRequest> consumeRequestCache;
@@ -91,13 +91,13 @@ class LocalMessageCache implements ServiceLifecycle {
     }
 
     MessageExt poll() {
-        return poll(clientConfig.getOmsOperationTimeout());
+        return poll(clientConfig.getOperationTimeout());
     }
 
     MessageExt poll(final KeyValue properties) {
-        int currentPollTimeout = clientConfig.getOmsOperationTimeout();
-        if (properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)) {
-            currentPollTimeout = properties.getInt(PropertyKeys.OPERATION_TIMEOUT);
+        int currentPollTimeout = clientConfig.getOperationTimeout();
+        if (properties.containsKey(Message.BuiltinKeys.TIMEOUT)) {
+            currentPollTimeout = properties.getInt(Message.BuiltinKeys.TIMEOUT);
         }
         return poll(currentPollTimeout);
     }
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
index da4afdb..2e22509 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
@@ -18,8 +18,8 @@ package io.openmessaging.rocketmq.consumer;
 
 import io.openmessaging.KeyValue;
 import io.openmessaging.Message;
-import io.openmessaging.PropertyKeys;
-import io.openmessaging.PullConsumer;
+import io.openmessaging.OMSBuiltinKeys;
+import io.openmessaging.consumer.PullConsumer;
 import io.openmessaging.exception.OMSRuntimeException;
 import io.openmessaging.rocketmq.config.ClientConfig;
 import io.openmessaging.rocketmq.domain.ConsumeRequest;
@@ -34,28 +34,25 @@ import org.apache.rocketmq.client.consumer.PullTaskContext;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
 import org.apache.rocketmq.client.log.ClientLogger;
-import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.InternalLogger;
 
 public class PullConsumerImpl implements PullConsumer {
     private final DefaultMQPullConsumer rocketmqPullConsumer;
     private final KeyValue properties;
     private boolean started = false;
-    private String targetQueueName;
     private final MQPullConsumerScheduleService pullConsumerScheduleService;
     private final LocalMessageCache localMessageCache;
     private final ClientConfig clientConfig;
 
     final static InternalLogger log = ClientLogger.getLog();
 
-    public PullConsumerImpl(final String queueName, final KeyValue properties) {
+    public PullConsumerImpl(final KeyValue properties) {
         this.properties = properties;
-        this.targetQueueName = queueName;
-
         this.clientConfig = BeanUtils.populate(properties, ClientConfig.class);
 
-        String consumerGroup = clientConfig.getRmqConsumerGroup();
+        String consumerGroup = clientConfig.getConsumerId();
         if (null == consumerGroup || consumerGroup.isEmpty()) {
             throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it.");
         }
@@ -63,7 +60,7 @@ public class PullConsumerImpl implements PullConsumer {
 
         this.rocketmqPullConsumer = pullConsumerScheduleService.getDefaultMQPullConsumer();
 
-        String accessPoints = clientConfig.getOmsAccessPoints();
+        String accessPoints = clientConfig.getAccessPoints();
         if (accessPoints == null || accessPoints.isEmpty()) {
             throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty.");
         }
@@ -76,24 +73,42 @@ public class PullConsumerImpl implements PullConsumer {
 
         String consumerId = OMSUtil.buildInstanceName();
         this.rocketmqPullConsumer.setInstanceName(consumerId);
-        properties.put(PropertyKeys.CONSUMER_ID, consumerId);
+        properties.put(OMSBuiltinKeys.CONSUMER_ID, consumerId);
 
         this.localMessageCache = new LocalMessageCache(this.rocketmqPullConsumer, clientConfig);
     }
 
     @Override
-    public KeyValue properties() {
+    public KeyValue attributes() {
         return properties;
     }
 
     @Override
-    public Message poll() {
+    public PullConsumer attachQueue(String queueName) {
+        registerPullTaskCallback(queueName);
+        return this;
+    }
+
+    @Override
+    public PullConsumer attachQueue(String queueName, KeyValue attributes) {
+        registerPullTaskCallback(queueName);
+        return this;
+    }
+
+    @Override
+    public PullConsumer detachQueue(String queueName) {
+        this.rocketmqPullConsumer.getRegisterTopics().remove(queueName);
+        return this;
+    }
+
+    @Override
+    public Message receive() {
         MessageExt rmqMsg = localMessageCache.poll();
         return rmqMsg == null ? null : OMSUtil.msgConvert(rmqMsg);
     }
 
     @Override
-    public Message poll(final KeyValue properties) {
+    public Message receive(final KeyValue properties) {
         MessageExt rmqMsg = localMessageCache.poll(properties);
         return rmqMsg == null ? null : OMSUtil.msgConvert(rmqMsg);
     }
@@ -112,7 +127,6 @@ public class PullConsumerImpl implements PullConsumer {
     public synchronized void startup() {
         if (!started) {
             try {
-                registerPullTaskCallback();
                 this.pullConsumerScheduleService.start();
                 this.localMessageCache.startup();
             } catch (MQClientException e) {
@@ -122,7 +136,7 @@ public class PullConsumerImpl implements PullConsumer {
         this.started = true;
     }
 
-    private void registerPullTaskCallback() {
+    private void registerPullTaskCallback(final String targetQueueName) {
         this.pullConsumerScheduleService.registerPullTaskCallback(targetQueueName, new PullTaskCallback() {
             @Override
             public void doPullTask(final MessageQueue mq, final PullTaskContext context) {
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
index f9b8058..9bfd7c8 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
@@ -18,12 +18,12 @@ package io.openmessaging.rocketmq.consumer;
 
 import io.openmessaging.BytesMessage;
 import io.openmessaging.KeyValue;
-import io.openmessaging.MessageListener;
 import io.openmessaging.OMS;
-import io.openmessaging.PropertyKeys;
-import io.openmessaging.PushConsumer;
-import io.openmessaging.ReceivedMessageContext;
+import io.openmessaging.OMSBuiltinKeys;
+import io.openmessaging.consumer.MessageListener;
+import io.openmessaging.consumer.PushConsumer;
 import io.openmessaging.exception.OMSRuntimeException;
+import io.openmessaging.interceptor.ConsumerInterceptor;
 import io.openmessaging.rocketmq.config.ClientConfig;
 import io.openmessaging.rocketmq.domain.NonStandardKeys;
 import io.openmessaging.rocketmq.utils.BeanUtils;
@@ -52,13 +52,13 @@ public class PushConsumerImpl implements PushConsumer {
         this.properties = properties;
         this.clientConfig = BeanUtils.populate(properties, ClientConfig.class);
 
-        String accessPoints = clientConfig.getOmsAccessPoints();
+        String accessPoints = clientConfig.getAccessPoints();
         if (accessPoints == null || accessPoints.isEmpty()) {
             throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty.");
         }
         this.rocketmqPushConsumer.setNamesrvAddr(accessPoints.replace(',', ';'));
 
-        String consumerGroup = clientConfig.getRmqConsumerGroup();
+        String consumerGroup = clientConfig.getConsumerId();
         if (null == consumerGroup || consumerGroup.isEmpty()) {
             throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it.");
         }
@@ -70,13 +70,13 @@ public class PushConsumerImpl implements PushConsumer {
 
         String consumerId = OMSUtil.buildInstanceName();
         this.rocketmqPushConsumer.setInstanceName(consumerId);
-        properties.put(PropertyKeys.CONSUMER_ID, consumerId);
+        properties.put(OMSBuiltinKeys.CONSUMER_ID, consumerId);
 
         this.rocketmqPushConsumer.registerMessageListener(new MessageListenerImpl());
     }
 
     @Override
-    public KeyValue properties() {
+    public KeyValue attributes() {
         return properties;
     }
 
@@ -91,6 +91,11 @@ public class PushConsumerImpl implements PushConsumer {
     }
 
     @Override
+    public void suspend(long timeout) {
+
+    }
+
+    @Override
     public boolean isSuspended() {
         return this.rocketmqPushConsumer.getDefaultMQPushConsumerImpl().isPause();
     }
@@ -107,6 +112,32 @@ public class PushConsumerImpl implements PushConsumer {
     }
 
     @Override
+    public PushConsumer attachQueue(String queueName, MessageListener listener, KeyValue attributes) {
+        return this.attachQueue(queueName, listener);
+    }
+
+    @Override
+    public PushConsumer detachQueue(String queueName) {
+        this.subscribeTable.remove(queueName);
+        try {
+            this.rocketmqPushConsumer.unsubscribe(queueName);
+        } catch (Exception e) {
+            throw new OMSRuntimeException("-1", String.format("RocketMQ push consumer fails to unsubscribe topic: %s", queueName));
+        }
+        return null;
+    }
+
+    @Override
+    public void addInterceptor(ConsumerInterceptor interceptor) {
+
+    }
+
+    @Override
+    public void removeInterceptor(ConsumerInterceptor interceptor) {
+
+    }
+
+    @Override
     public synchronized void startup() {
         if (!started) {
             try {
@@ -146,9 +177,9 @@ public class PushConsumerImpl implements PushConsumer {
 
             contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, ConsumeConcurrentlyStatus.RECONSUME_LATER.name());
 
-            ReceivedMessageContext context = new ReceivedMessageContext() {
+            MessageListener.Context context = new MessageListener.Context() {
                 @Override
-                public KeyValue properties() {
+                public KeyValue attributes() {
                     return contextProperties;
                 }
 
@@ -158,16 +189,9 @@ public class PushConsumerImpl implements PushConsumer {
                     contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
                         ConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
                 }
-
-                @Override
-                public void ack(final KeyValue properties) {
-                    sync.countDown();
-                    contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
-                        properties.getString(NonStandardKeys.MESSAGE_CONSUME_STATUS));
-                }
             };
             long begin = System.currentTimeMillis();
-            listener.onMessage(omsMsg, context);
+            listener.onReceived(omsMsg, context);
             long costs = System.currentTimeMillis() - begin;
             long timeoutMills = clientConfig.getRmqMessageConsumeTimeout() * 60 * 1000;
             try {
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java
index 43f80ae..702d561 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java
@@ -23,13 +23,13 @@ import io.openmessaging.OMS;
 import org.apache.commons.lang3.builder.ToStringBuilder;
 
 public class BytesMessageImpl implements BytesMessage {
-    private KeyValue headers;
-    private KeyValue properties;
+    private KeyValue sysHeaders;
+    private KeyValue userHeaders;
     private byte[] body;
 
     public BytesMessageImpl() {
-        this.headers = OMS.newKeyValue();
-        this.properties = OMS.newKeyValue();
+        this.sysHeaders = OMS.newKeyValue();
+        this.userHeaders = OMS.newKeyValue();
     }
 
     @Override
@@ -44,60 +44,60 @@ public class BytesMessageImpl implements BytesMessage {
     }
 
     @Override
-    public KeyValue headers() {
-        return headers;
+    public KeyValue sysHeaders() {
+        return sysHeaders;
     }
 
     @Override
-    public KeyValue properties() {
-        return properties;
+    public KeyValue userHeaders() {
+        return userHeaders;
     }
 
     @Override
-    public Message putHeaders(final String key, final int value) {
-        headers.put(key, value);
+    public Message putSysHeaders(String key, int value) {
+        sysHeaders.put(key, value);
         return this;
     }
 
     @Override
-    public Message putHeaders(final String key, final long value) {
-        headers.put(key, value);
+    public Message putSysHeaders(String key, long value) {
+        sysHeaders.put(key, value);
         return this;
     }
 
     @Override
-    public Message putHeaders(final String key, final double value) {
-        headers.put(key, value);
+    public Message putSysHeaders(String key, double value) {
+        sysHeaders.put(key, value);
         return this;
     }
 
     @Override
-    public Message putHeaders(final String key, final String value) {
-        headers.put(key, value);
+    public Message putSysHeaders(String key, String value) {
+        sysHeaders.put(key, value);
         return this;
     }
 
     @Override
-    public Message putProperties(final String key, final int value) {
-        properties.put(key, value);
+    public Message putUserHeaders(String key, int value) {
+        userHeaders.put(key, value);
         return this;
     }
 
     @Override
-    public Message putProperties(final String key, final long value) {
-        properties.put(key, value);
+    public Message putUserHeaders(String key, long value) {
+        userHeaders.put(key, value);
         return this;
     }
 
     @Override
-    public Message putProperties(final String key, final double value) {
-        properties.put(key, value);
+    public Message putUserHeaders(String key, double value) {
+        userHeaders.put(key, value);
         return this;
     }
 
     @Override
-    public Message putProperties(final String key, final String value) {
-        properties.put(key, value);
+    public Message putUserHeaders(String key, String value) {
+        userHeaders.put(key, value);
         return this;
     }
 
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/RocketMQConstants.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/RocketMQConstants.java
new file mode 100644
index 0000000..4c6568a
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/RocketMQConstants.java
@@ -0,0 +1,7 @@
+package io.openmessaging.rocketmq.domain;
+
+public interface RocketMQConstants {
+
+    String START_DELIVER_TIME = "__STARTDELIVERTIME";
+
+}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/SendResultImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/SendResultImpl.java
index 228a9f0..85bcd68 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/SendResultImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/SendResultImpl.java
@@ -17,7 +17,7 @@
 package io.openmessaging.rocketmq.domain;
 
 import io.openmessaging.KeyValue;
-import io.openmessaging.SendResult;
+import io.openmessaging.producer.SendResult;
 
 public class SendResultImpl implements SendResult {
     private String messageId;
@@ -33,7 +33,6 @@ public class SendResultImpl implements SendResult {
         return messageId;
     }
 
-    @Override
     public KeyValue properties() {
         return properties;
     }
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
index db25fc6..f733756 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
@@ -20,8 +20,7 @@ import io.openmessaging.BytesMessage;
 import io.openmessaging.KeyValue;
 import io.openmessaging.Message;
 import io.openmessaging.MessageFactory;
-import io.openmessaging.MessageHeader;
-import io.openmessaging.PropertyKeys;
+import io.openmessaging.OMSBuiltinKeys;
 import io.openmessaging.ServiceLifecycle;
 import io.openmessaging.exception.OMSMessageFormatException;
 import io.openmessaging.exception.OMSNotSupportedException;
@@ -53,7 +52,7 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
         this.rocketmqProducer = new DefaultMQProducer();
         this.clientConfig = BeanUtils.populate(properties, ClientConfig.class);
 
-        String accessPoints = clientConfig.getOmsAccessPoints();
+        String accessPoints = clientConfig.getAccessPoints();
         if (accessPoints == null || accessPoints.isEmpty()) {
             throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty.");
         }
@@ -61,10 +60,10 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
         this.rocketmqProducer.setProducerGroup(clientConfig.getRmqProducerGroup());
 
         String producerId = buildInstanceName();
-        this.rocketmqProducer.setSendMsgTimeout(clientConfig.getOmsOperationTimeout());
+        this.rocketmqProducer.setSendMsgTimeout(clientConfig.getOperationTimeout());
         this.rocketmqProducer.setInstanceName(producerId);
         this.rocketmqProducer.setMaxMessageSize(1024 * 1024 * 4);
-        properties.put(PropertyKeys.PRODUCER_ID, producerId);
+        properties.put(OMSBuiltinKeys.PRODUCER_ID, producerId);
     }
 
     @Override
@@ -121,18 +120,10 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
     }
 
     @Override
-    public BytesMessage createBytesMessageToTopic(final String topic, final byte[] body) {
-        BytesMessage bytesMessage = new BytesMessageImpl();
-        bytesMessage.setBody(body);
-        bytesMessage.headers().put(MessageHeader.TOPIC, topic);
-        return bytesMessage;
-    }
-
-    @Override
-    public BytesMessage createBytesMessageToQueue(final String queue, final byte[] body) {
-        BytesMessage bytesMessage = new BytesMessageImpl();
-        bytesMessage.setBody(body);
-        bytesMessage.headers().put(MessageHeader.QUEUE, queue);
-        return bytesMessage;
+    public BytesMessage createBytesMessage(String queue, byte[] body) {
+        BytesMessage message = new BytesMessageImpl();
+        message.setBody(body);
+        message.sysHeaders().put(Message.BuiltinKeys.DESTINATION, queue);
+        return message;
     }
 }
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java
index 2c00c60..c2b6d3e 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java
@@ -19,12 +19,13 @@ package io.openmessaging.rocketmq.producer;
 import io.openmessaging.BytesMessage;
 import io.openmessaging.KeyValue;
 import io.openmessaging.Message;
-import io.openmessaging.MessageHeader;
-import io.openmessaging.Producer;
 import io.openmessaging.Promise;
-import io.openmessaging.PropertyKeys;
-import io.openmessaging.SendResult;
 import io.openmessaging.exception.OMSRuntimeException;
+import io.openmessaging.interceptor.ProducerInterceptor;
+import io.openmessaging.producer.BatchMessageSender;
+import io.openmessaging.producer.LocalTransactionExecutor;
+import io.openmessaging.producer.Producer;
+import io.openmessaging.producer.SendResult;
 import io.openmessaging.rocketmq.promise.DefaultPromise;
 import io.openmessaging.rocketmq.utils.OMSUtil;
 import org.apache.rocketmq.client.producer.SendCallback;
@@ -39,7 +40,7 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
     }
 
     @Override
-    public KeyValue properties() {
+    public KeyValue attributes() {
         return properties;
     }
 
@@ -50,11 +51,16 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
 
     @Override
     public SendResult send(final Message message, final KeyValue properties) {
-        long timeout = properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)
-            ? properties.getInt(PropertyKeys.OPERATION_TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout();
+        long timeout = properties.containsKey(Message.BuiltinKeys.TIMEOUT)
+            ? properties.getInt(Message.BuiltinKeys.TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout();
         return send(message, timeout);
     }
 
+    @Override
+    public SendResult send(Message message, LocalTransactionExecutor branchExecutor, KeyValue attributes) {
+        return null;
+    }
+
     private SendResult send(final Message message, long timeout) {
         checkMessageType(message);
         org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
@@ -64,11 +70,11 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
                 log.error(String.format("Send message to RocketMQ failed, %s", message));
                 throw new OMSRuntimeException("-1", "Send message to RocketMQ broker failed.");
             }
-            message.headers().put(MessageHeader.MESSAGE_ID, rmqResult.getMsgId());
+            message.sysHeaders().put(Message.BuiltinKeys.MESSAGE_ID, rmqResult.getMsgId());
             return OMSUtil.sendResultConvert(rmqResult);
         } catch (Exception e) {
             log.error(String.format("Send message to RocketMQ failed, %s", message), e);
-            throw checkProducerException(rmqMessage.getTopic(), message.headers().getString(MessageHeader.MESSAGE_ID), e);
+            throw checkProducerException(rmqMessage.getTopic(), message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID), e);
         }
     }
 
@@ -79,8 +85,8 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
 
     @Override
     public Promise<SendResult> sendAsync(final Message message, final KeyValue properties) {
-        long timeout = properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)
-            ? properties.getInt(PropertyKeys.OPERATION_TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout();
+        long timeout = properties.containsKey(Message.BuiltinKeys.TIMEOUT)
+            ? properties.getInt(Message.BuiltinKeys.TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout();
         return sendAsync(message, timeout);
     }
 
@@ -92,7 +98,7 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
             this.rocketmqProducer.send(rmqMessage, new SendCallback() {
                 @Override
                 public void onSuccess(final org.apache.rocketmq.client.producer.SendResult rmqResult) {
-                    message.headers().put(MessageHeader.MESSAGE_ID, rmqResult.getMsgId());
+                    message.sysHeaders().put(Message.BuiltinKeys.MESSAGE_ID, rmqResult.getMsgId());
                     promise.set(OMSUtil.sendResultConvert(rmqResult));
                 }
 
@@ -121,4 +127,19 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
     public void sendOneway(final Message message, final KeyValue properties) {
         sendOneway(message);
     }
+
+    @Override
+    public BatchMessageSender createBatchMessageSender() {
+        return null;
+    }
+
+    @Override
+    public void addInterceptor(ProducerInterceptor interceptor) {
+
+    }
+
+    @Override
+    public void removeInterceptor(ProducerInterceptor interceptor) {
+
+    }
 }
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java
deleted file mode 100644
index 05225cc..0000000
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.openmessaging.rocketmq.producer;
-
-import io.openmessaging.BytesMessage;
-import io.openmessaging.KeyValue;
-import io.openmessaging.Message;
-import io.openmessaging.MessageHeader;
-import io.openmessaging.SequenceProducer;
-import io.openmessaging.rocketmq.utils.OMSUtil;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import org.apache.rocketmq.client.Validators;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.producer.SendResult;
-
-public class SequenceProducerImpl extends AbstractOMSProducer implements SequenceProducer {
-
-    private BlockingQueue<Message> msgCacheQueue;
-
-    public SequenceProducerImpl(final KeyValue properties) {
-        super(properties);
-        this.msgCacheQueue = new LinkedBlockingQueue<>();
-    }
-
-    @Override
-    public KeyValue properties() {
-        return properties;
-    }
-
-    @Override
-    public void send(final Message message) {
-        checkMessageType(message);
-        org.apache.rocketmq.common.message.Message rmqMessage = OMSUtil.msgConvert((BytesMessage) message);
-        try {
-            Validators.checkMessage(rmqMessage, this.rocketmqProducer);
-        } catch (MQClientException e) {
-            throw checkProducerException(rmqMessage.getTopic(), message.headers().getString(MessageHeader.MESSAGE_ID), e);
-        }
-        msgCacheQueue.add(message);
-    }
-
-    @Override
-    public void send(final Message message, final KeyValue properties) {
-        send(message);
-    }
-
-    @Override
-    public synchronized void commit() {
-        List<Message> messages = new ArrayList<>();
-        msgCacheQueue.drainTo(messages);
-
-        List<org.apache.rocketmq.common.message.Message> rmqMessages = new ArrayList<>();
-
-        for (Message message : messages) {
-            rmqMessages.add(OMSUtil.msgConvert((BytesMessage) message));
-        }
-
-        if (rmqMessages.size() == 0) {
-            return;
-        }
-
-        try {
-            SendResult sendResult = this.rocketmqProducer.send(rmqMessages);
-            String[] msgIdArray = sendResult.getMsgId().split(",");
-            for (int i = 0; i < messages.size(); i++) {
-                Message message = messages.get(i);
-                message.headers().put(MessageHeader.MESSAGE_ID, msgIdArray[i]);
-            }
-        } catch (Exception e) {
-            throw checkProducerException("", "", e);
-        }
-    }
-
-    @Override
-    public synchronized void rollback() {
-        msgCacheQueue.clear();
-    }
-}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java
index 453b665..c1b5999 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java
@@ -17,7 +17,7 @@
 package io.openmessaging.rocketmq.promise;
 
 import io.openmessaging.Promise;
-import io.openmessaging.PromiseListener;
+import io.openmessaging.FutureListener;
 import io.openmessaging.exception.OMSRuntimeException;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
@@ -33,7 +33,7 @@ public class DefaultPromise<V> implements Promise<V> {
     private long timeout;
     private long createTime;
     private Throwable exception = null;
-    private List<PromiseListener<V>> promiseListenerList;
+    private List<FutureListener<V>> promiseListenerList;
 
     public DefaultPromise() {
         createTime = System.currentTimeMillis();
@@ -121,7 +121,7 @@ public class DefaultPromise<V> implements Promise<V> {
     }
 
     @Override
-    public void addListener(final PromiseListener<V> listener) {
+    public void addListener(final FutureListener<V> listener) {
         if (listener == null) {
             throw new NullPointerException("FutureListener is null");
         }
@@ -150,7 +150,7 @@ public class DefaultPromise<V> implements Promise<V> {
 
     private void notifyListeners() {
         if (promiseListenerList != null) {
-            for (PromiseListener<V> listener : promiseListenerList) {
+            for (FutureListener<V> listener : promiseListenerList) {
                 notifyListener(listener);
             }
         }
@@ -199,12 +199,9 @@ public class DefaultPromise<V> implements Promise<V> {
         return true;
     }
 
-    private void notifyListener(final PromiseListener<V> listener) {
+    private void notifyListener(final FutureListener<V> listener) {
         try {
-            if (exception != null)
-                listener.operationFailed(this);
-            else
-                listener.operationCompleted(this);
+            listener.operationComplete(this);
         } catch (Throwable t) {
             LOG.error("notifyListener {} Error:{}", listener.getClass().getSimpleName(), t);
         }
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java
index ba7cd59..ef9236f 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java
@@ -164,7 +164,7 @@ public final class BeanUtils {
 
             final Set<String> keySet = properties.keySet();
             for (String key : keySet) {
-                String[] keyGroup = key.split("\\.");
+                String[] keyGroup = key.split("[\\._]");
                 for (int i = 0; i < keyGroup.length; i++) {
                     keyGroup[i] = keyGroup[i].toLowerCase();
                     keyGroup[i] = StringUtils.capitalize(keyGroup[i]);
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java
index 60c8408..2302141 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java
@@ -18,11 +18,11 @@ package io.openmessaging.rocketmq.utils;
 
 import io.openmessaging.BytesMessage;
 import io.openmessaging.KeyValue;
-import io.openmessaging.MessageHeader;
+import io.openmessaging.Message.BuiltinKeys;
 import io.openmessaging.OMS;
-import io.openmessaging.SendResult;
+import io.openmessaging.producer.SendResult;
 import io.openmessaging.rocketmq.domain.BytesMessageImpl;
-import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import io.openmessaging.rocketmq.domain.RocketMQConstants;
 import io.openmessaging.rocketmq.domain.SendResultImpl;
 import java.lang.reflect.Field;
 import java.util.Iterator;
@@ -48,25 +48,26 @@ public class OMSUtil {
         org.apache.rocketmq.common.message.Message rmqMessage = new org.apache.rocketmq.common.message.Message();
         rmqMessage.setBody(omsMessage.getBody());
 
-        KeyValue headers = omsMessage.headers();
-        KeyValue properties = omsMessage.properties();
+        KeyValue sysHeaders = omsMessage.sysHeaders();
+        KeyValue userHeaders = omsMessage.userHeaders();
 
         //All destinations in RocketMQ use Topic
-        if (headers.containsKey(MessageHeader.TOPIC)) {
-            rmqMessage.setTopic(headers.getString(MessageHeader.TOPIC));
-            rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
-        } else {
-            rmqMessage.setTopic(headers.getString(MessageHeader.QUEUE));
-            rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "QUEUE");
+        rmqMessage.setTopic(sysHeaders.getString(BuiltinKeys.DESTINATION));
+
+        if (sysHeaders.containsKey(BuiltinKeys.START_TIME)) {
+            long deliverTime = sysHeaders.getLong(BuiltinKeys.START_TIME, 0);
+            if (deliverTime > 0) {
+                rmqMessage.putUserProperty(RocketMQConstants.START_DELIVER_TIME, String.valueOf(deliverTime));
+            }
         }
 
-        for (String key : properties.keySet()) {
-            MessageAccessor.putProperty(rmqMessage, key, properties.getString(key));
+        for (String key : userHeaders.keySet()) {
+            MessageAccessor.putProperty(rmqMessage, key, userHeaders.getString(key));
         }
 
-        //Headers has a high priority
-        for (String key : headers.keySet()) {
-            MessageAccessor.putProperty(rmqMessage, key, headers.getString(key));
+        //System headers has a high priority
+        for (String key : sysHeaders.keySet()) {
+            MessageAccessor.putProperty(rmqMessage, key, sysHeaders.getString(key));
         }
 
         return rmqMessage;
@@ -76,8 +77,8 @@ public class OMSUtil {
         BytesMessage omsMsg = new BytesMessageImpl();
         omsMsg.setBody(rmqMsg.getBody());
 
-        KeyValue headers = omsMsg.headers();
-        KeyValue properties = omsMsg.properties();
+        KeyValue headers = omsMsg.sysHeaders();
+        KeyValue properties = omsMsg.userHeaders();
 
         final Set<Map.Entry<String, String>> entries = rmqMsg.getProperties().entrySet();
 
@@ -89,25 +90,22 @@ public class OMSUtil {
             }
         }
 
-        omsMsg.putHeaders(MessageHeader.MESSAGE_ID, rmqMsg.getMsgId());
-        if (!rmqMsg.getProperties().containsKey(NonStandardKeys.MESSAGE_DESTINATION) ||
-            rmqMsg.getProperties().get(NonStandardKeys.MESSAGE_DESTINATION).equals("TOPIC")) {
-            omsMsg.putHeaders(MessageHeader.TOPIC, rmqMsg.getTopic());
-        } else {
-            omsMsg.putHeaders(MessageHeader.QUEUE, rmqMsg.getTopic());
-        }
-        omsMsg.putHeaders(MessageHeader.SEARCH_KEY, rmqMsg.getKeys());
-        omsMsg.putHeaders(MessageHeader.BORN_HOST, String.valueOf(rmqMsg.getBornHost()));
-        omsMsg.putHeaders(MessageHeader.BORN_TIMESTAMP, rmqMsg.getBornTimestamp());
-        omsMsg.putHeaders(MessageHeader.STORE_HOST, String.valueOf(rmqMsg.getStoreHost()));
-        omsMsg.putHeaders(MessageHeader.STORE_TIMESTAMP, rmqMsg.getStoreTimestamp());
+        omsMsg.putSysHeaders(BuiltinKeys.MESSAGE_ID, rmqMsg.getMsgId());
+
+        omsMsg.putSysHeaders(BuiltinKeys.DESTINATION, rmqMsg.getTopic());
+
+        omsMsg.putSysHeaders(BuiltinKeys.SEARCH_KEYS, rmqMsg.getKeys());
+        omsMsg.putSysHeaders(BuiltinKeys.BORN_HOST, String.valueOf(rmqMsg.getBornHost()));
+        omsMsg.putSysHeaders(BuiltinKeys.BORN_TIMESTAMP, rmqMsg.getBornTimestamp());
+        omsMsg.putSysHeaders(BuiltinKeys.STORE_HOST, String.valueOf(rmqMsg.getStoreHost()));
+        omsMsg.putSysHeaders(BuiltinKeys.STORE_TIMESTAMP, rmqMsg.getStoreTimestamp());
         return omsMsg;
     }
 
     public static boolean isOMSHeader(String value) {
-        for (Field field : MessageHeader.class.getDeclaredFields()) {
+        for (Field field : BuiltinKeys.class.getDeclaredFields()) {
             try {
-                if (field.get(MessageHeader.class).equals(value)) {
+                if (field.get(BuiltinKeys.class).equals(value)) {
                     return true;
                 }
             } catch (IllegalAccessException e) {
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java
index 843ddb7..da2e8a0 100644
--- a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java
@@ -18,12 +18,10 @@ package io.openmessaging.rocketmq.consumer;
 
 import io.openmessaging.BytesMessage;
 import io.openmessaging.Message;
-import io.openmessaging.MessageHeader;
 import io.openmessaging.MessagingAccessPoint;
-import io.openmessaging.MessagingAccessPointFactory;
 import io.openmessaging.OMS;
-import io.openmessaging.PropertyKeys;
-import io.openmessaging.PullConsumer;
+import io.openmessaging.OMSBuiltinKeys;
+import io.openmessaging.consumer.PullConsumer;
 import io.openmessaging.rocketmq.config.ClientConfig;
 import io.openmessaging.rocketmq.domain.NonStandardKeys;
 import java.lang.reflect.Field;
@@ -50,18 +48,18 @@ public class PullConsumerImplTest {
 
     @Before
     public void init() throws NoSuchFieldException, IllegalAccessException {
-        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
-            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+        final MessagingAccessPoint messagingAccessPoint = OMS
+            .getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace");
 
-        consumer = messagingAccessPoint.createPullConsumer(queueName,
-            OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "TestGroup"));
+        consumer = messagingAccessPoint.createPullConsumer(OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "TestGroup"));
+        consumer.attachQueue(queueName);
 
         Field field = PullConsumerImpl.class.getDeclaredField("rocketmqPullConsumer");
         field.setAccessible(true);
         field.set(consumer, rocketmqPullConsumer); //Replace
 
         ClientConfig clientConfig = new ClientConfig();
-        clientConfig.setOmsOperationTimeout(200);
+        clientConfig.setOperationTimeout(200);
         localMessageCache = spy(new LocalMessageCache(rocketmqPullConsumer, clientConfig));
 
         field = PullConsumerImpl.class.getDeclaredField("localMessageCache");
@@ -83,18 +81,18 @@ public class PullConsumerImplTest {
 
         when(localMessageCache.poll()).thenReturn(consumedMsg);
 
-        Message message = consumer.poll();
-        assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("NewMsgId");
+        Message message = consumer.receive();
+        assertThat(message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID)).isEqualTo("NewMsgId");
         assertThat(((BytesMessage) message).getBody()).isEqualTo(testBody);
     }
 
     @Test
     public void testPoll_WithTimeout() {
         //There is a default timeout value, @see ClientConfig#omsOperationTimeout.
-        Message message = consumer.poll();
+        Message message = consumer.receive();
         assertThat(message).isNull();
 
-        message = consumer.poll(OMS.newKeyValue().put(PropertyKeys.OPERATION_TIMEOUT, 100));
+        message = consumer.receive(OMS.newKeyValue().put(Message.BuiltinKeys.TIMEOUT, 100));
         assertThat(message).isNull();
     }
 }
\ No newline at end of file
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java
index 882e57e..b55816b 100644
--- a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java
@@ -18,13 +18,11 @@ package io.openmessaging.rocketmq.consumer;
 
 import io.openmessaging.BytesMessage;
 import io.openmessaging.Message;
-import io.openmessaging.MessageHeader;
-import io.openmessaging.MessageListener;
+import io.openmessaging.OMSBuiltinKeys;
+import io.openmessaging.consumer.MessageListener;
 import io.openmessaging.MessagingAccessPoint;
-import io.openmessaging.MessagingAccessPointFactory;
 import io.openmessaging.OMS;
-import io.openmessaging.PushConsumer;
-import io.openmessaging.ReceivedMessageContext;
+import io.openmessaging.consumer.PushConsumer;
 import io.openmessaging.rocketmq.domain.NonStandardKeys;
 import java.lang.reflect.Field;
 import java.util.Collections;
@@ -49,10 +47,10 @@ public class PushConsumerImplTest {
 
     @Before
     public void init() throws NoSuchFieldException, IllegalAccessException {
-        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
-            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+        final MessagingAccessPoint messagingAccessPoint = OMS
+            .getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace");
         consumer = messagingAccessPoint.createPushConsumer(
-            OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "TestGroup"));
+            OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "TestGroup"));
 
         Field field = PushConsumerImpl.class.getDeclaredField("rocketmqPushConsumer");
         field.setAccessible(true);
@@ -75,8 +73,8 @@ public class PushConsumerImplTest {
         consumedMsg.setTopic("HELLO_QUEUE");
         consumer.attachQueue("HELLO_QUEUE", new MessageListener() {
             @Override
-            public void onMessage(final Message message, final ReceivedMessageContext context) {
-                assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("NewMsgId");
+            public void onReceived(Message message, Context context) {
+                assertThat(message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID)).isEqualTo("NewMsgId");
                 assertThat(((BytesMessage) message).getBody()).isEqualTo(testBody);
                 context.ack();
             }
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java
index 1db80c3..fc6515e 100644
--- a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java
@@ -17,9 +17,9 @@
 package io.openmessaging.rocketmq.producer;
 
 import io.openmessaging.MessagingAccessPoint;
-import io.openmessaging.MessagingAccessPointFactory;
-import io.openmessaging.Producer;
+import io.openmessaging.OMS;
 import io.openmessaging.exception.OMSRuntimeException;
+import io.openmessaging.producer.Producer;
 import java.lang.reflect.Field;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
@@ -49,8 +49,8 @@ public class ProducerImplTest {
 
     @Before
     public void init() throws NoSuchFieldException, IllegalAccessException {
-        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
-            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+        final MessagingAccessPoint messagingAccessPoint = OMS
+            .getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace");
         producer = messagingAccessPoint.createProducer();
 
         Field field = AbstractOMSProducer.class.getDeclaredField("rocketmqProducer");
@@ -67,8 +67,8 @@ public class ProducerImplTest {
         sendResult.setMsgId("TestMsgID");
         sendResult.setSendStatus(SendStatus.SEND_OK);
         when(rocketmqProducer.send(any(Message.class), anyLong())).thenReturn(sendResult);
-        io.openmessaging.SendResult omsResult =
-            producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'}));
+        io.openmessaging.producer.SendResult omsResult =
+            producer.send(producer.createBytesMessage("HELLO_TOPIC", new byte[] {'a'}));
 
         assertThat(omsResult.messageId()).isEqualTo("TestMsgID");
     }
@@ -80,7 +80,7 @@ public class ProducerImplTest {
 
         when(rocketmqProducer.send(any(Message.class), anyLong())).thenReturn(sendResult);
         try {
-            producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'}));
+            producer.send(producer.createBytesMessage("HELLO_TOPIC", new byte[] {'a'}));
             failBecauseExceptionWasNotThrown(OMSRuntimeException.class);
         } catch (Exception e) {
             assertThat(e).hasMessageContaining("Send message to RocketMQ broker failed.");
@@ -91,7 +91,7 @@ public class ProducerImplTest {
     public void testSend_WithException() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
         when(rocketmqProducer.send(any(Message.class), anyLong())).thenThrow(MQClientException.class);
         try {
-            producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'}));
+            producer.send(producer.createBytesMessage("HELLO_TOPIC", new byte[] {'a'}));
             failBecauseExceptionWasNotThrown(OMSRuntimeException.class);
         } catch (Exception e) {
             assertThat(e).hasMessageContaining("Send message to RocketMQ broker failed.");
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/SequenceProducerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/SequenceProducerImplTest.java
deleted file mode 100644
index 823fe01..0000000
--- a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/SequenceProducerImplTest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.openmessaging.rocketmq.producer;
-
-import io.openmessaging.BytesMessage;
-import io.openmessaging.MessageHeader;
-import io.openmessaging.MessagingAccessPoint;
-import io.openmessaging.MessagingAccessPointFactory;
-import io.openmessaging.SequenceProducer;
-import java.lang.reflect.Field;
-import org.apache.rocketmq.client.exception.MQBrokerException;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.client.producer.SendResult;
-import org.apache.rocketmq.client.producer.SendStatus;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.remoting.exception.RemotingException;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.ArgumentMatchers;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Mockito.when;
-
-@RunWith(MockitoJUnitRunner.class)
-public class SequenceProducerImplTest {
-
-    private SequenceProducer producer;
-
-    @Mock
-    private DefaultMQProducer rocketmqProducer;
-
-    @Before
-    public void init() throws NoSuchFieldException, IllegalAccessException {
-        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
-            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
-        producer = messagingAccessPoint.createSequenceProducer();
-
-        Field field = AbstractOMSProducer.class.getDeclaredField("rocketmqProducer");
-        field.setAccessible(true);
-        field.set(producer, rocketmqProducer);
-
-        messagingAccessPoint.startup();
-        producer.startup();
-    }
-
-    @Test
-    public void testSend_WithCommit() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
-        SendResult sendResult = new SendResult();
-        sendResult.setMsgId("TestMsgID");
-        sendResult.setSendStatus(SendStatus.SEND_OK);
-        when(rocketmqProducer.send(ArgumentMatchers.<Message>anyList())).thenReturn(sendResult);
-        when(rocketmqProducer.getMaxMessageSize()).thenReturn(1024);
-        final BytesMessage message = producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'});
-        producer.send(message);
-        producer.commit();
-        assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("TestMsgID");
-    }
-
-    @Test
-    public void testRollback() {
-        when(rocketmqProducer.getMaxMessageSize()).thenReturn(1024);
-        final BytesMessage message = producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'});
-        producer.send(message);
-        producer.rollback();
-        producer.commit(); //Commit nothing.
-        assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo(null);
-    }
-}
\ No newline at end of file
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java
index 2240ff2..f226ede 100644
--- a/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java
@@ -16,8 +16,9 @@
  */
 package io.openmessaging.rocketmq.promise;
 
+import io.openmessaging.Future;
+import io.openmessaging.FutureListener;
 import io.openmessaging.Promise;
-import io.openmessaging.PromiseListener;
 import io.openmessaging.exception.OMSRuntimeException;
 import org.junit.Before;
 import org.junit.Test;
@@ -63,14 +64,10 @@ public class DefaultPromiseTest {
 
     @Test
     public void testAddListener() throws Exception {
-        promise.addListener(new PromiseListener<String>() {
+        promise.addListener(new FutureListener<String>() {
             @Override
-            public void operationCompleted(final Promise<String> promise) {
+            public void operationComplete(Future<String> future) {
                 assertThat(promise.get()).isEqualTo("Done");
-            }
-
-            @Override
-            public void operationFailed(final Promise<String> promise) {
 
             }
         });
@@ -80,15 +77,10 @@ public class DefaultPromiseTest {
     @Test
     public void testAddListener_ListenerAfterSet() throws Exception {
         promise.set("Done");
-        promise.addListener(new PromiseListener<String>() {
-            @Override
-            public void operationCompleted(final Promise<String> promise) {
-                assertThat(promise.get()).isEqualTo("Done");
-            }
-
+        promise.addListener(new FutureListener<String>() {
             @Override
-            public void operationFailed(final Promise<String> promise) {
-
+            public void operationComplete(Future<String> future) {
+                assertThat(future.get()).isEqualTo("Done");
             }
         });
     }
@@ -97,13 +89,9 @@ public class DefaultPromiseTest {
     public void testAddListener_WithException_ListenerAfterSet() throws Exception {
         final Throwable exception = new OMSRuntimeException("-1", "Test Error");
         promise.setFailure(exception);
-        promise.addListener(new PromiseListener<String>() {
-            @Override
-            public void operationCompleted(final Promise<String> promise) {
-            }
-
+        promise.addListener(new FutureListener<String>() {
             @Override
-            public void operationFailed(final Promise<String> promise) {
+            public void operationComplete(Future<String> future) {
                 assertThat(promise.getThrowable()).isEqualTo(exception);
             }
         });
@@ -112,13 +100,9 @@ public class DefaultPromiseTest {
     @Test
     public void testAddListener_WithException() throws Exception {
         final Throwable exception = new OMSRuntimeException("-1", "Test Error");
-        promise.addListener(new PromiseListener<String>() {
-            @Override
-            public void operationCompleted(final Promise<String> promise) {
-            }
-
+        promise.addListener(new FutureListener<String>() {
             @Override
-            public void operationFailed(final Promise<String> promise) {
+            public void operationComplete(Future<String> future) {
                 assertThat(promise.getThrowable()).isEqualTo(exception);
             }
         });
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java
index 71ca11c..1a431d9 100644
--- a/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java
@@ -92,9 +92,9 @@ public class BeanUtilsTest {
     @Test
     public void testPopulate_ExistObj() {
         CustomizedConfig config = new CustomizedConfig();
-        config.setOmsConsumerId("NewConsumerId");
+        config.setConsumerId("NewConsumerId");
 
-        Assert.assertEquals(config.getOmsConsumerId(), "NewConsumerId");
+        Assert.assertEquals(config.getConsumerId(), "NewConsumerId");
 
         config = BeanUtils.populate(properties, config);
 
diff --git a/pom.xml b/pom.xml
index 6737ae4..f4184a5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -592,7 +592,7 @@
             <dependency>
                 <groupId>io.openmessaging</groupId>
                 <artifactId>openmessaging-api</artifactId>
-                <version>0.1.0-alpha</version>
+                <version>0.3.0-alpha-SNAPSHOT</version>
             </dependency>
             <dependency>
                 <groupId>log4j</groupId>

-- 
To stop receiving notification emails like this one, please contact
lizhanhui@apache.org.

Mime
View raw message