001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.amqp;
018
019import java.io.DataOutputStream;
020import java.io.EOFException;
021import java.io.IOException;
022import java.net.Socket;
023import java.net.URI;
024import java.net.UnknownHostException;
025import java.nio.ByteBuffer;
026import java.nio.channels.SelectionKey;
027import java.nio.channels.SocketChannel;
028
029import javax.net.SocketFactory;
030
031import org.apache.activemq.transport.nio.NIOOutputStream;
032import org.apache.activemq.transport.nio.SelectorManager;
033import org.apache.activemq.transport.nio.SelectorSelection;
034import org.apache.activemq.transport.tcp.TcpTransport;
035import org.apache.activemq.util.IOExceptionSupport;
036import org.apache.activemq.util.ServiceStopper;
037import org.apache.activemq.wireformat.WireFormat;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041/**
042 * An implementation of the {@link org.apache.activemq.transport.Transport} interface for using AMQP over NIO
043 */
044public class AmqpNioTransport extends TcpTransport {
045
046    private static final Logger LOG = LoggerFactory.getLogger(AmqpNioTransport.class);
047
048    private SocketChannel channel;
049    private SelectorSelection selection;
050    private final AmqpFrameParser frameReader = new AmqpFrameParser(this);
051
052    private ByteBuffer inputBuffer;
053
054    public AmqpNioTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
055        super(wireFormat, socketFactory, remoteLocation, localLocation);
056
057        frameReader.setWireFormat((AmqpWireFormat) wireFormat);
058    }
059
060    public AmqpNioTransport(WireFormat wireFormat, Socket socket) throws IOException {
061        super(wireFormat, socket);
062
063        frameReader.setWireFormat((AmqpWireFormat) wireFormat);
064    }
065
066    public AmqpNioTransport(WireFormat wireFormat, Socket socket, InitBuffer initBuffer) throws IOException {
067        super(wireFormat, socket, initBuffer);
068
069        frameReader.setWireFormat((AmqpWireFormat) wireFormat);
070    }
071
072    @Override
073    protected void initializeStreams() throws IOException {
074        channel = socket.getChannel();
075        channel.configureBlocking(false);
076        // listen for events telling us when the socket is readable.
077        selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() {
078            @Override
079            public void onSelect(SelectorSelection selection) {
080                if (!isStopped()) {
081                    serviceRead();
082                }
083            }
084
085            @Override
086            public void onError(SelectorSelection selection, Throwable error) {
087                LOG.trace("Error detected: {}", error.getMessage());
088                if (error instanceof IOException) {
089                    onException((IOException) error);
090                } else {
091                    onException(IOExceptionSupport.create(error));
092                }
093            }
094        });
095
096        inputBuffer = ByteBuffer.allocate(8 * 1024);
097        NIOOutputStream outPutStream = new NIOOutputStream(channel, 8 * 1024);
098        this.dataOut = new DataOutputStream(outPutStream);
099        this.buffOut = outPutStream;
100
101        try {
102            if (initBuffer != null) {
103                processBuffer(initBuffer.buffer, initBuffer.readSize);
104            }
105        } catch (IOException e) {
106            onException(e);
107        } catch (Throwable e) {
108            onException(IOExceptionSupport.create(e));
109        }
110
111    }
112
113    boolean magicRead = false;
114
115    private void serviceRead() {
116        try {
117
118            while (isStarted()) {
119                // read channel
120                int readSize = channel.read(inputBuffer);
121
122                // channel is closed, cleanup
123                if (readSize == -1) {
124                    onException(new EOFException());
125                    selection.close();
126                    break;
127                }
128                // nothing more to read, break
129                if (readSize == 0) {
130                    break;
131                }
132
133                processBuffer(inputBuffer, readSize);
134            }
135        } catch (IOException e) {
136            onException(e);
137        } catch (Throwable e) {
138            onException(IOExceptionSupport.create(e));
139        }
140    }
141
142    protected void processBuffer(ByteBuffer buffer, int readSize) throws Exception {
143        receiveCounter += readSize;
144
145        buffer.flip();
146        frameReader.parse(buffer);
147        buffer.clear();
148    }
149
150    @Override
151    protected void doStart() throws Exception {
152        connect();
153        selection.setInterestOps(SelectionKey.OP_READ);
154        selection.enable();
155    }
156
157    @Override
158    protected void doStop(ServiceStopper stopper) throws Exception {
159        try {
160            if (selection != null) {
161                selection.close();
162            }
163        } finally {
164            super.doStop(stopper);
165        }
166    }
167}