001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.tcp;
018    
019    import java.io.FilterInputStream;
020    import java.io.IOException;
021    import java.io.InputStream;
022    
023    /**
024     * An optimized buffered input stream for Tcp
025     * 
026     * 
027     */
028    public class TcpBufferedInputStream extends FilterInputStream {
029        private static final int DEFAULT_BUFFER_SIZE = 8192;
030        protected byte internalBuffer[];
031        protected int count;
032        protected int position;
033    
034        public TcpBufferedInputStream(InputStream in) {
035            this(in, DEFAULT_BUFFER_SIZE);
036        }
037    
038        public TcpBufferedInputStream(InputStream in, int size) {
039            super(in);
040            if (size <= 0) {
041                throw new IllegalArgumentException("Buffer size <= 0");
042            }
043            internalBuffer = new byte[size];
044        }
045    
046        protected void fill() throws IOException {
047            byte[] buffer = internalBuffer;
048            count = 0;
049            position = 0;
050            int n = in.read(buffer, position, buffer.length - position);
051            if (n > 0) {
052                count = n + position;
053            }
054        }
055    
056        public int read() throws IOException {
057            if (position >= count) {
058                fill();
059                if (position >= count) {
060                    return -1;
061                }
062            }
063            return internalBuffer[position++] & 0xff;
064        }
065    
066        private int readStream(byte[] b, int off, int len) throws IOException {
067            int avail = count - position;
068            if (avail <= 0) {
069                if (len >= internalBuffer.length) {
070                    return in.read(b, off, len);
071                }
072                fill();
073                avail = count - position;
074                if (avail <= 0) {
075                    return -1;
076                }
077            }
078            int cnt = (avail < len) ? avail : len;
079            System.arraycopy(internalBuffer, position, b, off, cnt);
080            position += cnt;
081            return cnt;
082        }
083    
084        public int read(byte b[], int off, int len) throws IOException {
085            if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
086                throw new IndexOutOfBoundsException();
087            } else if (len == 0) {
088                return 0;
089            }
090            int n = 0;
091            for (;;) {
092                int nread = readStream(b, off + n, len - n);
093                if (nread <= 0) {
094                    return (n == 0) ? nread : n;
095                }
096                n += nread;
097                if (n >= len) {
098                    return n;
099                }
100                // if not closed but no bytes available, return
101                InputStream input = in;
102                if (input != null && input.available() <= 0) {
103                    return n;
104                }
105            }
106        }
107    
108        public long skip(long n) throws IOException {
109            if (n <= 0) {
110                return 0;
111            }
112            long avail = count - position;
113            if (avail <= 0) {
114                return in.skip(n);
115            }
116            long skipped = (avail < n) ? avail : n;
117            position += skipped;
118            return skipped;
119        }
120    
121        public int available() throws IOException {
122            return in.available() + (count - position);
123        }
124    
125        public boolean markSupported() {
126            return false;
127        }
128    
129        public void close() throws IOException {
130            if (in != null) {
131                in.close();
132            }
133        }
134    }