qpid-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Rajith Attapattu <rajit...@gmail.com>
Subject Re: svn commit: r1172657 [14/21] - in /qpid/branches/qpid-3346/qpid: ./ cpp/ cpp/bindings/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qpid/dotnet/ cpp/bindings/qpid/dotnet/examples/csharp.direct.receiver/Properties/ cpp/bindings/qpid/dotnet/example
Date Mon, 19 Sep 2011 16:02:31 GMT
Ken,

Apologies, I didn't realize that you were merging from trunk, not into it :)
Thanks Gordon for pointing it out.

Regards,

Rajith

On Mon, Sep 19, 2011 at 11:48 AM, Rajith Attapattu <rajith77@gmail.com> wrote:
> Ken,
>
> I assume this wasn't intentional and was merely a side effect of the merge?
> I didn't look at the diff very closely, but I wonder what the
> implication of these changes would be.
>
> Looking further, it seems a lot of classes on the Java client and
> broker have changed.
> I think in order to be on the safe side, these changes should be backed out.
> If I am not mistaken, that would not impact anything, as I believe your
> work didn't involve the Java side.
>
> Rajith
>
> On Mon, Sep 19, 2011 at 11:13 AM,  <kgiusti@apache.org> wrote:
>> Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
>> URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java?rev=1172657&r1=1172656&r2=1172657&view=diff
>> ==============================================================================
>> --- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java (original)
>> +++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java Mon Sep 19 15:13:18 2011
>> @@ -23,7 +23,8 @@ package org.apache.qpid.client.message;
>>  import javax.jms.JMSException;
>>  import javax.jms.StreamMessage;
>>
>> -import org.apache.mina.common.ByteBuffer;
>> +import java.nio.ByteBuffer;
>> +
>>  import org.apache.qpid.AMQException;
>>  import org.apache.qpid.framing.AMQShortString;
>>  import org.apache.qpid.framing.BasicContentHeaderProperties;
>> @@ -36,65 +37,76 @@ public class JMSStreamMessage extends Ab
>>     public static final String MIME_TYPE="jms/stream-message";
>>
>>
>> -
>> -    /**
>> -     * This is set when reading a byte array. The readBytes(byte[]) method supports multiple calls to read
>> -     * a byte array in multiple chunks, hence this is used to track how much is left to be read
>> -     */
>> -    private int _byteArrayRemaining = -1;
>> +    private TypedBytesContentReader _typedBytesContentReader;
>> +    private TypedBytesContentWriter _typedBytesContentWriter;
>>
>>     public JMSStreamMessage(AMQMessageDelegateFactory delegateFactory)
>>     {
>> -        this(delegateFactory,null);
>> +        super(delegateFactory,false);
>> +        _typedBytesContentWriter = new TypedBytesContentWriter();
>>
>>     }
>>
>> -    /**
>> -     * Construct a stream message with existing data.
>> -     *
>> -     * @param delegateFactory
>> -     * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is
>> -     */
>> -    JMSStreamMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data)
>> -    {
>>
>> -        super(delegateFactory, data); // this instanties a content header
>> -    }
>>
>>     JMSStreamMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
>>     {
>> -
>> -        super(delegate, data);
>> +        super(delegate, data!=null);
>> +        _typedBytesContentReader = new TypedBytesContentReader(data);
>>     }
>>
>> -
>>     public void reset()
>>     {
>> -        super.reset();
>>         _readableMessage = true;
>> +
>> +        if(_typedBytesContentReader != null)
>> +        {
>> +            _typedBytesContentReader.reset();
>> +        }
>> +        else if (_typedBytesContentWriter != null)
>> +        {
>> +            _typedBytesContentReader = new TypedBytesContentReader(_typedBytesContentWriter.getData());
>> +        }
>> +    }
>> +
>> +    @Override
>> +    public void clearBody() throws JMSException
>> +    {
>> +        super.clearBody();
>> +        _typedBytesContentReader = null;
>> +        _typedBytesContentWriter = new TypedBytesContentWriter();
>> +
>>     }
>>
>> +
>>     protected String getMimeType()
>>     {
>>         return MIME_TYPE;
>>     }
>>
>> -
>> +    @Override
>> +    public java.nio.ByteBuffer getData() throws JMSException
>> +    {
>> +        return _typedBytesContentWriter == null ? _typedBytesContentReader.getData() : _typedBytesContentWriter.getData();
>> +    }
>>
>>     public boolean readBoolean() throws JMSException
>>     {
>> -        return super.readBoolean();
>> +        checkReadable();
>> +        return _typedBytesContentReader.readBoolean();
>>     }
>>
>>
>>     public byte readByte() throws JMSException
>>     {
>> -        return super.readByte();
>> +        checkReadable();
>> +        return _typedBytesContentReader.readByte();
>>     }
>>
>>     public short readShort() throws JMSException
>>     {
>> -        return super.readShort();
>> +        checkReadable();
>> +        return _typedBytesContentReader.readShort();
>>     }
>>
>>     /**
>> @@ -105,102 +117,127 @@ public class JMSStreamMessage extends Ab
>>      */
>>     public char readChar() throws JMSException
>>     {
>> -        return super.readChar();
>> +        checkReadable();
>> +        return _typedBytesContentReader.readChar();
>>     }
>>
>>     public int readInt() throws JMSException
>>     {
>> -        return super.readInt();
>> +        checkReadable();
>> +        return _typedBytesContentReader.readInt();
>>     }
>>
>>     public long readLong() throws JMSException
>>     {
>> -        return super.readLong();
>> +        checkReadable();
>> +        return _typedBytesContentReader.readLong();
>>     }
>>
>>     public float readFloat() throws JMSException
>>     {
>> -        return super.readFloat();
>> +        checkReadable();
>> +        return _typedBytesContentReader.readFloat();
>>     }
>>
>>     public double readDouble() throws JMSException
>>     {
>> -        return super.readDouble();
>> +        checkReadable();
>> +        return _typedBytesContentReader.readDouble();
>>     }
>>
>>     public String readString() throws JMSException
>>     {
>> -        return super.readString();
>> +        checkReadable();
>> +        return _typedBytesContentReader.readString();
>>     }
>>
>>     public int readBytes(byte[] bytes) throws JMSException
>>     {
>> -        return super.readBytes(bytes);
>> +        if(bytes == null)
>> +        {
>> +            throw new IllegalArgumentException("Must provide non-null array to read into");
>> +        }
>> +
>> +        checkReadable();
>> +        return _typedBytesContentReader.readBytes(bytes);
>>     }
>>
>>
>>     public Object readObject() throws JMSException
>>     {
>> -        return super.readObject();
>> +        checkReadable();
>> +        return _typedBytesContentReader.readObject();
>>     }
>>
>>     public void writeBoolean(boolean b) throws JMSException
>>     {
>> -        super.writeBoolean(b);
>> +        checkWritable();
>> +        _typedBytesContentWriter.writeBoolean(b);
>>     }
>>
>>     public void writeByte(byte b) throws JMSException
>>     {
>> -        super.writeByte(b);
>> +        checkWritable();
>> +        _typedBytesContentWriter.writeByte(b);
>>     }
>>
>>     public void writeShort(short i) throws JMSException
>>     {
>> -        super.writeShort(i);
>> +        checkWritable();
>> +        _typedBytesContentWriter.writeShort(i);
>>     }
>>
>>     public void writeChar(char c) throws JMSException
>>     {
>> -        super.writeChar(c);
>> +        checkWritable();
>> +        _typedBytesContentWriter.writeChar(c);
>>     }
>>
>>     public void writeInt(int i) throws JMSException
>>     {
>> -        super.writeInt(i);
>> +        checkWritable();
>> +        _typedBytesContentWriter.writeInt(i);
>>     }
>>
>>     public void writeLong(long l) throws JMSException
>>     {
>> -        super.writeLong(l);
>> +        checkWritable();
>> +        _typedBytesContentWriter.writeLong(l);
>>     }
>>
>>     public void writeFloat(float v) throws JMSException
>>     {
>> -        super.writeFloat(v);
>> +        checkWritable();
>> +        _typedBytesContentWriter.writeFloat(v);
>>     }
>>
>>     public void writeDouble(double v) throws JMSException
>>     {
>> -        super.writeDouble(v);
>> +        checkWritable();
>> +        _typedBytesContentWriter.writeDouble(v);
>>     }
>>
>>     public void writeString(String string) throws JMSException
>>     {
>> -        super.writeString(string);
>> +        checkWritable();
>> +        _typedBytesContentWriter.writeString(string);
>>     }
>>
>>     public void writeBytes(byte[] bytes) throws JMSException
>>     {
>> -        super.writeBytes(bytes);
>> +        checkWritable();
>> +        _typedBytesContentWriter.writeBytes(bytes);
>>     }
>>
>>     public void writeBytes(byte[] bytes, int offset, int length) throws JMSException
>>     {
>> -        super.writeBytes(bytes,offset,length);
>> +        checkWritable();
>> +        _typedBytesContentWriter.writeBytes(bytes, offset, length);
>>     }
>>
>>     public void writeObject(Object object) throws JMSException
>>     {
>> -        super.writeObject(object);
>> +        checkWritable();
>> +        _typedBytesContentWriter.writeObject(object);
>>     }
>>  }
>>
>> Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java
>> URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java?rev=1172657&r1=1172656&r2=1172657&view=diff
>> ==============================================================================
>> --- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java (original)
>> +++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java Mon Sep 19 15:13:18 2011
>> @@ -22,10 +22,9 @@ package org.apache.qpid.client.message;
>>
>>  import javax.jms.JMSException;
>>
>> -import org.apache.mina.common.ByteBuffer;
>> +import java.nio.ByteBuffer;
>> +
>>  import org.apache.qpid.AMQException;
>> -import org.apache.qpid.framing.AMQShortString;
>> -import org.apache.qpid.framing.BasicContentHeaderProperties;
>>
>>  public class JMSStreamMessageFactory extends AbstractJMSMessageFactory
>>  {
>>
>> Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
>> URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java?rev=1172657&r1=1172656&r2=1172657&view=diff
>> ==============================================================================
>> --- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java (original)
>> +++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java Mon Sep 19 15:13:18 2011
>> @@ -20,15 +20,21 @@
>>  */
>>  package org.apache.qpid.client.message;
>>
>> +import java.io.DataInputStream;
>>  import java.io.UnsupportedEncodingException;
>> +import java.nio.ByteBuffer;
>> +import java.nio.CharBuffer;
>>  import java.nio.charset.CharacterCodingException;
>>  import java.nio.charset.Charset;
>> +import java.nio.charset.CharsetDecoder;
>> +import java.nio.charset.CharsetEncoder;
>>
>>  import javax.jms.JMSException;
>> +import javax.jms.MessageFormatException;
>>
>> -import org.apache.mina.common.ByteBuffer;
>>  import org.apache.qpid.AMQException;
>>  import org.apache.qpid.client.CustomJMSXProperty;
>> +import org.apache.qpid.framing.AMQFrameDecodingException;
>>  import org.apache.qpid.framing.AMQShortString;
>>  import org.apache.qpid.framing.BasicContentHeaderProperties;
>>  import org.apache.qpid.util.Strings;
>> @@ -37,6 +43,7 @@ public class JMSTextMessage extends Abst
>>  {
>>     private static final String MIME_TYPE = "text/plain";
>>
>> +    private Exception _exception;
>>     private String _decodedValue;
>>
>>     /**
>> @@ -45,36 +52,41 @@ public class JMSTextMessage extends Abst
>>     private static final String PAYLOAD_NULL_PROPERTY = CustomJMSXProperty.JMS_AMQP_NULL.toString();
>>     private static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");
>>
>> -    public JMSTextMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException
>> -    {
>> -        this(delegateFactory, null, null);
>> -    }
>> +    private CharsetDecoder _decoder = DEFAULT_CHARSET.newDecoder();
>> +    private CharsetEncoder _encoder = DEFAULT_CHARSET.newEncoder();
>> +
>> +    private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);
>>
>> -    JMSTextMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data, String encoding) throws JMSException
>> +    public JMSTextMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException
>>     {
>> -        super(delegateFactory, data); // this instantiates a content header
>> -        setContentType(getMimeType());
>> -        setEncoding(encoding);
>> +        super(delegateFactory, false); // this instantiates a content header
>>     }
>>
>>     JMSTextMessage(AMQMessageDelegate delegate, ByteBuffer data)
>>             throws AMQException
>>     {
>> -        super(delegate, data);
>> -        setContentType(getMimeType());
>> -        _data = data;
>> -    }
>> +        super(delegate, data!=null);
>>
>> -
>> -    public void clearBodyImpl() throws JMSException
>> -    {
>> -        if (_data != null)
>> +        try
>>         {
>> -            _data.release();
>> -            _data = null;
>> +            if(propertyExists(PAYLOAD_NULL_PROPERTY))
>> +            {
>> +                _decodedValue = null;
>> +            }
>> +            else
>> +            {
>> +                _decodedValue = _decoder.decode(data).toString();
>> +            }
>> +        }
>> +        catch (CharacterCodingException e)
>> +        {
>> +            _exception = e;
>> +        }
>> +        catch (JMSException e)
>> +        {
>> +            _exception = e;
>>         }
>>
>> -        _decodedValue = null;
>>     }
>>
>>     public String toBodyString() throws JMSException
>> @@ -87,95 +99,62 @@ public class JMSTextMessage extends Abst
>>         return MIME_TYPE;
>>     }
>>
>> -    public void setText(String text) throws JMSException
>> +    @Override
>> +    public ByteBuffer getData() throws JMSException
>>     {
>> -        checkWritable();
>> -
>> -        clearBody();
>> +        _encoder.reset();
>>         try
>>         {
>> -            if (text != null)
>> +            if(_exception != null)
>> +            {
>> +                final MessageFormatException messageFormatException = new MessageFormatException("Cannot decode original message");
>> +                messageFormatException.setLinkedException(_exception);
>> +                throw messageFormatException;
>> +            }
>> +            else if(_decodedValue == null)
>> +            {
>> +                return EMPTY_BYTE_BUFFER;
>> +            }
>> +            else
>>             {
>> -                final String encoding = getEncoding();
>> -                if (encoding == null || encoding.equalsIgnoreCase("UTF-8"))
>> -                {
>> -                    _data = ByteBuffer.wrap(Strings.toUTF8(text));
>> -                    setEncoding("UTF-8");
>> -                }
>> -                else
>> -                {
>> -                    _data = ByteBuffer.wrap(text.getBytes(encoding));
>> -                }
>> -                _data.position(_data.limit());
>> -                _changedData=true;
>> +                return _encoder.encode(CharBuffer.wrap(_decodedValue));
>>             }
>> -            _decodedValue = text;
>>         }
>> -        catch (UnsupportedEncodingException e)
>> +        catch (CharacterCodingException e)
>>         {
>> -            // should never occur
>> -            JMSException jmse = new JMSException("Unable to decode text data");
>> -            jmse.setLinkedException(e);
>> -            jmse.initCause(e);
>> -            throw jmse;
>> +            final JMSException jmsException = new JMSException("Cannot encode string in UFT-8: " + _decodedValue);
>> +            jmsException.setLinkedException(e);
>> +            throw jmsException;
>>         }
>>     }
>>
>> -    public String getText() throws JMSException
>> +    @Override
>> +    public void clearBody() throws JMSException
>>     {
>> -        if (_data == null && _decodedValue == null)
>> -        {
>> -            return null;
>> -        }
>> -        else if (_decodedValue != null)
>> -        {
>> -            return _decodedValue;
>> -        }
>> -        else
>> -        {
>> -            _data.rewind();
>> +        super.clearBody();
>> +        _decodedValue = null;
>> +        _exception = null;
>> +    }
>>
>> -            if (propertyExists(PAYLOAD_NULL_PROPERTY) && getBooleanProperty(PAYLOAD_NULL_PROPERTY))
>> -            {
>> -                return null;
>> -            }
>> -            if (getEncoding() != null)
>> -            {
>> -                try
>> -                {
>> -                    _decodedValue = _data.getString(Charset.forName(getEncoding()).newDecoder());
>> -                }
>> -                catch (CharacterCodingException e)
>> -                {
>> -                    JMSException jmse = new JMSException("Could not decode string data: " + e);
>> -                    jmse.setLinkedException(e);
>> -                    jmse.initCause(e);
>> -                    throw jmse;
>> -                }
>> -            }
>> -            else
>> -            {
>> -                try
>> -                {
>> -                    _decodedValue = _data.getString(DEFAULT_CHARSET.newDecoder());
>> -                }
>> -                catch (CharacterCodingException e)
>> -                {
>> -                    JMSException jmse = new JMSException("Could not decode string data: " + e);
>> -                    jmse.setLinkedException(e);
>> -                    jmse.initCause(e);
>> -                    throw jmse;
>> -                }
>> -            }
>> -            return _decodedValue;
>> -        }
>> +    public void setText(String text) throws JMSException
>> +    {
>> +        checkWritable();
>> +
>> +        clearBody();
>> +        _decodedValue = text;
>> +
>> +    }
>> +
>> +    public String getText() throws JMSException
>> +    {
>> +        return _decodedValue;
>>     }
>>
>>     @Override
>>     public void prepareForSending() throws JMSException
>>     {
>>         super.prepareForSending();
>> -        if (_data == null)
>> +        if (_decodedValue == null)
>>         {
>>             setBooleanProperty(PAYLOAD_NULL_PROPERTY, true);
>>         }
>>
>> Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java
>> URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java?rev=1172657&r1=1172656&r2=1172657&view=diff
>> ==============================================================================
>> --- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java (original)
>> +++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java Mon Sep 19 15:13:18 2011
>> @@ -22,7 +22,7 @@ package org.apache.qpid.client.message;
>>
>>  import javax.jms.JMSException;
>>
>> -import org.apache.mina.common.ByteBuffer;
>> +import java.nio.ByteBuffer;
>>  import org.apache.qpid.AMQException;
>>  import org.apache.qpid.framing.AMQShortString;
>>  import org.apache.qpid.framing.BasicContentHeaderProperties;
>>
>> Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
>> URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java?rev=1172657&r1=1172656&r2=1172657&view=diff
>> ==============================================================================
>> --- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java (original)
>> +++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java Mon Sep 19 15:13:18 2011
>> @@ -87,9 +87,9 @@ public class UnprocessedMessage_0_8 exte
>>     public void receiveBody(ContentBody body)
>>     {
>>
>> -        if (body.payload != null)
>> +        if (body._payload != null)
>>         {
>> -            final long payloadSize = body.payload.remaining();
>> +            final long payloadSize = body._payload.length;
>>
>>             if (_bodies == null)
>>             {
>>
>> Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
>> URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1172657&r1=1172656&r2=1172657&view=diff
>> ==============================================================================
>> --- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
>> +++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Mon Sep 19 15:13:18 2011
>> @@ -20,7 +20,9 @@
>>  */
>>  package org.apache.qpid.client.protocol;
>>
>> +import java.io.DataOutputStream;
>>  import java.io.IOException;
>> +import java.io.OutputStream;
>>  import java.net.SocketAddress;
>>  import java.nio.ByteBuffer;
>>  import java.util.ArrayList;
>> @@ -31,7 +33,6 @@ import java.util.concurrent.CountDownLat
>>  import java.util.concurrent.ThreadFactory;
>>  import java.util.concurrent.TimeUnit;
>>
>> -import org.apache.mina.filter.codec.ProtocolCodecException;
>>  import org.apache.qpid.AMQConnectionClosedException;
>>  import org.apache.qpid.AMQDisconnectedException;
>>  import org.apache.qpid.AMQException;
>> @@ -46,6 +47,7 @@ import org.apache.qpid.client.state.AMQS
>>  import org.apache.qpid.client.state.StateWaiter;
>>  import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
>>  import org.apache.qpid.codec.AMQCodecFactory;
>> +import org.apache.qpid.configuration.ClientProperties;
>>  import org.apache.qpid.framing.AMQBody;
>>  import org.apache.qpid.framing.AMQDataBlock;
>>  import org.apache.qpid.framing.AMQFrame;
>> @@ -57,8 +59,6 @@ import org.apache.qpid.framing.Heartbeat
>>  import org.apache.qpid.framing.MethodRegistry;
>>  import org.apache.qpid.framing.ProtocolInitiation;
>>  import org.apache.qpid.framing.ProtocolVersion;
>> -import org.apache.qpid.pool.Job;
>> -import org.apache.qpid.pool.ReferenceCountingExecutorService;
>>  import org.apache.qpid.protocol.AMQConstant;
>>  import org.apache.qpid.protocol.AMQMethodEvent;
>>  import org.apache.qpid.protocol.AMQMethodListener;
>> @@ -164,19 +164,19 @@ public class AMQProtocolHandler implemen
>>     private FailoverException _lastFailoverException;
>>
>>     /** Defines the default timeout to use for synchronous protocol commands. */
>> -    private final long DEFAULT_SYNC_TIMEOUT = Long.getLong("amqj.default_syncwrite_timeout", 1000 * 30);
>> +    private final long DEFAULT_SYNC_TIMEOUT = Long.getLong(ClientProperties.QPID_SYNC_OP_TIMEOUT,
>> +                                                           Long.getLong(ClientProperties.AMQJ_DEFAULT_SYNCWRITE_TIMEOUT,
>> +                                                                        ClientProperties.DEFAULT_SYNC_OPERATION_TIMEOUT));
>>
>>     /** Object to lock on when changing the latch */
>>     private Object _failoverLatchChange = new Object();
>>     private AMQCodecFactory _codecFactory;
>> -    private Job _readJob;
>> -    private Job _writeJob;
>> -    private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance();
>> +
>>     private ProtocolVersion _suggestedProtocolVersion;
>>
>>     private long _writtenBytes;
>>     private long _readBytes;
>> -    private NetworkTransport _transport;
>> +
>>     private NetworkConnection _network;
>>     private Sender<ByteBuffer> _sender;
>>
>> @@ -191,24 +191,6 @@ public class AMQProtocolHandler implemen
>>         _protocolSession = new AMQProtocolSession(this, _connection);
>>         _stateManager = new AMQStateManager(_protocolSession);
>>         _codecFactory = new AMQCodecFactory(false, _protocolSession);
>> -        _poolReference.setThreadFactory(new ThreadFactory()
>> -        {
>> -
>> -            public Thread newThread(final Runnable runnable)
>> -            {
>> -                try
>> -                {
>> -                    return Threading.getThreadFactory().createThread(runnable);
>> -                }
>> -                catch (Exception e)
>> -                {
>> -                    throw new RuntimeException("Failed to create thread", e);
>> -                }
>> -            }
>> -        });
>> -        _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true);
>> -        _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false);
>> -        _poolReference.acquireExecutorService();
>>         _failoverHandler = new FailoverHandler(this);
>>     }
>>
>> @@ -329,17 +311,7 @@ public class AMQProtocolHandler implemen
>>             }
>>             else
>>             {
>> -
>> -                if (cause instanceof ProtocolCodecException)
>> -                {
>> -                    _logger.info("Protocol Exception caught NOT going to attempt failover as " +
>> -                                 "cause isn't AMQConnectionClosedException: " + cause, cause);
>> -
>> -                    AMQException amqe = new AMQException("Protocol handler error: " + cause, cause);
>> -                    propagateExceptionToAllWaiters(amqe);
>> -                }
>>                 _connection.exceptionReceived(cause);
>> -
>>             }
>>
>>             // FIXME Need to correctly handle other exceptions. Things like ...
>> @@ -433,76 +405,63 @@ public class AMQProtocolHandler implemen
>>
>>     public void received(ByteBuffer msg)
>>     {
>> +        _readBytes += msg.remaining();
>>         try
>>         {
>> -            _readBytes += msg.remaining();
>>             final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
>>
>> -            Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable()
>> +            // Decode buffer
>> +
>> +            for (AMQDataBlock message : dataBlocks)
>>             {
>>
>> -                public void run()
>> -                {
>> -                    // Decode buffer
>> +                    if (PROTOCOL_DEBUG)
>> +                    {
>> +                        _protocolLogger.info(String.format("RECV: [%s] %s", this, message));
>> +                    }
>>
>> -                    for (AMQDataBlock message : dataBlocks)
>> +                    if(message instanceof AMQFrame)
>>                     {
>> +                        final boolean debug = _logger.isDebugEnabled();
>> +                        final long msgNumber = ++_messageReceivedCount;
>>
>> -                        try
>> -                        {
>> -                            if (PROTOCOL_DEBUG)
>> -                            {
>> -                                _protocolLogger.info(String.format("RECV: [%s] %s", this, message));
>> -                            }
>> -
>> -                            if(message instanceof AMQFrame)
>> -                            {
>> -                                final boolean debug = _logger.isDebugEnabled();
>> -                                final long msgNumber = ++_messageReceivedCount;
>> -
>> -                                if (debug && ((msgNumber % 1000) == 0))
>> -                                {
>> -                                    _logger.debug("Received " + _messageReceivedCount + " protocol messages");
>> -                                }
>> -
>> -                                AMQFrame frame = (AMQFrame) message;
>> -
>> -                                final AMQBody bodyFrame = frame.getBodyFrame();
>> -
>> -                                HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
>> -
>> -                                bodyFrame.handle(frame.getChannel(), _protocolSession);
>> -
>> -                                _connection.bytesReceived(_readBytes);
>> -                            }
>> -                            else if (message instanceof ProtocolInitiation)
>> -                            {
>> -                                // We get here if the server sends a response to our initial protocol header
>> -                                // suggesting an alternate ProtocolVersion; the server will then close the
>> -                                // connection.
>> -                                ProtocolInitiation protocolInit = (ProtocolInitiation) message;
>> -                                _suggestedProtocolVersion = protocolInit.checkVersion();
>> -                                _logger.info("Broker suggested using protocol version:" + _suggestedProtocolVersion);
>> -
>> -                                // get round a bug in old versions of qpid whereby the connection is not closed
>> -                                _stateManager.changeState(AMQState.CONNECTION_CLOSED);
>> -                            }
>> -                        }
>> -                        catch (Exception e)
>> +                        if (debug && ((msgNumber % 1000) == 0))
>>                         {
>> -                            _logger.error("Exception processing frame", e);
>> -                            propagateExceptionToFrameListeners(e);
>> -                            exception(e);
>> +                            _logger.debug("Received " + _messageReceivedCount + " protocol messages");
>>                         }
>> +
>> +                        AMQFrame frame = (AMQFrame) message;
>> +
>> +                        final AMQBody bodyFrame = frame.getBodyFrame();
>> +
>> +                        HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
>> +
>> +                        bodyFrame.handle(frame.getChannel(), _protocolSession);
>> +
>> +                        _connection.bytesReceived(_readBytes);
>> +                    }
>> +                    else if (message instanceof ProtocolInitiation)
>> +                    {
>> +                        // We get here if the server sends a response to our initial protocol header
>> +                        // suggesting an alternate ProtocolVersion; the server will then close the
>> +                        // connection.
>> +                        ProtocolInitiation protocolInit = (ProtocolInitiation) message;
>> +                        _suggestedProtocolVersion = protocolInit.checkVersion();
>> +                        _logger.info("Broker suggested using protocol version:" + _suggestedProtocolVersion);
>> +
>> +                        // get round a bug in old versions of qpid whereby the connection is not closed
>> +                        _stateManager.changeState(AMQState.CONNECTION_CLOSED);
>>                     }
>>                 }
>> -            });
>>         }
>>         catch (Exception e)
>>         {
>> +            _logger.error("Exception processing frame", e);
>>             propagateExceptionToFrameListeners(e);
>>             exception(e);
>>         }
>> +
>> +
>>     }
>>
>>     public void methodBodyReceived(final int channelId, final AMQBody bodyFrame)
>> @@ -568,17 +527,13 @@ public class AMQProtocolHandler implemen
>>         writeFrame(frame, false);
>>     }
>>
>> -    public void writeFrame(AMQDataBlock frame, boolean wait)
>> +    public  synchronized void writeFrame(AMQDataBlock frame, boolean wait)
>>     {
>> -        final ByteBuffer buf = frame.toNioByteBuffer();
>> +        final ByteBuffer buf = asByteBuffer(frame);
>>         _writtenBytes += buf.remaining();
>> -        Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Runnable()
>> -        {
>> -            public void run()
>> -            {
>> -                _sender.send(buf);
>> -            }
>> -        });
>> +        _sender.send(buf);
>> +        _sender.flush();
>> +
>>         if (PROTOCOL_DEBUG)
>>         {
>>             _protocolLogger.debug(String.format("SEND: [%s] %s", this, frame));
>> @@ -595,12 +550,41 @@ public class AMQProtocolHandler implemen
>>
>>         _connection.bytesSent(_writtenBytes);
>>
>> -        if (wait)
>> +    }
>> +
>> +    private ByteBuffer asByteBuffer(AMQDataBlock block)
>> +    {
>> +        final ByteBuffer buf = ByteBuffer.allocate((int) block.getSize());
>> +
>> +        try
>> +        {
>> +            block.writePayload(new DataOutputStream(new OutputStream()
>> +            {
>> +
>> +
>> +                @Override
>> +                public void write(int b) throws IOException
>> +                {
>> +                    buf.put((byte) b);
>> +                }
>> +
>> +                @Override
>> +                public void write(byte[] b, int off, int len) throws IOException
>> +                {
>> +                    buf.put(b, off, len);
>> +                }
>> +            }));
>> +        }
>> +        catch (IOException e)
>>         {
>> -            _sender.flush();
>> +            throw new RuntimeException(e);
>>         }
>> +
>> +        buf.flip();
>> +        return buf;
>>     }
>>
>> +
>>     /**
>>      * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to
>>      * calling getProtocolSession().write() then waiting for the response.
>> @@ -723,7 +707,7 @@ public class AMQProtocolHandler implemen
>>                 _logger.debug("FailoverException interrupted connection close, ignoring as connection   close anyway.");
>>             }
>>         }
>> -        _poolReference.releaseExecutorService();
>> +
>>     }
>>
>>     /** @return the number of bytes read from this protocol session */
>> @@ -841,8 +825,13 @@ public class AMQProtocolHandler implemen
>>
>>     public void setNetworkConnection(NetworkConnection network)
>>     {
>> +        setNetworkConnection(network, network.getSender());
>> +    }
>> +
>> +    public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
>> +    {
>>         _network = network;
>> -        _sender = network.getSender();
>> +        _sender = sender;
>>     }
>>
>>     /** @param delay delay in seconds (not ms) */
>>
>> Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java
>> URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java?rev=1172657&r1=1172656&r2=1172657&view=diff
>> ==============================================================================
>> --- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java (original)
>> +++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java Mon Sep 19 15:13:18 2011
>> @@ -20,17 +20,22 @@
>>  */
>>  package org.apache.qpid.client.security;
>>
>> -import org.apache.qpid.util.FileUtils;
>> -
>> -import org.slf4j.Logger;
>> -import org.slf4j.LoggerFactory;
>> -
>>  import java.io.IOException;
>>  import java.io.InputStream;
>> +import java.util.Collection;
>> +import java.util.Collections;
>>  import java.util.Enumeration;
>>  import java.util.HashMap;
>> +import java.util.HashSet;
>>  import java.util.Map;
>>  import java.util.Properties;
>> +import java.util.Set;
>> +import java.util.StringTokenizer;
>> +import java.util.TreeMap;
>> +
>> +import org.apache.qpid.util.FileUtils;
>> +import org.slf4j.Logger;
>> +import org.slf4j.LoggerFactory;
>>
>>  /**
>>  * CallbackHandlerRegistry is a registry for call back handlers for user authentication and interaction during user
>> @@ -42,7 +47,7 @@ import java.util.Properties;
>>  * "amp.callbackhandler.properties". The format of the properties file is:
>>  *
>>  * <p/><pre>
>> - * CallbackHanlder.mechanism=fully.qualified.class.name
>> + * CallbackHandler.n.mechanism=fully.qualified.class.name where n is an ordinal
>>  * </pre>
>>  *
>>  * <p/>Where mechanism is an IANA-registered mechanism name and the fully qualified class name refers to a
>> @@ -66,51 +71,15 @@ public class CallbackHandlerRegistry
>>     public static final String DEFAULT_RESOURCE_NAME = "org/apache/qpid/client/security/CallbackHandlerRegistry.properties";
>>
>>     /** A static reference to the singleton instance of this registry. */
>> -    private static CallbackHandlerRegistry _instance = new CallbackHandlerRegistry();
>> +    private static final CallbackHandlerRegistry _instance;
>>
>>     /** Holds a map from SASL mechanism names to call back handlers. */
>> -    private Map<String, Class> _mechanismToHandlerClassMap = new HashMap<String, Class>();
>> -
>> -    /** Holds a space delimited list of mechanisms that callback handlers exist for. */
>> -    private String _mechanisms;
>> -
>> -    /**
>> -     * Gets the singleton instance of this registry.
>> -     *
>> -     * @return The singleton instance of this registry.
>> -     */
>> -    public static CallbackHandlerRegistry getInstance()
>> -    {
>> -        return _instance;
>> -    }
>> +    private Map<String, Class<AMQCallbackHandler>> _mechanismToHandlerClassMap = new HashMap<String, Class<AMQCallbackHandler>>();
>>
>> -    /**
>> -     * Gets the callback handler class for a given SASL mechanism name.
>> -     *
>> -     * @param mechanism The SASL mechanism name.
>> -     *
>> -     * @return The callback handler class for the mechanism, or null if none is configured for that mechanism.
>> -     */
>> -    public Class getCallbackHandlerClass(String mechanism)
>> -    {
>> -        return (Class) _mechanismToHandlerClassMap.get(mechanism);
>> -    }
>> +    /** Ordered collection of mechanisms for which callback handlers exist. */
>> +    private Collection<String> _mechanisms;
>>
>> -    /**
>> -     * Gets a space delimited list of supported SASL mechanisms.
>> -     *
>> -     * @return A space delimited list of supported SASL mechanisms.
>> -     */
>> -    public String getMechanisms()
>> -    {
>> -        return _mechanisms;
>> -    }
>> -
>> -    /**
>> -     * Creates the call back handler registry from its configuration resource or file. This also has the side effect
>> -     * of configuring and registering the SASL client factory implementations using {@link DynamicSaslRegistrar}.
>> -     */
>> -    private CallbackHandlerRegistry()
>> +    static
>>     {
>>         // Register any configured SASL client factories.
>>         DynamicSaslRegistrar.registerSaslProviders();
>> @@ -120,12 +89,12 @@ public class CallbackHandlerRegistry
>>             FileUtils.openFileOrDefaultResource(filename, DEFAULT_RESOURCE_NAME,
>>                 CallbackHandlerRegistry.class.getClassLoader());
>>
>> +        final Properties props = new Properties();
>> +
>>         try
>>         {
>> -            Properties props = new Properties();
>> +
>>             props.load(is);
>> -            parseProperties(props);
>> -            _logger.info("Callback handlers available for SASL mechanisms: " + _mechanisms);
>>         }
>>         catch (IOException e)
>>         {
>> @@ -146,32 +115,68 @@ public class CallbackHandlerRegistry
>>                 }
>>             }
>>         }
>> +
>> +        _instance = new CallbackHandlerRegistry(props);
>> +        _logger.info("Callback handlers available for SASL mechanisms: " + _instance._mechanisms);
>> +
>>     }
>>
>> -    /*private InputStream openPropertiesInputStream(String filename)
>> +    /**
>> +     * Gets the singleton instance of this registry.
>> +     *
>> +     * @return The singleton instance of this registry.
>> +     */
>> +    public static CallbackHandlerRegistry getInstance()
>> +    {
>> +        return _instance;
>> +    }
>> +
>> +    public AMQCallbackHandler createCallbackHandler(final String mechanism)
>>     {
>> -        boolean useDefault = true;
>> -        InputStream is = null;
>> -        if (filename != null)
>> +        final Class<AMQCallbackHandler> mechanismClass = _mechanismToHandlerClassMap.get(mechanism);
>> +
>> +        if (mechanismClass == null)
>>         {
>> -            try
>> -            {
>> -                is = new BufferedInputStream(new FileInputStream(new File(filename)));
>> -                useDefault = false;
>> -            }
>> -            catch (FileNotFoundException e)
>> -            {
>> -                _logger.error("Unable to read from file " + filename + ": " + e, e);
>> -            }
>> +            throw new IllegalArgumentException("Mechanism " + mechanism + " not known");
>>         }
>>
>> -        if (useDefault)
>> +        try
>> +        {
>> +            return mechanismClass.newInstance();
>> +        }
>> +        catch (InstantiationException e)
>> +        {
>> +            throw new IllegalArgumentException("Unable to create an instance of mechanism " + mechanism, e);
>> +        }
>> +        catch (IllegalAccessException e)
>>         {
>> -            is = CallbackHandlerRegistry.class.getResourceAsStream(DEFAULT_RESOURCE_NAME);
>> +            throw new IllegalArgumentException("Unable to create an instance of mechanism " + mechanism, e);
>>         }
>> +    }
>>
>> -        return is;
>> -    }*/
>> +    /**
>> +     * Gets the collection of supported SASL mechanism names, ordered by preference.
>> +     *
>> +     * @return collection of SASL mechanism names.
>> +     */
>> +    public Collection<String> getMechanisms()
>> +    {
>> +        return Collections.unmodifiableCollection(_mechanisms);
>> +    }
>> +
>> +    /**
>> +     * Creates the call back handler registry from its configuration resource or file.
>> +     *
>> +     * This also has the side effect of configuring and registering the SASL client factory
>> +     * implementations using {@link DynamicSaslRegistrar}.
>> +     *
>> +     * This constructor is default protection to allow for effective unit testing.  Clients must use
>> +     * {@link #getInstance()} to obtain the singleton instance.
>> +     */
>> +    CallbackHandlerRegistry(final Properties props)
>> +    {
>> +        parseProperties(props);
>> +    }
>>
>>     /**
>>      * Scans the specified properties as a mapping from IANA registered SASL mechanism to call back handler
>> @@ -183,20 +188,20 @@ public class CallbackHandlerRegistry
>>      */
>>     private void parseProperties(Properties props)
>>     {
>> +
>> +        final Map<Integer, String> mechanisms = new TreeMap<Integer, String>();
>> +
>>         Enumeration e = props.propertyNames();
>>         while (e.hasMoreElements())
>>         {
>> -            String propertyName = (String) e.nextElement();
>> -            int period = propertyName.indexOf(".");
>> -            if (period < 0)
>> -            {
>> -                _logger.warn("Unable to parse property " + propertyName + " when configuring SASL providers");
>> +            final String propertyName = (String) e.nextElement();
>> +            final String[] parts = propertyName.split("\\.", 2);
>>
>> -                continue;
>> -            }
>> +            checkPropertyNameFormat(propertyName, parts);
>>
>> -            String mechanism = propertyName.substring(period + 1);
>> -            String className = props.getProperty(propertyName);
>> +            final String mechanism = parts[0];
>> +            final int ordinal = getPropertyOrdinal(propertyName, parts);
>> +            final String className = props.getProperty(propertyName);
>>             Class clazz = null;
>>             try
>>             {
>> @@ -205,20 +210,11 @@ public class CallbackHandlerRegistry
>>                 {
>>                     _logger.warn("SASL provider " + clazz + " does not implement " + AMQCallbackHandler.class
>>                         + ". Skipping");
>> -
>>                     continue;
>>                 }
>> -
>>                 _mechanismToHandlerClassMap.put(mechanism, clazz);
>> -                if (_mechanisms == null)
>> -                {
>> -                    _mechanisms = mechanism;
>> -                }
>> -                else
>> -                {
>> -                    // one time cost
>> -                    _mechanisms = _mechanisms + " " + mechanism;
>> -                }
>> +
>> +                mechanisms.put(ordinal, mechanism);
>>             }
>>             catch (ClassNotFoundException ex)
>>             {
>> @@ -227,5 +223,91 @@ public class CallbackHandlerRegistry
>>                 continue;
>>             }
>>         }
>> +
>> +        _mechanisms = mechanisms.values();  // order guaranteed by keys of treemap (i.e. our ordinals)
>> +
>> +
>> +    }
>> +
>> +    private void checkPropertyNameFormat(final String propertyName, final String[] parts)
>> +    {
>> +        if (parts.length != 2)
>> +        {
>> +            throw new IllegalArgumentException("Unable to parse property " + propertyName + " when configuring SASL providers");
>> +        }
>> +    }
>> +
>> +    private int getPropertyOrdinal(final String propertyName, final String[] parts)
>> +    {
>> +        try
>> +        {
>> +            return Integer.parseInt(parts[1]);
>> +        }
>> +        catch(NumberFormatException nfe)
>> +        {
>> +            throw new IllegalArgumentException("Unable to parse property " + propertyName + " when configuring SASL providers", nfe);
>> +        }
>> +    }
>> +
>> +    /**
>> +     * Selects a SASL mechanism that is mutually available to both parties.  If more than one
>> +     * mechanism is mutually available the one appearing first (by ordinal) will be returned.
>> +     *
>> +     * @param peerMechanismList space separated list of mechanisms
>> +     * @return selected mechanism, or null if none available
>> +     */
>> +    public String selectMechanism(final String peerMechanismList)
>> +    {
>> +        final Set<String> peerList = mechListToSet(peerMechanismList);
>> +
>> +        return selectMechInternal(peerList, Collections.<String>emptySet());
>> +    }
>> +
>> +    /**
>> +     * Selects a SASL mechanism that is mutually available to both parties.
>> +     *
>> +     * @param peerMechanismList space separated list of mechanisms
>> +     * @param restrictionList space separated list of mechanisms
>> +     * @return selected mechanism, or null if none available
>> +     */
>> +    public String selectMechanism(final String peerMechanismList, final String restrictionList)
>> +    {
>> +        final Set<String> peerList = mechListToSet(peerMechanismList);
>> +        final Set<String> restrictionSet = mechListToSet(restrictionList);
>> +
>> +        return selectMechInternal(peerList, restrictionSet);
>> +    }
>> +
>> +    private String selectMechInternal(final Set<String> peerSet, final Set<String> restrictionSet)
>> +    {
>> +        for (final String mech : _mechanisms)
>> +        {
>> +            if (peerSet.contains(mech))
>> +            {
>> +                if (restrictionSet.isEmpty() || restrictionSet.contains(mech))
>> +                {
>> +                    return mech;
>> +                }
>> +            }
>> +        }
>> +
>> +        return null;
>> +    }
>> +
>> +    private Set<String> mechListToSet(final String mechanismList)
>> +    {
>> +        if (mechanismList == null)
>> +        {
>> +            return Collections.emptySet();
>> +        }
>> +
>> +        final StringTokenizer tokenizer = new StringTokenizer(mechanismList, " ");
>> +        final Set<String> mechanismSet = new HashSet<String>(tokenizer.countTokens());
>> +        while (tokenizer.hasMoreTokens())
>> +        {
>> +            mechanismSet.add(tokenizer.nextToken());
>> +        }
>> +        return Collections.unmodifiableSet(mechanismSet);
>>     }
>> +
>>  }
>>
>> Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties
>> URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties?rev=1172657&r1=1172656&r2=1172657&view=diff
>> ==============================================================================
>> --- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties (original)
>> +++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties Mon Sep 19 15:13:18 2011
>> @@ -16,7 +16,17 @@
>>  # specific language governing permissions and limitations
>>  # under the License.
>>  #
>> -CallbackHandler.CRAM-MD5-HASHED=org.apache.qpid.client.security.UsernameHashedPasswordCallbackHandler
>> -CallbackHandler.CRAM-MD5=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
>> -CallbackHandler.AMQPLAIN=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
>> -CallbackHandler.PLAIN=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
>> +
>> +#
>> +# Format:
>> +# <mechanism name>.ordinal=<implementation>
>> +#
>> +# @see CallbackHandlerRegistry
>> +#
>> +
>> +EXTERNAL.1=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
>> +GSSAPI.2=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
>> +CRAM-MD5-HASHED.3=org.apache.qpid.client.security.UsernameHashedPasswordCallbackHandler
>> +CRAM-MD5.4=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
>> +AMQPLAIN.5=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
>> +PLAIN.6=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
>>
>> Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
>> URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java?rev=1172657&r1=1172656&r2=1172657&view=diff
>> ==============================================================================
>> --- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java (original)
>> +++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java Mon Sep 19 15:13:18 2011
>> @@ -22,7 +22,7 @@ package org.apache.qpid.jms;
>>
>>  import java.util.Map;
>>
>> -import org.apache.qpid.client.SSLConfiguration;
>> +import org.apache.qpid.transport.ConnectionSettings;
>>
>>  public interface BrokerDetails
>>  {
>> @@ -104,14 +104,12 @@ public interface BrokerDetails
>>     long getTimeout();
>>
>>     void setTimeout(long timeout);
>> -
>> -    SSLConfiguration getSSLConfiguration();
>> -
>> -    void setSSLConfiguration(SSLConfiguration sslConfiguration);
>>
>>     boolean getBooleanProperty(String propName);
>>
>>     String toString();
>>
>>     boolean equals(Object o);
>> +
>> +    ConnectionSettings buildConnectionSettings();
>>  }
>>
>> Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java
>> URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java?rev=1172657&r1=1172656&r2=1172657&view=diff
>> ==============================================================================
>> --- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java (original)
>> +++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java Mon Sep 19 15:13:18 2011
>> @@ -140,7 +140,6 @@ public class FailoverExchangeMethod impl
>>                         broker.setHost(tokens[1]);
>>                         broker.setPort(Integer.parseInt(tokens[2]));
>>                         broker.setProperties(_originalBrokerDetail.getProperties());
>> -                        broker.setSSLConfiguration(_originalBrokerDetail.getSSLConfiguration());
>>                         brokerList.add(broker);
>>
>>                         if (currentBrokerIP.equals(broker.getHost()) &&
>>
>> Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
>> URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java?rev=1172657&r1=1172656&r2=1172657&view=diff
>> ==============================================================================
>> --- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java (original)
>> +++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java Mon Sep 19 15:13:18 2011
>> @@ -36,6 +36,7 @@ import javax.jms.Queue;
>>  import javax.jms.Topic;
>>  import javax.naming.Context;
>>  import javax.naming.NamingException;
>> +import javax.naming.ConfigurationException;
>>  import javax.naming.spi.InitialContextFactory;
>>
>>  import org.apache.qpid.client.AMQConnectionFactory;
>> @@ -139,7 +140,7 @@ public class PropertiesFileInitialContex
>>         return new ReadOnlyContext(environment, data);
>>     }
>>
>> -    protected void createConnectionFactories(Map data, Hashtable environment)
>> +    protected void createConnectionFactories(Map data, Hashtable environment) throws ConfigurationException
>>     {
>>         for (Iterator iter = environment.entrySet().iterator(); iter.hasNext();)
>>         {
>> @@ -157,7 +158,7 @@ public class PropertiesFileInitialContex
>>         }
>>     }
>>
>> -    protected void createDestinations(Map data, Hashtable environment)
>> +    protected void createDestinations(Map data, Hashtable environment) throws ConfigurationException
>>     {
>>         for (Iterator iter = environment.entrySet().iterator(); iter.hasNext();)
>>         {
>> @@ -225,7 +226,7 @@ public class PropertiesFileInitialContex
>>     /**
>>      * Factory method to create new Connection Factory instances
>>      */
>> -    protected ConnectionFactory createFactory(String url)
>> +    protected ConnectionFactory createFactory(String url) throws ConfigurationException
>>     {
>>         try
>>         {
>> @@ -233,16 +234,18 @@ public class PropertiesFileInitialContex
>>         }
>>         catch (URLSyntaxException urlse)
>>         {
>> -            _logger.warn("Unable to createFactories:" + urlse);
>> -        }
>> +            _logger.warn("Unable to create factory:" + urlse);
>>
>> -        return null;
>> +            ConfigurationException ex = new ConfigurationException("Failed to parse entry: " + urlse + " due to : " +  urlse.getMessage());
>> +            ex.initCause(urlse);
>> +            throw ex;
>> +        }
>>     }
>>
>>     /**
>>      * Factory method to create new Destination instances from an AMQP BindingURL
>>      */
>> -    protected Destination createDestination(String str)
>> +    protected Destination createDestination(String str) throws ConfigurationException
>>     {
>>         try
>>         {
>> @@ -252,7 +255,9 @@ public class PropertiesFileInitialContex
>>         {
>>             _logger.warn("Unable to create destination:" + e, e);
>>
>> -            return null;
>> +            ConfigurationException ex = new ConfigurationException("Failed to parse entry: " + str + " due to : " +  e.getMessage());
>> +            ex.initCause(e);
>> +            throw ex;
>>         }
>>     }
>>
>>
>> Modified: qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java
>> URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java?rev=1172657&r1=1172656&r2=1172657&view=diff
>> ==============================================================================
>> --- qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java (original)
>> +++ qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java Mon Sep 19 15:13:18 2011
>> @@ -23,7 +23,6 @@ package org.apache.qpid.client;
>>  import org.apache.qpid.AMQException;
>>  import org.apache.qpid.client.state.AMQState;
>>  import org.apache.qpid.framing.ProtocolVersion;
>> -import org.apache.qpid.jms.ConnectionURL;
>>  import org.apache.qpid.jms.BrokerDetails;
>>  import org.apache.qpid.url.URLSyntaxException;
>>
>> @@ -37,48 +36,18 @@ public class MockAMQConnection extends A
>>         super(broker, username, password, clientName, virtualHost);
>>     }
>>
>> -    public MockAMQConnection(String broker, String username, String password, String clientName, String virtualHost, SSLConfiguration sslConfig)
>> -            throws AMQException, URLSyntaxException
>> -    {
>> -        super(broker, username, password, clientName, virtualHost, sslConfig);
>> -    }
>> -
>>     public MockAMQConnection(String host, int port, String username, String password, String clientName, String virtualHost)
>>             throws AMQException, URLSyntaxException
>>     {
>>         super(host, port, username, password, clientName, virtualHost);
>>     }
>>
>> -    public MockAMQConnection(String host, int port, String username, String password, String clientName, String virtualHost, SSLConfiguration sslConfig)
>> -            throws AMQException, URLSyntaxException
>> -    {
>> -        super(host, port, username, password, clientName, virtualHost, sslConfig);
>> -    }
>> -
>> -    public MockAMQConnection(String host, int port, boolean useSSL, String username, String password, String clientName, String virtualHost, SSLConfiguration sslConfig)
>> -            throws AMQException, URLSyntaxException
>> -    {
>> -        super(host, port, useSSL, username, password, clientName, virtualHost, sslConfig);
>> -    }
>> -
>>     public MockAMQConnection(String connection)
>>             throws AMQException, URLSyntaxException
>>     {
>>         super(connection);
>>     }
>>
>> -    public MockAMQConnection(String connection, SSLConfiguration sslConfig)
>> -            throws AMQException, URLSyntaxException
>> -    {
>> -        super(connection, sslConfig);
>> -    }
>> -
>> -    public MockAMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig)
>> -            throws AMQException
>> -    {
>> -        super(connectionURL, sslConfig);
>> -    }
>> -
>>     @Override
>>     public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException
>>     {
>>
>> Modified: qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java
>> URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java?rev=1172657&r1=1172656&r2=1172657&view=diff
>> ==============================================================================
>> --- qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java (original)
>> +++ qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java Mon Sep 19 15:13:18 2011
>> @@ -43,4 +43,9 @@ public class TestMessageHelper
>>     {
>>         return new JMSStreamMessage(AMQMessageDelegateFactory.FACTORY_0_8);
>>     }
>> +
>> +    public static JMSObjectMessage newJMSObjectMessage()
>> +    {
>> +        return new JMSObjectMessage(AMQMessageDelegateFactory.FACTORY_0_8);
>> +    }
>>  }
>>
>> Modified: qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java
>> URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java?rev=1172657&r1=1172656&r2=1172657&view=diff
>> ==============================================================================
>> --- qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java (original)
>> +++ qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java Mon Sep 19 15:13:18 2011
>> @@ -21,10 +21,10 @@
>>  package org.apache.qpid.test.unit.jndi;
>>
>>  import junit.framework.TestCase;
>> +
>>  import org.apache.qpid.client.AMQConnectionFactory;
>>  import org.apache.qpid.jms.BrokerDetails;
>>  import org.apache.qpid.jms.ConnectionURL;
>> -import org.apache.qpid.url.URLSyntaxException;
>>
>>  public class ConnectionFactoryTest extends TestCase
>>  {
>> @@ -34,21 +34,9 @@ public class ConnectionFactoryTest exten
>>     public static final String URL = "amqp://guest:guest@clientID/test?brokerlist='tcp://localhost:5672'";
>>     public static final String URL_STAR_PWD = "amqp://guest:********@clientID/test?brokerlist='tcp://localhost:5672'";
>>
>> -    public void testConnectionURLString()
>> +    public void testConnectionURLStringMasksPassword() throws Exception
>>     {
>> -        AMQConnectionFactory factory = new AMQConnectionFactory();
>> -
>> -        assertNull("ConnectionURL should have no value at start",
>> -                   factory.getConnectionURL());
>> -
>> -        try
>> -        {
>> -            factory.setConnectionURLString(URL);
>> -        }
>> -        catch (URLSyntaxException e)
>> -        {
>> -            fail(e.getMessage());
>> -        }
>> +        AMQConnectionFactory factory = new AMQConnectionFactory(URL);
>>
>>         //URL will be returned with the password field swapped for '********'
>>         assertEquals("Connection URL not correctly set", URL_STAR_PWD, factory.getConnectionURLString());
>>
>> Modified: qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java
>> URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java?rev=1172657&r1=1172656&r2=1172657&view=diff
>> ==============================================================================
>> --- qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java (original)
>> +++ qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java Mon Sep 19 15:13:18 2011
>> @@ -24,6 +24,7 @@ import java.util.Properties;
>>
>>  import javax.jms.Queue;
>>  import javax.jms.Topic;
>> +import javax.naming.ConfigurationException;
>>  import javax.naming.Context;
>>  import javax.naming.InitialContext;
>>
>> @@ -67,4 +68,22 @@ public class JNDIPropertyFileTest extend
>>             assertEquals("Topic" + i + "WithSpace",bindingKey.asString());
>>         }
>>     }
>> +
>> +    public void testConfigurationErrors() throws Exception
>> +    {
>> +        Properties properties = new Properties();
>> +        properties.put("java.naming.factory.initial", "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
>> +        properties.put("destination.my-queue","amq.topic/test;create:always}");
>> +
>> +        try
>> +        {
>> +            ctx = new InitialContext(properties);
>> +            fail("A configuration exception should be thrown with details about the address syntax error");
>> +        }
>> +        catch(ConfigurationException e)
>> +        {
>> +            assertTrue("Incorrect exception", e.getMessage().contains("Failed to parse entry: amq.topic/test;create:always}"));
>> +        }
>> +
>> +    }
>>  }
>>
>> Modified: qpid/branches/qpid-3346/qpid/java/common.xml
>> URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common.xml?rev=1172657&r1=1172656&r2=1172657&view=diff
>> ==============================================================================
>> --- qpid/branches/qpid-3346/qpid/java/common.xml (original)
>> +++ qpid/branches/qpid-3346/qpid/java/common.xml Mon Sep 19 15:13:18 2011
>> @@ -132,8 +132,6 @@
>>        </sequential>
>>   </macrodef>
>>
>> -
>> -
>>   <macrodef name="jython">
>>     <attribute name="path"/>
>>     <element name="args"/>
>>
>> Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java
>> URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java?rev=1172657&r1=1172656&r2=1172657&view=diff
>> ==============================================================================
>> --- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java (original)
>> +++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java Mon Sep 19 15:13:18 2011
>> @@ -20,9 +20,6 @@
>>  */
>>  package org.apache.qpid.codec;
>>
>> -import org.apache.mina.filter.codec.ProtocolCodecFactory;
>> -import org.apache.mina.filter.codec.ProtocolDecoder;
>> -import org.apache.mina.filter.codec.ProtocolEncoder;
>>  import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
>>
>>  /**
>> @@ -31,14 +28,11 @@ import org.apache.qpid.protocol.AMQVersi
>>  *
>>  * <p/><table id="crc"><caption>CRC Card</caption>
>>  * <tr><th> Responsibilities <th> Collaborations.
>> - * <tr><td> Supply the protocol encoder. <td> {@link AMQEncoder}
>>  * <tr><td> Supply the protocol decoder. <td> {@link AMQDecoder}
>>  * </table>
>>  */
>> -public class AMQCodecFactory implements ProtocolCodecFactory
>> +public class AMQCodecFactory
>>  {
>> -    /** Holds the protocol encoder. */
>> -    private final AMQEncoder _encoder = new AMQEncoder();
>>
>>     /** Holds the protocol decoder. */
>>     private final AMQDecoder _frameDecoder;
>> @@ -56,15 +50,6 @@ public class AMQCodecFactory implements
>>         _frameDecoder = new AMQDecoder(expectProtocolInitiation, session);
>>     }
>>
>> -    /**
>> -     * Gets the AMQP encoder.
>> -     *
>> -     * @return The AMQP encoder.
>> -     */
>> -    public ProtocolEncoder getEncoder()
>> -    {
>> -        return _encoder;
>> -    }
>>
>>     /**
>>      * Gets the AMQP decoder.
>>
>> Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
>> URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java?rev=1172657&r1=1172656&r2=1172657&view=diff
>> ==============================================================================
>> --- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java (original)
>> +++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java Mon Sep 19 15:13:18 2011
>> @@ -20,13 +20,9 @@
>>  */
>>  package org.apache.qpid.codec;
>>
>> -import java.util.ArrayList;
>> -
>> -import org.apache.mina.common.ByteBuffer;
>> -import org.apache.mina.common.IoSession;
>> -import org.apache.mina.common.SimpleByteBufferAllocator;
>> -import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
>> -import org.apache.mina.filter.codec.ProtocolDecoderOutput;
>> +import java.io.*;
>> +import java.nio.ByteBuffer;
>> +import java.util.*;
>>
>>  import org.apache.qpid.framing.AMQDataBlock;
>>  import org.apache.qpid.framing.AMQDataBlockDecoder;
>> @@ -54,11 +50,8 @@ import org.apache.qpid.protocol.AMQVersi
>>  * @todo If protocol initiation decoder not needed, then don't create it. Probably not a big deal, but it adds to the
>>  *       per-session overhead.
>>  */
>> -public class AMQDecoder extends CumulativeProtocolDecoder
>> +public class AMQDecoder
>>  {
>> -
>> -    private static final String BUFFER = AMQDecoder.class.getName() + ".Buffer";
>> -
>>     /** Holds the 'normal' AMQP data decoder. */
>>     private AMQDataBlockDecoder _dataBlockDecoder = new AMQDataBlockDecoder();
>>
>> @@ -67,12 +60,11 @@ public class AMQDecoder extends Cumulati
>>
>>     /** Flag to indicate whether this decoder needs to handle protocol initiation. */
>>     private boolean _expectProtocolInitiation;
>> -    private boolean firstDecode = true;
>>
>>     private AMQMethodBodyFactory _bodyFactory;
>>
>> -    private ByteBuffer _remainingBuf;
>> -
>> +    private List<ByteArrayInputStream> _remainingBufs = new ArrayList<ByteArrayInputStream>();
>> +
>>     /**
>>      * Creates a new AMQP decoder.
>>      *
>> @@ -84,98 +76,7 @@ public class AMQDecoder extends Cumulati
>>         _bodyFactory = new AMQMethodBodyFactory(session);
>>     }
>>
>> -    /**
>> -     * Delegates decoding AMQP from the data buffer that Mina has retrieved from the wire, to the data or protocol
>> -     * intiation decoders.
>> -     *
>> -     * @param session The Mina session.
>> -     * @param in      The raw byte buffer.
>> -     * @param out     The Mina object output gatherer to write decoded objects to.
>> -     *
>> -     * @return <tt>true</tt> if the data was decoded, <tt>false<tt> if more is needed and the data should accumulate.
>> -     *
>> -     * @throws Exception If the data cannot be decoded for any reason.
>> -     */
>> -    protected boolean doDecode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
>> -    {
>> -
>> -        boolean decoded;
>> -        if (_expectProtocolInitiation
>> -            || (firstDecode
>> -                && (in.remaining() > 0)
>> -                && (in.get(in.position()) == (byte)'A')))
>> -        {
>> -            decoded = doDecodePI(session, in, out);
>> -        }
>> -        else
>> -        {
>> -            decoded = doDecodeDataBlock(session, in, out);
>> -        }
>> -        if(firstDecode && decoded)
>> -        {
>> -            firstDecode = false;
>> -        }
>> -        return decoded;
>> -    }
>> -
>> -    /**
>> -     * Decodes AMQP data, delegating the decoding to an {@link AMQDataBlockDecoder}.
>> -     *
>> -     * @param session The Mina session.
>> -     * @param in      The raw byte buffer.
>> -     * @param out     The Mina object output gatherer to write decoded objects to.
>> -     *
>> -     * @return <tt>true</tt> if the data was decoded, <tt>false<tt> if more is needed and the data should accumulate.
>> -     *
>> -     * @throws Exception If the data cannot be decoded for any reason.
>> -     */
>> -    protected boolean doDecodeDataBlock(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
>> -    {
>> -        int pos = in.position();
>> -        boolean enoughData = _dataBlockDecoder.decodable(in.buf());
>> -        in.position(pos);
>> -        if (!enoughData)
>> -        {
>> -            // returning false means it will leave the contents in the buffer and
>> -            // call us again when more data has been read
>> -            return false;
>> -        }
>> -        else
>> -        {
>> -            _dataBlockDecoder.decode(session, in, out);
>> -
>> -            return true;
>> -        }
>> -    }
>> -
>> -    /**
>> -     * Decodes an AMQP initiation, delegating the decoding to a {@link ProtocolInitiation.Decoder}.
>> -     *
>> -     * @param session The Mina session.
>> -     * @param in      The raw byte buffer.
>> -     * @param out     The Mina object output gatherer to write decoded objects to.
>> -     *
>> -     * @return <tt>true</tt> if the data was decoded, <tt>false<tt> if more is needed and the data should accumulate.
>> -     *
>> -     * @throws Exception If the data cannot be decoded for any reason.
>> -     */
>> -    private boolean doDecodePI(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
>> -    {
>> -        boolean enoughData = _piDecoder.decodable(in.buf());
>> -        if (!enoughData)
>> -        {
>> -            // returning false means it will leave the contents in the buffer and
>> -            // call us again when more data has been read
>> -            return false;
>> -        }
>> -        else
>> -        {
>> -            ProtocolInitiation pi = new ProtocolInitiation(in.buf());
>> -            out.write(pi);
>>
>> -            return true;
>> -        }
>> -    }
>>
>>     /**
>>      * Sets the protocol initation flag, that determines whether decoding is handled by the data decoder of the protocol
>> @@ -189,151 +90,168 @@ public class AMQDecoder extends Cumulati
>>         _expectProtocolInitiation = expectProtocolInitiation;
>>     }
>>
>> -
>> -    /**
>> -     * Cumulates content of <tt>in</tt> into internal buffer and forwards
>> -     * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}.
>> -     * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
>> -     * and the cumulative buffer is compacted after decoding ends.
>> -     *
>> -     * @throws IllegalStateException if your <tt>doDecode()</tt> returned
>> -     *                               <tt>true</tt> not consuming the cumulative buffer.
>> -     */
>> -    public void decode( IoSession session, ByteBuffer in,
>> -                        ProtocolDecoderOutput out ) throws Exception
>> +    private class RemainingByteArrayInputStream extends InputStream
>>     {
>> -        ByteBuffer buf = ( ByteBuffer ) session.getAttribute( BUFFER );
>> -        // if we have a session buffer, append data to that otherwise
>> -        // use the buffer read from the network directly
>> -        if( buf != null )
>> -        {
>> -            buf.put( in );
>> -            buf.flip();
>> -        }
>> -        else
>> +        private int _currentListPos;
>> +        private int _markPos;
>> +
>> +
>> +        @Override
>> +        public int read() throws IOException
>>         {
>> -            buf = in;
>> +            ByteArrayInputStream currentStream = _remainingBufs.get(_currentListPos);
>> +            if(currentStream.available() > 0)
>> +            {
>> +                return currentStream.read();
>> +            }
>> +            else if((_currentListPos == _remainingBufs.size())
>> +                    || (++_currentListPos == _remainingBufs.size()))
>> +            {
>> +                return -1;
>> +            }
>> +            else
>> +            {
>> +
>> +                ByteArrayInputStream stream = _remainingBufs.get(_currentListPos);
>> +                stream.mark(0);
>> +                return stream.read();
>> +            }
>>         }
>>
>> -        for( ;; )
>> +        @Override
>> +        public int read(final byte[] b, final int off, final int len) throws IOException
>>         {
>> -            int oldPos = buf.position();
>> -            boolean decoded = doDecode( session, buf, out );
>> -            if( decoded )
>> +
>> +            if(_currentListPos == _remainingBufs.size())
>>             {
>> -                if( buf.position() == oldPos )
>> +                return -1;
>> +            }
>> +            else
>> +            {
>> +                ByteArrayInputStream currentStream = _remainingBufs.get(_currentListPos);
>> +                final int available = currentStream.available();
>> +                int read = currentStream.read(b, off, len > available ? available : len);
>> +                if(read < len)
>>                 {
>> -                    throw new IllegalStateException(
>> -                            "doDecode() can't return true when buffer is not consumed." );
>> +                    if(_currentListPos++ != _remainingBufs.size())
>> +                    {
>> +                        _remainingBufs.get(_currentListPos).mark(0);
>> +                    }
>> +                    int correctRead = read == -1 ? 0 : read;
>> +                    int subRead = read(b, off+correctRead, len-correctRead);
>> +                    if(subRead == -1)
>> +                    {
>> +                        return read;
>> +                    }
>> +                    else
>> +                    {
>> +                        return correctRead+subRead;
>> +                    }
>>                 }
>> -
>> -                if( !buf.hasRemaining() )
>> +                else
>>                 {
>> -                    break;
>> +                    return len;
>>                 }
>>             }
>> -            else
>> -            {
>> -                break;
>> -            }
>>         }
>>
>> -        // if there is any data left that cannot be decoded, we store
>> -        // it in a buffer in the session and next time this decoder is
>> -        // invoked the session buffer gets appended to
>> -        if ( buf.hasRemaining() )
>> +        @Override
>> +        public int available() throws IOException
>>         {
>> -            storeRemainingInSession( buf, session );
>> +            int total = 0;
>> +            for(int i = _currentListPos; i < _remainingBufs.size(); i++)
>> +            {
>> +                total += _remainingBufs.get(i).available();
>> +            }
>> +            return total;
>>         }
>> -        else
>> +
>> +        @Override
>> +        public void mark(final int readlimit)
>>         {
>> -            removeSessionBuffer( session );
>> +            _markPos = _currentListPos;
>> +            final ByteArrayInputStream stream = _remainingBufs.get(_currentListPos);
>> +            if(stream != null)
>> +            {
>> +                stream.mark(readlimit);
>> +            }
>>         }
>> -    }
>> -
>> -    /**
>> -     * Releases the cumulative buffer used by the specified <tt>session</tt>.
>> -     * Please don't forget to call <tt>super.dispose( session )</tt> when
>> -     * you override this method.
>> -     */
>> -    public void dispose( IoSession session ) throws Exception
>> -    {
>> -        removeSessionBuffer( session );
>> -    }
>>
>> -    private void removeSessionBuffer(IoSession session)
>> -    {
>> -        ByteBuffer buf = ( ByteBuffer ) session.getAttribute( BUFFER );
>> -        if( buf != null )
>> +        @Override
>> +        public void reset() throws IOException
>>         {
>> -            buf.release();
>> -            session.removeAttribute( BUFFER );
>> +            _currentListPos = _markPos;
>> +            final int size = _remainingBufs.size();
>> +            if(_currentListPos < size)
>> +            {
>> +                _remainingBufs.get(_currentListPos).reset();
>> +            }
>> +            for(int i = _currentListPos+1; i<size; i++)
>> +            {
>> +                _remainingBufs.get(i).reset();
>> +            }
>>         }
>>     }
>>
>> -    private static final SimpleByteBufferAllocator SIMPLE_BYTE_BUFFER_ALLOCATOR = new SimpleByteBufferAllocator();
>> -
>> -    private void storeRemainingInSession(ByteBuffer buf, IoSession session)
>> -    {
>> -        ByteBuffer remainingBuf = SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate( buf.remaining(), false );
>> -        remainingBuf.setAutoExpand( true );
>> -        remainingBuf.put( buf );
>> -        session.setAttribute( BUFFER, remainingBuf );
>> -    }
>>
>> -    public ArrayList<AMQDataBlock> decodeBuffer(java.nio.ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException
>> +    public ArrayList<AMQDataBlock> decodeBuffer(ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
>>     {
>>
>>         // get prior remaining data from accumulator
>>         ArrayList<AMQDataBlock> dataBlocks = new ArrayList<AMQDataBlock>();
>> -        ByteBuffer msg;
>> -        // if we have a session buffer, append data to that otherwise
>> -        // use the buffer read from the network directly
>> -        if( _remainingBuf != null )
>> +        DataInputStream msg;
>> +
>> +
>> +        ByteArrayInputStream bais = new ByteArrayInputStream(buf.array(),buf.arrayOffset()+buf.position(), buf.remaining());
>> +        if(!_remainingBufs.isEmpty())
>>         {
>> -            _remainingBuf.put(buf);
>> -            _remainingBuf.flip();
>> -            msg = _remainingBuf;
>> +            _remainingBufs.add(bais);
>> +            msg = new DataInputStream(new RemainingByteArrayInputStream());
>>         }
>>         else
>>         {
>> -            msg = ByteBuffer.wrap(buf);
>> +            msg = new DataInputStream(bais);
>>         }
>> -
>> -        if (_expectProtocolInitiation
>> -            || (firstDecode
>> -                && (msg.remaining() > 0)
>> -                && (msg.get(msg.position()) == (byte)'A')))
>> -        {
>> -            if (_piDecoder.decodable(msg.buf()))
>> -            {
>> -                dataBlocks.add(new ProtocolInitiation(msg.buf()));
>> -            }
>> -        }
>> -        else
>> +
>> +        boolean enoughData = true;
>> +        while (enoughData)
>>         {
>> -            boolean enoughData = true;
>> -            while (enoughData)
>> +            if(!_expectProtocolInitiation)
>>             {
>> -                int pos = msg.position();
>> -
>>                 enoughData = _dataBlockDecoder.decodable(msg);
>> -                msg.position(pos);
>>                 if (enoughData)
>>                 {
>>                     dataBlocks.add(_dataBlockDecoder.createAndPopulateFrame(_bodyFactory, msg));
>>                 }
>> -                else
>> +            }
>> +            else
>> +            {
>> +                enoughData = _piDecoder.decodable(msg);
>> +                if (enoughData)
>>                 {
>> -                    _remainingBuf = SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate(msg.remaining(), false);
>> -                    _remainingBuf.setAutoExpand(true);
>> -                    _remainingBuf.put(msg);
>> +                    dataBlocks.add(new ProtocolInitiation(msg));
>> +                }
>> +
>> +            }
>> +
>> +            if(!enoughData)
>> +            {
>> +                if(!_remainingBufs.isEmpty())
>> +                {
>> +                    _remainingBufs.remove(_remainingBufs.size()-1);
>> +                    ListIterator<ByteArrayInputStream> iterator = _remainingBufs.listIterator();
>> +                    while(iterator.hasNext() && iterator.next().available() == 0)
>> +                    {
>> +                        iterator.remove();
>> +                    }
>> +                }
>> +                if(bais.available()!=0)
>> +                {
>> +                    byte[] remaining = new byte[bais.available()];
>> +                    bais.read(remaining);
>> +                    _remainingBufs.add(new ByteArrayInputStream(remaining));
>>                 }
>>             }
>> -        }
>> -        if(firstDecode && dataBlocks.size() > 0)
>> -        {
>> -            firstDecode = false;
>>         }
>>         return dataBlocks;
>>     }
>>
>> Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
>> URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java?rev=1172657&r1=1172656&r2=1172657&view=diff
>> ==============================================================================
>> --- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java (original)
>> +++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java Mon Sep 19 15:13:18 2011
>> @@ -23,7 +23,7 @@ package org.apache.qpid.configuration;
>>  */
>>  public class ClientProperties
>>  {
>> -
>> +
>>     /**
>>      * Currently with Qpid it is not possible to change the client ID.
>>      * If one is not specified upon connection construction, an id is generated automatically.
>> @@ -68,38 +68,50 @@ public class ClientProperties
>>      * by the broker in TuneOK it will be used as the heartbeat interval.
>>      * If not a warning will be printed and the max value specified for
>>      * heartbeat in TuneOK will be used
>> -     *
>> +     *
>>      * The default idle timeout is set to 120 secs
>>      */
>>     public static final String IDLE_TIMEOUT_PROP_NAME = "idle_timeout";
>>     public static final long DEFAULT_IDLE_TIMEOUT = 120000;
>> -
>> +
>>     public static final String HEARTBEAT = "qpid.heartbeat";
>>     public static final int HEARTBEAT_DEFAULT = 120;
>> -
>> +
>>     /**
>>      * This value will be used to determine the default destination syntax type.
>>      * Currently the two types are Binding URL (java only) and the Addressing format (used by
>> -     * all clients).
>> +     * all clients).
>>      */
>>     public static final String DEST_SYNTAX = "qpid.dest_syntax";
>> -
>> +
>>     public static final String USE_LEGACY_MAP_MESSAGE_FORMAT = "qpid.use_legacy_map_message";
>>
>>     public static final String AMQP_VERSION = "qpid.amqp.version";
>> -
>> -    private static ClientProperties _instance = new ClientProperties();
>> -
>> +
>> +    public static final String QPID_VERIFY_CLIENT_ID = "qpid.verify_client_id";
>> +
>> +    /**
>> +     * System properties to change the default timeout used during
>> +     * synchronous operations.
>> +     */
>> +    public static final String QPID_SYNC_OP_TIMEOUT = "qpid.sync_op_timeout";
>> +    public static final String AMQJ_DEFAULT_SYNCWRITE_TIMEOUT = "amqj.default_syncwrite_timeout";
>> +
>> +    /**
>> +     * A default timeout value for synchronous operations
>> +     */
>> +    public static final int DEFAULT_SYNC_OPERATION_TIMEOUT = 60000;
>> +
>>     /*
>> -    public static final QpidProperty<Boolean>  IGNORE_SET_CLIENTID_PROP_NAME =
>> +    public static final QpidProperty<Boolean>  IGNORE_SET_CLIENTID_PROP_NAME =
>>         QpidProperty.booleanProperty(false,"qpid.ignore_set_client_id","ignore_setclientID");
>> -
>> +
>>     public static final QpidProperty<Boolean> SYNC_PERSISTENT_PROP_NAME =
>>         QpidProperty.booleanProperty(false,"qpid.sync_persistence","sync_persistence");
>> -
>> -
>> +
>> +
>>     public static final QpidProperty<Integer> MAX_PREFETCH_PROP_NAME =
>>         QpidProperty.intProperty(500,"qpid.max_prefetch","max_prefetch"); */
>> -
>> -
>> +
>> +
>>  }
>>
>> Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
>> URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java?rev=1172657&r1=1172656&r2=1172657&view=diff
>> ==============================================================================
>> --- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java (original)
>> +++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java Mon Sep 19 15:13:18 2011
>> @@ -20,7 +20,9 @@
>>  */
>>  package org.apache.qpid.framing;
>>
>> -import org.apache.mina.common.ByteBuffer;
>> +import java.io.DataOutputStream;
>> +import java.io.IOException;
>> +
>>  import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
>>  import org.apache.qpid.AMQException;
>>
>> @@ -34,7 +36,7 @@ public interface AMQBody
>>      */
>>     public abstract int getSize();
>>
>> -    public void writePayload(ByteBuffer buffer);
>> +    public void writePayload(DataOutputStream buffer) throws IOException;
>>
>> -    void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession) throws AMQException;
>> +    void handle(final int channelId, final AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException;
>>  }
>>
>> Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
>> URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java?rev=1172657&r1=1172656&r2=1172657&view=diff
>> ==============================================================================
>> --- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java (original)
>> +++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java Mon Sep 19 15:13:18 2011
>> @@ -20,7 +20,10 @@
>>  */
>>  package org.apache.qpid.framing;
>>
>> -import org.apache.mina.common.ByteBuffer;
>> +import java.io.DataInputStream;
>> +import java.io.DataOutputStream;
>> +import java.io.IOException;
>> +
>>
>>  /**
>>  * A data block represents something that has a size in bytes and the ability to write itself to a byte
>> @@ -39,25 +42,6 @@ public abstract class AMQDataBlock imple
>>      * Writes the datablock to the specified buffer.
>>      * @param buffer
>>      */
>> -    public abstract void writePayload(ByteBuffer buffer);
>> -
>> -    public ByteBuffer toByteBuffer()
>> -    {
>> -        final ByteBuffer buffer = ByteBuffer.allocate((int)getSize());
>> -
>> -        writePayload(buffer);
>> -        buffer.flip();
>> -        return buffer;
>> -    }
>> -
>> -    public java.nio.ByteBuffer toNioByteBuffer()
>> -    {
>> -        final java.nio.ByteBuffer buffer = java.nio.ByteBuffer.allocate((int) getSize());
>> -
>> -        ByteBuffer buf = ByteBuffer.wrap(buffer);
>> -        writePayload(buf);
>> -        buffer.flip();
>> -        return buffer;
>> -    }
>> +    public abstract void writePayload(DataOutputStream buffer) throws IOException;
>>
>>  }
>>
>> Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
>> URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java?rev=1172657&r1=1172656&r2=1172657&view=diff
>> ==============================================================================
>> --- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java (original)
>> +++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java Mon Sep 19 15:13:18 2011
>> @@ -20,18 +20,14 @@
>>  */
>>  package org.apache.qpid.framing;
>>
>> -import org.apache.mina.common.ByteBuffer;
>> -import org.apache.mina.common.IoSession;
>> -import org.apache.mina.filter.codec.ProtocolDecoderOutput;
>> -
>> -import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
>> -
>>  import org.slf4j.Logger;
>>  import org.slf4j.LoggerFactory;
>>
>> +import java.io.DataInputStream;
>> +import java.io.IOException;
>> +
>>  public class AMQDataBlockDecoder
>>  {
>> -    private static final String SESSION_METHOD_BODY_FACTORY = "QPID_SESSION_METHOD_BODY_FACTORY";
>>
>>     private static final BodyFactory[] _bodiesSupported = new BodyFactory[Byte.MAX_VALUE];
>>
>> @@ -47,27 +43,32 @@ public class AMQDataBlockDecoder
>>     public AMQDataBlockDecoder()
>>     { }
>>
>> -    public boolean decodable(java.nio.ByteBuffer in) throws AMQFrameDecodingException
>> +    public boolean decodable(DataInputStream in) throws AMQFrameDecodingException, IOException
>>     {
>> -        final int remainingAfterAttributes = in.remaining() - (1 + 2 + 4 + 1);
>> +        final int remainingAfterAttributes = in.available() - (1 + 2 + 4 + 1);
>>         // type, channel, body length and end byte
>>         if (remainingAfterAttributes < 0)
>>         {
>>             return false;
>>         }
>>
>> -        in.position(in.position() + 1 + 2);
>> +        in.mark(8);
>> +        in.skip(1 + 2);
>> +
>> +
>>         // Get an unsigned int, lifted from MINA ByteBuffer getUnsignedInt()
>> -        final long bodySize = in.getInt() & 0xffffffffL;
>> +        final long bodySize = in.readInt() & 0xffffffffL;
>> +
>> +        in.reset();
>>
>>         return (remainingAfterAttributes >= bodySize);
>>
>>     }
>>
>> -    public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory methodBodyFactory, ByteBuffer in)
>> -        throws AMQFrameDecodingException, AMQProtocolVersionException
>> +    public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory methodBodyFactory, DataInputStream in)
>> +            throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
>>     {
>> -        final byte type = in.get();
>> +        final byte type = in.readByte();
>>
>>         BodyFactory bodyFactory;
>>         if (type == AMQMethodBody.TYPE)
>> @@ -84,8 +85,8 @@ public class AMQDataBlockDecoder
>>             throw new AMQFrameDecodingException(null, "Unsupported frame type: " + type, null);
>>         }
>>
>> -        final int channel = in.getUnsignedShort();
>> -        final long bodySize = in.getUnsignedInt();
>> +        final int channel = in.readUnsignedShort();
>> +        final long bodySize = EncodingUtils.readUnsignedInteger(in);
>>
>>         // bodySize can be zero
>>         if ((channel < 0) || (bodySize < 0))
>> @@ -96,7 +97,7 @@ public class AMQDataBlockDecoder
>>
>>         AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory);
>>
>> -        byte marker = in.get();
>> +        byte marker = in.readByte();
>>         if ((marker & 0xFF) != 0xCE)
>>         {
>>             throw new AMQFrameDecodingException(null, "End of frame marker not found. Read " + marker + " length=" + bodySize
>> @@ -106,26 +107,4 @@ public class AMQDataBlockDecoder
>>         return frame;
>>     }
>>
>> -    public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
>> -    {
>> -        AMQMethodBodyFactory bodyFactory = (AMQMethodBodyFactory) session.getAttribute(SESSION_METHOD_BODY_FACTORY);
>> -        if (bodyFactory == null)
>> -        {
>> -            AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment();
>> -            bodyFactory = new AMQMethodBodyFactory(protocolSession);
>> -            session.setAttribute(SESSION_METHOD_BODY_FACTORY, bodyFactory);
>> -        }
>> -
>> -        out.write(createAndPopulateFrame(bodyFactory, in));
>> -    }
>> -
>> -    public boolean decodable(ByteBuffer msg) throws AMQFrameDecodingException
>> -    {
>> -        return decodable(msg.buf());
>> -    }
>> -
>> -    public AMQDataBlock createAndPopulateFrame(AMQMethodBodyFactory factory, java.nio.ByteBuffer msg) throws AMQProtocolVersionException, AMQFrameDecodingException
>> -    {
>> -        return createAndPopulateFrame(factory, ByteBuffer.wrap(msg));
>> -    }
>>  }
>>
>>
>>
>> ---------------------------------------------------------------------
>> Apache Qpid - AMQP Messaging Implementation
>> Project:      http://qpid.apache.org
>> Use/Interact: mailto:commits-subscribe@qpid.apache.org
>>
>>
>

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org


Mime
View raw message