001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region.virtual;
018    
019    import java.io.IOException;
020    import java.util.List;
021    import java.util.Set;
022    
023    import org.apache.activemq.broker.Broker;
024    import org.apache.activemq.broker.ProducerBrokerExchange;
025    import org.apache.activemq.broker.region.Destination;
026    import org.apache.activemq.broker.region.Subscription;
027    import org.apache.activemq.command.ActiveMQDestination;
028    import org.apache.activemq.command.Message;
029    import org.apache.activemq.filter.BooleanExpression;
030    import org.apache.activemq.filter.MessageEvaluationContext;
031    import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
032    import org.apache.activemq.plugin.SubQueueSelectorCacheBroker;
033    import org.apache.activemq.selector.SelectorParser;
034    import org.apache.activemq.util.LRUCache;
035    import org.slf4j.Logger;
036    import org.slf4j.LoggerFactory;
037    
038    public class SelectorAwareVirtualTopicInterceptor extends VirtualTopicInterceptor {
039        private static final Logger LOG = LoggerFactory.getLogger(SelectorAwareVirtualTopicInterceptor.class);
040        LRUCache<String,BooleanExpression> expressionCache = new LRUCache<String,BooleanExpression>();
041        private SubQueueSelectorCacheBroker selectorCachePlugin;
042    
043        public SelectorAwareVirtualTopicInterceptor(Destination next, String prefix, String postfix, boolean local) {
044            super(next, prefix, postfix, local);
045        }
046    
047        /**
048         * Respect the selectors of the subscriptions to ensure only matched messages are dispatched to
049         * the virtual queues, hence there is no build up of unmatched messages on these destinations
050         */
051        @Override
052        protected void send(ProducerBrokerExchange context, Message message, ActiveMQDestination destination) throws Exception {
053            Broker broker = context.getConnectionContext().getBroker();
054            Set<Destination> destinations = broker.getDestinations(destination);
055    
056            for (Destination dest : destinations) {
057                if (matchesSomeConsumer(broker, message, dest)) {
058                    dest.send(context, message.copy());
059                }
060            }
061        }
062    
063        private boolean matchesSomeConsumer(final Broker broker, Message message, Destination dest) throws IOException {
064            boolean matches = false;
065            MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
066            msgContext.setDestination(dest.getActiveMQDestination());
067            msgContext.setMessageReference(message);
068            List<Subscription> subs = dest.getConsumers();
069            for (Subscription sub : subs) {
070                if (sub.matches(message, msgContext)) {
071                    matches = true;
072                    break;
073    
074                }
075            }
076            if (matches == false && subs.size() == 0) {
077                matches = tryMatchingCachedSubs(broker, dest, msgContext);
078            }
079            return matches;
080        }
081    
082        private boolean tryMatchingCachedSubs(final Broker broker, Destination dest, MessageEvaluationContext msgContext) {
083            boolean matches = false;
084            LOG.debug("No active consumer match found. Will try cache if configured...");
085    
086            //retrieve the specific plugin class and lookup the selector for the destination.
087            final SubQueueSelectorCacheBroker cache = getSubQueueSelectorCacheBrokerPlugin(broker);
088    
089            if (cache != null) {
090                final String selector = cache.getSelector(dest.getActiveMQDestination().getQualifiedName());
091                if (selector != null) {
092                    try {
093                        final BooleanExpression expression = getExpression(selector);
094                        matches = expression.matches(msgContext);
095                    } catch (Exception e) {
096                        LOG.error(e.getMessage(), e);
097                    }
098                }
099            }
100            return matches;
101        }
102    
103        private BooleanExpression getExpression(String selector) throws Exception{
104            BooleanExpression result;
105            synchronized(expressionCache){
106                result = expressionCache.get(selector);
107                if (result == null){
108                    result = compileSelector(selector);
109                    expressionCache.put(selector,result);
110                }
111            }
112            return result;
113        }
114    
115        /**
116         * @return The SubQueueSelectorCacheBroker instance or null if no such broker is available.
117         */
118        private SubQueueSelectorCacheBroker getSubQueueSelectorCacheBrokerPlugin(final Broker broker) {
119            if (selectorCachePlugin == null) {
120                selectorCachePlugin = (SubQueueSelectorCacheBroker) broker.getAdaptor(SubQueueSelectorCacheBroker.class);
121            } //if
122    
123            return selectorCachePlugin;
124        }
125    
126        /**
127         * Pre-compile the JMS selector.
128         *
129         * @param selectorExpression The non-null JMS selector expression.
130         */
131        private BooleanExpression compileSelector(final String selectorExpression) throws Exception {
132            return SelectorParser.parse(selectorExpression);
133        }
134    }