001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.ra;
018    
019    import java.io.Serializable;
020    
021    import javax.jms.BytesMessage;
022    import javax.jms.Destination;
023    import javax.jms.JMSException;
024    import javax.jms.MapMessage;
025    import javax.jms.Message;
026    import javax.jms.MessageConsumer;
027    import javax.jms.MessageListener;
028    import javax.jms.MessageProducer;
029    import javax.jms.ObjectMessage;
030    import javax.jms.Queue;
031    import javax.jms.QueueBrowser;
032    import javax.jms.QueueReceiver;
033    import javax.jms.QueueSender;
034    import javax.jms.QueueSession;
035    import javax.jms.Session;
036    import javax.jms.StreamMessage;
037    import javax.jms.TemporaryQueue;
038    import javax.jms.TemporaryTopic;
039    import javax.jms.TextMessage;
040    import javax.jms.Topic;
041    import javax.jms.TopicPublisher;
042    import javax.jms.TopicSession;
043    import javax.jms.TopicSubscriber;
044    
045    /**
046     * A {@link Session} implementation which can be used with the ActiveMQ JCA
047     * Resource Adapter to publish messages using the same JMS session that is used
048     * to dispatch messages.
049     * 
050     * 
051     */
052    public class InboundSessionProxy implements Session, QueueSession, TopicSession {
053    
054        private InboundContext sessionAndProducer;
055    
056        public Session getSession() throws JMSException {
057            return getSessionAndProducer().getSession();
058        }
059    
060        public QueueSession getQueueSession() throws JMSException {
061            Session session = getSession();
062            if (session instanceof QueueSession) {
063                return (QueueSession)session;
064            } else {
065                throw new JMSException("The underlying JMS Session does not support QueueSession semantics: " + session);
066            }
067        }
068    
069        public TopicSession getTopicSession() throws JMSException {
070            Session session = getSession();
071            if (session instanceof TopicSession) {
072                return (TopicSession)session;
073            } else {
074                throw new JMSException("The underlying JMS Session does not support TopicSession semantics: " + session);
075            }
076        }
077    
078        public InboundContext getSessionAndProducer() throws JMSException {
079            if (sessionAndProducer == null) {
080                sessionAndProducer = InboundContextSupport.getActiveSessionAndProducer();
081                if (sessionAndProducer == null) {
082                    throw new JMSException("No currently active Session. This JMS provider cannot be used outside a MessageListener.onMessage() invocation");
083                }
084            }
085            return sessionAndProducer;
086        }
087    
088        public MessageProducer createProducer(Destination destination) throws JMSException {
089            return new InboundMessageProducerProxy(getSessionAndProducer().getMessageProducer(), destination);
090        }
091    
092        public void close() throws JMSException {
093            // we don't allow users to close this session
094            // as its used by the JCA container
095        }
096    
097        public void commit() throws JMSException {
098            // the JCA container will handle transactions
099        }
100    
101        public void rollback() throws JMSException {
102            // the JCA container will handle transactions
103        }
104    
105        public void recover() throws JMSException {
106            // the JCA container will handle recovery
107        }
108    
109        public void run() {
110            try {
111                getSession().run();
112            } catch (JMSException e) {
113                throw new RuntimeException("Failed to run() on session due to: " + e, e);
114            }
115        }
116    
117        // Straightforward delegation methods
118        // -------------------------------------------------------------------------
119    
120        public QueueBrowser createBrowser(Queue queue) throws JMSException {
121            return getSession().createBrowser(queue);
122        }
123    
124        public QueueBrowser createBrowser(Queue queue, String s) throws JMSException {
125            return getSession().createBrowser(queue, s);
126        }
127    
128        public BytesMessage createBytesMessage() throws JMSException {
129            return getSession().createBytesMessage();
130        }
131    
132        public MessageConsumer createConsumer(Destination destination) throws JMSException {
133            return getSession().createConsumer(destination);
134        }
135    
136        public MessageConsumer createConsumer(Destination destination, String s) throws JMSException {
137            return getSession().createConsumer(destination, s);
138        }
139    
140        public MessageConsumer createConsumer(Destination destination, String s, boolean b) throws JMSException {
141            return getSession().createConsumer(destination, s, b);
142        }
143    
144        public TopicSubscriber createDurableSubscriber(Topic topic, String s) throws JMSException {
145            return getSession().createDurableSubscriber(topic, s);
146        }
147    
148        public TopicSubscriber createDurableSubscriber(Topic topic, String s, String s1, boolean b) throws JMSException {
149            return getSession().createDurableSubscriber(topic, s, s1, b);
150        }
151    
152        public MapMessage createMapMessage() throws JMSException {
153            return getSession().createMapMessage();
154        }
155    
156        public Message createMessage() throws JMSException {
157            return getSession().createMessage();
158        }
159    
160        public ObjectMessage createObjectMessage() throws JMSException {
161            return getSession().createObjectMessage();
162        }
163    
164        public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException {
165            return getSession().createObjectMessage(serializable);
166        }
167    
168        public Queue createQueue(String s) throws JMSException {
169            return getSession().createQueue(s);
170        }
171    
172        public StreamMessage createStreamMessage() throws JMSException {
173            return getSession().createStreamMessage();
174        }
175    
176        public TemporaryQueue createTemporaryQueue() throws JMSException {
177            return getSession().createTemporaryQueue();
178        }
179    
180        public TemporaryTopic createTemporaryTopic() throws JMSException {
181            return getSession().createTemporaryTopic();
182        }
183    
184        public TextMessage createTextMessage() throws JMSException {
185            return getSession().createTextMessage();
186        }
187    
188        public TextMessage createTextMessage(String s) throws JMSException {
189            return getSession().createTextMessage(s);
190        }
191    
192        public Topic createTopic(String s) throws JMSException {
193            return getSession().createTopic(s);
194        }
195    
196        public int getAcknowledgeMode() throws JMSException {
197            return getSession().getAcknowledgeMode();
198        }
199    
200        public MessageListener getMessageListener() throws JMSException {
201            return getSession().getMessageListener();
202        }
203    
204        public boolean getTransacted() throws JMSException {
205            return getSession().getTransacted();
206        }
207    
208        public void setMessageListener(MessageListener messageListener) throws JMSException {
209            getSession().setMessageListener(messageListener);
210        }
211    
212        public void unsubscribe(String s) throws JMSException {
213            getSession().unsubscribe(s);
214        }
215    
216        public QueueReceiver createReceiver(Queue queue) throws JMSException {
217            return getQueueSession().createReceiver(queue);
218        }
219    
220        public QueueReceiver createReceiver(Queue queue, String s) throws JMSException {
221            return getQueueSession().createReceiver(queue, s);
222        }
223    
224        public QueueSender createSender(Queue queue) throws JMSException {
225            return new InboundMessageProducerProxy(getSessionAndProducer().getMessageProducer(), queue);
226        }
227    
228        public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
229            return getTopicSession().createSubscriber(topic);
230        }
231    
232        public TopicSubscriber createSubscriber(Topic topic, String s, boolean b) throws JMSException {
233            return getTopicSession().createSubscriber(topic, s, b);
234        }
235    
236        public TopicPublisher createPublisher(Topic topic) throws JMSException {
237            return getTopicSession().createPublisher(topic);
238        }
239    
240        public String toString() {
241            try {
242                return "InboundSessionProxy { " + getSession() + " }";
243            } catch (JMSException e) {
244                return "InboundSessionProxy { null }";
245            }
246        }
247    
248    }