001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.ra;
018    
019    import java.util.ArrayList;
020    import java.util.List;
021    
022    import javax.jms.Connection;
023    import javax.jms.ConnectionConsumer;
024    import javax.jms.ConnectionMetaData;
025    import javax.jms.Destination;
026    import javax.jms.ExceptionListener;
027    import javax.jms.IllegalStateException;
028    import javax.jms.JMSException;
029    import javax.jms.Queue;
030    import javax.jms.QueueConnection;
031    import javax.jms.QueueSession;
032    import javax.jms.ServerSessionPool;
033    import javax.jms.Session;
034    import javax.jms.Topic;
035    import javax.jms.TopicConnection;
036    import javax.jms.TopicSession;
037    import org.apache.activemq.ActiveMQQueueSession;
038    import org.apache.activemq.ActiveMQSession;
039    import org.apache.activemq.ActiveMQTopicSession;
040    
041    /**
042     * Acts as a pass through proxy for a JMS Connection object. It intercepts
043     * events that are of interest of the ActiveMQManagedConnection.
044     *
045     * 
046     */
047    public class ManagedConnectionProxy implements Connection, QueueConnection, TopicConnection, ExceptionListener {
048    
049        private ActiveMQManagedConnection managedConnection;
050        private final List<ManagedSessionProxy> sessions = new ArrayList<ManagedSessionProxy>();
051        private ExceptionListener exceptionListener;
052    
053        public ManagedConnectionProxy(ActiveMQManagedConnection managedConnection) {
054            this.managedConnection = managedConnection;
055        }
056    
057        /**
058         * Used to let the ActiveMQManagedConnection that this connection handel is
059         * not needed by the app.
060         *
061         * @throws JMSException
062         */
063        public void close() throws JMSException {
064            if (managedConnection != null) {
065                managedConnection.proxyClosedEvent(this);
066            }
067        }
068    
069        /**
070         * Called by the ActiveMQManagedConnection to invalidate this proxy.
071         */
072        public void cleanup() {
073            exceptionListener = null;
074            managedConnection = null;
075            synchronized (sessions) {
076                for (ManagedSessionProxy p : sessions) {
077                    try {
078                        //TODO is this dangerous?  should we copy the list before iterating?
079                        p.cleanup();
080                    } catch (JMSException ignore) {
081                    }
082                }
083                sessions.clear();
084            }
085        }
086    
087        /**
088         * @return "physical" underlying activemq connection, if proxy is associated with a managed connection
089         * @throws javax.jms.JMSException if managed connection is null
090         */
091        private Connection getConnection() throws JMSException {
092            if (managedConnection == null) {
093                throw new IllegalStateException("The Connection is closed");
094            }
095            return managedConnection.getPhysicalConnection();
096        }
097    
098        /**
099         * @param transacted      Whether session is transacted
100         * @param acknowledgeMode session acknowledge mode
101         * @return session proxy
102         * @throws JMSException on error
103         */
104        public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
105            return createSessionProxy(transacted, acknowledgeMode);
106        }
107    
108        /**
109         * @param transacted      Whether session is transacted
110         * @param acknowledgeMode session acknowledge mode
111         * @return session proxy
112         * @throws JMSException on error
113         */
114        private ManagedSessionProxy createSessionProxy(boolean transacted, int acknowledgeMode) throws JMSException {
115            if (!transacted && acknowledgeMode == Session.SESSION_TRANSACTED) {
116                acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
117            }
118            ActiveMQSession session = (ActiveMQSession) getConnection().createSession(transacted, acknowledgeMode);
119            ManagedTransactionContext txContext = new ManagedTransactionContext(managedConnection.getTransactionContext());
120            session.setTransactionContext(txContext);
121            ManagedSessionProxy p = new ManagedSessionProxy(session, this);
122            p.setUseSharedTxContext(managedConnection.isInManagedTx());
123            synchronized (sessions) {
124                sessions.add(p);
125            }
126            return p;
127        }
128    
129        protected void sessionClosed(ManagedSessionProxy session) {
130            synchronized (sessions) {
131                sessions.remove(session);
132            }
133        }
134    
135        public void setUseSharedTxContext(boolean enable) throws JMSException {
136            synchronized (sessions) {
137                for (ManagedSessionProxy p : sessions) {
138                    p.setUseSharedTxContext(enable);
139                }
140            }
141        }
142    
143        /**
144         * @param transacted      Whether session is transacted
145         * @param acknowledgeMode session acknowledge mode
146         * @return session proxy
147         * @throws JMSException on error
148         */
149        public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
150            return new ActiveMQQueueSession(createSessionProxy(transacted, acknowledgeMode));
151        }
152    
153        /**
154         * @param transacted      Whether session is transacted
155         * @param acknowledgeMode session acknowledge mode
156         * @return session proxy
157         * @throws JMSException on error
158         */
159        public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
160            return new ActiveMQTopicSession(createSessionProxy(transacted, acknowledgeMode));
161        }
162    
163        /**
164         * @return client id from delegate
165         * @throws JMSException
166         */
167        public String getClientID() throws JMSException {
168            return getConnection().getClientID();
169        }
170    
171        /**
172         * @return exception listener from delegate
173         * @throws JMSException
174         */
175        public ExceptionListener getExceptionListener() throws JMSException {
176            return getConnection().getExceptionListener();
177        }
178    
179        /**
180         * @return connection metadata from delegate
181         * @throws JMSException
182         */
183        public ConnectionMetaData getMetaData() throws JMSException {
184            return getConnection().getMetaData();
185        }
186    
187        /**
188         * Sets client id on delegate
189         * @param clientID new clientId
190         * @throws JMSException
191         */
192        public void setClientID(String clientID) throws JMSException {
193            getConnection().setClientID(clientID);
194        }
195    
196        /**
197         * sets exception listener on delegate
198         * @param listener new listener
199         * @throws JMSException
200         */
201        public void setExceptionListener(ExceptionListener listener) throws JMSException {
202            getConnection();
203            exceptionListener = listener;
204        }
205    
206        /**
207         * @throws JMSException
208         */
209        public void start() throws JMSException {
210            getConnection().start();
211        }
212    
213        /**
214         * @throws JMSException
215         */
216        public void stop() throws JMSException {
217            getConnection().stop();
218        }
219    
220        /**
221         * @param queue
222         * @param messageSelector
223         * @param sessionPool
224         * @param maxMessages
225         * @return
226         * @throws JMSException
227         */
228        public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
229            throw new JMSException("Not Supported.");
230        }
231    
232        /**
233         * @param topic
234         * @param messageSelector
235         * @param sessionPool
236         * @param maxMessages
237         * @return
238         * @throws JMSException
239         */
240        public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
241            throw new JMSException("Not Supported.");
242        }
243    
244        /**
245         * @param destination
246         * @param messageSelector
247         * @param sessionPool
248         * @param maxMessages
249         * @return
250         * @throws JMSException
251         */
252        public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
253            throw new JMSException("Not Supported.");
254        }
255    
256        /**
257         * @param topic
258         * @param subscriptionName
259         * @param messageSelector
260         * @param sessionPool
261         * @param maxMessages
262         * @return
263         * @throws JMSException
264         */
265        public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
266            throw new JMSException("Not Supported.");
267        }
268    
269        /**
270         * @return Returns the managedConnection.
271         */
272        public ActiveMQManagedConnection getManagedConnection() {
273            return managedConnection;
274        }
275    
276        public void onException(JMSException e) {
277            if (exceptionListener != null && managedConnection != null) {
278                try {
279                    exceptionListener.onException(e);
280                } catch (Throwable ignore) {
281                    // We can never trust user code so ignore any exceptions.
282                }
283            }
284        }
285    
286    }