001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker;
018    
019    import java.util.Iterator;
020    import java.util.Set;
021    import java.util.concurrent.CopyOnWriteArraySet;
022    import java.util.concurrent.atomic.AtomicBoolean;
023    
024    import org.apache.activemq.Service;
025    import org.apache.activemq.ThreadPriorities;
026    import org.slf4j.Logger;
027    import org.slf4j.LoggerFactory;
028    
029    /**
030     * Used to provide information on the status of the Connection
031     * 
032     * 
033     */
034    public class TransportStatusDetector implements Service, Runnable {
035        private static final Logger LOG = LoggerFactory.getLogger(TransportStatusDetector.class);
036        private TransportConnector connector;
037        private Set<TransportConnection> collectionCandidates = new CopyOnWriteArraySet<TransportConnection>();
038        private AtomicBoolean started = new AtomicBoolean(false);
039        private Thread runner;
040        private int sweepInterval = 5000;
041    
042        TransportStatusDetector(TransportConnector connector) {
043            this.connector = connector;
044        }
045    
046        /**
047         * @return Returns the sweepInterval.
048         */
049        public int getSweepInterval() {
050            return sweepInterval;
051        }
052    
053        /**
054         * The sweepInterval to set.
055         * 
056         * @param sweepInterval
057         */
058        public void setSweepInterval(int sweepInterval) {
059            this.sweepInterval = sweepInterval;
060        }
061    
062        protected void doCollection() {
063            for (Iterator<TransportConnection> i = collectionCandidates.iterator(); i.hasNext();) {
064                TransportConnection tc = i.next();
065                if (tc.isMarkedCandidate()) {
066                    if (tc.isBlockedCandidate()) {
067                        collectionCandidates.remove(tc);
068                        doCollection(tc);
069                    } else {
070                        tc.doMark();
071                    }
072                } else {
073                    collectionCandidates.remove(tc);
074                }
075            }
076        }
077    
078        protected void doSweep() {
079            for (Iterator i = connector.getConnections().iterator(); i.hasNext();) {
080                TransportConnection connection = (TransportConnection)i.next();
081                if (connection.isMarkedCandidate()) {
082                    connection.doMark();
083                    collectionCandidates.add(connection);
084                }
085            }
086        }
087    
088        protected void doCollection(TransportConnection tc) {
089            LOG.warn("found a blocked client - stopping: " + tc);
090            try {
091                tc.stop();
092            } catch (Exception e) {
093                LOG.error("Error stopping " + tc, e);
094            }
095        }
096    
097        public void run() {
098            while (started.get()) {
099                try {
100                    doCollection();
101                    doSweep();
102                    Thread.sleep(sweepInterval);
103                } catch (Throwable e) {
104                    LOG.error("failed to complete a sweep for blocked clients", e);
105                }
106            }
107        }
108    
109        public void start() throws Exception {
110            if (started.compareAndSet(false, true)) {
111                runner = new Thread(this, "ActiveMQ Transport Status Monitor: " + connector);
112                runner.setDaemon(true);
113                runner.setPriority(ThreadPriorities.BROKER_MANAGEMENT);
114                runner.start();
115            }
116        }
117    
118        public void stop() throws Exception {
119            started.set(false);
120            if (runner != null) {
121                runner.join(getSweepInterval() * 5);
122            }
123        }
124    }