001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.camel.component;
018    
019    import java.net.URISyntaxException;
020    import java.util.*;
021    import java.util.concurrent.CopyOnWriteArrayList;
022    
023    import org.apache.activemq.ActiveMQConnectionFactory;
024    import org.apache.activemq.EnhancedConnection;
025    import org.apache.activemq.Service;
026    import org.apache.activemq.advisory.DestinationSource;
027    import org.apache.activemq.command.ActiveMQDestination;
028    import org.apache.camel.CamelContext;
029    import org.apache.camel.ComponentConfiguration;
030    import org.apache.camel.component.jms.JmsComponent;
031    import org.apache.camel.component.jms.JmsConfiguration;
032    import org.apache.camel.spi.EndpointCompleter;
033    import org.apache.camel.util.IntrospectionSupport;
034    import org.apache.camel.util.ObjectHelper;
035    import org.apache.camel.util.URISupport;
036    import org.slf4j.Logger;
037    import org.slf4j.LoggerFactory;
038    import org.springframework.jms.connection.SingleConnectionFactory;
039    
040    import javax.jms.Connection;
041    
042    /**
043     * The <a href="http://activemq.apache.org/camel/activemq.html">ActiveMQ Component</a>
044     *
045     * 
046     */
047    public class ActiveMQComponent extends JmsComponent implements EndpointCompleter {
048        private final CopyOnWriteArrayList<SingleConnectionFactory> singleConnectionFactoryList =
049            new CopyOnWriteArrayList<SingleConnectionFactory>();
050        private final CopyOnWriteArrayList<Service> pooledConnectionFactoryServiceList =
051            new CopyOnWriteArrayList<Service>();
052        private static final transient Logger LOG = LoggerFactory.getLogger(ActiveMQComponent.class);
053        private boolean exposeAllQueues;
054        private CamelEndpointLoader endpointLoader;
055    
056        private EnhancedConnection connection;
057        DestinationSource source;
058        boolean sourceInitialized = false;
059    
060        /**
061         * Creates an <a href="http://camel.apache.org/activemq.html">ActiveMQ Component</a>
062         *
063         * @return the created component
064         */
065        public static ActiveMQComponent activeMQComponent() {
066            return new ActiveMQComponent();
067        }
068    
069        /**
070         * Creates an <a href="http://camel.apache.org/activemq.html">ActiveMQ Component</a>
071         * connecting to the given <a href="http://activemq.apache.org/configuring-transports.html">broker URL</a>
072         *
073         * @param brokerURL the URL to connect to
074         * @return the created component
075         */
076        public static ActiveMQComponent activeMQComponent(String brokerURL) {
077            ActiveMQComponent answer = new ActiveMQComponent();
078            if (answer.getConfiguration() instanceof ActiveMQConfiguration) {
079                ((ActiveMQConfiguration) answer.getConfiguration())
080                        .setBrokerURL(brokerURL);
081            }
082    
083            // set the connection factory with the provided broker url
084            answer.setConnectionFactory(new ActiveMQConnectionFactory(brokerURL));
085            return answer;
086        }
087    
088        public ActiveMQComponent() {
089        }
090    
091        public ActiveMQComponent(CamelContext context) {
092            super(context);
093        }
094    
095        public ActiveMQComponent(ActiveMQConfiguration configuration) {
096            super(configuration);
097        }
098    
099    
100        public void setBrokerURL(String brokerURL) {
101            if (getConfiguration() instanceof ActiveMQConfiguration) {
102                ((ActiveMQConfiguration)getConfiguration()).setBrokerURL(brokerURL);
103            }
104        }
105    
106        public void setUserName(String userName) {
107            if (getConfiguration() instanceof ActiveMQConfiguration) {
108                ((ActiveMQConfiguration)getConfiguration()).setUserName(userName);
109            }
110        }
111    
112        public void setPassword(String password) {
113            if (getConfiguration() instanceof ActiveMQConfiguration) {
114                ((ActiveMQConfiguration)getConfiguration()).setPassword(password);
115            }
116        }
117    
118        public boolean isExposeAllQueues() {
119            return exposeAllQueues;
120        }
121    
122        /**
123         * If enabled this will cause all Queues in the ActiveMQ broker to be eagerly populated into the CamelContext
124         * so that they can be easily browsed by any Camel tooling. This option is disabled by default.
125         *
126         * @param exposeAllQueues
127         */
128        public void setExposeAllQueues(boolean exposeAllQueues) {
129            this.exposeAllQueues = exposeAllQueues;
130        }
131    
132        public void setUsePooledConnection(boolean usePooledConnection) {
133            if (getConfiguration() instanceof ActiveMQConfiguration) {
134                ((ActiveMQConfiguration)getConfiguration()).setUsePooledConnection(usePooledConnection);
135            }
136        }
137    
138        public void setUseSingleConnection(boolean useSingleConnection) {
139            if (getConfiguration() instanceof ActiveMQConfiguration) {
140                ((ActiveMQConfiguration)getConfiguration()).setUseSingleConnection(useSingleConnection);
141            }
142        }
143    
144        protected void addPooledConnectionFactoryService(Service pooledConnectionFactoryService) {
145            pooledConnectionFactoryServiceList.add(pooledConnectionFactoryService);
146        }
147    
148        protected void addSingleConnectionFactory(SingleConnectionFactory singleConnectionFactory) {
149            singleConnectionFactoryList.add(singleConnectionFactory);
150        }
151    
152        @Override
153        @SuppressWarnings("unchecked")
154        protected String convertPathToActualDestination(String path, Map<String, Object> parameters) {
155            // support ActiveMQ destination options using the destination. prefix
156            // http://activemq.apache.org/destination-options.html
157            Map options = IntrospectionSupport.extractProperties(parameters, "destination.");
158    
159            String query;
160            try {
161                query = URISupport.createQueryString(options);
162            } catch (URISyntaxException e) {
163                throw ObjectHelper.wrapRuntimeCamelException(e);
164            }
165    
166            // if we have destination options then append them to the destination name
167            if (ObjectHelper.isNotEmpty(query)) {
168                return path + "?" + query;
169            } else {
170                return path;
171            }
172        }
173    
174        @Override
175        protected void doStart() throws Exception {
176            super.doStart();
177    
178            if (isExposeAllQueues()) {
179                createDestinationSource();
180                endpointLoader = new CamelEndpointLoader(getCamelContext(), source);
181                endpointLoader.afterPropertiesSet();
182            }
183        }
184    
185        protected void createDestinationSource() {
186            try {
187                if (source == null) {
188                    if (connection == null) {
189                        Connection value = getConfiguration().getConnectionFactory().createConnection();
190                        if (value instanceof EnhancedConnection) {
191                            connection = (EnhancedConnection) value;
192                        } else {
193                            throw new IllegalArgumentException("Created JMS Connection is not an EnhancedConnection: " + value);
194                        }
195                        connection.start();
196                    }
197                    source = connection.getDestinationSource();
198                }
199            } catch (Throwable t) {
200                LOG.info("Can't get destination source, endpoint completer will not work", t);
201            }
202        }
203    
204        @Override
205        protected void doStop() throws Exception {
206            if (source != null) {
207                source.stop();
208                source = null;
209            }
210            if (connection != null) {
211                connection.close();
212                connection = null;
213            }
214            for (Service s : pooledConnectionFactoryServiceList) {
215                s.stop();
216            }
217            pooledConnectionFactoryServiceList.clear();
218            for (SingleConnectionFactory s : singleConnectionFactoryList) {
219                s.destroy();
220            }
221            singleConnectionFactoryList.clear();
222            super.doStop();
223        }
224    
225        @Override
226        public void setConfiguration(JmsConfiguration configuration) {
227            if (configuration instanceof ActiveMQConfiguration) {
228                ((ActiveMQConfiguration) configuration).setActiveMQComponent(this);
229            }
230            super.setConfiguration(configuration);
231        }
232    
233        @Override
234        protected JmsConfiguration createConfiguration() {
235            ActiveMQConfiguration answer = new ActiveMQConfiguration();
236            answer.setActiveMQComponent(this);
237            return answer;
238        }
239    
240        @Override
241        public List<String> completeEndpointPath(ComponentConfiguration componentConfiguration, String completionText) {
242            // try to initialize destination source only the first time
243            if (!sourceInitialized) {
244                createDestinationSource();
245                sourceInitialized = true;
246            }
247            ArrayList<String> answer = new ArrayList<String>();
248            if (source != null) {
249                Set candidates = source.getQueues();
250                String destinationName = completionText;
251                if (completionText.startsWith("topic:")) {
252                    candidates = source.getTopics();
253                    destinationName = completionText.substring(6);
254                } else if (completionText.startsWith("queue:")) {
255                    destinationName = completionText.substring(6);
256                }
257    
258                Iterator it = candidates.iterator();
259    
260                while (it.hasNext()) {
261                    ActiveMQDestination destination = (ActiveMQDestination) it.next();
262                    if (destination.getPhysicalName().startsWith(destinationName)) {
263                        answer.add(destination.getPhysicalName());
264                    }
265                }
266            }
267            return answer;
268        }
269    }