001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.http;
018    
019    import java.io.DataInputStream;
020    import java.io.IOException;
021    import java.io.InterruptedIOException;
022    import java.net.URI;
023    import java.util.zip.GZIPInputStream;
024    import java.util.zip.GZIPOutputStream;
025    
026    import org.apache.activemq.command.ShutdownInfo;
027    import org.apache.activemq.transport.FutureResponse;
028    import org.apache.activemq.transport.util.TextWireFormat;
029    import org.apache.activemq.util.ByteArrayOutputStream;
030    import org.apache.activemq.util.IOExceptionSupport;
031    import org.apache.activemq.util.IdGenerator;
032    import org.apache.activemq.util.ServiceStopper;
033    import org.apache.http.Header;
034    import org.apache.http.HttpHost;
035    import org.apache.http.HttpRequest;
036    import org.apache.http.HttpRequestInterceptor;
037    import org.apache.http.HttpResponse;
038    import org.apache.http.HttpStatus;
039    import org.apache.http.auth.AuthScope;
040    import org.apache.http.auth.UsernamePasswordCredentials;
041    import org.apache.http.client.HttpClient;
042    import org.apache.http.client.HttpResponseException;
043    import org.apache.http.client.ResponseHandler;
044    import org.apache.http.client.methods.HttpGet;
045    import org.apache.http.client.methods.HttpHead;
046    import org.apache.http.client.methods.HttpOptions;
047    import org.apache.http.client.methods.HttpPost;
048    import org.apache.http.conn.params.ConnRoutePNames;
049    import org.apache.http.entity.ByteArrayEntity;
050    import org.apache.http.impl.client.BasicResponseHandler;
051    import org.apache.http.impl.client.DefaultHttpClient;
052    import org.apache.http.impl.conn.PoolingClientConnectionManager;
053    import org.apache.http.message.AbstractHttpMessage;
054    import org.apache.http.params.HttpConnectionParams;
055    import org.apache.http.params.HttpParams;
056    import org.apache.http.protocol.HttpContext;
057    import org.apache.http.util.EntityUtils;
058    import org.slf4j.Logger;
059    import org.slf4j.LoggerFactory;
060    
061    /**
062     * A HTTP {@link org.apache.activemq.transport.Transport} which uses the
063     * <a href="http://hc.apache.org/index.html">Apache HTTP Client</a>
064     * library
065     */
066    public class HttpClientTransport extends HttpTransportSupport {
067    
068        public static final int MAX_CLIENT_TIMEOUT = 30000;
069        private static final Logger LOG = LoggerFactory.getLogger(HttpClientTransport.class);
070        private static final IdGenerator CLIENT_ID_GENERATOR = new IdGenerator();
071    
072        private HttpClient sendHttpClient;
073        private HttpClient receiveHttpClient;
074    
075        private final String clientID = CLIENT_ID_GENERATOR.generateId();
076        private boolean trace;
077        private HttpGet httpMethod;
078        private volatile int receiveCounter;
079    
080        private int soTimeout = MAX_CLIENT_TIMEOUT;
081    
082        private boolean useCompression = false;
083        private boolean canSendCompressed = false;
084        private int minSendAsCompressedSize = 0;
085    
086        public HttpClientTransport(TextWireFormat wireFormat, URI remoteUrl) {
087            super(wireFormat, remoteUrl);
088        }
089    
090        public FutureResponse asyncRequest(Object command) throws IOException {
091            return null;
092        }
093    
094        public void oneway(Object command) throws IOException {
095    
096            if (isStopped()) {
097                throw new IOException("stopped.");
098            }
099            HttpPost httpMethod = new HttpPost(getRemoteUrl().toString());
100            configureMethod(httpMethod);
101            String data = getTextWireFormat().marshalText(command);
102            byte[] bytes = data.getBytes("UTF-8");
103            if (useCompression && canSendCompressed && bytes.length > minSendAsCompressedSize) {
104                ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
105                GZIPOutputStream stream = new GZIPOutputStream(bytesOut);
106                stream.write(bytes);
107                stream.close();
108                httpMethod.addHeader("Content-Type", "application/x-gzip");
109                if (LOG.isTraceEnabled()) {
110                    LOG.trace("Sending compressed, size = " + bytes.length + ", compressed size = " + bytesOut.size());
111                }
112                bytes = bytesOut.toByteArray();
113            }
114            ByteArrayEntity entity = new ByteArrayEntity(bytes);
115            httpMethod.setEntity(entity);
116    
117            HttpClient client = null;
118            HttpResponse answer = null;
119            try {
120                client = getSendHttpClient();
121                HttpParams params = client.getParams();
122                HttpConnectionParams.setSoTimeout(params, soTimeout);
123                answer = client.execute(httpMethod);
124                int status = answer.getStatusLine().getStatusCode();
125                if (status != HttpStatus.SC_OK) {
126                    throw new IOException("Failed to post command: " + command + " as response was: " + answer);
127                }
128                if (command instanceof ShutdownInfo) {
129                    try {
130                        stop();
131                    } catch (Exception e) {
132                        LOG.warn("Error trying to stop HTTP client: "+ e, e);
133                    }
134                }
135            } catch (IOException e) {
136                throw IOExceptionSupport.create("Could not post command: " + command + " due to: " + e, e);
137            } finally {
138                if (answer != null) {
139                    EntityUtils.consume(answer.getEntity());
140                }
141            }
142        }
143    
144        public Object request(Object command) throws IOException {
145            return null;
146        }
147    
148        private DataInputStream createDataInputStream(HttpResponse answer) throws IOException {
149            Header encoding = answer.getEntity().getContentEncoding();
150            if (encoding != null && "gzip".equalsIgnoreCase(encoding.getValue())) {
151                return new DataInputStream(new GZIPInputStream(answer.getEntity().getContent()));
152            } else {
153                return new DataInputStream(answer.getEntity().getContent());
154            }
155        }
156    
157        public void run() {
158    
159            if (LOG.isTraceEnabled()) {
160                LOG.trace("HTTP GET consumer thread starting: " + this);
161            }
162            HttpClient httpClient = getReceiveHttpClient();
163            URI remoteUrl = getRemoteUrl();
164    
165            while (!isStopped() && !isStopping()) {
166    
167                httpMethod = new HttpGet(remoteUrl.toString());
168                configureMethod(httpMethod);
169                HttpResponse answer = null;
170    
171                try {
172                    answer = httpClient.execute(httpMethod);
173                    int status = answer.getStatusLine().getStatusCode();
174                    if (status != HttpStatus.SC_OK) {
175                        if (status == HttpStatus.SC_REQUEST_TIMEOUT) {
176                            LOG.debug("GET timed out");
177                            try {
178                                Thread.sleep(1000);
179                            } catch (InterruptedException e) {
180                                onException(new InterruptedIOException());
181                                break;
182                            }
183                        } else {
184                            onException(new IOException("Failed to perform GET on: " + remoteUrl + " as response was: " + answer));
185                            break;
186                        }
187                    } else {
188                        receiveCounter++;
189                        DataInputStream stream = createDataInputStream(answer);
190                        Object command = (Object)getTextWireFormat().unmarshal(stream);
191                        if (command == null) {
192                            LOG.debug("Received null command from url: " + remoteUrl);
193                        } else {
194                            doConsume(command);
195                        }
196                        stream.close();
197                    }
198                } catch (IOException e) {
199                    onException(IOExceptionSupport.create("Failed to perform GET on: " + remoteUrl + " Reason: " + e.getMessage(), e));
200                    break;
201                } finally {
202                    if (answer != null) {
203                        try {
204                            EntityUtils.consume(answer.getEntity());
205                        } catch (IOException e) {
206                        }
207                    }
208                }
209            }
210        }
211    
212        // Properties
213        // -------------------------------------------------------------------------
214        public HttpClient getSendHttpClient() {
215            if (sendHttpClient == null) {
216                sendHttpClient = createHttpClient();
217            }
218            return sendHttpClient;
219        }
220    
221        public void setSendHttpClient(HttpClient sendHttpClient) {
222            this.sendHttpClient = sendHttpClient;
223        }
224    
225        public HttpClient getReceiveHttpClient() {
226            if (receiveHttpClient == null) {
227                receiveHttpClient = createHttpClient();
228            }
229            return receiveHttpClient;
230        }
231    
232        public void setReceiveHttpClient(HttpClient receiveHttpClient) {
233            this.receiveHttpClient = receiveHttpClient;
234        }
235    
236        // Implementation methods
237        // -------------------------------------------------------------------------
238        protected void doStart() throws Exception {
239    
240            if (LOG.isTraceEnabled()) {
241                LOG.trace("HTTP GET consumer thread starting: " + this);
242            }
243            HttpClient httpClient = getReceiveHttpClient();
244            URI remoteUrl = getRemoteUrl();
245    
246            HttpHead httpMethod = new HttpHead(remoteUrl.toString());
247            configureMethod(httpMethod);
248    
249            // Request the options from the server so we can find out if the broker we are
250            // talking to supports GZip compressed content.  If so and useCompression is on
251            // then we can compress our POST data, otherwise we must send it uncompressed to
252            // ensure backwards compatibility.
253            HttpOptions optionsMethod = new HttpOptions(remoteUrl.toString());
254            ResponseHandler<String> handler = new BasicResponseHandler() {
255                @Override
256                public String handleResponse(HttpResponse response) throws HttpResponseException, IOException {
257    
258                    for(Header header : response.getAllHeaders()) {
259                        if (header.getName().equals("Accepts-Encoding") && header.getValue().contains("gzip")) {
260                            LOG.info("Broker Servlet supports GZip compression.");
261                            canSendCompressed = true;
262                            break;
263                        }
264                    }
265    
266                    return super.handleResponse(response);
267                }
268            };
269    
270    
271            try {
272                httpClient.execute(httpMethod, new BasicResponseHandler());
273                httpClient.execute(optionsMethod, handler);
274            } catch(Exception e) {
275                throw new IOException("Failed to perform GET on: " + remoteUrl + " as response was: " + e.getMessage());
276            }
277    
278            super.doStart();
279        }
280    
281        protected void doStop(ServiceStopper stopper) throws Exception {
282            if (httpMethod != null) {
283                httpMethod.abort();
284            }
285        }
286    
287        protected HttpClient createHttpClient() {
288            DefaultHttpClient client = new DefaultHttpClient(new PoolingClientConnectionManager());
289            if (useCompression) {
290                client.addRequestInterceptor( new HttpRequestInterceptor() {
291                    @Override
292                    public void process(HttpRequest request, HttpContext context) {
293                        // We expect to received a compression response that we un-gzip
294                        request.addHeader("Accept-Encoding", "gzip");
295                    }
296                });
297            }
298            if (getProxyHost() != null) {
299                HttpHost proxy = new HttpHost(getProxyHost(), getProxyPort());
300                client.getParams().setParameter(ConnRoutePNames.DEFAULT_PROXY, proxy);
301    
302                if(getProxyUser() != null && getProxyPassword() != null) {
303                    client.getCredentialsProvider().setCredentials(
304                        new AuthScope(getProxyHost(), getProxyPort()),
305                        new UsernamePasswordCredentials(getProxyUser(), getProxyPassword()));
306                }
307            }
308            return client;
309        }
310    
311        protected void configureMethod(AbstractHttpMessage method) {
312            method.setHeader("clientID", clientID);
313        }
314    
315        public boolean isTrace() {
316            return trace;
317        }
318    
319        public void setTrace(boolean trace) {
320            this.trace = trace;
321        }
322    
323        public int getReceiveCounter() {
324            return receiveCounter;
325        }
326    
327        public int getSoTimeout() {
328            return soTimeout;
329        }
330    
331        public void setSoTimeout(int soTimeout) {
332            this.soTimeout = soTimeout;
333        }
334    
335        public void setUseCompression(boolean useCompression) {
336            this.useCompression = useCompression;
337        }
338    
339        public boolean isUseCompression() {
340            return this.useCompression;
341        }
342    
343        public int getMinSendAsCompressedSize() {
344            return minSendAsCompressedSize;
345        }
346    
347        /**
348         * Sets the minimum size that must be exceeded on a send before compression is used if
349         * the useCompression option is specified.  For very small payloads compression can be
350         * inefficient compared to the transmission size savings.
351         *
352         * Default value is 0.
353         *
354         * @param minSendAsCompressedSize
355         */
356        public void setMinSendAsCompressedSize(int minSendAsCompressedSize) {
357            this.minSendAsCompressedSize = minSendAsCompressedSize;
358        }
359    
360    }