001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport;
018    
019    import java.io.IOException;
020    import java.util.concurrent.locks.ReentrantLock;
021    
022    /**
023     * Thread safe Transport Filter that serializes calls to and from the Transport Stack.
024     */
025    public class MutexTransport extends TransportFilter {
026    
027        private final ReentrantLock writeLock = new ReentrantLock();
028        private boolean syncOnCommand;
029    
030        public MutexTransport(Transport next) {
031            super(next);
032            this.syncOnCommand = false;
033        }
034    
035        public MutexTransport(Transport next, boolean syncOnCommand) {
036            super(next);
037            this.syncOnCommand = syncOnCommand;
038        }
039    
040        @Override
041        public void onCommand(Object command) {
042            if (syncOnCommand) {
043                writeLock.lock();
044                try {
045                    transportListener.onCommand(command);
046                } finally {
047                    writeLock.unlock();
048                }
049            } else {
050                transportListener.onCommand(command);
051            }
052        }
053    
054        @Override
055        public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
056            writeLock.lock();
057            try {
058                return next.asyncRequest(command, null);
059            } finally {
060                writeLock.unlock();
061            }
062        }
063    
064        @Override
065        public void oneway(Object command) throws IOException {
066            writeLock.lock();
067            try {
068                next.oneway(command);
069            } finally {
070                writeLock.unlock();
071            }
072        }
073    
074        @Override
075        public Object request(Object command) throws IOException {
076            writeLock.lock();
077            try {
078                return next.request(command);
079            } finally {
080                writeLock.unlock();
081            }
082        }
083    
084        @Override
085        public Object request(Object command, int timeout) throws IOException {
086            writeLock.lock();
087            try {
088                return next.request(command, timeout);
089            } finally {
090                writeLock.unlock();
091            }
092        }
093    
094        @Override
095        public String toString() {
096            return next.toString();
097        }
098    
099        public boolean isSyncOnCommand() {
100            return syncOnCommand;
101        }
102    
103        public void setSyncOnCommand(boolean syncOnCommand) {
104            this.syncOnCommand = syncOnCommand;
105        }
106    }