001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018 package org.apache.activemq;
019
020 import java.util.Collections;
021 import java.util.LinkedList;
022 import java.util.List;
023
024 import javax.jms.ConnectionConsumer;
025 import javax.jms.IllegalStateException;
026 import javax.jms.JMSException;
027 import javax.jms.ServerSession;
028 import javax.jms.ServerSessionPool;
029 import javax.jms.Session;
030
031 import org.apache.activemq.command.ConsumerId;
032 import org.apache.activemq.command.ConsumerInfo;
033 import org.apache.activemq.command.MessageDispatch;
034
035 /**
036 * For application servers, <CODE>Connection</CODE> objects provide a special
037 * facility for creating a <CODE>ConnectionConsumer</CODE> (optional). The
038 * messages it is to consume are specified by a <CODE>Destination</CODE> and a
039 * message selector. In addition, a <CODE>ConnectionConsumer</CODE> must be
040 * given a <CODE>ServerSessionPool</CODE> to use for processing its messages.
041 * <p/>
042 * <P>
043 * Normally, when traffic is light, a <CODE>ConnectionConsumer</CODE> gets a
044 * <CODE>ServerSession</CODE> from its pool, loads it with a single message,
045 * and starts it. As traffic picks up, messages can back up. If this happens, a
046 * <CODE>ConnectionConsumer</CODE> can load each <CODE>ServerSession</CODE>
047 * with more than one message. This reduces the thread context switches and
048 * minimizes resource use at the expense of some serialization of message
049 * processing.
050 *
051 * @see javax.jms.Connection#createConnectionConsumer
052 * @see javax.jms.Connection#createDurableConnectionConsumer
053 * @see javax.jms.QueueConnection#createConnectionConsumer
054 * @see javax.jms.TopicConnection#createConnectionConsumer
055 * @see javax.jms.TopicConnection#createDurableConnectionConsumer
056 */
057
058 public class ActiveMQConnectionConsumer implements ConnectionConsumer, ActiveMQDispatcher {
059
060 private ActiveMQConnection connection;
061 private ServerSessionPool sessionPool;
062 private ConsumerInfo consumerInfo;
063 private boolean closed;
064
065 /**
066 * Create a ConnectionConsumer
067 *
068 * @param theConnection
069 * @param theSessionPool
070 * @param theConsumerInfo
071 * @throws JMSException
072 */
073 protected ActiveMQConnectionConsumer(ActiveMQConnection theConnection, ServerSessionPool theSessionPool, ConsumerInfo theConsumerInfo) throws JMSException {
074 this.connection = theConnection;
075 this.sessionPool = theSessionPool;
076 this.consumerInfo = theConsumerInfo;
077
078 this.connection.addConnectionConsumer(this);
079 this.connection.addDispatcher(consumerInfo.getConsumerId(), this);
080 this.connection.syncSendPacket(this.consumerInfo);
081 }
082
083 /**
084 * Gets the server session pool associated with this connection consumer.
085 *
086 * @return the server session pool used by this connection consumer
087 * @throws JMSException if the JMS provider fails to get the server session
088 * pool associated with this consumer due to some internal
089 * error.
090 */
091
092 public ServerSessionPool getServerSessionPool() throws JMSException {
093 if (closed) {
094 throw new IllegalStateException("The Connection Consumer is closed");
095 }
096 return this.sessionPool;
097 }
098
099 /**
100 * Closes the connection consumer. <p/>
101 * <P>
102 * Since a provider may allocate some resources on behalf of a connection
103 * consumer outside the Java virtual machine, clients should close these
104 * resources when they are not needed. Relying on garbage collection to
105 * eventually reclaim these resources may not be timely enough.
106 *
107 * @throws JMSException
108 */
109
110 public void close() throws JMSException {
111 if (!closed) {
112 dispose();
113 this.connection.asyncSendPacket(this.consumerInfo.createRemoveCommand());
114 }
115
116 }
117
118 public void dispose() {
119 if (!closed) {
120 this.connection.removeDispatcher(consumerInfo.getConsumerId());
121 this.connection.removeConnectionConsumer(this);
122 closed = true;
123 }
124 }
125
126 public void dispatch(MessageDispatch messageDispatch) {
127 try {
128 messageDispatch.setConsumer(this);
129
130 ServerSession serverSession = sessionPool.getServerSession();
131 Session s = serverSession.getSession();
132 ActiveMQSession session = null;
133
134 if (s instanceof ActiveMQSession) {
135 session = (ActiveMQSession)s;
136 } else if (s instanceof ActiveMQTopicSession) {
137 ActiveMQTopicSession topicSession = (ActiveMQTopicSession)s;
138 session = (ActiveMQSession)topicSession.getNext();
139 } else if (s instanceof ActiveMQQueueSession) {
140 ActiveMQQueueSession queueSession = (ActiveMQQueueSession)s;
141 session = (ActiveMQSession)queueSession.getNext();
142 } else {
143 connection.onClientInternalException(new JMSException("Session pool provided an invalid session type: " + s.getClass()));
144 return;
145 }
146
147 session.dispatch(messageDispatch);
148 serverSession.start();
149 } catch (JMSException e) {
150 connection.onAsyncException(e);
151 }
152 }
153
154 public String toString() {
155 return "ActiveMQConnectionConsumer { value=" + consumerInfo.getConsumerId() + " }";
156 }
157
158 public void clearMessagesInProgress() {
159 // future: may want to deal with rollback of in progress messages to track re deliveries
160 // before indicating that all is complete.
161 // Till there is a need, lets immediately allow dispatch
162 this.connection.transportInterruptionProcessingComplete();
163 }
164
165 public ConsumerInfo getConsumerInfo() {
166 return consumerInfo;
167 }
168 }