001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.activemq;
019    
020    import java.util.Collections;
021    import java.util.LinkedList;
022    import java.util.List;
023    
024    import javax.jms.ConnectionConsumer;
025    import javax.jms.IllegalStateException;
026    import javax.jms.JMSException;
027    import javax.jms.ServerSession;
028    import javax.jms.ServerSessionPool;
029    import javax.jms.Session;
030    
031    import org.apache.activemq.command.ConsumerId;
032    import org.apache.activemq.command.ConsumerInfo;
033    import org.apache.activemq.command.MessageDispatch;
034    
035    /**
036     * For application servers, <CODE>Connection</CODE> objects provide a special
037     * facility for creating a <CODE>ConnectionConsumer</CODE> (optional). The
038     * messages it is to consume are specified by a <CODE>Destination</CODE> and a
039     * message selector. In addition, a <CODE>ConnectionConsumer</CODE> must be
040     * given a <CODE>ServerSessionPool</CODE> to use for processing its messages.
041     * <p/>
042     * <P>
043     * Normally, when traffic is light, a <CODE>ConnectionConsumer</CODE> gets a
044     * <CODE>ServerSession</CODE> from its pool, loads it with a single message,
045     * and starts it. As traffic picks up, messages can back up. If this happens, a
046     * <CODE>ConnectionConsumer</CODE> can load each <CODE>ServerSession</CODE>
047     * with more than one message. This reduces the thread context switches and
048     * minimizes resource use at the expense of some serialization of message
049     * processing.
050     * 
051     * @see javax.jms.Connection#createConnectionConsumer
052     * @see javax.jms.Connection#createDurableConnectionConsumer
053     * @see javax.jms.QueueConnection#createConnectionConsumer
054     * @see javax.jms.TopicConnection#createConnectionConsumer
055     * @see javax.jms.TopicConnection#createDurableConnectionConsumer
056     */
057    
058    public class ActiveMQConnectionConsumer implements ConnectionConsumer, ActiveMQDispatcher {
059    
060        private ActiveMQConnection connection;
061        private ServerSessionPool sessionPool;
062        private ConsumerInfo consumerInfo;
063        private boolean closed;
064    
065        /**
066         * Create a ConnectionConsumer
067         * 
068         * @param theConnection
069         * @param theSessionPool
070         * @param theConsumerInfo
071         * @throws JMSException
072         */
073        protected ActiveMQConnectionConsumer(ActiveMQConnection theConnection, ServerSessionPool theSessionPool, ConsumerInfo theConsumerInfo) throws JMSException {
074            this.connection = theConnection;
075            this.sessionPool = theSessionPool;
076            this.consumerInfo = theConsumerInfo;
077    
078            this.connection.addConnectionConsumer(this);
079            this.connection.addDispatcher(consumerInfo.getConsumerId(), this);
080            this.connection.syncSendPacket(this.consumerInfo);
081        }
082    
083        /**
084         * Gets the server session pool associated with this connection consumer.
085         * 
086         * @return the server session pool used by this connection consumer
087         * @throws JMSException if the JMS provider fails to get the server session
088         *                 pool associated with this consumer due to some internal
089         *                 error.
090         */
091    
092        public ServerSessionPool getServerSessionPool() throws JMSException {
093            if (closed) {
094                throw new IllegalStateException("The Connection Consumer is closed");
095            }
096            return this.sessionPool;
097        }
098    
099        /**
100         * Closes the connection consumer. <p/>
101         * <P>
102         * Since a provider may allocate some resources on behalf of a connection
103         * consumer outside the Java virtual machine, clients should close these
104         * resources when they are not needed. Relying on garbage collection to
105         * eventually reclaim these resources may not be timely enough.
106         * 
107         * @throws JMSException
108         */
109    
110        public void close() throws JMSException {
111            if (!closed) {
112                dispose();
113                this.connection.asyncSendPacket(this.consumerInfo.createRemoveCommand());
114            }
115    
116        }
117    
118        public void dispose() {
119            if (!closed) {
120                this.connection.removeDispatcher(consumerInfo.getConsumerId());
121                this.connection.removeConnectionConsumer(this);
122                closed = true;
123            }
124        }
125    
126        public void dispatch(MessageDispatch messageDispatch) {
127            try {
128                messageDispatch.setConsumer(this);
129    
130                ServerSession serverSession = sessionPool.getServerSession();
131                Session s = serverSession.getSession();
132                ActiveMQSession session = null;
133    
134                if (s instanceof ActiveMQSession) {
135                    session = (ActiveMQSession)s;
136                } else if (s instanceof ActiveMQTopicSession) {
137                    ActiveMQTopicSession topicSession = (ActiveMQTopicSession)s;
138                    session = (ActiveMQSession)topicSession.getNext();
139                } else if (s instanceof ActiveMQQueueSession) {
140                    ActiveMQQueueSession queueSession = (ActiveMQQueueSession)s;
141                    session = (ActiveMQSession)queueSession.getNext();
142                } else {
143                    connection.onClientInternalException(new JMSException("Session pool provided an invalid session type: " + s.getClass()));
144                    return;
145                }
146    
147                session.dispatch(messageDispatch);
148                serverSession.start();
149            } catch (JMSException e) {
150                connection.onAsyncException(e);
151            }
152        }
153    
154        public String toString() {
155            return "ActiveMQConnectionConsumer { value=" + consumerInfo.getConsumerId() + " }";
156        }
157    
158        public void clearMessagesInProgress() {
159            // future: may want to deal with rollback of in progress messages to track re deliveries
160            // before indicating that all is complete.        
161            // Till there is a need, lets immediately allow dispatch
162            this.connection.transportInterruptionProcessingComplete();
163        }
164    
165        public ConsumerInfo getConsumerInfo() {
166            return consumerInfo;
167        }
168    }