001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.ra;
018    
019    import java.util.ArrayList;
020    import java.util.Iterator;
021    import java.util.List;
022    import java.util.concurrent.atomic.AtomicBoolean;
023    import java.util.concurrent.locks.Lock;
024    import java.util.concurrent.locks.ReentrantLock;
025    
026    import javax.jms.JMSException;
027    import javax.jms.ServerSession;
028    import javax.jms.ServerSessionPool;
029    import javax.jms.Session;
030    import javax.resource.spi.UnavailableException;
031    import javax.resource.spi.endpoint.MessageEndpoint;
032    
033    import org.apache.activemq.ActiveMQQueueSession;
034    import org.apache.activemq.ActiveMQSession;
035    import org.apache.activemq.ActiveMQTopicSession;
036    import org.apache.activemq.command.MessageDispatch;
037    import org.slf4j.Logger;
038    import org.slf4j.LoggerFactory;
039    
040    /**
041     *  $Date$
042     */
043    public class ServerSessionPoolImpl implements ServerSessionPool {
044    
045        private static final Logger LOG = LoggerFactory.getLogger(ServerSessionPoolImpl.class);
046    
047        private final ActiveMQEndpointWorker activeMQAsfEndpointWorker;
048        private final int maxSessions;
049    
050        private final List<ServerSessionImpl> idleSessions = new ArrayList<ServerSessionImpl>();
051        private final List<ServerSessionImpl> activeSessions = new ArrayList<ServerSessionImpl>();
052        private final Lock sessionLock = new ReentrantLock();
053        private final AtomicBoolean closing = new AtomicBoolean(false);
054    
055        public ServerSessionPoolImpl(ActiveMQEndpointWorker activeMQAsfEndpointWorker, int maxSessions) {
056            this.activeMQAsfEndpointWorker = activeMQAsfEndpointWorker;
057            this.maxSessions = maxSessions;
058        }
059    
060        private ServerSessionImpl createServerSessionImpl() throws JMSException {
061            MessageActivationSpec activationSpec = activeMQAsfEndpointWorker.endpointActivationKey.getActivationSpec();
062            int acknowledge = (activeMQAsfEndpointWorker.transacted) ? Session.SESSION_TRANSACTED : activationSpec.getAcknowledgeModeForSession();
063            final ActiveMQSession session = (ActiveMQSession)activeMQAsfEndpointWorker.getConnection().createSession(activeMQAsfEndpointWorker.transacted, acknowledge);
064            MessageEndpoint endpoint;
065            try {
066                int batchSize = 0;
067                if (activationSpec.getEnableBatchBooleanValue()) {
068                    batchSize = activationSpec.getMaxMessagesPerBatchIntValue();
069                }
070                if (activationSpec.isUseRAManagedTransactionEnabled()) {
071                    // The RA will manage the transaction commit.
072                    endpoint = createEndpoint(null);
073                    return new ServerSessionImpl(this, (ActiveMQSession)session, activeMQAsfEndpointWorker.workManager, endpoint, true, batchSize);
074                } else {
075                    // Give the container an object to manage to transaction with.
076                    endpoint = createEndpoint(new LocalAndXATransaction(session.getTransactionContext()));
077                    return new ServerSessionImpl(this, (ActiveMQSession)session, activeMQAsfEndpointWorker.workManager, endpoint, false, batchSize);
078                }
079            } catch (UnavailableException e) {
080                // The container could be limiting us on the number of endpoints
081                // that are being created.
082                if (LOG.isDebugEnabled()) {
083                    LOG.debug("Could not create an endpoint.", e);
084                }
085                session.close();
086                return null;
087            }
088        }
089    
090        private MessageEndpoint createEndpoint(LocalAndXATransaction txResourceProxy) throws UnavailableException {
091            MessageEndpoint endpoint;
092            endpoint = activeMQAsfEndpointWorker.endpointFactory.createEndpoint(txResourceProxy);
093            MessageEndpointProxy endpointProxy = new MessageEndpointProxy(endpoint);
094            return endpointProxy;
095        }
096    
097        /**
098         */
099        public ServerSession getServerSession() throws JMSException {
100            if (LOG.isDebugEnabled()) {
101                LOG.debug("ServerSession requested.");
102            }
103            if (closing.get()) {
104                throw new JMSException("Session Pool Shutting Down.");
105            }
106            ServerSessionImpl ss = null;
107            sessionLock.lock();
108            try {
109                ss = getExistingServerSession(false);
110            } finally {
111                sessionLock.unlock();
112            }
113            if (ss != null) {
114                return ss;
115            }
116            ss = createServerSessionImpl();
117            sessionLock.lock();
118            try {
119                // We may not be able to create a session due to the container
120                // restricting us.
121                if (ss == null) {
122                    if (activeSessions.isEmpty() && idleSessions.isEmpty()) {
123                        throw new JMSException("Endpoint factory did not allow creation of any endpoints.");
124                    }
125    
126                    ss = getExistingServerSession(true);
127                } else {
128                    activeSessions.add(ss);
129                }
130            } finally {
131                sessionLock.unlock();
132            }
133            if (LOG.isDebugEnabled()) {
134                LOG.debug("Created a new session: " + ss);
135            }
136            return ss;
137    
138        }
139    
140        /**
141         * Must be called with sessionLock held.
142         * Returns an idle session if one exists or an active session if no more
143         * sessions can be created.  Sessions can not be created if force is true
144         * or activeSessions >= maxSessions.
145         * @param force do not check activeSessions >= maxSessions, return an active connection anyway.
146         * @return an already existing session.
147         */
148        private ServerSessionImpl getExistingServerSession(boolean force) {
149            ServerSessionImpl ss = null;
150            if (idleSessions.size() > 0) {
151                ss = idleSessions.remove(idleSessions.size() - 1);
152            }
153            if (ss != null) {
154                activeSessions.add(ss);
155                if (LOG.isDebugEnabled()) {
156                    LOG.debug("Using idle session: " + ss);
157                }
158            } else if (force || activeSessions.size() >= maxSessions) {
159                // If we are at the upper limit
160                // then reuse the already created sessions..
161                // This is going to queue up messages into a session for
162                // processing.
163                ss = getExistingActiveServerSession();
164            }
165            return ss;
166        }
167    
168        /**
169         * Must be called with sessionLock held.
170         * Returns the first session from activeSessions, shifting it to last.
171         * @return session
172         */
173        private ServerSessionImpl getExistingActiveServerSession() {
174            ServerSessionImpl ss = null;
175            if (!activeSessions.isEmpty()) {
176                if (activeSessions.size() > 1) {
177                    // round robin
178                    ss = activeSessions.remove(0);
179                    activeSessions.add(ss);
180                } else {
181                    ss = activeSessions.get(0);
182                }
183            }
184            if (LOG.isDebugEnabled()) {
185                LOG.debug("Reusing an active session: " + ss);
186            }
187            return ss;
188        }
189    
190        public void returnToPool(ServerSessionImpl ss) {
191            sessionLock.lock();
192                activeSessions.remove(ss);
193            try {
194                // make sure we only return non-stale sessions to the pool
195                if ( ss.isStale() ) {
196                    if ( LOG.isDebugEnabled() ) {
197                        LOG.debug("Discarding stale ServerSession to be returned to pool: " + ss);
198                    }
199                    ss.close();
200                } else {
201                    if (LOG.isDebugEnabled()) {
202                        LOG.debug("ServerSession returned to pool: " + ss);
203                    }
204                idleSessions.add(ss);
205                }
206            } finally {
207                sessionLock.unlock();
208            }
209            synchronized (closing) {
210                closing.notify();
211            }
212        }
213    
214        public void removeFromPool(ServerSessionImpl ss) {
215            sessionLock.lock();
216            try {
217                activeSessions.remove(ss);
218            } finally {
219                sessionLock.unlock();
220            }
221            try {
222                ActiveMQSession session = (ActiveMQSession)ss.getSession();
223                List l = session.getUnconsumedMessages();
224                for (Iterator i = l.iterator(); i.hasNext();) {
225                    dispatchToSession((MessageDispatch)i.next());
226                }
227            } catch (Throwable t) {
228                LOG.error("Error redispatching unconsumed messages from stale session", t);
229            }
230            ss.close();
231            synchronized (closing) {
232                closing.notify();
233            }
234        }
235    
236        /**
237         * @param messageDispatch
238         *            the message to dispatch
239         * @throws JMSException
240         */
241        private void dispatchToSession(MessageDispatch messageDispatch)
242                throws JMSException {
243    
244            ServerSession serverSession = getServerSession();
245            Session s = serverSession.getSession();
246            ActiveMQSession session = null;
247            if (s instanceof ActiveMQSession) {
248                session = (ActiveMQSession) s;
249            } else if (s instanceof ActiveMQQueueSession) {
250                session = (ActiveMQSession) s;
251            } else if (s instanceof ActiveMQTopicSession) {
252                session = (ActiveMQSession) s;
253            } else {
254                activeMQAsfEndpointWorker.getConnection()
255                        .onAsyncException(new JMSException(
256                                "Session pool provided an invalid session type: "
257                                        + s.getClass()));
258            }
259            session.dispatch(messageDispatch);
260            serverSession.start();
261        }
262    
263        public void close() {
264            closing.set(true);
265            int activeCount = closeIdleSessions();
266            // we may have to wait erroneously 250ms if an
267            // active session is removed during our wait and we
268            // are not notified
269            while (activeCount > 0) {
270                if (LOG.isDebugEnabled()) {
271                    LOG.debug("Active Sessions = " + activeCount);
272                }
273                try {
274                    synchronized (closing) {
275                        closing.wait(250);
276                    }
277                } catch (InterruptedException e) {
278                    Thread.currentThread().interrupt();
279                    return;
280                }
281                activeCount = closeIdleSessions();
282            }
283        }
284    
285    
286        protected int closeIdleSessions() {
287            sessionLock.lock();
288            try {
289                for (ServerSessionImpl ss : idleSessions) {
290                    ss.close();
291                }
292                idleSessions.clear();
293                return activeSessions.size();
294            } finally {
295                sessionLock.unlock();
296            }
297        }
298    
299        /**
300         * @return Returns the closing.
301         */
302        public boolean isClosing() {
303            return closing.get();
304        }
305    
306        /**
307         * @param closing The closing to set.
308         */
309        public void setClosing(boolean closing) {
310            this.closing.set(closing);
311        }
312    
313    }