001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.apache.activemq;
019
020import java.util.List;
021import javax.jms.JMSException;
022import org.apache.activemq.command.ConsumerId;
023import org.apache.activemq.command.MessageDispatch;
024import org.apache.activemq.thread.Task;
025import org.apache.activemq.thread.TaskRunner;
026import org.apache.activemq.util.JMSExceptionSupport;
027import org.slf4j.Logger;
028import org.slf4j.LoggerFactory;
029
030/**
031 * A utility class used by the Session for dispatching messages asynchronously
032 * to consumers
033 *
034 * @see javax.jms.Session
035 */
036public class ActiveMQSessionExecutor implements Task {
037    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQSessionExecutor.class);
038
039    private final ActiveMQSession session;
040    private final MessageDispatchChannel messageQueue;
041    private boolean dispatchedBySessionPool;
042    private volatile TaskRunner taskRunner;
043    private boolean startedOrWarnedThatNotStarted;
044
045    ActiveMQSessionExecutor(ActiveMQSession session) {
046        this.session = session;
047        if (this.session.connection != null && this.session.connection.isMessagePrioritySupported()) {
048           this.messageQueue = new SimplePriorityMessageDispatchChannel();
049        }else {
050            this.messageQueue = new FifoMessageDispatchChannel();
051        }
052    }
053
054    void setDispatchedBySessionPool(boolean value) {
055        dispatchedBySessionPool = value;
056        wakeup();
057    }
058
059    void execute(MessageDispatch message) throws InterruptedException {
060
061        if (!startedOrWarnedThatNotStarted) {
062
063            ActiveMQConnection connection = session.connection;
064            long aboutUnstartedConnectionTimeout = connection.getWarnAboutUnstartedConnectionTimeout();
065            if (connection.isStarted() || aboutUnstartedConnectionTimeout < 0L) {
066                startedOrWarnedThatNotStarted = true;
067            } else {
068                long elapsedTime = System.currentTimeMillis() - connection.getTimeCreated();
069
070                // lets only warn when a significant amount of time has passed
071                // just in case its normal operation
072                if (elapsedTime > aboutUnstartedConnectionTimeout) {
073                    LOG.warn("Received a message on a connection which is not yet started. Have you forgotten to call Connection.start()? Connection: " + connection
074                             + " Received: " + message);
075                    startedOrWarnedThatNotStarted = true;
076                }
077            }
078        }
079
080        if (!session.isSessionAsyncDispatch() && !dispatchedBySessionPool) {
081            dispatch(message);
082        } else {
083            messageQueue.enqueue(message);
084            wakeup();
085        }
086    }
087
088    public void wakeup() {
089        if (!dispatchedBySessionPool) {
090            if (session.isSessionAsyncDispatch()) {
091                try {
092                    TaskRunner taskRunner = this.taskRunner;
093                    if (taskRunner == null) {
094                        synchronized (this) {
095                            if (this.taskRunner == null) {
096                                if (!isRunning()) {
097                                    // stop has been called
098                                    return;
099                                }
100                                this.taskRunner = session.connection.getSessionTaskRunner().createTaskRunner(this,
101                                        "ActiveMQ Session: " + session.getSessionId());
102                            }
103                            taskRunner = this.taskRunner;
104                        }
105                    }
106                    taskRunner.wakeup();
107                } catch (InterruptedException e) {
108                    Thread.currentThread().interrupt();
109                }
110            } else {
111                while (iterate()) {
112                }
113            }
114        }
115    }
116
117    void executeFirst(MessageDispatch message) {
118        messageQueue.enqueueFirst(message);
119        wakeup();
120    }
121
122    public boolean hasUncomsumedMessages() {
123        return !messageQueue.isClosed() && messageQueue.isRunning() && !messageQueue.isEmpty();
124    }
125
126    void dispatch(MessageDispatch message) {
127        // TODO - we should use a Map for this indexed by consumerId
128        for (ActiveMQMessageConsumer consumer : this.session.consumers) {
129            ConsumerId consumerId = message.getConsumerId();
130            if (consumerId.equals(consumer.getConsumerId())) {
131                consumer.dispatch(message);
132                break;
133            }
134        }
135    }
136
137    synchronized void start() {
138        if (!messageQueue.isRunning()) {
139            messageQueue.start();
140            if (hasUncomsumedMessages()) {
141                wakeup();
142            }
143        }
144    }
145
146    void stop() throws JMSException {
147        try {
148            if (messageQueue.isRunning()) {
149                synchronized(this) {
150                    messageQueue.stop();
151                    if (this.taskRunner != null) {
152                        this.taskRunner.shutdown();
153                        this.taskRunner = null;
154                    }
155                }
156            }
157        } catch (InterruptedException e) {
158            Thread.currentThread().interrupt();
159            throw JMSExceptionSupport.create(e);
160        }
161    }
162
163    boolean isRunning() {
164        return messageQueue.isRunning();
165    }
166
167    void close() {
168        messageQueue.close();
169    }
170
171    void clear() {
172        messageQueue.clear();
173    }
174
175    MessageDispatch dequeueNoWait() {
176        return messageQueue.dequeueNoWait();
177    }
178
179    protected void clearMessagesInProgress() {
180        messageQueue.clear();
181    }
182
183    public boolean isEmpty() {
184        return messageQueue.isEmpty();
185    }
186
187    public boolean iterate() {
188
189        // Deliver any messages queued on the consumer to their listeners.
190        for (ActiveMQMessageConsumer consumer : this.session.consumers) {
191            if (consumer.iterate()) {
192                return true;
193            }
194        }
195
196        // No messages left queued on the listeners.. so now dispatch messages
197        // queued on the session
198        MessageDispatch message = messageQueue.dequeueNoWait();
199        if (message == null) {
200            return false;
201        } else {
202            dispatch(message);
203            return !messageQueue.isEmpty();
204        }
205    }
206
207    List<MessageDispatch> getUnconsumedMessages() {
208        return messageQueue.removeAll();
209    }
210    
211    void waitForQueueRestart() throws InterruptedException {
212        synchronized (messageQueue.getMutex()) {
213            while (messageQueue.isRunning() == false) {
214                if (messageQueue.isClosed()) {
215                    break;
216                }
217                messageQueue.getMutex().wait();
218            }
219        }
220    }
221}