001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.activemq.web;
019    
020    import java.io.Externalizable;
021    import java.io.IOException;
022    import java.io.ObjectInput;
023    import java.io.ObjectOutput;
024    import java.util.ArrayList;
025    import java.util.HashMap;
026    import java.util.Iterator;
027    import java.util.List;
028    import java.util.Map;
029    import java.util.concurrent.Semaphore;
030    
031    import javax.jms.Connection;
032    import javax.jms.ConnectionFactory;
033    import javax.jms.DeliveryMode;
034    import javax.jms.Destination;
035    import javax.jms.JMSException;
036    import javax.jms.Message;
037    import javax.jms.MessageConsumer;
038    import javax.jms.MessageProducer;
039    import javax.jms.Session;
040    import javax.servlet.ServletContext;
041    import javax.servlet.http.HttpServletRequest;
042    import javax.servlet.http.HttpSession;
043    import javax.servlet.http.HttpSessionActivationListener;
044    import javax.servlet.http.HttpSessionBindingEvent;
045    import javax.servlet.http.HttpSessionBindingListener;
046    import javax.servlet.http.HttpSessionEvent;
047    
048    import org.apache.activemq.ActiveMQConnectionFactory;
049    import org.apache.activemq.MessageAvailableConsumer;
050    import org.apache.activemq.camel.component.ActiveMQComponent;
051    import org.apache.activemq.camel.component.ActiveMQConfiguration;
052    import org.apache.activemq.pool.PooledConnectionFactory;
053    import org.apache.camel.CamelContext;
054    import org.apache.camel.ProducerTemplate;
055    import org.apache.camel.impl.DefaultCamelContext;
056    import org.slf4j.Logger;
057    import org.slf4j.LoggerFactory;
058    
059    /**
060     * Represents a messaging client used from inside a web container typically
061     * stored inside a HttpSession TODO controls to prevent DOS attacks with users
062     * requesting many consumers TODO configure consumers with small prefetch.
063     * 
064     *
065     *
066     */
067    public class WebClient implements HttpSessionActivationListener, HttpSessionBindingListener, Externalizable {
068    
069        public static final String WEB_CLIENT_ATTRIBUTE = "org.apache.activemq.webclient";
070        public static final String CONNECTION_FACTORY_ATTRIBUTE = "org.apache.activemq.connectionFactory";
071        public static final String CONNECTION_FACTORY_PREFETCH_PARAM = "org.apache.activemq.connectionFactory.prefetch";
072        public static final String CONNECTION_FACTORY_OPTIMIZE_ACK_PARAM = "org.apache.activemq.connectionFactory.optimizeAck";
073        public static final String BROKER_URL_INIT_PARAM = "org.apache.activemq.brokerURL";
074        public static final String SELECTOR_NAME = "org.apache.activemq.selectorName";
075    
076        private static final Logger LOG = LoggerFactory.getLogger(WebClient.class);
077    
078        private static transient ConnectionFactory factory;
079    
080        private transient Map<Destination, MessageConsumer> consumers = new HashMap<Destination, MessageConsumer>();
081        private transient Connection connection;
082        private transient Session session;
083        private transient MessageProducer producer;
084        private int deliveryMode = DeliveryMode.NON_PERSISTENT;
085        public static String selectorName;
086    
087        private final Semaphore semaphore = new Semaphore(1);
088    
089        private CamelContext camelContext;
090        private ProducerTemplate producerTemplate;
091    
092        private String username;
093        private String password;
094    
095        public WebClient() {
096            if (factory == null) {
097                throw new IllegalStateException("initContext(ServletContext) not called");
098            }
099        }
100    
101        /**
102         * Helper method to get the client for the current session, lazily creating
103         * a client if there is none currently
104         * 
105         * @param request is the current HTTP request
106         * @return the current client or a newly creates
107         */
108        public static WebClient getWebClient(HttpServletRequest request) {
109            HttpSession session = request.getSession(true);
110            WebClient client = getWebClient(session);
111            if (client == null || client.isClosed()) {
112                client = WebClient.createWebClient(request);
113                session.setAttribute(WEB_CLIENT_ATTRIBUTE, client);
114            }
115    
116            return client;
117        }
118    
119        /**
120         * @return the web client for the current HTTP session or null if there is
121         *         not a web client created yet
122         */
123        public static WebClient getWebClient(HttpSession session) {
124            return (WebClient)session.getAttribute(WEB_CLIENT_ATTRIBUTE);
125        }
126    
127        public static void initContext(ServletContext context) {
128            initConnectionFactory(context);
129            context.setAttribute("webClients", new HashMap<String, WebClient>());
130            if (selectorName == null) {
131                selectorName = context.getInitParameter(SELECTOR_NAME);
132            }
133            if (selectorName == null) {
134                selectorName = "selector";
135            }        
136        }
137    
138        public int getDeliveryMode() {
139            return deliveryMode;
140        }
141    
142        public void setDeliveryMode(int deliveryMode) {
143            this.deliveryMode = deliveryMode;
144        }
145    
146        public String getUsername() {
147            return username;
148        }
149    
150        public void setUsername(String username) {
151            this.username = username;
152        }
153    
154        public String getPassword() {
155            return password;
156        }
157    
158        public void setPassword(String password) {
159            this.password = password;
160        }
161    
162        public synchronized void closeConsumers() {
163            for (Iterator<MessageConsumer> it = consumers.values().iterator(); it.hasNext();) {
164                MessageConsumer consumer = it.next();
165                it.remove();
166                try {
167                    consumer.setMessageListener(null);
168                    if (consumer instanceof MessageAvailableConsumer) {
169                        ((MessageAvailableConsumer)consumer).setAvailableListener(null);
170                    }
171                    consumer.close();
172                } catch (JMSException e) {
173                    LOG.debug("caught exception closing consumer", e);
174                }
175            }
176        }
177    
178        public synchronized void close() {
179            try {
180                if (consumers != null) {
181                    closeConsumers();
182                }
183                if (connection != null) {
184                    connection.close();
185                }
186                if (producerTemplate != null) {
187                    producerTemplate.stop();
188                }
189            } catch (Exception e) {
190                LOG.debug("caught exception closing consumer", e);
191            } finally {
192                producer = null;
193                session = null;
194                connection = null;
195                producerTemplate = null;
196                if (consumers != null) {
197                    consumers.clear();
198                }
199                consumers = null;
200    
201            }
202        }
203    
204        public boolean isClosed() {
205            return consumers == null;
206        }
207    
208        public void writeExternal(ObjectOutput out) throws IOException {
209            if (consumers != null) {
210                out.write(consumers.size());
211                Iterator<Destination> i = consumers.keySet().iterator();
212                while (i.hasNext()) {
213                    out.writeObject(i.next().toString());
214                }
215            } else {
216                out.write(-1);
217            }
218    
219        }
220    
221        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
222            int size = in.readInt();
223            if (size >= 0) {
224                consumers = new HashMap<Destination, MessageConsumer>();
225                for (int i = 0; i < size; i++) {
226                    String destinationName = in.readObject().toString();
227    
228                    try {
229                        Destination destination = destinationName.startsWith("topic://") ? (Destination)getSession().createTopic(destinationName) : (Destination)getSession().createQueue(destinationName);
230                        consumers.put(destination, getConsumer(destination, null, true));
231                    } catch (JMSException e) {
232                        LOG.debug("Caought Exception ", e);
233                        IOException ex = new IOException(e.getMessage());
234                        ex.initCause(e.getCause() != null ? e.getCause() : e);
235                        throw ex;
236    
237                    }
238                }
239            }
240        }
241    
242        public void send(Destination destination, Message message) throws JMSException {
243            getProducer().send(destination, message);
244            if (LOG.isDebugEnabled()) {
245                LOG.debug("Sent! to destination: " + destination + " message: " + message);
246            }
247        }
248    
249        public void send(Destination destination, Message message, boolean persistent, int priority, long timeToLive) throws JMSException {
250            int deliveryMode = persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
251            getProducer().send(destination, message, deliveryMode, priority, timeToLive);
252            if (LOG.isDebugEnabled()) {
253                LOG.debug("Sent! to destination: " + destination + " message: " + message);
254            }
255        }
256    
257        public Session getSession() throws JMSException {
258            if (session == null) {
259                session = createSession();
260            }
261            return session;
262        }
263    
264        public Connection getConnection() throws JMSException {
265            if (connection == null) {
266                if (username != null && password != null) {
267                    connection = factory.createConnection(username, password);
268                } else {
269                    connection = factory.createConnection();
270                }
271                connection.start();
272            }
273            return connection;
274        }
275    
276        protected static synchronized void initConnectionFactory(ServletContext servletContext) {
277            if (factory == null) {
278                factory = (ConnectionFactory)servletContext.getAttribute(CONNECTION_FACTORY_ATTRIBUTE);
279            }
280            if (factory == null) {
281                String brokerURL = servletContext.getInitParameter(BROKER_URL_INIT_PARAM);
282    
283                LOG.debug("Value of: " + BROKER_URL_INIT_PARAM + " is: " + brokerURL);
284    
285                if (brokerURL == null) {
286                    throw new IllegalStateException("missing brokerURL (specified via " + BROKER_URL_INIT_PARAM + " init-Param");
287                }
288    
289                ActiveMQConnectionFactory amqfactory = new ActiveMQConnectionFactory(brokerURL);
290    
291                // Set prefetch policy for factory
292                if (servletContext.getInitParameter(CONNECTION_FACTORY_PREFETCH_PARAM) != null) {
293                    int prefetch = Integer.valueOf(servletContext.getInitParameter(CONNECTION_FACTORY_PREFETCH_PARAM)).intValue();
294                    amqfactory.getPrefetchPolicy().setAll(prefetch);
295                }
296    
297                // Set optimize acknowledge setting
298                if (servletContext.getInitParameter(CONNECTION_FACTORY_OPTIMIZE_ACK_PARAM) != null) {
299                    boolean optimizeAck = Boolean.valueOf(servletContext.getInitParameter(CONNECTION_FACTORY_OPTIMIZE_ACK_PARAM)).booleanValue();
300                    amqfactory.setOptimizeAcknowledge(optimizeAck);
301                }
302    
303                factory = amqfactory;
304    
305                servletContext.setAttribute(CONNECTION_FACTORY_ATTRIBUTE, factory);
306            }
307        }
308        
309        public synchronized CamelContext getCamelContext() {
310            if (camelContext == null) {
311                    LOG.debug("Creating camel context");
312                    camelContext = new DefaultCamelContext();
313                    ActiveMQConfiguration conf = new ActiveMQConfiguration();
314                    conf.setConnectionFactory(new PooledConnectionFactory((ActiveMQConnectionFactory)factory));
315                    ActiveMQComponent component = new ActiveMQComponent(conf);
316                    camelContext.addComponent("activemq", component);
317            }
318            return camelContext;
319        }
320        
321        public synchronized ProducerTemplate getProducerTemplate() throws Exception {
322            if (producerTemplate == null) {
323                    LOG.debug("Creating producer template");
324                    producerTemplate = getCamelContext().createProducerTemplate();
325                    producerTemplate.start();
326            }
327            return producerTemplate;
328        }
329    
330        public synchronized MessageProducer getProducer() throws JMSException {
331            if (producer == null) {
332                producer = getSession().createProducer(null);
333                producer.setDeliveryMode(deliveryMode);
334            }
335            return producer;
336        }
337    
338        public void setProducer(MessageProducer producer) {
339            this.producer = producer;
340        }
341    
342        public synchronized MessageConsumer getConsumer(Destination destination, String selector) throws JMSException {
343            return getConsumer(destination, selector, true);
344        }
345    
346        public synchronized MessageConsumer getConsumer(Destination destination, String selector, boolean create) throws JMSException {
347            MessageConsumer consumer = consumers.get(destination);
348            if (create && consumer == null) {
349                consumer = getSession().createConsumer(destination, selector);
350                consumers.put(destination, consumer);
351            }
352            return consumer;
353        }
354    
355        public synchronized void closeConsumer(Destination destination) throws JMSException {
356            MessageConsumer consumer = consumers.get(destination);
357            if (consumer != null) {
358                consumers.remove(destination);
359                consumer.setMessageListener(null);
360                if (consumer instanceof MessageAvailableConsumer) {
361                    ((MessageAvailableConsumer)consumer).setAvailableListener(null);
362                }
363                consumer.close();
364            }
365        }
366    
367        public synchronized List<MessageConsumer> getConsumers() {
368            return new ArrayList<MessageConsumer>(consumers.values());
369        }
370    
371        protected Session createSession() throws JMSException {
372            return getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
373        }
374    
375        public Semaphore getSemaphore() {
376            return semaphore;
377        }
378    
379        public void sessionWillPassivate(HttpSessionEvent event) {
380            close();
381        }
382    
383        public void sessionDidActivate(HttpSessionEvent event) {
384        }
385    
386        public void valueBound(HttpSessionBindingEvent event) {
387        }
388    
389        public void valueUnbound(HttpSessionBindingEvent event) {
390            close();
391        }
392    
393        protected static WebClient createWebClient(HttpServletRequest request) {
394            WebClient client = new WebClient();
395            String auth = request.getHeader("Authorization");
396            if (auth != null) {
397                String[] tokens = auth.split(" ");
398                if (tokens.length == 2) {
399                    String encoded = tokens[1].trim();
400                    String credentials = new String(javax.xml.bind.DatatypeConverter.parseBase64Binary(encoded));
401                    String[] creds = credentials.split(":");
402                    if (creds.length == 2) {
403                        client.setUsername(creds[0]);
404                        client.setPassword(creds[1]);
405                    }
406                }
407            }
408            return client;
409        }
410    
411    }