001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.udp;
018    
019    import java.io.DataInputStream;
020    import java.io.DataOutputStream;
021    import java.io.IOException;
022    import java.net.SocketAddress;
023    import java.nio.ByteBuffer;
024    import java.nio.channels.DatagramChannel;
025    
026    import org.apache.activemq.command.Command;
027    import org.apache.activemq.command.Endpoint;
028    import org.apache.activemq.command.LastPartialCommand;
029    import org.apache.activemq.command.PartialCommand;
030    import org.apache.activemq.openwire.BooleanStream;
031    import org.apache.activemq.openwire.OpenWireFormat;
032    import org.apache.activemq.transport.reliable.ReplayBuffer;
033    import org.apache.activemq.util.ByteArrayInputStream;
034    import org.apache.activemq.util.ByteArrayOutputStream;
035    import org.slf4j.Logger;
036    import org.slf4j.LoggerFactory;
037    
038    /**
039     * A strategy for reading datagrams and de-fragmenting them together.
040     * 
041     * 
042     */
043    public class CommandDatagramChannel extends CommandChannelSupport {
044    
045        private static final Logger LOG = LoggerFactory.getLogger(CommandDatagramChannel.class);
046    
047        private DatagramChannel channel;
048        private ByteBufferPool bufferPool;
049    
050        // reading
051        private Object readLock = new Object();
052        private ByteBuffer readBuffer;
053    
054        // writing
055        private Object writeLock = new Object();
056        private int defaultMarshalBufferSize = 64 * 1024;
057        private volatile int receiveCounter;
058    
059        public CommandDatagramChannel(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize, SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller,
060                                      DatagramChannel channel, ByteBufferPool bufferPool) {
061            super(transport, wireFormat, datagramSize, targetAddress, headerMarshaller);
062            this.channel = channel;
063            this.bufferPool = bufferPool;
064        }
065    
066        public void start() throws Exception {
067            bufferPool.setDefaultSize(datagramSize);
068            bufferPool.start();
069            readBuffer = bufferPool.borrowBuffer();
070        }
071    
072        public void stop() throws Exception {
073            bufferPool.stop();
074        }
075    
076        public Command read() throws IOException {
077            Command answer = null;
078            Endpoint from = null;
079            synchronized (readLock) {
080                while (true) {
081                    readBuffer.clear();
082                    SocketAddress address = channel.receive(readBuffer);
083    
084                    readBuffer.flip();
085    
086                    if (readBuffer.limit() == 0) {
087                        continue;
088                    }
089                    
090                    receiveCounter++;
091                    from = headerMarshaller.createEndpoint(readBuffer, address);
092    
093                    int remaining = readBuffer.remaining();
094                    byte[] data = new byte[remaining];
095                    readBuffer.get(data);
096    
097                    // TODO could use a DataInput implementation that talks direct
098                    // to
099                    // the ByteBuffer to avoid object allocation and unnecessary
100                    // buffering?
101                    DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(data));
102                    answer = (Command)wireFormat.unmarshal(dataIn);
103                    break;
104                }
105            }
106            if (answer != null) {
107                answer.setFrom(from);
108    
109                if (LOG.isDebugEnabled()) {
110                    LOG.debug("Channel: " + name + " received from: " + from + " about to process: " + answer);
111                }
112            }
113            return answer;
114        }
115    
116        public void write(Command command, SocketAddress address) throws IOException {
117            synchronized (writeLock) {
118    
119                ByteArrayOutputStream largeBuffer = new ByteArrayOutputStream(defaultMarshalBufferSize);
120                wireFormat.marshal(command, new DataOutputStream(largeBuffer));
121                byte[] data = largeBuffer.toByteArray();
122                int size = data.length;
123    
124                ByteBuffer writeBuffer = bufferPool.borrowBuffer();
125                writeBuffer.clear();
126                headerMarshaller.writeHeader(command, writeBuffer);
127    
128                if (size > writeBuffer.remaining()) {
129                    // lets split the command up into chunks
130                    int offset = 0;
131                    boolean lastFragment = false;
132                    int length = data.length;
133                    for (int fragment = 0; !lastFragment; fragment++) {
134                        // write the header
135                        if (fragment > 0) {
136                            writeBuffer = bufferPool.borrowBuffer();
137                            writeBuffer.clear();
138                            headerMarshaller.writeHeader(command, writeBuffer);
139                        }
140    
141                        int chunkSize = writeBuffer.remaining();
142    
143                        // we need to remove the amount of overhead to write the
144                        // partial command
145    
146                        // lets write the flags in there
147                        BooleanStream bs = null;
148                        if (wireFormat.isTightEncodingEnabled()) {
149                            bs = new BooleanStream();
150                            bs.writeBoolean(true); // the partial data byte[] is
151                            // never null
152                        }
153    
154                        // lets remove the header of the partial command
155                        // which is the byte for the type and an int for the size of
156                        // the byte[]
157    
158                        // data type + the command ID + size of the partial data
159                        chunkSize -= 1 + 4 + 4;
160    
161                        // the boolean flags
162                        if (bs != null) {
163                            chunkSize -= bs.marshalledSize();
164                        } else {
165                            chunkSize -= 1;
166                        }
167    
168                        if (!wireFormat.isSizePrefixDisabled()) {
169                            // lets write the size of the command buffer
170                            writeBuffer.putInt(chunkSize);
171                            chunkSize -= 4;
172                        }
173    
174                        lastFragment = offset + chunkSize >= length;
175                        if (chunkSize + offset > length) {
176                            chunkSize = length - offset;
177                        }
178    
179                        if (lastFragment) {
180                            writeBuffer.put(LastPartialCommand.DATA_STRUCTURE_TYPE);
181                        } else {
182                            writeBuffer.put(PartialCommand.DATA_STRUCTURE_TYPE);
183                        }
184    
185                        if (bs != null) {
186                            bs.marshal(writeBuffer);
187                        }
188    
189                        int commandId = command.getCommandId();
190                        if (fragment > 0) {
191                            commandId = sequenceGenerator.getNextSequenceId();
192                        }
193                        writeBuffer.putInt(commandId);
194                        if (bs == null) {
195                            writeBuffer.put((byte)1);
196                        }
197    
198                        // size of byte array
199                        writeBuffer.putInt(chunkSize);
200    
201                        // now the data
202                        writeBuffer.put(data, offset, chunkSize);
203    
204                        offset += chunkSize;
205                        sendWriteBuffer(commandId, address, writeBuffer, false);
206                    }
207                } else {
208                    writeBuffer.put(data);
209                    sendWriteBuffer(command.getCommandId(), address, writeBuffer, false);
210                }
211            }
212        }
213    
214        // Properties
215        // -------------------------------------------------------------------------
216    
217        public ByteBufferPool getBufferPool() {
218            return bufferPool;
219        }
220    
221        /**
222         * Sets the implementation of the byte buffer pool to use
223         */
224        public void setBufferPool(ByteBufferPool bufferPool) {
225            this.bufferPool = bufferPool;
226        }
227    
228        // Implementation methods
229        // -------------------------------------------------------------------------
230        protected void sendWriteBuffer(int commandId, SocketAddress address, ByteBuffer writeBuffer, boolean redelivery) throws IOException {
231            // lets put the datagram into the replay buffer first to prevent timing
232            // issues
233            ReplayBuffer bufferCache = getReplayBuffer();
234            if (bufferCache != null && !redelivery) {
235                bufferCache.addBuffer(commandId, writeBuffer);
236            }
237    
238            writeBuffer.flip();
239    
240            if (LOG.isDebugEnabled()) {
241                String text = redelivery ? "REDELIVERING" : "sending";
242                LOG.debug("Channel: " + name + " " + text + " datagram: " + commandId + " to: " + address);
243            }
244            channel.send(writeBuffer, address);
245        }
246    
247        public void sendBuffer(int commandId, Object buffer) throws IOException {
248            if (buffer != null) {
249                ByteBuffer writeBuffer = (ByteBuffer)buffer;
250                sendWriteBuffer(commandId, getReplayAddress(), writeBuffer, true);
251            } else {
252                if (LOG.isWarnEnabled()) {
253                    LOG.warn("Request for buffer: " + commandId + " is no longer present");
254                }
255            }
256        }
257    
258        public int getReceiveCounter() {
259            return receiveCounter;
260        }
261    
262    }