001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.ws;
018
019import java.io.IOException;
020import java.security.cert.X509Certificate;
021import java.util.Map;
022import java.util.concurrent.CountDownLatch;
023import java.util.concurrent.locks.ReentrantLock;
024
025import org.apache.activemq.broker.BrokerService;
026import org.apache.activemq.broker.BrokerServiceAware;
027import org.apache.activemq.command.Command;
028import org.apache.activemq.jms.pool.IntrospectionSupport;
029import org.apache.activemq.transport.TransportSupport;
030import org.apache.activemq.transport.mqtt.MQTTInactivityMonitor;
031import org.apache.activemq.transport.mqtt.MQTTProtocolConverter;
032import org.apache.activemq.transport.mqtt.MQTTTransport;
033import org.apache.activemq.transport.mqtt.MQTTWireFormat;
034import org.apache.activemq.util.IOExceptionSupport;
035import org.apache.activemq.util.ServiceStopper;
036import org.fusesource.mqtt.codec.MQTTFrame;
037
038public abstract class AbstractMQTTSocket extends TransportSupport implements MQTTTransport, BrokerServiceAware {
039
040    protected ReentrantLock protocolLock = new ReentrantLock();
041    protected volatile MQTTProtocolConverter protocolConverter = null;
042    protected MQTTWireFormat wireFormat = new MQTTWireFormat();
043    protected final MQTTInactivityMonitor mqttInactivityMonitor = new MQTTInactivityMonitor(this, wireFormat);
044    protected final CountDownLatch socketTransportStarted = new CountDownLatch(1);
045    protected BrokerService brokerService;
046    protected volatile int receiveCounter;
047    protected final String remoteAddress;
048    protected X509Certificate[] peerCertificates;
049    private Map<String, Object> transportOptions;
050
051    public AbstractMQTTSocket(String remoteAddress) {
052        super();
053        this.remoteAddress = remoteAddress;
054    }
055
056    @Override
057    public void oneway(Object command) throws IOException {
058        protocolLock.lock();
059        try {
060            getProtocolConverter().onActiveMQCommand((Command)command);
061        } catch (Exception e) {
062            onException(IOExceptionSupport.create(e));
063        } finally {
064            protocolLock.unlock();
065        }
066    }
067
068    @Override
069    public void sendToActiveMQ(Command command) {
070        protocolLock.lock();
071        try {
072            doConsume(command);
073        } finally {
074            protocolLock.unlock();
075        }
076    }
077
078    @Override
079    protected void doStop(ServiceStopper stopper) throws Exception {
080        mqttInactivityMonitor.stop();
081        handleStopped();
082    }
083
084    @Override
085    protected void doStart() throws Exception {
086        socketTransportStarted.countDown();
087        mqttInactivityMonitor.setTransportListener(getTransportListener());
088        mqttInactivityMonitor.startConnectChecker(wireFormat.getConnectAttemptTimeout());
089    }
090
091    //----- Abstract methods for subclasses to implement ---------------------//
092
093    @Override
094    public abstract void sendToMQTT(MQTTFrame command) throws IOException;
095
096    /**
097     * Called when the transport is stopping to allow the dervied classes
098     * a chance to close WebSocket resources.
099     *
100     * @throws IOException if an error occurs during the stop.
101     */
102    public abstract void handleStopped() throws IOException;
103
104    //----- Accessor methods -------------------------------------------------//
105
106    @Override
107    public MQTTInactivityMonitor getInactivityMonitor() {
108        return mqttInactivityMonitor;
109    }
110
111    @Override
112    public MQTTWireFormat getWireFormat() {
113        return wireFormat;
114    }
115
116    @Override
117    public String getRemoteAddress() {
118        return remoteAddress;
119    }
120
121    @Override
122    public int getReceiveCounter() {
123        return receiveCounter;
124    }
125
126    @Override
127    public X509Certificate[] getPeerCertificates() {
128        return peerCertificates;
129    }
130
131    @Override
132    public void setPeerCertificates(X509Certificate[] certificates) {
133        this.peerCertificates = certificates;
134    }
135
136    @Override
137    public void setBrokerService(BrokerService brokerService) {
138        this.brokerService = brokerService;
139    }
140
141    //----- Internal support methods -----------------------------------------//
142
143    protected MQTTProtocolConverter getProtocolConverter() {
144        if (protocolConverter == null) {
145            synchronized(this) {
146                if (protocolConverter == null) {
147                    protocolConverter = new MQTTProtocolConverter(this, brokerService);
148                    IntrospectionSupport.setProperties(protocolConverter, transportOptions);
149                }
150            }
151        }
152        return protocolConverter;
153    }
154
155    protected boolean transportStartedAtLeastOnce() {
156        return socketTransportStarted.getCount() == 0;
157    }
158
159    public void setTransportOptions(Map<String, Object> transportOptions) {
160        this.transportOptions = transportOptions;
161    }
162}