001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.activemq;
019    
020    import java.util.Collections;
021    import java.util.LinkedList;
022    import java.util.List;
023    
024    import javax.jms.ConnectionConsumer;
025    import javax.jms.IllegalStateException;
026    import javax.jms.JMSException;
027    import javax.jms.ServerSession;
028    import javax.jms.ServerSessionPool;
029    import javax.jms.Session;
030    
031    import org.apache.activemq.command.ConsumerInfo;
032    import org.apache.activemq.command.MessageDispatch;
033    
034    /**
035     * For application servers, <CODE>Connection</CODE> objects provide a special
036     * facility for creating a <CODE>ConnectionConsumer</CODE> (optional). The
037     * messages it is to consume are specified by a <CODE>Destination</CODE> and a
038     * message selector. In addition, a <CODE>ConnectionConsumer</CODE> must be
039     * given a <CODE>ServerSessionPool</CODE> to use for processing its messages.
040     * <p/>
041     * <P>
042     * Normally, when traffic is light, a <CODE>ConnectionConsumer</CODE> gets a
043     * <CODE>ServerSession</CODE> from its pool, loads it with a single message,
044     * and starts it. As traffic picks up, messages can back up. If this happens, a
045     * <CODE>ConnectionConsumer</CODE> can load each <CODE>ServerSession</CODE>
046     * with more than one message. This reduces the thread context switches and
047     * minimizes resource use at the expense of some serialization of message
048     * processing.
049     * 
050     * @see javax.jms.Connection#createConnectionConsumer
051     * @see javax.jms.Connection#createDurableConnectionConsumer
052     * @see javax.jms.QueueConnection#createConnectionConsumer
053     * @see javax.jms.TopicConnection#createConnectionConsumer
054     * @see javax.jms.TopicConnection#createDurableConnectionConsumer
055     */
056    
057    public class ActiveMQConnectionConsumer implements ConnectionConsumer, ActiveMQDispatcher {
058    
059        private ActiveMQConnection connection;
060        private ServerSessionPool sessionPool;
061        private ConsumerInfo consumerInfo;
062        private boolean closed;
063    
064        /**
065         * Create a ConnectionConsumer
066         * 
067         * @param theConnection
068         * @param theSessionPool
069         * @param theConsumerInfo
070         * @throws JMSException
071         */
072        protected ActiveMQConnectionConsumer(ActiveMQConnection theConnection, ServerSessionPool theSessionPool, ConsumerInfo theConsumerInfo) throws JMSException {
073            this.connection = theConnection;
074            this.sessionPool = theSessionPool;
075            this.consumerInfo = theConsumerInfo;
076    
077            this.connection.addConnectionConsumer(this);
078            this.connection.addDispatcher(consumerInfo.getConsumerId(), this);
079            this.connection.asyncSendPacket(this.consumerInfo);
080        }
081    
082        /**
083         * Gets the server session pool associated with this connection consumer.
084         * 
085         * @return the server session pool used by this connection consumer
086         * @throws JMSException if the JMS provider fails to get the server session
087         *                 pool associated with this consumer due to some internal
088         *                 error.
089         */
090    
091        public ServerSessionPool getServerSessionPool() throws JMSException {
092            if (closed) {
093                throw new IllegalStateException("The Connection Consumer is closed");
094            }
095            return this.sessionPool;
096        }
097    
098        /**
099         * Closes the connection consumer. <p/>
100         * <P>
101         * Since a provider may allocate some resources on behalf of a connection
102         * consumer outside the Java virtual machine, clients should close these
103         * resources when they are not needed. Relying on garbage collection to
104         * eventually reclaim these resources may not be timely enough.
105         * 
106         * @throws JMSException
107         */
108    
109        public void close() throws JMSException {
110            if (!closed) {
111                dispose();
112                this.connection.asyncSendPacket(this.consumerInfo.createRemoveCommand());
113            }
114    
115        }
116    
117        public void dispose() {
118            if (!closed) {
119                this.connection.removeDispatcher(consumerInfo.getConsumerId());
120                this.connection.removeConnectionConsumer(this);
121                closed = true;
122            }
123        }
124    
125        public void dispatch(MessageDispatch messageDispatch) {
126            try {
127                messageDispatch.setConsumer(this);
128    
129                ServerSession serverSession = sessionPool.getServerSession();
130                Session s = serverSession.getSession();
131                ActiveMQSession session = null;
132    
133                if (s instanceof ActiveMQSession) {
134                    session = (ActiveMQSession)s;
135                } else if (s instanceof ActiveMQTopicSession) {
136                    ActiveMQTopicSession topicSession = (ActiveMQTopicSession)s;
137                    session = (ActiveMQSession)topicSession.getNext();
138                } else if (s instanceof ActiveMQQueueSession) {
139                    ActiveMQQueueSession queueSession = (ActiveMQQueueSession)s;
140                    session = (ActiveMQSession)queueSession.getNext();
141                } else {
142                    connection.onClientInternalException(new JMSException("Session pool provided an invalid session type: " + s.getClass()));
143                    return;
144                }
145    
146                session.dispatch(messageDispatch);
147                serverSession.start();
148            } catch (JMSException e) {
149                connection.onAsyncException(e);
150            }
151        }
152    
153        public String toString() {
154            return "ActiveMQConnectionConsumer { value=" + consumerInfo.getConsumerId() + " }";
155        }
156    
157        public void clearMessagesInProgress() {
158            // future: may want to deal with rollback of in progress messages to track re deliveries
159            // before indicating that all is complete.        
160            // Till there is a need, lets immediately allow dispatch
161            this.connection.transportInterruptionProcessingComplete();
162        }
163    }