qpid-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Rajith Attapattu <rajit...@gmail.com>
Subject Re: svn commit: r1172657 [14/21] - in /qpid/branches/qpid-3346/qpid: ./ cpp/ cpp/bindings/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qpid/dotnet/ cpp/bindings/qpid/dotnet/examples/csharp.direct.receiver/Properties/ cpp/bindings/qpid/dotnet/example
Date Mon, 19 Sep 2011 15:48:45 GMT
Ken,

I assume this wasn't intentional and was merely a side effect of the merge ?
I didn't look at the diff very closely, but I wonder what the
implication of these changes would be.

Looking further it seems a lot of classes on the java client and
broker have changed.
I think in order to be on the safe side, these changes should be backed out.
If I am not mistaken that would not impact anything as I believe your
work didn't involve the java side.

Rajith

On Mon, Sep 19, 2011 at 11:13 AM,  <kgiusti@apache.org> wrote:
> Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
> URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> ==============================================================================
> --- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java (original)
> +++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java Mon Sep 19 15:13:18 2011
> @@ -23,7 +23,8 @@ package org.apache.qpid.client.message;
>  import javax.jms.JMSException;
>  import javax.jms.StreamMessage;
>
> -import org.apache.mina.common.ByteBuffer;
> +import java.nio.ByteBuffer;
> +
>  import org.apache.qpid.AMQException;
>  import org.apache.qpid.framing.AMQShortString;
>  import org.apache.qpid.framing.BasicContentHeaderProperties;
> @@ -36,65 +37,76 @@ public class JMSStreamMessage extends Ab
>     public static final String MIME_TYPE="jms/stream-message";
>
>
> -
> -    /**
> -     * This is set when reading a byte array. The readBytes(byte[]) method supports multiple calls to read
> -     * a byte array in multiple chunks, hence this is used to track how much is left to be read
> -     */
> -    private int _byteArrayRemaining = -1;
> +    private TypedBytesContentReader _typedBytesContentReader;
> +    private TypedBytesContentWriter _typedBytesContentWriter;
>
>     public JMSStreamMessage(AMQMessageDelegateFactory delegateFactory)
>     {
> -        this(delegateFactory,null);
> +        super(delegateFactory,false);
> +        _typedBytesContentWriter = new TypedBytesContentWriter();
>
>     }
>
> -    /**
> -     * Construct a stream message with existing data.
> -     *
> -     * @param delegateFactory
> -     * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is
> -     */
> -    JMSStreamMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data)
> -    {
>
> -        super(delegateFactory, data); // this instanties a content header
> -    }
>
>     JMSStreamMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
>     {
> -
> -        super(delegate, data);
> +        super(delegate, data!=null);
> +        _typedBytesContentReader = new TypedBytesContentReader(data);
>     }
>
> -
>     public void reset()
>     {
> -        super.reset();
>         _readableMessage = true;
> +
> +        if(_typedBytesContentReader != null)
> +        {
> +            _typedBytesContentReader.reset();
> +        }
> +        else if (_typedBytesContentWriter != null)
> +        {
> +            _typedBytesContentReader = new TypedBytesContentReader(_typedBytesContentWriter.getData());
> +        }
> +    }
> +
> +    @Override
> +    public void clearBody() throws JMSException
> +    {
> +        super.clearBody();
> +        _typedBytesContentReader = null;
> +        _typedBytesContentWriter = new TypedBytesContentWriter();
> +
>     }
>
> +
>     protected String getMimeType()
>     {
>         return MIME_TYPE;
>     }
>
> -
> +    @Override
> +    public java.nio.ByteBuffer getData() throws JMSException
> +    {
> +        return _typedBytesContentWriter == null ? _typedBytesContentReader.getData() : _typedBytesContentWriter.getData();
> +    }
>
>     public boolean readBoolean() throws JMSException
>     {
> -        return super.readBoolean();
> +        checkReadable();
> +        return _typedBytesContentReader.readBoolean();
>     }
>
>
>     public byte readByte() throws JMSException
>     {
> -        return super.readByte();
> +        checkReadable();
> +        return _typedBytesContentReader.readByte();
>     }
>
>     public short readShort() throws JMSException
>     {
> -        return super.readShort();
> +        checkReadable();
> +        return _typedBytesContentReader.readShort();
>     }
>
>     /**
> @@ -105,102 +117,127 @@ public class JMSStreamMessage extends Ab
>      */
>     public char readChar() throws JMSException
>     {
> -        return super.readChar();
> +        checkReadable();
> +        return _typedBytesContentReader.readChar();
>     }
>
>     public int readInt() throws JMSException
>     {
> -        return super.readInt();
> +        checkReadable();
> +        return _typedBytesContentReader.readInt();
>     }
>
>     public long readLong() throws JMSException
>     {
> -        return super.readLong();
> +        checkReadable();
> +        return _typedBytesContentReader.readLong();
>     }
>
>     public float readFloat() throws JMSException
>     {
> -        return super.readFloat();
> +        checkReadable();
> +        return _typedBytesContentReader.readFloat();
>     }
>
>     public double readDouble() throws JMSException
>     {
> -        return super.readDouble();
> +        checkReadable();
> +        return _typedBytesContentReader.readDouble();
>     }
>
>     public String readString() throws JMSException
>     {
> -        return super.readString();
> +        checkReadable();
> +        return _typedBytesContentReader.readString();
>     }
>
>     public int readBytes(byte[] bytes) throws JMSException
>     {
> -        return super.readBytes(bytes);
> +        if(bytes == null)
> +        {
> +            throw new IllegalArgumentException("Must provide non-null array to read into");
> +        }
> +
> +        checkReadable();
> +        return _typedBytesContentReader.readBytes(bytes);
>     }
>
>
>     public Object readObject() throws JMSException
>     {
> -        return super.readObject();
> +        checkReadable();
> +        return _typedBytesContentReader.readObject();
>     }
>
>     public void writeBoolean(boolean b) throws JMSException
>     {
> -        super.writeBoolean(b);
> +        checkWritable();
> +        _typedBytesContentWriter.writeBoolean(b);
>     }
>
>     public void writeByte(byte b) throws JMSException
>     {
> -        super.writeByte(b);
> +        checkWritable();
> +        _typedBytesContentWriter.writeByte(b);
>     }
>
>     public void writeShort(short i) throws JMSException
>     {
> -        super.writeShort(i);
> +        checkWritable();
> +        _typedBytesContentWriter.writeShort(i);
>     }
>
>     public void writeChar(char c) throws JMSException
>     {
> -        super.writeChar(c);
> +        checkWritable();
> +        _typedBytesContentWriter.writeChar(c);
>     }
>
>     public void writeInt(int i) throws JMSException
>     {
> -        super.writeInt(i);
> +        checkWritable();
> +        _typedBytesContentWriter.writeInt(i);
>     }
>
>     public void writeLong(long l) throws JMSException
>     {
> -        super.writeLong(l);
> +        checkWritable();
> +        _typedBytesContentWriter.writeLong(l);
>     }
>
>     public void writeFloat(float v) throws JMSException
>     {
> -        super.writeFloat(v);
> +        checkWritable();
> +        _typedBytesContentWriter.writeFloat(v);
>     }
>
>     public void writeDouble(double v) throws JMSException
>     {
> -        super.writeDouble(v);
> +        checkWritable();
> +        _typedBytesContentWriter.writeDouble(v);
>     }
>
>     public void writeString(String string) throws JMSException
>     {
> -        super.writeString(string);
> +        checkWritable();
> +        _typedBytesContentWriter.writeString(string);
>     }
>
>     public void writeBytes(byte[] bytes) throws JMSException
>     {
> -        super.writeBytes(bytes);
> +        checkWritable();
> +        _typedBytesContentWriter.writeBytes(bytes);
>     }
>
>     public void writeBytes(byte[] bytes, int offset, int length) throws JMSException
>     {
> -        super.writeBytes(bytes,offset,length);
> +        checkWritable();
> +        _typedBytesContentWriter.writeBytes(bytes, offset, length);
>     }
>
>     public void writeObject(Object object) throws JMSException
>     {
> -        super.writeObject(object);
> +        checkWritable();
> +        _typedBytesContentWriter.writeObject(object);
>     }
>  }
>
> Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java
> URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> ==============================================================================
> --- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java (original)
> +++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java Mon Sep 19 15:13:18 2011
> @@ -22,10 +22,9 @@ package org.apache.qpid.client.message;
>
>  import javax.jms.JMSException;
>
> -import org.apache.mina.common.ByteBuffer;
> +import java.nio.ByteBuffer;
> +
>  import org.apache.qpid.AMQException;
> -import org.apache.qpid.framing.AMQShortString;
> -import org.apache.qpid.framing.BasicContentHeaderProperties;
>
>  public class JMSStreamMessageFactory extends AbstractJMSMessageFactory
>  {
>
> Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
> URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> ==============================================================================
> --- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java (original)
> +++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java Mon Sep 19 15:13:18 2011
> @@ -20,15 +20,21 @@
>  */
>  package org.apache.qpid.client.message;
>
> +import java.io.DataInputStream;
>  import java.io.UnsupportedEncodingException;
> +import java.nio.ByteBuffer;
> +import java.nio.CharBuffer;
>  import java.nio.charset.CharacterCodingException;
>  import java.nio.charset.Charset;
> +import java.nio.charset.CharsetDecoder;
> +import java.nio.charset.CharsetEncoder;
>
>  import javax.jms.JMSException;
> +import javax.jms.MessageFormatException;
>
> -import org.apache.mina.common.ByteBuffer;
>  import org.apache.qpid.AMQException;
>  import org.apache.qpid.client.CustomJMSXProperty;
> +import org.apache.qpid.framing.AMQFrameDecodingException;
>  import org.apache.qpid.framing.AMQShortString;
>  import org.apache.qpid.framing.BasicContentHeaderProperties;
>  import org.apache.qpid.util.Strings;
> @@ -37,6 +43,7 @@ public class JMSTextMessage extends Abst
>  {
>     private static final String MIME_TYPE = "text/plain";
>
> +    private Exception _exception;
>     private String _decodedValue;
>
>     /**
> @@ -45,36 +52,41 @@ public class JMSTextMessage extends Abst
>     private static final String PAYLOAD_NULL_PROPERTY = CustomJMSXProperty.JMS_AMQP_NULL.toString();
>     private static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");
>
> -    public JMSTextMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException
> -    {
> -        this(delegateFactory, null, null);
> -    }
> +    private CharsetDecoder _decoder = DEFAULT_CHARSET.newDecoder();
> +    private CharsetEncoder _encoder = DEFAULT_CHARSET.newEncoder();
> +
> +    private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);
>
> -    JMSTextMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data, String encoding) throws JMSException
> +    public JMSTextMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException
>     {
> -        super(delegateFactory, data); // this instantiates a content header
> -        setContentType(getMimeType());
> -        setEncoding(encoding);
> +        super(delegateFactory, false); // this instantiates a content header
>     }
>
>     JMSTextMessage(AMQMessageDelegate delegate, ByteBuffer data)
>             throws AMQException
>     {
> -        super(delegate, data);
> -        setContentType(getMimeType());
> -        _data = data;
> -    }
> +        super(delegate, data!=null);
>
> -
> -    public void clearBodyImpl() throws JMSException
> -    {
> -        if (_data != null)
> +        try
>         {
> -            _data.release();
> -            _data = null;
> +            if(propertyExists(PAYLOAD_NULL_PROPERTY))
> +            {
> +                _decodedValue = null;
> +            }
> +            else
> +            {
> +                _decodedValue = _decoder.decode(data).toString();
> +            }
> +        }
> +        catch (CharacterCodingException e)
> +        {
> +            _exception = e;
> +        }
> +        catch (JMSException e)
> +        {
> +            _exception = e;
>         }
>
> -        _decodedValue = null;
>     }
>
>     public String toBodyString() throws JMSException
> @@ -87,95 +99,62 @@ public class JMSTextMessage extends Abst
>         return MIME_TYPE;
>     }
>
> -    public void setText(String text) throws JMSException
> +    @Override
> +    public ByteBuffer getData() throws JMSException
>     {
> -        checkWritable();
> -
> -        clearBody();
> +        _encoder.reset();
>         try
>         {
> -            if (text != null)
> +            if(_exception != null)
> +            {
> +                final MessageFormatException messageFormatException = new MessageFormatException("Cannot decode original message");
> +                messageFormatException.setLinkedException(_exception);
> +                throw messageFormatException;
> +            }
> +            else if(_decodedValue == null)
> +            {
> +                return EMPTY_BYTE_BUFFER;
> +            }
> +            else
>             {
> -                final String encoding = getEncoding();
> -                if (encoding == null || encoding.equalsIgnoreCase("UTF-8"))
> -                {
> -                    _data = ByteBuffer.wrap(Strings.toUTF8(text));
> -                    setEncoding("UTF-8");
> -                }
> -                else
> -                {
> -                    _data = ByteBuffer.wrap(text.getBytes(encoding));
> -                }
> -                _data.position(_data.limit());
> -                _changedData=true;
> +                return _encoder.encode(CharBuffer.wrap(_decodedValue));
>             }
> -            _decodedValue = text;
>         }
> -        catch (UnsupportedEncodingException e)
> +        catch (CharacterCodingException e)
>         {
> -            // should never occur
> -            JMSException jmse = new JMSException("Unable to decode text data");
> -            jmse.setLinkedException(e);
> -            jmse.initCause(e);
> -            throw jmse;
> +            final JMSException jmsException = new JMSException("Cannot encode string in UFT-8: " + _decodedValue);
> +            jmsException.setLinkedException(e);
> +            throw jmsException;
>         }
>     }
>
> -    public String getText() throws JMSException
> +    @Override
> +    public void clearBody() throws JMSException
>     {
> -        if (_data == null && _decodedValue == null)
> -        {
> -            return null;
> -        }
> -        else if (_decodedValue != null)
> -        {
> -            return _decodedValue;
> -        }
> -        else
> -        {
> -            _data.rewind();
> +        super.clearBody();
> +        _decodedValue = null;
> +        _exception = null;
> +    }
>
> -            if (propertyExists(PAYLOAD_NULL_PROPERTY) && getBooleanProperty(PAYLOAD_NULL_PROPERTY))
> -            {
> -                return null;
> -            }
> -            if (getEncoding() != null)
> -            {
> -                try
> -                {
> -                    _decodedValue = _data.getString(Charset.forName(getEncoding()).newDecoder());
> -                }
> -                catch (CharacterCodingException e)
> -                {
> -                    JMSException jmse = new JMSException("Could not decode string data: " + e);
> -                    jmse.setLinkedException(e);
> -                    jmse.initCause(e);
> -                    throw jmse;
> -                }
> -            }
> -            else
> -            {
> -                try
> -                {
> -                    _decodedValue = _data.getString(DEFAULT_CHARSET.newDecoder());
> -                }
> -                catch (CharacterCodingException e)
> -                {
> -                    JMSException jmse = new JMSException("Could not decode string data: " + e);
> -                    jmse.setLinkedException(e);
> -                    jmse.initCause(e);
> -                    throw jmse;
> -                }
> -            }
> -            return _decodedValue;
> -        }
> +    public void setText(String text) throws JMSException
> +    {
> +        checkWritable();
> +
> +        clearBody();
> +        _decodedValue = text;
> +
> +    }
> +
> +    public String getText() throws JMSException
> +    {
> +        return _decodedValue;
>     }
>
>     @Override
>     public void prepareForSending() throws JMSException
>     {
>         super.prepareForSending();
> -        if (_data == null)
> +        if (_decodedValue == null)
>         {
>             setBooleanProperty(PAYLOAD_NULL_PROPERTY, true);
>         }
>
> Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java
> URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> ==============================================================================
> --- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java (original)
> +++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java Mon Sep 19 15:13:18 2011
> @@ -22,7 +22,7 @@ package org.apache.qpid.client.message;
>
>  import javax.jms.JMSException;
>
> -import org.apache.mina.common.ByteBuffer;
> +import java.nio.ByteBuffer;
>  import org.apache.qpid.AMQException;
>  import org.apache.qpid.framing.AMQShortString;
>  import org.apache.qpid.framing.BasicContentHeaderProperties;
>
> Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
> URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> ==============================================================================
> --- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java (original)
> +++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java Mon Sep 19 15:13:18 2011
> @@ -87,9 +87,9 @@ public class UnprocessedMessage_0_8 exte
>     public void receiveBody(ContentBody body)
>     {
>
> -        if (body.payload != null)
> +        if (body._payload != null)
>         {
> -            final long payloadSize = body.payload.remaining();
> +            final long payloadSize = body._payload.length;
>
>             if (_bodies == null)
>             {
>
> Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
> URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> ==============================================================================
> --- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
> +++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Mon Sep 19 15:13:18 2011
> @@ -20,7 +20,9 @@
>  */
>  package org.apache.qpid.client.protocol;
>
> +import java.io.DataOutputStream;
>  import java.io.IOException;
> +import java.io.OutputStream;
>  import java.net.SocketAddress;
>  import java.nio.ByteBuffer;
>  import java.util.ArrayList;
> @@ -31,7 +33,6 @@ import java.util.concurrent.CountDownLat
>  import java.util.concurrent.ThreadFactory;
>  import java.util.concurrent.TimeUnit;
>
> -import org.apache.mina.filter.codec.ProtocolCodecException;
>  import org.apache.qpid.AMQConnectionClosedException;
>  import org.apache.qpid.AMQDisconnectedException;
>  import org.apache.qpid.AMQException;
> @@ -46,6 +47,7 @@ import org.apache.qpid.client.state.AMQS
>  import org.apache.qpid.client.state.StateWaiter;
>  import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
>  import org.apache.qpid.codec.AMQCodecFactory;
> +import org.apache.qpid.configuration.ClientProperties;
>  import org.apache.qpid.framing.AMQBody;
>  import org.apache.qpid.framing.AMQDataBlock;
>  import org.apache.qpid.framing.AMQFrame;
> @@ -57,8 +59,6 @@ import org.apache.qpid.framing.Heartbeat
>  import org.apache.qpid.framing.MethodRegistry;
>  import org.apache.qpid.framing.ProtocolInitiation;
>  import org.apache.qpid.framing.ProtocolVersion;
> -import org.apache.qpid.pool.Job;
> -import org.apache.qpid.pool.ReferenceCountingExecutorService;
>  import org.apache.qpid.protocol.AMQConstant;
>  import org.apache.qpid.protocol.AMQMethodEvent;
>  import org.apache.qpid.protocol.AMQMethodListener;
> @@ -164,19 +164,19 @@ public class AMQProtocolHandler implemen
>     private FailoverException _lastFailoverException;
>
>     /** Defines the default timeout to use for synchronous protocol commands. */
> -    private final long DEFAULT_SYNC_TIMEOUT = Long.getLong("amqj.default_syncwrite_timeout", 1000 * 30);
> +    private final long DEFAULT_SYNC_TIMEOUT = Long.getLong(ClientProperties.QPID_SYNC_OP_TIMEOUT,
> +                                                           Long.getLong(ClientProperties.AMQJ_DEFAULT_SYNCWRITE_TIMEOUT,
> +                                                                        ClientProperties.DEFAULT_SYNC_OPERATION_TIMEOUT));
>
>     /** Object to lock on when changing the latch */
>     private Object _failoverLatchChange = new Object();
>     private AMQCodecFactory _codecFactory;
> -    private Job _readJob;
> -    private Job _writeJob;
> -    private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance();
> +
>     private ProtocolVersion _suggestedProtocolVersion;
>
>     private long _writtenBytes;
>     private long _readBytes;
> -    private NetworkTransport _transport;
> +
>     private NetworkConnection _network;
>     private Sender<ByteBuffer> _sender;
>
> @@ -191,24 +191,6 @@ public class AMQProtocolHandler implemen
>         _protocolSession = new AMQProtocolSession(this, _connection);
>         _stateManager = new AMQStateManager(_protocolSession);
>         _codecFactory = new AMQCodecFactory(false, _protocolSession);
> -        _poolReference.setThreadFactory(new ThreadFactory()
> -        {
> -
> -            public Thread newThread(final Runnable runnable)
> -            {
> -                try
> -                {
> -                    return Threading.getThreadFactory().createThread(runnable);
> -                }
> -                catch (Exception e)
> -                {
> -                    throw new RuntimeException("Failed to create thread", e);
> -                }
> -            }
> -        });
> -        _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true);
> -        _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false);
> -        _poolReference.acquireExecutorService();
>         _failoverHandler = new FailoverHandler(this);
>     }
>
> @@ -329,17 +311,7 @@ public class AMQProtocolHandler implemen
>             }
>             else
>             {
> -
> -                if (cause instanceof ProtocolCodecException)
> -                {
> -                    _logger.info("Protocol Exception caught NOT going to attempt failover as " +
> -                                 "cause isn't AMQConnectionClosedException: " + cause, cause);
> -
> -                    AMQException amqe = new AMQException("Protocol handler error: " + cause, cause);
> -                    propagateExceptionToAllWaiters(amqe);
> -                }
>                 _connection.exceptionReceived(cause);
> -
>             }
>
>             // FIXME Need to correctly handle other exceptions. Things like ...
> @@ -433,76 +405,63 @@ public class AMQProtocolHandler implemen
>
>     public void received(ByteBuffer msg)
>     {
> +        _readBytes += msg.remaining();
>         try
>         {
> -            _readBytes += msg.remaining();
>             final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
>
> -            Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable()
> +            // Decode buffer
> +
> +            for (AMQDataBlock message : dataBlocks)
>             {
>
> -                public void run()
> -                {
> -                    // Decode buffer
> +                    if (PROTOCOL_DEBUG)
> +                    {
> +                        _protocolLogger.info(String.format("RECV: [%s] %s", this, message));
> +                    }
>
> -                    for (AMQDataBlock message : dataBlocks)
> +                    if(message instanceof AMQFrame)
>                     {
> +                        final boolean debug = _logger.isDebugEnabled();
> +                        final long msgNumber = ++_messageReceivedCount;
>
> -                        try
> -                        {
> -                            if (PROTOCOL_DEBUG)
> -                            {
> -                                _protocolLogger.info(String.format("RECV: [%s] %s", this, message));
> -                            }
> -
> -                            if(message instanceof AMQFrame)
> -                            {
> -                                final boolean debug = _logger.isDebugEnabled();
> -                                final long msgNumber = ++_messageReceivedCount;
> -
> -                                if (debug && ((msgNumber % 1000) == 0))
> -                                {
> -                                    _logger.debug("Received " + _messageReceivedCount + " protocol messages");
> -                                }
> -
> -                                AMQFrame frame = (AMQFrame) message;
> -
> -                                final AMQBody bodyFrame = frame.getBodyFrame();
> -
> -                                HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
> -
> -                                bodyFrame.handle(frame.getChannel(), _protocolSession);
> -
> -                                _connection.bytesReceived(_readBytes);
> -                            }
> -                            else if (message instanceof ProtocolInitiation)
> -                            {
> -                                // We get here if the server sends a response to our initial protocol header
> -                                // suggesting an alternate ProtocolVersion; the server will then close the
> -                                // connection.
> -                                ProtocolInitiation protocolInit = (ProtocolInitiation) message;
> -                                _suggestedProtocolVersion = protocolInit.checkVersion();
> -                                _logger.info("Broker suggested using protocol version:" + _suggestedProtocolVersion);
> -
> -                                // get round a bug in old versions of qpid whereby the connection is not closed
> -                                _stateManager.changeState(AMQState.CONNECTION_CLOSED);
> -                            }
> -                        }
> -                        catch (Exception e)
> +                        if (debug && ((msgNumber % 1000) == 0))
>                         {
> -                            _logger.error("Exception processing frame", e);
> -                            propagateExceptionToFrameListeners(e);
> -                            exception(e);
> +                            _logger.debug("Received " + _messageReceivedCount + " protocol messages");
>                         }
> +
> +                        AMQFrame frame = (AMQFrame) message;
> +
> +                        final AMQBody bodyFrame = frame.getBodyFrame();
> +
> +                        HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
> +
> +                        bodyFrame.handle(frame.getChannel(), _protocolSession);
> +
> +                        _connection.bytesReceived(_readBytes);
> +                    }
> +                    else if (message instanceof ProtocolInitiation)
> +                    {
> +                        // We get here if the server sends a response to our initial protocol header
> +                        // suggesting an alternate ProtocolVersion; the server will then close the
> +                        // connection.
> +                        ProtocolInitiation protocolInit = (ProtocolInitiation) message;
> +                        _suggestedProtocolVersion = protocolInit.checkVersion();
> +                        _logger.info("Broker suggested using protocol version:" + _suggestedProtocolVersion);
> +
> +                        // get round a bug in old versions of qpid whereby the connection is not closed
> +                        _stateManager.changeState(AMQState.CONNECTION_CLOSED);
>                     }
>                 }
> -            });
>         }
>         catch (Exception e)
>         {
> +            _logger.error("Exception processing frame", e);
>             propagateExceptionToFrameListeners(e);
>             exception(e);
>         }
> +
> +
>     }
>
>     public void methodBodyReceived(final int channelId, final AMQBody bodyFrame)
> @@ -568,17 +527,13 @@ public class AMQProtocolHandler implemen
>         writeFrame(frame, false);
>     }
>
> -    public void writeFrame(AMQDataBlock frame, boolean wait)
> +    public  synchronized void writeFrame(AMQDataBlock frame, boolean wait)
>     {
> -        final ByteBuffer buf = frame.toNioByteBuffer();
> +        final ByteBuffer buf = asByteBuffer(frame);
>         _writtenBytes += buf.remaining();
> -        Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Runnable()
> -        {
> -            public void run()
> -            {
> -                _sender.send(buf);
> -            }
> -        });
> +        _sender.send(buf);
> +        _sender.flush();
> +
>         if (PROTOCOL_DEBUG)
>         {
>             _protocolLogger.debug(String.format("SEND: [%s] %s", this, frame));
> @@ -595,12 +550,41 @@ public class AMQProtocolHandler implemen
>
>         _connection.bytesSent(_writtenBytes);
>
> -        if (wait)
> +    }
> +
> +    private ByteBuffer asByteBuffer(AMQDataBlock block)
> +    {
> +        final ByteBuffer buf = ByteBuffer.allocate((int) block.getSize());
> +
> +        try
> +        {
> +            block.writePayload(new DataOutputStream(new OutputStream()
> +            {
> +
> +
> +                @Override
> +                public void write(int b) throws IOException
> +                {
> +                    buf.put((byte) b);
> +                }
> +
> +                @Override
> +                public void write(byte[] b, int off, int len) throws IOException
> +                {
> +                    buf.put(b, off, len);
> +                }
> +            }));
> +        }
> +        catch (IOException e)
>         {
> -            _sender.flush();
> +            throw new RuntimeException(e);
>         }
> +
> +        buf.flip();
> +        return buf;
>     }
>
> +
>     /**
>      * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to
>      * calling getProtocolSession().write() then waiting for the response.
> @@ -723,7 +707,7 @@ public class AMQProtocolHandler implemen
>                 _logger.debug("FailoverException interrupted connection close, ignoring as connection   close anyway.");
>             }
>         }
> -        _poolReference.releaseExecutorService();
> +
>     }
>
>     /** @return the number of bytes read from this protocol session */
> @@ -841,8 +825,13 @@ public class AMQProtocolHandler implemen
>
>     public void setNetworkConnection(NetworkConnection network)
>     {
> +        setNetworkConnection(network, network.getSender());
> +    }
> +
> +    public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
> +    {
>         _network = network;
> -        _sender = network.getSender();
> +        _sender = sender;
>     }
>
>     /** @param delay delay in seconds (not ms) */
>
> Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java
> URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> ==============================================================================
> --- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java (original)
> +++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java Mon Sep 19 15:13:18 2011
> @@ -20,17 +20,22 @@
>  */
>  package org.apache.qpid.client.security;
>
> -import org.apache.qpid.util.FileUtils;
> -
> -import org.slf4j.Logger;
> -import org.slf4j.LoggerFactory;
> -
>  import java.io.IOException;
>  import java.io.InputStream;
> +import java.util.Collection;
> +import java.util.Collections;
>  import java.util.Enumeration;
>  import java.util.HashMap;
> +import java.util.HashSet;
>  import java.util.Map;
>  import java.util.Properties;
> +import java.util.Set;
> +import java.util.StringTokenizer;
> +import java.util.TreeMap;
> +
> +import org.apache.qpid.util.FileUtils;
> +import org.slf4j.Logger;
> +import org.slf4j.LoggerFactory;
>
>  /**
>  * CallbackHandlerRegistry is a registry for call back handlers for user authentication and interaction during user
> @@ -42,7 +47,7 @@ import java.util.Properties;
>  * "amp.callbackhandler.properties". The format of the properties file is:
>  *
>  * <p/><pre>
> - * CallbackHanlder.mechanism=fully.qualified.class.name
> + * CallbackHanlder.n.mechanism=fully.qualified.class.name where n is an ordinal
>  * </pre>
>  *
>  * <p/>Where mechanism is an IANA-registered mechanism name and the fully qualified class name refers to a
> @@ -66,51 +71,15 @@ public class CallbackHandlerRegistry
>     public static final String DEFAULT_RESOURCE_NAME = "org/apache/qpid/client/security/CallbackHandlerRegistry.properties";
>
>     /** A static reference to the singleton instance of this registry. */
> -    private static CallbackHandlerRegistry _instance = new CallbackHandlerRegistry();
> +    private static final CallbackHandlerRegistry _instance;
>
>     /** Holds a map from SASL mechanism names to call back handlers. */
> -    private Map<String, Class> _mechanismToHandlerClassMap = new HashMap<String, Class>();
> -
> -    /** Holds a space delimited list of mechanisms that callback handlers exist for. */
> -    private String _mechanisms;
> -
> -    /**
> -     * Gets the singleton instance of this registry.
> -     *
> -     * @return The singleton instance of this registry.
> -     */
> -    public static CallbackHandlerRegistry getInstance()
> -    {
> -        return _instance;
> -    }
> +    private Map<String, Class<AMQCallbackHandler>> _mechanismToHandlerClassMap = new HashMap<String, Class<AMQCallbackHandler>>();
>
> -    /**
> -     * Gets the callback handler class for a given SASL mechanism name.
> -     *
> -     * @param mechanism The SASL mechanism name.
> -     *
> -     * @return The callback handler class for the mechanism, or null if none is configured for that mechanism.
> -     */
> -    public Class getCallbackHandlerClass(String mechanism)
> -    {
> -        return (Class) _mechanismToHandlerClassMap.get(mechanism);
> -    }
> +    /** Ordered collection of mechanisms for which callback handlers exist. */
> +    private Collection<String> _mechanisms;
>
> -    /**
> -     * Gets a space delimited list of supported SASL mechanisms.
> -     *
> -     * @return A space delimited list of supported SASL mechanisms.
> -     */
> -    public String getMechanisms()
> -    {
> -        return _mechanisms;
> -    }
> -
> -    /**
> -     * Creates the call back handler registry from its configuration resource or file. This also has the side effect
> -     * of configuring and registering the SASL client factory implementations using {@link DynamicSaslRegistrar}.
> -     */
> -    private CallbackHandlerRegistry()
> +    static
>     {
>         // Register any configured SASL client factories.
>         DynamicSaslRegistrar.registerSaslProviders();
> @@ -120,12 +89,12 @@ public class CallbackHandlerRegistry
>             FileUtils.openFileOrDefaultResource(filename, DEFAULT_RESOURCE_NAME,
>                 CallbackHandlerRegistry.class.getClassLoader());
>
> +        final Properties props = new Properties();
> +
>         try
>         {
> -            Properties props = new Properties();
> +
>             props.load(is);
> -            parseProperties(props);
> -            _logger.info("Callback handlers available for SASL mechanisms: " + _mechanisms);
>         }
>         catch (IOException e)
>         {
> @@ -146,32 +115,68 @@ public class CallbackHandlerRegistry
>                 }
>             }
>         }
> +
> +        _instance = new CallbackHandlerRegistry(props);
> +        _logger.info("Callback handlers available for SASL mechanisms: " + _instance._mechanisms);
> +
>     }
>
> -    /*private InputStream openPropertiesInputStream(String filename)
> +    /**
> +     * Gets the singleton instance of this registry.
> +     *
> +     * @return The singleton instance of this registry.
> +     */
> +    public static CallbackHandlerRegistry getInstance()
> +    {
> +        return _instance;
> +    }
> +
> +    public AMQCallbackHandler createCallbackHandler(final String mechanism)
>     {
> -        boolean useDefault = true;
> -        InputStream is = null;
> -        if (filename != null)
> +        final Class<AMQCallbackHandler> mechanismClass = _mechanismToHandlerClassMap.get(mechanism);
> +
> +        if (mechanismClass == null)
>         {
> -            try
> -            {
> -                is = new BufferedInputStream(new FileInputStream(new File(filename)));
> -                useDefault = false;
> -            }
> -            catch (FileNotFoundException e)
> -            {
> -                _logger.error("Unable to read from file " + filename + ": " + e, e);
> -            }
> +            throw new IllegalArgumentException("Mechanism " + mechanism + " not known");
>         }
>
> -        if (useDefault)
> +        try
> +        {
> +            return mechanismClass.newInstance();
> +        }
> +        catch (InstantiationException e)
> +        {
> +            throw new IllegalArgumentException("Unable to create an instance of mechanism " + mechanism, e);
> +        }
> +        catch (IllegalAccessException e)
>         {
> -            is = CallbackHandlerRegistry.class.getResourceAsStream(DEFAULT_RESOURCE_NAME);
> +            throw new IllegalArgumentException("Unable to create an instance of mechanism " + mechanism, e);
>         }
> +    }
>
> -        return is;
> -    }*/
> +    /**
> +     * Gets collections of supported SASL mechanism names, ordered by preference
> +     *
> +     * @return collection of SASL mechanism names.
> +     */
> +    public Collection<String> getMechanisms()
> +    {
> +        return Collections.unmodifiableCollection(_mechanisms);
> +    }
> +
> +    /**
> +     * Creates the call back handler registry from its configuration resource or file.
> +     *
> +     * This also has the side effect of configuring and registering the SASL client factory
> +     * implementations using {@link DynamicSaslRegistrar}.
> +     *
> +     * This constructor is default protection to allow for effective unit testing.  Clients must use
> +     * {@link #getInstance()} to obtain the singleton instance.
> +     */
> +    CallbackHandlerRegistry(final Properties props)
> +    {
> +        parseProperties(props);
> +    }
>
>     /**
>      * Scans the specified properties as a mapping from IANA registered SASL mechanism to call back handler
> @@ -183,20 +188,20 @@ public class CallbackHandlerRegistry
>      */
>     private void parseProperties(Properties props)
>     {
> +
> +        final Map<Integer, String> mechanisms = new TreeMap<Integer, String>();
> +
>         Enumeration e = props.propertyNames();
>         while (e.hasMoreElements())
>         {
> -            String propertyName = (String) e.nextElement();
> -            int period = propertyName.indexOf(".");
> -            if (period < 0)
> -            {
> -                _logger.warn("Unable to parse property " + propertyName + " when configuring SASL providers");
> +            final String propertyName = (String) e.nextElement();
> +            final String[] parts = propertyName.split("\\.", 2);
>
> -                continue;
> -            }
> +            checkPropertyNameFormat(propertyName, parts);
>
> -            String mechanism = propertyName.substring(period + 1);
> -            String className = props.getProperty(propertyName);
> +            final String mechanism = parts[0];
> +            final int ordinal = getPropertyOrdinal(propertyName, parts);
> +            final String className = props.getProperty(propertyName);
>             Class clazz = null;
>             try
>             {
> @@ -205,20 +210,11 @@ public class CallbackHandlerRegistry
>                 {
>                     _logger.warn("SASL provider " + clazz + " does not implement " + AMQCallbackHandler.class
>                         + ". Skipping");
> -
>                     continue;
>                 }
> -
>                 _mechanismToHandlerClassMap.put(mechanism, clazz);
> -                if (_mechanisms == null)
> -                {
> -                    _mechanisms = mechanism;
> -                }
> -                else
> -                {
> -                    // one time cost
> -                    _mechanisms = _mechanisms + " " + mechanism;
> -                }
> +
> +                mechanisms.put(ordinal, mechanism);
>             }
>             catch (ClassNotFoundException ex)
>             {
> @@ -227,5 +223,91 @@ public class CallbackHandlerRegistry
>                 continue;
>             }
>         }
> +
> +        _mechanisms = mechanisms.values();  // order guaranteed by keys of treemap (i.e. our ordinals)
> +
> +
> +    }
> +
> +    private void checkPropertyNameFormat(final String propertyName, final String[] parts)
> +    {
> +        if (parts.length != 2)
> +        {
> +            throw new IllegalArgumentException("Unable to parse property " + propertyName + " when configuring SASL providers");
> +        }
> +    }
> +
> +    private int getPropertyOrdinal(final String propertyName, final String[] parts)
> +    {
> +        try
> +        {
> +            return Integer.parseInt(parts[1]);
> +        }
> +        catch(NumberFormatException nfe)
> +        {
> +            throw new IllegalArgumentException("Unable to parse property " + propertyName + " when configuring SASL providers", nfe);
> +        }
> +    }
> +
> +    /**
> +     * Selects a SASL mechanism that is mutually available to both parties.  If more than one
> +     * mechanism is mutually available the one appearing first (by ordinal) will be returned.
> +     *
> +     * @param peerMechanismList space separated list of mechanisms
> +     * @return selected mechanism, or null if none available
> +     */
> +    public String selectMechanism(final String peerMechanismList)
> +    {
> +        final Set<String> peerList = mechListToSet(peerMechanismList);
> +
> +        return selectMechInternal(peerList, Collections.<String>emptySet());
> +    }
> +
> +    /**
> +     * Selects a SASL mechanism that is mutually available to both parties.
> +     *
> +     * @param peerMechanismList space separated list of mechanisms
> +     * @param restrictionList space separated list of mechanisms
> +     * @return selected mechanism, or null if none available
> +     */
> +    public String selectMechanism(final String peerMechanismList, final String restrictionList)
> +    {
> +        final Set<String> peerList = mechListToSet(peerMechanismList);
> +        final Set<String> restrictionSet = mechListToSet(restrictionList);
> +
> +        return selectMechInternal(peerList, restrictionSet);
> +    }
> +
> +    private String selectMechInternal(final Set<String> peerSet, final Set<String> restrictionSet)
> +    {
> +        for (final String mech : _mechanisms)
> +        {
> +            if (peerSet.contains(mech))
> +            {
> +                if (restrictionSet.isEmpty() || restrictionSet.contains(mech))
> +                {
> +                    return mech;
> +                }
> +            }
> +        }
> +
> +        return null;
> +    }
> +
> +    private Set<String> mechListToSet(final String mechanismList)
> +    {
> +        if (mechanismList == null)
> +        {
> +            return Collections.emptySet();
> +        }
> +
> +        final StringTokenizer tokenizer = new StringTokenizer(mechanismList, " ");
> +        final Set<String> mechanismSet = new HashSet<String>(tokenizer.countTokens());
> +        while (tokenizer.hasMoreTokens())
> +        {
> +            mechanismSet.add(tokenizer.nextToken());
> +        }
> +        return Collections.unmodifiableSet(mechanismSet);
>     }
> +
>  }
>
> Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties
> URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties?rev=1172657&r1=1172656&r2=1172657&view=diff
> ==============================================================================
> --- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties (original)
> +++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties Mon Sep 19 15:13:18 2011
> @@ -16,7 +16,17 @@
>  # specific language governing permissions and limitations
>  # under the License.
>  #
> -CallbackHandler.CRAM-MD5-HASHED=org.apache.qpid.client.security.UsernameHashedPasswordCallbackHandler
> -CallbackHandler.CRAM-MD5=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
> -CallbackHandler.AMQPLAIN=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
> -CallbackHandler.PLAIN=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
> +
> +#
> +# Format:
> +# <mechanism name>.ordinal=<implementation>
> +#
> +# @see CallbackHandlerRegistry
> +#
> +
> +EXTERNAL.1=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
> +GSSAPI.2=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
> +CRAM-MD5-HASHED.3=org.apache.qpid.client.security.UsernameHashedPasswordCallbackHandler
> +CRAM-MD5.4=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
> +AMQPLAIN.5=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
> +PLAIN.6=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
>
> Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
> URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> ==============================================================================
> --- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java (original)
> +++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java Mon Sep 19 15:13:18 2011
> @@ -22,7 +22,7 @@ package org.apache.qpid.jms;
>
>  import java.util.Map;
>
> -import org.apache.qpid.client.SSLConfiguration;
> +import org.apache.qpid.transport.ConnectionSettings;
>
>  public interface BrokerDetails
>  {
> @@ -104,14 +104,12 @@ public interface BrokerDetails
>     long getTimeout();
>
>     void setTimeout(long timeout);
> -
> -    SSLConfiguration getSSLConfiguration();
> -
> -    void setSSLConfiguration(SSLConfiguration sslConfiguration);
>
>     boolean getBooleanProperty(String propName);
>
>     String toString();
>
>     boolean equals(Object o);
> +
> +    ConnectionSettings buildConnectionSettings();
>  }
>
> Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java
> URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> ==============================================================================
> --- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java (original)
> +++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java Mon Sep 19 15:13:18 2011
> @@ -140,7 +140,6 @@ public class FailoverExchangeMethod impl
>                         broker.setHost(tokens[1]);
>                         broker.setPort(Integer.parseInt(tokens[2]));
>                         broker.setProperties(_originalBrokerDetail.getProperties());
> -                        broker.setSSLConfiguration(_originalBrokerDetail.getSSLConfiguration());
>                         brokerList.add(broker);
>
>                         if (currentBrokerIP.equals(broker.getHost()) &&
>
> Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
> URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> ==============================================================================
> --- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java (original)
> +++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java Mon Sep 19 15:13:18 2011
> @@ -36,6 +36,7 @@ import javax.jms.Queue;
>  import javax.jms.Topic;
>  import javax.naming.Context;
>  import javax.naming.NamingException;
> +import javax.naming.ConfigurationException;
>  import javax.naming.spi.InitialContextFactory;
>
>  import org.apache.qpid.client.AMQConnectionFactory;
> @@ -139,7 +140,7 @@ public class PropertiesFileInitialContex
>         return new ReadOnlyContext(environment, data);
>     }
>
> -    protected void createConnectionFactories(Map data, Hashtable environment)
> +    protected void createConnectionFactories(Map data, Hashtable environment) throws ConfigurationException
>     {
>         for (Iterator iter = environment.entrySet().iterator(); iter.hasNext();)
>         {
> @@ -157,7 +158,7 @@ public class PropertiesFileInitialContex
>         }
>     }
>
> -    protected void createDestinations(Map data, Hashtable environment)
> +    protected void createDestinations(Map data, Hashtable environment) throws ConfigurationException
>     {
>         for (Iterator iter = environment.entrySet().iterator(); iter.hasNext();)
>         {
> @@ -225,7 +226,7 @@ public class PropertiesFileInitialContex
>     /**
>      * Factory method to create new Connection Factory instances
>      */
> -    protected ConnectionFactory createFactory(String url)
> +    protected ConnectionFactory createFactory(String url) throws ConfigurationException
>     {
>         try
>         {
> @@ -233,16 +234,18 @@ public class PropertiesFileInitialContex
>         }
>         catch (URLSyntaxException urlse)
>         {
> -            _logger.warn("Unable to createFactories:" + urlse);
> -        }
> +            _logger.warn("Unable to create factory:" + urlse);
>
> -        return null;
> +            ConfigurationException ex = new ConfigurationException("Failed to parse entry: " + urlse + " due to : " +  urlse.getMessage());
> +            ex.initCause(urlse);
> +            throw ex;
> +        }
>     }
>
>     /**
>      * Factory method to create new Destination instances from an AMQP BindingURL
>      */
> -    protected Destination createDestination(String str)
> +    protected Destination createDestination(String str) throws ConfigurationException
>     {
>         try
>         {
> @@ -252,7 +255,9 @@ public class PropertiesFileInitialContex
>         {
>             _logger.warn("Unable to create destination:" + e, e);
>
> -            return null;
> +            ConfigurationException ex = new ConfigurationException("Failed to parse entry: " + str + " due to : " +  e.getMessage());
> +            ex.initCause(e);
> +            throw ex;
>         }
>     }
>
>
> Modified: qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java
> URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> ==============================================================================
> --- qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java (original)
> +++ qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java Mon Sep 19 15:13:18 2011
> @@ -23,7 +23,6 @@ package org.apache.qpid.client;
>  import org.apache.qpid.AMQException;
>  import org.apache.qpid.client.state.AMQState;
>  import org.apache.qpid.framing.ProtocolVersion;
> -import org.apache.qpid.jms.ConnectionURL;
>  import org.apache.qpid.jms.BrokerDetails;
>  import org.apache.qpid.url.URLSyntaxException;
>
> @@ -37,48 +36,18 @@ public class MockAMQConnection extends A
>         super(broker, username, password, clientName, virtualHost);
>     }
>
> -    public MockAMQConnection(String broker, String username, String password, String clientName, String virtualHost, SSLConfiguration sslConfig)
> -            throws AMQException, URLSyntaxException
> -    {
> -        super(broker, username, password, clientName, virtualHost, sslConfig);
> -    }
> -
>     public MockAMQConnection(String host, int port, String username, String password, String clientName, String virtualHost)
>             throws AMQException, URLSyntaxException
>     {
>         super(host, port, username, password, clientName, virtualHost);
>     }
>
> -    public MockAMQConnection(String host, int port, String username, String password, String clientName, String virtualHost, SSLConfiguration sslConfig)
> -            throws AMQException, URLSyntaxException
> -    {
> -        super(host, port, username, password, clientName, virtualHost, sslConfig);
> -    }
> -
> -    public MockAMQConnection(String host, int port, boolean useSSL, String username, String password, String clientName, String virtualHost, SSLConfiguration sslConfig)
> -            throws AMQException, URLSyntaxException
> -    {
> -        super(host, port, useSSL, username, password, clientName, virtualHost, sslConfig);
> -    }
> -
>     public MockAMQConnection(String connection)
>             throws AMQException, URLSyntaxException
>     {
>         super(connection);
>     }
>
> -    public MockAMQConnection(String connection, SSLConfiguration sslConfig)
> -            throws AMQException, URLSyntaxException
> -    {
> -        super(connection, sslConfig);
> -    }
> -
> -    public MockAMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig)
> -            throws AMQException
> -    {
> -        super(connectionURL, sslConfig);
> -    }
> -
>     @Override
>     public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException
>     {
>
> Modified: qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java
> URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> ==============================================================================
> --- qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java (original)
> +++ qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java Mon Sep 19 15:13:18 2011
> @@ -43,4 +43,9 @@ public class TestMessageHelper
>     {
>         return new JMSStreamMessage(AMQMessageDelegateFactory.FACTORY_0_8);
>     }
> +
> +    public static JMSObjectMessage newJMSObjectMessage()
> +    {
> +        return new JMSObjectMessage(AMQMessageDelegateFactory.FACTORY_0_8);
> +    }
>  }
>
> Modified: qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java
> URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> ==============================================================================
> --- qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java (original)
> +++ qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java Mon Sep 19 15:13:18 2011
> @@ -21,10 +21,10 @@
>  package org.apache.qpid.test.unit.jndi;
>
>  import junit.framework.TestCase;
> +
>  import org.apache.qpid.client.AMQConnectionFactory;
>  import org.apache.qpid.jms.BrokerDetails;
>  import org.apache.qpid.jms.ConnectionURL;
> -import org.apache.qpid.url.URLSyntaxException;
>
>  public class ConnectionFactoryTest extends TestCase
>  {
> @@ -34,21 +34,9 @@ public class ConnectionFactoryTest exten
>     public static final String URL = "amqp://guest:guest@clientID/test?brokerlist='tcp://localhost:5672'";
>     public static final String URL_STAR_PWD = "amqp://guest:********@clientID/test?brokerlist='tcp://localhost:5672'";
>
> -    public void testConnectionURLString()
> +    public void testConnectionURLStringMasksPassword() throws Exception
>     {
> -        AMQConnectionFactory factory = new AMQConnectionFactory();
> -
> -        assertNull("ConnectionURL should have no value at start",
> -                   factory.getConnectionURL());
> -
> -        try
> -        {
> -            factory.setConnectionURLString(URL);
> -        }
> -        catch (URLSyntaxException e)
> -        {
> -            fail(e.getMessage());
> -        }
> +        AMQConnectionFactory factory = new AMQConnectionFactory(URL);
>
>         //URL will be returned with the password field swapped for '********'
>         assertEquals("Connection URL not correctly set", URL_STAR_PWD, factory.getConnectionURLString());
>
> Modified: qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java
> URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> ==============================================================================
> --- qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java (original)
> +++ qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java Mon Sep 19 15:13:18 2011
> @@ -24,6 +24,7 @@ import java.util.Properties;
>
>  import javax.jms.Queue;
>  import javax.jms.Topic;
> +import javax.naming.ConfigurationException;
>  import javax.naming.Context;
>  import javax.naming.InitialContext;
>
> @@ -67,4 +68,22 @@ public class JNDIPropertyFileTest extend
>             assertEquals("Topic" + i + "WithSpace",bindingKey.asString());
>         }
>     }
> +
> +    public void testConfigurationErrors() throws Exception
> +    {
> +        Properties properties = new Properties();
> +        properties.put("java.naming.factory.initial", "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
> +        properties.put("destination.my-queue","amq.topic/test;create:always}");
> +
> +        try
> +        {
> +            ctx = new InitialContext(properties);
> +            fail("A configuration exception should be thrown with details about the address syntax error");
> +        }
> +        catch(ConfigurationException e)
> +        {
> +            assertTrue("Incorrect exception", e.getMessage().contains("Failed to parse entry: amq.topic/test;create:always}"));
> +        }
> +
> +    }
>  }
>
> Modified: qpid/branches/qpid-3346/qpid/java/common.xml
> URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common.xml?rev=1172657&r1=1172656&r2=1172657&view=diff
> ==============================================================================
> --- qpid/branches/qpid-3346/qpid/java/common.xml (original)
> +++ qpid/branches/qpid-3346/qpid/java/common.xml Mon Sep 19 15:13:18 2011
> @@ -132,8 +132,6 @@
>        </sequential>
>   </macrodef>
>
> -
> -
>   <macrodef name="jython">
>     <attribute name="path"/>
>     <element name="args"/>
>
> Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java
> URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> ==============================================================================
> --- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java (original)
> +++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java Mon Sep 19 15:13:18 2011
> @@ -20,9 +20,6 @@
>  */
>  package org.apache.qpid.codec;
>
> -import org.apache.mina.filter.codec.ProtocolCodecFactory;
> -import org.apache.mina.filter.codec.ProtocolDecoder;
> -import org.apache.mina.filter.codec.ProtocolEncoder;
>  import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
>
>  /**
> @@ -31,14 +28,11 @@ import org.apache.qpid.protocol.AMQVersi
>  *
>  * <p/><table id="crc"><caption>CRC Card</caption>
>  * <tr><th> Responsibilities <th> Collaborations.
> - * <tr><td> Supply the protocol encoder. <td> {@link AMQEncoder}
>  * <tr><td> Supply the protocol decoder. <td> {@link AMQDecoder}
>  * </table>
>  */
> -public class AMQCodecFactory implements ProtocolCodecFactory
> +public class AMQCodecFactory
>  {
> -    /** Holds the protocol encoder. */
> -    private final AMQEncoder _encoder = new AMQEncoder();
>
>     /** Holds the protocol decoder. */
>     private final AMQDecoder _frameDecoder;
> @@ -56,15 +50,6 @@ public class AMQCodecFactory implements
>         _frameDecoder = new AMQDecoder(expectProtocolInitiation, session);
>     }
>
> -    /**
> -     * Gets the AMQP encoder.
> -     *
> -     * @return The AMQP encoder.
> -     */
> -    public ProtocolEncoder getEncoder()
> -    {
> -        return _encoder;
> -    }
>
>     /**
>      * Gets the AMQP decoder.
>
> Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
> URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> ==============================================================================
> --- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java (original)
> +++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java Mon Sep 19 15:13:18 2011
> @@ -20,13 +20,9 @@
>  */
>  package org.apache.qpid.codec;
>
> -import java.util.ArrayList;
> -
> -import org.apache.mina.common.ByteBuffer;
> -import org.apache.mina.common.IoSession;
> -import org.apache.mina.common.SimpleByteBufferAllocator;
> -import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
> -import org.apache.mina.filter.codec.ProtocolDecoderOutput;
> +import java.io.*;
> +import java.nio.ByteBuffer;
> +import java.util.*;
>
>  import org.apache.qpid.framing.AMQDataBlock;
>  import org.apache.qpid.framing.AMQDataBlockDecoder;
> @@ -54,11 +50,8 @@ import org.apache.qpid.protocol.AMQVersi
>  * @todo If protocol initiation decoder not needed, then don't create it. Probably not a big deal, but it adds to the
>  *       per-session overhead.
>  */
> -public class AMQDecoder extends CumulativeProtocolDecoder
> +public class AMQDecoder
>  {
> -
> -    private static final String BUFFER = AMQDecoder.class.getName() + ".Buffer";
> -
>     /** Holds the 'normal' AMQP data decoder. */
>     private AMQDataBlockDecoder _dataBlockDecoder = new AMQDataBlockDecoder();
>
> @@ -67,12 +60,11 @@ public class AMQDecoder extends Cumulati
>
>     /** Flag to indicate whether this decoder needs to handle protocol initiation. */
>     private boolean _expectProtocolInitiation;
> -    private boolean firstDecode = true;
>
>     private AMQMethodBodyFactory _bodyFactory;
>
> -    private ByteBuffer _remainingBuf;
> -
> +    private List<ByteArrayInputStream> _remainingBufs = new ArrayList<ByteArrayInputStream>();
> +
>     /**
>      * Creates a new AMQP decoder.
>      *
> @@ -84,98 +76,7 @@ public class AMQDecoder extends Cumulati
>         _bodyFactory = new AMQMethodBodyFactory(session);
>     }
>
> -    /**
> -     * Delegates decoding AMQP from the data buffer that Mina has retrieved from the wire, to the data or protocol
> -     * intiation decoders.
> -     *
> -     * @param session The Mina session.
> -     * @param in      The raw byte buffer.
> -     * @param out     The Mina object output gatherer to write decoded objects to.
> -     *
> -     * @return <tt>true</tt> if the data was decoded, <tt>false<tt> if more is needed and the data should accumulate.
> -     *
> -     * @throws Exception If the data cannot be decoded for any reason.
> -     */
> -    protected boolean doDecode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
> -    {
> -
> -        boolean decoded;
> -        if (_expectProtocolInitiation
> -            || (firstDecode
> -                && (in.remaining() > 0)
> -                && (in.get(in.position()) == (byte)'A')))
> -        {
> -            decoded = doDecodePI(session, in, out);
> -        }
> -        else
> -        {
> -            decoded = doDecodeDataBlock(session, in, out);
> -        }
> -        if(firstDecode && decoded)
> -        {
> -            firstDecode = false;
> -        }
> -        return decoded;
> -    }
> -
> -    /**
> -     * Decodes AMQP data, delegating the decoding to an {@link AMQDataBlockDecoder}.
> -     *
> -     * @param session The Mina session.
> -     * @param in      The raw byte buffer.
> -     * @param out     The Mina object output gatherer to write decoded objects to.
> -     *
> -     * @return <tt>true</tt> if the data was decoded, <tt>false<tt> if more is needed and the data should accumulate.
> -     *
> -     * @throws Exception If the data cannot be decoded for any reason.
> -     */
> -    protected boolean doDecodeDataBlock(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
> -    {
> -        int pos = in.position();
> -        boolean enoughData = _dataBlockDecoder.decodable(in.buf());
> -        in.position(pos);
> -        if (!enoughData)
> -        {
> -            // returning false means it will leave the contents in the buffer and
> -            // call us again when more data has been read
> -            return false;
> -        }
> -        else
> -        {
> -            _dataBlockDecoder.decode(session, in, out);
> -
> -            return true;
> -        }
> -    }
> -
> -    /**
> -     * Decodes an AMQP initiation, delegating the decoding to a {@link ProtocolInitiation.Decoder}.
> -     *
> -     * @param session The Mina session.
> -     * @param in      The raw byte buffer.
> -     * @param out     The Mina object output gatherer to write decoded objects to.
> -     *
> -     * @return <tt>true</tt> if the data was decoded, <tt>false<tt> if more is needed and the data should accumulate.
> -     *
> -     * @throws Exception If the data cannot be decoded for any reason.
> -     */
> -    private boolean doDecodePI(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
> -    {
> -        boolean enoughData = _piDecoder.decodable(in.buf());
> -        if (!enoughData)
> -        {
> -            // returning false means it will leave the contents in the buffer and
> -            // call us again when more data has been read
> -            return false;
> -        }
> -        else
> -        {
> -            ProtocolInitiation pi = new ProtocolInitiation(in.buf());
> -            out.write(pi);
>
> -            return true;
> -        }
> -    }
>
>     /**
>      * Sets the protocol initation flag, that determines whether decoding is handled by the data decoder of the protocol
> @@ -189,151 +90,168 @@ public class AMQDecoder extends Cumulati
>         _expectProtocolInitiation = expectProtocolInitiation;
>     }
>
> -
> -    /**
> -     * Cumulates content of <tt>in</tt> into internal buffer and forwards
> -     * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}.
> -     * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
> -     * and the cumulative buffer is compacted after decoding ends.
> -     *
> -     * @throws IllegalStateException if your <tt>doDecode()</tt> returned
> -     *                               <tt>true</tt> not consuming the cumulative buffer.
> -     */
> -    public void decode( IoSession session, ByteBuffer in,
> -                        ProtocolDecoderOutput out ) throws Exception
> +    private class RemainingByteArrayInputStream extends InputStream
>     {
> -        ByteBuffer buf = ( ByteBuffer ) session.getAttribute( BUFFER );
> -        // if we have a session buffer, append data to that otherwise
> -        // use the buffer read from the network directly
> -        if( buf != null )
> -        {
> -            buf.put( in );
> -            buf.flip();
> -        }
> -        else
> +        private int _currentListPos;
> +        private int _markPos;
> +
> +
> +        @Override
> +        public int read() throws IOException
>         {
> -            buf = in;
> +            ByteArrayInputStream currentStream = _remainingBufs.get(_currentListPos);
> +            if(currentStream.available() > 0)
> +            {
> +                return currentStream.read();
> +            }
> +            else if((_currentListPos == _remainingBufs.size())
> +                    || (++_currentListPos == _remainingBufs.size()))
> +            {
> +                return -1;
> +            }
> +            else
> +            {
> +
> +                ByteArrayInputStream stream = _remainingBufs.get(_currentListPos);
> +                stream.mark(0);
> +                return stream.read();
> +            }
>         }
>
> -        for( ;; )
> +        @Override
> +        public int read(final byte[] b, final int off, final int len) throws IOException
>         {
> -            int oldPos = buf.position();
> -            boolean decoded = doDecode( session, buf, out );
> -            if( decoded )
> +
> +            if(_currentListPos == _remainingBufs.size())
>             {
> -                if( buf.position() == oldPos )
> +                return -1;
> +            }
> +            else
> +            {
> +                ByteArrayInputStream currentStream = _remainingBufs.get(_currentListPos);
> +                final int available = currentStream.available();
> +                int read = currentStream.read(b, off, len > available ? available : len);
> +                if(read < len)
>                 {
> -                    throw new IllegalStateException(
> -                            "doDecode() can't return true when buffer is not consumed." );
> +                    if(_currentListPos++ != _remainingBufs.size())
> +                    {
> +                        _remainingBufs.get(_currentListPos).mark(0);
> +                    }
> +                    int correctRead = read == -1 ? 0 : read;
> +                    int subRead = read(b, off+correctRead, len-correctRead);
> +                    if(subRead == -1)
> +                    {
> +                        return read;
> +                    }
> +                    else
> +                    {
> +                        return correctRead+subRead;
> +                    }
>                 }
> -
> -                if( !buf.hasRemaining() )
> +                else
>                 {
> -                    break;
> +                    return len;
>                 }
>             }
> -            else
> -            {
> -                break;
> -            }
>         }
>
> -        // if there is any data left that cannot be decoded, we store
> -        // it in a buffer in the session and next time this decoder is
> -        // invoked the session buffer gets appended to
> -        if ( buf.hasRemaining() )
> +        @Override
> +        public int available() throws IOException
>         {
> -            storeRemainingInSession( buf, session );
> +            int total = 0;
> +            for(int i = _currentListPos; i < _remainingBufs.size(); i++)
> +            {
> +                total += _remainingBufs.get(i).available();
> +            }
> +            return total;
>         }
> -        else
> +
> +        @Override
> +        public void mark(final int readlimit)
>         {
> -            removeSessionBuffer( session );
> +            _markPos = _currentListPos;
> +            final ByteArrayInputStream stream = _remainingBufs.get(_currentListPos);
> +            if(stream != null)
> +            {
> +                stream.mark(readlimit);
> +            }
>         }
> -    }
> -
> -    /**
> -     * Releases the cumulative buffer used by the specified <tt>session</tt>.
> -     * Please don't forget to call <tt>super.dispose( session )</tt> when
> -     * you override this method.
> -     */
> -    public void dispose( IoSession session ) throws Exception
> -    {
> -        removeSessionBuffer( session );
> -    }
>
> -    private void removeSessionBuffer(IoSession session)
> -    {
> -        ByteBuffer buf = ( ByteBuffer ) session.getAttribute( BUFFER );
> -        if( buf != null )
> +        @Override
> +        public void reset() throws IOException
>         {
> -            buf.release();
> -            session.removeAttribute( BUFFER );
> +            _currentListPos = _markPos;
> +            final int size = _remainingBufs.size();
> +            if(_currentListPos < size)
> +            {
> +                _remainingBufs.get(_currentListPos).reset();
> +            }
> +            for(int i = _currentListPos+1; i<size; i++)
> +            {
> +                _remainingBufs.get(i).reset();
> +            }
>         }
>     }
>
> -    private static final SimpleByteBufferAllocator SIMPLE_BYTE_BUFFER_ALLOCATOR = new SimpleByteBufferAllocator();
> -
> -    private void storeRemainingInSession(ByteBuffer buf, IoSession session)
> -    {
> -        ByteBuffer remainingBuf = SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate( buf.remaining(), false );
> -        remainingBuf.setAutoExpand( true );
> -        remainingBuf.put( buf );
> -        session.setAttribute( BUFFER, remainingBuf );
> -    }
>
> -    public ArrayList<AMQDataBlock> decodeBuffer(java.nio.ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException
> +    public ArrayList<AMQDataBlock> decodeBuffer(ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
>     {
>
>         // get prior remaining data from accumulator
>         ArrayList<AMQDataBlock> dataBlocks = new ArrayList<AMQDataBlock>();
> -        ByteBuffer msg;
> -        // if we have a session buffer, append data to that otherwise
> -        // use the buffer read from the network directly
> -        if( _remainingBuf != null )
> +        DataInputStream msg;
> +
> +
> +        ByteArrayInputStream bais = new ByteArrayInputStream(buf.array(),buf.arrayOffset()+buf.position(), buf.remaining());
> +        if(!_remainingBufs.isEmpty())
>         {
> -            _remainingBuf.put(buf);
> -            _remainingBuf.flip();
> -            msg = _remainingBuf;
> +            _remainingBufs.add(bais);
> +            msg = new DataInputStream(new RemainingByteArrayInputStream());
>         }
>         else
>         {
> -            msg = ByteBuffer.wrap(buf);
> +            msg = new DataInputStream(bais);
>         }
> -
> -        if (_expectProtocolInitiation
> -            || (firstDecode
> -                && (msg.remaining() > 0)
> -                && (msg.get(msg.position()) == (byte)'A')))
> -        {
> -            if (_piDecoder.decodable(msg.buf()))
> -            {
> -                dataBlocks.add(new ProtocolInitiation(msg.buf()));
> -            }
> -        }
> -        else
> +
> +        boolean enoughData = true;
> +        while (enoughData)
>         {
> -            boolean enoughData = true;
> -            while (enoughData)
> +            if(!_expectProtocolInitiation)
>             {
> -                int pos = msg.position();
> -
>                 enoughData = _dataBlockDecoder.decodable(msg);
> -                msg.position(pos);
>                 if (enoughData)
>                 {
>                     dataBlocks.add(_dataBlockDecoder.createAndPopulateFrame(_bodyFactory, msg));
>                 }
> -                else
> +            }
> +            else
> +            {
> +                enoughData = _piDecoder.decodable(msg);
> +                if (enoughData)
>                 {
> -                    _remainingBuf = SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate(msg.remaining(), false);
> -                    _remainingBuf.setAutoExpand(true);
> -                    _remainingBuf.put(msg);
> +                    dataBlocks.add(new ProtocolInitiation(msg));
> +                }
> +
> +            }
> +
> +            if(!enoughData)
> +            {
> +                if(!_remainingBufs.isEmpty())
> +                {
> +                    _remainingBufs.remove(_remainingBufs.size()-1);
> +                    ListIterator<ByteArrayInputStream> iterator = _remainingBufs.listIterator();
> +                    while(iterator.hasNext() && iterator.next().available() == 0)
> +                    {
> +                        iterator.remove();
> +                    }
> +                }
> +                if(bais.available()!=0)
> +                {
> +                    byte[] remaining = new byte[bais.available()];
> +                    bais.read(remaining);
> +                    _remainingBufs.add(new ByteArrayInputStream(remaining));
>                 }
>             }
> -        }
> -        if(firstDecode && dataBlocks.size() > 0)
> -        {
> -            firstDecode = false;
>         }
>         return dataBlocks;
>     }
>
> Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
> URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> ==============================================================================
> --- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java (original)
> +++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java Mon Sep 19 15:13:18 2011
> @@ -23,7 +23,7 @@ package org.apache.qpid.configuration;
>  */
>  public class ClientProperties
>  {
> -
> +
>     /**
>      * Currently with Qpid it is not possible to change the client ID.
>      * If one is not specified upon connection construction, an id is generated automatically.
> @@ -68,38 +68,50 @@ public class ClientProperties
>      * by the broker in TuneOK it will be used as the heartbeat interval.
>      * If not a warning will be printed and the max value specified for
>      * heartbeat in TuneOK will be used
> -     *
> +     *
>      * The default idle timeout is set to 120 secs
>      */
>     public static final String IDLE_TIMEOUT_PROP_NAME = "idle_timeout";
>     public static final long DEFAULT_IDLE_TIMEOUT = 120000;
> -
> +
>     public static final String HEARTBEAT = "qpid.heartbeat";
>     public static final int HEARTBEAT_DEFAULT = 120;
> -
> +
>     /**
>      * This value will be used to determine the default destination syntax type.
>      * Currently the two types are Binding URL (java only) and the Addressing format (used by
> -     * all clients).
> +     * all clients).
>      */
>     public static final String DEST_SYNTAX = "qpid.dest_syntax";
> -
> +
>     public static final String USE_LEGACY_MAP_MESSAGE_FORMAT = "qpid.use_legacy_map_message";
>
>     public static final String AMQP_VERSION = "qpid.amqp.version";
> -
> -    private static ClientProperties _instance = new ClientProperties();
> -
> +
> +    public static final String QPID_VERIFY_CLIENT_ID = "qpid.verify_client_id";
> +
> +    /**
> +     * System properties to change the default timeout used during
> +     * synchronous operations.
> +     */
> +    public static final String QPID_SYNC_OP_TIMEOUT = "qpid.sync_op_timeout";
> +    public static final String AMQJ_DEFAULT_SYNCWRITE_TIMEOUT = "amqj.default_syncwrite_timeout";
> +
> +    /**
> +     * A default timeout value for synchronous operations
> +     */
> +    public static final int DEFAULT_SYNC_OPERATION_TIMEOUT = 60000;
> +
>     /*
> -    public static final QpidProperty<Boolean>  IGNORE_SET_CLIENTID_PROP_NAME =
> +    public static final QpidProperty<Boolean>  IGNORE_SET_CLIENTID_PROP_NAME =
>         QpidProperty.booleanProperty(false,"qpid.ignore_set_client_id","ignore_setclientID");
> -
> +
>     public static final QpidProperty<Boolean> SYNC_PERSISTENT_PROP_NAME =
>         QpidProperty.booleanProperty(false,"qpid.sync_persistence","sync_persistence");
> -
> -
> +
> +
>     public static final QpidProperty<Integer> MAX_PREFETCH_PROP_NAME =
>         QpidProperty.intProperty(500,"qpid.max_prefetch","max_prefetch"); */
> -
> -
> +
> +
>  }
>
> Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
> URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> ==============================================================================
> --- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java (original)
> +++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java Mon Sep 19 15:13:18 2011
> @@ -20,7 +20,9 @@
>  */
>  package org.apache.qpid.framing;
>
> -import org.apache.mina.common.ByteBuffer;
> +import java.io.DataOutputStream;
> +import java.io.IOException;
> +
>  import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
>  import org.apache.qpid.AMQException;
>
> @@ -34,7 +36,7 @@ public interface AMQBody
>      */
>     public abstract int getSize();
>
> -    public void writePayload(ByteBuffer buffer);
> +    public void writePayload(DataOutputStream buffer) throws IOException;
>
> -    void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession) throws AMQException;
> +    void handle(final int channelId, final AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException;
>  }
>
> Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
> URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> ==============================================================================
> --- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java (original)
> +++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java Mon Sep 19 15:13:18 2011
> @@ -20,7 +20,10 @@
>  */
>  package org.apache.qpid.framing;
>
> -import org.apache.mina.common.ByteBuffer;
> +import java.io.DataInputStream;
> +import java.io.DataOutputStream;
> +import java.io.IOException;
> +
>
>  /**
>  * A data block represents something that has a size in bytes and the ability to write itself to a byte
> @@ -39,25 +42,6 @@ public abstract class AMQDataBlock imple
>      * Writes the datablock to the specified buffer.
>      * @param buffer
>      */
> -    public abstract void writePayload(ByteBuffer buffer);
> -
> -    public ByteBuffer toByteBuffer()
> -    {
> -        final ByteBuffer buffer = ByteBuffer.allocate((int)getSize());
> -
> -        writePayload(buffer);
> -        buffer.flip();
> -        return buffer;
> -    }
> -
> -    public java.nio.ByteBuffer toNioByteBuffer()
> -    {
> -        final java.nio.ByteBuffer buffer = java.nio.ByteBuffer.allocate((int) getSize());
> -
> -        ByteBuffer buf = ByteBuffer.wrap(buffer);
> -        writePayload(buf);
> -        buffer.flip();
> -        return buffer;
> -    }
> +    public abstract void writePayload(DataOutputStream buffer) throws IOException;
>
>  }
>
> Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
> URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> ==============================================================================
> --- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java (original)
> +++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java Mon Sep 19 15:13:18 2011
> @@ -20,18 +20,14 @@
>  */
>  package org.apache.qpid.framing;
>
> -import org.apache.mina.common.ByteBuffer;
> -import org.apache.mina.common.IoSession;
> -import org.apache.mina.filter.codec.ProtocolDecoderOutput;
> -
> -import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
> -
>  import org.slf4j.Logger;
>  import org.slf4j.LoggerFactory;
>
> +import java.io.DataInputStream;
> +import java.io.IOException;
> +
>  public class AMQDataBlockDecoder
>  {
> -    private static final String SESSION_METHOD_BODY_FACTORY = "QPID_SESSION_METHOD_BODY_FACTORY";
>
>     private static final BodyFactory[] _bodiesSupported = new BodyFactory[Byte.MAX_VALUE];
>
> @@ -47,27 +43,32 @@ public class AMQDataBlockDecoder
>     public AMQDataBlockDecoder()
>     { }
>
> -    public boolean decodable(java.nio.ByteBuffer in) throws AMQFrameDecodingException
> +    public boolean decodable(DataInputStream in) throws AMQFrameDecodingException, IOException
>     {
> -        final int remainingAfterAttributes = in.remaining() - (1 + 2 + 4 + 1);
> +        final int remainingAfterAttributes = in.available() - (1 + 2 + 4 + 1);
>         // type, channel, body length and end byte
>         if (remainingAfterAttributes < 0)
>         {
>             return false;
>         }
>
> -        in.position(in.position() + 1 + 2);
> +        in.mark(8);
> +        in.skip(1 + 2);
> +
> +
>         // Get an unsigned int, lifted from MINA ByteBuffer getUnsignedInt()
> -        final long bodySize = in.getInt() & 0xffffffffL;
> +        final long bodySize = in.readInt() & 0xffffffffL;
> +
> +        in.reset();
>
>         return (remainingAfterAttributes >= bodySize);
>
>     }
>
> -    public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory methodBodyFactory, ByteBuffer in)
> -        throws AMQFrameDecodingException, AMQProtocolVersionException
> +    public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory methodBodyFactory, DataInputStream in)
> +            throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
>     {
> -        final byte type = in.get();
> +        final byte type = in.readByte();
>
>         BodyFactory bodyFactory;
>         if (type == AMQMethodBody.TYPE)
> @@ -84,8 +85,8 @@ public class AMQDataBlockDecoder
>             throw new AMQFrameDecodingException(null, "Unsupported frame type: " + type, null);
>         }
>
> -        final int channel = in.getUnsignedShort();
> -        final long bodySize = in.getUnsignedInt();
> +        final int channel = in.readUnsignedShort();
> +        final long bodySize = EncodingUtils.readUnsignedInteger(in);
>
>         // bodySize can be zero
>         if ((channel < 0) || (bodySize < 0))
> @@ -96,7 +97,7 @@ public class AMQDataBlockDecoder
>
>         AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory);
>
> -        byte marker = in.get();
> +        byte marker = in.readByte();
>         if ((marker & 0xFF) != 0xCE)
>         {
>             throw new AMQFrameDecodingException(null, "End of frame marker not found. Read " + marker + " length=" + bodySize
> @@ -106,26 +107,4 @@ public class AMQDataBlockDecoder
>         return frame;
>     }
>
> -    public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
> -    {
> -        AMQMethodBodyFactory bodyFactory = (AMQMethodBodyFactory) session.getAttribute(SESSION_METHOD_BODY_FACTORY);
> -        if (bodyFactory == null)
> -        {
> -            AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment();
> -            bodyFactory = new AMQMethodBodyFactory(protocolSession);
> -            session.setAttribute(SESSION_METHOD_BODY_FACTORY, bodyFactory);
> -        }
> -
> -        out.write(createAndPopulateFrame(bodyFactory, in));
> -    }
> -
> -    public boolean decodable(ByteBuffer msg) throws AMQFrameDecodingException
> -    {
> -        return decodable(msg.buf());
> -    }
> -
> -    public AMQDataBlock createAndPopulateFrame(AMQMethodBodyFactory factory, java.nio.ByteBuffer msg) throws AMQProtocolVersionException, AMQFrameDecodingException
> -    {
> -        return createAndPopulateFrame(factory, ByteBuffer.wrap(msg));
> -    }
>  }
>
>
>
> ---------------------------------------------------------------------
> Apache Qpid - AMQP Messaging Implementation
> Project:      http://qpid.apache.org
> Use/Interact: mailto:commits-subscribe@qpid.apache.org
>
>

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org


Mime
View raw message