001    /**
002     *
003     * Licensed to the Apache Software Foundation (ASF) under one or more
004     * contributor license agreements.  See the NOTICE file distributed with
005     * this work for additional information regarding copyright ownership.
006     * The ASF licenses this file to You under the Apache License, Version 2.0
007     * (the "License"); you may not use this file except in compliance with
008     * the License.  You may obtain a copy of the License at
009     *
010     * http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.activemq.transport.nio;
019    
020    import java.io.IOException;
021    import java.io.InputStream;
022    import java.nio.ByteBuffer;
023    import java.nio.channels.Channel;
024    import java.nio.channels.ClosedChannelException;
025    import java.nio.channels.ReadableByteChannel;
026    import java.nio.channels.SelectionKey;
027    import java.nio.channels.Selector;
028    import java.nio.channels.SocketChannel;
029    
/**
 * An {@link InputStream} that reads from a {@link SocketChannel} through a
 * direct {@link ByteBuffer}, using a private {@link Selector} to wait for data
 * whenever the buffer is empty.
 * <p>
 * The constructor switches the channel to non-blocking mode and registers it
 * with the selector for {@link SelectionKey#OP_READ}. Closing this stream, or
 * reaching end of stream, closes the selector and the channel's socket; every
 * later call except {@link #close()} then fails with an {@link IOException}.
 */
public class NIOBufferedInputStream extends InputStream {

    /** Buffer capacity used when the caller does not supply one. */
    private static final int BUFFER_SIZE = 8192;

    /** Selector used to block until the channel becomes readable; closed by {@link #close()}. */
    private final Selector rs;

    /** Source channel; set to {@code null} once the stream is closed. */
    private SocketChannel sc;

    /** Read buffer, kept in read mode between calls; set to {@code null} once the stream is closed. */
    private ByteBuffer bb;

    /**
     * Creates a stream over the given channel with a buffer of {@code size} bytes.
     *
     * @param channel the channel to read from; it is cast to {@link SocketChannel},
     *                so any other channel type fails with a {@link ClassCastException}
     * @param size    capacity of the internal direct buffer, must be positive
     * @throws IllegalArgumentException if {@code size <= 0}
     * @throws ClosedChannelException   if the channel is already closed
     * @throws IOException              if the channel cannot be made non-blocking or
     *                                  the selector cannot be opened
     */
    public NIOBufferedInputStream(ReadableByteChannel channel, int size)
            throws ClosedChannelException, IOException {

        if (size <= 0) {
            throw new IllegalArgumentException("Buffer size <= 0");
        }

        ByteBuffer buffer = ByteBuffer.allocateDirect(size);
        SocketChannel socketChannel = (SocketChannel) channel;

        socketChannel.configureBlocking(false);

        Selector selector = Selector.open();
        boolean registered = false;
        try {
            socketChannel.register(selector, SelectionKey.OP_READ);
            registered = true;
        } finally {
            // Do not leak the selector when registration fails.
            if (!registered) {
                selector.close();
            }
        }

        // Start with an empty buffer in read mode (position == limit == 0).
        buffer.position(0);
        buffer.limit(0);

        this.bb = buffer;
        this.sc = socketChannel;
        this.rs = selector;
    }

    /**
     * Creates a stream over the given channel with the default buffer size.
     *
     * @param channel the channel to read from; must be a {@link SocketChannel}
     * @throws ClosedChannelException if the channel is already closed
     * @throws IOException            if the stream cannot be set up
     */
    public NIOBufferedInputStream(ReadableByteChannel channel)
            throws ClosedChannelException, IOException {
        this(channel, BUFFER_SIZE);
    }

    /**
     * Returns the number of bytes currently held in the internal buffer.
     *
     * @return the number of buffered bytes that can be read without waiting
     * @throws IOException if the stream has been closed
     */
    @Override
    public int available() throws IOException {
        ensureOpen();
        return bb.remaining();
    }

    /**
     * Closes the selector and, if the channel is still open, shuts down and
     * closes its socket. Calling this method on a closed stream does nothing.
     *
     * @throws IOException if closing the selector or the socket fails
     */
    @Override
    public void close() throws IOException {
        if (!rs.isOpen()) {
            return;
        }

        SocketChannel channel = sc;
        try {
            rs.close();
        } finally {
            // Drop the references even if a close step fails.
            bb = null;
            sc = null;

            if (channel != null && channel.isOpen()) {
                try {
                    channel.socket().shutdownInput();
                } finally {
                    // Always release the socket, even when shutdownInput() throws.
                    channel.socket().close();
                }
            }
        }
    }

