001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.amqp;
018
019import java.io.IOException;
020import java.security.cert.X509Certificate;
021import java.util.concurrent.locks.ReentrantLock;
022
023import org.apache.activemq.broker.BrokerService;
024import org.apache.activemq.command.Command;
025import org.apache.activemq.transport.Transport;
026import org.apache.activemq.transport.TransportFilter;
027import org.apache.activemq.transport.TransportListener;
028import org.apache.activemq.transport.tcp.SslTransport;
029import org.apache.activemq.util.IOExceptionSupport;
030import org.apache.activemq.wireformat.WireFormat;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034/**
035 * The AMQPTransportFilter normally sits on top of a TcpTransport that has been
036 * configured with the AmqpWireFormat and is used to convert AMQP commands to
037 * ActiveMQ commands. All of the conversion work is done by delegating to the
038 * AMQPProtocolConverter
039 */
040public class AmqpTransportFilter extends TransportFilter implements AmqpTransport {
041    private static final Logger LOG = LoggerFactory.getLogger(AmqpTransportFilter.class);
042    static final Logger TRACE_BYTES = LoggerFactory.getLogger(AmqpTransportFilter.class.getPackage().getName() + ".BYTES");
043    public static final Logger TRACE_FRAMES = LoggerFactory.getLogger(AmqpTransportFilter.class.getPackage().getName() + ".FRAMES");
044    private AmqpProtocolConverter protocolConverter;
045    private AmqpWireFormat wireFormat;
046    private AmqpInactivityMonitor monitor;
047
048    private boolean trace;
049    private final ReentrantLock lock = new ReentrantLock();
050
051    public AmqpTransportFilter(Transport next, WireFormat wireFormat, BrokerService brokerService) {
052        super(next);
053        this.protocolConverter = new AmqpProtocolDiscriminator(this, brokerService);
054        if (wireFormat instanceof AmqpWireFormat) {
055            this.wireFormat = (AmqpWireFormat) wireFormat;
056        }
057    }
058
059    @Override
060    public void start() throws Exception {
061        if (monitor != null) {
062            monitor.setAmqpTransport(this);
063            monitor.startConnectionTimeoutChecker(getConnectAttemptTimeout());
064        }
065        super.start();
066    }
067
068    @Override
069    public void oneway(Object o) throws IOException {
070        try {
071            final Command command = (Command) o;
072            lock.lock();
073            try {
074                protocolConverter.onActiveMQCommand(command);
075            } finally {
076                lock.unlock();
077            }
078        } catch (Exception e) {
079            throw IOExceptionSupport.create(e);
080        }
081    }
082
083    @Override
084    public void onException(IOException error) {
085        lock.lock();
086        try {
087            protocolConverter.onAMQPException(error);
088        } finally {
089            lock.unlock();
090        }
091    }
092
093    @Override
094    public void sendToActiveMQ(IOException error) {
095        super.onException(error);
096    }
097
098    @Override
099    public void onCommand(Object command) {
100        try {
101            if (trace) {
102                TRACE_BYTES.trace("Received: \n{}", command);
103            }
104            lock.lock();
105            try {
106                protocolConverter.onAMQPData(command);
107            } finally {
108                lock.unlock();
109            }
110        } catch (IOException e) {
111            handleException(e);
112        } catch (Exception e) {
113            onException(IOExceptionSupport.create(e));
114        }
115    }
116
117    @Override
118    public void sendToActiveMQ(Command command) {
119        assert lock.isHeldByCurrentThread();
120        TransportListener l = transportListener;
121        if (l != null) {
122            l.onCommand(command);
123        }
124    }
125
126    @Override
127    public void sendToAmqp(Object command) throws IOException {
128        assert lock.isHeldByCurrentThread();
129        if (trace) {
130            TRACE_BYTES.trace("Sending: \n{}", command);
131        }
132        Transport n = next;
133        if (n != null) {
134            n.oneway(command);
135        }
136    }
137
138    @Override
139    public long keepAlive() {
140        long nextKeepAliveDelay = 0l;
141
142        try {
143            lock.lock();
144            try {
145                nextKeepAliveDelay = protocolConverter.keepAlive();
146            } finally {
147                lock.unlock();
148            }
149        } catch (IOException e) {
150            handleException(e);
151        } catch (Exception e) {
152            onException(IOExceptionSupport.create(e));
153        }
154
155        return nextKeepAliveDelay;
156    }
157
158    @Override
159    public X509Certificate[] getPeerCertificates() {
160        if (next instanceof SslTransport) {
161            X509Certificate[] peerCerts = ((SslTransport) next).getPeerCertificates();
162            if (trace && peerCerts != null) {
163                LOG.debug("Peer Identity has been verified\n");
164            }
165            return peerCerts;
166        }
167        return null;
168    }
169
170    @Override
171    public boolean isTrace() {
172        return trace;
173    }
174
175    public void setTrace(boolean trace) {
176        this.trace = trace;
177        this.protocolConverter.updateTracer();
178    }
179
180    @Override
181    public AmqpWireFormat getWireFormat() {
182        return this.wireFormat;
183    }
184
185    public void handleException(IOException e) {
186        super.onException(e);
187    }
188
189    @Override
190    public String getTransformer() {
191        return wireFormat.getTransformer();
192    }
193
194    public void setTransformer(String transformer) {
195        wireFormat.setTransformer(transformer);
196    }
197
198    @Override
199    public AmqpProtocolConverter getProtocolConverter() {
200        return protocolConverter;
201    }
202
203    @Override
204    public void setProtocolConverter(AmqpProtocolConverter protocolConverter) {
205        this.protocolConverter = protocolConverter;
206    }
207
208    public void setProducerCredit(int producerCredit) {
209        wireFormat.setProducerCredit(producerCredit);
210    }
211
212    public int getProducerCredit() {
213        return wireFormat.getProducerCredit();
214    }
215
216    @Override
217    public void setInactivityMonitor(AmqpInactivityMonitor monitor) {
218        this.monitor = monitor;
219    }
220
221    @Override
222    public AmqpInactivityMonitor getInactivityMonitor() {
223        return monitor;
224    }
225
226    @Override
227    public boolean isUseInactivityMonitor() {
228        return monitor != null;
229    }
230
231    public int getConnectAttemptTimeout() {
232        return wireFormat.getConnectAttemptTimeout();
233    }
234
235    public void setConnectAttemptTimeout(int connectAttemptTimeout) {
236        wireFormat.setConnectAttemptTimeout(connectAttemptTimeout);
237    }
238
239    public long getMaxFrameSize() {
240        return wireFormat.getMaxFrameSize();
241    }
242
243    public void setMaxFrameSize(long maxFrameSize) {
244        wireFormat.setMaxFrameSize(maxFrameSize);
245    }
246}