001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.plugin;
018
019import java.io.File;
020import java.io.FileInputStream;
021import java.io.FileOutputStream;
022import java.io.IOException;
023import java.io.ObjectInputStream;
024import java.io.ObjectOutputStream;
025import java.util.Collections;
026import java.util.HashSet;
027import java.util.Set;
028import java.util.concurrent.ConcurrentHashMap;
029import java.util.concurrent.ConcurrentMap;
030import java.util.regex.Matcher;
031import java.util.regex.Pattern;
032
033import javax.management.JMException;
034import javax.management.ObjectName;
035
036import org.apache.activemq.advisory.AdvisorySupport;
037import org.apache.activemq.broker.Broker;
038import org.apache.activemq.broker.BrokerFilter;
039import org.apache.activemq.broker.BrokerService;
040import org.apache.activemq.broker.ConnectionContext;
041import org.apache.activemq.broker.jmx.AnnotatedMBean;
042import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
043import org.apache.activemq.broker.jmx.VirtualDestinationSelectorCacheView;
044import org.apache.activemq.broker.region.Subscription;
045import org.apache.activemq.command.ConsumerInfo;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049/**
050 * A plugin which allows the caching of the selector from a subscription queue.
051 * <p/>
052 * This stops the build-up of unwanted messages, especially when consumers may
053 * disconnect from time to time when using virtual destinations.
054 * <p/>
055 * This is influenced by code snippets developed by Maciej Rakowicz
056 *
057 * Refer to:
058 * https://issues.apache.org/activemq/browse/AMQ-3004
059 * http://mail-archives.apache.org/mod_mbox/activemq-users/201011.mbox/%3C8A013711-2613-450A-A487-379E784AF1D6@homeaway.co.uk%3E
060 */
061public class SubQueueSelectorCacheBroker extends BrokerFilter implements Runnable {
062    private static final Logger LOG = LoggerFactory.getLogger(SubQueueSelectorCacheBroker.class);
063    public static final String MATCH_EVERYTHING = "TRUE";
064
065    /**
066     * The subscription's selector cache. We cache compiled expressions keyed
067     * by the target destination.
068     */
069    private ConcurrentMap<String, Set<String>> subSelectorCache = new ConcurrentHashMap<String, Set<String>>();
070
071    private final File persistFile;
072    private boolean singleSelectorPerDestination = false;
073    private boolean ignoreWildcardSelectors = false;
074    private ObjectName objectName;
075
076    private boolean running = true;
077    private final Thread persistThread;
078    private long persistInterval = MAX_PERSIST_INTERVAL;
079    public static final long MAX_PERSIST_INTERVAL = 600000;
080    private static final String SELECTOR_CACHE_PERSIST_THREAD_NAME = "SelectorCachePersistThread";
081
082    /**
083     * Constructor
084     */
085    public SubQueueSelectorCacheBroker(Broker next, final File persistFile) {
086        super(next);
087        this.persistFile = persistFile;
088        LOG.info("Using persisted selector cache from[{}]", persistFile);
089
090        readCache();
091
092        persistThread = new Thread(this, SELECTOR_CACHE_PERSIST_THREAD_NAME);
093        persistThread.start();
094        enableJmx();
095    }
096
097    private void enableJmx() {
098        BrokerService broker = getBrokerService();
099        if (broker.isUseJmx()) {
100            VirtualDestinationSelectorCacheView view = new VirtualDestinationSelectorCacheView(this);
101            try {
102                objectName = BrokerMBeanSupport.createVirtualDestinationSelectorCacheName(broker.getBrokerObjectName(), "plugin", "virtualDestinationCache");
103                LOG.trace("virtualDestinationCacheSelector mbean name; " + objectName.toString());
104                AnnotatedMBean.registerMBean(broker.getManagementContext(), view, objectName);
105            } catch (Exception e) {
106                LOG.warn("JMX is enabled, but when installing the VirtualDestinationSelectorCache, couldn't install the JMX mbeans. Continuing without installing the mbeans.");
107            }
108        }
109    }
110
111    @Override
112    public void stop() throws Exception {
113        running = false;
114        if (persistThread != null) {
115            persistThread.interrupt();
116            persistThread.join();
117        }
118        unregisterMBeans();
119    }
120
121    private void unregisterMBeans() {
122        BrokerService broker = getBrokerService();
123        if (broker.isUseJmx() && this.objectName != null) {
124            try {
125                broker.getManagementContext().unregisterMBean(objectName);
126            } catch (JMException e) {
127                LOG.warn("Trying uninstall VirtualDestinationSelectorCache; couldn't uninstall mbeans, continuting...");
128            }
129        }
130    }
131
132    @Override
133    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
134        // don't track selectors for advisory topics or temp destinations
135        if (!AdvisorySupport.isAdvisoryTopic(info.getDestination()) && !info.getDestination().isTemporary()) {
136            String destinationName = info.getDestination().getQualifiedName();
137            LOG.debug("Caching consumer selector [{}] on  '{}'", info.getSelector(), destinationName);
138
139            String selector = info.getSelector() == null ? MATCH_EVERYTHING : info.getSelector();
140
141            if (!(ignoreWildcardSelectors && hasWildcards(selector))) {
142
143                Set<String> selectors = subSelectorCache.get(destinationName);
144                if (selectors == null) {
145                    selectors = Collections.synchronizedSet(new HashSet<String>());
146                } else if (singleSelectorPerDestination && !MATCH_EVERYTHING.equals(selector)) {
147                    // in this case, we allow only ONE selector. But we don't count the catch-all "null/TRUE" selector
148                    // here, we always allow that one. But only one true selector.
149                    boolean containsMatchEverything = selectors.contains(MATCH_EVERYTHING);
150                    selectors.clear();
151
152                    // put back the MATCH_EVERYTHING selector
153                    if (containsMatchEverything) {
154                        selectors.add(MATCH_EVERYTHING);
155                    }
156                }
157
158                LOG.debug("adding new selector: into cache " + selector);
159                selectors.add(selector);
160                LOG.debug("current selectors in cache: " + selectors);
161                subSelectorCache.put(destinationName, selectors);
162            }
163        }
164
165        return super.addConsumer(context, info);
166    }
167
168    static boolean hasWildcards(String selector) {
169        return WildcardFinder.hasWildcards(selector);
170    }
171
172    @Override
173    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
174        if (!AdvisorySupport.isAdvisoryTopic(info.getDestination()) && !info.getDestination().isTemporary()) {
175            if (singleSelectorPerDestination) {
176                String destinationName = info.getDestination().getQualifiedName();
177                Set<String> selectors = subSelectorCache.get(destinationName);
178                if (info.getSelector() == null && selectors.size() > 1) {
179                    boolean removed = selectors.remove(MATCH_EVERYTHING);
180                    LOG.debug("A non-selector consumer has dropped. Removing the catchall matching pattern 'TRUE'. Successful? " + removed);
181                }
182            }
183
184        }
185        super.removeConsumer(context, info);
186    }
187
188    @SuppressWarnings("unchecked")
189    private void readCache() {
190        if (persistFile != null && persistFile.exists()) {
191            try {
192                try (FileInputStream fis = new FileInputStream(persistFile);) {
193                    ObjectInputStream in = new ObjectInputStream(fis);
194                    try {
195                        subSelectorCache = (ConcurrentHashMap<String, Set<String>>) in.readObject();
196                    } catch (ClassNotFoundException ex) {
197                        LOG.error("Invalid selector cache data found. Please remove file.", ex);
198                    } finally {
199                        in.close();
200                    }
201                }
202            } catch (IOException ex) {
203                LOG.error("Unable to read persisted selector cache...it will be ignored!", ex);
204            }
205        }
206    }
207
208    /**
209     * Persist the selector cache.
210     */
211    private void persistCache() {
212        LOG.debug("Persisting selector cache....");
213        try {
214            FileOutputStream fos = new FileOutputStream(persistFile);
215            try {
216                ObjectOutputStream out = new ObjectOutputStream(fos);
217                try {
218                    out.writeObject(subSelectorCache);
219                } finally {
220                    out.flush();
221                    out.close();
222                }
223            } catch (IOException ex) {
224                LOG.error("Unable to persist selector cache", ex);
225            } finally {
226                fos.close();
227            }
228        } catch (IOException ex) {
229            LOG.error("Unable to access file[{}]", persistFile, ex);
230        }
231    }
232
233    /**
234     * @return The JMS selector for the specified {@code destination}
235     */
236    public Set<String> getSelector(final String destination) {
237        return subSelectorCache.get(destination);
238    }
239
240    /**
241     * Persist the selector cache every {@code MAX_PERSIST_INTERVAL}ms.
242     *
243     * @see java.lang.Runnable#run()
244     */
245    @Override
246    public void run() {
247        while (running) {
248            try {
249                Thread.sleep(persistInterval);
250            } catch (InterruptedException ex) {
251            }
252
253            persistCache();
254        }
255    }
256
257    public boolean isSingleSelectorPerDestination() {
258        return singleSelectorPerDestination;
259    }
260
261    public void setSingleSelectorPerDestination(boolean singleSelectorPerDestination) {
262        this.singleSelectorPerDestination = singleSelectorPerDestination;
263    }
264
265    @SuppressWarnings("unchecked")
266    public Set<String> getSelectorsForDestination(String destinationName) {
267        if (subSelectorCache.containsKey(destinationName)) {
268            return new HashSet<String>(subSelectorCache.get(destinationName));
269        }
270
271        return Collections.EMPTY_SET;
272    }
273
274    public long getPersistInterval() {
275        return persistInterval;
276    }
277
278    public void setPersistInterval(long persistInterval) {
279        this.persistInterval = persistInterval;
280    }
281
282    public boolean deleteSelectorForDestination(String destinationName, String selector) {
283        if (subSelectorCache.containsKey(destinationName)) {
284            Set<String> cachedSelectors = subSelectorCache.get(destinationName);
285            return cachedSelectors.remove(selector);
286        }
287
288        return false;
289    }
290
291    public boolean deleteAllSelectorsForDestination(String destinationName) {
292        if (subSelectorCache.containsKey(destinationName)) {
293            Set<String> cachedSelectors = subSelectorCache.get(destinationName);
294            cachedSelectors.clear();
295        }
296        return true;
297    }
298
299    public boolean isIgnoreWildcardSelectors() {
300        return ignoreWildcardSelectors;
301    }
302
303    public void setIgnoreWildcardSelectors(boolean ignoreWildcardSelectors) {
304        this.ignoreWildcardSelectors = ignoreWildcardSelectors;
305    }
306
307    // find wildcards inside like operator arguments
308    static class WildcardFinder {
309
310        private static final Pattern LIKE_PATTERN=Pattern.compile(
311                "\\bLIKE\\s+'(?<like>([^']|'')+)'(\\s+ESCAPE\\s+'(?<escape>.)')?",
312                Pattern.CASE_INSENSITIVE);
313
314        private static final String REGEX_SPECIAL = ".+?*(){}[]\\-";
315
316        private static String getLike(final Matcher matcher) {
317            return matcher.group("like");
318        }
319
320        private static boolean hasLikeOperator(final Matcher matcher) {
321            return matcher.find();
322        }
323
324        private static String getEscape(final Matcher matcher) {
325            String escapeChar = matcher.group("escape");
326            if (escapeChar == null) {
327                return null;
328            } else if (REGEX_SPECIAL.contains(escapeChar)) {
329                escapeChar = "\\"+escapeChar;
330            }
331            return escapeChar;
332        }
333
334        private static boolean hasWildcardInCurrentMatch(final Matcher matcher) {
335            String wildcards = "[_%]";
336            if (getEscape(matcher) != null) {
337                wildcards = "(^|[^" + getEscape(matcher) + "])" + wildcards;
338            }
339            return Pattern.compile(wildcards).matcher(getLike(matcher)).find();
340        }
341
342        public static boolean hasWildcards(String selector) {
343            Matcher matcher = LIKE_PATTERN.matcher(selector);
344
345            while(hasLikeOperator(matcher)) {
346                if (hasWildcardInCurrentMatch(matcher)) {
347                    return true;
348                }
349            }
350            return false;
351        }
352    }
353}