    /**
     * Reads a single byte, waiting for data if the buffer is empty.
     *
     * @return the byte as a value in {@code 0..255}, or {@code -1} at end of
     *         stream (in which case the stream is also closed)
     * @throws IOException if the stream has been closed or an I/O error occurs
     */
    @Override
    public int read() throws IOException {
        ensureOpen();

        if (!bb.hasRemaining()) {
            try {
                fill(1);
            } catch (ClosedChannelException e) {
                close();
                return -1;
            }
        }

        return bb.get() & 0xFF;
    }

    /**
     * Reads up to {@code len} bytes into {@code b}. Buffered bytes are returned
     * immediately; the call waits for the channel only when the buffer is empty.
     *
     * @param b   destination array
     * @param off index in {@code b} of the first byte to write
     * @param len maximum number of bytes to read
     * @return the number of bytes copied, {@code 0} if {@code len} is zero, or
     *         {@code -1} at end of stream (in which case the stream is also closed)
     * @throws IndexOutOfBoundsException if {@code off} or {@code len} is negative
     *                                   or {@code len > b.length - off}
     * @throws IOException               if the stream has been closed or an I/O
     *                                   error occurs
     */
    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        ensureOpen();

        if (off < 0 || len < 0 || len > b.length - off) {
            throw new IndexOutOfBoundsException(
                    "off=" + off + ", len=" + len + ", b.length=" + b.length);
        }

        // A zero-length read must not wait for data.
        if (len == 0) {
            return 0;
        }

        if (!bb.hasRemaining()) {
            try {
                fill(1);
            } catch (ClosedChannelException e) {
                close();
                return -1;
            }
        }

        int bytesCopied = Math.min(len, bb.remaining());
        bb.get(b, off, bytesCopied);
        return bytesCopied;
    }

    /**
     * Discards up to {@code n} bytes, waiting for more data until either
     * {@code n} bytes have been skipped or end of stream is reached.
     *
     * @param n number of bytes to skip; values {@code <= 0} skip nothing
     * @return the number of bytes actually skipped
     * @throws IOException if the stream has been closed or an I/O error occurs
     */
    @Override
    public long skip(long n) throws IOException {
        ensureOpen();

        long skipped = 0;

        while (n > 0) {
            int buffered = bb.remaining();

            if (n <= buffered) {
                // n fits in an int here because it is bounded by the buffer size.
                bb.position(bb.position() + (int) n);
                skipped += n;
                break;
            }

            // Drop everything that is buffered, then wait for more.
            skipped += buffered;
            n -= buffered;
            bb.position(bb.limit());

            try {
                // Ask for one byte only: this never truncates a long to int and
                // every byte that arrives is counted on the next pass.
                fill(1);
            } catch (ClosedChannelException e) {
                close();
                return skipped;
            }
        }

        return skipped;
    }

    /**
     * Fails if the stream has already been closed.
     *
     * @throws IOException if the selector is no longer open
     */
    private void ensureOpen() throws IOException {
        if (!rs.isOpen()) {
            throw new IOException("Input Stream Closed");
        }
    }

    /**
     * Ensures that at least {@code n} bytes are buffered, capped by the free
     * space in the buffer, waiting on the selector whenever the channel has
     * nothing to read. Returns immediately if enough data is already buffered.
     * The buffer is always left in read mode, even when an exception is thrown.
     *
     * @param n minimum number of buffered bytes wanted
     * @throws ClosedChannelException if end of stream is reached while the
     *                                buffer holds no data at all
     * @throws IOException            if reading or selecting fails
     */
    private void fill(int n) throws IOException, ClosedChannelException {
        int buffered = bb.remaining();

        if ((n <= 0) || (n <= buffered)) {
            return;
        }

        // Move unread bytes to the front and switch the buffer to write mode.
        bb.compact();

        try {
            int wanted = Math.min(n - buffered, bb.remaining());

            for (;;) {
                int bytesRead = sc.read(bb);

                if (bytesRead == -1) {
                    if (bb.position() > 0) {
                        // Hand back what was received; the next fill reports EOF.
                        break;
                    }
                    throw new ClosedChannelException();
                }

                wanted -= bytesRead;

                if (wanted <= 0) {
                    break;
                }

                // Block until the channel is readable again.
                rs.select(0);
                rs.selectedKeys().clear();
            }
        } finally {
            // Back to read mode regardless of how the loop ended.
            bb.flip();
        }
    }
}