001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.amqp;
018    
019    import org.apache.activemq.broker.BrokerContext;
020    import org.apache.activemq.command.Command;
021    import org.apache.activemq.transport.Transport;
022    import org.apache.activemq.transport.TransportFilter;
023    import org.apache.activemq.transport.TransportListener;
024    import org.apache.qpid.proton.jms.InboundTransformer;
025    import org.apache.activemq.transport.tcp.SslTransport;
026    import org.apache.activemq.util.IOExceptionSupport;
027    import org.apache.activemq.wireformat.WireFormat;
028    import org.slf4j.Logger;
029    import org.slf4j.LoggerFactory;
030    
031    import java.io.IOException;
032    import java.security.cert.X509Certificate;
033    import java.util.concurrent.locks.ReentrantLock;
034    
035    /**
036     * The AMQPTransportFilter normally sits on top of a TcpTransport that has been
037     * configured with the AmqpWireFormat and is used to convert AMQP commands to
038     * ActiveMQ commands. All of the conversion work is done by delegating to the
039     * AMQPProtocolConverter
040     */
041    public class AmqpTransportFilter extends TransportFilter implements AmqpTransport {
042        private static final Logger LOG = LoggerFactory.getLogger(AmqpTransportFilter.class);
043        static final Logger TRACE_BYTES = LoggerFactory.getLogger(AmqpTransportFilter.class.getPackage().getName() + ".BYTES");
044        static final Logger TRACE_FRAMES = LoggerFactory.getLogger(AmqpTransportFilter.class.getPackage().getName() + ".FRAMES");
045        private IAmqpProtocolConverter protocolConverter;
046    //    private AmqpInactivityMonitor monitor;
047        private AmqpWireFormat wireFormat;
048    
049        private boolean trace;
050        private String transformer = InboundTransformer.TRANSFORMER_NATIVE;
051        private ReentrantLock lock = new ReentrantLock();
052    
053        public AmqpTransportFilter(Transport next, WireFormat wireFormat, BrokerContext brokerContext) {
054            super(next);
055            this.protocolConverter = new AMQPProtocolDiscriminator(this);
056            if (wireFormat instanceof AmqpWireFormat) {
057                this.wireFormat = (AmqpWireFormat) wireFormat;
058            }
059        }
060    
061        public void oneway(Object o) throws IOException {
062            try {
063                final Command command = (Command) o;
064                lock.lock();
065                try {
066                    protocolConverter.onActiveMQCommand(command);
067                } finally {
068                    lock.unlock();
069                }
070            } catch (Exception e) {
071                throw IOExceptionSupport.create(e);
072            }
073        }
074    
075        @Override
076        public void onException(IOException error) {
077            lock.lock();
078            try {
079                protocolConverter.onAMQPException(error);
080            } finally {
081                lock.unlock();
082            }
083        }
084    
085        public void sendToActiveMQ(IOException error) {
086            super.onException(error);
087        }
088    
089        public void onCommand(Object command) {
090            try {
091                if (trace) {
092                    TRACE_BYTES.trace("Received: \n{}", command);
093                }
094                lock.lock();
095                try {
096                    protocolConverter.onAMQPData(command);
097                } finally {
098                    lock.unlock();
099                }
100            } catch (IOException e) {
101                handleException(e);
102            } catch (Exception e) {
103                onException(IOExceptionSupport.create(e));
104            }
105        }
106    
107        public void sendToActiveMQ(Command command) {
108            assert lock.isHeldByCurrentThread();
109            TransportListener l = transportListener;
110            if (l != null) {
111                l.onCommand(command);
112            }
113        }
114    
115        public void sendToAmqp(Object command) throws IOException {
116            assert lock.isHeldByCurrentThread();
117            if (trace) {
118                TRACE_BYTES.trace("Sending: \n{}", command);
119            }
120            Transport n = next;
121            if (n != null) {
122                n.oneway(command);
123            }
124        }
125    
126        public X509Certificate[] getPeerCertificates() {
127            if (next instanceof SslTransport) {
128                X509Certificate[] peerCerts = ((SslTransport) next).getPeerCertificates();
129                if (trace && peerCerts != null) {
130                    LOG.debug("Peer Identity has been verified\n");
131                }
132                return peerCerts;
133            }
134            return null;
135        }
136    
137        public boolean isTrace() {
138            return trace;
139        }
140    
141        public void setTrace(boolean trace) {
142            this.trace = trace;
143            this.protocolConverter.updateTracer();
144        }
145    
146    //    @Override
147    //    public AmqpInactivityMonitor getInactivityMonitor() {
148    //        return monitor;
149    //    }
150    //
151    //    public void setInactivityMonitor(AmqpInactivityMonitor monitor) {
152    //        this.monitor = monitor;
153    //    }
154    
155        @Override
156        public AmqpWireFormat getWireFormat() {
157            return this.wireFormat;
158        }
159    
160        public void handleException(IOException e) {
161            super.onException(e);
162        }
163    
164        public String getTransformer() {
165            return transformer;
166        }
167    
168        public void setTransformer(String transformer) {
169            this.transformer = transformer;
170        }
171        public IAmqpProtocolConverter getProtocolConverter() {
172            return protocolConverter;
173        }
174    
175        public void setProtocolConverter(IAmqpProtocolConverter protocolConverter) {
176            this.protocolConverter = protocolConverter;
177        }
178    }