001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.advisory;
018    
019    import java.util.Set;
020    import java.util.concurrent.CopyOnWriteArraySet;
021    import java.util.concurrent.atomic.AtomicBoolean;
022    
023    import javax.jms.Connection;
024    import javax.jms.JMSException;
025    import javax.jms.Message;
026    import javax.jms.MessageConsumer;
027    import javax.jms.MessageListener;
028    import javax.jms.Session;
029    
030    import org.apache.activemq.command.ActiveMQDestination;
031    import org.apache.activemq.command.ActiveMQMessage;
032    import org.apache.activemq.command.ActiveMQQueue;
033    import org.apache.activemq.command.ActiveMQTempQueue;
034    import org.apache.activemq.command.ActiveMQTempTopic;
035    import org.apache.activemq.command.ActiveMQTopic;
036    import org.apache.activemq.command.DestinationInfo;
037    import org.slf4j.Logger;
038    import org.slf4j.LoggerFactory;
039    
040    /**
041     * A helper class which keeps track of the Destinations available in a broker and allows you to listen to them
042     * being created or deleted.
043     *
044     * 
045     */
046    public class DestinationSource implements MessageListener {
047        private static final Logger LOG = LoggerFactory.getLogger(ConsumerEventSource.class);
048        private AtomicBoolean started = new AtomicBoolean(false);
049        private final Connection connection;
050        private Session session;
051        private MessageConsumer queueConsumer;
052        private MessageConsumer topicConsumer;
053        private MessageConsumer tempTopicConsumer;
054        private MessageConsumer tempQueueConsumer;
055        private Set<ActiveMQQueue> queues = new CopyOnWriteArraySet<ActiveMQQueue>();
056        private Set<ActiveMQTopic> topics = new CopyOnWriteArraySet<ActiveMQTopic>();
057        private Set<ActiveMQTempQueue> temporaryQueues = new CopyOnWriteArraySet<ActiveMQTempQueue>();
058        private Set<ActiveMQTempTopic> temporaryTopics = new CopyOnWriteArraySet<ActiveMQTempTopic>();
059        private DestinationListener listener;
060    
061        public DestinationSource(Connection connection) throws JMSException {
062            this.connection = connection;
063        }
064    
065        public DestinationListener getListener() {
066            return listener;
067        }
068    
069        public void setDestinationListener(DestinationListener listener) {
070            this.listener = listener;
071        }
072    
073        /**
074         * Returns the current queues available on the broker
075         */
076        public Set<ActiveMQQueue> getQueues() {
077            return queues;
078        }
079    
080        /**
081         * Returns the current topics on the broker
082         */
083        public Set<ActiveMQTopic> getTopics() {
084            return topics;
085        }
086    
087        /**
088         * Returns the current temporary topics available on the broker
089         */
090        public Set<ActiveMQTempQueue> getTemporaryQueues() {
091            return temporaryQueues;
092        }
093    
094        /**
095         * Returns the current temporary queues available on the broker
096         */
097        public Set<ActiveMQTempTopic> getTemporaryTopics() {
098            return temporaryTopics;
099        }
100    
101        public void start() throws JMSException {
102            if (started.compareAndSet(false, true)) {
103                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
104                queueConsumer = session.createConsumer(AdvisorySupport.QUEUE_ADVISORY_TOPIC);
105                queueConsumer.setMessageListener(this);
106    
107                topicConsumer = session.createConsumer(AdvisorySupport.TOPIC_ADVISORY_TOPIC);
108                topicConsumer.setMessageListener(this);
109    
110                tempQueueConsumer = session.createConsumer(AdvisorySupport.TEMP_QUEUE_ADVISORY_TOPIC);
111                tempQueueConsumer.setMessageListener(this);
112    
113                tempTopicConsumer = session.createConsumer(AdvisorySupport.TEMP_TOPIC_ADVISORY_TOPIC);
114                tempTopicConsumer.setMessageListener(this);
115            }
116        }
117    
118        public void stop() throws JMSException {
119            if (started.compareAndSet(true, false)) {
120                if (session != null) {
121                    session.close();
122                }
123            }
124        }
125    
126        public void onMessage(Message message) {
127            if (message instanceof ActiveMQMessage) {
128                ActiveMQMessage activeMessage = (ActiveMQMessage) message;
129                Object command = activeMessage.getDataStructure();
130                if (command instanceof DestinationInfo) {
131                    DestinationInfo destinationInfo = (DestinationInfo) command;
132                    DestinationEvent event = new DestinationEvent(this, destinationInfo);
133                    fireDestinationEvent(event);
134                }
135                else {
136                    LOG.warn("Unknown dataStructure: " + command);
137                }
138            }
139            else {
140                LOG.warn("Unknown message type: " + message + ". Message ignored");
141            }
142        }
143    
144        protected void fireDestinationEvent(DestinationEvent event) {
145            // now lets update the data structures
146            ActiveMQDestination destination = event.getDestination();
147            boolean add = event.isAddOperation();
148            if (destination instanceof ActiveMQQueue) {
149                ActiveMQQueue queue = (ActiveMQQueue) destination;
150                if (add) {
151                    queues.add(queue);
152                }
153                else {
154                    queues.remove(queue);
155                }
156            }
157            else if (destination instanceof ActiveMQTopic) {
158                ActiveMQTopic topic = (ActiveMQTopic) destination;
159                if (add) {
160                    topics.add(topic);
161                }
162                else {
163                    topics.remove(topic);
164                }
165            }
166            else if (destination instanceof ActiveMQTempQueue) {
167                ActiveMQTempQueue queue = (ActiveMQTempQueue) destination;
168                if (add) {
169                    temporaryQueues.add(queue);
170                }
171                else {
172                    temporaryQueues.remove(queue);
173                }
174            }
175            else if (destination instanceof ActiveMQTempTopic) {
176                ActiveMQTempTopic topic = (ActiveMQTempTopic) destination;
177                if (add) {
178                    temporaryTopics.add(topic);
179                }
180                else {
181                    temporaryTopics.remove(topic);
182                }
183            }
184            else {
185                LOG.warn("Unknown destination type: " + destination);
186            }
187            if (listener != null) {
188                listener.onDestinationEvent(event);
189            }
190        }
191    }