001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.udp;
018    
019    import java.io.EOFException;
020    import java.io.IOException;
021    import java.net.BindException;
022    import java.net.DatagramSocket;
023    import java.net.InetAddress;
024    import java.net.InetSocketAddress;
025    import java.net.SocketAddress;
026    import java.net.SocketException;
027    import java.net.URI;
028    import java.net.UnknownHostException;
029    import java.nio.channels.AsynchronousCloseException;
030    import java.nio.channels.DatagramChannel;
031    
032    import org.apache.activemq.Service;
033    import org.apache.activemq.command.Command;
034    import org.apache.activemq.command.Endpoint;
035    import org.apache.activemq.openwire.OpenWireFormat;
036    import org.apache.activemq.transport.Transport;
037    import org.apache.activemq.transport.TransportThreadSupport;
038    import org.apache.activemq.transport.reliable.ExceptionIfDroppedReplayStrategy;
039    import org.apache.activemq.transport.reliable.ReplayBuffer;
040    import org.apache.activemq.transport.reliable.ReplayStrategy;
041    import org.apache.activemq.transport.reliable.Replayer;
042    import org.apache.activemq.util.InetAddressUtil;
043    import org.apache.activemq.util.IntSequenceGenerator;
044    import org.apache.activemq.util.ServiceStopper;
045    import org.slf4j.Logger;
046    import org.slf4j.LoggerFactory;
047    
048    /**
049     * An implementation of the {@link Transport} interface using raw UDP
050     * 
051     * 
052     */
053    public class UdpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
054        private static final Logger LOG = LoggerFactory.getLogger(UdpTransport.class);
055    
056        private static final int MAX_BIND_ATTEMPTS = 50;
057        private static final long BIND_ATTEMPT_DELAY = 100;
058    
059        private CommandChannel commandChannel;
060        private OpenWireFormat wireFormat;
061        private ByteBufferPool bufferPool;
062        private ReplayStrategy replayStrategy = new ExceptionIfDroppedReplayStrategy();
063        private ReplayBuffer replayBuffer;
064        private int datagramSize = 4 * 1024;
065        private SocketAddress targetAddress;
066        private SocketAddress originalTargetAddress;
067        private DatagramChannel channel;
068        private boolean trace;
069        private boolean useLocalHost = false;
070        private int port;
071        private int minmumWireFormatVersion;
072        private String description;
073        private IntSequenceGenerator sequenceGenerator;
074        private boolean replayEnabled = true;
075    
076        protected UdpTransport(OpenWireFormat wireFormat) throws IOException {
077            this.wireFormat = wireFormat;
078        }
079    
080        public UdpTransport(OpenWireFormat wireFormat, URI remoteLocation) throws UnknownHostException, IOException {
081            this(wireFormat);
082            this.targetAddress = createAddress(remoteLocation);
083            description = remoteLocation.toString() + "@";
084        }
085    
086        public UdpTransport(OpenWireFormat wireFormat, SocketAddress socketAddress) throws IOException {
087            this(wireFormat);
088            this.targetAddress = socketAddress;
089            this.description = getProtocolName() + "ServerConnection@";
090        }
091    
092        /**
093         * Used by the server transport
094         */
095        public UdpTransport(OpenWireFormat wireFormat, int port) throws UnknownHostException, IOException {
096            this(wireFormat);
097            this.port = port;
098            this.targetAddress = null;
099            this.description = getProtocolName() + "Server@";
100        }
101    
102        /**
103         * Creates a replayer for working with the reliable transport
104         */
105        public Replayer createReplayer() throws IOException {
106            if (replayEnabled) {
107                return getCommandChannel();
108            }
109            return null;
110        }
111    
112        /**
113         * A one way asynchronous send
114         */
115        public void oneway(Object command) throws IOException {
116            oneway(command, targetAddress);
117        }
118    
119        /**
120         * A one way asynchronous send to a given address
121         */
122        public void oneway(Object command, SocketAddress address) throws IOException {
123            if (LOG.isDebugEnabled()) {
124                LOG.debug("Sending oneway from: " + this + " to target: " + targetAddress + " command: " + command);
125            }
126            checkStarted();
127            commandChannel.write((Command)command, address);
128        }
129    
130        /**
131         * @return pretty print of 'this'
132         */
133        public String toString() {
134            if (description != null) {
135                return description + port;
136            } else {
137                return getProtocolUriScheme() + targetAddress + "@" + port;
138            }
139        }
140    
141        /**
142         * reads packets from a Socket
143         */
144        public void run() {
145            LOG.trace("Consumer thread starting for: " + toString());
146            while (!isStopped()) {
147                try {
148                    Command command = commandChannel.read();
149                    doConsume(command);
150                } catch (AsynchronousCloseException e) {
151                    // DatagramChannel closed
152                    try {
153                        stop();
154                    } catch (Exception e2) {
155                        LOG.warn("Caught in: " + this + " while closing: " + e2 + ". Now Closed", e2);
156                    }
157                } catch (SocketException e) {
158                    // DatagramSocket closed
159                    LOG.debug("Socket closed: " + e, e);
160                    try {
161                        stop();
162                    } catch (Exception e2) {
163                        LOG.warn("Caught in: " + this + " while closing: " + e2 + ". Now Closed", e2);
164                    }
165                } catch (EOFException e) {
166                    // DataInputStream closed
167                    LOG.debug("Socket closed: " + e, e);
168                    try {
169                        stop();
170                    } catch (Exception e2) {
171                        LOG.warn("Caught in: " + this + " while closing: " + e2 + ". Now Closed", e2);
172                    }
173                } catch (Exception e) {
174                    try {
175                        stop();
176                    } catch (Exception e2) {
177                        LOG.warn("Caught in: " + this + " while closing: " + e2 + ". Now Closed", e2);
178                    }
179                    if (e instanceof IOException) {
180                        onException((IOException)e);
181                    } else {
182                        LOG.error("Caught: " + e, e);
183                        e.printStackTrace();
184                    }
185                }
186            }
187        }
188    
189        /**
190         * We have received the WireFormatInfo from the server on the actual channel
191         * we should use for all future communication with the server, so lets set
192         * the target to be the actual channel that the server has chosen for us to
193         * talk on.
194         */
195        public void setTargetEndpoint(Endpoint newTarget) {
196            if (newTarget instanceof DatagramEndpoint) {
197                DatagramEndpoint endpoint = (DatagramEndpoint)newTarget;
198                SocketAddress address = endpoint.getAddress();
199                if (address != null) {
200                    if (originalTargetAddress == null) {
201                        originalTargetAddress = targetAddress;
202                    }
203                    targetAddress = address;
204                    commandChannel.setTargetAddress(address);
205                }
206            }
207        }
208    
209        // Properties
210        // -------------------------------------------------------------------------
211        public boolean isTrace() {
212            return trace;
213        }
214    
215        public void setTrace(boolean trace) {
216            this.trace = trace;
217        }
218    
219        public int getDatagramSize() {
220            return datagramSize;
221        }
222    
223        public void setDatagramSize(int datagramSize) {
224            this.datagramSize = datagramSize;
225        }
226    
227        public boolean isUseLocalHost() {
228            return useLocalHost;
229        }
230    
231        /**
232         * Sets whether 'localhost' or the actual local host name should be used to
233         * make local connections. On some operating systems such as Macs its not
234         * possible to connect as the local host name so localhost is better.
235         */
236        public void setUseLocalHost(boolean useLocalHost) {
237            this.useLocalHost = useLocalHost;
238        }
239    
240        public CommandChannel getCommandChannel() throws IOException {
241            if (commandChannel == null) {
242                commandChannel = createCommandChannel();
243            }
244            return commandChannel;
245        }
246    
247        /**
248         * Sets the implementation of the command channel to use.
249         */
250        public void setCommandChannel(CommandDatagramChannel commandChannel) {
251            this.commandChannel = commandChannel;
252        }
253    
254        public ReplayStrategy getReplayStrategy() {
255            return replayStrategy;
256        }
257    
258        /**
259         * Sets the strategy used to replay missed datagrams
260         */
261        public void setReplayStrategy(ReplayStrategy replayStrategy) {
262            this.replayStrategy = replayStrategy;
263        }
264    
265        public int getPort() {
266            return port;
267        }
268    
269        /**
270         * Sets the port to connect on
271         */
272        public void setPort(int port) {
273            this.port = port;
274        }
275    
276        public int getMinmumWireFormatVersion() {
277            return minmumWireFormatVersion;
278        }
279    
280        public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
281            this.minmumWireFormatVersion = minmumWireFormatVersion;
282        }
283    
284        public OpenWireFormat getWireFormat() {
285            return wireFormat;
286        }
287    
288        public IntSequenceGenerator getSequenceGenerator() {
289            if (sequenceGenerator == null) {
290                sequenceGenerator = new IntSequenceGenerator();
291            }
292            return sequenceGenerator;
293        }
294    
295        public void setSequenceGenerator(IntSequenceGenerator sequenceGenerator) {
296            this.sequenceGenerator = sequenceGenerator;
297        }
298    
299        public boolean isReplayEnabled() {
300            return replayEnabled;
301        }
302    
303        /**
304         * Sets whether or not replay should be enabled when using the reliable
305         * transport. i.e. should we maintain a buffer of messages that can be
306         * replayed?
307         */
308        public void setReplayEnabled(boolean replayEnabled) {
309            this.replayEnabled = replayEnabled;
310        }
311    
312        public ByteBufferPool getBufferPool() {
313            if (bufferPool == null) {
314                bufferPool = new DefaultBufferPool();
315            }
316            return bufferPool;
317        }
318    
319        public void setBufferPool(ByteBufferPool bufferPool) {
320            this.bufferPool = bufferPool;
321        }
322    
323        public ReplayBuffer getReplayBuffer() {
324            return replayBuffer;
325        }
326    
327        public void setReplayBuffer(ReplayBuffer replayBuffer) throws IOException {
328            this.replayBuffer = replayBuffer;
329            getCommandChannel().setReplayBuffer(replayBuffer);
330        }
331    
332        // Implementation methods
333        // -------------------------------------------------------------------------
334    
335        /**
336         * Creates an address from the given URI
337         */
338        protected InetSocketAddress createAddress(URI remoteLocation) throws UnknownHostException, IOException {
339            String host = resolveHostName(remoteLocation.getHost());
340            return new InetSocketAddress(host, remoteLocation.getPort());
341        }
342    
343        protected String resolveHostName(String host) throws UnknownHostException {
344            String localName = InetAddressUtil.getLocalHostName();
345            if (localName != null && isUseLocalHost()) {
346                if (localName.equals(host)) {
347                    return "localhost";
348                }
349            }
350            return host;
351        }
352    
353        protected void doStart() throws Exception {
354            getCommandChannel().start();
355    
356            super.doStart();
357        }
358    
359        protected CommandChannel createCommandChannel() throws IOException {
360            SocketAddress localAddress = createLocalAddress();
361            channel = DatagramChannel.open();
362    
363            channel = connect(channel, targetAddress);
364    
365            DatagramSocket socket = channel.socket();
366            bind(socket, localAddress);
367            if (port == 0) {
368                port = socket.getLocalPort();
369            }
370    
371            return createCommandDatagramChannel();
372        }
373    
374        protected CommandChannel createCommandDatagramChannel() {
375            return new CommandDatagramChannel(this, getWireFormat(), getDatagramSize(), getTargetAddress(), createDatagramHeaderMarshaller(), getChannel(), getBufferPool());
376        }
377    
378        protected void bind(DatagramSocket socket, SocketAddress localAddress) throws IOException {
379            channel.configureBlocking(true);
380    
381            if (LOG.isDebugEnabled()) {
382                LOG.debug("Binding to address: " + localAddress);
383            }
384    
385            //
386            // We have noticed that on some platfoms like linux, after you close
387            // down
388            // a previously bound socket, it can take a little while before we can
389            // bind it again.
390            // 
391            for (int i = 0; i < MAX_BIND_ATTEMPTS; i++) {
392                try {
393                    socket.bind(localAddress);
394                    return;
395                } catch (BindException e) {
396                    if (i + 1 == MAX_BIND_ATTEMPTS) {
397                        throw e;
398                    }
399                    try {
400                        Thread.sleep(BIND_ATTEMPT_DELAY);
401                    } catch (InterruptedException e1) {
402                        Thread.currentThread().interrupt();
403                        throw e;
404                    }
405                }
406            }
407    
408        }
409    
410        protected DatagramChannel connect(DatagramChannel channel, SocketAddress targetAddress2) throws IOException {
411            // TODO
412            // connect to default target address to avoid security checks each time
413            // channel = channel.connect(targetAddress);
414    
415            return channel;
416        }
417    
418        protected SocketAddress createLocalAddress() {
419            return new InetSocketAddress(port);
420        }
421    
422        protected void doStop(ServiceStopper stopper) throws Exception {
423            if (channel != null) {
424                channel.close();
425            }
426        }
427    
428        protected DatagramHeaderMarshaller createDatagramHeaderMarshaller() {
429            return new DatagramHeaderMarshaller();
430        }
431    
432        protected String getProtocolName() {
433            return "Udp";
434        }
435    
436        protected String getProtocolUriScheme() {
437            return "udp://";
438        }
439    
440        protected SocketAddress getTargetAddress() {
441            return targetAddress;
442        }
443    
444        protected DatagramChannel getChannel() {
445            return channel;
446        }
447    
448        protected void setChannel(DatagramChannel channel) {
449            this.channel = channel;
450        }
451    
452        public InetSocketAddress getLocalSocketAddress() {
453            if (channel == null) {
454                return null;
455            } else {
456                return (InetSocketAddress)channel.socket().getLocalSocketAddress();
457            }
458        }
459    
460        public String getRemoteAddress() {
461            if (targetAddress != null) {
462                return "" + targetAddress;
463            }
464            return null;
465        }
466    
467        public int getReceiveCounter() {
468            if (commandChannel == null) {
469                return 0;
470            }
471            return commandChannel.getReceiveCounter();
472        }
473    }