activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject git commit: https://issues.apache.org/jira/browse/AMQ-5186 - remove amqp producers
Date Thu, 15 May 2014 13:13:12 GMT
Repository: activemq
Updated Branches:
  refs/heads/trunk 0c0fadcdc -> ff64b14bc


https://issues.apache.org/jira/browse/AMQ-5186 - remove amqp producers


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ff64b14b
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ff64b14b
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ff64b14b

Branch: refs/heads/trunk
Commit: ff64b14bc78466df96d16b1d04e862a7ddef3204
Parents: 0c0fadc
Author: Dejan Bosanac <dejan@nighttale.net>
Authored: Thu May 15 15:13:01 2014 +0200
Committer: Dejan Bosanac <dejan@nighttale.net>
Committed: Thu May 15 15:13:01 2014 +0200

----------------------------------------------------------------------
 .../transport/amqp/AmqpProtocolConverter.java   | 114 ++++++++++---------
 1 file changed, 62 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/ff64b14b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
index 02621fc..b8a27c4 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
@@ -553,6 +553,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
         private final ProducerId producerId;
         private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
         private final ActiveMQDestination destination;
+        private boolean closed;
 
         public ProducerContext(ProducerId producerId, ActiveMQDestination destination) {
             this.producerId = producerId;
@@ -561,70 +562,79 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
 
         @Override
         protected void onMessage(final Receiver receiver, final Delivery delivery, Buffer buffer) throws Exception {
-            EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), buffer.data, buffer.offset, buffer.length);
-            final ActiveMQMessage message = (ActiveMQMessage) getInboundTransformer().transform(em);
-            current = null;
+            if (!closed) {
+                EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), buffer.data, buffer.offset, buffer.length);
+                final ActiveMQMessage message = (ActiveMQMessage) getInboundTransformer().transform(em);
+                current = null;
 
-            if (destination != null) {
-                message.setJMSDestination(destination);
-            }
-            message.setProducerId(producerId);
+                if (destination != null) {
+                    message.setJMSDestination(destination);
+                }
+                message.setProducerId(producerId);
 
-            // Always override the AMQP client's MessageId with our own.  Preserve the
-            // original in the TextView property for later Ack.
-            MessageId messageId = new MessageId(producerId, messageIdGenerator.getNextSequenceId());
+                // Always override the AMQP client's MessageId with our own.  Preserve the
+                // original in the TextView property for later Ack.
+                MessageId messageId = new MessageId(producerId, messageIdGenerator.getNextSequenceId());
 
-            MessageId amqpMessageId = message.getMessageId();
-            if (amqpMessageId != null) {
-                if (amqpMessageId.getTextView() != null) {
-                    messageId.setTextView(amqpMessageId.getTextView());
-                } else {
-                    messageId.setTextView(amqpMessageId.toString());
+                MessageId amqpMessageId = message.getMessageId();
+                if (amqpMessageId != null) {
+                    if (amqpMessageId.getTextView() != null) {
+                        messageId.setTextView(amqpMessageId.getTextView());
+                    } else {
+                        messageId.setTextView(amqpMessageId.toString());
+                    }
                 }
-            }
 
-            message.setMessageId(messageId);
+                message.setMessageId(messageId);
 
-            LOG.trace("Inbound Message:{} from Producer:{}", message.getMessageId(), producerId + ":" + messageId.getProducerSequenceId());
+                LOG.trace("Inbound Message:{} from Producer:{}", message.getMessageId(), producerId + ":" + messageId.getProducerSequenceId());
 
-            DeliveryState remoteState = delivery.getRemoteState();
-            if (remoteState != null && remoteState instanceof TransactionalState) {
-                TransactionalState s = (TransactionalState) remoteState;
-                long txid = toLong(s.getTxnId());
-                message.setTransactionId(new LocalTransactionId(connectionId, txid));
-            }
+                DeliveryState remoteState = delivery.getRemoteState();
+                if (remoteState != null && remoteState instanceof TransactionalState) {
+                    TransactionalState s = (TransactionalState) remoteState;
+                    long txid = toLong(s.getTxnId());
+                    message.setTransactionId(new LocalTransactionId(connectionId, txid));
+                }
 
-            // Lets handle the case where the expiration was set, but the timestamp
-            // was not set by the client. Lets assign the timestamp now, and adjust the
-            // expiration.
-            if (message.getExpiration() != 0) {
-                if (message.getTimestamp() == 0) {
-                    message.setTimestamp(System.currentTimeMillis());
-                    message.setExpiration(message.getTimestamp() + message.getExpiration());
+                // Lets handle the case where the expiration was set, but the timestamp
+                // was not set by the client. Lets assign the timestamp now, and adjust the
+                // expiration.
+                if (message.getExpiration() != 0) {
+                    if (message.getTimestamp() == 0) {
+                        message.setTimestamp(System.currentTimeMillis());
+                        message.setExpiration(message.getTimestamp() + message.getExpiration());
+                    }
                 }
-            }
 
-            message.onSend();
-            sendToActiveMQ(message, new ResponseHandler() {
-                @Override
-                public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
-                    if (!delivery.remotelySettled()) {
-                        if (response.isException()) {
-                            ExceptionResponse er = (ExceptionResponse) response;
-                            Rejected rejected = new Rejected();
-                            ErrorCondition condition = new ErrorCondition();
-                            condition.setCondition(Symbol.valueOf("failed"));
-                            condition.setDescription(er.getException().getMessage());
-                            rejected.setError(condition);
-                            delivery.disposition(rejected);
+                message.onSend();
+                sendToActiveMQ(message, new ResponseHandler() {
+                    @Override
+                    public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
+                        if (!delivery.remotelySettled()) {
+                            if (response.isException()) {
+                                ExceptionResponse er = (ExceptionResponse) response;
+                                Rejected rejected = new Rejected();
+                                ErrorCondition condition = new ErrorCondition();
+                                condition.setCondition(Symbol.valueOf("failed"));
+                                condition.setDescription(er.getException().getMessage());
+                                rejected.setError(condition);
+                                delivery.disposition(rejected);
+                            }
                         }
+                        receiver.flow(1);
+                        delivery.disposition(Accepted.getInstance());
+                        delivery.settle();
+                        pumpProtonToSocket();
                     }
-                    receiver.flow(1);
-                    delivery.disposition(Accepted.getInstance());
-                    delivery.settle();
-                    pumpProtonToSocket();
-                }
-            });
+                });
+            }
+        }
+
+        @Override
+        public void onClose() throws Exception {
+            if (!closed) {
+                sendToActiveMQ(new RemoveInfo(producerId), null);
+            }
         }
     }
 


Mime
View raw message