001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region.virtual;
018    
019    import org.apache.activemq.broker.*;
020    import org.apache.activemq.broker.region.Destination;
021    import org.apache.activemq.broker.region.DestinationFilter;
022    import org.apache.activemq.broker.region.DestinationInterceptor;
023    import org.apache.activemq.command.ActiveMQDestination;
024    import org.apache.activemq.command.ActiveMQTopic;
025    import org.apache.activemq.command.Message;
026    import org.slf4j.Logger;
027    import org.slf4j.LoggerFactory;
028    
029    /**
030     * Creates <a href="http://activemq.org/site/mirrored-queues.html">Mirrored
031     * Queue</a> using a prefix and postfix to define the topic name on which to mirror the queue to.
032     *
033     * 
034     * @org.apache.xbean.XBean
035     */
036    public class MirroredQueue implements DestinationInterceptor, BrokerServiceAware {
037        private static final transient Logger LOG = LoggerFactory.getLogger(MirroredQueue.class);
038        private String prefix = "VirtualTopic.Mirror.";
039        private String postfix = "";
040        private boolean copyMessage = true;
041        private BrokerService brokerService;
042    
043        public Destination intercept(final Destination destination) {
044            if (destination.getActiveMQDestination().isQueue()) {
045                if (!destination.getActiveMQDestination().isTemporary() || brokerService.isUseTempMirroredQueues()) {
046                    try {
047                        final Destination mirrorDestination = getMirrorDestination(destination);
048                        if (mirrorDestination != null) {
049                            return new DestinationFilter(destination) {
050                                public void send(ProducerBrokerExchange context, Message message) throws Exception {
051                                    message.setDestination(mirrorDestination.getActiveMQDestination());
052                                    mirrorDestination.send(context, message);
053        
054                                    if (isCopyMessage()) {
055                                        message = message.copy();
056                                    }
057                                    message.setDestination(destination.getActiveMQDestination());
058                                    message.setMemoryUsage(null); // set this to null so that it will use the queue memoryUsage instance instead of the topic.
059                                    super.send(context, message);
060                                }
061                            };
062                        }
063                    }
064                    catch (Exception e) {
065                        LOG.error("Failed to lookup the mirror destination for: " + destination + ". Reason: " + e, e);
066                    }
067                }
068            }
069            return destination;
070        }
071        
072    
073        public void remove(Destination destination) {
074            if (brokerService == null) {
075                throw new IllegalArgumentException("No brokerService injected!");
076            }
077            ActiveMQDestination topic = getMirrorTopic(destination.getActiveMQDestination());
078            if (topic != null) {
079                try {
080                    brokerService.removeDestination(topic);
081                } catch (Exception e) {
082                    LOG.error("Failed to remove mirror destination for " + destination + ". Reason: " + e,e);
083                }
084            }
085            
086        }
087    
088        public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) {}
089    
090        // Properties
091        // -------------------------------------------------------------------------
092    
093        public String getPostfix() {
094            return postfix;
095        }
096    
097        /**
098         * Sets any postix used to identify the queue consumers
099         */
100        public void setPostfix(String postfix) {
101            this.postfix = postfix;
102        }
103    
104        public String getPrefix() {
105            return prefix;
106        }
107    
108        /**
109         * Sets the prefix wildcard used to identify the queue consumers for a given
110         * topic
111         */
112        public void setPrefix(String prefix) {
113            this.prefix = prefix;
114        }
115    
116        public boolean isCopyMessage() {
117            return copyMessage;
118        }
119    
120        /**
121         * Sets whether a copy of the message will be sent to each destination.
122         * Defaults to true so that the forward destination is set as the
123         * destination of the message
124         */
125        public void setCopyMessage(boolean copyMessage) {
126            this.copyMessage = copyMessage;
127        }
128    
129        public void setBrokerService(BrokerService brokerService) {
130            this.brokerService = brokerService;
131        }
132    
133        // Implementation methods
134        //-------------------------------------------------------------------------
135        protected Destination getMirrorDestination(Destination destination) throws Exception {
136            if (brokerService == null) {
137                throw new IllegalArgumentException("No brokerService injected!");
138            }
139            ActiveMQDestination topic = getMirrorTopic(destination.getActiveMQDestination());
140            return brokerService.getDestination(topic);
141        }
142    
143        protected ActiveMQDestination getMirrorTopic(ActiveMQDestination original) {
144            return new ActiveMQTopic(prefix + original.getPhysicalName() + postfix);
145        }
146    
147    }