001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.network;
018    
019    import java.net.URI;
020    
021    import org.apache.activemq.transport.Transport;
022    import org.apache.activemq.transport.TransportFactory;
023    import org.apache.activemq.util.ServiceStopper;
024    
025    /**
026     * A network connector which uses some kind of multicast-like transport that
027     * communicates with potentially many remote brokers over a single logical
028     * {@link Transport} instance such as when using multicast.
029     *
030     * This implementation does not depend on multicast at all; any other group
031     * based transport could be used.
032     *
033     * @org.apache.xbean.XBean
034     *
035     */
036    public class MulticastNetworkConnector extends NetworkConnector {
037    
038        private Transport localTransport;
039        private Transport remoteTransport;
040        private URI remoteURI;
041        private DemandForwardingBridgeSupport bridge;
042    
043        public MulticastNetworkConnector() {
044        }
045    
046        public MulticastNetworkConnector(URI remoteURI) {
047            this.remoteURI = remoteURI;
048        }
049    
050        // Properties
051        // -------------------------------------------------------------------------
052    
053        public DemandForwardingBridgeSupport getBridge() {
054            return bridge;
055        }
056    
057        public void setBridge(DemandForwardingBridgeSupport bridge) {
058            this.bridge = bridge;
059        }
060    
061        public Transport getLocalTransport() {
062            return localTransport;
063        }
064    
065        public void setLocalTransport(Transport localTransport) {
066            this.localTransport = localTransport;
067        }
068    
069        public Transport getRemoteTransport() {
070            return remoteTransport;
071        }
072    
073        /**
074         * Sets the remote transport implementation
075         */
076        public void setRemoteTransport(Transport remoteTransport) {
077            this.remoteTransport = remoteTransport;
078        }
079    
080        public URI getRemoteURI() {
081            return remoteURI;
082        }
083    
084        /**
085         * Sets the remote transport URI to some group transport like
086         * <code>multicast://address:port</code>
087         */
088        public void setRemoteURI(URI remoteURI) {
089            this.remoteURI = remoteURI;
090        }
091    
092        // Implementation methods
093        // -------------------------------------------------------------------------
094    
095        protected void handleStart() throws Exception {
096            if (remoteTransport == null) {
097                if (remoteURI == null) {
098                    throw new IllegalArgumentException("You must specify the remoteURI property");
099                }
100                remoteTransport = TransportFactory.connect(remoteURI);
101            }
102    
103            if (localTransport == null) {
104                localTransport = createLocalTransport();
105            }
106    
107            bridge = createBridge(localTransport, remoteTransport);
108            configureBridge(bridge);
109            bridge.start();
110    
111            // we need to start the transports after we've created the bridge
112            remoteTransport.start();
113            localTransport.start();
114    
115            super.handleStart();
116        }
117    
118        protected void handleStop(ServiceStopper stopper) throws Exception {
119            super.handleStop(stopper);
120            if (bridge != null) {
121                try {
122                    bridge.stop();
123                } catch (Exception e) {
124                    stopper.onException(this, e);
125                }
126            }
127            if (remoteTransport != null) {
128                try {
129                    remoteTransport.stop();
130                } catch (Exception e) {
131                    stopper.onException(this, e);
132                }
133            }
134            if (localTransport != null) {
135                try {
136                    localTransport.stop();
137                } catch (Exception e) {
138                    stopper.onException(this, e);
139                }
140            }
141        }
142    
143        @Override
144        public String toString() {
145            return getClass().getName() + ":" + getName() + "["  + remoteTransport.toString() + "]";
146        }
147    
148        protected DemandForwardingBridgeSupport createBridge(Transport local, Transport remote) {
149            CompositeDemandForwardingBridge bridge = new CompositeDemandForwardingBridge(this, local, remote);
150            bridge.setBrokerService(getBrokerService());
151            return bridge;
152        }
153    }