001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.activemq.transport;
019    
020    import java.io.IOException;
021    import java.net.Socket;
022    import java.util.Iterator;
023    import java.util.concurrent.ConcurrentLinkedQueue;
024    import java.util.concurrent.atomic.AtomicInteger;
025    
026    import org.apache.activemq.transport.tcp.TimeStampStream;
027    import org.slf4j.Logger;
028    import org.slf4j.LoggerFactory;
029    
030    /**
031     * This filter implements write timeouts for socket write operations.
032     * When using blocking IO, the Java implementation doesn't have an explicit flag
033     * to set a timeout, and can cause operations to block forever (or until the TCP stack implementation times out the retransmissions,
034     * which is usually around 13-30 minutes).<br/>
035     * To enable this transport, in the transport URI, simpley add<br/>
036     * <code>transport.soWriteTimeout=<value in millis></code>.<br/>
037     * For example (15 second timeout on write operations to the socket):</br>
038     * <pre><code>
039     * &lt;transportConnector
040     *     name=&quot;tcp1&quot;
041     *     uri=&quot;tcp://127.0.0.1:61616?transport.soTimeout=10000&amp;transport.soWriteTimeout=15000"
042     * /&gt;
043     * </code></pre><br/>
044     * For example (enable default timeout on the socket):</br>
045     * <pre><code>
046     * &lt;transportConnector
047     *     name=&quot;tcp1&quot;
048     *     uri=&quot;tcp://127.0.0.1:61616?transport.soTimeout=10000&amp;transport.soWriteTimeout=15000"
049     * /&gt;
050     * </code></pre>
051     * @author Filip Hanik
052     *
053     */
054    public class WriteTimeoutFilter extends TransportFilter {
055    
056        private static final Logger LOG = LoggerFactory.getLogger(WriteTimeoutFilter.class);
057        protected static ConcurrentLinkedQueue<WriteTimeoutFilter> writers = new ConcurrentLinkedQueue<WriteTimeoutFilter>();
058        protected static AtomicInteger messageCounter = new AtomicInteger(0);
059        protected static TimeoutThread timeoutThread = new TimeoutThread();
060    
061        protected static long sleep = 5000l;
062    
063        protected long writeTimeout = -1;
064    
065        public WriteTimeoutFilter(Transport next) {
066            super(next);
067        }
068    
069        @Override
070        public void oneway(Object command) throws IOException {
071            try {
072                registerWrite(this);
073                super.oneway(command);
074            } catch (IOException x) {
075                throw x;
076            } finally {
077                deRegisterWrite(this,false,null);
078            }
079        }
080    
081        public long getWriteTimeout() {
082            return writeTimeout;
083        }
084    
085        public void setWriteTimeout(long writeTimeout) {
086            this.writeTimeout = writeTimeout;
087        }
088    
089        public static long getSleep() {
090            return sleep;
091        }
092    
093        public static void setSleep(long sleep) {
094            WriteTimeoutFilter.sleep = sleep;
095        }
096    
097    
098        protected TimeStampStream getWriter() {
099            return next.narrow(TimeStampStream.class);
100        }
101    
102        protected Socket getSocket() {
103            return next.narrow(Socket.class);
104        }
105    
106        protected static void registerWrite(WriteTimeoutFilter filter) {
107            writers.add(filter);
108        }
109    
110        protected static boolean deRegisterWrite(WriteTimeoutFilter filter, boolean fail, IOException iox) {
111            boolean result = writers.remove(filter);
112            if (result) {
113                if (fail) {
114                    String message = "Forced write timeout for:"+filter.getNext().getRemoteAddress();
115                    LOG.warn(message);
116                    Socket sock = filter.getSocket();
117                    if (sock==null) {
118                        LOG.error("Destination socket is null, unable to close socket.("+message+")");
119                    } else {
120                        try {
121                            sock.close();
122                        }catch (IOException ignore) {
123                        }
124                    }
125                }
126            }
127            return result;
128        }
129    
130        @Override
131        public void start() throws Exception {
132            super.start();
133        }
134    
135        @Override
136        public void stop() throws Exception {
137            super.stop();
138        }
139    
140        protected static class TimeoutThread extends Thread {
141            static AtomicInteger instance = new AtomicInteger(0);
142            boolean run = true;
143            public TimeoutThread() {
144                setName("WriteTimeoutFilter-Timeout-"+instance.incrementAndGet());
145                setDaemon(true);
146                setPriority(Thread.MIN_PRIORITY);
147                start();
148            }
149    
150    
151            public void run() {
152                while (run) {
153                    boolean error = false;
154                    try {
155                        if (!interrupted()) {
156                            Iterator<WriteTimeoutFilter> filters = writers.iterator();
157                            while (run && filters.hasNext()) {
158                                WriteTimeoutFilter filter = filters.next();
159                                if (filter.getWriteTimeout()<=0) continue; //no timeout set
160                                long writeStart = filter.getWriter().getWriteTimestamp();
161                                long delta = (filter.getWriter().isWriting() && writeStart>0)?System.currentTimeMillis() - writeStart:-1;
162                                if (delta>filter.getWriteTimeout()) {
163                                    WriteTimeoutFilter.deRegisterWrite(filter, true,null);
164                                }//if timeout
165                            }//while
166                        }//if interrupted
167                        try {
168                            Thread.sleep(getSleep());
169                            error = false;
170                        } catch (InterruptedException x) {
171                            //do nothing
172                        }
173                    }catch (Throwable t) { //make sure this thread never dies
174                        if (!error) { //use error flag to avoid filling up the logs
175                            LOG.error("WriteTimeout thread unable validate existing sockets.",t);
176                            error = true;
177                        }
178                    }
179                }
180            }
181        }
182    
183    }