001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.activemq.web;
019    
020    import java.io.IOException;
021    import java.io.PrintWriter;
022    import java.util.Enumeration;
023    import java.util.HashMap;
024    
025    import javax.jms.Destination;
026    import javax.jms.JMSException;
027    import javax.jms.Message;
028    import javax.jms.MessageConsumer;
029    import javax.jms.ObjectMessage;
030    import javax.jms.TextMessage;
031    import javax.servlet.ServletConfig;
032    import javax.servlet.ServletException;
033    import javax.servlet.http.HttpServletRequest;
034    import javax.servlet.http.HttpServletResponse;
035    
036    import org.apache.activemq.MessageAvailableConsumer;
037    import org.apache.activemq.MessageAvailableListener;
038    import org.apache.activemq.command.ActiveMQDestination;
039    import org.apache.activemq.command.ActiveMQTextMessage;
040    import org.eclipse.jetty.continuation.Continuation;
041    import org.eclipse.jetty.continuation.ContinuationSupport;
042    import org.slf4j.Logger;
043    import org.slf4j.LoggerFactory;
044    
045    /**
046     * A servlet for sending and receiving messages to/from JMS destinations using
047     * HTTP POST for sending and HTTP GET for receiving. <p/> You can specify the
048     * destination and whether it is a topic or queue via configuration details on
049     * the servlet or as request parameters. <p/> For reading messages you can
050     * specify a readTimeout parameter to determine how long the servlet should
051     * block for.
052     */
053    public class MessageServlet extends MessageServletSupport {
054    
055        // its a bit pita that this servlet got intermixed with jetty continuation/rest
056        // instead of creating a special for that. We should have kept a simple servlet
057        // for good old fashioned request/response blocked communication.
058    
059        private static final long serialVersionUID = 8737914695188481219L;
060    
061        private static final Logger LOG = LoggerFactory.getLogger(MessageServlet.class);
062    
063        private String readTimeoutParameter = "readTimeout";
064        private long defaultReadTimeout = -1;
065        private long maximumReadTimeout = 20000;
066        private long requestTimeout = 1000;
067        private String defaultContentType = "text/xml";
068    
069        private HashMap<String, WebClient> clients = new HashMap<String, WebClient>();
070    
071        public void init() throws ServletException {
072            ServletConfig servletConfig = getServletConfig();
073            String name = servletConfig.getInitParameter("defaultReadTimeout");
074            if (name != null) {
075                defaultReadTimeout = asLong(name);
076            }
077            name = servletConfig.getInitParameter("maximumReadTimeout");
078            if (name != null) {
079                maximumReadTimeout = asLong(name);
080            }
081            name = servletConfig.getInitParameter("replyTimeout");
082            if (name != null) {
083                requestTimeout = asLong(name);
084            }
085            name = servletConfig.getInitParameter("defaultContentType");
086            if (name != null) {
087                defaultContentType = name;
088            }
089        }
090    
091        /**
092         * Sends a message to a destination
093         *
094         * @param request
095         * @param response
096         * @throws ServletException
097         * @throws IOException
098         */
099        protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
100            // lets turn the HTTP post into a JMS Message
101            try {
102    
103                String action = request.getParameter("action");
104                String clientId = request.getParameter("clientId");
105                if (action != null && clientId != null && action.equals("unsubscribe")) {
106                    LOG.info("Unsubscribing client " + clientId);
107                    WebClient client = getWebClient(request);
108                    client.close();
109                    clients.remove(clientId);
110                    return;
111                }
112    
113                WebClient client = getWebClient(request);
114    
115                String text = getPostedMessageBody(request);
116    
117                // lets create the destination from the URI?
118                Destination destination = getDestination(client, request);
119                if (destination == null) {
120                    throw new NoDestinationSuppliedException();
121                }
122    
123                if (LOG.isDebugEnabled()) {
124                    LOG.debug("Sending message to: " + destination + " with text: " + text);
125                }
126    
127                boolean sync = isSync(request);
128                TextMessage message = client.getSession().createTextMessage(text);
129    
130                if (sync) {
131                   String point = "activemq:"
132                       + ((ActiveMQDestination)destination).getPhysicalName().replace("//", "")
133                       + "?requestTimeout=" + requestTimeout;
134                   try {
135                       String body = (String)client.getProducerTemplate().requestBody(point, text);
136                       ActiveMQTextMessage answer = new ActiveMQTextMessage();
137                       answer.setText(body);
138    
139                       writeMessageResponse(response.getWriter(), answer);
140                   } catch (Exception e) {
141                       IOException ex = new IOException();
142                       ex.initCause(e);
143                       throw ex;
144                   }
145                } else {
146                    appendParametersToMessage(request, message);
147                    boolean persistent = isSendPersistent(request);
148                    int priority = getSendPriority(request);
149                    long timeToLive = getSendTimeToLive(request);
150                    client.send(destination, message, persistent, priority, timeToLive);
151                }
152    
153                // lets return a unique URI for reliable messaging
154                response.setHeader("messageID", message.getJMSMessageID());
155                response.setStatus(HttpServletResponse.SC_OK);
156                response.getWriter().write("Message sent");
157    
158            } catch (JMSException e) {
159                throw new ServletException("Could not post JMS message: " + e, e);
160            }
161        }
162    
163        /**
164         * Supports a HTTP DELETE to be equivlanent of consuming a singe message
165         * from a queue
166         */
167        protected void doDelete(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
168            doMessages(request, response);
169        }
170    
171        /**
172         * Supports a HTTP DELETE to be equivlanent of consuming a singe message
173         * from a queue
174         */
175        protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
176            doMessages(request, response);
177        }
178    
179        /**
180         * Reads a message from a destination up to some specific timeout period
181         *
182         * @param request
183         * @param response
184         * @throws ServletException
185         * @throws IOException
186         */
187        protected void doMessages(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
188            try {
189                WebClient client = getWebClient(request);
190                Destination destination = getDestination(client, request);
191                if (destination == null) {
192                    throw new NoDestinationSuppliedException();
193                }
194                MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination, request.getHeader(WebClient.selectorName));
195                Message message = null;
196                message = (Message)request.getAttribute("message");
197                if (message != null) {
198                    // we're resuming continuation,
199                    // so just write the message and return
200                    writeResponse(request, response, message);
201                    return;
202                }
203                long timeout = getReadTimeout(request);
204    
205                if (LOG.isDebugEnabled()) {
206                    LOG.debug("Receiving message(s) from: " + destination + " with timeout: " + timeout);
207                }
208    
209                Continuation continuation = null;
210                Listener listener = null;
211    
212    
213                // Look for any available messages
214                message = consumer.receive(10);
215    
216                // Get an existing Continuation or create a new one if there are
217                // no events.
218                if (message == null) {
219                    continuation = ContinuationSupport.getContinuation(request);
220    
221                    if (continuation.isExpired()) {
222                        response.setStatus(HttpServletResponse.SC_NO_CONTENT);
223                        return;
224                    }
225    
226                    continuation.setTimeout(timeout);
227                    continuation.suspend();
228    
229                    // Fetch the listeners
230                    listener = (Listener)consumer.getAvailableListener();
231                    if (listener == null) {
232                        listener = new Listener(consumer);
233                        consumer.setAvailableListener(listener);
234                    }
235    
236                    // register this continuation with our listener.
237                    listener.setContinuation(continuation);
238                }
239    
240                writeResponse(request, response, message);
241            } catch (JMSException e) {
242                throw new ServletException("Could not post JMS message: " + e, e);
243            }
244        }
245    
246        protected void writeResponse(HttpServletRequest request, HttpServletResponse response, Message message) throws IOException, JMSException {
247            int messages = 0;
248            try {
249    
250                // write a responds
251                PrintWriter writer = response.getWriter();
252    
253                // handle any message(s)
254                if (message == null) {
255                    // No messages so OK response of for ajax else no content.
256                    response.setStatus(HttpServletResponse.SC_NO_CONTENT);
257    //                response.setContentType("text/plain");
258    //                writer.write("No message received");
259    //                writer.flush();
260                } else {
261                    // We have at least one message so set up the response
262                    messages = 1;
263    
264                    String type = getContentType(request);
265                    if (type != null) {
266                        response.setContentType(type);
267                    } else {
268                        if (isXmlContent(message)) {
269                            response.setContentType(defaultContentType);
270                        } else {
271                            response.setContentType("text/plain");
272                        }
273                    }
274                    response.setStatus(HttpServletResponse.SC_OK);
275    
276                    setResponseHeaders(response, message);
277                    writeMessageResponse(writer, message);
278                    writer.flush();
279                }
280            } finally {
281                if (LOG.isDebugEnabled()) {
282                    LOG.debug("Received " + messages + " message(s)");
283                }
284            }
285        }
286    
287        protected void writeMessageResponse(PrintWriter writer, Message message) throws JMSException, IOException {
288            if (message instanceof TextMessage) {
289                TextMessage textMsg = (TextMessage)message;
290                String txt = textMsg.getText();
291                if (txt != null) {
292                    if (txt.startsWith("<?")) {
293                        txt = txt.substring(txt.indexOf("?>") + 2);
294                    }
295                    writer.print(txt);
296                }
297            } else if (message instanceof ObjectMessage) {
298                ObjectMessage objectMsg = (ObjectMessage)message;
299                Object object = objectMsg.getObject();
300                if (object != null) {
301                    writer.print(object.toString());
302                }
303            }
304        }
305    
306        protected boolean isXmlContent(Message message) throws JMSException {
307            if (message instanceof TextMessage) {
308                TextMessage textMsg = (TextMessage)message;
309                String txt = textMsg.getText();
310                if (txt != null) {
311                    // assume its xml when it starts with <
312                    if (txt.startsWith("<")) {
313                        return true;
314                    }
315                }
316            }
317            // for any other kind of messages we dont assume xml
318            return false;
319        }
320    
321        public WebClient getWebClient(HttpServletRequest request) {
322            String clientId = request.getParameter("clientId");
323            if (clientId != null) {
324                synchronized(this) {
325                    LOG.debug("Getting local client [" + clientId + "]");
326                    WebClient client = clients.get(clientId);
327                    if (client == null) {
328                        LOG.debug("Creating new client [" + clientId + "]");
329                        client = new WebClient();
330                        clients.put(clientId, client);
331                    }
332                    return client;
333                }
334    
335            } else {
336                return WebClient.getWebClient(request);
337            }
338        }
339    
340        protected String getContentType(HttpServletRequest request) {
341            String value = request.getParameter("xml");
342            if (value != null && "true".equalsIgnoreCase(value)) {
343                return "text/xml";
344            }
345            value = request.getParameter("json");
346            if (value != null && "true".equalsIgnoreCase(value)) {
347                return "application/json";
348            }
349            return null;
350        }
351    
352        @SuppressWarnings("rawtypes")
353        protected void setResponseHeaders(HttpServletResponse response, Message message) throws JMSException {
354            response.setHeader("destination", message.getJMSDestination().toString());
355            response.setHeader("id", message.getJMSMessageID());
356    
357            // Return JMS properties as header values.
358            for(Enumeration names = message.getPropertyNames(); names.hasMoreElements();) {
359                String name = (String) names.nextElement();
360                response.setHeader(name , message.getObjectProperty(name).toString());
361            }
362        }
363    
364        /**
365         * @return the timeout value for read requests which is always >= 0 and <=
366         *         maximumReadTimeout to avoid DoS attacks
367         */
368        protected long getReadTimeout(HttpServletRequest request) {
369            long answer = defaultReadTimeout;
370    
371            String name = request.getParameter(readTimeoutParameter);
372            if (name != null) {
373                answer = asLong(name);
374            }
375            if (answer < 0 || answer > maximumReadTimeout) {
376                answer = maximumReadTimeout;
377            }
378            return answer;
379        }
380    
381        /*
382         * Listen for available messages and wakeup any continuations.
383         */
384        private static class Listener implements MessageAvailableListener {
385            MessageConsumer consumer;
386            Continuation continuation;
387    
388            Listener(MessageConsumer consumer) {
389                this.consumer = consumer;
390            }
391    
392            public void setContinuation(Continuation continuation) {
393                synchronized (consumer) {
394                    this.continuation = continuation;
395                }
396            }
397    
398            public void onMessageAvailable(MessageConsumer consumer) {
399                assert this.consumer == consumer;
400    
401                synchronized (this.consumer) {
402                    if (continuation != null) {
403                        try {
404                            Message message = consumer.receiveNoWait();
405                            continuation.setAttribute("message", message);
406                        } catch (Exception e) {
407                            LOG.error("Error receiving message " + e, e);
408                        }
409                        continuation.resume();
410                    }
411                }
412            }
413        }
414    
415    }