001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport;
018    
019    import java.io.IOException;
020    
021    import org.apache.activemq.command.WireFormatInfo;
022    import org.apache.activemq.wireformat.WireFormat;
023    import org.slf4j.Logger;
024    import org.slf4j.LoggerFactory;
025    
026    /**
027     * Used to make sure that commands are arriving periodically from the peer of
028     * the transport.
029     */
030    public class InactivityMonitor extends AbstractInactivityMonitor {
031    
032        private static final Logger LOG = LoggerFactory.getLogger(InactivityMonitor.class);
033    
034        private WireFormatInfo localWireFormatInfo;
035        private WireFormatInfo remoteWireFormatInfo;
036    
037        private boolean ignoreRemoteWireFormat = false;
038        private boolean ignoreAllWireFormatInfo = false;
039    
040        public InactivityMonitor(Transport next, WireFormat wireFormat) {
041            super(next, wireFormat);
042            if (this.wireFormat == null) {
043                this.ignoreAllWireFormatInfo = true;
044            }
045        }
046    
047        protected void processInboundWireFormatInfo(WireFormatInfo info) throws IOException {
048            IOException error = null;
049            remoteWireFormatInfo = info;
050            try {
051                startMonitorThreads();
052            } catch (IOException e) {
053                error = e;
054            }
055            if (error != null) {
056                onException(error);
057            }
058        }
059    
060        protected void processOutboundWireFormatInfo(WireFormatInfo info) throws IOException{
061            localWireFormatInfo = info;
062            startMonitorThreads();
063        }
064    
065        @Override
066        protected synchronized void startMonitorThreads() throws IOException {
067            if (isMonitorStarted()) {
068                return;
069            }
070    
071            long readCheckTime = getReadCheckTime();
072    
073            if (readCheckTime > 0) {
074                setWriteCheckTime(writeCheckValueFromReadCheck(readCheckTime));
075            }
076    
077            super.startMonitorThreads();
078        }
079    
080        private long writeCheckValueFromReadCheck(long readCheckTime) {
081            return readCheckTime>3 ? readCheckTime/3 : readCheckTime;
082        }
083    
084        @Override
085        protected boolean configuredOk() throws IOException {
086            boolean configured = false;
087            if (ignoreAllWireFormatInfo) {
088                configured = true;
089            } else if (localWireFormatInfo != null && remoteWireFormatInfo != null) {
090                if (!ignoreRemoteWireFormat) {
091                    if (LOG.isDebugEnabled()) {
092                        LOG.debug("Using min of local: " + localWireFormatInfo + " and remote: " + remoteWireFormatInfo);
093                    }
094    
095                    long readCheckTime = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration());
096                    long writeCheckTime = writeCheckValueFromReadCheck(readCheckTime);
097    
098                    setReadCheckTime(readCheckTime);
099                    setInitialDelayTime(Math.min(localWireFormatInfo.getMaxInactivityDurationInitalDelay(), remoteWireFormatInfo.getMaxInactivityDurationInitalDelay()));
100                    setWriteCheckTime(writeCheckTime);
101    
102                } else {
103                    if (LOG.isDebugEnabled()) {
104                        LOG.debug("Using local: " + localWireFormatInfo);
105                    }
106    
107                    long readCheckTime = localWireFormatInfo.getMaxInactivityDuration();
108                    long writeCheckTime = writeCheckValueFromReadCheck(readCheckTime);
109    
110                    setReadCheckTime(readCheckTime);
111                    setInitialDelayTime(localWireFormatInfo.getMaxInactivityDurationInitalDelay());
112                    setWriteCheckTime(writeCheckTime);
113                }
114                configured = true;
115            }
116    
117            return configured;
118        }
119    
120        public boolean isIgnoreAllWireFormatInfo() {
121            return ignoreAllWireFormatInfo;
122        }
123    
124        public void setIgnoreAllWireFormatInfo(boolean ignoreAllWireFormatInfo) {
125            this.ignoreAllWireFormatInfo = ignoreAllWireFormatInfo;
126        }
127    
128        public boolean isIgnoreRemoteWireFormat() {
129            return ignoreRemoteWireFormat;
130        }
131    
132        public void setIgnoreRemoteWireFormat(boolean ignoreRemoteWireFormat) {
133            this.ignoreRemoteWireFormat = ignoreRemoteWireFormat;
134        }
135    }