001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.camel;
018    
019    import javax.jms.IllegalStateException;
020    import javax.jms.JMSException;
021    import javax.jms.Message;
022    import javax.jms.MessageConsumer;
023    import javax.jms.MessageListener;
024    
025    import org.apache.activemq.ActiveMQSession;
026    import org.apache.activemq.util.JMSExceptionSupport;
027    import org.apache.camel.Consumer;
028    import org.apache.camel.Endpoint;
029    import org.apache.camel.Exchange;
030    import org.apache.camel.PollingConsumer;
031    import org.apache.camel.Processor;
032    
033    /**
034     * A JMS {@link javax.jms.MessageConsumer} which consumes message exchanges from
035     * a Camel {@link Endpoint}
036     * 
037     * 
038     */
039    public class CamelMessageConsumer implements MessageConsumer {
040        private final CamelDestination destination;
041        private final Endpoint endpoint;
042        private final ActiveMQSession session;
043        private final String messageSelector;
044        private final boolean noLocal;
045        private MessageListener messageListener;
046        private Consumer consumer;
047        private PollingConsumer pollingConsumer;
048        private boolean closed;
049    
050        public CamelMessageConsumer(CamelDestination destination, Endpoint endpoint, ActiveMQSession session, String messageSelector, boolean noLocal) {
051            this.destination = destination;
052            this.endpoint = endpoint;
053            this.session = session;
054            this.messageSelector = messageSelector;
055            this.noLocal = noLocal;
056        }
057    
058        public void close() throws JMSException {
059            if (!closed) {
060                closed = true;
061                try {
062                    if (consumer != null) {
063                        consumer.stop();
064                    }
065                    if (pollingConsumer != null) {
066                        pollingConsumer.stop();
067                    }
068                } catch (JMSException e) {
069                    throw e;
070                } catch (Exception e) {
071                    throw JMSExceptionSupport.create(e);
072                }
073            }
074        }
075    
076        public MessageListener getMessageListener() throws JMSException {
077            return messageListener;
078        }
079    
080        public void setMessageListener(MessageListener messageListener) throws JMSException {
081            this.messageListener = messageListener;
082            if (messageListener != null && consumer == null) {
083                consumer = createConsumer();
084            }
085        }
086    
087        public Message receive() throws JMSException {
088            Exchange exchange = getPollingConsumer().receive();
089            return createMessage(exchange);
090        }
091    
092        public Message receive(long timeoutMillis) throws JMSException {
093            Exchange exchange = getPollingConsumer().receive(timeoutMillis);
094            return createMessage(exchange);
095        }
096    
097        public Message receiveNoWait() throws JMSException {
098            Exchange exchange = getPollingConsumer().receiveNoWait();
099            return createMessage(exchange);
100        }
101    
102        // Properties
103        // -----------------------------------------------------------------------
104    
105        public CamelDestination getDestination() {
106            return destination;
107        }
108    
109        public Endpoint getEndpoint() {
110            return endpoint;
111        }
112    
113        public String getMessageSelector() {
114            return messageSelector;
115        }
116    
117        public boolean isNoLocal() {
118            return noLocal;
119        }
120    
121        public ActiveMQSession getSession() {
122            return session;
123        }
124    
125        // Implementation methods
126        // -----------------------------------------------------------------------
127    
128        protected PollingConsumer getPollingConsumer() throws JMSException {
129            try {
130                if (pollingConsumer == null) {
131                    pollingConsumer = endpoint.createPollingConsumer();
132                    pollingConsumer.start();
133                }
134                return pollingConsumer;
135            } catch (JMSException e) {
136                throw e;
137            } catch (Exception e) {
138                throw JMSExceptionSupport.create(e);
139            }
140        }
141    
142        protected Message createMessage(Exchange exchange) throws JMSException {
143            if (exchange != null) {
144                Message message = destination.getBinding().makeJmsMessage(exchange, session);
145                return message;
146            } else {
147                return null;
148            }
149        }
150    
151        protected Consumer createConsumer() throws JMSException {
152            try {
153                Consumer answer = endpoint.createConsumer(new Processor() {
154                    public void process(Exchange exchange) throws Exception {
155                        Message message = createMessage(exchange);
156                        getMessageListener().onMessage(message);
157                    }
158                });
159                answer.start();
160                return answer;
161            } catch (JMSException e) {
162                throw e;
163            } catch (Exception e) {
164                throw JMSExceptionSupport.create(e);
165            }
166        }
167    
168        protected void checkClosed() throws javax.jms.IllegalStateException {
169            if (closed) {
170                throw new IllegalStateException("The producer is closed");
171            }
172        }
173    }