001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.udp;
018    
019    import java.io.DataInputStream;
020    import java.io.DataOutputStream;
021    import java.io.IOException;
022    import java.net.DatagramPacket;
023    import java.net.DatagramSocket;
024    import java.net.SocketAddress;
025    
026    import org.apache.activemq.command.Command;
027    import org.apache.activemq.command.Endpoint;
028    import org.apache.activemq.command.LastPartialCommand;
029    import org.apache.activemq.command.PartialCommand;
030    import org.apache.activemq.openwire.BooleanStream;
031    import org.apache.activemq.openwire.OpenWireFormat;
032    import org.apache.activemq.transport.reliable.ReplayBuffer;
033    import org.apache.activemq.util.ByteArrayInputStream;
034    import org.apache.activemq.util.ByteArrayOutputStream;
035    import org.slf4j.Logger;
036    import org.slf4j.LoggerFactory;
037    
038    /**
039     * A strategy for reading datagrams and de-fragmenting them together.
040     * 
041     * 
042     */
043    public class CommandDatagramSocket extends CommandChannelSupport {
044    
045        private static final Logger LOG = LoggerFactory.getLogger(CommandDatagramSocket.class);
046    
047        private DatagramSocket channel;
048        private Object readLock = new Object();
049        private Object writeLock = new Object();
050    
051        private volatile int receiveCounter;
052    
053        public CommandDatagramSocket(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize, SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller,
054                                     DatagramSocket channel) {
055            super(transport, wireFormat, datagramSize, targetAddress, headerMarshaller);
056            this.channel = channel;
057        }
058    
059        public void start() throws Exception {
060        }
061    
062        public void stop() throws Exception {
063        }
064    
065        public Command read() throws IOException {
066            Command answer = null;
067            Endpoint from = null;
068            synchronized (readLock) {
069                while (true) {
070                    DatagramPacket datagram = createDatagramPacket();
071                    channel.receive(datagram);
072    
073                    // TODO could use a DataInput implementation that talks direct
074                    // to the byte[] to avoid object allocation
075                    receiveCounter++;
076                    DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(datagram.getData(), 0, datagram.getLength()));
077                    
078                    from = headerMarshaller.createEndpoint(datagram, dataIn);
079                    answer = (Command)wireFormat.unmarshal(dataIn);
080                    break;
081                }
082            }
083            if (answer != null) {
084                answer.setFrom(from);
085    
086                if (LOG.isDebugEnabled()) {
087                    LOG.debug("Channel: " + name + " about to process: " + answer);
088                }
089            }
090            return answer;
091        }
092    
093        public void write(Command command, SocketAddress address) throws IOException {
094            synchronized (writeLock) {
095    
096                ByteArrayOutputStream writeBuffer = createByteArrayOutputStream();
097                DataOutputStream dataOut = new DataOutputStream(writeBuffer);
098                headerMarshaller.writeHeader(command, dataOut);
099    
100                int offset = writeBuffer.size();
101    
102                wireFormat.marshal(command, dataOut);
103    
104                if (remaining(writeBuffer) >= 0) {
105                    sendWriteBuffer(address, writeBuffer, command.getCommandId());
106                } else {
107                    // lets split the command up into chunks
108                    byte[] data = writeBuffer.toByteArray();
109                    boolean lastFragment = false;
110                    int length = data.length;
111                    for (int fragment = 0; !lastFragment; fragment++) {
112                        writeBuffer = createByteArrayOutputStream();
113                        headerMarshaller.writeHeader(command, dataOut);
114    
115                        int chunkSize = remaining(writeBuffer);
116    
117                        // we need to remove the amount of overhead to write the
118                        // partial command
119    
120                        // lets write the flags in there
121                        BooleanStream bs = null;
122                        if (wireFormat.isTightEncodingEnabled()) {
123                            bs = new BooleanStream();
124                            bs.writeBoolean(true); // the partial data byte[] is
125                            // never null
126                        }
127    
128                        // lets remove the header of the partial command
129                        // which is the byte for the type and an int for the size of
130                        // the byte[]
131    
132                        // data type + the command ID + size of the partial data
133                        chunkSize -= 1 + 4 + 4;
134    
135                        // the boolean flags
136                        if (bs != null) {
137                            chunkSize -= bs.marshalledSize();
138                        } else {
139                            chunkSize -= 1;
140                        }
141    
142                        if (!wireFormat.isSizePrefixDisabled()) {
143                            // lets write the size of the command buffer
144                            dataOut.writeInt(chunkSize);
145                            chunkSize -= 4;
146                        }
147    
148                        lastFragment = offset + chunkSize >= length;
149                        if (chunkSize + offset > length) {
150                            chunkSize = length - offset;
151                        }
152    
153                        if (lastFragment) {
154                            dataOut.write(LastPartialCommand.DATA_STRUCTURE_TYPE);
155                        } else {
156                            dataOut.write(PartialCommand.DATA_STRUCTURE_TYPE);
157                        }
158    
159                        if (bs != null) {
160                            bs.marshal(dataOut);
161                        }
162    
163                        int commandId = command.getCommandId();
164                        if (fragment > 0) {
165                            commandId = sequenceGenerator.getNextSequenceId();
166                        }
167                        dataOut.writeInt(commandId);
168                        if (bs == null) {
169                            dataOut.write((byte)1);
170                        }
171    
172                        // size of byte array
173                        dataOut.writeInt(chunkSize);
174    
175                        // now the data
176                        dataOut.write(data, offset, chunkSize);
177    
178                        offset += chunkSize;
179                        sendWriteBuffer(address, writeBuffer, commandId);
180                    }
181                }
182            }
183        }
184    
185        public int getDatagramSize() {
186            return datagramSize;
187        }
188    
189        public void setDatagramSize(int datagramSize) {
190            this.datagramSize = datagramSize;
191        }
192    
193        // Implementation methods
194        // -------------------------------------------------------------------------
195        protected void sendWriteBuffer(SocketAddress address, ByteArrayOutputStream writeBuffer, int commandId) throws IOException {
196            byte[] data = writeBuffer.toByteArray();
197            sendWriteBuffer(commandId, address, data, false);
198        }
199    
200        protected void sendWriteBuffer(int commandId, SocketAddress address, byte[] data, boolean redelivery) throws IOException {
201            // lets put the datagram into the replay buffer first to prevent timing
202            // issues
203            ReplayBuffer bufferCache = getReplayBuffer();
204            if (bufferCache != null && !redelivery) {
205                bufferCache.addBuffer(commandId, data);
206            }
207    
208            if (LOG.isDebugEnabled()) {
209                String text = redelivery ? "REDELIVERING" : "sending";
210                LOG.debug("Channel: " + name + " " + text + " datagram: " + commandId + " to: " + address);
211            }
212            DatagramPacket packet = new DatagramPacket(data, 0, data.length, address);
213            channel.send(packet);
214        }
215    
216        public void sendBuffer(int commandId, Object buffer) throws IOException {
217            if (buffer != null) {
218                byte[] data = (byte[])buffer;
219                sendWriteBuffer(commandId, replayAddress, data, true);
220            } else {
221                if (LOG.isWarnEnabled()) {
222                    LOG.warn("Request for buffer: " + commandId + " is no longer present");
223                }
224            }
225        }
226    
227        protected DatagramPacket createDatagramPacket() {
228            return new DatagramPacket(new byte[datagramSize], datagramSize);
229        }
230    
231        protected int remaining(ByteArrayOutputStream buffer) {
232            return datagramSize - buffer.size();
233        }
234    
235        protected ByteArrayOutputStream createByteArrayOutputStream() {
236            return new ByteArrayOutputStream(datagramSize);
237        }
238    
239        public int getReceiveCounter() {
240            return receiveCounter;
241        }
242    }