001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.apache.activemq.web;
019
020import java.io.Externalizable;
021import java.io.IOException;
022import java.io.ObjectInput;
023import java.io.ObjectOutput;
024import java.util.ArrayList;
025import java.util.HashMap;
026import java.util.Iterator;
027import java.util.List;
028import java.util.Map;
029import java.util.concurrent.Semaphore;
030
031import javax.jms.Connection;
032import javax.jms.ConnectionFactory;
033import javax.jms.DeliveryMode;
034import javax.jms.Destination;
035import javax.jms.JMSException;
036import javax.jms.Message;
037import javax.jms.MessageConsumer;
038import javax.jms.MessageProducer;
039import javax.jms.Session;
040import javax.servlet.ServletContext;
041import javax.servlet.http.HttpServletRequest;
042import javax.servlet.http.HttpSession;
043import javax.servlet.http.HttpSessionActivationListener;
044import javax.servlet.http.HttpSessionBindingEvent;
045import javax.servlet.http.HttpSessionBindingListener;
046import javax.servlet.http.HttpSessionEvent;
047
048import org.apache.activemq.ActiveMQConnectionFactory;
049import org.apache.activemq.MessageAvailableConsumer;
050import org.apache.activemq.broker.BrokerRegistry;
051import org.apache.activemq.broker.BrokerService;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
/**
 * Represents a messaging client used from inside a web container, typically
 * stored inside an HttpSession.
 * <p>
 * TODO: add controls to prevent DoS attacks by users requesting many consumers.
 * <p>
 * TODO: configure consumers with a small prefetch.
 */
