001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.network;
018    
019    import java.net.URI;
020    import java.net.URISyntaxException;
021    import java.util.Collection;
022    import java.util.HashSet;
023    import java.util.List;
024    import java.util.Set;
025    import java.util.concurrent.ConcurrentHashMap;
026    
027    import javax.management.MalformedObjectNameException;
028    import javax.management.ObjectName;
029    
030    import org.apache.activemq.Service;
031    import org.apache.activemq.broker.BrokerService;
032    import org.apache.activemq.broker.jmx.AnnotatedMBean;
033    import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
034    import org.apache.activemq.broker.jmx.NetworkBridgeView;
035    import org.apache.activemq.broker.jmx.NetworkBridgeViewMBean;
036    import org.apache.activemq.command.ActiveMQDestination;
037    import org.apache.activemq.command.ConsumerId;
038    import org.apache.activemq.transport.Transport;
039    import org.apache.activemq.transport.TransportFactory;
040    import org.apache.activemq.util.ServiceStopper;
041    import org.apache.activemq.util.ServiceSupport;
042    import org.slf4j.Logger;
043    import org.slf4j.LoggerFactory;
044    
045    /**
046     * Connector class for bridging broker networks.
047     */
048    public abstract class NetworkConnector extends NetworkBridgeConfiguration implements Service {
049    
050        private static final Logger LOG = LoggerFactory.getLogger(NetworkConnector.class);
051        protected URI localURI;
052        protected ConnectionFilter connectionFilter;
053        protected ConcurrentHashMap<URI, NetworkBridge> bridges = new ConcurrentHashMap<URI, NetworkBridge>();
054    
055        protected ServiceSupport serviceSupport = new ServiceSupport() {
056    
057            @Override
058            protected void doStart() throws Exception {
059                handleStart();
060            }
061    
062            @Override
063            protected void doStop(ServiceStopper stopper) throws Exception {
064                handleStop(stopper);
065            }
066        };
067    
068        private Set<ActiveMQDestination> durableDestinations;
069    
070        private BrokerService brokerService;
071        private ObjectName objectName;
072    
073        public NetworkConnector() {
074        }
075    
076        public NetworkConnector(URI localURI) {
077            this.localURI = localURI;
078        }
079    
080        public URI getLocalUri() throws URISyntaxException {
081            return localURI;
082        }
083    
084        public void setLocalUri(URI localURI) {
085            this.localURI = localURI;
086        }
087    
088        /**
089         * @return Returns the durableDestinations.
090         */
091        public Set<ActiveMQDestination> getDurableDestinations() {
092            return durableDestinations;
093        }
094    
095        /**
096         * @param durableDestinations The durableDestinations to set.
097         */
098        public void setDurableDestinations(Set<ActiveMQDestination> durableDestinations) {
099            this.durableDestinations = durableDestinations;
100        }
101    
102    
103        public void addExcludedDestination(ActiveMQDestination destiantion) {
104            this.excludedDestinations.add(destiantion);
105        }
106    
107    
108        public void addStaticallyIncludedDestination(ActiveMQDestination destiantion) {
109            this.staticallyIncludedDestinations.add(destiantion);
110        }
111    
112    
113        public void addDynamicallyIncludedDestination(ActiveMQDestination destiantion) {
114            this.dynamicallyIncludedDestinations.add(destiantion);
115        }
116    
117        public ConnectionFilter getConnectionFilter() {
118            return connectionFilter;
119        }
120    
121        public void setConnectionFilter(ConnectionFilter connectionFilter) {
122            this.connectionFilter = connectionFilter;
123        }
124    
125        // Implementation methods
126        // -------------------------------------------------------------------------
127        protected NetworkBridge configureBridge(DemandForwardingBridgeSupport result) {
128            List<ActiveMQDestination> destsList = getDynamicallyIncludedDestinations();
129            ActiveMQDestination dests[] = destsList.toArray(new ActiveMQDestination[destsList.size()]);
130            result.setDynamicallyIncludedDestinations(dests);
131            destsList = getExcludedDestinations();
132            dests = destsList.toArray(new ActiveMQDestination[destsList.size()]);
133            result.setExcludedDestinations(dests);
134            destsList = getStaticallyIncludedDestinations();
135            dests = destsList.toArray(new ActiveMQDestination[destsList.size()]);
136            result.setStaticallyIncludedDestinations(dests);
137            if (durableDestinations != null) {
138    
139                HashSet<ActiveMQDestination> topics = new HashSet<ActiveMQDestination>();
140                for (ActiveMQDestination d : durableDestinations) {
141                    if( d.isTopic() ) {
142                        topics.add(d);
143                    }
144                }
145    
146                ActiveMQDestination[] dest = new ActiveMQDestination[topics.size()];
147                dest = topics.toArray(dest);
148                result.setDurableDestinations(dest);
149            }
150            return result;
151        }
152    
153        protected Transport createLocalTransport() throws Exception {
154            return TransportFactory.connect(localURI);
155        }
156    
157        @Override
158        public void start() throws Exception {
159            serviceSupport.start();
160        }
161    
162        @Override
163        public void stop() throws Exception {
164            serviceSupport.stop();
165        }
166    
167        protected void handleStart() throws Exception {
168            if (localURI == null) {
169                throw new IllegalStateException("You must configure the 'localURI' property");
170            }
171            LOG.info("Network Connector " + this + " Started");
172        }
173    
174        protected void handleStop(ServiceStopper stopper) throws Exception {
175            LOG.info("Network Connector " + this + " Stopped");
176        }
177    
178        public boolean isStarted() {
179            return serviceSupport.isStarted();
180        }
181    
182        public boolean isStopped() {
183            return serviceSupport.isStopped();
184        }
185    
186        public boolean isStopping() {
187            return serviceSupport.isStopping();
188        }
189    
190        public ObjectName getObjectName() {
191            return objectName;
192        }
193    
194        public void setObjectName(ObjectName objectName) {
195            this.objectName = objectName;
196        }
197    
198        public BrokerService getBrokerService() {
199            return brokerService;
200        }
201    
202        public void setBrokerService(BrokerService brokerService) {
203            this.brokerService = brokerService;
204        }
205    
206        protected void registerNetworkBridgeMBean(NetworkBridge bridge) {
207            if (!getBrokerService().isUseJmx()) {
208                return;
209            }
210            NetworkBridgeViewMBean view = new NetworkBridgeView(bridge);
211            try {
212                ObjectName objectName = createNetworkBridgeObjectName(bridge);
213                AnnotatedMBean.registerMBean(getBrokerService().getManagementContext(), view, objectName);
214            } catch (Throwable e) {
215                LOG.debug("Network bridge could not be registered in JMX: " + e.getMessage(), e);
216            }
217        }
218    
219        protected void unregisterNetworkBridgeMBean(NetworkBridge bridge) {
220            if (!getBrokerService().isUseJmx()) {
221                return;
222            }
223            try {
224                ObjectName objectName = createNetworkBridgeObjectName(bridge);
225                getBrokerService().getManagementContext().unregisterMBean(objectName);
226            } catch (Throwable e) {
227                LOG.debug("Network bridge could not be unregistered in JMX: " + e.getMessage(), e);
228            }
229        }
230    
231        protected ObjectName createNetworkBridgeObjectName(NetworkBridge bridge) throws MalformedObjectNameException {
232            return BrokerMBeanSupport.createNetworkBridgeObjectName(getObjectName(), bridge.getRemoteAddress());
233        }
234    
235        // ask all the bridges as we can't know to which this consumer is tied
236        public boolean removeDemandSubscription(ConsumerId consumerId) {
237            boolean removeSucceeded = false;
238            for (NetworkBridge bridge : bridges.values()) {
239                if (bridge instanceof DemandForwardingBridgeSupport) {
240                    DemandForwardingBridgeSupport demandBridge = (DemandForwardingBridgeSupport) bridge;
241                    if (demandBridge.removeDemandSubscriptionByLocalId(consumerId)) {
242                        removeSucceeded = true;
243                        break;
244                    }
245                }
246            }
247            return removeSucceeded;
248        }
249    
250        public Collection<NetworkBridge> activeBridges() {
251            return bridges.values();
252        }
253    }