001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.amqp;
018
019import java.io.IOException;
020import java.security.cert.X509Certificate;
021import java.util.concurrent.locks.ReentrantLock;
022
023import org.apache.activemq.broker.BrokerService;
024import org.apache.activemq.command.Command;
025import org.apache.activemq.transport.Transport;
026import org.apache.activemq.transport.TransportFilter;
027import org.apache.activemq.transport.TransportListener;
028import org.apache.activemq.transport.nio.NIOSSLTransport;
029import org.apache.activemq.transport.tcp.SslTransport;
030import org.apache.activemq.util.IOExceptionSupport;
031import org.apache.activemq.wireformat.WireFormat;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035/**
036 * The AMQPTransportFilter normally sits on top of a TcpTransport that has been
037 * configured with the AmqpWireFormat and is used to convert AMQP commands to
038 * ActiveMQ commands. All of the conversion work is done by delegating to the
039 * AMQPProtocolConverter
040 */
041public class AmqpTransportFilter extends TransportFilter implements AmqpTransport {
042    private static final Logger LOG = LoggerFactory.getLogger(AmqpTransportFilter.class);
043    static final Logger TRACE_BYTES = LoggerFactory.getLogger(AmqpTransportFilter.class.getPackage().getName() + ".BYTES");
044    public static final Logger TRACE_FRAMES = LoggerFactory.getLogger(AmqpTransportFilter.class.getPackage().getName() + ".FRAMES");
045    private AmqpProtocolConverter protocolConverter;
046    private AmqpWireFormat wireFormat;
047    private AmqpInactivityMonitor monitor;
048
049    private boolean trace;
050    private final ReentrantLock lock = new ReentrantLock();
051
052    public AmqpTransportFilter(Transport next, WireFormat wireFormat, BrokerService brokerService) {
053        super(next);
054        this.protocolConverter = new AmqpProtocolDiscriminator(this, brokerService);
055        if (wireFormat instanceof AmqpWireFormat) {
056            this.wireFormat = (AmqpWireFormat) wireFormat;
057        }
058    }
059
060    @Override
061    public void start() throws Exception {
062        if (monitor != null) {
063            monitor.setAmqpTransport(this);
064            monitor.startConnectionTimeoutChecker(getConnectAttemptTimeout());
065        }
066        super.start();
067    }
068
069    @Override
070    public void oneway(Object o) throws IOException {
071        try {
072            final Command command = (Command) o;
073            lock.lock();
074            try {
075                protocolConverter.onActiveMQCommand(command);
076            } finally {
077                lock.unlock();
078            }
079        } catch (Exception e) {
080            throw IOExceptionSupport.create(e);
081        }
082    }
083
084    @Override
085    public void onException(IOException error) {
086        lock.lock();
087        try {
088            protocolConverter.onAMQPException(error);
089        } finally {
090            lock.unlock();
091        }
092    }
093
094    @Override
095    public void sendToActiveMQ(IOException error) {
096        super.onException(error);
097    }
098
099    @Override
100    public void onCommand(Object command) {
101        try {
102            if (trace) {
103                TRACE_BYTES.trace("Received: \n{}", command);
104            }
105            lock.lock();
106            try {
107                protocolConverter.onAMQPData(command);
108            } finally {
109                lock.unlock();
110            }
111        } catch (IOException e) {
112            handleException(e);
113        } catch (Exception e) {
114            onException(IOExceptionSupport.create(e));
115        }
116    }
117
118    @Override
119    public void sendToActiveMQ(Command command) {
120        assert lock.isHeldByCurrentThread();
121        TransportListener l = transportListener;
122        if (l != null) {
123            l.onCommand(command);
124        }
125    }
126
127    @Override
128    public void sendToAmqp(Object command) throws IOException {
129        assert lock.isHeldByCurrentThread();
130        if (trace) {
131            TRACE_BYTES.trace("Sending: \n{}", command);
132        }
133        Transport n = next;
134        if (n != null) {
135            n.oneway(command);
136        }
137    }
138
139    @Override
140    public long keepAlive() {
141        long nextKeepAliveDelay = 0l;
142
143        try {
144            lock.lock();
145            try {
146                nextKeepAliveDelay = protocolConverter.keepAlive();
147            } finally {
148                lock.unlock();
149            }
150        } catch (IOException e) {
151            handleException(e);
152        } catch (Exception e) {
153            onException(IOExceptionSupport.create(e));
154        }
155
156        return nextKeepAliveDelay;
157    }
158
159    @Override
160    public X509Certificate[] getPeerCertificates() {
161        X509Certificate[] peerCerts = null;
162        if (next instanceof SslTransport) {
163            peerCerts = ((SslTransport) next).getPeerCertificates();
164        } else if (next instanceof NIOSSLTransport) {
165            peerCerts = ((NIOSSLTransport) next).getPeerCertificates();
166        }
167        if (trace && peerCerts != null) {
168            LOG.debug("Peer Identity has been verified\n");
169        }
170        return peerCerts;
171    }
172
173    @Override
174    public boolean isTrace() {
175        return trace;
176    }
177
178    public void setTrace(boolean trace) {
179        this.trace = trace;
180        this.protocolConverter.updateTracer();
181    }
182
183    @Override
184    public AmqpWireFormat getWireFormat() {
185        return this.wireFormat;
186    }
187
188    public void handleException(IOException e) {
189        super.onException(e);
190    }
191
192    @Override
193    public String getTransformer() {
194        return wireFormat.getTransformer();
195    }
196
197    public void setTransformer(String transformer) {
198        wireFormat.setTransformer(transformer);
199    }
200
201    @Override
202    public AmqpProtocolConverter getProtocolConverter() {
203        return protocolConverter;
204    }
205
206    @Override
207    public void setProtocolConverter(AmqpProtocolConverter protocolConverter) {
208        this.protocolConverter = protocolConverter;
209    }
210
211    public void setProducerCredit(int producerCredit) {
212        wireFormat.setProducerCredit(producerCredit);
213    }
214
215    public int getProducerCredit() {
216        return wireFormat.getProducerCredit();
217    }
218
219    @Override
220    public void setInactivityMonitor(AmqpInactivityMonitor monitor) {
221        this.monitor = monitor;
222    }
223
224    @Override
225    public AmqpInactivityMonitor getInactivityMonitor() {
226        return monitor;
227    }
228
229    @Override
230    public boolean isUseInactivityMonitor() {
231        return monitor != null;
232    }
233
234    public int getConnectAttemptTimeout() {
235        return wireFormat.getConnectAttemptTimeout();
236    }
237
238    public void setConnectAttemptTimeout(int connectAttemptTimeout) {
239        wireFormat.setConnectAttemptTimeout(connectAttemptTimeout);
240    }
241
242    public long getMaxFrameSize() {
243        return wireFormat.getMaxFrameSize();
244    }
245
246    public void setMaxFrameSize(long maxFrameSize) {
247        wireFormat.setMaxFrameSize(maxFrameSize);
248    }
249}