001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.ra;
018
019import java.lang.reflect.Method;
020
021import javax.jms.JMSException;
022import javax.jms.Message;
023import javax.jms.MessageListener;
024import javax.jms.MessageProducer;
025import javax.jms.ServerSession;
026import javax.jms.Session;
027import javax.resource.spi.endpoint.MessageEndpoint;
028import javax.resource.spi.work.Work;
029import javax.resource.spi.work.WorkEvent;
030import javax.resource.spi.work.WorkException;
031import javax.resource.spi.work.WorkListener;
032import javax.resource.spi.work.WorkManager;
033
034import org.apache.activemq.ActiveMQSession;
035import org.apache.activemq.ActiveMQSession.DeliveryListener;
036import org.apache.activemq.TransactionContext;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040/**
041 * 
042 */
043public class ServerSessionImpl implements ServerSession, InboundContext, Work, DeliveryListener {
044
045    public static final Method ON_MESSAGE_METHOD;
046    private static int nextLogId;
047
048    static {
049        try {
050            ON_MESSAGE_METHOD = MessageListener.class.getMethod("onMessage", new Class[] {
051                Message.class
052            });
053        } catch (Exception e) {
054            throw new ExceptionInInitializerError(e);
055        }
056    }
057
058
059    private int serverSessionId = getNextLogId();
060    private final Logger log = LoggerFactory.getLogger(ServerSessionImpl.class.getName() + ":" + serverSessionId);
061
062    private ActiveMQSession session;
063    private WorkManager workManager;
064    private MessageEndpoint endpoint;
065    private MessageProducer messageProducer;
066    private final ServerSessionPoolImpl pool;
067
068    private Object runControlMutex = new Object();
069    private boolean runningFlag;
070    /**
071     * True if an error was detected that cause this session to be stale. When a
072     * session is stale, it should not be used again for proccessing.
073     */
074    private boolean stale;
075    /**
076     * Does the TX commit need to be managed by the RA?
077     */
078    private final boolean useRAManagedTx;
079    /**
080     * The maximum number of messages to batch
081     */
082    private final int batchSize;
083    /**
084     * The current number of messages in the batch
085     */
086    private int currentBatchSize;
087
088    public ServerSessionImpl(ServerSessionPoolImpl pool, ActiveMQSession session, WorkManager workManager, MessageEndpoint endpoint, boolean useRAManagedTx, int batchSize) throws JMSException {
089        this.pool = pool;
090        this.session = session;
091        this.workManager = workManager;
092        this.endpoint = endpoint;
093        this.useRAManagedTx = useRAManagedTx;
094        this.session.setMessageListener((MessageListener)endpoint);
095        this.session.setDeliveryListener(this);
096        this.batchSize = batchSize;
097    }
098
099    private static synchronized int getNextLogId() {
100        return nextLogId++;
101    }
102
103    public Session getSession() throws JMSException {
104        return session;
105    }
106
107    protected boolean isStale() {
108        return stale || !session.isRunning();
109    }
110
111    public MessageProducer getMessageProducer() throws JMSException {
112        if (messageProducer == null) {
113            messageProducer = getSession().createProducer(null);
114        }
115        return messageProducer;
116    }
117
118    /**
119     * @see javax.jms.ServerSession#start()
120     */
121    public void start() throws JMSException {
122
123        synchronized (runControlMutex) {
124            if (runningFlag) {
125                log.debug("Start request ignored, already running.");
126                return;
127            }
128            runningFlag = true;
129        }
130
131        // We get here because we need to start a async worker.
132        log.debug("Starting run.");
133        try {
134            workManager.scheduleWork(this, WorkManager.INDEFINITE, null, new WorkListener() {
135                // The work listener is useful only for debugging...
136                public void workAccepted(WorkEvent event) {
137                    log.debug("Work accepted: " + event);
138                }
139
140                public void workRejected(WorkEvent event) {
141                    log.debug("Work rejected: " + event);
142                }
143
144                public void workStarted(WorkEvent event) {
145                    log.debug("Work started: " + event);
146                }
147
148                public void workCompleted(WorkEvent event) {
149                    log.debug("Work completed: " + event);
150                }
151
152            });
153        } catch (WorkException e) {
154            throw (JMSException)new JMSException("Start failed: " + e).initCause(e);
155        }
156    }
157
158    /**
159     * @see java.lang.Runnable#run()
160     */
161    public void run() {
162        log.debug("Running");
163        currentBatchSize = 0;
164        while (true) {
165            log.debug("run loop start");
166            try {
167                InboundContextSupport.register(this);
168                if ( session.isRunning() ) {
169                    session.run();
170                } else {
171                    log.debug("JMS Session {} with unconsumed {} is no longer running (maybe due to loss of connection?), marking ServerSession as stale", session, session.getUnconsumedMessages().size());
172                    stale = true;
173                }
174            } catch (Throwable e) {
175                stale = true;
176                if ( log.isDebugEnabled() ) {
177                    log.debug("Endpoint {} failed to process message.", session, e);
178                } else if ( log.isInfoEnabled() ) {
179                    log.info("Endpoint {} failed to process message. Reason: " + e.getMessage(), session);
180                }
181            } finally {
182                InboundContextSupport.unregister(this);
183                log.debug("run loop end");
184                synchronized (runControlMutex) {
185                    // This endpoint may have gone stale due to error
186                    if (stale) {
187                        runningFlag = false;
188                        pool.removeFromPool(this);
189                        break;
190                    }
191                    if (!session.hasUncomsumedMessages()) {
192                        runningFlag = false;
193                        log.debug("Session has no unconsumed message, returning to pool");
194                        pool.returnToPool(this);
195                        break;
196                    }
197                }
198            }
199        }
200        log.debug("Run finished");
201    }
202
203    /**
204     * The ActiveMQSession's run method will call back to this method before
205     * dispactching a message to the MessageListener.
206     */
207    public void beforeDelivery(ActiveMQSession session, Message msg) {
208        if (currentBatchSize == 0) {
209            try {
210                endpoint.beforeDelivery(ON_MESSAGE_METHOD);
211            } catch (Throwable e) {
212                throw new RuntimeException("Endpoint before delivery notification failure", e);
213            }
214        }
215    }
216
217    /**
218     * The ActiveMQSession's run method will call back to this method after
219     * dispactching a message to the MessageListener.
220     */
221    public void afterDelivery(ActiveMQSession session, Message msg) {
222        if (++currentBatchSize >= batchSize || !session.hasUncomsumedMessages()) {
223            currentBatchSize = 0;
224            try {
225                endpoint.afterDelivery();
226            } catch (Throwable e) {
227                throw new RuntimeException("Endpoint after delivery notification failure: " + e, e);
228            } finally {
229                TransactionContext transactionContext = session.getTransactionContext();
230                if (transactionContext != null && transactionContext.isInLocalTransaction()) {
231                    if (!useRAManagedTx) {
232                        // Sanitiy Check: If the local transaction has not been
233                        // commited..
234                        // Commit it now.
235                        log.warn("Local transaction had not been commited. Commiting now.");
236                    }
237                    try {
238                        session.commit();
239                    } catch (JMSException e) {
240                        log.info("Commit failed:", e);
241                    }
242                }
243            }
244        }
245    }
246
247    /**
248     * @see javax.resource.spi.work.Work#release()
249     */
250    public void release() {
251        log.debug("release called");
252    }
253
254    /**
255     * @see java.lang.Object#toString()
256     */
257    @Override
258    public String toString() {
259        return "ServerSessionImpl:" + serverSessionId + "{" + session +"}";
260    }
261
262    public void close() {
263        try {
264            endpoint.release();
265        } catch (Throwable e) {
266            log.debug("Endpoint did not release properly: " + e.getMessage(), e);
267        }
268        try {
269            session.close();
270        } catch (Throwable e) {
271            log.debug("Session did not close properly: " + e.getMessage(), e);
272        }
273    }
274
275}