/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
017package org.apache.activemq.transport.multicast;
018
019import java.io.IOException;
020import java.net.DatagramSocket;
021import java.net.InetAddress;
022import java.net.InetSocketAddress;
023import java.net.MulticastSocket;
024import java.net.SocketAddress;
025import java.net.SocketException;
026import java.net.URI;
027import java.net.UnknownHostException;
028
029import org.apache.activemq.openwire.OpenWireFormat;
030import org.apache.activemq.transport.udp.CommandChannel;
031import org.apache.activemq.transport.udp.CommandDatagramSocket;
032import org.apache.activemq.transport.udp.DatagramHeaderMarshaller;
033import org.apache.activemq.transport.udp.UdpTransport;
034import org.apache.activemq.util.ServiceStopper;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038/**
039 * A multicast based transport.
040 * 
041 * 
042 */
043public class MulticastTransport extends UdpTransport {
044
045    private static final Logger LOG = LoggerFactory.getLogger(MulticastTransport.class);
046
047    private static final int DEFAULT_IDLE_TIME = 5000;
048
049    private MulticastSocket socket;
050    private InetAddress mcastAddress;
051    private int mcastPort;
052    private int timeToLive = 1;
053    private boolean loopBackMode;
054    private long keepAliveInterval = DEFAULT_IDLE_TIME;
055
056    public MulticastTransport(OpenWireFormat wireFormat, URI remoteLocation) throws UnknownHostException, IOException {
057        super(wireFormat, remoteLocation);
058    }
059
060    public long getKeepAliveInterval() {
061        return keepAliveInterval;
062    }
063
064    public void setKeepAliveInterval(long keepAliveInterval) {
065        this.keepAliveInterval = keepAliveInterval;
066    }
067
068    public boolean isLoopBackMode() {
069        return loopBackMode;
070    }
071
072    public void setLoopBackMode(boolean loopBackMode) {
073        this.loopBackMode = loopBackMode;
074    }
075
076    public int getTimeToLive() {
077        return timeToLive;
078    }
079
080    public void setTimeToLive(int timeToLive) {
081        this.timeToLive = timeToLive;
082    }
083
084    protected String getProtocolName() {
085        return "Multicast";
086    }
087
088    protected String getProtocolUriScheme() {
089        return "multicast://";
090    }
091
092    protected void bind(DatagramSocket socket, SocketAddress localAddress) throws SocketException {
093    }
094
095    protected void doStop(ServiceStopper stopper) throws Exception {
096        super.doStop(stopper);
097        if (socket != null) {
098            try {
099                socket.leaveGroup(getMulticastAddress());
100            } catch (IOException e) {
101                stopper.onException(this, e);
102            }
103            socket.close();
104        }
105    }
106
107    protected CommandChannel createCommandChannel() throws IOException {
108        socket = new MulticastSocket(mcastPort);
109        socket.setLoopbackMode(loopBackMode);
110        socket.setTimeToLive(timeToLive);
111
112        LOG.debug("Joining multicast address: " + getMulticastAddress());
113        socket.joinGroup(getMulticastAddress());
114        socket.setSoTimeout((int)keepAliveInterval);
115
116        return new CommandDatagramSocket(this, getWireFormat(), getDatagramSize(), getTargetAddress(), createDatagramHeaderMarshaller(), getSocket());
117    }
118
119    protected InetAddress getMulticastAddress() {
120        return mcastAddress;
121    }
122
123    protected MulticastSocket getSocket() {
124        return socket;
125    }
126
127    protected void setSocket(MulticastSocket socket) {
128        this.socket = socket;
129    }
130
131    protected InetSocketAddress createAddress(URI remoteLocation) throws UnknownHostException, IOException {
132        this.mcastAddress = InetAddress.getByName(remoteLocation.getHost());
133        this.mcastPort = remoteLocation.getPort();
134        return new InetSocketAddress(mcastAddress, mcastPort);
135    }
136
137    protected DatagramHeaderMarshaller createDatagramHeaderMarshaller() {
138        return new MulticastDatagramHeaderMarshaller("udp://dummyHostName:" + getPort());
139    }
140
141}