001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.reliable;
018    
019    import java.io.IOException;
020    import java.util.SortedSet;
021    import java.util.TreeSet;
022    
023    import org.apache.activemq.command.Command;
024    import org.apache.activemq.command.ReplayCommand;
025    import org.apache.activemq.command.Response;
026    import org.apache.activemq.openwire.CommandIdComparator;
027    import org.apache.activemq.transport.FutureResponse;
028    import org.apache.activemq.transport.ResponseCorrelator;
029    import org.apache.activemq.transport.Transport;
030    import org.apache.activemq.transport.udp.UdpTransport;
031    import org.slf4j.Logger;
032    import org.slf4j.LoggerFactory;
033    
034    /**
035     * This interceptor deals with out of order commands together with being able to
036     * handle dropped commands and the re-requesting dropped commands.
037     * 
038     * 
039     */
040    public class ReliableTransport extends ResponseCorrelator {
041        private static final Logger LOG = LoggerFactory.getLogger(ReliableTransport.class);
042    
043        private ReplayStrategy replayStrategy;
044        private SortedSet<Command> commands = new TreeSet<Command>(new CommandIdComparator());
045        private int expectedCounter = 1;
046        private int replayBufferCommandCount = 50;
047        private int requestTimeout = 2000;
048        private ReplayBuffer replayBuffer;
049        private Replayer replayer;
050        private UdpTransport udpTransport;
051    
052        public ReliableTransport(Transport next, ReplayStrategy replayStrategy) {
053            super(next);
054            this.replayStrategy = replayStrategy;
055        }
056    
057        public ReliableTransport(Transport next, UdpTransport udpTransport) throws IOException {
058            super(next, udpTransport.getSequenceGenerator());
059            this.udpTransport = udpTransport;
060            this.replayer = udpTransport.createReplayer();
061        }
062    
063        /**
064         * Requests that a range of commands be replayed
065         */
066        public void requestReplay(int fromCommandId, int toCommandId) {
067            ReplayCommand replay = new ReplayCommand();
068            replay.setFirstNakNumber(fromCommandId);
069            replay.setLastNakNumber(toCommandId);
070            try {
071                oneway(replay);
072            } catch (IOException e) {
073                getTransportListener().onException(e);
074            }
075        }
076    
077        public Object request(Object o) throws IOException {
078            final Command command = (Command)o;
079            FutureResponse response = asyncRequest(command, null);
080            while (true) {
081                Response result = response.getResult(requestTimeout);
082                if (result != null) {
083                    return result;
084                }
085                onMissingResponse(command, response);
086            }
087        }
088    
089        public Object request(Object o, int timeout) throws IOException {
090            final Command command = (Command)o;
091            FutureResponse response = asyncRequest(command, null);
092            while (timeout > 0) {
093                int time = timeout;
094                if (timeout > requestTimeout) {
095                    time = requestTimeout;
096                }
097                Response result = response.getResult(time);
098                if (result != null) {
099                    return result;
100                }
101                onMissingResponse(command, response);
102                timeout -= time;
103            }
104            return response.getResult(0);
105        }
106    
107        public void onCommand(Object o) {
108            Command command = (Command)o;
109            // lets pass wireformat through
110            if (command.isWireFormatInfo()) {
111                super.onCommand(command);
112                return;
113            } else if (command.getDataStructureType() == ReplayCommand.DATA_STRUCTURE_TYPE) {
114                replayCommands((ReplayCommand)command);
115                return;
116            }
117    
118            int actualCounter = command.getCommandId();
119            boolean valid = expectedCounter == actualCounter;
120    
121            if (!valid) {
122                synchronized (commands) {
123                    int nextCounter = actualCounter;
124                    boolean empty = commands.isEmpty();
125                    if (!empty) {
126                        Command nextAvailable = commands.first();
127                        nextCounter = nextAvailable.getCommandId();
128                    }
129    
130                    try {
131                        boolean keep = replayStrategy.onDroppedPackets(this, expectedCounter, actualCounter, nextCounter);
132    
133                        if (keep) {
134                            // lets add it to the list for later on
135                            if (LOG.isDebugEnabled()) {
136                                LOG.debug("Received out of order command which is being buffered for later: " + command);
137                            }
138                            commands.add(command);
139                        }
140                    } catch (IOException e) {
141                        onException(e);
142                    }
143    
144                    if (!empty) {
145                        // lets see if the first item in the set is the next
146                        // expected
147                        command = commands.first();
148                        valid = expectedCounter == command.getCommandId();
149                        if (valid) {
150                            commands.remove(command);
151                        }
152                    }
153                }
154            }
155    
156            while (valid) {
157                // we've got a valid header so increment counter
158                replayStrategy.onReceivedPacket(this, expectedCounter);
159                expectedCounter++;
160                super.onCommand(command);
161    
162                synchronized (commands) {
163                    // we could have more commands left
164                    valid = !commands.isEmpty();
165                    if (valid) {
166                        // lets see if the first item in the set is the next
167                        // expected
168                        command = commands.first();
169                        valid = expectedCounter == command.getCommandId();
170                        if (valid) {
171                            commands.remove(command);
172                        }
173                    }
174                }
175            }
176        }
177    
178        public int getBufferedCommandCount() {
179            synchronized (commands) {
180                return commands.size();
181            }
182        }
183    
184        public int getExpectedCounter() {
185            return expectedCounter;
186        }
187    
188        /**
189         * This property should never really be set - but is mutable primarily for
190         * test cases
191         */
192        public void setExpectedCounter(int expectedCounter) {
193            this.expectedCounter = expectedCounter;
194        }
195    
196        public int getRequestTimeout() {
197            return requestTimeout;
198        }
199    
200        /**
201         * Sets the default timeout of requests before starting to request commands
202         * are replayed
203         */
204        public void setRequestTimeout(int requestTimeout) {
205            this.requestTimeout = requestTimeout;
206        }
207    
208        public ReplayStrategy getReplayStrategy() {
209            return replayStrategy;
210        }
211    
212        public ReplayBuffer getReplayBuffer() {
213            if (replayBuffer == null) {
214                replayBuffer = createReplayBuffer();
215            }
216            return replayBuffer;
217        }
218    
219        public void setReplayBuffer(ReplayBuffer replayBuffer) {
220            this.replayBuffer = replayBuffer;
221        }
222    
223        public int getReplayBufferCommandCount() {
224            return replayBufferCommandCount;
225        }
226    
227        /**
228         * Sets the default number of commands which are buffered
229         */
230        public void setReplayBufferCommandCount(int replayBufferSize) {
231            this.replayBufferCommandCount = replayBufferSize;
232        }
233    
234        public void setReplayStrategy(ReplayStrategy replayStrategy) {
235            this.replayStrategy = replayStrategy;
236        }
237    
238        public Replayer getReplayer() {
239            return replayer;
240        }
241    
242        public void setReplayer(Replayer replayer) {
243            this.replayer = replayer;
244        }
245    
246        public String toString() {
247            return next.toString();
248        }
249    
250        public void start() throws Exception {
251            if (udpTransport != null) {
252                udpTransport.setReplayBuffer(getReplayBuffer());
253            }
254            if (replayStrategy == null) {
255                throw new IllegalArgumentException("Property replayStrategy not specified");
256            }
257            super.start();
258        }
259    
260        /**
261         * Lets attempt to replay the request as a command may have disappeared
262         */
263        protected void onMissingResponse(Command command, FutureResponse response) {
264            LOG.debug("Still waiting for response on: " + this + " to command: " + command + " sending replay message");
265    
266            int commandId = command.getCommandId();
267            requestReplay(commandId, commandId);
268        }
269    
270        protected ReplayBuffer createReplayBuffer() {
271            return new DefaultReplayBuffer(getReplayBufferCommandCount());
272        }
273    
274        protected void replayCommands(ReplayCommand command) {
275            try {
276                if (replayer == null) {
277                    onException(new IOException("Cannot replay commands. No replayer property configured"));
278                }
279                if (LOG.isDebugEnabled()) {
280                    LOG.debug("Processing replay command: " + command);
281                }
282                getReplayBuffer().replayMessages(command.getFirstNakNumber(), command.getLastNakNumber(), replayer);
283    
284                // TODO we could proactively remove ack'd stuff from the replay
285                // buffer
286                // if we only have a single client talking to us
287            } catch (IOException e) {
288                onException(e);
289            }
290        }
291    
292    }