001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.web;
018    
019    import java.io.IOException;
020    import java.io.PrintWriter;
021    import java.io.StringWriter;
022    import java.util.HashMap;
023    import java.util.Iterator;
024    import java.util.LinkedList;
025    import java.util.List;
026    import java.util.Map;
027    import java.util.Timer;
028    import java.util.TimerTask;
029    
030    import javax.jms.Destination;
031    import javax.jms.JMSException;
032    import javax.jms.Message;
033    import javax.jms.MessageConsumer;
034    import javax.jms.ObjectMessage;
035    import javax.jms.TextMessage;
036    import javax.servlet.ServletConfig;
037    import javax.servlet.ServletException;
038    import javax.servlet.http.HttpServletRequest;
039    import javax.servlet.http.HttpServletResponse;
040    import javax.servlet.http.HttpSession;
041    
042    import org.apache.activemq.MessageAvailableConsumer;
043    import org.eclipse.jetty.continuation.Continuation;
044    import org.eclipse.jetty.continuation.ContinuationListener;
045    import org.eclipse.jetty.continuation.ContinuationSupport;
046    import org.slf4j.Logger;
047    import org.slf4j.LoggerFactory;
048    
049    /**
050     * A servlet for sending and receiving messages to/from JMS destinations using
051     * HTTP POST for sending and HTTP GET for receiving. <p/> You can specify the
052     * destination and whether it is a topic or queue via configuration details on
053     * the servlet or as request parameters. <p/> For reading messages you can
054     * specify a readTimeout parameter to determine how long the servlet should
055     * block for. The servlet can be configured with the following init parameters:
056     * <dl>
057     * <dt>defaultReadTimeout</dt>
058     * <dd>The default time in ms to wait for messages. May be overridden by a
059     * request using the 'timeout' parameter</dd>
060     * <dt>maximumReadTimeout</dt>
061     * <dd>The maximum value a request may specify for the 'timeout' parameter</dd>
062     * <dt>maximumMessages</dt>
063     * <dd>maximum messages to send per response</dd>
064     * <dt></dt>
065     * <dd></dd>
066     * </dl>
067     *
068     *
069     */
070    @SuppressWarnings("serial")
071    public class MessageListenerServlet extends MessageServletSupport {
072        private static final Logger LOG = LoggerFactory.getLogger(MessageListenerServlet.class);
073    
074        private final String readTimeoutParameter = "timeout";
075        private long defaultReadTimeout = -1;
076        private long maximumReadTimeout = 25000;
077        private int maximumMessages = 100;
078        private final Timer clientCleanupTimer = new Timer("ActiveMQ Ajax Client Cleanup Timer", true);
079        private final HashMap<String,AjaxWebClient> ajaxWebClients = new HashMap<String,AjaxWebClient>();
080    
081        @Override
082        public void init() throws ServletException {
083            ServletConfig servletConfig = getServletConfig();
084            String name = servletConfig.getInitParameter("defaultReadTimeout");
085            if (name != null) {
086                defaultReadTimeout = asLong(name);
087            }
088            name = servletConfig.getInitParameter("maximumReadTimeout");
089            if (name != null) {
090                maximumReadTimeout = asLong(name);
091            }
092            name = servletConfig.getInitParameter("maximumMessages");
093            if (name != null) {
094                maximumMessages = (int)asLong(name);
095            }
096            clientCleanupTimer.schedule( new ClientCleaner(), 5000, 60000 );
097        }
098    
099        /**
100         * Sends a message to a destination or manage subscriptions. If the the
101         * content type of the POST is
102         * <code>application/x-www-form-urlencoded</code>, then the form
103         * parameters "destination", "message" and "type" are used to pass a message
104         * or a subscription. If multiple messages or subscriptions are passed in a
105         * single post, then additional parameters are shortened to "dN", "mN" and
106         * "tN" where N is an index starting from 1. The type is either "send",
107         * "listen" or "unlisten". For send types, the message is the text of the
108         * TextMessage, otherwise it is the ID to be used for the subscription. If
109         * the content type is not <code>application/x-www-form-urlencoded</code>,
110         * then the body of the post is sent as the message to a destination that is
111         * derived from a query parameter, the URL or the default destination.
112         *
113         * @param request
114         * @param response
115         * @throws ServletException
116         * @throws IOException
117         */
118        @Override
119        protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
120    
121            // lets turn the HTTP post into a JMS Message
122            AjaxWebClient client = getAjaxWebClient( request );
123            String messageIds = "";
124    
125            synchronized (client) {
126    
127                if (LOG.isDebugEnabled()) {
128                    LOG.debug("POST client=" + client + " session=" + request.getSession().getId() + " clientId="+ request.getParameter("clientId") + " info=" + request.getPathInfo() + " contentType=" + request.getContentType());
129                    // dump(request.getParameterMap());
130                }
131    
132                int messages = 0;
133    
134                // loop until no more messages
135                while (true) {
136                    // Get the message parameters. Multiple messages are encoded
137                    // with more compact parameter names.
138                    String destinationName = request.getParameter(messages == 0 ? "destination" : ("d" + messages));
139    
140                    if (destinationName == null) {
141                        destinationName = request.getHeader("destination");
142                    }
143    
144                    String message = request.getParameter(messages == 0 ? "message" : ("m" + messages));
145                    String type = request.getParameter(messages == 0 ? "type" : ("t" + messages));
146    
147                    if (destinationName == null || message == null || type == null) {
148                        break;
149                    }
150    
151                    try {
152                        Destination destination = getDestination(client, request, destinationName);
153    
154                        if (LOG.isDebugEnabled()) {
155                            LOG.debug(messages + " destination=" + destinationName + " message=" + message + " type=" + type);
156                            LOG.debug(destination + " is a " + destination.getClass().getName());
157                        }
158    
159                        messages++;
160    
161                        if ("listen".equals(type)) {
162                            AjaxListener listener = client.getListener();
163                            Map<MessageAvailableConsumer, String> consumerIdMap = client.getIdMap();
164                            Map<MessageAvailableConsumer, String> consumerDestinationNameMap = client.getDestinationNameMap();
165                            client.closeConsumer(destination); // drop any existing
166                            // consumer.
167                            MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination, request.getHeader(WebClient.selectorName));
168    
169                            consumer.setAvailableListener(listener);
170                            consumerIdMap.put(consumer, message);
171                            consumerDestinationNameMap.put(consumer, destinationName);
172                            if (LOG.isDebugEnabled()) {
173                                LOG.debug("Subscribed: " + consumer + " to " + destination + " id=" + message);
174                            }
175                        } else if ("unlisten".equals(type)) {
176                            Map<MessageAvailableConsumer, String> consumerIdMap = client.getIdMap();
177                            Map<MessageAvailableConsumer, String> consumerDestinationNameMap = client.getDestinationNameMap();
178                            MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination, request.getHeader(WebClient.selectorName));
179    
180                            consumer.setAvailableListener(null);
181                            consumerIdMap.remove(consumer);
182                            consumerDestinationNameMap.remove(consumer);
183                            client.closeConsumer(destination);
184                            if (LOG.isDebugEnabled()) {
185                                LOG.debug("Unsubscribed: " + consumer);
186                            }
187                        } else if ("send".equals(type)) {
188                            TextMessage text = client.getSession().createTextMessage(message);
189                            appendParametersToMessage(request, text);
190    
191                            client.send(destination, text);
192                            messageIds += text.getJMSMessageID() + "\n";
193                            if (LOG.isDebugEnabled()) {
194                                LOG.debug("Sent " + message + " to " + destination);
195                            }
196                        } else {
197                            LOG.warn("unknown type " + type);
198                        }
199    
200                    } catch (JMSException e) {
201                        LOG.warn("jms", e);
202                    }
203                }
204            }
205    
206            if ("true".equals(request.getParameter("poll"))) {
207                try {
208                    // TODO return message IDs
209                    doMessages(client, request, response);
210                } catch (JMSException e) {
211                    throw new ServletException("JMS problem: " + e, e);
212                }
213            } else {
214                // handle simple POST of a message
215                if (request.getContentLength() != 0 && (request.getContentType() == null || !request.getContentType().toLowerCase().startsWith("application/x-www-form-urlencoded"))) {
216                    try {
217                        Destination destination = getDestination(client, request);
218                        String body = getPostedMessageBody(request);
219                        TextMessage message = client.getSession().createTextMessage(body);
220                        appendParametersToMessage(request, message);
221    
222                        client.send(destination, message);
223                        if (LOG.isDebugEnabled()) {
224                            LOG.debug("Sent to destination: " + destination + " body: " + body);
225                        }
226                        messageIds += message.getJMSMessageID() + "\n";
227                    } catch (JMSException e) {
228                        throw new ServletException(e);
229                    }
230                }
231    
232                response.setContentType("text/plain");
233                response.setHeader("Cache-Control", "no-cache");
234                response.getWriter().print(messageIds);
235            }
236        }
237    
238        /**
239         * Supports a HTTP DELETE to be equivlanent of consuming a singe message
240         * from a queue
241         */
242        @Override
243        protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
244            try {
245                AjaxWebClient client = getAjaxWebClient(request);
246                if (LOG.isDebugEnabled()) {
247                    LOG.debug("GET client=" + client + " session=" + request.getSession().getId() + " clientId="+ request.getParameter("clientId") + " uri=" + request.getRequestURI() + " query=" + request.getQueryString());
248                }
249    
250                doMessages(client, request, response);
251            } catch (JMSException e) {
252                throw new ServletException("JMS problem: " + e, e);
253            }
254        }
255    
256        /**
257         * Reads a message from a destination up to some specific timeout period
258         *
259         * @param client The webclient
260         * @param request
261         * @param response
262         * @throws ServletException
263         * @throws IOException
264         */
265        protected void doMessages(AjaxWebClient client, HttpServletRequest request, HttpServletResponse response) throws JMSException, IOException {
266    
267            int messages = 0;
268            // This is a poll for any messages
269    
270            long timeout = getReadTimeout(request);
271            if (LOG.isDebugEnabled()) {
272                LOG.debug("doMessage timeout=" + timeout);
273            }
274    
275            // this is non-null if we're resuming the continuation.
276            // attributes set in AjaxListener
277            UndeliveredAjaxMessage undelivered_message = null;
278            Message message = null;
279            undelivered_message = (UndeliveredAjaxMessage)request.getAttribute("undelivered_message");
280            if( undelivered_message != null ) {
281                message = undelivered_message.getMessage();
282            }
283    
284            synchronized (client) {
285    
286                List<MessageConsumer> consumers = client.getConsumers();
287                MessageAvailableConsumer consumer = null;
288                if( undelivered_message != null ) {
289                    consumer = (MessageAvailableConsumer)undelivered_message.getConsumer();
290                }
291    
292                if (message == null) {
293                    // Look for a message that is ready to go
294                    for (int i = 0; message == null && i < consumers.size(); i++) {
295                        consumer = (MessageAvailableConsumer)consumers.get(i);
296                        if (consumer.getAvailableListener() == null) {
297                            continue;
298                        }
299    
300                        // Look for any available messages
301                        message = consumer.receive(10);
302                        if (LOG.isDebugEnabled()) {
303                            LOG.debug("received " + message + " from " + consumer);
304                        }
305                    }
306                }
307    
308                // prepare the response
309                response.setContentType("text/xml");
310                response.setHeader("Cache-Control", "no-cache");
311    
312                if (message == null && client.getListener().getUndeliveredMessages().size() == 0) {
313                    Continuation continuation = ContinuationSupport.getContinuation(request);
314    
315                    // Add a listener to the continuation to make sure it actually
316                    // will expire (seems like a bug in Jetty Servlet 3 continuations,
317                    // see https://issues.apache.org/jira/browse/AMQ-3447
318                    continuation.addContinuationListener(new ContinuationListener() {
319                        @Override
320                        public void onTimeout(Continuation cont) {
321                            if (LOG.isDebugEnabled()) {
322                                LOG.debug("Continuation " + cont.toString() + " expired.");
323                            }
324                        }
325    
326                        @Override
327                        public void onComplete(Continuation cont) {
328                            if (LOG.isDebugEnabled()) {
329                               LOG.debug("Continuation " + cont.toString() + " completed.");
330                            }
331                        }
332                    });
333    
334                    if (continuation.isExpired()) {
335                        response.setStatus(HttpServletResponse.SC_OK);
336                        StringWriter swriter = new StringWriter();
337                        PrintWriter writer = new PrintWriter(swriter);
338                        writer.println("<ajax-response>");
339                        writer.print("</ajax-response>");
340    
341                        writer.flush();
342                        String m = swriter.toString();
343                        response.getWriter().println(m);
344    
345                        return;
346                    }
347    
348                    continuation.setTimeout(timeout);
349                    continuation.suspend();
350                    LOG.debug( "Suspending continuation " + continuation );
351    
352                    // Fetch the listeners
353                    AjaxListener listener = client.getListener();
354                    listener.access();
355    
356                    // register this continuation with our listener.
357                    listener.setContinuation(continuation);
358    
359                    return;
360                }
361    
362                StringWriter swriter = new StringWriter();
363                PrintWriter writer = new PrintWriter(swriter);
364    
365                Map<MessageAvailableConsumer, String> consumerIdMap = client.getIdMap();
366                Map<MessageAvailableConsumer, String> consumerDestinationNameMap = client.getDestinationNameMap();
367                response.setStatus(HttpServletResponse.SC_OK);
368                writer.println("<ajax-response>");
369    
370                // Send any message we already have
371                if (message != null) {
372                    String id = consumerIdMap.get(consumer);
373                    String destinationName = consumerDestinationNameMap.get(consumer);
374                    LOG.debug( "sending pre-existing message" );
375                    writeMessageResponse(writer, message, id, destinationName);
376    
377                    messages++;
378                }
379    
380                // send messages buffered while continuation was unavailable.
381                LinkedList<UndeliveredAjaxMessage> undeliveredMessages = ((AjaxListener)consumer.getAvailableListener()).getUndeliveredMessages();
382                LOG.debug("Send " + undeliveredMessages.size() + " unconsumed messages");
383                synchronized( undeliveredMessages ) {
384                    for (Iterator<UndeliveredAjaxMessage> it = undeliveredMessages.iterator(); it.hasNext();) {
385                        messages++;
386                        UndeliveredAjaxMessage undelivered = it.next();
387                        Message msg = undelivered.getMessage();
388                        consumer = (MessageAvailableConsumer)undelivered.getConsumer();
389                        String id = consumerIdMap.get(consumer);
390                        String destinationName = consumerDestinationNameMap.get(consumer);
391                        LOG.debug( "sending undelivered/buffered messages" );
392                        LOG.debug( "msg:" +msg+ ", id:" +id+ ", destinationName:" +destinationName);
393                        writeMessageResponse(writer, msg, id, destinationName);
394                        it.remove();
395                        if (messages >= maximumMessages) {
396                            break;
397                        }
398                    }
399                }
400    
401                // Send the rest of the messages
402                for (int i = 0; i < consumers.size() && messages < maximumMessages; i++) {
403                    consumer = (MessageAvailableConsumer)consumers.get(i);
404                    if (consumer.getAvailableListener() == null) {
405                        continue;
406                    }
407    
408                    // Look for any available messages
409                    while (messages < maximumMessages) {
410                        message = consumer.receiveNoWait();
411                        if (message == null) {
412                            break;
413                        }
414                        messages++;
415                        String id = consumerIdMap.get(consumer);
416                        String destinationName = consumerDestinationNameMap.get(consumer);
417                        LOG.debug( "sending final available messages" );
418                        writeMessageResponse(writer, message, id, destinationName);
419                    }
420                }
421    
422                writer.print("</ajax-response>");
423    
424                writer.flush();
425                String m = swriter.toString();
426                response.getWriter().println(m);
427            }
428        }
429    
430        protected void writeMessageResponse(PrintWriter writer, Message message, String id, String destinationName) throws JMSException, IOException {
431            writer.print("<response id='");
432            writer.print(id);
433            writer.print("'");
434            if (destinationName != null) {
435                writer.print(" destination='" + destinationName + "' ");
436            }
437            writer.print(">");
438            if (message instanceof TextMessage) {
439                TextMessage textMsg = (TextMessage)message;
440                String txt = textMsg.getText();
441                if (txt != null) {
442                    if (txt.startsWith("<?")) {
443                        txt = txt.substring(txt.indexOf("?>") + 2);
444                    }
445                    writer.print(txt);
446                }
447            } else if (message instanceof ObjectMessage) {
448                ObjectMessage objectMsg = (ObjectMessage)message;
449                Object object = objectMsg.getObject();
450                if (object != null) {
451                    writer.print(object.toString());
452                }
453            }
454            writer.println("</response>");
455        }
456    
457        /*
458         * Return the AjaxWebClient for this session+clientId.
459         * Create one if it does not already exist.
460         */
461        protected AjaxWebClient getAjaxWebClient( HttpServletRequest request ) {
462            HttpSession session = request.getSession(true);
463    
464            String clientId = request.getParameter( "clientId" );
465            // if user doesn't supply a 'clientId', we'll just use a default.
466            if( clientId == null ) {
467                clientId = "defaultAjaxWebClient";
468            }
469            String sessionKey = session.getId() + '-' + clientId;
470    
471            AjaxWebClient client = null;
472            synchronized (ajaxWebClients) {
473                client = ajaxWebClients.get( sessionKey );
474                // create a new AjaxWebClient if one does not already exist for this sessionKey.
475                if( client == null ) {
476                    if (LOG.isDebugEnabled()) {
477                        LOG.debug( "creating new AjaxWebClient in "+sessionKey );
478                    }
479                    client = new AjaxWebClient( request, maximumReadTimeout );
480                    ajaxWebClients.put( sessionKey, client );
481                }
482                client.updateLastAccessed();
483            }
484            return client;
485        }
486    
487        /**
488         * @return the timeout value for read requests which is always >= 0 and <=
489         *         maximumReadTimeout to avoid DoS attacks
490         */
491        protected long getReadTimeout(HttpServletRequest request) {
492            long answer = defaultReadTimeout;
493    
494            String name = request.getParameter(readTimeoutParameter);
495            if (name != null) {
496                answer = asLong(name);
497            }
498            if (answer < 0 || answer > maximumReadTimeout) {
499                answer = maximumReadTimeout;
500            }
501            return answer;
502        }
503    
504        /*
505         * an instance of this class runs every minute (started in init), to clean up old web clients & free resources.
506         */
507        private class ClientCleaner extends TimerTask {
508            @Override
509            public void run() {
510                if( LOG.isDebugEnabled() ) {
511                    LOG.debug( "Cleaning up expired web clients." );
512                }
513    
514                synchronized( ajaxWebClients ) {
515                    Iterator<Map.Entry<String, AjaxWebClient>> it = ajaxWebClients.entrySet().iterator();
516                    while ( it.hasNext() ) {
517                        Map.Entry<String,AjaxWebClient> e = it.next();
518                        String key = e.getKey();
519                        AjaxWebClient val = e.getValue();
520                        if ( LOG.isDebugEnabled() ) {
521                            LOG.debug( "AjaxWebClient " + key + " last accessed " + val.getMillisSinceLastAccessed()/1000 + " seconds ago." );
522                        }
523                        // close an expired client and remove it from the ajaxWebClients hash.
524                        if( val.closeIfExpired() ) {
525                            if ( LOG.isDebugEnabled() ) {
526                                LOG.debug( "Removing expired AjaxWebClient " + key );
527                            }
528                            it.remove();
529                        }
530                    }
531                }
532            }
533        }
534    
535        @Override
536        public void destroy() {
537            // make sure we cancel the timer
538            clientCleanupTimer.cancel();
539            super.destroy();
540        }
541    }