001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.ra;
018    
019    import java.lang.reflect.Method;
020    
021    import javax.jms.JMSException;
022    import javax.jms.Message;
023    import javax.jms.MessageListener;
024    import javax.jms.MessageProducer;
025    import javax.jms.ServerSession;
026    import javax.jms.Session;
027    import javax.resource.spi.endpoint.MessageEndpoint;
028    import javax.resource.spi.work.Work;
029    import javax.resource.spi.work.WorkEvent;
030    import javax.resource.spi.work.WorkException;
031    import javax.resource.spi.work.WorkListener;
032    import javax.resource.spi.work.WorkManager;
033    
034    import org.apache.activemq.ActiveMQSession;
035    import org.apache.activemq.ActiveMQSession.DeliveryListener;
036    import org.apache.activemq.TransactionContext;
037    import org.slf4j.Logger;
038    import org.slf4j.LoggerFactory;
039    
040    /**
041     * 
042     */
043    public class ServerSessionImpl implements ServerSession, InboundContext, Work, DeliveryListener {
044    
045        public static final Method ON_MESSAGE_METHOD;
046        private static int nextLogId;
047    
048        static {
049            try {
050                ON_MESSAGE_METHOD = MessageListener.class.getMethod("onMessage", new Class[] {
051                    Message.class
052                });
053            } catch (Exception e) {
054                throw new ExceptionInInitializerError(e);
055            }
056        }
057    
058    
059        private int serverSessionId = getNextLogId();
060        private final Logger log = LoggerFactory.getLogger(ServerSessionImpl.class.getName() + ":" + serverSessionId);
061    
062        private ActiveMQSession session;
063        private WorkManager workManager;
064        private MessageEndpoint endpoint;
065        private MessageProducer messageProducer;
066        private final ServerSessionPoolImpl pool;
067    
068        private Object runControlMutex = new Object();
069        private boolean runningFlag;
070        /**
071         * True if an error was detected that cause this session to be stale. When a
072         * session is stale, it should not be used again for proccessing.
073         */
074        private boolean stale;
075        /**
076         * Does the TX commit need to be managed by the RA?
077         */
078        private final boolean useRAManagedTx;
079        /**
080         * The maximum number of messages to batch
081         */
082        private final int batchSize;
083        /**
084         * The current number of messages in the batch
085         */
086        private int currentBatchSize;
087    
088        public ServerSessionImpl(ServerSessionPoolImpl pool, ActiveMQSession session, WorkManager workManager, MessageEndpoint endpoint, boolean useRAManagedTx, int batchSize) throws JMSException {
089            this.pool = pool;
090            this.session = session;
091            this.workManager = workManager;
092            this.endpoint = endpoint;
093            this.useRAManagedTx = useRAManagedTx;
094            this.session.setMessageListener((MessageListener)endpoint);
095            this.session.setDeliveryListener(this);
096            this.batchSize = batchSize;
097        }
098    
099        private static synchronized int getNextLogId() {
100            return nextLogId++;
101        }
102    
103        public Session getSession() throws JMSException {
104            return session;
105        }
106    
107        protected boolean isStale() {
108            return stale || !session.isRunning();
109        }
110    
111        public MessageProducer getMessageProducer() throws JMSException {
112            if (messageProducer == null) {
113                messageProducer = getSession().createProducer(null);
114            }
115            return messageProducer;
116        }
117    
118        /**
119         * @see javax.jms.ServerSession#start()
120         */
121        public void start() throws JMSException {
122    
123            synchronized (runControlMutex) {
124                if (runningFlag) {
125                    log.debug("Start request ignored, already running.");
126                    return;
127                }
128                runningFlag = true;
129            }
130    
131            // We get here because we need to start a async worker.
132            log.debug("Starting run.");
133            try {
134                workManager.scheduleWork(this, WorkManager.INDEFINITE, null, new WorkListener() {
135                    // The work listener is useful only for debugging...
136                    public void workAccepted(WorkEvent event) {
137                        log.debug("Work accepted: " + event);
138                    }
139    
140                    public void workRejected(WorkEvent event) {
141                        log.debug("Work rejected: " + event);
142                    }
143    
144                    public void workStarted(WorkEvent event) {
145                        log.debug("Work started: " + event);
146                    }
147    
148                    public void workCompleted(WorkEvent event) {
149                        log.debug("Work completed: " + event);
150                    }
151    
152                });
153            } catch (WorkException e) {
154                throw (JMSException)new JMSException("Start failed: " + e).initCause(e);
155            }
156        }
157    
158        /**
159         * @see java.lang.Runnable#run()
160         */
161        public void run() {
162            log.debug("Running");
163            currentBatchSize = 0;
164            while (true) {
165                log.debug("run loop start");
166                try {
167                    InboundContextSupport.register(this);
168                    if ( session.isRunning() ) {
169                    session.run();
170                    } else {
171                        log.debug("JMS Session is no longer running (maybe due to loss of connection?), marking ServerSesison as stale");
172                        stale = true;
173                    }
174                } catch (Throwable e) {
175                    stale = true;
176                    if ( log.isDebugEnabled() ) {
177                        log.debug("Endpoint failed to process message.", e);
178                    } else if ( log.isInfoEnabled() ) {
179                        log.info("Endpoint failed to process message. Reason: " + e.getMessage());                    
180                    }
181                } finally {
182                    InboundContextSupport.unregister(this);
183                    log.debug("run loop end");
184                    synchronized (runControlMutex) {
185                        // This endpoint may have gone stale due to error
186                        if (stale) {
187                            runningFlag = false;
188                            pool.removeFromPool(this);
189                            break;
190                        }
191                        if (!session.hasUncomsumedMessages()) {
192                            runningFlag = false;
193                            pool.returnToPool(this);
194                            break;
195                        }
196                    }
197                }
198            }
199            log.debug("Run finished");
200        }
201    
202        /**
203         * The ActiveMQSession's run method will call back to this method before
204         * dispactching a message to the MessageListener.
205         */
206        public void beforeDelivery(ActiveMQSession session, Message msg) {
207            if (currentBatchSize == 0) {
208                try {
209                    endpoint.beforeDelivery(ON_MESSAGE_METHOD);
210                } catch (Throwable e) {
211                    throw new RuntimeException("Endpoint before delivery notification failure", e);
212                }
213            }
214        }
215    
216        /**
217         * The ActiveMQSession's run method will call back to this method after
218         * dispactching a message to the MessageListener.
219         */
220        public void afterDelivery(ActiveMQSession session, Message msg) {
221            if (++currentBatchSize >= batchSize || !session.hasUncomsumedMessages()) {
222                currentBatchSize = 0;
223                try {
224                    endpoint.afterDelivery();
225                } catch (Throwable e) {
226                    throw new RuntimeException("Endpoint after delivery notification failure", e);
227                } finally {
228                    TransactionContext transactionContext = session.getTransactionContext();
229                    if (transactionContext != null && transactionContext.isInLocalTransaction()) {
230                        if (!useRAManagedTx) {
231                            // Sanitiy Check: If the local transaction has not been
232                            // commited..
233                            // Commit it now.
234                            log.warn("Local transaction had not been commited. Commiting now.");
235                        }
236                        try {
237                            session.commit();
238                        } catch (JMSException e) {
239                            log.info("Commit failed:", e);
240                        }
241                    }
242                }
243            }
244        }
245    
246        /**
247         * @see javax.resource.spi.work.Work#release()
248         */
249        public void release() {
250            log.debug("release called");
251        }
252    
253        /**
254         * @see java.lang.Object#toString()
255         */
256        @Override
257        public String toString() {
258            return "ServerSessionImpl:" + serverSessionId;
259        }
260    
261        public void close() {
262            try {
263                endpoint.release();
264            } catch (Throwable e) {
265                log.debug("Endpoint did not release properly: " + e.getMessage(), e);
266            }
267            try {
268                session.close();
269            } catch (Throwable e) {
270                log.debug("Session did not close properly: " + e.getMessage(), e);
271            }
272        }
273    
274    }