001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.proxy;
018    
019    import java.io.IOException;
020    import java.net.URI;
021    import java.net.URISyntaxException;
022    import java.util.Iterator;
023    import java.util.concurrent.CopyOnWriteArrayList;
024    import org.apache.activemq.Service;
025    import org.apache.activemq.transport.CompositeTransport;
026    import org.apache.activemq.transport.Transport;
027    import org.apache.activemq.transport.TransportAcceptListener;
028    import org.apache.activemq.transport.TransportFactory;
029    import org.apache.activemq.transport.TransportFilter;
030    import org.apache.activemq.transport.TransportServer;
031    import org.apache.activemq.util.ServiceStopper;
032    import org.slf4j.Logger;
033    import org.slf4j.LoggerFactory;
034    
035    /**
036     * @org.apache.xbean.XBean
037     * 
038     * 
039     */
040    public class ProxyConnector implements Service {
041    
042        private static final Logger LOG = LoggerFactory.getLogger(ProxyConnector.class);
043        private TransportServer server;
044        private URI bind;
045        private URI remote;
046        private URI localUri;
047        private String name;
048        /**
049         * Should we proxy commands to the local broker using VM transport as well?
050         */
051        private boolean proxyToLocalBroker = true;
052        
053        private final CopyOnWriteArrayList<ProxyConnection> connections = new CopyOnWriteArrayList<ProxyConnection>();
054    
055        public void start() throws Exception {
056    
057            this.getServer().setAcceptListener(new TransportAcceptListener() {
058                public void onAccept(Transport localTransport) {
059                    try {
060                        Transport remoteTransport = createRemoteTransport();
061                        ProxyConnection connection = new ProxyConnection(localTransport, remoteTransport);
062                        connections.add(connection);
063                        connection.start();
064                    } catch (Exception e) {
065                        onAcceptError(e);
066                    }
067                }
068    
069                public void onAcceptError(Exception error) {
070                    LOG.error("Could not accept connection: " + error, error);
071                }
072            });
073            getServer().start();
074            LOG.info("Proxy Connector " + getName() + " Started");
075    
076        }
077    
078        public void stop() throws Exception {
079            ServiceStopper ss = new ServiceStopper();
080            if (this.server != null) {
081                ss.stop(this.server);
082            }
083            for (Iterator<ProxyConnection> iter = connections.iterator(); iter.hasNext();) {
084                LOG.info("Connector stopped: Stopping proxy.");
085                ss.stop(iter.next());
086            }
087            ss.throwFirstException();
088            LOG.info("Proxy Connector " + getName() + " Stopped");
089        }
090    
091        // Properties
092        // -------------------------------------------------------------------------
093    
094        public URI getLocalUri() {
095            return localUri;
096        }
097    
098        public void setLocalUri(URI localURI) {
099            this.localUri = localURI;
100        }
101    
102        public URI getBind() {
103            return bind;
104        }
105    
106        public void setBind(URI bind) {
107            this.bind = bind;
108        }
109    
110        public URI getRemote() {
111            return remote;
112        }
113    
114        public void setRemote(URI remote) {
115            this.remote = remote;
116        }
117    
118        public TransportServer getServer() throws IOException, URISyntaxException {
119            if (server == null) {
120                server = createServer();
121            }
122            return server;
123        }
124    
125        public void setServer(TransportServer server) {
126            this.server = server;
127        }
128    
129        protected TransportServer createServer() throws IOException, URISyntaxException {
130            if (bind == null) {
131                throw new IllegalArgumentException("You must specify either a server or the bind property");
132            }
133            return TransportFactory.bind(bind);
134        }
135    
136        private Transport createRemoteTransport() throws Exception {
137            Transport transport = TransportFactory.compositeConnect(remote);
138            CompositeTransport ct = transport.narrow(CompositeTransport.class);
139            if (ct != null && localUri != null && proxyToLocalBroker) {
140                ct.add(false,new URI[] {localUri});
141            }
142    
143            // Add a transport filter so that we can track the transport life cycle
144            transport = new TransportFilter(transport) {
145                @Override
146                public void stop() throws Exception {
147                    LOG.info("Stopping proxy.");
148                    super.stop();
149                    connections.remove(this);
150                }
151            };
152            return transport;
153        }
154    
155        public String getName() {
156            if (name == null) {
157                if (server != null) {
158                    name = server.getConnectURI().toString();
159                } else {
160                    name = "proxy";
161                }
162            }
163            return name;
164        }
165    
166        public void setName(String name) {
167            this.name = name;
168        }
169    
170        public boolean isProxyToLocalBroker() {
171            return proxyToLocalBroker;
172        }
173    
174        public void setProxyToLocalBroker(boolean proxyToLocalBroker) {
175            this.proxyToLocalBroker = proxyToLocalBroker;
176        }
177    
178    }