001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.activemq;
019    
020    import java.util.Collections;
021    import java.util.LinkedList;
022    import java.util.List;
023    import java.util.concurrent.CountDownLatch;
024    import java.util.concurrent.atomic.AtomicInteger;
025    
026    import javax.jms.ConnectionConsumer;
027    import javax.jms.IllegalStateException;
028    import javax.jms.JMSException;
029    import javax.jms.ServerSession;
030    import javax.jms.ServerSessionPool;
031    import javax.jms.Session;
032    
033    import org.apache.activemq.command.ConsumerId;
034    import org.apache.activemq.command.ConsumerInfo;
035    import org.apache.activemq.command.MessageDispatch;
036    
037    /**
038     * For application servers, <CODE>Connection</CODE> objects provide a special
039     * facility for creating a <CODE>ConnectionConsumer</CODE> (optional). The
040     * messages it is to consume are specified by a <CODE>Destination</CODE> and a
041     * message selector. In addition, a <CODE>ConnectionConsumer</CODE> must be
042     * given a <CODE>ServerSessionPool</CODE> to use for processing its messages.
043     * <p/>
044     * <P>
045     * Normally, when traffic is light, a <CODE>ConnectionConsumer</CODE> gets a
046     * <CODE>ServerSession</CODE> from its pool, loads it with a single message,
047     * and starts it. As traffic picks up, messages can back up. If this happens, a
048     * <CODE>ConnectionConsumer</CODE> can load each <CODE>ServerSession</CODE>
049     * with more than one message. This reduces the thread context switches and
050     * minimizes resource use at the expense of some serialization of message
051     * processing.
052     * 
053     * @see javax.jms.Connection#createConnectionConsumer
054     * @see javax.jms.Connection#createDurableConnectionConsumer
055     * @see javax.jms.QueueConnection#createConnectionConsumer
056     * @see javax.jms.TopicConnection#createConnectionConsumer
057     * @see javax.jms.TopicConnection#createDurableConnectionConsumer
058     */
059    
060    public class ActiveMQConnectionConsumer implements ConnectionConsumer, ActiveMQDispatcher {
061    
062        private ActiveMQConnection connection;
063        private ServerSessionPool sessionPool;
064        private ConsumerInfo consumerInfo;
065        private boolean closed;
066    
067        /**
068         * Create a ConnectionConsumer
069         * 
070         * @param theConnection
071         * @param theSessionPool
072         * @param theConsumerInfo
073         * @throws JMSException
074         */
075        protected ActiveMQConnectionConsumer(ActiveMQConnection theConnection, ServerSessionPool theSessionPool, ConsumerInfo theConsumerInfo) throws JMSException {
076            this.connection = theConnection;
077            this.sessionPool = theSessionPool;
078            this.consumerInfo = theConsumerInfo;
079    
080            this.connection.addConnectionConsumer(this);
081            this.connection.addDispatcher(consumerInfo.getConsumerId(), this);
082            this.connection.syncSendPacket(this.consumerInfo);
083        }
084    
085        /**
086         * Gets the server session pool associated with this connection consumer.
087         * 
088         * @return the server session pool used by this connection consumer
089         * @throws JMSException if the JMS provider fails to get the server session
090         *                 pool associated with this consumer due to some internal
091         *                 error.
092         */
093    
094        public ServerSessionPool getServerSessionPool() throws JMSException {
095            if (closed) {
096                throw new IllegalStateException("The Connection Consumer is closed");
097            }
098            return this.sessionPool;
099        }
100    
101        /**
102         * Closes the connection consumer. <p/>
103         * <P>
104         * Since a provider may allocate some resources on behalf of a connection
105         * consumer outside the Java virtual machine, clients should close these
106         * resources when they are not needed. Relying on garbage collection to
107         * eventually reclaim these resources may not be timely enough.
108         * 
109         * @throws JMSException
110         */
111    
112        public void close() throws JMSException {
113            if (!closed) {
114                dispose();
115                this.connection.asyncSendPacket(this.consumerInfo.createRemoveCommand());
116            }
117    
118        }
119    
120        public void dispose() {
121            if (!closed) {
122                this.connection.removeDispatcher(consumerInfo.getConsumerId());
123                this.connection.removeConnectionConsumer(this);
124                closed = true;
125            }
126        }
127    
128        public void dispatch(MessageDispatch messageDispatch) {
129            try {
130                messageDispatch.setConsumer(this);
131    
132                ServerSession serverSession = sessionPool.getServerSession();
133                Session s = serverSession.getSession();
134                ActiveMQSession session = null;
135    
136                if (s instanceof ActiveMQSession) {
137                    session = (ActiveMQSession)s;
138                } else if (s instanceof ActiveMQTopicSession) {
139                    ActiveMQTopicSession topicSession = (ActiveMQTopicSession)s;
140                    session = (ActiveMQSession)topicSession.getNext();
141                } else if (s instanceof ActiveMQQueueSession) {
142                    ActiveMQQueueSession queueSession = (ActiveMQQueueSession)s;
143                    session = (ActiveMQSession)queueSession.getNext();
144                } else {
145                    connection.onClientInternalException(new JMSException("Session pool provided an invalid session type: " + s.getClass()));
146                    return;
147                }
148    
149                session.dispatch(messageDispatch);
150                serverSession.start();
151            } catch (JMSException e) {
152                connection.onAsyncException(e);
153            }
154        }
155    
156        public String toString() {
157            return "ActiveMQConnectionConsumer { value=" + consumerInfo.getConsumerId() + " }";
158        }
159    
160        public void clearMessagesInProgress(AtomicInteger transportInterruptionProcessingComplete) {
161            // future: may want to deal with rollback of in progress messages to track re deliveries
162            // before indicating that all is complete.        
163        }
164    
165        public ConsumerInfo getConsumerInfo() {
166            return consumerInfo;
167        }
168    }