001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.camel.component;
018    
019    import java.net.URISyntaxException;
020    import java.util.Map;
021    import java.util.concurrent.CopyOnWriteArrayList;
022    
023    import org.apache.activemq.ActiveMQConnectionFactory;
024    import org.apache.activemq.Service;
025    import org.apache.camel.CamelContext;
026    import org.apache.camel.component.jms.JmsComponent;
027    import org.apache.camel.component.jms.JmsConfiguration;
028    import org.apache.camel.util.IntrospectionSupport;
029    import org.apache.camel.util.ObjectHelper;
030    import org.apache.camel.util.URISupport;
031    import org.springframework.jms.connection.SingleConnectionFactory;
032    
033    /**
034     * The <a href="http://activemq.apache.org/camel/activemq.html">ActiveMQ Component</a>
035     *
036     * 
037     */
038    public class ActiveMQComponent extends JmsComponent {
039        private final CopyOnWriteArrayList<SingleConnectionFactory> singleConnectionFactoryList =
040            new CopyOnWriteArrayList<SingleConnectionFactory>();
041        private final CopyOnWriteArrayList<Service> pooledConnectionFactoryServiceList =
042            new CopyOnWriteArrayList<Service>();
043        private boolean exposeAllQueues;
044        private CamelEndpointLoader endpointLoader;
045    
046        /**
047         * Creates an <a href="http://camel.apache.org/activemq.html">ActiveMQ Component</a>
048         *
049         * @return the created component
050         */
051        public static ActiveMQComponent activeMQComponent() {
052            return new ActiveMQComponent();
053        }
054    
055        /**
056         * Creates an <a href="http://camel.apache.org/activemq.html">ActiveMQ Component</a>
057         * connecting to the given <a href="http://activemq.apache.org/configuring-transports.html">broker URL</a>
058         *
059         * @param brokerURL the URL to connect to
060         * @return the created component
061         */
062        public static ActiveMQComponent activeMQComponent(String brokerURL) {
063            ActiveMQComponent answer = new ActiveMQComponent();
064            if (answer.getConfiguration() instanceof ActiveMQConfiguration) {
065                ((ActiveMQConfiguration) answer.getConfiguration())
066                        .setBrokerURL(brokerURL);
067            }
068    
069            // set the connection factory with the provided broker url
070            answer.setConnectionFactory(new ActiveMQConnectionFactory(brokerURL));
071            return answer;
072        }
073    
074        public ActiveMQComponent() {
075        }
076    
077        public ActiveMQComponent(CamelContext context) {
078            super(context);
079        }
080    
081        public ActiveMQComponent(ActiveMQConfiguration configuration) {
082            super(configuration);
083        }
084    
085    
086        public void setBrokerURL(String brokerURL) {
087            if (getConfiguration() instanceof ActiveMQConfiguration) {
088                ((ActiveMQConfiguration)getConfiguration()).setBrokerURL(brokerURL);
089            }
090        }
091    
092        public void setUserName(String userName) {
093            if (getConfiguration() instanceof ActiveMQConfiguration) {
094                ((ActiveMQConfiguration)getConfiguration()).setUserName(userName);
095            }
096        }
097    
098        public void setPassword(String password) {
099            if (getConfiguration() instanceof ActiveMQConfiguration) {
100                ((ActiveMQConfiguration)getConfiguration()).setPassword(password);
101            }
102        }
103    
104        public boolean isExposeAllQueues() {
105            return exposeAllQueues;
106        }
107    
108        /**
109         * If enabled this will cause all Queues in the ActiveMQ broker to be eagerly populated into the CamelContext
110         * so that they can be easily browsed by any Camel tooling. This option is disabled by default.
111         *
112         * @param exposeAllQueues
113         */
114        public void setExposeAllQueues(boolean exposeAllQueues) {
115            this.exposeAllQueues = exposeAllQueues;
116        }
117    
118        public void setUsePooledConnection(boolean usePooledConnection) {
119            if (getConfiguration() instanceof ActiveMQConfiguration) {
120                ((ActiveMQConfiguration)getConfiguration()).setUsePooledConnection(usePooledConnection);
121            }
122        }
123    
124        public void setUseSingleConnection(boolean useSingleConnection) {
125            if (getConfiguration() instanceof ActiveMQConfiguration) {
126                ((ActiveMQConfiguration)getConfiguration()).setUseSingleConnection(useSingleConnection);
127            }
128        }
129    
130        protected void addPooledConnectionFactoryService(Service pooledConnectionFactoryService) {
131            pooledConnectionFactoryServiceList.add(pooledConnectionFactoryService);
132        }
133    
134        protected void addSingleConnectionFactory(SingleConnectionFactory singleConnectionFactory) {
135            singleConnectionFactoryList.add(singleConnectionFactory);
136        }
137    
138        @Override
139        @SuppressWarnings("unchecked")
140        protected String convertPathToActualDestination(String path, Map<String, Object> parameters) {
141            // support ActiveMQ destination options using the destination. prefix
142            // http://activemq.apache.org/destination-options.html
143            Map options = IntrospectionSupport.extractProperties(parameters, "destination.");
144    
145            String query;
146            try {
147                query = URISupport.createQueryString(options);
148            } catch (URISyntaxException e) {
149                throw ObjectHelper.wrapRuntimeCamelException(e);
150            }
151    
152            // if we have destination options then append them to the destination name
153            if (ObjectHelper.isNotEmpty(query)) {
154                return path + "?" + query;
155            } else {
156                return path;
157            }
158        }
159    
160        @Override
161        protected void doStart() throws Exception {
162            super.doStart();
163            if (isExposeAllQueues()) {
164                endpointLoader = new CamelEndpointLoader(getCamelContext());
165                endpointLoader.afterPropertiesSet();
166            }
167        }
168    
169        @Override
170        protected void doStop() throws Exception {
171            if (endpointLoader != null) {
172                endpointLoader.destroy();
173                endpointLoader = null;
174            }
175            for (Service s : pooledConnectionFactoryServiceList) {
176                s.stop();
177            }
178            pooledConnectionFactoryServiceList.clear();
179            for (SingleConnectionFactory s : singleConnectionFactoryList) {
180                s.destroy();
181            }
182            singleConnectionFactoryList.clear();
183            super.doStop();
184        }
185    
186        @Override
187        public void setConfiguration(JmsConfiguration configuration) {
188            if (configuration instanceof ActiveMQConfiguration) {
189                ((ActiveMQConfiguration) configuration).setActiveMQComponent(this);
190            }
191            super.setConfiguration(configuration);
192        }
193    
194        @Override
195        protected JmsConfiguration createConfiguration() {
196            ActiveMQConfiguration answer = new ActiveMQConfiguration();
197            answer.setActiveMQComponent(this);
198            return answer;
199        }
200    }