001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.advisory;
018    
019    import java.util.concurrent.atomic.AtomicBoolean;
020    import java.util.concurrent.atomic.AtomicInteger;
021    
022    import javax.jms.Connection;
023    import javax.jms.Destination;
024    import javax.jms.JMSException;
025    import javax.jms.Message;
026    import javax.jms.MessageConsumer;
027    import javax.jms.MessageListener;
028    import javax.jms.Session;
029    
030    import org.apache.activemq.Service;
031    import org.apache.activemq.command.ActiveMQDestination;
032    import org.apache.activemq.command.ActiveMQMessage;
033    import org.apache.activemq.command.ActiveMQTopic;
034    import org.apache.activemq.command.ProducerId;
035    import org.apache.activemq.command.ProducerInfo;
036    import org.apache.activemq.command.RemoveInfo;
037    import org.slf4j.Logger;
038    import org.slf4j.LoggerFactory;
039    
040    /**
041     * An object which can be used to listen to the number of active consumers
042     * available on a given destination.
043     * 
044     * 
045     */
046    public class ProducerEventSource implements Service, MessageListener {
047        private static final Logger LOG = LoggerFactory.getLogger(ProducerEventSource.class);
048    
049        private final Connection connection;
050        private final ActiveMQDestination destination;
051        private ProducerListener listener;
052        private AtomicBoolean started = new AtomicBoolean(false);
053        private AtomicInteger producerCount = new AtomicInteger();
054        private Session session;
055        private MessageConsumer consumer;
056    
057        public ProducerEventSource(Connection connection, Destination destination) throws JMSException {
058            this.connection = connection;
059            this.destination = ActiveMQDestination.transform(destination);
060        }
061    
062        public void setProducerListener(ProducerListener listener) {
063            this.listener = listener;
064        }
065    
066        public void start() throws Exception {
067            if (started.compareAndSet(false, true)) {
068                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
069                ActiveMQTopic advisoryTopic = AdvisorySupport.getProducerAdvisoryTopic(destination);
070                consumer = session.createConsumer(advisoryTopic);
071                consumer.setMessageListener(this);
072            }
073        }
074    
075        public void stop() throws Exception {
076            if (started.compareAndSet(true, false)) {
077                if (session != null) {
078                    session.close();
079                }
080            }
081        }
082    
083        public void onMessage(Message message) {
084            if (message instanceof ActiveMQMessage) {
085                ActiveMQMessage activeMessage = (ActiveMQMessage)message;
086                Object command = activeMessage.getDataStructure();
087                int count = 0;
088                if (command instanceof ProducerInfo) {
089                    count = producerCount.incrementAndGet();
090                    count = extractProducerCountFromMessage(message, count);
091                    fireProducerEvent(new ProducerStartedEvent(this, destination, (ProducerInfo)command, count));
092                } else if (command instanceof RemoveInfo) {
093                    RemoveInfo removeInfo = (RemoveInfo)command;
094                    if (removeInfo.isProducerRemove()) {
095                        count = producerCount.decrementAndGet();
096                        count = extractProducerCountFromMessage(message, count);
097                        fireProducerEvent(new ProducerStoppedEvent(this, destination, (ProducerId)removeInfo.getObjectId(), count));
098                    }
099                } else {
100                    LOG.warn("Unknown command: " + command);
101                }
102            } else {
103                LOG.warn("Unknown message type: " + message + ". Message ignored");
104            }
105        }
106    
107        protected int extractProducerCountFromMessage(Message message, int count) {
108            try {
109                Object value = message.getObjectProperty("producerCount");
110                if (value instanceof Number) {
111                    Number n = (Number)value;
112                    return n.intValue();
113                }
114                LOG.warn("No producerCount header available on the message: " + message);
115            } catch (Exception e) {
116                LOG.warn("Failed to extract producerCount from message: " + message + ".Reason: " + e, e);
117            }
118            return count;
119        }
120    
121        protected void fireProducerEvent(ProducerEvent event) {
122            if (listener != null) {
123                listener.onProducerEvent(event);
124            }
125        }
126    
127    }