001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport;
018    
019    import java.io.IOException;
020    import java.util.ArrayList;
021    import java.util.HashMap;
022    import java.util.Iterator;
023    import java.util.Map;
024    
025    import org.apache.activemq.command.Command;
026    import org.apache.activemq.command.ExceptionResponse;
027    import org.apache.activemq.command.Response;
028    import org.apache.activemq.util.IntSequenceGenerator;
029    import org.slf4j.Logger;
030    import org.slf4j.LoggerFactory;
031    
032    /**
033     * Adds the incrementing sequence number to commands along with performing the
034     * corelation of responses to requests to create a blocking request-response
035     * semantics.
036     * 
037     * 
038     */
039    public class ResponseCorrelator extends TransportFilter {
040    
041        private static final Logger LOG = LoggerFactory.getLogger(ResponseCorrelator.class);
042        private final Map<Integer, FutureResponse> requestMap = new HashMap<Integer, FutureResponse>();
043        private IntSequenceGenerator sequenceGenerator;
044        private final boolean debug = LOG.isDebugEnabled();
045        private IOException error;
046    
047        public ResponseCorrelator(Transport next) {
048            this(next, new IntSequenceGenerator());
049        }
050    
051        public ResponseCorrelator(Transport next, IntSequenceGenerator sequenceGenerator) {
052            super(next);
053            this.sequenceGenerator = sequenceGenerator;
054        }
055    
056        public void oneway(Object o) throws IOException {
057            Command command = (Command)o;
058            command.setCommandId(sequenceGenerator.getNextSequenceId());
059            command.setResponseRequired(false);
060            next.oneway(command);
061        }
062    
063        public FutureResponse asyncRequest(Object o, ResponseCallback responseCallback) throws IOException {
064            Command command = (Command) o;
065            command.setCommandId(sequenceGenerator.getNextSequenceId());
066            command.setResponseRequired(true);
067            FutureResponse future = new FutureResponse(responseCallback);
068            IOException priorError = null;
069            synchronized (requestMap) {
070                priorError = this.error;
071                if (priorError == null) {
072                    requestMap.put(new Integer(command.getCommandId()), future);
073                }
074            }
075    
076            if (priorError != null) {
077                future.set(new ExceptionResponse(priorError));
078                throw priorError;
079            }
080    
081            next.oneway(command);
082            return future;
083        }
084    
085        public Object request(Object command) throws IOException {
086            FutureResponse response = asyncRequest(command, null);
087            return response.getResult();
088        }
089    
090        public Object request(Object command, int timeout) throws IOException {
091            FutureResponse response = asyncRequest(command, null);
092            return response.getResult(timeout);
093        }
094    
095        public void onCommand(Object o) {
096            Command command = null;
097            if (o instanceof Command) {
098                command = (Command)o;
099            } else {
100                throw new ClassCastException("Object cannot be converted to a Command,  Object: " + o);
101            }
102            if (command.isResponse()) {
103                Response response = (Response)command;
104                FutureResponse future = null;
105                synchronized (requestMap) {
106                    future = requestMap.remove(Integer.valueOf(response.getCorrelationId()));
107                }
108                if (future != null) {
109                    future.set(response);
110                } else {
111                    if (debug) {
112                        LOG.debug("Received unexpected response: {" + command + "}for command id: " + response.getCorrelationId());
113                    }
114                }
115            } else {
116                getTransportListener().onCommand(command);
117            }
118        }
119    
120        /**
121         * If an async exception occurs, then assume no responses will arrive for
122         * any of current requests. Lets let them know of the problem.
123         */
124        public void onException(IOException error) {
125            dispose(error);
126            super.onException(error);
127        }
128        
129        @Override
130        public void stop() throws Exception {
131            dispose(new IOException("Stopped."));
132            super.stop();
133        }
134    
135        private void dispose(IOException error) {
136            ArrayList<FutureResponse> requests=null; 
137            synchronized(requestMap) {
138                if( this.error==null) {
139                    this.error = error;
140                    requests = new ArrayList<FutureResponse>(requestMap.values());
141                    requestMap.clear();
142                }
143            }
144            if( requests!=null ) {
145                for (Iterator<FutureResponse> iter = requests.iterator(); iter.hasNext();) {
146                    FutureResponse fr = iter.next();
147                    fr.set(new ExceptionResponse(error));
148                }
149            }
150        }
151    
152        public IntSequenceGenerator getSequenceGenerator() {
153            return sequenceGenerator;
154        }
155    
156        public String toString() {
157            return next.toString();
158        }
159    }