001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.web;
018
019import java.util.LinkedList;
020
021import javax.jms.Message;
022import javax.jms.MessageConsumer;
023
024import org.apache.activemq.MessageAvailableListener;
025import org.eclipse.jetty.continuation.Continuation;
026import org.slf4j.Logger;
027import org.slf4j.LoggerFactory;
028
029/*
030 * Listen for available messages and wakeup any continuations.
031 */
032public class AjaxListener implements MessageAvailableListener {
033    private static final Logger LOG = LoggerFactory.getLogger(AjaxListener.class);
034
035    private final long maximumReadTimeout;
036    private final AjaxWebClient client;
037    private long lastAccess;
038    private Continuation continuation;
039    private final LinkedList<UndeliveredAjaxMessage> undeliveredMessages = new LinkedList<UndeliveredAjaxMessage>();
040
041    AjaxListener(AjaxWebClient client, long maximumReadTimeout) {
042        this.client = client;
043        this.maximumReadTimeout = maximumReadTimeout;
044        access();
045    }
046
047    public void access() {
048        lastAccess = System.currentTimeMillis();
049    }
050
051    public synchronized void setContinuation(Continuation continuation) {
052        this.continuation = continuation;
053    }
054
055    public LinkedList<UndeliveredAjaxMessage> getUndeliveredMessages() {
056        return undeliveredMessages;
057    }
058
059    @Override
060    public synchronized void onMessageAvailable(MessageConsumer consumer) {
061        LOG.debug("Message for consumer: {} continuation: {}", consumer, continuation);
062
063        if (continuation != null) {
064            try {
065                Message message = consumer.receive(10);
066                LOG.debug("message is " + message);
067                if (message != null) {
068                    if (!continuation.isResumed()) {
069                        LOG.debug("Resuming suspended continuation {}", continuation);
070                        continuation.setAttribute("undelivered_message", new UndeliveredAjaxMessage(message, consumer));
071                        continuation.resume();
072                    } else {
073                        LOG.debug("Message available, but continuation is already resumed. Buffer for next time.");
074                        bufferMessageForDelivery(message, consumer);
075                    }
076                }
077            } catch (Exception e) {
078                LOG.warn("Error receiving message " + e.getMessage() + ". This exception is ignored.", e);
079            }
080
081        } else if (System.currentTimeMillis() - lastAccess > 2 * this.maximumReadTimeout) {
082            new Thread() {
083                @Override
084                public void run() {
085                    LOG.debug("Closing consumers on client: {}", client);
086                    client.closeConsumers();
087                }
088            }.start();
089        } else {
090            try {
091                Message message = consumer.receive(10);
092                bufferMessageForDelivery(message, consumer);
093            } catch (Exception e) {
094                LOG.warn("Error receiving message " + e.getMessage() + ". This exception is ignored.", e);
095            }
096        }
097    }
098
099    public void bufferMessageForDelivery(Message message, MessageConsumer consumer) {
100        if (message != null) {
101            synchronized (undeliveredMessages) {
102                undeliveredMessages.addLast(new UndeliveredAjaxMessage(message, consumer));
103            }
104        }
105    }
106}