001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.apache.activemq.transport;
019
020import java.io.IOException;
021import java.net.Socket;
022import java.util.Iterator;
023import java.util.concurrent.ConcurrentLinkedQueue;
024import java.util.concurrent.atomic.AtomicInteger;
025
026import org.apache.activemq.transport.tcp.TimeStampStream;
027import org.slf4j.Logger;
028import org.slf4j.LoggerFactory;
029
030/**
031 * This filter implements write timeouts for socket write operations.
032 * When using blocking IO, the Java implementation doesn't have an explicit flag
033 * to set a timeout, and can cause operations to block forever (or until the TCP stack implementation times out the retransmissions,
034 * which is usually around 13-30 minutes).<br/>
035 * To enable this transport, in the transport URI, simpley add<br/>
036 * <code>transport.soWriteTimeout=<value in millis></code>.<br/>
037 * For example (15 second timeout on write operations to the socket):</br>
038 * <pre><code>
039 * &lt;transportConnector
040 *     name=&quot;tcp1&quot;
041 *     uri=&quot;tcp://127.0.0.1:61616?transport.soTimeout=10000&amp;transport.soWriteTimeout=15000"
042 * /&gt;
043 * </code></pre><br/>
044 * For example (enable default timeout on the socket):</br>
045 * <pre><code>
046 * &lt;transportConnector
047 *     name=&quot;tcp1&quot;
048 *     uri=&quot;tcp://127.0.0.1:61616?transport.soTimeout=10000&amp;transport.soWriteTimeout=15000"
049 * /&gt;
050 * </code></pre>
051 * @author Filip Hanik
052 *
053 */
054public class WriteTimeoutFilter extends TransportFilter {
055
056    private static final Logger LOG = LoggerFactory.getLogger(WriteTimeoutFilter.class);
057    protected static ConcurrentLinkedQueue<WriteTimeoutFilter> writers = new ConcurrentLinkedQueue<WriteTimeoutFilter>();
058    protected static AtomicInteger messageCounter = new AtomicInteger(0);
059    protected static TimeoutThread timeoutThread = new TimeoutThread();
060
061    protected static long sleep = 5000l;
062
063    protected long writeTimeout = -1;
064
065    public WriteTimeoutFilter(Transport next) {
066        super(next);
067    }
068
069    @Override
070    public void oneway(Object command) throws IOException {
071        try {
072            registerWrite(this);
073            super.oneway(command);
074        } catch (IOException x) {
075            throw x;
076        } finally {
077            deRegisterWrite(this,false,null);
078        }
079    }
080
081    public long getWriteTimeout() {
082        return writeTimeout;
083    }
084
085    public void setWriteTimeout(long writeTimeout) {
086        this.writeTimeout = writeTimeout;
087    }
088
089    public static long getSleep() {
090        return sleep;
091    }
092
093    public static void setSleep(long sleep) {
094        WriteTimeoutFilter.sleep = sleep;
095    }
096
097
098    protected TimeStampStream getWriter() {
099        return next.narrow(TimeStampStream.class);
100    }
101
102    protected Socket getSocket() {
103        return next.narrow(Socket.class);
104    }
105
106    protected static void registerWrite(WriteTimeoutFilter filter) {
107        writers.add(filter);
108    }
109
110    protected static boolean deRegisterWrite(WriteTimeoutFilter filter, boolean fail, IOException iox) {
111        boolean result = writers.remove(filter);
112        if (result) {
113            if (fail) {
114                String message = "Forced write timeout for:"+filter.getNext().getRemoteAddress();
115                LOG.warn(message);
116                Socket sock = filter.getSocket();
117                if (sock==null) {
118                    LOG.error("Destination socket is null, unable to close socket.("+message+")");
119                } else {
120                    try {
121                        sock.close();
122                    }catch (IOException ignore) {
123                    }
124                }
125            }
126        }
127        return result;
128    }
129
130    @Override
131    public void start() throws Exception {
132        super.start();
133    }
134
135    @Override
136    public void stop() throws Exception {
137        super.stop();
138    }
139
140    protected static class TimeoutThread extends Thread {
141        static AtomicInteger instance = new AtomicInteger(0);
142        boolean run = true;
143        public TimeoutThread() {
144            setName("WriteTimeoutFilter-Timeout-"+instance.incrementAndGet());
145            setDaemon(true);
146            setPriority(Thread.MIN_PRIORITY);
147            start();
148        }
149
150
151        public void run() {
152            while (run) {
153                boolean error = false;
154                try {
155                    if (!interrupted()) {
156                        Iterator<WriteTimeoutFilter> filters = writers.iterator();
157                        while (run && filters.hasNext()) {
158                            WriteTimeoutFilter filter = filters.next();
159                            if (filter.getWriteTimeout()<=0) continue; //no timeout set
160                            long writeStart = filter.getWriter().getWriteTimestamp();
161                            long delta = (filter.getWriter().isWriting() && writeStart>0)?System.currentTimeMillis() - writeStart:-1;
162                            if (delta>filter.getWriteTimeout()) {
163                                WriteTimeoutFilter.deRegisterWrite(filter, true,null);
164                            }//if timeout
165                        }//while
166                    }//if interrupted
167                    try {
168                        Thread.sleep(getSleep());
169                        error = false;
170                    } catch (InterruptedException x) {
171                        //do nothing
172                    }
173                }catch (Throwable t) { //make sure this thread never dies
174                    if (!error) { //use error flag to avoid filling up the logs
175                        LOG.error("WriteTimeout thread unable validate existing sockets.",t);
176                        error = true;
177                    }
178                }
179            }
180        }
181    }
182
183}