063public class WebClient implements HttpSessionActivationListener, HttpSessionBindingListener, Externalizable {
064
    /** HTTP session attribute name under which the session's WebClient is stored. */
    public static final String WEB_CLIENT_ATTRIBUTE = "org.apache.activemq.webclient";
    /** Servlet context attribute name holding the shared JMS ConnectionFactory. */
    public static final String CONNECTION_FACTORY_ATTRIBUTE = "org.apache.activemq.connectionFactory";
    /** Servlet context init-param: prefetch size applied to all prefetch policies of the factory. */
    public static final String CONNECTION_FACTORY_PREFETCH_PARAM = "org.apache.activemq.connectionFactory.prefetch";
    /** Servlet context init-param: value passed to the factory's optimizeAcknowledge setting. */
    public static final String CONNECTION_FACTORY_OPTIMIZE_ACK_PARAM = "org.apache.activemq.connectionFactory.optimizeAck";
    /** Servlet context init-param: broker URL used when no factory is already in the context. */
    public static final String BROKER_URL_INIT_PARAM = "org.apache.activemq.brokerURL";
    /** Servlet context init-param: value assigned to {@link #selectorName} by initContext. */
    public static final String SELECTOR_NAME = "org.apache.activemq.selectorName";

    private static final Logger LOG = LoggerFactory.getLogger(WebClient.class);

    // Shared by every WebClient; populated by initConnectionFactory(ServletContext).
    private static transient ConnectionFactory factory;

    // Consumers keyed by destination; set to null by close(), which is what isClosed() checks.
    private transient Map<Destination, MessageConsumer> consumers = new HashMap<Destination, MessageConsumer>();
    // Connection, session and producer are created lazily by their getters and reset to null by close().
    private transient Connection connection;
    private transient Session session;
    private transient MessageProducer producer;
    // Delivery mode applied to the producer when getProducer() creates it.
    private int deliveryMode = DeliveryMode.NON_PERSISTENT;
    // Set by initContext from the SELECTOR_NAME init-param, defaulting to "selector".
    public static String selectorName;

    // Single-permit semaphore exposed via getSemaphore(); its use by callers is not visible in this class.
    private final Semaphore semaphore = new Semaphore(1);

    // Credentials used by getConnection() only when both are non-null.
    private String username;
    private String password;
087
088    public WebClient() {
089        if (factory == null) {
090            throw new IllegalStateException("initContext(ServletContext) not called");
091        }
092    }
093
094    /**
095     * Helper method to get the client for the current session, lazily creating
096     * a client if there is none currently
097     * 
098     * @param request is the current HTTP request
099     * @return the current client or a newly creates
100     */
101    public static WebClient getWebClient(HttpServletRequest request) {
102        HttpSession session = request.getSession(true);
103        WebClient client = getWebClient(session);
104        if (client == null || client.isClosed()) {
105            client = WebClient.createWebClient(request);
106            session.setAttribute(WEB_CLIENT_ATTRIBUTE, client);
107        }
108
109        return client;
110    }
111
112    /**
113     * @return the web client for the current HTTP session or null if there is
114     *         not a web client created yet
115     */
116    public static WebClient getWebClient(HttpSession session) {
117        return (WebClient)session.getAttribute(WEB_CLIENT_ATTRIBUTE);
118    }
119
120    public static void initContext(ServletContext context) {
121        initConnectionFactory(context);
122        context.setAttribute("webClients", new HashMap<String, WebClient>());
123        if (selectorName == null) {
124            selectorName = context.getInitParameter(SELECTOR_NAME);
125        }
126        if (selectorName == null) {
127            selectorName = "selector";
128        }        
129    }
130
    /**
     * @return the delivery mode applied to the producer when it is created
     */
    public int getDeliveryMode() {
        return deliveryMode;
    }
134
    /**
     * Sets the delivery mode used when the producer is next created. A
     * producer that already exists is not reconfigured by this call.
     *
     * @param deliveryMode the delivery mode to store
     */
    public void setDeliveryMode(int deliveryMode) {
        this.deliveryMode = deliveryMode;
    }
138
    /**
     * @return the user name used when opening the connection, or null if none was set
     */
    public String getUsername() {
        return username;
    }
142
    /**
     * Stores the user name. It is only read when the connection is created, so
     * it has no effect on a connection that is already open.
     *
     * @param username the user name, may be null
     */
    public void setUsername(String username) {
        this.username = username;
    }
146
    /**
     * @return the password used when opening the connection, or null if none was set
     */
    public String getPassword() {
        return password;
    }
150
    /**
     * Stores the password. It is only read when the connection is created, so
     * it has no effect on a connection that is already open.
     *
     * @param password the password, may be null
     */
    public void setPassword(String password) {
        this.password = password;
    }
154
155    public synchronized void closeConsumers() {
156        for (Iterator<MessageConsumer> it = consumers.values().iterator(); it.hasNext();) {
157            MessageConsumer consumer = it.next();
158            it.remove();
159            try {
160                consumer.setMessageListener(null);
161                if (consumer instanceof MessageAvailableConsumer) {
162                    ((MessageAvailableConsumer)consumer).setAvailableListener(null);
163                }
164                consumer.close();
165            } catch (JMSException e) {
166                LOG.debug("caught exception closing consumer", e);
167            }
168        }
169    }
170
171    public synchronized void close() {
172        try {
173            if (consumers != null) {
174                closeConsumers();
175            }
176            if (connection != null) {
177                connection.close();
178            }
179        } catch (Exception e) {
180            LOG.debug("caught exception closing consumer", e);
181        } finally {
182            producer = null;
183            session = null;
184            connection = null;
185            if (consumers != null) {
186                consumers.clear();
187            }
188            consumers = null;
189
190        }
191    }
192
    /**
     * @return true once close() has discarded the consumer map
     */
    public boolean isClosed() {
        return consumers == null;
    }
196
197    public void writeExternal(ObjectOutput out) throws IOException {
198        if (consumers != null) {
199            out.write(consumers.size());
200            Iterator<Destination> i = consumers.keySet().iterator();
201            while (i.hasNext()) {
202                out.writeObject(i.next().toString());
203            }
204        } else {
205            out.write(-1);
206        }
207
208    }
209
210    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
211        int size = in.readInt();
212        if (size >= 0) {
213            consumers = new HashMap<Destination, MessageConsumer>();
214            for (int i = 0; i < size; i++) {
215                String destinationName = in.readObject().toString();
216
217                try {
218                    Destination destination = destinationName.startsWith("topic://") ? (Destination)getSession().createTopic(destinationName) : (Destination)getSession().createQueue(destinationName);
219                    consumers.put(destination, getConsumer(destination, null, true));
220                } catch (JMSException e) {
221                    LOG.debug("Caought Exception ", e);
222                    IOException ex = new IOException(e.getMessage());
223                    ex.initCause(e.getCause() != null ? e.getCause() : e);
224                    throw ex;
225
226                }
227            }
228        }
229    }
230
231    public void send(Destination destination, Message message) throws JMSException {
232        getProducer().send(destination, message);
233        if (LOG.isDebugEnabled()) {
234            LOG.debug("Sent! to destination: " + destination + " message: " + message);
235        }
236    }
237
238    public void send(Destination destination, Message message, boolean persistent, int priority, long timeToLive) throws JMSException {
239        int deliveryMode = persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
240        getProducer().send(destination, message, deliveryMode, priority, timeToLive);
241        if (LOG.isDebugEnabled()) {
242            LOG.debug("Sent! to destination: " + destination + " message: " + message);
243        }
244    }
245
246    public Session getSession() throws JMSException {
247        if (session == null) {
248            session = createSession();
249        }
250        return session;
251    }
252
253    public Connection getConnection() throws JMSException {
254        if (connection == null) {
255            if (username != null && password != null) {
256                connection = factory.createConnection(username, password);
257            } else {
258                connection = factory.createConnection();
259            }
260            connection.start();
261        }
262        return connection;
263    }
264
265    protected static synchronized void initConnectionFactory(ServletContext servletContext) {
266        if (factory == null) {
267            factory = (ConnectionFactory)servletContext.getAttribute(CONNECTION_FACTORY_ATTRIBUTE);
268        }
269        if (factory == null) {
270            String brokerURL = servletContext.getInitParameter(BROKER_URL_INIT_PARAM);
271
272
273            if (brokerURL == null) {
274                LOG.debug("Couldn't find " + BROKER_URL_INIT_PARAM + " param, trying to find a broker embedded in a local VM");
275                BrokerService broker = BrokerRegistry.getInstance().findFirst();
276                if (broker == null) {
277                    throw new IllegalStateException("missing brokerURL (specified via " + BROKER_URL_INIT_PARAM + " init-Param) or embedded broker");
278                } else {
279                    brokerURL = "vm://" + broker.getBrokerName();
280                }
281            }
282
283            LOG.debug("Using broker URL: " + brokerURL);
284
285            ActiveMQConnectionFactory amqfactory = new ActiveMQConnectionFactory(brokerURL);
286
287            // Set prefetch policy for factory
288            if (servletContext.getInitParameter(CONNECTION_FACTORY_PREFETCH_PARAM) != null) {
289                int prefetch = Integer.valueOf(servletContext.getInitParameter(CONNECTION_FACTORY_PREFETCH_PARAM)).intValue();
290                amqfactory.getPrefetchPolicy().setAll(prefetch);
291            }
292
293            // Set optimize acknowledge setting
294            if (servletContext.getInitParameter(CONNECTION_FACTORY_OPTIMIZE_ACK_PARAM) != null) {
295                boolean optimizeAck = Boolean.valueOf(servletContext.getInitParameter(CONNECTION_FACTORY_OPTIMIZE_ACK_PARAM)).booleanValue();
296                amqfactory.setOptimizeAcknowledge(optimizeAck);
297            }
298
299            factory = amqfactory;
300
301            servletContext.setAttribute(CONNECTION_FACTORY_ATTRIBUTE, factory);
302        }
303    }
304
305    public synchronized MessageProducer getProducer() throws JMSException {
306        if (producer == null) {
307            producer = getSession().createProducer(null);
308            producer.setDeliveryMode(deliveryMode);
309        }
310        return producer;
311    }
312
    /**
     * Replaces the producer returned by getProducer(). Passing null causes a
     * new producer to be created on the next getProducer() call.
     *
     * @param producer the producer to use, may be null
     */
    public void setProducer(MessageProducer producer) {
        this.producer = producer;
    }
