001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.advisory;
018
019import java.util.Set;
020import java.util.concurrent.CopyOnWriteArraySet;
021import java.util.concurrent.atomic.AtomicBoolean;
022
023import javax.jms.Connection;
024import javax.jms.JMSException;
025import javax.jms.Message;
026import javax.jms.MessageConsumer;
027import javax.jms.MessageListener;
028import javax.jms.Session;
029
030import org.apache.activemq.command.ActiveMQDestination;
031import org.apache.activemq.command.ActiveMQMessage;
032import org.apache.activemq.command.ActiveMQQueue;
033import org.apache.activemq.command.ActiveMQTempQueue;
034import org.apache.activemq.command.ActiveMQTempTopic;
035import org.apache.activemq.command.ActiveMQTopic;
036import org.apache.activemq.command.DestinationInfo;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040/**
041 * A helper class which keeps track of the Destinations available in a broker and allows you to listen to them
042 * being created or deleted.
043 *
044 * 
045 */
046public class DestinationSource implements MessageListener {
047    private static final Logger LOG = LoggerFactory.getLogger(ConsumerEventSource.class);
048    private AtomicBoolean started = new AtomicBoolean(false);
049    private final Connection connection;
050    private Session session;
051    private MessageConsumer queueConsumer;
052    private MessageConsumer topicConsumer;
053    private MessageConsumer tempTopicConsumer;
054    private MessageConsumer tempQueueConsumer;
055    private Set<ActiveMQQueue> queues = new CopyOnWriteArraySet<ActiveMQQueue>();
056    private Set<ActiveMQTopic> topics = new CopyOnWriteArraySet<ActiveMQTopic>();
057    private Set<ActiveMQTempQueue> temporaryQueues = new CopyOnWriteArraySet<ActiveMQTempQueue>();
058    private Set<ActiveMQTempTopic> temporaryTopics = new CopyOnWriteArraySet<ActiveMQTempTopic>();
059    private DestinationListener listener;
060
061    public DestinationSource(Connection connection) throws JMSException {
062        this.connection = connection;
063    }
064
065    public DestinationListener getListener() {
066        return listener;
067    }
068
069    public void setDestinationListener(DestinationListener listener) {
070        this.listener = listener;
071    }
072
073    /**
074     * Returns the current queues available on the broker
075     */
076    public Set<ActiveMQQueue> getQueues() {
077        return queues;
078    }
079
080    /**
081     * Returns the current topics on the broker
082     */
083    public Set<ActiveMQTopic> getTopics() {
084        return topics;
085    }
086
087    /**
088     * Returns the current temporary topics available on the broker
089     */
090    public Set<ActiveMQTempQueue> getTemporaryQueues() {
091        return temporaryQueues;
092    }
093
094    /**
095     * Returns the current temporary queues available on the broker
096     */
097    public Set<ActiveMQTempTopic> getTemporaryTopics() {
098        return temporaryTopics;
099    }
100
101    public void start() throws JMSException {
102        if (started.compareAndSet(false, true)) {
103            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
104            queueConsumer = session.createConsumer(AdvisorySupport.QUEUE_ADVISORY_TOPIC);
105            queueConsumer.setMessageListener(this);
106
107            topicConsumer = session.createConsumer(AdvisorySupport.TOPIC_ADVISORY_TOPIC);
108            topicConsumer.setMessageListener(this);
109
110            tempQueueConsumer = session.createConsumer(AdvisorySupport.TEMP_QUEUE_ADVISORY_TOPIC);
111            tempQueueConsumer.setMessageListener(this);
112
113            tempTopicConsumer = session.createConsumer(AdvisorySupport.TEMP_TOPIC_ADVISORY_TOPIC);
114            tempTopicConsumer.setMessageListener(this);
115        }
116    }
117
118    public void stop() throws JMSException {
119        if (started.compareAndSet(true, false)) {
120            if (session != null) {
121                session.close();
122            }
123        }
124    }
125
126    public void onMessage(Message message) {
127        if (message instanceof ActiveMQMessage) {
128            ActiveMQMessage activeMessage = (ActiveMQMessage) message;
129            Object command = activeMessage.getDataStructure();
130            if (command instanceof DestinationInfo) {
131                DestinationInfo destinationInfo = (DestinationInfo) command;
132                DestinationEvent event = new DestinationEvent(this, destinationInfo);
133                fireDestinationEvent(event);
134            }
135            else {
136                LOG.warn("Unknown dataStructure: " + command);
137            }
138        }
139        else {
140            LOG.warn("Unknown message type: " + message + ". Message ignored");
141        }
142    }
143
144    protected void fireDestinationEvent(DestinationEvent event) {
145        // now lets update the data structures
146        ActiveMQDestination destination = event.getDestination();
147        boolean add = event.isAddOperation();
148        if (destination instanceof ActiveMQQueue) {
149            ActiveMQQueue queue = (ActiveMQQueue) destination;
150            if (add) {
151                queues.add(queue);
152            }
153            else {
154                queues.remove(queue);
155            }
156        }
157        else if (destination instanceof ActiveMQTopic) {
158            ActiveMQTopic topic = (ActiveMQTopic) destination;
159            if (add) {
160                topics.add(topic);
161            }
162            else {
163                topics.remove(topic);
164            }
165        }
166        else if (destination instanceof ActiveMQTempQueue) {
167            ActiveMQTempQueue queue = (ActiveMQTempQueue) destination;
168            if (add) {
169                temporaryQueues.add(queue);
170            }
171            else {
172                temporaryQueues.remove(queue);
173            }
174        }
175        else if (destination instanceof ActiveMQTempTopic) {
176            ActiveMQTempTopic topic = (ActiveMQTempTopic) destination;
177            if (add) {
178                temporaryTopics.add(topic);
179            }
180            else {
181                temporaryTopics.remove(topic);
182            }
183        }
184        else {
185            LOG.warn("Unknown destination type: " + destination);
186        }
187        if (listener != null) {
188            listener.onDestinationEvent(event);
189        }
190    }
191}