/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
017package org.apache.activemq.web;
018
019import java.io.IOException;
020import java.io.PrintWriter;
021import java.io.StringWriter;
022import java.util.HashMap;
023import java.util.Iterator;
024import java.util.LinkedList;
025import java.util.List;
026import java.util.Map;
027import java.util.Timer;
028import java.util.TimerTask;
029
030import javax.jms.Destination;
031import javax.jms.JMSException;
032import javax.jms.Message;
033import javax.jms.MessageConsumer;
034import javax.jms.ObjectMessage;
035import javax.jms.TextMessage;
036import javax.servlet.ServletConfig;
037import javax.servlet.ServletException;
038import javax.servlet.http.HttpServletRequest;
039import javax.servlet.http.HttpServletResponse;
040import javax.servlet.http.HttpSession;
041
042import org.apache.activemq.MessageAvailableConsumer;
043import org.eclipse.jetty.continuation.Continuation;
044import org.eclipse.jetty.continuation.ContinuationListener;
045import org.eclipse.jetty.continuation.ContinuationSupport;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049/**
050 * A servlet for sending and receiving messages to/from JMS destinations using
051 * HTTP POST for sending and HTTP GET for receiving. <p/> You can specify the
052 * destination and whether it is a topic or queue via configuration details on
053 * the servlet or as request parameters. <p/> For reading messages you can
054 * specify a readTimeout parameter to determine how long the servlet should
055 * block for. The servlet can be configured with the following init parameters:
056 * <dl>
057 * <dt>defaultReadTimeout</dt>
058 * <dd>The default time in ms to wait for messages. May be overridden by a
059 * request using the 'timeout' parameter</dd>
060 * <dt>maximumReadTimeout</dt>
061 * <dd>The maximum value a request may specify for the 'timeout' parameter</dd>
062 * <dt>maximumMessages</dt>
063 * <dd>maximum messages to send per response</dd>
064 * <dt></dt>
065 * <dd></dd>
066 * </dl>
067 *
068 *
069 */
070@SuppressWarnings("serial")
071public class MessageListenerServlet extends MessageServletSupport {
072    private static final Logger LOG = LoggerFactory.getLogger(MessageListenerServlet.class);
073
074    private final String readTimeoutParameter = "timeout";
075    private long defaultReadTimeout = -1;
076    private long maximumReadTimeout = 25000;
077    private int maximumMessages = 100;
078    private final Timer clientCleanupTimer = new Timer("ActiveMQ Ajax Client Cleanup Timer", true);
079    private final HashMap<String,AjaxWebClient> ajaxWebClients = new HashMap<String,AjaxWebClient>();
080
081    @Override
082    public void init() throws ServletException {
083        ServletConfig servletConfig = getServletConfig();
084        String name = servletConfig.getInitParameter("defaultReadTimeout");
085        if (name != null) {
086            defaultReadTimeout = asLong(name);
087        }
088        name = servletConfig.getInitParameter("maximumReadTimeout");
089        if (name != null) {
090            maximumReadTimeout = asLong(name);
091        }
092        name = servletConfig.getInitParameter("maximumMessages");
093        if (name != null) {
094            maximumMessages = (int)asLong(name);
095        }
096        clientCleanupTimer.schedule( new ClientCleaner(), 5000, 60000 );
097    }
098
099    /**
100     * Sends a message to a destination or manage subscriptions. If the the
101     * content type of the POST is
102     * <code>application/x-www-form-urlencoded</code>, then the form
103     * parameters "destination", "message" and "type" are used to pass a message
104     * or a subscription. If multiple messages or subscriptions are passed in a
105     * single post, then additional parameters are shortened to "dN", "mN" and
106     * "tN" where N is an index starting from 1. The type is either "send",
107     * "listen" or "unlisten". For send types, the message is the text of the
108     * TextMessage, otherwise it is the ID to be used for the subscription. If
109     * the content type is not <code>application/x-www-form-urlencoded</code>,
110     * then the body of the post is sent as the message to a destination that is
111     * derived from a query parameter, the URL or the default destination.
112     *
113     * @param request
114     * @param response
115     * @throws ServletException
116     * @throws IOException
117     */
118    @Override
119    protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
120
121        // lets turn the HTTP post into a JMS Message
122        AjaxWebClient client = getAjaxWebClient( request );
123        String messageIds = "";
124
125        synchronized (client) {
126
127            if (LOG.isDebugEnabled()) {
128                LOG.debug("POST client=" + client + " session=" + request.getSession().getId() + " clientId="+ request.getParameter("clientId") + " info=" + request.getPathInfo() + " contentType=" + request.getContentType());
129                // dump(request.getParameterMap());
130            }
131
132            int messages = 0;
133
134            // loop until no more messages
135            while (true) {
136                // Get the message parameters. Multiple messages are encoded
137                // with more compact parameter names.
138                String destinationName = request.getParameter(messages == 0 ? "destination" : ("d" + messages));
139
140                if (destinationName == null) {
141                    destinationName = request.getHeader("destination");
142                }
143
144                String message = request.getParameter(messages == 0 ? "message" : ("m" + messages));
145                String type = request.getParameter(messages == 0 ? "type" : ("t" + messages));
146
147                if (destinationName == null || message == null || type == null) {
148                    break;
149                }
150
151                try {
152                    Destination destination = getDestination(client, request, destinationName);
153
154                    if (LOG.isDebugEnabled()) {
155                        LOG.debug(messages + " destination=" + destinationName + " message=" + message + " type=" + type);
156                        LOG.debug(destination + " is a " + destination.getClass().getName());
157                    }
158
159                    messages++;
160
161                    if ("listen".equals(type)) {
162                        AjaxListener listener = client.getListener();
163                        Map<MessageAvailableConsumer, String> consumerIdMap = client.getIdMap();
164                        Map<MessageAvailableConsumer, String> consumerDestinationNameMap = client.getDestinationNameMap();
165                        client.closeConsumer(destination); // drop any existing
166                        // consumer.
167                        MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination, request.getHeader(WebClient.selectorName));
168
169                        consumer.setAvailableListener(listener);
170                        consumerIdMap.put(consumer, message);
171                        consumerDestinationNameMap.put(consumer, destinationName);
172                        if (LOG.isDebugEnabled()) {
173                            LOG.debug("Subscribed: " + consumer + " to " + destination + " id=" + message);
174                        }
175                    } else if ("unlisten".equals(type)) {
176                        Map<MessageAvailableConsumer, String> consumerIdMap = client.getIdMap();
177                        Map<MessageAvailableConsumer, String> consumerDestinationNameMap = client.getDestinationNameMap();
178                        MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination, request.getHeader(WebClient.selectorName));
179
180                        consumer.setAvailableListener(null);
181                        consumerIdMap.remove(consumer);
182                        consumerDestinationNameMap.remove(consumer);
183                        client.closeConsumer(destination);
184                        if (LOG.isDebugEnabled()) {
185                            LOG.debug("Unsubscribed: " + consumer);
186                        }
187                    } else if ("send".equals(type)) {
188                        TextMessage text = client.getSession().createTextMessage(message);
189                        appendParametersToMessage(request, text);
190
191                        client.send(destination, text);
192                        messageIds += text.getJMSMessageID() + "\n";
193                        if (LOG.isDebugEnabled()) {
194                            LOG.debug("Sent " + message + " to " + destination);
195                        }
196                    } else {
197                        LOG.warn("unknown type " + type);
198                    }
199
200                } catch (JMSException e) {
201                    LOG.warn("jms", e);
202                }
203            }
204        }
205
206        if ("true".equals(request.getParameter("poll"))) {
207            try {
208                // TODO return message IDs
209                doMessages(client, request, response);
210            } catch (JMSException e) {
211                throw new ServletException("JMS problem: " + e, e);
212            }
213        } else {
214            // handle simple POST of a message
215            if (request.getContentLength() != 0 && (request.getContentType() == null || !request.getContentType().toLowerCase().startsWith("application/x-www-form-urlencoded"))) {
216                try {
217                    Destination destination = getDestination(client, request);
218                    String body = getPostedMessageBody(request);
219                    TextMessage message = client.getSession().createTextMessage(body);
220                    appendParametersToMessage(request, message);
221
222                    client.send(destination, message);
223                    if (LOG.isDebugEnabled()) {
224                        LOG.debug("Sent to destination: " + destination + " body: " + body);
225                    }
226                    messageIds += message.getJMSMessageID() + "\n";
227                } catch (JMSException e) {
228                    throw new ServletException(e);
229                }
230            }
231
232            response.setContentType("text/plain");
233            response.setHeader("Cache-Control", "no-cache");
234            response.getWriter().print(messageIds);
235        }
236    }
237
238    /**
239     * Supports a HTTP DELETE to be equivlanent of consuming a singe message
240     * from a queue
241     */
242    @Override
243    protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
244        try {
245            AjaxWebClient client = getAjaxWebClient(request);
246            if (LOG.isDebugEnabled()) {
247                LOG.debug("GET client=" + client + " session=" + request.getSession().getId() + " clientId="+ request.getParameter("clientId") + " uri=" + request.getRequestURI() + " query=" + request.getQueryString());
248            }
249
250            doMessages(client, request, response);
251        } catch (JMSException e) {
252            throw new ServletException("JMS problem: " + e, e);
253        }
254    }
255
256    /**
257     * Reads a message from a destination up to some specific timeout period
258     *
259     * @param client The webclient
260     * @param request
261     * @param response
262     * @throws ServletException
263     * @throws IOException
264     */
265    protected void doMessages(AjaxWebClient client, HttpServletRequest request, HttpServletResponse response) throws JMSException, IOException {
266
267        int messages = 0;
268        // This is a poll for any messages
269
270        long timeout = getReadTimeout(request);
271        if (LOG.isDebugEnabled()) {
272            LOG.debug("doMessage timeout=" + timeout);
273        }
274
275        // this is non-null if we're resuming the continuation.
276        // attributes set in AjaxListener
277        UndeliveredAjaxMessage undelivered_message = null;
278        Message message = null;
279        undelivered_message = (UndeliveredAjaxMessage)request.getAttribute("undelivered_message");
280        if( undelivered_message != null ) {
281            message = undelivered_message.getMessage();
282        }
283
284        synchronized (client) {
285
286            List<MessageConsumer> consumers = client.getConsumers();
287            MessageAvailableConsumer consumer = null;
288            if( undelivered_message != null ) {
289                consumer = (MessageAvailableConsumer)undelivered_message.getConsumer();
290            }
291
292            if (message == null) {
293                // Look for a message that is ready to go
294                for (int i = 0; message == null && i < consumers.size(); i++) {
295                    consumer = (MessageAvailableConsumer)consumers.get(i);
296                    if (consumer.getAvailableListener() == null) {
297                        continue;
298                    }
299
300                    // Look for any available messages
301                    message = consumer.receive(10);
302                    if (LOG.isDebugEnabled()) {
303                        LOG.debug("received " + message + " from " + consumer);
304                    }
305                }
306            }
307
308            // prepare the response
309            response.setContentType("text/xml");
310            response.setHeader("Cache-Control", "no-cache");
311
312            if (message == null && client.getListener().getUndeliveredMessages().size() == 0) {
313                Continuation continuation = ContinuationSupport.getContinuation(request);
314
315                // Add a listener to the continuation to make sure it actually
316                // will expire (seems like a bug in Jetty Servlet 3 continuations,
317                // see https://issues.apache.org/jira/browse/AMQ-3447
318                continuation.addContinuationListener(new ContinuationListener() {
319                    @Override
320                    public void onTimeout(Continuation cont) {
321                        if (LOG.isDebugEnabled()) {
322                            LOG.debug("Continuation " + cont.toString() + " expired.");
323                        }
324                    }
325
326                    @Override
327                    public void onComplete(Continuation cont) {
328                        if (LOG.isDebugEnabled()) {
329                           LOG.debug("Continuation " + cont.toString() + " completed.");
330                        }
331                    }
332                });
333
334                if (continuation.isExpired()) {
335                    response.setStatus(HttpServletResponse.SC_OK);
336                    StringWriter swriter = new StringWriter();
337                    PrintWriter writer = new PrintWriter(swriter);
338                    writer.println("<ajax-response>");
339                    writer.print("</ajax-response>");
340
341                    writer.flush();
342                    String m = swriter.toString();
343                    response.getWriter().println(m);
344
345                    return;
346                }
347
348                continuation.setTimeout(timeout);
349                continuation.suspend();
350                LOG.debug( "Suspending continuation " + continuation );
351
352                // Fetch the listeners
353                AjaxListener listener = client.getListener();
354                listener.access();
355
356                // register this continuation with our listener.
357                listener.setContinuation(continuation);
358
359                return;
360            }
361
362            StringWriter swriter = new StringWriter();
363            PrintWriter writer = new PrintWriter(swriter);
364
365            Map<MessageAvailableConsumer, String> consumerIdMap = client.getIdMap();
366            Map<MessageAvailableConsumer, String> consumerDestinationNameMap = client.getDestinationNameMap();
367            response.setStatus(HttpServletResponse.SC_OK);
368            writer.println("<ajax-response>");
369
370            // Send any message we already have
371            if (message != null) {
372                String id = consumerIdMap.get(consumer);
373                String destinationName = consumerDestinationNameMap.get(consumer);
374                LOG.debug( "sending pre-existing message" );
375                writeMessageResponse(writer, message, id, destinationName);
376
377                messages++;
378            }
379
380            // send messages buffered while continuation was unavailable.
381            LinkedList<UndeliveredAjaxMessage> undeliveredMessages = ((AjaxListener)consumer.getAvailableListener()).getUndeliveredMessages();
382            LOG.debug("Send " + undeliveredMessages.size() + " unconsumed messages");
383            synchronized( undeliveredMessages ) {
384                for (Iterator<UndeliveredAjaxMessage> it = undeliveredMessages.iterator(); it.hasNext();) {
385                    messages++;
386                    UndeliveredAjaxMessage undelivered = it.next();
387                    Message msg = undelivered.getMessage();
388                    consumer = (MessageAvailableConsumer)undelivered.getConsumer();
389                    String id = consumerIdMap.get(consumer);
390                    String destinationName = consumerDestinationNameMap.get(consumer);
391                    LOG.debug( "sending undelivered/buffered messages" );
392                    LOG.debug( "msg:" +msg+ ", id:" +id+ ", destinationName:" +destinationName);
393                    writeMessageResponse(writer, msg, id, destinationName);
394                    it.remove();
395                    if (messages >= maximumMessages) {
396                        break;
397                    }
398                }
399            }
400
401            // Send the rest of the messages
402            for (int i = 0; i < consumers.size() && messages < maximumMessages; i++) {
403                consumer = (MessageAvailableConsumer)consumers.get(i);
404                if (consumer.getAvailableListener() == null) {
405                    continue;
406                }
407
408                // Look for any available messages
409                while (messages < maximumMessages) {
410                    message = consumer.receiveNoWait();
411                    if (message == null) {
412                        break;
413                    }
414                    messages++;
415                    String id = consumerIdMap.get(consumer);
416                    String destinationName = consumerDestinationNameMap.get(consumer);
417                    LOG.debug( "sending final available messages" );
418                    writeMessageResponse(writer, message, id, destinationName);
419                }
420            }
421
422            writer.print("</ajax-response>");
423
424            writer.flush();
425            String m = swriter.toString();
426            response.getWriter().println(m);
427        }
428    }
429
430    protected void writeMessageResponse(PrintWriter writer, Message message, String id, String destinationName) throws JMSException, IOException {
431        writer.print("<response id='");
432        writer.print(id);
433        writer.print("'");
434        if (destinationName != null) {
435            writer.print(" destination='" + destinationName + "' ");
436        }
437        writer.print(">");
438        if (message instanceof TextMessage) {
439            TextMessage textMsg = (TextMessage)message;
440            String txt = textMsg.getText();
441            if (txt != null) {
442                if (txt.startsWith("<?")) {
443                    txt = txt.substring(txt.indexOf("?>") + 2);
444                }
445                writer.print(txt);
446            }
447        } else if (message instanceof ObjectMessage) {
448            ObjectMessage objectMsg = (ObjectMessage)message;
449            Object object = objectMsg.getObject();
450            if (object != null) {
451                writer.print(object.toString());
452            }
453        }
454        writer.println("</response>");
455    }
456
457    /*
458     * Return the AjaxWebClient for this session+clientId.
459     * Create one if it does not already exist.
460     */
461    protected AjaxWebClient getAjaxWebClient( HttpServletRequest request ) {
462        HttpSession session = request.getSession(true);
463
464        String clientId = request.getParameter( "clientId" );
465        // if user doesn't supply a 'clientId', we'll just use a default.
466        if( clientId == null ) {
467            clientId = "defaultAjaxWebClient";
468        }
469        String sessionKey = session.getId() + '-' + clientId;
470
471        AjaxWebClient client = null;
472        synchronized (ajaxWebClients) {
473            client = ajaxWebClients.get( sessionKey );
474            // create a new AjaxWebClient if one does not already exist for this sessionKey.
475            if( client == null ) {
476                if (LOG.isDebugEnabled()) {
477                    LOG.debug( "creating new AjaxWebClient in "+sessionKey );
478                }
479                client = new AjaxWebClient( request, maximumReadTimeout );
480                ajaxWebClients.put( sessionKey, client );
481            }
482            client.updateLastAccessed();
483        }
484        return client;
485    }
486
487    /**
488     * @return the timeout value for read requests which is always >= 0 and <=
489     *         maximumReadTimeout to avoid DoS attacks
490     */
491    protected long getReadTimeout(HttpServletRequest request) {
492        long answer = defaultReadTimeout;
493
494        String name = request.getParameter(readTimeoutParameter);
495        if (name != null) {
496            answer = asLong(name);
497        }
498        if (answer < 0 || answer > maximumReadTimeout) {
499            answer = maximumReadTimeout;
500        }
501        return answer;
502    }
503
504    /*
505     * an instance of this class runs every minute (started in init), to clean up old web clients & free resources.
506     */
507    private class ClientCleaner extends TimerTask {
508        @Override
509        public void run() {
510            if( LOG.isDebugEnabled() ) {
511                LOG.debug( "Cleaning up expired web clients." );
512            }
513
514            synchronized( ajaxWebClients ) {
515                Iterator<Map.Entry<String, AjaxWebClient>> it = ajaxWebClients.entrySet().iterator();
516                while ( it.hasNext() ) {
517                    Map.Entry<String,AjaxWebClient> e = it.next();
518                    String key = e.getKey();
519                    AjaxWebClient val = e.getValue();
520                    if ( LOG.isDebugEnabled() ) {
521                        LOG.debug( "AjaxWebClient " + key + " last accessed " + val.getMillisSinceLastAccessed()/1000 + " seconds ago." );
522                    }
523                    // close an expired client and remove it from the ajaxWebClients hash.
524                    if( val.closeIfExpired() ) {
525                        if ( LOG.isDebugEnabled() ) {
526                            LOG.debug( "Removing expired AjaxWebClient " + key );
527                        }
528                        it.remove();
529                    }
530                }
531            }
532        }
533    }
534
535    @Override
536    public void destroy() {
537        // make sure we cancel the timer
538        clientCleanupTimer.cancel();
539        super.destroy();
540    }
541}