001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.mqtt;
018    
019    import java.io.DataOutputStream;
020    import java.io.EOFException;
021    import java.io.IOException;
022    import java.net.Socket;
023    import java.net.URI;
024    import java.net.UnknownHostException;
025    import java.nio.ByteBuffer;
026    import java.nio.channels.SelectionKey;
027    import java.nio.channels.SocketChannel;
028    
029    import javax.net.SocketFactory;
030    
031    import org.apache.activemq.transport.nio.NIOOutputStream;
032    import org.apache.activemq.transport.nio.SelectorManager;
033    import org.apache.activemq.transport.nio.SelectorSelection;
034    import org.apache.activemq.transport.tcp.TcpTransport;
035    import org.apache.activemq.util.IOExceptionSupport;
036    import org.apache.activemq.util.ServiceStopper;
037    import org.apache.activemq.wireformat.WireFormat;
038    import org.fusesource.hawtbuf.DataByteArrayInputStream;
039    
040    /**
041     * An implementation of the {@link org.apache.activemq.transport.Transport} interface for using MQTT over NIO
042     */
043    public class MQTTNIOTransport extends TcpTransport {
044    
045        private SocketChannel channel;
046        private SelectorSelection selection;
047    
048        private ByteBuffer inputBuffer;
049        MQTTCodec codec;
050    
051        public MQTTNIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
052            super(wireFormat, socketFactory, remoteLocation, localLocation);
053        }
054    
055        public MQTTNIOTransport(WireFormat wireFormat, Socket socket) throws IOException {
056            super(wireFormat, socket);
057        }
058    
059        protected void initializeStreams() throws IOException {
060            channel = socket.getChannel();
061            channel.configureBlocking(false);
062            // listen for events telling us when the socket is readable.
063            selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() {
064                public void onSelect(SelectorSelection selection) {
065                    if (!isStopped()) {
066                        serviceRead();
067                    }
068                }
069    
070                public void onError(SelectorSelection selection, Throwable error) {
071                    if (error instanceof IOException) {
072                        onException((IOException) error);
073                    } else {
074                        onException(IOExceptionSupport.create(error));
075                    }
076                }
077            });
078    
079            inputBuffer = ByteBuffer.allocate(8 * 1024);
080            NIOOutputStream outPutStream = new NIOOutputStream(channel, 8 * 1024);
081            this.dataOut = new DataOutputStream(outPutStream);
082            this.buffOut = outPutStream;
083            codec = new MQTTCodec(this);
084        }
085    
086        private void serviceRead() {
087            try {
088    
089                while (isStarted()) {
090                    // read channel
091                    int readSize = channel.read(inputBuffer);
092                    // channel is closed, cleanup
093                    if (readSize == -1) {
094                        onException(new EOFException());
095                        selection.close();
096                        break;
097                    }
098                    // nothing more to read, break
099                    if (readSize == 0) {
100                        break;
101                    }
102    
103                    inputBuffer.flip();
104                    DataByteArrayInputStream dis = new DataByteArrayInputStream(inputBuffer.array());
105                    codec.parse(dis, readSize);
106    
107                    receiveCounter += readSize;
108    
109                    // clear the buffer
110                    inputBuffer.clear();
111                }
112            } catch (IOException e) {
113                onException(e);
114            } catch (Throwable e) {
115                onException(IOExceptionSupport.create(e));
116            }
117        }
118    
119        protected void doStart() throws Exception {
120            connect();
121            selection.setInterestOps(SelectionKey.OP_READ);
122            selection.enable();
123        }
124    
125        protected void doStop(ServiceStopper stopper) throws Exception {
126            try {
127                if (selection != null) {
128                    selection.close();
129                }
130            } finally {
131                super.doStop(stopper);
132            }
133        }
134    }