001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker;
018    
019    import java.io.IOException;
020    import java.net.URI;
021    import java.net.URISyntaxException;
022    import java.util.LinkedList;
023    import java.util.StringTokenizer;
024    import java.util.concurrent.CopyOnWriteArrayList;
025    import java.util.regex.Pattern;
026    
027    import javax.management.ObjectName;
028    
029    import org.apache.activemq.broker.jmx.ManagedTransportConnector;
030    import org.apache.activemq.broker.jmx.ManagementContext;
031    import org.apache.activemq.broker.region.ConnectorStatistics;
032    import org.apache.activemq.command.BrokerInfo;
033    import org.apache.activemq.command.ConnectionControl;
034    import org.apache.activemq.security.MessageAuthorizationPolicy;
035    import org.apache.activemq.thread.TaskRunnerFactory;
036    import org.apache.activemq.transport.Transport;
037    import org.apache.activemq.transport.TransportAcceptListener;
038    import org.apache.activemq.transport.TransportFactorySupport;
039    import org.apache.activemq.transport.TransportServer;
040    import org.apache.activemq.transport.discovery.DiscoveryAgent;
041    import org.apache.activemq.transport.discovery.DiscoveryAgentFactory;
042    import org.apache.activemq.util.ServiceStopper;
043    import org.apache.activemq.util.ServiceSupport;
044    import org.slf4j.Logger;
045    import org.slf4j.LoggerFactory;
046    
047    /**
048     * @org.apache.xbean.XBean
049     */
050    public class TransportConnector implements Connector, BrokerServiceAware {
051    
052        final Logger LOG = LoggerFactory.getLogger(TransportConnector.class);
053    
054        protected final CopyOnWriteArrayList<TransportConnection> connections = new CopyOnWriteArrayList<TransportConnection>();
055        protected TransportStatusDetector statusDector;
056        private BrokerService brokerService;
057        private TransportServer server;
058        private URI uri;
059        private BrokerInfo brokerInfo = new BrokerInfo();
060        private TaskRunnerFactory taskRunnerFactory;
061        private MessageAuthorizationPolicy messageAuthorizationPolicy;
062        private DiscoveryAgent discoveryAgent;
063        private final ConnectorStatistics statistics = new ConnectorStatistics();
064        private URI discoveryUri;
065        private String name;
066        private boolean disableAsyncDispatch;
067        private boolean enableStatusMonitor = false;
068        private Broker broker;
069        private boolean updateClusterClients = false;
070        private boolean rebalanceClusterClients;
071        private boolean updateClusterClientsOnRemove = false;
072        private String updateClusterFilter;
073        private boolean auditNetworkProducers = false;
074        private int maximumProducersAllowedPerConnection = Integer.MAX_VALUE;
075        private int maximumConsumersAllowedPerConnection  = Integer.MAX_VALUE;
076        private PublishedAddressPolicy publishedAddressPolicy = new PublishedAddressPolicy();
077    
078        LinkedList<String> peerBrokers = new LinkedList<String>();
079    
080        public TransportConnector() {
081        }
082    
083        public TransportConnector(TransportServer server) {
084            this();
085            setServer(server);
086            if (server != null && server.getConnectURI() != null) {
087                URI uri = server.getConnectURI();
088                if (uri != null && uri.getScheme().equals("vm")) {
089                    setEnableStatusMonitor(false);
090                }
091            }
092        }
093    
094        /**
095         * @return Returns the connections.
096         */
097        public CopyOnWriteArrayList<TransportConnection> getConnections() {
098            return connections;
099        }
100    
101        /**
102         * Factory method to create a JMX managed version of this transport
103         * connector
104         */
105        public ManagedTransportConnector asManagedConnector(ManagementContext context, ObjectName connectorName) throws IOException, URISyntaxException {
106            ManagedTransportConnector rc = new ManagedTransportConnector(context, connectorName, getServer());
107            rc.setBrokerInfo(getBrokerInfo());
108            rc.setDisableAsyncDispatch(isDisableAsyncDispatch());
109            rc.setDiscoveryAgent(getDiscoveryAgent());
110            rc.setDiscoveryUri(getDiscoveryUri());
111            rc.setEnableStatusMonitor(isEnableStatusMonitor());
112            rc.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy());
113            rc.setName(getName());
114            rc.setTaskRunnerFactory(getTaskRunnerFactory());
115            rc.setUri(getUri());
116            rc.setBrokerService(brokerService);
117            rc.setUpdateClusterClients(isUpdateClusterClients());
118            rc.setRebalanceClusterClients(isRebalanceClusterClients());
119            rc.setUpdateClusterFilter(getUpdateClusterFilter());
120            rc.setUpdateClusterClientsOnRemove(isUpdateClusterClientsOnRemove());
121            rc.setAuditNetworkProducers(isAuditNetworkProducers());
122            rc.setMaximumConsumersAllowedPerConnection(getMaximumConsumersAllowedPerConnection());
123            rc.setMaximumProducersAllowedPerConnection(getMaximumProducersAllowedPerConnection());
124            rc.setPublishedAddressPolicy(getPublishedAddressPolicy());
125            return rc;
126        }
127    
128        @Override
129        public BrokerInfo getBrokerInfo() {
130            return brokerInfo;
131        }
132    
133        public void setBrokerInfo(BrokerInfo brokerInfo) {
134            this.brokerInfo = brokerInfo;
135        }
136    
137        public TransportServer getServer() throws IOException, URISyntaxException {
138            if (server == null) {
139                setServer(createTransportServer());
140            }
141            return server;
142        }
143    
144        public void setServer(TransportServer server) {
145            this.server = server;
146        }
147    
148        public URI getUri() {
149            if (uri == null) {
150                try {
151                    uri = getConnectUri();
152                } catch (Throwable e) {
153                }
154            }
155            return uri;
156        }
157    
158        /**
159         * Sets the server transport URI to use if there is not a
160         * {@link TransportServer} configured via the
161         * {@link #setServer(TransportServer)} method. This value is used to lazy
162         * create a {@link TransportServer} instance
163         *
164         * @param uri
165         */
166        public void setUri(URI uri) {
167            this.uri = uri;
168        }
169    
170        public TaskRunnerFactory getTaskRunnerFactory() {
171            return taskRunnerFactory;
172        }
173    
174        public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
175            this.taskRunnerFactory = taskRunnerFactory;
176        }
177    
178        /**
179         * @return the statistics for this connector
180         */
181        @Override
182        public ConnectorStatistics getStatistics() {
183            return statistics;
184        }
185    
186        public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
187            return messageAuthorizationPolicy;
188        }
189    
190        /**
191         * Sets the policy used to decide if the current connection is authorized to
192         * consume a given message
193         */
194        public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
195            this.messageAuthorizationPolicy = messageAuthorizationPolicy;
196        }
197    
198        @Override
199        public void start() throws Exception {
200            broker = brokerService.getBroker();
201            brokerInfo.setBrokerName(broker.getBrokerName());
202            brokerInfo.setBrokerId(broker.getBrokerId());
203            brokerInfo.setPeerBrokerInfos(broker.getPeerBrokerInfos());
204            brokerInfo.setFaultTolerantConfiguration(broker.isFaultTolerantConfiguration());
205            brokerInfo.setBrokerURL(broker.getBrokerService().getDefaultSocketURIString());
206            getServer().setAcceptListener(new TransportAcceptListener() {
207                @Override
208                public void onAccept(final Transport transport) {
209                    try {
210                        brokerService.getTaskRunnerFactory().execute(new Runnable() {
211                            @Override
212                            public void run() {
213                                try {
214                                    Connection connection = createConnection(transport);
215                                    connection.start();
216                                } catch (Exception e) {
217                                    String remoteHost = transport.getRemoteAddress();
218                                    ServiceSupport.dispose(transport);
219                                    onAcceptError(e, remoteHost);
220                                }
221                            }
222                        });
223                    } catch (Exception e) {
224                        String remoteHost = transport.getRemoteAddress();
225                        ServiceSupport.dispose(transport);
226                        onAcceptError(e, remoteHost);
227                    }
228                }
229    
230                @Override
231                public void onAcceptError(Exception error) {
232                    onAcceptError(error, null);
233                }
234    
235                private void onAcceptError(Exception error, String remoteHost) {
236                    LOG.error("Could not accept connection " + (remoteHost == null ? "" : "from " + remoteHost) + ": "
237                            + error);
238                    LOG.debug("Reason: " + error, error);
239                }
240            });
241            getServer().setBrokerInfo(brokerInfo);
242            getServer().start();
243    
244            DiscoveryAgent da = getDiscoveryAgent();
245            if (da != null) {
246                da.registerService(getPublishableConnectString());
247                da.start();
248            }
249            if (enableStatusMonitor) {
250                this.statusDector = new TransportStatusDetector(this);
251                this.statusDector.start();
252            }
253    
254            LOG.info("Connector " + getName() + " Started");
255        }
256    
257        public String getPublishableConnectString() throws Exception {
258            String publishableConnectString = publishedAddressPolicy.getPublishableConnectString(this);
259            if (LOG.isDebugEnabled()) {
260                LOG.debug("Publishing: " + publishableConnectString + " for broker transport URI: " + getConnectUri());
261            }
262            return publishableConnectString;
263        }
264    
265        @Override
266        public void stop() throws Exception {
267            ServiceStopper ss = new ServiceStopper();
268            if (discoveryAgent != null) {
269                ss.stop(discoveryAgent);
270            }
271            if (server != null) {
272                ss.stop(server);
273            }
274            if (this.statusDector != null) {
275                this.statusDector.stop();
276            }
277    
278            for (TransportConnection connection : connections) {
279                ss.stop(connection);
280            }
281            server = null;
282            ss.throwFirstException();
283            LOG.info("Connector " + getName() + " Stopped");
284        }
285    
286        // Implementation methods
287        // -------------------------------------------------------------------------
288        protected Connection createConnection(Transport transport) throws IOException {
289            // prefer to use task runner from broker service as stop task runner, as we can then
290            // tie it to the lifecycle of the broker service
291            TransportConnection answer = new TransportConnection(this, transport, broker, disableAsyncDispatch ? null
292                    : taskRunnerFactory, brokerService.getTaskRunnerFactory());
293            boolean statEnabled = this.getStatistics().isEnabled();
294            answer.getStatistics().setEnabled(statEnabled);
295            answer.setMessageAuthorizationPolicy(messageAuthorizationPolicy);
296            return answer;
297        }
298    
299        protected TransportServer createTransportServer() throws IOException, URISyntaxException {
300            if (uri == null) {
301                throw new IllegalArgumentException("You must specify either a server or uri property");
302            }
303            if (brokerService == null) {
304                throw new IllegalArgumentException(
305                        "You must specify the brokerService property. Maybe this connector should be added to a broker?");
306            }
307            return TransportFactorySupport.bind(brokerService, uri);
308        }
309    
310        public DiscoveryAgent getDiscoveryAgent() throws IOException {
311            if (discoveryAgent == null) {
312                discoveryAgent = createDiscoveryAgent();
313            }
314            return discoveryAgent;
315        }
316    
317        protected DiscoveryAgent createDiscoveryAgent() throws IOException {
318            if (discoveryUri != null) {
319                DiscoveryAgent agent = DiscoveryAgentFactory.createDiscoveryAgent(discoveryUri);
320    
321                if (agent != null && agent instanceof BrokerServiceAware) {
322                    ((BrokerServiceAware) agent).setBrokerService(brokerService);
323                }
324    
325                return agent;
326            }
327            return null;
328        }
329    
330        public void setDiscoveryAgent(DiscoveryAgent discoveryAgent) {
331            this.discoveryAgent = discoveryAgent;
332        }
333    
334        public URI getDiscoveryUri() {
335            return discoveryUri;
336        }
337    
338        public void setDiscoveryUri(URI discoveryUri) {
339            this.discoveryUri = discoveryUri;
340        }
341    
342        public URI getConnectUri() throws IOException, URISyntaxException {
343            if (server != null) {
344                return server.getConnectURI();
345            } else {
346                return uri;
347            }
348        }
349    
350        public void onStarted(TransportConnection connection) {
351            connections.add(connection);
352        }
353    
354        public void onStopped(TransportConnection connection) {
355            connections.remove(connection);
356        }
357    
358        public String getName() {
359            if (name == null) {
360                uri = getUri();
361                if (uri != null) {
362                    name = uri.toString();
363                }
364            }
365            return name;
366        }
367    
368        public void setName(String name) {
369            this.name = name;
370        }
371    
372        @Override
373        public String toString() {
374            String rc = getName();
375            if (rc == null) {
376                rc = super.toString();
377            }
378            return rc;
379        }
380    
381        protected ConnectionControl getConnectionControl() {
382            boolean rebalance = isRebalanceClusterClients();
383            String connectedBrokers = "";
384            String separator = "";
385    
386            if (isUpdateClusterClients()) {
387                synchronized (peerBrokers) {
388                    for (String uri : getPeerBrokers()) {
389                        connectedBrokers += separator + uri;
390                        separator = ",";
391                    }
392    
393                    if (rebalance) {
394                        String shuffle = getPeerBrokers().removeFirst();
395                        getPeerBrokers().addLast(shuffle);
396                    }
397                }
398            }
399            ConnectionControl control = new ConnectionControl();
400            control.setConnectedBrokers(connectedBrokers);
401            control.setRebalanceConnection(rebalance);
402            return control;
403        }
404    
405        public void addPeerBroker(BrokerInfo info) {
406            if (isMatchesClusterFilter(info.getBrokerName())) {
407                synchronized (peerBrokers) {
408                    getPeerBrokers().addLast(info.getBrokerURL());
409                }
410            }
411        }
412    
413        public void removePeerBroker(BrokerInfo info) {
414            synchronized (peerBrokers) {
415                getPeerBrokers().remove(info.getBrokerURL());
416            }
417        }
418    
419        public LinkedList<String> getPeerBrokers() {
420            synchronized (peerBrokers) {
421                if (peerBrokers.isEmpty()) {
422                    peerBrokers.add(brokerService.getDefaultSocketURIString());
423                }
424                return peerBrokers;
425            }
426        }
427    
428        @Override
429        public void updateClientClusterInfo() {
430            if (isRebalanceClusterClients() || isUpdateClusterClients()) {
431                ConnectionControl control = getConnectionControl();
432                for (Connection c : this.connections) {
433                    c.updateClient(control);
434                    if (isRebalanceClusterClients()) {
435                        control = getConnectionControl();
436                    }
437                }
438            }
439        }
440    
441        private boolean isMatchesClusterFilter(String brokerName) {
442            boolean result = true;
443            String filter = getUpdateClusterFilter();
444            if (filter != null) {
445                filter = filter.trim();
446                if (filter.length() > 0) {
447                    StringTokenizer tokenizer = new StringTokenizer(filter, ",");
448                    while (result && tokenizer.hasMoreTokens()) {
449                        String token = tokenizer.nextToken();
450                        result = isMatchesClusterFilter(brokerName, token);
451                    }
452                }
453            }
454    
455            return result;
456        }
457    
458        private boolean isMatchesClusterFilter(String brokerName, String match) {
459            boolean result = true;
460            if (brokerName != null && match != null && brokerName.length() > 0 && match.length() > 0) {
461                result = Pattern.matches(match, brokerName);
462            }
463            return result;
464        }
465    
466        public boolean isDisableAsyncDispatch() {
467            return disableAsyncDispatch;
468        }
469    
470        public void setDisableAsyncDispatch(boolean disableAsyncDispatch) {
471            this.disableAsyncDispatch = disableAsyncDispatch;
472        }
473    
474        /**
475         * @return the enableStatusMonitor
476         */
477        public boolean isEnableStatusMonitor() {
478            return enableStatusMonitor;
479        }
480    
481        /**
482         * @param enableStatusMonitor
483         *            the enableStatusMonitor to set
484         */
485        public void setEnableStatusMonitor(boolean enableStatusMonitor) {
486            this.enableStatusMonitor = enableStatusMonitor;
487        }
488    
489        /**
490         * This is called by the BrokerService right before it starts the transport.
491         */
492        @Override
493        public void setBrokerService(BrokerService brokerService) {
494            this.brokerService = brokerService;
495        }
496    
497        public Broker getBroker() {
498            return broker;
499        }
500    
501        public BrokerService getBrokerService() {
502            return brokerService;
503        }
504    
505        /**
506         * @return the updateClusterClients
507         */
508        @Override
509        public boolean isUpdateClusterClients() {
510            return this.updateClusterClients;
511        }
512    
513        /**
514         * @param updateClusterClients
515         *            the updateClusterClients to set
516         */
517        public void setUpdateClusterClients(boolean updateClusterClients) {
518            this.updateClusterClients = updateClusterClients;
519        }
520    
521        /**
522         * @return the rebalanceClusterClients
523         */
524        @Override
525        public boolean isRebalanceClusterClients() {
526            return this.rebalanceClusterClients;
527        }
528    
529        /**
530         * @param rebalanceClusterClients
531         *            the rebalanceClusterClients to set
532         */
533        public void setRebalanceClusterClients(boolean rebalanceClusterClients) {
534            this.rebalanceClusterClients = rebalanceClusterClients;
535        }
536    
537        /**
538         * @return the updateClusterClientsOnRemove
539         */
540        @Override
541        public boolean isUpdateClusterClientsOnRemove() {
542            return this.updateClusterClientsOnRemove;
543        }
544    
545        /**
546         * @param updateClusterClientsOnRemove the updateClusterClientsOnRemove to set
547         */
548        public void setUpdateClusterClientsOnRemove(boolean updateClusterClientsOnRemove) {
549            this.updateClusterClientsOnRemove = updateClusterClientsOnRemove;
550        }
551    
552        /**
553         * @return the updateClusterFilter
554         */
555        public String getUpdateClusterFilter() {
556            return this.updateClusterFilter;
557        }
558    
559        /**
560         * @param updateClusterFilter
561         *            the updateClusterFilter to set
562         */
563        public void setUpdateClusterFilter(String updateClusterFilter) {
564            this.updateClusterFilter = updateClusterFilter;
565        }
566    
567        @Override
568        public int connectionCount() {
569            return connections.size();
570        }
571    
572        public boolean isAuditNetworkProducers() {
573            return auditNetworkProducers;
574        }
575    
576        /**
577         * Enable a producer audit on network connections, Traps the case of a missing send reply and resend.
578         * Note: does not work with conduit=false, networked composite destinations or networked virtual topics
579         * @param auditNetworkProducers
580         */
581        public void setAuditNetworkProducers(boolean auditNetworkProducers) {
582            this.auditNetworkProducers = auditNetworkProducers;
583        }
584    
585        public int getMaximumProducersAllowedPerConnection() {
586            return maximumProducersAllowedPerConnection;
587        }
588    
589        public void setMaximumProducersAllowedPerConnection(int maximumProducersAllowedPerConnection) {
590            this.maximumProducersAllowedPerConnection = maximumProducersAllowedPerConnection;
591        }
592    
593        public int getMaximumConsumersAllowedPerConnection() {
594            return maximumConsumersAllowedPerConnection;
595        }
596    
597        public void setMaximumConsumersAllowedPerConnection(int maximumConsumersAllowedPerConnection) {
598            this.maximumConsumersAllowedPerConnection = maximumConsumersAllowedPerConnection;
599        }
600    
601        /**
602         * Gets the currently configured policy for creating the published connection address of this
603         * TransportConnector.
604         *
605         * @return the publishedAddressPolicy
606         */
607        public PublishedAddressPolicy getPublishedAddressPolicy() {
608            return publishedAddressPolicy;
609        }
610    
611        /**
612         * Sets the configured policy for creating the published connection address of this
613         * TransportConnector.
614         *
615         * @return the publishedAddressPolicy
616         */
617        public void setPublishedAddressPolicy(PublishedAddressPolicy publishedAddressPolicy) {
618            this.publishedAddressPolicy = publishedAddressPolicy;
619        }
620    }