001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker;
018    
019    import java.io.IOException;
020    import java.net.URI;
021    import java.net.URISyntaxException;
022    import java.util.LinkedList;
023    import java.util.StringTokenizer;
024    import java.util.concurrent.CopyOnWriteArrayList;
025    import java.util.regex.Pattern;
026    
027    import javax.management.ObjectName;
028    
029    import org.apache.activemq.broker.jmx.ManagedTransportConnector;
030    import org.apache.activemq.broker.jmx.ManagementContext;
031    import org.apache.activemq.broker.region.ConnectorStatistics;
032    import org.apache.activemq.command.BrokerInfo;
033    import org.apache.activemq.command.ConnectionControl;
034    import org.apache.activemq.security.MessageAuthorizationPolicy;
035    import org.apache.activemq.thread.TaskRunnerFactory;
036    import org.apache.activemq.transport.Transport;
037    import org.apache.activemq.transport.TransportAcceptListener;
038    import org.apache.activemq.transport.TransportFactory;
039    import org.apache.activemq.transport.TransportServer;
040    import org.apache.activemq.transport.discovery.DiscoveryAgent;
041    import org.apache.activemq.transport.discovery.DiscoveryAgentFactory;
042    import org.apache.activemq.util.ServiceStopper;
043    import org.apache.activemq.util.ServiceSupport;
044    import org.slf4j.Logger;
045    import org.slf4j.LoggerFactory;
046    
047    /**
048     * @org.apache.xbean.XBean
049     */
050    public class TransportConnector implements Connector, BrokerServiceAware {
051    
052        final Logger LOG = LoggerFactory.getLogger(TransportConnector.class);
053    
054        protected final CopyOnWriteArrayList<TransportConnection> connections = new CopyOnWriteArrayList<TransportConnection>();
055        protected TransportStatusDetector statusDector;
056        private BrokerService brokerService;
057        private TransportServer server;
058        private URI uri;
059        private BrokerInfo brokerInfo = new BrokerInfo();
060        private TaskRunnerFactory taskRunnerFactory;
061        private MessageAuthorizationPolicy messageAuthorizationPolicy;
062        private DiscoveryAgent discoveryAgent;
063        private final ConnectorStatistics statistics = new ConnectorStatistics();
064        private URI discoveryUri;
065        private String name;
066        private boolean disableAsyncDispatch;
067        private boolean enableStatusMonitor = false;
068        private Broker broker;
069        private boolean updateClusterClients = false;
070        private boolean rebalanceClusterClients;
071        private boolean updateClusterClientsOnRemove = false;
072        private String updateClusterFilter;
073        private boolean auditNetworkProducers = false;
074        private int maximumProducersAllowedPerConnection = Integer.MAX_VALUE;
075        private int maximumConsumersAllowedPerConnection  = Integer.MAX_VALUE;
076    
077        LinkedList<String> peerBrokers = new LinkedList<String>();
078    
079        public TransportConnector() {
080        }
081    
082        public TransportConnector(TransportServer server) {
083            this();
084            setServer(server);
085            if (server != null && server.getConnectURI() != null) {
086                URI uri = server.getConnectURI();
087                if (uri != null && uri.getScheme().equals("vm")) {
088                    setEnableStatusMonitor(false);
089                }
090            }
091        }
092    
093        /**
094         * @return Returns the connections.
095         */
096        public CopyOnWriteArrayList<TransportConnection> getConnections() {
097            return connections;
098        }
099    
100        /**
101         * Factory method to create a JMX managed version of this transport
102         * connector
103         */
104        public ManagedTransportConnector asManagedConnector(ManagementContext context, ObjectName connectorName) throws IOException, URISyntaxException {
105            ManagedTransportConnector rc = new ManagedTransportConnector(context, connectorName, getServer());
106            rc.setBrokerInfo(getBrokerInfo());
107            rc.setDisableAsyncDispatch(isDisableAsyncDispatch());
108            rc.setDiscoveryAgent(getDiscoveryAgent());
109            rc.setDiscoveryUri(getDiscoveryUri());
110            rc.setEnableStatusMonitor(isEnableStatusMonitor());
111            rc.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy());
112            rc.setName(getName());
113            rc.setTaskRunnerFactory(getTaskRunnerFactory());
114            rc.setUri(getUri());
115            rc.setBrokerService(brokerService);
116            rc.setUpdateClusterClients(isUpdateClusterClients());
117            rc.setRebalanceClusterClients(isRebalanceClusterClients());
118            rc.setUpdateClusterFilter(getUpdateClusterFilter());
119            rc.setUpdateClusterClientsOnRemove(isUpdateClusterClientsOnRemove());
120            rc.setAuditNetworkProducers(isAuditNetworkProducers());
121            rc.setMaximumConsumersAllowedPerConnection(getMaximumConsumersAllowedPerConnection());
122            rc.setMaximumProducersAllowedPerConnection(getMaximumProducersAllowedPerConnection());
123            return rc;
124        }
125    
126        public BrokerInfo getBrokerInfo() {
127            return brokerInfo;
128        }
129    
130        public void setBrokerInfo(BrokerInfo brokerInfo) {
131            this.brokerInfo = brokerInfo;
132        }
133    
134        public TransportServer getServer() throws IOException, URISyntaxException {
135            if (server == null) {
136                setServer(createTransportServer());
137            }
138            return server;
139        }
140    
141        public void setServer(TransportServer server) {
142            this.server = server;
143        }
144    
145        public URI getUri() {
146            if (uri == null) {
147                try {
148                    uri = getConnectUri();
149                } catch (Throwable e) {
150                }
151            }
152            return uri;
153        }
154    
155        /**
156         * Sets the server transport URI to use if there is not a
157         * {@link TransportServer} configured via the
158         * {@link #setServer(TransportServer)} method. This value is used to lazy
159         * create a {@link TransportServer} instance
160         *
161         * @param uri
162         */
163        public void setUri(URI uri) {
164            this.uri = uri;
165        }
166    
167        public TaskRunnerFactory getTaskRunnerFactory() {
168            return taskRunnerFactory;
169        }
170    
171        public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
172            this.taskRunnerFactory = taskRunnerFactory;
173        }
174    
175        /**
176         * @return the statistics for this connector
177         */
178        public ConnectorStatistics getStatistics() {
179            return statistics;
180        }
181    
182        public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
183            return messageAuthorizationPolicy;
184        }
185    
186        /**
187         * Sets the policy used to decide if the current connection is authorized to
188         * consume a given message
189         */
190        public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
191            this.messageAuthorizationPolicy = messageAuthorizationPolicy;
192        }
193    
194        public void start() throws Exception {
195            broker = brokerService.getBroker();
196            brokerInfo.setBrokerName(broker.getBrokerName());
197            brokerInfo.setBrokerId(broker.getBrokerId());
198            brokerInfo.setPeerBrokerInfos(broker.getPeerBrokerInfos());
199            brokerInfo.setFaultTolerantConfiguration(broker.isFaultTolerantConfiguration());
200            brokerInfo.setBrokerURL(broker.getBrokerService().getDefaultSocketURIString());
201            getServer().setAcceptListener(new TransportAcceptListener() {
202                public void onAccept(final Transport transport) {
203                    try {
204                        brokerService.getTaskRunnerFactory().execute(new Runnable() {
205                            public void run() {
206                                try {
207                                    Connection connection = createConnection(transport);
208                                    connection.start();
209                                } catch (Exception e) {
210                                    String remoteHost = transport.getRemoteAddress();
211                                    ServiceSupport.dispose(transport);
212                                    onAcceptError(e, remoteHost);
213                                }
214                            }
215                        });
216                    } catch (Exception e) {
217                        String remoteHost = transport.getRemoteAddress();
218                        ServiceSupport.dispose(transport);
219                        onAcceptError(e, remoteHost);
220                    }
221                }
222    
223                public void onAcceptError(Exception error) {
224                    onAcceptError(error, null);
225                }
226    
227                private void onAcceptError(Exception error, String remoteHost) {
228                    LOG.error("Could not accept connection " + (remoteHost == null ? "" : "from " + remoteHost) + ": "
229                            + error);
230                    LOG.debug("Reason: " + error, error);
231                }
232            });
233            getServer().setBrokerInfo(brokerInfo);
234            getServer().start();
235    
236            DiscoveryAgent da = getDiscoveryAgent();
237            if (da != null) {
238                da.registerService(getPublishableConnectString());
239                da.start();
240            }
241            if (enableStatusMonitor) {
242                this.statusDector = new TransportStatusDetector(this);
243                this.statusDector.start();
244            }
245    
246            LOG.info("Connector " + getName() + " Started");
247        }
248    
249        public String getPublishableConnectString() throws Exception {
250            return getPublishableConnectString(getConnectUri());
251        }
252    
253        public String getPublishableConnectString(URI theConnectURI) throws Exception {
254            String publishableConnectString = null;
255            if (theConnectURI != null) {
256                publishableConnectString = theConnectURI.toString();
257                // strip off server side query parameters which may not be compatible to clients
258                if (theConnectURI.getRawQuery() != null) {
259                    publishableConnectString = publishableConnectString.substring(0, publishableConnectString
260                            .indexOf(theConnectURI.getRawQuery()) - 1);
261                }
262            }
263            if (LOG.isDebugEnabled()) {
264                LOG.debug("Publishing: " + publishableConnectString + " for broker transport URI: " + theConnectURI);
265            }
266            return publishableConnectString;
267        }
268    
269        public void stop() throws Exception {
270            ServiceStopper ss = new ServiceStopper();
271            if (discoveryAgent != null) {
272                ss.stop(discoveryAgent);
273            }
274            if (server != null) {
275                ss.stop(server);
276            }
277            if (this.statusDector != null) {
278                this.statusDector.stop();
279            }
280    
281            for (TransportConnection connection : connections) {
282                ss.stop(connection);
283            }
284            server = null;
285            ss.throwFirstException();
286            LOG.info("Connector " + getName() + " Stopped");
287        }
288    
289        // Implementation methods
290        // -------------------------------------------------------------------------
291        protected Connection createConnection(Transport transport) throws IOException {
292            // prefer to use task runner from broker service as stop task runner, as we can then
293            // tie it to the lifecycle of the broker service
294            TransportConnection answer = new TransportConnection(this, transport, broker, disableAsyncDispatch ? null
295                    : taskRunnerFactory, brokerService.getTaskRunnerFactory());
296            boolean statEnabled = this.getStatistics().isEnabled();
297            answer.getStatistics().setEnabled(statEnabled);
298            answer.setMessageAuthorizationPolicy(messageAuthorizationPolicy);
299            return answer;
300        }
301    
302        protected TransportServer createTransportServer() throws IOException, URISyntaxException {
303            if (uri == null) {
304                throw new IllegalArgumentException("You must specify either a server or uri property");
305            }
306            if (brokerService == null) {
307                throw new IllegalArgumentException(
308                        "You must specify the brokerService property. Maybe this connector should be added to a broker?");
309            }
310            return TransportFactory.bind(brokerService, uri);
311        }
312    
313        public DiscoveryAgent getDiscoveryAgent() throws IOException {
314            if (discoveryAgent == null) {
315                discoveryAgent = createDiscoveryAgent();
316            }
317            return discoveryAgent;
318        }
319    
320        protected DiscoveryAgent createDiscoveryAgent() throws IOException {
321            if (discoveryUri != null) {
322                DiscoveryAgent agent = DiscoveryAgentFactory.createDiscoveryAgent(discoveryUri);
323    
324                if (agent != null && agent instanceof BrokerServiceAware) {
325                    ((BrokerServiceAware) agent).setBrokerService(brokerService);
326                }
327    
328                return agent;
329            }
330            return null;
331        }
332    
333        public void setDiscoveryAgent(DiscoveryAgent discoveryAgent) {
334            this.discoveryAgent = discoveryAgent;
335        }
336    
337        public URI getDiscoveryUri() {
338            return discoveryUri;
339        }
340    
341        public void setDiscoveryUri(URI discoveryUri) {
342            this.discoveryUri = discoveryUri;
343        }
344    
345        public URI getConnectUri() throws IOException, URISyntaxException {
346            if (server != null) {
347                return server.getConnectURI();
348            } else {
349                return uri;
350            }
351        }
352    
353        public void onStarted(TransportConnection connection) {
354            connections.add(connection);
355        }
356    
357        public void onStopped(TransportConnection connection) {
358            connections.remove(connection);
359        }
360    
361        public String getName() {
362            if (name == null) {
363                uri = getUri();
364                if (uri != null) {
365                    name = uri.toString();
366                }
367            }
368            return name;
369        }
370    
371        public void setName(String name) {
372            this.name = name;
373        }
374    
375        @Override
376        public String toString() {
377            String rc = getName();
378            if (rc == null) {
379                rc = super.toString();
380            }
381            return rc;
382        }
383    
384        protected ConnectionControl getConnectionControl() {
385            boolean rebalance = isRebalanceClusterClients();
386            String connectedBrokers = "";
387            String separator = "";
388    
389            if (isUpdateClusterClients()) {
390                synchronized (peerBrokers) {
391                    for (String uri : getPeerBrokers()) {
392                        connectedBrokers += separator + uri;
393                        separator = ",";
394                    }
395    
396                    if (rebalance) {
397                        String shuffle = getPeerBrokers().removeFirst();
398                        getPeerBrokers().addLast(shuffle);
399                    }
400                }
401            }
402            ConnectionControl control = new ConnectionControl();
403            control.setConnectedBrokers(connectedBrokers);
404            control.setRebalanceConnection(rebalance);
405            return control;
406        }
407    
408        public void addPeerBroker(BrokerInfo info) {
409            if (isMatchesClusterFilter(info.getBrokerName())) {
410                synchronized (peerBrokers) {
411                    getPeerBrokers().addLast(info.getBrokerURL());
412                }
413            }
414        }
415    
416        public void removePeerBroker(BrokerInfo info) {
417            synchronized (peerBrokers) {
418                getPeerBrokers().remove(info.getBrokerURL());
419            }
420        }
421    
422        public LinkedList<String> getPeerBrokers() {
423            synchronized (peerBrokers) {
424                if (peerBrokers.isEmpty()) {
425                    peerBrokers.add(brokerService.getDefaultSocketURIString());
426                }
427                return peerBrokers;
428            }
429        }
430    
431        public void updateClientClusterInfo() {
432            if (isRebalanceClusterClients() || isUpdateClusterClients()) {
433                ConnectionControl control = getConnectionControl();
434                for (Connection c : this.connections) {
435                    c.updateClient(control);
436                    if (isRebalanceClusterClients()) {
437                        control = getConnectionControl();
438                    }
439                }
440            }
441        }
442    
443        private boolean isMatchesClusterFilter(String brokerName) {
444            boolean result = true;
445            String filter = getUpdateClusterFilter();
446            if (filter != null) {
447                filter = filter.trim();
448                if (filter.length() > 0) {
449                    StringTokenizer tokenizer = new StringTokenizer(filter, ",");
450                    while (result && tokenizer.hasMoreTokens()) {
451                        String token = tokenizer.nextToken();
452                        result = isMatchesClusterFilter(brokerName, token);
453                    }
454                }
455            }
456    
457            return result;
458        }
459    
460        private boolean isMatchesClusterFilter(String brokerName, String match) {
461            boolean result = true;
462            if (brokerName != null && match != null && brokerName.length() > 0 && match.length() > 0) {
463                result = Pattern.matches(match, brokerName);
464            }
465            return result;
466        }
467    
468        public boolean isDisableAsyncDispatch() {
469            return disableAsyncDispatch;
470        }
471    
472        public void setDisableAsyncDispatch(boolean disableAsyncDispatch) {
473            this.disableAsyncDispatch = disableAsyncDispatch;
474        }
475    
476        /**
477         * @return the enableStatusMonitor
478         */
479        public boolean isEnableStatusMonitor() {
480            return enableStatusMonitor;
481        }
482    
483        /**
484         * @param enableStatusMonitor
485         *            the enableStatusMonitor to set
486         */
487        public void setEnableStatusMonitor(boolean enableStatusMonitor) {
488            this.enableStatusMonitor = enableStatusMonitor;
489        }
490    
491        /**
492         * This is called by the BrokerService right before it starts the transport.
493         */
494        public void setBrokerService(BrokerService brokerService) {
495            this.brokerService = brokerService;
496        }
497    
498        public Broker getBroker() {
499            return broker;
500        }
501    
502        public BrokerService getBrokerService() {
503            return brokerService;
504        }
505    
506        /**
507         * @return the updateClusterClients
508         */
509        public boolean isUpdateClusterClients() {
510            return this.updateClusterClients;
511        }
512    
513        /**
514         * @param updateClusterClients
515         *            the updateClusterClients to set
516         */
517        public void setUpdateClusterClients(boolean updateClusterClients) {
518            this.updateClusterClients = updateClusterClients;
519        }
520    
521        /**
522         * @return the rebalanceClusterClients
523         */
524        public boolean isRebalanceClusterClients() {
525            return this.rebalanceClusterClients;
526        }
527    
528        /**
529         * @param rebalanceClusterClients
530         *            the rebalanceClusterClients to set
531         */
532        public void setRebalanceClusterClients(boolean rebalanceClusterClients) {
533            this.rebalanceClusterClients = rebalanceClusterClients;
534        }
535    
536        /**
537         * @return the updateClusterClientsOnRemove
538         */
539        public boolean isUpdateClusterClientsOnRemove() {
540            return this.updateClusterClientsOnRemove;
541        }
542    
543        /**
544         * @param updateClusterClientsOnRemove the updateClusterClientsOnRemove to set
545         */
546        public void setUpdateClusterClientsOnRemove(boolean updateClusterClientsOnRemove) {
547            this.updateClusterClientsOnRemove = updateClusterClientsOnRemove;
548        }
549    
550        /**
551         * @return the updateClusterFilter
552         */
553        public String getUpdateClusterFilter() {
554            return this.updateClusterFilter;
555        }
556    
557        /**
558         * @param updateClusterFilter
559         *            the updateClusterFilter to set
560         */
561        public void setUpdateClusterFilter(String updateClusterFilter) {
562            this.updateClusterFilter = updateClusterFilter;
563        }
564    
565        public int connectionCount() {
566            return connections.size();
567        }
568    
569        public boolean isAuditNetworkProducers() {
570            return auditNetworkProducers;
571        }
572    
573        /**
574         * Enable a producer audit on network connections, Traps the case of a missing send reply and resend.
575         * Note: does not work with conduit=false, networked composite destinations or networked virtual topics
576         * @param auditNetworkProducers
577         */
578        public void setAuditNetworkProducers(boolean auditNetworkProducers) {
579            this.auditNetworkProducers = auditNetworkProducers;
580        }
581    
582        public int getMaximumProducersAllowedPerConnection() {
583            return maximumProducersAllowedPerConnection;
584        }
585    
586        public void setMaximumProducersAllowedPerConnection(int maximumProducersAllowedPerConnection) {
587            this.maximumProducersAllowedPerConnection = maximumProducersAllowedPerConnection;
588        }
589    
590        public int getMaximumConsumersAllowedPerConnection() {
591            return maximumConsumersAllowedPerConnection;
592        }
593    
594        public void setMaximumConsumersAllowedPerConnection(int maximumConsumersAllowedPerConnection) {
595            this.maximumConsumersAllowedPerConnection = maximumConsumersAllowedPerConnection;
596        }
597    }