001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.udp;
018    
019    import java.io.IOException;
020    import java.net.InetSocketAddress;
021    import java.net.SocketAddress;
022    import java.net.URI;
023    import java.util.HashMap;
024    import java.util.Map;
025    
026    import org.apache.activemq.command.BrokerInfo;
027    import org.apache.activemq.command.Command;
028    import org.apache.activemq.openwire.OpenWireFormat;
029    import org.apache.activemq.transport.CommandJoiner;
030    import org.apache.activemq.transport.InactivityMonitor;
031    import org.apache.activemq.transport.Transport;
032    import org.apache.activemq.transport.TransportListener;
033    import org.apache.activemq.transport.TransportServer;
034    import org.apache.activemq.transport.TransportServerSupport;
035    import org.apache.activemq.transport.reliable.ReliableTransport;
036    import org.apache.activemq.transport.reliable.ReplayStrategy;
037    import org.apache.activemq.util.ServiceStopper;
038    import org.slf4j.Logger;
039    import org.slf4j.LoggerFactory;
040    
041    /**
042     * A UDP based implementation of {@link TransportServer}
043     *
044     * @deprecated
045     */
046    @Deprecated
047    public class UdpTransportServer extends TransportServerSupport {
048        private static final Logger LOG = LoggerFactory.getLogger(UdpTransportServer.class);
049    
050        private final UdpTransport serverTransport;
051        private final ReplayStrategy replayStrategy;
052        private final Transport configuredTransport;
053        private boolean usingWireFormatNegotiation;
054        private final Map<DatagramEndpoint, Transport> transports = new HashMap<DatagramEndpoint, Transport>();
055    
056        public UdpTransportServer(URI connectURI, UdpTransport serverTransport, Transport configuredTransport, ReplayStrategy replayStrategy) {
057            super(connectURI);
058            this.serverTransport = serverTransport;
059            this.configuredTransport = configuredTransport;
060            this.replayStrategy = replayStrategy;
061        }
062    
063        @Override
064        public String toString() {
065            return "UdpTransportServer@" + serverTransport;
066        }
067    
068        public void run() {
069        }
070    
071        public UdpTransport getServerTransport() {
072            return serverTransport;
073        }
074    
075        @Override
076        public void setBrokerInfo(BrokerInfo brokerInfo) {
077        }
078    
079        @Override
080        protected void doStart() throws Exception {
081            LOG.info("Starting " + this);
082    
083            configuredTransport.setTransportListener(new TransportListener() {
084                @Override
085                public void onCommand(Object o) {
086                    final Command command = (Command)o;
087                    processInboundConnection(command);
088                }
089    
090                @Override
091                public void onException(IOException error) {
092                    LOG.error("Caught: " + error, error);
093                }
094    
095                @Override
096                public void transportInterupted() {
097                }
098    
099                @Override
100                public void transportResumed() {
101                }
102            });
103            configuredTransport.start();
104        }
105    
106        @Override
107        protected void doStop(ServiceStopper stopper) throws Exception {
108            configuredTransport.stop();
109        }
110    
111        protected void processInboundConnection(Command command) {
112            DatagramEndpoint endpoint = (DatagramEndpoint)command.getFrom();
113            if (LOG.isDebugEnabled()) {
114                LOG.debug("Received command on: " + this + " from address: " + endpoint + " command: " + command);
115            }
116            Transport transport = null;
117            synchronized (transports) {
118                transport = transports.get(endpoint);
119                if (transport == null) {
120                    if (usingWireFormatNegotiation && !command.isWireFormatInfo()) {
121                        LOG.error("Received inbound server communication from: " + command.getFrom() + " expecting WireFormatInfo but was command: " + command);
122                    } else {
123                        if (LOG.isDebugEnabled()) {
124                            LOG.debug("Creating a new UDP server connection");
125                        }
126                        try {
127                            transport = createTransport(command, endpoint);
128                            transport = configureTransport(transport);
129                            transports.put(endpoint, transport);
130                        } catch (IOException e) {
131                            LOG.error("Caught: " + e, e);
132                            getAcceptListener().onAcceptError(e);
133                        }
134                    }
135                } else {
136                    LOG.warn("Discarding duplicate command to server from: " + endpoint + " command: " + command);
137                }
138            }
139        }
140    
141        protected Transport configureTransport(Transport transport) {
142            transport = new InactivityMonitor(transport, serverTransport.getWireFormat());
143            getAcceptListener().onAccept(transport);
144            return transport;
145        }
146    
147        protected Transport createTransport(final Command command, DatagramEndpoint endpoint) throws IOException {
148            if (endpoint == null) {
149                throw new IOException("No endpoint available for command: " + command);
150            }
151            final SocketAddress address = endpoint.getAddress();
152            final OpenWireFormat connectionWireFormat = serverTransport.getWireFormat().copy();
153            final UdpTransport transport = new UdpTransport(connectionWireFormat, address);
154    
155            final ReliableTransport reliableTransport = new ReliableTransport(transport, transport);
156            reliableTransport.getReplayer();
157            reliableTransport.setReplayStrategy(replayStrategy);
158    
159            // Joiner must be on outside as the inbound messages must be processed
160            // by the reliable transport first
161            return new CommandJoiner(reliableTransport, connectionWireFormat) {
162                @Override
163                public void start() throws Exception {
164                    super.start();
165                    reliableTransport.onCommand(command);
166                }
167            };
168    
169            /**
170             * final WireFormatNegotiator wireFormatNegotiator = new
171             * WireFormatNegotiator(configuredTransport, transport.getWireFormat(),
172             * serverTransport .getMinmumWireFormatVersion()) { public void start()
173             * throws Exception { super.start(); log.debug("Starting a new server
174             * transport: " + this + " with command: " + command);
175             * onCommand(command); } // lets use the specific addressing of wire
176             * format protected void sendWireFormat(WireFormatInfo info) throws
177             * IOException { log.debug("#### we have negotiated the wireformat;
178             * sending a wireformat to: " + address); transport.oneway(info,
179             * address); } }; return wireFormatNegotiator;
180             */
181        }
182    
183        @Override
184        public InetSocketAddress getSocketAddress() {
185            return serverTransport.getLocalSocketAddress();
186        }
187    
188        @Override
189        public boolean isSslServer() {
190            return false;
191        }
192    }