Author: chirino Date: Mon Jun 1 04:07:14 2009 New Revision: 780559 URL: http://svn.apache.org/viewvc?rev=780559&view=rev Log: adding missing classes and resources.. Added: activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/ activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/ activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/ activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/ activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/ activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/ activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto2 activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/test activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/MonitoredTransport.java activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java activemq/sandbox/activemq-flow/activemq-transport/src/main/resources/META-INF/ activemq/sandbox/activemq-flow/activemq-transport/src/main/resources/META-INF/services/ activemq/sandbox/activemq-flow/activemq-transport/src/main/resources/META-INF/services/org/ activemq/sandbox/activemq-flow/activemq-transport/src/main/resources/META-INF/services/org/apache/ activemq/sandbox/activemq-flow/activemq-transport/src/main/resources/META-INF/services/org/apache/activemq/ activemq/sandbox/activemq-flow/activemq-transport/src/main/resources/META-INF/services/org/apache/activemq/transport/ 
activemq/sandbox/activemq-flow/activemq-transport/src/main/resources/META-INF/services/org/apache/activemq/transport/pipe activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/DataByteArrayInputStream.java (with props) activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/DataByteArrayOutputStream.java (with props) Added: activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto?rev=780559&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto (added) +++ activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto Mon Jun 1 04:07:14 2009 @@ -0,0 +1,17 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. 
+## --------------------------------------------------------------------------- +class=org.apache.activemq.flow.ProtoWireFormatFactory Added: activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto2 URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto2?rev=780559&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto2 (added) +++ activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto2 Mon Jun 1 04:07:14 2009 @@ -0,0 +1,17 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. 
+## --------------------------------------------------------------------------- +class=org.apache.activemq.flow.Proto2WireFormatFactory Added: activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/test URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/test?rev=780559&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/test (added) +++ activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/test Mon Jun 1 04:07:14 2009 @@ -0,0 +1,17 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. 
+## --------------------------------------------------------------------------- +class=org.apache.activemq.flow.TestWireFormatFactory Added: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/MonitoredTransport.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/MonitoredTransport.java?rev=780559&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/MonitoredTransport.java (added) +++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/MonitoredTransport.java Mon Jun 1 04:07:14 2009 @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.transport; + +public interface MonitoredTransport { + + long getWriteTimestamp(); + + boolean isWriting(); + +} Added: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java?rev=780559&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java (added) +++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java Mon Jun 1 04:07:14 2009 @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.activemq.transport; + +import java.io.IOException; +import java.net.Socket; +import java.util.Iterator; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * This filter implements write timeouts for socket write operations. + * When using blocking IO, the Java implementation doesn't have an explicit flag + * to set a timeout, and can cause operations to block forever (or until the TCP stack implementation times out the retransmissions, + * which is usually around 13-30 minutes).
+ * To enable this transport, in the transport URI, simply add
+ * transport.soWriteTimeout=VALUE, where VALUE is the write timeout in milliseconds.
+ * For example (15 second timeout on write operations to the socket):
+ *

+ * <transportConnector 
+ *     name="tcp1" 
+ *     uri="tcp://127.0.0.1:61616?transport.soTimeout=10000&transport.soWriteTimeout=15000"
+ * />
+ * 

+ * For example (enable default timeout on the socket):
+ *

