001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.udp;
018
019import java.io.IOException;
020import java.net.URI;
021import java.net.URISyntaxException;
022import java.net.UnknownHostException;
023import java.util.HashMap;
024import java.util.Map;
025
026import org.apache.activemq.TransportLoggerSupport;
027import org.apache.activemq.openwire.OpenWireFormat;
028import org.apache.activemq.transport.CommandJoiner;
029import org.apache.activemq.transport.InactivityMonitor;
030import org.apache.activemq.transport.Transport;
031import org.apache.activemq.transport.TransportFactory;
032import org.apache.activemq.transport.TransportServer;
033import org.apache.activemq.transport.reliable.DefaultReplayStrategy;
034import org.apache.activemq.transport.reliable.ExceptionIfDroppedReplayStrategy;
035import org.apache.activemq.transport.reliable.ReliableTransport;
036import org.apache.activemq.transport.reliable.ReplayStrategy;
037import org.apache.activemq.transport.reliable.Replayer;
038import org.apache.activemq.transport.tcp.TcpTransportFactory;
039import org.apache.activemq.util.IOExceptionSupport;
040import org.apache.activemq.util.IntrospectionSupport;
041import org.apache.activemq.util.URISupport;
042import org.apache.activemq.wireformat.WireFormat;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046/**
047 * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
048 *
049 * @deprecated
050 */
051@Deprecated
052public class UdpTransportFactory extends TransportFactory {
053
054    private static final Logger log = LoggerFactory.getLogger(TcpTransportFactory.class);
055
056    @Override
057    public TransportServer doBind(final URI location) throws IOException {
058        try {
059            Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location));
060            if (options.containsKey("port")) {
061                throw new IllegalArgumentException("The port property cannot be specified on a UDP server transport - please use the port in the URI syntax");
062            }
063            WireFormat wf = createWireFormat(options);
064            int port = location.getPort();
065            OpenWireFormat openWireFormat = asOpenWireFormat(wf);
066            UdpTransport transport = (UdpTransport) createTransport(location.getPort(), wf);
067
068            Transport configuredTransport = configure(transport, wf, options, true);
069            UdpTransportServer server = new UdpTransportServer(location, transport, configuredTransport, createReplayStrategy());
070            return server;
071        } catch (URISyntaxException e) {
072            throw IOExceptionSupport.create(e);
073        } catch (Exception e) {
074            throw IOExceptionSupport.create(e);
075        }
076    }
077
078    @Override
079    public Transport configure(Transport transport, WireFormat format, Map options) throws Exception {
080        return configure(transport, format, options, false);
081    }
082
083    @Override
084    public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
085        IntrospectionSupport.setProperties(transport, options);
086        final UdpTransport udpTransport = (UdpTransport)transport;
087
088        // deal with fragmentation
089        transport = new CommandJoiner(transport, asOpenWireFormat(format));
090
091        if (udpTransport.isTrace()) {
092            try {
093                transport = TransportLoggerSupport.createTransportLogger(transport);
094            } catch (Throwable e) {
095                log.error("Could not create TransportLogger, reason: " + e, e);
096            }
097        }
098
099        transport = new InactivityMonitor(transport, format);
100
101        if (format instanceof OpenWireFormat) {
102            transport = configureClientSideNegotiator(transport, format, udpTransport);
103        }
104
105        return transport;
106    }
107
108    @Override
109    protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException {
110        OpenWireFormat wireFormat = asOpenWireFormat(wf);
111        return new UdpTransport(wireFormat, location);
112    }
113
114    protected Transport createTransport(int port, WireFormat wf) throws UnknownHostException, IOException {
115        OpenWireFormat wireFormat = asOpenWireFormat(wf);
116        return new UdpTransport(wireFormat, port);
117    }
118
119    /**
120     * Configures the transport
121     *
122     * @param acceptServer true if this transport is used purely as an 'accept'
123     *                transport for new connections which work like TCP
124     *                SocketServers where new connections spin up a new separate
125     *                UDP transport
126     */
127    protected Transport configure(Transport transport, WireFormat format, Map options, boolean acceptServer) throws Exception {
128        IntrospectionSupport.setProperties(transport, options);
129        UdpTransport udpTransport = (UdpTransport)transport;
130
131        OpenWireFormat openWireFormat = asOpenWireFormat(format);
132
133        if (udpTransport.isTrace()) {
134            transport = TransportLoggerSupport.createTransportLogger(transport);
135        }
136
137        transport = new InactivityMonitor(transport, format);
138
139        if (!acceptServer && format instanceof OpenWireFormat) {
140            transport = configureClientSideNegotiator(transport, format, udpTransport);
141        }
142
143        // deal with fragmentation
144
145        if (acceptServer) {
146            // lets not support a buffer of messages to enable reliable
147            // messaging on the 'accept server' transport
148            udpTransport.setReplayEnabled(false);
149
150            // we don't want to do reliable checks on this transport as we
151            // delegate to one that does
152            transport = new CommandJoiner(transport, openWireFormat);
153            return transport;
154        } else {
155            ReliableTransport reliableTransport = new ReliableTransport(transport, udpTransport);
156            Replayer replayer = reliableTransport.getReplayer();
157            reliableTransport.setReplayStrategy(createReplayStrategy(replayer));
158
159            // Joiner must be on outside as the inbound messages must be
160            // processed by the reliable transport first
161            return new CommandJoiner(reliableTransport, openWireFormat);
162        }
163    }
164
165
166    protected ReplayStrategy createReplayStrategy(Replayer replayer) {
167        if (replayer != null) {
168            return new DefaultReplayStrategy(5);
169        }
170        return new ExceptionIfDroppedReplayStrategy(1);
171    }
172
173    protected ReplayStrategy createReplayStrategy() {
174        return new DefaultReplayStrategy(5);
175    }
176
177    protected Transport configureClientSideNegotiator(Transport transport, WireFormat format, final UdpTransport udpTransport) {
178        return new ResponseRedirectInterceptor(transport, udpTransport);
179    }
180
181    protected OpenWireFormat asOpenWireFormat(WireFormat wf) {
182        OpenWireFormat answer = (OpenWireFormat)wf;
183        return answer;
184    }
185}