001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.stomp;
018
019import java.io.IOException;
020
021import javax.jms.JMSException;
022
023import org.apache.activemq.broker.BrokerContext;
024import org.apache.activemq.command.Command;
025import org.apache.activemq.transport.Transport;
026import org.apache.activemq.transport.TransportFilter;
027import org.apache.activemq.transport.TransportListener;
028import org.apache.activemq.util.IOExceptionSupport;
029import org.apache.activemq.wireformat.WireFormat;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/**
034 * The StompTransportFilter normally sits on top of a TcpTransport that has been
035 * configured with the StompWireFormat and is used to convert STOMP commands to
036 * ActiveMQ commands. All of the conversion work is done by delegating to the
037 * ProtocolConverter.
038 *
039 * @author <a href="http://hiramchirino.com">chirino</a>
040 */
041public class StompTransportFilter extends TransportFilter implements StompTransport {
042
043    private static final Logger TRACE = LoggerFactory.getLogger(StompTransportFilter.class.getPackage().getName() + ".StompIO");
044
045    private final ProtocolConverter protocolConverter;
046    private StompInactivityMonitor monitor;
047    private StompWireFormat wireFormat;
048
049    private boolean trace;
050
051    public StompTransportFilter(Transport next, WireFormat wireFormat, BrokerContext brokerContext) {
052        super(next);
053        this.protocolConverter = new ProtocolConverter(this, brokerContext);
054
055        if (wireFormat instanceof StompWireFormat) {
056            this.wireFormat = (StompWireFormat) wireFormat;
057        }
058    }
059
060    @Override
061    public void start() throws Exception {
062        if (monitor != null) {
063            monitor.startConnectCheckTask(getConnectAttemptTimeout());
064        }
065        super.start();
066    }
067
068    @Override
069    public void oneway(Object o) throws IOException {
070        try {
071            final Command command = (Command) o;
072            protocolConverter.onActiveMQCommand(command);
073        } catch (JMSException e) {
074            throw IOExceptionSupport.create(e);
075        }
076    }
077
078    @Override
079    public void onCommand(Object command) {
080        try {
081            if (trace) {
082                TRACE.trace("Received: \n" + command);
083            }
084
085            protocolConverter.onStompCommand((StompFrame) command);
086        } catch (IOException e) {
087            onException(e);
088        } catch (JMSException e) {
089            onException(IOExceptionSupport.create(e));
090        }
091    }
092
093    @Override
094    public void sendToActiveMQ(Command command) {
095        TransportListener l = transportListener;
096        if (l != null) {
097            l.onCommand(command);
098        }
099    }
100
101    @Override
102    public void sendToStomp(StompFrame command) throws IOException {
103        if (trace) {
104            TRACE.trace("Sending: \n" + command);
105        }
106        Transport n = next;
107        if (n != null) {
108            n.oneway(command);
109        }
110    }
111
112    public boolean isTrace() {
113        return trace;
114    }
115
116    public void setTrace(boolean trace) {
117        this.trace = trace;
118    }
119
120    @Override
121    public StompInactivityMonitor getInactivityMonitor() {
122        return monitor;
123    }
124
125    public void setInactivityMonitor(StompInactivityMonitor monitor) {
126        this.monitor = monitor;
127    }
128
129    @Override
130    public StompWireFormat getWireFormat() {
131        return this.wireFormat;
132    }
133
134    public String getDefaultHeartBeat() {
135        return protocolConverter != null ? protocolConverter.getDefaultHeartBeat() : null;
136    }
137
138    public void setDefaultHeartBeat(String defaultHeartBeat) {
139        protocolConverter.setDefaultHeartBeat(defaultHeartBeat);
140    }
141
142    /**
143     * Returns the currently configured Read check grace period multiplier.
144     *
145     * @return the hbGracePeriodMultiplier
146     */
147    public float getHbGracePeriodMultiplier() {
148        return protocolConverter != null ? protocolConverter.getHbGracePeriodMultiplier() : null;
149    }
150
151    /**
152     * Sets the read check grace period multiplier.  New CONNECT frames that indicate a heart beat
153     * value with a read check interval will have that value multiplied by this value to add a
154     * grace period before the connection is considered invalid.  By default this value is set to
155     * zero and no grace period is given.  When set the value must be larger than 1.0 or it will
156     * be ignored.
157     *
158     * @param hbGracePeriodMultiplier the hbGracePeriodMultiplier to set
159     */
160    public void setHbGracePeriodMultiplier(float hbGracePeriodMultiplier) {
161        if (hbGracePeriodMultiplier > 1.0f) {
162            protocolConverter.setHbGracePeriodMultiplier(hbGracePeriodMultiplier);
163        }
164    }
165
166    /**
167     * Sets the maximum number of bytes that the data portion of a STOMP frame is allowed to
168     * be, any incoming STOMP frame with a data section larger than this value will receive
169     * an error response.
170     *
171     * @param maxDataLength
172     *        size in bytes of the maximum data portion of a STOMP frame.
173     */
174    public void setMaxDataLength(int maxDataLength) {
175        wireFormat.setMaxDataLength(maxDataLength);
176    }
177
178    public int getMaxDataLength() {
179        return wireFormat.getMaxDataLength();
180    }
181
182    public void setMaxFrameSize(int maxFrameSize) {
183        wireFormat.setMaxFrameSize(maxFrameSize);
184    }
185
186    public long getMaxFrameSize() {
187        return wireFormat.getMaxFrameSize();
188    }
189
190    public long getConnectAttemptTimeout() {
191        return wireFormat.getConnectionAttemptTimeout();
192    }
193
194    public void setConnectAttemptTimeout(long timeout) {
195        wireFormat.setConnectionAttemptTimeout(timeout);
196    }
197}