001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.camel.component.broker;
018    
019    import java.util.List;
020    import java.util.concurrent.CopyOnWriteArrayList;
021    
022    import org.apache.activemq.broker.ProducerBrokerExchange;
023    import org.apache.activemq.broker.inteceptor.MessageInterceptor;
024    import org.apache.activemq.broker.inteceptor.MessageInterceptorRegistry;
025    import org.apache.activemq.command.ActiveMQDestination;
026    import org.apache.activemq.command.Message;
027    import org.apache.camel.Consumer;
028    import org.apache.camel.MultipleConsumersSupport;
029    import org.apache.camel.Processor;
030    import org.apache.camel.Producer;
031    import org.apache.camel.Service;
032    import org.apache.camel.api.management.ManagedResource;
033    import org.apache.camel.impl.DefaultEndpoint;
034    import org.apache.camel.spi.UriEndpoint;
035    import org.apache.camel.spi.UriParam;
036    import org.apache.camel.spi.UriPath;
037    import org.apache.camel.util.UnsafeUriCharactersEncoder;
038    
039    @ManagedResource(description = "Managed Camel Broker Endpoint")
040    @UriEndpoint(scheme = "broker", consumerClass = BrokerConsumer.class)
041    
042    public class BrokerEndpoint extends DefaultEndpoint implements MultipleConsumersSupport, Service {
043        static final String PRODUCER_BROKER_EXCHANGE = "producerBrokerExchange";
044    
045        @UriParam
046        private final BrokerConfiguration configuration;
047        private MessageInterceptorRegistry messageInterceptorRegistry;
048    
049    
050        @UriPath
051        private final ActiveMQDestination destination;
052        private List<MessageInterceptor> messageInterceptorList = new CopyOnWriteArrayList<MessageInterceptor>();
053    
054        public BrokerEndpoint(String uri, BrokerComponent component, ActiveMQDestination destination, BrokerConfiguration configuration) {
055            super(UnsafeUriCharactersEncoder.encode(uri), component);
056            this.destination = destination;
057            this.configuration = configuration;
058        }
059    
060        @Override
061        public Producer createProducer() throws Exception {
062            BrokerProducer producer = new BrokerProducer(this);
063            return producer;
064        }
065    
066        @Override
067        public Consumer createConsumer(Processor processor) throws Exception {
068            BrokerConsumer consumer = new BrokerConsumer(this, processor);
069            configureConsumer(consumer);
070            return consumer;
071        }
072    
073    
074        @Override
075        public boolean isSingleton() {
076            return false;
077        }
078    
079        @Override
080        public boolean isMultipleConsumersSupported() {
081            return true;
082        }
083    
084        public ActiveMQDestination getDestination() {
085            return destination;
086        }
087    
088    
089        @Override
090        protected void doStart() throws Exception {
091            super.doStart();
092            messageInterceptorRegistry =  MessageInterceptorRegistry.getInstance().get(configuration.getBrokerName());
093            for (MessageInterceptor messageInterceptor : messageInterceptorList) {
094                addMessageInterceptor(messageInterceptor);
095            }
096            messageInterceptorList.clear();
097        }
098    
099        @Override
100        protected void doStop() throws Exception {
101            super.doStop();
102        }
103    
104        protected void addMessageInterceptor(MessageInterceptor messageInterceptor) {
105            if (isStarted()) {
106                messageInterceptorRegistry.addMessageInterceptor(destination, messageInterceptor);
107            } else {
108                messageInterceptorList.add(messageInterceptor);
109            }
110        }
111    
112        protected void removeMessageInterceptor(MessageInterceptor messageInterceptor) {
113            messageInterceptorRegistry.removeMessageInterceptor(destination, messageInterceptor);
114    
115        }
116    
117        protected void inject(ProducerBrokerExchange producerBrokerExchange, Message message) throws Exception {
118            ProducerBrokerExchange pbe = producerBrokerExchange;
119            if (message != null) {
120                message.setDestination(destination);
121                if (producerBrokerExchange != null && producerBrokerExchange.getRegionDestination() != null){
122                    if (!producerBrokerExchange.getRegionDestination().getActiveMQDestination().equals(destination)){
123                         //The message broker will create a new ProducerBrokerExchange with the
124                         //correct region broker set
125                         pbe = null;
126                    }
127                }
128    
129                messageInterceptorRegistry.injectMessage(pbe, message);
130            }
131        }
132    }