001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.activemq;
019    
020    import java.util.List;
021    import javax.jms.JMSException;
022    import org.apache.activemq.command.ConsumerId;
023    import org.apache.activemq.command.MessageDispatch;
024    import org.apache.activemq.thread.Task;
025    import org.apache.activemq.thread.TaskRunner;
026    import org.apache.activemq.util.JMSExceptionSupport;
027    import org.slf4j.Logger;
028    import org.slf4j.LoggerFactory;
029    
030    /**
031     * A utility class used by the Session for dispatching messages asynchronously
032     * to consumers
033     *
034     * @see javax.jms.Session
035     */
036    public class ActiveMQSessionExecutor implements Task {
037        private static final Logger LOG = LoggerFactory.getLogger(ActiveMQSessionExecutor.class);
038    
039        private final ActiveMQSession session;
040        private final MessageDispatchChannel messageQueue;
041        private boolean dispatchedBySessionPool;
042        private volatile TaskRunner taskRunner;
043        private boolean startedOrWarnedThatNotStarted;
044    
045        ActiveMQSessionExecutor(ActiveMQSession session) {
046            this.session = session;
047            if (this.session.connection != null && this.session.connection.isMessagePrioritySupported()) {
048               this.messageQueue = new SimplePriorityMessageDispatchChannel();
049            }else {
050                this.messageQueue = new FifoMessageDispatchChannel();
051            }
052        }
053    
054        void setDispatchedBySessionPool(boolean value) {
055            dispatchedBySessionPool = value;
056            wakeup();
057        }
058    
059        void execute(MessageDispatch message) throws InterruptedException {
060    
061            if (!startedOrWarnedThatNotStarted) {
062    
063                ActiveMQConnection connection = session.connection;
064                long aboutUnstartedConnectionTimeout = connection.getWarnAboutUnstartedConnectionTimeout();
065                if (connection.isStarted() || aboutUnstartedConnectionTimeout < 0L) {
066                    startedOrWarnedThatNotStarted = true;
067                } else {
068                    long elapsedTime = System.currentTimeMillis() - connection.getTimeCreated();
069    
070                    // lets only warn when a significant amount of time has passed
071                    // just in case its normal operation
072                    if (elapsedTime > aboutUnstartedConnectionTimeout) {
073                        LOG.warn("Received a message on a connection which is not yet started. Have you forgotten to call Connection.start()? Connection: " + connection
074                                 + " Received: " + message);
075                        startedOrWarnedThatNotStarted = true;
076                    }
077                }
078            }
079    
080            if (!session.isSessionAsyncDispatch() && !dispatchedBySessionPool) {
081                dispatch(message);
082            } else {
083                messageQueue.enqueue(message);
084                wakeup();
085            }
086        }
087    
088        public void wakeup() {
089            if (!dispatchedBySessionPool) {
090                if (session.isSessionAsyncDispatch()) {
091                    try {
092                        TaskRunner taskRunner = this.taskRunner;
093                        if (taskRunner == null) {
094                            synchronized (this) {
095                                if (this.taskRunner == null) {
096                                    if (!isRunning()) {
097                                        // stop has been called
098                                        return;
099                                    }
100                                    this.taskRunner = session.connection.getSessionTaskRunner().createTaskRunner(this,
101                                            "ActiveMQ Session: " + session.getSessionId());
102                                }
103                                taskRunner = this.taskRunner;
104                            }
105                        }
106                        taskRunner.wakeup();
107                    } catch (InterruptedException e) {
108                        Thread.currentThread().interrupt();
109                    }
110                } else {
111                    while (iterate()) {
112                    }
113                }
114            }
115        }
116    
117        void executeFirst(MessageDispatch message) {
118            messageQueue.enqueueFirst(message);
119            wakeup();
120        }
121    
122        public boolean hasUncomsumedMessages() {
123            return !messageQueue.isClosed() && messageQueue.isRunning() && !messageQueue.isEmpty();
124        }
125    
126        void dispatch(MessageDispatch message) {
127            // TODO - we should use a Map for this indexed by consumerId
128            for (ActiveMQMessageConsumer consumer : this.session.consumers) {
129                ConsumerId consumerId = message.getConsumerId();
130                if (consumerId.equals(consumer.getConsumerId())) {
131                    consumer.dispatch(message);
132                    break;
133                }
134            }
135        }
136    
137        synchronized void start() {
138            if (!messageQueue.isRunning()) {
139                messageQueue.start();
140                if (hasUncomsumedMessages()) {
141                    wakeup();
142                }
143            }
144        }
145    
146        void stop() throws JMSException {
147            try {
148                if (messageQueue.isRunning()) {
149                    synchronized(this) {
150                        messageQueue.stop();
151                        if (this.taskRunner != null) {
152                            this.taskRunner.shutdown();
153                            this.taskRunner = null;
154                        }
155                    }
156                }
157            } catch (InterruptedException e) {
158                Thread.currentThread().interrupt();
159                throw JMSExceptionSupport.create(e);
160            }
161        }
162    
163        boolean isRunning() {
164            return messageQueue.isRunning();
165        }
166    
167        void close() {
168            messageQueue.close();
169        }
170    
171        void clear() {
172            messageQueue.clear();
173        }
174    
175        MessageDispatch dequeueNoWait() {
176            return messageQueue.dequeueNoWait();
177        }
178    
179        protected void clearMessagesInProgress() {
180            messageQueue.clear();
181        }
182    
183        public boolean isEmpty() {
184            return messageQueue.isEmpty();
185        }
186    
187        public boolean iterate() {
188    
189            // Deliver any messages queued on the consumer to their listeners.
190            for (ActiveMQMessageConsumer consumer : this.session.consumers) {
191                if (consumer.iterate()) {
192                    return true;
193                }
194            }
195    
196            // No messages left queued on the listeners.. so now dispatch messages
197            // queued on the session
198            MessageDispatch message = messageQueue.dequeueNoWait();
199            if (message == null) {
200                return false;
201            } else {
202                dispatch(message);
203                return !messageQueue.isEmpty();
204            }
205        }
206    
207        List<MessageDispatch> getUnconsumedMessages() {
208            return messageQueue.removeAll();
209        }
210    }