001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.stomp;
018    
019    import java.io.IOException;
020    
021    import javax.jms.JMSException;
022    
023    import org.apache.activemq.broker.BrokerContext;
024    import org.apache.activemq.command.Command;
025    import org.apache.activemq.transport.Transport;
026    import org.apache.activemq.transport.TransportFilter;
027    import org.apache.activemq.transport.TransportListener;
028    import org.apache.activemq.util.IOExceptionSupport;
029    import org.apache.activemq.wireformat.WireFormat;
030    import org.slf4j.Logger;
031    import org.slf4j.LoggerFactory;
032    
033    /**
034     * The StompTransportFilter normally sits on top of a TcpTransport that has been
035     * configured with the StompWireFormat and is used to convert STOMP commands to
036     * ActiveMQ commands. All of the conversion work is done by delegating to the
037     * ProtocolConverter.
038     *
039     * @author <a href="http://hiramchirino.com">chirino</a>
040     */
041    public class StompTransportFilter extends TransportFilter implements StompTransport {
042        private static final Logger TRACE = LoggerFactory.getLogger(StompTransportFilter.class.getPackage().getName() + ".StompIO");
043        private final ProtocolConverter protocolConverter;
044        private StompInactivityMonitor monitor;
045        private StompWireFormat wireFormat;
046    
047        private boolean trace;
048    
049        public StompTransportFilter(Transport next, WireFormat wireFormat, BrokerContext brokerContext) {
050            super(next);
051            this.protocolConverter = new ProtocolConverter(this, brokerContext);
052    
053            if (wireFormat instanceof StompWireFormat) {
054                this.wireFormat = (StompWireFormat) wireFormat;
055            }
056        }
057    
058        public void oneway(Object o) throws IOException {
059            try {
060                final Command command = (Command) o;
061                protocolConverter.onActiveMQCommand(command);
062            } catch (JMSException e) {
063                throw IOExceptionSupport.create(e);
064            }
065        }
066    
067        public void onCommand(Object command) {
068            try {
069                if (trace) {
070                    TRACE.trace("Received: \n" + command);
071                }
072    
073                protocolConverter.onStompCommand((StompFrame) command);
074            } catch (IOException e) {
075                onException(e);
076            } catch (JMSException e) {
077                onException(IOExceptionSupport.create(e));
078            }
079        }
080    
081        public void sendToActiveMQ(Command command) {
082            TransportListener l = transportListener;
083            if (l != null) {
084                l.onCommand(command);
085            }
086        }
087    
088        public void sendToStomp(StompFrame command) throws IOException {
089            if (trace) {
090                TRACE.trace("Sending: \n" + command);
091            }
092            Transport n = next;
093            if (n != null) {
094                n.oneway(command);
095            }
096        }
097    
098        public boolean isTrace() {
099            return trace;
100        }
101    
102        public void setTrace(boolean trace) {
103            this.trace = trace;
104        }
105    
106        @Override
107        public StompInactivityMonitor getInactivityMonitor() {
108            return monitor;
109        }
110    
111        public void setInactivityMonitor(StompInactivityMonitor monitor) {
112            this.monitor = monitor;
113        }
114    
115        @Override
116        public StompWireFormat getWireFormat() {
117            return this.wireFormat;
118        }
119    
120        public String getDefaultHeartBeat() {
121            return protocolConverter != null ? protocolConverter.getDefaultHeartBeat() : null;
122        }
123    
124        public void setDefaultHeartBeat(String defaultHeartBeat) {
125            protocolConverter.setDefaultHeartBeat(defaultHeartBeat);
126        }
127    
128    }