001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.camel;
018    
019    import javax.jms.JMSException;
020    import javax.jms.MessageConsumer;
021    import javax.jms.MessageProducer;
022    import javax.jms.QueueReceiver;
023    import javax.jms.QueueSender;
024    import javax.jms.TopicPublisher;
025    import javax.jms.TopicSubscriber;
026    
027    import org.apache.activemq.ActiveMQConnection;
028    import org.apache.activemq.ActiveMQSession;
029    import org.apache.activemq.CustomDestination;
030    import org.apache.camel.CamelContext;
031    import org.apache.camel.CamelContextAware;
032    import org.apache.camel.Endpoint;
033    import org.apache.camel.component.jms.JmsBinding;
034    import org.apache.camel.component.jms.JmsEndpoint;
035    
036    /**
037     * 
038     */
039    public class CamelDestination implements CustomDestination, CamelContextAware {
040        private String uri;
041        private Endpoint endpoint;
042        private CamelContext camelContext;
043        // add in dummy endpoint pending camel release with 
044        // https://issues.apache.org/activemq/browse/CAMEL-1982
045        private JmsBinding binding = new JmsBinding(new JmsEndpoint());
046    
047        public CamelDestination() {
048        }
049    
050        public CamelDestination(String uri) {
051            this.uri = uri;
052        }
053    
054        public String toString() {
055            return uri.toString();
056        }
057    
058        // CustomDestination interface
059        //-----------------------------------------------------------------------
060        public MessageConsumer createConsumer(ActiveMQSession session, String messageSelector) {
061            return createConsumer(session, messageSelector, false);
062        }
063    
064        public MessageConsumer createConsumer(ActiveMQSession session, String messageSelector, boolean noLocal) {
065            return new CamelMessageConsumer(this, resolveEndpoint(session), session, messageSelector, noLocal);
066        }
067    
068        public TopicSubscriber createSubscriber(ActiveMQSession session, String messageSelector, boolean noLocal) {
069            return createDurableSubscriber(session, null, messageSelector, noLocal);
070        }
071    
072        public TopicSubscriber createDurableSubscriber(ActiveMQSession session, String name, String messageSelector, boolean noLocal) {
073            throw new UnsupportedOperationException("This destination is not a Topic: " + this);
074        }
075    
076        public QueueReceiver createReceiver(ActiveMQSession session, String messageSelector) {
077            throw new UnsupportedOperationException("This destination is not a Queue: " + this);
078        }
079    
080        // Producers
081        //-----------------------------------------------------------------------
082        public MessageProducer createProducer(ActiveMQSession session) throws JMSException {
083            return new CamelMessageProducer(this, resolveEndpoint(session), session);
084        }
085    
086        public TopicPublisher createPublisher(ActiveMQSession session) throws JMSException {
087            throw new UnsupportedOperationException("This destination is not a Topic: " + this);
088        }
089    
090        public QueueSender createSender(ActiveMQSession session) throws JMSException {
091            throw new UnsupportedOperationException("This destination is not a Queue: " + this);
092        }
093    
094        // Properties
095        //-----------------------------------------------------------------------
096    
097        public String getUri() {
098            return uri;
099        }
100    
101        public void setUri(String uri) {
102            this.uri = uri;
103        }
104    
105        public Endpoint getEndpoint() {
106            return endpoint;
107        }
108    
109        public void setEndpoint(Endpoint endpoint) {
110            this.endpoint = endpoint;
111        }
112    
113        public CamelContext getCamelContext() {
114            return camelContext;
115        }
116    
117        public void setCamelContext(CamelContext camelContext) {
118            this.camelContext = camelContext;
119        }
120    
121        public JmsBinding getBinding() {
122            return binding;
123        }
124    
125        public void setBinding(JmsBinding binding) {
126            this.binding = binding;
127        }
128    
129        // Implementation methods
130        //-----------------------------------------------------------------------
131    
132        /**
133         * Resolves the Camel Endpoint for this destination
134         *
135         * @return
136         */
137        protected Endpoint resolveEndpoint(ActiveMQSession session) {
138            Endpoint answer = getEndpoint();
139            if (answer == null) {
140                answer = resolveCamelContext(session).getEndpoint(getUri());
141                if (answer == null) {
142                    throw new IllegalArgumentException("No endpoint could be found for URI: " + getUri());
143                }
144            }
145            return answer;
146        }
147    
148        protected CamelContext resolveCamelContext(ActiveMQSession session) {
149            CamelContext answer = getCamelContext();
150            if (answer == null) {
151                ActiveMQConnection connection = session.getConnection();
152                if (connection instanceof CamelConnection) {
153                    CamelConnection camelConnection = (CamelConnection) connection;
154                    answer = camelConnection.getCamelContext();
155                }
156            }
157            if (answer == null) {
158                throw new IllegalArgumentException("No CamelContext has been configured");
159            }
160            return answer;
161        }
162    }