316
    /**
     * Returns the consumer for the destination, creating it if none is
     * registered yet. Equivalent to calling the three-argument overload with
     * {@code create} set to true.
     *
     * @param destination the destination to consume from
     * @param selector the message selector passed when a consumer is created
     * @return the existing or newly created consumer
     * @throws JMSException if a consumer has to be created and that fails
     */
    public synchronized MessageConsumer getConsumer(Destination destination, String selector) throws JMSException {
        return getConsumer(destination, selector, true);
    }
320
321    public synchronized MessageConsumer getConsumer(Destination destination, String selector, boolean create) throws JMSException {
322        MessageConsumer consumer = consumers.get(destination);
323        if (create && consumer == null) {
324            consumer = getSession().createConsumer(destination, selector);
325            consumers.put(destination, consumer);
326        }
327        return consumer;
328    }
329
330    public synchronized void closeConsumer(Destination destination) throws JMSException {
331        MessageConsumer consumer = consumers.get(destination);
332        if (consumer != null) {
333            consumers.remove(destination);
334            consumer.setMessageListener(null);
335            if (consumer instanceof MessageAvailableConsumer) {
336                ((MessageAvailableConsumer)consumer).setAvailableListener(null);
337            }
338            consumer.close();
339        }
340    }
341
342    public synchronized List<MessageConsumer> getConsumers() {
343        return new ArrayList<MessageConsumer>(consumers.values());
344    }
345
346    protected Session createSession() throws JMSException {
347        return getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
348    }
349
    /**
     * @return this client's single-permit semaphore
     */
    public Semaphore getSemaphore() {
        return semaphore;
    }
353
    /**
     * HttpSessionActivationListener callback: closes this client when the
     * HTTP session is about to be passivated.
     *
     * @param event the session event (not used)
     */
    public void sessionWillPassivate(HttpSessionEvent event) {
        close();
    }
357
    /**
     * HttpSessionActivationListener callback: intentionally does nothing.
     *
     * @param event the session event (not used)
     */
    public void sessionDidActivate(HttpSessionEvent event) {
    }
360
    /**
     * HttpSessionBindingListener callback: intentionally does nothing.
     *
     * @param event the binding event (not used)
     */
    public void valueBound(HttpSessionBindingEvent event) {
    }
363
    /**
     * HttpSessionBindingListener callback: closes this client when it is
     * removed from the HTTP session.
     *
     * @param event the binding event (not used)
     */
    public void valueUnbound(HttpSessionBindingEvent event) {
        close();
    }
367
368    protected static WebClient createWebClient(HttpServletRequest request) {
369        WebClient client = new WebClient();
370        String auth = request.getHeader("Authorization");
371        if (auth != null) {
372            String[] tokens = auth.split(" ");
373            if (tokens.length == 2) {
374                String encoded = tokens[1].trim();
375                String credentials = new String(javax.xml.bind.DatatypeConverter.parseBase64Binary(encoded));
376                String[] creds = credentials.split(":");
377                if (creds.length == 2) {
378                    client.setUsername(creds[0]);
379                    client.setPassword(creds[1]);
380                }
381            }
382        }
383        return client;
384    }
385
386}