rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yu...@apache.org
Subject [2/6] incubator-rocketmq git commit: Add Producer related implementation for OpenMessaging.
Date Wed, 19 Apr 2017 09:50:03 GMT
Add Producer related implementation for OpenMessaging.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/ce146934
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/ce146934
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/ce146934

Branch: refs/heads/openmessaging-impl
Commit: ce14693419a21279f2f71c89be7d543215e66728
Parents: c60ac52
Author: yukon <yukon@apache.org>
Authored: Tue Apr 11 11:11:29 2017 +0800
Committer: yukon <yukon@apache.org>
Committed: Thu Apr 13 16:58:17 2017 +0800

----------------------------------------------------------------------
 example/pom.xml                                 |   9 +
 .../example/openmessaging/SimpleProducer.java   |  51 +++++
 openmessaging/pom.xml                           |  13 +-
 .../rocketmq/MessagingAccessPointImpl.java      |  23 +-
 .../java/io/openmessaging/rocketmq/OMSUtil.java |  80 +++++++
 .../rocketmq/domain/BytesMessageImpl.java       | 102 +++++++++
 .../rocketmq/domain/NonStandardKeys.java        |  20 ++
 .../rocketmq/domain/SendResultImpl.java         |  40 ++++
 .../rocketmq/producer/AbstractOMSProducer.java  | 138 +++++++++++
 .../rocketmq/producer/ProducerImpl.java         | 124 ++++++++++
 .../rocketmq/producer/SequenceProducerImpl.java |  91 ++++++++
 .../rocketmq/promise/DefaultPromise.java        | 227 +++++++++++++++++++
 .../rocketmq/promise/FutureState.java           |  45 ++++
 pom.xml                                         |   4 +-
 14 files changed, 959 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/ce146934/example/pom.xml
----------------------------------------------------------------------
diff --git a/example/pom.xml b/example/pom.xml
index 785a4ca..840fa36 100644
--- a/example/pom.xml
+++ b/example/pom.xml
@@ -48,5 +48,14 @@
             <groupId>org.javassist</groupId>
             <artifactId>javassist</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-openmessaging</artifactId>
+            <version>4.1.0-incubating-SNAPSHOT</version>
+        </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/ce146934/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
index 3b71849..5a27f5a 100644
--- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
@@ -16,8 +16,59 @@
  */
 package org.apache.rocketmq.example.openmessaging;
 
