001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.udp;
018    
019    
020    import java.io.DataInputStream;
021    import java.io.DataOutputStream;
022    import java.net.DatagramPacket;
023    import java.net.SocketAddress;
024    import java.nio.ByteBuffer;
025    import java.util.HashMap;
026    import java.util.Map;
027    
028    import org.apache.activemq.command.Command;
029    import org.apache.activemq.command.Endpoint;
030    
031    /**
032     * 
033     * 
034     */
035    public class DatagramHeaderMarshaller {
036    
037        // TODO for large dynamic networks
038        // we may want to evict endpoints that disconnect
039        // from a transport - e.g. for multicast
040        private Map<SocketAddress, Endpoint> endpoints = new HashMap<SocketAddress, Endpoint>();
041        
042        /**
043         * Reads any header if applicable and then creates an endpoint object
044         */
045        public Endpoint createEndpoint(ByteBuffer readBuffer, SocketAddress address) {
046            return getEndpoint(address);
047        }
048    
049        public Endpoint createEndpoint(DatagramPacket datagram, DataInputStream dataIn) {
050            return getEndpoint(datagram.getSocketAddress());
051        }
052    
053        public void writeHeader(Command command, ByteBuffer writeBuffer) {
054            /*
055            writeBuffer.putLong(command.getCounter());
056            writeBuffer.putInt(command.getDataSize());
057            byte flags = command.getFlags();
058            //System.out.println("Writing header with counter: " + header.getCounter() + " size: " + header.getDataSize() + " with flags: " + flags);
059            writeBuffer.put(flags);
060            */
061        }
062    
063        public void writeHeader(Command command, DataOutputStream dataOut) {
064        }
065    
066        /**
067         * Gets the current endpoint object for this address or creates one if not available.
068         * 
069         * Note that this method does not need to be synchronized as its only ever going to be
070         * used by the already-synchronized read() method of a CommandChannel 
071         * 
072         */
073        protected Endpoint getEndpoint(SocketAddress address) {
074            Endpoint endpoint = endpoints.get(address);
075            if (endpoint == null) {
076                endpoint = createEndpoint(address);
077                endpoints.put(address, endpoint);
078            }
079            return endpoint;
080        }
081    
082        protected Endpoint createEndpoint(SocketAddress address) {
083            return new DatagramEndpoint(address.toString(), address);
084        }
085    }