Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQDestination.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQDestination.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQDestination.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQDestination.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.StringTokenizer;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.Topic;
+
+import org.apache.activemq.jndi.JNDIBaseStorable;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.URISupport;
+
+/**
+ * @openwire:marshaller
+ * @version $Revision: 1.10 $
+ */
+public abstract class ActiveMQDestination extends JNDIBaseStorable implements DataStructure, Destination, Externalizable, Comparable {
+
+ /** Delimiter used by getDestinationPaths() to split a physical name into path segments. */
+ public static final String PATH_SEPERATOR = ".";
+ /** Delimiter between the members of a composite destination name. */
+ public static final char COMPOSITE_SEPERATOR = ',';
+
+ // Destination type codes. The temporary variants are the base code OR-ed with TEMP_MASK.
+ public static final byte QUEUE_TYPE = 0x01;
+ public static final byte TOPIC_TYPE = 0x02;
+ public static final byte TEMP_MASK = 0x04;
+ public static final byte TEMP_TOPIC_TYPE = TOPIC_TYPE | TEMP_MASK;
+ public static final byte TEMP_QUEUE_TYPE = QUEUE_TYPE | TEMP_MASK;
+
+ // Scheme-like prefixes recognised by createDestination(String, byte).
+ public static final String QUEUE_QUALIFIED_PREFIX = "queue://";
+ public static final String TOPIC_QUALIFIED_PREFIX = "topic://";
+ public static final String TEMP_QUEUE_QUALIFED_PREFIX = "temp-queue://";
+ public static final String TEMP_TOPIC_QUALIFED_PREFIX = "temp-topic://";
+
+ // NOTE(review): not referenced inside this class; presumably the prefix of
+ // generated temporary destination names - confirm against callers.
+ public static final String TEMP_DESTINATION_NAME_PREFIX = "ID:";
+
+ private static final long serialVersionUID = -3885260014960795889L;
+
+ /** Destination name with any "?options" suffix already stripped (see setPhysicalName). */
+ protected String physicalName;
+
+ // The transient fields below are derived from physicalName and are rebuilt or
+ // invalidated by setPhysicalName() / setCompositeDestinations().
+ protected transient ActiveMQDestination[] compositeDestinations;
+ /** Lazily computed by getDestinationPaths(); null means "not computed yet". */
+ protected transient String[] destinationPaths;
+ /** Set by setPhysicalName() when a non-composite name contains '*' or '>'. */
+ protected transient boolean isPattern;
+ /** Cached hashCode(); 0 means "not computed yet". */
+ protected transient int hashValue;
+ /** Options parsed from the query part ("?key=value") of the name, or null if none were given. */
+ protected Map<String, String> options;
+
+ /**
+  * Creates a destination without a physical name. Required for
+  * {@link Externalizable}; the name is supplied later by
+  * {@link #readExternal(ObjectInput)} or {@link #setPhysicalName(String)}.
+  */
+ public ActiveMQDestination() {
+     super();
+ }
+
+ /**
+  * Creates a destination from a physical name, which may carry a
+  * "?options" suffix and/or be a comma-separated composite.
+  *
+  * @param name the physical name, parsed by {@link #setPhysicalName(String)}
+  */
+ protected ActiveMQDestination(String name) {
+     this.setPhysicalName(name);
+ }
+
+ /**
+  * Creates a composite destination from its member destinations.
+  *
+  * @param composites the member destinations; must not be null
+  */
+ public ActiveMQDestination(ActiveMQDestination[] composites) {
+     this.setCompositeDestinations(composites);
+ }
+
+
+ // static helper methods for working with destinations
+ // -------------------------------------------------------------------------
+ /**
+  * Creates a destination from a possibly qualified name.
+  * <p>
+  * A name starting with one of the qualified prefixes ("queue://",
+  * "topic://", "temp-queue://", "temp-topic://") yields the matching
+  * destination class with the prefix removed. Otherwise the type is chosen
+  * by <code>defaultType</code>.
+  *
+  * @param name the destination name, optionally prefixed; must not be null
+  * @param defaultType one of the <code>*_TYPE</code> constants, used when
+  *                the name has no recognised prefix
+  * @return the newly created destination
+  * @throws IllegalArgumentException if the name is unprefixed and
+  *                 <code>defaultType</code> is not a known type code
+  */
+ public static ActiveMQDestination createDestination(String name, byte defaultType) {
+     // An explicit prefix always wins over the caller-supplied default.
+     if (name.startsWith(QUEUE_QUALIFIED_PREFIX)) {
+         return new ActiveMQQueue(name.substring(QUEUE_QUALIFIED_PREFIX.length()));
+     }
+     if (name.startsWith(TOPIC_QUALIFIED_PREFIX)) {
+         return new ActiveMQTopic(name.substring(TOPIC_QUALIFIED_PREFIX.length()));
+     }
+     if (name.startsWith(TEMP_QUEUE_QUALIFED_PREFIX)) {
+         return new ActiveMQTempQueue(name.substring(TEMP_QUEUE_QUALIFED_PREFIX.length()));
+     }
+     if (name.startsWith(TEMP_TOPIC_QUALIFED_PREFIX)) {
+         return new ActiveMQTempTopic(name.substring(TEMP_TOPIC_QUALIFED_PREFIX.length()));
+     }
+
+     if (defaultType == QUEUE_TYPE) {
+         return new ActiveMQQueue(name);
+     }
+     if (defaultType == TOPIC_TYPE) {
+         return new ActiveMQTopic(name);
+     }
+     if (defaultType == TEMP_QUEUE_TYPE) {
+         return new ActiveMQTempQueue(name);
+     }
+     if (defaultType == TEMP_TOPIC_TYPE) {
+         return new ActiveMQTempTopic(name);
+     }
+     throw new IllegalArgumentException("Invalid default destination type: " + defaultType);
+ }
+
+ /**
+  * Converts an arbitrary JMS destination into an ActiveMQ destination.
+  * <p>
+  * Instances of this class are returned unchanged. The temporary
+  * interfaces are tested before {@link Queue} and {@link Topic} because
+  * they extend them.
+  *
+  * @param dest the destination to convert; may be null
+  * @return the converted destination, or null if <code>dest</code> is null
+  * @throws JMSException if <code>dest</code> implements none of the known
+  *                 JMS destination interfaces, or if reading its name fails
+  */
+ public static ActiveMQDestination transform(Destination dest) throws JMSException {
+     if (dest == null) {
+         return null;
+     }
+     final ActiveMQDestination converted;
+     if (dest instanceof ActiveMQDestination) {
+         converted = (ActiveMQDestination)dest;
+     } else if (dest instanceof TemporaryQueue) {
+         converted = new ActiveMQTempQueue(((TemporaryQueue)dest).getQueueName());
+     } else if (dest instanceof TemporaryTopic) {
+         converted = new ActiveMQTempTopic(((TemporaryTopic)dest).getTopicName());
+     } else if (dest instanceof Queue) {
+         converted = new ActiveMQQueue(((Queue)dest).getQueueName());
+     } else if (dest instanceof Topic) {
+         converted = new ActiveMQTopic(((Topic)dest).getTopicName());
+     } else {
+         throw new JMSException("Could not transform the destination into a ActiveMQ destination: " + dest);
+     }
+     return converted;
+ }
+
+ /**
+  * Orders two destinations: null sorts first, queues sort before
+  * non-queues, and destinations of the same kind are ordered by physical
+  * name.
+  *
+  * @param destination the first destination; may be null
+  * @param destination2 the second destination; may be null
+  * @return a negative, zero or positive value as the first argument sorts
+  *         before, equal to, or after the second
+  */
+ public static int compare(ActiveMQDestination destination, ActiveMQDestination destination2) {
+     if (destination == destination2) {
+         return 0;
+     }
+     if (destination == null) {
+         return -1;
+     }
+     if (destination2 == null) {
+         return 1;
+     }
+     if (destination.isQueue() != destination2.isQueue()) {
+         return destination.isQueue() ? -1 : 1;
+     }
+     return destination.getPhysicalName().compareTo(destination2.getPhysicalName());
+ }
+
+ /**
+  * Compares this destination with another object. Destinations are
+  * ordered by {@link #compare(ActiveMQDestination, ActiveMQDestination)};
+  * null sorts before this object; any other object is ordered by class
+  * name.
+  *
+  * @param that the object to compare with; may be null
+  * @return a negative, zero or positive ordering value
+  */
+ public int compareTo(Object that) {
+     if (that instanceof ActiveMQDestination) {
+         return compare(this, (ActiveMQDestination)that);
+     }
+     return that == null ? 1 : getClass().getName().compareTo(that.getClass().getName());
+ }
+
+ /**
+  * @return true if member destinations have been recorded for this
+  *         destination
+  */
+ public boolean isComposite() {
+     return this.compositeDestinations != null;
+ }
+
+ /**
+  * @return the member destinations (the internal array, not a copy), or
+  *         null if this destination is not composite
+  */
+ public ActiveMQDestination[] getCompositeDestinations() {
+     return this.compositeDestinations;
+ }
+
+ public void setCompositeDestinations(ActiveMQDestination[] destinations) {
+ this.compositeDestinations = destinations;
+ this.destinationPaths = null;
+ this.hashValue = 0;
+ this.isPattern = false;
+
+ StringBuffer sb = new StringBuffer();
+ for (int i = 0; i < destinations.length; i++) {
+ if (i != 0) {
+ sb.append(COMPOSITE_SEPERATOR);
+ }
+ if (getDestinationType() == destinations[i].getDestinationType()) {
+ sb.append(destinations[i].getPhysicalName());
+ } else {
+ sb.append(destinations[i].getQualifiedName());
+ }
+ }
+ physicalName = sb.toString();
+ }
+
+ /**
+  * @return the physical name preceded by this destination's qualified
+  *         prefix; composite destinations return the physical name as is
+  */
+ public String getQualifiedName() {
+     return isComposite() ? physicalName : getQualifiedPrefix() + physicalName;
+ }
+
+ /**
+  * @return the prefix that getQualifiedName() puts in front of the
+  *         physical name of a non-composite destination
+  */
+ protected abstract String getQualifiedPrefix();
+
+ /**
+  * Returns the physical name of this destination, without any options.
+  *
+  * @openwire:property version=1
+  * @return the physical name, or null if none has been set
+  */
+ public String getPhysicalName() {
+     return this.physicalName;
+ }
+
+ /**
+  * Sets the physical name and recomputes the state derived from it.
+  * <p>
+  * The name is scanned up to the first '?'. Anything after the '?' is
+  * parsed as URI-style options and stripped from the stored name. A name
+  * containing the composite separator (',') is split into member
+  * destinations. A non-composite name containing '*' or '&gt;' is flagged
+  * as a wildcard pattern.
+  * <p>
+  * The pattern flag and the composite members are reset on every call, so
+  * a destination that is renamed does not keep state from its previous
+  * name. Options are only replaced when the new name carries a query
+  * part.
+  *
+  * @param physicalName the new name; must not be null
+  * @throws IllegalArgumentException if the options part cannot be parsed
+  */
+ public void setPhysicalName(String physicalName) {
+     final int len = physicalName.length();
+     // Start from a clean slate: without this a previous wildcard or
+     // composite name would leave isPattern / compositeDestinations stale.
+     isPattern = false;
+     compositeDestinations = null;
+     // options offset
+     int p = -1;
+     boolean composite = false;
+     for (int i = 0; i < len; i++) {
+         char c = physicalName.charAt(i);
+         if (c == '?') {
+             p = i;
+             break;
+         }
+         if (c == COMPOSITE_SEPERATOR) {
+             // a composite is never treated as a wildcard
+             isPattern = false;
+             composite = true;
+         } else if (!composite && (c == '*' || c == '>')) {
+             isPattern = true;
+         }
+     }
+     // Strip off any options
+     if (p >= 0) {
+         String optstring = physicalName.substring(p + 1);
+         physicalName = physicalName.substring(0, p);
+         try {
+             options = URISupport.parseQuery(optstring);
+         } catch (URISyntaxException e) {
+             // Same message as before, but keep the cause for diagnostics.
+             throw new IllegalArgumentException("Invalid destination name: " + physicalName + ", it's options are not encoded properly: " + e, e);
+         }
+     }
+     this.physicalName = physicalName;
+     this.destinationPaths = null;
+     this.hashValue = 0;
+     if (composite) {
+         // Split on the separator, ignoring empty members such as "a,,b".
+         List<String> l = new ArrayList<String>();
+         StringTokenizer iter = new StringTokenizer(physicalName, "" + COMPOSITE_SEPERATOR);
+         while (iter.hasMoreTokens()) {
+             String name = iter.nextToken().trim();
+             if (name.length() == 0) {
+                 continue;
+             }
+             l.add(name);
+         }
+         // A single surviving member (e.g. "a,") is not a composite.
+         if (l.size() > 1) {
+             compositeDestinations = new ActiveMQDestination[l.size()];
+             int counter = 0;
+             for (String dest : l) {
+                 compositeDestinations[counter++] = createDestination(dest);
+             }
+         }
+     }
+ }
+
+ /**
+  * Creates a destination from a name, using this destination's own type
+  * when the name has no qualified prefix.
+  *
+  * @param name the destination name, optionally prefixed
+  * @return the newly created destination
+  */
+ public ActiveMQDestination createDestination(String name) {
+     final byte ownType = getDestinationType();
+     return createDestination(name, ownType);
+ }
+
+ /**
+  * Splits the physical name on {@link #PATH_SEPERATOR} into trimmed,
+  * non-empty segments. The result is cached until the name changes.
+  *
+  * @return the path segments (the cached array, not a copy)
+  */
+ public String[] getDestinationPaths() {
+     if (destinationPaths == null) {
+         List<String> segments = new ArrayList<String>();
+         StringTokenizer tokens = new StringTokenizer(physicalName, PATH_SEPERATOR);
+         while (tokens.hasMoreTokens()) {
+             String segment = tokens.nextToken().trim();
+             if (segment.length() != 0) {
+                 segments.add(segment);
+             }
+         }
+         destinationPaths = segments.toArray(new String[segments.size()]);
+     }
+     return destinationPaths;
+ }
+
+ /**
+  * @return the type code of this destination; createDestination and
+  *         getDestinationTypeAsString accept QUEUE_TYPE, TOPIC_TYPE,
+  *         TEMP_QUEUE_TYPE and TEMP_TOPIC_TYPE
+  */
+ public abstract byte getDestinationType();
+
+ /**
+  * @return false in this base class; presumably overridden by the queue
+  *         subclasses (not visible here)
+  */
+ public boolean isQueue() {
+     final boolean queue = false;
+     return queue;
+ }
+
+ /**
+  * @return false in this base class; presumably overridden by the topic
+  *         subclasses (not visible here)
+  */
+ public boolean isTopic() {
+     final boolean topic = false;
+     return topic;
+ }
+
+ /**
+  * @return false in this base class; presumably overridden by the
+  *         temporary destination subclasses (not visible here)
+  */
+ public boolean isTemporary() {
+     final boolean temporary = false;
+     return temporary;
+ }
+
+ /**
+  * Two destinations are equal when they are of exactly the same class and
+  * have the same physical name. Options are not compared.
+  *
+  * @param o the object to compare with; may be null
+  * @return true if <code>o</code> denotes the same destination
+  */
+ public boolean equals(Object o) {
+     if (o == this) {
+         return true;
+     }
+     final boolean sameClass = o != null && o.getClass() == getClass();
+     return sameClass && physicalName.equals(((ActiveMQDestination)o).physicalName);
+ }
+
+ /**
+  * @return the hash of the physical name, cached after the first call
+  *         (a cached value of 0 is treated as "not computed yet")
+  */
+ public int hashCode() {
+     if (hashValue != 0) {
+         return hashValue;
+     }
+     hashValue = physicalName.hashCode();
+     return hashValue;
+ }
+
+ /**
+  * @return the qualified name, see {@link #getQualifiedName()}
+  */
+ public String toString() {
+     final String qualified = getQualifiedName();
+     return qualified;
+ }
+
+ /**
+  * Writes the physical name as UTF followed by the options map.
+  *
+  * @param out the stream to write to
+  * @throws IOException if writing fails
+  */
+ public void writeExternal(ObjectOutput out) throws IOException {
+     final String name = getPhysicalName();
+     out.writeUTF(name);
+     out.writeObject(this.options);
+ }
+
+ /**
+  * Restores the state written by {@link #writeExternal(ObjectOutput)}:
+  * first the physical name, then the options map. The options are
+  * assigned after the name so that the stored map is the one that ends up
+  * in effect.
+  *
+  * @param in the stream to read from
+  * @throws IOException if reading fails
+  * @throws ClassNotFoundException if the options object cannot be resolved
+  */
+ @SuppressWarnings("unchecked")
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+     final String name = in.readUTF();
+     setPhysicalName(name);
+     final Object storedOptions = in.readObject();
+     this.options = (Map<String, String>)storedOptions;
+ }
+
+ /**
+  * @return "Queue", "Topic", "TempQueue" or "TempTopic" according to
+  *         {@link #getDestinationType()}
+  * @throws IllegalArgumentException if the type code is not one of the
+  *                 four known values
+  */
+ public String getDestinationTypeAsString() {
+     final byte type = getDestinationType();
+     if (type == QUEUE_TYPE) {
+         return "Queue";
+     }
+     if (type == TOPIC_TYPE) {
+         return "Topic";
+     }
+     if (type == TEMP_QUEUE_TYPE) {
+         return "TempQueue";
+     }
+     if (type == TEMP_TOPIC_TYPE) {
+         return "TempTopic";
+     }
+     throw new IllegalArgumentException("Invalid destination type: " + getDestinationType());
+ }
+
+ /**
+  * @return the options map (the internal instance, not a copy), or null
+  *         if no options have been set
+  */
+ public Map<String, String> getOptions() {
+     return this.options;
+ }
+
+ /**
+  * @return always false for destinations
+  */
+ public boolean isMarshallAware() {
+     final boolean marshallAware = false;
+     return marshallAware;
+ }
+
+ /**
+  * Applies the given properties to this destination through
+  * <code>IntrospectionSupport.setProperties</code>.
+  *
+  * @param properties the properties to apply; null is treated as an empty
+  *                set
+  */
+ public void buildFromProperties(Properties properties) {
+     final Properties effective = properties != null ? properties : new Properties();
+     IntrospectionSupport.setProperties(this, effective);
+ }
+
+ /**
+  * Stores the physical name under the key "physicalName".
+  *
+  * @param props the properties to populate; must not be null
+  */
+ public void populateProperties(Properties props) {
+     final String name = getPhysicalName();
+     props.setProperty("physicalName", name);
+ }
+
+ /**
+  * @return true if the physical name was recognised as a wildcard
+  *         pattern when it was set
+  */
+ public boolean isPattern() {
+     return this.isPattern;
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQDestination.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,739 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.MessageFormatException;
+import javax.jms.MessageNotWriteableException;
+
+import org.apache.activemq.IConnection;
+import org.apache.activemq.util.ByteArrayInputStream;
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.activemq.util.MarshallingSupport;
+import org.apache.activemq.wireformat.WireFormat;
+
+/**
+ * A <CODE>MapMessage</CODE> object is used to send a set of name-value pairs.
+ * The names are <CODE>String</CODE> objects, and the values are primitive
+ * data types in the Java programming language. The names must have a value that
+ * is not null, and not an empty string. The entries can be accessed
+ * sequentially or randomly by name. The order of the entries is undefined.
+ * <CODE>MapMessage</CODE> inherits from the <CODE>Message</CODE> interface
+ * and adds a message body that contains a Map.
+ * <P>
+ * The primitive types can be read or written explicitly using methods for each
+ * type. They may also be read or written generically as objects. For instance,
+ * a call to <CODE>MapMessage.setInt("foo", 6)</CODE> is equivalent to
+ * <CODE> MapMessage.setObject("foo", new Integer(6))</CODE>. Both forms are
+ * provided, because the explicit form is convenient for static programming, and
+ * the object form is needed when types are not known at compile time.
+ * <P>
+ * When a client receives a <CODE>MapMessage</CODE>, it is in read-only mode.
+ * If a client attempts to write to the message at this point, a
+ * <CODE>MessageNotWriteableException</CODE> is thrown. If
+ * <CODE>clearBody</CODE> is called, the message can now be both read from and
+ * written to.
+ * <P>
+ * <CODE>MapMessage</CODE> objects support the following conversion table. The
+ * marked cases must be supported. The unmarked cases must throw a
+ * <CODE>JMSException</CODE>. The <CODE>String</CODE> -to-primitive
+ * conversions may throw a runtime exception if the primitive's
+ * <CODE>valueOf()</CODE> method does not accept it as a valid
+ * <CODE> String</CODE> representation of the primitive.
+ * <P>
+ * A value written as the row type can be read as the column type. <p/>
+ *
+ * <PRE>
+ * |        | boolean byte short char int long float double String byte[]
+ * |----------------------------------------------------------------------
+ * |boolean |    X                                            X
+ * |byte    |          X     X         X   X                  X
+ * |short   |                X         X   X                  X
+ * |char    |                     X                           X
+ * |int     |                          X   X                  X
+ * |long    |                              X                  X
+ * |float   |                                    X     X      X
+ * |double  |                                          X      X
+ * |String  |    X     X     X         X   X     X     X      X
+ * |byte[]  |                                                        X
+ * |----------------------------------------------------------------------
+ * <p/>
+ * </PRE>
+ *
+ * <p/>
+ * <P>
+ * Attempting to read a null value as a primitive type must be treated as
+ * calling the primitive's corresponding <code>valueOf(String)</code>
+ * conversion method with a null value. Since <code>char</code> does not
+ * support a <code>String</code> conversion, attempting to read a null value
+ * as a <code>char</code> must throw a <code>NullPointerException</code>.
+ *
+ * @openwire:marshaller code="25"
+ * @see javax.jms.Session#createMapMessage()
+ * @see javax.jms.BytesMessage
+ * @see javax.jms.Message
+ * @see javax.jms.ObjectMessage
+ * @see javax.jms.StreamMessage
+ * @see javax.jms.TextMessage
+ */
+public class ActiveMQMapMessage extends ActiveMQMessage implements MapMessage {
+
+ /** Type code returned by getDataStructureType(). */
+ public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_MAP_MESSAGE;
+
+ /**
+  * In-memory body. Marshalled into the message content by storeContent()
+  * and rebuilt from it by loadContent().
+  */
+ protected transient Map<String, Object> map = new HashMap<String, Object>();
+
+ /**
+  * @return a new map message populated from this one
+  */
+ public Message copy() {
+     final ActiveMQMapMessage duplicate = new ActiveMQMapMessage();
+     this.copy(duplicate);
+     return duplicate;
+ }
+
+ /**
+  * Populates <code>copy</code> from this message. The map is marshalled
+  * into the content first, then the superclass copy is invoked.
+  *
+  * @param copy the message to populate
+  */
+ private void copy(ActiveMQMapMessage copy) {
+     this.storeContent();
+     super.copy(copy);
+ }
+
+ /**
+  * Marshals the map into the message content just before the message is
+  * written to the wire; until then the body lives only in the map.
+  *
+  * @param wireFormat the wire format about to marshal this message
+  * @throws IOException if the superclass hook fails
+  */
+ public void beforeMarshall(WireFormat wireFormat) throws IOException {
+     super.beforeMarshall(wireFormat);
+     this.storeContent();
+ }
+
+ /**
+  * Marshals the map into the message content, unless content already
+  * exists or the map is empty. The bytes are deflated, and the message is
+  * marked compressed, when the owning connection asks for compression.
+  *
+  * @throws RuntimeException wrapping any IOException raised while
+  *                 marshalling
+  */
+ private void storeContent() {
+     try {
+         if (getContent() != null || map.isEmpty()) {
+             return;
+         }
+         ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+         OutputStream sink = buffer;
+         IConnection owner = getConnection();
+         if (owner != null && owner.isUseCompression()) {
+             compressed = true;
+             sink = new DeflaterOutputStream(sink);
+         }
+         DataOutputStream dataOut = new DataOutputStream(sink);
+         MarshallingSupport.marshalPrimitiveMap(map, dataOut);
+         // Closing flushes the deflater before the bytes are captured.
+         dataOut.close();
+         setContent(buffer.toByteSequence());
+     } catch (IOException e) {
+         throw new RuntimeException(e);
+     }
+ }
+
+ /**
+  * Rebuilds the map from the message content, inflating it first when
+  * the message is marked compressed. Does nothing if there is no content
+  * or the map already holds entries.
+  *
+  * @throws JMSException wrapping any IOException raised while
+  *                 unmarshalling
+  */
+ private void loadContent() throws JMSException {
+     try {
+         if (getContent() == null || !map.isEmpty()) {
+             return;
+         }
+         ByteSequence content = getContent();
+         InputStream source = new ByteArrayInputStream(content);
+         if (isCompressed()) {
+             source = new InflaterInputStream(source);
+         }
+         DataInputStream dataIn = new DataInputStream(source);
+         map = MarshallingSupport.unmarshalPrimitiveMap(dataIn);
+         dataIn.close();
+     } catch (IOException e) {
+         throw JMSExceptionSupport.create(e);
+     }
+ }
+
+ /**
+  * @return {@link #DATA_STRUCTURE_TYPE}
+  */
+ public byte getDataStructureType() {
+     final byte type = DATA_STRUCTURE_TYPE;
+     return type;
+ }
+
+ /**
+  * @return the fixed MIME type string of map messages
+  */
+ public String getJMSXMimeType() {
+     final String mimeType = "jms/map-message";
+     return mimeType;
+ }
+
+ /**
+  * Clears the message body: delegates to the superclass and then empties
+  * the in-memory map. Header values and properties are not touched by
+  * this class.
+  *
+  * @throws JMSException if the superclass fails to clear the body
+  */
+ public void clearBody() throws JMSException {
+     super.clearBody();
+     this.map.clear();
+ }
+
+ /**
+  * Reads the entry with the given name as a <CODE>boolean</CODE>.
+  *
+  * @param name the entry name
+  * @return the stored Boolean, a stored String parsed as a boolean, or
+  *         false if there is no such entry
+  * @throws JMSException if the message cannot be prepared for reading
+  * @throws MessageFormatException if the stored value is neither a
+  *                 Boolean nor a String
+  */
+ public boolean getBoolean(String name) throws JMSException {
+     initializeReading();
+     final Object stored = map.get(name);
+     if (stored == null) {
+         return false;
+     }
+     if (stored instanceof Boolean) {
+         return ((Boolean)stored).booleanValue();
+     }
+     if (!(stored instanceof String)) {
+         throw new MessageFormatException(" cannot read a boolean from " + stored.getClass().getName());
+     }
+     return Boolean.parseBoolean((String)stored);
+ }
+
+ /**
+  * Reads the entry with the given name as a <CODE>byte</CODE>.
+  *
+  * @param name the entry name
+  * @return the stored Byte, a stored String parsed as a byte, or 0 if
+  *         there is no such entry
+  * @throws JMSException if the message cannot be prepared for reading
+  * @throws MessageFormatException if the stored value is neither a Byte
+  *                 nor a String
+  * @throws NumberFormatException if a stored String is not a valid byte
+  */
+ public byte getByte(String name) throws JMSException {
+     initializeReading();
+     final Object stored = map.get(name);
+     if (stored == null) {
+         return 0;
+     }
+     if (stored instanceof Byte) {
+         return ((Byte)stored).byteValue();
+     }
+     if (!(stored instanceof String)) {
+         throw new MessageFormatException(" cannot read a byte from " + stored.getClass().getName());
+     }
+     return Byte.parseByte((String)stored);
+ }
+
+ /**
+  * Reads the entry with the given name as a <CODE>short</CODE>.
+  *
+  * @param name the entry name
+  * @return the stored Short or Byte widened to short, a stored String
+  *         parsed as a short, or 0 if there is no such entry
+  * @throws JMSException if the message cannot be prepared for reading
+  * @throws MessageFormatException if the stored value has any other type
+  * @throws NumberFormatException if a stored String is not a valid short
+  */
+ public short getShort(String name) throws JMSException {
+     initializeReading();
+     final Object stored = map.get(name);
+     if (stored == null) {
+         return 0;
+     }
+     if (stored instanceof Short || stored instanceof Byte) {
+         return ((Number)stored).shortValue();
+     }
+     if (!(stored instanceof String)) {
+         throw new MessageFormatException(" cannot read a short from " + stored.getClass().getName());
+     }
+     return Short.parseShort((String)stored);
+ }
+
+ /**
+  * Returns the Unicode character value with the specified name.
+  * <P>
+  * Unlike the numeric getters there is no String conversion for
+  * <CODE>char</CODE>, so a missing entry raises a
+  * <CODE>NullPointerException</CODE> instead of yielding a default.
+  *
+  * @param name the name of the Unicode character
+  * @return the Unicode character value with the specified name
+  * @throws JMSException if the message cannot be prepared for reading
+  * @throws MessageFormatException if the stored value is not a Character
+  * @throws NullPointerException if there is no entry with this name
+  */
+ public char getChar(String name) throws JMSException {
+     initializeReading();
+     Object value = map.get(name);
+     if (value == null) {
+         throw new NullPointerException();
+     }
+     if (value instanceof Character) {
+         return ((Character)value).charValue();
+     } else {
+         // The message used to say "short" (copy-paste from getShort).
+         throw new MessageFormatException(" cannot read a char from " + value.getClass().getName());
+     }
+ }
+
+ /**
+  * Reads the entry with the given name as an <CODE>int</CODE>.
+  *
+  * @param name the entry name
+  * @return the stored Integer, Short or Byte widened to int, a stored
+  *         String parsed as an int, or 0 if there is no such entry
+  * @throws JMSException if the message cannot be prepared for reading
+  * @throws MessageFormatException if the stored value has any other type
+  * @throws NumberFormatException if a stored String is not a valid int
+  */
+ public int getInt(String name) throws JMSException {
+     initializeReading();
+     final Object stored = map.get(name);
+     if (stored == null) {
+         return 0;
+     }
+     if (stored instanceof Integer || stored instanceof Short || stored instanceof Byte) {
+         return ((Number)stored).intValue();
+     }
+     if (!(stored instanceof String)) {
+         throw new MessageFormatException(" cannot read an int from " + stored.getClass().getName());
+     }
+     return Integer.parseInt((String)stored);
+ }
+
+ /**
+  * Reads the entry with the given name as a <CODE>long</CODE>.
+  *
+  * @param name the entry name
+  * @return the stored Long, Integer, Short or Byte widened to long, a
+  *         stored String parsed as a long, or 0 if there is no such entry
+  * @throws JMSException if the message cannot be prepared for reading
+  * @throws MessageFormatException if the stored value has any other type
+  * @throws NumberFormatException if a stored String is not a valid long
+  */
+ public long getLong(String name) throws JMSException {
+     initializeReading();
+     final Object stored = map.get(name);
+     if (stored == null) {
+         return 0;
+     }
+     final boolean integral = stored instanceof Long || stored instanceof Integer
+         || stored instanceof Short || stored instanceof Byte;
+     if (integral) {
+         return ((Number)stored).longValue();
+     }
+     if (!(stored instanceof String)) {
+         throw new MessageFormatException(" cannot read a long from " + stored.getClass().getName());
+     }
+     return Long.parseLong((String)stored);
+ }
+
+ /**
+  * Reads the entry with the given name as a <CODE>float</CODE>.
+  *
+  * @param name the entry name
+  * @return the stored Float, a stored String parsed as a float, or 0 if
+  *         there is no such entry
+  * @throws JMSException if the message cannot be prepared for reading
+  * @throws MessageFormatException if the stored value is neither a Float
+  *                 nor a String
+  * @throws NumberFormatException if a stored String is not a valid float
+  */
+ public float getFloat(String name) throws JMSException {
+     initializeReading();
+     final Object stored = map.get(name);
+     if (stored == null) {
+         return 0;
+     }
+     if (stored instanceof Float) {
+         return ((Float)stored).floatValue();
+     }
+     if (!(stored instanceof String)) {
+         throw new MessageFormatException(" cannot read a float from " + stored.getClass().getName());
+     }
+     return Float.parseFloat((String)stored);
+ }
+
+ /**
+  * Returns the <CODE>double</CODE> value with the specified name.
+  * <P>
+  * A stored String is parsed with full double precision. It used to be
+  * parsed as a float, which silently truncated the value and turned
+  * large magnitudes into infinity.
+  *
+  * @param name the name of the <CODE>double</CODE>
+  * @return the stored Double, a stored Float widened to double, a stored
+  *         String parsed as a double, or 0 if there is no such entry
+  * @throws JMSException if the message cannot be prepared for reading
+  * @throws MessageFormatException if the stored value has any other type
+  * @throws NumberFormatException if a stored String is not a valid double
+  */
+ public double getDouble(String name) throws JMSException {
+     initializeReading();
+     Object value = map.get(name);
+     if (value == null) {
+         return 0;
+     }
+     if (value instanceof Double) {
+         return ((Double)value).doubleValue();
+     }
+     if (value instanceof Float) {
+         // Widening float -> double is exact for the float's value.
+         return ((Float)value).floatValue();
+     }
+     if (value instanceof String) {
+         return Double.parseDouble(value.toString());
+     } else {
+         throw new MessageFormatException(" cannot read a double from " + value.getClass().getName());
+     }
+ }
+
+ /**
+  * Reads the entry with the given name as a <CODE>String</CODE>.
+  *
+  * @param name the entry name
+  * @return the <code>toString()</code> form of the stored value, or null
+  *         if there is no such entry
+  * @throws JMSException if the message cannot be prepared for reading
+  * @throws MessageFormatException if the stored value is a byte array
+  */
+ public String getString(String name) throws JMSException {
+     initializeReading();
+     final Object stored = map.get(name);
+     if (stored instanceof byte[]) {
+         throw new MessageFormatException("Use getBytes to read a byte array");
+     }
+     return stored == null ? null : stored.toString();
+ }
+
+ /**
+ * Returns the byte array value with the specified name.
+ *
+ * @param name the name of the byte array
+ * @return a copy of the byte array value with the specified name; if there
+ * is no item by this name, a null value is returned.
+ * @throws JMSException if the JMS provider fails to read the message due to
+ * some internal error.
+ * @throws MessageFormatException if this type conversion is invalid.
+ */
+ public byte[] getBytes(String name) throws JMSException {
+ initializeReading();
+ Object value = map.get(name);
+ if (value instanceof byte[]) {
+ return (byte[])value;
+ } else {
+ throw new MessageFormatException(" cannot read a byte[] from " + value.getClass().getName());
+ }
+ }
+
+ /**
+  * Returns the entry with the given name in its stored, objectified form
+  * (for example an <CODE>Integer</CODE> for a value written with
+  * <CODE>setInt</CODE>). The stored instance itself is returned.
+  *
+  * @param name the entry name
+  * @return the stored object, or null if there is no such entry
+  * @throws JMSException if the message cannot be prepared for reading
+  */
+ public Object getObject(String name) throws JMSException {
+     initializeReading();
+     final Object stored = map.get(name);
+     return stored;
+ }
+
+ /**
+  * Returns an <CODE>Enumeration</CODE> over the names of all entries in
+  * this message. It is backed by the live key set of the map.
+  *
+  * @return an enumeration of all entry names
+  * @throws JMSException if the message cannot be prepared for reading
+  */
+ public Enumeration<String> getMapNames() throws JMSException {
+     initializeReading();
+     final Set<String> names = map.keySet();
+     return Collections.enumeration(names);
+ }
+
+ /**
+  * Stores a value in the map after validating the entry name.
+  *
+  * @param name the entry name; must be neither null nor empty
+  * @param value the value to store
+  * @throws JMSException declared for subclasses; not thrown here
+  * @throws IllegalArgumentException if the name is null or empty
+  */
+ protected void put(String name, Object value) throws JMSException {
+     if (name == null) {
+         throw new IllegalArgumentException("The name of the property cannot be null.");
+     }
+     if (name.length() == 0) {
+         // Fixed the "emprty" typo in the message.
+         throw new IllegalArgumentException("The name of the property cannot be an empty string.");
+     }
+     map.put(name, value);
+ }
+
+ /**
+  * Stores a <CODE>boolean</CODE> under the given name.
+  *
+  * @param name the entry name
+  * @param value the <CODE>boolean</CODE> value to store
+  * @throws JMSException if the message cannot be prepared for writing
+  * @throws IllegalArgumentException if the name is null or empty
+  */
+ public void setBoolean(String name, boolean value) throws JMSException {
+     initializeWriting();
+     final Boolean boxed = Boolean.valueOf(value);
+     put(name, boxed);
+ }
+
+ /**
+  * Stores a <CODE>byte</CODE> under the given name.
+  *
+  * @param name the entry name
+  * @param value the <CODE>byte</CODE> value to store
+  * @throws JMSException if the message cannot be prepared for writing
+  * @throws IllegalArgumentException if the name is null or empty
+  */
+ public void setByte(String name, byte value) throws JMSException {
+     initializeWriting();
+     final Byte boxed = Byte.valueOf(value);
+     put(name, boxed);
+ }
+
+ /**
+  * Stores a <CODE>short</CODE> under the given name.
+  *
+  * @param name the entry name
+  * @param value the <CODE>short</CODE> value to store
+  * @throws JMSException if the message cannot be prepared for writing
+  * @throws IllegalArgumentException if the name is null or empty
+  */
+ public void setShort(String name, short value) throws JMSException {
+     initializeWriting();
+     final Short boxed = Short.valueOf(value);
+     put(name, boxed);
+ }
+
+ /**
+  * Stores a Unicode character under the given name.
+  *
+  * @param name the entry name
+  * @param value the character value to store
+  * @throws JMSException if the message cannot be prepared for writing
+  * @throws IllegalArgumentException if the name is null or empty
+  */
+ public void setChar(String name, char value) throws JMSException {
+     initializeWriting();
+     final Character boxed = Character.valueOf(value);
+     put(name, boxed);
+ }
+
+ /**
+  * Stores an <CODE>int</CODE> under the given name.
+  *
+  * @param name the entry name
+  * @param value the <CODE>int</CODE> value to store
+  * @throws JMSException if the message cannot be prepared for writing
+  * @throws IllegalArgumentException if the name is null or empty
+  */
+ public void setInt(String name, int value) throws JMSException {
+     initializeWriting();
+     final Integer boxed = Integer.valueOf(value);
+     put(name, boxed);
+ }
+
+ /**
+ * Sets a <CODE>long</CODE> value with the specified name into the Map.
+ *
+ * @param name the name of the <CODE>long</CODE>
+ * @param value the <CODE>long</CODE> value to set in the Map
+ * @throws JMSException if the JMS provider fails to write the message due
+ * to some internal error.
+ * @throws IllegalArgumentException if the name is null or if the name is an
+ * empty string.
+ * @throws MessageNotWriteableException if the message is in read-only mode.
+ */
+    public void setLong(String name, long value) throws JMSException {
+        // The body must be writable; initializeWriting() also resets the stored content.
+        initializeWriting();
+        final Long boxed = Long.valueOf(value);
+        put(name, boxed);
+    }
+
+ /**
+ * Sets a <CODE>float</CODE> value with the specified name into the Map.
+ *
+ * @param name the name of the <CODE>float</CODE>
+ * @param value the <CODE>float</CODE> value to set in the Map
+ * @throws JMSException if the JMS provider fails to write the message due
+ * to some internal error.
+ * @throws IllegalArgumentException if the name is null or if the name is an
+ * empty string.
+ * @throws MessageNotWriteableException if the message is in read-only mode.
+ */
+    public void setFloat(String name, float value) throws JMSException {
+        // The body must be writable; initializeWriting() also resets the stored content.
+        initializeWriting();
+        // Box with Float.valueOf, like the byte/short/int/long setters, instead of
+        // calling the wrapper constructor directly.
+        put(name, Float.valueOf(value));
+    }
+
+ /**
+ * Sets a <CODE>double</CODE> value with the specified name into the Map.
+ *
+ * @param name the name of the <CODE>double</CODE>
+ * @param value the <CODE>double</CODE> value to set in the Map
+ * @throws JMSException if the JMS provider fails to write the message due
+ * to some internal error.
+ * @throws IllegalArgumentException if the name is null or if the name is an
+ * empty string.
+ * @throws MessageNotWriteableException if the message is in read-only mode.
+ */
+    public void setDouble(String name, double value) throws JMSException {
+        // The body must be writable; initializeWriting() also resets the stored content.
+        initializeWriting();
+        // Box with Double.valueOf, like the byte/short/int/long setters, instead of
+        // calling the wrapper constructor directly.
+        put(name, Double.valueOf(value));
+    }
+
+ /**
+ * Sets a <CODE>String</CODE> value with the specified name into the Map.
+ *
+ * @param name the name of the <CODE>String</CODE>
+ * @param value the <CODE>String</CODE> value to set in the Map
+ * @throws JMSException if the JMS provider fails to write the message due
+ * to some internal error.
+ * @throws IllegalArgumentException if the name is null or if the name is an
+ * empty string.
+ * @throws MessageNotWriteableException if the message is in read-only mode.
+ */
+    public void setString(String name, String value) throws JMSException {
+        // The body must be writable; initializeWriting() also resets the stored content.
+        this.initializeWriting();
+        // The value is handed to put() unchanged, including a null value.
+        this.put(name, value);
+    }
+
+    /**
+     * Sets a byte array value with the specified name into the Map.
+     *
+     * @param name the name of the byte array
+     * @param value the byte array value to set in the Map; the array is copied
+     *                so that the value for <CODE>name </CODE> will not be
+     *                altered by future modifications. A <CODE>null</CODE>
+     *                value removes any existing entry for <CODE>name</CODE>.
+     * @throws JMSException if the JMS provider fails to write the message due
+     *                 to some internal error.
+     * @throws IllegalArgumentException if the name is null or if the name is an
+     *                 empty string (presumably raised by <CODE>put</CODE>, as
+     *                 documented for the other setters - TODO confirm; the
+     *                 name is not validated when <CODE>value</CODE> is null).
+     * @throws MessageNotWriteableException if the message is in read-only mode.
+     */
+    public void setBytes(String name, byte[] value) throws JMSException {
+        initializeWriting();
+        if (value != null) {
+            // Store a private copy so that later changes to the caller's array
+            // cannot alter the value held by the message, as documented above.
+            put(name, value.clone());
+        } else {
+            map.remove(name);
+        }
+    }
+
+ /**
+ * Sets a portion of the byte array value with the specified name into the
+ * Map.
+ *
+ * @param name the name of the byte array
+ * @param value the byte array value to set in the Map
+ * @param offset the initial offset within the byte array
+ * @param length the number of bytes to use
+ * @throws JMSException if the JMS provider fails to write the message due
+ * to some internal error.
+ * @throws IllegalArgumentException if the name is null or if the name is an
+ * empty string.
+ * @throws MessageNotWriteableException if the message is in read-only mode.
+ */
+    public void setBytes(String name, byte[] value, int offset, int length) throws JMSException {
+        // The body must be writable; initializeWriting() also resets the stored content.
+        initializeWriting();
+        // Copy exactly 'length' bytes starting at 'offset' into a fresh array so
+        // the map never holds a reference to the caller's array.
+        final byte[] slice = new byte[length];
+        System.arraycopy(value, offset, slice, 0, length);
+        put(name, slice);
+    }
+
+ /**
+ * Sets an object value with the specified name into the Map.
+ * <P>
+ * This method works only for the objectified primitive object types (<code>Integer</code>,<code>Double</code>,
+ * <code>Long</code> ...), <code>String</code> objects, and byte
+ * arrays.
+ *
+ * @param name the name of the Java object
+ * @param value the Java object value to set in the Map
+ * @throws JMSException if the JMS provider fails to write the message due
+ * to some internal error.
+ * @throws IllegalArgumentException if the name is null or if the name is an
+ * empty string.
+ * @throws MessageFormatException if the object is invalid.
+ * @throws MessageNotWriteableException if the message is in read-only mode.
+ */
+    public void setObject(String name, Object value) throws JMSException {
+        initializeWriting();
+        if (value == null) {
+            // A null value is stored as-is; no type validation is needed.
+            put(name, null);
+            return;
+        }
+        // byte[] values skip checkValidObject(): byte arrays are accepted as map
+        // values here even though that check (used for properties) is bypassed.
+        if (!(value instanceof byte[])) {
+            checkValidObject(value);
+        }
+        put(name, value);
+    }
+
+ /**
+ * Indicates whether an item exists in this <CODE>MapMessage</CODE>
+ * object.
+ *
+ * @param name the name of the item to test
+ * @return true if the item exists
+ * @throws JMSException if the JMS provider fails to determine if the item
+ * exists due to some internal error.
+ */
+    public boolean itemExists(String name) throws JMSException {
+        // Load the content before consulting the map.
+        initializeReading();
+        final boolean present = map.containsKey(name);
+        return present;
+    }
+
+    /**
+     * Prepares the message for a read access by delegating to
+     * <code>loadContent()</code>.
+     *
+     * @throws JMSException if <code>loadContent()</code> fails
+     */
+    private void initializeReading() throws JMSException {
+        this.loadContent();
+    }
+
+    /**
+     * Prepares the message for a write access: verifies that the body is not
+     * read-only and then sets the content to null.
+     *
+     * @throws MessageNotWriteableException if the body is read-only
+     */
+    private void initializeWriting() throws MessageNotWriteableException {
+        this.checkReadOnlyBody();
+        // Reset the content only after the read-only check has passed.
+        this.setContent(null);
+    }
+
+    /**
+     * Returns the superclass description followed by the current contents of
+     * the map. The map is printed as it currently is; this method does not
+     * call <code>initializeReading()</code>.
+     */
+    public String toString() {
+        final StringBuilder text = new StringBuilder();
+        text.append(super.toString());
+        text.append(" ActiveMQMapMessage{ ");
+        text.append("theTable = ");
+        text.append(map);
+        text.append(" }");
+        return text.toString();
+    }
+
+    /**
+     * Returns the map backing this message after loading its content. The
+     * map itself is returned, not a copy.
+     *
+     * @return the internal map of item names to values
+     * @throws JMSException if loading the content fails
+     */
+    public Map<String, Object> getContentMap() throws JMSException {
+        this.initializeReading();
+        return this.map;
+    }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQMessage.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQMessage.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQMessage.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQMessage.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,648 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageFormatException;
+import javax.jms.MessageNotWriteableException;
+
+import org.apache.activemq.IConnection;
+import org.apache.activemq.broker.openwire.OpenwireMessageEvaluationContext;
+import org.apache.activemq.filter.FilterException;
+import org.apache.activemq.filter.PropertyExpression;
+import org.apache.activemq.state.CommandVisitor;
+import org.apache.activemq.util.Callback;
+import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.activemq.util.TypeConversionSupport;
+
+/**
+ * @version $Revision:$
+ * @openwire:marshaller code="23"
+ */
+public class ActiveMQMessage extends Message implements org.apache.activemq.Message {
+
+ public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_MESSAGE; // type code returned by getDataStructureType()
+ private static final Map<String, PropertySetter> JMS_PROPERTY_SETERS = new HashMap<String, PropertySetter>(); // property name -> setter; filled by the static initializer, consulted by setObjectProperty
+
+ protected transient Callback acknowledgeCallback; // run by acknowledge() when non-null; also copied by copy(ActiveMQMessage)
+
+    /**
+     * @return the data structure type code of this class,
+     *         {@link #DATA_STRUCTURE_TYPE}
+     */
+    public byte getDataStructureType() {
+        return ActiveMQMessage.DATA_STRUCTURE_TYPE;
+    }
+
+
+    /**
+     * Creates a new <code>ActiveMQMessage</code> and fills it from this one
+     * via {@link #copy(ActiveMQMessage)}.
+     *
+     * @return the newly created copy
+     */
+    public Message copy() {
+        final ActiveMQMessage duplicate = new ActiveMQMessage();
+        this.copy(duplicate);
+        return duplicate;
+    }
+
+    /**
+     * Copies this message's state into the given target: first whatever the
+     * superclass copies, then the acknowledge callback reference.
+     *
+     * @param copy the message receiving the state
+     */
+    protected void copy(ActiveMQMessage copy) {
+        super.copy(copy);
+        // The callback reference is shared between original and copy.
+        copy.acknowledgeCallback = this.acknowledgeCallback;
+    }
+
+    /**
+     * Hash code based on the message id when one is set; otherwise the
+     * superclass hash code is used.
+     */
+    public int hashCode() {
+        final MessageId id = getMessageId();
+        return id != null ? id.hashCode() : super.hashCode();
+    }
+
+    /**
+     * Two messages are equal when they are the same instance, or when they
+     * are of exactly the same class and both carry equal, non-null message
+     * ids. Distinct messages without a message id are never equal.
+     */
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        final MessageId theirs = ((ActiveMQMessage) o).getMessageId();
+        final MessageId mine = getMessageId();
+        if (mine == null || theirs == null) {
+            return false;
+        }
+        return theirs.equals(mine);
+    }
+
+    /**
+     * Runs the acknowledge callback, if one has been set; does nothing
+     * otherwise.
+     *
+     * @throws JMSException rethrown unchanged when the callback throws one;
+     *                 any other throwable is converted through
+     *                 <code>JMSExceptionSupport.create</code>
+     */
+    public void acknowledge() throws JMSException {
+        if (acknowledgeCallback == null) {
+            return;
+        }
+        try {
+            acknowledgeCallback.execute();
+        } catch (JMSException jmsFailure) {
+            throw jmsFailure;
+        } catch (Throwable other) {
+            throw JMSExceptionSupport.create(other);
+        }
+    }
+
+    /**
+     * Clears the body: sets the content to null and marks the body as
+     * writable again.
+     *
+     * @throws JMSException declared by the method signature
+     */
+    public void clearBody() throws JMSException {
+        this.setContent(null);
+        this.readOnlyBody = false;
+    }
+
+    /**
+     * @return the string form of the message id, or null when no message id
+     *         is set
+     */
+    public String getJMSMessageID() {
+        final MessageId current = getMessageId();
+        return current == null ? null : current.toString();
+    }
+
+    /**
+     * Sets the message id from its textual form.
+     * <p>
+     * The text is first handed to the <code>MessageId(String)</code>
+     * constructor. If that fails with a <code>NumberFormatException</code>
+     * (for example an id from a foreign JMS provider or an arbitrary
+     * user-supplied String), an empty <code>MessageId</code> carrying the
+     * given text as its text view is installed instead; in that case the
+     * ProducerId and ProducerSequenceId of the id are not initialised from
+     * the text.
+     *
+     * @param value the textual message id, or null to clear the message id
+     * @throws JMSException declared by the method signature
+     */
+    public void setJMSMessageID(String value) throws JMSException {
+        if (value != null) {
+            try {
+                MessageId id = new MessageId(value);
+                this.setMessageId(id);
+            } catch (NumberFormatException e) {
+                // we must be some foreign JMS provider or strange user-supplied
+                // String, so keep the raw text as the text view of a blank id
+                MessageId id = new MessageId();
+                id.setTextView(value);
+                // Install the fallback id built above, not the existing
+                // messageId field, otherwise the supplied value is lost.
+                this.setMessageId(id);
+            }
+        } else {
+            this.setMessageId(null);
+        }
+    }
+
+    /**
+     * Creates a <code>MessageId</code> from the given producer id and
+     * sequence number and installs it as this message's id.
+     *
+     * @param producerId the producer id to build the message id from
+     * @param producerSequenceId the producer sequence number to build the
+     *                message id from
+     * @throws JMSException if creating or installing the message id fails
+     *                 for any reason; the original throwable is passed to
+     *                 <code>JMSExceptionSupport.create</code>
+     */
+    public void setJMSMessageID(ProducerId producerId, long producerSequenceId) throws JMSException {
+        MessageId id = null;
+        try {
+            id = new MessageId(producerId, producerSequenceId);
+            this.setMessageId(id);
+        } catch (Throwable e) {
+            // id is still null when the constructor itself failed; describe the
+            // inputs in that case instead of reporting the id as 'null'.
+            String described = id != null ? String.valueOf(id) : producerId + ":" + producerSequenceId;
+            throw JMSExceptionSupport.create("Invalid message id '" + described + "', reason: " + e.getMessage(), e);
+        }
+    }
+
+    /** @return the message timestamp, as returned by <code>getTimestamp()</code> */
+    public long getJMSTimestamp() {
+        return getTimestamp();
+    }
+
+    /** @param timestamp the value passed on to <code>setTimestamp(long)</code> */
+    public void setJMSTimestamp(long timestamp) {
+        setTimestamp(timestamp);
+    }
+
+    /** @return the correlation id, as returned by <code>getCorrelationId()</code> */
+    public String getJMSCorrelationID() {
+        return getCorrelationId();
+    }
+
+    /** @param correlationId the value passed on to <code>setCorrelationId</code> */
+    public void setJMSCorrelationID(String correlationId) {
+        setCorrelationId(correlationId);
+    }
+
+    /**
+     * @return the correlation id encoded by {@link #encodeString(String)},
+     *         or null when no correlation id is set
+     * @throws JMSException if the encoding fails
+     */
+    public byte[] getJMSCorrelationIDAsBytes() throws JMSException {
+        final String correlation = getCorrelationId();
+        return encodeString(correlation);
+    }
+
+    /**
+     * Sets the correlation id from bytes decoded by
+     * {@link #decodeString(byte[])}; null bytes yield a null correlation id.
+     *
+     * @param correlationId the encoded correlation id, may be null
+     * @throws JMSException if the decoding fails
+     */
+    public void setJMSCorrelationIDAsBytes(byte[] correlationId) throws JMSException {
+        final String decoded = decodeString(correlationId);
+        setCorrelationId(decoded);
+    }
+
+    /** @return the fixed mime type string reported by this message class */
+    public String getJMSXMimeType() {
+        final String mimeType = "jms/message";
+        return mimeType;
+    }
+
+    /**
+     * Decodes the given bytes as a UTF-8 string.
+     *
+     * @param data the bytes to decode, may be null
+     * @return the decoded string, or null when <code>data</code> is null
+     * @throws JMSException if the UTF-8 encoding is reported as unsupported
+     */
+    protected static String decodeString(byte[] data) throws JMSException {
+        if (data == null) {
+            return null;
+        }
+        try {
+            return new String(data, "UTF-8");
+        } catch (UnsupportedEncodingException e) {
+            // Hand the original exception to JMSExceptionSupport instead of
+            // keeping only its message, so the cause is not dropped.
+            throw JMSExceptionSupport.create("Invalid UTF-8 encoding: " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Encodes the given string as UTF-8 bytes.
+     *
+     * @param data the string to encode, may be null
+     * @return the encoded bytes, or null when <code>data</code> is null
+     * @throws JMSException if the UTF-8 encoding is reported as unsupported
+     */
+    protected static byte[] encodeString(String data) throws JMSException {
+        if (data == null) {
+            return null;
+        }
+        try {
+            return data.getBytes("UTF-8");
+        } catch (UnsupportedEncodingException e) {
+            // Hand the original exception to JMSExceptionSupport instead of
+            // keeping only its message, so the cause is not dropped.
+            throw JMSExceptionSupport.create("Invalid UTF-8 encoding: " + e.getMessage(), e);
+        }
+    }
+
+    /** @return the reply-to destination, as returned by <code>getReplyTo()</code> */
+    public Destination getJMSReplyTo() {
+        return getReplyTo();
+    }
+
+    /**
+     * Sets the reply-to destination after passing it through
+     * <code>ActiveMQDestination.transform</code>.
+     *
+     * @param destination the reply-to destination to transform and store
+     * @throws JMSException if the transformation fails
+     */
+    public void setJMSReplyTo(Destination destination) throws JMSException {
+        final ActiveMQDestination transformed = ActiveMQDestination.transform(destination);
+        setReplyTo(transformed);
+    }
+
+    /** @return the destination, as returned by <code>getDestination()</code> */
+    public Destination getJMSDestination() {
+        return getDestination();
+    }
+
+    /**
+     * Sets the destination after passing it through
+     * <code>ActiveMQDestination.transform</code>.
+     *
+     * @param destination the destination to transform and store
+     * @throws JMSException if the transformation fails
+     */
+    public void setJMSDestination(Destination destination) throws JMSException {
+        final ActiveMQDestination transformed = ActiveMQDestination.transform(destination);
+        setDestination(transformed);
+    }
+
+    /**
+     * @return <code>DeliveryMode.PERSISTENT</code> when the message is
+     *         persistent, <code>DeliveryMode.NON_PERSISTENT</code> otherwise
+     */
+    public int getJMSDeliveryMode() {
+        if (isPersistent()) {
+            return DeliveryMode.PERSISTENT;
+        }
+        return DeliveryMode.NON_PERSISTENT;
+    }
+
+    /**
+     * Marks the message persistent only when <code>mode</code> equals
+     * <code>DeliveryMode.PERSISTENT</code>; any other value makes it
+     * non-persistent.
+     *
+     * @param mode the JMS delivery mode
+     */
+    public void setJMSDeliveryMode(int mode) {
+        final boolean persistent = mode == DeliveryMode.PERSISTENT;
+        setPersistent(persistent);
+    }
+
+    /** @return the redelivered flag, as returned by <code>isRedelivered()</code> */
+    public boolean getJMSRedelivered() {
+        return isRedelivered();
+    }
+
+    /** @param redelivered the value passed on to <code>setRedelivered</code> */
+    public void setJMSRedelivered(boolean redelivered) {
+        setRedelivered(redelivered);
+    }
+
+    /** @return the message type, as returned by <code>getType()</code> */
+    public String getJMSType() {
+        return getType();
+    }
+
+    /** @param type the value passed on to <code>setType</code> */
+    public void setJMSType(String type) {
+        setType(type);
+    }
+
+    /** @return the expiration, as returned by <code>getExpiration()</code> */
+    public long getJMSExpiration() {
+        return getExpiration();
+    }
+
+    /** @param expiration the value passed on to <code>setExpiration</code> */
+    public void setJMSExpiration(long expiration) {
+        setExpiration(expiration);
+    }
+
+    /** @return the priority, as returned by <code>getPriority()</code> */
+    public int getJMSPriority() {
+        return getPriority();
+    }
+
+    /**
+     * Stores the priority after narrowing it to a byte.
+     *
+     * @param priority the priority; values outside the byte range are
+     *                truncated by the narrowing cast
+     */
+    public void setJMSPriority(int priority) {
+        final byte narrowed = (byte) priority;
+        setPriority(narrowed);
+    }
+
+    /**
+     * Clears the properties through the superclass and marks the properties
+     * as writable again.
+     */
+    public void clearProperties() {
+        super.clearProperties();
+        this.readOnlyProperties = false;
+    }
+
+    /**
+     * @param name the property name to look up
+     * @return true if the property map contains <code>name</code> as a key
+     * @throws JMSException if obtaining the properties fails with an
+     *                 <code>IOException</code>
+     */
+    public boolean propertyExists(String name) throws JMSException {
+        final boolean present;
+        try {
+            present = getProperties().containsKey(name);
+        } catch (IOException ioFailure) {
+            throw JMSExceptionSupport.create(ioFailure);
+        }
+        return present;
+    }
+
+    /**
+     * @return an enumeration over a snapshot (a <code>Vector</code> copy) of
+     *         the keys of the property map
+     * @throws JMSException if obtaining the properties fails with an
+     *                 <code>IOException</code>
+     */
+    public Enumeration getPropertyNames() throws JMSException {
+        final Vector<String> names;
+        try {
+            names = new Vector<String>(getProperties().keySet());
+        } catch (IOException ioFailure) {
+            throw JMSExceptionSupport.create(ioFailure);
+        }
+        return names.elements();
+    }
+
+ interface PropertySetter { // strategy that applies one named property value to a message; implementations are registered in JMS_PROPERTY_SETERS
+
+ void set(Message message, Object value) throws MessageFormatException; // the registered implementations throw MessageFormatException when value cannot be converted to the required type
+ }
+
+    // Registers one setter per well-known JMS / JMSX property name. Each setter
+    // converts the supplied value with TypeConversionSupport and applies it to
+    // the matching message field; a failed conversion (null result) is reported
+    // as a MessageFormatException naming the offending value type.
+    static {
+        JMS_PROPERTY_SETERS.put("JMSXDeliveryCount", new PropertySetter() {
+            public void set(Message message, Object value) throws MessageFormatException {
+                final Integer deliveryCount = (Integer) TypeConversionSupport.convert(value, Integer.class);
+                if (deliveryCount == null) {
+                    throw new MessageFormatException("Property JMSXDeliveryCount cannot be set from a " + value.getClass().getName() + ".");
+                }
+                // The redelivery counter is stored as the delivery count minus one.
+                message.setRedeliveryCounter(deliveryCount.intValue() - 1);
+            }
+        });
+        JMS_PROPERTY_SETERS.put("JMSXGroupID", new PropertySetter() {
+            public void set(Message message, Object value) throws MessageFormatException {
+                final String groupId = (String) TypeConversionSupport.convert(value, String.class);
+                if (groupId == null) {
+                    throw new MessageFormatException("Property JMSXGroupID cannot be set from a " + value.getClass().getName() + ".");
+                }
+                message.setGroupID(groupId);
+            }
+        });
+        JMS_PROPERTY_SETERS.put("JMSXGroupSeq", new PropertySetter() {
+            public void set(Message message, Object value) throws MessageFormatException {
+                final Integer groupSequence = (Integer) TypeConversionSupport.convert(value, Integer.class);
+                if (groupSequence == null) {
+                    throw new MessageFormatException("Property JMSXGroupSeq cannot be set from a " + value.getClass().getName() + ".");
+                }
+                message.setGroupSequence(groupSequence.intValue());
+            }
+        });
+        JMS_PROPERTY_SETERS.put("JMSCorrelationID", new PropertySetter() {
+            public void set(Message message, Object value) throws MessageFormatException {
+                final String correlationId = (String) TypeConversionSupport.convert(value, String.class);
+                if (correlationId == null) {
+                    throw new MessageFormatException("Property JMSCorrelationID cannot be set from a " + value.getClass().getName() + ".");
+                }
+                ((ActiveMQMessage) message).setJMSCorrelationID(correlationId);
+            }
+        });
+        JMS_PROPERTY_SETERS.put("JMSDeliveryMode", new PropertySetter() {
+            public void set(Message message, Object value) throws MessageFormatException {
+                Integer mode = (Integer) TypeConversionSupport.convert(value, Integer.class);
+                if (mode == null) {
+                    // Not convertible to an Integer: fall back to a Boolean meaning
+                    // "persistent" (true) or "non-persistent" (false).
+                    final Boolean persistent = (Boolean) TypeConversionSupport.convert(value, Boolean.class);
+                    if (persistent == null) {
+                        throw new MessageFormatException("Property JMSDeliveryMode cannot be set from a " + value.getClass().getName() + ".");
+                    }
+                    mode = Integer.valueOf(persistent.booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+                }
+                ((ActiveMQMessage) message).setJMSDeliveryMode(mode.intValue());
+            }
+        });
+        JMS_PROPERTY_SETERS.put("JMSExpiration", new PropertySetter() {
+            public void set(Message message, Object value) throws MessageFormatException {
+                final Long expiration = (Long) TypeConversionSupport.convert(value, Long.class);
+                if (expiration == null) {
+                    throw new MessageFormatException("Property JMSExpiration cannot be set from a " + value.getClass().getName() + ".");
+                }
+                ((ActiveMQMessage) message).setJMSExpiration(expiration.longValue());
+            }
+        });
+        JMS_PROPERTY_SETERS.put("JMSPriority", new PropertySetter() {
+            public void set(Message message, Object value) throws MessageFormatException {
+                final Integer priority = (Integer) TypeConversionSupport.convert(value, Integer.class);
+                if (priority == null) {
+                    throw new MessageFormatException("Property JMSPriority cannot be set from a " + value.getClass().getName() + ".");
+                }
+                ((ActiveMQMessage) message).setJMSPriority(priority.intValue());
+            }
+        });
+        JMS_PROPERTY_SETERS.put("JMSRedelivered", new PropertySetter() {
+            public void set(Message message, Object value) throws MessageFormatException {
+                final Boolean redelivered = (Boolean) TypeConversionSupport.convert(value, Boolean.class);
+                if (redelivered == null) {
+                    throw new MessageFormatException("Property JMSRedelivered cannot be set from a " + value.getClass().getName() + ".");
+                }
+                ((ActiveMQMessage) message).setJMSRedelivered(redelivered.booleanValue());
+            }
+        });
+        JMS_PROPERTY_SETERS.put("JMSReplyTo", new PropertySetter() {
+            public void set(Message message, Object value) throws MessageFormatException {
+                final ActiveMQDestination replyTo = (ActiveMQDestination) TypeConversionSupport.convert(value, ActiveMQDestination.class);
+                if (replyTo == null) {
+                    throw new MessageFormatException("Property JMSReplyTo cannot be set from a " + value.getClass().getName() + ".");
+                }
+                ((ActiveMQMessage) message).setReplyTo(replyTo);
+            }
+        });
+        JMS_PROPERTY_SETERS.put("JMSTimestamp", new PropertySetter() {
+            public void set(Message message, Object value) throws MessageFormatException {
+                final Long timestamp = (Long) TypeConversionSupport.convert(value, Long.class);
+                if (timestamp == null) {
+                    throw new MessageFormatException("Property JMSTimestamp cannot be set from a " + value.getClass().getName() + ".");
+                }
+                ((ActiveMQMessage) message).setJMSTimestamp(timestamp.longValue());
+            }
+        });
+        JMS_PROPERTY_SETERS.put("JMSType", new PropertySetter() {
+            public void set(Message message, Object value) throws MessageFormatException {
+                final String type = (String) TypeConversionSupport.convert(value, String.class);
+                if (type == null) {
+                    throw new MessageFormatException("Property JMSType cannot be set from a " + value.getClass().getName() + ".");
+                }
+                ((ActiveMQMessage) message).setJMSType(type);
+            }
+        });
+    }
+
+    /**
+     * Sets a property with the read-only check enabled; equivalent to
+     * <code>setObjectProperty(name, value, true)</code>.
+     *
+     * @param name the property name
+     * @param value the property value
+     * @throws JMSException if the three-argument overload fails
+     */
+    public void setObjectProperty(String name, Object value) throws JMSException {
+        final boolean checkReadOnly = true;
+        setObjectProperty(name, value, checkReadOnly);
+    }
+
+    /**
+     * Sets a property on this message.
+     * <p>
+     * Names registered in <code>JMS_PROPERTY_SETERS</code> are routed to
+     * their dedicated setter when the value is non-null; everything else
+     * (including a null value for a registered name) is stored through
+     * <code>setProperty</code>.
+     *
+     * @param name the property name; must be neither null nor empty
+     * @param value the property value; validated by
+     *                <code>checkValidObject</code>
+     * @param checkReadOnly whether to fail when the properties are read-only
+     * @throws IllegalArgumentException if the name is null or empty
+     * @throws MessageNotWriteableException if <code>checkReadOnly</code> is
+     *                 set and the properties are read-only
+     * @throws MessageFormatException if the value type is not allowed or
+     *                 cannot be converted by the dedicated setter
+     * @throws JMSException if storing the property fails with an
+     *                 <code>IOException</code>
+     */
+    public void setObjectProperty(String name, Object value, boolean checkReadOnly) throws JMSException {
+        if (checkReadOnly) {
+            checkReadOnlyProperties();
+        }
+        if (name == null || name.length() == 0) {
+            throw new IllegalArgumentException("Property name cannot be empty or null");
+        }
+        checkValidObject(value);
+
+        final PropertySetter dedicated = JMS_PROPERTY_SETERS.get(name);
+        if (dedicated != null && value != null) {
+            dedicated.set(this, value);
+            return;
+        }
+
+        try {
+            setProperty(name, value);
+        } catch (IOException ioFailure) {
+            throw JMSExceptionSupport.create(ioFailure);
+        }
+    }
+
+    /**
+     * Sets every entry of the given map as a property of this message.
+     *
+     * @param properties map of property names (cast to String) to values
+     * @throws JMSException if setting any single property fails
+     */
+    public void setProperties(Map properties) throws JMSException {
+        for (Object element : properties.entrySet()) {
+            final Map.Entry entry = (Map.Entry) element;
+            final String propertyName = (String) entry.getKey();
+
+            // Go through setObjectProperty because the map may contain standard
+            // extension headers like JMSXGroupID
+            setObjectProperty(propertyName, entry.getValue());
+        }
+    }
+
+ protected void checkValidObject(Object value) throws MessageFormatException {
+
+ boolean valid = value instanceof Boolean || value instanceof Byte || value instanceof Short || value instanceof Integer || value instanceof Long;
+ valid = valid || value instanceof Float || value instanceof Double || value instanceof Character || value instanceof String || value == null;
+
+ if (!valid) {
+
+ IConnection conn = getConnection();
+ // conn is null if we are in the broker rather than a JMS client
+ if (conn == null || conn.isNestedMapAndListEnabled()) {
+ if (!(value instanceof Map || value instanceof List)) {
+ throw new MessageFormatException("Only objectified primitive objects, String, Map and List types are allowed but was: " + value + " type: " + value.getClass());
+ }
+ } else {
+ throw new MessageFormatException("Only objectified primitive objects and String types are allowed but was: " + value + " type: " + value.getClass());
+ }
+ }
+ }
+
+    /**
+     * Looks up a property by evaluating a <code>PropertyExpression</code>
+     * for the name against this message.
+     *
+     * @param name the property name; must not be null
+     * @return the result of evaluating the expression
+     * @throws NullPointerException if the name is null
+     * @throws JMSException if the evaluation fails with a
+     *                 <code>FilterException</code>
+     */
+    public Object getObjectProperty(String name) throws JMSException {
+        if (name == null) {
+            throw new NullPointerException("Property name cannot be null");
+        }
+
+        // PropertyExpression handles converting message headers to properties.
+        final PropertyExpression lookup = new PropertyExpression(name);
+        final OpenwireMessageEvaluationContext context = new OpenwireMessageEvaluationContext(this);
+        final Object result;
+        try {
+            result = lookup.evaluate(context);
+        } catch (FilterException filterFailure) {
+            throw JMSExceptionSupport.create(filterFailure);
+        }
+        return result;
+    }
+
+    /**
+     * @param name the property name
+     * @return the property converted to a boolean; false when the property
+     *         is null
+     * @throws MessageFormatException if the value cannot be converted
+     * @throws JMSException if the property lookup fails
+     */
+    public boolean getBooleanProperty(String name) throws JMSException {
+        final Object raw = getObjectProperty(name);
+        if (raw == null) {
+            return false;
+        }
+        final Boolean converted = (Boolean) TypeConversionSupport.convert(raw, Boolean.class);
+        if (converted != null) {
+            return converted.booleanValue();
+        }
+        throw new MessageFormatException("Property " + name + " was a " + raw.getClass().getName() + " and cannot be read as a boolean");
+    }
+
+    /**
+     * @param name the property name
+     * @return the property converted to a byte
+     * @throws NumberFormatException if the property is null
+     * @throws MessageFormatException if the value cannot be converted
+     * @throws JMSException if the property lookup fails
+     */
+    public byte getByteProperty(String name) throws JMSException {
+        final Object raw = getObjectProperty(name);
+        if (raw == null) {
+            throw new NumberFormatException("property " + name + " was null");
+        }
+        final Byte converted = (Byte) TypeConversionSupport.convert(raw, Byte.class);
+        if (converted != null) {
+            return converted.byteValue();
+        }
+        throw new MessageFormatException("Property " + name + " was a " + raw.getClass().getName() + " and cannot be read as a byte");
+    }
+
+    /**
+     * @param name the property name
+     * @return the property converted to a short
+     * @throws NumberFormatException if the property is null
+     * @throws MessageFormatException if the value cannot be converted
+     * @throws JMSException if the property lookup fails
+     */
+    public short getShortProperty(String name) throws JMSException {
+        final Object raw = getObjectProperty(name);
+        if (raw == null) {
+            throw new NumberFormatException("property " + name + " was null");
+        }
+        final Short converted = (Short) TypeConversionSupport.convert(raw, Short.class);
+        if (converted != null) {
+            return converted.shortValue();
+        }
+        throw new MessageFormatException("Property " + name + " was a " + raw.getClass().getName() + " and cannot be read as a short");
+    }
+
+    /**
+     * @param name the property name
+     * @return the property converted to an int
+     * @throws NumberFormatException if the property is null
+     * @throws MessageFormatException if the value cannot be converted
+     * @throws JMSException if the property lookup fails
+     */
+    public int getIntProperty(String name) throws JMSException {
+        final Object raw = getObjectProperty(name);
+        if (raw == null) {
+            throw new NumberFormatException("property " + name + " was null");
+        }
+        final Integer converted = (Integer) TypeConversionSupport.convert(raw, Integer.class);
+        if (converted != null) {
+            return converted.intValue();
+        }
+        throw new MessageFormatException("Property " + name + " was a " + raw.getClass().getName() + " and cannot be read as an integer");
+    }
+
+    /**
+     * @param name the property name
+     * @return the property converted to a long
+     * @throws NumberFormatException if the property is null
+     * @throws MessageFormatException if the value cannot be converted
+     * @throws JMSException if the property lookup fails
+     */
+    public long getLongProperty(String name) throws JMSException {
+        final Object raw = getObjectProperty(name);
+        if (raw == null) {
+            throw new NumberFormatException("property " + name + " was null");
+        }
+        final Long converted = (Long) TypeConversionSupport.convert(raw, Long.class);
+        if (converted != null) {
+            return converted.longValue();
+        }
+        throw new MessageFormatException("Property " + name + " was a " + raw.getClass().getName() + " and cannot be read as a long");
+    }
+
+    /**
+     * @param name the property name
+     * @return the property converted to a float
+     * @throws NullPointerException if the property is null
+     * @throws MessageFormatException if the value cannot be converted
+     * @throws JMSException if the property lookup fails
+     */
+    public float getFloatProperty(String name) throws JMSException {
+        final Object raw = getObjectProperty(name);
+        if (raw == null) {
+            throw new NullPointerException("property " + name + " was null");
+        }
+        final Float converted = (Float) TypeConversionSupport.convert(raw, Float.class);
+        if (converted != null) {
+            return converted.floatValue();
+        }
+        throw new MessageFormatException("Property " + name + " was a " + raw.getClass().getName() + " and cannot be read as a float");
+    }
+
+    /**
+     * @param name the property name
+     * @return the property converted to a double
+     * @throws NullPointerException if the property is null
+     * @throws MessageFormatException if the value cannot be converted
+     * @throws JMSException if the property lookup fails
+     */
+    public double getDoubleProperty(String name) throws JMSException {
+        final Object raw = getObjectProperty(name);
+        if (raw == null) {
+            throw new NullPointerException("property " + name + " was null");
+        }
+        final Double converted = (Double) TypeConversionSupport.convert(raw, Double.class);
+        if (converted != null) {
+            return converted.doubleValue();
+        }
+        throw new MessageFormatException("Property " + name + " was a " + raw.getClass().getName() + " and cannot be read as a double");
+    }
+
+    /**
+     * @param name the property name
+     * @return the property converted to a String, or null when the property
+     *         is absent; for the name <code>JMSXUserID</code> an absent
+     *         property falls back to <code>getUserID()</code>
+     * @throws MessageFormatException if the value cannot be converted
+     * @throws JMSException if the property lookup fails
+     */
+    public String getStringProperty(String name) throws JMSException {
+        Object raw = getObjectProperty(name);
+        if (raw == null && name.equals("JMSXUserID")) {
+            raw = getUserID();
+        }
+        if (raw == null) {
+            return null;
+        }
+        final String converted = (String) TypeConversionSupport.convert(raw, String.class);
+        if (converted != null) {
+            return converted;
+        }
+        throw new MessageFormatException("Property " + name + " was a " + raw.getClass().getName() + " and cannot be read as a String");
+    }
+
+    /**
+     * Sets a boolean property with the read-only check enabled.
+     *
+     * @param name the property name
+     * @param value the boolean value
+     * @throws JMSException if setting the property fails
+     */
+    public void setBooleanProperty(String name, boolean value) throws JMSException {
+        final boolean checkReadOnly = true;
+        setBooleanProperty(name, value, checkReadOnly);
+    }
+
+    /**
+     * Sets a boolean property, optionally skipping the read-only check.
+     *
+     * @param name the property name
+     * @param value the boolean value
+     * @param checkReadOnly whether to fail when the properties are read-only
+     * @throws JMSException if setting the property fails
+     */
+    public void setBooleanProperty(String name, boolean value, boolean checkReadOnly) throws JMSException {
+        final Boolean boxed = Boolean.valueOf(value);
+        setObjectProperty(name, boxed, checkReadOnly);
+    }
+
+    /**
+     * Sets a byte property through <code>setObjectProperty</code>.
+     *
+     * @param name the property name
+     * @param value the byte value
+     * @throws JMSException if setting the property fails
+     */
+    public void setByteProperty(String name, byte value) throws JMSException {
+        final Byte boxed = Byte.valueOf(value);
+        setObjectProperty(name, boxed);
+    }
+
+    /**
+     * Sets a short property through <code>setObjectProperty</code>.
+     *
+     * @param name the property name
+     * @param value the short value
+     * @throws JMSException if setting the property fails
+     */
+    public void setShortProperty(String name, short value) throws JMSException {
+        final Short boxed = Short.valueOf(value);
+        setObjectProperty(name, boxed);
+    }
+
+    /**
+     * Sets an int property through <code>setObjectProperty</code>.
+     *
+     * @param name the property name
+     * @param value the int value
+     * @throws JMSException if setting the property fails
+     */
+    public void setIntProperty(String name, int value) throws JMSException {
+        final Integer boxed = Integer.valueOf(value);
+        setObjectProperty(name, boxed);
+    }
+
+    /**
+     * Sets a long property through <code>setObjectProperty</code>.
+     *
+     * @param name the property name
+     * @param value the long value
+     * @throws JMSException if setting the property fails
+     */
+    public void setLongProperty(String name, long value) throws JMSException {
+        final Long boxed = Long.valueOf(value);
+        setObjectProperty(name, boxed);
+    }
+
+    /**
+     * Sets a float property through <code>setObjectProperty</code>.
+     *
+     * @param name the property name
+     * @param value the float value
+     * @throws JMSException if setting the property fails
+     */
+    public void setFloatProperty(String name, float value) throws JMSException {
+        // Box with Float.valueOf, like the other typed property setters,
+        // instead of calling the wrapper constructor directly.
+        setObjectProperty(name, Float.valueOf(value));
+    }
+
+    /**
+     * Sets a double property through <code>setObjectProperty</code>.
+     *
+     * @param name the property name
+     * @param value the double value
+     * @throws JMSException if setting the property fails
+     */
+    public void setDoubleProperty(String name, double value) throws JMSException {
+        // Box with Double.valueOf, like the other typed property setters,
+        // instead of calling the wrapper constructor directly.
+        setObjectProperty(name, Double.valueOf(value));
+    }
+
+    /**
+     * Sets a String property through <code>setObjectProperty</code>.
+     *
+     * @param name the property name
+     * @param value the String value, passed on unchanged
+     * @throws JMSException if setting the property fails
+     */
+    public void setStringProperty(String name, String value) throws JMSException {
+        this.setObjectProperty(name, value);
+    }
+
+    /**
+     * Guards property writes.
+     *
+     * @throws MessageNotWriteableException if the properties are read-only
+     */
+    private void checkReadOnlyProperties() throws MessageNotWriteableException {
+        if (!readOnlyProperties) {
+            return;
+        }
+        throw new MessageNotWriteableException("Message properties are read-only");
+    }
+
+    /**
+     * Guards body writes.
+     *
+     * @throws MessageNotWriteableException if the body is read-only
+     */
+    protected void checkReadOnlyBody() throws MessageNotWriteableException {
+        if (!readOnlyBody) {
+            return;
+        }
+        throw new MessageNotWriteableException("Message body is read-only");
+    }
+
+    /**
+     * @return true when an expiration greater than zero is set and the
+     *         current system time is past it; an expiration of zero or less
+     *         never expires
+     */
+    public boolean isExpired() {
+        final long deadline = getExpiration();
+        return deadline > 0 && System.currentTimeMillis() > deadline;
+    }
+
+    /** @return the callback run by {@link #acknowledge()}, or null if none is set */
+    public Callback getAcknowledgeCallback() {
+        return this.acknowledgeCallback;
+    }
+
+    /** @param acknowledgeCallback the callback to run from {@link #acknowledge()}; may be null */
+    public void setAcknowledgeCallback(Callback acknowledgeCallback) {
+        final Callback replacement = acknowledgeCallback;
+        this.acknowledgeCallback = replacement;
+    }
+
+ /**
+ * Send operation event listener. Used to get the message ready to be sent.
+ */
+    public void onSend() throws JMSException {
+        // Once the message is being sent, both body and properties become read-only.
+        this.setReadOnlyBody(true);
+        this.setReadOnlyProperties(true);
+    }
+
+    /**
+     * Dispatches this message to the visitor's <code>processMessage</code>.
+     *
+     * @param visitor the visitor to dispatch to
+     * @return whatever the visitor returns
+     * @throws Exception whatever the visitor throws
+     */
+    public Response visit(CommandVisitor visitor) throws Exception {
+        final Response outcome = visitor.processMessage(this);
+        return outcome;
+    }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQMessage.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.command;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+
+import javax.jms.JMSException;
+import javax.jms.ObjectMessage;
+
+import org.apache.activemq.IConnection;
+import org.apache.activemq.util.ByteArrayInputStream;
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.ClassLoadingAwareObjectInputStream;
+import org.apache.activemq.util.JMSExceptionSupport;
+
+/**
+ * An <CODE>ObjectMessage</CODE> object is used to send a message that
+ * contains a serializable object in the Java programming language ("Java
+ * object"). It inherits from the <CODE>Message</CODE> interface and adds a
+ * body containing a single reference to an object. Only
+ * <CODE>Serializable</CODE> Java objects can be used. <p/>
+ * <P>
+ * If a collection of Java objects must be sent, one of the
+ * <CODE>Collection</CODE> classes provided since JDK 1.2 can be used. <p/>
+ * <P>
+ * When a client receives an <CODE>ObjectMessage</CODE>, it is in read-only
+ * mode. If a client attempts to write to the message at this point, a
+ * <CODE>MessageNotWriteableException</CODE> is thrown. If
+ * <CODE>clearBody</CODE> is called, the message can now be both read from and
+ * written to.
+ *
+ * @openwire:marshaller code="26"
+ * @see javax.jms.Session#createObjectMessage()
+ * @see javax.jms.Session#createObjectMessage(Serializable)
+ * @see javax.jms.BytesMessage
+ * @see javax.jms.MapMessage
+ * @see javax.jms.Message
+ * @see javax.jms.StreamMessage
+ * @see javax.jms.TextMessage
+ */
+public class ActiveMQObjectMessage extends ActiveMQMessage implements ObjectMessage {
+
+    // TODO: verify classloader
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_OBJECT_MESSAGE;
+    static final ClassLoader ACTIVEMQ_CLASSLOADER = ActiveMQObjectMessage.class.getClassLoader();
+
+    /**
+     * The deserialized body. It is a cache of the marshalled content: it is
+     * dropped on copy, clearBody and rollback, and rebuilt on demand by
+     * {@link #getObject()}.
+     */
+    protected transient Serializable object;
+
+    /**
+     * Creates a new <CODE>ActiveMQObjectMessage</CODE> carrying the same
+     * state as this one.
+     *
+     * @return the copy
+     */
+    public Message copy() {
+        ActiveMQObjectMessage copy = new ActiveMQObjectMessage();
+        copy(copy);
+        return copy;
+    }
+
+    /**
+     * Copies this message's state into <CODE>copy</CODE>.
+     * <P>
+     * The body is serialized first so that the superclass copy runs with the
+     * marshalled content in place. The copy's object reference is then
+     * cleared, so the copy deserializes its own instance on first
+     * {@link #getObject()} instead of sharing this message's object.
+     */
+    private void copy(ActiveMQObjectMessage copy) {
+        storeContent();
+        super.copy(copy);
+        copy.object = null;
+    }
+
+    /**
+     * Serializes the body object into the message content, if that has not
+     * been done yet.
+     * <P>
+     * Nothing happens when content is already present or when there is no
+     * object to serialize. When the owning connection asks for compression
+     * the serialized bytes are deflated and the message is flagged as
+     * compressed.
+     *
+     * @throws RuntimeException wrapping the <CODE>IOException</CODE> if the
+     *                 object cannot be serialized.
+     */
+    public void storeContent() {
+        ByteSequence bodyAsBytes = getContent();
+        if (bodyAsBytes == null && object != null) {
+            try {
+                ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+                OutputStream os = bytesOut;
+                IConnection connection = getConnection();
+                if (connection != null && connection.isUseCompression()) {
+                    compressed = true;
+                    os = new DeflaterOutputStream(os);
+                }
+                DataOutputStream dataOut = new DataOutputStream(os);
+                ObjectOutputStream objOut = new ObjectOutputStream(dataOut);
+                try {
+                    objOut.writeObject(object);
+                    objOut.flush();
+                    // reset() emits a marker into the stream; kept so the
+                    // marshalled bytes stay identical to earlier versions.
+                    objOut.reset();
+                } finally {
+                    // Always close the chain: when writeObject fails (e.g.
+                    // NotSerializableException) this is what releases the
+                    // Deflater behind a compressing stream.
+                    objOut.close();
+                }
+                setContent(bytesOut.toByteSequence());
+            } catch (IOException ioe) {
+                throw new RuntimeException(ioe.getMessage(), ioe);
+            }
+        }
+    }
+
+    /**
+     * @return {@link #DATA_STRUCTURE_TYPE}
+     */
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @return the fixed mime type string <CODE>"jms/object-message"</CODE>
+     */
+    public String getJMSXMimeType() {
+        return "jms/object-message";
+    }
+
+    /**
+     * Clears out the message body. Clearing a message's body does not clear its
+     * header values or property entries. <p/>
+     * <P>
+     * If this message body was read-only, calling this method leaves the
+     * message body in the same state as an empty body in a newly created
+     * message.
+     *
+     * @throws JMSException if the JMS provider fails to clear the message body
+     *                 due to some internal error.
+     */
+    public void clearBody() throws JMSException {
+        super.clearBody();
+        this.object = null;
+    }
+
+    /**
+     * Sets the serializable object containing this message's data. It is
+     * important to note that an <CODE>ObjectMessage</CODE> contains a
+     * snapshot of the object at the time <CODE>setObject()</CODE> is called;
+     * subsequent modifications of the object will have no effect on the
+     * <CODE>ObjectMessage</CODE> body.
+     * <P>
+     * The snapshot is taken immediately unless the owning connection defers
+     * object message serialization, in which case the object is serialized
+     * later by {@link #storeContent()}.
+     * <P>
+     * NOTE(review): a serialization failure currently surfaces as the
+     * <CODE>RuntimeException</CODE> thrown by {@link #storeContent()}, not
+     * as a <CODE>javax.jms.MessageFormatException</CODE>; the exception
+     * type is left as is so existing callers are unaffected.
+     *
+     * @param newObject the message's data
+     * @throws JMSException if the JMS provider fails to set the object due to
+     *                 some internal error.
+     * @throws javax.jms.MessageNotWriteableException if the message is in
+     *                 read-only mode.
+     */
+    public void setObject(Serializable newObject) throws JMSException {
+        checkReadOnlyBody();
+        this.object = newObject;
+        // Drop any stale marshalled form of a previously set object.
+        setContent(null);
+        IConnection connection = getConnection();
+        if (connection == null || !connection.isObjectMessageSerializationDefered()) {
+            storeContent();
+        }
+    }
+
+    /**
+     * Gets the serializable object containing this message's data. The default
+     * value is null.
+     * <P>
+     * The object is deserialized from the message content on first access
+     * and cached; content flagged as compressed is inflated first.
+     * <P>
+     * NOTE(review): this performs Java-native deserialization of the message
+     * content. Confirm that <CODE>ClassLoadingAwareObjectInputStream</CODE>
+     * (or the deployment) restricts which classes may be loaded when content
+     * can come from untrusted producers.
+     *
+     * @return the serializable object containing this message's data
+     * @throws JMSException if the content cannot be read or a class needed to
+     *                 rebuild the object is not available.
+     */
+    public Serializable getObject() throws JMSException {
+        if (object == null && getContent() != null) {
+            try {
+                ByteSequence content = getContent();
+                InputStream is = new ByteArrayInputStream(content);
+                if (isCompressed()) {
+                    is = new InflaterInputStream(is);
+                }
+                try {
+                    // The object stream constructor already reads the stream
+                    // header and fails on corrupt content, so it has to run
+                    // inside the try for the finally below to release the
+                    // Inflater in that case too.
+                    DataInputStream dataIn = new DataInputStream(is);
+                    ClassLoadingAwareObjectInputStream objIn = new ClassLoadingAwareObjectInputStream(dataIn);
+                    object = (Serializable)objIn.readObject();
+                } catch (ClassNotFoundException ce) {
+                    throw JMSExceptionSupport.create("Failed to build body from content. Serializable class not available to broker. Reason: " + ce, ce);
+                } finally {
+                    is.close();
+                }
+            } catch (IOException e) {
+                throw JMSExceptionSupport.create("Failed to build body from bytes. Reason: " + e, e);
+            }
+        }
+        return this.object;
+    }
+
+    /**
+     * Rollback hook: in addition to the superclass handling, discards the
+     * cached body object.
+     */
+    public void onMessageRolledBack() {
+        super.onMessageRolledBack();
+
+        // lets force the object to be deserialized again - as we could have
+        // changed the object
+        object = null;
+    }
+
+    /**
+     * Returns the superclass string form, after attempting to deserialize
+     * the body first.
+     */
+    public String toString() {
+        try {
+            getObject();
+        } catch (JMSException ignored) {
+            // Best effort only: toString() must never fail, so a body that
+            // cannot be deserialized is simply left unloaded.
+        }
+        return super.toString();
+    }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQQueue.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQQueue.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQQueue.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import javax.jms.JMSException;
+import javax.jms.Queue;
+
+/**
+ *
+ * @org.apache.xbean.XBean element="queue" description="An ActiveMQ Queue
+ * Destination"
+ *
+ * @openwire:marshaller code="100"
+ * @version $Revision: 1.5 $
+ */
+public class ActiveMQQueue extends ActiveMQDestination implements Queue {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_QUEUE;
+    private static final long serialVersionUID = -3885260014960795889L;
+
+    /**
+     * Creates a queue destination without a name.
+     */
+    public ActiveMQQueue() {
+    }
+
+    /**
+     * Creates a queue destination with the given name.
+     *
+     * @param name passed unchanged to the <CODE>ActiveMQDestination</CODE>
+     *                constructor
+     */
+    public ActiveMQQueue(String name) {
+        super(name);
+    }
+
+    /**
+     * @return {@link #DATA_STRUCTURE_TYPE}
+     */
+    public byte getDataStructureType() {
+        return ActiveMQQueue.DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @return always <CODE>true</CODE>: this destination is a queue
+     */
+    public boolean isQueue() {
+        return true;
+    }
+
+    /**
+     * Gets the name of this queue, which is the destination's physical
+     * name.
+     *
+     * @return the value of <CODE>getPhysicalName()</CODE>
+     * @throws JMSException declared by the <CODE>javax.jms.Queue</CODE>
+     *                 contract.
+     */
+    public String getQueueName() throws JMSException {
+        final String physicalName = getPhysicalName();
+        return physicalName;
+    }
+
+    /**
+     * @return the <CODE>QUEUE_TYPE</CODE> destination type constant
+     */
+    public byte getDestinationType() {
+        return QUEUE_TYPE;
+    }
+
+    /**
+     * @return the <CODE>QUEUE_QUALIFIED_PREFIX</CODE> constant
+     */
+    protected String getQualifiedPrefix() {
+        return QUEUE_QUALIFIED_PREFIX;
+    }
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQQueue.java
------------------------------------------------------------------------------
svn:executable = *
|