001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.http;
018
019 import java.io.BufferedReader;
020 import java.io.DataOutputStream;
021 import java.io.IOException;
022 import java.io.InputStream;
023 import java.io.InputStreamReader;
024 import java.util.HashMap;
025 import java.util.concurrent.ConcurrentHashMap;
026 import java.util.concurrent.ConcurrentMap;
027 import java.util.concurrent.LinkedBlockingQueue;
028 import java.util.concurrent.TimeUnit;
029 import java.util.zip.GZIPInputStream;
030 import javax.servlet.ServletException;
031 import javax.servlet.http.HttpServlet;
032 import javax.servlet.http.HttpServletRequest;
033 import javax.servlet.http.HttpServletResponse;
034
035 import org.apache.activemq.Service;
036 import org.apache.activemq.command.Command;
037 import org.apache.activemq.command.WireFormatInfo;
038 import org.apache.activemq.transport.Transport;
039 import org.apache.activemq.transport.TransportAcceptListener;
040 import org.apache.activemq.transport.util.TextWireFormat;
041 import org.apache.activemq.transport.xstream.XStreamWireFormat;
042 import org.apache.activemq.util.IOExceptionSupport;
043 import org.apache.activemq.util.ServiceListener;
044 import org.slf4j.Logger;
045 import org.slf4j.LoggerFactory;
046
047 /**
048 * A servlet which handles server side HTTP transport, delegating to the
049 * ActiveMQ broker. This servlet is designed for being embedded inside an
050 * ActiveMQ Broker using an embedded Jetty or Tomcat instance.
051 */
052 public class HttpTunnelServlet extends HttpServlet {
053 private static final long serialVersionUID = -3826714430767484333L;
054 private static final Logger LOG = LoggerFactory.getLogger(HttpTunnelServlet.class);
055
056 private TransportAcceptListener listener;
057 private HttpTransportFactory transportFactory;
058 private TextWireFormat wireFormat;
059 private ConcurrentMap<String, BlockingQueueTransport> clients = new ConcurrentHashMap<String, BlockingQueueTransport>();
060 private final long requestTimeout = 30000L;
061 private HashMap<String, Object> transportOptions;
062
063 @SuppressWarnings("unchecked")
064 @Override
065 public void init() throws ServletException {
066 super.init();
067 listener = (TransportAcceptListener)getServletContext().getAttribute("acceptListener");
068 if (listener == null) {
069 throw new ServletException("No such attribute 'acceptListener' available in the ServletContext");
070 }
071 transportFactory = (HttpTransportFactory)getServletContext().getAttribute("transportFactory");
072 if (transportFactory == null) {
073 throw new ServletException("No such attribute 'transportFactory' available in the ServletContext");
074 }
075 transportOptions = (HashMap<String, Object>)getServletContext().getAttribute("transportOptions");
076 wireFormat = (TextWireFormat)getServletContext().getAttribute("wireFormat");
077 if (wireFormat == null) {
078 wireFormat = createWireFormat();
079 }
080 }
081
082 @Override
083 protected void doOptions(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
084 response.addHeader("Accepts-Encoding", "gzip");
085 super.doOptions(request, response);
086 }
087
088 @Override
089 protected void doHead(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
090 createTransportChannel(request, response);
091 }
092
093 @Override
094 protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
095 // lets return the next response
096 Command packet = null;
097 int count = 0;
098 try {
099 BlockingQueueTransport transportChannel = getTransportChannel(request, response);
100 if (transportChannel == null) {
101 return;
102 }
103
104 packet = (Command)transportChannel.getQueue().poll(requestTimeout, TimeUnit.MILLISECONDS);
105
106 DataOutputStream stream = new DataOutputStream(response.getOutputStream());
107 wireFormat.marshal(packet, stream);
108 count++;
109 } catch (InterruptedException ignore) {
110 }
111
112 if (count == 0) {
113 response.setStatus(HttpServletResponse.SC_REQUEST_TIMEOUT);
114 }
115 }
116
117 @Override
118 protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
119
120 InputStream stream = request.getInputStream();
121 String contentType = request.getContentType();
122 if (contentType != null && contentType.equals("application/x-gzip")) {
123 stream = new GZIPInputStream(stream);
124 }
125
126 // Read the command directly from the reader, assuming UTF8 encoding
127 Command command = (Command) wireFormat.unmarshalText(new InputStreamReader(stream, "UTF-8"));
128
129 if (command instanceof WireFormatInfo) {
130 WireFormatInfo info = (WireFormatInfo) command;
131 if (!canProcessWireFormatVersion(info.getVersion())) {
132 response.sendError(HttpServletResponse.SC_NOT_FOUND, "Cannot process wire format of version: "
133 + info.getVersion());
134 }
135
136 } else {
137
138 BlockingQueueTransport transport = getTransportChannel(request, response);
139 if (transport == null) {
140 return;
141 }
142
143 transport.doConsume(command);
144 }
145 }
146
147 private boolean canProcessWireFormatVersion(int version) {
148 return true;
149 }
150
151 protected String readRequestBody(HttpServletRequest request) throws IOException {
152 StringBuffer buffer = new StringBuffer();
153 BufferedReader reader = request.getReader();
154 while (true) {
155 String line = reader.readLine();
156 if (line == null) {
157 break;
158 } else {
159 buffer.append(line);
160 buffer.append("\n");
161 }
162 }
163 return buffer.toString();
164 }
165
166 protected BlockingQueueTransport getTransportChannel(HttpServletRequest request, HttpServletResponse response) throws IOException {
167 String clientID = request.getHeader("clientID");
168 if (clientID == null) {
169 response.sendError(HttpServletResponse.SC_BAD_REQUEST, "No clientID header specified");
170 LOG.warn("No clientID header specified");
171 return null;
172 }
173 BlockingQueueTransport answer = clients.get(clientID);
174 if (answer == null) {
175 LOG.warn("The clientID header specified is invalid. Client sesion has not yet been established for it: " + clientID);
176 return null;
177 }
178 return answer;
179 }
180
181 protected BlockingQueueTransport createTransportChannel(HttpServletRequest request, HttpServletResponse response) throws IOException {
182 final String clientID = request.getHeader("clientID");
183
184 if (clientID == null) {
185 response.sendError(HttpServletResponse.SC_BAD_REQUEST, "No clientID header specified");
186 LOG.warn("No clientID header specified");
187 return null;
188 }
189
190 // Optimistically create the client's transport; this transport may be thrown away if the client has already registered.
191 BlockingQueueTransport answer = createTransportChannel();
192
193 // Record the client's transport and ensure that it has not already registered; this is thread-safe and only allows one
194 // thread to register the client
195 if (clients.putIfAbsent(clientID, answer) != null) {
196 response.sendError(HttpServletResponse.SC_BAD_REQUEST, "A session for clientID '" + clientID + "' has already been established");
197 LOG.warn("A session for clientID '" + clientID + "' has already been established");
198 return null;
199 }
200
201 // Ensure that the client's transport is cleaned up when no longer
202 // needed.
203 answer.addServiceListener(new ServiceListener() {
204 public void started(Service service) {
205 // Nothing to do.
206 }
207
208 public void stopped(Service service) {
209 clients.remove(clientID);
210 }
211 });
212
213 // Configure the transport with any additional properties or filters. Although the returned transport is not explicitly
214 // persisted, if it is a filter (e.g., InactivityMonitor) it will be linked to the client's transport as a TransportListener
215 // and not GC'd until the client's transport is disposed.
216 Transport transport = answer;
217 try {
218 // Preserve the transportOptions for future use by making a copy before applying (they are removed when applied).
219 HashMap<String, Object> options = new HashMap<String, Object>(transportOptions);
220 transport = transportFactory.serverConfigure(answer, null, options);
221 } catch (Exception e) {
222 throw IOExceptionSupport.create(e);
223 }
224
225 // Wait for the transport to be connected or disposed.
226 listener.onAccept(transport);
227 while (!transport.isConnected() && !transport.isDisposed()) {
228 try {
229 Thread.sleep(100);
230 } catch (InterruptedException ignore) {
231 }
232 }
233
234 // Ensure that the transport was not prematurely disposed.
235 if (transport.isDisposed()) {
236 response.sendError(HttpServletResponse.SC_BAD_REQUEST, "The session for clientID '" + clientID + "' was prematurely disposed");
237 LOG.warn("The session for clientID '" + clientID + "' was prematurely disposed");
238 return null;
239 }
240
241 return answer;
242 }
243
244 protected BlockingQueueTransport createTransportChannel() {
245 return new BlockingQueueTransport(new LinkedBlockingQueue<Object>());
246 }
247
248 protected TextWireFormat createWireFormat() {
249 return new XStreamWireFormat();
250 }
251 }