001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.network;
018
019 import java.net.URI;
020
021 import org.apache.activemq.transport.Transport;
022 import org.apache.activemq.transport.TransportFactory;
023 import org.apache.activemq.util.ServiceStopper;
024
025 /**
026 * A network connector which uses some kind of multicast-like transport that
027 * communicates with potentially many remote brokers over a single logical
028 * {@link Transport} instance such as when using multicast.
029 *
030 * This implementation does not depend on multicast at all; any other group
031 * based transport could be used.
032 *
033 * @org.apache.xbean.XBean
034 *
035 */
036 public class MulticastNetworkConnector extends NetworkConnector {
037
038 private Transport localTransport;
039 private Transport remoteTransport;
040 private URI remoteURI;
041 private DemandForwardingBridgeSupport bridge;
042
043 public MulticastNetworkConnector() {
044 }
045
046 public MulticastNetworkConnector(URI remoteURI) {
047 this.remoteURI = remoteURI;
048 }
049
050 // Properties
051 // -------------------------------------------------------------------------
052
053 public DemandForwardingBridgeSupport getBridge() {
054 return bridge;
055 }
056
057 public void setBridge(DemandForwardingBridgeSupport bridge) {
058 this.bridge = bridge;
059 }
060
061 public Transport getLocalTransport() {
062 return localTransport;
063 }
064
065 public void setLocalTransport(Transport localTransport) {
066 this.localTransport = localTransport;
067 }
068
069 public Transport getRemoteTransport() {
070 return remoteTransport;
071 }
072
073 /**
074 * Sets the remote transport implementation
075 */
076 public void setRemoteTransport(Transport remoteTransport) {
077 this.remoteTransport = remoteTransport;
078 }
079
080 public URI getRemoteURI() {
081 return remoteURI;
082 }
083
084 /**
085 * Sets the remote transport URI to some group transport like
086 * <code>multicast://address:port</code>
087 */
088 public void setRemoteURI(URI remoteURI) {
089 this.remoteURI = remoteURI;
090 }
091
092 // Implementation methods
093 // -------------------------------------------------------------------------
094
095 protected void handleStart() throws Exception {
096 if (remoteTransport == null) {
097 if (remoteURI == null) {
098 throw new IllegalArgumentException("You must specify the remoteURI property");
099 }
100 remoteTransport = TransportFactory.connect(remoteURI);
101 }
102
103 if (localTransport == null) {
104 localTransport = createLocalTransport();
105 }
106
107 bridge = createBridge(localTransport, remoteTransport);
108 configureBridge(bridge);
109 bridge.start();
110
111 // we need to start the transports after we've created the bridge
112 remoteTransport.start();
113 localTransport.start();
114
115 super.handleStart();
116 }
117
118 protected void handleStop(ServiceStopper stopper) throws Exception {
119 super.handleStop(stopper);
120 if (bridge != null) {
121 try {
122 bridge.stop();
123 } catch (Exception e) {
124 stopper.onException(this, e);
125 }
126 }
127 if (remoteTransport != null) {
128 try {
129 remoteTransport.stop();
130 } catch (Exception e) {
131 stopper.onException(this, e);
132 }
133 }
134 if (localTransport != null) {
135 try {
136 localTransport.stop();
137 } catch (Exception e) {
138 stopper.onException(this, e);
139 }
140 }
141 }
142
143 @Override
144 public String toString() {
145 return getClass().getName() + ":" + getName() + "[" + remoteTransport.toString() + "]";
146 }
147
148 protected DemandForwardingBridgeSupport createBridge(Transport local, Transport remote) {
149 CompositeDemandForwardingBridge bridge = new CompositeDemandForwardingBridge(this, local, remote);
150 bridge.setBrokerService(getBrokerService());
151 return bridge;
152 }
153 }