001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.web;
018    
019    import javax.jms.Message;
020    import javax.jms.MessageConsumer;
021    
022    import org.eclipse.jetty.continuation.Continuation;
023    import org.eclipse.jetty.continuation.ContinuationSupport;
024    
025    import org.slf4j.Logger;
026    import org.slf4j.LoggerFactory;
027    
028    import org.apache.activemq.MessageAvailableListener;
029    
030    import java.util.LinkedList;
031    
032    /*
033     * Listen for available messages and wakeup any continuations.
034     */
035    public class AjaxListener implements MessageAvailableListener {
036        private static final Logger LOG = LoggerFactory.getLogger(AjaxListener.class);
037        
038        private long maximumReadTimeout;
039        private AjaxWebClient client;
040        private long lastAccess;
041        private Continuation continuation;
042        private LinkedList<UndeliveredAjaxMessage> undeliveredMessages = new LinkedList<UndeliveredAjaxMessage>();
043    
044        AjaxListener(AjaxWebClient client, long maximumReadTimeout) {
045            this.client = client;
046            this.maximumReadTimeout = maximumReadTimeout;
047            access();
048        }
049    
050        public void access() {
051            lastAccess = System.currentTimeMillis();
052        }
053    
054        public synchronized void setContinuation(Continuation continuation) {
055            this.continuation = continuation;
056        }
057    
058        public LinkedList<UndeliveredAjaxMessage> getUndeliveredMessages() {
059            return undeliveredMessages;
060        }
061        
062        public synchronized void onMessageAvailable(MessageConsumer consumer) {
063            if (LOG.isDebugEnabled()) {
064                LOG.debug("message for " + consumer + " continuation=" + continuation);
065            }
066            if (continuation != null) {
067                try {
068                    Message message = consumer.receive(10);
069                    LOG.debug( "message is " + message );
070                    if( message != null ) {
071                        if( continuation.isSuspended() ) {
072                            LOG.debug( "Resuming suspended continuation " + continuation );
073                            continuation.setAttribute("undelivered_message", new UndeliveredAjaxMessage( message, consumer ) );
074                            continuation.resume();
075                        } else {
076                            LOG.debug( "Message available, but continuation is already resumed.  Buffer for next time." );
077                            bufferMessageForDelivery( message, consumer );
078                        }
079                    }
080                } catch (Exception e) {
081                    LOG.error("Error receiving message " + e, e);
082                }
083                
084            } else if (System.currentTimeMillis() - lastAccess > 2 * this.maximumReadTimeout) {
085                new Thread() {
086                    public void run() {
087                        client.closeConsumers();
088                    };
089                }.start();
090            } else {
091                try {
092                    Message message = consumer.receive(10);
093                    bufferMessageForDelivery( message, consumer );
094                } catch (Exception e) {
095                    LOG.error("Error receiving message " + e, e);
096                }
097            }
098        }
099        
100        public void bufferMessageForDelivery( Message message, MessageConsumer consumer ) {
101            if( message != null ) {
102                synchronized( undeliveredMessages ) {
103                    undeliveredMessages.addLast( new UndeliveredAjaxMessage( message, consumer ) );
104                }
105            }
106        }
107    }