001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.amqp;
018
019 import org.apache.activemq.broker.BrokerContext;
020 import org.apache.activemq.command.Command;
021 import org.apache.activemq.transport.Transport;
022 import org.apache.activemq.transport.TransportFilter;
023 import org.apache.activemq.transport.TransportListener;
024 import org.apache.qpid.proton.jms.InboundTransformer;
025 import org.apache.activemq.transport.tcp.SslTransport;
026 import org.apache.activemq.util.IOExceptionSupport;
027 import org.apache.activemq.wireformat.WireFormat;
028 import org.slf4j.Logger;
029 import org.slf4j.LoggerFactory;
030
031 import java.io.IOException;
032 import java.security.cert.X509Certificate;
033
034 /**
035 * The AMQPTransportFilter normally sits on top of a TcpTransport that has been
036 * configured with the StompWireFormat and is used to convert AMQP commands to
037 * ActiveMQ commands. All of the conversion work is done by delegating to the
038 * AMQPProtocolConverter
039 */
040 public class AmqpTransportFilter extends TransportFilter implements AmqpTransport {
041 private static final Logger LOG = LoggerFactory.getLogger(AmqpTransportFilter.class);
042 private static final Logger TRACE = LoggerFactory.getLogger(AmqpTransportFilter.class.getPackage().getName() + ".AMQPIO");
043 private final AmqpProtocolConverter protocolConverter;
044 // private AmqpInactivityMonitor monitor;
045 private AmqpWireFormat wireFormat;
046
047 private boolean trace;
048 private String transformer = InboundTransformer.TRANSFORMER_NATIVE;
049
050 public AmqpTransportFilter(Transport next, WireFormat wireFormat, BrokerContext brokerContext) {
051 super(next);
052 this.protocolConverter = new AmqpProtocolConverter(this, brokerContext);
053
054 if (wireFormat instanceof AmqpWireFormat) {
055 this.wireFormat = (AmqpWireFormat) wireFormat;
056 }
057 }
058
059 public void oneway(Object o) throws IOException {
060 try {
061 final Command command = (Command) o;
062 protocolConverter.lock.lock();
063 try {
064 protocolConverter.onActiveMQCommand(command);
065 } finally {
066 protocolConverter.lock.unlock();
067 }
068 } catch (Exception e) {
069 throw IOExceptionSupport.create(e);
070 }
071 }
072
073 @Override
074 public void onException(IOException error) {
075 protocolConverter.lock.lock();
076 try {
077 protocolConverter.onAMQPException(error);
078 } finally {
079 protocolConverter.lock.unlock();
080 }
081 }
082
083 public void sendToActiveMQ(IOException error) {
084 super.onException(error);
085 }
086
087 public void onCommand(Object command) {
088 try {
089 if (trace) {
090 TRACE.trace("Received: \n" + command);
091 }
092 protocolConverter.lock.lock();
093 try {
094 protocolConverter.onAMQPData(command);
095 } finally {
096 protocolConverter.lock.unlock();
097 }
098 } catch (IOException e) {
099 handleException(e);
100 } catch (Exception e) {
101 onException(IOExceptionSupport.create(e));
102 }
103 }
104
105 public void sendToActiveMQ(Command command) {
106 assert protocolConverter.lock.isHeldByCurrentThread();
107 TransportListener l = transportListener;
108 if (l != null) {
109 l.onCommand(command);
110 }
111 }
112
113 public void sendToAmqp(Object command) throws IOException {
114 assert protocolConverter.lock.isHeldByCurrentThread();
115 if (trace) {
116 TRACE.trace("Sending: \n" + command);
117 }
118 Transport n = next;
119 if (n != null) {
120 n.oneway(command);
121 }
122 }
123
124 public X509Certificate[] getPeerCertificates() {
125 if (next instanceof SslTransport) {
126 X509Certificate[] peerCerts = ((SslTransport) next).getPeerCertificates();
127 if (trace && peerCerts != null) {
128 LOG.debug("Peer Identity has been verified\n");
129 }
130 return peerCerts;
131 }
132 return null;
133 }
134
135 public boolean isTrace() {
136 return trace;
137 }
138
139 public void setTrace(boolean trace) {
140 this.trace = trace;
141 }
142
143 // @Override
144 // public AmqpInactivityMonitor getInactivityMonitor() {
145 // return monitor;
146 // }
147 //
148 // public void setInactivityMonitor(AmqpInactivityMonitor monitor) {
149 // this.monitor = monitor;
150 // }
151
152 @Override
153 public AmqpWireFormat getWireFormat() {
154 return this.wireFormat;
155 }
156
157
158
159 public void handleException(IOException e) {
160 super.onException(e);
161 }
162
163
164 public String getTransformer() {
165 return transformer;
166 }
167
168 public void setTransformer(String transformer) {
169 this.transformer = transformer;
170 }
171 }