001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.mqtt;
018    
019    import java.io.IOException;
020    import java.security.cert.X509Certificate;
021    import java.util.concurrent.atomic.AtomicBoolean;
022    
023    import javax.jms.JMSException;
024    import org.apache.activemq.broker.BrokerContext;
025    import org.apache.activemq.command.Command;
026    import org.apache.activemq.transport.Transport;
027    import org.apache.activemq.transport.TransportFilter;
028    import org.apache.activemq.transport.TransportListener;
029    import org.apache.activemq.transport.tcp.SslTransport;
030    import org.apache.activemq.util.IOExceptionSupport;
031    import org.apache.activemq.wireformat.WireFormat;
032    import org.fusesource.mqtt.codec.MQTTFrame;
033    import org.slf4j.Logger;
034    import org.slf4j.LoggerFactory;
035    
036    /**
037     * The MQTTTransportFilter normally sits on top of a TcpTransport that has been
038     * configured with the StompWireFormat and is used to convert MQTT commands to
039     * ActiveMQ commands. All of the conversion work is done by delegating to the
040     * MQTTProtocolConverter
041     */
042    public class MQTTTransportFilter extends TransportFilter implements MQTTTransport {
043        private static final Logger LOG = LoggerFactory.getLogger(MQTTTransportFilter.class);
044        private static final Logger TRACE = LoggerFactory.getLogger(MQTTTransportFilter.class.getPackage().getName() + ".MQTTIO");
045        private final MQTTProtocolConverter protocolConverter;
046        private MQTTInactivityMonitor monitor;
047        private MQTTWireFormat wireFormat;
048        private final AtomicBoolean stopped = new AtomicBoolean();
049    
050        private boolean trace;
051    
052        public MQTTTransportFilter(Transport next, WireFormat wireFormat, BrokerContext brokerContext) {
053            super(next);
054            this.protocolConverter = new MQTTProtocolConverter(this, brokerContext);
055    
056            if (wireFormat instanceof MQTTWireFormat) {
057                this.wireFormat = (MQTTWireFormat) wireFormat;
058            }
059        }
060    
061        public void oneway(Object o) throws IOException {
062            try {
063                final Command command = (Command) o;
064                protocolConverter.onActiveMQCommand(command);
065            } catch (Exception e) {
066                throw IOExceptionSupport.create(e);
067            }
068        }
069    
070        public void onCommand(Object command) {
071            try {
072                if (trace) {
073                    TRACE.trace("Received: \n" + command);
074                }
075    
076                protocolConverter.onMQTTCommand((MQTTFrame) command);
077            } catch (IOException e) {
078                handleException(e);
079            } catch (JMSException e) {
080                onException(IOExceptionSupport.create(e));
081            }
082        }
083    
084        public void sendToActiveMQ(Command command) {
085            TransportListener l = transportListener;
086            if (l != null) {
087                l.onCommand(command);
088            }
089        }
090    
091        public void sendToMQTT(MQTTFrame command) throws IOException {
092            if( !stopped.get() ) {
093                if (trace) {
094                    TRACE.trace("Sending: \n" + command);
095                }
096                Transport n = next;
097                if (n != null) {
098                    n.oneway(command);
099                }
100            }
101        }
102    
103        @Override
104        public void stop() throws Exception {
105            if( stopped.compareAndSet(false, true) ) {
106                super.stop();
107            }
108        }
109    
110        public X509Certificate[] getPeerCertificates() {
111            if (next instanceof SslTransport) {
112                X509Certificate[] peerCerts = ((SslTransport) next).getPeerCertificates();
113                if (trace && peerCerts != null) {
114                    LOG.debug("Peer Identity has been verified\n");
115                }
116                return peerCerts;
117            }
118            return null;
119        }
120    
121        public boolean isTrace() {
122            return trace;
123        }
124    
125        public void setTrace(boolean trace) {
126            this.trace = trace;
127        }
128    
129        @Override
130        public MQTTInactivityMonitor getInactivityMonitor() {
131            return monitor;
132        }
133    
134        public void setInactivityMonitor(MQTTInactivityMonitor monitor) {
135            this.monitor = monitor;
136        }
137    
138        @Override
139        public MQTTWireFormat getWireFormat() {
140            return this.wireFormat;
141        }
142    
143        public void handleException(IOException e) {
144            protocolConverter.onTransportError();
145            super.onException(e);
146        }
147    
148        public long getDefaultKeepAlive() {
149            return protocolConverter != null ? protocolConverter.getDefaultKeepAlive() : -1;
150        }
151    
152        public void setDefaultKeepAlive(long defaultHeartBeat) {
153            protocolConverter.setDefaultKeepAlive(defaultHeartBeat);
154        }
155    
156        public int getActiveMQSubscriptionPrefetch() {
157            return protocolConverter.getActiveMQSubscriptionPrefetch();
158        }
159    
160        /**
161         * set the default prefetch size when mapping the MQTT subscription to an ActiveMQ one
162         * The default = 1
163         * @param activeMQSubscriptionPrefetch set the prefetch for the corresponding ActiveMQ subscription
164         */
165    
166        public void setActiveMQSubscriptionPrefetch(int activeMQSubscriptionPrefetch) {
167            protocolConverter.setActiveMQSubscriptionPrefetch(activeMQSubscriptionPrefetch);
168        }
169    
170    
171    }