001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.plugin;
018
019import java.io.File;
020import java.io.FileInputStream;
021import java.io.FileOutputStream;
022import java.io.IOException;
023import java.io.ObjectInputStream;
024import java.io.ObjectOutputStream;
025import java.util.Collections;
026import java.util.HashSet;
027import java.util.Set;
028import java.util.concurrent.ConcurrentHashMap;
029import java.util.concurrent.ConcurrentMap;
030
031import javax.management.JMException;
032import javax.management.ObjectName;
033
034import org.apache.activemq.advisory.AdvisorySupport;
035import org.apache.activemq.broker.Broker;
036import org.apache.activemq.broker.BrokerFilter;
037import org.apache.activemq.broker.BrokerService;
038import org.apache.activemq.broker.ConnectionContext;
039import org.apache.activemq.broker.jmx.AnnotatedMBean;
040import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
041import org.apache.activemq.broker.jmx.VirtualDestinationSelectorCacheView;
042import org.apache.activemq.broker.region.Subscription;
043import org.apache.activemq.command.ConsumerInfo;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047/**
048 * A plugin which allows the caching of the selector from a subscription queue.
049 * <p/>
050 * This stops the build-up of unwanted messages, especially when consumers may
051 * disconnect from time to time when using virtual destinations.
052 * <p/>
053 * This is influenced by code snippets developed by Maciej Rakowicz
054 *
055 * Refer to:
056 * https://issues.apache.org/activemq/browse/AMQ-3004
057 * http://mail-archives.apache.org/mod_mbox/activemq-users/201011.mbox/%3C8A013711-2613-450A-A487-379E784AF1D6@homeaway.co.uk%3E
058 */
059public class SubQueueSelectorCacheBroker extends BrokerFilter implements Runnable {
060    private static final Logger LOG = LoggerFactory.getLogger(SubQueueSelectorCacheBroker.class);
061    public static final String MATCH_EVERYTHING = "TRUE";
062
063    /**
064     * The subscription's selector cache. We cache compiled expressions keyed
065     * by the target destination.
066     */
067    private ConcurrentMap<String, Set<String>> subSelectorCache = new ConcurrentHashMap<String, Set<String>>();
068
069    private final File persistFile;
070    private boolean singleSelectorPerDestination = false;
071    private boolean ignoreWildcardSelectors = false;
072    private ObjectName objectName;
073
074    private boolean running = true;
075    private final Thread persistThread;
076    private long persistInterval = MAX_PERSIST_INTERVAL;
077    public static final long MAX_PERSIST_INTERVAL = 600000;
078    private static final String SELECTOR_CACHE_PERSIST_THREAD_NAME = "SelectorCachePersistThread";
079
080    /**
081     * Constructor
082     */
083    public SubQueueSelectorCacheBroker(Broker next, final File persistFile) {
084        super(next);
085        this.persistFile = persistFile;
086        LOG.info("Using persisted selector cache from[{}]", persistFile);
087
088        readCache();
089
090        persistThread = new Thread(this, SELECTOR_CACHE_PERSIST_THREAD_NAME);
091        persistThread.start();
092        enableJmx();
093    }
094
095    private void enableJmx() {
096        BrokerService broker = getBrokerService();
097        if (broker.isUseJmx()) {
098            VirtualDestinationSelectorCacheView view = new VirtualDestinationSelectorCacheView(this);
099            try {
100                objectName = BrokerMBeanSupport.createVirtualDestinationSelectorCacheName(broker.getBrokerObjectName(), "plugin", "virtualDestinationCache");
101                LOG.trace("virtualDestinationCacheSelector mbean name; " + objectName.toString());
102                AnnotatedMBean.registerMBean(broker.getManagementContext(), view, objectName);
103            } catch (Exception e) {
104                LOG.warn("JMX is enabled, but when installing the VirtualDestinationSelectorCache, couldn't install the JMX mbeans. Continuing without installing the mbeans.");
105            }
106
107        }
108    }
109
110    @Override
111    public void stop() throws Exception {
112        running = false;
113        if (persistThread != null) {
114            persistThread.interrupt();
115            persistThread.join();
116        } //if
117        unregisterMBeans();
118    }
119
120    private void unregisterMBeans() {
121        BrokerService broker = getBrokerService();
122        if (broker.isUseJmx() && this.objectName != null) {
123            try {
124                broker.getManagementContext().unregisterMBean(objectName);
125            } catch (JMException e) {
126                LOG.warn("Trying uninstall VirtualDestinationSelectorCache; couldn't uninstall mbeans, continuting...");
127            }
128        }
129    }
130
131    @Override
132    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
133        // don't track selectors for advisory topics
134        if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
135            String destinationName = info.getDestination().getQualifiedName();
136            LOG.debug("Caching consumer selector [{}] on  '{}'", info.getSelector(), destinationName);
137
138            String selector = info.getSelector() == null ? MATCH_EVERYTHING : info.getSelector();
139
140            if (!(ignoreWildcardSelectors && hasWildcards(selector))) {
141
142                Set<String> selectors = subSelectorCache.get(destinationName);
143                if (selectors == null) {
144                    selectors = Collections.synchronizedSet(new HashSet<String>());
145                } else if (singleSelectorPerDestination && !MATCH_EVERYTHING.equals(selector)) {
146                    // in this case, we allow only ONE selector. But we don't count the catch-all "null/TRUE" selector
147                    // here, we always allow that one. But only one true selector.
148                    boolean containsMatchEverything = selectors.contains(MATCH_EVERYTHING);
149                    selectors.clear();
150
151                    // put back the MATCH_EVERYTHING selector
152                    if (containsMatchEverything) {
153                        selectors.add(MATCH_EVERYTHING);
154                    }
155                }
156
157                LOG.debug("adding new selector: into cache " + selector);
158                selectors.add(selector);
159                LOG.debug("current selectors in cache: " + selectors);
160                subSelectorCache.put(destinationName, selectors);
161            }
162
163
164        }
165        return super.addConsumer(context, info);
166    }
167
168    // trivial check for SQL92/selector wildcards
169    private boolean hasWildcards(String selector) {
170        return selector.contains("%") || selector.contains("_");
171    }
172
173    @Override
174    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
175        if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
176
177            if (singleSelectorPerDestination) {
178                String destinationName = info.getDestination().getQualifiedName();
179                Set<String> selectors = subSelectorCache.get(destinationName);
180                if (info.getSelector() == null && selectors.size() > 1) {
181                    boolean removed = selectors.remove(MATCH_EVERYTHING);
182                    LOG.debug("A non-selector consumer has dropped. Removing the catchall matching pattern 'TRUE'. Successful? " + removed);
183                }
184            }
185
186        }
187        super.removeConsumer(context, info);
188    }
189
190    private void readCache() {
191        if (persistFile != null && persistFile.exists()) {
192            try {
193                FileInputStream fis = new FileInputStream(persistFile);
194                try {
195                    ObjectInputStream in = new ObjectInputStream(fis);
196                    try {
197                        subSelectorCache = (ConcurrentHashMap<String, Set<String>>) in.readObject();
198                    } catch (ClassNotFoundException ex) {
199                        LOG.error("Invalid selector cache data found. Please remove file.", ex);
200                    } finally {
201                        in.close();
202                    } //try
203                } finally {
204                    fis.close();
205                } //try
206            } catch (IOException ex) {
207                LOG.error("Unable to read persisted selector cache...it will be ignored!", ex);
208            } //try
209        } //if
210    }
211
212    /**
213     * Persist the selector cache.
214     */
215    private void persistCache() {
216        LOG.debug("Persisting selector cache....");
217        try {
218            FileOutputStream fos = new FileOutputStream(persistFile);
219            try {
220                ObjectOutputStream out = new ObjectOutputStream(fos);
221                try {
222                    out.writeObject(subSelectorCache);
223                } finally {
224                    out.flush();
225                    out.close();
226                } //try
227            } catch (IOException ex) {
228                LOG.error("Unable to persist selector cache", ex);
229            } finally {
230                fos.close();
231            } //try
232        } catch (IOException ex) {
233            LOG.error("Unable to access file[{}]", persistFile, ex);
234        } //try
235    }
236
237    /**
238     * @return The JMS selector for the specified {@code destination}
239     */
240    public Set<String> getSelector(final String destination) {
241        return subSelectorCache.get(destination);
242    }
243
244    /**
245     * Persist the selector cache every {@code MAX_PERSIST_INTERVAL}ms.
246     *
247     * @see java.lang.Runnable#run()
248     */
249    @Override
250    public void run() {
251        while (running) {
252            try {
253                Thread.sleep(persistInterval);
254            } catch (InterruptedException ex) {
255            } //try
256
257            persistCache();
258        }
259    }
260
261    public boolean isSingleSelectorPerDestination() {
262        return singleSelectorPerDestination;
263    }
264
265    public void setSingleSelectorPerDestination(boolean singleSelectorPerDestination) {
266        this.singleSelectorPerDestination = singleSelectorPerDestination;
267    }
268
269    public Set<String> getSelectorsForDestination(String destinationName) {
270        if (subSelectorCache.containsKey(destinationName)) {
271            return new HashSet<String>(subSelectorCache.get(destinationName));
272        }
273
274        return Collections.EMPTY_SET;
275    }
276
277    public long getPersistInterval() {
278        return persistInterval;
279    }
280
281    public void setPersistInterval(long persistInterval) {
282        this.persistInterval = persistInterval;
283    }
284
285    public boolean deleteSelectorForDestination(String destinationName, String selector) {
286        if (subSelectorCache.containsKey(destinationName)) {
287            Set<String> cachedSelectors = subSelectorCache.get(destinationName);
288            return cachedSelectors.remove(selector);
289        }
290
291        return false;
292    }
293
294    public boolean deleteAllSelectorsForDestination(String destinationName) {
295        if (subSelectorCache.containsKey(destinationName)) {
296            Set<String> cachedSelectors = subSelectorCache.get(destinationName);
297            cachedSelectors.clear();
298        }
299        return true;
300    }
301
302    public boolean isIgnoreWildcardSelectors() {
303        return ignoreWildcardSelectors;
304    }
305
306    public void setIgnoreWildcardSelectors(boolean ignoreWildcardSelectors) {
307        this.ignoreWildcardSelectors = ignoreWildcardSelectors;
308    }
309}
310