001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.activemq.transport.tcp;
019    
020    import java.io.FilterOutputStream;
021    import java.io.IOException;
022    import java.io.OutputStream;
023    
024    /**
025     * An optimized buffered outputstream for Tcp
026     * 
027     * 
028     */
029    
030    public class TcpBufferedOutputStream extends FilterOutputStream implements TimeStampStream {
031        private static final int BUFFER_SIZE = 8192;
032        private byte[] buffer;
033        private int bufferlen;
034        private int count;
035        private volatile long writeTimestamp = -1;//concurrent reads of this value
036        
037    
038        /**
039         * Constructor
040         * 
041         * @param out
042         */
043        public TcpBufferedOutputStream(OutputStream out) {
044            this(out, BUFFER_SIZE);
045        }
046    
047        /**
048         * Creates a new buffered output stream to write data to the specified
049         * underlying output stream with the specified buffer size.
050         * 
051         * @param out the underlying output stream.
052         * @param size the buffer size.
053         * @throws IllegalArgumentException if size <= 0.
054         */
055        public TcpBufferedOutputStream(OutputStream out, int size) {
056            super(out);
057            if (size <= 0) {
058                throw new IllegalArgumentException("Buffer size <= 0");
059            }
060            buffer = new byte[size];
061            bufferlen = size;
062        }
063    
064        /**
065         * write a byte on to the stream
066         * 
067         * @param b - byte to write
068         * @throws IOException
069         */
070        public void write(int b) throws IOException {
071            if ((bufferlen - count) < 1) {
072                flush();
073            }
074            buffer[count++] = (byte)b;
075        }
076    
077        /**
078         * write a byte array to the stream
079         * 
080         * @param b the byte buffer
081         * @param off the offset into the buffer
082         * @param len the length of data to write
083         * @throws IOException
084         */
085        public void write(byte b[], int off, int len) throws IOException {
086            if (b != null) {
087                if ((bufferlen - count) < len) {
088                    flush();
089                }
090                if (buffer.length >= len) {
091                    System.arraycopy(b, off, buffer, count, len);
092                    count += len;
093                } else {
094                    try {
095                        writeTimestamp = System.currentTimeMillis();
096                        out.write(b, off, len);
097                    } finally {
098                        writeTimestamp = -1;
099                    }
100                }
101            }
102        }
103    
104        /**
105         * flush the data to the output stream This doesn't call flush on the
106         * underlying outputstream, because Tcp is particularly efficent at doing
107         * this itself ....
108         * 
109         * @throws IOException
110         */
111        public void flush() throws IOException {
112            if (count > 0 && out != null) {
113                try {
114                    writeTimestamp = System.currentTimeMillis();
115                    out.write(buffer, 0, count);
116                } finally {
117                    writeTimestamp = -1;
118                }
119                count = 0;
120            }
121        }
122    
123        /**
124         * close this stream
125         * 
126         * @throws IOException
127         */
128        public void close() throws IOException {
129            super.close();
130        }
131    
132        /* (non-Javadoc)
133         * @see org.apache.activemq.transport.tcp.TimeStampStream#isWriting()
134         */
135        public boolean isWriting() {
136            return writeTimestamp > 0;
137        }
138        
139        /* (non-Javadoc)
140         * @see org.apache.activemq.transport.tcp.TimeStampStream#getWriteTimestamp()
141         */
142        public long getWriteTimestamp() {
143            return writeTimestamp;
144        }
145    
146    }