001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.reliable;
018    
019    import java.io.IOException;
020    import java.util.SortedSet;
021    import java.util.TreeSet;
022    
023    import org.apache.activemq.command.Command;
024    import org.apache.activemq.command.ReplayCommand;
025    import org.apache.activemq.command.Response;
026    import org.apache.activemq.openwire.CommandIdComparator;
027    import org.apache.activemq.transport.FutureResponse;
028    import org.apache.activemq.transport.ResponseCorrelator;
029    import org.apache.activemq.transport.Transport;
030    import org.apache.activemq.transport.udp.UdpTransport;
031    import org.slf4j.Logger;
032    import org.slf4j.LoggerFactory;
033    
034    /**
035     * This interceptor deals with out of order commands together with being able to
036     * handle dropped commands and the re-requesting dropped commands.
037     *
038     * @deprecated
039     */
040    @Deprecated
041    public class ReliableTransport extends ResponseCorrelator {
042        private static final Logger LOG = LoggerFactory.getLogger(ReliableTransport.class);
043    
044        private ReplayStrategy replayStrategy;
045        private final SortedSet<Command> commands = new TreeSet<Command>(new CommandIdComparator());
046        private int expectedCounter = 1;
047        private int replayBufferCommandCount = 50;
048        private int requestTimeout = 2000;
049        private ReplayBuffer replayBuffer;
050        private Replayer replayer;
051        private UdpTransport udpTransport;
052    
053        public ReliableTransport(Transport next, ReplayStrategy replayStrategy) {
054            super(next);
055            this.replayStrategy = replayStrategy;
056        }
057    
058        public ReliableTransport(Transport next, UdpTransport udpTransport) throws IOException {
059            super(next, udpTransport.getSequenceGenerator());
060            this.udpTransport = udpTransport;
061            this.replayer = udpTransport.createReplayer();
062        }
063    
064        /**
065         * Requests that a range of commands be replayed
066         */
067        public void requestReplay(int fromCommandId, int toCommandId) {
068            ReplayCommand replay = new ReplayCommand();
069            replay.setFirstNakNumber(fromCommandId);
070            replay.setLastNakNumber(toCommandId);
071            try {
072                oneway(replay);
073            } catch (IOException e) {
074                getTransportListener().onException(e);
075            }
076        }
077    
078        @Override
079        public Object request(Object o) throws IOException {
080            final Command command = (Command)o;
081            FutureResponse response = asyncRequest(command, null);
082            while (true) {
083                Response result = response.getResult(requestTimeout);
084                if (result != null) {
085                    return result;
086                }
087                onMissingResponse(command, response);
088            }
089        }
090    
091        @Override
092        public Object request(Object o, int timeout) throws IOException {
093            final Command command = (Command)o;
094            FutureResponse response = asyncRequest(command, null);
095            while (timeout > 0) {
096                int time = timeout;
097                if (timeout > requestTimeout) {
098                    time = requestTimeout;
099                }
100                Response result = response.getResult(time);
101                if (result != null) {
102                    return result;
103                }
104                onMissingResponse(command, response);
105                timeout -= time;
106            }
107            return response.getResult(0);
108        }
109    
110        @Override
111        public void onCommand(Object o) {
112            Command command = (Command)o;
113            // lets pass wireformat through
114            if (command.isWireFormatInfo()) {
115                super.onCommand(command);
116                return;
117            } else if (command.getDataStructureType() == ReplayCommand.DATA_STRUCTURE_TYPE) {
118                replayCommands((ReplayCommand)command);
119                return;
120            }
121    
122            int actualCounter = command.getCommandId();
123            boolean valid = expectedCounter == actualCounter;
124    
125            if (!valid) {
126                synchronized (commands) {
127                    int nextCounter = actualCounter;
128                    boolean empty = commands.isEmpty();
129                    if (!empty) {
130                        Command nextAvailable = commands.first();
131                        nextCounter = nextAvailable.getCommandId();
132                    }
133    
134                    try {
135                        boolean keep = replayStrategy.onDroppedPackets(this, expectedCounter, actualCounter, nextCounter);
136    
137                        if (keep) {
138                            // lets add it to the list for later on
139                            if (LOG.isDebugEnabled()) {
140                                LOG.debug("Received out of order command which is being buffered for later: " + command);
141                            }
142                            commands.add(command);
143                        }
144                    } catch (IOException e) {
145                        onException(e);
146                    }
147    
148                    if (!empty) {
149                        // lets see if the first item in the set is the next
150                        // expected
151                        command = commands.first();
152                        valid = expectedCounter == command.getCommandId();
153                        if (valid) {
154                            commands.remove(command);
155                        }
156                    }
157                }
158            }
159    
160            while (valid) {
161                // we've got a valid header so increment counter
162                replayStrategy.onReceivedPacket(this, expectedCounter);
163                expectedCounter++;
164                super.onCommand(command);
165    
166                synchronized (commands) {
167                    // we could have more commands left
168                    valid = !commands.isEmpty();
169                    if (valid) {
170                        // lets see if the first item in the set is the next
171                        // expected
172                        command = commands.first();
173                        valid = expectedCounter == command.getCommandId();
174                        if (valid) {
175                            commands.remove(command);
176                        }
177                    }
178                }
179            }
180        }
181    
182        public int getBufferedCommandCount() {
183            synchronized (commands) {
184                return commands.size();
185            }
186        }
187    
188        public int getExpectedCounter() {
189            return expectedCounter;
190        }
191    
192        /**
193         * This property should never really be set - but is mutable primarily for
194         * test cases
195         */
196        public void setExpectedCounter(int expectedCounter) {
197            this.expectedCounter = expectedCounter;
198        }
199    
200        public int getRequestTimeout() {
201            return requestTimeout;
202        }
203    
204        /**
205         * Sets the default timeout of requests before starting to request commands
206         * are replayed
207         */
208        public void setRequestTimeout(int requestTimeout) {
209            this.requestTimeout = requestTimeout;
210        }
211    
212        public ReplayStrategy getReplayStrategy() {
213            return replayStrategy;
214        }
215    
216        public ReplayBuffer getReplayBuffer() {
217            if (replayBuffer == null) {
218                replayBuffer = createReplayBuffer();
219            }
220            return replayBuffer;
221        }
222    
223        public void setReplayBuffer(ReplayBuffer replayBuffer) {
224            this.replayBuffer = replayBuffer;
225        }
226    
227        public int getReplayBufferCommandCount() {
228            return replayBufferCommandCount;
229        }
230    
231        /**
232         * Sets the default number of commands which are buffered
233         */
234        public void setReplayBufferCommandCount(int replayBufferSize) {
235            this.replayBufferCommandCount = replayBufferSize;
236        }
237    
238        public void setReplayStrategy(ReplayStrategy replayStrategy) {
239            this.replayStrategy = replayStrategy;
240        }
241    
242        public Replayer getReplayer() {
243            return replayer;
244        }
245    
246        public void setReplayer(Replayer replayer) {
247            this.replayer = replayer;
248        }
249    
250        @Override
251        public String toString() {
252            return next.toString();
253        }
254    
255        @Override
256        public void start() throws Exception {
257            if (udpTransport != null) {
258                udpTransport.setReplayBuffer(getReplayBuffer());
259            }
260            if (replayStrategy == null) {
261                throw new IllegalArgumentException("Property replayStrategy not specified");
262            }
263            super.start();
264        }
265    
266        /**
267         * Lets attempt to replay the request as a command may have disappeared
268         */
269        protected void onMissingResponse(Command command, FutureResponse response) {
270            LOG.debug("Still waiting for response on: " + this + " to command: " + command + " sending replay message");
271    
272            int commandId = command.getCommandId();
273            requestReplay(commandId, commandId);
274        }
275    
276        protected ReplayBuffer createReplayBuffer() {
277            return new DefaultReplayBuffer(getReplayBufferCommandCount());
278        }
279    
280        protected void replayCommands(ReplayCommand command) {
281            try {
282                if (replayer == null) {
283                    onException(new IOException("Cannot replay commands. No replayer property configured"));
284                }
285                if (LOG.isDebugEnabled()) {
286                    LOG.debug("Processing replay command: " + command);
287                }
288                getReplayBuffer().replayMessages(command.getFirstNakNumber(), command.getLastNakNumber(), replayer);
289    
290                // TODO we could proactively remove ack'd stuff from the replay
291                // buffer
292                // if we only have a single client talking to us
293            } catch (IOException e) {
294                onException(e);
295            }
296        }
297    
298    }