001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.http;
018
019import java.io.DataInputStream;
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.net.URI;
023import java.security.cert.X509Certificate;
024import java.util.zip.GZIPInputStream;
025import java.util.zip.GZIPOutputStream;
026
027import org.apache.activemq.command.ShutdownInfo;
028import org.apache.activemq.transport.FutureResponse;
029import org.apache.activemq.transport.util.TextWireFormat;
030import org.apache.activemq.util.ByteArrayOutputStream;
031import org.apache.activemq.util.IOExceptionSupport;
032import org.apache.activemq.util.IdGenerator;
033import org.apache.activemq.util.ServiceStopper;
034import org.apache.activemq.wireformat.WireFormat;
035import org.apache.http.Header;
036import org.apache.http.HttpHost;
037import org.apache.http.HttpRequest;
038import org.apache.http.HttpRequestInterceptor;
039import org.apache.http.HttpResponse;
040import org.apache.http.HttpStatus;
041import org.apache.http.auth.AuthScope;
042import org.apache.http.auth.UsernamePasswordCredentials;
043import org.apache.http.client.HttpClient;
044import org.apache.http.client.HttpResponseException;
045import org.apache.http.client.ResponseHandler;
046import org.apache.http.client.methods.HttpGet;
047import org.apache.http.client.methods.HttpHead;
048import org.apache.http.client.methods.HttpOptions;
049import org.apache.http.client.methods.HttpPost;
050import org.apache.http.conn.ClientConnectionManager;
051import org.apache.http.conn.params.ConnRoutePNames;
052import org.apache.http.conn.scheme.PlainSocketFactory;
053import org.apache.http.conn.scheme.Scheme;
054import org.apache.http.entity.ByteArrayEntity;
055import org.apache.http.impl.client.BasicResponseHandler;
056import org.apache.http.impl.client.DefaultHttpClient;
057import org.apache.http.impl.conn.PoolingClientConnectionManager;
058import org.apache.http.message.AbstractHttpMessage;
059import org.apache.http.params.HttpConnectionParams;
060import org.apache.http.params.HttpParams;
061import org.apache.http.protocol.HttpContext;
062import org.apache.http.util.EntityUtils;
063import org.slf4j.Logger;
064import org.slf4j.LoggerFactory;
065
066/**
067 * A HTTP {@link org.apache.activemq.transport.Transport} which uses the
068 * <a href="http://hc.apache.org/index.html">Apache HTTP Client</a>
069 * library
070 */
071public class HttpClientTransport extends HttpTransportSupport {
072
073    public static final int MAX_CLIENT_TIMEOUT = 30000;
074    private static final Logger LOG = LoggerFactory.getLogger(HttpClientTransport.class);
075    private static final IdGenerator CLIENT_ID_GENERATOR = new IdGenerator();
076
077    private HttpClient sendHttpClient;
078    private HttpClient receiveHttpClient;
079
080    private final String clientID = CLIENT_ID_GENERATOR.generateId();
081    private boolean trace;
082    private HttpGet httpMethod;
083    private volatile int receiveCounter;
084
085    private int soTimeout = MAX_CLIENT_TIMEOUT;
086
087    private boolean useCompression = false;
088    protected boolean canSendCompressed = false;
089    private int minSendAsCompressedSize = 0;
090
091    public HttpClientTransport(TextWireFormat wireFormat, URI remoteUrl) {
092        super(wireFormat, remoteUrl);
093    }
094
095    public FutureResponse asyncRequest(Object command) throws IOException {
096        return null;
097    }
098
099    @Override
100    public void oneway(Object command) throws IOException {
101
102        if (isStopped()) {
103            throw new IOException("stopped.");
104        }
105        HttpPost httpMethod = new HttpPost(getRemoteUrl().toString());
106        configureMethod(httpMethod);
107        String data = getTextWireFormat().marshalText(command);
108        byte[] bytes = data.getBytes("UTF-8");
109        if (useCompression && canSendCompressed && bytes.length > minSendAsCompressedSize) {
110            ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
111            GZIPOutputStream stream = new GZIPOutputStream(bytesOut);
112            stream.write(bytes);
113            stream.close();
114            httpMethod.addHeader("Content-Type", "application/x-gzip");
115            if (LOG.isTraceEnabled()) {
116                LOG.trace("Sending compressed, size = " + bytes.length + ", compressed size = " + bytesOut.size());
117            }
118            bytes = bytesOut.toByteArray();
119        }
120        ByteArrayEntity entity = new ByteArrayEntity(bytes);
121        httpMethod.setEntity(entity);
122
123        HttpClient client = null;
124        HttpResponse answer = null;
125        try {
126            client = getSendHttpClient();
127            answer = client.execute(httpMethod);
128            int status = answer.getStatusLine().getStatusCode();
129            if (status != HttpStatus.SC_OK) {
130                throw new IOException("Failed to post command: " + command + " as response was: " + answer);
131            }
132            if (command instanceof ShutdownInfo) {
133                try {
134                    stop();
135                } catch (Exception e) {
136                    LOG.warn("Error trying to stop HTTP client: "+ e, e);
137                }
138            }
139        } catch (IOException e) {
140            throw IOExceptionSupport.create("Could not post command: " + command + " due to: " + e, e);
141        } finally {
142            if (answer != null) {
143                EntityUtils.consume(answer.getEntity());
144            }
145        }
146    }
147
148    @Override
149    public Object request(Object command) throws IOException {
150        return null;
151    }
152
153    private DataInputStream createDataInputStream(HttpResponse answer) throws IOException {
154        Header encoding = answer.getEntity().getContentEncoding();
155        if (encoding != null && "gzip".equalsIgnoreCase(encoding.getValue())) {
156            return new DataInputStream(new GZIPInputStream(answer.getEntity().getContent()));
157        } else {
158            return new DataInputStream(answer.getEntity().getContent());
159        }
160    }
161
162    @Override
163    public void run() {
164
165        if (LOG.isTraceEnabled()) {
166            LOG.trace("HTTP GET consumer thread starting: " + this);
167        }
168        HttpClient httpClient = getReceiveHttpClient();
169        URI remoteUrl = getRemoteUrl();
170
171        while (!isStopped() && !isStopping()) {
172
173            httpMethod = new HttpGet(remoteUrl.toString());
174            configureMethod(httpMethod);
175            HttpResponse answer = null;
176
177            try {
178                answer = httpClient.execute(httpMethod);
179                int status = answer.getStatusLine().getStatusCode();
180                if (status != HttpStatus.SC_OK) {
181                    if (status == HttpStatus.SC_REQUEST_TIMEOUT) {
182                        LOG.debug("GET timed out");
183                        try {
184                            Thread.sleep(1000);
185                        } catch (InterruptedException e) {
186                            onException(new InterruptedIOException());
187                            Thread.currentThread().interrupt();
188                            break;
189                        }
190                    } else {
191                        onException(new IOException("Failed to perform GET on: " + remoteUrl + " as response was: " + answer));
192                        break;
193                    }
194                } else {
195                    receiveCounter++;
196                    DataInputStream stream = createDataInputStream(answer);
197                    Object command = getTextWireFormat().unmarshal(stream);
198                    if (command == null) {
199                        LOG.debug("Received null command from url: " + remoteUrl);
200                    } else {
201                        doConsume(command);
202                    }
203                    stream.close();
204                }
205            } catch (IOException e) {
206                onException(IOExceptionSupport.create("Failed to perform GET on: " + remoteUrl + " Reason: " + e.getMessage(), e));
207                break;
208            } finally {
209                if (answer != null) {
210                    try {
211                        EntityUtils.consume(answer.getEntity());
212                    } catch (IOException e) {
213                    }
214                }
215            }
216        }
217    }
218
219    // Properties
220    // -------------------------------------------------------------------------
221    public HttpClient getSendHttpClient() {
222        if (sendHttpClient == null) {
223            sendHttpClient = createHttpClient();
224        }
225        return sendHttpClient;
226    }
227
228    public void setSendHttpClient(HttpClient sendHttpClient) {
229        this.sendHttpClient = sendHttpClient;
230    }
231
232    public HttpClient getReceiveHttpClient() {
233        if (receiveHttpClient == null) {
234            receiveHttpClient = createHttpClient();
235        }
236        return receiveHttpClient;
237    }
238
239    public void setReceiveHttpClient(HttpClient receiveHttpClient) {
240        this.receiveHttpClient = receiveHttpClient;
241    }
242
243    // Implementation methods
244    // -------------------------------------------------------------------------
245    @Override
246    protected void doStart() throws Exception {
247
248        if (LOG.isTraceEnabled()) {
249            LOG.trace("HTTP GET consumer thread starting: " + this);
250        }
251        HttpClient httpClient = getReceiveHttpClient();
252        URI remoteUrl = getRemoteUrl();
253
254        HttpHead httpMethod = new HttpHead(remoteUrl.toString());
255        configureMethod(httpMethod);
256
257        // Request the options from the server so we can find out if the broker we are
258        // talking to supports GZip compressed content.  If so and useCompression is on
259        // then we can compress our POST data, otherwise we must send it uncompressed to
260        // ensure backwards compatibility.
261        HttpOptions optionsMethod = new HttpOptions(remoteUrl.toString());
262        ResponseHandler<String> handler = new BasicResponseHandler() {
263            @Override
264            public String handleResponse(HttpResponse response) throws HttpResponseException, IOException {
265
266                for(Header header : response.getAllHeaders()) {
267                    if (header.getName().equals("Accepts-Encoding") && header.getValue().contains("gzip")) {
268                        LOG.info("Broker Servlet supports GZip compression.");
269                        canSendCompressed = true;
270                        break;
271                    }
272                }
273
274                return super.handleResponse(response);
275            }
276        };
277
278        try {
279            httpClient.execute(httpMethod, new BasicResponseHandler());
280            httpClient.execute(optionsMethod, handler);
281        } catch(Exception e) {
282            throw new IOException("Failed to perform GET on: " + remoteUrl + " as response was: " + e.getMessage());
283        }
284
285        super.doStart();
286    }
287
288    @Override
289    protected void doStop(ServiceStopper stopper) throws Exception {
290        if (httpMethod != null) {
291            // In some versions of the JVM a race between the httpMethod and the completion
292            // of the method when using HTTPS can lead to a deadlock.  This hack attempts to
293            // detect that and interrupt the thread that's locked so that they can complete
294            // on another attempt.
295            for (int i = 0; i < 3; ++i) {
296                Thread abortThread = new Thread(new Runnable() {
297
298                    @Override
299                    public void run() {
300                        try {
301                            httpMethod.abort();
302                        } catch (Exception e) {
303                        }
304                    }
305                });
306
307                abortThread.start();
308                abortThread.join(2000);
309                if (abortThread.isAlive() && !httpMethod.isAborted()) {
310                    abortThread.interrupt();
311                }
312            }
313        }
314    }
315
316    protected HttpClient createHttpClient() {
317        DefaultHttpClient client = new DefaultHttpClient(createClientConnectionManager());
318        if (useCompression) {
319            client.addRequestInterceptor( new HttpRequestInterceptor() {
320                @Override
321                public void process(HttpRequest request, HttpContext context) {
322                    // We expect to received a compression response that we un-gzip
323                    request.addHeader("Accept-Encoding", "gzip");
324                }
325            });
326        }
327        if (getProxyHost() != null) {
328            HttpHost proxy = new HttpHost(getProxyHost(), getProxyPort());
329            client.getParams().setParameter(ConnRoutePNames.DEFAULT_PROXY, proxy);
330
331            if (client.getConnectionManager().getSchemeRegistry().get("http") == null) {
332                client.getConnectionManager().getSchemeRegistry().register(
333                    new Scheme("http", getProxyPort(), PlainSocketFactory.getSocketFactory()));
334            }
335
336            if(getProxyUser() != null && getProxyPassword() != null) {
337                client.getCredentialsProvider().setCredentials(
338                    new AuthScope(getProxyHost(), getProxyPort()),
339                    new UsernamePasswordCredentials(getProxyUser(), getProxyPassword()));
340            }
341        }
342
343        HttpParams params = client.getParams();
344        HttpConnectionParams.setSoTimeout(params, soTimeout);
345
346        return client;
347    }
348
349    protected ClientConnectionManager createClientConnectionManager() {
350        return new PoolingClientConnectionManager();
351    }
352
353    protected void configureMethod(AbstractHttpMessage method) {
354        method.setHeader("clientID", clientID);
355    }
356
357    public boolean isTrace() {
358        return trace;
359    }
360
361    public void setTrace(boolean trace) {
362        this.trace = trace;
363    }
364
365    @Override
366    public int getReceiveCounter() {
367        return receiveCounter;
368    }
369
370    public int getSoTimeout() {
371        return soTimeout;
372    }
373
374    public void setSoTimeout(int soTimeout) {
375        this.soTimeout = soTimeout;
376    }
377
378    public void setUseCompression(boolean useCompression) {
379        this.useCompression = useCompression;
380    }
381
382    public boolean isUseCompression() {
383        return this.useCompression;
384    }
385
386    public int getMinSendAsCompressedSize() {
387        return minSendAsCompressedSize;
388    }
389
390    /**
391     * Sets the minimum size that must be exceeded on a send before compression is used if
392     * the useCompression option is specified.  For very small payloads compression can be
393     * inefficient compared to the transmission size savings.
394     *
395     * Default value is 0.
396     *
397     * @param minSendAsCompressedSize
398     */
399    public void setMinSendAsCompressedSize(int minSendAsCompressedSize) {
400        this.minSendAsCompressedSize = minSendAsCompressedSize;
401    }
402
403    @Override
404    public X509Certificate[] getPeerCertificates() {
405        return null;
406    }
407
408    @Override
409    public void setPeerCertificates(X509Certificate[] certificates) {
410    }
411
412    @Override
413    public WireFormat getWireFormat() {
414        return getTextWireFormat();
415    }
416}