001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.amqp;
018    
019    import org.apache.activemq.broker.BrokerContext;
020    import org.apache.activemq.command.Command;
021    import org.apache.activemq.transport.Transport;
022    import org.apache.activemq.transport.TransportFilter;
023    import org.apache.activemq.transport.TransportListener;
024    import org.apache.qpid.proton.jms.InboundTransformer;
025    import org.apache.activemq.transport.tcp.SslTransport;
026    import org.apache.activemq.util.IOExceptionSupport;
027    import org.apache.activemq.wireformat.WireFormat;
028    import org.slf4j.Logger;
029    import org.slf4j.LoggerFactory;
030    
031    import java.io.IOException;
032    import java.security.cert.X509Certificate;
033    
034    /**
035     * The AMQPTransportFilter normally sits on top of a TcpTransport that has been
036     * configured with the StompWireFormat and is used to convert AMQP commands to
037     * ActiveMQ commands. All of the conversion work is done by delegating to the
038     * AMQPProtocolConverter
039     */
040    public class AmqpTransportFilter extends TransportFilter implements AmqpTransport {
041        private static final Logger LOG = LoggerFactory.getLogger(AmqpTransportFilter.class);
042        private static final Logger TRACE = LoggerFactory.getLogger(AmqpTransportFilter.class.getPackage().getName() + ".AMQPIO");
043        private final AmqpProtocolConverter protocolConverter;
044    //    private AmqpInactivityMonitor monitor;
045        private AmqpWireFormat wireFormat;
046    
047        private boolean trace;
048        private String transformer = InboundTransformer.TRANSFORMER_NATIVE;
049    
050        public AmqpTransportFilter(Transport next, WireFormat wireFormat, BrokerContext brokerContext) {
051            super(next);
052            this.protocolConverter = new AmqpProtocolConverter(this, brokerContext);
053    
054            if (wireFormat instanceof AmqpWireFormat) {
055                this.wireFormat = (AmqpWireFormat) wireFormat;
056            }
057        }
058    
059        public void oneway(Object o) throws IOException {
060            try {
061                final Command command = (Command) o;
062                protocolConverter.lock.lock();
063                try {
064                    protocolConverter.onActiveMQCommand(command);
065                } finally {
066                    protocolConverter.lock.unlock();
067                }
068            } catch (Exception e) {
069                throw IOExceptionSupport.create(e);
070            }
071        }
072    
073        @Override
074        public void onException(IOException error) {
075            protocolConverter.lock.lock();
076            try {
077                protocolConverter.onAMQPException(error);
078            } finally {
079                protocolConverter.lock.unlock();
080            }
081        }
082    
083        public void sendToActiveMQ(IOException error) {
084            super.onException(error);
085        }
086    
087        public void onCommand(Object command) {
088            try {
089                if (trace) {
090                    TRACE.trace("Received: \n" + command);
091                }
092                protocolConverter.lock.lock();
093                try {
094                    protocolConverter.onAMQPData(command);
095                } finally {
096                    protocolConverter.lock.unlock();
097                }
098            } catch (IOException e) {
099                handleException(e);
100            } catch (Exception e) {
101                onException(IOExceptionSupport.create(e));
102            }
103        }
104    
105        public void sendToActiveMQ(Command command) {
106            assert protocolConverter.lock.isHeldByCurrentThread();
107            TransportListener l = transportListener;
108            if (l != null) {
109                l.onCommand(command);
110            }
111        }
112    
113        public void sendToAmqp(Object command) throws IOException {
114            assert protocolConverter.lock.isHeldByCurrentThread();
115            if (trace) {
116                TRACE.trace("Sending: \n" + command);
117            }
118            Transport n = next;
119            if (n != null) {
120                n.oneway(command);
121            }
122        }
123    
124        public X509Certificate[] getPeerCertificates() {
125            if (next instanceof SslTransport) {
126                X509Certificate[] peerCerts = ((SslTransport) next).getPeerCertificates();
127                if (trace && peerCerts != null) {
128                    LOG.debug("Peer Identity has been verified\n");
129                }
130                return peerCerts;
131            }
132            return null;
133        }
134    
135        public boolean isTrace() {
136            return trace;
137        }
138    
139        public void setTrace(boolean trace) {
140            this.trace = trace;
141        }
142    
143    //    @Override
144    //    public AmqpInactivityMonitor getInactivityMonitor() {
145    //        return monitor;
146    //    }
147    //
148    //    public void setInactivityMonitor(AmqpInactivityMonitor monitor) {
149    //        this.monitor = monitor;
150    //    }
151    
152        @Override
153        public AmqpWireFormat getWireFormat() {
154            return this.wireFormat;
155        }
156    
157    
158    
159        public void handleException(IOException e) {
160            super.onException(e);
161        }
162    
163    
164        public String getTransformer() {
165            return transformer;
166        }
167    
168        public void setTransformer(String transformer) {
169            this.transformer = transformer;
170        }
171    }