001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.advisory;
018    
019    import java.util.Set;
020    import java.util.concurrent.CopyOnWriteArraySet;
021    import java.util.concurrent.atomic.AtomicBoolean;
022    
023    import javax.jms.Connection;
024    import javax.jms.JMSException;
025    import javax.jms.Message;
026    import javax.jms.MessageConsumer;
027    import javax.jms.MessageListener;
028    import javax.jms.Session;
029    
030    import org.apache.activemq.Service;
031    import org.apache.activemq.command.ActiveMQDestination;
032    import org.apache.activemq.command.ActiveMQMessage;
033    import org.apache.activemq.command.ActiveMQQueue;
034    import org.apache.activemq.command.ActiveMQTempQueue;
035    import org.apache.activemq.command.ActiveMQTempTopic;
036    import org.apache.activemq.command.ActiveMQTopic;
037    import org.apache.activemq.command.DestinationInfo;
038    import org.slf4j.Logger;
039    import org.slf4j.LoggerFactory;
040    
041    /**
042     * A helper class which keeps track of the Destinations available in a broker and allows you to listen to them
043     * being created or deleted.
044     *
045     * 
046     */
047    public class DestinationSource implements MessageListener {
048        private static final Logger LOG = LoggerFactory.getLogger(ConsumerEventSource.class);
049        private AtomicBoolean started = new AtomicBoolean(false);
050        private final Connection connection;
051        private Session session;
052        private MessageConsumer queueConsumer;
053        private MessageConsumer topicConsumer;
054        private MessageConsumer tempTopicConsumer;
055        private MessageConsumer tempQueueConsumer;
056        private Set<ActiveMQQueue> queues = new CopyOnWriteArraySet<ActiveMQQueue>();
057        private Set<ActiveMQTopic> topics = new CopyOnWriteArraySet<ActiveMQTopic>();
058        private Set<ActiveMQTempQueue> temporaryQueues = new CopyOnWriteArraySet<ActiveMQTempQueue>();
059        private Set<ActiveMQTempTopic> temporaryTopics = new CopyOnWriteArraySet<ActiveMQTempTopic>();
060        private DestinationListener listener;
061    
062        public DestinationSource(Connection connection) throws JMSException {
063            this.connection = connection;
064        }
065    
066        public DestinationListener getListener() {
067            return listener;
068        }
069    
070        public void setDestinationListener(DestinationListener listener) {
071            this.listener = listener;
072        }
073    
074        /**
075         * Returns the current queues available on the broker
076         */
077        public Set<ActiveMQQueue> getQueues() {
078            return queues;
079        }
080    
081        /**
082         * Returns the current topics on the broker
083         */
084        public Set<ActiveMQTopic> getTopics() {
085            return topics;
086        }
087    
088        /**
089         * Returns the current temporary topics available on the broker
090         */
091        public Set<ActiveMQTempQueue> getTemporaryQueues() {
092            return temporaryQueues;
093        }
094    
095        /**
096         * Returns the current temporary queues available on the broker
097         */
098        public Set<ActiveMQTempTopic> getTemporaryTopics() {
099            return temporaryTopics;
100        }
101    
102        public void start() throws JMSException {
103            if (started.compareAndSet(false, true)) {
104                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
105                queueConsumer = session.createConsumer(AdvisorySupport.QUEUE_ADVISORY_TOPIC);
106                queueConsumer.setMessageListener(this);
107    
108                topicConsumer = session.createConsumer(AdvisorySupport.TOPIC_ADVISORY_TOPIC);
109                topicConsumer.setMessageListener(this);
110    
111                tempQueueConsumer = session.createConsumer(AdvisorySupport.TEMP_QUEUE_ADVISORY_TOPIC);
112                tempQueueConsumer.setMessageListener(this);
113    
114                tempTopicConsumer = session.createConsumer(AdvisorySupport.TEMP_TOPIC_ADVISORY_TOPIC);
115                tempTopicConsumer.setMessageListener(this);
116            }
117        }
118    
119        public void stop() throws JMSException {
120            if (started.compareAndSet(true, false)) {
121                if (session != null) {
122                    session.close();
123                }
124            }
125        }
126    
127        public void onMessage(Message message) {
128            if (message instanceof ActiveMQMessage) {
129                ActiveMQMessage activeMessage = (ActiveMQMessage) message;
130                Object command = activeMessage.getDataStructure();
131                if (command instanceof DestinationInfo) {
132                    DestinationInfo destinationInfo = (DestinationInfo) command;
133                    DestinationEvent event = new DestinationEvent(this, destinationInfo);
134                    fireDestinationEvent(event);
135                }
136                else {
137                    LOG.warn("Unknown dataStructure: " + command);
138                }
139            }
140            else {
141                LOG.warn("Unknown message type: " + message + ". Message ignored");
142            }
143        }
144    
145        protected void fireDestinationEvent(DestinationEvent event) {
146            // now lets update the data structures
147            ActiveMQDestination destination = event.getDestination();
148            boolean add = event.isAddOperation();
149            if (destination instanceof ActiveMQQueue) {
150                ActiveMQQueue queue = (ActiveMQQueue) destination;
151                if (add) {
152                    queues.add(queue);
153                }
154                else {
155                    queues.remove(queue);
156                }
157            }
158            else if (destination instanceof ActiveMQTopic) {
159                ActiveMQTopic topic = (ActiveMQTopic) destination;
160                if (add) {
161                    topics.add(topic);
162                }
163                else {
164                    topics.remove(topic);
165                }
166            }
167            else if (destination instanceof ActiveMQTempQueue) {
168                ActiveMQTempQueue queue = (ActiveMQTempQueue) destination;
169                if (add) {
170                    temporaryQueues.add(queue);
171                }
172                else {
173                    temporaryQueues.remove(queue);
174                }
175            }
176            else if (destination instanceof ActiveMQTempTopic) {
177                ActiveMQTempTopic topic = (ActiveMQTempTopic) destination;
178                if (add) {
179                    temporaryTopics.add(topic);
180                }
181                else {
182                    temporaryTopics.remove(topic);
183                }
184            }
185            else {
186                LOG.warn("Unknown destination type: " + destination);
187            }
188            if (listener != null) {
189                listener.onDestinationEvent(event);
190            }
191        }
192    }