001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.multicast;
018    
019    import java.io.IOException;
020    import java.net.DatagramSocket;
021    import java.net.InetAddress;
022    import java.net.InetSocketAddress;
023    import java.net.MulticastSocket;
024    import java.net.SocketAddress;
025    import java.net.SocketException;
026    import java.net.URI;
027    import java.net.UnknownHostException;
028    
029    import org.apache.activemq.openwire.OpenWireFormat;
030    import org.apache.activemq.transport.udp.CommandChannel;
031    import org.apache.activemq.transport.udp.CommandDatagramSocket;
032    import org.apache.activemq.transport.udp.DatagramHeaderMarshaller;
033    import org.apache.activemq.transport.udp.UdpTransport;
034    import org.apache.activemq.util.ServiceStopper;
035    import org.slf4j.Logger;
036    import org.slf4j.LoggerFactory;
037    
038    /**
039     * A multicast based transport.
040     * 
041     * 
042     */
043    public class MulticastTransport extends UdpTransport {
044    
045        private static final Logger LOG = LoggerFactory.getLogger(MulticastTransport.class);
046    
047        private static final int DEFAULT_IDLE_TIME = 5000;
048    
049        private MulticastSocket socket;
050        private InetAddress mcastAddress;
051        private int mcastPort;
052        private int timeToLive = 1;
053        private boolean loopBackMode;
054        private long keepAliveInterval = DEFAULT_IDLE_TIME;
055    
056        public MulticastTransport(OpenWireFormat wireFormat, URI remoteLocation) throws UnknownHostException, IOException {
057            super(wireFormat, remoteLocation);
058        }
059    
060        public long getKeepAliveInterval() {
061            return keepAliveInterval;
062        }
063    
064        public void setKeepAliveInterval(long keepAliveInterval) {
065            this.keepAliveInterval = keepAliveInterval;
066        }
067    
068        public boolean isLoopBackMode() {
069            return loopBackMode;
070        }
071    
072        public void setLoopBackMode(boolean loopBackMode) {
073            this.loopBackMode = loopBackMode;
074        }
075    
076        public int getTimeToLive() {
077            return timeToLive;
078        }
079    
080        public void setTimeToLive(int timeToLive) {
081            this.timeToLive = timeToLive;
082        }
083    
084        protected String getProtocolName() {
085            return "Multicast";
086        }
087    
088        protected String getProtocolUriScheme() {
089            return "multicast://";
090        }
091    
092        protected void bind(DatagramSocket socket, SocketAddress localAddress) throws SocketException {
093        }
094    
095        protected void doStop(ServiceStopper stopper) throws Exception {
096            super.doStop(stopper);
097            if (socket != null) {
098                try {
099                    socket.leaveGroup(getMulticastAddress());
100                } catch (IOException e) {
101                    stopper.onException(this, e);
102                }
103                socket.close();
104            }
105        }
106    
107        protected CommandChannel createCommandChannel() throws IOException {
108            socket = new MulticastSocket(mcastPort);
109            socket.setLoopbackMode(loopBackMode);
110            socket.setTimeToLive(timeToLive);
111    
112            LOG.debug("Joining multicast address: " + getMulticastAddress());
113            socket.joinGroup(getMulticastAddress());
114            socket.setSoTimeout((int)keepAliveInterval);
115    
116            return new CommandDatagramSocket(this, getWireFormat(), getDatagramSize(), getTargetAddress(), createDatagramHeaderMarshaller(), getSocket());
117        }
118    
119        protected InetAddress getMulticastAddress() {
120            return mcastAddress;
121        }
122    
123        protected MulticastSocket getSocket() {
124            return socket;
125        }
126    
127        protected void setSocket(MulticastSocket socket) {
128            this.socket = socket;
129        }
130    
131        protected InetSocketAddress createAddress(URI remoteLocation) throws UnknownHostException, IOException {
132            this.mcastAddress = InetAddress.getByName(remoteLocation.getHost());
133            this.mcastPort = remoteLocation.getPort();
134            return new InetSocketAddress(mcastAddress, mcastPort);
135        }
136    
137        protected DatagramHeaderMarshaller createDatagramHeaderMarshaller() {
138            return new MulticastDatagramHeaderMarshaller("udp://dummyHostName:" + getPort());
139        }
140    
141    }