001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.network;
018
019 import java.io.IOException;
020 import java.util.concurrent.atomic.AtomicLong;
021
022 import org.apache.activemq.Service;
023 import org.apache.activemq.command.ActiveMQQueue;
024 import org.apache.activemq.command.ActiveMQTopic;
025 import org.apache.activemq.command.BrokerId;
026 import org.apache.activemq.command.BrokerInfo;
027 import org.apache.activemq.command.Command;
028 import org.apache.activemq.command.ConnectionId;
029 import org.apache.activemq.command.ConnectionInfo;
030 import org.apache.activemq.command.ConsumerInfo;
031 import org.apache.activemq.command.ExceptionResponse;
032 import org.apache.activemq.command.Message;
033 import org.apache.activemq.command.MessageAck;
034 import org.apache.activemq.command.MessageDispatch;
035 import org.apache.activemq.command.ProducerInfo;
036 import org.apache.activemq.command.Response;
037 import org.apache.activemq.command.SessionInfo;
038 import org.apache.activemq.command.ShutdownInfo;
039 import org.apache.activemq.transport.DefaultTransportListener;
040 import org.apache.activemq.transport.FutureResponse;
041 import org.apache.activemq.transport.ResponseCallback;
042 import org.apache.activemq.transport.Transport;
043 import org.apache.activemq.util.IdGenerator;
044 import org.apache.activemq.util.ServiceStopper;
045 import org.apache.activemq.util.ServiceSupport;
046 import org.slf4j.Logger;
047 import org.slf4j.LoggerFactory;
048
049 /**
050 * Forwards all messages from the local broker to the remote broker.
051 *
052 * @org.apache.xbean.XBean
053 *
054 */
055 public class ForwardingBridge implements Service {
056
057 private static final IdGenerator ID_GENERATOR = new IdGenerator();
058 private static final Logger LOG = LoggerFactory.getLogger(ForwardingBridge.class);
059
060 final AtomicLong enqueueCounter = new AtomicLong();
061 final AtomicLong dequeueCounter = new AtomicLong();
062 ConnectionInfo connectionInfo;
063 SessionInfo sessionInfo;
064 ProducerInfo producerInfo;
065 ConsumerInfo queueConsumerInfo;
066 ConsumerInfo topicConsumerInfo;
067 BrokerId localBrokerId;
068 BrokerId remoteBrokerId;
069 BrokerInfo localBrokerInfo;
070 BrokerInfo remoteBrokerInfo;
071
072 private final Transport localBroker;
073 private final Transport remoteBroker;
074 private String clientId;
075 private int prefetchSize = 1000;
076 private boolean dispatchAsync;
077 private String destinationFilter = ">";
078 private NetworkBridgeListener bridgeFailedListener;
079 private boolean useCompression = false;
080
081 public ForwardingBridge(Transport localBroker, Transport remoteBroker) {
082 this.localBroker = localBroker;
083 this.remoteBroker = remoteBroker;
084 }
085
086 public void start() throws Exception {
087 LOG.info("Starting a network connection between " + localBroker + " and " + remoteBroker
088 + " has been established.");
089
090 localBroker.setTransportListener(new DefaultTransportListener() {
091 public void onCommand(Object o) {
092 Command command = (Command)o;
093 serviceLocalCommand(command);
094 }
095
096 public void onException(IOException error) {
097 serviceLocalException(error);
098 }
099 });
100
101 remoteBroker.setTransportListener(new DefaultTransportListener() {
102 public void onCommand(Object o) {
103 Command command = (Command)o;
104 serviceRemoteCommand(command);
105 }
106
107 public void onException(IOException error) {
108 serviceRemoteException(error);
109 }
110 });
111
112 localBroker.start();
113 remoteBroker.start();
114 }
115
116 protected void triggerStartBridge() throws IOException {
117 Thread thead = new Thread() {
118 public void run() {
119 try {
120 startBridge();
121 } catch (IOException e) {
122 LOG.error("Failed to start network bridge: " + e, e);
123 }
124 }
125 };
126 thead.start();
127 }
128
129 /**
130 * @throws IOException
131 */
132 final void startBridge() throws IOException {
133 connectionInfo = new ConnectionInfo();
134 connectionInfo.setConnectionId(new ConnectionId(ID_GENERATOR.generateId()));
135 connectionInfo.setClientId(clientId);
136 localBroker.oneway(connectionInfo);
137 remoteBroker.oneway(connectionInfo);
138
139 sessionInfo = new SessionInfo(connectionInfo, 1);
140 localBroker.oneway(sessionInfo);
141 remoteBroker.oneway(sessionInfo);
142
143 queueConsumerInfo = new ConsumerInfo(sessionInfo, 1);
144 queueConsumerInfo.setDispatchAsync(dispatchAsync);
145 queueConsumerInfo.setDestination(new ActiveMQQueue(destinationFilter));
146 queueConsumerInfo.setPrefetchSize(prefetchSize);
147 queueConsumerInfo.setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY);
148 localBroker.oneway(queueConsumerInfo);
149
150 producerInfo = new ProducerInfo(sessionInfo, 1);
151 producerInfo.setResponseRequired(false);
152 remoteBroker.oneway(producerInfo);
153
154 if (connectionInfo.getClientId() != null) {
155 topicConsumerInfo = new ConsumerInfo(sessionInfo, 2);
156 topicConsumerInfo.setDispatchAsync(dispatchAsync);
157 topicConsumerInfo.setSubscriptionName("topic-bridge");
158 topicConsumerInfo.setRetroactive(true);
159 topicConsumerInfo.setDestination(new ActiveMQTopic(destinationFilter));
160 topicConsumerInfo.setPrefetchSize(prefetchSize);
161 topicConsumerInfo.setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY);
162 localBroker.oneway(topicConsumerInfo);
163 }
164
165 if (LOG.isInfoEnabled()) {
166 LOG.info("Network connection between " + localBroker + " and " + remoteBroker
167 + " has been established.");
168 }
169 }
170
171 public void stop() throws Exception {
172 try {
173 if (connectionInfo != null) {
174 localBroker.request(connectionInfo.createRemoveCommand());
175 remoteBroker.request(connectionInfo.createRemoveCommand());
176 }
177 localBroker.setTransportListener(null);
178 remoteBroker.setTransportListener(null);
179 localBroker.oneway(new ShutdownInfo());
180 remoteBroker.oneway(new ShutdownInfo());
181 } finally {
182 ServiceStopper ss = new ServiceStopper();
183 ss.stop(localBroker);
184 ss.stop(remoteBroker);
185 ss.throwFirstException();
186 }
187 }
188
189 public void serviceRemoteException(Throwable error) {
190 LOG.info("Unexpected remote exception: " + error);
191 if (LOG.isDebugEnabled()) {
192 LOG.debug("Exception trace: ", error);
193 }
194 }
195
196 protected void serviceRemoteCommand(Command command) {
197 try {
198 if (command.isBrokerInfo()) {
199 synchronized (this) {
200 remoteBrokerInfo = (BrokerInfo)command;
201 remoteBrokerId = remoteBrokerInfo.getBrokerId();
202 if (localBrokerId != null) {
203 if (localBrokerId.equals(remoteBrokerId)) {
204 LOG.info("Disconnecting loop back connection.");
205 ServiceSupport.dispose(this);
206 } else {
207 triggerStartBridge();
208 }
209 }
210 }
211 } else {
212 LOG.warn("Unexpected remote command: " + command);
213 }
214 } catch (IOException e) {
215 serviceLocalException(e);
216 }
217 }
218
219 public void serviceLocalException(Throwable error) {
220 LOG.info("Unexpected local exception: " + error);
221 LOG.debug("Exception trace: ", error);
222 fireBridgeFailed();
223 }
224
225 protected void serviceLocalCommand(Command command) {
226 try {
227 if (command.isMessageDispatch()) {
228
229 enqueueCounter.incrementAndGet();
230
231 final MessageDispatch md = (MessageDispatch)command;
232 Message message = md.getMessage();
233 message.setProducerId(producerInfo.getProducerId());
234
235 if (message.getOriginalTransactionId() == null) {
236 message.setOriginalTransactionId(message.getTransactionId());
237 }
238 message.setTransactionId(null);
239
240 if (isUseCompression()) {
241 message.compress();
242 }
243
244 if (!message.isResponseRequired()) {
245 // If the message was originally sent using async send, we will preserve that
246 // QOS by bridging it using an async send (small chance of message loss).
247 remoteBroker.oneway(message);
248 dequeueCounter.incrementAndGet();
249 localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1));
250
251 } else {
252
253 // The message was not sent using async send, so we should
254 // only ack the local
255 // broker when we get confirmation that the remote broker
256 // has received the message.
257 ResponseCallback callback = new ResponseCallback() {
258 public void onCompletion(FutureResponse future) {
259 try {
260 Response response = future.getResult();
261 if (response.isException()) {
262 ExceptionResponse er = (ExceptionResponse)response;
263 serviceLocalException(er.getException());
264 } else {
265 dequeueCounter.incrementAndGet();
266 localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1));
267 }
268 } catch (IOException e) {
269 serviceLocalException(e);
270 }
271 }
272 };
273
274 remoteBroker.asyncRequest(message, callback);
275 }
276
277 // Ack on every message since we don't know if the broker is
278 // blocked due to memory
279 // usage and is waiting for an Ack to un-block him.
280
281 // Acking a range is more efficient, but also more prone to
282 // locking up a server
283 // Perhaps doing something like the following should be policy
284 // based.
285 // if(
286 // md.getConsumerId().equals(queueConsumerInfo.getConsumerId())
287 // ) {
288 // queueDispatched++;
289 // if( queueDispatched > (queueConsumerInfo.getPrefetchSize()/2)
290 // ) {
291 // localBroker.oneway(new MessageAck(md,
292 // MessageAck.STANDARD_ACK_TYPE, queueDispatched));
293 // queueDispatched=0;
294 // }
295 // } else {
296 // topicDispatched++;
297 // if( topicDispatched > (topicConsumerInfo.getPrefetchSize()/2)
298 // ) {
299 // localBroker.oneway(new MessageAck(md,
300 // MessageAck.STANDARD_ACK_TYPE, topicDispatched));
301 // topicDispatched=0;
302 // }
303 // }
304 } else if (command.isBrokerInfo()) {
305 synchronized (this) {
306 localBrokerInfo = (BrokerInfo)command;
307 localBrokerId = localBrokerInfo.getBrokerId();
308 if (remoteBrokerId != null) {
309 if (remoteBrokerId.equals(localBrokerId)) {
310 LOG.info("Disconnecting loop back connection.");
311 ServiceSupport.dispose(this);
312 } else {
313 triggerStartBridge();
314 }
315 }
316 }
317 } else {
318 LOG.debug("Unexpected local command: " + command);
319 }
320 } catch (IOException e) {
321 serviceLocalException(e);
322 }
323 }
324
325 public String getClientId() {
326 return clientId;
327 }
328
329 public void setClientId(String clientId) {
330 this.clientId = clientId;
331 }
332
333 public int getPrefetchSize() {
334 return prefetchSize;
335 }
336
337 public void setPrefetchSize(int prefetchSize) {
338 this.prefetchSize = prefetchSize;
339 }
340
341 public boolean isDispatchAsync() {
342 return dispatchAsync;
343 }
344
345 public void setDispatchAsync(boolean dispatchAsync) {
346 this.dispatchAsync = dispatchAsync;
347 }
348
349 public String getDestinationFilter() {
350 return destinationFilter;
351 }
352
353 public void setDestinationFilter(String destinationFilter) {
354 this.destinationFilter = destinationFilter;
355 }
356
357 public void setNetworkBridgeFailedListener(NetworkBridgeListener listener) {
358 this.bridgeFailedListener = listener;
359 }
360
361 private void fireBridgeFailed() {
362 NetworkBridgeListener l = this.bridgeFailedListener;
363 if (l != null) {
364 l.bridgeFailed();
365 }
366 }
367
368 public String getRemoteAddress() {
369 return remoteBroker.getRemoteAddress();
370 }
371
372 public String getLocalAddress() {
373 return localBroker.getRemoteAddress();
374 }
375
376 public String getLocalBrokerName() {
377 return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName();
378 }
379
380 public String getRemoteBrokerName() {
381 return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName();
382 }
383
384 public long getDequeueCounter() {
385 return dequeueCounter.get();
386 }
387
388 public long getEnqueueCounter() {
389 return enqueueCounter.get();
390 }
391
392 /**
393 * @param useCompression
394 * True if forwarded Messages should have their bodies compressed.
395 */
396 public void setUseCompression(boolean useCompression) {
397 this.useCompression = useCompression;
398 }
399
400 /**
401 * @return the vale of the useCompression setting, true if forwarded messages will be compressed.
402 */
403 public boolean isUseCompression() {
404 return useCompression;
405 }
406 }