001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.http;
018    
019    import java.io.BufferedReader;
020    import java.io.DataOutputStream;
021    import java.io.IOException;
022    import java.io.InputStream;
023    import java.io.InputStreamReader;
024    import java.util.HashMap;
025    import java.util.concurrent.ConcurrentHashMap;
026    import java.util.concurrent.ConcurrentMap;
027    import java.util.concurrent.LinkedBlockingQueue;
028    import java.util.concurrent.TimeUnit;
029    import java.util.zip.GZIPInputStream;
030    import javax.servlet.ServletException;
031    import javax.servlet.http.HttpServlet;
032    import javax.servlet.http.HttpServletRequest;
033    import javax.servlet.http.HttpServletResponse;
034    
035    import org.apache.activemq.Service;
036    import org.apache.activemq.command.Command;
037    import org.apache.activemq.command.WireFormatInfo;
038    import org.apache.activemq.transport.Transport;
039    import org.apache.activemq.transport.TransportAcceptListener;
040    import org.apache.activemq.transport.util.TextWireFormat;
041    import org.apache.activemq.transport.xstream.XStreamWireFormat;
042    import org.apache.activemq.util.IOExceptionSupport;
043    import org.apache.activemq.util.ServiceListener;
044    import org.slf4j.Logger;
045    import org.slf4j.LoggerFactory;
046    
047    /**
048     * A servlet which handles server side HTTP transport, delegating to the
049     * ActiveMQ broker. This servlet is designed for being embedded inside an
050     * ActiveMQ Broker using an embedded Jetty or Tomcat instance.
051     */
052    public class HttpTunnelServlet extends HttpServlet {
053        private static final long serialVersionUID = -3826714430767484333L;
054        private static final Logger LOG = LoggerFactory.getLogger(HttpTunnelServlet.class);
055    
056        private TransportAcceptListener listener;
057        private HttpTransportFactory transportFactory;
058        private TextWireFormat wireFormat;
059        private ConcurrentMap<String, BlockingQueueTransport> clients = new ConcurrentHashMap<String, BlockingQueueTransport>();
060        private final long requestTimeout = 30000L;
061        private HashMap<String, Object> transportOptions;
062    
063        @SuppressWarnings("unchecked")
064        @Override
065        public void init() throws ServletException {
066            super.init();
067            listener = (TransportAcceptListener)getServletContext().getAttribute("acceptListener");
068            if (listener == null) {
069                throw new ServletException("No such attribute 'acceptListener' available in the ServletContext");
070            }
071            transportFactory = (HttpTransportFactory)getServletContext().getAttribute("transportFactory");
072            if (transportFactory == null) {
073                throw new ServletException("No such attribute 'transportFactory' available in the ServletContext");
074            }
075            transportOptions = (HashMap<String, Object>)getServletContext().getAttribute("transportOptions");
076            wireFormat = (TextWireFormat)getServletContext().getAttribute("wireFormat");
077            if (wireFormat == null) {
078                wireFormat = createWireFormat();
079            }
080        }
081    
082        @Override
083        protected void doOptions(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
084            response.addHeader("Accepts-Encoding", "gzip");
085            super.doOptions(request, response);
086        }
087    
088        @Override
089        protected void doHead(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
090            createTransportChannel(request, response);
091        }
092    
093        @Override
094        protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
095            // lets return the next response
096            Command packet = null;
097            int count = 0;
098            try {
099                BlockingQueueTransport transportChannel = getTransportChannel(request, response);
100                if (transportChannel == null) {
101                    return;
102                }
103    
104                packet = (Command)transportChannel.getQueue().poll(requestTimeout, TimeUnit.MILLISECONDS);
105    
106                DataOutputStream stream = new DataOutputStream(response.getOutputStream());
107                wireFormat.marshal(packet, stream);
108                count++;
109            } catch (InterruptedException ignore) {
110            }
111    
112            if (count == 0) {
113                response.setStatus(HttpServletResponse.SC_REQUEST_TIMEOUT);
114            }
115        }
116    
117        @Override
118        protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
119    
120            InputStream stream = request.getInputStream();
121            String contentType = request.getContentType();
122            if (contentType != null && contentType.equals("application/x-gzip")) {
123                stream = new GZIPInputStream(stream);
124            }
125    
126            // Read the command directly from the reader, assuming UTF8 encoding
127            Command command = (Command) wireFormat.unmarshalText(new InputStreamReader(stream, "UTF-8"));
128    
129            if (command instanceof WireFormatInfo) {
130                WireFormatInfo info = (WireFormatInfo) command;
131                if (!canProcessWireFormatVersion(info.getVersion())) {
132                    response.sendError(HttpServletResponse.SC_NOT_FOUND, "Cannot process wire format of version: "
133                            + info.getVersion());
134                }
135    
136            } else {
137    
138                BlockingQueueTransport transport = getTransportChannel(request, response);
139                if (transport == null) {
140                    return;
141                }
142    
143                transport.doConsume(command);
144            }
145        }
146    
147        private boolean canProcessWireFormatVersion(int version) {
148            return true;
149        }
150    
151        protected String readRequestBody(HttpServletRequest request) throws IOException {
152            StringBuffer buffer = new StringBuffer();
153            BufferedReader reader = request.getReader();
154            while (true) {
155                String line = reader.readLine();
156                if (line == null) {
157                    break;
158                } else {
159                    buffer.append(line);
160                    buffer.append("\n");
161                }
162            }
163            return buffer.toString();
164        }
165    
166        protected BlockingQueueTransport getTransportChannel(HttpServletRequest request, HttpServletResponse response) throws IOException {
167            String clientID = request.getHeader("clientID");
168            if (clientID == null) {
169                response.sendError(HttpServletResponse.SC_BAD_REQUEST, "No clientID header specified");
170                LOG.warn("No clientID header specified");
171                return null;
172            }
173            BlockingQueueTransport answer = clients.get(clientID);
174            if (answer == null) {
175                LOG.warn("The clientID header specified is invalid. Client sesion has not yet been established for it: " + clientID);
176                return null;
177            }
178            return answer;
179        }
180    
181        protected BlockingQueueTransport createTransportChannel(HttpServletRequest request, HttpServletResponse response) throws IOException {
182            final String clientID = request.getHeader("clientID");
183    
184            if (clientID == null) {
185                response.sendError(HttpServletResponse.SC_BAD_REQUEST, "No clientID header specified");
186                LOG.warn("No clientID header specified");
187                return null;
188            }
189    
190            // Optimistically create the client's transport; this transport may be thrown away if the client has already registered.
191            BlockingQueueTransport answer = createTransportChannel();
192    
193            // Record the client's transport and ensure that it has not already registered; this is thread-safe and only allows one
194            // thread to register the client
195            if (clients.putIfAbsent(clientID, answer) != null) {
196                response.sendError(HttpServletResponse.SC_BAD_REQUEST, "A session for clientID '" + clientID + "' has already been established");
197                LOG.warn("A session for clientID '" + clientID + "' has already been established");
198                return null;
199            }
200    
201            // Ensure that the client's transport is cleaned up when no longer
202            // needed.
203            answer.addServiceListener(new ServiceListener() {
204                public void started(Service service) {
205                    // Nothing to do.
206                }
207    
208                public void stopped(Service service) {
209                    clients.remove(clientID);
210                }
211            });
212    
213            // Configure the transport with any additional properties or filters.  Although the returned transport is not explicitly
214            // persisted, if it is a filter (e.g., InactivityMonitor) it will be linked to the client's transport as a TransportListener
215            // and not GC'd until the client's transport is disposed.
216            Transport transport = answer;
217            try {
218                // Preserve the transportOptions for future use by making a copy before applying (they are removed when applied).
219                HashMap<String, Object> options = new HashMap<String, Object>(transportOptions);
220                transport = transportFactory.serverConfigure(answer, null, options);
221            } catch (Exception e) {
222                throw IOExceptionSupport.create(e);
223            }
224    
225            // Wait for the transport to be connected or disposed.
226            listener.onAccept(transport);
227            while (!transport.isConnected() && !transport.isDisposed()) {
228                try {
229                    Thread.sleep(100);
230                } catch (InterruptedException ignore) {
231                }
232            }
233    
234            // Ensure that the transport was not prematurely disposed.
235            if (transport.isDisposed()) {
236                response.sendError(HttpServletResponse.SC_BAD_REQUEST, "The session for clientID '" + clientID + "' was prematurely disposed");
237                LOG.warn("The session for clientID '" + clientID + "' was prematurely disposed");
238                return null;
239            }
240    
241            return answer;
242        }
243    
244        protected BlockingQueueTransport createTransportChannel() {
245           return new BlockingQueueTransport(new LinkedBlockingQueue<Object>());
246        }
247    
248        protected TextWireFormat createWireFormat() {
249            return new XStreamWireFormat();
250        }
251    }