qpid-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ken Giusti <kgiu...@redhat.com>
Subject Re: svn commit: r1172657 [14/21] - in /qpid/branches/qpid-3346/qpid: ./ cpp/ cpp/bindings/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qpid/dotnet/ cpp/bindings/qpid/dotnet/examples/csharp.direct.receiver/Properties/ cpp/bindings/qpid/dotnet/example
Date Mon, 19 Sep 2011 16:03:16 GMT
Yikes!!!  

Well, that would explain why I can no longer build Java on that branch.

I don't recognize any of those changes! As a matter of fact, there shouldn't be any changes to the Java subtree on that branch.

I merely did a merge from trunk onto that branch:

 svn merge https://svn.apache.org/repos/asf/qpid/trunk/qpid

I'll revert that commit, but I'd like to know what I did wrong so I don't do it again.

Thanks for catching that.

-K


----- Original Message -----
> Ken,
> 
> I assume this wasn't intentional and was merely a side effect of the
> merge?
> I didn't look at the diff very closely, but I wonder what the
> implication of these changes would be.
> 
> Looking further it seems a lot of classes on the java client and
> broker have changed.
> I think in order to be on the safe side, these changes should be
> backed out.
> If I am not mistaken that would not impact anything as I believe your
> work didn't involve the java side.
> 
> Rajith
> 
> On Mon, Sep 19, 2011 at 11:13 AM,  <kgiusti@apache.org> wrote:
> > Modified:
> > qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
> > URL:
> > http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> > ==============================================================================
> > ---
> > qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
> > (original)
> > +++
> > qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
> > Mon Sep 19 15:13:18 2011
> > @@ -23,7 +23,8 @@ package org.apache.qpid.client.message;
> >  import javax.jms.JMSException;
> >  import javax.jms.StreamMessage;
> >
> > -import org.apache.mina.common.ByteBuffer;
> > +import java.nio.ByteBuffer;
> > +
> >  import org.apache.qpid.AMQException;
> >  import org.apache.qpid.framing.AMQShortString;
> >  import org.apache.qpid.framing.BasicContentHeaderProperties;
> > @@ -36,65 +37,76 @@ public class JMSStreamMessage extends Ab
> >     public static final String MIME_TYPE="jms/stream-message";
> >
> >
> > -
> > -    /**
> > -     * This is set when reading a byte array. The
> > readBytes(byte[]) method supports multiple calls to read
> > -     * a byte array in multiple chunks, hence this is used to
> > track how much is left to be read
> > -     */
> > -    private int _byteArrayRemaining = -1;
> > +    private TypedBytesContentReader _typedBytesContentReader;
> > +    private TypedBytesContentWriter _typedBytesContentWriter;
> >
> >     public JMSStreamMessage(AMQMessageDelegateFactory
> >     delegateFactory)
> >     {
> > -        this(delegateFactory,null);
> > +        super(delegateFactory,false);
> > +        _typedBytesContentWriter = new TypedBytesContentWriter();
> >
> >     }
> >
> > -    /**
> > -     * Construct a stream message with existing data.
> > -     *
> > -     * @param delegateFactory
> > -     * @param data the data that comprises this message. If data
> > is null, you get a 1024 byte buffer that is
> > -     */
> > -    JMSStreamMessage(AMQMessageDelegateFactory delegateFactory,
> > ByteBuffer data)
> > -    {
> >
> > -        super(delegateFactory, data); // this instanties a content
> > header
> > -    }
> >
> >     JMSStreamMessage(AMQMessageDelegate delegate, ByteBuffer data)
> >     throws AMQException
> >     {
> > -
> > -        super(delegate, data);
> > +        super(delegate, data!=null);
> > +        _typedBytesContentReader = new
> > TypedBytesContentReader(data);
> >     }
> >
> > -
> >     public void reset()
> >     {
> > -        super.reset();
> >         _readableMessage = true;
> > +
> > +        if(_typedBytesContentReader != null)
> > +        {
> > +            _typedBytesContentReader.reset();
> > +        }
> > +        else if (_typedBytesContentWriter != null)
> > +        {
> > +            _typedBytesContentReader = new
> > TypedBytesContentReader(_typedBytesContentWriter.getData());
> > +        }
> > +    }
> > +
> > +    @Override
> > +    public void clearBody() throws JMSException
> > +    {
> > +        super.clearBody();
> > +        _typedBytesContentReader = null;
> > +        _typedBytesContentWriter = new TypedBytesContentWriter();
> > +
> >     }
> >
> > +
> >     protected String getMimeType()
> >     {
> >         return MIME_TYPE;
> >     }
> >
> > -
> > +    @Override
> > +    public java.nio.ByteBuffer getData() throws JMSException
> > +    {
> > +        return _typedBytesContentWriter == null ?
> > _typedBytesContentReader.getData() :
> > _typedBytesContentWriter.getData();
> > +    }
> >
> >     public boolean readBoolean() throws JMSException
> >     {
> > -        return super.readBoolean();
> > +        checkReadable();
> > +        return _typedBytesContentReader.readBoolean();
> >     }
> >
> >
> >     public byte readByte() throws JMSException
> >     {
> > -        return super.readByte();
> > +        checkReadable();
> > +        return _typedBytesContentReader.readByte();
> >     }
> >
> >     public short readShort() throws JMSException
> >     {
> > -        return super.readShort();
> > +        checkReadable();
> > +        return _typedBytesContentReader.readShort();
> >     }
> >
> >     /**
> > @@ -105,102 +117,127 @@ public class JMSStreamMessage extends Ab
> >      */
> >     public char readChar() throws JMSException
> >     {
> > -        return super.readChar();
> > +        checkReadable();
> > +        return _typedBytesContentReader.readChar();
> >     }
> >
> >     public int readInt() throws JMSException
> >     {
> > -        return super.readInt();
> > +        checkReadable();
> > +        return _typedBytesContentReader.readInt();
> >     }
> >
> >     public long readLong() throws JMSException
> >     {
> > -        return super.readLong();
> > +        checkReadable();
> > +        return _typedBytesContentReader.readLong();
> >     }
> >
> >     public float readFloat() throws JMSException
> >     {
> > -        return super.readFloat();
> > +        checkReadable();
> > +        return _typedBytesContentReader.readFloat();
> >     }
> >
> >     public double readDouble() throws JMSException
> >     {
> > -        return super.readDouble();
> > +        checkReadable();
> > +        return _typedBytesContentReader.readDouble();
> >     }
> >
> >     public String readString() throws JMSException
> >     {
> > -        return super.readString();
> > +        checkReadable();
> > +        return _typedBytesContentReader.readString();
> >     }
> >
> >     public int readBytes(byte[] bytes) throws JMSException
> >     {
> > -        return super.readBytes(bytes);
> > +        if(bytes == null)
> > +        {
> > +            throw new IllegalArgumentException("Must provide
> > non-null array to read into");
> > +        }
> > +
> > +        checkReadable();
> > +        return _typedBytesContentReader.readBytes(bytes);
> >     }
> >
> >
> >     public Object readObject() throws JMSException
> >     {
> > -        return super.readObject();
> > +        checkReadable();
> > +        return _typedBytesContentReader.readObject();
> >     }
> >
> >     public void writeBoolean(boolean b) throws JMSException
> >     {
> > -        super.writeBoolean(b);
> > +        checkWritable();
> > +        _typedBytesContentWriter.writeBoolean(b);
> >     }
> >
> >     public void writeByte(byte b) throws JMSException
> >     {
> > -        super.writeByte(b);
> > +        checkWritable();
> > +        _typedBytesContentWriter.writeByte(b);
> >     }
> >
> >     public void writeShort(short i) throws JMSException
> >     {
> > -        super.writeShort(i);
> > +        checkWritable();
> > +        _typedBytesContentWriter.writeShort(i);
> >     }
> >
> >     public void writeChar(char c) throws JMSException
> >     {
> > -        super.writeChar(c);
> > +        checkWritable();
> > +        _typedBytesContentWriter.writeChar(c);
> >     }
> >
> >     public void writeInt(int i) throws JMSException
> >     {
> > -        super.writeInt(i);
> > +        checkWritable();
> > +        _typedBytesContentWriter.writeInt(i);
> >     }
> >
> >     public void writeLong(long l) throws JMSException
> >     {
> > -        super.writeLong(l);
> > +        checkWritable();
> > +        _typedBytesContentWriter.writeLong(l);
> >     }
> >
> >     public void writeFloat(float v) throws JMSException
> >     {
> > -        super.writeFloat(v);
> > +        checkWritable();
> > +        _typedBytesContentWriter.writeFloat(v);
> >     }
> >
> >     public void writeDouble(double v) throws JMSException
> >     {
> > -        super.writeDouble(v);
> > +        checkWritable();
> > +        _typedBytesContentWriter.writeDouble(v);
> >     }
> >
> >     public void writeString(String string) throws JMSException
> >     {
> > -        super.writeString(string);
> > +        checkWritable();
> > +        _typedBytesContentWriter.writeString(string);
> >     }
> >
> >     public void writeBytes(byte[] bytes) throws JMSException
> >     {
> > -        super.writeBytes(bytes);
> > +        checkWritable();
> > +        _typedBytesContentWriter.writeBytes(bytes);
> >     }
> >
> >     public void writeBytes(byte[] bytes, int offset, int length)
> >     throws JMSException
> >     {
> > -        super.writeBytes(bytes,offset,length);
> > +        checkWritable();
> > +        _typedBytesContentWriter.writeBytes(bytes, offset,
> > length);
> >     }
> >
> >     public void writeObject(Object object) throws JMSException
> >     {
> > -        super.writeObject(object);
> > +        checkWritable();
> > +        _typedBytesContentWriter.writeObject(object);
> >     }
> >  }
> >
> > Modified:
> > qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java
> > URL:
> > http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> > ==============================================================================
> > ---
> > qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java
> > (original)
> > +++
> > qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java
> > Mon Sep 19 15:13:18 2011
> > @@ -22,10 +22,9 @@ package org.apache.qpid.client.message;
> >
> >  import javax.jms.JMSException;
> >
> > -import org.apache.mina.common.ByteBuffer;
> > +import java.nio.ByteBuffer;
> > +
> >  import org.apache.qpid.AMQException;
> > -import org.apache.qpid.framing.AMQShortString;
> > -import org.apache.qpid.framing.BasicContentHeaderProperties;
> >
> >  public class JMSStreamMessageFactory extends
> >  AbstractJMSMessageFactory
> >  {
> >
> > Modified:
> > qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
> > URL:
> > http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> > ==============================================================================
> > ---
> > qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
> > (original)
> > +++
> > qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
> > Mon Sep 19 15:13:18 2011
> > @@ -20,15 +20,21 @@
> >  */
> >  package org.apache.qpid.client.message;
> >
> > +import java.io.DataInputStream;
> >  import java.io.UnsupportedEncodingException;
> > +import java.nio.ByteBuffer;
> > +import java.nio.CharBuffer;
> >  import java.nio.charset.CharacterCodingException;
> >  import java.nio.charset.Charset;
> > +import java.nio.charset.CharsetDecoder;
> > +import java.nio.charset.CharsetEncoder;
> >
> >  import javax.jms.JMSException;
> > +import javax.jms.MessageFormatException;
> >
> > -import org.apache.mina.common.ByteBuffer;
> >  import org.apache.qpid.AMQException;
> >  import org.apache.qpid.client.CustomJMSXProperty;
> > +import org.apache.qpid.framing.AMQFrameDecodingException;
> >  import org.apache.qpid.framing.AMQShortString;
> >  import org.apache.qpid.framing.BasicContentHeaderProperties;
> >  import org.apache.qpid.util.Strings;
> > @@ -37,6 +43,7 @@ public class JMSTextMessage extends Abst
> >  {
> >     private static final String MIME_TYPE = "text/plain";
> >
> > +    private Exception _exception;
> >     private String _decodedValue;
> >
> >     /**
> > @@ -45,36 +52,41 @@ public class JMSTextMessage extends Abst
> >     private static final String PAYLOAD_NULL_PROPERTY =
> >     CustomJMSXProperty.JMS_AMQP_NULL.toString();
> >     private static final Charset DEFAULT_CHARSET =
> >     Charset.forName("UTF-8");
> >
> > -    public JMSTextMessage(AMQMessageDelegateFactory
> > delegateFactory) throws JMSException
> > -    {
> > -        this(delegateFactory, null, null);
> > -    }
> > +    private CharsetDecoder _decoder =
> > DEFAULT_CHARSET.newDecoder();
> > +    private CharsetEncoder _encoder =
> > DEFAULT_CHARSET.newEncoder();
> > +
> > +    private static final ByteBuffer EMPTY_BYTE_BUFFER =
> > ByteBuffer.allocate(0);
> >
> > -    JMSTextMessage(AMQMessageDelegateFactory delegateFactory,
> > ByteBuffer data, String encoding) throws JMSException
> > +    public JMSTextMessage(AMQMessageDelegateFactory
> > delegateFactory) throws JMSException
> >     {
> > -        super(delegateFactory, data); // this instantiates a
> > content header
> > -        setContentType(getMimeType());
> > -        setEncoding(encoding);
> > +        super(delegateFactory, false); // this instantiates a
> > content header
> >     }
> >
> >     JMSTextMessage(AMQMessageDelegate delegate, ByteBuffer data)
> >             throws AMQException
> >     {
> > -        super(delegate, data);
> > -        setContentType(getMimeType());
> > -        _data = data;
> > -    }
> > +        super(delegate, data!=null);
> >
> > -
> > -    public void clearBodyImpl() throws JMSException
> > -    {
> > -        if (_data != null)
> > +        try
> >         {
> > -            _data.release();
> > -            _data = null;
> > +            if(propertyExists(PAYLOAD_NULL_PROPERTY))
> > +            {
> > +                _decodedValue = null;
> > +            }
> > +            else
> > +            {
> > +                _decodedValue = _decoder.decode(data).toString();
> > +            }
> > +        }
> > +        catch (CharacterCodingException e)
> > +        {
> > +            _exception = e;
> > +        }
> > +        catch (JMSException e)
> > +        {
> > +            _exception = e;
> >         }
> >
> > -        _decodedValue = null;
> >     }
> >
> >     public String toBodyString() throws JMSException
> > @@ -87,95 +99,62 @@ public class JMSTextMessage extends Abst
> >         return MIME_TYPE;
> >     }
> >
> > -    public void setText(String text) throws JMSException
> > +    @Override
> > +    public ByteBuffer getData() throws JMSException
> >     {
> > -        checkWritable();
> > -
> > -        clearBody();
> > +        _encoder.reset();
> >         try
> >         {
> > -            if (text != null)
> > +            if(_exception != null)
> > +            {
> > +                final MessageFormatException
> > messageFormatException = new MessageFormatException("Cannot decode
> > original message");
> > +
> >                messageFormatException.setLinkedException(_exception);
> > +                throw messageFormatException;
> > +            }
> > +            else if(_decodedValue == null)
> > +            {
> > +                return EMPTY_BYTE_BUFFER;
> > +            }
> > +            else
> >             {
> > -                final String encoding = getEncoding();
> > -                if (encoding == null ||
> > encoding.equalsIgnoreCase("UTF-8"))
> > -                {
> > -                    _data = ByteBuffer.wrap(Strings.toUTF8(text));
> > -                    setEncoding("UTF-8");
> > -                }
> > -                else
> > -                {
> > -                    _data =
> > ByteBuffer.wrap(text.getBytes(encoding));
> > -                }
> > -                _data.position(_data.limit());
> > -                _changedData=true;
> > +                return
> > _encoder.encode(CharBuffer.wrap(_decodedValue));
> >             }
> > -            _decodedValue = text;
> >         }
> > -        catch (UnsupportedEncodingException e)
> > +        catch (CharacterCodingException e)
> >         {
> > -            // should never occur
> > -            JMSException jmse = new JMSException("Unable to decode
> > text data");
> > -            jmse.setLinkedException(e);
> > -            jmse.initCause(e);
> > -            throw jmse;
> > +            final JMSException jmsException = new
> > JMSException("Cannot encode string in UFT-8: " + _decodedValue);
> > +            jmsException.setLinkedException(e);
> > +            throw jmsException;
> >         }
> >     }
> >
> > -    public String getText() throws JMSException
> > +    @Override
> > +    public void clearBody() throws JMSException
> >     {
> > -        if (_data == null && _decodedValue == null)
> > -        {
> > -            return null;
> > -        }
> > -        else if (_decodedValue != null)
> > -        {
> > -            return _decodedValue;
> > -        }
> > -        else
> > -        {
> > -            _data.rewind();
> > +        super.clearBody();
> > +        _decodedValue = null;
> > +        _exception = null;
> > +    }
> >
> > -            if (propertyExists(PAYLOAD_NULL_PROPERTY) &&
> > getBooleanProperty(PAYLOAD_NULL_PROPERTY))
> > -            {
> > -                return null;
> > -            }
> > -            if (getEncoding() != null)
> > -            {
> > -                try
> > -                {
> > -                    _decodedValue =
> > _data.getString(Charset.forName(getEncoding()).newDecoder());
> > -                }
> > -                catch (CharacterCodingException e)
> > -                {
> > -                    JMSException jmse = new JMSException("Could
> > not decode string data: " + e);
> > -                    jmse.setLinkedException(e);
> > -                    jmse.initCause(e);
> > -                    throw jmse;
> > -                }
> > -            }
> > -            else
> > -            {
> > -                try
> > -                {
> > -                    _decodedValue =
> > _data.getString(DEFAULT_CHARSET.newDecoder());
> > -                }
> > -                catch (CharacterCodingException e)
> > -                {
> > -                    JMSException jmse = new JMSException("Could
> > not decode string data: " + e);
> > -                    jmse.setLinkedException(e);
> > -                    jmse.initCause(e);
> > -                    throw jmse;
> > -                }
> > -            }
> > -            return _decodedValue;
> > -        }
> > +    public void setText(String text) throws JMSException
> > +    {
> > +        checkWritable();
> > +
> > +        clearBody();
> > +        _decodedValue = text;
> > +
> > +    }
> > +
> > +    public String getText() throws JMSException
> > +    {
> > +        return _decodedValue;
> >     }
> >
> >     @Override
> >     public void prepareForSending() throws JMSException
> >     {
> >         super.prepareForSending();
> > -        if (_data == null)
> > +        if (_decodedValue == null)
> >         {
> >             setBooleanProperty(PAYLOAD_NULL_PROPERTY, true);
> >         }
> >
> > Modified:
> > qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java
> > URL:
> > http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> > ==============================================================================
> > ---
> > qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java
> > (original)
> > +++
> > qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java
> > Mon Sep 19 15:13:18 2011
> > @@ -22,7 +22,7 @@ package org.apache.qpid.client.message;
> >
> >  import javax.jms.JMSException;
> >
> > -import org.apache.mina.common.ByteBuffer;
> > +import java.nio.ByteBuffer;
> >  import org.apache.qpid.AMQException;
> >  import org.apache.qpid.framing.AMQShortString;
> >  import org.apache.qpid.framing.BasicContentHeaderProperties;
> >
> > Modified:
> > qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
> > URL:
> > http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> > ==============================================================================
> > ---
> > qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
> > (original)
> > +++
> > qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
> > Mon Sep 19 15:13:18 2011
> > @@ -87,9 +87,9 @@ public class UnprocessedMessage_0_8 exte
> >     public void receiveBody(ContentBody body)
> >     {
> >
> > -        if (body.payload != null)
> > +        if (body._payload != null)
> >         {
> > -            final long payloadSize = body.payload.remaining();
> > +            final long payloadSize = body._payload.length;
> >
> >             if (_bodies == null)
> >             {
> >
> > Modified:
> > qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
> > URL:
> > http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> > ==============================================================================
> > ---
> > qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
> > (original)
> > +++
> > qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
> > Mon Sep 19 15:13:18 2011
> > @@ -20,7 +20,9 @@
> >  */
> >  package org.apache.qpid.client.protocol;
> >
> > +import java.io.DataOutputStream;
> >  import java.io.IOException;
> > +import java.io.OutputStream;
> >  import java.net.SocketAddress;
> >  import java.nio.ByteBuffer;
> >  import java.util.ArrayList;
> > @@ -31,7 +33,6 @@ import java.util.concurrent.CountDownLat
> >  import java.util.concurrent.ThreadFactory;
> >  import java.util.concurrent.TimeUnit;
> >
> > -import org.apache.mina.filter.codec.ProtocolCodecException;
> >  import org.apache.qpid.AMQConnectionClosedException;
> >  import org.apache.qpid.AMQDisconnectedException;
> >  import org.apache.qpid.AMQException;
> > @@ -46,6 +47,7 @@ import org.apache.qpid.client.state.AMQS
> >  import org.apache.qpid.client.state.StateWaiter;
> >  import
> >  org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
> >  import org.apache.qpid.codec.AMQCodecFactory;
> > +import org.apache.qpid.configuration.ClientProperties;
> >  import org.apache.qpid.framing.AMQBody;
> >  import org.apache.qpid.framing.AMQDataBlock;
> >  import org.apache.qpid.framing.AMQFrame;
> > @@ -57,8 +59,6 @@ import org.apache.qpid.framing.Heartbeat
> >  import org.apache.qpid.framing.MethodRegistry;
> >  import org.apache.qpid.framing.ProtocolInitiation;
> >  import org.apache.qpid.framing.ProtocolVersion;
> > -import org.apache.qpid.pool.Job;
> > -import org.apache.qpid.pool.ReferenceCountingExecutorService;
> >  import org.apache.qpid.protocol.AMQConstant;
> >  import org.apache.qpid.protocol.AMQMethodEvent;
> >  import org.apache.qpid.protocol.AMQMethodListener;
> > @@ -164,19 +164,19 @@ public class AMQProtocolHandler implemen
> >     private FailoverException _lastFailoverException;
> >
> >     /** Defines the default timeout to use for synchronous protocol
> >     commands. */
> > -    private final long DEFAULT_SYNC_TIMEOUT =
> > Long.getLong("amqj.default_syncwrite_timeout", 1000 * 30);
> > +    private final long DEFAULT_SYNC_TIMEOUT =
> > Long.getLong(ClientProperties.QPID_SYNC_OP_TIMEOUT,
> > +
> >                                                           Long.getLong(ClientProperties.AMQJ_DEFAULT_SYNCWRITE_TIMEOUT,
> > +
> >                                                                        ClientProperties.DEFAULT_SYNC_OPERATION_TIMEOUT));
> >
> >     /** Object to lock on when changing the latch */
> >     private Object _failoverLatchChange = new Object();
> >     private AMQCodecFactory _codecFactory;
> > -    private Job _readJob;
> > -    private Job _writeJob;
> > -    private ReferenceCountingExecutorService _poolReference =
> > ReferenceCountingExecutorService.getInstance();
> > +
> >     private ProtocolVersion _suggestedProtocolVersion;
> >
> >     private long _writtenBytes;
> >     private long _readBytes;
> > -    private NetworkTransport _transport;
> > +
> >     private NetworkConnection _network;
> >     private Sender<ByteBuffer> _sender;
> >
> > @@ -191,24 +191,6 @@ public class AMQProtocolHandler implemen
> >         _protocolSession = new AMQProtocolSession(this,
> >         _connection);
> >         _stateManager = new AMQStateManager(_protocolSession);
> >         _codecFactory = new AMQCodecFactory(false,
> >         _protocolSession);
> > -        _poolReference.setThreadFactory(new ThreadFactory()
> > -        {
> > -
> > -            public Thread newThread(final Runnable runnable)
> > -            {
> > -                try
> > -                {
> > -                    return
> > Threading.getThreadFactory().createThread(runnable);
> > -                }
> > -                catch (Exception e)
> > -                {
> > -                    throw new RuntimeException("Failed to create
> > thread", e);
> > -                }
> > -            }
> > -        });
> > -        _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS,
> > true);
> > -        _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS,
> > false);
> > -        _poolReference.acquireExecutorService();
> >         _failoverHandler = new FailoverHandler(this);
> >     }
> >
> > @@ -329,17 +311,7 @@ public class AMQProtocolHandler implemen
> >             }
> >             else
> >             {
> > -
> > -                if (cause instanceof ProtocolCodecException)
> > -                {
> > -                    _logger.info("Protocol Exception caught NOT
> > going to attempt failover as " +
> > -                                 "cause isn't
> > AMQConnectionClosedException: " + cause, cause);
> > -
> > -                    AMQException amqe = new AMQException("Protocol
> > handler error: " + cause, cause);
> > -                    propagateExceptionToAllWaiters(amqe);
> > -                }
> >                 _connection.exceptionReceived(cause);
> > -
> >             }
> >
> >             // FIXME Need to correctly handle other exceptions.
> >             Things like ...
> > @@ -433,76 +405,63 @@ public class AMQProtocolHandler implemen
> >
> >     public void received(ByteBuffer msg)
> >     {
> > +        _readBytes += msg.remaining();
> >         try
> >         {
> > -            _readBytes += msg.remaining();
> >             final ArrayList<AMQDataBlock> dataBlocks =
> >             _codecFactory.getDecoder().decodeBuffer(msg);
> >
> > -            Job.fireAsynchEvent(_poolReference.getPool(),
> > _readJob, new Runnable()
> > +            // Decode buffer
> > +
> > +            for (AMQDataBlock message : dataBlocks)
> >             {
> >
> > -                public void run()
> > -                {
> > -                    // Decode buffer
> > +                    if (PROTOCOL_DEBUG)
> > +                    {
> > +                        _protocolLogger.info(String.format("RECV:
> > [%s] %s", this, message));
> > +                    }
> >
> > -                    for (AMQDataBlock message : dataBlocks)
> > +                    if(message instanceof AMQFrame)
> >                     {
> > +                        final boolean debug =
> > _logger.isDebugEnabled();
> > +                        final long msgNumber =
> > ++_messageReceivedCount;
> >
> > -                        try
> > -                        {
> > -                            if (PROTOCOL_DEBUG)
> > -                            {
> > -
> >                                _protocolLogger.info(String.format("RECV:
> > [%s] %s", this, message));
> > -                            }
> > -
> > -                            if(message instanceof AMQFrame)
> > -                            {
> > -                                final boolean debug =
> > _logger.isDebugEnabled();
> > -                                final long msgNumber =
> > ++_messageReceivedCount;
> > -
> > -                                if (debug && ((msgNumber % 1000)
> > == 0))
> > -                                {
> > -                                    _logger.debug("Received " +
> > _messageReceivedCount + " protocol messages");
> > -                                }
> > -
> > -                                AMQFrame frame = (AMQFrame)
> > message;
> > -
> > -                                final AMQBody bodyFrame =
> > frame.getBodyFrame();
> > -
> > -
> >                                HeartbeatDiagnostics.received(bodyFrame
> > instanceof HeartbeatBody);
> > -
> > -
> >                                bodyFrame.handle(frame.getChannel(),
> > _protocolSession);
> > -
> > -
> >                                _connection.bytesReceived(_readBytes);
> > -                            }
> > -                            else if (message instanceof
> > ProtocolInitiation)
> > -                            {
> > -                                // We get here if the server sends
> > a response to our initial protocol header
> > -                                // suggesting an alternate
> > ProtocolVersion; the server will then close the
> > -                                // connection.
> > -                                ProtocolInitiation protocolInit =
> > (ProtocolInitiation) message;
> > -                                _suggestedProtocolVersion =
> > protocolInit.checkVersion();
> > -                                _logger.info("Broker suggested
> > using protocol version:" + _suggestedProtocolVersion);
> > -
> > -                                // get round a bug in old versions
> > of qpid whereby the connection is not closed
> > -
> >                                _stateManager.changeState(AMQState.CONNECTION_CLOSED);
> > -                            }
> > -                        }
> > -                        catch (Exception e)
> > +                        if (debug && ((msgNumber % 1000) == 0))
> >                         {
> > -                            _logger.error("Exception processing
> > frame", e);
> > -                            propagateExceptionToFrameListeners(e);
> > -                            exception(e);
> > +                            _logger.debug("Received " +
> > _messageReceivedCount + " protocol messages");
> >                         }
> > +
> > +                        AMQFrame frame = (AMQFrame) message;
> > +
> > +                        final AMQBody bodyFrame =
> > frame.getBodyFrame();
> > +
> > +                        HeartbeatDiagnostics.received(bodyFrame
> > instanceof HeartbeatBody);
> > +
> > +                        bodyFrame.handle(frame.getChannel(),
> > _protocolSession);
> > +
> > +                        _connection.bytesReceived(_readBytes);
> > +                    }
> > +                    else if (message instanceof
> > ProtocolInitiation)
> > +                    {
> > +                        // We get here if the server sends a
> > response to our initial protocol header
> > +                        // suggesting an alternate
> > ProtocolVersion; the server will then close the
> > +                        // connection.
> > +                        ProtocolInitiation protocolInit =
> > (ProtocolInitiation) message;
> > +                        _suggestedProtocolVersion =
> > protocolInit.checkVersion();
> > +                        _logger.info("Broker suggested using
> > protocol version:" + _suggestedProtocolVersion);
> > +
> > +                        // get round a bug in old versions of qpid
> > whereby the connection is not closed
> > +                        _stateManager.changeState(AMQState.CONNECTION_CLOSED);
> >                     }
> >                 }
> > -            });
> >         }
> >         catch (Exception e)
> >         {
> > +            _logger.error("Exception processing frame", e);
> >             propagateExceptionToFrameListeners(e);
> >             exception(e);
> >         }
> > +
> > +
> >     }
> >
> >     public void methodBodyReceived(final int channelId, final
> >     AMQBody bodyFrame)
> > @@ -568,17 +527,13 @@ public class AMQProtocolHandler implemen
> >         writeFrame(frame, false);
> >     }
> >
> > -    public void writeFrame(AMQDataBlock frame, boolean wait)
> > +    public  synchronized void writeFrame(AMQDataBlock frame,
> > boolean wait)
> >     {
> > -        final ByteBuffer buf = frame.toNioByteBuffer();
> > +        final ByteBuffer buf = asByteBuffer(frame);
> >         _writtenBytes += buf.remaining();
> > -        Job.fireAsynchEvent(_poolReference.getPool(), _writeJob,
> > new Runnable()
> > -        {
> > -            public void run()
> > -            {
> > -                _sender.send(buf);
> > -            }
> > -        });
> > +        _sender.send(buf);
> > +        _sender.flush();
> > +
> >         if (PROTOCOL_DEBUG)
> >         {
> >             _protocolLogger.debug(String.format("SEND: [%s] %s",
> >             this, frame));
> > @@ -595,12 +550,41 @@ public class AMQProtocolHandler implemen
> >
> >         _connection.bytesSent(_writtenBytes);
> >
> > -        if (wait)
> > +    }
> > +
> > +    private ByteBuffer asByteBuffer(AMQDataBlock block)
> > +    {
> > +        final ByteBuffer buf = ByteBuffer.allocate((int)
> > block.getSize());
> > +
> > +        try
> > +        {
> > +            block.writePayload(new DataOutputStream(new
> > OutputStream()
> > +            {
> > +
> > +
> > +                @Override
> > +                public void write(int b) throws IOException
> > +                {
> > +                    buf.put((byte) b);
> > +                }
> > +
> > +                @Override
> > +                public void write(byte[] b, int off, int len)
> > throws IOException
> > +                {
> > +                    buf.put(b, off, len);
> > +                }
> > +            }));
> > +        }
> > +        catch (IOException e)
> >         {
> > -            _sender.flush();
> > +            throw new RuntimeException(e);
> >         }
> > +
> > +        buf.flip();
> > +        return buf;
> >     }
> >
> > +
> >     /**
> >      * Convenience method that writes a frame to the protocol
> >      session and waits for a particular response. Equivalent to
> >      * calling getProtocolSession().write() then waiting for the
> >      response.
> > @@ -723,7 +707,7 @@ public class AMQProtocolHandler implemen
> >                 _logger.debug("FailoverException interrupted
> >                 connection close, ignoring as connection   close
> >                 anyway.");
> >             }
> >         }
> > -        _poolReference.releaseExecutorService();
> > +
> >     }
> >
> >     /** @return the number of bytes read from this protocol session
> >     */
> > @@ -841,8 +825,13 @@ public class AMQProtocolHandler implemen
> >
> >     public void setNetworkConnection(NetworkConnection network)
> >     {
> > +        setNetworkConnection(network, network.getSender());
> > +    }
> > +
> > +    public void setNetworkConnection(NetworkConnection network,
> > Sender<ByteBuffer> sender)
> > +    {
> >         _network = network;
> > -        _sender = network.getSender();
> > +        _sender = sender;
> >     }
> >
> >     /** @param delay delay in seconds (not ms) */
> >
> > Modified:
> > qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java
> > URL:
> > http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> > ==============================================================================
> > ---
> > qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java
> > (original)
> > +++
> > qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java
> > Mon Sep 19 15:13:18 2011
> > @@ -20,17 +20,22 @@
> >  */
> >  package org.apache.qpid.client.security;
> >
> > -import org.apache.qpid.util.FileUtils;
> > -
> > -import org.slf4j.Logger;
> > -import org.slf4j.LoggerFactory;
> > -
> >  import java.io.IOException;
> >  import java.io.InputStream;
> > +import java.util.Collection;
> > +import java.util.Collections;
> >  import java.util.Enumeration;
> >  import java.util.HashMap;
> > +import java.util.HashSet;
> >  import java.util.Map;
> >  import java.util.Properties;
> > +import java.util.Set;
> > +import java.util.StringTokenizer;
> > +import java.util.TreeMap;
> > +
> > +import org.apache.qpid.util.FileUtils;
> > +import org.slf4j.Logger;
> > +import org.slf4j.LoggerFactory;
> >
> >  /**
> >  * CallbackHandlerRegistry is a registry for call back handlers for
> >  user authentication and interaction during user
> > @@ -42,7 +47,7 @@ import java.util.Properties;
> >  * "amp.callbackhandler.properties". The format of the properties
> >  file is:
> >  *
> >  * <p/><pre>
> > - * CallbackHanlder.mechanism=fully.qualified.class.name
> > + * CallbackHanlder.n.mechanism=fully.qualified.class.name where n
> > is an ordinal
> >  * </pre>
> >  *
> >  * <p/>Where mechanism is an IANA-registered mechanism name and the
> >  fully qualified class name refers to a
> > @@ -66,51 +71,15 @@ public class CallbackHandlerRegistry
> >     public static final String DEFAULT_RESOURCE_NAME =
> >     "org/apache/qpid/client/security/CallbackHandlerRegistry.properties";
> >
> >     /** A static reference to the singleton instance of this
> >     registry. */
> > -    private static CallbackHandlerRegistry _instance = new
> > CallbackHandlerRegistry();
> > +    private static final CallbackHandlerRegistry _instance;
> >
> >     /** Holds a map from SASL mechanism names to call back
> >     handlers. */
> > -    private Map<String, Class> _mechanismToHandlerClassMap = new
> > HashMap<String, Class>();
> > -
> > -    /** Holds a space delimited list of mechanisms that callback
> > handlers exist for. */
> > -    private String _mechanisms;
> > -
> > -    /**
> > -     * Gets the singleton instance of this registry.
> > -     *
> > -     * @return The singleton instance of this registry.
> > -     */
> > -    public static CallbackHandlerRegistry getInstance()
> > -    {
> > -        return _instance;
> > -    }
> > +    private Map<String, Class<AMQCallbackHandler>>
> > _mechanismToHandlerClassMap = new HashMap<String,
> > Class<AMQCallbackHandler>>();
> >
> > -    /**
> > -     * Gets the callback handler class for a given SASL mechanism
> > name.
> > -     *
> > -     * @param mechanism The SASL mechanism name.
> > -     *
> > -     * @return The callback handler class for the mechanism, or
> > null if none is configured for that mechanism.
> > -     */
> > -    public Class getCallbackHandlerClass(String mechanism)
> > -    {
> > -        return (Class) _mechanismToHandlerClassMap.get(mechanism);
> > -    }
> > +    /** Ordered collection of mechanisms for which callback
> > handlers exist. */
> > +    private Collection<String> _mechanisms;
> >
> > -    /**
> > -     * Gets a space delimited list of supported SASL mechanisms.
> > -     *
> > -     * @return A space delimited list of supported SASL
> > mechanisms.
> > -     */
> > -    public String getMechanisms()
> > -    {
> > -        return _mechanisms;
> > -    }
> > -
> > -    /**
> > -     * Creates the call back handler registry from its
> > configuration resource or file. This also has the side effect
> > -     * of configuring and registering the SASL client factory
> > implementations using {@link DynamicSaslRegistrar}.
> > -     */
> > -    private CallbackHandlerRegistry()
> > +    static
> >     {
> >         // Register any configured SASL client factories.
> >         DynamicSaslRegistrar.registerSaslProviders();
> > @@ -120,12 +89,12 @@ public class CallbackHandlerRegistry
> >             FileUtils.openFileOrDefaultResource(filename,
> >             DEFAULT_RESOURCE_NAME,
> >                 CallbackHandlerRegistry.class.getClassLoader());
> >
> > +        final Properties props = new Properties();
> > +
> >         try
> >         {
> > -            Properties props = new Properties();
> > +
> >             props.load(is);
> > -            parseProperties(props);
> > -            _logger.info("Callback handlers available for SASL
> > mechanisms: " + _mechanisms);
> >         }
> >         catch (IOException e)
> >         {
> > @@ -146,32 +115,68 @@ public class CallbackHandlerRegistry
> >                 }
> >             }
> >         }
> > +
> > +        _instance = new CallbackHandlerRegistry(props);
> > +        _logger.info("Callback handlers available for SASL
> > mechanisms: " + _instance._mechanisms);
> > +
> >     }
> >
> > -    /*private InputStream openPropertiesInputStream(String
> > filename)
> > +    /**
> > +     * Gets the singleton instance of this registry.
> > +     *
> > +     * @return The singleton instance of this registry.
> > +     */
> > +    public static CallbackHandlerRegistry getInstance()
> > +    {
> > +        return _instance;
> > +    }
> > +
> > +    public AMQCallbackHandler createCallbackHandler(final String
> > mechanism)
> >     {
> > -        boolean useDefault = true;
> > -        InputStream is = null;
> > -        if (filename != null)
> > +        final Class<AMQCallbackHandler> mechanismClass =
> > _mechanismToHandlerClassMap.get(mechanism);
> > +
> > +        if (mechanismClass == null)
> >         {
> > -            try
> > -            {
> > -                is = new BufferedInputStream(new
> > FileInputStream(new File(filename)));
> > -                useDefault = false;
> > -            }
> > -            catch (FileNotFoundException e)
> > -            {
> > -                _logger.error("Unable to read from file " +
> > filename + ": " + e, e);
> > -            }
> > +            throw new IllegalArgumentException("Mechanism " +
> > mechanism + " not known");
> >         }
> >
> > -        if (useDefault)
> > +        try
> > +        {
> > +            return mechanismClass.newInstance();
> > +        }
> > +        catch (InstantiationException e)
> > +        {
> > +            throw new IllegalArgumentException("Unable to create
> > an instance of mechanism " + mechanism, e);
> > +        }
> > +        catch (IllegalAccessException e)
> >         {
> > -            is =
> > CallbackHandlerRegistry.class.getResourceAsStream(DEFAULT_RESOURCE_NAME);
> > +            throw new IllegalArgumentException("Unable to create
> > an instance of mechanism " + mechanism, e);
> >         }
> > +    }
> >
> > -        return is;
> > -    }*/
> > +    /**
> > +     * Gets collections of supported SASL mechanism names, ordered
> > by preference
> > +     *
> > +     * @return collection of SASL mechanism names.
> > +     */
> > +    public Collection<String> getMechanisms()
> > +    {
> > +        return Collections.unmodifiableCollection(_mechanisms);
> > +    }
> > +
> > +    /**
> > +     * Creates the call back handler registry from its
> > configuration resource or file.
> > +     *
> > +     * This also has the side effect of configuring and
> > registering the SASL client factory
> > +     * implementations using {@link DynamicSaslRegistrar}.
> > +     *
> > +     * This constructor is default protection to allow for
> > effective unit testing.  Clients must use
> > +     * {@link #getInstance()} to obtain the singleton instance.
> > +     */
> > +    CallbackHandlerRegistry(final Properties props)
> > +    {
> > +        parseProperties(props);
> > +    }
> >
> >     /**
> >      * Scans the specified properties as a mapping from IANA
> >      registered SASL mechanism to call back handler
> > @@ -183,20 +188,20 @@ public class CallbackHandlerRegistry
> >      */
> >     private void parseProperties(Properties props)
> >     {
> > +
> > +        final Map<Integer, String> mechanisms = new
> > TreeMap<Integer, String>();
> > +
> >         Enumeration e = props.propertyNames();
> >         while (e.hasMoreElements())
> >         {
> > -            String propertyName = (String) e.nextElement();
> > -            int period = propertyName.indexOf(".");
> > -            if (period < 0)
> > -            {
> > -                _logger.warn("Unable to parse property " +
> > propertyName + " when configuring SASL providers");
> > +            final String propertyName = (String) e.nextElement();
> > +            final String[] parts = propertyName.split("\\.", 2);
> >
> > -                continue;
> > -            }
> > +            checkPropertyNameFormat(propertyName, parts);
> >
> > -            String mechanism = propertyName.substring(period + 1);
> > -            String className = props.getProperty(propertyName);
> > +            final String mechanism = parts[0];
> > +            final int ordinal = getPropertyOrdinal(propertyName,
> > parts);
> > +            final String className =
> > props.getProperty(propertyName);
> >             Class clazz = null;
> >             try
> >             {
> > @@ -205,20 +210,11 @@ public class CallbackHandlerRegistry
> >                 {
> >                     _logger.warn("SASL provider " + clazz + " does
> >                     not implement " + AMQCallbackHandler.class
> >                         + ". Skipping");
> > -
> >                     continue;
> >                 }
> > -
> >                 _mechanismToHandlerClassMap.put(mechanism, clazz);
> > -                if (_mechanisms == null)
> > -                {
> > -                    _mechanisms = mechanism;
> > -                }
> > -                else
> > -                {
> > -                    // one time cost
> > -                    _mechanisms = _mechanisms + " " + mechanism;
> > -                }
> > +
> > +                mechanisms.put(ordinal, mechanism);
> >             }
> >             catch (ClassNotFoundException ex)
> >             {
> > @@ -227,5 +223,91 @@ public class CallbackHandlerRegistry
> >                 continue;
> >             }
> >         }
> > +
> > +        _mechanisms = mechanisms.values();  // order guaranteed by
> > keys of treemap (i.e. our ordinals)
> > +
> > +
> > +    }
> > +
> > +    private void checkPropertyNameFormat(final String
> > propertyName, final String[] parts)
> > +    {
> > +        if (parts.length != 2)
> > +        {
> > +            throw new IllegalArgumentException("Unable to parse
> > property " + propertyName + " when configuring SASL providers");
> > +        }
> > +    }
> > +
> > +    private int getPropertyOrdinal(final String propertyName,
> > final String[] parts)
> > +    {
> > +        try
> > +        {
> > +            return Integer.parseInt(parts[1]);
> > +        }
> > +        catch(NumberFormatException nfe)
> > +        {
> > +            throw new IllegalArgumentException("Unable to parse
> > property " + propertyName + " when configuring SASL providers",
> > nfe);
> > +        }
> > +    }
> > +
> > +    /**
> > +     * Selects a SASL mechanism that is mutually available to both
> > parties.  If more than one
> > +     * mechanism is mutually available the one appearing first (by
> > ordinal) will be returned.
> > +     *
> > +     * @param peerMechanismList space separated list of mechanisms
> > +     * @return selected mechanism, or null if none available
> > +     */
> > +    public String selectMechanism(final String peerMechanismList)
> > +    {
> > +        final Set<String> peerList =
> > mechListToSet(peerMechanismList);
> > +
> > +        return selectMechInternal(peerList,
> > Collections.<String>emptySet());
> > +    }
> > +
> > +    /**
> > +     * Selects a SASL mechanism that is mutually available to both
> > parties.
> > +     *
> > +     * @param peerMechanismList space separated list of mechanisms
> > +     * @param restrictionList space separated list of mechanisms
> > +     * @return selected mechanism, or null if none available
> > +     */
> > +    public String selectMechanism(final String peerMechanismList,
> > final String restrictionList)
> > +    {
> > +        final Set<String> peerList =
> > mechListToSet(peerMechanismList);
> > +        final Set<String> restrictionSet =
> > mechListToSet(restrictionList);
> > +
> > +        return selectMechInternal(peerList, restrictionSet);
> > +    }
> > +
> > +    private String selectMechInternal(final Set<String> peerSet,
> > final Set<String> restrictionSet)
> > +    {
> > +        for (final String mech : _mechanisms)
> > +        {
> > +            if (peerSet.contains(mech))
> > +            {
> > +                if (restrictionSet.isEmpty() ||
> > restrictionSet.contains(mech))
> > +                {
> > +                    return mech;
> > +                }
> > +            }
> > +        }
> > +
> > +        return null;
> > +    }
> > +
> > +    private Set<String> mechListToSet(final String mechanismList)
> > +    {
> > +        if (mechanismList == null)
> > +        {
> > +            return Collections.emptySet();
> > +        }
> > +
> > +        final StringTokenizer tokenizer = new
> > StringTokenizer(mechanismList, " ");
> > +        final Set<String> mechanismSet = new
> > HashSet<String>(tokenizer.countTokens());
> > +        while (tokenizer.hasMoreTokens())
> > +        {
> > +            mechanismSet.add(tokenizer.nextToken());
> > +        }
> > +        return Collections.unmodifiableSet(mechanismSet);
> >     }
> > +
> >  }
> >
> > Modified:
> > qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties
> > URL:
> > http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties?rev=1172657&r1=1172656&r2=1172657&view=diff
> > ==============================================================================
> > ---
> > qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties
> > (original)
> > +++
> > qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties
> > Mon Sep 19 15:13:18 2011
> > @@ -16,7 +16,17 @@
> >  # specific language governing permissions and limitations
> >  # under the License.
> >  #
> > -CallbackHandler.CRAM-MD5-HASHED=org.apache.qpid.client.security.UsernameHashedPasswordCallbackHandler
> > -CallbackHandler.CRAM-MD5=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
> > -CallbackHandler.AMQPLAIN=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
> > -CallbackHandler.PLAIN=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
> > +
> > +#
> > +# Format:
> > +# <mechanism name>.ordinal=<implementation>
> > +#
> > +# @see CallbackHandlerRegistry
> > +#
> > +
> > +EXTERNAL.1=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
> > +GSSAPI.2=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
> > +CRAM-MD5-HASHED.3=org.apache.qpid.client.security.UsernameHashedPasswordCallbackHandler
> > +CRAM-MD5.4=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
> > +AMQPLAIN.5=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
> > +PLAIN.6=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
> >
> > Modified:
> > qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
> > URL:
> > http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> > ==============================================================================
> > ---
> > qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
> > (original)
> > +++
> > qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
> > Mon Sep 19 15:13:18 2011
> > @@ -22,7 +22,7 @@ package org.apache.qpid.jms;
> >
> >  import java.util.Map;
> >
> > -import org.apache.qpid.client.SSLConfiguration;
> > +import org.apache.qpid.transport.ConnectionSettings;
> >
> >  public interface BrokerDetails
> >  {
> > @@ -104,14 +104,12 @@ public interface BrokerDetails
> >     long getTimeout();
> >
> >     void setTimeout(long timeout);
> > -
> > -    SSLConfiguration getSSLConfiguration();
> > -
> > -    void setSSLConfiguration(SSLConfiguration sslConfiguration);
> >
> >     boolean getBooleanProperty(String propName);
> >
> >     String toString();
> >
> >     boolean equals(Object o);
> > +
> > +    ConnectionSettings buildConnectionSettings();
> >  }
> >
> > Modified:
> > qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java
> > URL:
> > http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> > ==============================================================================
> > ---
> > qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java
> > (original)
> > +++
> > qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java
> > Mon Sep 19 15:13:18 2011
> > @@ -140,7 +140,6 @@ public class FailoverExchangeMethod impl
> >                         broker.setHost(tokens[1]);
> >                         broker.setPort(Integer.parseInt(tokens[2]));
> >                         broker.setProperties(_originalBrokerDetail.getProperties());
> > -                        broker.setSSLConfiguration(_originalBrokerDetail.getSSLConfiguration());
> >                         brokerList.add(broker);
> >
> >                         if
> >                         (currentBrokerIP.equals(broker.getHost())
> >                         &&
> >
> > Modified:
> > qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
> > URL:
> > http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> > ==============================================================================
> > ---
> > qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
> > (original)
> > +++
> > qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
> > Mon Sep 19 15:13:18 2011
> > @@ -36,6 +36,7 @@ import javax.jms.Queue;
> >  import javax.jms.Topic;
> >  import javax.naming.Context;
> >  import javax.naming.NamingException;
> > +import javax.naming.ConfigurationException;
> >  import javax.naming.spi.InitialContextFactory;
> >
> >  import org.apache.qpid.client.AMQConnectionFactory;
> > @@ -139,7 +140,7 @@ public class PropertiesFileInitialContex
> >         return new ReadOnlyContext(environment, data);
> >     }
> >
> > -    protected void createConnectionFactories(Map data, Hashtable
> > environment)
> > +    protected void createConnectionFactories(Map data, Hashtable
> > environment) throws ConfigurationException
> >     {
> >         for (Iterator iter = environment.entrySet().iterator();
> >         iter.hasNext();)
> >         {
> > @@ -157,7 +158,7 @@ public class PropertiesFileInitialContex
> >         }
> >     }
> >
> > -    protected void createDestinations(Map data, Hashtable
> > environment)
> > +    protected void createDestinations(Map data, Hashtable
> > environment) throws ConfigurationException
> >     {
> >         for (Iterator iter = environment.entrySet().iterator();
> >         iter.hasNext();)
> >         {
> > @@ -225,7 +226,7 @@ public class PropertiesFileInitialContex
> >     /**
> >      * Factory method to create new Connection Factory instances
> >      */
> > -    protected ConnectionFactory createFactory(String url)
> > +    protected ConnectionFactory createFactory(String url) throws
> > ConfigurationException
> >     {
> >         try
> >         {
> > @@ -233,16 +234,18 @@ public class PropertiesFileInitialContex
> >         }
> >         catch (URLSyntaxException urlse)
> >         {
> > -            _logger.warn("Unable to createFactories:" + urlse);
> > -        }
> > +            _logger.warn("Unable to create factory:" + urlse);
> >
> > -        return null;
> > +            ConfigurationException ex = new
> > ConfigurationException("Failed to parse entry: " + urlse + " due
> > to : " +  urlse.getMessage());
> > +            ex.initCause(urlse);
> > +            throw ex;
> > +        }
> >     }
> >
> >     /**
> >      * Factory method to create new Destination instances from an
> >      AMQP BindingURL
> >      */
> > -    protected Destination createDestination(String str)
> > +    protected Destination createDestination(String str) throws
> > ConfigurationException
> >     {
> >         try
> >         {
> > @@ -252,7 +255,9 @@ public class PropertiesFileInitialContex
> >         {
> >             _logger.warn("Unable to create destination:" + e, e);
> >
> > -            return null;
> > +            ConfigurationException ex = new
> > ConfigurationException("Failed to parse entry: " + str + " due to
> > : " +  e.getMessage());
> > +            ex.initCause(e);
> > +            throw ex;
> >         }
> >     }
> >
> >
> > Modified:
> > qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java
> > URL:
> > http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> > ==============================================================================
> > ---
> > qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java
> > (original)
> > +++
> > qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java
> > Mon Sep 19 15:13:18 2011
> > @@ -23,7 +23,6 @@ package org.apache.qpid.client;
> >  import org.apache.qpid.AMQException;
> >  import org.apache.qpid.client.state.AMQState;
> >  import org.apache.qpid.framing.ProtocolVersion;
> > -import org.apache.qpid.jms.ConnectionURL;
> >  import org.apache.qpid.jms.BrokerDetails;
> >  import org.apache.qpid.url.URLSyntaxException;
> >
> > @@ -37,48 +36,18 @@ public class MockAMQConnection extends A
> >         super(broker, username, password, clientName, virtualHost);
> >     }
> >
> > -    public MockAMQConnection(String broker, String username,
> > String password, String clientName, String virtualHost,
> > SSLConfiguration sslConfig)
> > -            throws AMQException, URLSyntaxException
> > -    {
> > -        super(broker, username, password, clientName, virtualHost,
> > sslConfig);
> > -    }
> > -
> >     public MockAMQConnection(String host, int port, String
> >     username, String password, String clientName, String
> >     virtualHost)
> >             throws AMQException, URLSyntaxException
> >     {
> >         super(host, port, username, password, clientName,
> >         virtualHost);
> >     }
> >
> > -    public MockAMQConnection(String host, int port, String
> > username, String password, String clientName, String virtualHost,
> > SSLConfiguration sslConfig)
> > -            throws AMQException, URLSyntaxException
> > -    {
> > -        super(host, port, username, password, clientName,
> > virtualHost, sslConfig);
> > -    }
> > -
> > -    public MockAMQConnection(String host, int port, boolean
> > useSSL, String username, String password, String clientName,
> > String virtualHost, SSLConfiguration sslConfig)
> > -            throws AMQException, URLSyntaxException
> > -    {
> > -        super(host, port, useSSL, username, password, clientName,
> > virtualHost, sslConfig);
> > -    }
> > -
> >     public MockAMQConnection(String connection)
> >             throws AMQException, URLSyntaxException
> >     {
> >         super(connection);
> >     }
> >
> > -    public MockAMQConnection(String connection, SSLConfiguration
> > sslConfig)
> > -            throws AMQException, URLSyntaxException
> > -    {
> > -        super(connection, sslConfig);
> > -    }
> > -
> > -    public MockAMQConnection(ConnectionURL connectionURL,
> > SSLConfiguration sslConfig)
> > -            throws AMQException
> > -    {
> > -        super(connectionURL, sslConfig);
> > -    }
> > -
> >     @Override
> >     public ProtocolVersion makeBrokerConnection(BrokerDetails
> >     brokerDetail) throws IOException
> >     {
> >
> > Modified:
> > qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java
> > URL:
> > http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> > ==============================================================================
> > ---
> > qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java
> > (original)
> > +++
> > qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java
> > Mon Sep 19 15:13:18 2011
> > @@ -43,4 +43,9 @@ public class TestMessageHelper
> >     {
> >         return new
> >         JMSStreamMessage(AMQMessageDelegateFactory.FACTORY_0_8);
> >     }
> > +
> > +    public static JMSObjectMessage newJMSObjectMessage()
> > +    {
> > +        return new
> > JMSObjectMessage(AMQMessageDelegateFactory.FACTORY_0_8);
> > +    }
> >  }
> >
> > Modified:
> > qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java
> > URL:
> > http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> > ==============================================================================
> > ---
> > qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java
> > (original)
> > +++
> > qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java
> > Mon Sep 19 15:13:18 2011
> > @@ -21,10 +21,10 @@
> >  package org.apache.qpid.test.unit.jndi;
> >
> >  import junit.framework.TestCase;
> > +
> >  import org.apache.qpid.client.AMQConnectionFactory;
> >  import org.apache.qpid.jms.BrokerDetails;
> >  import org.apache.qpid.jms.ConnectionURL;
> > -import org.apache.qpid.url.URLSyntaxException;
> >
> >  public class ConnectionFactoryTest extends TestCase
> >  {
> > @@ -34,21 +34,9 @@ public class ConnectionFactoryTest exten
> >     public static final String URL =
> >     "amqp://guest:guest@clientID/test?brokerlist='tcp://localhost:5672'";
> >     public static final String URL_STAR_PWD =
> >     "amqp://guest:********@clientID/test?brokerlist='tcp://localhost:5672'";
> >
> > -    public void testConnectionURLString()
> > +    public void testConnectionURLStringMasksPassword() throws
> > Exception
> >     {
> > -        AMQConnectionFactory factory = new AMQConnectionFactory();
> > -
> > -        assertNull("ConnectionURL should have no value at start",
> > -                   factory.getConnectionURL());
> > -
> > -        try
> > -        {
> > -            factory.setConnectionURLString(URL);
> > -        }
> > -        catch (URLSyntaxException e)
> > -        {
> > -            fail(e.getMessage());
> > -        }
> > +        AMQConnectionFactory factory = new
> > AMQConnectionFactory(URL);
> >
> >         //URL will be returned with the password field swapped for
> >         '********'
> >         assertEquals("Connection URL not correctly set",
> >         URL_STAR_PWD, factory.getConnectionURLString());
> >
> > Modified:
> > qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java
> > URL:
> > http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> > ==============================================================================
> > ---
> > qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java
> > (original)
> > +++
> > qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java
> > Mon Sep 19 15:13:18 2011
> > @@ -24,6 +24,7 @@ import java.util.Properties;
> >
> >  import javax.jms.Queue;
> >  import javax.jms.Topic;
> > +import javax.naming.ConfigurationException;
> >  import javax.naming.Context;
> >  import javax.naming.InitialContext;
> >
> > @@ -67,4 +68,22 @@ public class JNDIPropertyFileTest extend
> >             assertEquals("Topic" + i +
> >             "WithSpace",bindingKey.asString());
> >         }
> >     }
> > +
> > +    public void testConfigurationErrors() throws Exception
> > +    {
> > +        Properties properties = new Properties();
> > +        properties.put("java.naming.factory.initial",
> > "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
> > +        properties.put("destination.my-queue","amq.topic/test;create:always}");
> > +
> > +        try
> > +        {
> > +            ctx = new InitialContext(properties);
> > +            fail("A configuration exception should be thrown with
> > details about the address syntax error");
> > +        }
> > +        catch(ConfigurationException e)
> > +        {
> > +            assertTrue("Incorrect exception",
> > e.getMessage().contains("Failed to parse entry:
> > amq.topic/test;create:always}"));
> > +        }
> > +
> > +    }
> >  }
> >
> > Modified: qpid/branches/qpid-3346/qpid/java/common.xml
> > URL:
> > http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common.xml?rev=1172657&r1=1172656&r2=1172657&view=diff
> > ==============================================================================
> > --- qpid/branches/qpid-3346/qpid/java/common.xml (original)
> > +++ qpid/branches/qpid-3346/qpid/java/common.xml Mon Sep 19
> > 15:13:18 2011
> > @@ -132,8 +132,6 @@
> >        </sequential>
> >   </macrodef>
> >
> > -
> > -
> >   <macrodef name="jython">
> >     <attribute name="path"/>
> >     <element name="args"/>
> >
> > Modified:
> > qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java
> > URL:
> > http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> > ==============================================================================
> > ---
> > qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java
> > (original)
> > +++
> > qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java
> > Mon Sep 19 15:13:18 2011
> > @@ -20,9 +20,6 @@
> >  */
> >  package org.apache.qpid.codec;
> >
> > -import org.apache.mina.filter.codec.ProtocolCodecFactory;
> > -import org.apache.mina.filter.codec.ProtocolDecoder;
> > -import org.apache.mina.filter.codec.ProtocolEncoder;
> >  import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
> >
> >  /**
> > @@ -31,14 +28,11 @@ import org.apache.qpid.protocol.AMQVersi
> >  *
> >  * <p/><table id="crc"><caption>CRC Card</caption>
> >  * <tr><th> Responsibilities <th> Collaborations.
> > - * <tr><td> Supply the protocol encoder. <td> {@link AMQEncoder}
> >  * <tr><td> Supply the protocol decoder. <td> {@link AMQDecoder}
> >  * </table>
> >  */
> > -public class AMQCodecFactory implements ProtocolCodecFactory
> > +public class AMQCodecFactory
> >  {
> > -    /** Holds the protocol encoder. */
> > -    private final AMQEncoder _encoder = new AMQEncoder();
> >
> >     /** Holds the protocol decoder. */
> >     private final AMQDecoder _frameDecoder;
> > @@ -56,15 +50,6 @@ public class AMQCodecFactory implements
> >         _frameDecoder = new AMQDecoder(expectProtocolInitiation,
> >         session);
> >     }
> >
> > -    /**
> > -     * Gets the AMQP encoder.
> > -     *
> > -     * @return The AMQP encoder.
> > -     */
> > -    public ProtocolEncoder getEncoder()
> > -    {
> > -        return _encoder;
> > -    }
> >
> >     /**
> >      * Gets the AMQP decoder.
> >
> > Modified:
> > qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
> > URL:
> > http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> > ==============================================================================
> > ---
> > qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
> > (original)
> > +++
> > qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
> > Mon Sep 19 15:13:18 2011
> > @@ -20,13 +20,9 @@
> >  */
> >  package org.apache.qpid.codec;
> >
> > -import java.util.ArrayList;
> > -
> > -import org.apache.mina.common.ByteBuffer;
> > -import org.apache.mina.common.IoSession;
> > -import org.apache.mina.common.SimpleByteBufferAllocator;
> > -import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
> > -import org.apache.mina.filter.codec.ProtocolDecoderOutput;
> > +import java.io.*;
> > +import java.nio.ByteBuffer;
> > +import java.util.*;
> >
> >  import org.apache.qpid.framing.AMQDataBlock;
> >  import org.apache.qpid.framing.AMQDataBlockDecoder;
> > @@ -54,11 +50,8 @@ import org.apache.qpid.protocol.AMQVersi
> >  * @todo If protocol initiation decoder not needed, then don't
> >  create it. Probably not a big deal, but it adds to the
> >  *       per-session overhead.
> >  */
> > -public class AMQDecoder extends CumulativeProtocolDecoder
> > +public class AMQDecoder
> >  {
> > -
> > -    private static final String BUFFER =
> > AMQDecoder.class.getName() + ".Buffer";
> > -
> >     /** Holds the 'normal' AMQP data decoder. */
> >     private AMQDataBlockDecoder _dataBlockDecoder = new
> >     AMQDataBlockDecoder();
> >
> > @@ -67,12 +60,11 @@ public class AMQDecoder extends Cumulati
> >
> >     /** Flag to indicate whether this decoder needs to handle
> >     protocol initiation. */
> >     private boolean _expectProtocolInitiation;
> > -    private boolean firstDecode = true;
> >
> >     private AMQMethodBodyFactory _bodyFactory;
> >
> > -    private ByteBuffer _remainingBuf;
> > -
> > +    private List<ByteArrayInputStream> _remainingBufs = new
> > ArrayList<ByteArrayInputStream>();
> > +
> >     /**
> >      * Creates a new AMQP decoder.
> >      *
> > @@ -84,98 +76,7 @@ public class AMQDecoder extends Cumulati
> >         _bodyFactory = new AMQMethodBodyFactory(session);
> >     }
> >
> > -    /**
> > -     * Delegates decoding AMQP from the data buffer that Mina has
> > retrieved from the wire, to the data or protocol
> > -     * intiation decoders.
> > -     *
> > -     * @param session The Mina session.
> > -     * @param in      The raw byte buffer.
> > -     * @param out     The Mina object output gatherer to write
> > decoded objects to.
> > -     *
> > -     * @return <tt>true</tt> if the data was decoded,
> > <tt>false<tt> if more is needed and the data should accumulate.
> > -     *
> > -     * @throws Exception If the data cannot be decoded for any
> > reason.
> > -     */
> > -    protected boolean doDecode(IoSession session, ByteBuffer in,
> > ProtocolDecoderOutput out) throws Exception
> > -    {
> > -
> > -        boolean decoded;
> > -        if (_expectProtocolInitiation
> > -            || (firstDecode
> > -                && (in.remaining() > 0)
> > -                && (in.get(in.position()) == (byte)'A')))
> > -        {
> > -            decoded = doDecodePI(session, in, out);
> > -        }
> > -        else
> > -        {
> > -            decoded = doDecodeDataBlock(session, in, out);
> > -        }
> > -        if(firstDecode && decoded)
> > -        {
> > -            firstDecode = false;
> > -        }
> > -        return decoded;
> > -    }
> > -
> > -    /**
> > -     * Decodes AMQP data, delegating the decoding to an {@link
> > AMQDataBlockDecoder}.
> > -     *
> > -     * @param session The Mina session.
> > -     * @param in      The raw byte buffer.
> > -     * @param out     The Mina object output gatherer to write
> > decoded objects to.
> > -     *
> > -     * @return <tt>true</tt> if the data was decoded,
> > <tt>false<tt> if more is needed and the data should accumulate.
> > -     *
> > -     * @throws Exception If the data cannot be decoded for any
> > reason.
> > -     */
> > -    protected boolean doDecodeDataBlock(IoSession session,
> > ByteBuffer in, ProtocolDecoderOutput out) throws Exception
> > -    {
> > -        int pos = in.position();
> > -        boolean enoughData =
> > _dataBlockDecoder.decodable(in.buf());
> > -        in.position(pos);
> > -        if (!enoughData)
> > -        {
> > -            // returning false means it will leave the contents in
> > the buffer and
> > -            // call us again when more data has been read
> > -            return false;
> > -        }
> > -        else
> > -        {
> > -            _dataBlockDecoder.decode(session, in, out);
> > -
> > -            return true;
> > -        }
> > -    }
> > -
> > -    /**
> > -     * Decodes an AMQP initiation, delegating the decoding to a
> > {@link ProtocolInitiation.Decoder}.
> > -     *
> > -     * @param session The Mina session.
> > -     * @param in      The raw byte buffer.
> > -     * @param out     The Mina object output gatherer to write
> > decoded objects to.
> > -     *
> > -     * @return <tt>true</tt> if the data was decoded,
> > <tt>false<tt> if more is needed and the data should accumulate.
> > -     *
> > -     * @throws Exception If the data cannot be decoded for any
> > reason.
> > -     */
> > -    private boolean doDecodePI(IoSession session, ByteBuffer in,
> > ProtocolDecoderOutput out) throws Exception
> > -    {
> > -        boolean enoughData = _piDecoder.decodable(in.buf());
> > -        if (!enoughData)
> > -        {
> > -            // returning false means it will leave the contents in
> > the buffer and
> > -            // call us again when more data has been read
> > -            return false;
> > -        }
> > -        else
> > -        {
> > -            ProtocolInitiation pi = new
> > ProtocolInitiation(in.buf());
> > -            out.write(pi);
> >
> > -            return true;
> > -        }
> > -    }
> >
> >     /**
> >      * Sets the protocol initation flag, that determines whether
> >      decoding is handled by the data decoder of the protocol
> > @@ -189,151 +90,168 @@ public class AMQDecoder extends Cumulati
> >         _expectProtocolInitiation = expectProtocolInitiation;
> >     }
> >
> > -
> > -    /**
> > -     * Cumulates content of <tt>in</tt> into internal buffer and
> > forwards
> > -     * decoding request to {@link #doDecode(IoSession, ByteBuffer,
> > ProtocolDecoderOutput)}.
> > -     * <tt>doDecode()</tt> is invoked repeatedly until it returns
> > <tt>false</tt>
> > -     * and the cumulative buffer is compacted after decoding ends.
> > -     *
> > -     * @throws IllegalStateException if your <tt>doDecode()</tt>
> > returned
> > -     *                               <tt>true</tt> not consuming
> > the cumulative buffer.
> > -     */
> > -    public void decode( IoSession session, ByteBuffer in,
> > -                        ProtocolDecoderOutput out ) throws
> > Exception
> > +    private class RemainingByteArrayInputStream extends
> > InputStream
> >     {
> > -        ByteBuffer buf = ( ByteBuffer ) session.getAttribute(
> > BUFFER );
> > -        // if we have a session buffer, append data to that
> > otherwise
> > -        // use the buffer read from the network directly
> > -        if( buf != null )
> > -        {
> > -            buf.put( in );
> > -            buf.flip();
> > -        }
> > -        else
> > +        private int _currentListPos;
> > +        private int _markPos;
> > +
> > +
> > +        @Override
> > +        public int read() throws IOException
> >         {
> > -            buf = in;
> > +            ByteArrayInputStream currentStream =
> > _remainingBufs.get(_currentListPos);
> > +            if(currentStream.available() > 0)
> > +            {
> > +                return currentStream.read();
> > +            }
> > +            else if((_currentListPos == _remainingBufs.size())
> > +                    || (++_currentListPos ==
> > _remainingBufs.size()))
> > +            {
> > +                return -1;
> > +            }
> > +            else
> > +            {
> > +
> > +                ByteArrayInputStream stream =
> > _remainingBufs.get(_currentListPos);
> > +                stream.mark(0);
> > +                return stream.read();
> > +            }
> >         }
> >
> > -        for( ;; )
> > +        @Override
> > +        public int read(final byte[] b, final int off, final int
> > len) throws IOException
> >         {
> > -            int oldPos = buf.position();
> > -            boolean decoded = doDecode( session, buf, out );
> > -            if( decoded )
> > +
> > +            if(_currentListPos == _remainingBufs.size())
> >             {
> > -                if( buf.position() == oldPos )
> > +                return -1;
> > +            }
> > +            else
> > +            {
> > +                ByteArrayInputStream currentStream =
> > _remainingBufs.get(_currentListPos);
> > +                final int available = currentStream.available();
> > +                int read = currentStream.read(b, off, len >
> > available ? available : len);
> > +                if(read < len)
> >                 {
> > -                    throw new IllegalStateException(
> > -                            "doDecode() can't return true when
> > buffer is not consumed." );
> > +                    if(_currentListPos++ != _remainingBufs.size())
> > +                    {
> > +                        _remainingBufs.get(_currentListPos).mark(0);
> > +                    }
> > +                    int correctRead = read == -1 ? 0 : read;
> > +                    int subRead = read(b, off+correctRead,
> > len-correctRead);
> > +                    if(subRead == -1)
> > +                    {
> > +                        return read;
> > +                    }
> > +                    else
> > +                    {
> > +                        return correctRead+subRead;
> > +                    }
> >                 }
> > -
> > -                if( !buf.hasRemaining() )
> > +                else
> >                 {
> > -                    break;
> > +                    return len;
> >                 }
> >             }
> > -            else
> > -            {
> > -                break;
> > -            }
> >         }
> >
> > -        // if there is any data left that cannot be decoded, we
> > store
> > -        // it in a buffer in the session and next time this
> > decoder is
> > -        // invoked the session buffer gets appended to
> > -        if ( buf.hasRemaining() )
> > +        @Override
> > +        public int available() throws IOException
> >         {
> > -            storeRemainingInSession( buf, session );
> > +            int total = 0;
> > +            for(int i = _currentListPos; i <
> > _remainingBufs.size(); i++)
> > +            {
> > +                total += _remainingBufs.get(i).available();
> > +            }
> > +            return total;
> >         }
> > -        else
> > +
> > +        @Override
> > +        public void mark(final int readlimit)
> >         {
> > -            removeSessionBuffer( session );
> > +            _markPos = _currentListPos;
> > +            final ByteArrayInputStream stream =
> > _remainingBufs.get(_currentListPos);
> > +            if(stream != null)
> > +            {
> > +                stream.mark(readlimit);
> > +            }
> >         }
> > -    }
> > -
> > -    /**
> > -     * Releases the cumulative buffer used by the specified
> > <tt>session</tt>.
> > -     * Please don't forget to call <tt>super.dispose( session
> > )</tt> when
> > -     * you override this method.
> > -     */
> > -    public void dispose( IoSession session ) throws Exception
> > -    {
> > -        removeSessionBuffer( session );
> > -    }
> >
> > -    private void removeSessionBuffer(IoSession session)
> > -    {
> > -        ByteBuffer buf = ( ByteBuffer ) session.getAttribute(
> > BUFFER );
> > -        if( buf != null )
> > +        @Override
> > +        public void reset() throws IOException
> >         {
> > -            buf.release();
> > -            session.removeAttribute( BUFFER );
> > +            _currentListPos = _markPos;
> > +            final int size = _remainingBufs.size();
> > +            if(_currentListPos < size)
> > +            {
> > +                _remainingBufs.get(_currentListPos).reset();
> > +            }
> > +            for(int i = _currentListPos+1; i<size; i++)
> > +            {
> > +                _remainingBufs.get(i).reset();
> > +            }
> >         }
> >     }
> >
> > -    private static final SimpleByteBufferAllocator
> > SIMPLE_BYTE_BUFFER_ALLOCATOR = new SimpleByteBufferAllocator();
> > -
> > -    private void storeRemainingInSession(ByteBuffer buf, IoSession
> > session)
> > -    {
> > -        ByteBuffer remainingBuf =
> > SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate( buf.remaining(), false );
> > -        remainingBuf.setAutoExpand( true );
> > -        remainingBuf.put( buf );
> > -        session.setAttribute( BUFFER, remainingBuf );
> > -    }
> >
> > -    public ArrayList<AMQDataBlock>
> > decodeBuffer(java.nio.ByteBuffer buf) throws
> > AMQFrameDecodingException, AMQProtocolVersionException
> > +    public ArrayList<AMQDataBlock> decodeBuffer(ByteBuffer buf)
> > throws AMQFrameDecodingException, AMQProtocolVersionException,
> > IOException
> >     {
> >
> >         // get prior remaining data from accumulator
> >         ArrayList<AMQDataBlock> dataBlocks = new
> >         ArrayList<AMQDataBlock>();
> > -        ByteBuffer msg;
> > -        // if we have a session buffer, append data to that
> > otherwise
> > -        // use the buffer read from the network directly
> > -        if( _remainingBuf != null )
> > +        DataInputStream msg;
> > +
> > +
> > +        ByteArrayInputStream bais = new
> > ByteArrayInputStream(buf.array(),buf.arrayOffset()+buf.position(),
> > buf.remaining());
> > +        if(!_remainingBufs.isEmpty())
> >         {
> > -            _remainingBuf.put(buf);
> > -            _remainingBuf.flip();
> > -            msg = _remainingBuf;
> > +            _remainingBufs.add(bais);
> > +            msg = new DataInputStream(new
> > RemainingByteArrayInputStream());
> >         }
> >         else
> >         {
> > -            msg = ByteBuffer.wrap(buf);
> > +            msg = new DataInputStream(bais);
> >         }
> > -
> > -        if (_expectProtocolInitiation
> > -            || (firstDecode
> > -                && (msg.remaining() > 0)
> > -                && (msg.get(msg.position()) == (byte)'A')))
> > -        {
> > -            if (_piDecoder.decodable(msg.buf()))
> > -            {
> > -                dataBlocks.add(new ProtocolInitiation(msg.buf()));
> > -            }
> > -        }
> > -        else
> > +
> > +        boolean enoughData = true;
> > +        while (enoughData)
> >         {
> > -            boolean enoughData = true;
> > -            while (enoughData)
> > +            if(!_expectProtocolInitiation)
> >             {
> > -                int pos = msg.position();
> > -
> >                 enoughData = _dataBlockDecoder.decodable(msg);
> > -                msg.position(pos);
> >                 if (enoughData)
> >                 {
> >                     dataBlocks.add(_dataBlockDecoder.createAndPopulateFrame(_bodyFactory,
> >                     msg));
> >                 }
> > -                else
> > +            }
> > +            else
> > +            {
> > +                enoughData = _piDecoder.decodable(msg);
> > +                if (enoughData)
> >                 {
> > -                    _remainingBuf =
> > SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate(msg.remaining(), false);
> > -                    _remainingBuf.setAutoExpand(true);
> > -                    _remainingBuf.put(msg);
> > +                    dataBlocks.add(new ProtocolInitiation(msg));
> > +                }
> > +
> > +            }
> > +
> > +            if(!enoughData)
> > +            {
> > +                if(!_remainingBufs.isEmpty())
> > +                {
> > +                    _remainingBufs.remove(_remainingBufs.size()-1);
> > +                    ListIterator<ByteArrayInputStream> iterator =
> > _remainingBufs.listIterator();
> > +                    while(iterator.hasNext() &&
> > iterator.next().available() == 0)
> > +                    {
> > +                        iterator.remove();
> > +                    }
> > +                }
> > +                if(bais.available()!=0)
> > +                {
> > +                    byte[] remaining = new byte[bais.available()];
> > +                    bais.read(remaining);
> > +                    _remainingBufs.add(new
> > ByteArrayInputStream(remaining));
> >                 }
> >             }
> > -        }
> > -        if(firstDecode && dataBlocks.size() > 0)
> > -        {
> > -            firstDecode = false;
> >         }
> >         return dataBlocks;
> >     }
> >
> > Modified:
> > qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
> > URL:
> > http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> > ==============================================================================
> > ---
> > qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
> > (original)
> > +++
> > qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
> > Mon Sep 19 15:13:18 2011
> > @@ -23,7 +23,7 @@ package org.apache.qpid.configuration;
> >  */
> >  public class ClientProperties
> >  {
> > -
> > +
> >     /**
> >      * Currently with Qpid it is not possible to change the client
> >      ID.
> >      * If one is not specified upon connection construction, an id
> >      is generated automatically.
> > @@ -68,38 +68,50 @@ public class ClientProperties
> >      * by the broker in TuneOK it will be used as the heartbeat
> >      interval.
> >      * If not a warning will be printed and the max value specified
> >      for
> >      * heartbeat in TuneOK will be used
> > -     *
> > +     *
> >      * The default idle timeout is set to 120 secs
> >      */
> >     public static final String IDLE_TIMEOUT_PROP_NAME =
> >     "idle_timeout";
> >     public static final long DEFAULT_IDLE_TIMEOUT = 120000;
> > -
> > +
> >     public static final String HEARTBEAT = "qpid.heartbeat";
> >     public static final int HEARTBEAT_DEFAULT = 120;
> > -
> > +
> >     /**
> >      * This value will be used to determine the default destination
> >      syntax type.
> >      * Currently the two types are Binding URL (java only) and the
> >      Addressing format (used by
> > -     * all clients).
> > +     * all clients).
> >      */
> >     public static final String DEST_SYNTAX = "qpid.dest_syntax";
> > -
> > +
> >     public static final String USE_LEGACY_MAP_MESSAGE_FORMAT =
> >     "qpid.use_legacy_map_message";
> >
> >     public static final String AMQP_VERSION = "qpid.amqp.version";
> > -
> > -    private static ClientProperties _instance = new
> > ClientProperties();
> > -
> > +
> > +    public static final String QPID_VERIFY_CLIENT_ID =
> > "qpid.verify_client_id";
> > +
> > +    /**
> > +     * System properties to change the default timeout used during
> > +     * synchronous operations.
> > +     */
> > +    public static final String QPID_SYNC_OP_TIMEOUT =
> > "qpid.sync_op_timeout";
> > +    public static final String AMQJ_DEFAULT_SYNCWRITE_TIMEOUT =
> > "amqj.default_syncwrite_timeout";
> > +
> > +    /**
> > +     * A default timeout value for synchronous operations
> > +     */
> > +    public static final int DEFAULT_SYNC_OPERATION_TIMEOUT =
> > 60000;
> > +
> >     /*
> > -    public static final QpidProperty<Boolean>
> >  IGNORE_SET_CLIENTID_PROP_NAME =
> > +    public static final QpidProperty<Boolean>
> >  IGNORE_SET_CLIENTID_PROP_NAME =
> >         QpidProperty.booleanProperty(false,"qpid.ignore_set_client_id","ignore_setclientID");
> > -
> > +
> >     public static final QpidProperty<Boolean>
> >     SYNC_PERSISTENT_PROP_NAME =
> >         QpidProperty.booleanProperty(false,"qpid.sync_persistence","sync_persistence");
> > -
> > -
> > +
> > +
> >     public static final QpidProperty<Integer>
> >     MAX_PREFETCH_PROP_NAME =
> >         QpidProperty.intProperty(500,"qpid.max_prefetch","max_prefetch");
> >         */
> > -
> > -
> > +
> > +
> >  }
> >
> > Modified:
> > qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
> > URL:
> > http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> > ==============================================================================
> > ---
> > qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
> > (original)
> > +++
> > qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
> > Mon Sep 19 15:13:18 2011
> > @@ -20,7 +20,9 @@
> >  */
> >  package org.apache.qpid.framing;
> >
> > -import org.apache.mina.common.ByteBuffer;
> > +import java.io.DataOutputStream;
> > +import java.io.IOException;
> > +
> >  import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
> >  import org.apache.qpid.AMQException;
> >
> > @@ -34,7 +36,7 @@ public interface AMQBody
> >      */
> >     public abstract int getSize();
> >
> > -    public void writePayload(ByteBuffer buffer);
> > +    public void writePayload(DataOutputStream buffer) throws
> > IOException;
> >
> > -    void handle(final int channelId, final
> > AMQVersionAwareProtocolSession amqMinaProtocolSession) throws
> > AMQException;
> > +    void handle(final int channelId, final
> > AMQVersionAwareProtocolSession amqProtocolSession) throws
> > AMQException;
> >  }
> >
> > Modified:
> > qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
> > URL:
> > http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> > ==============================================================================
> > ---
> > qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
> > (original)
> > +++
> > qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
> > Mon Sep 19 15:13:18 2011
> > @@ -20,7 +20,10 @@
> >  */
> >  package org.apache.qpid.framing;
> >
> > -import org.apache.mina.common.ByteBuffer;
> > +import java.io.DataInputStream;
> > +import java.io.DataOutputStream;
> > +import java.io.IOException;
> > +
> >
> >  /**
> >  * A data block represents something that has a size in bytes and
> >  the ability to write itself to a byte
> > @@ -39,25 +42,6 @@ public abstract class AMQDataBlock imple
> >      * Writes the datablock to the specified buffer.
> >      * @param buffer
> >      */
> > -    public abstract void writePayload(ByteBuffer buffer);
> > -
> > -    public ByteBuffer toByteBuffer()
> > -    {
> > -        final ByteBuffer buffer =
> > ByteBuffer.allocate((int)getSize());
> > -
> > -        writePayload(buffer);
> > -        buffer.flip();
> > -        return buffer;
> > -    }
> > -
> > -    public java.nio.ByteBuffer toNioByteBuffer()
> > -    {
> > -        final java.nio.ByteBuffer buffer =
> > java.nio.ByteBuffer.allocate((int) getSize());
> > -
> > -        ByteBuffer buf = ByteBuffer.wrap(buffer);
> > -        writePayload(buf);
> > -        buffer.flip();
> > -        return buffer;
> > -    }
> > +    public abstract void writePayload(DataOutputStream buffer)
> > throws IOException;
> >
> >  }
> >
> > Modified:
> > qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
> > URL:
> > http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> > ==============================================================================
> > ---
> > qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
> > (original)
> > +++
> > qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
> > Mon Sep 19 15:13:18 2011
> > @@ -20,18 +20,14 @@
> >  */
> >  package org.apache.qpid.framing;
> >
> > -import org.apache.mina.common.ByteBuffer;
> > -import org.apache.mina.common.IoSession;
> > -import org.apache.mina.filter.codec.ProtocolDecoderOutput;
> > -
> > -import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
> > -
> >  import org.slf4j.Logger;
> >  import org.slf4j.LoggerFactory;
> >
> > +import java.io.DataInputStream;
> > +import java.io.IOException;
> > +
> >  public class AMQDataBlockDecoder
> >  {
> > -    private static final String SESSION_METHOD_BODY_FACTORY =
> > "QPID_SESSION_METHOD_BODY_FACTORY";
> >
> >     private static final BodyFactory[] _bodiesSupported = new
> >     BodyFactory[Byte.MAX_VALUE];
> >
> > @@ -47,27 +43,32 @@ public class AMQDataBlockDecoder
> >     public AMQDataBlockDecoder()
> >     { }
> >
> > -    public boolean decodable(java.nio.ByteBuffer in) throws
> > AMQFrameDecodingException
> > +    public boolean decodable(DataInputStream in) throws
> > AMQFrameDecodingException, IOException
> >     {
> > -        final int remainingAfterAttributes = in.remaining() - (1 +
> > 2 + 4 + 1);
> > +        final int remainingAfterAttributes = in.available() - (1 +
> > 2 + 4 + 1);
> >         // type, channel, body length and end byte
> >         if (remainingAfterAttributes < 0)
> >         {
> >             return false;
> >         }
> >
> > -        in.position(in.position() + 1 + 2);
> > +        in.mark(8);
> > +        in.skip(1 + 2);
> > +
> > +
> >         // Get an unsigned int, lifted from MINA ByteBuffer
> >         getUnsignedInt()
> > -        final long bodySize = in.getInt() & 0xffffffffL;
> > +        final long bodySize = in.readInt() & 0xffffffffL;
> > +
> > +        in.reset();
> >
> >         return (remainingAfterAttributes >= bodySize);
> >
> >     }
> >
> > -    public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory
> > methodBodyFactory, ByteBuffer in)
> > -        throws AMQFrameDecodingException,
> > AMQProtocolVersionException
> > +    public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory
> > methodBodyFactory, DataInputStream in)
> > +            throws AMQFrameDecodingException,
> > AMQProtocolVersionException, IOException
> >     {
> > -        final byte type = in.get();
> > +        final byte type = in.readByte();
> >
> >         BodyFactory bodyFactory;
> >         if (type == AMQMethodBody.TYPE)
> > @@ -84,8 +85,8 @@ public class AMQDataBlockDecoder
> >             throw new AMQFrameDecodingException(null, "Unsupported
> >             frame type: " + type, null);
> >         }
> >
> > -        final int channel = in.getUnsignedShort();
> > -        final long bodySize = in.getUnsignedInt();
> > +        final int channel = in.readUnsignedShort();
> > +        final long bodySize =
> > EncodingUtils.readUnsignedInteger(in);
> >
> >         // bodySize can be zero
> >         if ((channel < 0) || (bodySize < 0))
> > @@ -96,7 +97,7 @@ public class AMQDataBlockDecoder
> >
> >         AMQFrame frame = new AMQFrame(in, channel, bodySize,
> >         bodyFactory);
> >
> > -        byte marker = in.get();
> > +        byte marker = in.readByte();
> >         if ((marker & 0xFF) != 0xCE)
> >         {
> >             throw new AMQFrameDecodingException(null, "End of frame
> >             marker not found. Read " + marker + " length=" +
> >             bodySize
> > @@ -106,26 +107,4 @@ public class AMQDataBlockDecoder
> >         return frame;
> >     }
> >
> > -    public void decode(IoSession session, ByteBuffer in,
> > ProtocolDecoderOutput out) throws Exception
> > -    {
> > -        AMQMethodBodyFactory bodyFactory = (AMQMethodBodyFactory)
> > session.getAttribute(SESSION_METHOD_BODY_FACTORY);
> > -        if (bodyFactory == null)
> > -        {
> > -            AMQVersionAwareProtocolSession protocolSession =
> > (AMQVersionAwareProtocolSession) session.getAttachment();
> > -            bodyFactory = new
> > AMQMethodBodyFactory(protocolSession);
> > -            session.setAttribute(SESSION_METHOD_BODY_FACTORY,
> > bodyFactory);
> > -        }
> > -
> > -        out.write(createAndPopulateFrame(bodyFactory, in));
> > -    }
> > -
> > -    public boolean decodable(ByteBuffer msg) throws
> > AMQFrameDecodingException
> > -    {
> > -        return decodable(msg.buf());
> > -    }
> > -
> > -    public AMQDataBlock
> > createAndPopulateFrame(AMQMethodBodyFactory factory,
> > java.nio.ByteBuffer msg) throws AMQProtocolVersionException,
> > AMQFrameDecodingException
> > -    {
> > -        return createAndPopulateFrame(factory,
> > ByteBuffer.wrap(msg));
> > -    }
> >  }
> >
> >
> >
> > ---------------------------------------------------------------------
> > Apache Qpid - AMQP Messaging Implementation
> > Project:      http://qpid.apache.org
> > Use/Interact: mailto:commits-subscribe@qpid.apache.org
> >
> >
> 

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org


Mime
View raw message