001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.camel;
018    
019    import javax.jms.Destination;
020    import javax.jms.IllegalStateException;
021    import javax.jms.JMSException;
022    import javax.jms.Message;
023    
024    import org.apache.activemq.ActiveMQMessageProducerSupport;
025    import org.apache.activemq.ActiveMQSession;
026    import org.apache.activemq.util.JMSExceptionSupport;
027    import org.apache.camel.Endpoint;
028    import org.apache.camel.Exchange;
029    import org.apache.camel.ExchangePattern;
030    import org.apache.camel.Producer;
031    import org.apache.camel.component.jms.JmsMessage;
032    import org.apache.camel.util.ObjectHelper;
033    
034    /**
035     * A JMS {@link javax.jms.MessageProducer} which sends message exchanges to a
036     * Camel {@link Endpoint}
037     * 
038     * 
039     */
040    public class CamelMessageProducer extends ActiveMQMessageProducerSupport {
041        
042        protected Producer producer;
043    
044        private final CamelDestination destination;
045        private final Endpoint endpoint;
046        private boolean closed;
047    
048        public CamelMessageProducer(CamelDestination destination, Endpoint endpoint, ActiveMQSession session) throws JMSException {
049            super(session);
050            this.destination = destination;
051            this.endpoint = endpoint;
052            try {
053                this.producer = endpoint.createProducer();
054            } catch (JMSException e) {
055                throw e;
056            } catch (Exception e) {
057                throw JMSExceptionSupport.create(e);
058            }
059        }
060    
061        public CamelDestination getDestination() throws JMSException {
062            return destination;
063        }
064    
065        public Endpoint getEndpoint() {
066            return endpoint;
067        }
068    
069        public void close() throws JMSException {
070            if (!closed) {
071                closed = true;
072                try {
073                    producer.stop();
074                } catch (JMSException e) {
075                    throw e;
076                } catch (Exception e) {
077                    throw JMSExceptionSupport.create(e);
078                }
079            }
080        }
081    
082        public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
083            CamelDestination camelDestination = null;
084            if (ObjectHelper.equal(destination, this.destination)) {
085                camelDestination = this.destination;
086            } else {
087                // TODO support any CamelDestination?
088                throw new IllegalArgumentException("Invalid destination setting: " + destination + " when expected: " + this.destination);
089            }
090            try {
091                            Exchange exchange = endpoint.createExchange(ExchangePattern.InOnly);
092                            exchange.setIn(new JmsMessage(message, camelDestination.getBinding()));
093                producer.process(exchange);
094            } catch (JMSException e) {
095                throw e;
096            } catch (Exception e) {
097                throw JMSExceptionSupport.create(e);
098            }
099        }
100    
101        protected void checkClosed() throws IllegalStateException {
102            if (closed) {
103                throw new IllegalStateException("The producer is closed");
104            }
105        }
106    }