001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.udp;
018    
019    import java.io.IOException;
020    import java.net.InetSocketAddress;
021    import java.net.SocketAddress;
022    import java.net.URI;
023    import java.util.HashMap;
024    import java.util.Map;
025    
026    import org.apache.activemq.command.BrokerInfo;
027    import org.apache.activemq.command.Command;
028    import org.apache.activemq.openwire.OpenWireFormat;
029    import org.apache.activemq.transport.CommandJoiner;
030    import org.apache.activemq.transport.InactivityMonitor;
031    import org.apache.activemq.transport.Transport;
032    import org.apache.activemq.transport.TransportListener;
033    import org.apache.activemq.transport.TransportServer;
034    import org.apache.activemq.transport.TransportServerSupport;
035    import org.apache.activemq.transport.reliable.ReliableTransport;
036    import org.apache.activemq.transport.reliable.ReplayStrategy;
037    import org.apache.activemq.transport.reliable.Replayer;
038    import org.apache.activemq.util.ServiceStopper;
039    import org.slf4j.Logger;
040    import org.slf4j.LoggerFactory;
041    
042    /**
043     * A UDP based implementation of {@link TransportServer}
044     * 
045     * 
046     */
047    
048    public class UdpTransportServer extends TransportServerSupport {
049        private static final Logger LOG = LoggerFactory.getLogger(UdpTransportServer.class);
050    
051        private UdpTransport serverTransport;
052        private ReplayStrategy replayStrategy;
053        private Transport configuredTransport;
054        private boolean usingWireFormatNegotiation;
055        private Map<DatagramEndpoint, Transport> transports = new HashMap<DatagramEndpoint, Transport>();
056    
057        public UdpTransportServer(URI connectURI, UdpTransport serverTransport, Transport configuredTransport, ReplayStrategy replayStrategy) {
058            super(connectURI);
059            this.serverTransport = serverTransport;
060            this.configuredTransport = configuredTransport;
061            this.replayStrategy = replayStrategy;
062        }
063    
064        public String toString() {
065            return "UdpTransportServer@" + serverTransport;
066        }
067    
068        public void run() {
069        }
070    
071        public UdpTransport getServerTransport() {
072            return serverTransport;
073        }
074    
075        public void setBrokerInfo(BrokerInfo brokerInfo) {
076        }
077    
078        protected void doStart() throws Exception {
079            LOG.info("Starting " + this);
080    
081            configuredTransport.setTransportListener(new TransportListener() {
082                public void onCommand(Object o) {
083                    final Command command = (Command)o;
084                    processInboundConnection(command);
085                }
086    
087                public void onException(IOException error) {
088                    LOG.error("Caught: " + error, error);
089                }
090    
091                public void transportInterupted() {
092                }
093    
094                public void transportResumed() {
095                }
096            });
097            configuredTransport.start();
098        }
099    
100        protected void doStop(ServiceStopper stopper) throws Exception {
101            configuredTransport.stop();
102        }
103    
104        protected void processInboundConnection(Command command) {
105            DatagramEndpoint endpoint = (DatagramEndpoint)command.getFrom();
106            if (LOG.isDebugEnabled()) {
107                LOG.debug("Received command on: " + this + " from address: " + endpoint + " command: " + command);
108            }
109            Transport transport = null;
110            synchronized (transports) {
111                transport = transports.get(endpoint);
112                if (transport == null) {
113                    if (usingWireFormatNegotiation && !command.isWireFormatInfo()) {
114                        LOG.error("Received inbound server communication from: " + command.getFrom() + " expecting WireFormatInfo but was command: " + command);
115                    } else {
116                        if (LOG.isDebugEnabled()) {
117                            LOG.debug("Creating a new UDP server connection");
118                        }
119                        try {
120                            transport = createTransport(command, endpoint);
121                            transport = configureTransport(transport);
122                            transports.put(endpoint, transport);
123                        } catch (IOException e) {
124                            LOG.error("Caught: " + e, e);
125                            getAcceptListener().onAcceptError(e);
126                        }
127                    }
128                } else {
129                    LOG.warn("Discarding duplicate command to server from: " + endpoint + " command: " + command);
130                }
131            }
132        }
133    
134        protected Transport configureTransport(Transport transport) {
135            transport = new InactivityMonitor(transport, serverTransport.getWireFormat());
136            getAcceptListener().onAccept(transport);
137            return transport;
138        }
139    
140        protected Transport createTransport(final Command command, DatagramEndpoint endpoint) throws IOException {
141            if (endpoint == null) {
142                throw new IOException("No endpoint available for command: " + command);
143            }
144            final SocketAddress address = endpoint.getAddress();
145            final OpenWireFormat connectionWireFormat = serverTransport.getWireFormat().copy();
146            final UdpTransport transport = new UdpTransport(connectionWireFormat, address);
147    
148            final ReliableTransport reliableTransport = new ReliableTransport(transport, transport);
149            reliableTransport.getReplayer();
150            reliableTransport.setReplayStrategy(replayStrategy);
151    
152            // Joiner must be on outside as the inbound messages must be processed
153            // by the reliable transport first
154            return new CommandJoiner(reliableTransport, connectionWireFormat) {
155                public void start() throws Exception {
156                    super.start();
157                    reliableTransport.onCommand(command);
158                }
159            };
160    
161            /**
162             * final WireFormatNegotiator wireFormatNegotiator = new
163             * WireFormatNegotiator(configuredTransport, transport.getWireFormat(),
164             * serverTransport .getMinmumWireFormatVersion()) { public void start()
165             * throws Exception { super.start(); log.debug("Starting a new server
166             * transport: " + this + " with command: " + command);
167             * onCommand(command); } // lets use the specific addressing of wire
168             * format protected void sendWireFormat(WireFormatInfo info) throws
169             * IOException { log.debug("#### we have negotiated the wireformat;
170             * sending a wireformat to: " + address); transport.oneway(info,
171             * address); } }; return wireFormatNegotiator;
172             */
173        }
174    
175        public InetSocketAddress getSocketAddress() {
176            return serverTransport.getLocalSocketAddress();
177        }
178    }