qpid-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Rajith Attapattu <rajit...@gmail.com>
Subject Re: svn commit: r1172657 [14/21] - in /qpid/branches/qpid-3346/qpid: ./ cpp/ cpp/bindings/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qpid/dotnet/ cpp/bindings/qpid/dotnet/examples/csharp.direct.receiver/Properties/ cpp/bindings/qpid/dotnet/example
Date Mon, 19 Sep 2011 16:26:22 GMT
On Mon, Sep 19, 2011 at 12:07 PM, Ken Giusti <kgiusti@redhat.com> wrote:
> Rajith,
>
> Yes, not on trunk, but...
>
> Actually, there is a problem with that branch due to that merge.  It involves Java - for some reason that branch still contains a "mina" tree in
>
> java/common/src/test/java/org/apache/qpid/transport/network
>
> even though it no longer appears on trunk.

Robert Godfrey did some recent work to get rid of mina from trunk,
which explains what you are seeing :)

Rajith

> In any case, I'll avoid merging anything from the java subtree of that branch back to trunk, when the time comes.
>
> thanks,
>
> -K
>
>
> ----- Original Message -----
>> Ken,
>>
>> Apologies, I didn't realize that you were merging from trunk not into
>> it :)
>> Thanks Gordon for pointing it out.
>>
>> Regards,
>>
>> Rajith
>>
>> On Mon, Sep 19, 2011 at 11:48 AM, Rajith Attapattu
>> <rajith77@gmail.com> wrote:
>> > Ken,
>> >
>> > I assume this wasn't intentional and was merely a side effect of
>> > the merge?
>> > I didn't look at the diff very closely, but I wonder what the
>> > implication of these changes would be.
>> >
>> > Looking further it seems a lot of classes on the java client and
>> > broker have changed.
>> > I think in order to be on the safe side, these changes should be
>> > backed out.
>> > If I am not mistaken, that would not impact anything, as I believe
>> > your work didn't involve the java side.
>> >
>> > Rajith
>> >
>> > On Mon, Sep 19, 2011 at 11:13 AM,  <kgiusti@apache.org> wrote:
>> >> Modified:
>> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
>> >> URL:
>> >> http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java?rev=1172657&r1=1172656&r2=1172657&view=diff
>> >> ==============================================================================
>> >> ---
>> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
>> >> (original)
>> >> +++
>> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
>> >> Mon Sep 19 15:13:18 2011
>> >> @@ -23,7 +23,8 @@ package org.apache.qpid.client.message;
>> >>  import javax.jms.JMSException;
>> >>  import javax.jms.StreamMessage;
>> >>
>> >> -import org.apache.mina.common.ByteBuffer;
>> >> +import java.nio.ByteBuffer;
>> >> +
>> >>  import org.apache.qpid.AMQException;
>> >>  import org.apache.qpid.framing.AMQShortString;
>> >>  import org.apache.qpid.framing.BasicContentHeaderProperties;
>> >> @@ -36,65 +37,76 @@ public class JMSStreamMessage extends Ab
>> >>     public static final String MIME_TYPE="jms/stream-message";
>> >>
>> >>
>> >> -
>> >> -    /**
>> >> -     * This is set when reading a byte array. The
>> >> readBytes(byte[]) method supports multiple calls to read
>> >> -     * a byte array in multiple chunks, hence this is used to
>> >> track how much is left to be read
>> >> -     */
>> >> -    private int _byteArrayRemaining = -1;
>> >> +    private TypedBytesContentReader _typedBytesContentReader;
>> >> +    private TypedBytesContentWriter _typedBytesContentWriter;
>> >>
>> >>     public JMSStreamMessage(AMQMessageDelegateFactory
>> >>     delegateFactory)
>> >>     {
>> >> -        this(delegateFactory,null);
>> >> +        super(delegateFactory,false);
>> >> +        _typedBytesContentWriter = new TypedBytesContentWriter();
>> >>
>> >>     }
>> >>
>> >> -    /**
>> >> -     * Construct a stream message with existing data.
>> >> -     *
>> >> -     * @param delegateFactory
>> >> -     * @param data the data that comprises this message. If data
>> >> is null, you get a 1024 byte buffer that is
>> >> -     */
>> >> -    JMSStreamMessage(AMQMessageDelegateFactory delegateFactory,
>> >> ByteBuffer data)
>> >> -    {
>> >>
>> >> -        super(delegateFactory, data); // this instanties a
>> >> content header
>> >> -    }
>> >>
>> >>     JMSStreamMessage(AMQMessageDelegate delegate, ByteBuffer data)
>> >>     throws AMQException
>> >>     {
>> >> -
>> >> -        super(delegate, data);
>> >> +        super(delegate, data!=null);
>> >> +        _typedBytesContentReader = new
>> >> TypedBytesContentReader(data);
>> >>     }
>> >>
>> >> -
>> >>     public void reset()
>> >>     {
>> >> -        super.reset();
>> >>         _readableMessage = true;
>> >> +
>> >> +        if(_typedBytesContentReader != null)
>> >> +        {
>> >> +            _typedBytesContentReader.reset();
>> >> +        }
>> >> +        else if (_typedBytesContentWriter != null)
>> >> +        {
>> >> +            _typedBytesContentReader = new
>> >> TypedBytesContentReader(_typedBytesContentWriter.getData());
>> >> +        }
>> >> +    }
>> >> +
>> >> +    @Override
>> >> +    public void clearBody() throws JMSException
>> >> +    {
>> >> +        super.clearBody();
>> >> +        _typedBytesContentReader = null;
>> >> +        _typedBytesContentWriter = new TypedBytesContentWriter();
>> >> +
>> >>     }
>> >>
>> >> +
>> >>     protected String getMimeType()
>> >>     {
>> >>         return MIME_TYPE;
>> >>     }
>> >>
>> >> -
>> >> +    @Override
>> >> +    public java.nio.ByteBuffer getData() throws JMSException
>> >> +    {
>> >> +        return _typedBytesContentWriter == null ?
>> >> _typedBytesContentReader.getData() :
>> >> _typedBytesContentWriter.getData();
>> >> +    }
>> >>
>> >>     public boolean readBoolean() throws JMSException
>> >>     {
>> >> -        return super.readBoolean();
>> >> +        checkReadable();
>> >> +        return _typedBytesContentReader.readBoolean();
>> >>     }
>> >>
>> >>
>> >>     public byte readByte() throws JMSException
>> >>     {
>> >> -        return super.readByte();
>> >> +        checkReadable();
>> >> +        return _typedBytesContentReader.readByte();
>> >>     }
>> >>
>> >>     public short readShort() throws JMSException
>> >>     {
>> >> -        return super.readShort();
>> >> +        checkReadable();
>> >> +        return _typedBytesContentReader.readShort();
>> >>     }
>> >>
>> >>     /**
>> >> @@ -105,102 +117,127 @@ public class JMSStreamMessage extends Ab
>> >>      */
>> >>     public char readChar() throws JMSException
>> >>     {
>> >> -        return super.readChar();
>> >> +        checkReadable();
>> >> +        return _typedBytesContentReader.readChar();
>> >>     }
>> >>
>> >>     public int readInt() throws JMSException
>> >>     {
>> >> -        return super.readInt();
>> >> +        checkReadable();
>> >> +        return _typedBytesContentReader.readInt();
>> >>     }
>> >>
>> >>     public long readLong() throws JMSException
>> >>     {
>> >> -        return super.readLong();
>> >> +        checkReadable();
>> >> +        return _typedBytesContentReader.readLong();
>> >>     }
>> >>
>> >>     public float readFloat() throws JMSException
>> >>     {
>> >> -        return super.readFloat();
>> >> +        checkReadable();
>> >> +        return _typedBytesContentReader.readFloat();
>> >>     }
>> >>
>> >>     public double readDouble() throws JMSException
>> >>     {
>> >> -        return super.readDouble();
>> >> +        checkReadable();
>> >> +        return _typedBytesContentReader.readDouble();
>> >>     }
>> >>
>> >>     public String readString() throws JMSException
>> >>     {
>> >> -        return super.readString();
>> >> +        checkReadable();
>> >> +        return _typedBytesContentReader.readString();
>> >>     }
>> >>
>> >>     public int readBytes(byte[] bytes) throws JMSException
>> >>     {
>> >> -        return super.readBytes(bytes);
>> >> +        if(bytes == null)
>> >> +        {
>> >> +            throw new IllegalArgumentException("Must provide
>> >> non-null array to read into");
>> >> +        }
>> >> +
>> >> +        checkReadable();
>> >> +        return _typedBytesContentReader.readBytes(bytes);
>> >>     }
>> >>
>> >>
>> >>     public Object readObject() throws JMSException
>> >>     {
>> >> -        return super.readObject();
>> >> +        checkReadable();
>> >> +        return _typedBytesContentReader.readObject();
>> >>     }
>> >>
>> >>     public void writeBoolean(boolean b) throws JMSException
>> >>     {
>> >> -        super.writeBoolean(b);
>> >> +        checkWritable();
>> >> +        _typedBytesContentWriter.writeBoolean(b);
>> >>     }
>> >>
>> >>     public void writeByte(byte b) throws JMSException
>> >>     {
>> >> -        super.writeByte(b);
>> >> +        checkWritable();
>> >> +        _typedBytesContentWriter.writeByte(b);
>> >>     }
>> >>
>> >>     public void writeShort(short i) throws JMSException
>> >>     {
>> >> -        super.writeShort(i);
>> >> +        checkWritable();
>> >> +        _typedBytesContentWriter.writeShort(i);
>> >>     }
>> >>
>> >>     public void writeChar(char c) throws JMSException
>> >>     {
>> >> -        super.writeChar(c);
>> >> +        checkWritable();
>> >> +        _typedBytesContentWriter.writeChar(c);
>> >>     }
>> >>
>> >>     public void writeInt(int i) throws JMSException
>> >>     {
>> >> -        super.writeInt(i);
>> >> +        checkWritable();
>> >> +        _typedBytesContentWriter.writeInt(i);
>> >>     }
>> >>
>> >>     public void writeLong(long l) throws JMSException
>> >>     {
>> >> -        super.writeLong(l);
>> >> +        checkWritable();
>> >> +        _typedBytesContentWriter.writeLong(l);
>> >>     }
>> >>
>> >>     public void writeFloat(float v) throws JMSException
>> >>     {
>> >> -        super.writeFloat(v);
>> >> +        checkWritable();
>> >> +        _typedBytesContentWriter.writeFloat(v);
>> >>     }
>> >>
>> >>     public void writeDouble(double v) throws JMSException
>> >>     {
>> >> -        super.writeDouble(v);
>> >> +        checkWritable();
>> >> +        _typedBytesContentWriter.writeDouble(v);
>> >>     }
>> >>
>> >>     public void writeString(String string) throws JMSException
>> >>     {
>> >> -        super.writeString(string);
>> >> +        checkWritable();
>> >> +        _typedBytesContentWriter.writeString(string);
>> >>     }
>> >>
>> >>     public void writeBytes(byte[] bytes) throws JMSException
>> >>     {
>> >> -        super.writeBytes(bytes);
>> >> +        checkWritable();
>> >> +        _typedBytesContentWriter.writeBytes(bytes);
>> >>     }
>> >>
>> >>     public void writeBytes(byte[] bytes, int offset, int length)
>> >>     throws JMSException
>> >>     {
>> >> -        super.writeBytes(bytes,offset,length);
>> >> +        checkWritable();
>> >> +        _typedBytesContentWriter.writeBytes(bytes, offset,
>> >> length);
>> >>     }
>> >>
>> >>     public void writeObject(Object object) throws JMSException
>> >>     {
>> >> -        super.writeObject(object);
>> >> +        checkWritable();
>> >> +        _typedBytesContentWriter.writeObject(object);
>> >>     }
>> >>  }
>> >>
>> >> Modified:
>> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java
>> >> URL:
>> >> http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java?rev=1172657&r1=1172656&r2=1172657&view=diff
>> >> ==============================================================================
>> >> ---
>> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java
>> >> (original)
>> >> +++
>> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java
>> >> Mon Sep 19 15:13:18 2011
>> >> @@ -22,10 +22,9 @@ package org.apache.qpid.client.message;
>> >>
>> >>  import javax.jms.JMSException;
>> >>
>> >> -import org.apache.mina.common.ByteBuffer;
>> >> +import java.nio.ByteBuffer;
>> >> +
>> >>  import org.apache.qpid.AMQException;
>> >> -import org.apache.qpid.framing.AMQShortString;
>> >> -import org.apache.qpid.framing.BasicContentHeaderProperties;
>> >>
>> >>  public class JMSStreamMessageFactory extends
>> >>  AbstractJMSMessageFactory
>> >>  {
>> >>
>> >> Modified:
>> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
>> >> URL:
>> >> http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java?rev=1172657&r1=1172656&r2=1172657&view=diff
>> >> ==============================================================================
>> >> ---
>> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
>> >> (original)
>> >> +++
>> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
>> >> Mon Sep 19 15:13:18 2011
>> >> @@ -20,15 +20,21 @@
>> >>  */
>> >>  package org.apache.qpid.client.message;
>> >>
>> >> +import java.io.DataInputStream;
>> >>  import java.io.UnsupportedEncodingException;
>> >> +import java.nio.ByteBuffer;
>> >> +import java.nio.CharBuffer;
>> >>  import java.nio.charset.CharacterCodingException;
>> >>  import java.nio.charset.Charset;
>> >> +import java.nio.charset.CharsetDecoder;
>> >> +import java.nio.charset.CharsetEncoder;
>> >>
>> >>  import javax.jms.JMSException;
>> >> +import javax.jms.MessageFormatException;
>> >>
>> >> -import org.apache.mina.common.ByteBuffer;
>> >>  import org.apache.qpid.AMQException;
>> >>  import org.apache.qpid.client.CustomJMSXProperty;
>> >> +import org.apache.qpid.framing.AMQFrameDecodingException;
>> >>  import org.apache.qpid.framing.AMQShortString;
>> >>  import org.apache.qpid.framing.BasicContentHeaderProperties;
>> >>  import org.apache.qpid.util.Strings;
>> >> @@ -37,6 +43,7 @@ public class JMSTextMessage extends Abst
>> >>  {
>> >>     private static final String MIME_TYPE = "text/plain";
>> >>
>> >> +    private Exception _exception;
>> >>     private String _decodedValue;
>> >>
>> >>     /**
>> >> @@ -45,36 +52,41 @@ public class JMSTextMessage extends Abst
>> >>     private static final String PAYLOAD_NULL_PROPERTY =
>> >>     CustomJMSXProperty.JMS_AMQP_NULL.toString();
>> >>     private static final Charset DEFAULT_CHARSET =
>> >>     Charset.forName("UTF-8");
>> >>
>> >> -    public JMSTextMessage(AMQMessageDelegateFactory
>> >> delegateFactory) throws JMSException
>> >> -    {
>> >> -        this(delegateFactory, null, null);
>> >> -    }
>> >> +    private CharsetDecoder _decoder =
>> >> DEFAULT_CHARSET.newDecoder();
>> >> +    private CharsetEncoder _encoder =
>> >> DEFAULT_CHARSET.newEncoder();
>> >> +
>> >> +    private static final ByteBuffer EMPTY_BYTE_BUFFER =
>> >> ByteBuffer.allocate(0);
>> >>
>> >> -    JMSTextMessage(AMQMessageDelegateFactory delegateFactory,
>> >> ByteBuffer data, String encoding) throws JMSException
>> >> +    public JMSTextMessage(AMQMessageDelegateFactory
>> >> delegateFactory) throws JMSException
>> >>     {
>> >> -        super(delegateFactory, data); // this instantiates a
>> >> content header
>> >> -        setContentType(getMimeType());
>> >> -        setEncoding(encoding);
>> >> +        super(delegateFactory, false); // this instantiates a
>> >> content header
>> >>     }
>> >>
>> >>     JMSTextMessage(AMQMessageDelegate delegate, ByteBuffer data)
>> >>             throws AMQException
>> >>     {
>> >> -        super(delegate, data);
>> >> -        setContentType(getMimeType());
>> >> -        _data = data;
>> >> -    }
>> >> +        super(delegate, data!=null);
>> >>
>> >> -
>> >> -    public void clearBodyImpl() throws JMSException
>> >> -    {
>> >> -        if (_data != null)
>> >> +        try
>> >>         {
>> >> -            _data.release();
>> >> -            _data = null;
>> >> +            if(propertyExists(PAYLOAD_NULL_PROPERTY))
>> >> +            {
>> >> +                _decodedValue = null;
>> >> +            }
>> >> +            else
>> >> +            {
>> >> +                _decodedValue = _decoder.decode(data).toString();
>> >> +            }
>> >> +        }
>> >> +        catch (CharacterCodingException e)
>> >> +        {
>> >> +            _exception = e;
>> >> +        }
>> >> +        catch (JMSException e)
>> >> +        {
>> >> +            _exception = e;
>> >>         }
>> >>
>> >> -        _decodedValue = null;
>> >>     }
>> >>
>> >>     public String toBodyString() throws JMSException
>> >> @@ -87,95 +99,62 @@ public class JMSTextMessage extends Abst
>> >>         return MIME_TYPE;
>> >>     }
>> >>
>> >> -    public void setText(String text) throws JMSException
>> >> +    @Override
>> >> +    public ByteBuffer getData() throws JMSException
>> >>     {
>> >> -        checkWritable();
>> >> -
>> >> -        clearBody();
>> >> +        _encoder.reset();
>> >>         try
>> >>         {
>> >> -            if (text != null)
>> >> +            if(_exception != null)
>> >> +            {
>> >> +                final MessageFormatException
>> >> messageFormatException = new MessageFormatException("Cannot
>> >> decode original message");
>> >> +
>> >>                messageFormatException.setLinkedException(_exception);
>> >> +                throw messageFormatException;
>> >> +            }
>> >> +            else if(_decodedValue == null)
>> >> +            {
>> >> +                return EMPTY_BYTE_BUFFER;
>> >> +            }
>> >> +            else
>> >>             {
>> >> -                final String encoding = getEncoding();
>> >> -                if (encoding == null ||
>> >> encoding.equalsIgnoreCase("UTF-8"))
>> >> -                {
>> >> -                    _data =
>> >> ByteBuffer.wrap(Strings.toUTF8(text));
>> >> -                    setEncoding("UTF-8");
>> >> -                }
>> >> -                else
>> >> -                {
>> >> -                    _data =
>> >> ByteBuffer.wrap(text.getBytes(encoding));
>> >> -                }
>> >> -                _data.position(_data.limit());
>> >> -                _changedData=true;
>> >> +                return
>> >> _encoder.encode(CharBuffer.wrap(_decodedValue));
>> >>             }
>> >> -            _decodedValue = text;
>> >>         }
>> >> -        catch (UnsupportedEncodingException e)
>> >> +        catch (CharacterCodingException e)
>> >>         {
>> >> -            // should never occur
>> >> -            JMSException jmse = new JMSException("Unable to
>> >> decode text data");
>> >> -            jmse.setLinkedException(e);
>> >> -            jmse.initCause(e);
>> >> -            throw jmse;
>> >> +            final JMSException jmsException = new
>> >> JMSException("Cannot encode string in UFT-8: " + _decodedValue);
>> >> +            jmsException.setLinkedException(e);
>> >> +            throw jmsException;
>> >>         }
>> >>     }
>> >>
>> >> -    public String getText() throws JMSException
>> >> +    @Override
>> >> +    public void clearBody() throws JMSException
>> >>     {
>> >> -        if (_data == null && _decodedValue == null)
>> >> -        {
>> >> -            return null;
>> >> -        }
>> >> -        else if (_decodedValue != null)
>> >> -        {
>> >> -            return _decodedValue;
>> >> -        }
>> >> -        else
>> >> -        {
>> >> -            _data.rewind();
>> >> +        super.clearBody();
>> >> +        _decodedValue = null;
>> >> +        _exception = null;
>> >> +    }
>> >>
>> >> -            if (propertyExists(PAYLOAD_NULL_PROPERTY) &&
>> >> getBooleanProperty(PAYLOAD_NULL_PROPERTY))
>> >> -            {
>> >> -                return null;
>> >> -            }
>> >> -            if (getEncoding() != null)
>> >> -            {
>> >> -                try
>> >> -                {
>> >> -                    _decodedValue =
>> >> _data.getString(Charset.forName(getEncoding()).newDecoder());
>> >> -                }
>> >> -                catch (CharacterCodingException e)
>> >> -                {
>> >> -                    JMSException jmse = new JMSException("Could
>> >> not decode string data: " + e);
>> >> -                    jmse.setLinkedException(e);
>> >> -                    jmse.initCause(e);
>> >> -                    throw jmse;
>> >> -                }
>> >> -            }
>> >> -            else
>> >> -            {
>> >> -                try
>> >> -                {
>> >> -                    _decodedValue =
>> >> _data.getString(DEFAULT_CHARSET.newDecoder());
>> >> -                }
>> >> -                catch (CharacterCodingException e)
>> >> -                {
>> >> -                    JMSException jmse = new JMSException("Could
>> >> not decode string data: " + e);
>> >> -                    jmse.setLinkedException(e);
>> >> -                    jmse.initCause(e);
>> >> -                    throw jmse;
>> >> -                }
>> >> -            }
>> >> -            return _decodedValue;
>> >> -        }
>> >> +    public void setText(String text) throws JMSException
>> >> +    {
>> >> +        checkWritable();
>> >> +
>> >> +        clearBody();
>> >> +        _decodedValue = text;
>> >> +
>> >> +    }
>> >> +
>> >> +    public String getText() throws JMSException
>> >> +    {
>> >> +        return _decodedValue;
>> >>     }
>> >>
>> >>     @Override
>> >>     public void prepareForSending() throws JMSException
>> >>     {
>> >>         super.prepareForSending();
>> >> -        if (_data == null)
>> >> +        if (_decodedValue == null)
>> >>         {
>> >>             setBooleanProperty(PAYLOAD_NULL_PROPERTY, true);
>> >>         }
>> >>
>> >> Modified:
>> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java
>> >> URL:
>> >> http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java?rev=1172657&r1=1172656&r2=1172657&view=diff
>> >> ==============================================================================
>> >> ---
>> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java
>> >> (original)
>> >> +++
>> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java
>> >> Mon Sep 19 15:13:18 2011
>> >> @@ -22,7 +22,7 @@ package org.apache.qpid.client.message;
>> >>
>> >>  import javax.jms.JMSException;
>> >>
>> >> -import org.apache.mina.common.ByteBuffer;
>> >> +import java.nio.ByteBuffer;
>> >>  import org.apache.qpid.AMQException;
>> >>  import org.apache.qpid.framing.AMQShortString;
>> >>  import org.apache.qpid.framing.BasicContentHeaderProperties;
>> >>
>> >> Modified:
>> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
>> >> URL:
>> >> http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java?rev=1172657&r1=1172656&r2=1172657&view=diff
>> >> ==============================================================================
>> >> ---
>> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
>> >> (original)
>> >> +++
>> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
>> >> Mon Sep 19 15:13:18 2011
>> >> @@ -87,9 +87,9 @@ public class UnprocessedMessage_0_8 exte
>> >>     public void receiveBody(ContentBody body)
>> >>     {
>> >>
>> >> -        if (body.payload != null)
>> >> +        if (body._payload != null)
>> >>         {
>> >> -            final long payloadSize = body.payload.remaining();
>> >> +            final long payloadSize = body._payload.length;
>> >>
>> >>             if (_bodies == null)
>> >>             {
>> >>
>> >> Modified:
>> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
>> >> URL:
>> >> http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1172657&r1=1172656&r2=1172657&view=diff
>> >> ==============================================================================
>> >> ---
>> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
>> >> (original)
>> >> +++
>> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
>> >> Mon Sep 19 15:13:18 2011
>> >> @@ -20,7 +20,9 @@
>> >>  */
>> >>  package org.apache.qpid.client.protocol;
>> >>
>> >> +import java.io.DataOutputStream;
>> >>  import java.io.IOException;
>> >> +import java.io.OutputStream;
>> >>  import java.net.SocketAddress;
>> >>  import java.nio.ByteBuffer;
>> >>  import java.util.ArrayList;
>> >> @@ -31,7 +33,6 @@ import java.util.concurrent.CountDownLat
>> >>  import java.util.concurrent.ThreadFactory;
>> >>  import java.util.concurrent.TimeUnit;
>> >>
>> >> -import org.apache.mina.filter.codec.ProtocolCodecException;
>> >>  import org.apache.qpid.AMQConnectionClosedException;
>> >>  import org.apache.qpid.AMQDisconnectedException;
>> >>  import org.apache.qpid.AMQException;
>> >> @@ -46,6 +47,7 @@ import org.apache.qpid.client.state.AMQS
>> >>  import org.apache.qpid.client.state.StateWaiter;
>> >>  import
>> >>  org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
>> >>  import org.apache.qpid.codec.AMQCodecFactory;
>> >> +import org.apache.qpid.configuration.ClientProperties;
>> >>  import org.apache.qpid.framing.AMQBody;
>> >>  import org.apache.qpid.framing.AMQDataBlock;
>> >>  import org.apache.qpid.framing.AMQFrame;
>> >> @@ -57,8 +59,6 @@ import org.apache.qpid.framing.Heartbeat
>> >>  import org.apache.qpid.framing.MethodRegistry;
>> >>  import org.apache.qpid.framing.ProtocolInitiation;
>> >>  import org.apache.qpid.framing.ProtocolVersion;
>> >> -import org.apache.qpid.pool.Job;
>> >> -import org.apache.qpid.pool.ReferenceCountingExecutorService;
>> >>  import org.apache.qpid.protocol.AMQConstant;
>> >>  import org.apache.qpid.protocol.AMQMethodEvent;
>> >>  import org.apache.qpid.protocol.AMQMethodListener;
>> >> @@ -164,19 +164,19 @@ public class AMQProtocolHandler implemen
>> >>     private FailoverException _lastFailoverException;
>> >>
>> >>     /** Defines the default timeout to use for synchronous
>> >>     protocol commands. */
>> >> -    private final long DEFAULT_SYNC_TIMEOUT =
>> >> Long.getLong("amqj.default_syncwrite_timeout", 1000 * 30);
>> >> +    private final long DEFAULT_SYNC_TIMEOUT =
>> >> Long.getLong(ClientProperties.QPID_SYNC_OP_TIMEOUT,
>> >> +
>> >>                                                           Long.getLong(ClientProperties.AMQJ_DEFAULT_SYNCWRITE_TIMEOUT,
>> >> +
>> >>                                                                        ClientProperties.DEFAULT_SYNC_OPERATION_TIMEOUT));
>> >>
>> >>     /** Object to lock on when changing the latch */
>> >>     private Object _failoverLatchChange = new Object();
>> >>     private AMQCodecFactory _codecFactory;
>> >> -    private Job _readJob;
>> >> -    private Job _writeJob;
>> >> -    private ReferenceCountingExecutorService _poolReference =
>> >> ReferenceCountingExecutorService.getInstance();
>> >> +
>> >>     private ProtocolVersion _suggestedProtocolVersion;
>> >>
>> >>     private long _writtenBytes;
>> >>     private long _readBytes;
>> >> -    private NetworkTransport _transport;
>> >> +
>> >>     private NetworkConnection _network;
>> >>     private Sender<ByteBuffer> _sender;
>> >>
>> >> @@ -191,24 +191,6 @@ public class AMQProtocolHandler implemen
>> >>         _protocolSession = new AMQProtocolSession(this,
>> >>         _connection);
>> >>         _stateManager = new AMQStateManager(_protocolSession);
>> >>         _codecFactory = new AMQCodecFactory(false,
>> >>         _protocolSession);
>> >> -        _poolReference.setThreadFactory(new ThreadFactory()
>> >> -        {
>> >> -
>> >> -            public Thread newThread(final Runnable runnable)
>> >> -            {
>> >> -                try
>> >> -                {
>> >> -                    return
>> >> Threading.getThreadFactory().createThread(runnable);
>> >> -                }
>> >> -                catch (Exception e)
>> >> -                {
>> >> -                    throw new RuntimeException("Failed to create
>> >> thread", e);
>> >> -                }
>> >> -            }
>> >> -        });
>> >> -        _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS,
>> >> true);
>> >> -        _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS,
>> >> false);
>> >> -        _poolReference.acquireExecutorService();
>> >>         _failoverHandler = new FailoverHandler(this);
>> >>     }
>> >>
>> >> @@ -329,17 +311,7 @@ public class AMQProtocolHandler implemen
>> >>             }
>> >>             else
>> >>             {
>> >> -
>> >> -                if (cause instanceof ProtocolCodecException)
>> >> -                {
>> >> -                    _logger.info("Protocol Exception caught NOT
>> >> going to attempt failover as " +
>> >> -                                 "cause isn't
>> >> AMQConnectionClosedException: " + cause, cause);
>> >> -
>> >> -                    AMQException amqe = new
>> >> AMQException("Protocol handler error: " + cause, cause);
>> >> -                    propagateExceptionToAllWaiters(amqe);
>> >> -                }
>> >>                 _connection.exceptionReceived(cause);
>> >> -
>> >>             }
>> >>
>> >>             // FIXME Need to correctly handle other exceptions.
>> >>             Things like ...
>> >> @@ -433,76 +405,63 @@ public class AMQProtocolHandler implemen
>> >>
>> >>     public void received(ByteBuffer msg)
>> >>     {
>> >> +        _readBytes += msg.remaining();
>> >>         try
>> >>         {
>> >> -            _readBytes += msg.remaining();
>> >>             final ArrayList<AMQDataBlock> dataBlocks =
>> >>             _codecFactory.getDecoder().decodeBuffer(msg);
>> >>
>> >> -            Job.fireAsynchEvent(_poolReference.getPool(),
>> >> _readJob, new Runnable()
>> >> +            // Decode buffer
>> >> +
>> >> +            for (AMQDataBlock message : dataBlocks)
>> >>             {
>> >>
>> >> -                public void run()
>> >> -                {
>> >> -                    // Decode buffer
>> >> +                    if (PROTOCOL_DEBUG)
>> >> +                    {
>> >> +                        _protocolLogger.info(String.format("RECV:
>> >> [%s] %s", this, message));
>> >> +                    }
>> >>
>> >> -                    for (AMQDataBlock message : dataBlocks)
>> >> +                    if(message instanceof AMQFrame)
>> >>                     {
>> >> +                        final boolean debug =
>> >> _logger.isDebugEnabled();
>> >> +                        final long msgNumber =
>> >> ++_messageReceivedCount;
>> >>
>> >> -                        try
>> >> -                        {
>> >> -                            if (PROTOCOL_DEBUG)
>> >> -                            {
>> >> -
>> >>                                _protocolLogger.info(String.format("RECV:
>> >> [%s] %s", this, message));
>> >> -                            }
>> >> -
>> >> -                            if(message instanceof AMQFrame)
>> >> -                            {
>> >> -                                final boolean debug =
>> >> _logger.isDebugEnabled();
>> >> -                                final long msgNumber =
>> >> ++_messageReceivedCount;
>> >> -
>> >> -                                if (debug && ((msgNumber % 1000)
>> >> == 0))
>> >> -                                {
>> >> -                                    _logger.debug("Received " +
>> >> _messageReceivedCount + " protocol messages");
>> >> -                                }
>> >> -
>> >> -                                AMQFrame frame = (AMQFrame)
>> >> message;
>> >> -
>> >> -                                final AMQBody bodyFrame =
>> >> frame.getBodyFrame();
>> >> -
>> >> -
>> >>                                HeartbeatDiagnostics.received(bodyFrame
>> >> instanceof HeartbeatBody);
>> >> -
>> >> -
>> >>                                bodyFrame.handle(frame.getChannel(),
>> >> _protocolSession);
>> >> -
>> >> -
>> >>                                _connection.bytesReceived(_readBytes);
>> >> -                            }
>> >> -                            else if (message instanceof
>> >> ProtocolInitiation)
>> >> -                            {
>> >> -                                // We get here if the server
>> >> sends a response to our initial protocol header
>> >> -                                // suggesting an alternate
>> >> ProtocolVersion; the server will then close the
>> >> -                                // connection.
>> >> -                                ProtocolInitiation protocolInit =
>> >> (ProtocolInitiation) message;
>> >> -                                _suggestedProtocolVersion =
>> >> protocolInit.checkVersion();
>> >> -                                _logger.info("Broker suggested
>> >> using protocol version:" + _suggestedProtocolVersion);
>> >> -
>> >> -                                // get round a bug in old
>> >> versions of qpid whereby the connection is not closed
>> >> -
>> >>                                _stateManager.changeState(AMQState.CONNECTION_CLOSED);
>> >> -                            }
>> >> -                        }
>> >> -                        catch (Exception e)
>> >> +                        if (debug && ((msgNumber % 1000) == 0))
>> >>                         {
>> >> -                            _logger.error("Exception processing
>> >> frame", e);
>> >> -
>> >>                            propagateExceptionToFrameListeners(e);
>> >> -                            exception(e);
>> >> +                            _logger.debug("Received " +
>> >> _messageReceivedCount + " protocol messages");
>> >>                         }
>> >> +
>> >> +                        AMQFrame frame = (AMQFrame) message;
>> >> +
>> >> +                        final AMQBody bodyFrame =
>> >> frame.getBodyFrame();
>> >> +
>> >> +                        HeartbeatDiagnostics.received(bodyFrame
>> >> instanceof HeartbeatBody);
>> >> +
>> >> +                        bodyFrame.handle(frame.getChannel(),
>> >> _protocolSession);
>> >> +
>> >> +                        _connection.bytesReceived(_readBytes);
>> >> +                    }
>> >> +                    else if (message instanceof
>> >> ProtocolInitiation)
>> >> +                    {
>> >> +                        // We get here if the server sends a
>> >> response to our initial protocol header
>> >> +                        // suggesting an alternate
>> >> ProtocolVersion; the server will then close the
>> >> +                        // connection.
>> >> +                        ProtocolInitiation protocolInit =
>> >> (ProtocolInitiation) message;
>> >> +                        _suggestedProtocolVersion =
>> >> protocolInit.checkVersion();
>> >> +                        _logger.info("Broker suggested using
>> >> protocol version:" + _suggestedProtocolVersion);
>> >> +
>> >> +                        // get round a bug in old versions of
>> >> qpid whereby the connection is not closed
>> >> +
>> >>                        _stateManager.changeState(AMQState.CONNECTION_CLOSED);
>> >>                     }
>> >>                 }
>> >> -            });
>> >>         }
>> >>         catch (Exception e)
>> >>         {
>> >> +            _logger.error("Exception processing frame", e);
>> >>             propagateExceptionToFrameListeners(e);
>> >>             exception(e);
>> >>         }
>> >> +
>> >> +
>> >>     }
>> >>
>> >>     public void methodBodyReceived(final int channelId, final
>> >>     AMQBody bodyFrame)
>> >> @@ -568,17 +527,13 @@ public class AMQProtocolHandler implemen
>> >>         writeFrame(frame, false);
>> >>     }
>> >>
>> >> -    public void writeFrame(AMQDataBlock frame, boolean wait)
>> >> +    public  synchronized void writeFrame(AMQDataBlock frame,
>> >> boolean wait)
>> >>     {
>> >> -        final ByteBuffer buf = frame.toNioByteBuffer();
>> >> +        final ByteBuffer buf = asByteBuffer(frame);
>> >>         _writtenBytes += buf.remaining();
>> >> -        Job.fireAsynchEvent(_poolReference.getPool(), _writeJob,
>> >> new Runnable()
>> >> -        {
>> >> -            public void run()
>> >> -            {
>> >> -                _sender.send(buf);
>> >> -            }
>> >> -        });
>> >> +        _sender.send(buf);
>> >> +        _sender.flush();
>> >> +
>> >>         if (PROTOCOL_DEBUG)
>> >>         {
>> >>             _protocolLogger.debug(String.format("SEND: [%s] %s",
>> >>             this, frame));
>> >> @@ -595,12 +550,41 @@ public class AMQProtocolHandler implemen
>> >>
>> >>         _connection.bytesSent(_writtenBytes);
>> >>
>> >> -        if (wait)
>> >> +    }
>> >> +
>> >> +    private ByteBuffer asByteBuffer(AMQDataBlock block)
>> >> +    {
>> >> +        final ByteBuffer buf = ByteBuffer.allocate((int)
>> >> block.getSize());
>> >> +
>> >> +        try
>> >> +        {
>> >> +            block.writePayload(new DataOutputStream(new
>> >> OutputStream()
>> >> +            {
>> >> +
>> >> +
>> >> +                @Override
>> >> +                public void write(int b) throws IOException
>> >> +                {
>> >> +                    buf.put((byte) b);
>> >> +                }
>> >> +
>> >> +                @Override
>> >> +                public void write(byte[] b, int off, int len)
>> >> throws IOException
>> >> +                {
>> >> +                    buf.put(b, off, len);
>> >> +                }
>> >> +            }));
>> >> +        }
>> >> +        catch (IOException e)
>> >>         {
>> >> -            _sender.flush();
>> >> +            throw new RuntimeException(e);
>> >>         }
>> >> +
>> >> +        buf.flip();
>> >> +        return buf;
>> >>     }
>> >>
>> >> +
>> >>     /**
>> >>      * Convenience method that writes a frame to the protocol
>> >>      session and waits for a particular response. Equivalent to
>> >>      * calling getProtocolSession().write() then waiting for the
>> >>      response.
>> >> @@ -723,7 +707,7 @@ public class AMQProtocolHandler implemen
>> >>                 _logger.debug("FailoverException interrupted
>> >>                 connection close, ignoring as connection   close
>> >>                 anyway.");
>> >>             }
>> >>         }
>> >> -        _poolReference.releaseExecutorService();
>> >> +
>> >>     }
>> >>
>> >>     /** @return the number of bytes read from this protocol
>> >>     session */
>> >> @@ -841,8 +825,13 @@ public class AMQProtocolHandler implemen
>> >>
>> >>     public void setNetworkConnection(NetworkConnection network)
>> >>     {
>> >> +        setNetworkConnection(network, network.getSender());
>> >> +    }
>> >> +
>> >> +    public void setNetworkConnection(NetworkConnection network,
>> >> Sender<ByteBuffer> sender)
>> >> +    {
>> >>         _network = network;
>> >> -        _sender = network.getSender();
>> >> +        _sender = sender;
>> >>     }
>> >>
>> >>     /** @param delay delay in seconds (not ms) */
>> >>
>> >> Modified:
>> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java
>> >> URL:
>> >> http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java?rev=1172657&r1=1172656&r2=1172657&view=diff
>> >> ==============================================================================
>> >> ---
>> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java
>> >> (original)
>> >> +++
>> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java
>> >> Mon Sep 19 15:13:18 2011
>> >> @@ -20,17 +20,22 @@
>> >>  */
>> >>  package org.apache.qpid.client.security;
>> >>
>> >> -import org.apache.qpid.util.FileUtils;
>> >> -
>> >> -import org.slf4j.Logger;
>> >> -import org.slf4j.LoggerFactory;
>> >> -
>> >>  import java.io.IOException;
>> >>  import java.io.InputStream;
>> >> +import java.util.Collection;
>> >> +import java.util.Collections;
>> >>  import java.util.Enumeration;
>> >>  import java.util.HashMap;
>> >> +import java.util.HashSet;
>> >>  import java.util.Map;
>> >>  import java.util.Properties;
>> >> +import java.util.Set;
>> >> +import java.util.StringTokenizer;
>> >> +import java.util.TreeMap;
>> >> +
>> >> +import org.apache.qpid.util.FileUtils;
>> >> +import org.slf4j.Logger;
>> >> +import org.slf4j.LoggerFactory;
>> >>
>> >>  /**
>> >>  * CallbackHandlerRegistry is a registry for call back handlers
>> >>  for user authentication and interaction during user
>> >> @@ -42,7 +47,7 @@ import java.util.Properties;
>> >>  * "amp.callbackhandler.properties". The format of the properties
>> >>  file is:
>> >>  *
>> >>  * <p/><pre>
>> >> - * CallbackHanlder.mechanism=fully.qualified.class.name
>> >> + * mechanism.n=fully.qualified.class.name where n
>> >> is an ordinal
>> >>  * </pre>
>> >>  *
>> >>  * <p/>Where mechanism is an IANA-registered mechanism name and
>> >>  the fully qualified class name refers to a
>> >> @@ -66,51 +71,15 @@ public class CallbackHandlerRegistry
>> >>     public static final String DEFAULT_RESOURCE_NAME =
>> >>     "org/apache/qpid/client/security/CallbackHandlerRegistry.properties";
>> >>
>> >>     /** A static reference to the singleton instance of this
>> >>     registry. */
>> >> -    private static CallbackHandlerRegistry _instance = new
>> >> CallbackHandlerRegistry();
>> >> +    private static final CallbackHandlerRegistry _instance;
>> >>
>> >>     /** Holds a map from SASL mechanism names to call back
>> >>     handlers. */
>> >> -    private Map<String, Class> _mechanismToHandlerClassMap = new
>> >> HashMap<String, Class>();
>> >> -
>> >> -    /** Holds a space delimited list of mechanisms that callback
>> >> handlers exist for. */
>> >> -    private String _mechanisms;
>> >> -
>> >> -    /**
>> >> -     * Gets the singleton instance of this registry.
>> >> -     *
>> >> -     * @return The singleton instance of this registry.
>> >> -     */
>> >> -    public static CallbackHandlerRegistry getInstance()
>> >> -    {
>> >> -        return _instance;
>> >> -    }
>> >> +    private Map<String, Class<AMQCallbackHandler>>
>> >> _mechanismToHandlerClassMap = new HashMap<String,
>> >> Class<AMQCallbackHandler>>();
>> >>
>> >> -    /**
>> >> -     * Gets the callback handler class for a given SASL mechanism
>> >> name.
>> >> -     *
>> >> -     * @param mechanism The SASL mechanism name.
>> >> -     *
>> >> -     * @return The callback handler class for the mechanism, or
>> >> null if none is configured for that mechanism.
>> >> -     */
>> >> -    public Class getCallbackHandlerClass(String mechanism)
>> >> -    {
>> >> -        return (Class)
>> >> _mechanismToHandlerClassMap.get(mechanism);
>> >> -    }
>> >> +    /** Ordered collection of mechanisms for which callback
>> >> handlers exist. */
>> >> +    private Collection<String> _mechanisms;
>> >>
>> >> -    /**
>> >> -     * Gets a space delimited list of supported SASL mechanisms.
>> >> -     *
>> >> -     * @return A space delimited list of supported SASL
>> >> mechanisms.
>> >> -     */
>> >> -    public String getMechanisms()
>> >> -    {
>> >> -        return _mechanisms;
>> >> -    }
>> >> -
>> >> -    /**
>> >> -     * Creates the call back handler registry from its
>> >> configuration resource or file. This also has the side effect
>> >> -     * of configuring and registering the SASL client factory
>> >> implementations using {@link DynamicSaslRegistrar}.
>> >> -     */
>> >> -    private CallbackHandlerRegistry()
>> >> +    static
>> >>     {
>> >>         // Register any configured SASL client factories.
>> >>         DynamicSaslRegistrar.registerSaslProviders();
>> >> @@ -120,12 +89,12 @@ public class CallbackHandlerRegistry
>> >>             FileUtils.openFileOrDefaultResource(filename,
>> >>             DEFAULT_RESOURCE_NAME,
>> >>                 CallbackHandlerRegistry.class.getClassLoader());
>> >>
>> >> +        final Properties props = new Properties();
>> >> +
>> >>         try
>> >>         {
>> >> -            Properties props = new Properties();
>> >> +
>> >>             props.load(is);
>> >> -            parseProperties(props);
>> >> -            _logger.info("Callback handlers available for SASL
>> >> mechanisms: " + _mechanisms);
>> >>         }
>> >>         catch (IOException e)
>> >>         {
>> >> @@ -146,32 +115,68 @@ public class CallbackHandlerRegistry
>> >>                 }
>> >>             }
>> >>         }
>> >> +
>> >> +        _instance = new CallbackHandlerRegistry(props);
>> >> +        _logger.info("Callback handlers available for SASL
>> >> mechanisms: " + _instance._mechanisms);
>> >> +
>> >>     }
>> >>
>> >> -    /*private InputStream openPropertiesInputStream(String
>> >> filename)
>> >> +    /**
>> >> +     * Gets the singleton instance of this registry.
>> >> +     *
>> >> +     * @return The singleton instance of this registry.
>> >> +     */
>> >> +    public static CallbackHandlerRegistry getInstance()
>> >> +    {
>> >> +        return _instance;
>> >> +    }
>> >> +
>> >> +    public AMQCallbackHandler createCallbackHandler(final String
>> >> mechanism)
>> >>     {
>> >> -        boolean useDefault = true;
>> >> -        InputStream is = null;
>> >> -        if (filename != null)
>> >> +        final Class<AMQCallbackHandler> mechanismClass =
>> >> _mechanismToHandlerClassMap.get(mechanism);
>> >> +
>> >> +        if (mechanismClass == null)
>> >>         {
>> >> -            try
>> >> -            {
>> >> -                is = new BufferedInputStream(new
>> >> FileInputStream(new File(filename)));
>> >> -                useDefault = false;
>> >> -            }
>> >> -            catch (FileNotFoundException e)
>> >> -            {
>> >> -                _logger.error("Unable to read from file " +
>> >> filename + ": " + e, e);
>> >> -            }
>> >> +            throw new IllegalArgumentException("Mechanism " +
>> >> mechanism + " not known");
>> >>         }
>> >>
>> >> -        if (useDefault)
>> >> +        try
>> >> +        {
>> >> +            return mechanismClass.newInstance();
>> >> +        }
>> >> +        catch (InstantiationException e)
>> >> +        {
>> >> +            throw new IllegalArgumentException("Unable to create
>> >> an instance of mechanism " + mechanism, e);
>> >> +        }
>> >> +        catch (IllegalAccessException e)
>> >>         {
>> >> -            is =
>> >> CallbackHandlerRegistry.class.getResourceAsStream(DEFAULT_RESOURCE_NAME);
>> >> +            throw new IllegalArgumentException("Unable to create
>> >> an instance of mechanism " + mechanism, e);
>> >>         }
>> >> +    }
>> >>
>> >> -        return is;
>> >> -    }*/
>> >> +    /**
>> >> +     * Gets the collection of supported SASL mechanism names,
>> >> ordered by preference.
>> >> +     *
>> >> +     * @return collection of SASL mechanism names.
>> >> +     */
>> >> +    public Collection<String> getMechanisms()
>> >> +    {
>> >> +        return Collections.unmodifiableCollection(_mechanisms);
>> >> +    }
>> >> +
>> >> +    /**
>> >> +     * Creates the call back handler registry from its
>> >> configuration resource or file.
>> >> +     *
>> >> +     * This also has the side effect of configuring and
>> >> registering the SASL client factory
>> >> +     * implementations using {@link DynamicSaslRegistrar}.
>> >> +     *
>> >> +     * This constructor has default (package-private) access to allow for
>> >> effective unit testing.  Clients must use
>> >> +     * {@link #getInstance()} to obtain the singleton instance.
>> >> +     */
>> >> +    CallbackHandlerRegistry(final Properties props)
>> >> +    {
>> >> +        parseProperties(props);
>> >> +    }
>> >>
>> >>     /**
>> >>      * Scans the specified properties as a mapping from IANA
>> >>      registered SASL mechanism to call back handler
>> >> @@ -183,20 +188,20 @@ public class CallbackHandlerRegistry
>> >>      */
>> >>     private void parseProperties(Properties props)
>> >>     {
>> >> +
>> >> +        final Map<Integer, String> mechanisms = new
>> >> TreeMap<Integer, String>();
>> >> +
>> >>         Enumeration e = props.propertyNames();
>> >>         while (e.hasMoreElements())
>> >>         {
>> >> -            String propertyName = (String) e.nextElement();
>> >> -            int period = propertyName.indexOf(".");
>> >> -            if (period < 0)
>> >> -            {
>> >> -                _logger.warn("Unable to parse property " +
>> >> propertyName + " when configuring SASL providers");
>> >> +            final String propertyName = (String) e.nextElement();
>> >> +            final String[] parts = propertyName.split("\\.", 2);
>> >>
>> >> -                continue;
>> >> -            }
>> >> +            checkPropertyNameFormat(propertyName, parts);
>> >>
>> >> -            String mechanism = propertyName.substring(period +
>> >> 1);
>> >> -            String className = props.getProperty(propertyName);
>> >> +            final String mechanism = parts[0];
>> >> +            final int ordinal = getPropertyOrdinal(propertyName,
>> >> parts);
>> >> +            final String className =
>> >> props.getProperty(propertyName);
>> >>             Class clazz = null;
>> >>             try
>> >>             {
>> >> @@ -205,20 +210,11 @@ public class CallbackHandlerRegistry
>> >>                 {
>> >>                     _logger.warn("SASL provider " + clazz + " does
>> >>                     not implement " + AMQCallbackHandler.class
>> >>                         + ". Skipping");
>> >> -
>> >>                     continue;
>> >>                 }
>> >> -
>> >>                 _mechanismToHandlerClassMap.put(mechanism, clazz);
>> >> -                if (_mechanisms == null)
>> >> -                {
>> >> -                    _mechanisms = mechanism;
>> >> -                }
>> >> -                else
>> >> -                {
>> >> -                    // one time cost
>> >> -                    _mechanisms = _mechanisms + " " + mechanism;
>> >> -                }
>> >> +
>> >> +                mechanisms.put(ordinal, mechanism);
>> >>             }
>> >>             catch (ClassNotFoundException ex)
>> >>             {
>> >> @@ -227,5 +223,91 @@ public class CallbackHandlerRegistry
>> >>                 continue;
>> >>             }
>> >>         }
>> >> +
>> >> +        _mechanisms = mechanisms.values();  // order guaranteed
>> >> by keys of treemap (i.e. our ordinals)
>> >> +
>> >> +
>> >> +    }
>> >> +
>> >> +    private void checkPropertyNameFormat(final String
>> >> propertyName, final String[] parts)
>> >> +    {
>> >> +        if (parts.length != 2)
>> >> +        {
>> >> +            throw new IllegalArgumentException("Unable to parse
>> >> property " + propertyName + " when configuring SASL providers");
>> >> +        }
>> >> +    }
>> >> +
>> >> +    private int getPropertyOrdinal(final String propertyName,
>> >> final String[] parts)
>> >> +    {
>> >> +        try
>> >> +        {
>> >> +            return Integer.parseInt(parts[1]);
>> >> +        }
>> >> +        catch(NumberFormatException nfe)
>> >> +        {
>> >> +            throw new IllegalArgumentException("Unable to parse
>> >> property " + propertyName + " when configuring SASL providers",
>> >> nfe);
>> >> +        }
>> >> +    }
>> >> +
>> >> +    /**
>> >> +     * Selects a SASL mechanism that is mutually available to
>> >> both parties.  If more than one
>> >> +     * mechanism is mutually available the one appearing first
>> >> (by ordinal) will be returned.
>> >> +     *
>> >> +     * @param peerMechanismList space separated list of
>> >> mechanisms
>> >> +     * @return selected mechanism, or null if none available
>> >> +     */
>> >> +    public String selectMechanism(final String peerMechanismList)
>> >> +    {
>> >> +        final Set<String> peerList =
>> >> mechListToSet(peerMechanismList);
>> >> +
>> >> +        return selectMechInternal(peerList,
>> >> Collections.<String>emptySet());
>> >> +    }
>> >> +
>> >> +    /**
>> >> +     * Selects a SASL mechanism that is mutually available to
>> >> both parties.
>> >> +     *
>> >> +     * @param peerMechanismList space separated list of
>> >> mechanisms
>> >> +     * @param restrictionList space separated list of mechanisms
>> >> +     * @return selected mechanism, or null if none available
>> >> +     */
>> >> +    public String selectMechanism(final String peerMechanismList,
>> >> final String restrictionList)
>> >> +    {
>> >> +        final Set<String> peerList =
>> >> mechListToSet(peerMechanismList);
>> >> +        final Set<String> restrictionSet =
>> >> mechListToSet(restrictionList);
>> >> +
>> >> +        return selectMechInternal(peerList, restrictionSet);
>> >> +    }
>> >> +
>> >> +    private String selectMechInternal(final Set<String> peerSet,
>> >> final Set<String> restrictionSet)
>> >> +    {
>> >> +        for (final String mech : _mechanisms)
>> >> +        {
>> >> +            if (peerSet.contains(mech))
>> >> +            {
>> >> +                if (restrictionSet.isEmpty() ||
>> >> restrictionSet.contains(mech))
>> >> +                {
>> >> +                    return mech;
>> >> +                }
>> >> +            }
>> >> +        }
>> >> +
>> >> +        return null;
>> >> +    }
>> >> +
>> >> +    private Set<String> mechListToSet(final String mechanismList)
>> >> +    {
>> >> +        if (mechanismList == null)
>> >> +        {
>> >> +            return Collections.emptySet();
>> >> +        }
>> >> +
>> >> +        final StringTokenizer tokenizer = new
>> >> StringTokenizer(mechanismList, " ");
>> >> +        final Set<String> mechanismSet = new
>> >> HashSet<String>(tokenizer.countTokens());
>> >> +        while (tokenizer.hasMoreTokens())
>> >> +        {
>> >> +            mechanismSet.add(tokenizer.nextToken());
>> >> +        }
>> >> +        return Collections.unmodifiableSet(mechanismSet);
>> >>     }
>> >> +
>> >>  }
>> >>
>> >> Modified:
>> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties
>> >> URL:
>> >> http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties?rev=1172657&r1=1172656&r2=1172657&view=diff
>> >> ==============================================================================
>> >> ---
>> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties
>> >> (original)
>> >> +++
>> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties
>> >> Mon Sep 19 15:13:18 2011
>> >> @@ -16,7 +16,17 @@
>> >>  # specific language governing permissions and limitations
>> >>  # under the License.
>> >>  #
>> >> -CallbackHandler.CRAM-MD5-HASHED=org.apache.qpid.client.security.UsernameHashedPasswordCallbackHandler
>> >> -CallbackHandler.CRAM-MD5=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
>> >> -CallbackHandler.AMQPLAIN=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
>> >> -CallbackHandler.PLAIN=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
>> >> +
>> >> +#
>> >> +# Format:
>> >> +# <mechanism name>.<ordinal>=<implementation>
>> >> +#
>> >> +# @see CallbackHandlerRegistry
>> >> +#
>> >> +
>> >> +EXTERNAL.1=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
>> >> +GSSAPI.2=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
>> >> +CRAM-MD5-HASHED.3=org.apache.qpid.client.security.UsernameHashedPasswordCallbackHandler
>> >> +CRAM-MD5.4=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
>> >> +AMQPLAIN.5=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
>> >> +PLAIN.6=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
>> >>
>> >> Modified:
>> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
>> >> URL:
>> >> http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java?rev=1172657&r1=1172656&r2=1172657&view=diff
>> >> ==============================================================================
>> >> ---
>> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
>> >> (original)
>> >> +++
>> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
>> >> Mon Sep 19 15:13:18 2011
>> >> @@ -22,7 +22,7 @@ package org.apache.qpid.jms;
>> >>
>> >>  import java.util.Map;
>> >>
>> >> -import org.apache.qpid.client.SSLConfiguration;
>> >> +import org.apache.qpid.transport.ConnectionSettings;
>> >>
>> >>  public interface BrokerDetails
>> >>  {
>> >> @@ -104,14 +104,12 @@ public interface BrokerDetails
>> >>     long getTimeout();
>> >>
>> >>     void setTimeout(long timeout);
>> >> -
>> >> -    SSLConfiguration getSSLConfiguration();
>> >> -
>> >> -    void setSSLConfiguration(SSLConfiguration sslConfiguration);
>> >>
>> >>     boolean getBooleanProperty(String propName);
>> >>
>> >>     String toString();
>> >>
>> >>     boolean equals(Object o);
>> >> +
>> >> +    ConnectionSettings buildConnectionSettings();
>> >>  }
>> >>
>> >> Modified:
>> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java
>> >> URL:
>> >> http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java?rev=1172657&r1=1172656&r2=1172657&view=diff
>> >> ==============================================================================
>> >> ---
>> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java
>> >> (original)
>> >> +++
>> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java
>> >> Mon Sep 19 15:13:18 2011
>> >> @@ -140,7 +140,6 @@ public class FailoverExchangeMethod impl
>> >>                         broker.setHost(tokens[1]);
>> >>                         broker.setPort(Integer.parseInt(tokens[2]));
>> >>                         broker.setProperties(_originalBrokerDetail.getProperties());
>> >> -
>> >>                        broker.setSSLConfiguration(_originalBrokerDetail.getSSLConfiguration());
>> >>                         brokerList.add(broker);
>> >>
>> >>                         if
>> >>                         (currentBrokerIP.equals(broker.getHost())
>> >>                         &&
>> >>
>> >> Modified:
>> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
>> >> URL:
>> >> http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java?rev=1172657&r1=1172656&r2=1172657&view=diff
>> >> ==============================================================================
>> >> ---
>> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
>> >> (original)
>> >> +++
>> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
>> >> Mon Sep 19 15:13:18 2011
>> >> @@ -36,6 +36,7 @@ import javax.jms.Queue;
>> >>  import javax.jms.Topic;
>> >>  import javax.naming.Context;
>> >>  import javax.naming.NamingException;
>> >> +import javax.naming.ConfigurationException;
>> >>  import javax.naming.spi.InitialContextFactory;
>> >>
>> >>  import org.apache.qpid.client.AMQConnectionFactory;
>> >> @@ -139,7 +140,7 @@ public class PropertiesFileInitialContex
>> >>         return new ReadOnlyContext(environment, data);
>> >>     }
>> >>
>> >> -    protected void createConnectionFactories(Map data, Hashtable
>> >> environment)
>> >> +    protected void createConnectionFactories(Map data, Hashtable
>> >> environment) throws ConfigurationException
>> >>     {
>> >>         for (Iterator iter = environment.entrySet().iterator();
>> >>         iter.hasNext();)
>> >>         {
>> >> @@ -157,7 +158,7 @@ public class PropertiesFileInitialContex
>> >>         }
>> >>     }
>> >>
>> >> -    protected void createDestinations(Map data, Hashtable
>> >> environment)
>> >> +    protected void createDestinations(Map data, Hashtable
>> >> environment) throws ConfigurationException
>> >>     {
>> >>         for (Iterator iter = environment.entrySet().iterator();
>> >>         iter.hasNext();)
>> >>         {
>> >> @@ -225,7 +226,7 @@ public class PropertiesFileInitialContex
>> >>     /**
>> >>      * Factory method to create new Connection Factory instances
>> >>      */
>> >> -    protected ConnectionFactory createFactory(String url)
>> >> +    protected ConnectionFactory createFactory(String url) throws
>> >> ConfigurationException
>> >>     {
>> >>         try
>> >>         {
>> >> @@ -233,16 +234,18 @@ public class PropertiesFileInitialContex
>> >>         }
>> >>         catch (URLSyntaxException urlse)
>> >>         {
>> >> -            _logger.warn("Unable to createFactories:" + urlse);
>> >> -        }
>> >> +            _logger.warn("Unable to create factory:" + urlse);
>> >>
>> >> -        return null;
>> >> +            ConfigurationException ex = new
>> >> ConfigurationException("Failed to parse entry: " + urlse + " due
>> >> to : " +  urlse.getMessage());
>> >> +            ex.initCause(urlse);
>> >> +            throw ex;
>> >> +        }
>> >>     }
>> >>
>> >>     /**
>> >>      * Factory method to create new Destination instances from an
>> >>      AMQP BindingURL
>> >>      */
>> >> -    protected Destination createDestination(String str)
>> >> +    protected Destination createDestination(String str) throws
>> >> ConfigurationException
>> >>     {
>> >>         try
>> >>         {
>> >> @@ -252,7 +255,9 @@ public class PropertiesFileInitialContex
>> >>         {
>> >>             _logger.warn("Unable to create destination:" + e, e);
>> >>
>> >> -            return null;
>> >> +            ConfigurationException ex = new
>> >> ConfigurationException("Failed to parse entry: " + str + " due to
>> >> : " +  e.getMessage());
>> >> +            ex.initCause(e);
>> >> +            throw ex;
>> >>         }
>> >>     }
>> >>
>> >>
>> >> Modified:
>> >> qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java
>> >> URL:
>> >> http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java?rev=1172657&r1=1172656&r2=1172657&view=diff
>> >> ==============================================================================
>> >> ---
>> >> qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java
>> >> (original)
>> >> +++
>> >> qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java
>> >> Mon Sep 19 15:13:18 2011
>> >> @@ -23,7 +23,6 @@ package org.apache.qpid.client;
>> >>  import org.apache.qpid.AMQException;
>> >>  import org.apache.qpid.client.state.AMQState;
>> >>  import org.apache.qpid.framing.ProtocolVersion;
>> >> -import org.apache.qpid.jms.ConnectionURL;
>> >>  import org.apache.qpid.jms.BrokerDetails;
>> >>  import org.apache.qpid.url.URLSyntaxException;
>> >>
>> >> @@ -37,48 +36,18 @@ public class MockAMQConnection extends A
>> >>         super(broker, username, password, clientName,
>> >>         virtualHost);
>> >>     }
>> >>
>> >> -    public MockAMQConnection(String broker, String username,
>> >> String password, String clientName, String virtualHost,
>> >> SSLConfiguration sslConfig)
>> >> -            throws AMQException, URLSyntaxException
>> >> -    {
>> >> -        super(broker, username, password, clientName,
>> >> virtualHost, sslConfig);
>> >> -    }
>> >> -
>> >>     public MockAMQConnection(String host, int port, String
>> >>     username, String password, String clientName, String
>> >>     virtualHost)
>> >>             throws AMQException, URLSyntaxException
>> >>     {
>> >>         super(host, port, username, password, clientName,
>> >>         virtualHost);
>> >>     }
>> >>
>> >> -    public MockAMQConnection(String host, int port, String
>> >> username, String password, String clientName, String virtualHost,
>> >> SSLConfiguration sslConfig)
>> >> -            throws AMQException, URLSyntaxException
>> >> -    {
>> >> -        super(host, port, username, password, clientName,
>> >> virtualHost, sslConfig);
>> >> -    }
>> >> -
>> >> -    public MockAMQConnection(String host, int port, boolean
>> >> useSSL, String username, String password, String clientName,
>> >> String virtualHost, SSLConfiguration sslConfig)
>> >> -            throws AMQException, URLSyntaxException
>> >> -    {
>> >> -        super(host, port, useSSL, username, password, clientName,
>> >> virtualHost, sslConfig);
>> >> -    }
>> >> -
>> >>     public MockAMQConnection(String connection)
>> >>             throws AMQException, URLSyntaxException
>> >>     {
>> >>         super(connection);
>> >>     }
>> >>
>> >> -    public MockAMQConnection(String connection, SSLConfiguration
>> >> sslConfig)
>> >> -            throws AMQException, URLSyntaxException
>> >> -    {
>> >> -        super(connection, sslConfig);
>> >> -    }
>> >> -
>> >> -    public MockAMQConnection(ConnectionURL connectionURL,
>> >> SSLConfiguration sslConfig)
>> >> -            throws AMQException
>> >> -    {
>> >> -        super(connectionURL, sslConfig);
>> >> -    }
>> >> -
>> >>     @Override
>> >>     public ProtocolVersion makeBrokerConnection(BrokerDetails
>> >>     brokerDetail) throws IOException
>> >>     {
>> >>
>> >> Modified:
>> >> qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java
>> >> URL:
>> >> http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java?rev=1172657&r1=1172656&r2=1172657&view=diff
>> >> ==============================================================================
>> >> ---
>> >> qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java
>> >> (original)
>> >> +++
>> >> qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java
>> >> Mon Sep 19 15:13:18 2011
>> >> @@ -43,4 +43,9 @@ public class TestMessageHelper
>> >>     {
>> >>         return new
>> >>         JMSStreamMessage(AMQMessageDelegateFactory.FACTORY_0_8);
>> >>     }
>> >> +
>> >> +    public static JMSObjectMessage newJMSObjectMessage()
>> >> +    {
>> >> +        return new
>> >> JMSObjectMessage(AMQMessageDelegateFactory.FACTORY_0_8);
>> >> +    }
>> >>  }
>> >>
>> >> Modified:
>> >> qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java
>> >> URL:
>> >> http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java?rev=1172657&r1=1172656&r2=1172657&view=diff
>> >> ==============================================================================
>> >> ---
>> >> qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java
>> >> (original)
>> >> +++
>> >> qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java
>> >> Mon Sep 19 15:13:18 2011
>> >> @@ -21,10 +21,10 @@
>> >>  package org.apache.qpid.test.unit.jndi;
>> >>
>> >>  import junit.framework.TestCase;
>> >> +
>> >>  import org.apache.qpid.client.AMQConnectionFactory;
>> >>  import org.apache.qpid.jms.BrokerDetails;
>> >>  import org.apache.qpid.jms.ConnectionURL;
>> >> -import org.apache.qpid.url.URLSyntaxException;
>> >>
>> >>  public class ConnectionFactoryTest extends TestCase
>> >>  {
>> >> @@ -34,21 +34,9 @@ public class ConnectionFactoryTest exten
>> >>     public static final String URL =
>> >>     "amqp://guest:guest@clientID/test?brokerlist='tcp://localhost:5672'";
>> >>     public static final String URL_STAR_PWD =
>> >>     "amqp://guest:********@clientID/test?brokerlist='tcp://localhost:5672'";
>> >>
>> >> -    public void testConnectionURLString()
>> >> +    public void testConnectionURLStringMasksPassword() throws
>> >> Exception
>> >>     {
>> >> -        AMQConnectionFactory factory = new
>> >> AMQConnectionFactory();
>> >> -
>> >> -        assertNull("ConnectionURL should have no value at start",
>> >> -                   factory.getConnectionURL());
>> >> -
>> >> -        try
>> >> -        {
>> >> -            factory.setConnectionURLString(URL);
>> >> -        }
>> >> -        catch (URLSyntaxException e)
>> >> -        {
>> >> -            fail(e.getMessage());
>> >> -        }
>> >> +        AMQConnectionFactory factory = new
>> >> AMQConnectionFactory(URL);
>> >>
>> >>         //URL will be returned with the password field swapped for
>> >>         '********'
>> >>         assertEquals("Connection URL not correctly set",
>> >>         URL_STAR_PWD, factory.getConnectionURLString());
>> >>
>> >> Modified:
>> >> qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java
>> >> URL:
>> >> http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java?rev=1172657&r1=1172656&r2=1172657&view=diff
>> >> ==============================================================================
>> >> ---
>> >> qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java
>> >> (original)
>> >> +++
>> >> qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java
>> >> Mon Sep 19 15:13:18 2011
>> >> @@ -24,6 +24,7 @@ import java.util.Properties;
>> >>
>> >>  import javax.jms.Queue;
>> >>  import javax.jms.Topic;
>> >> +import javax.naming.ConfigurationException;
>> >>  import javax.naming.Context;
>> >>  import javax.naming.InitialContext;
>> >>
>> >> @@ -67,4 +68,22 @@ public class JNDIPropertyFileTest extend
>> >>             assertEquals("Topic" + i +
>> >>             "WithSpace",bindingKey.asString());
>> >>         }
>> >>     }
>> >> +
>> >> +    public void testConfigurationErrors() throws Exception
>> >> +    {
>> >> +        Properties properties = new Properties();
>> >> +        properties.put("java.naming.factory.initial",
>> >> "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
>> >> +
>> >>        properties.put("destination.my-queue","amq.topic/test;create:always}");
>> >> +
>> >> +        try
>> >> +        {
>> >> +            ctx = new InitialContext(properties);
>> >> +            fail("A configuration exception should be thrown with
>> >> details about the address syntax error");
>> >> +        }
>> >> +        catch(ConfigurationException e)
>> >> +        {
>> >> +            assertTrue("Incorrect exception",
>> >> e.getMessage().contains("Failed to parse entry:
>> >> amq.topic/test;create:always}"));
>> >> +        }
>> >> +
>> >> +    }
>> >>  }
>> >>
>> >> Modified: qpid/branches/qpid-3346/qpid/java/common.xml
>> >> URL:
>> >> http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common.xml?rev=1172657&r1=1172656&r2=1172657&view=diff
>> >> ==============================================================================
>> >> --- qpid/branches/qpid-3346/qpid/java/common.xml (original)
>> >> +++ qpid/branches/qpid-3346/qpid/java/common.xml Mon Sep 19
>> >> 15:13:18 2011
>> >> @@ -132,8 +132,6 @@
>> >>        </sequential>
>> >>   </macrodef>
>> >>
>> >> -
>> >> -
>> >>   <macrodef name="jython">
>> >>     <attribute name="path"/>
>> >>     <element name="args"/>
>> >>
>> >> Modified:
>> >> qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java
>> >> URL:
>> >> http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java?rev=1172657&r1=1172656&r2=1172657&view=diff
>> >> ==============================================================================
>> >> ---
>> >> qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java
>> >> (original)
>> >> +++
>> >> qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java
>> >> Mon Sep 19 15:13:18 2011
>> >> @@ -20,9 +20,6 @@
>> >>  */
>> >>  package org.apache.qpid.codec;
>> >>
>> >> -import org.apache.mina.filter.codec.ProtocolCodecFactory;
>> >> -import org.apache.mina.filter.codec.ProtocolDecoder;
>> >> -import org.apache.mina.filter.codec.ProtocolEncoder;
>> >>  import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
>> >>
>> >>  /**
>> >> @@ -31,14 +28,11 @@ import org.apache.qpid.protocol.AMQVersi
>> >>  *
>> >>  * <p/><table id="crc"><caption>CRC Card</caption>
>> >>  * <tr><th> Responsibilities <th> Collaborations.
>> >> - * <tr><td> Supply the protocol encoder. <td> {@link AMQEncoder}
>> >>  * <tr><td> Supply the protocol decoder. <td> {@link AMQDecoder}
>> >>  * </table>
>> >>  */
>> >> -public class AMQCodecFactory implements ProtocolCodecFactory
>> >> +public class AMQCodecFactory
>> >>  {
>> >> -    /** Holds the protocol encoder. */
>> >> -    private final AMQEncoder _encoder = new AMQEncoder();
>> >>
>> >>     /** Holds the protocol decoder. */
>> >>     private final AMQDecoder _frameDecoder;
>> >> @@ -56,15 +50,6 @@ public class AMQCodecFactory implements
>> >>         _frameDecoder = new AMQDecoder(expectProtocolInitiation,
>> >>         session);
>> >>     }
>> >>
>> >> -    /**
>> >> -     * Gets the AMQP encoder.
>> >> -     *
>> >> -     * @return The AMQP encoder.
>> >> -     */
>> >> -    public ProtocolEncoder getEncoder()
>> >> -    {
>> >> -        return _encoder;
>> >> -    }
>> >>
>> >>     /**
>> >>      * Gets the AMQP decoder.
>> >>
>> >> Modified:
>> >> qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
>> >> URL:
>> >> http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java?rev=1172657&r1=1172656&r2=1172657&view=diff
>> >> ==============================================================================
>> >> ---
>> >> qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
>> >> (original)
>> >> +++
>> >> qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
>> >> Mon Sep 19 15:13:18 2011
>> >> @@ -20,13 +20,9 @@
>> >>  */
>> >>  package org.apache.qpid.codec;
>> >>
>> >> -import java.util.ArrayList;
>> >> -
>> >> -import org.apache.mina.common.ByteBuffer;
>> >> -import org.apache.mina.common.IoSession;
>> >> -import org.apache.mina.common.SimpleByteBufferAllocator;
>> >> -import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
>> >> -import org.apache.mina.filter.codec.ProtocolDecoderOutput;
>> >> +import java.io.*;
>> >> +import java.nio.ByteBuffer;
>> >> +import java.util.*;
>> >>
>> >>  import org.apache.qpid.framing.AMQDataBlock;
>> >>  import org.apache.qpid.framing.AMQDataBlockDecoder;
>> >> @@ -54,11 +50,8 @@ import org.apache.qpid.protocol.AMQVersi
>> >>  * @todo If protocol initiation decoder not needed, then don't
>> >>  create it. Probably not a big deal, but it adds to the
>> >>  *       per-session overhead.
>> >>  */
>> >> -public class AMQDecoder extends CumulativeProtocolDecoder
>> >> +public class AMQDecoder
>> >>  {
>> >> -
>> >> -    private static final String BUFFER =
>> >> AMQDecoder.class.getName() + ".Buffer";
>> >> -
>> >>     /** Holds the 'normal' AMQP data decoder. */
>> >>     private AMQDataBlockDecoder _dataBlockDecoder = new
>> >>     AMQDataBlockDecoder();
>> >>
>> >> @@ -67,12 +60,11 @@ public class AMQDecoder extends Cumulati
>> >>
>> >>     /** Flag to indicate whether this decoder needs to handle
>> >>     protocol initiation. */
>> >>     private boolean _expectProtocolInitiation;
>> >> -    private boolean firstDecode = true;
>> >>
>> >>     private AMQMethodBodyFactory _bodyFactory;
>> >>
>> >> -    private ByteBuffer _remainingBuf;
>> >> -
>> >> +    private List<ByteArrayInputStream> _remainingBufs = new
>> >> ArrayList<ByteArrayInputStream>();
>> >> +
>> >>     /**
>> >>      * Creates a new AMQP decoder.
>> >>      *
>> >> @@ -84,98 +76,7 @@ public class AMQDecoder extends Cumulati
>> >>         _bodyFactory = new AMQMethodBodyFactory(session);
>> >>     }
>> >>
>> >> -    /**
>> >> -     * Delegates decoding AMQP from the data buffer that Mina has
>> >> retrieved from the wire, to the data or protocol
>> >> -     * initiation decoders.
>> >> -     *
>> >> -     * @param session The Mina session.
>> >> -     * @param in      The raw byte buffer.
>> >> -     * @param out     The Mina object output gatherer to write
>> >> decoded objects to.
>> >> -     *
>> >> -     * @return <tt>true</tt> if the data was decoded,
>> >> <tt>false</tt> if more is needed and the data should accumulate.
>> >> -     *
>> >> -     * @throws Exception If the data cannot be decoded for any
>> >> reason.
>> >> -     */
>> >> -    protected boolean doDecode(IoSession session, ByteBuffer in,
>> >> ProtocolDecoderOutput out) throws Exception
>> >> -    {
>> >> -
>> >> -        boolean decoded;
>> >> -        if (_expectProtocolInitiation
>> >> -            || (firstDecode
>> >> -                && (in.remaining() > 0)
>> >> -                && (in.get(in.position()) == (byte)'A')))
>> >> -        {
>> >> -            decoded = doDecodePI(session, in, out);
>> >> -        }
>> >> -        else
>> >> -        {
>> >> -            decoded = doDecodeDataBlock(session, in, out);
>> >> -        }
>> >> -        if(firstDecode && decoded)
>> >> -        {
>> >> -            firstDecode = false;
>> >> -        }
>> >> -        return decoded;
>> >> -    }
>> >> -
>> >> -    /**
>> >> -     * Decodes AMQP data, delegating the decoding to an {@link
>> >> AMQDataBlockDecoder}.
>> >> -     *
>> >> -     * @param session The Mina session.
>> >> -     * @param in      The raw byte buffer.
>> >> -     * @param out     The Mina object output gatherer to write
>> >> decoded objects to.
>> >> -     *
>> >> -     * @return <tt>true</tt> if the data was decoded,
>> >> <tt>false</tt> if more is needed and the data should accumulate.
>> >> -     *
>> >> -     * @throws Exception If the data cannot be decoded for any
>> >> reason.
>> >> -     */
>> >> -    protected boolean doDecodeDataBlock(IoSession session,
>> >> ByteBuffer in, ProtocolDecoderOutput out) throws Exception
>> >> -    {
>> >> -        int pos = in.position();
>> >> -        boolean enoughData =
>> >> _dataBlockDecoder.decodable(in.buf());
>> >> -        in.position(pos);
>> >> -        if (!enoughData)
>> >> -        {
>> >> -            // returning false means it will leave the contents
>> >> in the buffer and
>> >> -            // call us again when more data has been read
>> >> -            return false;
>> >> -        }
>> >> -        else
>> >> -        {
>> >> -            _dataBlockDecoder.decode(session, in, out);
>> >> -
>> >> -            return true;
>> >> -        }
>> >> -    }
>> >> -
>> >> -    /**
>> >> -     * Decodes an AMQP initiation, delegating the decoding to a
>> >> {@link ProtocolInitiation.Decoder}.
>> >> -     *
>> >> -     * @param session The Mina session.
>> >> -     * @param in      The raw byte buffer.
>> >> -     * @param out     The Mina object output gatherer to write
>> >> decoded objects to.
>> >> -     *
>> >> -     * @return <tt>true</tt> if the data was decoded,
>> >> <tt>false</tt> if more is needed and the data should accumulate.
>> >> -     *
>> >> -     * @throws Exception If the data cannot be decoded for any
>> >> reason.
>> >> -     */
>> >> -    private boolean doDecodePI(IoSession session, ByteBuffer in,
>> >> ProtocolDecoderOutput out) throws Exception
>> >> -    {
>> >> -        boolean enoughData = _piDecoder.decodable(in.buf());
>> >> -        if (!enoughData)
>> >> -        {
>> >> -            // returning false means it will leave the contents
>> >> in the buffer and
>> >> -            // call us again when more data has been read
>> >> -            return false;
>> >> -        }
>> >> -        else
>> >> -        {
>> >> -            ProtocolInitiation pi = new
>> >> ProtocolInitiation(in.buf());
>> >> -            out.write(pi);
>> >>
>> >> -            return true;
>> >> -        }
>> >> -    }
>> >>
>> >>     /**
>> >>      * Sets the protocol initiation flag, that determines whether
>> >>      decoding is handled by the data decoder of the protocol
>> >> @@ -189,151 +90,168 @@ public class AMQDecoder extends Cumulati
>> >>         _expectProtocolInitiation = expectProtocolInitiation;
>> >>     }
>> >>
>> >> -
>> >> -    /**
>> >> -     * Cumulates content of <tt>in</tt> into internal buffer and
>> >> forwards
>> >> -     * decoding request to {@link #doDecode(IoSession,
>> >> ByteBuffer, ProtocolDecoderOutput)}.
>> >> -     * <tt>doDecode()</tt> is invoked repeatedly until it returns
>> >> <tt>false</tt>
>> >> -     * and the cumulative buffer is compacted after decoding
>> >> ends.
>> >> -     *
>> >> -     * @throws IllegalStateException if your <tt>doDecode()</tt>
>> >> returned
>> >> -     *                               <tt>true</tt> not consuming
>> >> the cumulative buffer.
>> >> -     */
>> >> -    public void decode( IoSession session, ByteBuffer in,
>> >> -                        ProtocolDecoderOutput out ) throws
>> >> Exception
>> >> +    private class RemainingByteArrayInputStream extends
>> >> InputStream
>> >>     {
>> >> -        ByteBuffer buf = ( ByteBuffer ) session.getAttribute(
>> >> BUFFER );
>> >> -        // if we have a session buffer, append data to that
>> >> otherwise
>> >> -        // use the buffer read from the network directly
>> >> -        if( buf != null )
>> >> -        {
>> >> -            buf.put( in );
>> >> -            buf.flip();
>> >> -        }
>> >> -        else
>> >> +        private int _currentListPos;
>> >> +        private int _markPos;
>> >> +
>> >> +
>> >> +        @Override
>> >> +        public int read() throws IOException
>> >>         {
>> >> -            buf = in;
>> >> +            ByteArrayInputStream currentStream =
>> >> _remainingBufs.get(_currentListPos);
>> >> +            if(currentStream.available() > 0)
>> >> +            {
>> >> +                return currentStream.read();
>> >> +            }
>> >> +            else if((_currentListPos == _remainingBufs.size())
>> >> +                    || (++_currentListPos ==
>> >> _remainingBufs.size()))
>> >> +            {
>> >> +                return -1;
>> >> +            }
>> >> +            else
>> >> +            {
>> >> +
>> >> +                ByteArrayInputStream stream =
>> >> _remainingBufs.get(_currentListPos);
>> >> +                stream.mark(0);
>> >> +                return stream.read();
>> >> +            }
>> >>         }
>> >>
>> >> -        for( ;; )
>> >> +        @Override
>> >> +        public int read(final byte[] b, final int off, final int
>> >> len) throws IOException
>> >>         {
>> >> -            int oldPos = buf.position();
>> >> -            boolean decoded = doDecode( session, buf, out );
>> >> -            if( decoded )
>> >> +
>> >> +            if(_currentListPos == _remainingBufs.size())
>> >>             {
>> >> -                if( buf.position() == oldPos )
>> >> +                return -1;
>> >> +            }
>> >> +            else
>> >> +            {
>> >> +                ByteArrayInputStream currentStream =
>> >> _remainingBufs.get(_currentListPos);
>> >> +                final int available = currentStream.available();
>> >> +                int read = currentStream.read(b, off, len >
>> >> available ? available : len);
>> >> +                if(read < len)
>> >>                 {
>> >> -                    throw new IllegalStateException(
>> >> -                            "doDecode() can't return true when
>> >> buffer is not consumed." );
>> >> +                    if(_currentListPos++ !=
>> >> _remainingBufs.size())
>> >> +                    {
>> >> +
>> >>                        _remainingBufs.get(_currentListPos).mark(0);
>> >> +                    }
>> >> +                    int correctRead = read == -1 ? 0 : read;
>> >> +                    int subRead = read(b, off+correctRead,
>> >> len-correctRead);
>> >> +                    if(subRead == -1)
>> >> +                    {
>> >> +                        return read;
>> >> +                    }
>> >> +                    else
>> >> +                    {
>> >> +                        return correctRead+subRead;
>> >> +                    }
>> >>                 }
>> >> -
>> >> -                if( !buf.hasRemaining() )
>> >> +                else
>> >>                 {
>> >> -                    break;
>> >> +                    return len;
>> >>                 }
>> >>             }
>> >> -            else
>> >> -            {
>> >> -                break;
>> >> -            }
>> >>         }
>> >>
>> >> -        // if there is any data left that cannot be decoded, we
>> >> store
>> >> -        // it in a buffer in the session and next time this
>> >> decoder is
>> >> -        // invoked the session buffer gets appended to
>> >> -        if ( buf.hasRemaining() )
>> >> +        @Override
>> >> +        public int available() throws IOException
>> >>         {
>> >> -            storeRemainingInSession( buf, session );
>> >> +            int total = 0;
>> >> +            for(int i = _currentListPos; i <
>> >> _remainingBufs.size(); i++)
>> >> +            {
>> >> +                total += _remainingBufs.get(i).available();
>> >> +            }
>> >> +            return total;
>> >>         }
>> >> -        else
>> >> +
>> >> +        @Override
>> >> +        public void mark(final int readlimit)
>> >>         {
>> >> -            removeSessionBuffer( session );
>> >> +            _markPos = _currentListPos;
>> >> +            final ByteArrayInputStream stream =
>> >> _remainingBufs.get(_currentListPos);
>> >> +            if(stream != null)
>> >> +            {
>> >> +                stream.mark(readlimit);
>> >> +            }
>> >>         }
>> >> -    }
>> >> -
>> >> -    /**
>> >> -     * Releases the cumulative buffer used by the specified
>> >> <tt>session</tt>.
>> >> -     * Please don't forget to call <tt>super.dispose( session
>> >> )</tt> when
>> >> -     * you override this method.
>> >> -     */
>> >> -    public void dispose( IoSession session ) throws Exception
>> >> -    {
>> >> -        removeSessionBuffer( session );
>> >> -    }
>> >>
>> >> -    private void removeSessionBuffer(IoSession session)
>> >> -    {
>> >> -        ByteBuffer buf = ( ByteBuffer ) session.getAttribute(
>> >> BUFFER );
>> >> -        if( buf != null )
>> >> +        @Override
>> >> +        public void reset() throws IOException
>> >>         {
>> >> -            buf.release();
>> >> -            session.removeAttribute( BUFFER );
>> >> +            _currentListPos = _markPos;
>> >> +            final int size = _remainingBufs.size();
>> >> +            if(_currentListPos < size)
>> >> +            {
>> >> +                _remainingBufs.get(_currentListPos).reset();
>> >> +            }
>> >> +            for(int i = _currentListPos+1; i<size; i++)
>> >> +            {
>> >> +                _remainingBufs.get(i).reset();
>> >> +            }
>> >>         }
>> >>     }
>> >>
>> >> -    private static final SimpleByteBufferAllocator
>> >> SIMPLE_BYTE_BUFFER_ALLOCATOR = new SimpleByteBufferAllocator();
>> >> -
>> >> -    private void storeRemainingInSession(ByteBuffer buf,
>> >> IoSession session)
>> >> -    {
>> >> -        ByteBuffer remainingBuf =
>> >> SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate( buf.remaining(), false );
>> >> -        remainingBuf.setAutoExpand( true );
>> >> -        remainingBuf.put( buf );
>> >> -        session.setAttribute( BUFFER, remainingBuf );
>> >> -    }
>> >>
>> >> -    public ArrayList<AMQDataBlock>
>> >> decodeBuffer(java.nio.ByteBuffer buf) throws
>> >> AMQFrameDecodingException, AMQProtocolVersionException
>> >> +    public ArrayList<AMQDataBlock> decodeBuffer(ByteBuffer buf)
>> >> throws AMQFrameDecodingException, AMQProtocolVersionException,
>> >> IOException
>> >>     {
>> >>
>> >>         // get prior remaining data from accumulator
>> >>         ArrayList<AMQDataBlock> dataBlocks = new
>> >>         ArrayList<AMQDataBlock>();
>> >> -        ByteBuffer msg;
>> >> -        // if we have a session buffer, append data to that
>> >> otherwise
>> >> -        // use the buffer read from the network directly
>> >> -        if( _remainingBuf != null )
>> >> +        DataInputStream msg;
>> >> +
>> >> +
>> >> +        ByteArrayInputStream bais = new
>> >> ByteArrayInputStream(buf.array(),buf.arrayOffset()+buf.position(),
>> >> buf.remaining());
>> >> +        if(!_remainingBufs.isEmpty())
>> >>         {
>> >> -            _remainingBuf.put(buf);
>> >> -            _remainingBuf.flip();
>> >> -            msg = _remainingBuf;
>> >> +            _remainingBufs.add(bais);
>> >> +            msg = new DataInputStream(new
>> >> RemainingByteArrayInputStream());
>> >>         }
>> >>         else
>> >>         {
>> >> -            msg = ByteBuffer.wrap(buf);
>> >> +            msg = new DataInputStream(bais);
>> >>         }
>> >> -
>> >> -        if (_expectProtocolInitiation
>> >> -            || (firstDecode
>> >> -                && (msg.remaining() > 0)
>> >> -                && (msg.get(msg.position()) == (byte)'A')))
>> >> -        {
>> >> -            if (_piDecoder.decodable(msg.buf()))
>> >> -            {
>> >> -                dataBlocks.add(new
>> >> ProtocolInitiation(msg.buf()));
>> >> -            }
>> >> -        }
>> >> -        else
>> >> +
>> >> +        boolean enoughData = true;
>> >> +        while (enoughData)
>> >>         {
>> >> -            boolean enoughData = true;
>> >> -            while (enoughData)
>> >> +            if(!_expectProtocolInitiation)
>> >>             {
>> >> -                int pos = msg.position();
>> >> -
>> >>                 enoughData = _dataBlockDecoder.decodable(msg);
>> >> -                msg.position(pos);
>> >>                 if (enoughData)
>> >>                 {
>> >>                     dataBlocks.add(_dataBlockDecoder.createAndPopulateFrame(_bodyFactory,
>> >>                     msg));
>> >>                 }
>> >> -                else
>> >> +            }
>> >> +            else
>> >> +            {
>> >> +                enoughData = _piDecoder.decodable(msg);
>> >> +                if (enoughData)
>> >>                 {
>> >> -                    _remainingBuf =
>> >> SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate(msg.remaining(), false);
>> >> -                    _remainingBuf.setAutoExpand(true);
>> >> -                    _remainingBuf.put(msg);
>> >> +                    dataBlocks.add(new ProtocolInitiation(msg));
>> >> +                }
>> >> +
>> >> +            }
>> >> +
>> >> +            if(!enoughData)
>> >> +            {
>> >> +                if(!_remainingBufs.isEmpty())
>> >> +                {
>> >> +
>> >>                    _remainingBufs.remove(_remainingBufs.size()-1);
>> >> +                    ListIterator<ByteArrayInputStream> iterator =
>> >> _remainingBufs.listIterator();
>> >> +                    while(iterator.hasNext() &&
>> >> iterator.next().available() == 0)
>> >> +                    {
>> >> +                        iterator.remove();
>> >> +                    }
>> >> +                }
>> >> +                if(bais.available()!=0)
>> >> +                {
>> >> +                    byte[] remaining = new
>> >> byte[bais.available()];
>> >> +                    bais.read(remaining);
>> >> +                    _remainingBufs.add(new
>> >> ByteArrayInputStream(remaining));
>> >>                 }
>> >>             }
>> >> -        }
>> >> -        if(firstDecode && dataBlocks.size() > 0)
>> >> -        {
>> >> -            firstDecode = false;
>> >>         }
>> >>         return dataBlocks;
>> >>     }
>> >>
>> >> Modified:
>> >> qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
>> >> URL:
>> >> http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java?rev=1172657&r1=1172656&r2=1172657&view=diff
>> >> ==============================================================================
>> >> ---
>> >> qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
>> >> (original)
>> >> +++
>> >> qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
>> >> Mon Sep 19 15:13:18 2011
>> >> @@ -23,7 +23,7 @@ package org.apache.qpid.configuration;
>> >>  */
>> >>  public class ClientProperties
>> >>  {
>> >> -
>> >> +
>> >>     /**
>> >>      * Currently with Qpid it is not possible to change the client
>> >>      ID.
>> >>      * If one is not specified upon connection construction, an id
>> >>      is generated automatically.
>> >> @@ -68,38 +68,50 @@ public class ClientProperties
>> >>      * by the broker in TuneOK it will be used as the heartbeat
>> >>      interval.
>> >>      * If not a warning will be printed and the max value
>> >>      specified for
>> >>      * heartbeat in TuneOK will be used
>> >> -     *
>> >> +     *
>> >>      * The default idle timeout is set to 120 secs
>> >>      */
>> >>     public static final String IDLE_TIMEOUT_PROP_NAME =
>> >>     "idle_timeout";
>> >>     public static final long DEFAULT_IDLE_TIMEOUT = 120000;
>> >> -
>> >> +
>> >>     public static final String HEARTBEAT = "qpid.heartbeat";
>> >>     public static final int HEARTBEAT_DEFAULT = 120;
>> >> -
>> >> +
>> >>     /**
>> >>      * This value will be used to determine the default
>> >>      destination syntax type.
>> >>      * Currently the two types are Binding URL (java only) and the
>> >>      Addressing format (used by
>> >> -     * all clients).
>> >> +     * all clients).
>> >>      */
>> >>     public static final String DEST_SYNTAX = "qpid.dest_syntax";
>> >> -
>> >> +
>> >>     public static final String USE_LEGACY_MAP_MESSAGE_FORMAT =
>> >>     "qpid.use_legacy_map_message";
>> >>
>> >>     public static final String AMQP_VERSION = "qpid.amqp.version";
>> >> -
>> >> -    private static ClientProperties _instance = new
>> >> ClientProperties();
>> >> -
>> >> +
>> >> +    public static final String QPID_VERIFY_CLIENT_ID =
>> >> "qpid.verify_client_id";
>> >> +
>> >> +    /**
>> >> +     * System properties to change the default timeout used
>> >> during
>> >> +     * synchronous operations.
>> >> +     */
>> >> +    public static final String QPID_SYNC_OP_TIMEOUT =
>> >> "qpid.sync_op_timeout";
>> >> +    public static final String AMQJ_DEFAULT_SYNCWRITE_TIMEOUT =
>> >> "amqj.default_syncwrite_timeout";
>> >> +
>> >> +    /**
>> >> +     * A default timeout value for synchronous operations
>> >> +     */
>> >> +    public static final int DEFAULT_SYNC_OPERATION_TIMEOUT =
>> >> 60000;
>> >> +
>> >>     /*
>> >> -    public static final QpidProperty<Boolean>
>> >>  IGNORE_SET_CLIENTID_PROP_NAME =
>> >> +    public static final QpidProperty<Boolean>
>> >>  IGNORE_SET_CLIENTID_PROP_NAME =
>> >>         QpidProperty.booleanProperty(false,"qpid.ignore_set_client_id","ignore_setclientID");
>> >> -
>> >> +
>> >>     public static final QpidProperty<Boolean>
>> >>     SYNC_PERSISTENT_PROP_NAME =
>> >>         QpidProperty.booleanProperty(false,"qpid.sync_persistence","sync_persistence");
>> >> -
>> >> -
>> >> +
>> >> +
>> >>     public static final QpidProperty<Integer>
>> >>     MAX_PREFETCH_PROP_NAME =
>> >>         QpidProperty.intProperty(500,"qpid.max_prefetch","max_prefetch");
>> >>         */
>> >> -
>> >> -
>> >> +
>> >> +
>> >>  }
>> >>
>> >> Modified:
>> >> qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
>> >> URL:
>> >> http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java?rev=1172657&r1=1172656&r2=1172657&view=diff
>> >> ==============================================================================
>> >> ---
>> >> qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
>> >> (original)
>> >> +++
>> >> qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
>> >> Mon Sep 19 15:13:18 2011
>> >> @@ -20,7 +20,9 @@
>> >>  */
>> >>  package org.apache.qpid.framing;
>> >>
>> >> -import org.apache.mina.common.ByteBuffer;
>> >> +import java.io.DataOutputStream;
>> >> +import java.io.IOException;
>> >> +
>> >>  import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
>> >>  import org.apache.qpid.AMQException;
>> >>
>> >> @@ -34,7 +36,7 @@ public interface AMQBody
>> >>      */
>> >>     public abstract int getSize();
>> >>
>> >> -    public void writePayload(ByteBuffer buffer);
>> >> +    public void writePayload(DataOutputStream buffer) throws
>> >> IOException;
>> >>
>> >> -    void handle(final int channelId, final
>> >> AMQVersionAwareProtocolSession amqMinaProtocolSession) throws
>> >> AMQException;
>> >> +    void handle(final int channelId, final
>> >> AMQVersionAwareProtocolSession amqProtocolSession) throws
>> >> AMQException;
>> >>  }
>> >>
>> >> Modified:
>> >> qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
>> >> URL:
>> >> http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java?rev=1172657&r1=1172656&r2=1172657&view=diff
>> >> ==============================================================================
>> >> ---
>> >> qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
>> >> (original)
>> >> +++
>> >> qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
>> >> Mon Sep 19 15:13:18 2011
>> >> @@ -20,7 +20,10 @@
>> >>  */
>> >>  package org.apache.qpid.framing;
>> >>
>> >> -import org.apache.mina.common.ByteBuffer;
>> >> +import java.io.DataInputStream;
>> >> +import java.io.DataOutputStream;
>> >> +import java.io.IOException;
>> >> +
>> >>
>> >>  /**
>> >>  * A data block represents something that has a size in bytes and
>> >>  the ability to write itself to a byte
>> >> @@ -39,25 +42,6 @@ public abstract class AMQDataBlock imple
>> >>      * Writes the datablock to the specified buffer.
>> >>      * @param buffer
>> >>      */
>> >> -    public abstract void writePayload(ByteBuffer buffer);
>> >> -
>> >> -    public ByteBuffer toByteBuffer()
>> >> -    {
>> >> -        final ByteBuffer buffer =
>> >> ByteBuffer.allocate((int)getSize());
>> >> -
>> >> -        writePayload(buffer);
>> >> -        buffer.flip();
>> >> -        return buffer;
>> >> -    }
>> >> -
>> >> -    public java.nio.ByteBuffer toNioByteBuffer()
>> >> -    {
>> >> -        final java.nio.ByteBuffer buffer =
>> >> java.nio.ByteBuffer.allocate((int) getSize());
>> >> -
>> >> -        ByteBuffer buf = ByteBuffer.wrap(buffer);
>> >> -        writePayload(buf);
>> >> -        buffer.flip();
>> >> -        return buffer;
>> >> -    }
>> >> +    public abstract void writePayload(DataOutputStream buffer)
>> >> throws IOException;
>> >>
>> >>  }
>> >>
>> >> Modified:
>> >> qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
>> >> URL:
>> >> http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java?rev=1172657&r1=1172656&r2=1172657&view=diff
>> >> ==============================================================================
>> >> ---
>> >> qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
>> >> (original)
>> >> +++
>> >> qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
>> >> Mon Sep 19 15:13:18 2011
>> >> @@ -20,18 +20,14 @@
>> >>  */
>> >>  package org.apache.qpid.framing;
>> >>
>> >> -import org.apache.mina.common.ByteBuffer;
>> >> -import org.apache.mina.common.IoSession;
>> >> -import org.apache.mina.filter.codec.ProtocolDecoderOutput;
>> >> -
>> >> -import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
>> >> -
>> >>  import org.slf4j.Logger;
>> >>  import org.slf4j.LoggerFactory;
>> >>
>> >> +import java.io.DataInputStream;
>> >> +import java.io.IOException;
>> >> +
>> >>  public class AMQDataBlockDecoder
>> >>  {
>> >> -    private static final String SESSION_METHOD_BODY_FACTORY =
>> >> "QPID_SESSION_METHOD_BODY_FACTORY";
>> >>
>> >>     private static final BodyFactory[] _bodiesSupported = new
>> >>     BodyFactory[Byte.MAX_VALUE];
>> >>
>> >> @@ -47,27 +43,32 @@ public class AMQDataBlockDecoder
>> >>     public AMQDataBlockDecoder()
>> >>     { }
>> >>
>> >> -    public boolean decodable(java.nio.ByteBuffer in) throws
>> >> AMQFrameDecodingException
>> >> +    public boolean decodable(DataInputStream in) throws
>> >> AMQFrameDecodingException, IOException
>> >>     {
>> >> -        final int remainingAfterAttributes = in.remaining() - (1
>> >> + 2 + 4 + 1);
>> >> +        final int remainingAfterAttributes = in.available() - (1
>> >> + 2 + 4 + 1);
>> >>         // type, channel, body length and end byte
>> >>         if (remainingAfterAttributes < 0)
>> >>         {
>> >>             return false;
>> >>         }
>> >>
>> >> -        in.position(in.position() + 1 + 2);
>> >> +        in.mark(8);
>> >> +        in.skip(1 + 2);
>> >> +
>> >> +
>> >>         // Get an unsigned int, lifted from MINA ByteBuffer
>> >>         getUnsignedInt()
>> >> -        final long bodySize = in.getInt() & 0xffffffffL;
>> >> +        final long bodySize = in.readInt() & 0xffffffffL;
>> >> +
>> >> +        in.reset();
>> >>
>> >>         return (remainingAfterAttributes >= bodySize);
>> >>
>> >>     }
>> >>
>> >> -    public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory
>> >> methodBodyFactory, ByteBuffer in)
>> >> -        throws AMQFrameDecodingException,
>> >> AMQProtocolVersionException
>> >> +    public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory
>> >> methodBodyFactory, DataInputStream in)
>> >> +            throws AMQFrameDecodingException,
>> >> AMQProtocolVersionException, IOException
>> >>     {
>> >> -        final byte type = in.get();
>> >> +        final byte type = in.readByte();
>> >>
>> >>         BodyFactory bodyFactory;
>> >>         if (type == AMQMethodBody.TYPE)
>> >> @@ -84,8 +85,8 @@ public class AMQDataBlockDecoder
>> >>             throw new AMQFrameDecodingException(null, "Unsupported
>> >>             frame type: " + type, null);
>> >>         }
>> >>
>> >> -        final int channel = in.getUnsignedShort();
>> >> -        final long bodySize = in.getUnsignedInt();
>> >> +        final int channel = in.readUnsignedShort();
>> >> +        final long bodySize =
>> >> EncodingUtils.readUnsignedInteger(in);
>> >>
>> >>         // bodySize can be zero
>> >>         if ((channel < 0) || (bodySize < 0))
>> >> @@ -96,7 +97,7 @@ public class AMQDataBlockDecoder
>> >>
>> >>         AMQFrame frame = new AMQFrame(in, channel, bodySize,
>> >>         bodyFactory);
>> >>
>> >> -        byte marker = in.get();
>> >> +        byte marker = in.readByte();
>> >>         if ((marker & 0xFF) != 0xCE)
>> >>         {
>> >>             throw new AMQFrameDecodingException(null, "End of
>> >>             frame marker not found. Read " + marker + " length="
>> >>             + bodySize
>> >> @@ -106,26 +107,4 @@ public class AMQDataBlockDecoder
>> >>         return frame;
>> >>     }
>> >>
>> >> -    public void decode(IoSession session, ByteBuffer in,
>> >> ProtocolDecoderOutput out) throws Exception
>> >> -    {
>> >> -        AMQMethodBodyFactory bodyFactory = (AMQMethodBodyFactory)
>> >> session.getAttribute(SESSION_METHOD_BODY_FACTORY);
>> >> -        if (bodyFactory == null)
>> >> -        {
>> >> -            AMQVersionAwareProtocolSession protocolSession =
>> >> (AMQVersionAwareProtocolSession) session.getAttachment();
>> >> -            bodyFactory = new
>> >> AMQMethodBodyFactory(protocolSession);
>> >> -            session.setAttribute(SESSION_METHOD_BODY_FACTORY,
>> >> bodyFactory);
>> >> -        }
>> >> -
>> >> -        out.write(createAndPopulateFrame(bodyFactory, in));
>> >> -    }
>> >> -
>> >> -    public boolean decodable(ByteBuffer msg) throws
>> >> AMQFrameDecodingException
>> >> -    {
>> >> -        return decodable(msg.buf());
>> >> -    }
>> >> -
>> >> -    public AMQDataBlock
>> >> createAndPopulateFrame(AMQMethodBodyFactory factory,
>> >> java.nio.ByteBuffer msg) throws AMQProtocolVersionException,
>> >> AMQFrameDecodingException
>> >> -    {
>> >> -        return createAndPopulateFrame(factory,
>> >> ByteBuffer.wrap(msg));
>> >> -    }
>> >>  }
>> >>
>> >>
>> >>
>> >> ---------------------------------------------------------------------
>> >> Apache Qpid - AMQP Messaging Implementation
>> >> Project:      http://qpid.apache.org
>> >> Use/Interact: mailto:commits-subscribe@qpid.apache.org
>> >>
>> >>
>> >
>>
>

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org


Mime
View raw message