001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.apache.activemq;
019
020import java.util.Collections;
021import java.util.LinkedList;
022import java.util.List;
023import java.util.concurrent.CountDownLatch;
024import java.util.concurrent.atomic.AtomicInteger;
025
026import javax.jms.ConnectionConsumer;
027import javax.jms.IllegalStateException;
028import javax.jms.JMSException;
029import javax.jms.ServerSession;
030import javax.jms.ServerSessionPool;
031import javax.jms.Session;
032
033import org.apache.activemq.command.ConsumerId;
034import org.apache.activemq.command.ConsumerInfo;
035import org.apache.activemq.command.MessageDispatch;
036
037/**
038 * For application servers, <CODE>Connection</CODE> objects provide a special
039 * facility for creating a <CODE>ConnectionConsumer</CODE> (optional). The
040 * messages it is to consume are specified by a <CODE>Destination</CODE> and a
041 * message selector. In addition, a <CODE>ConnectionConsumer</CODE> must be
042 * given a <CODE>ServerSessionPool</CODE> to use for processing its messages.
043 * <p/>
044 * <P>
045 * Normally, when traffic is light, a <CODE>ConnectionConsumer</CODE> gets a
046 * <CODE>ServerSession</CODE> from its pool, loads it with a single message,
047 * and starts it. As traffic picks up, messages can back up. If this happens, a
048 * <CODE>ConnectionConsumer</CODE> can load each <CODE>ServerSession</CODE>
049 * with more than one message. This reduces the thread context switches and
050 * minimizes resource use at the expense of some serialization of message
051 * processing.
052 * 
053 * @see javax.jms.Connection#createConnectionConsumer
054 * @see javax.jms.Connection#createDurableConnectionConsumer
055 * @see javax.jms.QueueConnection#createConnectionConsumer
056 * @see javax.jms.TopicConnection#createConnectionConsumer
057 * @see javax.jms.TopicConnection#createDurableConnectionConsumer
058 */
059
060public class ActiveMQConnectionConsumer implements ConnectionConsumer, ActiveMQDispatcher {
061
062    private ActiveMQConnection connection;
063    private ServerSessionPool sessionPool;
064    private ConsumerInfo consumerInfo;
065    private boolean closed;
066
067    /**
068     * Create a ConnectionConsumer
069     * 
070     * @param theConnection
071     * @param theSessionPool
072     * @param theConsumerInfo
073     * @throws JMSException
074     */
075    protected ActiveMQConnectionConsumer(ActiveMQConnection theConnection, ServerSessionPool theSessionPool, ConsumerInfo theConsumerInfo) throws JMSException {
076        this.connection = theConnection;
077        this.sessionPool = theSessionPool;
078        this.consumerInfo = theConsumerInfo;
079
080        this.connection.addConnectionConsumer(this);
081        this.connection.addDispatcher(consumerInfo.getConsumerId(), this);
082        this.connection.syncSendPacket(this.consumerInfo);
083    }
084
085    /**
086     * Gets the server session pool associated with this connection consumer.
087     * 
088     * @return the server session pool used by this connection consumer
089     * @throws JMSException if the JMS provider fails to get the server session
090     *                 pool associated with this consumer due to some internal
091     *                 error.
092     */
093
094    public ServerSessionPool getServerSessionPool() throws JMSException {
095        if (closed) {
096            throw new IllegalStateException("The Connection Consumer is closed");
097        }
098        return this.sessionPool;
099    }
100
101    /**
102     * Closes the connection consumer. <p/>
103     * <P>
104     * Since a provider may allocate some resources on behalf of a connection
105     * consumer outside the Java virtual machine, clients should close these
106     * resources when they are not needed. Relying on garbage collection to
107     * eventually reclaim these resources may not be timely enough.
108     * 
109     * @throws JMSException
110     */
111
112    public void close() throws JMSException {
113        if (!closed) {
114            dispose();
115            this.connection.asyncSendPacket(this.consumerInfo.createRemoveCommand());
116        }
117
118    }
119
120    public void dispose() {
121        if (!closed) {
122            this.connection.removeDispatcher(consumerInfo.getConsumerId());
123            this.connection.removeConnectionConsumer(this);
124            closed = true;
125        }
126    }
127
128    public void dispatch(MessageDispatch messageDispatch) {
129        try {
130            messageDispatch.setConsumer(this);
131
132            ServerSession serverSession = sessionPool.getServerSession();
133            Session s = serverSession.getSession();
134            ActiveMQSession session = null;
135
136            if (s instanceof ActiveMQSession) {
137                session = (ActiveMQSession)s;
138            } else if (s instanceof ActiveMQTopicSession) {
139                ActiveMQTopicSession topicSession = (ActiveMQTopicSession)s;
140                session = (ActiveMQSession)topicSession.getNext();
141            } else if (s instanceof ActiveMQQueueSession) {
142                ActiveMQQueueSession queueSession = (ActiveMQQueueSession)s;
143                session = (ActiveMQSession)queueSession.getNext();
144            } else {
145                connection.onClientInternalException(new JMSException("Session pool provided an invalid session type: " + s.getClass()));
146                return;
147            }
148
149            session.dispatch(messageDispatch);
150            serverSession.start();
151        } catch (JMSException e) {
152            connection.onAsyncException(e);
153        }
154    }
155
156    public String toString() {
157        return "ActiveMQConnectionConsumer { value=" + consumerInfo.getConsumerId() + " }";
158    }
159
160    public void clearMessagesInProgress(AtomicInteger transportInterruptionProcessingComplete) {
161        // future: may want to deal with rollback of in progress messages to track re deliveries
162        // before indicating that all is complete.        
163    }
164
165    public ConsumerInfo getConsumerInfo() {
166        return consumerInfo;
167    }
168}