001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.mqtt;
018
019import java.io.DataOutputStream;
020import java.io.EOFException;
021import java.io.IOException;
022import java.net.Socket;
023import java.net.URI;
024import java.net.UnknownHostException;
025import java.nio.ByteBuffer;
026import java.nio.channels.SelectionKey;
027import java.nio.channels.SocketChannel;
028
029import javax.net.SocketFactory;
030
031import org.apache.activemq.transport.nio.NIOOutputStream;
032import org.apache.activemq.transport.nio.SelectorManager;
033import org.apache.activemq.transport.nio.SelectorSelection;
034import org.apache.activemq.transport.tcp.TcpTransport;
035import org.apache.activemq.util.IOExceptionSupport;
036import org.apache.activemq.util.ServiceStopper;
037import org.apache.activemq.wireformat.WireFormat;
038import org.fusesource.hawtbuf.DataByteArrayInputStream;
039
040/**
041 * An implementation of the {@link org.apache.activemq.transport.Transport} interface for using MQTT over NIO
042 */
043public class MQTTNIOTransport extends TcpTransport {
044
045    private SocketChannel channel;
046    private SelectorSelection selection;
047
048    private ByteBuffer inputBuffer;
049    MQTTCodec codec;
050
051    public MQTTNIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
052        super(wireFormat, socketFactory, remoteLocation, localLocation);
053    }
054
055    public MQTTNIOTransport(WireFormat wireFormat, Socket socket) throws IOException {
056        super(wireFormat, socket);
057    }
058
059    public MQTTNIOTransport(WireFormat wireFormat, Socket socket, InitBuffer initBuffer) throws IOException {
060        super(wireFormat, socket, initBuffer);
061    }
062
063    @Override
064    protected void initializeStreams() throws IOException {
065        channel = socket.getChannel();
066        channel.configureBlocking(false);
067        // listen for events telling us when the socket is readable.
068        selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() {
069            @Override
070            public void onSelect(SelectorSelection selection) {
071                if (!isStopped()) {
072                    serviceRead();
073                }
074            }
075
076            @Override
077            public void onError(SelectorSelection selection, Throwable error) {
078                if (error instanceof IOException) {
079                    onException((IOException) error);
080                } else {
081                    onException(IOExceptionSupport.create(error));
082                }
083            }
084        });
085
086        inputBuffer = ByteBuffer.allocate(8 * 1024);
087        NIOOutputStream outPutStream = new NIOOutputStream(channel, 8 * 1024);
088        dataOut = new DataOutputStream(outPutStream);
089        buffOut = outPutStream;
090        codec = new MQTTCodec(this, (MQTTWireFormat) getWireFormat());
091
092        try {
093            if (initBuffer != null) {
094                processBuffer(initBuffer.buffer, initBuffer.readSize);
095            }
096        } catch (IOException e) {
097            onException(e);
098        } catch (Throwable e) {
099            onException(IOExceptionSupport.create(e));
100        }
101    }
102
103    private void serviceRead() {
104        try {
105
106            while (isStarted()) {
107                // read channel
108                int readSize = channel.read(inputBuffer);
109                // channel is closed, cleanup
110                if (readSize == -1) {
111                    onException(new EOFException());
112                    selection.close();
113                    break;
114                }
115                // nothing more to read, break
116                if (readSize == 0) {
117                    break;
118                }
119
120                processBuffer(inputBuffer, readSize);
121            }
122        } catch (IOException e) {
123            onException(e);
124        } catch (Throwable e) {
125            onException(IOExceptionSupport.create(e));
126        }
127    }
128
129    protected void processBuffer(ByteBuffer buffer, int readSize) throws Exception {
130        buffer.flip();
131        DataByteArrayInputStream dis = new DataByteArrayInputStream(buffer.array());
132        codec.parse(dis, readSize);
133
134        receiveCounter += readSize;
135
136        // clear the buffer
137        buffer.clear();
138    }
139
140    @Override
141    protected void doStart() throws Exception {
142        connect();
143        selection.setInterestOps(SelectionKey.OP_READ);
144        selection.enable();
145    }
146
147    @Override
148    protected void doStop(ServiceStopper stopper) throws Exception {
149        try {
150            if (selection != null) {
151                selection.close();
152            }
153        } finally {
154            super.doStop(stopper);
155        }
156    }
157}