001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.nio;
018    
019    import java.io.DataInputStream;
020    import java.io.DataOutputStream;
021    import java.io.EOFException;
022    import java.io.IOException;
023    import java.net.Socket;
024    import java.net.URI;
025    import java.net.UnknownHostException;
026    import java.nio.ByteBuffer;
027    import java.nio.channels.SelectionKey;
028    import java.nio.channels.SocketChannel;
029    
030    import javax.net.SocketFactory;
031    
032    import org.apache.activemq.command.Command;
033    import org.apache.activemq.openwire.OpenWireFormat;
034    import org.apache.activemq.transport.Transport;
035    import org.apache.activemq.transport.tcp.TcpTransport;
036    import org.apache.activemq.util.IOExceptionSupport;
037    import org.apache.activemq.util.ServiceStopper;
038    import org.apache.activemq.wireformat.WireFormat;
039    
040    /**
041     * An implementation of the {@link Transport} interface using raw tcp/ip
042     * 
043     * 
044     */
045    public class NIOTransport extends TcpTransport {
046    
047        // private static final Logger log = LoggerFactory.getLogger(NIOTransport.class);
048        protected SocketChannel channel;
049        protected SelectorSelection selection;
050        protected ByteBuffer inputBuffer;
051        protected ByteBuffer currentBuffer;
052        protected int nextFrameSize;
053    
054        public NIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
055            super(wireFormat, socketFactory, remoteLocation, localLocation);
056        }
057    
058        public NIOTransport(WireFormat wireFormat, Socket socket) throws IOException {
059            super(wireFormat, socket);
060        }
061    
062        protected void initializeStreams() throws IOException {
063            channel = socket.getChannel();
064            channel.configureBlocking(false);
065    
066            // listen for events telling us when the socket is readable.
067            selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() {
068                public void onSelect(SelectorSelection selection) {
069                    serviceRead();
070                }
071    
072                public void onError(SelectorSelection selection, Throwable error) {
073                    if (error instanceof IOException) {
074                        onException((IOException)error);
075                    } else {
076                        onException(IOExceptionSupport.create(error));
077                    }
078                }
079            });
080    
081            // Send the data via the channel
082            // inputBuffer = ByteBuffer.allocateDirect(8*1024);
083            inputBuffer = ByteBuffer.allocate(8 * 1024);
084            currentBuffer = inputBuffer;
085            nextFrameSize = -1;
086            currentBuffer.limit(4);
087            NIOOutputStream outPutStream = new NIOOutputStream(channel, 16 * 1024);
088            this.dataOut = new DataOutputStream(outPutStream);
089            this.buffOut = outPutStream;
090        }
091    
092        protected void serviceRead() {
093            try {
094                while (true) {
095    
096                    int readSize = channel.read(currentBuffer);
097                    if (readSize == -1) {
098                        onException(new EOFException());
099                        selection.close();
100                        break;
101                    }
102                    if (readSize == 0) {
103                        break;
104                    }
105    
106                    if (currentBuffer.hasRemaining()) {
107                        continue;
108                    }
109    
110                    // Are we trying to figure out the size of the next frame?
111                    if (nextFrameSize == -1) {
112                        assert inputBuffer == currentBuffer;
113    
114                        // If the frame is too big to fit in our direct byte buffer,
115                        // Then allocate a non direct byte buffer of the right size
116                        // for it.
117                        inputBuffer.flip();
118                        nextFrameSize = inputBuffer.getInt() + 4;
119    
120                        if (wireFormat instanceof OpenWireFormat) {
121                            long maxFrameSize = ((OpenWireFormat)wireFormat).getMaxFrameSize();
122                            if (nextFrameSize > maxFrameSize) {
123                                throw new IOException("Frame size of " + (nextFrameSize / (1024 * 1024)) + " MB larger than max allowed " + (maxFrameSize / (1024 * 1024)) + " MB");
124                            }
125                        }
126    
127                        if (nextFrameSize > inputBuffer.capacity()) {
128                            currentBuffer = ByteBuffer.allocate(nextFrameSize);
129                            currentBuffer.putInt(nextFrameSize);
130                        } else {
131                            inputBuffer.limit(nextFrameSize);
132                        }
133    
134                    } else {
135                        currentBuffer.flip();
136    
137                        Object command = wireFormat.unmarshal(new DataInputStream(new NIOInputStream(currentBuffer)));
138                        doConsume((Command)command);
139    
140                        nextFrameSize = -1;
141                        inputBuffer.clear();
142                        inputBuffer.limit(4);
143                        currentBuffer = inputBuffer;
144                    }
145    
146                }
147    
148            } catch (IOException e) {
149                onException(e);
150            } catch (Throwable e) {
151                onException(IOExceptionSupport.create(e));
152            }
153        }
154    
155        protected void doStart() throws Exception {
156            connect();
157            selection.setInterestOps(SelectionKey.OP_READ);
158            selection.enable();
159        }
160    
161        protected void doStop(ServiceStopper stopper) throws Exception {
162            if (selection != null) {
163                selection.close();
164                selection = null;
165            }
166            super.doStop(stopper);
167        }
168    }