001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.network;
018    
019    import java.net.URI;
020    import java.net.URISyntaxException;
021    import java.util.Collection;
022    import java.util.HashMap;
023    import java.util.HashSet;
024    import java.util.List;
025    import java.util.Map;
026    import java.util.Set;
027    import java.util.concurrent.ConcurrentHashMap;
028    import java.util.concurrent.CopyOnWriteArrayList;
029    
030    import javax.management.MalformedObjectNameException;
031    import javax.management.ObjectName;
032    
033    import org.apache.activemq.Service;
034    import org.apache.activemq.broker.BrokerService;
035    import org.apache.activemq.broker.jmx.AnnotatedMBean;
036    import org.apache.activemq.broker.jmx.NetworkBridgeView;
037    import org.apache.activemq.broker.jmx.NetworkBridgeViewMBean;
038    import org.apache.activemq.command.ActiveMQDestination;
039    import org.apache.activemq.command.ConsumerId;
040    import org.apache.activemq.transport.Transport;
041    import org.apache.activemq.transport.TransportFactory;
042    import org.apache.activemq.util.JMXSupport;
043    import org.apache.activemq.util.ServiceStopper;
044    import org.apache.activemq.util.ServiceSupport;
045    import org.slf4j.Logger;
046    import org.slf4j.LoggerFactory;
047    
048    /**
049     * Connector class for bridging broker networks.
050     */
051    public abstract class NetworkConnector extends NetworkBridgeConfiguration implements Service {
052    
053        private static final Logger LOG = LoggerFactory.getLogger(NetworkConnector.class);
054        protected URI localURI;
055        protected ConnectionFilter connectionFilter;
056        protected ConcurrentHashMap<URI, NetworkBridge> bridges = new ConcurrentHashMap<URI, NetworkBridge>();
057    
058        protected ServiceSupport serviceSupport = new ServiceSupport() {
059    
060            protected void doStart() throws Exception {
061                handleStart();
062            }
063    
064            protected void doStop(ServiceStopper stopper) throws Exception {
065                handleStop(stopper);
066            }
067        };
068    
069        private Set<ActiveMQDestination> durableDestinations;
070        private List<ActiveMQDestination> excludedDestinations = new CopyOnWriteArrayList<ActiveMQDestination>();
071        private List<ActiveMQDestination> dynamicallyIncludedDestinations = new CopyOnWriteArrayList<ActiveMQDestination>();
072        private List<ActiveMQDestination> staticallyIncludedDestinations = new CopyOnWriteArrayList<ActiveMQDestination>();
073        private BrokerService brokerService;
074        private ObjectName objectName;
075    
076        public NetworkConnector() {
077        }
078    
079        public NetworkConnector(URI localURI) {
080            this.localURI = localURI;
081        }
082    
083        public URI getLocalUri() throws URISyntaxException {
084            return localURI;
085        }
086    
087        public void setLocalUri(URI localURI) {
088            this.localURI = localURI;
089        }
090    
091        /**
092         * @return Returns the durableDestinations.
093         */
094        public Set<ActiveMQDestination> getDurableDestinations() {
095            return durableDestinations;
096        }
097    
098        /**
099         * @param durableDestinations The durableDestinations to set.
100         */
101        public void setDurableDestinations(Set<ActiveMQDestination> durableDestinations) {
102            this.durableDestinations = durableDestinations;
103        }
104    
105        /**
106         * @return Returns the excludedDestinations.
107         */
108        public List<ActiveMQDestination> getExcludedDestinations() {
109            return excludedDestinations;
110        }
111    
112        /**
113         * @param excludedDestinations The excludedDestinations to set.
114         */
115        public void setExcludedDestinations(List<ActiveMQDestination> excludedDestinations) {
116            this.excludedDestinations = excludedDestinations;
117        }
118    
119        public void addExcludedDestination(ActiveMQDestination destiantion) {
120            this.excludedDestinations.add(destiantion);
121        }
122    
123        /**
124         * @return Returns the staticallyIncludedDestinations.
125         */
126        public List<ActiveMQDestination> getStaticallyIncludedDestinations() {
127            return staticallyIncludedDestinations;
128        }
129    
130        /**
131         * @param staticallyIncludedDestinations The staticallyIncludedDestinations
132         *                to set.
133         */
134        public void setStaticallyIncludedDestinations(List<ActiveMQDestination> staticallyIncludedDestinations) {
135            this.staticallyIncludedDestinations = staticallyIncludedDestinations;
136        }
137    
138        public void addStaticallyIncludedDestination(ActiveMQDestination destiantion) {
139            this.staticallyIncludedDestinations.add(destiantion);
140        }
141    
142        /**
143         * @return Returns the dynamicallyIncludedDestinations.
144         */
145        public List<ActiveMQDestination> getDynamicallyIncludedDestinations() {
146            return dynamicallyIncludedDestinations;
147        }
148    
149        /**
150         * @param dynamicallyIncludedDestinations The
151         *                dynamicallyIncludedDestinations to set.
152         */
153        public void setDynamicallyIncludedDestinations(List<ActiveMQDestination> dynamicallyIncludedDestinations) {
154            this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
155        }
156    
157        public void addDynamicallyIncludedDestination(ActiveMQDestination destiantion) {
158            this.dynamicallyIncludedDestinations.add(destiantion);
159        }
160    
161        public ConnectionFilter getConnectionFilter() {
162            return connectionFilter;
163        }
164    
165        public void setConnectionFilter(ConnectionFilter connectionFilter) {
166            this.connectionFilter = connectionFilter;
167        }
168    
169        // Implementation methods
170        // -------------------------------------------------------------------------
171        protected NetworkBridge configureBridge(DemandForwardingBridgeSupport result) {
172            List<ActiveMQDestination> destsList = getDynamicallyIncludedDestinations();
173            ActiveMQDestination dests[] = destsList.toArray(new ActiveMQDestination[destsList.size()]);
174            result.setDynamicallyIncludedDestinations(dests);
175            destsList = getExcludedDestinations();
176            dests = destsList.toArray(new ActiveMQDestination[destsList.size()]);
177            result.setExcludedDestinations(dests);
178            destsList = getStaticallyIncludedDestinations();
179            dests = destsList.toArray(new ActiveMQDestination[destsList.size()]);
180            result.setStaticallyIncludedDestinations(dests);
181            if (durableDestinations != null) {
182    
183                HashSet<ActiveMQDestination> topics = new HashSet<ActiveMQDestination>();
184                for (ActiveMQDestination d : durableDestinations) {
185                    if( d.isTopic() ) {
186                        topics.add(d);
187                    }
188                }
189    
190                ActiveMQDestination[] dest = new ActiveMQDestination[topics.size()];
191                dest = (ActiveMQDestination[])topics.toArray(dest);
192                result.setDurableDestinations(dest);
193            }
194            return result;
195        }
196    
197        protected Transport createLocalTransport() throws Exception {
198            return TransportFactory.connect(localURI);
199        }
200    
201        public void start() throws Exception {
202            serviceSupport.start();
203        }
204    
205        public void stop() throws Exception {
206            serviceSupport.stop();
207        }
208    
209        protected void handleStart() throws Exception {
210            if (localURI == null) {
211                throw new IllegalStateException("You must configure the 'localURI' property");
212            }
213            LOG.info("Network Connector " + this + " Started");
214        }
215    
216        protected void handleStop(ServiceStopper stopper) throws Exception {
217            LOG.info("Network Connector " + this + " Stopped");
218        }
219    
220        public boolean isStarted() {
221            return serviceSupport.isStarted();
222        }
223    
224        public boolean isStopped() {
225            return serviceSupport.isStopped();
226        }
227    
228        public boolean isStopping() {
229            return serviceSupport.isStopping();
230        }
231    
232        public ObjectName getObjectName() {
233            return objectName;
234        }
235    
236        public void setObjectName(ObjectName objectName) {
237            this.objectName = objectName;
238        }
239    
240        public BrokerService getBrokerService() {
241            return brokerService;
242        }
243    
244        public void setBrokerService(BrokerService brokerService) {
245            this.brokerService = brokerService;
246        }
247    
248        protected void registerNetworkBridgeMBean(NetworkBridge bridge) {
249            if (!getBrokerService().isUseJmx()) {
250                return;
251            }
252            NetworkBridgeViewMBean view = new NetworkBridgeView(bridge);
253            try {
254                ObjectName objectName = createNetworkBridgeObjectName(bridge);
255                AnnotatedMBean.registerMBean(getBrokerService().getManagementContext(), view, objectName);
256            } catch (Throwable e) {
257                LOG.debug("Network bridge could not be registered in JMX: " + e.getMessage(), e);
258            }
259        }
260    
261        protected void unregisterNetworkBridgeMBean(NetworkBridge bridge) {
262            if (!getBrokerService().isUseJmx()) {
263                return;
264            }
265            try {
266                ObjectName objectName = createNetworkBridgeObjectName(bridge);
267                getBrokerService().getManagementContext().unregisterMBean(objectName);
268            } catch (Throwable e) {
269                LOG.debug("Network bridge could not be unregistered in JMX: " + e.getMessage(), e);
270            }
271        }
272    
273        protected ObjectName createNetworkBridgeObjectName(NetworkBridge bridge) throws MalformedObjectNameException {
274            ObjectName connectorName = getObjectName();
275            Map<String, String> map = new HashMap<String, String>(connectorName.getKeyPropertyList());
276            return new ObjectName(connectorName.getDomain() + ":" + "BrokerName=" + JMXSupport.encodeObjectNamePart((String)map.get("BrokerName")) + "," + "Type=NetworkBridge,"
277                                  + "NetworkConnectorName=" + JMXSupport.encodeObjectNamePart((String)map.get("NetworkConnectorName")) + "," + "Name="
278                                  + JMXSupport.encodeObjectNamePart(JMXSupport.encodeObjectNamePart(bridge.getRemoteAddress())));
279        }
280    
281        // ask all the bridges as we can't know to which this consumer is tied
282        public boolean removeDemandSubscription(ConsumerId consumerId) {
283            boolean removeSucceeded = false;
284            for (NetworkBridge bridge : bridges.values()) {
285                if (bridge instanceof DemandForwardingBridgeSupport) {
286                    DemandForwardingBridgeSupport demandBridge = (DemandForwardingBridgeSupport) bridge;
287                    if (demandBridge.removeDemandSubscriptionByLocalId(consumerId)) {
288                        removeSucceeded = true;
289                        break;
290                    }
291                }
292            }
293            return removeSucceeded;
294        }
295    
296        public Collection<NetworkBridge> activeBridges() {
297            return bridges.values();
298        }
299    }