+import io.openmessaging.Message;
+import io.openmessaging.MessagingAccessPoint;
+import io.openmessaging.MessagingAccessPointFactory;
+import io.openmessaging.Producer;
+import io.openmessaging.Promise;
+import io.openmessaging.PromiseListener;
+import io.openmessaging.SendResult;
+import java.nio.charset.Charset;
+
 public class SimpleProducer {
     public static void main(String[] args) {
+        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
+            .getMessagingAccessPoint("openmessaging:rocketmq://10.125.3.140:9876,10.189.232.59:9876/namespace");
+
+        final Producer producer = messagingAccessPoint.createProducer();
+
+        messagingAccessPoint.startup();
+        System.out.println("messagingAccessPoint startup OK");
+
+        producer.startup();
+        System.out.println("producer startup OK");
+
+        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+            @Override
+            public void run() {
+                producer.shutdown();
+                messagingAccessPoint.shutdown();
+            }
+        }));
+
+        {
+            Message message = producer.createBytesMessageToTopic("HELLO_TOPIC", "HELLO_BODY".getBytes(Charset.forName("UTF-8")));
+            SendResult sendResult = producer.send(message);
+            //final Void aVoid = result.get(3000L);
+            System.out.println("send async message OK, msgId: " + sendResult.messageId());
+        }
+
+        {
+            final Promise<SendResult> result = producer.sendAsync(producer.createBytesMessageToTopic("HELLO_TOPIC",
"HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
+            result.addListener(new PromiseListener<SendResult>() {
+                @Override public void operationCompleted(Promise<SendResult> promise)
{
+                    System.out.println("Send async message OK, msgId: " + promise.get().messageId());
+                }
+
+                @Override public void operationFailed(Promise<SendResult> promise)
{
+                    System.out.println("send async message Failed, error: " + promise.getThrowable().getMessage());
+                }
+            });
+        }
 
+        {
+            producer.sendOneway(producer.createBytesMessageToTopic("HELLO_TOPIC", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
+            System.out.println("Send oneway message OK");
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/ce146934/openmessaging/pom.xml
----------------------------------------------------------------------
diff --git a/openmessaging/pom.xml b/openmessaging/pom.xml
index d568650..d649812 100644
--- a/openmessaging/pom.xml
+++ b/openmessaging/pom.xml
@@ -27,11 +27,22 @@
     <modelVersion>4.0.0</modelVersion>
 
     <artifactId>rocketmq-openmessaging</artifactId>
+    <name>rocketmq-openmessaging ${project.version}</name>
 
     <dependencies>
         <dependency>
             <groupId>io.openmessaging</groupId>
-            <artifactId>messaging-user-level-api</artifactId>
+            <artifactId>openmessaging-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-client</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>javax.jms</groupId>
+            <artifactId>javax.jms-api</artifactId>
+            <version>2.0.1</version>
+            <scope>test</scope>
         </dependency>
     </dependencies>
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/ce146934/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
index 2f75686..c30b7d4 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
@@ -26,26 +26,39 @@ import io.openmessaging.ResourceManager;
 import io.openmessaging.SequenceProducer;
 import io.openmessaging.ServiceEndPoint;
 import io.openmessaging.observer.Observer;
+import io.openmessaging.rocketmq.producer.ProducerImpl;
+import io.openmessaging.rocketmq.producer.SequenceProducerImpl;
 
 public class MessagingAccessPointImpl implements MessagingAccessPoint {
+    private final KeyValue accessPointProperties;
+
+    public MessagingAccessPointImpl(final KeyValue accessPointProperties) {
+        this.accessPointProperties = accessPointProperties;
+    }
+
+    @Override
+    public KeyValue properties() {
+        return accessPointProperties;
+    }
+
     @Override
     public Producer createProducer() {
-        return null;
+        return new ProducerImpl(this.accessPointProperties);
     }
 
     @Override
     public Producer createProducer(KeyValue properties) {
-        return null;
+        return new ProducerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties));
     }
 
     @Override
     public SequenceProducer createSequenceProducer() {
-        return null;
+        return new SequenceProducerImpl(this.accessPointProperties);
     }
 
     @Override
     public SequenceProducer createSequenceProducer(KeyValue properties) {
-        return null;
+        return new SequenceProducerImpl(OMSUtil.buildKeyValue(this.accessPointProperties,
properties));
     }
 
     @Override
@@ -79,7 +92,7 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint {
     }
 
     @Override
-    public ResourceManager createResourceManager() {
+    public ResourceManager getResourceManager() {
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/ce146934/openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java
new file mode 100644
index 0000000..061ee6b
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq;
+
+import io.openmessaging.BytesMessage;
+import io.openmessaging.KeyValue;
+import io.openmessaging.MessageHeader;
+import io.openmessaging.OMS;
+import io.openmessaging.SendResult;
+import io.openmessaging.rocketmq.domain.SendResultImpl;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageAccessor;
+
+/**
+ * Static helpers that translate between OpenMessaging (OMS) types and RocketMQ client types.
+ */
+public class OMSUtil {
+
+    /**
+     * Builds a OMS client instance name.
+     *
+     * <p>The name is composed of the process id, the literal {@code %OpenMessaging%} and
+     * the current {@link System#nanoTime()} value.
+     *
+     * @return a unique instance name
+     */
+    public static String buildInstanceName() {
+        return Integer.toString(UtilAll.getPid()) + "%OpenMessaging" + "%" + System.nanoTime();
+    }
+
+    /**
+     * Converts an OMS {@link BytesMessage} into a RocketMQ message.
+     *
+     * <p>The body is passed through as-is. The RocketMQ topic is taken from the
+     * {@code TOPIC} header when present, otherwise from the {@code QUEUE} header.
+     * All OMS properties and then all OMS headers are copied as RocketMQ message
+     * properties (string values).
+     *
+     * @param omsMessage the OMS message to convert
+     * @return the equivalent RocketMQ message
+     */
+    public static org.apache.rocketmq.common.message.Message msgConvert(BytesMessage omsMessage) {
+        org.apache.rocketmq.common.message.Message rmqMessage = new org.apache.rocketmq.common.message.Message();
+        rmqMessage.setBody(omsMessage.getBody());
+
+        KeyValue headers = omsMessage.headers();
+        KeyValue properties = omsMessage.properties();
+
+        //All destinations in RocketMQ use Topic
+        rmqMessage.setTopic(headers.containsKey(MessageHeader.TOPIC)
+            ? headers.getString(MessageHeader.TOPIC) : headers.getString(MessageHeader.QUEUE));
+
+        for (String key : properties.keySet()) {
+            MessageAccessor.putProperty(rmqMessage, key, properties.getString(key));
+        }
+
+        //Headers has a high priority: they are written last, so they win on a key collision
+        for (String key : headers.keySet()) {
+            MessageAccessor.putProperty(rmqMessage, key, headers.getString(key));
+        }
+
+        return rmqMessage;
+    }
+
+    /**
+     * Convert a RocketMQ SEND_OK SendResult instance to a OMS SendResult.
+     *
+     * <p>The SEND_OK precondition is only checked with an {@code assert}, so it is
+     * enforced only when assertions are enabled; callers must check the status first.
+     *
+     * @param rmqResult the RocketMQ send result, expected to have status SEND_OK
+     * @return an OMS send result carrying the RocketMQ message id and an empty property set
+     */
+    public static SendResult sendResultConvert(org.apache.rocketmq.client.producer.SendResult rmqResult) {
+        assert rmqResult.getSendStatus().equals(SendStatus.SEND_OK);
+        return new SendResultImpl(rmqResult.getMsgId(), OMS.newKeyValue());
+    }
+
+    /**
+     * Merges the given key-value sets into a new {@link KeyValue}.
+     *
+     * <p>Entries are copied in argument order as string values, so a later argument
+     * overrides an earlier one on a key collision. {@code null} arguments are skipped
+     * instead of causing a {@link NullPointerException}.
+     *
+     * @param keyValues the sets to merge; individual elements may be {@code null}
+     * @return a new KeyValue holding the merged entries
+     */
+    public static KeyValue buildKeyValue(KeyValue ... keyValues) {
+        KeyValue keyValue = OMS.newKeyValue();
+        for (KeyValue properties : keyValues) {
+            if (properties == null) {
+                continue;
+            }
+            for (String key : properties.keySet()) {
+                keyValue.put(key, properties.getString(key));
+            }
+        }
+        return keyValue;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/ce146934/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java
new file mode 100644
index 0000000..8140fe2
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.domain;
+
+import io.openmessaging.BytesMessage;
+import io.openmessaging.KeyValue;
+import io.openmessaging.Message;
+import io.openmessaging.OMS;
+
+/**
+ * {@link BytesMessage} implementation that keeps headers and properties in two
+ * separate {@link KeyValue} instances and the payload in a byte array.
+ */
+public class BytesMessageImpl implements BytesMessage {
+    // Created once in the constructor and never replaced; only their contents change.
+    private final KeyValue headers;
+    private final KeyValue properties;
+    private byte[] body;
+
+    /** Creates a message with empty headers, empty properties and no body. */
+    public BytesMessageImpl() {
+        this.headers = OMS.newKeyValue();
+        this.properties = OMS.newKeyValue();
+    }
+
+    /** Returns the stored body array itself (no copy), or {@code null} if none was set. */
+    @Override
+    public byte[] getBody() {
+        return body;
+    }
+
+    /** Stores the given array reference as the body (no copy) and returns this message. */
+    @Override
+    public BytesMessage setBody(final byte[] body) {
+        this.body = body;
+        return this;
+    }
+
+    /** Returns the live header set of this message. */
+    @Override
+    public KeyValue headers() {
+        return headers;
+    }
+
+    /** Returns the live property set of this message. */
+    @Override
+    public KeyValue properties() {
+        return properties;
+    }
+
+    /** Puts an {@code int} header and returns this message for chaining. */
+    @Override
+    public Message putHeaders(final String key, final int value) {
+        headers.put(key, value);
+        return this;
+    }
+
+    /** Puts a {@code long} header and returns this message for chaining. */
+    @Override
+    public Message putHeaders(final String key, final long value) {
+        headers.put(key, value);
+        return this;
+    }
+
+    /** Puts a {@code double} header and returns this message for chaining. */
+    @Override
+    public Message putHeaders(final String key, final double value) {
+        headers.put(key, value);
+        return this;
+    }
+
+    /** Puts a {@code String} header and returns this message for chaining. */
+    @Override
+    public Message putHeaders(final String key, final String value) {
+        headers.put(key, value);
+        return this;
+    }
+
+    /** Puts an {@code int} property and returns this message for chaining. */
+    @Override
+    public Message putProperties(final String key, final int value) {
+        properties.put(key, value);
+        return this;
+    }
+
+    /** Puts a {@code long} property and returns this message for chaining. */
+    @Override
+    public Message putProperties(final String key, final long value) {
+        properties.put(key, value);
+        return this;
+    }
+
+    /** Puts a {@code double} property and returns this message for chaining. */
+    @Override
+    public Message putProperties(final String key, final double value) {
+        properties.put(key, value);
+        return this;
+    }
+
+    /** Puts a {@code String} property and returns this message for chaining. */
+    @Override
+    public Message putProperties(final String key, final String value) {
+        properties.put(key, value);
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/ce146934/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java
new file mode 100644
index 0000000..cf83cfd
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.domain;
+
+public class NonStandardKeys { // NOTE(review): declares no members in this commit; presumably reserved for RocketMQ-specific keys outside the OMS standard - confirm
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/ce146934/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/SendResultImpl.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/SendResultImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/SendResultImpl.java
new file mode 100644
index 0000000..228a9f0
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/SendResultImpl.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.domain;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.SendResult;
+
+/**
+ * Immutable-reference {@link SendResult} holding the id of the sent message and an
+ * accompanying property set.
+ */
+public class SendResultImpl implements SendResult {
+    // Assigned once in the constructor; never reassigned afterwards.
+    private final String messageId;
+    private final KeyValue properties;
+
+    /**
+     * @param messageId the id assigned to the sent message
+     * @param properties extra attributes of the send result; stored by reference, not copied
+     */
+    public SendResultImpl(final String messageId, final KeyValue properties) {
+        this.messageId = messageId;
+        this.properties = properties;
+    }
+
+    /** Returns the message id passed to the constructor. */
+    @Override
+    public String messageId() {
+        return messageId;
+    }
+
+    /** Returns the property set passed to the constructor. */
+    @Override
+    public KeyValue properties() {
+        return properties;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/ce146934/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
new file mode 100644
index 0000000..9eb735d
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.producer;
+
+import io.openmessaging.BytesMessage;
+import io.openmessaging.KeyValue;
+import io.openmessaging.Message;
+import io.openmessaging.MessageFactory;
+import io.openmessaging.MessageHeader;
+import io.openmessaging.PropertyKeys;
+import io.openmessaging.ServiceLifecycle;
+import io.openmessaging.exception.OMSMessageFormatException;
+import io.openmessaging.exception.OMSNotSupportedException;
+import io.openmessaging.exception.OMSRuntimeException;
+import io.openmessaging.exception.OMSTimeOutException;
+import io.openmessaging.rocketmq.domain.BytesMessageImpl;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.slf4j.Logger;
+
+import static io.openmessaging.rocketmq.OMSUtil.buildInstanceName;
+
+/**
+ * Shared base of the OMS producers: owns the underlying RocketMQ
+ * {@link DefaultMQProducer}, its start/stop lifecycle, message creation and the
+ * translation of RocketMQ send failures into OMS exceptions.
+ */
+abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
+    static final Logger log = ClientLogger.getLog();
+    final KeyValue properties;
+    final DefaultMQProducer rocketmqProducer;
+    // Guarded by the synchronized startup()/shutdown() methods.
+    private boolean started = false;
+
+    /**
+     * Configures (but does not start) the underlying RocketMQ producer.
+     *
+     * <p>A freshly built instance name is used as both producer group and instance
+     * name, and is written back into {@code properties} under {@code PRODUCER_ID}.
+     * The send timeout is {@code OPERATION_TIMEOUT}, or 5000 ms when that reads as 0.
+     * Commas in the access point list are replaced by semicolons for the name server
+     * address, and the maximum message size is set to 4 MiB.
+     *
+     * <p>NOTE(review): {@code properties} is stored and mutated as passed in, not copied;
+     * callers that hand the same KeyValue to several producers will see
+     * {@code PRODUCER_ID} overwritten - confirm whether a defensive copy is wanted.
+     *
+     * @param properties producer configuration; must contain a non-empty {@code ACCESS_POINTS}
+     * @throws OMSRuntimeException if {@code ACCESS_POINTS} is null or empty
+     */
+    AbstractOMSProducer(final KeyValue properties) {
+        this.properties = properties;
+        this.rocketmqProducer = new DefaultMQProducer();
+
+        String accessPoints = properties.getString(PropertyKeys.ACCESS_POINTS);
+
+        if (accessPoints == null || accessPoints.isEmpty()) {
+            throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty.");
+        }
+        String producerId = buildInstanceName();
+
+        int operationTimeout = properties.getInt(PropertyKeys.OPERATION_TIMEOUT);
+
+        this.rocketmqProducer.setProducerGroup(producerId);
+        this.rocketmqProducer.setSendMsgTimeout(operationTimeout == 0 ? 5000 : operationTimeout);
+        this.rocketmqProducer.setInstanceName(producerId);
+        this.rocketmqProducer.setNamesrvAddr(accessPoints.replace(',', ';'));
+        this.rocketmqProducer.setMaxMessageSize(1024 * 1024 * 4);
+
+        properties.put(PropertyKeys.PRODUCER_ID, producerId);
+    }
+
+    /**
+     * Starts the RocketMQ producer if it is not already started; repeated calls are no-ops.
+     *
+     * @throws OMSRuntimeException wrapping the {@link MQClientException} if the start fails,
+     *         in which case the producer stays in the not-started state
+     */
+    @Override
+    public synchronized void startup() {
+        if (!started) {
+            try {
+                this.rocketmqProducer.start();
+            } catch (MQClientException e) {
+                throw new OMSRuntimeException("-1", e);
+            }
+        }
+        this.started = true;
+    }
+
+    /** Shuts the RocketMQ producer down if it was started; repeated calls are no-ops. */
+    @Override
+    public synchronized void shutdown() {
+        if (this.started) {
+            this.rocketmqProducer.shutdown();
+        }
+        this.started = false;
+    }
+
+    /**
+     * Maps a failure raised while sending to the most specific OMS exception.
+     *
+     * <p>Only {@link MQClientException} is inspected: when it has a cause, the cause
+     * type selects timeout / broker / connect handling; when it has none, its response
+     * code selects "topic does not exist" or "illegal message". Everything else becomes
+     * a generic {@link OMSRuntimeException}. The original throwable is always kept as cause.
+     *
+     * @param topic the destination topic, used in the message text
+     * @param msgId the message id, used in the message text (may be {@code null})
+     * @param e the failure to translate
+     * @return the OMS exception for the caller to throw
+     */
+    OMSRuntimeException checkProducerException(String topic, String msgId, Throwable e) {
+        if (e instanceof MQClientException) {
+            Throwable cause = e.getCause();
+            if (cause != null) {
+                if (cause instanceof RemotingTimeoutException) {
+                    return new OMSTimeOutException("-1", String.format("Send message to broker timeout, %dms, Topic=%s, msgId=%s",
+                        this.rocketmqProducer.getSendMsgTimeout(), topic, msgId), e);
+                } else if (cause instanceof MQBrokerException) {
+                    MQBrokerException brokerException = (MQBrokerException) cause;
+                    return new OMSRuntimeException("-1", String.format("Received a broker exception, Topic=%s, msgId=%s, %s",
+                        topic, msgId, brokerException.getErrorMessage()), e);
+                } else if (cause instanceof RemotingConnectException) {
+                    // A connect failure is not an MQBrokerException: casting it would throw
+                    // ClassCastException, so report its own message instead.
+                    return new OMSRuntimeException("-1", String.format("Received a broker exception, Topic=%s, msgId=%s, %s",
+                        topic, msgId, cause.getMessage()), e);
+                }
+            }
+            // Exception thrown by local.
+            else {
+                MQClientException clientException = (MQClientException) e;
+                if (-1 == clientException.getResponseCode()) {
+                    return new OMSRuntimeException("-1", String.format("Topic does not exist, Topic=%s, msgId=%s",
+                        topic, msgId), e);
+                } else if (ResponseCode.MESSAGE_ILLEGAL == clientException.getResponseCode()) {
+                    return new OMSMessageFormatException("-1", String.format("A illegal message for RocketMQ, Topic=%s, msgId=%s",
+                        topic, msgId), e);
+                }
+            }
+        }
+        return new OMSRuntimeException("-1", "Send message to RocketMQ broker failed.", e);
+    }
+
+    /**
+     * Rejects every message that is not a {@link BytesMessage}.
+     *
+     * @param message the message about to be sent
+     * @throws OMSNotSupportedException if {@code message} is not a BytesMessage
+     */
+    protected void checkMessageType(Message message) {
+        if (!(message instanceof BytesMessage)) {
+            throw new OMSNotSupportedException("-1", "Only BytesMessage is supported.");
+        }
+    }
+
+    /** Creates a bytes message whose {@code TOPIC} header is set to {@code topic}. */
+    @Override
+    public BytesMessage createBytesMessageToTopic(final String topic, final byte[] body) {
+        BytesMessage bytesMessage = new BytesMessageImpl();
+        bytesMessage.setBody(body);
+        bytesMessage.headers().put(MessageHeader.TOPIC, topic);
+        return bytesMessage;
+    }
+
+    /** Creates a bytes message whose {@code QUEUE} header is set to {@code queue}. */
+    @Override
+    public BytesMessage createBytesMessageToQueue(final String queue, final byte[] body) {
+        BytesMessage bytesMessage = new BytesMessageImpl();
+        bytesMessage.setBody(body);
+        bytesMessage.headers().put(MessageHeader.QUEUE, queue);
+        return bytesMessage;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/ce146934/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java
new file mode 100644
index 0000000..f5d2f25
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.producer;
+
+import io.openmessaging.BytesMessage;
+import io.openmessaging.KeyValue;
+import io.openmessaging.Message;
+import io.openmessaging.MessageHeader;
+import io.openmessaging.Producer;
+import io.openmessaging.Promise;
+import io.openmessaging.PropertyKeys;
+import io.openmessaging.SendResult;
+import io.openmessaging.exception.OMSRuntimeException;
+import io.openmessaging.rocketmq.OMSUtil;
+import io.openmessaging.rocketmq.promise.DefaultPromise;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendStatus;
+
+import static io.openmessaging.rocketmq.OMSUtil.msgConvert;
+
+/**
+ * OMS {@link Producer} backed by a RocketMQ producer, offering synchronous,
+ * asynchronous (promise based) and one-way sends of {@link BytesMessage}s.
+ */
+public class ProducerImpl extends AbstractOMSProducer implements Producer {
+
+    /**
+     * @param properties producer configuration, handed to {@link AbstractOMSProducer}
+     */
+    public ProducerImpl(final KeyValue properties) {
+        super(properties);
+    }
+
+    /** Returns the configuration this producer was created with. */
+    @Override
+    public KeyValue properties() {
+        return properties;
+    }
+
+    /** Sends synchronously using the producer's configured send timeout. */
+    @Override
+    public SendResult send(final Message message) {
+        return send(message, this.rocketmqProducer.getSendMsgTimeout());
+    }
+
+    /**
+     * Sends synchronously; {@code OPERATION_TIMEOUT} in {@code properties}, when present,
+     * overrides the producer's configured send timeout for this call.
+     */
+    @Override
+    public SendResult send(final Message message, final KeyValue properties) {
+        long timeout = properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)
+            ? properties.getInt(PropertyKeys.OPERATION_TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout();
+        return send(message, timeout);
+    }
+
+    /**
+     * Converts and sends the message, blocking for at most {@code timeout} milliseconds.
+     *
+     * <p>On success the RocketMQ message id is stored in the message's
+     * {@code MESSAGE_ID} header.
+     *
+     * @throws OMSRuntimeException if the transport fails (translated by
+     *         {@code checkProducerException}) or the broker answers with a status
+     *         other than {@code SEND_OK}
+     */
+    private SendResult send(final Message message, long timeout) {
+        checkMessageType(message);
+        org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
+        org.apache.rocketmq.client.producer.SendResult rmqResult;
+        try {
+            rmqResult = this.rocketmqProducer.send(rmqMessage, timeout);
+        } catch (Exception e) {
+            if (e instanceof InterruptedException) {
+                // Keep the interrupt visible to callers even though it is wrapped below.
+                Thread.currentThread().interrupt();
+            }
+            log.error(String.format("Send message to RocketMQ failed, %s", message), e);
+            throw checkProducerException(rmqMessage.getTopic(), message.headers().getString(MessageHeader.MESSAGE_ID), e);
+        }
+        // Checked outside the try block so this failure is not caught, logged a second
+        // time and re-wrapped by the catch clause above.
+        if (!rmqResult.getSendStatus().equals(SendStatus.SEND_OK)) {
+            log.error(String.format("Send message to RocketMQ failed, %s", message));
+            throw new OMSRuntimeException("-1", "Send message to RocketMQ failed.");
+        }
+        message.headers().put(MessageHeader.MESSAGE_ID, rmqResult.getMsgId());
+        return OMSUtil.sendResultConvert(rmqResult);
+    }
+
+    /** Sends asynchronously using the producer's configured send timeout. */
+    @Override
+    public Promise<SendResult> sendAsync(final Message message) {
+        return sendAsync(message, this.rocketmqProducer.getSendMsgTimeout());
+    }
+
+    /**
+     * Sends asynchronously; {@code OPERATION_TIMEOUT} in {@code properties}, when present,
+     * overrides the producer's configured send timeout for this call.
+     */
+    @Override
+    public Promise<SendResult> sendAsync(final Message message, final KeyValue properties) {
+        long timeout = properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)
+            ? properties.getInt(PropertyKeys.OPERATION_TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout();
+        return sendAsync(message, timeout);
+    }
+
+    /**
+     * Converts the message and hands it to RocketMQ's callback based send.
+     *
+     * <p>The returned promise is completed from the callback: with the converted result
+     * (after storing the message id in the {@code MESSAGE_ID} header) on success, or with
+     * the raw throwable on failure. A failure to even submit the send also fails the promise
+     * rather than throwing.
+     */
+    private Promise<SendResult> sendAsync(final Message message, long timeout) {
+        checkMessageType(message);
+        org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
+        final Promise<SendResult> promise = new DefaultPromise<>();
+        try {
+            this.rocketmqProducer.send(rmqMessage, new SendCallback() {
+                @Override
+                public void onSuccess(final org.apache.rocketmq.client.producer.SendResult rmqResult) {
+                    message.headers().put(MessageHeader.MESSAGE_ID, rmqResult.getMsgId());
+                    promise.set(OMSUtil.sendResultConvert(rmqResult));
+                }
+
+                @Override
+                public void onException(final Throwable e) {
+                    promise.setFailure(e);
+                }
+            }, timeout);
+        } catch (Exception e) {
+            promise.setFailure(e);
+        }
+        return promise;
+    }
+
+    /**
+     * Sends the message in one-way mode. This is best effort: a failure never
+     * propagates to the caller, it is only logged.
+     */
+    @Override
+    public void sendOneway(final Message message) {
+        checkMessageType(message);
+        org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
+        try {
+            this.rocketmqProducer.sendOneway(rmqMessage);
+        } catch (Exception e) {
+            if (e instanceof InterruptedException) {
+                // Do not lose the interrupt just because the send itself is best effort.
+                Thread.currentThread().interrupt();
+            }
+            // Oneway failures are deliberately not rethrown, but leave a trace for diagnosis.
+            log.warn(String.format("Send oneway message to RocketMQ failed, %s", message), e);
+        }
+    }
+
+    /** Sends in one-way mode; {@code properties} is currently not consulted. */
+    @Override
+    public void sendOneway(final Message message, final KeyValue properties) {
+        sendOneway(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/ce146934/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java
new file mode 100644
index 0000000..89ece2b
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.producer;
+
+import io.openmessaging.BytesMessage;
+import io.openmessaging.KeyValue;
+import io.openmessaging.Message;
+import io.openmessaging.MessageHeader;
+import io.openmessaging.SequenceProducer;
+import io.openmessaging.rocketmq.OMSUtil;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.rocketmq.client.Validators;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.SendResult;
+
+/**
+ * {@link SequenceProducer} that buffers messages locally and hands them to the RocketMQ
+ * producer as one batch when {@link #commit()} is called; {@link #rollback()} discards
+ * whatever is buffered.
+ */
+public class SequenceProducerImpl extends AbstractOMSProducer implements SequenceProducer {
+
+    /** Messages accepted by {@code send} and waiting for the next commit or rollback. */
+    private final BlockingQueue<Message> msgCacheQueue;
+
+    public SequenceProducerImpl(final KeyValue properties) {
+        super(properties);
+        this.msgCacheQueue = new LinkedBlockingQueue<>();
+    }
+
+    @Override
+    public KeyValue properties() {
+        return properties;
+    }
+
+    /**
+     * Validates the message against the RocketMQ producer and buffers it; nothing is sent
+     * until {@link #commit()}.
+     *
+     * @param message the OMS message to buffer; it is cast to {@code BytesMessage} for conversion
+     * @throws RuntimeException whatever {@code checkProducerException} produces when RocketMQ
+     *         validation rejects the converted message
+     */
+    @Override
+    public void send(final Message message) {
+        checkMessageType(message);
+        org.apache.rocketmq.common.message.Message rmqMessage = OMSUtil.msgConvert((BytesMessage) message);
+        try {
+            Validators.checkMessage(rmqMessage, this.rocketmqProducer);
+        } catch (MQClientException e) {
+            throw checkProducerException(rmqMessage.getTopic(), message.headers().getString(MessageHeader.MESSAGE_ID), e);
+        }
+        // The original OMS message is queued; it is converted again at commit time.
+        msgCacheQueue.add(message);
+    }
+
+    /** Same as {@link #send(Message)}; {@code properties} is not consulted. */
+    @Override
+    public void send(final Message message, final KeyValue properties) {
+        send(message);
+    }
+
+    /**
+     * Drains every buffered message and sends them to RocketMQ in a single batch call, then
+     * writes the returned message ids back onto the OMS messages in queue order.
+     *
+     * <p>Does nothing when no message is buffered. If the send fails, the drained messages
+     * are not put back into the buffer.
+     *
+     * @throws RuntimeException whatever {@code checkProducerException} produces when the
+     *         batch send throws
+     */
+    @Override
+    public synchronized void commit() {
+        List<Message> messages = new ArrayList<>();
+        msgCacheQueue.drainTo(messages);
+        if (messages.isEmpty()) {
+            // Nothing buffered: avoid issuing a batch send with an empty list.
+            return;
+        }
+
+        List<org.apache.rocketmq.common.message.Message> rmqMessages = new ArrayList<>(messages.size());
+        for (Message message : messages) {
+            rmqMessages.add(OMSUtil.msgConvert((BytesMessage) message));
+        }
+
+        SendResult sendResult;
+        try {
+            sendResult = this.rocketmqProducer.send(rmqMessages);
+        } catch (Exception e) {
+            throw checkProducerException("", "", e);
+        }
+
+        // The send has succeeded at this point; id bookkeeping must not report it as a failure.
+        String joinedIds = sendResult == null ? null : sendResult.getMsgId();
+        if (joinedIds == null) {
+            return;
+        }
+        String[] msgIdArray = joinedIds.split(",");
+        // Guard against an id list shorter than the batch instead of indexing past its end.
+        int assignable = Math.min(messages.size(), msgIdArray.length);
+        for (int i = 0; i < assignable; i++) {
+            messages.get(i).headers().put(MessageHeader.MESSAGE_ID, msgIdArray[i]);
+        }
+    }
+
+    /** Discards every message buffered since the last commit or rollback. */
+    @Override
+    public synchronized void rollback() {
+        msgCacheQueue.clear();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/ce146934/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java
new file mode 100644
index 0000000..43f96ce
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.promise;
+
+import io.openmessaging.Promise;
+import io.openmessaging.PromiseListener;
+import io.openmessaging.exception.OMSRuntimeException;
+import java.util.ArrayList;
+import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link Promise} implementation guarded by a single monitor.
+ *
+ * <p>The promise starts in {@link FutureState#DOING}. It moves to {@link FutureState#DONE}
+ * through {@link #set(Object)} or {@link #setFailure(Throwable)}, or to
+ * {@link FutureState#CANCELLED} when a waiting {@link #get(long)} times out or is interrupted.
+ * Only the first transition wins; later attempts return {@code false} and leave the recorded
+ * outcome untouched. Registered listeners are notified once, when the promise leaves DOING.
+ */
+public class DefaultPromise<V> implements Promise<V> {
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultPromise.class);
+    /** Monitor guarding state transitions and used for wait/notify. */
+    private final Object lock = new Object();
+    private volatile FutureState state = FutureState.DOING;
+    private V result = null;
+    private long timeout;
+    /** Creation timestamp in milliseconds; timed waits measure their deadline from it. */
+    private long createTime;
+    private Throwable exception = null;
+    private List<PromiseListener> promiseListenerList;
+
+    public DefaultPromise() {
+        createTime = System.currentTimeMillis();
+        promiseListenerList = new ArrayList<>();
+        timeout = 5000;
+    }
+
+    /** Caller-initiated cancellation is not supported; always returns {@code false}. */
+    @Override
+    public boolean cancel(final boolean mayInterruptIfRunning) {
+        return false;
+    }
+
+    @Override
+    public boolean isCancelled() {
+        return state.isCancelledState();
+    }
+
+    @Override
+    public boolean isDone() {
+        return state.isDoneState();
+    }
+
+    /**
+     * Returns the current result without waiting; {@code null} until a value has been set.
+     * NOTE(review): this does not block for completion - confirm that matches the Promise contract.
+     */
+    @Override
+    public V get() {
+        return result;
+    }
+
+    /**
+     * Waits for the promise to leave the DOING state and returns its value.
+     *
+     * <p>A {@code timeout <= 0} waits without a deadline; an interrupt during that wait
+     * cancels the promise with the {@link InterruptedException}. A positive {@code timeout}
+     * is a deadline in milliseconds counted from the creation of the promise, not from this
+     * call; if it passes while the promise is still DOING, the promise is cancelled. In both
+     * cases the calling thread's interrupt flag is restored if it was interrupted while waiting.
+     *
+     * @param timeout deadline in milliseconds relative to promise creation, or {@code <= 0}
+     *                to wait indefinitely
+     * @return the value passed to {@link #set(Object)}
+     * @throws OMSRuntimeException if the promise holds a failure, timed out or was interrupted
+     */
+    @Override
+    public V get(final long timeout) {
+        synchronized (lock) {
+            if (!isDoing()) {
+                return getValueOrThrowable();
+            }
+
+            if (timeout <= 0) {
+                try {
+                    // Loop so that a spurious wakeup cannot return before completion.
+                    while (isDoing()) {
+                        lock.wait();
+                    }
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    cancel(e);
+                }
+                return getValueOrThrowable();
+            }
+
+            boolean interrupted = false;
+            long waitTime = timeout - (System.currentTimeMillis() - createTime);
+            while (isDoing() && waitTime > 0) {
+                try {
+                    lock.wait(waitTime);
+                } catch (InterruptedException e) {
+                    // Keep waiting for the remaining time, but remember the interrupt.
+                    interrupted = true;
+                    LOG.error("promise get value interrupted,excepiton:{}", e.getMessage());
+                }
+                waitTime = timeout - (System.currentTimeMillis() - createTime);
+            }
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+
+            if (isDoing()) {
+                // NOTE(review): listeners notified by this call run while the monitor is still held.
+                timeoutSoCancel();
+            }
+            return getValueOrThrowable();
+        }
+    }
+
+    /**
+     * Completes the promise with a value.
+     *
+     * @param value the result; {@code null} is rejected
+     * @return {@code true} if this call completed the promise, {@code false} if the value is
+     *         {@code null} or the promise had already left the DOING state
+     */
+    @Override
+    public boolean set(final V value) {
+        if (value == null)
+            return false;
+        return complete(value, null);
+    }
+
+    /**
+     * Completes the promise with a failure.
+     *
+     * @param cause the failure; {@code null} is rejected
+     * @return {@code true} if this call completed the promise, {@code false} if the cause is
+     *         {@code null} or the promise had already left the DOING state
+     */
+    @Override
+    public boolean setFailure(final Throwable cause) {
+        if (cause == null)
+            return false;
+        return complete(null, cause);
+    }
+
+    /**
+     * Registers a listener. If the promise has already left the DOING state the listener is
+     * invoked immediately on the calling thread; otherwise it is invoked on completion.
+     *
+     * @throws NullPointerException if {@code listener} is {@code null}
+     */
+    @Override
+    public void addListener(final PromiseListener listener) {
+        if (listener == null) {
+            throw new NullPointerException("FutureListener is null");
+        }
+
+        boolean notifyNow = false;
+        synchronized (lock) {
+            if (!isDoing()) {
+                notifyNow = true;
+            } else {
+                if (promiseListenerList == null) {
+                    promiseListenerList = new ArrayList<>();
+                }
+                promiseListenerList.add(listener);
+            }
+        }
+
+        if (notifyNow) {
+            notifyListener(listener);
+        }
+    }
+
+    @Override
+    public Throwable getThrowable() {
+        return exception;
+    }
+
+    private void notifyListeners() {
+        if (promiseListenerList != null) {
+            for (PromiseListener listener : promiseListenerList) {
+                notifyListener(listener);
+            }
+        }
+    }
+
+    private boolean isSuccess() {
+        return isDone() && (exception == null);
+    }
+
+    /** Cancels a still-pending promise because a timed wait ran out. */
+    private void timeoutSoCancel() {
+        synchronized (lock) {
+            if (!isDoing()) {
+                return;
+            }
+            state = FutureState.CANCELLED;
+            exception = new RuntimeException("get request result is timeout or interrupted");
+            lock.notifyAll();
+        }
+        notifyListeners();
+    }
+
+    /**
+     * Returns the result, or throws the recorded failure wrapped in an
+     * {@link OMSRuntimeException}. Listeners are not touched here: they have already been
+     * notified by whichever call moved the promise out of DOING.
+     */
+    private V getValueOrThrowable() {
+        if (exception != null) {
+            Throwable e = exception.getCause() != null ? exception.getCause() : exception;
+            throw new OMSRuntimeException("-1", e);
+        }
+        return result;
+    }
+
+    private boolean isDoing() {
+        return state.isDoingState();
+    }
+
+    /**
+     * Moves the promise from DOING to DONE and records its outcome. The outcome is written
+     * only after the state check succeeds, so a late call cannot overwrite an earlier one.
+     */
+    private boolean complete(final V value, final Throwable cause) {
+        synchronized (lock) {
+            if (!isDoing()) {
+                return false;
+            }
+
+            this.result = value;
+            this.exception = cause;
+            state = FutureState.DONE;
+            lock.notifyAll();
+        }
+
+        notifyListeners();
+        return true;
+    }
+
+    /** Invokes one listener; anything it throws is logged and not propagated. */
+    private void notifyListener(final PromiseListener<V> listener) {
+        try {
+            if (exception != null)
+                listener.operationFailed(this);
+            else
+                listener.operationCompleted(this);
+        } catch (Throwable t) {
+            LOG.error("notifyListener {} Error:{}", listener.getClass().getSimpleName(), t);
+        }
+    }
+
+    /** Cancels a still-pending promise, recording {@code e} as its failure. */
+    private boolean cancel(Exception e) {
+        synchronized (lock) {
+            if (!isDoing()) {
+                return false;
+            }
+
+            state = FutureState.CANCELLED;
+            exception = e;
+            lock.notifyAll();
+        }
+
+        notifyListeners();
+        return true;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/ce146934/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/FutureState.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/FutureState.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/FutureState.java
new file mode 100644
index 0000000..9e2f69c
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/FutureState.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.openmessaging.rocketmq.promise;
+
+public enum FutureState { // lifecycle states of a promise; exactly one predicate below is true per constant
+    /** The task is still in progress. **/
+    DOING(0),
+    /** The task is done. **/
+    DONE(1),
+    /** The task is cancelled. **/
+    CANCELLED(2);
+
+    public final int value; // numeric code given to the constant's constructor (0, 1 or 2)
+
+    private FutureState(int value) {
+        this.value = value;
+    }
+
+    public boolean isCancelledState() { // true only for CANCELLED
+        return this == CANCELLED;
+    }
+
+    public boolean isDoneState() { // true only for DONE; CANCELLED does not count as done
+        return this == DONE;
+    }
+
+    public boolean isDoingState() { // true only for DOING
+        return this == DOING;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/ce146934/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index cf6ec9b..865e9f9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -606,8 +606,8 @@
             </dependency>
             <dependency>
                 <groupId>io.openmessaging</groupId>
-                <artifactId>messaging-user-level-api</artifactId>
-                <version>1.0.0-SNAPSHOT</version>
+                <artifactId>openmessaging-api</artifactId>
+                <version>0.1.0-beta</version>
             </dependency>
         </dependencies>
     </dependencyManagement>



Mime
View raw message