001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.HashMap;
022import java.util.Iterator;
023import java.util.Map;
024
025import org.apache.activemq.command.Command;
026import org.apache.activemq.command.ExceptionResponse;
027import org.apache.activemq.command.Response;
028import org.apache.activemq.util.IntSequenceGenerator;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032/**
033 * Adds the incrementing sequence number to commands along with performing the
034 * correlation of responses to requests to create a blocking request-response
035 * semantics.
036 * 
037 * 
038 */
039public class ResponseCorrelator extends TransportFilter {
040
041    private static final Logger LOG = LoggerFactory.getLogger(ResponseCorrelator.class);
042    private final Map<Integer, FutureResponse> requestMap = new HashMap<Integer, FutureResponse>();
043    private IntSequenceGenerator sequenceGenerator;
044    private final boolean debug = LOG.isDebugEnabled();
045    private IOException error;
046
047    public ResponseCorrelator(Transport next) {
048        this(next, new IntSequenceGenerator());
049    }
050
051    public ResponseCorrelator(Transport next, IntSequenceGenerator sequenceGenerator) {
052        super(next);
053        this.sequenceGenerator = sequenceGenerator;
054    }
055
056    public void oneway(Object o) throws IOException {
057        Command command = (Command)o;
058        command.setCommandId(sequenceGenerator.getNextSequenceId());
059        command.setResponseRequired(false);
060        next.oneway(command);
061    }
062
063    public FutureResponse asyncRequest(Object o, ResponseCallback responseCallback) throws IOException {
064        Command command = (Command) o;
065        command.setCommandId(sequenceGenerator.getNextSequenceId());
066        command.setResponseRequired(true);
067        FutureResponse future = new FutureResponse(responseCallback, this);
068        IOException priorError = null;
069        synchronized (requestMap) {
070            priorError = this.error;
071            if (priorError == null) {
072                requestMap.put(new Integer(command.getCommandId()), future);
073            }
074        }
075
076        if (priorError != null) {
077            future.set(new ExceptionResponse(priorError));
078            throw priorError;
079        }
080
081        next.oneway(command);
082        return future;
083    }
084
085    public Object request(Object command) throws IOException {
086        FutureResponse response = asyncRequest(command, null);
087        return response.getResult();
088    }
089
090    public Object request(Object command, int timeout) throws IOException {
091        FutureResponse response = asyncRequest(command, null);
092        return response.getResult(timeout);
093    }
094
095    public void onCommand(Object o) {
096        Command command = null;
097        if (o instanceof Command) {
098            command = (Command)o;
099        } else {
100            throw new ClassCastException("Object cannot be converted to a Command,  Object: " + o);
101        }
102        if (command.isResponse()) {
103            Response response = (Response)command;
104            FutureResponse future = null;
105            synchronized (requestMap) {
106                future = requestMap.remove(Integer.valueOf(response.getCorrelationId()));
107            }
108            if (future != null) {
109                future.set(response);
110            } else {
111                if (debug) {
112                    LOG.debug("Received unexpected response: {" + command + "}for command id: " + response.getCorrelationId());
113                }
114            }
115        } else {
116            getTransportListener().onCommand(command);
117        }
118    }
119
120    /**
121     * If an async exception occurs, then assume no responses will arrive for
122     * any of current requests. Lets let them know of the problem.
123     */
124    public void onException(IOException error) {
125        dispose(new TransportDisposedIOException("Disposed due to prior exception", error));
126        super.onException(error);
127    }
128    
129    @Override
130    public void stop() throws Exception {
131        dispose(new IOException("Stopped."));
132        super.stop();
133    }
134
135    private void dispose(IOException error) {
136        ArrayList<FutureResponse> requests=null; 
137        synchronized(requestMap) {
138            if( this.error==null) {
139                this.error = error;
140                requests = new ArrayList<FutureResponse>(requestMap.values());
141                requestMap.clear();
142            }
143        }
144        if( requests!=null ) {
145            for (Iterator<FutureResponse> iter = requests.iterator(); iter.hasNext();) {
146                FutureResponse fr = iter.next();
147                fr.set(new ExceptionResponse(error));
148            }
149        }
150    }
151
152    public IntSequenceGenerator getSequenceGenerator() {
153        return sequenceGenerator;
154    }
155
156    public String toString() {
157        return next.toString();
158    }
159}