001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.ra;
018    
019    import java.net.URI;
020    import java.util.HashMap;
021    
022    import javax.jms.Connection;
023    import javax.jms.JMSException;
024    import javax.jms.XAConnection;
025    import javax.jms.XASession;
026    import javax.resource.NotSupportedException;
027    import javax.resource.ResourceException;
028    import javax.resource.spi.ActivationSpec;
029    import javax.resource.spi.BootstrapContext;
030    import javax.resource.spi.ResourceAdapterInternalException;
031    import javax.resource.spi.endpoint.MessageEndpointFactory;
032    import javax.transaction.xa.XAResource;
033    
034    import org.apache.activemq.ActiveMQConnection;
035    import org.apache.activemq.ActiveMQConnectionFactory;
036    import org.apache.activemq.RedeliveryPolicy;
037    import org.apache.activemq.broker.BrokerFactory;
038    import org.apache.activemq.broker.BrokerService;
039    import org.apache.activemq.util.ServiceSupport;
040    
041    /**
042     * Knows how to connect to one ActiveMQ server. It can then activate endpoints
043     * and deliver messages to those end points using the connection configure in
044     * the resource adapter. <p/>Must override equals and hashCode (JCA spec 16.4)
045     * 
046     * @org.apache.xbean.XBean element="resourceAdapter" rootElement="true"
047     *                         description="The JCA Resource Adaptor for ActiveMQ"
048     * 
049     */
050    public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implements MessageResourceAdapter {
051    
052        private final HashMap<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker> endpointWorkers = new HashMap<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker>();
053    
054        private BootstrapContext bootstrapContext;
055        private String brokerXmlConfig;
056        private BrokerService broker;
057        private Thread brokerStartThread;
058        private ActiveMQConnectionFactory connectionFactory;
059        
060        /**
061         * 
062         */
063        public ActiveMQResourceAdapter() {
064            super();
065        }
066    
067        /**
068         * @see javax.resource.spi.ResourceAdapter#start(javax.resource.spi.BootstrapContext)
069         */
070        public void start(BootstrapContext bootstrapContext) throws ResourceAdapterInternalException {
071            this.bootstrapContext = bootstrapContext;
072            if (brokerXmlConfig != null && brokerXmlConfig.trim().length() > 0) {
073                brokerStartThread = new Thread("Starting ActiveMQ Broker") {
074                    @Override
075                    public void run () {
076                        try {
077                            // ensure RAR resources are available to xbean (needed for weblogic)
078                            log.debug("original thread context classLoader: " + Thread.currentThread().getContextClassLoader());
079                            Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
080                            log.debug("current (from getClass()) thread context classLoader: " + Thread.currentThread().getContextClassLoader());
081                            
082                            synchronized( ActiveMQResourceAdapter.this ) {
083                                broker = BrokerFactory.createBroker(new URI(brokerXmlConfig));
084                            }
085                            broker.start();
086                        } catch (Throwable e) {
087                            log.warn("Could not start up embeded ActiveMQ Broker '"+brokerXmlConfig+"': "+e.getMessage());
088                            log.debug("Reason for: "+e.getMessage(), e);
089                        }
090                    }
091                };
092                brokerStartThread.setDaemon(true);
093                brokerStartThread.start();
094                
095                // Wait up to 5 seconds for the broker to start up in the async thread.. otherwise keep going without it..
096                try {
097                    brokerStartThread.join(1000*5);
098                } catch (InterruptedException e) {
099                    Thread.currentThread().interrupt();
100                }                
101            }
102        }
103    
104        /**
105         * @see org.apache.activemq.ra.MessageResourceAdapter#makeConnection()
106         */
107        public ActiveMQConnection makeConnection() throws JMSException {
108            if( connectionFactory == null ) {
109                return makeConnection(getInfo());
110            } else {
111                return makeConnection(getInfo(), connectionFactory);
112            }
113        }
114    
115        /**
116         * @param activationSpec
117         */
118        public ActiveMQConnection makeConnection(MessageActivationSpec activationSpec) throws JMSException {
119            ActiveMQConnectionFactory cf = getConnectionFactory();
120            if (cf == null) {
121                cf = createConnectionFactory(getInfo());
122            }
123            String userName = defaultValue(activationSpec.getUserName(), getInfo().getUserName());
124            String password = defaultValue(activationSpec.getPassword(), getInfo().getPassword());
125            String clientId = activationSpec.getClientId();
126            if (clientId != null) {
127                cf.setClientID(clientId);
128            } else {
129                if (activationSpec.isDurableSubscription()) {
130                    log.warn("No clientID specified for durable subscription: " + activationSpec);
131                }
132            }
133            ActiveMQConnection physicalConnection = (ActiveMQConnection) cf.createConnection(userName, password);
134    
135            // have we configured a redelivery policy
136            RedeliveryPolicy redeliveryPolicy = activationSpec.redeliveryPolicy();
137            if (redeliveryPolicy != null) {
138                physicalConnection.setRedeliveryPolicy(redeliveryPolicy);
139            }
140            return physicalConnection;
141        }
142    
143        /**
144         * @see javax.resource.spi.ResourceAdapter#stop()
145         */
146        public void stop() {
147            while (endpointWorkers.size() > 0) {
148                ActiveMQEndpointActivationKey key = endpointWorkers.keySet().iterator().next();
149                endpointDeactivation(key.getMessageEndpointFactory(), key.getActivationSpec());
150            }
151            
152            synchronized( this ) {
153                if (broker != null) {
154                    if( brokerStartThread.isAlive() ) {
155                        brokerStartThread.interrupt();
156                    }
157                    ServiceSupport.dispose(broker);
158                    broker = null;
159                }
160            }
161            
162            this.bootstrapContext = null;
163        }
164    
165        /**
166         * @see org.apache.activemq.ra.MessageResourceAdapter#getBootstrapContext()
167         */
168        public BootstrapContext getBootstrapContext() {
169            return bootstrapContext;
170        }
171    
172        /**
173         * @see javax.resource.spi.ResourceAdapter#endpointActivation(javax.resource.spi.endpoint.MessageEndpointFactory,
174         *      javax.resource.spi.ActivationSpec)
175         */
176        public void endpointActivation(MessageEndpointFactory endpointFactory, ActivationSpec activationSpec) throws ResourceException {
177    
178            // spec section 5.3.3
179            if (!equals(activationSpec.getResourceAdapter())) {
180                throw new ResourceException("Activation spec not initialized with this ResourceAdapter instance (" + activationSpec.getResourceAdapter() + " != " + this + ")");
181            }
182    
183            if (!(activationSpec instanceof MessageActivationSpec)) {
184                throw new NotSupportedException("That type of ActivationSpec not supported: " + activationSpec.getClass());
185            }
186    
187            ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory, (MessageActivationSpec)activationSpec);
188            // This is weird.. the same endpoint activated twice.. must be a
189            // container error.
190            if (endpointWorkers.containsKey(key)) {
191                throw new IllegalStateException("Endpoint previously activated");
192            }
193    
194            ActiveMQEndpointWorker worker = new ActiveMQEndpointWorker(this, key);
195    
196            endpointWorkers.put(key, worker);
197            worker.start();
198        }
199    
200        /**
201         * @see javax.resource.spi.ResourceAdapter#endpointDeactivation(javax.resource.spi.endpoint.MessageEndpointFactory,
202         *      javax.resource.spi.ActivationSpec)
203         */
204        public void endpointDeactivation(MessageEndpointFactory endpointFactory, ActivationSpec activationSpec) {
205    
206            if (activationSpec instanceof MessageActivationSpec) {
207                ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory, (MessageActivationSpec)activationSpec);
208                ActiveMQEndpointWorker worker = endpointWorkers.remove(key);
209                if (worker == null) {
210                    // This is weird.. that endpoint was not activated.. oh well..
211                    // this method
212                    // does not throw exceptions so just return.
213                    return;
214                }
215                try {
216                    worker.stop();
217                } catch (InterruptedException e) {
218                    // We interrupted.. we won't throw an exception but will stop
219                    // waiting for the worker
220                    // to stop.. we tried our best. Keep trying to interrupt the
221                    // thread.
222                    Thread.currentThread().interrupt();
223                }
224    
225            }
226    
227        }
228    
229        /**
230         * We only connect to one resource manager per ResourceAdapter instance, so
231         * any ActivationSpec will return the same XAResource.
232         * 
233         * @see javax.resource.spi.ResourceAdapter#getXAResources(javax.resource.spi.ActivationSpec[])
234         */
235        public XAResource[] getXAResources(ActivationSpec[] activationSpecs) throws ResourceException {
236            Connection connection = null;
237            try {
238                connection = makeConnection();
239                if (connection instanceof XAConnection) {
240                    XASession session = ((XAConnection)connection).createXASession();
241                    XAResource xaResource = session.getXAResource();
242                    return new XAResource[] {
243                        xaResource
244                    };
245                }
246                return new XAResource[] {};
247            } catch (JMSException e) {
248                throw new ResourceException(e);
249            } finally {
250                try {
251                    connection.close();
252                } catch (Throwable ignore) {
253                    //
254                }
255            }
256        }
257    
258        // ///////////////////////////////////////////////////////////////////////
259        //
260        // Java Bean getters and setters for this ResourceAdapter class.
261        //
262        // ///////////////////////////////////////////////////////////////////////
263    
264        /**
265         * @see org.apache.activemq.ra.MessageResourceAdapter#getBrokerXmlConfig()
266         */
267        public String getBrokerXmlConfig() {
268            return brokerXmlConfig;
269        }
270    
271        /**
272         * Sets the <a href="http://activemq.org/Xml+Configuration">XML
273         * configuration file </a> used to configure the ActiveMQ broker via Spring
274         * if using embedded mode.
275         * 
276         * @param brokerXmlConfig is the filename which is assumed to be on the
277         *                classpath unless a URL is specified. So a value of
278         *                <code>foo/bar.xml</code> would be assumed to be on the
279         *                classpath whereas <code>file:dir/file.xml</code> would
280         *                use the file system. Any valid URL string is supported.
281         */
282        public void setBrokerXmlConfig(String brokerXmlConfig) {
283            this.brokerXmlConfig = brokerXmlConfig;
284        }
285    
286        /**
287         * @see java.lang.Object#equals(java.lang.Object)
288         */
289        @Override
290        public boolean equals(Object o) {
291            if (this == o) {
292                return true;
293            }
294            if (!(o instanceof MessageResourceAdapter)) {
295                return false;
296            }
297    
298            final MessageResourceAdapter activeMQResourceAdapter = (MessageResourceAdapter)o;
299    
300            if (!getInfo().equals(activeMQResourceAdapter.getInfo())) {
301                return false;
302            }
303            if (notEqual(brokerXmlConfig, activeMQResourceAdapter.getBrokerXmlConfig())) {
304                return false;
305            }
306    
307            return true;
308        }
309    
310        /**
311         * @see java.lang.Object#hashCode()
312         */
313        @Override
314        public int hashCode() {
315            int result;
316            result = getInfo().hashCode();
317            if (brokerXmlConfig != null) {
318                result ^= brokerXmlConfig.hashCode();
319            }
320            return result;
321        }
322    
323        public ActiveMQConnectionFactory getConnectionFactory() {
324            return connectionFactory;
325        }
326    
327        public void setConnectionFactory(ActiveMQConnectionFactory aConnectionFactory) {
328            this.connectionFactory = aConnectionFactory;
329        }
330    
331    
332        }