001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.apache.activemq.web;
019
020import java.io.IOException;
021import java.io.PrintWriter;
022import java.util.Enumeration;
023import java.util.HashMap;
024import java.util.HashSet;
025
026import javax.jms.Destination;
027import javax.jms.JMSException;
028import javax.jms.Message;
029import javax.jms.MessageConsumer;
030import javax.jms.ObjectMessage;
031import javax.jms.TextMessage;
032import javax.servlet.ServletConfig;
033import javax.servlet.ServletException;
034import javax.servlet.http.HttpServletRequest;
035import javax.servlet.http.HttpServletResponse;
036
037import org.apache.activemq.MessageAvailableConsumer;
038import org.apache.activemq.MessageAvailableListener;
039import org.eclipse.jetty.continuation.Continuation;
040import org.eclipse.jetty.continuation.ContinuationSupport;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044/**
045 * A servlet for sending and receiving messages to/from JMS destinations using
046 * HTTP POST for sending and HTTP GET for receiving.
047 * <p/>
048 * You can specify the destination and whether it is a topic or queue via
049 * configuration details on the servlet or as request parameters.
050 * <p/>
051 * For reading messages you can specify a readTimeout parameter to determine how
052 * long the servlet should block for.
053 *
054 * One thing to keep in mind with this solution - due to the nature of REST,
055 * there will always be a chance of losing messages. Consider what happens when
056 * a message is retrieved from the broker but the web call is interrupted before
057 * the client receives the message in the response - the message is lost.
058 */
059public class MessageServlet extends MessageServletSupport {
060
061    // its a bit pita that this servlet got intermixed with jetty continuation/rest
062    // instead of creating a special for that. We should have kept a simple servlet
063    // for good old fashioned request/response blocked communication.
064
065    private static final long serialVersionUID = 8737914695188481219L;
066
067    private static final Logger LOG = LoggerFactory.getLogger(MessageServlet.class);
068
069    private final String readTimeoutParameter = "readTimeout";
070    private final String readTimeoutRequestAtt = "xamqReadDeadline";
071    private final String oneShotParameter = "oneShot";
072    private long defaultReadTimeout = -1;
073    private long maximumReadTimeout = 20000;
074    private long requestTimeout = 1000;
075    private String defaultContentType = "application/xml";
076
077    private final HashMap<String, WebClient> clients = new HashMap<String, WebClient>();
078    private final HashSet<MessageAvailableConsumer> activeConsumers = new HashSet<MessageAvailableConsumer>();
079
080    @Override
081    public void init() throws ServletException {
082        ServletConfig servletConfig = getServletConfig();
083        String name = servletConfig.getInitParameter("defaultReadTimeout");
084        if (name != null) {
085            defaultReadTimeout = asLong(name);
086        }
087        name = servletConfig.getInitParameter("maximumReadTimeout");
088        if (name != null) {
089            maximumReadTimeout = asLong(name);
090        }
091        name = servletConfig.getInitParameter("replyTimeout");
092        if (name != null) {
093            requestTimeout = asLong(name);
094        }
095        name = servletConfig.getInitParameter("defaultContentType");
096        if (name != null) {
097            defaultContentType = name;
098        }
099    }
100
101    /**
102     * Sends a message to a destination
103     *
104     * @param request
105     * @param response
106     * @throws ServletException
107     * @throws IOException
108     */
109    @Override
110    protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
111        // lets turn the HTTP post into a JMS Message
112        try {
113
114            String action = request.getParameter("action");
115            String clientId = request.getParameter("clientId");
116            if (action != null && clientId != null && action.equals("unsubscribe")) {
117                LOG.info("Unsubscribing client " + clientId);
118                WebClient client = getWebClient(request);
119                client.close();
120                clients.remove(clientId);
121                return;
122            }
123
124            WebClient client = getWebClient(request);
125
126            String text = getPostedMessageBody(request);
127
128            // lets create the destination from the URI?
129            Destination destination = getDestination(client, request);
130            if (destination == null) {
131                throw new NoDestinationSuppliedException();
132            }
133
134            if (LOG.isDebugEnabled()) {
135                LOG.debug("Sending message to: " + destination + " with text: " + text);
136            }
137
138            boolean sync = isSync(request);
139            TextMessage message = client.getSession().createTextMessage(text);
140
141            appendParametersToMessage(request, message);
142            boolean persistent = isSendPersistent(request);
143            int priority = getSendPriority(request);
144            long timeToLive = getSendTimeToLive(request);
145            client.send(destination, message, persistent, priority, timeToLive);
146
147            // lets return a unique URI for reliable messaging
148            response.setHeader("messageID", message.getJMSMessageID());
149            response.setStatus(HttpServletResponse.SC_OK);
150            response.getWriter().write("Message sent");
151
152        } catch (JMSException e) {
153            throw new ServletException("Could not post JMS message: " + e, e);
154        }
155    }
156
157    /**
158     * Supports a HTTP DELETE to be equivalent of consuming a singe message
159     * from a queue
160     */
161    @Override
162    protected void doDelete(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
163        doMessages(request, response);
164    }
165
166    /**
167     * Supports a HTTP DELETE to be equivalent of consuming a singe message
168     * from a queue
169     */
170    @Override
171    protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
172        doMessages(request, response);
173    }
174
175    /**
176     * Reads a message from a destination up to some specific timeout period
177     *
178     * @param request
179     * @param response
180     * @throws ServletException
181     * @throws IOException
182     */
183    protected void doMessages(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
184        MessageAvailableConsumer consumer = null;
185
186        try {
187            WebClient client = getWebClient(request);
188            Destination destination = getDestination(client, request);
189            if (destination == null) {
190                throw new NoDestinationSuppliedException();
191            }
192            consumer = (MessageAvailableConsumer) client.getConsumer(destination, request.getHeader(WebClient.selectorName));
193            Continuation continuation = ContinuationSupport.getContinuation(request);
194
195            // Don't allow concurrent use of the consumer. Do make sure to allow
196            // subsequent calls on continuation to use the consumer.
197            if (continuation.isInitial()) {
198                synchronized (activeConsumers) {
199                    if (activeConsumers.contains(consumer)) {
200                        throw new ServletException("Concurrent access to consumer is not supported");
201                    } else {
202                        activeConsumers.add(consumer);
203                    }
204                }
205            }
206
207            Message message = null;
208
209            long deadline = getReadDeadline(request);
210            long timeout = deadline - System.currentTimeMillis();
211
212            // Set the message available listener *before* calling receive to eliminate any
213            // chance of a missed notification between the time receive() completes without
214            // a message and the time the listener is set.
215            synchronized (consumer) {
216                Listener listener = (Listener) consumer.getAvailableListener();
217                if (listener == null) {
218                    listener = new Listener(consumer);
219                    consumer.setAvailableListener(listener);
220                }
221            }
222
223            if (LOG.isDebugEnabled()) {
224                LOG.debug("Receiving message(s) from: " + destination + " with timeout: " + timeout);
225            }
226
227            // Look for any available messages (need a little timeout). Always
228            // try at least one lookup; don't block past the deadline.
229            if (timeout <= 0) {
230                message = consumer.receiveNoWait();
231            } else if (timeout < 10) {
232                message = consumer.receive(timeout);
233            } else {
234                message = consumer.receive(10);
235            }
236
237            if (message == null) {
238                handleContinuation(request, response, client, destination, consumer, deadline);
239            } else {
240                writeResponse(request, response, message);
241                closeConsumerOnOneShot(request, client, destination);
242
243                synchronized (activeConsumers) {
244                    activeConsumers.remove(consumer);
245                }
246            }
247        } catch (JMSException e) {
248            throw new ServletException("Could not post JMS message: " + e, e);
249        }
250    }
251
252    protected void handleContinuation(HttpServletRequest request, HttpServletResponse response, WebClient client, Destination destination,
253        MessageAvailableConsumer consumer, long deadline) {
254        // Get an existing Continuation or create a new one if there are no events.
255        Continuation continuation = ContinuationSupport.getContinuation(request);
256
257        long timeout = deadline - System.currentTimeMillis();
258        if ((continuation.isExpired()) || (timeout <= 0)) {
259            // Reset the continuation on the available listener for the consumer to prevent the
260            // next message receipt from being consumed without a valid, active continuation.
261            synchronized (consumer) {
262                Object obj = consumer.getAvailableListener();
263                if (obj instanceof Listener) {
264                    ((Listener) obj).setContinuation(null);
265                }
266            }
267            response.setStatus(HttpServletResponse.SC_NO_CONTENT);
268            closeConsumerOnOneShot(request, client, destination);
269            synchronized (activeConsumers) {
270                activeConsumers.remove(consumer);
271            }
272            return;
273        }
274
275        continuation.setTimeout(timeout);
276        continuation.suspend();
277
278        synchronized (consumer) {
279            Listener listener = (Listener) consumer.getAvailableListener();
280
281            // register this continuation with our listener.
282            listener.setContinuation(continuation);
283        }
284    }
285
286    protected void writeResponse(HttpServletRequest request, HttpServletResponse response, Message message) throws IOException, JMSException {
287        int messages = 0;
288        try {
289            response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate"); // HTTP
290                                                                                        // 1.1
291            response.setHeader("Pragma", "no-cache"); // HTTP 1.0
292            response.setDateHeader("Expires", 0);
293            // write a responds
294            PrintWriter writer = response.getWriter();
295
296            // handle any message(s)
297            if (message == null) {
298                // No messages so OK response of for ajax else no content.
299                response.setStatus(HttpServletResponse.SC_NO_CONTENT);
300            } else {
301                // We have at least one message so set up the response
302                messages = 1;
303
304                String type = getContentType(request);
305                if (type != null) {
306                    response.setContentType(type);
307                } else {
308                    if (isXmlContent(message)) {
309                        response.setContentType(defaultContentType);
310                    } else {
311                        response.setContentType("text/plain");
312                    }
313                }
314                response.setStatus(HttpServletResponse.SC_OK);
315
316                setResponseHeaders(response, message);
317                writeMessageResponse(writer, message);
318                writer.flush();
319            }
320        } finally {
321            if (LOG.isDebugEnabled()) {
322                LOG.debug("Received " + messages + " message(s)");
323            }
324        }
325    }
326
327    protected void writeMessageResponse(PrintWriter writer, Message message) throws JMSException, IOException {
328        if (message instanceof TextMessage) {
329            TextMessage textMsg = (TextMessage) message;
330            String txt = textMsg.getText();
331            if (txt != null) {
332                if (txt.startsWith("<?")) {
333                    txt = txt.substring(txt.indexOf("?>") + 2);
334                }
335                writer.print(txt);
336            }
337        } else if (message instanceof ObjectMessage) {
338            ObjectMessage objectMsg = (ObjectMessage) message;
339            Object object = objectMsg.getObject();
340            if (object != null) {
341                writer.print(object.toString());
342            }
343        }
344    }
345
346    protected boolean isXmlContent(Message message) throws JMSException {
347        if (message instanceof TextMessage) {
348            TextMessage textMsg = (TextMessage) message;
349            String txt = textMsg.getText();
350            if (txt != null) {
351                // assume its xml when it starts with <
352                if (txt.startsWith("<")) {
353                    return true;
354                }
355            }
356        }
357        // for any other kind of messages we dont assume xml
358        return false;
359    }
360
361    public WebClient getWebClient(HttpServletRequest request) {
362        String clientId = request.getParameter("clientId");
363        if (clientId != null) {
364            synchronized (this) {
365                LOG.debug("Getting local client [" + clientId + "]");
366                WebClient client = clients.get(clientId);
367                if (client == null) {
368                    LOG.debug("Creating new client [" + clientId + "]");
369                    client = new WebClient();
370                    clients.put(clientId, client);
371                }
372                return client;
373            }
374
375        } else {
376            return WebClient.getWebClient(request);
377        }
378    }
379
380    protected String getContentType(HttpServletRequest request) {
381        String value = request.getParameter("xml");
382        if (value != null && "true".equalsIgnoreCase(value)) {
383            return "application/xml";
384        }
385        value = request.getParameter("json");
386        if (value != null && "true".equalsIgnoreCase(value)) {
387            return "application/json";
388        }
389        return null;
390    }
391
392    @SuppressWarnings("rawtypes")
393    protected void setResponseHeaders(HttpServletResponse response, Message message) throws JMSException {
394        response.setHeader("destination", message.getJMSDestination().toString());
395        response.setHeader("id", message.getJMSMessageID());
396
397        // Return JMS properties as header values.
398        for (Enumeration names = message.getPropertyNames(); names.hasMoreElements();) {
399            String name = (String) names.nextElement();
400            response.setHeader(name, message.getObjectProperty(name).toString());
401        }
402    }
403
404    /**
405     * @return the timeout value for read requests which is always >= 0 and <=
406     *         maximumReadTimeout to avoid DoS attacks
407     */
408    protected long getReadDeadline(HttpServletRequest request) {
409        Long answer;
410
411        answer = (Long) request.getAttribute(readTimeoutRequestAtt);
412
413        if (answer == null) {
414            long timeout = defaultReadTimeout;
415            String name = request.getParameter(readTimeoutParameter);
416            if (name != null) {
417                timeout = asLong(name);
418            }
419            if (timeout < 0 || timeout > maximumReadTimeout) {
420                timeout = maximumReadTimeout;
421            }
422
423            answer = Long.valueOf(System.currentTimeMillis() + timeout);
424        }
425        return answer.longValue();
426    }
427
428    /**
429     * Close the consumer if one-shot mode is used on the given request.
430     */
431    protected void closeConsumerOnOneShot(HttpServletRequest request, WebClient client, Destination dest) {
432        if (asBoolean(request.getParameter(oneShotParameter), false)) {
433            try {
434                client.closeConsumer(dest);
435            } catch (JMSException jms_exc) {
436                LOG.warn("JMS exception on closing consumer after request with one-shot mode", jms_exc);
437            }
438        }
439    }
440
441    /*
442     * Listen for available messages and wakeup any continuations.
443     */
444    private static class Listener implements MessageAvailableListener {
445        MessageConsumer consumer;
446        Continuation continuation;
447
448        Listener(MessageConsumer consumer) {
449            this.consumer = consumer;
450        }
451
452        public void setContinuation(Continuation continuation) {
453            synchronized (consumer) {
454                this.continuation = continuation;
455            }
456        }
457
458        @Override
459        public void onMessageAvailable(MessageConsumer consumer) {
460            assert this.consumer == consumer;
461
462            ((MessageAvailableConsumer) consumer).setAvailableListener(null);
463
464            synchronized (this.consumer) {
465                if (continuation != null) {
466                    continuation.resume();
467                }
468            }
469        }
470    }
471}