001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.mqtt;
018    
019    import java.io.DataOutputStream;
020    import java.io.EOFException;
021    import java.io.IOException;
022    import java.net.Socket;
023    import java.net.URI;
024    import java.net.UnknownHostException;
025    import java.nio.ByteBuffer;
026    import java.nio.channels.SelectionKey;
027    import java.nio.channels.SocketChannel;
028    
029    import javax.net.SocketFactory;
030    import org.apache.activemq.transport.nio.NIOOutputStream;
031    import org.apache.activemq.transport.nio.SelectorManager;
032    import org.apache.activemq.transport.nio.SelectorSelection;
033    import org.apache.activemq.transport.tcp.TcpTransport;
034    import org.apache.activemq.util.IOExceptionSupport;
035    import org.apache.activemq.util.ServiceStopper;
036    import org.apache.activemq.wireformat.WireFormat;
037    import org.fusesource.hawtbuf.DataByteArrayInputStream;
038    
039    /**
040     * An implementation of the {@link org.apache.activemq.transport.Transport} interface for using MQTT over NIO
041     */
042    public class MQTTNIOTransport extends TcpTransport {
043    
044        private SocketChannel channel;
045        private SelectorSelection selection;
046    
047        private ByteBuffer inputBuffer;
048        MQTTCodec codec;
049    
050        public MQTTNIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
051            super(wireFormat, socketFactory, remoteLocation, localLocation);
052        }
053    
054        public MQTTNIOTransport(WireFormat wireFormat, Socket socket) throws IOException {
055            super(wireFormat, socket);
056        }
057    
058        protected void initializeStreams() throws IOException {
059            channel = socket.getChannel();
060            channel.configureBlocking(false);
061            // listen for events telling us when the socket is readable.
062            selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() {
063                public void onSelect(SelectorSelection selection) {
064                    if (!isStopped()) {
065                        serviceRead();
066                    }
067                }
068    
069                public void onError(SelectorSelection selection, Throwable error) {
070                    if (error instanceof IOException) {
071                        onException((IOException) error);
072                    } else {
073                        onException(IOExceptionSupport.create(error));
074                    }
075                }
076            });
077    
078            inputBuffer = ByteBuffer.allocate(8 * 1024);
079            NIOOutputStream outPutStream = new NIOOutputStream(channel, 8 * 1024);
080            this.dataOut = new DataOutputStream(outPutStream);
081            this.buffOut = outPutStream;
082            codec = new MQTTCodec(this);
083        }
084    
085        private void serviceRead() {
086            try {
087    
088                while (isStarted()) {
089                    // read channel
090                    int readSize = channel.read(inputBuffer);
091                    // channel is closed, cleanup
092                    if (readSize == -1) {
093                        onException(new EOFException());
094                        selection.close();
095                        break;
096                    }
097                    // nothing more to read, break
098                    if (readSize == 0) {
099                        break;
100                    }
101    
102                    inputBuffer.flip();
103                    DataByteArrayInputStream dis = new DataByteArrayInputStream(inputBuffer.array());
104                    codec.parse(dis, readSize);
105    
106                    // clear the buffer
107                    inputBuffer.clear();
108    
109                }
110            } catch (IOException e) {
111                onException(e);
112            } catch (Throwable e) {
113                onException(IOExceptionSupport.create(e));
114            }
115        }
116    
117        protected void doStart() throws Exception {
118            connect();
119            selection.setInterestOps(SelectionKey.OP_READ);
120            selection.enable();
121        }
122    
123        protected void doStop(ServiceStopper stopper) throws Exception {
124            try {
125                if (selection != null) {
126                    selection.close();
127                }
128            } finally {
129                super.doStop(stopper);
130            }
131        }
132    }