001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.amqp;
018    
019    import java.io.DataOutputStream;
020    import java.io.EOFException;
021    import java.io.IOException;
022    import java.net.Socket;
023    import java.net.URI;
024    import java.net.UnknownHostException;
025    import java.nio.ByteBuffer;
026    import java.nio.channels.SelectionKey;
027    import java.nio.channels.SocketChannel;
028    
029    import javax.net.SocketFactory;
030    
031    import org.apache.activemq.transport.nio.NIOOutputStream;
032    import org.apache.activemq.transport.nio.SelectorManager;
033    import org.apache.activemq.transport.nio.SelectorSelection;
034    import org.apache.activemq.transport.tcp.TcpTransport;
035    import org.apache.activemq.util.IOExceptionSupport;
036    import org.apache.activemq.util.ServiceStopper;
037    import org.apache.activemq.wireformat.WireFormat;
038    
039    /**
040     * An implementation of the {@link org.apache.activemq.transport.Transport} interface for using AMQP over NIO
041     */
042    public class AmqpNioTransport extends TcpTransport {
043    
044        private SocketChannel channel;
045        private SelectorSelection selection;
046    
047        private ByteBuffer inputBuffer;
048    
049        public AmqpNioTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
050            super(wireFormat, socketFactory, remoteLocation, localLocation);
051        }
052    
053        public AmqpNioTransport(WireFormat wireFormat, Socket socket) throws IOException {
054            super(wireFormat, socket);
055        }
056    
057        protected void initializeStreams() throws IOException {
058            channel = socket.getChannel();
059            channel.configureBlocking(false);
060            // listen for events telling us when the socket is readable.
061            selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() {
062                public void onSelect(SelectorSelection selection) {
063                    if (!isStopped()) {
064                        serviceRead();
065                    }
066                }
067    
068                public void onError(SelectorSelection selection, Throwable error) {
069                    if (error instanceof IOException) {
070                        onException((IOException) error);
071                    } else {
072                        onException(IOExceptionSupport.create(error));
073                    }
074                }
075            });
076    
077            inputBuffer = ByteBuffer.allocate(8 * 1024);
078            NIOOutputStream outPutStream = new NIOOutputStream(channel, 8 * 1024);
079            this.dataOut = new DataOutputStream(outPutStream);
080            this.buffOut = outPutStream;
081        }
082    
083        private void serviceRead() {
084            try {
085    
086                while (isStarted()) {
087                    // read channel
088                    int readSize = channel.read(inputBuffer);
089                    // channel is closed, cleanup
090                    if (readSize == -1) {
091                        onException(new EOFException());
092                        selection.close();
093                        break;
094                    }
095                    // nothing more to read, break
096                    if (readSize == 0) {
097                        break;
098                    }
099    
100                    receiveCounter += readSize;
101    
102                    inputBuffer.flip();
103                    doConsume(AmqpSupport.toBuffer(inputBuffer));
104                    // clear the buffer
105                    inputBuffer.clear();
106                }
107            } catch (IOException e) {
108                onException(e);
109            } catch (Throwable e) {
110                onException(IOExceptionSupport.create(e));
111            }
112        }
113    
114        protected void doStart() throws Exception {
115            connect();
116            selection.setInterestOps(SelectionKey.OP_READ);
117            selection.enable();
118        }
119    
120        protected void doStop(ServiceStopper stopper) throws Exception {
121            try {
122                if (selection != null) {
123                    selection.close();
124                }
125            } finally {
126                super.doStop(stopper);
127            }
128        }
129    }