001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.nio;
018    
019    import java.io.EOFException;
020    import java.io.IOException;
021    import java.io.InterruptedIOException;
022    import java.io.OutputStream;
023    import java.nio.ByteBuffer;
024    import java.nio.channels.WritableByteChannel;
025    
026    import javax.net.ssl.SSLEngine;
027    
028    import org.apache.activemq.transport.tcp.TimeStampStream;
029    
030    /**
031     * An optimized buffered outputstream for Tcp
032     */
033    public class NIOOutputStream extends OutputStream implements TimeStampStream {
034    
035        private static final int BUFFER_SIZE = 8192;
036    
037        private final WritableByteChannel out;
038        private final byte[] buffer;
039        private final ByteBuffer byteBuffer;
040    
041        private int count;
042        private boolean closed;
043        private volatile long writeTimestamp = -1;//concurrent reads of this value
044    
045        private SSLEngine engine;
046    
047        /**
048         * Constructor
049         *
050         * @param out
051         */
052        public NIOOutputStream(WritableByteChannel out) {
053            this(out, BUFFER_SIZE);
054        }
055    
056        /**
057         * Creates a new buffered output stream to write data to the specified
058         * underlying output stream with the specified buffer size.
059         *
060         * @param out the underlying output stream.
061         * @param size the buffer size.
062         * @throws IllegalArgumentException if size <= 0.
063         */
064        public NIOOutputStream(WritableByteChannel out, int size) {
065            this.out = out;
066            if (size <= 0) {
067                throw new IllegalArgumentException("Buffer size <= 0");
068            }
069            buffer = new byte[size];
070            byteBuffer = ByteBuffer.wrap(buffer);
071        }
072    
073        /**
074         * write a byte on to the stream
075         *
076         * @param b - byte to write
077         * @throws IOException
078         */
079        public void write(int b) throws IOException {
080            checkClosed();
081            if (availableBufferToWrite() < 1) {
082                flush();
083            }
084            buffer[count++] = (byte)b;
085        }
086    
087        /**
088         * write a byte array to the stream
089         *
090         * @param b the byte buffer
091         * @param off the offset into the buffer
092         * @param len the length of data to write
093         * @throws IOException
094         */
095        public void write(byte b[], int off, int len) throws IOException {
096            checkClosed();
097            if (availableBufferToWrite() < len) {
098                flush();
099            }
100            if (buffer.length >= len) {
101                System.arraycopy(b, off, buffer, count, len);
102                count += len;
103            } else {
104                write(ByteBuffer.wrap(b, off, len));
105            }
106        }
107    
108        /**
109         * flush the data to the output stream This doesn't call flush on the
110         * underlying outputstream, because Tcp is particularly efficent at doing
111         * this itself ....
112         *
113         * @throws IOException
114         */
115        public void flush() throws IOException {
116            if (count > 0 && out != null) {
117                byteBuffer.position(0);
118                byteBuffer.limit(count);
119                write(byteBuffer);
120                count = 0;
121            }
122        }
123    
124        /**
125         * close this stream
126         *
127         * @throws IOException
128         */
129        public void close() throws IOException {
130            super.close();
131            if (engine != null) {
132                engine.closeOutbound();
133            }
134            closed = true;
135        }
136    
137        /**
138         * Checks that the stream has not been closed
139         *
140         * @throws IOException
141         */
142        protected void checkClosed() throws IOException {
143            if (closed) {
144                throw new EOFException("Cannot write to the stream any more it has already been closed");
145            }
146        }
147    
148        /**
149         * @return the amount free space in the buffer
150         */
151        private int availableBufferToWrite() {
152            return buffer.length - count;
153        }
154    
155        protected void write(ByteBuffer data) throws IOException {
156            ByteBuffer plain;
157            if (engine != null) {
158                plain = ByteBuffer.allocate(engine.getSession().getPacketBufferSize());
159                plain.clear();
160                engine.wrap(data, plain);
161                plain.flip();
162            }  else {
163                plain = data;
164            }
165    
166            int remaining = plain.remaining();
167            int lastRemaining = remaining - 1;
168            long delay = 1;
169            try {
170                writeTimestamp = System.currentTimeMillis();
171                while (remaining > 0) {
172    
173                    // We may need to do a little bit of sleeping to avoid a busy loop.
174                    // Slow down if no data was written out..
175                    if (remaining == lastRemaining) {
176                        try {
177                            // Use exponential rollback to increase sleep time.
178                            Thread.sleep(delay);
179                            delay *= 2;
180                            if (delay > 1000) {
181                                delay = 1000;
182                            }
183                        } catch (InterruptedException e) {
184                            throw new InterruptedIOException();
185                        }
186                    } else {
187                        delay = 1;
188                    }
189                    lastRemaining = remaining;
190    
191                    // Since the write is non-blocking, all the data may not have been
192                    // written.
193                    out.write(plain);
194                    remaining = data.remaining();
195    
196                    // if the data buffer was larger than the packet buffer we might need to
197                    // wrap more packets until we reach the end of data, but only when plain
198                    // has no more space since we are non-blocking and a write might not have
199                    // written anything.
200                    if (engine != null && data.hasRemaining() && !plain.hasRemaining()) {
201                        plain.clear();
202                        engine.wrap(data, plain);
203                        plain.flip();
204                    }
205                }
206            } finally {
207                writeTimestamp = -1;
208            }
209        }
210    
211    
212        /* (non-Javadoc)
213         * @see org.apache.activemq.transport.tcp.TimeStampStream#isWriting()
214         */
215        public boolean isWriting() {
216            return writeTimestamp > 0;
217        }
218    
219        /* (non-Javadoc)
220         * @see org.apache.activemq.transport.tcp.TimeStampStream#getWriteTimestamp()
221         */
222        public long getWriteTimestamp() {
223            return writeTimestamp;
224        }
225    
226        public void setEngine(SSLEngine engine) {
227            this.engine = engine;
228        }
229    }