001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region.virtual;
018    
019    import org.apache.activemq.broker.ProducerBrokerExchange;
020    import org.apache.activemq.broker.region.Destination;
021    import org.apache.activemq.broker.region.DestinationFilter;
022    import org.apache.activemq.broker.region.Topic;
023    import org.apache.activemq.command.ActiveMQDestination;
024    import org.apache.activemq.command.ActiveMQQueue;
025    import org.apache.activemq.command.Message;
026    import org.apache.activemq.util.LRUCache;
027    
028    /**
029     * A Destination which implements <a href="http://activemq.org/site/virtual-destinations.html">Virtual Topic</a>
030     */
031    public class VirtualTopicInterceptor extends DestinationFilter {
032    
033        private final String prefix;
034        private final String postfix;
035        private final boolean local;
036        private final LRUCache<ActiveMQDestination, ActiveMQQueue> cache = new LRUCache<ActiveMQDestination, ActiveMQQueue>();
037    
038        public VirtualTopicInterceptor(Destination next, String prefix, String postfix, boolean local) {
039            super(next);
040            this.prefix = prefix;
041            this.postfix = postfix;
042            this.local = local;
043        }
044    
045        public Topic getTopic() {
046            return (Topic) this.next;
047        }
048    
049        @Override
050        public void send(ProducerBrokerExchange context, Message message) throws Exception {
051            if (!message.isAdvisory() && !(local && message.getBrokerPath() != null)) {
052                ActiveMQDestination queueConsumers = getQueueConsumersWildcard(message.getDestination());
053                send(context, message, queueConsumers);
054            }
055            super.send(context, message);
056        }
057    
058        protected ActiveMQDestination getQueueConsumersWildcard(ActiveMQDestination original) {
059            ActiveMQQueue queue;
060            synchronized (cache) {
061                queue = cache.get(original);
062                if (queue == null) {
063                    queue = new ActiveMQQueue(prefix + original.getPhysicalName() + postfix);
064                    cache.put(original, queue);
065                }
066            }
067            return queue;
068        }
069    }