001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.ra;
018    
019    import java.io.PrintWriter;
020    import java.util.List;
021    import java.util.concurrent.CopyOnWriteArrayList;
022    import javax.jms.Connection;
023    import javax.jms.ExceptionListener;
024    import javax.jms.JMSException;
025    import javax.resource.ResourceException;
026    import javax.resource.spi.ConnectionEvent;
027    import javax.resource.spi.ConnectionEventListener;
028    import javax.resource.spi.ConnectionRequestInfo;
029    import javax.resource.spi.LocalTransaction;
030    import javax.resource.spi.ManagedConnection;
031    import javax.resource.spi.ManagedConnectionMetaData;
032    import javax.security.auth.Subject;
033    import javax.transaction.xa.XAResource;
034    import org.apache.activemq.ActiveMQConnection;
035    import org.apache.activemq.LocalTransactionEventListener;
036    import org.apache.activemq.TransactionContext;
037    import org.slf4j.Logger;
038    import org.slf4j.LoggerFactory;
039    
040    /**
041     * ActiveMQManagedConnection maps to real physical connection to the server.
042     * Since a ManagedConnection has to provide a transaction managment interface to
043     * the physical connection, and sessions are the objects implement transaction
044     * managment interfaces in the JMS API, this object also maps to a singe
045     * physical JMS session. <p/> The side-effect is that JMS connection the
046     * application gets will allways create the same session object. This is good if
047     * running in an app server since the sessions are elisted in the context
048     * transaction. This is bad if used outside of an app server since the user may
049     * be trying to create 2 different sessions to coordinate 2 different uow.
050     * 
051     * 
052     */
053    public class ActiveMQManagedConnection implements ManagedConnection, ExceptionListener { // TODO:
054                                                                                                // ,
055                                                                                                // DissociatableManagedConnection
056                                                                                                // {
057    
058        private static final Logger LOG = LoggerFactory.getLogger(ActiveMQManagedConnection.class);
059    
060        private PrintWriter logWriter;
061    
062        private final ActiveMQConnection physicalConnection;
063        private final TransactionContext transactionContext;
064        private final List<ManagedConnectionProxy> proxyConnections = new CopyOnWriteArrayList<ManagedConnectionProxy>();
065        private final List<ConnectionEventListener> listeners = new CopyOnWriteArrayList<ConnectionEventListener>();
066        private final LocalAndXATransaction localAndXATransaction;
067    
068        private Subject subject;
069        private ActiveMQConnectionRequestInfo info;
070        private boolean destroyed;
071    
072        public ActiveMQManagedConnection(Subject subject, ActiveMQConnection physicalConnection, ActiveMQConnectionRequestInfo info) throws ResourceException {
073            try {
074                this.subject = subject;
075                this.info = info;
076                this.physicalConnection = physicalConnection;
077                this.transactionContext = new TransactionContext(physicalConnection);
078    
079                this.localAndXATransaction = new LocalAndXATransaction(transactionContext) {
080                    public void setInManagedTx(boolean inManagedTx) throws JMSException {
081                        super.setInManagedTx(inManagedTx);
082                        for (ManagedConnectionProxy proxy:proxyConnections) {
083                            proxy.setUseSharedTxContext(inManagedTx);
084                        }
085                    }
086                };
087    
088                this.transactionContext.setLocalTransactionEventListener(new LocalTransactionEventListener() {
089                    public void beginEvent() {
090                        fireBeginEvent();
091                    }
092    
093                    public void commitEvent() {
094                        fireCommitEvent();
095                    }
096    
097                    public void rollbackEvent() {
098                        fireRollbackEvent();
099                    }
100                });
101    
102                physicalConnection.setExceptionListener(this);
103            } catch (JMSException e) {
104                throw new ResourceException("Could not create a new connection: " + e.getMessage(), e);
105            }
106        }
107    
108        public boolean isInManagedTx() {
109            return localAndXATransaction.isInManagedTx();
110        }
111    
112        public static boolean matches(Object x, Object y) {
113            if (x == null ^ y == null) {
114                return false;
115            }
116            if (x != null && !x.equals(y)) {
117                return false;
118            }
119            return true;
120        }
121    
122        public void associate(Subject subject, ActiveMQConnectionRequestInfo info) throws JMSException {
123    
124            // Do we need to change the associated userid/password
125            if (!matches(info.getUserName(), this.info.getUserName()) || !matches(info.getPassword(), this.info.getPassword())) {
126                physicalConnection.changeUserInfo(info.getUserName(), info.getPassword());
127            }
128    
129            // Do we need to set the clientId?
130            if (info.getClientid() != null && info.getClientid().length() > 0) {
131                physicalConnection.setClientID(info.getClientid());
132            }
133    
134            this.subject = subject;
135            this.info = info;
136        }
137    
138        public Connection getPhysicalConnection() {
139            return physicalConnection;
140        }
141    
142        private void fireBeginEvent() {
143            ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this, ConnectionEvent.LOCAL_TRANSACTION_STARTED);
144            for(ConnectionEventListener l:listeners) {
145                l.localTransactionStarted(event);
146            }
147        }
148    
149        private void fireCommitEvent() {
150            ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this, ConnectionEvent.LOCAL_TRANSACTION_COMMITTED);
151            for(ConnectionEventListener l:listeners) {
152                l.localTransactionCommitted(event);
153            }
154        }
155    
156        private void fireRollbackEvent() {
157            ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this, ConnectionEvent.LOCAL_TRANSACTION_ROLLEDBACK);
158            for(ConnectionEventListener l:listeners) {
159                l.localTransactionRolledback(event);
160            }
161        }
162    
163        private void fireCloseEvent(ManagedConnectionProxy proxy) {
164            ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this, ConnectionEvent.CONNECTION_CLOSED);
165            event.setConnectionHandle(proxy);
166    
167            for(ConnectionEventListener l:listeners) {
168                l.connectionClosed(event);
169            }
170        }
171    
172        private void fireErrorOccurredEvent(Exception error) {
173            ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this, ConnectionEvent.CONNECTION_ERROR_OCCURRED, error);
174            for(ConnectionEventListener l:listeners) {
175                l.connectionErrorOccurred(event);
176            }
177        }
178    
179        /**
180         * @see javax.resource.spi.ManagedConnection#getConnection(javax.security.auth.Subject,
181         *      javax.resource.spi.ConnectionRequestInfo)
182         */
183        public Object getConnection(Subject subject, ConnectionRequestInfo info) throws ResourceException {
184            ManagedConnectionProxy proxy = new ManagedConnectionProxy(this);
185            proxyConnections.add(proxy);
186            return proxy;
187        }
188    
189        private boolean isDestroyed() {
190            return destroyed;
191        }
192    
193        /**
194         * Close down the physical connection to the server.
195         * 
196         * @see javax.resource.spi.ManagedConnection#destroy()
197         */
198        public void destroy() throws ResourceException {
199            // Have we allready been destroyed??
200            if (isDestroyed()) {
201                return;
202            }
203    
204            cleanup();
205    
206            try {
207                physicalConnection.close();
208                destroyed = true;
209            } catch (JMSException e) {
210                LOG.info("Error occured during close of a JMS connection.", e);
211            }
212        }
213    
214        /**
215         * Cleans up all proxy handles attached to this physical connection so that
216         * they cannot be used anymore.
217         * 
218         * @see javax.resource.spi.ManagedConnection#cleanup()
219         */
220        public void cleanup() throws ResourceException {
221    
222            // Have we allready been destroyed??
223            if (isDestroyed()) {
224                return;
225            }
226    
227            for (ManagedConnectionProxy proxy:proxyConnections) {
228                proxy.cleanup();
229            }
230            proxyConnections.clear();
231    
232            try {
233                physicalConnection.cleanup();
234            } catch (JMSException e) {
235                throw new ResourceException("Could cleanup the ActiveMQ connection: " + e, e);
236            }
237            // defer transaction cleanup till after close so that close is aware of the current tx
238            localAndXATransaction.cleanup();
239    
240        }
241    
242        /**
243         * @see javax.resource.spi.ManagedConnection#associateConnection(java.lang.Object)
244         */
245        public void associateConnection(Object connection) throws ResourceException {
246            if (connection instanceof ManagedConnectionProxy) {
247                ManagedConnectionProxy proxy = (ManagedConnectionProxy)connection;
248                proxyConnections.add(proxy);
249            } else {
250                throw new ResourceException("Not supported : associating connection instance of " + connection.getClass().getName());
251            }
252        }
253    
254        /**
255         * @see javax.resource.spi.ManagedConnection#addConnectionEventListener(javax.resource.spi.ConnectionEventListener)
256         */
257        public void addConnectionEventListener(ConnectionEventListener listener) {
258            listeners.add(listener);
259        }
260    
261        /**
262         * @see javax.resource.spi.ManagedConnection#removeConnectionEventListener(javax.resource.spi.ConnectionEventListener)
263         */
264        public void removeConnectionEventListener(ConnectionEventListener listener) {
265            listeners.remove(listener);
266        }
267    
268        /**
269         * @see javax.resource.spi.ManagedConnection#getXAResource()
270         */
271        public XAResource getXAResource() throws ResourceException {
272            return localAndXATransaction;
273        }
274    
275        /**
276         * @see javax.resource.spi.ManagedConnection#getLocalTransaction()
277         */
278        public LocalTransaction getLocalTransaction() throws ResourceException {
279            return localAndXATransaction;
280        }
281    
282        /**
283         * @see javax.resource.spi.ManagedConnection#getMetaData()
284         */
285        public ManagedConnectionMetaData getMetaData() throws ResourceException {
286            return new ManagedConnectionMetaData() {
287    
288                public String getEISProductName() throws ResourceException {
289                    if (physicalConnection == null) {
290                        throw new ResourceException("Not connected.");
291                    }
292                    try {
293                        return physicalConnection.getMetaData().getJMSProviderName();
294                    } catch (JMSException e) {
295                        throw new ResourceException("Error accessing provider.", e);
296                    }
297                }
298    
299                public String getEISProductVersion() throws ResourceException {
300                    if (physicalConnection == null) {
301                        throw new ResourceException("Not connected.");
302                    }
303                    try {
304                        return physicalConnection.getMetaData().getProviderVersion();
305                    } catch (JMSException e) {
306                        throw new ResourceException("Error accessing provider.", e);
307                    }
308                }
309    
310                public int getMaxConnections() throws ResourceException {
311                    if (physicalConnection == null) {
312                        throw new ResourceException("Not connected.");
313                    }
314                    return Integer.MAX_VALUE;
315                }
316    
317                public String getUserName() throws ResourceException {
318                    if (physicalConnection == null) {
319                        throw new ResourceException("Not connected.");
320                    }
321                    try {
322                        return physicalConnection.getClientID();
323                    } catch (JMSException e) {
324                        throw new ResourceException("Error accessing provider.", e);
325                    }
326                }
327            };
328        }
329    
330        /**
331         * @see javax.resource.spi.ManagedConnection#setLogWriter(java.io.PrintWriter)
332         */
333        public void setLogWriter(PrintWriter logWriter) throws ResourceException {
334            this.logWriter = logWriter;
335        }
336    
337        /**
338         * @see javax.resource.spi.ManagedConnection#getLogWriter()
339         */
340        public PrintWriter getLogWriter() throws ResourceException {
341            return logWriter;
342        }
343    
344        /**
345         * @param subject subject to match
346         * @param info cri to match
347         * @return whether the subject and cri match sufficiently to allow using this connection under the new circumstances
348         */
349        public boolean matches(Subject subject, ConnectionRequestInfo info) {
350            // Check to see if it is our info class
351            if (info == null) {
352                return false;
353            }
354            if (info.getClass() != ActiveMQConnectionRequestInfo.class) {
355                return false;
356            }
357    
358            // Do the subjects match?
359            if (subject == null ^ this.subject == null) {
360                return false;
361            }
362            if (subject != null && !subject.equals(this.subject)) {
363                return false;
364            }
365    
366            // Does the info match?
367            return info.equals(this.info);
368        }
369    
370        /**
371         * When a proxy is closed this cleans up the proxy and notifys the
372         * ConnectionEventListeners that a connection closed.
373         * 
374         * @param proxy
375         */
376        public void proxyClosedEvent(ManagedConnectionProxy proxy) {
377            proxyConnections.remove(proxy);
378            proxy.cleanup();
379            fireCloseEvent(proxy);
380        }
381    
382        public void onException(JMSException e) {
383            LOG.warn("Connection failed: " + e);
384            LOG.debug("Cause: ", e);
385    
386            for (ManagedConnectionProxy proxy:proxyConnections) {
387                proxy.onException(e);
388            }
389            // Let the container know that the error occured.
390            fireErrorOccurredEvent(e);
391        }
392    
393        /**
394         * @return Returns the transactionContext.
395         */
396        public TransactionContext getTransactionContext() {
397            return transactionContext;
398        }
399    
400    }