001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.vm;
018    
019    import java.io.IOException;
020    import java.net.InetSocketAddress;
021    import java.net.URI;
022    import java.util.concurrent.atomic.AtomicInteger;
023    
024    import org.apache.activemq.command.BrokerInfo;
025    import org.apache.activemq.transport.MutexTransport;
026    import org.apache.activemq.transport.ResponseCorrelator;
027    import org.apache.activemq.transport.Transport;
028    import org.apache.activemq.transport.TransportAcceptListener;
029    import org.apache.activemq.transport.TransportServer;
030    
031    /**
032     * Broker side of the VMTransport
033     */
034    public class VMTransportServer implements TransportServer {
035    
036        private TransportAcceptListener acceptListener;
037        private final URI location;
038        private boolean disposed;
039    
040        private final AtomicInteger connectionCount = new AtomicInteger(0);
041        private final boolean disposeOnDisconnect;
042    
043        /**
044         * @param location
045         * @param disposeOnDisconnect
046         */
047        public VMTransportServer(URI location, boolean disposeOnDisconnect) {
048            this.location = location;
049            this.disposeOnDisconnect = disposeOnDisconnect;
050        }
051    
052        /**
053         * @return a pretty print of this
054         */
055        public String toString() {
056            return "VMTransportServer(" + location + ")";
057        }
058    
059        /**
060         * @return new VMTransport
061         * @throws IOException
062         */
063        public VMTransport connect() throws IOException {
064            TransportAcceptListener al;
065            synchronized (this) {
066                if (disposed) {
067                    throw new IOException("Server has been disposed.");
068                }
069                al = acceptListener;
070            }
071            if (al == null) {
072                throw new IOException("Server TransportAcceptListener is null.");
073            }
074    
075            connectionCount.incrementAndGet();
076            VMTransport client = new VMTransport(location) {
077                public void stop() throws Exception {
078                    if (!disposed.get()) {
079                        super.stop();
080                        if (connectionCount.decrementAndGet() == 0 && disposeOnDisconnect) {
081                            VMTransportServer.this.stop();
082                        }
083                    }
084                };
085            };
086    
087            VMTransport server = new VMTransport(location);
088            client.setPeer(server);
089            server.setPeer(client);
090            al.onAccept(configure(server));
091            return client;
092        }
093    
094        /**
095         * Configure transport
096         *
097         * @param transport
098         * @return the Transport
099         */
100        public static Transport configure(Transport transport) {
101            transport = new MutexTransport(transport);
102            transport = new ResponseCorrelator(transport);
103            return transport;
104        }
105    
106        /**
107         * Set the Transport accept listener for new Connections
108         *
109         * @param acceptListener
110         */
111        public synchronized void setAcceptListener(TransportAcceptListener acceptListener) {
112            this.acceptListener = acceptListener;
113        }
114    
115        public void start() throws IOException {
116        }
117    
118        public void stop() throws IOException {
119            VMTransportFactory.stopped(this);
120        }
121    
122        public URI getConnectURI() {
123            return location;
124        }
125    
126        public URI getBindURI() {
127            return location;
128        }
129    
130        public void setBrokerInfo(BrokerInfo brokerInfo) {
131        }
132    
133        public InetSocketAddress getSocketAddress() {
134            return null;
135        }
136    
137        public int getConnectionCount() {
138            return connectionCount.intValue();
139        }
140    }