001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.network;
018    
019    import java.io.IOException;
020    import java.util.concurrent.atomic.AtomicLong;
021    
022    import org.apache.activemq.Service;
023    import org.apache.activemq.command.ActiveMQQueue;
024    import org.apache.activemq.command.ActiveMQTopic;
025    import org.apache.activemq.command.BrokerId;
026    import org.apache.activemq.command.BrokerInfo;
027    import org.apache.activemq.command.Command;
028    import org.apache.activemq.command.ConnectionId;
029    import org.apache.activemq.command.ConnectionInfo;
030    import org.apache.activemq.command.ConsumerInfo;
031    import org.apache.activemq.command.ExceptionResponse;
032    import org.apache.activemq.command.Message;
033    import org.apache.activemq.command.MessageAck;
034    import org.apache.activemq.command.MessageDispatch;
035    import org.apache.activemq.command.ProducerInfo;
036    import org.apache.activemq.command.Response;
037    import org.apache.activemq.command.SessionInfo;
038    import org.apache.activemq.command.ShutdownInfo;
039    import org.apache.activemq.transport.DefaultTransportListener;
040    import org.apache.activemq.transport.FutureResponse;
041    import org.apache.activemq.transport.ResponseCallback;
042    import org.apache.activemq.transport.Transport;
043    import org.apache.activemq.util.IdGenerator;
044    import org.apache.activemq.util.ServiceStopper;
045    import org.apache.activemq.util.ServiceSupport;
046    import org.slf4j.Logger;
047    import org.slf4j.LoggerFactory;
048    
049    /**
050     * Forwards all messages from the local broker to the remote broker.
051     *
052     * @org.apache.xbean.XBean
053     *
054     */
055    public class ForwardingBridge implements Service {
056    
057        private static final IdGenerator ID_GENERATOR = new IdGenerator();
058        private static final Logger LOG = LoggerFactory.getLogger(ForwardingBridge.class);
059    
060        final AtomicLong enqueueCounter = new AtomicLong();
061        final AtomicLong dequeueCounter = new AtomicLong();
062        ConnectionInfo connectionInfo;
063        SessionInfo sessionInfo;
064        ProducerInfo producerInfo;
065        ConsumerInfo queueConsumerInfo;
066        ConsumerInfo topicConsumerInfo;
067        BrokerId localBrokerId;
068        BrokerId remoteBrokerId;
069        BrokerInfo localBrokerInfo;
070        BrokerInfo remoteBrokerInfo;
071    
072        private final Transport localBroker;
073        private final Transport remoteBroker;
074        private String clientId;
075        private int prefetchSize = 1000;
076        private boolean dispatchAsync;
077        private String destinationFilter = ">";
078        private NetworkBridgeListener bridgeFailedListener;
079        private boolean useCompression = false;
080    
081        public ForwardingBridge(Transport localBroker, Transport remoteBroker) {
082            this.localBroker = localBroker;
083            this.remoteBroker = remoteBroker;
084        }
085    
086        public void start() throws Exception {
087            LOG.info("Starting a network connection between " + localBroker + " and " + remoteBroker
088                     + " has been established.");
089    
090            localBroker.setTransportListener(new DefaultTransportListener() {
091                public void onCommand(Object o) {
092                    Command command = (Command)o;
093                    serviceLocalCommand(command);
094                }
095    
096                public void onException(IOException error) {
097                    serviceLocalException(error);
098                }
099            });
100    
101            remoteBroker.setTransportListener(new DefaultTransportListener() {
102                public void onCommand(Object o) {
103                    Command command = (Command)o;
104                    serviceRemoteCommand(command);
105                }
106    
107                public void onException(IOException error) {
108                    serviceRemoteException(error);
109                }
110            });
111    
112            localBroker.start();
113            remoteBroker.start();
114        }
115    
116        protected void triggerStartBridge() throws IOException {
117            Thread thead = new Thread() {
118                public void run() {
119                    try {
120                        startBridge();
121                    } catch (IOException e) {
122                        LOG.error("Failed to start network bridge: " + e, e);
123                    }
124                }
125            };
126            thead.start();
127        }
128    
129        /**
130         * @throws IOException
131         */
132        final void startBridge() throws IOException {
133            connectionInfo = new ConnectionInfo();
134            connectionInfo.setConnectionId(new ConnectionId(ID_GENERATOR.generateId()));
135            connectionInfo.setClientId(clientId);
136            localBroker.oneway(connectionInfo);
137            remoteBroker.oneway(connectionInfo);
138    
139            sessionInfo = new SessionInfo(connectionInfo, 1);
140            localBroker.oneway(sessionInfo);
141            remoteBroker.oneway(sessionInfo);
142    
143            queueConsumerInfo = new ConsumerInfo(sessionInfo, 1);
144            queueConsumerInfo.setDispatchAsync(dispatchAsync);
145            queueConsumerInfo.setDestination(new ActiveMQQueue(destinationFilter));
146            queueConsumerInfo.setPrefetchSize(prefetchSize);
147            queueConsumerInfo.setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY);
148            localBroker.oneway(queueConsumerInfo);
149    
150            producerInfo = new ProducerInfo(sessionInfo, 1);
151            producerInfo.setResponseRequired(false);
152            remoteBroker.oneway(producerInfo);
153    
154            if (connectionInfo.getClientId() != null) {
155                topicConsumerInfo = new ConsumerInfo(sessionInfo, 2);
156                topicConsumerInfo.setDispatchAsync(dispatchAsync);
157                topicConsumerInfo.setSubscriptionName("topic-bridge");
158                topicConsumerInfo.setRetroactive(true);
159                topicConsumerInfo.setDestination(new ActiveMQTopic(destinationFilter));
160                topicConsumerInfo.setPrefetchSize(prefetchSize);
161                topicConsumerInfo.setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY);
162                localBroker.oneway(topicConsumerInfo);
163            }
164    
165            if (LOG.isInfoEnabled()) {
166                LOG.info("Network connection between " + localBroker + " and " + remoteBroker
167                         + " has been established.");
168            }
169        }
170    
171        public void stop() throws Exception {
172            try {
173                if (connectionInfo != null) {
174                    localBroker.request(connectionInfo.createRemoveCommand());
175                    remoteBroker.request(connectionInfo.createRemoveCommand());
176                }
177                localBroker.setTransportListener(null);
178                remoteBroker.setTransportListener(null);
179                localBroker.oneway(new ShutdownInfo());
180                remoteBroker.oneway(new ShutdownInfo());
181            } finally {
182                ServiceStopper ss = new ServiceStopper();
183                ss.stop(localBroker);
184                ss.stop(remoteBroker);
185                ss.throwFirstException();
186            }
187        }
188    
189        public void serviceRemoteException(Throwable error) {
190            LOG.info("Unexpected remote exception: " + error);
191            if (LOG.isDebugEnabled()) {
192                LOG.debug("Exception trace: ", error);
193            }
194        }
195    
196        protected void serviceRemoteCommand(Command command) {
197            try {
198                if (command.isBrokerInfo()) {
199                    synchronized (this) {
200                        remoteBrokerInfo = (BrokerInfo)command;
201                        remoteBrokerId = remoteBrokerInfo.getBrokerId();
202                        if (localBrokerId != null) {
203                            if (localBrokerId.equals(remoteBrokerId)) {
204                                LOG.info("Disconnecting loop back connection.");
205                                ServiceSupport.dispose(this);
206                            } else {
207                                triggerStartBridge();
208                            }
209                        }
210                    }
211                } else {
212                    LOG.warn("Unexpected remote command: " + command);
213                }
214            } catch (IOException e) {
215                serviceLocalException(e);
216            }
217        }
218    
219        public void serviceLocalException(Throwable error) {
220            LOG.info("Unexpected local exception: " + error);
221            LOG.debug("Exception trace: ", error);
222            fireBridgeFailed();
223        }
224    
225        protected void serviceLocalCommand(Command command) {
226            try {
227                if (command.isMessageDispatch()) {
228    
229                    enqueueCounter.incrementAndGet();
230    
231                    final MessageDispatch md = (MessageDispatch)command;
232                    Message message = md.getMessage();
233                    message.setProducerId(producerInfo.getProducerId());
234    
235                    if (message.getOriginalTransactionId() == null) {
236                        message.setOriginalTransactionId(message.getTransactionId());
237                    }
238                    message.setTransactionId(null);
239    
240                    if (isUseCompression()) {
241                        message.compress();
242                    }
243    
244                    if (!message.isResponseRequired()) {
245                        // If the message was originally sent using async send, we will preserve that
246                        // QOS by bridging it using an async send (small chance of message loss).
247                        remoteBroker.oneway(message);
248                        dequeueCounter.incrementAndGet();
249                        localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1));
250    
251                    } else {
252    
253                        // The message was not sent using async send, so we should
254                        // only ack the local
255                        // broker when we get confirmation that the remote broker
256                        // has received the message.
257                        ResponseCallback callback = new ResponseCallback() {
258                            public void onCompletion(FutureResponse future) {
259                                try {
260                                    Response response = future.getResult();
261                                    if (response.isException()) {
262                                        ExceptionResponse er = (ExceptionResponse)response;
263                                        serviceLocalException(er.getException());
264                                    } else {
265                                        dequeueCounter.incrementAndGet();
266                                        localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1));
267                                    }
268                                } catch (IOException e) {
269                                    serviceLocalException(e);
270                                }
271                            }
272                        };
273    
274                        remoteBroker.asyncRequest(message, callback);
275                    }
276    
277                    // Ack on every message since we don't know if the broker is
278                    // blocked due to memory
279                    // usage and is waiting for an Ack to un-block him.
280    
281                    // Acking a range is more efficient, but also more prone to
282                    // locking up a server
283                    // Perhaps doing something like the following should be policy
284                    // based.
285                    // if(
286                    // md.getConsumerId().equals(queueConsumerInfo.getConsumerId())
287                    // ) {
288                    // queueDispatched++;
289                    // if( queueDispatched > (queueConsumerInfo.getPrefetchSize()/2)
290                    // ) {
291                    // localBroker.oneway(new MessageAck(md,
292                    // MessageAck.STANDARD_ACK_TYPE, queueDispatched));
293                    // queueDispatched=0;
294                    // }
295                    // } else {
296                    // topicDispatched++;
297                    // if( topicDispatched > (topicConsumerInfo.getPrefetchSize()/2)
298                    // ) {
299                    // localBroker.oneway(new MessageAck(md,
300                    // MessageAck.STANDARD_ACK_TYPE, topicDispatched));
301                    // topicDispatched=0;
302                    // }
303                    // }
304                } else if (command.isBrokerInfo()) {
305                    synchronized (this) {
306                        localBrokerInfo = (BrokerInfo)command;
307                        localBrokerId = localBrokerInfo.getBrokerId();
308                        if (remoteBrokerId != null) {
309                            if (remoteBrokerId.equals(localBrokerId)) {
310                                LOG.info("Disconnecting loop back connection.");
311                                ServiceSupport.dispose(this);
312                            } else {
313                                triggerStartBridge();
314                            }
315                        }
316                    }
317                } else {
318                    LOG.debug("Unexpected local command: " + command);
319                }
320            } catch (IOException e) {
321                serviceLocalException(e);
322            }
323        }
324    
325        public String getClientId() {
326            return clientId;
327        }
328    
329        public void setClientId(String clientId) {
330            this.clientId = clientId;
331        }
332    
333        public int getPrefetchSize() {
334            return prefetchSize;
335        }
336    
337        public void setPrefetchSize(int prefetchSize) {
338            this.prefetchSize = prefetchSize;
339        }
340    
341        public boolean isDispatchAsync() {
342            return dispatchAsync;
343        }
344    
345        public void setDispatchAsync(boolean dispatchAsync) {
346            this.dispatchAsync = dispatchAsync;
347        }
348    
349        public String getDestinationFilter() {
350            return destinationFilter;
351        }
352    
353        public void setDestinationFilter(String destinationFilter) {
354            this.destinationFilter = destinationFilter;
355        }
356    
357        public void setNetworkBridgeFailedListener(NetworkBridgeListener listener) {
358            this.bridgeFailedListener = listener;
359        }
360    
361        private void fireBridgeFailed() {
362            NetworkBridgeListener l = this.bridgeFailedListener;
363            if (l != null) {
364                l.bridgeFailed();
365            }
366        }
367    
368        public String getRemoteAddress() {
369            return remoteBroker.getRemoteAddress();
370        }
371    
372        public String getLocalAddress() {
373            return localBroker.getRemoteAddress();
374        }
375    
376        public String getLocalBrokerName() {
377            return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName();
378        }
379    
380        public String getRemoteBrokerName() {
381            return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName();
382        }
383    
384        public long getDequeueCounter() {
385            return dequeueCounter.get();
386        }
387    
388        public long getEnqueueCounter() {
389            return enqueueCounter.get();
390        }
391    
392        /**
393         * @param useCompression
394         *      True if forwarded Messages should have their bodies compressed.
395         */
396        public void setUseCompression(boolean useCompression) {
397            this.useCompression = useCompression;
398        }
399    
400        /**
401         * @return the vale of the useCompression setting, true if forwarded messages will be compressed.
402         */
403        public boolean isUseCompression() {
404            return useCompression;
405        }
406    }