001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.web;
018    
019    import java.io.IOException;
020    import java.io.PrintWriter;
021    import java.io.StringWriter;
022    import java.util.HashMap;
023    import java.util.Iterator;
024    import java.util.LinkedList;
025    import java.util.List;
026    import java.util.Map;
027    import java.util.Timer;
028    import java.util.TimerTask;
029    
030    import javax.jms.Destination;
031    import javax.jms.JMSException;
032    import javax.jms.Message;
033    import javax.jms.MessageConsumer;
034    import javax.jms.ObjectMessage;
035    import javax.jms.TextMessage;
036    import javax.servlet.ServletConfig;
037    import javax.servlet.ServletException;
038    import javax.servlet.http.HttpServletRequest;
039    import javax.servlet.http.HttpServletResponse;
040    import javax.servlet.http.HttpSession;
041    
042    import org.apache.activemq.MessageAvailableConsumer;
043    import org.eclipse.jetty.continuation.Continuation;
044    import org.eclipse.jetty.continuation.ContinuationSupport;
045    import org.slf4j.Logger;
046    import org.slf4j.LoggerFactory;
047    
048    /**
049     * A servlet for sending and receiving messages to/from JMS destinations using
050     * HTTP POST for sending and HTTP GET for receiving. <p/> You can specify the
051     * destination and whether it is a topic or queue via configuration details on
052     * the servlet or as request parameters. <p/> For reading messages you can
053     * specify a readTimeout parameter to determine how long the servlet should
054     * block for. The servlet can be configured with the following init parameters:
055     * <dl>
056     * <dt>defaultReadTimeout</dt>
057     * <dd>The default time in ms to wait for messages. May be overridden by a
058     * request using the 'timeout' parameter</dd>
059     * <dt>maximumReadTimeout</dt>
060     * <dd>The maximum value a request may specify for the 'timeout' parameter</dd>
061     * <dt>maximumMessages</dt>
062     * <dd>maximum messages to send per response</dd>
063     * <dt></dt>
064     * <dd></dd>
065     * </dl>
066     *
067     *
068     */
069    @SuppressWarnings("serial")
070    public class MessageListenerServlet extends MessageServletSupport {
071        private static final Logger LOG = LoggerFactory.getLogger(MessageListenerServlet.class);
072    
073        private String readTimeoutParameter = "timeout";
074        private long defaultReadTimeout = -1;
075        private long maximumReadTimeout = 25000;
076        private int maximumMessages = 100;
077        private Timer clientCleanupTimer = new Timer("ActiveMQ Ajax Client Cleanup Timer", true);
078        private HashMap<String,AjaxWebClient> ajaxWebClients = new HashMap<String,AjaxWebClient>();
079    
080        public void init() throws ServletException {
081            ServletConfig servletConfig = getServletConfig();
082            String name = servletConfig.getInitParameter("defaultReadTimeout");
083            if (name != null) {
084                defaultReadTimeout = asLong(name);
085            }
086            name = servletConfig.getInitParameter("maximumReadTimeout");
087            if (name != null) {
088                maximumReadTimeout = asLong(name);
089            }
090            name = servletConfig.getInitParameter("maximumMessages");
091            if (name != null) {
092                maximumMessages = (int)asLong(name);
093            }
094            clientCleanupTimer.schedule( new ClientCleaner(), 5000, 60000 );
095        }
096    
097        /**
098         * Sends a message to a destination or manage subscriptions. If the the
099         * content type of the POST is
100         * <code>application/x-www-form-urlencoded</code>, then the form
101         * parameters "destination", "message" and "type" are used to pass a message
102         * or a subscription. If multiple messages or subscriptions are passed in a
103         * single post, then additional parameters are shortened to "dN", "mN" and
104         * "tN" where N is an index starting from 1. The type is either "send",
105         * "listen" or "unlisten". For send types, the message is the text of the
106         * TextMessage, otherwise it is the ID to be used for the subscription. If
107         * the content type is not <code>application/x-www-form-urlencoded</code>,
108         * then the body of the post is sent as the message to a destination that is
109         * derived from a query parameter, the URL or the default destination.
110         *
111         * @param request
112         * @param response
113         * @throws ServletException
114         * @throws IOException
115         */
116        protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
117    
118            // lets turn the HTTP post into a JMS Message
119            AjaxWebClient client = getAjaxWebClient( request );
120            String messageIds = "";
121    
122            synchronized (client) {
123    
124                if (LOG.isDebugEnabled()) {
125                    LOG.debug("POST client=" + client + " session=" + request.getSession().getId() + " clientId="+ request.getParameter("clientId") + " info=" + request.getPathInfo() + " contentType=" + request.getContentType());
126                    // dump(request.getParameterMap());
127                }
128    
129                int messages = 0;
130    
131                // loop until no more messages
132                while (true) {
133                    // Get the message parameters. Multiple messages are encoded
134                    // with more compact parameter names.
135                    String destinationName = request.getParameter(messages == 0 ? "destination" : ("d" + messages));
136    
137                    if (destinationName == null) {
138                        destinationName = request.getHeader("destination");
139                    }
140    
141                    String message = request.getParameter(messages == 0 ? "message" : ("m" + messages));
142                    String type = request.getParameter(messages == 0 ? "type" : ("t" + messages));
143    
144                    if (destinationName == null || message == null || type == null) {
145                        break;
146                    }
147    
148                    try {
149                        Destination destination = getDestination(client, request, destinationName);
150    
151                        if (LOG.isDebugEnabled()) {
152                            LOG.debug(messages + " destination=" + destinationName + " message=" + message + " type=" + type);
153                            LOG.debug(destination + " is a " + destination.getClass().getName());
154                        }
155    
156                        messages++;
157    
158                        if ("listen".equals(type)) {
159                            AjaxListener listener = client.getListener();
160                            Map<MessageAvailableConsumer, String> consumerIdMap = client.getIdMap();
161                            Map<MessageAvailableConsumer, String> consumerDestinationNameMap = client.getDestinationNameMap();
162                            client.closeConsumer(destination); // drop any existing
163                            // consumer.
164                            MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination, request.getHeader(WebClient.selectorName));
165    
166                            consumer.setAvailableListener(listener);
167                            consumerIdMap.put(consumer, message);
168                            consumerDestinationNameMap.put(consumer, destinationName);
169                            if (LOG.isDebugEnabled()) {
170                                LOG.debug("Subscribed: " + consumer + " to " + destination + " id=" + message);
171                            }
172                        } else if ("unlisten".equals(type)) {
173                            Map<MessageAvailableConsumer, String> consumerIdMap = client.getIdMap();
174                            Map<MessageAvailableConsumer, String> consumerDestinationNameMap = client.getDestinationNameMap();
175                            MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination, request.getHeader(WebClient.selectorName));
176    
177                            consumer.setAvailableListener(null);
178                            consumerIdMap.remove(consumer);
179                            consumerDestinationNameMap.remove(consumer);
180                            client.closeConsumer(destination);
181                            if (LOG.isDebugEnabled()) {
182                                LOG.debug("Unsubscribed: " + consumer);
183                            }
184                        } else if ("send".equals(type)) {
185                            TextMessage text = client.getSession().createTextMessage(message);
186                            appendParametersToMessage(request, text);
187    
188                            client.send(destination, text);
189                            messageIds += text.getJMSMessageID() + "\n";
190                            if (LOG.isDebugEnabled()) {
191                                LOG.debug("Sent " + message + " to " + destination);
192                            }
193                        } else {
194                            LOG.warn("unknown type " + type);
195                        }
196    
197                    } catch (JMSException e) {
198                        LOG.warn("jms", e);
199                    }
200                }
201            }
202    
203            if ("true".equals(request.getParameter("poll"))) {
204                try {
205                    // TODO return message IDs
206                    doMessages(client, request, response);
207                } catch (JMSException e) {
208                    throw new ServletException("JMS problem: " + e, e);
209                }
210            } else {
211                // handle simple POST of a message
212                if (request.getContentLength() != 0 && (request.getContentType() == null || !request.getContentType().toLowerCase().startsWith("application/x-www-form-urlencoded"))) {
213                    try {
214                        Destination destination = getDestination(client, request);
215                        String body = getPostedMessageBody(request);
216                        TextMessage message = client.getSession().createTextMessage(body);
217                        appendParametersToMessage(request, message);
218    
219                        client.send(destination, message);
220                        if (LOG.isDebugEnabled()) {
221                            LOG.debug("Sent to destination: " + destination + " body: " + body);
222                        }
223                        messageIds += message.getJMSMessageID() + "\n";
224                    } catch (JMSException e) {
225                        throw new ServletException(e);
226                    }
227                }
228    
229                response.setContentType("text/plain");
230                response.setHeader("Cache-Control", "no-cache");
231                response.getWriter().print(messageIds);
232            }
233        }
234    
235        /**
236         * Supports a HTTP DELETE to be equivlanent of consuming a singe message
237         * from a queue
238         */
239        protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
240            try {
241                AjaxWebClient client = getAjaxWebClient(request);
242                if (LOG.isDebugEnabled()) {
243                    LOG.debug("GET client=" + client + " session=" + request.getSession().getId() + " clientId="+ request.getParameter("clientId") + " uri=" + request.getRequestURI() + " query=" + request.getQueryString());
244                }
245    
246                doMessages(client, request, response);
247            } catch (JMSException e) {
248                throw new ServletException("JMS problem: " + e, e);
249            }
250        }
251    
252        /**
253         * Reads a message from a destination up to some specific timeout period
254         *
255         * @param client The webclient
256         * @param request
257         * @param response
258         * @throws ServletException
259         * @throws IOException
260         */
261        protected void doMessages(AjaxWebClient client, HttpServletRequest request, HttpServletResponse response) throws JMSException, IOException {
262    
263            int messages = 0;
264            // This is a poll for any messages
265    
266            long timeout = getReadTimeout(request);
267            if (LOG.isDebugEnabled()) {
268                LOG.debug("doMessage timeout=" + timeout);
269            }
270    
271            // this is non-null if we're resuming the continuation.
272            // attributes set in AjaxListener
273            UndeliveredAjaxMessage undelivered_message = null;
274            Message message = null;
275            undelivered_message = (UndeliveredAjaxMessage)request.getAttribute("undelivered_message");
276            if( undelivered_message != null ) {
277                message = (Message)undelivered_message.getMessage();
278            }
279    
280            synchronized (client) {
281    
282                List<MessageConsumer> consumers = client.getConsumers();
283                MessageAvailableConsumer consumer = null;
284                if( undelivered_message != null ) {
285                    consumer = (MessageAvailableConsumer)undelivered_message.getConsumer();
286                }
287    
288                if (message == null) {
289                    // Look for a message that is ready to go
290                    for (int i = 0; message == null && i < consumers.size(); i++) {
291                        consumer = (MessageAvailableConsumer)consumers.get(i);
292                        if (consumer.getAvailableListener() == null) {
293                            continue;
294                        }
295    
296                        // Look for any available messages
297                        message = consumer.receive(10);
298                        if (LOG.isDebugEnabled()) {
299                            LOG.debug("received " + message + " from " + consumer);
300                        }
301                    }
302                }
303    
304                // prepare the response
305                response.setContentType("text/xml");
306                response.setHeader("Cache-Control", "no-cache");
307    
308                if (message == null && client.getListener().getUndeliveredMessages().size() == 0) {
309                    Continuation continuation = ContinuationSupport.getContinuation(request);
310    
311                    if (continuation.isExpired()) {
312                        response.setStatus(HttpServletResponse.SC_OK);
313                        StringWriter swriter = new StringWriter();
314                        PrintWriter writer = new PrintWriter(swriter);
315                        writer.println("<ajax-response>");
316                        writer.print("</ajax-response>");
317    
318                        writer.flush();
319                        String m = swriter.toString();
320                        response.getWriter().println(m);
321    
322                        return;
323                    }
324    
325                    continuation.setTimeout(timeout);
326                    continuation.suspend();
327                    LOG.debug( "Suspending continuation " + continuation );
328    
329                    // Fetch the listeners
330                    AjaxListener listener = client.getListener();
331                    listener.access();
332    
333                    // register this continuation with our listener.
334                    listener.setContinuation(continuation);
335    
336                    return;
337                }
338    
339                StringWriter swriter = new StringWriter();
340                PrintWriter writer = new PrintWriter(swriter);
341    
342                Map<MessageAvailableConsumer, String> consumerIdMap = client.getIdMap();
343                Map<MessageAvailableConsumer, String> consumerDestinationNameMap = client.getDestinationNameMap();
344                response.setStatus(HttpServletResponse.SC_OK);
345                writer.println("<ajax-response>");
346    
347                // Send any message we already have
348                if (message != null) {
349                    String id = consumerIdMap.get(consumer);
350                    String destinationName = consumerDestinationNameMap.get(consumer);
351                    LOG.debug( "sending pre-existing message" );
352                    writeMessageResponse(writer, message, id, destinationName);
353    
354                    messages++;
355                }
356    
357                // send messages buffered while continuation was unavailable.
358                LinkedList<UndeliveredAjaxMessage> undeliveredMessages = ((AjaxListener)consumer.getAvailableListener()).getUndeliveredMessages();
359                LOG.debug("Send " + undeliveredMessages.size() + " unconsumed messages");
360                synchronized( undeliveredMessages ) {
361                    for (Iterator<UndeliveredAjaxMessage> it = undeliveredMessages.iterator(); it.hasNext(); ) {
362                        messages++;
363                        UndeliveredAjaxMessage undelivered = it.next();
364                        Message msg = (Message)undelivered.getMessage();
365                        consumer = (MessageAvailableConsumer)undelivered.getConsumer();
366                        String id = consumerIdMap.get(consumer);
367                        String destinationName = consumerDestinationNameMap.get(consumer);
368                        LOG.debug( "sending undelivered/buffered messages" );
369                        LOG.debug( "msg:" +msg+ ", id:" +id+ ", destinationName:" +destinationName);
370                        writeMessageResponse(writer, msg, id, destinationName);
371                        it.remove();
372                        if (messages >= maximumMessages) {
373                            break;
374                        }
375                    }
376                }
377    
378                // Send the rest of the messages
379                for (int i = 0; i < consumers.size() && messages < maximumMessages; i++) {
380                    consumer = (MessageAvailableConsumer)consumers.get(i);
381                    if (consumer.getAvailableListener() == null) {
382                        continue;
383                    }
384    
385                    // Look for any available messages
386                    while (messages < maximumMessages) {
387                        message = consumer.receiveNoWait();
388                        if (message == null) {
389                            break;
390                        }
391                        messages++;
392                        String id = consumerIdMap.get(consumer);
393                        String destinationName = consumerDestinationNameMap.get(consumer);
394                        LOG.debug( "sending final available messages" );
395                        writeMessageResponse(writer, message, id, destinationName);
396                    }
397                }
398    
399                writer.print("</ajax-response>");
400    
401                writer.flush();
402                String m = swriter.toString();
403                response.getWriter().println(m);
404            }
405    
406        }
407    
408        protected void writeMessageResponse(PrintWriter writer, Message message, String id, String destinationName) throws JMSException, IOException {
409            writer.print("<response id='");
410            writer.print(id);
411            writer.print("'");
412            if (destinationName != null) {
413                writer.print(" destination='" + destinationName + "' ");
414            }
415            writer.print(">");
416            if (message instanceof TextMessage) {
417                TextMessage textMsg = (TextMessage)message;
418                String txt = textMsg.getText();
419                if (txt != null) {
420                    if (txt.startsWith("<?")) {
421                        txt = txt.substring(txt.indexOf("?>") + 2);
422                    }
423                    writer.print(txt);
424                }
425            } else if (message instanceof ObjectMessage) {
426                ObjectMessage objectMsg = (ObjectMessage)message;
427                Object object = objectMsg.getObject();
428                if (object != null) {
429                    writer.print(object.toString());
430                }
431            }
432            writer.println("</response>");
433        }
434    
435        /*
436         * Return the AjaxWebClient for this session+clientId.
437         * Create one if it does not already exist.
438         */
439        protected AjaxWebClient getAjaxWebClient( HttpServletRequest request ) {
440            HttpSession session = request.getSession(true);
441    
442            String clientId = request.getParameter( "clientId" );
443            // if user doesn't supply a 'clientId', we'll just use a default.
444            if( clientId == null ) {
445                clientId = "defaultAjaxWebClient";
446            }
447            String sessionKey = session.getId() + '-' + clientId;
448    
449            AjaxWebClient client = null;
450            synchronized (ajaxWebClients) {
451                client = ajaxWebClients.get( sessionKey );
452                // create a new AjaxWebClient if one does not already exist for this sessionKey.
453                if( client == null ) {
454                    if (LOG.isDebugEnabled()) {
455                        LOG.debug( "creating new AjaxWebClient in "+sessionKey );
456                    }
457                    client = new AjaxWebClient( request, maximumReadTimeout );
458                    ajaxWebClients.put( sessionKey, client );
459                }
460                client.updateLastAccessed();
461            }
462            return client;
463        }
464    
465        /**
466         * @return the timeout value for read requests which is always >= 0 and <=
467         *         maximumReadTimeout to avoid DoS attacks
468         */
469        protected long getReadTimeout(HttpServletRequest request) {
470            long answer = defaultReadTimeout;
471    
472            String name = request.getParameter(readTimeoutParameter);
473            if (name != null) {
474                answer = asLong(name);
475            }
476            if (answer < 0 || answer > maximumReadTimeout) {
477                answer = maximumReadTimeout;
478            }
479            return answer;
480        }
481    
482        /*
483         * an instance of this class runs every minute (started in init), to clean up old web clients & free resources.
484         */
485        private class ClientCleaner extends TimerTask {
486            public void run() {
487                if( LOG.isDebugEnabled() ) {
488                    LOG.debug( "Cleaning up expired web clients." );
489                }
490    
491                synchronized( ajaxWebClients ) {
492                    Iterator<Map.Entry<String, AjaxWebClient>> it = ajaxWebClients.entrySet().iterator();
493                    while ( it.hasNext() ) {
494                        Map.Entry<String,AjaxWebClient> e = it.next();
495                        String key = e.getKey();
496                        AjaxWebClient val = e.getValue();
497                        if ( LOG.isDebugEnabled() ) {
498                            LOG.debug( "AjaxWebClient " + key + " last accessed " + val.getMillisSinceLastAccessed()/1000 + " seconds ago." );
499                        }
500                        // close an expired client and remove it from the ajaxWebClients hash.
501                        if( val.closeIfExpired() ) {
502                            if ( LOG.isDebugEnabled() ) {
503                                LOG.debug( "Removing expired AjaxWebClient " + key );
504                            }
505                            it.remove();
506                        }
507                    }
508                }
509            }
510        }
511    
512        public void destroy() {
513            // make sure we cancel the timer
514            clientCleanupTimer.cancel();
515            super.destroy();
516        }
517    }