qpid-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ken Giusti <kgiu...@redhat.com>
Subject Re: svn commit: r1172657 [14/21] - in /qpid/branches/qpid-3346/qpid: ./ cpp/ cpp/bindings/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qpid/dotnet/ cpp/bindings/qpid/dotnet/examples/csharp.direct.receiver/Properties/ cpp/bindings/qpid/dotnet/example
Date Mon, 19 Sep 2011 16:07:16 GMT
Rajith,

Yes, not on trunk, but...

Actually, there is a problem with that branch due to that merge.  It involves the Java code: for some reason that branch still contains a "mina" tree in

java/common/src/test/java/org/apache/qpid/transport/network

even though it no longer appears on trunk.

In any case, I'll avoid merging anything from the java subtree of that branch back to trunk, when the time comes.

thanks,

-K


----- Original Message -----
> Ken,
> 
> Apologies, I didn't realize that you were merging from trunk not into
> it :)
> Thanks Gordon for pointing it out.
> 
> Regards,
> 
> Rajith
> 
> On Mon, Sep 19, 2011 at 11:48 AM, Rajith Attapattu
> <rajith77@gmail.com> wrote:
> > Ken,
> >
> > I assume this wasn't intentional and was merely a side effect of
> > the merge?
> > I didn't look at the diff very closely, but I wonder what the
> > implication of these changes would be.
> >
> > Looking further it seems a lot of classes on the java client and
> > broker have changed.
> > I think in order to be on the safe side, these changes should be
> > backed out.
> > If I am not mistaken that would not impact anything as I believe
> > your
> > work didn't involve the java side.
> >
> > Rajith
> >
> > On Mon, Sep 19, 2011 at 11:13 AM,  <kgiusti@apache.org> wrote:
> >> Modified:
> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
> >> URL:
> >> http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> >> ==============================================================================
> >> ---
> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
> >> (original)
> >> +++
> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
> >> Mon Sep 19 15:13:18 2011
> >> @@ -23,7 +23,8 @@ package org.apache.qpid.client.message;
> >>  import javax.jms.JMSException;
> >>  import javax.jms.StreamMessage;
> >>
> >> -import org.apache.mina.common.ByteBuffer;
> >> +import java.nio.ByteBuffer;
> >> +
> >>  import org.apache.qpid.AMQException;
> >>  import org.apache.qpid.framing.AMQShortString;
> >>  import org.apache.qpid.framing.BasicContentHeaderProperties;
> >> @@ -36,65 +37,76 @@ public class JMSStreamMessage extends Ab
> >>     public static final String MIME_TYPE="jms/stream-message";
> >>
> >>
> >> -
> >> -    /**
> >> -     * This is set when reading a byte array. The
> >> readBytes(byte[]) method supports multiple calls to read
> >> -     * a byte array in multiple chunks, hence this is used to
> >> track how much is left to be read
> >> -     */
> >> -    private int _byteArrayRemaining = -1;
> >> +    private TypedBytesContentReader _typedBytesContentReader;
> >> +    private TypedBytesContentWriter _typedBytesContentWriter;
> >>
> >>     public JMSStreamMessage(AMQMessageDelegateFactory
> >>     delegateFactory)
> >>     {
> >> -        this(delegateFactory,null);
> >> +        super(delegateFactory,false);
> >> +        _typedBytesContentWriter = new TypedBytesContentWriter();
> >>
> >>     }
> >>
> >> -    /**
> >> -     * Construct a stream message with existing data.
> >> -     *
> >> -     * @param delegateFactory
> >> -     * @param data the data that comprises this message. If data
> >> is null, you get a 1024 byte buffer that is
> >> -     */
> >> -    JMSStreamMessage(AMQMessageDelegateFactory delegateFactory,
> >> ByteBuffer data)
> >> -    {
> >>
> >> -        super(delegateFactory, data); // this instanties a
> >> content header
> >> -    }
> >>
> >>     JMSStreamMessage(AMQMessageDelegate delegate, ByteBuffer data)
> >>     throws AMQException
> >>     {
> >> -
> >> -        super(delegate, data);
> >> +        super(delegate, data!=null);
> >> +        _typedBytesContentReader = new
> >> TypedBytesContentReader(data);
> >>     }
> >>
> >> -
> >>     public void reset()
> >>     {
> >> -        super.reset();
> >>         _readableMessage = true;
> >> +
> >> +        if(_typedBytesContentReader != null)
> >> +        {
> >> +            _typedBytesContentReader.reset();
> >> +        }
> >> +        else if (_typedBytesContentWriter != null)
> >> +        {
> >> +            _typedBytesContentReader = new
> >> TypedBytesContentReader(_typedBytesContentWriter.getData());
> >> +        }
> >> +    }
> >> +
> >> +    @Override
> >> +    public void clearBody() throws JMSException
> >> +    {
> >> +        super.clearBody();
> >> +        _typedBytesContentReader = null;
> >> +        _typedBytesContentWriter = new TypedBytesContentWriter();
> >> +
> >>     }
> >>
> >> +
> >>     protected String getMimeType()
> >>     {
> >>         return MIME_TYPE;
> >>     }
> >>
> >> -
> >> +    @Override
> >> +    public java.nio.ByteBuffer getData() throws JMSException
> >> +    {
> >> +        return _typedBytesContentWriter == null ?
> >> _typedBytesContentReader.getData() :
> >> _typedBytesContentWriter.getData();
> >> +    }
> >>
> >>     public boolean readBoolean() throws JMSException
> >>     {
> >> -        return super.readBoolean();
> >> +        checkReadable();
> >> +        return _typedBytesContentReader.readBoolean();
> >>     }
> >>
> >>
> >>     public byte readByte() throws JMSException
> >>     {
> >> -        return super.readByte();
> >> +        checkReadable();
> >> +        return _typedBytesContentReader.readByte();
> >>     }
> >>
> >>     public short readShort() throws JMSException
> >>     {
> >> -        return super.readShort();
> >> +        checkReadable();
> >> +        return _typedBytesContentReader.readShort();
> >>     }
> >>
> >>     /**
> >> @@ -105,102 +117,127 @@ public class JMSStreamMessage extends Ab
> >>      */
> >>     public char readChar() throws JMSException
> >>     {
> >> -        return super.readChar();
> >> +        checkReadable();
> >> +        return _typedBytesContentReader.readChar();
> >>     }
> >>
> >>     public int readInt() throws JMSException
> >>     {
> >> -        return super.readInt();
> >> +        checkReadable();
> >> +        return _typedBytesContentReader.readInt();
> >>     }
> >>
> >>     public long readLong() throws JMSException
> >>     {
> >> -        return super.readLong();
> >> +        checkReadable();
> >> +        return _typedBytesContentReader.readLong();
> >>     }
> >>
> >>     public float readFloat() throws JMSException
> >>     {
> >> -        return super.readFloat();
> >> +        checkReadable();
> >> +        return _typedBytesContentReader.readFloat();
> >>     }
> >>
> >>     public double readDouble() throws JMSException
> >>     {
> >> -        return super.readDouble();
> >> +        checkReadable();
> >> +        return _typedBytesContentReader.readDouble();
> >>     }
> >>
> >>     public String readString() throws JMSException
> >>     {
> >> -        return super.readString();
> >> +        checkReadable();
> >> +        return _typedBytesContentReader.readString();
> >>     }
> >>
> >>     public int readBytes(byte[] bytes) throws JMSException
> >>     {
> >> -        return super.readBytes(bytes);
> >> +        if(bytes == null)
> >> +        {
> >> +            throw new IllegalArgumentException("Must provide
> >> non-null array to read into");
> >> +        }
> >> +
> >> +        checkReadable();
> >> +        return _typedBytesContentReader.readBytes(bytes);
> >>     }
> >>
> >>
> >>     public Object readObject() throws JMSException
> >>     {
> >> -        return super.readObject();
> >> +        checkReadable();
> >> +        return _typedBytesContentReader.readObject();
> >>     }
> >>
> >>     public void writeBoolean(boolean b) throws JMSException
> >>     {
> >> -        super.writeBoolean(b);
> >> +        checkWritable();
> >> +        _typedBytesContentWriter.writeBoolean(b);
> >>     }
> >>
> >>     public void writeByte(byte b) throws JMSException
> >>     {
> >> -        super.writeByte(b);
> >> +        checkWritable();
> >> +        _typedBytesContentWriter.writeByte(b);
> >>     }
> >>
> >>     public void writeShort(short i) throws JMSException
> >>     {
> >> -        super.writeShort(i);
> >> +        checkWritable();
> >> +        _typedBytesContentWriter.writeShort(i);
> >>     }
> >>
> >>     public void writeChar(char c) throws JMSException
> >>     {
> >> -        super.writeChar(c);
> >> +        checkWritable();
> >> +        _typedBytesContentWriter.writeChar(c);
> >>     }
> >>
> >>     public void writeInt(int i) throws JMSException
> >>     {
> >> -        super.writeInt(i);
> >> +        checkWritable();
> >> +        _typedBytesContentWriter.writeInt(i);
> >>     }
> >>
> >>     public void writeLong(long l) throws JMSException
> >>     {
> >> -        super.writeLong(l);
> >> +        checkWritable();
> >> +        _typedBytesContentWriter.writeLong(l);
> >>     }
> >>
> >>     public void writeFloat(float v) throws JMSException
> >>     {
> >> -        super.writeFloat(v);
> >> +        checkWritable();
> >> +        _typedBytesContentWriter.writeFloat(v);
> >>     }
> >>
> >>     public void writeDouble(double v) throws JMSException
> >>     {
> >> -        super.writeDouble(v);
> >> +        checkWritable();
> >> +        _typedBytesContentWriter.writeDouble(v);
> >>     }
> >>
> >>     public void writeString(String string) throws JMSException
> >>     {
> >> -        super.writeString(string);
> >> +        checkWritable();
> >> +        _typedBytesContentWriter.writeString(string);
> >>     }
> >>
> >>     public void writeBytes(byte[] bytes) throws JMSException
> >>     {
> >> -        super.writeBytes(bytes);
> >> +        checkWritable();
> >> +        _typedBytesContentWriter.writeBytes(bytes);
> >>     }
> >>
> >>     public void writeBytes(byte[] bytes, int offset, int length)
> >>     throws JMSException
> >>     {
> >> -        super.writeBytes(bytes,offset,length);
> >> +        checkWritable();
> >> +        _typedBytesContentWriter.writeBytes(bytes, offset,
> >> length);
> >>     }
> >>
> >>     public void writeObject(Object object) throws JMSException
> >>     {
> >> -        super.writeObject(object);
> >> +        checkWritable();
> >> +        _typedBytesContentWriter.writeObject(object);
> >>     }
> >>  }
> >>
> >> Modified:
> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java
> >> URL:
> >> http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> >> ==============================================================================
> >> ---
> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java
> >> (original)
> >> +++
> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java
> >> Mon Sep 19 15:13:18 2011
> >> @@ -22,10 +22,9 @@ package org.apache.qpid.client.message;
> >>
> >>  import javax.jms.JMSException;
> >>
> >> -import org.apache.mina.common.ByteBuffer;
> >> +import java.nio.ByteBuffer;
> >> +
> >>  import org.apache.qpid.AMQException;
> >> -import org.apache.qpid.framing.AMQShortString;
> >> -import org.apache.qpid.framing.BasicContentHeaderProperties;
> >>
> >>  public class JMSStreamMessageFactory extends
> >>  AbstractJMSMessageFactory
> >>  {
> >>
> >> Modified:
> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
> >> URL:
> >> http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> >> ==============================================================================
> >> ---
> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
> >> (original)
> >> +++
> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
> >> Mon Sep 19 15:13:18 2011
> >> @@ -20,15 +20,21 @@
> >>  */
> >>  package org.apache.qpid.client.message;
> >>
> >> +import java.io.DataInputStream;
> >>  import java.io.UnsupportedEncodingException;
> >> +import java.nio.ByteBuffer;
> >> +import java.nio.CharBuffer;
> >>  import java.nio.charset.CharacterCodingException;
> >>  import java.nio.charset.Charset;
> >> +import java.nio.charset.CharsetDecoder;
> >> +import java.nio.charset.CharsetEncoder;
> >>
> >>  import javax.jms.JMSException;
> >> +import javax.jms.MessageFormatException;
> >>
> >> -import org.apache.mina.common.ByteBuffer;
> >>  import org.apache.qpid.AMQException;
> >>  import org.apache.qpid.client.CustomJMSXProperty;
> >> +import org.apache.qpid.framing.AMQFrameDecodingException;
> >>  import org.apache.qpid.framing.AMQShortString;
> >>  import org.apache.qpid.framing.BasicContentHeaderProperties;
> >>  import org.apache.qpid.util.Strings;
> >> @@ -37,6 +43,7 @@ public class JMSTextMessage extends Abst
> >>  {
> >>     private static final String MIME_TYPE = "text/plain";
> >>
> >> +    private Exception _exception;
> >>     private String _decodedValue;
> >>
> >>     /**
> >> @@ -45,36 +52,41 @@ public class JMSTextMessage extends Abst
> >>     private static final String PAYLOAD_NULL_PROPERTY =
> >>     CustomJMSXProperty.JMS_AMQP_NULL.toString();
> >>     private static final Charset DEFAULT_CHARSET =
> >>     Charset.forName("UTF-8");
> >>
> >> -    public JMSTextMessage(AMQMessageDelegateFactory
> >> delegateFactory) throws JMSException
> >> -    {
> >> -        this(delegateFactory, null, null);
> >> -    }
> >> +    private CharsetDecoder _decoder =
> >> DEFAULT_CHARSET.newDecoder();
> >> +    private CharsetEncoder _encoder =
> >> DEFAULT_CHARSET.newEncoder();
> >> +
> >> +    private static final ByteBuffer EMPTY_BYTE_BUFFER =
> >> ByteBuffer.allocate(0);
> >>
> >> -    JMSTextMessage(AMQMessageDelegateFactory delegateFactory,
> >> ByteBuffer data, String encoding) throws JMSException
> >> +    public JMSTextMessage(AMQMessageDelegateFactory
> >> delegateFactory) throws JMSException
> >>     {
> >> -        super(delegateFactory, data); // this instantiates a
> >> content header
> >> -        setContentType(getMimeType());
> >> -        setEncoding(encoding);
> >> +        super(delegateFactory, false); // this instantiates a
> >> content header
> >>     }
> >>
> >>     JMSTextMessage(AMQMessageDelegate delegate, ByteBuffer data)
> >>             throws AMQException
> >>     {
> >> -        super(delegate, data);
> >> -        setContentType(getMimeType());
> >> -        _data = data;
> >> -    }
> >> +        super(delegate, data!=null);
> >>
> >> -
> >> -    public void clearBodyImpl() throws JMSException
> >> -    {
> >> -        if (_data != null)
> >> +        try
> >>         {
> >> -            _data.release();
> >> -            _data = null;
> >> +            if(propertyExists(PAYLOAD_NULL_PROPERTY))
> >> +            {
> >> +                _decodedValue = null;
> >> +            }
> >> +            else
> >> +            {
> >> +                _decodedValue = _decoder.decode(data).toString();
> >> +            }
> >> +        }
> >> +        catch (CharacterCodingException e)
> >> +        {
> >> +            _exception = e;
> >> +        }
> >> +        catch (JMSException e)
> >> +        {
> >> +            _exception = e;
> >>         }
> >>
> >> -        _decodedValue = null;
> >>     }
> >>
> >>     public String toBodyString() throws JMSException
> >> @@ -87,95 +99,62 @@ public class JMSTextMessage extends Abst
> >>         return MIME_TYPE;
> >>     }
> >>
> >> -    public void setText(String text) throws JMSException
> >> +    @Override
> >> +    public ByteBuffer getData() throws JMSException
> >>     {
> >> -        checkWritable();
> >> -
> >> -        clearBody();
> >> +        _encoder.reset();
> >>         try
> >>         {
> >> -            if (text != null)
> >> +            if(_exception != null)
> >> +            {
> >> +                final MessageFormatException
> >> messageFormatException = new MessageFormatException("Cannot
> >> decode original message");
> >> +
> >>                messageFormatException.setLinkedException(_exception);
> >> +                throw messageFormatException;
> >> +            }
> >> +            else if(_decodedValue == null)
> >> +            {
> >> +                return EMPTY_BYTE_BUFFER;
> >> +            }
> >> +            else
> >>             {
> >> -                final String encoding = getEncoding();
> >> -                if (encoding == null ||
> >> encoding.equalsIgnoreCase("UTF-8"))
> >> -                {
> >> -                    _data =
> >> ByteBuffer.wrap(Strings.toUTF8(text));
> >> -                    setEncoding("UTF-8");
> >> -                }
> >> -                else
> >> -                {
> >> -                    _data =
> >> ByteBuffer.wrap(text.getBytes(encoding));
> >> -                }
> >> -                _data.position(_data.limit());
> >> -                _changedData=true;
> >> +                return
> >> _encoder.encode(CharBuffer.wrap(_decodedValue));
> >>             }
> >> -            _decodedValue = text;
> >>         }
> >> -        catch (UnsupportedEncodingException e)
> >> +        catch (CharacterCodingException e)
> >>         {
> >> -            // should never occur
> >> -            JMSException jmse = new JMSException("Unable to
> >> decode text data");
> >> -            jmse.setLinkedException(e);
> >> -            jmse.initCause(e);
> >> -            throw jmse;
> >> +            final JMSException jmsException = new
> >> JMSException("Cannot encode string in UFT-8: " + _decodedValue);
> >> +            jmsException.setLinkedException(e);
> >> +            throw jmsException;
> >>         }
> >>     }
> >>
> >> -    public String getText() throws JMSException
> >> +    @Override
> >> +    public void clearBody() throws JMSException
> >>     {
> >> -        if (_data == null && _decodedValue == null)
> >> -        {
> >> -            return null;
> >> -        }
> >> -        else if (_decodedValue != null)
> >> -        {
> >> -            return _decodedValue;
> >> -        }
> >> -        else
> >> -        {
> >> -            _data.rewind();
> >> +        super.clearBody();
> >> +        _decodedValue = null;
> >> +        _exception = null;
> >> +    }
> >>
> >> -            if (propertyExists(PAYLOAD_NULL_PROPERTY) &&
> >> getBooleanProperty(PAYLOAD_NULL_PROPERTY))
> >> -            {
> >> -                return null;
> >> -            }
> >> -            if (getEncoding() != null)
> >> -            {
> >> -                try
> >> -                {
> >> -                    _decodedValue =
> >> _data.getString(Charset.forName(getEncoding()).newDecoder());
> >> -                }
> >> -                catch (CharacterCodingException e)
> >> -                {
> >> -                    JMSException jmse = new JMSException("Could
> >> not decode string data: " + e);
> >> -                    jmse.setLinkedException(e);
> >> -                    jmse.initCause(e);
> >> -                    throw jmse;
> >> -                }
> >> -            }
> >> -            else
> >> -            {
> >> -                try
> >> -                {
> >> -                    _decodedValue =
> >> _data.getString(DEFAULT_CHARSET.newDecoder());
> >> -                }
> >> -                catch (CharacterCodingException e)
> >> -                {
> >> -                    JMSException jmse = new JMSException("Could
> >> not decode string data: " + e);
> >> -                    jmse.setLinkedException(e);
> >> -                    jmse.initCause(e);
> >> -                    throw jmse;
> >> -                }
> >> -            }
> >> -            return _decodedValue;
> >> -        }
> >> +    public void setText(String text) throws JMSException
> >> +    {
> >> +        checkWritable();
> >> +
> >> +        clearBody();
> >> +        _decodedValue = text;
> >> +
> >> +    }
> >> +
> >> +    public String getText() throws JMSException
> >> +    {
> >> +        return _decodedValue;
> >>     }
> >>
> >>     @Override
> >>     public void prepareForSending() throws JMSException
> >>     {
> >>         super.prepareForSending();
> >> -        if (_data == null)
> >> +        if (_decodedValue == null)
> >>         {
> >>             setBooleanProperty(PAYLOAD_NULL_PROPERTY, true);
> >>         }
> >>
> >> Modified:
> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java
> >> URL:
> >> http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> >> ==============================================================================
> >> ---
> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java
> >> (original)
> >> +++
> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java
> >> Mon Sep 19 15:13:18 2011
> >> @@ -22,7 +22,7 @@ package org.apache.qpid.client.message;
> >>
> >>  import javax.jms.JMSException;
> >>
> >> -import org.apache.mina.common.ByteBuffer;
> >> +import java.nio.ByteBuffer;
> >>  import org.apache.qpid.AMQException;
> >>  import org.apache.qpid.framing.AMQShortString;
> >>  import org.apache.qpid.framing.BasicContentHeaderProperties;
> >>
> >> Modified:
> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
> >> URL:
> >> http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> >> ==============================================================================
> >> ---
> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
> >> (original)
> >> +++
> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
> >> Mon Sep 19 15:13:18 2011
> >> @@ -87,9 +87,9 @@ public class UnprocessedMessage_0_8 exte
> >>     public void receiveBody(ContentBody body)
> >>     {
> >>
> >> -        if (body.payload != null)
> >> +        if (body._payload != null)
> >>         {
> >> -            final long payloadSize = body.payload.remaining();
> >> +            final long payloadSize = body._payload.length;
> >>
> >>             if (_bodies == null)
> >>             {
> >>
> >> Modified:
> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
> >> URL:
> >> http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> >> ==============================================================================
> >> ---
> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
> >> (original)
> >> +++
> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
> >> Mon Sep 19 15:13:18 2011
> >> @@ -20,7 +20,9 @@
> >>  */
> >>  package org.apache.qpid.client.protocol;
> >>
> >> +import java.io.DataOutputStream;
> >>  import java.io.IOException;
> >> +import java.io.OutputStream;
> >>  import java.net.SocketAddress;
> >>  import java.nio.ByteBuffer;
> >>  import java.util.ArrayList;
> >> @@ -31,7 +33,6 @@ import java.util.concurrent.CountDownLat
> >>  import java.util.concurrent.ThreadFactory;
> >>  import java.util.concurrent.TimeUnit;
> >>
> >> -import org.apache.mina.filter.codec.ProtocolCodecException;
> >>  import org.apache.qpid.AMQConnectionClosedException;
> >>  import org.apache.qpid.AMQDisconnectedException;
> >>  import org.apache.qpid.AMQException;
> >> @@ -46,6 +47,7 @@ import org.apache.qpid.client.state.AMQS
> >>  import org.apache.qpid.client.state.StateWaiter;
> >>  import
> >>  org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
> >>  import org.apache.qpid.codec.AMQCodecFactory;
> >> +import org.apache.qpid.configuration.ClientProperties;
> >>  import org.apache.qpid.framing.AMQBody;
> >>  import org.apache.qpid.framing.AMQDataBlock;
> >>  import org.apache.qpid.framing.AMQFrame;
> >> @@ -57,8 +59,6 @@ import org.apache.qpid.framing.Heartbeat
> >>  import org.apache.qpid.framing.MethodRegistry;
> >>  import org.apache.qpid.framing.ProtocolInitiation;
> >>  import org.apache.qpid.framing.ProtocolVersion;
> >> -import org.apache.qpid.pool.Job;
> >> -import org.apache.qpid.pool.ReferenceCountingExecutorService;
> >>  import org.apache.qpid.protocol.AMQConstant;
> >>  import org.apache.qpid.protocol.AMQMethodEvent;
> >>  import org.apache.qpid.protocol.AMQMethodListener;
> >> @@ -164,19 +164,19 @@ public class AMQProtocolHandler implemen
> >>     private FailoverException _lastFailoverException;
> >>
> >>     /** Defines the default timeout to use for synchronous
> >>     protocol commands. */
> >> -    private final long DEFAULT_SYNC_TIMEOUT =
> >> Long.getLong("amqj.default_syncwrite_timeout", 1000 * 30);
> >> +    private final long DEFAULT_SYNC_TIMEOUT =
> >> Long.getLong(ClientProperties.QPID_SYNC_OP_TIMEOUT,
> >> +
> >>                                                           Long.getLong(ClientProperties.AMQJ_DEFAULT_SYNCWRITE_TIMEOUT,
> >> +
> >>                                                                        ClientProperties.DEFAULT_SYNC_OPERATION_TIMEOUT));
> >>
> >>     /** Object to lock on when changing the latch */
> >>     private Object _failoverLatchChange = new Object();
> >>     private AMQCodecFactory _codecFactory;
> >> -    private Job _readJob;
> >> -    private Job _writeJob;
> >> -    private ReferenceCountingExecutorService _poolReference =
> >> ReferenceCountingExecutorService.getInstance();
> >> +
> >>     private ProtocolVersion _suggestedProtocolVersion;
> >>
> >>     private long _writtenBytes;
> >>     private long _readBytes;
> >> -    private NetworkTransport _transport;
> >> +
> >>     private NetworkConnection _network;
> >>     private Sender<ByteBuffer> _sender;
> >>
> >> @@ -191,24 +191,6 @@ public class AMQProtocolHandler implemen
> >>         _protocolSession = new AMQProtocolSession(this,
> >>         _connection);
> >>         _stateManager = new AMQStateManager(_protocolSession);
> >>         _codecFactory = new AMQCodecFactory(false,
> >>         _protocolSession);
> >> -        _poolReference.setThreadFactory(new ThreadFactory()
> >> -        {
> >> -
> >> -            public Thread newThread(final Runnable runnable)
> >> -            {
> >> -                try
> >> -                {
> >> -                    return
> >> Threading.getThreadFactory().createThread(runnable);
> >> -                }
> >> -                catch (Exception e)
> >> -                {
> >> -                    throw new RuntimeException("Failed to create
> >> thread", e);
> >> -                }
> >> -            }
> >> -        });
> >> -        _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS,
> >> true);
> >> -        _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS,
> >> false);
> >> -        _poolReference.acquireExecutorService();
> >>         _failoverHandler = new FailoverHandler(this);
> >>     }
> >>
> >> @@ -329,17 +311,7 @@ public class AMQProtocolHandler implemen
> >>             }
> >>             else
> >>             {
> >> -
> >> -                if (cause instanceof ProtocolCodecException)
> >> -                {
> >> -                    _logger.info("Protocol Exception caught NOT
> >> going to attempt failover as " +
> >> -                                 "cause isn't
> >> AMQConnectionClosedException: " + cause, cause);
> >> -
> >> -                    AMQException amqe = new
> >> AMQException("Protocol handler error: " + cause, cause);
> >> -                    propagateExceptionToAllWaiters(amqe);
> >> -                }
> >>                 _connection.exceptionReceived(cause);
> >> -
> >>             }
> >>
> >>             // FIXME Need to correctly handle other exceptions.
> >>             Things like ...
> >> @@ -433,76 +405,63 @@ public class AMQProtocolHandler implemen
> >>
> >>     public void received(ByteBuffer msg)
> >>     {
> >> +        _readBytes += msg.remaining();
> >>         try
> >>         {
> >> -            _readBytes += msg.remaining();
> >>             final ArrayList<AMQDataBlock> dataBlocks =
> >>             _codecFactory.getDecoder().decodeBuffer(msg);
> >>
> >> -            Job.fireAsynchEvent(_poolReference.getPool(),
> >> _readJob, new Runnable()
> >> +            // Decode buffer
> >> +
> >> +            for (AMQDataBlock message : dataBlocks)
> >>             {
> >>
> >> -                public void run()
> >> -                {
> >> -                    // Decode buffer
> >> +                    if (PROTOCOL_DEBUG)
> >> +                    {
> >> +                        _protocolLogger.info(String.format("RECV:
> >> [%s] %s", this, message));
> >> +                    }
> >>
> >> -                    for (AMQDataBlock message : dataBlocks)
> >> +                    if(message instanceof AMQFrame)
> >>                     {
> >> +                        final boolean debug =
> >> _logger.isDebugEnabled();
> >> +                        final long msgNumber =
> >> ++_messageReceivedCount;
> >>
> >> -                        try
> >> -                        {
> >> -                            if (PROTOCOL_DEBUG)
> >> -                            {
> >> -
> >>                                _protocolLogger.info(String.format("RECV:
> >> [%s] %s", this, message));
> >> -                            }
> >> -
> >> -                            if(message instanceof AMQFrame)
> >> -                            {
> >> -                                final boolean debug =
> >> _logger.isDebugEnabled();
> >> -                                final long msgNumber =
> >> ++_messageReceivedCount;
> >> -
> >> -                                if (debug && ((msgNumber % 1000)
> >> == 0))
> >> -                                {
> >> -                                    _logger.debug("Received " +
> >> _messageReceivedCount + " protocol messages");
> >> -                                }
> >> -
> >> -                                AMQFrame frame = (AMQFrame)
> >> message;
> >> -
> >> -                                final AMQBody bodyFrame =
> >> frame.getBodyFrame();
> >> -
> >> -
> >>                                HeartbeatDiagnostics.received(bodyFrame
> >> instanceof HeartbeatBody);
> >> -
> >> -
> >>                                bodyFrame.handle(frame.getChannel(),
> >> _protocolSession);
> >> -
> >> -
> >>                                _connection.bytesReceived(_readBytes);
> >> -                            }
> >> -                            else if (message instanceof
> >> ProtocolInitiation)
> >> -                            {
> >> -                                // We get here if the server
> >> sends a response to our initial protocol header
> >> -                                // suggesting an alternate
> >> ProtocolVersion; the server will then close the
> >> -                                // connection.
> >> -                                ProtocolInitiation protocolInit =
> >> (ProtocolInitiation) message;
> >> -                                _suggestedProtocolVersion =
> >> protocolInit.checkVersion();
> >> -                                _logger.info("Broker suggested
> >> using protocol version:" + _suggestedProtocolVersion);
> >> -
> >> -                                // get round a bug in old
> >> versions of qpid whereby the connection is not closed
> >> -
> >>                                _stateManager.changeState(AMQState.CONNECTION_CLOSED);
> >> -                            }
> >> -                        }
> >> -                        catch (Exception e)
> >> +                        if (debug && ((msgNumber % 1000) == 0))
> >>                         {
> >> -                            _logger.error("Exception processing
> >> frame", e);
> >> -
> >>                            propagateExceptionToFrameListeners(e);
> >> -                            exception(e);
> >> +                            _logger.debug("Received " +
> >> _messageReceivedCount + " protocol messages");
> >>                         }
> >> +
> >> +                        AMQFrame frame = (AMQFrame) message;
> >> +
> >> +                        final AMQBody bodyFrame =
> >> frame.getBodyFrame();
> >> +
> >> +                        HeartbeatDiagnostics.received(bodyFrame
> >> instanceof HeartbeatBody);
> >> +
> >> +                        bodyFrame.handle(frame.getChannel(),
> >> _protocolSession);
> >> +
> >> +                        _connection.bytesReceived(_readBytes);
> >> +                    }
> >> +                    else if (message instanceof
> >> ProtocolInitiation)
> >> +                    {
> >> +                        // We get here if the server sends a
> >> response to our initial protocol header
> >> +                        // suggesting an alternate
> >> ProtocolVersion; the server will then close the
> >> +                        // connection.
> >> +                        ProtocolInitiation protocolInit =
> >> (ProtocolInitiation) message;
> >> +                        _suggestedProtocolVersion =
> >> protocolInit.checkVersion();
> >> +                        _logger.info("Broker suggested using
> >> protocol version:" + _suggestedProtocolVersion);
> >> +
> >> +                        // get round a bug in old versions of
> >> qpid whereby the connection is not closed
> >> +
> >>                        _stateManager.changeState(AMQState.CONNECTION_CLOSED);
> >>                     }
> >>                 }
> >> -            });
> >>         }
> >>         catch (Exception e)
> >>         {
> >> +            _logger.error("Exception processing frame", e);
> >>             propagateExceptionToFrameListeners(e);
> >>             exception(e);
> >>         }
> >> +
> >> +
> >>     }
> >>
> >>     public void methodBodyReceived(final int channelId, final
> >>     AMQBody bodyFrame)
> >> @@ -568,17 +527,13 @@ public class AMQProtocolHandler implemen
> >>         writeFrame(frame, false);
> >>     }
> >>
> >> -    public void writeFrame(AMQDataBlock frame, boolean wait)
> >> +    public  synchronized void writeFrame(AMQDataBlock frame,
> >> boolean wait)
> >>     {
> >> -        final ByteBuffer buf = frame.toNioByteBuffer();
> >> +        final ByteBuffer buf = asByteBuffer(frame);
> >>         _writtenBytes += buf.remaining();
> >> -        Job.fireAsynchEvent(_poolReference.getPool(), _writeJob,
> >> new Runnable()
> >> -        {
> >> -            public void run()
> >> -            {
> >> -                _sender.send(buf);
> >> -            }
> >> -        });
> >> +        _sender.send(buf);
> >> +        _sender.flush();
> >> +
> >>         if (PROTOCOL_DEBUG)
> >>         {
> >>             _protocolLogger.debug(String.format("SEND: [%s] %s",
> >>             this, frame));
> >> @@ -595,12 +550,41 @@ public class AMQProtocolHandler implemen
> >>
> >>         _connection.bytesSent(_writtenBytes);
> >>
> >> -        if (wait)
> >> +    }
> >> +
> >> +    private ByteBuffer asByteBuffer(AMQDataBlock block)
> >> +    {
> >> +        final ByteBuffer buf = ByteBuffer.allocate((int)
> >> block.getSize());
> >> +
> >> +        try
> >> +        {
> >> +            block.writePayload(new DataOutputStream(new
> >> OutputStream()
> >> +            {
> >> +
> >> +
> >> +                @Override
> >> +                public void write(int b) throws IOException
> >> +                {
> >> +                    buf.put((byte) b);
> >> +                }
> >> +
> >> +                @Override
> >> +                public void write(byte[] b, int off, int len)
> >> throws IOException
> >> +                {
> >> +                    buf.put(b, off, len);
> >> +                }
> >> +            }));
> >> +        }
> >> +        catch (IOException e)
> >>         {
> >> -            _sender.flush();
> >> +            throw new RuntimeException(e);
> >>         }
> >> +
> >> +        buf.flip();
> >> +        return buf;
> >>     }
> >>
> >> +
> >>     /**
> >>      * Convenience method that writes a frame to the protocol
> >>      session and waits for a particular response. Equivalent to
> >>      * calling getProtocolSession().write() then waiting for the
> >>      response.
> >> @@ -723,7 +707,7 @@ public class AMQProtocolHandler implemen
> >>                 _logger.debug("FailoverException interrupted
> >>                 connection close, ignoring as connection   close
> >>                 anyway.");
> >>             }
> >>         }
> >> -        _poolReference.releaseExecutorService();
> >> +
> >>     }
> >>
> >>     /** @return the number of bytes read from this protocol
> >>     session */
> >> @@ -841,8 +825,13 @@ public class AMQProtocolHandler implemen
> >>
> >>     public void setNetworkConnection(NetworkConnection network)
> >>     {
> >> +        setNetworkConnection(network, network.getSender());
> >> +    }
> >> +
> >> +    public void setNetworkConnection(NetworkConnection network,
> >> Sender<ByteBuffer> sender)
> >> +    {
> >>         _network = network;
> >> -        _sender = network.getSender();
> >> +        _sender = sender;
> >>     }
> >>
> >>     /** @param delay delay in seconds (not ms) */
> >>
> >> Modified:
> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java
> >> URL:
> >> http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> >> ==============================================================================
> >> ---
> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java
> >> (original)
> >> +++
> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java
> >> Mon Sep 19 15:13:18 2011
> >> @@ -20,17 +20,22 @@
> >>  */
> >>  package org.apache.qpid.client.security;
> >>
> >> -import org.apache.qpid.util.FileUtils;
> >> -
> >> -import org.slf4j.Logger;
> >> -import org.slf4j.LoggerFactory;
> >> -
> >>  import java.io.IOException;
> >>  import java.io.InputStream;
> >> +import java.util.Collection;
> >> +import java.util.Collections;
> >>  import java.util.Enumeration;
> >>  import java.util.HashMap;
> >> +import java.util.HashSet;
> >>  import java.util.Map;
> >>  import java.util.Properties;
> >> +import java.util.Set;
> >> +import java.util.StringTokenizer;
> >> +import java.util.TreeMap;
> >> +
> >> +import org.apache.qpid.util.FileUtils;
> >> +import org.slf4j.Logger;
> >> +import org.slf4j.LoggerFactory;
> >>
> >>  /**
> >>  * CallbackHandlerRegistry is a registry for call back handlers
> >>  for user authentication and interaction during user
> >> @@ -42,7 +47,7 @@ import java.util.Properties;
> >>  * "amp.callbackhandler.properties". The format of the properties
> >>  file is:
> >>  *
> >>  * <p/><pre>
> >> - * CallbackHanlder.mechanism=fully.qualified.class.name
> >> + * CallbackHanlder.n.mechanism=fully.qualified.class.name where n
> >> is an ordinal
> >>  * </pre>
> >>  *
> >>  * <p/>Where mechanism is an IANA-registered mechanism name and
> >>  the fully qualified class name refers to a
> >> @@ -66,51 +71,15 @@ public class CallbackHandlerRegistry
> >>     public static final String DEFAULT_RESOURCE_NAME =
> >>     "org/apache/qpid/client/security/CallbackHandlerRegistry.properties";
> >>
> >>     /** A static reference to the singleton instance of this
> >>     registry. */
> >> -    private static CallbackHandlerRegistry _instance = new
> >> CallbackHandlerRegistry();
> >> +    private static final CallbackHandlerRegistry _instance;
> >>
> >>     /** Holds a map from SASL mechanism names to call back
> >>     handlers. */
> >> -    private Map<String, Class> _mechanismToHandlerClassMap = new
> >> HashMap<String, Class>();
> >> -
> >> -    /** Holds a space delimited list of mechanisms that callback
> >> handlers exist for. */
> >> -    private String _mechanisms;
> >> -
> >> -    /**
> >> -     * Gets the singleton instance of this registry.
> >> -     *
> >> -     * @return The singleton instance of this registry.
> >> -     */
> >> -    public static CallbackHandlerRegistry getInstance()
> >> -    {
> >> -        return _instance;
> >> -    }
> >> +    private Map<String, Class<AMQCallbackHandler>>
> >> _mechanismToHandlerClassMap = new HashMap<String,
> >> Class<AMQCallbackHandler>>();
> >>
> >> -    /**
> >> -     * Gets the callback handler class for a given SASL mechanism
> >> name.
> >> -     *
> >> -     * @param mechanism The SASL mechanism name.
> >> -     *
> >> -     * @return The callback handler class for the mechanism, or
> >> null if none is configured for that mechanism.
> >> -     */
> >> -    public Class getCallbackHandlerClass(String mechanism)
> >> -    {
> >> -        return (Class)
> >> _mechanismToHandlerClassMap.get(mechanism);
> >> -    }
> >> +    /** Ordered collection of mechanisms for which callback
> >> handlers exist. */
> >> +    private Collection<String> _mechanisms;
> >>
> >> -    /**
> >> -     * Gets a space delimited list of supported SASL mechanisms.
> >> -     *
> >> -     * @return A space delimited list of supported SASL
> >> mechanisms.
> >> -     */
> >> -    public String getMechanisms()
> >> -    {
> >> -        return _mechanisms;
> >> -    }
> >> -
> >> -    /**
> >> -     * Creates the call back handler registry from its
> >> configuration resource or file. This also has the side effect
> >> -     * of configuring and registering the SASL client factory
> >> implementations using {@link DynamicSaslRegistrar}.
> >> -     */
> >> -    private CallbackHandlerRegistry()
> >> +    static
> >>     {
> >>         // Register any configured SASL client factories.
> >>         DynamicSaslRegistrar.registerSaslProviders();
> >> @@ -120,12 +89,12 @@ public class CallbackHandlerRegistry
> >>             FileUtils.openFileOrDefaultResource(filename,
> >>             DEFAULT_RESOURCE_NAME,
> >>                 CallbackHandlerRegistry.class.getClassLoader());
> >>
> >> +        final Properties props = new Properties();
> >> +
> >>         try
> >>         {
> >> -            Properties props = new Properties();
> >> +
> >>             props.load(is);
> >> -            parseProperties(props);
> >> -            _logger.info("Callback handlers available for SASL
> >> mechanisms: " + _mechanisms);
> >>         }
> >>         catch (IOException e)
> >>         {
> >> @@ -146,32 +115,68 @@ public class CallbackHandlerRegistry
> >>                 }
> >>             }
> >>         }
> >> +
> >> +        _instance = new CallbackHandlerRegistry(props);
> >> +        _logger.info("Callback handlers available for SASL
> >> mechanisms: " + _instance._mechanisms);
> >> +
> >>     }
> >>
> >> -    /*private InputStream openPropertiesInputStream(String
> >> filename)
> >> +    /**
> >> +     * Gets the singleton instance of this registry.
> >> +     *
> >> +     * @return The singleton instance of this registry.
> >> +     */
> >> +    public static CallbackHandlerRegistry getInstance()
> >> +    {
> >> +        return _instance;
> >> +    }
> >> +
> >> +    public AMQCallbackHandler createCallbackHandler(final String
> >> mechanism)
> >>     {
> >> -        boolean useDefault = true;
> >> -        InputStream is = null;
> >> -        if (filename != null)
> >> +        final Class<AMQCallbackHandler> mechanismClass =
> >> _mechanismToHandlerClassMap.get(mechanism);
> >> +
> >> +        if (mechanismClass == null)
> >>         {
> >> -            try
> >> -            {
> >> -                is = new BufferedInputStream(new
> >> FileInputStream(new File(filename)));
> >> -                useDefault = false;
> >> -            }
> >> -            catch (FileNotFoundException e)
> >> -            {
> >> -                _logger.error("Unable to read from file " +
> >> filename + ": " + e, e);
> >> -            }
> >> +            throw new IllegalArgumentException("Mechanism " +
> >> mechanism + " not known");
> >>         }
> >>
> >> -        if (useDefault)
> >> +        try
> >> +        {
> >> +            return mechanismClass.newInstance();
> >> +        }
> >> +        catch (InstantiationException e)
> >> +        {
> >> +            throw new IllegalArgumentException("Unable to create
> >> an instance of mechanism " + mechanism, e);
> >> +        }
> >> +        catch (IllegalAccessException e)
> >>         {
> >> -            is =
> >> CallbackHandlerRegistry.class.getResourceAsStream(DEFAULT_RESOURCE_NAME);
> >> +            throw new IllegalArgumentException("Unable to create
> >> an instance of mechanism " + mechanism, e);
> >>         }
> >> +    }
> >>
> >> -        return is;
> >> -    }*/
> >> +    /**
> >> +     * Gets collections of supported SASL mechanism names,
> >> ordered by preference
> >> +     *
> >> +     * @return collection of SASL mechanism names.
> >> +     */
> >> +    public Collection<String> getMechanisms()
> >> +    {
> >> +        return Collections.unmodifiableCollection(_mechanisms);
> >> +    }
> >> +
> >> +    /**
> >> +     * Creates the call back handler registry from its
> >> configuration resource or file.
> >> +     *
> >> +     * This also has the side effect of configuring and
> >> registering the SASL client factory
> >> +     * implementations using {@link DynamicSaslRegistrar}.
> >> +     *
> >> +     * This constructor is default protection to allow for
> >> effective unit testing.  Clients must use
> >> +     * {@link #getInstance()} to obtain the singleton instance.
> >> +     */
> >> +    CallbackHandlerRegistry(final Properties props)
> >> +    {
> >> +        parseProperties(props);
> >> +    }
> >>
> >>     /**
> >>      * Scans the specified properties as a mapping from IANA
> >>      registered SASL mechanism to call back handler
> >> @@ -183,20 +188,20 @@ public class CallbackHandlerRegistry
> >>      */
> >>     private void parseProperties(Properties props)
> >>     {
> >> +
> >> +        final Map<Integer, String> mechanisms = new
> >> TreeMap<Integer, String>();
> >> +
> >>         Enumeration e = props.propertyNames();
> >>         while (e.hasMoreElements())
> >>         {
> >> -            String propertyName = (String) e.nextElement();
> >> -            int period = propertyName.indexOf(".");
> >> -            if (period < 0)
> >> -            {
> >> -                _logger.warn("Unable to parse property " +
> >> propertyName + " when configuring SASL providers");
> >> +            final String propertyName = (String) e.nextElement();
> >> +            final String[] parts = propertyName.split("\\.", 2);
> >>
> >> -                continue;
> >> -            }
> >> +            checkPropertyNameFormat(propertyName, parts);
> >>
> >> -            String mechanism = propertyName.substring(period +
> >> 1);
> >> -            String className = props.getProperty(propertyName);
> >> +            final String mechanism = parts[0];
> >> +            final int ordinal = getPropertyOrdinal(propertyName,
> >> parts);
> >> +            final String className =
> >> props.getProperty(propertyName);
> >>             Class clazz = null;
> >>             try
> >>             {
> >> @@ -205,20 +210,11 @@ public class CallbackHandlerRegistry
> >>                 {
> >>                     _logger.warn("SASL provider " + clazz + " does
> >>                     not implement " + AMQCallbackHandler.class
> >>                         + ". Skipping");
> >> -
> >>                     continue;
> >>                 }
> >> -
> >>                 _mechanismToHandlerClassMap.put(mechanism, clazz);
> >> -                if (_mechanisms == null)
> >> -                {
> >> -                    _mechanisms = mechanism;
> >> -                }
> >> -                else
> >> -                {
> >> -                    // one time cost
> >> -                    _mechanisms = _mechanisms + " " + mechanism;
> >> -                }
> >> +
> >> +                mechanisms.put(ordinal, mechanism);
> >>             }
> >>             catch (ClassNotFoundException ex)
> >>             {
> >> @@ -227,5 +223,91 @@ public class CallbackHandlerRegistry
> >>                 continue;
> >>             }
> >>         }
> >> +
> >> +        _mechanisms = mechanisms.values();  // order guaranteed
> >> by keys of treemap (i.e. our ordinals)
> >> +
> >> +
> >> +    }
> >> +
> >> +    private void checkPropertyNameFormat(final String
> >> propertyName, final String[] parts)
> >> +    {
> >> +        if (parts.length != 2)
> >> +        {
> >> +            throw new IllegalArgumentException("Unable to parse
> >> property " + propertyName + " when configuring SASL providers");
> >> +        }
> >> +    }
> >> +
> >> +    private int getPropertyOrdinal(final String propertyName,
> >> final String[] parts)
> >> +    {
> >> +        try
> >> +        {
> >> +            return Integer.parseInt(parts[1]);
> >> +        }
> >> +        catch(NumberFormatException nfe)
> >> +        {
> >> +            throw new IllegalArgumentException("Unable to parse
> >> property " + propertyName + " when configuring SASL providers",
> >> nfe);
> >> +        }
> >> +    }
> >> +
> >> +    /**
> >> +     * Selects a SASL mechanism that is mutually available to
> >> both parties.  If more than one
> >> +     * mechanism is mutually available the one appearing first
> >> (by ordinal) will be returned.
> >> +     *
> >> +     * @param peerMechanismList space separated list of
> >> mechanisms
> >> +     * @return selected mechanism, or null if none available
> >> +     */
> >> +    public String selectMechanism(final String peerMechanismList)
> >> +    {
> >> +        final Set<String> peerList =
> >> mechListToSet(peerMechanismList);
> >> +
> >> +        return selectMechInternal(peerList,
> >> Collections.<String>emptySet());
> >> +    }
> >> +
> >> +    /**
> >> +     * Selects a SASL mechanism that is mutually available to
> >> both parties.
> >> +     *
> >> +     * @param peerMechanismList space separated list of
> >> mechanisms
> >> +     * @param restrictionList space separated list of mechanisms
> >> +     * @return selected mechanism, or null if none available
> >> +     */
> >> +    public String selectMechanism(final String peerMechanismList,
> >> final String restrictionList)
> >> +    {
> >> +        final Set<String> peerList =
> >> mechListToSet(peerMechanismList);
> >> +        final Set<String> restrictionSet =
> >> mechListToSet(restrictionList);
> >> +
> >> +        return selectMechInternal(peerList, restrictionSet);
> >> +    }
> >> +
> >> +    private String selectMechInternal(final Set<String> peerSet,
> >> final Set<String> restrictionSet)
> >> +    {
> >> +        for (final String mech : _mechanisms)
> >> +        {
> >> +            if (peerSet.contains(mech))
> >> +            {
> >> +                if (restrictionSet.isEmpty() ||
> >> restrictionSet.contains(mech))
> >> +                {
> >> +                    return mech;
> >> +                }
> >> +            }
> >> +        }
> >> +
> >> +        return null;
> >> +    }
> >> +
> >> +    private Set<String> mechListToSet(final String mechanismList)
> >> +    {
> >> +        if (mechanismList == null)
> >> +        {
> >> +            return Collections.emptySet();
> >> +        }
> >> +
> >> +        final StringTokenizer tokenizer = new
> >> StringTokenizer(mechanismList, " ");
> >> +        final Set<String> mechanismSet = new
> >> HashSet<String>(tokenizer.countTokens());
> >> +        while (tokenizer.hasMoreTokens())
> >> +        {
> >> +            mechanismSet.add(tokenizer.nextToken());
> >> +        }
> >> +        return Collections.unmodifiableSet(mechanismSet);
> >>     }
> >> +
> >>  }
> >>
> >> Modified:
> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties
> >> URL:
> >> http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties?rev=1172657&r1=1172656&r2=1172657&view=diff
> >> ==============================================================================
> >> ---
> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties
> >> (original)
> >> +++
> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties
> >> Mon Sep 19 15:13:18 2011
> >> @@ -16,7 +16,17 @@
> >>  # specific language governing permissions and limitations
> >>  # under the License.
> >>  #
> >> -CallbackHandler.CRAM-MD5-HASHED=org.apache.qpid.client.security.UsernameHashedPasswordCallbackHandler
> >> -CallbackHandler.CRAM-MD5=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
> >> -CallbackHandler.AMQPLAIN=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
> >> -CallbackHandler.PLAIN=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
> >> +
> >> +#
> >> +# Format:
> >> +# <mechanism name>.ordinal=<implementation>
> >> +#
> >> +# @see CallbackHandlerRegistry
> >> +#
> >> +
> >> +EXTERNAL.1=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
> >> +GSSAPI.2=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
> >> +CRAM-MD5-HASHED.3=org.apache.qpid.client.security.UsernameHashedPasswordCallbackHandler
> >> +CRAM-MD5.4=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
> >> +AMQPLAIN.5=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
> >> +PLAIN.6=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
> >>
> >> Modified:
> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
> >> URL:
> >> http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> >> ==============================================================================
> >> ---
> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
> >> (original)
> >> +++
> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
> >> Mon Sep 19 15:13:18 2011
> >> @@ -22,7 +22,7 @@ package org.apache.qpid.jms;
> >>
> >>  import java.util.Map;
> >>
> >> -import org.apache.qpid.client.SSLConfiguration;
> >> +import org.apache.qpid.transport.ConnectionSettings;
> >>
> >>  public interface BrokerDetails
> >>  {
> >> @@ -104,14 +104,12 @@ public interface BrokerDetails
> >>     long getTimeout();
> >>
> >>     void setTimeout(long timeout);
> >> -
> >> -    SSLConfiguration getSSLConfiguration();
> >> -
> >> -    void setSSLConfiguration(SSLConfiguration sslConfiguration);
> >>
> >>     boolean getBooleanProperty(String propName);
> >>
> >>     String toString();
> >>
> >>     boolean equals(Object o);
> >> +
> >> +    ConnectionSettings buildConnectionSettings();
> >>  }
> >>
> >> Modified:
> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java
> >> URL:
> >> http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> >> ==============================================================================
> >> ---
> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java
> >> (original)
> >> +++
> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java
> >> Mon Sep 19 15:13:18 2011
> >> @@ -140,7 +140,6 @@ public class FailoverExchangeMethod impl
> >>                         broker.setHost(tokens[1]);
> >>                         broker.setPort(Integer.parseInt(tokens[2]));
> >>                         broker.setProperties(_originalBrokerDetail.getProperties());
> >> -
> >>                        broker.setSSLConfiguration(_originalBrokerDetail.getSSLConfiguration());
> >>                         brokerList.add(broker);
> >>
> >>                         if
> >>                         (currentBrokerIP.equals(broker.getHost())
> >>                         &&
> >>
> >> Modified:
> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
> >> URL:
> >> http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> >> ==============================================================================
> >> ---
> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
> >> (original)
> >> +++
> >> qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
> >> Mon Sep 19 15:13:18 2011
> >> @@ -36,6 +36,7 @@ import javax.jms.Queue;
> >>  import javax.jms.Topic;
> >>  import javax.naming.Context;
> >>  import javax.naming.NamingException;
> >> +import javax.naming.ConfigurationException;
> >>  import javax.naming.spi.InitialContextFactory;
> >>
> >>  import org.apache.qpid.client.AMQConnectionFactory;
> >> @@ -139,7 +140,7 @@ public class PropertiesFileInitialContex
> >>         return new ReadOnlyContext(environment, data);
> >>     }
> >>
> >> -    protected void createConnectionFactories(Map data, Hashtable
> >> environment)
> >> +    protected void createConnectionFactories(Map data, Hashtable
> >> environment) throws ConfigurationException
> >>     {
> >>         for (Iterator iter = environment.entrySet().iterator();
> >>         iter.hasNext();)
> >>         {
> >> @@ -157,7 +158,7 @@ public class PropertiesFileInitialContex
> >>         }
> >>     }
> >>
> >> -    protected void createDestinations(Map data, Hashtable
> >> environment)
> >> +    protected void createDestinations(Map data, Hashtable
> >> environment) throws ConfigurationException
> >>     {
> >>         for (Iterator iter = environment.entrySet().iterator();
> >>         iter.hasNext();)
> >>         {
> >> @@ -225,7 +226,7 @@ public class PropertiesFileInitialContex
> >>     /**
> >>      * Factory method to create new Connection Factory instances
> >>      */
> >> -    protected ConnectionFactory createFactory(String url)
> >> +    protected ConnectionFactory createFactory(String url) throws
> >> ConfigurationException
> >>     {
> >>         try
> >>         {
> >> @@ -233,16 +234,18 @@ public class PropertiesFileInitialContex
> >>         }
> >>         catch (URLSyntaxException urlse)
> >>         {
> >> -            _logger.warn("Unable to createFactories:" + urlse);
> >> -        }
> >> +            _logger.warn("Unable to create factory:" + urlse);
> >>
> >> -        return null;
> >> +            ConfigurationException ex = new
> >> ConfigurationException("Failed to parse entry: " + urlse + " due
> >> to : " +  urlse.getMessage());
> >> +            ex.initCause(urlse);
> >> +            throw ex;
> >> +        }
> >>     }
> >>
> >>     /**
> >>      * Factory method to create new Destination instances from an
> >>      AMQP BindingURL
> >>      */
> >> -    protected Destination createDestination(String str)
> >> +    protected Destination createDestination(String str) throws
> >> ConfigurationException
> >>     {
> >>         try
> >>         {
> >> @@ -252,7 +255,9 @@ public class PropertiesFileInitialContex
> >>         {
> >>             _logger.warn("Unable to create destination:" + e, e);
> >>
> >> -            return null;
> >> +            ConfigurationException ex = new
> >> ConfigurationException("Failed to parse entry: " + str + " due to
> >> : " +  e.getMessage());
> >> +            ex.initCause(e);
> >> +            throw ex;
> >>         }
> >>     }
> >>
> >>
> >> Modified:
> >> qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java
> >> URL:
> >> http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> >> ==============================================================================
> >> ---
> >> qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java
> >> (original)
> >> +++
> >> qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java
> >> Mon Sep 19 15:13:18 2011
> >> @@ -23,7 +23,6 @@ package org.apache.qpid.client;
> >>  import org.apache.qpid.AMQException;
> >>  import org.apache.qpid.client.state.AMQState;
> >>  import org.apache.qpid.framing.ProtocolVersion;
> >> -import org.apache.qpid.jms.ConnectionURL;
> >>  import org.apache.qpid.jms.BrokerDetails;
> >>  import org.apache.qpid.url.URLSyntaxException;
> >>
> >> @@ -37,48 +36,18 @@ public class MockAMQConnection extends A
> >>         super(broker, username, password, clientName,
> >>         virtualHost);
> >>     }
> >>
> >> -    public MockAMQConnection(String broker, String username,
> >> String password, String clientName, String virtualHost,
> >> SSLConfiguration sslConfig)
> >> -            throws AMQException, URLSyntaxException
> >> -    {
> >> -        super(broker, username, password, clientName,
> >> virtualHost, sslConfig);
> >> -    }
> >> -
> >>     public MockAMQConnection(String host, int port, String
> >>     username, String password, String clientName, String
> >>     virtualHost)
> >>             throws AMQException, URLSyntaxException
> >>     {
> >>         super(host, port, username, password, clientName,
> >>         virtualHost);
> >>     }
> >>
> >> -    public MockAMQConnection(String host, int port, String
> >> username, String password, String clientName, String virtualHost,
> >> SSLConfiguration sslConfig)
> >> -            throws AMQException, URLSyntaxException
> >> -    {
> >> -        super(host, port, username, password, clientName,
> >> virtualHost, sslConfig);
> >> -    }
> >> -
> >> -    public MockAMQConnection(String host, int port, boolean
> >> useSSL, String username, String password, String clientName,
> >> String virtualHost, SSLConfiguration sslConfig)
> >> -            throws AMQException, URLSyntaxException
> >> -    {
> >> -        super(host, port, useSSL, username, password, clientName,
> >> virtualHost, sslConfig);
> >> -    }
> >> -
> >>     public MockAMQConnection(String connection)
> >>             throws AMQException, URLSyntaxException
> >>     {
> >>         super(connection);
> >>     }
> >>
> >> -    public MockAMQConnection(String connection, SSLConfiguration
> >> sslConfig)
> >> -            throws AMQException, URLSyntaxException
> >> -    {
> >> -        super(connection, sslConfig);
> >> -    }
> >> -
> >> -    public MockAMQConnection(ConnectionURL connectionURL,
> >> SSLConfiguration sslConfig)
> >> -            throws AMQException
> >> -    {
> >> -        super(connectionURL, sslConfig);
> >> -    }
> >> -
> >>     @Override
> >>     public ProtocolVersion makeBrokerConnection(BrokerDetails
> >>     brokerDetail) throws IOException
> >>     {
> >>
> >> Modified:
> >> qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java
> >> URL:
> >> http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> >> ==============================================================================
> >> ---
> >> qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java
> >> (original)
> >> +++
> >> qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java
> >> Mon Sep 19 15:13:18 2011
> >> @@ -43,4 +43,9 @@ public class TestMessageHelper
> >>     {
> >>         return new
> >>         JMSStreamMessage(AMQMessageDelegateFactory.FACTORY_0_8);
> >>     }
> >> +
> >> +    public static JMSObjectMessage newJMSObjectMessage()
> >> +    {
> >> +        return new
> >> JMSObjectMessage(AMQMessageDelegateFactory.FACTORY_0_8);
> >> +    }
> >>  }
> >>
> >> Modified:
> >> qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java
> >> URL:
> >> http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> >> ==============================================================================
> >> ---
> >> qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java
> >> (original)
> >> +++
> >> qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java
> >> Mon Sep 19 15:13:18 2011
> >> @@ -21,10 +21,10 @@
> >>  package org.apache.qpid.test.unit.jndi;
> >>
> >>  import junit.framework.TestCase;
> >> +
> >>  import org.apache.qpid.client.AMQConnectionFactory;
> >>  import org.apache.qpid.jms.BrokerDetails;
> >>  import org.apache.qpid.jms.ConnectionURL;
> >> -import org.apache.qpid.url.URLSyntaxException;
> >>
> >>  public class ConnectionFactoryTest extends TestCase
> >>  {
> >> @@ -34,21 +34,9 @@ public class ConnectionFactoryTest exten
> >>     public static final String URL =
> >>     "amqp://guest:guest@clientID/test?brokerlist='tcp://localhost:5672'";
> >>     public static final String URL_STAR_PWD =
> >>     "amqp://guest:********@clientID/test?brokerlist='tcp://localhost:5672'";
> >>
> >> -    public void testConnectionURLString()
> >> +    public void testConnectionURLStringMasksPassword() throws
> >> Exception
> >>     {
> >> -        AMQConnectionFactory factory = new
> >> AMQConnectionFactory();
> >> -
> >> -        assertNull("ConnectionURL should have no value at start",
> >> -                   factory.getConnectionURL());
> >> -
> >> -        try
> >> -        {
> >> -            factory.setConnectionURLString(URL);
> >> -        }
> >> -        catch (URLSyntaxException e)
> >> -        {
> >> -            fail(e.getMessage());
> >> -        }
> >> +        AMQConnectionFactory factory = new
> >> AMQConnectionFactory(URL);
> >>
> >>         //URL will be returned with the password field swapped for
> >>         '********'
> >>         assertEquals("Connection URL not correctly set",
> >>         URL_STAR_PWD, factory.getConnectionURLString());
> >>
> >> Modified:
> >> qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java
> >> URL:
> >> http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> >> ==============================================================================
> >> ---
> >> qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java
> >> (original)
> >> +++
> >> qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java
> >> Mon Sep 19 15:13:18 2011
> >> @@ -24,6 +24,7 @@ import java.util.Properties;
> >>
> >>  import javax.jms.Queue;
> >>  import javax.jms.Topic;
> >> +import javax.naming.ConfigurationException;
> >>  import javax.naming.Context;
> >>  import javax.naming.InitialContext;
> >>
> >> @@ -67,4 +68,22 @@ public class JNDIPropertyFileTest extend
> >>             assertEquals("Topic" + i +
> >>             "WithSpace",bindingKey.asString());
> >>         }
> >>     }
> >> +
> >> +    public void testConfigurationErrors() throws Exception
> >> +    {
> >> +        Properties properties = new Properties();
> >> +        properties.put("java.naming.factory.initial",
> >> "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
> >> +        properties.put("destination.my-queue","amq.topic/test;create:always}");
> >> +
> >> +        try
> >> +        {
> >> +            ctx = new InitialContext(properties);
> >> +            fail("A configuration exception should be thrown with
> >> details about the address syntax error");
> >> +        }
> >> +        catch(ConfigurationException e)
> >> +        {
> >> +            assertTrue("Incorrect exception",
> >> e.getMessage().contains("Failed to parse entry:
> >> amq.topic/test;create:always}"));
> >> +        }
> >> +
> >> +    }
> >>  }
> >>
> >> Modified: qpid/branches/qpid-3346/qpid/java/common.xml
> >> URL:
> >> http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common.xml?rev=1172657&r1=1172656&r2=1172657&view=diff
> >> ==============================================================================
> >> --- qpid/branches/qpid-3346/qpid/java/common.xml (original)
> >> +++ qpid/branches/qpid-3346/qpid/java/common.xml Mon Sep 19
> >> 15:13:18 2011
> >> @@ -132,8 +132,6 @@
> >>        </sequential>
> >>   </macrodef>
> >>
> >> -
> >> -
> >>   <macrodef name="jython">
> >>     <attribute name="path"/>
> >>     <element name="args"/>
> >>
> >> Modified:
> >> qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java
> >> URL:
> >> http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> >> ==============================================================================
> >> ---
> >> qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java
> >> (original)
> >> +++
> >> qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java
> >> Mon Sep 19 15:13:18 2011
> >> @@ -20,9 +20,6 @@
> >>  */
> >>  package org.apache.qpid.codec;
> >>
> >> -import org.apache.mina.filter.codec.ProtocolCodecFactory;
> >> -import org.apache.mina.filter.codec.ProtocolDecoder;
> >> -import org.apache.mina.filter.codec.ProtocolEncoder;
> >>  import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
> >>
> >>  /**
> >> @@ -31,14 +28,11 @@ import org.apache.qpid.protocol.AMQVersi
> >>  *
> >>  * <p/><table id="crc"><caption>CRC Card</caption>
> >>  * <tr><th> Responsibilities <th> Collaborations.
> >> - * <tr><td> Supply the protocol encoder. <td> {@link AMQEncoder}
> >>  * <tr><td> Supply the protocol decoder. <td> {@link AMQDecoder}
> >>  * </table>
> >>  */
> >> -public class AMQCodecFactory implements ProtocolCodecFactory
> >> +public class AMQCodecFactory
> >>  {
> >> -    /** Holds the protocol encoder. */
> >> -    private final AMQEncoder _encoder = new AMQEncoder();
> >>
> >>     /** Holds the protocol decoder. */
> >>     private final AMQDecoder _frameDecoder;
> >> @@ -56,15 +50,6 @@ public class AMQCodecFactory implements
> >>         _frameDecoder = new AMQDecoder(expectProtocolInitiation,
> >>         session);
> >>     }
> >>
> >> -    /**
> >> -     * Gets the AMQP encoder.
> >> -     *
> >> -     * @return The AMQP encoder.
> >> -     */
> >> -    public ProtocolEncoder getEncoder()
> >> -    {
> >> -        return _encoder;
> >> -    }
> >>
> >>     /**
> >>      * Gets the AMQP decoder.
> >>
> >> Modified:
> >> qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
> >> URL:
> >> http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> >> ==============================================================================
> >> ---
> >> qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
> >> (original)
> >> +++
> >> qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
> >> Mon Sep 19 15:13:18 2011
> >> @@ -20,13 +20,9 @@
> >>  */
> >>  package org.apache.qpid.codec;
> >>
> >> -import java.util.ArrayList;
> >> -
> >> -import org.apache.mina.common.ByteBuffer;
> >> -import org.apache.mina.common.IoSession;
> >> -import org.apache.mina.common.SimpleByteBufferAllocator;
> >> -import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
> >> -import org.apache.mina.filter.codec.ProtocolDecoderOutput;
> >> +import java.io.*;
> >> +import java.nio.ByteBuffer;
> >> +import java.util.*;
> >>
> >>  import org.apache.qpid.framing.AMQDataBlock;
> >>  import org.apache.qpid.framing.AMQDataBlockDecoder;
> >> @@ -54,11 +50,8 @@ import org.apache.qpid.protocol.AMQVersi
> >>  * @todo If protocol initiation decoder not needed, then don't
> >>  create it. Probably not a big deal, but it adds to the
> >>  *       per-session overhead.
> >>  */
> >> -public class AMQDecoder extends CumulativeProtocolDecoder
> >> +public class AMQDecoder
> >>  {
> >> -
> >> -    private static final String BUFFER =
> >> AMQDecoder.class.getName() + ".Buffer";
> >> -
> >>     /** Holds the 'normal' AMQP data decoder. */
> >>     private AMQDataBlockDecoder _dataBlockDecoder = new
> >>     AMQDataBlockDecoder();
> >>
> >> @@ -67,12 +60,11 @@ public class AMQDecoder extends Cumulati
> >>
> >>     /** Flag to indicate whether this decoder needs to handle
> >>     protocol initiation. */
> >>     private boolean _expectProtocolInitiation;
> >> -    private boolean firstDecode = true;
> >>
> >>     private AMQMethodBodyFactory _bodyFactory;
> >>
> >> -    private ByteBuffer _remainingBuf;
> >> -
> >> +    private List<ByteArrayInputStream> _remainingBufs = new
> >> ArrayList<ByteArrayInputStream>();
> >> +
> >>     /**
> >>      * Creates a new AMQP decoder.
> >>      *
> >> @@ -84,98 +76,7 @@ public class AMQDecoder extends Cumulati
> >>         _bodyFactory = new AMQMethodBodyFactory(session);
> >>     }
> >>
> >> -    /**
> >> -     * Delegates decoding AMQP from the data buffer that Mina has
> >> retrieved from the wire, to the data or protocol
> >> -     * intiation decoders.
> >> -     *
> >> -     * @param session The Mina session.
> >> -     * @param in      The raw byte buffer.
> >> -     * @param out     The Mina object output gatherer to write
> >> decoded objects to.
> >> -     *
> >> -     * @return <tt>true</tt> if the data was decoded,
> >> <tt>false<tt> if more is needed and the data should accumulate.
> >> -     *
> >> -     * @throws Exception If the data cannot be decoded for any
> >> reason.
> >> -     */
> >> -    protected boolean doDecode(IoSession session, ByteBuffer in,
> >> ProtocolDecoderOutput out) throws Exception
> >> -    {
> >> -
> >> -        boolean decoded;
> >> -        if (_expectProtocolInitiation
> >> -            || (firstDecode
> >> -                && (in.remaining() > 0)
> >> -                && (in.get(in.position()) == (byte)'A')))
> >> -        {
> >> -            decoded = doDecodePI(session, in, out);
> >> -        }
> >> -        else
> >> -        {
> >> -            decoded = doDecodeDataBlock(session, in, out);
> >> -        }
> >> -        if(firstDecode && decoded)
> >> -        {
> >> -            firstDecode = false;
> >> -        }
> >> -        return decoded;
> >> -    }
> >> -
> >> -    /**
> >> -     * Decodes AMQP data, delegating the decoding to an {@link
> >> AMQDataBlockDecoder}.
> >> -     *
> >> -     * @param session The Mina session.
> >> -     * @param in      The raw byte buffer.
> >> -     * @param out     The Mina object output gatherer to write
> >> decoded objects to.
> >> -     *
> >> -     * @return <tt>true</tt> if the data was decoded,
> >> <tt>false<tt> if more is needed and the data should accumulate.
> >> -     *
> >> -     * @throws Exception If the data cannot be decoded for any
> >> reason.
> >> -     */
> >> -    protected boolean doDecodeDataBlock(IoSession session,
> >> ByteBuffer in, ProtocolDecoderOutput out) throws Exception
> >> -    {
> >> -        int pos = in.position();
> >> -        boolean enoughData =
> >> _dataBlockDecoder.decodable(in.buf());
> >> -        in.position(pos);
> >> -        if (!enoughData)
> >> -        {
> >> -            // returning false means it will leave the contents
> >> in the buffer and
> >> -            // call us again when more data has been read
> >> -            return false;
> >> -        }
> >> -        else
> >> -        {
> >> -            _dataBlockDecoder.decode(session, in, out);
> >> -
> >> -            return true;
> >> -        }
> >> -    }
> >> -
> >> -    /**
> >> -     * Decodes an AMQP initiation, delegating the decoding to a
> >> {@link ProtocolInitiation.Decoder}.
> >> -     *
> >> -     * @param session The Mina session.
> >> -     * @param in      The raw byte buffer.
> >> -     * @param out     The Mina object output gatherer to write
> >> decoded objects to.
> >> -     *
> >> -     * @return <tt>true</tt> if the data was decoded,
> >> <tt>false<tt> if more is needed and the data should accumulate.
> >> -     *
> >> -     * @throws Exception If the data cannot be decoded for any
> >> reason.
> >> -     */
> >> -    private boolean doDecodePI(IoSession session, ByteBuffer in,
> >> ProtocolDecoderOutput out) throws Exception
> >> -    {
> >> -        boolean enoughData = _piDecoder.decodable(in.buf());
> >> -        if (!enoughData)
> >> -        {
> >> -            // returning false means it will leave the contents
> >> in the buffer and
> >> -            // call us again when more data has been read
> >> -            return false;
> >> -        }
> >> -        else
> >> -        {
> >> -            ProtocolInitiation pi = new
> >> ProtocolInitiation(in.buf());
> >> -            out.write(pi);
> >>
> >> -            return true;
> >> -        }
> >> -    }
> >>
> >>     /**
> >>      * Sets the protocol initation flag, that determines whether
> >>      decoding is handled by the data decoder of the protocol
> >> @@ -189,151 +90,168 @@ public class AMQDecoder extends Cumulati
> >>         _expectProtocolInitiation = expectProtocolInitiation;
> >>     }
> >>
> >> -
> >> -    /**
> >> -     * Cumulates content of <tt>in</tt> into internal buffer and
> >> forwards
> >> -     * decoding request to {@link #doDecode(IoSession,
> >> ByteBuffer, ProtocolDecoderOutput)}.
> >> -     * <tt>doDecode()</tt> is invoked repeatedly until it returns
> >> <tt>false</tt>
> >> -     * and the cumulative buffer is compacted after decoding
> >> ends.
> >> -     *
> >> -     * @throws IllegalStateException if your <tt>doDecode()</tt>
> >> returned
> >> -     *                               <tt>true</tt> not consuming
> >> the cumulative buffer.
> >> -     */
> >> -    public void decode( IoSession session, ByteBuffer in,
> >> -                        ProtocolDecoderOutput out ) throws
> >> Exception
> >> +    private class RemainingByteArrayInputStream extends
> >> InputStream
> >>     {
> >> -        ByteBuffer buf = ( ByteBuffer ) session.getAttribute(
> >> BUFFER );
> >> -        // if we have a session buffer, append data to that
> >> otherwise
> >> -        // use the buffer read from the network directly
> >> -        if( buf != null )
> >> -        {
> >> -            buf.put( in );
> >> -            buf.flip();
> >> -        }
> >> -        else
> >> +        private int _currentListPos;
> >> +        private int _markPos;
> >> +
> >> +
> >> +        @Override
> >> +        public int read() throws IOException
> >>         {
> >> -            buf = in;
> >> +            ByteArrayInputStream currentStream =
> >> _remainingBufs.get(_currentListPos);
> >> +            if(currentStream.available() > 0)
> >> +            {
> >> +                return currentStream.read();
> >> +            }
> >> +            else if((_currentListPos == _remainingBufs.size())
> >> +                    || (++_currentListPos ==
> >> _remainingBufs.size()))
> >> +            {
> >> +                return -1;
> >> +            }
> >> +            else
> >> +            {
> >> +
> >> +                ByteArrayInputStream stream =
> >> _remainingBufs.get(_currentListPos);
> >> +                stream.mark(0);
> >> +                return stream.read();
> >> +            }
> >>         }
> >>
> >> -        for( ;; )
> >> +        @Override
> >> +        public int read(final byte[] b, final int off, final int
> >> len) throws IOException
> >>         {
> >> -            int oldPos = buf.position();
> >> -            boolean decoded = doDecode( session, buf, out );
> >> -            if( decoded )
> >> +
> >> +            if(_currentListPos == _remainingBufs.size())
> >>             {
> >> -                if( buf.position() == oldPos )
> >> +                return -1;
> >> +            }
> >> +            else
> >> +            {
> >> +                ByteArrayInputStream currentStream =
> >> _remainingBufs.get(_currentListPos);
> >> +                final int available = currentStream.available();
> >> +                int read = currentStream.read(b, off, len >
> >> available ? available : len);
> >> +                if(read < len)
> >>                 {
> >> -                    throw new IllegalStateException(
> >> -                            "doDecode() can't return true when
> >> buffer is not consumed." );
> >> +                    if(_currentListPos++ !=
> >> _remainingBufs.size())
> >> +                    {
> >> +                        _remainingBufs.get(_currentListPos).mark(0);
> >> +                    }
> >> +                    int correctRead = read == -1 ? 0 : read;
> >> +                    int subRead = read(b, off+correctRead,
> >> len-correctRead);
> >> +                    if(subRead == -1)
> >> +                    {
> >> +                        return read;
> >> +                    }
> >> +                    else
> >> +                    {
> >> +                        return correctRead+subRead;
> >> +                    }
> >>                 }
> >> -
> >> -                if( !buf.hasRemaining() )
> >> +                else
> >>                 {
> >> -                    break;
> >> +                    return len;
> >>                 }
> >>             }
> >> -            else
> >> -            {
> >> -                break;
> >> -            }
> >>         }
> >>
> >> -        // if there is any data left that cannot be decoded, we
> >> store
> >> -        // it in a buffer in the session and next time this
> >> decoder is
> >> -        // invoked the session buffer gets appended to
> >> -        if ( buf.hasRemaining() )
> >> +        @Override
> >> +        public int available() throws IOException
> >>         {
> >> -            storeRemainingInSession( buf, session );
> >> +            int total = 0;
> >> +            for(int i = _currentListPos; i <
> >> _remainingBufs.size(); i++)
> >> +            {
> >> +                total += _remainingBufs.get(i).available();
> >> +            }
> >> +            return total;
> >>         }
> >> -        else
> >> +
> >> +        @Override
> >> +        public void mark(final int readlimit)
> >>         {
> >> -            removeSessionBuffer( session );
> >> +            _markPos = _currentListPos;
> >> +            final ByteArrayInputStream stream =
> >> _remainingBufs.get(_currentListPos);
> >> +            if(stream != null)
> >> +            {
> >> +                stream.mark(readlimit);
> >> +            }
> >>         }
> >> -    }
> >> -
> >> -    /**
> >> -     * Releases the cumulative buffer used by the specified
> >> <tt>session</tt>.
> >> -     * Please don't forget to call <tt>super.dispose( session
> >> )</tt> when
> >> -     * you override this method.
> >> -     */
> >> -    public void dispose( IoSession session ) throws Exception
> >> -    {
> >> -        removeSessionBuffer( session );
> >> -    }
> >>
> >> -    private void removeSessionBuffer(IoSession session)
> >> -    {
> >> -        ByteBuffer buf = ( ByteBuffer ) session.getAttribute(
> >> BUFFER );
> >> -        if( buf != null )
> >> +        @Override
> >> +        public void reset() throws IOException
> >>         {
> >> -            buf.release();
> >> -            session.removeAttribute( BUFFER );
> >> +            _currentListPos = _markPos;
> >> +            final int size = _remainingBufs.size();
> >> +            if(_currentListPos < size)
> >> +            {
> >> +                _remainingBufs.get(_currentListPos).reset();
> >> +            }
> >> +            for(int i = _currentListPos+1; i<size; i++)
> >> +            {
> >> +                _remainingBufs.get(i).reset();
> >> +            }
> >>         }
> >>     }
> >>
> >> -    private static final SimpleByteBufferAllocator
> >> SIMPLE_BYTE_BUFFER_ALLOCATOR = new SimpleByteBufferAllocator();
> >> -
> >> -    private void storeRemainingInSession(ByteBuffer buf,
> >> IoSession session)
> >> -    {
> >> -        ByteBuffer remainingBuf =
> >> SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate( buf.remaining(), false );
> >> -        remainingBuf.setAutoExpand( true );
> >> -        remainingBuf.put( buf );
> >> -        session.setAttribute( BUFFER, remainingBuf );
> >> -    }
> >>
> >> -    public ArrayList<AMQDataBlock>
> >> decodeBuffer(java.nio.ByteBuffer buf) throws
> >> AMQFrameDecodingException, AMQProtocolVersionException
> >> +    public ArrayList<AMQDataBlock> decodeBuffer(ByteBuffer buf)
> >> throws AMQFrameDecodingException, AMQProtocolVersionException,
> >> IOException
> >>     {
> >>
> >>         // get prior remaining data from accumulator
> >>         ArrayList<AMQDataBlock> dataBlocks = new
> >>         ArrayList<AMQDataBlock>();
> >> -        ByteBuffer msg;
> >> -        // if we have a session buffer, append data to that
> >> otherwise
> >> -        // use the buffer read from the network directly
> >> -        if( _remainingBuf != null )
> >> +        DataInputStream msg;
> >> +
> >> +
> >> +        ByteArrayInputStream bais = new
> >> ByteArrayInputStream(buf.array(),buf.arrayOffset()+buf.position(),
> >> buf.remaining());
> >> +        if(!_remainingBufs.isEmpty())
> >>         {
> >> -            _remainingBuf.put(buf);
> >> -            _remainingBuf.flip();
> >> -            msg = _remainingBuf;
> >> +            _remainingBufs.add(bais);
> >> +            msg = new DataInputStream(new
> >> RemainingByteArrayInputStream());
> >>         }
> >>         else
> >>         {
> >> -            msg = ByteBuffer.wrap(buf);
> >> +            msg = new DataInputStream(bais);
> >>         }
> >> -
> >> -        if (_expectProtocolInitiation
> >> -            || (firstDecode
> >> -                && (msg.remaining() > 0)
> >> -                && (msg.get(msg.position()) == (byte)'A')))
> >> -        {
> >> -            if (_piDecoder.decodable(msg.buf()))
> >> -            {
> >> -                dataBlocks.add(new
> >> ProtocolInitiation(msg.buf()));
> >> -            }
> >> -        }
> >> -        else
> >> +
> >> +        boolean enoughData = true;
> >> +        while (enoughData)
> >>         {
> >> -            boolean enoughData = true;
> >> -            while (enoughData)
> >> +            if(!_expectProtocolInitiation)
> >>             {
> >> -                int pos = msg.position();
> >> -
> >>                 enoughData = _dataBlockDecoder.decodable(msg);
> >> -                msg.position(pos);
> >>                 if (enoughData)
> >>                 {
> >>                     dataBlocks.add(_dataBlockDecoder.createAndPopulateFrame(_bodyFactory,
> >>                     msg));
> >>                 }
> >> -                else
> >> +            }
> >> +            else
> >> +            {
> >> +                enoughData = _piDecoder.decodable(msg);
> >> +                if (enoughData)
> >>                 {
> >> -                    _remainingBuf =
> >> SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate(msg.remaining(), false);
> >> -                    _remainingBuf.setAutoExpand(true);
> >> -                    _remainingBuf.put(msg);
> >> +                    dataBlocks.add(new ProtocolInitiation(msg));
> >> +                }
> >> +
> >> +            }
> >> +
> >> +            if(!enoughData)
> >> +            {
> >> +                if(!_remainingBufs.isEmpty())
> >> +                {
> >> +                    _remainingBufs.remove(_remainingBufs.size()-1);
> >> +                    ListIterator<ByteArrayInputStream> iterator =
> >> _remainingBufs.listIterator();
> >> +                    while(iterator.hasNext() &&
> >> iterator.next().available() == 0)
> >> +                    {
> >> +                        iterator.remove();
> >> +                    }
> >> +                }
> >> +                if(bais.available()!=0)
> >> +                {
> >> +                    byte[] remaining = new
> >> byte[bais.available()];
> >> +                    bais.read(remaining);
> >> +                    _remainingBufs.add(new
> >> ByteArrayInputStream(remaining));
> >>                 }
> >>             }
> >> -        }
> >> -        if(firstDecode && dataBlocks.size() > 0)
> >> -        {
> >> -            firstDecode = false;
> >>         }
> >>         return dataBlocks;
> >>     }
> >>
> >> Modified:
> >> qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
> >> URL:
> >> http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> >> ==============================================================================
> >> ---
> >> qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
> >> (original)
> >> +++
> >> qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
> >> Mon Sep 19 15:13:18 2011
> >> @@ -23,7 +23,7 @@ package org.apache.qpid.configuration;
> >>  */
> >>  public class ClientProperties
> >>  {
> >> -
> >> +
> >>     /**
> >>      * Currently with Qpid it is not possible to change the client
> >>      ID.
> >>      * If one is not specified upon connection construction, an id
> >>      is generated automatically.
> >> @@ -68,38 +68,50 @@ public class ClientProperties
> >>      * by the broker in TuneOK it will be used as the heartbeat
> >>      interval.
> >>      * If not a warning will be printed and the max value
> >>      specified for
> >>      * heartbeat in TuneOK will be used
> >> -     *
> >> +     *
> >>      * The default idle timeout is set to 120 secs
> >>      */
> >>     public static final String IDLE_TIMEOUT_PROP_NAME =
> >>     "idle_timeout";
> >>     public static final long DEFAULT_IDLE_TIMEOUT = 120000;
> >> -
> >> +
> >>     public static final String HEARTBEAT = "qpid.heartbeat";
> >>     public static final int HEARTBEAT_DEFAULT = 120;
> >> -
> >> +
> >>     /**
> >>      * This value will be used to determine the default
> >>      destination syntax type.
> >>      * Currently the two types are Binding URL (java only) and the
> >>      Addressing format (used by
> >> -     * all clients).
> >> +     * all clients).
> >>      */
> >>     public static final String DEST_SYNTAX = "qpid.dest_syntax";
> >> -
> >> +
> >>     public static final String USE_LEGACY_MAP_MESSAGE_FORMAT =
> >>     "qpid.use_legacy_map_message";
> >>
> >>     public static final String AMQP_VERSION = "qpid.amqp.version";
> >> -
> >> -    private static ClientProperties _instance = new
> >> ClientProperties();
> >> -
> >> +
> >> +    public static final String QPID_VERIFY_CLIENT_ID =
> >> "qpid.verify_client_id";
> >> +
> >> +    /**
> >> +     * System properties to change the default timeout used
> >> during
> >> +     * synchronous operations.
> >> +     */
> >> +    public static final String QPID_SYNC_OP_TIMEOUT =
> >> "qpid.sync_op_timeout";
> >> +    public static final String AMQJ_DEFAULT_SYNCWRITE_TIMEOUT =
> >> "amqj.default_syncwrite_timeout";
> >> +
> >> +    /**
> >> +     * A default timeout value for synchronous operations
> >> +     */
> >> +    public static final int DEFAULT_SYNC_OPERATION_TIMEOUT =
> >> 60000;
> >> +
> >>     /*
> >> -    public static final QpidProperty<Boolean>
> >>  IGNORE_SET_CLIENTID_PROP_NAME =
> >> +    public static final QpidProperty<Boolean>
> >>  IGNORE_SET_CLIENTID_PROP_NAME =
> >>         QpidProperty.booleanProperty(false,"qpid.ignore_set_client_id","ignore_setclientID");
> >> -
> >> +
> >>     public static final QpidProperty<Boolean>
> >>     SYNC_PERSISTENT_PROP_NAME =
> >>         QpidProperty.booleanProperty(false,"qpid.sync_persistence","sync_persistence");
> >> -
> >> -
> >> +
> >> +
> >>     public static final QpidProperty<Integer>
> >>     MAX_PREFETCH_PROP_NAME =
> >>         QpidProperty.intProperty(500,"qpid.max_prefetch","max_prefetch");
> >>         */
> >> -
> >> -
> >> +
> >> +
> >>  }
> >>
> >> Modified:
> >> qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
> >> URL:
> >> http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> >> ==============================================================================
> >> ---
> >> qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
> >> (original)
> >> +++
> >> qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
> >> Mon Sep 19 15:13:18 2011
> >> @@ -20,7 +20,9 @@
> >>  */
> >>  package org.apache.qpid.framing;
> >>
> >> -import org.apache.mina.common.ByteBuffer;
> >> +import java.io.DataOutputStream;
> >> +import java.io.IOException;
> >> +
> >>  import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
> >>  import org.apache.qpid.AMQException;
> >>
> >> @@ -34,7 +36,7 @@ public interface AMQBody
> >>      */
> >>     public abstract int getSize();
> >>
> >> -    public void writePayload(ByteBuffer buffer);
> >> +    public void writePayload(DataOutputStream buffer) throws
> >> IOException;
> >>
> >> -    void handle(final int channelId, final
> >> AMQVersionAwareProtocolSession amqMinaProtocolSession) throws
> >> AMQException;
> >> +    void handle(final int channelId, final
> >> AMQVersionAwareProtocolSession amqProtocolSession) throws
> >> AMQException;
> >>  }
> >>
> >> Modified:
> >> qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
> >> URL:
> >> http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> >> ==============================================================================
> >> ---
> >> qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
> >> (original)
> >> +++
> >> qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
> >> Mon Sep 19 15:13:18 2011
> >> @@ -20,7 +20,10 @@
> >>  */
> >>  package org.apache.qpid.framing;
> >>
> >> -import org.apache.mina.common.ByteBuffer;
> >> +import java.io.DataInputStream;
> >> +import java.io.DataOutputStream;
> >> +import java.io.IOException;
> >> +
> >>
> >>  /**
> >>  * A data block represents something that has a size in bytes and
> >>  the ability to write itself to a byte
> >> @@ -39,25 +42,6 @@ public abstract class AMQDataBlock imple
> >>      * Writes the datablock to the specified buffer.
> >>      * @param buffer
> >>      */
> >> -    public abstract void writePayload(ByteBuffer buffer);
> >> -
> >> -    public ByteBuffer toByteBuffer()
> >> -    {
> >> -        final ByteBuffer buffer =
> >> ByteBuffer.allocate((int)getSize());
> >> -
> >> -        writePayload(buffer);
> >> -        buffer.flip();
> >> -        return buffer;
> >> -    }
> >> -
> >> -    public java.nio.ByteBuffer toNioByteBuffer()
> >> -    {
> >> -        final java.nio.ByteBuffer buffer =
> >> java.nio.ByteBuffer.allocate((int) getSize());
> >> -
> >> -        ByteBuffer buf = ByteBuffer.wrap(buffer);
> >> -        writePayload(buf);
> >> -        buffer.flip();
> >> -        return buffer;
> >> -    }
> >> +    public abstract void writePayload(DataOutputStream buffer)
> >> throws IOException;
> >>
> >>  }
> >>
> >> Modified:
> >> qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
> >> URL:
> >> http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java?rev=1172657&r1=1172656&r2=1172657&view=diff
> >> ==============================================================================
> >> ---
> >> qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
> >> (original)
> >> +++
> >> qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
> >> Mon Sep 19 15:13:18 2011
> >> @@ -20,18 +20,14 @@
> >>  */
> >>  package org.apache.qpid.framing;
> >>
> >> -import org.apache.mina.common.ByteBuffer;
> >> -import org.apache.mina.common.IoSession;
> >> -import org.apache.mina.filter.codec.ProtocolDecoderOutput;
> >> -
> >> -import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
> >> -
> >>  import org.slf4j.Logger;
> >>  import org.slf4j.LoggerFactory;
> >>
> >> +import java.io.DataInputStream;
> >> +import java.io.IOException;
> >> +
> >>  public class AMQDataBlockDecoder
> >>  {
> >> -    private static final String SESSION_METHOD_BODY_FACTORY =
> >> "QPID_SESSION_METHOD_BODY_FACTORY";
> >>
> >>     private static final BodyFactory[] _bodiesSupported = new
> >>     BodyFactory[Byte.MAX_VALUE];
> >>
> >> @@ -47,27 +43,32 @@ public class AMQDataBlockDecoder
> >>     public AMQDataBlockDecoder()
> >>     { }
> >>
> >> -    public boolean decodable(java.nio.ByteBuffer in) throws
> >> AMQFrameDecodingException
> >> +    public boolean decodable(DataInputStream in) throws
> >> AMQFrameDecodingException, IOException
> >>     {
> >> -        final int remainingAfterAttributes = in.remaining() - (1
> >> + 2 + 4 + 1);
> >> +        final int remainingAfterAttributes = in.available() - (1
> >> + 2 + 4 + 1);
> >>         // type, channel, body length and end byte
> >>         if (remainingAfterAttributes < 0)
> >>         {
> >>             return false;
> >>         }
> >>
> >> -        in.position(in.position() + 1 + 2);
> >> +        in.mark(8);
> >> +        in.skip(1 + 2);
> >> +
> >> +
> >>         // Get an unsigned int, lifted from MINA ByteBuffer
> >>         getUnsignedInt()
> >> -        final long bodySize = in.getInt() & 0xffffffffL;
> >> +        final long bodySize = in.readInt() & 0xffffffffL;
> >> +
> >> +        in.reset();
> >>
> >>         return (remainingAfterAttributes >= bodySize);
> >>
> >>     }
> >>
> >> -    public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory
> >> methodBodyFactory, ByteBuffer in)
> >> -        throws AMQFrameDecodingException,
> >> AMQProtocolVersionException
> >> +    public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory
> >> methodBodyFactory, DataInputStream in)
> >> +            throws AMQFrameDecodingException,
> >> AMQProtocolVersionException, IOException
> >>     {
> >> -        final byte type = in.get();
> >> +        final byte type = in.readByte();
> >>
> >>         BodyFactory bodyFactory;
> >>         if (type == AMQMethodBody.TYPE)
> >> @@ -84,8 +85,8 @@ public class AMQDataBlockDecoder
> >>             throw new AMQFrameDecodingException(null, "Unsupported
> >>             frame type: " + type, null);
> >>         }
> >>
> >> -        final int channel = in.getUnsignedShort();
> >> -        final long bodySize = in.getUnsignedInt();
> >> +        final int channel = in.readUnsignedShort();
> >> +        final long bodySize =
> >> EncodingUtils.readUnsignedInteger(in);
> >>
> >>         // bodySize can be zero
> >>         if ((channel < 0) || (bodySize < 0))
> >> @@ -96,7 +97,7 @@ public class AMQDataBlockDecoder
> >>
> >>         AMQFrame frame = new AMQFrame(in, channel, bodySize,
> >>         bodyFactory);
> >>
> >> -        byte marker = in.get();
> >> +        byte marker = in.readByte();
> >>         if ((marker & 0xFF) != 0xCE)
> >>         {
> >>             throw new AMQFrameDecodingException(null, "End of
> >>             frame marker not found. Read " + marker + " length="
> >>             + bodySize
> >> @@ -106,26 +107,4 @@ public class AMQDataBlockDecoder
> >>         return frame;
> >>     }
> >>
> >> -    public void decode(IoSession session, ByteBuffer in,
> >> ProtocolDecoderOutput out) throws Exception
> >> -    {
> >> -        AMQMethodBodyFactory bodyFactory = (AMQMethodBodyFactory)
> >> session.getAttribute(SESSION_METHOD_BODY_FACTORY);
> >> -        if (bodyFactory == null)
> >> -        {
> >> -            AMQVersionAwareProtocolSession protocolSession =
> >> (AMQVersionAwareProtocolSession) session.getAttachment();
> >> -            bodyFactory = new
> >> AMQMethodBodyFactory(protocolSession);
> >> -            session.setAttribute(SESSION_METHOD_BODY_FACTORY,
> >> bodyFactory);
> >> -        }
> >> -
> >> -        out.write(createAndPopulateFrame(bodyFactory, in));
> >> -    }
> >> -
> >> -    public boolean decodable(ByteBuffer msg) throws
> >> AMQFrameDecodingException
> >> -    {
> >> -        return decodable(msg.buf());
> >> -    }
> >> -
> >> -    public AMQDataBlock
> >> createAndPopulateFrame(AMQMethodBodyFactory factory,
> >> java.nio.ByteBuffer msg) throws AMQProtocolVersionException,
> >> AMQFrameDecodingException
> >> -    {
> >> -        return createAndPopulateFrame(factory,
> >> ByteBuffer.wrap(msg));
> >> -    }
> >>  }
> >>
> >>
> >>
> >> ---------------------------------------------------------------------
> >> Apache Qpid - AMQP Messaging Implementation
> >> Project:      http://qpid.apache.org
> >> Use/Interact: mailto:commits-subscribe@qpid.apache.org
> >>
> >>
> >
> 

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org


Mime
View raw message