001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.udp;
018
019import java.io.IOException;
020import java.net.InetSocketAddress;
021import java.net.SocketAddress;
022import java.net.URI;
023import java.util.HashMap;
024import java.util.Map;
025
026import org.apache.activemq.command.BrokerInfo;
027import org.apache.activemq.command.Command;
028import org.apache.activemq.openwire.OpenWireFormat;
029import org.apache.activemq.transport.CommandJoiner;
030import org.apache.activemq.transport.InactivityMonitor;
031import org.apache.activemq.transport.Transport;
032import org.apache.activemq.transport.TransportListener;
033import org.apache.activemq.transport.TransportServer;
034import org.apache.activemq.transport.TransportServerSupport;
035import org.apache.activemq.transport.reliable.ReliableTransport;
036import org.apache.activemq.transport.reliable.ReplayStrategy;
037import org.apache.activemq.util.ServiceStopper;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041/**
042 * A UDP based implementation of {@link TransportServer}
043 *
044 * @deprecated
045 */
046@Deprecated
047public class UdpTransportServer extends TransportServerSupport {
048    private static final Logger LOG = LoggerFactory.getLogger(UdpTransportServer.class);
049
050    private final UdpTransport serverTransport;
051    private final ReplayStrategy replayStrategy;
052    private final Transport configuredTransport;
053    private boolean usingWireFormatNegotiation;
054    private final Map<DatagramEndpoint, Transport> transports = new HashMap<DatagramEndpoint, Transport>();
055    private boolean allowLinkStealing;
056
057    public UdpTransportServer(URI connectURI, UdpTransport serverTransport, Transport configuredTransport, ReplayStrategy replayStrategy) {
058        super(connectURI);
059        this.serverTransport = serverTransport;
060        this.configuredTransport = configuredTransport;
061        this.replayStrategy = replayStrategy;
062    }
063
064    @Override
065    public String toString() {
066        return "UdpTransportServer@" + serverTransport;
067    }
068
069    public void run() {
070    }
071
072    public UdpTransport getServerTransport() {
073        return serverTransport;
074    }
075
076    @Override
077    public void setBrokerInfo(BrokerInfo brokerInfo) {
078    }
079
080    @Override
081    protected void doStart() throws Exception {
082        LOG.info("Starting " + this);
083
084        configuredTransport.setTransportListener(new TransportListener() {
085            @Override
086            public void onCommand(Object o) {
087                final Command command = (Command)o;
088                processInboundConnection(command);
089            }
090
091            @Override
092            public void onException(IOException error) {
093                LOG.error("Caught: " + error, error);
094            }
095
096            @Override
097            public void transportInterupted() {
098            }
099
100            @Override
101            public void transportResumed() {
102            }
103        });
104        configuredTransport.start();
105    }
106
107    @Override
108    protected void doStop(ServiceStopper stopper) throws Exception {
109        configuredTransport.stop();
110    }
111
112    protected void processInboundConnection(Command command) {
113        DatagramEndpoint endpoint = (DatagramEndpoint)command.getFrom();
114        if (LOG.isDebugEnabled()) {
115            LOG.debug("Received command on: " + this + " from address: " + endpoint + " command: " + command);
116        }
117        Transport transport = null;
118        synchronized (transports) {
119            transport = transports.get(endpoint);
120            if (transport == null) {
121                if (usingWireFormatNegotiation && !command.isWireFormatInfo()) {
122                    LOG.error("Received inbound server communication from: " + command.getFrom() + " expecting WireFormatInfo but was command: " + command);
123                } else {
124                    if (LOG.isDebugEnabled()) {
125                        LOG.debug("Creating a new UDP server connection");
126                    }
127                    try {
128                        transport = createTransport(command, endpoint);
129                        transport = configureTransport(transport);
130                        transports.put(endpoint, transport);
131                    } catch (IOException e) {
132                        LOG.error("Caught: " + e, e);
133                        getAcceptListener().onAcceptError(e);
134                    }
135                }
136            } else {
137                LOG.warn("Discarding duplicate command to server from: " + endpoint + " command: " + command);
138            }
139        }
140    }
141
142    protected Transport configureTransport(Transport transport) {
143        transport = new InactivityMonitor(transport, serverTransport.getWireFormat());
144        getAcceptListener().onAccept(transport);
145        return transport;
146    }
147
148    protected Transport createTransport(final Command command, DatagramEndpoint endpoint) throws IOException {
149        if (endpoint == null) {
150            throw new IOException("No endpoint available for command: " + command);
151        }
152        final SocketAddress address = endpoint.getAddress();
153        final OpenWireFormat connectionWireFormat = serverTransport.getWireFormat().copy();
154        final UdpTransport transport = new UdpTransport(connectionWireFormat, address);
155
156        final ReliableTransport reliableTransport = new ReliableTransport(transport, transport);
157        reliableTransport.getReplayer();
158        reliableTransport.setReplayStrategy(replayStrategy);
159
160        // Joiner must be on outside as the inbound messages must be processed
161        // by the reliable transport first
162        return new CommandJoiner(reliableTransport, connectionWireFormat) {
163            @Override
164            public void start() throws Exception {
165                super.start();
166                reliableTransport.onCommand(command);
167            }
168        };
169
170        /**
171         * final WireFormatNegotiator wireFormatNegotiator = new
172         * WireFormatNegotiator(configuredTransport, transport.getWireFormat(),
173         * serverTransport .getMinmumWireFormatVersion()) { public void start()
174         * throws Exception { super.start(); log.debug("Starting a new server
175         * transport: " + this + " with command: " + command);
176         * onCommand(command); } // lets use the specific addressing of wire
177         * format protected void sendWireFormat(WireFormatInfo info) throws
178         * IOException { log.debug("#### we have negotiated the wireformat;
179         * sending a wireformat to: " + address); transport.oneway(info,
180         * address); } }; return wireFormatNegotiator;
181         */
182    }
183
184    @Override
185    public InetSocketAddress getSocketAddress() {
186        return serverTransport.getLocalSocketAddress();
187    }
188
189    @Override
190    public boolean isSslServer() {
191        return false;
192    }
193
194    @Override
195    public boolean isAllowLinkStealing() {
196        return allowLinkStealing;
197    }
198
199    public void setAllowLinkStealing(boolean allowLinkStealing) {
200        this.allowLinkStealing = allowLinkStealing;
201    }
202}