001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.apache.activemq.transport.tcp;
019
020import java.io.FilterOutputStream;
021import java.io.IOException;
022import java.io.OutputStream;
023
024/**
025 * An optimized buffered outputstream for Tcp
026 * 
027 * 
028 */
029
030public class TcpBufferedOutputStream extends FilterOutputStream implements TimeStampStream {
031    private static final int BUFFER_SIZE = 8192;
032    private byte[] buffer;
033    private int bufferlen;
034    private int count;
035    private volatile long writeTimestamp = -1;//concurrent reads of this value
036    
037
038    /**
039     * Constructor
040     * 
041     * @param out
042     */
043    public TcpBufferedOutputStream(OutputStream out) {
044        this(out, BUFFER_SIZE);
045    }
046
047    /**
048     * Creates a new buffered output stream to write data to the specified
049     * underlying output stream with the specified buffer size.
050     * 
051     * @param out the underlying output stream.
052     * @param size the buffer size.
053     * @throws IllegalArgumentException if size <= 0.
054     */
055    public TcpBufferedOutputStream(OutputStream out, int size) {
056        super(out);
057        if (size <= 0) {
058            throw new IllegalArgumentException("Buffer size <= 0");
059        }
060        buffer = new byte[size];
061        bufferlen = size;
062    }
063
064    /**
065     * write a byte on to the stream
066     * 
067     * @param b - byte to write
068     * @throws IOException
069     */
070    public void write(int b) throws IOException {
071        if ((bufferlen - count) < 1) {
072            flush();
073        }
074        buffer[count++] = (byte)b;
075    }
076
077    /**
078     * write a byte array to the stream
079     * 
080     * @param b the byte buffer
081     * @param off the offset into the buffer
082     * @param len the length of data to write
083     * @throws IOException
084     */
085    public void write(byte b[], int off, int len) throws IOException {
086        if (b != null) {
087            if ((bufferlen - count) < len) {
088                flush();
089            }
090            if (buffer.length >= len) {
091                System.arraycopy(b, off, buffer, count, len);
092                count += len;
093            } else {
094                try {
095                    writeTimestamp = System.currentTimeMillis();
096                    out.write(b, off, len);
097                } finally {
098                    writeTimestamp = -1;
099                }
100            }
101        }
102    }
103
104    /**
105     * flush the data to the output stream This doesn't call flush on the
106     * underlying outputstream, because Tcp is particularly efficent at doing
107     * this itself ....
108     * 
109     * @throws IOException
110     */
111    public void flush() throws IOException {
112        if (count > 0 && out != null) {
113            try {
114                writeTimestamp = System.currentTimeMillis();
115                out.write(buffer, 0, count);
116            } finally {
117                writeTimestamp = -1;
118            }
119            count = 0;
120        }
121    }
122
123    /**
124     * close this stream
125     * 
126     * @throws IOException
127     */
128    public void close() throws IOException {
129        super.close();
130    }
131
132    /* (non-Javadoc)
133     * @see org.apache.activemq.transport.tcp.TimeStampStream#isWriting()
134     */
135    public boolean isWriting() {
136        return writeTimestamp > 0;
137    }
138    
139    /* (non-Javadoc)
140     * @see org.apache.activemq.transport.tcp.TimeStampStream#getWriteTimestamp()
141     */
142    public long getWriteTimestamp() {
143        return writeTimestamp;
144    }
145
146}