001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.reliable;
018
019import java.io.IOException;
020import java.util.SortedSet;
021import java.util.TreeSet;
022
023import org.apache.activemq.command.Command;
024import org.apache.activemq.command.ReplayCommand;
025import org.apache.activemq.command.Response;
026import org.apache.activemq.openwire.CommandIdComparator;
027import org.apache.activemq.transport.FutureResponse;
028import org.apache.activemq.transport.ResponseCorrelator;
029import org.apache.activemq.transport.Transport;
030import org.apache.activemq.transport.udp.UdpTransport;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034/**
035 * This interceptor deals with out of order commands together with being able to
036 * handle dropped commands and the re-requesting dropped commands.
037 *
038 * @deprecated
039 */
040@Deprecated
041public class ReliableTransport extends ResponseCorrelator {
042    private static final Logger LOG = LoggerFactory.getLogger(ReliableTransport.class);
043
044    private ReplayStrategy replayStrategy;
045    private final SortedSet<Command> commands = new TreeSet<Command>(new CommandIdComparator());
046    private int expectedCounter = 1;
047    private int replayBufferCommandCount = 50;
048    private int requestTimeout = 2000;
049    private ReplayBuffer replayBuffer;
050    private Replayer replayer;
051    private UdpTransport udpTransport;
052
053    public ReliableTransport(Transport next, ReplayStrategy replayStrategy) {
054        super(next);
055        this.replayStrategy = replayStrategy;
056    }
057
058    public ReliableTransport(Transport next, UdpTransport udpTransport) throws IOException {
059        super(next, udpTransport.getSequenceGenerator());
060        this.udpTransport = udpTransport;
061        this.replayer = udpTransport.createReplayer();
062    }
063
064    /**
065     * Requests that a range of commands be replayed
066     */
067    public void requestReplay(int fromCommandId, int toCommandId) {
068        ReplayCommand replay = new ReplayCommand();
069        replay.setFirstNakNumber(fromCommandId);
070        replay.setLastNakNumber(toCommandId);
071        try {
072            oneway(replay);
073        } catch (IOException e) {
074            getTransportListener().onException(e);
075        }
076    }
077
078    @Override
079    public Object request(Object o) throws IOException {
080        final Command command = (Command)o;
081        FutureResponse response = asyncRequest(command, null);
082        while (true) {
083            Response result = response.getResult(requestTimeout);
084            if (result != null) {
085                return result;
086            }
087            onMissingResponse(command, response);
088        }
089    }
090
091    @Override
092    public Object request(Object o, int timeout) throws IOException {
093        final Command command = (Command)o;
094        FutureResponse response = asyncRequest(command, null);
095        while (timeout > 0) {
096            int time = timeout;
097            if (timeout > requestTimeout) {
098                time = requestTimeout;
099            }
100            Response result = response.getResult(time);
101            if (result != null) {
102                return result;
103            }
104            onMissingResponse(command, response);
105            timeout -= time;
106        }
107        return response.getResult(0);
108    }
109
110    @Override
111    public void onCommand(Object o) {
112        Command command = (Command)o;
113        // lets pass wireformat through
114        if (command.isWireFormatInfo()) {
115            super.onCommand(command);
116            return;
117        } else if (command.getDataStructureType() == ReplayCommand.DATA_STRUCTURE_TYPE) {
118            replayCommands((ReplayCommand)command);
119            return;
120        }
121
122        int actualCounter = command.getCommandId();
123        boolean valid = expectedCounter == actualCounter;
124
125        if (!valid) {
126            synchronized (commands) {
127                int nextCounter = actualCounter;
128                boolean empty = commands.isEmpty();
129                if (!empty) {
130                    Command nextAvailable = commands.first();
131                    nextCounter = nextAvailable.getCommandId();
132                }
133
134                try {
135                    boolean keep = replayStrategy.onDroppedPackets(this, expectedCounter, actualCounter, nextCounter);
136
137                    if (keep) {
138                        // lets add it to the list for later on
139                        if (LOG.isDebugEnabled()) {
140                            LOG.debug("Received out of order command which is being buffered for later: " + command);
141                        }
142                        commands.add(command);
143                    }
144                } catch (IOException e) {
145                    onException(e);
146                }
147
148                if (!empty) {
149                    // lets see if the first item in the set is the next
150                    // expected
151                    command = commands.first();
152                    valid = expectedCounter == command.getCommandId();
153                    if (valid) {
154                        commands.remove(command);
155                    }
156                }
157            }
158        }
159
160        while (valid) {
161            // we've got a valid header so increment counter
162            replayStrategy.onReceivedPacket(this, expectedCounter);
163            expectedCounter++;
164            super.onCommand(command);
165
166            synchronized (commands) {
167                // we could have more commands left
168                valid = !commands.isEmpty();
169                if (valid) {
170                    // lets see if the first item in the set is the next
171                    // expected
172                    command = commands.first();
173                    valid = expectedCounter == command.getCommandId();
174                    if (valid) {
175                        commands.remove(command);
176                    }
177                }
178            }
179        }
180    }
181
182    public int getBufferedCommandCount() {
183        synchronized (commands) {
184            return commands.size();
185        }
186    }
187
188    public int getExpectedCounter() {
189        return expectedCounter;
190    }
191
192    /**
193     * This property should never really be set - but is mutable primarily for
194     * test cases
195     */
196    public void setExpectedCounter(int expectedCounter) {
197        this.expectedCounter = expectedCounter;
198    }
199
200    public int getRequestTimeout() {
201        return requestTimeout;
202    }
203
204    /**
205     * Sets the default timeout of requests before starting to request commands
206     * are replayed
207     */
208    public void setRequestTimeout(int requestTimeout) {
209        this.requestTimeout = requestTimeout;
210    }
211
212    public ReplayStrategy getReplayStrategy() {
213        return replayStrategy;
214    }
215
216    public ReplayBuffer getReplayBuffer() {
217        if (replayBuffer == null) {
218            replayBuffer = createReplayBuffer();
219        }
220        return replayBuffer;
221    }
222
223    public void setReplayBuffer(ReplayBuffer replayBuffer) {
224        this.replayBuffer = replayBuffer;
225    }
226
227    public int getReplayBufferCommandCount() {
228        return replayBufferCommandCount;
229    }
230
231    /**
232     * Sets the default number of commands which are buffered
233     */
234    public void setReplayBufferCommandCount(int replayBufferSize) {
235        this.replayBufferCommandCount = replayBufferSize;
236    }
237
238    public void setReplayStrategy(ReplayStrategy replayStrategy) {
239        this.replayStrategy = replayStrategy;
240    }
241
242    public Replayer getReplayer() {
243        return replayer;
244    }
245
246    public void setReplayer(Replayer replayer) {
247        this.replayer = replayer;
248    }
249
250    @Override
251    public String toString() {
252        return next.toString();
253    }
254
255    @Override
256    public void start() throws Exception {
257        if (udpTransport != null) {
258            udpTransport.setReplayBuffer(getReplayBuffer());
259        }
260        if (replayStrategy == null) {
261            throw new IllegalArgumentException("Property replayStrategy not specified");
262        }
263        super.start();
264    }
265
266    /**
267     * Lets attempt to replay the request as a command may have disappeared
268     */
269    protected void onMissingResponse(Command command, FutureResponse response) {
270        LOG.debug("Still waiting for response on: " + this + " to command: " + command + " sending replay message");
271
272        int commandId = command.getCommandId();
273        requestReplay(commandId, commandId);
274    }
275
276    protected ReplayBuffer createReplayBuffer() {
277        return new DefaultReplayBuffer(getReplayBufferCommandCount());
278    }
279
280    protected void replayCommands(ReplayCommand command) {
281        try {
282            if (replayer == null) {
283                onException(new IOException("Cannot replay commands. No replayer property configured"));
284            }
285            if (LOG.isDebugEnabled()) {
286                LOG.debug("Processing replay command: " + command);
287            }
288            getReplayBuffer().replayMessages(command.getFirstNakNumber(), command.getLastNakNumber(), replayer);
289
290            // TODO we could proactively remove ack'd stuff from the replay
291            // buffer
292            // if we only have a single client talking to us
293        } catch (IOException e) {
294            onException(e);
295        }
296    }
297
298}