+ * <transportConnector 
+ *     name="tcp1" 
+ *     uri="tcp://127.0.0.1:61616?transport.soTimeout=10000&transport.soWriteTimeout=15000"
+ * />
+ * 
+ * @author Filip Hanik + * + */ +public class WriteTimeoutFilter extends TransportFilter { + + private static final Log LOG = LogFactory.getLog(WriteTimeoutFilter.class); + protected static ConcurrentLinkedQueue writers = new ConcurrentLinkedQueue(); + protected static AtomicInteger messageCounter = new AtomicInteger(0); + protected static TimeoutThread timeoutThread = new TimeoutThread(); + + protected static long sleep = 5000l; + + protected long writeTimeout = -1; + + public WriteTimeoutFilter(Transport next) { + super(next); + } + + @Override + public void oneway(Object command) throws IOException { + try { + registerWrite(this); + super.oneway(command); + } catch (IOException x) { + throw x; + } finally { + deRegisterWrite(this,false,null); + } + } + + public long getWriteTimeout() { + return writeTimeout; + } + + public void setWriteTimeout(long writeTimeout) { + this.writeTimeout = writeTimeout; + } + + public static long getSleep() { + return sleep; + } + + public static void setSleep(long sleep) { + WriteTimeoutFilter.sleep = sleep; + } + + + protected MonitoredTransport getWriter() { + return next.narrow(MonitoredTransport.class); + } + + protected Socket getSocket() { + return next.narrow(Socket.class); + } + + protected static void registerWrite(WriteTimeoutFilter filter) { + writers.add(filter); + } + + protected static boolean deRegisterWrite(WriteTimeoutFilter filter, boolean fail, IOException iox) { + boolean result = writers.remove(filter); + if (result) { + if (fail) { + String message = "Forced write timeout for:"+filter.getNext().getRemoteAddress(); + LOG.warn(message); + Socket sock = filter.getSocket(); + if (sock==null) { + LOG.error("Destination socket is null, unable to close socket.("+message+")"); + } else { + try { + sock.close(); + }catch (IOException ignore) { + } + } + } + } + return result; + } + + @Override + public void start() throws Exception { + super.start(); + } + + @Override + public void stop() throws Exception { + 
super.stop(); + } + + protected static class TimeoutThread extends Thread { + static AtomicInteger instance = new AtomicInteger(0); + boolean run = true; + public TimeoutThread() { + setName("WriteTimeoutFilter-Timeout-"+instance.incrementAndGet()); + setDaemon(true); + setPriority(Thread.MIN_PRIORITY); + start(); + } + + + public void run() { + while (run) { + boolean error = false; + try { + if (!interrupted()) { + Iterator filters = writers.iterator(); + while (run && filters.hasNext()) { + WriteTimeoutFilter filter = filters.next(); + if (filter.getWriteTimeout()<=0) continue; //no timeout set + long writeStart = filter.getWriter().getWriteTimestamp(); + long delta = (filter.getWriter().isWriting() && writeStart>0)?System.currentTimeMillis() - writeStart:-1; + if (delta>filter.getWriteTimeout()) { + WriteTimeoutFilter.deRegisterWrite(filter, true,null); + }//if timeout + }//while + }//if interrupted + try { + Thread.sleep(getSleep()); + error = false; + } catch (InterruptedException x) { + //do nothing + } + }catch (Throwable t) { //make sure this thread never dies + if (!error) { //use error flag to avoid filling up the logs + LOG.error("WriteTimeout thread unable validate existing sockets.",t); + error = true; + } + } + } + } + } + +} Added: activemq/sandbox/activemq-flow/activemq-transport/src/main/resources/META-INF/services/org/apache/activemq/transport/pipe URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/resources/META-INF/services/org/apache/activemq/transport/pipe?rev=780559&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/activemq-transport/src/main/resources/META-INF/services/org/apache/activemq/transport/pipe (added) +++ activemq/sandbox/activemq-flow/activemq-transport/src/main/resources/META-INF/services/org/apache/activemq/transport/pipe Mon Jun 1 04:07:14 2009 @@ -0,0 +1,17 @@ +## 
--------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- +class=org.apache.activemq.transport.pipe.PipeTransportFactory Added: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/DataByteArrayInputStream.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/DataByteArrayInputStream.java?rev=780559&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/DataByteArrayInputStream.java (added) +++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/DataByteArrayInputStream.java Mon Jun 1 04:07:14 2009 @@ -0,0 +1,312 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.util; + +import java.io.DataInput; +import java.io.IOException; +import java.io.InputStream; +import java.io.UTFDataFormatException; + +/** + * Optimized ByteArrayInputStream that can be used more than once + * + * @version $Revision: 1.1.1.1 $ + */ +public final class DataByteArrayInputStream extends InputStream implements DataInput { + private byte[] buf; + private int pos; + private int offset; + + /** + * Creates a StoreByteArrayInputStream. + * + * @param buf the input buffer. + */ + public DataByteArrayInputStream(byte buf[]) { + this.buf = buf; + this.pos = 0; + this.offset = 0; + } + + /** + * Creates a StoreByteArrayInputStream. + * + * @param sequence the input buffer. 
+ */ + public DataByteArrayInputStream(ByteSequence sequence) { + this.buf = sequence.getData(); + this.offset = sequence.getOffset(); + this.pos = this.offset; + } + + /** + * Creates WireByteArrayInputStream with a minmalist byte + * array + */ + public DataByteArrayInputStream() { + this(new byte[0]); + } + + /** + * @return the size + */ + public int size() { + return pos - offset; + } + + /** + * @return the underlying data array + */ + public byte[] getRawData() { + return buf; + } + + /** + * reset the StoreByteArrayInputStream to use an new byte + * array + * + * @param newBuff + */ + public void restart(byte[] newBuff) { + buf = newBuff; + pos = 0; + } + + /** + * reset the StoreByteArrayInputStream to use an new + * ByteSequence + * + * @param sequence + */ + public void restart(ByteSequence sequence) { + this.buf = sequence.getData(); + this.pos = sequence.getOffset(); + } + + /** + * re-start the input stream - reusing the current buffer + * + * @param size + */ + public void restart(int size) { + if (buf == null || buf.length < size) { + buf = new byte[size]; + } + restart(buf); + } + + /** + * Reads the next byte of data from this input stream. The value byte is + * returned as an int in the range 0 to + * 255. If no byte is available because the end of the + * stream has been reached, the value -1 is returned. + *

+ * This read method cannot block. + * + * @return the next byte of data, or -1 if the end of the + * stream has been reached. + */ + public int read() { + return (pos < buf.length) ? (buf[pos++] & 0xff) : -1; + } + + /** + * Reads up to len bytes of data into an array of bytes from + * this input stream. + * + * @param b the buffer into which the data is read. + * @param off the start offset of the data. + * @param len the maximum number of bytes read. + * @return the total number of bytes read into the buffer, or + * -1 if there is no more data because the end of the + * stream has been reached. + */ + public int read(byte b[], int off, int len) { + if (b == null) { + throw new NullPointerException(); + } + if (pos >= buf.length) { + return -1; + } + if (pos + len > buf.length) { + len = buf.length - pos; + } + if (len <= 0) { + return 0; + } + System.arraycopy(buf, pos, b, off, len); + pos += len; + return len; + } + + /** + * @return the number of bytes that can be read from the input stream + * without blocking. 
+ */ + public int available() { + return buf.length - pos; + } + + public void readFully(byte[] b) { + read(b, 0, b.length); + } + + public void readFully(byte[] b, int off, int len) { + read(b, off, len); + } + + public int skipBytes(int n) { + if (pos + n > buf.length) { + n = buf.length - pos; + } + if (n < 0) { + return 0; + } + pos += n; + return n; + } + + public boolean readBoolean() { + return read() != 0; + } + + public byte readByte() { + return (byte)read(); + } + + public int readUnsignedByte() { + return read(); + } + + public short readShort() { + int ch1 = read(); + int ch2 = read(); + return (short)((ch1 << 8) + (ch2 << 0)); + } + + public int readUnsignedShort() { + int ch1 = read(); + int ch2 = read(); + return (ch1 << 8) + (ch2 << 0); + } + + public char readChar() { + int ch1 = read(); + int ch2 = read(); + return (char)((ch1 << 8) + (ch2 << 0)); + } + + public int readInt() { + int ch1 = read(); + int ch2 = read(); + int ch3 = read(); + int ch4 = read(); + return (ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0); + } + + public long readLong() { + long rc = ((long)buf[pos++] << 56) + ((long)(buf[pos++] & 255) << 48) + ((long)(buf[pos++] & 255) << 40) + ((long)(buf[pos++] & 255) << 32); + return rc + ((long)(buf[pos++] & 255) << 24) + ((buf[pos++] & 255) << 16) + ((buf[pos++] & 255) << 8) + ((buf[pos++] & 255) << 0); + } + + public float readFloat() throws IOException { + return Float.intBitsToFloat(readInt()); + } + + public double readDouble() throws IOException { + return Double.longBitsToDouble(readLong()); + } + + public String readLine() { + int start = pos; + while (pos < buf.length) { + int c = read(); + if (c == '\n') { + break; + } + if (c == '\r') { + c = read(); + if (c != '\n' && c != -1) { + pos--; + } + break; + } + } + return new String(buf, start, pos); + } + + public String readUTF() throws IOException { + int length = readUnsignedShort(); + char[] characters = new char[length]; + int c; + int c2; + int c3; + int count = 0; + 
int total = pos + length; + while (pos < total) { + c = (int)buf[pos] & 0xff; + if (c > 127) { + break; + } + pos++; + characters[count++] = (char)c; + } + while (pos < total) { + c = (int)buf[pos] & 0xff; + switch (c >> 4) { + case 0: + case 1: + case 2: + case 3: + case 4: + case 5: + case 6: + case 7: + pos++; + characters[count++] = (char)c; + break; + case 12: + case 13: + pos += 2; + if (pos > total) { + throw new UTFDataFormatException("bad string"); + } + c2 = (int)buf[pos - 1]; + if ((c2 & 0xC0) != 0x80) { + throw new UTFDataFormatException("bad string"); + } + characters[count++] = (char)(((c & 0x1F) << 6) | (c2 & 0x3F)); + break; + case 14: + pos += 3; + if (pos > total) { + throw new UTFDataFormatException("bad string"); + } + c2 = (int)buf[pos - 2]; + c3 = (int)buf[pos - 1]; + if (((c2 & 0xC0) != 0x80) || ((c3 & 0xC0) != 0x80)) { + throw new UTFDataFormatException("bad string"); + } + characters[count++] = (char)(((c & 0x0F) << 12) | ((c2 & 0x3F) << 6) | ((c3 & 0x3F) << 0)); + break; + default: + throw new UTFDataFormatException("bad string"); + } + } + return new String(characters, 0, count); + } +} Propchange: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/DataByteArrayInputStream.java ------------------------------------------------------------------------------ svn:executable = * Added: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/DataByteArrayOutputStream.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/DataByteArrayOutputStream.java?rev=780559&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/DataByteArrayOutputStream.java (added) +++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/DataByteArrayOutputStream.java Mon Jun 1 04:07:14 2009 @@ 
-0,0 +1,254 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.util; + +import java.io.DataOutput; +import java.io.IOException; +import java.io.OutputStream; +import java.io.UTFDataFormatException; + +/** + * Optimized ByteArrayOutputStream + * + * @version $Revision: 1.1.1.1 $ + */ +public final class DataByteArrayOutputStream extends OutputStream implements DataOutput { + private static final int DEFAULT_SIZE = 2048; + private byte buf[]; + private int pos; + + /** + * Creates a new byte array output stream, with a buffer capacity of the + * specified size, in bytes. + * + * @param size the initial size. + * @exception IllegalArgumentException if size is negative. + */ + public DataByteArrayOutputStream(int size) { + if (size < 0) { + throw new IllegalArgumentException("Invalid size: " + size); + } + buf = new byte[size]; + } + + /** + * Creates a new byte array output stream. 
+ */ + public DataByteArrayOutputStream() { + this(DEFAULT_SIZE); + } + + /** + * start using a fresh byte array + * + * @param size + */ + public void restart(int size) { + buf = new byte[size]; + pos = 0; + } + + /** + * start using a fresh byte array + */ + public void restart() { + restart(DEFAULT_SIZE); + } + + /** + * Get a ByteSequence from the stream + * + * @return the byte sequence + */ + public ByteSequence toByteSequence() { + return new ByteSequence(buf, 0, pos); + } + + /** + * Writes the specified byte to this byte array output stream. + * + * @param b the byte to be written. + */ + public void write(int b) { + int newcount = pos + 1; + ensureEnoughBuffer(newcount); + buf[pos] = (byte)b; + pos = newcount; + } + + /** + * Writes len bytes from the specified byte array starting at + * offset off to this byte array output stream. + * + * @param b the data. + * @param off the start offset in the data. + * @param len the number of bytes to write. + */ + public void write(byte b[], int off, int len) { + if (len == 0) { + return; + } + int newcount = pos + len; + ensureEnoughBuffer(newcount); + System.arraycopy(b, off, buf, pos, len); + pos = newcount; + } + + /** + * @return the underlying byte[] buffer + */ + public byte[] getData() { + return buf; + } + + /** + * reset the output stream + */ + public void reset() { + pos = 0; + } + + /** + * Set the current position for writing + * + * @param offset + */ + public void position(int offset) { + ensureEnoughBuffer(offset); + pos = offset; + } + + public int size() { + return pos; + } + + public void writeBoolean(boolean v) { + ensureEnoughBuffer(pos + 1); + buf[pos++] = (byte)(v ? 
1 : 0); + } + + public void writeByte(int v) { + ensureEnoughBuffer(pos + 1); + buf[pos++] = (byte)(v >>> 0); + } + + public void writeShort(int v) { + ensureEnoughBuffer(pos + 2); + buf[pos++] = (byte)(v >>> 8); + buf[pos++] = (byte)(v >>> 0); + } + + public void writeChar(int v) { + ensureEnoughBuffer(pos + 2); + buf[pos++] = (byte)(v >>> 8); + buf[pos++] = (byte)(v >>> 0); + } + + public void writeInt(int v) { + ensureEnoughBuffer(pos + 4); + buf[pos++] = (byte)(v >>> 24); + buf[pos++] = (byte)(v >>> 16); + buf[pos++] = (byte)(v >>> 8); + buf[pos++] = (byte)(v >>> 0); + } + + public void writeLong(long v) { + ensureEnoughBuffer(pos + 8); + buf[pos++] = (byte)(v >>> 56); + buf[pos++] = (byte)(v >>> 48); + buf[pos++] = (byte)(v >>> 40); + buf[pos++] = (byte)(v >>> 32); + buf[pos++] = (byte)(v >>> 24); + buf[pos++] = (byte)(v >>> 16); + buf[pos++] = (byte)(v >>> 8); + buf[pos++] = (byte)(v >>> 0); + } + + public void writeFloat(float v) throws IOException { + writeInt(Float.floatToIntBits(v)); + } + + public void writeDouble(double v) throws IOException { + writeLong(Double.doubleToLongBits(v)); + } + + public void writeBytes(String s) { + int length = s.length(); + for (int i = 0; i < length; i++) { + write((byte)s.charAt(i)); + } + } + + public void writeChars(String s) { + int length = s.length(); + for (int i = 0; i < length; i++) { + int c = s.charAt(i); + write((c >>> 8) & 0xFF); + write((c >>> 0) & 0xFF); + } + } + + public void writeUTF(String str) throws IOException { + int strlen = str.length(); + int encodedsize = 0; + int c; + for (int i = 0; i < strlen; i++) { + c = str.charAt(i); + if ((c >= 0x0001) && (c <= 0x007F)) { + encodedsize++; + } else if (c > 0x07FF) { + encodedsize += 3; + } else { + encodedsize += 2; + } + } + if (encodedsize > 65535) { + throw new UTFDataFormatException("encoded string too long: " + encodedsize + " bytes"); + } + ensureEnoughBuffer(pos + encodedsize + 2); + writeShort(encodedsize); + int i = 0; + for (i = 0; i < strlen; 
i++) { + c = str.charAt(i); + if (!((c >= 0x0001) && (c <= 0x007F))) { + break; + } + buf[pos++] = (byte)c; + } + for (; i < strlen; i++) { + c = str.charAt(i); + if ((c >= 0x0001) && (c <= 0x007F)) { + buf[pos++] = (byte)c; + } else if (c > 0x07FF) { + buf[pos++] = (byte)(0xE0 | ((c >> 12) & 0x0F)); + buf[pos++] = (byte)(0x80 | ((c >> 6) & 0x3F)); + buf[pos++] = (byte)(0x80 | ((c >> 0) & 0x3F)); + } else { + buf[pos++] = (byte)(0xC0 | ((c >> 6) & 0x1F)); + buf[pos++] = (byte)(0x80 | ((c >> 0) & 0x3F)); + } + } + } + + private void ensureEnoughBuffer(int newcount) { + if (newcount > buf.length) { + byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)]; + System.arraycopy(buf, 0, newbuf, 0, pos); + buf = newbuf; + } + } +} Propchange: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/DataByteArrayOutputStream.java ------------------------------------------------------------------------------ svn:executable = *