001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker;
018
019 import java.io.IOException;
020 import java.net.URI;
021 import java.net.URISyntaxException;
022 import java.util.LinkedList;
023 import java.util.StringTokenizer;
024 import java.util.concurrent.CopyOnWriteArrayList;
025 import java.util.regex.Pattern;
026
027 import javax.management.ObjectName;
028
029 import org.apache.activemq.broker.jmx.ManagedTransportConnector;
030 import org.apache.activemq.broker.jmx.ManagementContext;
031 import org.apache.activemq.broker.region.ConnectorStatistics;
032 import org.apache.activemq.command.BrokerInfo;
033 import org.apache.activemq.command.ConnectionControl;
034 import org.apache.activemq.security.MessageAuthorizationPolicy;
035 import org.apache.activemq.thread.TaskRunnerFactory;
036 import org.apache.activemq.transport.Transport;
037 import org.apache.activemq.transport.TransportAcceptListener;
038 import org.apache.activemq.transport.TransportFactorySupport;
039 import org.apache.activemq.transport.TransportServer;
040 import org.apache.activemq.transport.discovery.DiscoveryAgent;
041 import org.apache.activemq.transport.discovery.DiscoveryAgentFactory;
042 import org.apache.activemq.util.ServiceStopper;
043 import org.apache.activemq.util.ServiceSupport;
044 import org.slf4j.Logger;
045 import org.slf4j.LoggerFactory;
046
047 /**
048 * @org.apache.xbean.XBean
049 */
050 public class TransportConnector implements Connector, BrokerServiceAware {
051
052 final Logger LOG = LoggerFactory.getLogger(TransportConnector.class);
053
054 protected final CopyOnWriteArrayList<TransportConnection> connections = new CopyOnWriteArrayList<TransportConnection>();
055 protected TransportStatusDetector statusDector;
056 private BrokerService brokerService;
057 private TransportServer server;
058 private URI uri;
059 private BrokerInfo brokerInfo = new BrokerInfo();
060 private TaskRunnerFactory taskRunnerFactory;
061 private MessageAuthorizationPolicy messageAuthorizationPolicy;
062 private DiscoveryAgent discoveryAgent;
063 private final ConnectorStatistics statistics = new ConnectorStatistics();
064 private URI discoveryUri;
065 private String name;
066 private boolean disableAsyncDispatch;
067 private boolean enableStatusMonitor = false;
068 private Broker broker;
069 private boolean updateClusterClients = false;
070 private boolean rebalanceClusterClients;
071 private boolean updateClusterClientsOnRemove = false;
072 private String updateClusterFilter;
073 private boolean auditNetworkProducers = false;
074 private int maximumProducersAllowedPerConnection = Integer.MAX_VALUE;
075 private int maximumConsumersAllowedPerConnection = Integer.MAX_VALUE;
076 private PublishedAddressPolicy publishedAddressPolicy = new PublishedAddressPolicy();
077
078 LinkedList<String> peerBrokers = new LinkedList<String>();
079
080 public TransportConnector() {
081 }
082
083 public TransportConnector(TransportServer server) {
084 this();
085 setServer(server);
086 if (server != null && server.getConnectURI() != null) {
087 URI uri = server.getConnectURI();
088 if (uri != null && uri.getScheme().equals("vm")) {
089 setEnableStatusMonitor(false);
090 }
091 }
092 }
093
094 /**
095 * @return Returns the connections.
096 */
097 public CopyOnWriteArrayList<TransportConnection> getConnections() {
098 return connections;
099 }
100
101 /**
102 * Factory method to create a JMX managed version of this transport
103 * connector
104 */
105 public ManagedTransportConnector asManagedConnector(ManagementContext context, ObjectName connectorName) throws IOException, URISyntaxException {
106 ManagedTransportConnector rc = new ManagedTransportConnector(context, connectorName, getServer());
107 rc.setBrokerInfo(getBrokerInfo());
108 rc.setDisableAsyncDispatch(isDisableAsyncDispatch());
109 rc.setDiscoveryAgent(getDiscoveryAgent());
110 rc.setDiscoveryUri(getDiscoveryUri());
111 rc.setEnableStatusMonitor(isEnableStatusMonitor());
112 rc.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy());
113 rc.setName(getName());
114 rc.setTaskRunnerFactory(getTaskRunnerFactory());
115 rc.setUri(getUri());
116 rc.setBrokerService(brokerService);
117 rc.setUpdateClusterClients(isUpdateClusterClients());
118 rc.setRebalanceClusterClients(isRebalanceClusterClients());
119 rc.setUpdateClusterFilter(getUpdateClusterFilter());
120 rc.setUpdateClusterClientsOnRemove(isUpdateClusterClientsOnRemove());
121 rc.setAuditNetworkProducers(isAuditNetworkProducers());
122 rc.setMaximumConsumersAllowedPerConnection(getMaximumConsumersAllowedPerConnection());
123 rc.setMaximumProducersAllowedPerConnection(getMaximumProducersAllowedPerConnection());
124 rc.setPublishedAddressPolicy(getPublishedAddressPolicy());
125 return rc;
126 }
127
128 @Override
129 public BrokerInfo getBrokerInfo() {
130 return brokerInfo;
131 }
132
133 public void setBrokerInfo(BrokerInfo brokerInfo) {
134 this.brokerInfo = brokerInfo;
135 }
136
137 public TransportServer getServer() throws IOException, URISyntaxException {
138 if (server == null) {
139 setServer(createTransportServer());
140 }
141 return server;
142 }
143
144 public void setServer(TransportServer server) {
145 this.server = server;
146 }
147
148 public URI getUri() {
149 if (uri == null) {
150 try {
151 uri = getConnectUri();
152 } catch (Throwable e) {
153 }
154 }
155 return uri;
156 }
157
158 /**
159 * Sets the server transport URI to use if there is not a
160 * {@link TransportServer} configured via the
161 * {@link #setServer(TransportServer)} method. This value is used to lazy
162 * create a {@link TransportServer} instance
163 *
164 * @param uri
165 */
166 public void setUri(URI uri) {
167 this.uri = uri;
168 }
169
170 public TaskRunnerFactory getTaskRunnerFactory() {
171 return taskRunnerFactory;
172 }
173
174 public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
175 this.taskRunnerFactory = taskRunnerFactory;
176 }
177
178 /**
179 * @return the statistics for this connector
180 */
181 @Override
182 public ConnectorStatistics getStatistics() {
183 return statistics;
184 }
185
186 public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
187 return messageAuthorizationPolicy;
188 }
189
190 /**
191 * Sets the policy used to decide if the current connection is authorized to
192 * consume a given message
193 */
194 public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
195 this.messageAuthorizationPolicy = messageAuthorizationPolicy;
196 }
197
198 @Override
199 public void start() throws Exception {
200 broker = brokerService.getBroker();
201 brokerInfo.setBrokerName(broker.getBrokerName());
202 brokerInfo.setBrokerId(broker.getBrokerId());
203 brokerInfo.setPeerBrokerInfos(broker.getPeerBrokerInfos());
204 brokerInfo.setFaultTolerantConfiguration(broker.isFaultTolerantConfiguration());
205 brokerInfo.setBrokerURL(broker.getBrokerService().getDefaultSocketURIString());
206 getServer().setAcceptListener(new TransportAcceptListener() {
207 @Override
208 public void onAccept(final Transport transport) {
209 try {
210 brokerService.getTaskRunnerFactory().execute(new Runnable() {
211 @Override
212 public void run() {
213 try {
214 Connection connection = createConnection(transport);
215 connection.start();
216 } catch (Exception e) {
217 String remoteHost = transport.getRemoteAddress();
218 ServiceSupport.dispose(transport);
219 onAcceptError(e, remoteHost);
220 }
221 }
222 });
223 } catch (Exception e) {
224 String remoteHost = transport.getRemoteAddress();
225 ServiceSupport.dispose(transport);
226 onAcceptError(e, remoteHost);
227 }
228 }
229
230 @Override
231 public void onAcceptError(Exception error) {
232 onAcceptError(error, null);
233 }
234
235 private void onAcceptError(Exception error, String remoteHost) {
236 LOG.error("Could not accept connection " + (remoteHost == null ? "" : "from " + remoteHost) + ": "
237 + error);
238 LOG.debug("Reason: " + error, error);
239 }
240 });
241 getServer().setBrokerInfo(brokerInfo);
242 getServer().start();
243
244 DiscoveryAgent da = getDiscoveryAgent();
245 if (da != null) {
246 da.registerService(getPublishableConnectString());
247 da.start();
248 }
249 if (enableStatusMonitor) {
250 this.statusDector = new TransportStatusDetector(this);
251 this.statusDector.start();
252 }
253
254 LOG.info("Connector " + getName() + " Started");
255 }
256
257 public String getPublishableConnectString() throws Exception {
258 String publishableConnectString = publishedAddressPolicy.getPublishableConnectString(this);
259 if (LOG.isDebugEnabled()) {
260 LOG.debug("Publishing: " + publishableConnectString + " for broker transport URI: " + getConnectUri());
261 }
262 return publishableConnectString;
263 }
264
265 @Override
266 public void stop() throws Exception {
267 ServiceStopper ss = new ServiceStopper();
268 if (discoveryAgent != null) {
269 ss.stop(discoveryAgent);
270 }
271 if (server != null) {
272 ss.stop(server);
273 }
274 if (this.statusDector != null) {
275 this.statusDector.stop();
276 }
277
278 for (TransportConnection connection : connections) {
279 ss.stop(connection);
280 }
281 server = null;
282 ss.throwFirstException();
283 LOG.info("Connector " + getName() + " Stopped");
284 }
285
286 // Implementation methods
287 // -------------------------------------------------------------------------
288 protected Connection createConnection(Transport transport) throws IOException {
289 // prefer to use task runner from broker service as stop task runner, as we can then
290 // tie it to the lifecycle of the broker service
291 TransportConnection answer = new TransportConnection(this, transport, broker, disableAsyncDispatch ? null
292 : taskRunnerFactory, brokerService.getTaskRunnerFactory());
293 boolean statEnabled = this.getStatistics().isEnabled();
294 answer.getStatistics().setEnabled(statEnabled);
295 answer.setMessageAuthorizationPolicy(messageAuthorizationPolicy);
296 return answer;
297 }
298
299 protected TransportServer createTransportServer() throws IOException, URISyntaxException {
300 if (uri == null) {
301 throw new IllegalArgumentException("You must specify either a server or uri property");
302 }
303 if (brokerService == null) {
304 throw new IllegalArgumentException(
305 "You must specify the brokerService property. Maybe this connector should be added to a broker?");
306 }
307 return TransportFactorySupport.bind(brokerService, uri);
308 }
309
310 public DiscoveryAgent getDiscoveryAgent() throws IOException {
311 if (discoveryAgent == null) {
312 discoveryAgent = createDiscoveryAgent();
313 }
314 return discoveryAgent;
315 }
316
317 protected DiscoveryAgent createDiscoveryAgent() throws IOException {
318 if (discoveryUri != null) {
319 DiscoveryAgent agent = DiscoveryAgentFactory.createDiscoveryAgent(discoveryUri);
320
321 if (agent != null && agent instanceof BrokerServiceAware) {
322 ((BrokerServiceAware) agent).setBrokerService(brokerService);
323 }
324
325 return agent;
326 }
327 return null;
328 }
329
330 public void setDiscoveryAgent(DiscoveryAgent discoveryAgent) {
331 this.discoveryAgent = discoveryAgent;
332 }
333
334 public URI getDiscoveryUri() {
335 return discoveryUri;
336 }
337
338 public void setDiscoveryUri(URI discoveryUri) {
339 this.discoveryUri = discoveryUri;
340 }
341
342 public URI getConnectUri() throws IOException, URISyntaxException {
343 if (server != null) {
344 return server.getConnectURI();
345 } else {
346 return uri;
347 }
348 }
349
350 public void onStarted(TransportConnection connection) {
351 connections.add(connection);
352 }
353
354 public void onStopped(TransportConnection connection) {
355 connections.remove(connection);
356 }
357
358 public String getName() {
359 if (name == null) {
360 uri = getUri();
361 if (uri != null) {
362 name = uri.toString();
363 }
364 }
365 return name;
366 }
367
368 public void setName(String name) {
369 this.name = name;
370 }
371
372 @Override
373 public String toString() {
374 String rc = getName();
375 if (rc == null) {
376 rc = super.toString();
377 }
378 return rc;
379 }
380
381 protected ConnectionControl getConnectionControl() {
382 boolean rebalance = isRebalanceClusterClients();
383 String connectedBrokers = "";
384 String separator = "";
385
386 if (isUpdateClusterClients()) {
387 synchronized (peerBrokers) {
388 for (String uri : getPeerBrokers()) {
389 connectedBrokers += separator + uri;
390 separator = ",";
391 }
392
393 if (rebalance) {
394 String shuffle = getPeerBrokers().removeFirst();
395 getPeerBrokers().addLast(shuffle);
396 }
397 }
398 }
399 ConnectionControl control = new ConnectionControl();
400 control.setConnectedBrokers(connectedBrokers);
401 control.setRebalanceConnection(rebalance);
402 return control;
403 }
404
405 public void addPeerBroker(BrokerInfo info) {
406 if (isMatchesClusterFilter(info.getBrokerName())) {
407 synchronized (peerBrokers) {
408 getPeerBrokers().addLast(info.getBrokerURL());
409 }
410 }
411 }
412
413 public void removePeerBroker(BrokerInfo info) {
414 synchronized (peerBrokers) {
415 getPeerBrokers().remove(info.getBrokerURL());
416 }
417 }
418
419 public LinkedList<String> getPeerBrokers() {
420 synchronized (peerBrokers) {
421 if (peerBrokers.isEmpty()) {
422 peerBrokers.add(brokerService.getDefaultSocketURIString());
423 }
424 return peerBrokers;
425 }
426 }
427
428 @Override
429 public void updateClientClusterInfo() {
430 if (isRebalanceClusterClients() || isUpdateClusterClients()) {
431 ConnectionControl control = getConnectionControl();
432 for (Connection c : this.connections) {
433 c.updateClient(control);
434 if (isRebalanceClusterClients()) {
435 control = getConnectionControl();
436 }
437 }
438 }
439 }
440
441 private boolean isMatchesClusterFilter(String brokerName) {
442 boolean result = true;
443 String filter = getUpdateClusterFilter();
444 if (filter != null) {
445 filter = filter.trim();
446 if (filter.length() > 0) {
447 StringTokenizer tokenizer = new StringTokenizer(filter, ",");
448 while (result && tokenizer.hasMoreTokens()) {
449 String token = tokenizer.nextToken();
450 result = isMatchesClusterFilter(brokerName, token);
451 }
452 }
453 }
454
455 return result;
456 }
457
458 private boolean isMatchesClusterFilter(String brokerName, String match) {
459 boolean result = true;
460 if (brokerName != null && match != null && brokerName.length() > 0 && match.length() > 0) {
461 result = Pattern.matches(match, brokerName);
462 }
463 return result;
464 }
465
466 public boolean isDisableAsyncDispatch() {
467 return disableAsyncDispatch;
468 }
469
470 public void setDisableAsyncDispatch(boolean disableAsyncDispatch) {
471 this.disableAsyncDispatch = disableAsyncDispatch;
472 }
473
474 /**
475 * @return the enableStatusMonitor
476 */
477 public boolean isEnableStatusMonitor() {
478 return enableStatusMonitor;
479 }
480
481 /**
482 * @param enableStatusMonitor
483 * the enableStatusMonitor to set
484 */
485 public void setEnableStatusMonitor(boolean enableStatusMonitor) {
486 this.enableStatusMonitor = enableStatusMonitor;
487 }
488
489 /**
490 * This is called by the BrokerService right before it starts the transport.
491 */
492 @Override
493 public void setBrokerService(BrokerService brokerService) {
494 this.brokerService = brokerService;
495 }
496
497 public Broker getBroker() {
498 return broker;
499 }
500
501 public BrokerService getBrokerService() {
502 return brokerService;
503 }
504
505 /**
506 * @return the updateClusterClients
507 */
508 @Override
509 public boolean isUpdateClusterClients() {
510 return this.updateClusterClients;
511 }
512
513 /**
514 * @param updateClusterClients
515 * the updateClusterClients to set
516 */
517 public void setUpdateClusterClients(boolean updateClusterClients) {
518 this.updateClusterClients = updateClusterClients;
519 }
520
521 /**
522 * @return the rebalanceClusterClients
523 */
524 @Override
525 public boolean isRebalanceClusterClients() {
526 return this.rebalanceClusterClients;
527 }
528
529 /**
530 * @param rebalanceClusterClients
531 * the rebalanceClusterClients to set
532 */
533 public void setRebalanceClusterClients(boolean rebalanceClusterClients) {
534 this.rebalanceClusterClients = rebalanceClusterClients;
535 }
536
537 /**
538 * @return the updateClusterClientsOnRemove
539 */
540 @Override
541 public boolean isUpdateClusterClientsOnRemove() {
542 return this.updateClusterClientsOnRemove;
543 }
544
545 /**
546 * @param updateClusterClientsOnRemove the updateClusterClientsOnRemove to set
547 */
548 public void setUpdateClusterClientsOnRemove(boolean updateClusterClientsOnRemove) {
549 this.updateClusterClientsOnRemove = updateClusterClientsOnRemove;
550 }
551
552 /**
553 * @return the updateClusterFilter
554 */
555 public String getUpdateClusterFilter() {
556 return this.updateClusterFilter;
557 }
558
559 /**
560 * @param updateClusterFilter
561 * the updateClusterFilter to set
562 */
563 public void setUpdateClusterFilter(String updateClusterFilter) {
564 this.updateClusterFilter = updateClusterFilter;
565 }
566
567 @Override
568 public int connectionCount() {
569 return connections.size();
570 }
571
572 public boolean isAuditNetworkProducers() {
573 return auditNetworkProducers;
574 }
575
576 /**
577 * Enable a producer audit on network connections, Traps the case of a missing send reply and resend.
578 * Note: does not work with conduit=false, networked composite destinations or networked virtual topics
579 * @param auditNetworkProducers
580 */
581 public void setAuditNetworkProducers(boolean auditNetworkProducers) {
582 this.auditNetworkProducers = auditNetworkProducers;
583 }
584
585 public int getMaximumProducersAllowedPerConnection() {
586 return maximumProducersAllowedPerConnection;
587 }
588
589 public void setMaximumProducersAllowedPerConnection(int maximumProducersAllowedPerConnection) {
590 this.maximumProducersAllowedPerConnection = maximumProducersAllowedPerConnection;
591 }
592
593 public int getMaximumConsumersAllowedPerConnection() {
594 return maximumConsumersAllowedPerConnection;
595 }
596
597 public void setMaximumConsumersAllowedPerConnection(int maximumConsumersAllowedPerConnection) {
598 this.maximumConsumersAllowedPerConnection = maximumConsumersAllowedPerConnection;
599 }
600
601 /**
602 * Gets the currently configured policy for creating the published connection address of this
603 * TransportConnector.
604 *
605 * @return the publishedAddressPolicy
606 */
607 public PublishedAddressPolicy getPublishedAddressPolicy() {
608 return publishedAddressPolicy;
609 }
610
611 /**
612 * Sets the configured policy for creating the published connection address of this
613 * TransportConnector.
614 *
615 * @return the publishedAddressPolicy
616 */
617 public void setPublishedAddressPolicy(PublishedAddressPolicy publishedAddressPolicy) {
618 this.publishedAddressPolicy = publishedAddressPolicy;
619 }
620 }