001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.ra;
018
019import java.io.Serializable;
020import java.net.URI;
021import java.util.HashMap;
022
023import javax.jms.JMSException;
024import javax.resource.NotSupportedException;
025import javax.resource.ResourceException;
026import javax.resource.spi.ActivationSpec;
027import javax.resource.spi.BootstrapContext;
028import javax.resource.spi.ResourceAdapterInternalException;
029import javax.resource.spi.endpoint.MessageEndpointFactory;
030import javax.transaction.xa.XAException;
031import javax.transaction.xa.XAResource;
032import javax.transaction.xa.Xid;
033
034import org.apache.activemq.ActiveMQConnection;
035import org.apache.activemq.ActiveMQConnectionFactory;
036import org.apache.activemq.RedeliveryPolicy;
037import org.apache.activemq.TransactionContext;
038import org.apache.activemq.broker.BrokerFactory;
039import org.apache.activemq.broker.BrokerService;
040import org.apache.activemq.util.ServiceSupport;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044/**
045 * Knows how to connect to one ActiveMQ server. It can then activate endpoints
046 * and deliver messages to those end points using the connection configure in
047 * the resource adapter. <p/>Must override equals and hashCode (JCA spec 16.4)
048 *
049 * @org.apache.xbean.XBean element="resourceAdapter" rootElement="true"
050 *                         description="The JCA Resource Adaptor for ActiveMQ"
051 *
052 */
053public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implements Serializable, MessageResourceAdapter {
054    private static final long serialVersionUID = 360805587169336959L;
055    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQResourceAdapter.class);
056    private transient final HashMap<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker> endpointWorkers = new HashMap<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker>();
057
058    private transient BootstrapContext bootstrapContext;
059    private String brokerXmlConfig;
060    private transient BrokerService broker;
061    private transient Thread brokerStartThread;
062    private ActiveMQConnectionFactory connectionFactory;
063
064    /**
065     *
066     */
067    public ActiveMQResourceAdapter() {
068        super();
069    }
070
071    /**
072     * @see javax.resource.spi.ResourceAdapter#start(javax.resource.spi.BootstrapContext)
073     */
074    @Override
075    public void start(BootstrapContext bootstrapContext) throws ResourceAdapterInternalException {
076        this.bootstrapContext = bootstrapContext;
077        if (brokerXmlConfig != null && brokerXmlConfig.trim().length() > 0) {
078            brokerStartThread = new Thread("Starting ActiveMQ Broker") {
079                @Override
080                public void run () {
081                    try {
082                        // ensure RAR resources are available to xbean (needed for weblogic)
083                        log.debug("original thread context classLoader: " + Thread.currentThread().getContextClassLoader());
084                        Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
085                        log.debug("current (from getClass()) thread context classLoader: " + Thread.currentThread().getContextClassLoader());
086
087                        synchronized( ActiveMQResourceAdapter.this ) {
088                            broker = BrokerFactory.createBroker(new URI(brokerXmlConfig));
089                        }
090                        broker.start();
091                        // Default the ServerUrl to the local broker if not specified in the ra.xml
092                        if (getServerUrl() == null) {
093                            setServerUrl("vm://" + broker.getBrokerName() + "?create=false");
094                        }
095                    } catch (Throwable e) {
096                        log.warn("Could not start up embeded ActiveMQ Broker '"+brokerXmlConfig+"': "+e.getMessage());
097                        log.debug("Reason for: "+e.getMessage(), e);
098                    }
099                }
100            };
101            brokerStartThread.setDaemon(true);
102            brokerStartThread.start();
103
104            // Wait up to 5 seconds for the broker to start up in the async thread.. otherwise keep going without it..
105            try {
106                brokerStartThread.join(1000*5);
107            } catch (InterruptedException e) {
108                Thread.currentThread().interrupt();
109            }
110        }
111    }
112
113    public ActiveMQConnection makeConnection() throws JMSException {
114        if( connectionFactory == null ) {
115            return makeConnection(getInfo());
116        } else {
117            return makeConnection(getInfo(), connectionFactory);
118        }
119    }
120
121    /**
122     * @param activationSpec
123     */
124    @Override
125    public ActiveMQConnection makeConnection(MessageActivationSpec activationSpec) throws JMSException {
126        ActiveMQConnectionFactory cf = getConnectionFactory();
127        if (cf == null) {
128            cf = createConnectionFactory(getInfo(), activationSpec);
129        }
130        String userName = defaultValue(activationSpec.getUserName(), getInfo().getUserName());
131        String password = defaultValue(activationSpec.getPassword(), getInfo().getPassword());
132        String clientId = activationSpec.getClientId();
133        if (clientId != null) {
134            cf.setClientID(clientId);
135        } else {
136            if (activationSpec.isDurableSubscription()) {
137                log.warn("No clientID specified for durable subscription: " + activationSpec);
138            }
139        }
140        ActiveMQConnection physicalConnection = (ActiveMQConnection) cf.createConnection(userName, password);
141
142        // have we configured a redelivery policy
143        RedeliveryPolicy redeliveryPolicy = activationSpec.redeliveryPolicy();
144        if (redeliveryPolicy != null) {
145            physicalConnection.setRedeliveryPolicy(redeliveryPolicy);
146        }
147        return physicalConnection;
148    }
149
150    /**
151     * @see javax.resource.spi.ResourceAdapter#stop()
152     */
153    @Override
154    public void stop() {
155        synchronized (endpointWorkers) {
156            while (endpointWorkers.size() > 0) {
157                ActiveMQEndpointActivationKey key = endpointWorkers.keySet().iterator().next();
158                endpointDeactivation(key.getMessageEndpointFactory(), key.getActivationSpec());
159            }
160        }
161
162        synchronized( this ) {
163            if (broker != null) {
164                if( brokerStartThread.isAlive() ) {
165                    brokerStartThread.interrupt();
166                }
167                ServiceSupport.dispose(broker);
168                broker = null;
169            }
170        }
171
172        this.bootstrapContext = null;
173    }
174
175    /**
176     * @see org.apache.activemq.ra.MessageResourceAdapter#getBootstrapContext()
177     */
178    @Override
179    public BootstrapContext getBootstrapContext() {
180        return bootstrapContext;
181    }
182
183    /**
184     * @see javax.resource.spi.ResourceAdapter#endpointActivation(javax.resource.spi.endpoint.MessageEndpointFactory,
185     *      javax.resource.spi.ActivationSpec)
186     */
187    @Override
188    public void endpointActivation(MessageEndpointFactory endpointFactory, ActivationSpec activationSpec) throws ResourceException {
189
190        // spec section 5.3.3
191        if (!equals(activationSpec.getResourceAdapter())) {
192            throw new ResourceException("Activation spec not initialized with this ResourceAdapter instance (" + activationSpec.getResourceAdapter() + " != " + this + ")");
193        }
194
195        if (!(activationSpec instanceof MessageActivationSpec)) {
196            throw new NotSupportedException("That type of ActivationSpec not supported: " + activationSpec.getClass());
197        }
198
199        ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory, (MessageActivationSpec)activationSpec);
200        // This is weird.. the same endpoint activated twice.. must be a
201        // container error.
202        if (endpointWorkers.containsKey(key)) {
203            throw new IllegalStateException("Endpoint previously activated");
204        }
205
206        ActiveMQEndpointWorker worker = new ActiveMQEndpointWorker(this, key);
207
208        endpointWorkers.put(key, worker);
209        worker.start();
210    }
211
212    /**
213     * @see javax.resource.spi.ResourceAdapter#endpointDeactivation(javax.resource.spi.endpoint.MessageEndpointFactory,
214     *      javax.resource.spi.ActivationSpec)
215     */
216    @Override
217    public void endpointDeactivation(MessageEndpointFactory endpointFactory, ActivationSpec activationSpec) {
218        if (activationSpec instanceof MessageActivationSpec) {
219            ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory, (MessageActivationSpec)activationSpec);
220            ActiveMQEndpointWorker worker = null;
221            synchronized (endpointWorkers) {
222                worker = endpointWorkers.remove(key);
223            }
224            if (worker == null) {
225                // This is weird.. that endpoint was not activated.. oh well..
226                // this method
227                // does not throw exceptions so just return.
228                return;
229            }
230            try {
231                worker.stop();
232            } catch (InterruptedException e) {
233                // We interrupted.. we won't throw an exception but will stop
234                // waiting for the worker
235                // to stop.. we tried our best. Keep trying to interrupt the
236                // thread.
237                Thread.currentThread().interrupt();
238            }
239
240        }
241
242    }
243
244    /**
245     * We only connect to one resource manager per ResourceAdapter instance, so
246     * any ActivationSpec will return the same XAResource.
247     *
248     * @see javax.resource.spi.ResourceAdapter#getXAResources(javax.resource.spi.ActivationSpec[])
249     */
250    @Override
251    public XAResource[] getXAResources(ActivationSpec[] activationSpecs) throws ResourceException {
252        try {
253            return new XAResource[]{
254                    new TransactionContext() {
255
256                        @Override
257                        public boolean isSameRM(XAResource xaresource) throws XAException {
258                            ActiveMQConnection original = null;
259                            try {
260                                original = setConnection(newConnection());
261                                boolean result = super.isSameRM(xaresource);
262                                LOG.trace("{}.recover({})={}", getConnection(), xaresource, result);
263                                return result;
264
265                            } catch (JMSException e) {
266                                LOG.trace("isSameRM({}) failed", xaresource, e);
267                                XAException xaException = new XAException(e.getMessage());
268                                throw xaException;
269                            } finally {
270                                closeConnection(original);
271                            }
272                        }
273
274                        @Override
275                        protected String getResourceManagerId() throws JMSException {
276                            ActiveMQConnection original = null;
277                            try {
278                                original = setConnection(newConnection());
279                                return super.getResourceManagerId();
280                            } finally {
281                                closeConnection(original);
282                            }
283                        }
284
285                        @Override
286                        public void commit(Xid xid, boolean onePhase) throws XAException {
287                            ActiveMQConnection original = null;
288                            try {
289                                setConnection(newConnection());
290                                super.commit(xid, onePhase);
291                                LOG.trace("{}.commit({},{})", getConnection(), xid);
292
293                            } catch (JMSException e) {
294                                LOG.trace("{}.commit({},{}) failed", getConnection(), xid, onePhase, e);
295                                throwXAException(e);
296                            } finally {
297                                closeConnection(original);
298                            }
299                        }
300
301                        @Override
302                        public void rollback(Xid xid) throws XAException {
303                            ActiveMQConnection original = null;
304                            try {
305                                original = setConnection(newConnection());
306                                super.rollback(xid);
307                                LOG.trace("{}.rollback({})", getConnection(), xid);
308
309                            } catch (JMSException e) {
310                                LOG.trace("{}.rollback({}) failed", getConnection(), xid, e);
311                                throwXAException(e);
312                            } finally {
313                               closeConnection(original);
314                            }
315                        }
316
317                        @Override
318                        public Xid[] recover(int flags) throws XAException {
319                            Xid[] result = new Xid[]{};
320                            ActiveMQConnection original = null;
321                            try {
322                                original = setConnection(newConnection());
323                                result = super.recover(flags);
324                                LOG.trace("{}.recover({})={}", getConnection(), flags, result);
325
326                            } catch (JMSException e) {
327                                LOG.trace("{}.recover({}) failed", getConnection(), flags, e);
328                                throwXAException(e);
329                            } finally {
330                                closeConnection(original);
331                            }
332                            return result;
333                        }
334
335                        @Override
336                        public void forget(Xid xid) throws XAException {
337                            ActiveMQConnection original = null;
338                            try {
339                                original = setConnection(newConnection());
340                                super.forget(xid);
341                                LOG.trace("{}.forget({})", getConnection(), xid);
342
343                            } catch (JMSException e) {
344                                LOG.trace("{}.forget({}) failed", getConnection(), xid, e);
345                                throwXAException(e);
346                            } finally {
347                                closeConnection(original);
348                            }
349                        }
350
351                        private void throwXAException(JMSException e) throws XAException {
352                            XAException xaException = new XAException(e.getMessage());
353                            xaException.errorCode = XAException.XAER_RMFAIL;
354                            throw xaException;
355                        }
356
357                        private ActiveMQConnection newConnection() throws JMSException {
358                            ActiveMQConnection connection = makeConnection();
359                            connection.start();
360                            return connection;
361                        }
362
363                        private void closeConnection(ActiveMQConnection original) {
364                            ActiveMQConnection connection = getConnection();
365                            if (connection != null) {
366                                try {
367                                    connection.close();
368                                } catch (JMSException ignored) {}
369                            }
370                            setConnection(original);
371                        }
372                    }};
373
374        } catch (Exception e) {
375            throw new ResourceException(e);
376        }
377    }
378
379    // ///////////////////////////////////////////////////////////////////////
380    //
381    // Java Bean getters and setters for this ResourceAdapter class.
382    //
383    // ///////////////////////////////////////////////////////////////////////
384
385    /**
386     * @see org.apache.activemq.ra.MessageResourceAdapter#getBrokerXmlConfig()
387     */
388    @Override
389    public String getBrokerXmlConfig() {
390        return brokerXmlConfig;
391    }
392
393    /**
394     * Sets the <a href="http://activemq.org/Xml+Configuration">XML
395     * configuration file </a> used to configure the ActiveMQ broker via Spring
396     * if using embedded mode.
397     *
398     * @param brokerXmlConfig is the filename which is assumed to be on the
399     *                classpath unless a URL is specified. So a value of
400     *                <code>foo/bar.xml</code> would be assumed to be on the
401     *                classpath whereas <code>file:dir/file.xml</code> would
402     *                use the file system. Any valid URL string is supported.
403     */
404    public void setBrokerXmlConfig(String brokerXmlConfig) {
405        this.brokerXmlConfig = brokerXmlConfig;
406    }
407
408    /**
409     * @see java.lang.Object#equals(java.lang.Object)
410     */
411    @Override
412    public boolean equals(Object o) {
413        if (this == o) {
414            return true;
415        }
416        if (!(o instanceof MessageResourceAdapter)) {
417            return false;
418        }
419
420        final MessageResourceAdapter activeMQResourceAdapter = (MessageResourceAdapter)o;
421
422        if (!getInfo().equals(activeMQResourceAdapter.getInfo())) {
423            return false;
424        }
425        if (notEqual(brokerXmlConfig, activeMQResourceAdapter.getBrokerXmlConfig())) {
426            return false;
427        }
428
429        return true;
430    }
431
432    /**
433     * @see java.lang.Object#hashCode()
434     */
435    @Override
436    public int hashCode() {
437        int result;
438        result = getInfo().hashCode();
439        if (brokerXmlConfig != null) {
440            result ^= brokerXmlConfig.hashCode();
441        }
442        return result;
443    }
444
445    public ActiveMQConnectionFactory getConnectionFactory() {
446        return connectionFactory;
447    }
448
449    public void setConnectionFactory(ActiveMQConnectionFactory aConnectionFactory) {
450        this.connectionFactory = aConnectionFactory;
451    }
452
453
454    }