001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker;
018
019 import java.io.IOException;
020 import java.net.URI;
021 import java.net.URISyntaxException;
022 import java.util.LinkedList;
023 import java.util.StringTokenizer;
024 import java.util.concurrent.CopyOnWriteArrayList;
025 import java.util.regex.Pattern;
026
027 import javax.management.ObjectName;
028
029 import org.apache.activemq.broker.jmx.ManagedTransportConnector;
030 import org.apache.activemq.broker.jmx.ManagementContext;
031 import org.apache.activemq.broker.region.ConnectorStatistics;
032 import org.apache.activemq.command.BrokerInfo;
033 import org.apache.activemq.command.ConnectionControl;
034 import org.apache.activemq.security.MessageAuthorizationPolicy;
035 import org.apache.activemq.thread.TaskRunnerFactory;
036 import org.apache.activemq.transport.Transport;
037 import org.apache.activemq.transport.TransportAcceptListener;
038 import org.apache.activemq.transport.TransportFactory;
039 import org.apache.activemq.transport.TransportServer;
040 import org.apache.activemq.transport.discovery.DiscoveryAgent;
041 import org.apache.activemq.transport.discovery.DiscoveryAgentFactory;
042 import org.apache.activemq.util.ServiceStopper;
043 import org.apache.activemq.util.ServiceSupport;
044 import org.slf4j.Logger;
045 import org.slf4j.LoggerFactory;
046
047 /**
048 * @org.apache.xbean.XBean
049 */
050 public class TransportConnector implements Connector, BrokerServiceAware {
051
052 final Logger LOG = LoggerFactory.getLogger(TransportConnector.class);
053
054 protected final CopyOnWriteArrayList<TransportConnection> connections = new CopyOnWriteArrayList<TransportConnection>();
055 protected TransportStatusDetector statusDector;
056 private BrokerService brokerService;
057 private TransportServer server;
058 private URI uri;
059 private BrokerInfo brokerInfo = new BrokerInfo();
060 private TaskRunnerFactory taskRunnerFactory;
061 private MessageAuthorizationPolicy messageAuthorizationPolicy;
062 private DiscoveryAgent discoveryAgent;
063 private final ConnectorStatistics statistics = new ConnectorStatistics();
064 private URI discoveryUri;
065 private String name;
066 private boolean disableAsyncDispatch;
067 private boolean enableStatusMonitor = false;
068 private Broker broker;
069 private boolean updateClusterClients = false;
070 private boolean rebalanceClusterClients;
071 private boolean updateClusterClientsOnRemove = false;
072 private String updateClusterFilter;
073 private boolean auditNetworkProducers = false;
074 private int maximumProducersAllowedPerConnection = Integer.MAX_VALUE;
075 private int maximumConsumersAllowedPerConnection = Integer.MAX_VALUE;
076
077 LinkedList<String> peerBrokers = new LinkedList<String>();
078
079 public TransportConnector() {
080 }
081
082 public TransportConnector(TransportServer server) {
083 this();
084 setServer(server);
085 if (server != null && server.getConnectURI() != null) {
086 URI uri = server.getConnectURI();
087 if (uri != null && uri.getScheme().equals("vm")) {
088 setEnableStatusMonitor(false);
089 }
090 }
091 }
092
093 /**
094 * @return Returns the connections.
095 */
096 public CopyOnWriteArrayList<TransportConnection> getConnections() {
097 return connections;
098 }
099
100 /**
101 * Factory method to create a JMX managed version of this transport
102 * connector
103 */
104 public ManagedTransportConnector asManagedConnector(ManagementContext context, ObjectName connectorName) throws IOException, URISyntaxException {
105 ManagedTransportConnector rc = new ManagedTransportConnector(context, connectorName, getServer());
106 rc.setBrokerInfo(getBrokerInfo());
107 rc.setDisableAsyncDispatch(isDisableAsyncDispatch());
108 rc.setDiscoveryAgent(getDiscoveryAgent());
109 rc.setDiscoveryUri(getDiscoveryUri());
110 rc.setEnableStatusMonitor(isEnableStatusMonitor());
111 rc.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy());
112 rc.setName(getName());
113 rc.setTaskRunnerFactory(getTaskRunnerFactory());
114 rc.setUri(getUri());
115 rc.setBrokerService(brokerService);
116 rc.setUpdateClusterClients(isUpdateClusterClients());
117 rc.setRebalanceClusterClients(isRebalanceClusterClients());
118 rc.setUpdateClusterFilter(getUpdateClusterFilter());
119 rc.setUpdateClusterClientsOnRemove(isUpdateClusterClientsOnRemove());
120 rc.setAuditNetworkProducers(isAuditNetworkProducers());
121 rc.setMaximumConsumersAllowedPerConnection(getMaximumConsumersAllowedPerConnection());
122 rc.setMaximumProducersAllowedPerConnection(getMaximumProducersAllowedPerConnection());
123 return rc;
124 }
125
126 public BrokerInfo getBrokerInfo() {
127 return brokerInfo;
128 }
129
130 public void setBrokerInfo(BrokerInfo brokerInfo) {
131 this.brokerInfo = brokerInfo;
132 }
133
134 public TransportServer getServer() throws IOException, URISyntaxException {
135 if (server == null) {
136 setServer(createTransportServer());
137 }
138 return server;
139 }
140
141 public void setServer(TransportServer server) {
142 this.server = server;
143 }
144
145 public URI getUri() {
146 if (uri == null) {
147 try {
148 uri = getConnectUri();
149 } catch (Throwable e) {
150 }
151 }
152 return uri;
153 }
154
155 /**
156 * Sets the server transport URI to use if there is not a
157 * {@link TransportServer} configured via the
158 * {@link #setServer(TransportServer)} method. This value is used to lazy
159 * create a {@link TransportServer} instance
160 *
161 * @param uri
162 */
163 public void setUri(URI uri) {
164 this.uri = uri;
165 }
166
167 public TaskRunnerFactory getTaskRunnerFactory() {
168 return taskRunnerFactory;
169 }
170
171 public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
172 this.taskRunnerFactory = taskRunnerFactory;
173 }
174
175 /**
176 * @return the statistics for this connector
177 */
178 public ConnectorStatistics getStatistics() {
179 return statistics;
180 }
181
182 public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
183 return messageAuthorizationPolicy;
184 }
185
186 /**
187 * Sets the policy used to decide if the current connection is authorized to
188 * consume a given message
189 */
190 public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
191 this.messageAuthorizationPolicy = messageAuthorizationPolicy;
192 }
193
194 public void start() throws Exception {
195 broker = brokerService.getBroker();
196 brokerInfo.setBrokerName(broker.getBrokerName());
197 brokerInfo.setBrokerId(broker.getBrokerId());
198 brokerInfo.setPeerBrokerInfos(broker.getPeerBrokerInfos());
199 brokerInfo.setFaultTolerantConfiguration(broker.isFaultTolerantConfiguration());
200 brokerInfo.setBrokerURL(broker.getBrokerService().getDefaultSocketURIString());
201 getServer().setAcceptListener(new TransportAcceptListener() {
202 public void onAccept(final Transport transport) {
203 try {
204 brokerService.getTaskRunnerFactory().execute(new Runnable() {
205 public void run() {
206 try {
207 Connection connection = createConnection(transport);
208 connection.start();
209 } catch (Exception e) {
210 String remoteHost = transport.getRemoteAddress();
211 ServiceSupport.dispose(transport);
212 onAcceptError(e, remoteHost);
213 }
214 }
215 });
216 } catch (Exception e) {
217 String remoteHost = transport.getRemoteAddress();
218 ServiceSupport.dispose(transport);
219 onAcceptError(e, remoteHost);
220 }
221 }
222
223 public void onAcceptError(Exception error) {
224 onAcceptError(error, null);
225 }
226
227 private void onAcceptError(Exception error, String remoteHost) {
228 LOG.error("Could not accept connection " + (remoteHost == null ? "" : "from " + remoteHost) + ": "
229 + error);
230 LOG.debug("Reason: " + error, error);
231 }
232 });
233 getServer().setBrokerInfo(brokerInfo);
234 getServer().start();
235
236 DiscoveryAgent da = getDiscoveryAgent();
237 if (da != null) {
238 da.registerService(getPublishableConnectString());
239 da.start();
240 }
241 if (enableStatusMonitor) {
242 this.statusDector = new TransportStatusDetector(this);
243 this.statusDector.start();
244 }
245
246 LOG.info("Connector " + getName() + " Started");
247 }
248
249 public String getPublishableConnectString() throws Exception {
250 return getPublishableConnectString(getConnectUri());
251 }
252
253 public String getPublishableConnectString(URI theConnectURI) throws Exception {
254 String publishableConnectString = null;
255 if (theConnectURI != null) {
256 publishableConnectString = theConnectURI.toString();
257 // strip off server side query parameters which may not be compatible to clients
258 if (theConnectURI.getRawQuery() != null) {
259 publishableConnectString = publishableConnectString.substring(0, publishableConnectString
260 .indexOf(theConnectURI.getRawQuery()) - 1);
261 }
262 }
263 if (LOG.isDebugEnabled()) {
264 LOG.debug("Publishing: " + publishableConnectString + " for broker transport URI: " + theConnectURI);
265 }
266 return publishableConnectString;
267 }
268
269 public void stop() throws Exception {
270 ServiceStopper ss = new ServiceStopper();
271 if (discoveryAgent != null) {
272 ss.stop(discoveryAgent);
273 }
274 if (server != null) {
275 ss.stop(server);
276 }
277 if (this.statusDector != null) {
278 this.statusDector.stop();
279 }
280
281 for (TransportConnection connection : connections) {
282 ss.stop(connection);
283 }
284 server = null;
285 ss.throwFirstException();
286 LOG.info("Connector " + getName() + " Stopped");
287 }
288
289 // Implementation methods
290 // -------------------------------------------------------------------------
291 protected Connection createConnection(Transport transport) throws IOException {
292 // prefer to use task runner from broker service as stop task runner, as we can then
293 // tie it to the lifecycle of the broker service
294 TransportConnection answer = new TransportConnection(this, transport, broker, disableAsyncDispatch ? null
295 : taskRunnerFactory, brokerService.getTaskRunnerFactory());
296 boolean statEnabled = this.getStatistics().isEnabled();
297 answer.getStatistics().setEnabled(statEnabled);
298 answer.setMessageAuthorizationPolicy(messageAuthorizationPolicy);
299 return answer;
300 }
301
302 protected TransportServer createTransportServer() throws IOException, URISyntaxException {
303 if (uri == null) {
304 throw new IllegalArgumentException("You must specify either a server or uri property");
305 }
306 if (brokerService == null) {
307 throw new IllegalArgumentException(
308 "You must specify the brokerService property. Maybe this connector should be added to a broker?");
309 }
310 return TransportFactory.bind(brokerService, uri);
311 }
312
313 public DiscoveryAgent getDiscoveryAgent() throws IOException {
314 if (discoveryAgent == null) {
315 discoveryAgent = createDiscoveryAgent();
316 }
317 return discoveryAgent;
318 }
319
320 protected DiscoveryAgent createDiscoveryAgent() throws IOException {
321 if (discoveryUri != null) {
322 DiscoveryAgent agent = DiscoveryAgentFactory.createDiscoveryAgent(discoveryUri);
323
324 if (agent != null && agent instanceof BrokerServiceAware) {
325 ((BrokerServiceAware) agent).setBrokerService(brokerService);
326 }
327
328 return agent;
329 }
330 return null;
331 }
332
333 public void setDiscoveryAgent(DiscoveryAgent discoveryAgent) {
334 this.discoveryAgent = discoveryAgent;
335 }
336
337 public URI getDiscoveryUri() {
338 return discoveryUri;
339 }
340
341 public void setDiscoveryUri(URI discoveryUri) {
342 this.discoveryUri = discoveryUri;
343 }
344
345 public URI getConnectUri() throws IOException, URISyntaxException {
346 if (server != null) {
347 return server.getConnectURI();
348 } else {
349 return uri;
350 }
351 }
352
353 public void onStarted(TransportConnection connection) {
354 connections.add(connection);
355 }
356
357 public void onStopped(TransportConnection connection) {
358 connections.remove(connection);
359 }
360
361 public String getName() {
362 if (name == null) {
363 uri = getUri();
364 if (uri != null) {
365 name = uri.toString();
366 }
367 }
368 return name;
369 }
370
371 public void setName(String name) {
372 this.name = name;
373 }
374
375 @Override
376 public String toString() {
377 String rc = getName();
378 if (rc == null) {
379 rc = super.toString();
380 }
381 return rc;
382 }
383
384 protected ConnectionControl getConnectionControl() {
385 boolean rebalance = isRebalanceClusterClients();
386 String connectedBrokers = "";
387 String separator = "";
388
389 if (isUpdateClusterClients()) {
390 synchronized (peerBrokers) {
391 for (String uri : getPeerBrokers()) {
392 connectedBrokers += separator + uri;
393 separator = ",";
394 }
395
396 if (rebalance) {
397 String shuffle = getPeerBrokers().removeFirst();
398 getPeerBrokers().addLast(shuffle);
399 }
400 }
401 }
402 ConnectionControl control = new ConnectionControl();
403 control.setConnectedBrokers(connectedBrokers);
404 control.setRebalanceConnection(rebalance);
405 return control;
406 }
407
408 public void addPeerBroker(BrokerInfo info) {
409 if (isMatchesClusterFilter(info.getBrokerName())) {
410 synchronized (peerBrokers) {
411 getPeerBrokers().addLast(info.getBrokerURL());
412 }
413 }
414 }
415
416 public void removePeerBroker(BrokerInfo info) {
417 synchronized (peerBrokers) {
418 getPeerBrokers().remove(info.getBrokerURL());
419 }
420 }
421
422 public LinkedList<String> getPeerBrokers() {
423 synchronized (peerBrokers) {
424 if (peerBrokers.isEmpty()) {
425 peerBrokers.add(brokerService.getDefaultSocketURIString());
426 }
427 return peerBrokers;
428 }
429 }
430
431 public void updateClientClusterInfo() {
432 if (isRebalanceClusterClients() || isUpdateClusterClients()) {
433 ConnectionControl control = getConnectionControl();
434 for (Connection c : this.connections) {
435 c.updateClient(control);
436 if (isRebalanceClusterClients()) {
437 control = getConnectionControl();
438 }
439 }
440 }
441 }
442
443 private boolean isMatchesClusterFilter(String brokerName) {
444 boolean result = true;
445 String filter = getUpdateClusterFilter();
446 if (filter != null) {
447 filter = filter.trim();
448 if (filter.length() > 0) {
449 StringTokenizer tokenizer = new StringTokenizer(filter, ",");
450 while (result && tokenizer.hasMoreTokens()) {
451 String token = tokenizer.nextToken();
452 result = isMatchesClusterFilter(brokerName, token);
453 }
454 }
455 }
456
457 return result;
458 }
459
460 private boolean isMatchesClusterFilter(String brokerName, String match) {
461 boolean result = true;
462 if (brokerName != null && match != null && brokerName.length() > 0 && match.length() > 0) {
463 result = Pattern.matches(match, brokerName);
464 }
465 return result;
466 }
467
468 public boolean isDisableAsyncDispatch() {
469 return disableAsyncDispatch;
470 }
471
472 public void setDisableAsyncDispatch(boolean disableAsyncDispatch) {
473 this.disableAsyncDispatch = disableAsyncDispatch;
474 }
475
476 /**
477 * @return the enableStatusMonitor
478 */
479 public boolean isEnableStatusMonitor() {
480 return enableStatusMonitor;
481 }
482
483 /**
484 * @param enableStatusMonitor
485 * the enableStatusMonitor to set
486 */
487 public void setEnableStatusMonitor(boolean enableStatusMonitor) {
488 this.enableStatusMonitor = enableStatusMonitor;
489 }
490
491 /**
492 * This is called by the BrokerService right before it starts the transport.
493 */
494 public void setBrokerService(BrokerService brokerService) {
495 this.brokerService = brokerService;
496 }
497
498 public Broker getBroker() {
499 return broker;
500 }
501
502 public BrokerService getBrokerService() {
503 return brokerService;
504 }
505
506 /**
507 * @return the updateClusterClients
508 */
509 public boolean isUpdateClusterClients() {
510 return this.updateClusterClients;
511 }
512
513 /**
514 * @param updateClusterClients
515 * the updateClusterClients to set
516 */
517 public void setUpdateClusterClients(boolean updateClusterClients) {
518 this.updateClusterClients = updateClusterClients;
519 }
520
521 /**
522 * @return the rebalanceClusterClients
523 */
524 public boolean isRebalanceClusterClients() {
525 return this.rebalanceClusterClients;
526 }
527
528 /**
529 * @param rebalanceClusterClients
530 * the rebalanceClusterClients to set
531 */
532 public void setRebalanceClusterClients(boolean rebalanceClusterClients) {
533 this.rebalanceClusterClients = rebalanceClusterClients;
534 }
535
536 /**
537 * @return the updateClusterClientsOnRemove
538 */
539 public boolean isUpdateClusterClientsOnRemove() {
540 return this.updateClusterClientsOnRemove;
541 }
542
543 /**
544 * @param updateClusterClientsOnRemove the updateClusterClientsOnRemove to set
545 */
546 public void setUpdateClusterClientsOnRemove(boolean updateClusterClientsOnRemove) {
547 this.updateClusterClientsOnRemove = updateClusterClientsOnRemove;
548 }
549
550 /**
551 * @return the updateClusterFilter
552 */
553 public String getUpdateClusterFilter() {
554 return this.updateClusterFilter;
555 }
556
557 /**
558 * @param updateClusterFilter
559 * the updateClusterFilter to set
560 */
561 public void setUpdateClusterFilter(String updateClusterFilter) {
562 this.updateClusterFilter = updateClusterFilter;
563 }
564
565 public int connectionCount() {
566 return connections.size();
567 }
568
569 public boolean isAuditNetworkProducers() {
570 return auditNetworkProducers;
571 }
572
573 /**
574 * Enable a producer audit on network connections, Traps the case of a missing send reply and resend.
575 * Note: does not work with conduit=false, networked composite destinations or networked virtual topics
576 * @param auditNetworkProducers
577 */
578 public void setAuditNetworkProducers(boolean auditNetworkProducers) {
579 this.auditNetworkProducers = auditNetworkProducers;
580 }
581
582 public int getMaximumProducersAllowedPerConnection() {
583 return maximumProducersAllowedPerConnection;
584 }
585
586 public void setMaximumProducersAllowedPerConnection(int maximumProducersAllowedPerConnection) {
587 this.maximumProducersAllowedPerConnection = maximumProducersAllowedPerConnection;
588 }
589
590 public int getMaximumConsumersAllowedPerConnection() {
591 return maximumConsumersAllowedPerConnection;
592 }
593
594 public void setMaximumConsumersAllowedPerConnection(int maximumConsumersAllowedPerConnection) {
595 this.maximumConsumersAllowedPerConnection = maximumConsumersAllowedPerConnection;
596 }
597 }