001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.util;
018    
019    import java.io.IOException;
020    import java.sql.SQLException;
021    import java.util.concurrent.TimeUnit;
022    import java.util.concurrent.atomic.AtomicBoolean;
023    
024    import org.apache.activemq.broker.BrokerService;
025    import org.slf4j.Logger;
026    import org.slf4j.LoggerFactory;
027    
028    /**
029     * @org.apache.xbean.XBean
030     */
031     public class DefaultIOExceptionHandler implements IOExceptionHandler {
032    
033        private static final Logger LOG = LoggerFactory
034                .getLogger(DefaultIOExceptionHandler.class);
035        protected BrokerService broker;
036        private boolean ignoreAllErrors = false;
037        private boolean ignoreNoSpaceErrors = true;
038        private boolean ignoreSQLExceptions = true;
039        private boolean stopStartConnectors = false;
040        private String noSpaceMessage = "space";
041        private String sqlExceptionMessage = ""; // match all
042        private long resumeCheckSleepPeriod = 5*1000;
043        private AtomicBoolean stopStartInProgress = new AtomicBoolean(false);
044    
045        public void handle(IOException exception) {
046            if (ignoreAllErrors) {
047                LOG.info("Ignoring IO exception, " + exception, exception);
048                return;
049            }
050    
051            if (ignoreNoSpaceErrors) {
052                Throwable cause = exception;
053                while (cause != null && cause instanceof IOException) {
054                    String message = cause.getMessage();
055                    if (message != null && message.contains(noSpaceMessage)) {
056                        LOG.info("Ignoring no space left exception, " + exception, exception);
057                        return;
058                    }
059                    cause = cause.getCause();
060                }
061            }
062    
063            if (ignoreSQLExceptions) {
064                Throwable cause = exception;
065                while (cause != null) {
066                    String message = cause.getMessage();
067                    if (cause instanceof SQLException && message.contains(sqlExceptionMessage)) {
068                        LOG.info("Ignoring SQLException, " + exception, cause);
069                        return;
070                    }
071                    cause = cause.getCause();
072                }
073            }
074    
075            if (stopStartConnectors) {
076                if (!stopStartInProgress.compareAndSet(false, true)) {
077                    // we are already working on it
078                    return;
079                }
080                LOG.info("Initiating stop/restart of broker transport due to IO exception, " + exception, exception);
081    
082                new Thread("stop transport connectors on IO exception") {
083                    public void run() {
084                        try {
085                            ServiceStopper stopper = new ServiceStopper();
086                            broker.stopAllConnectors(stopper);
087                        } catch (Exception e) {
088                            LOG.warn("Failure occurred while stopping broker connectors", e);
089                        }
090                    }
091                }.start();
092    
093                // resume again
094                new Thread("restart transport connectors post IO exception") {
095                    public void run() {
096                        try {
097                            while (hasLockOwnership() && isPersistenceAdapterDown()) {
098                                LOG.info("waiting for broker persistence adapter checkpoint to succeed before restarting transports");
099                                TimeUnit.MILLISECONDS.sleep(resumeCheckSleepPeriod);
100                            }
101                            broker.startAllConnectors();
102                        } catch (Exception e) {
103                            LOG.warn("Stopping broker due to failure while restarting broker connectors", e);
104                            stopBroker(e);
105                        } finally {
106                            stopStartInProgress.compareAndSet(true, false);
107                        }
108                    }
109    
110                    private boolean isPersistenceAdapterDown() {
111                        boolean checkpointSuccess = false;
112                        try {
113                            broker.getPersistenceAdapter().checkpoint(true);
114                            checkpointSuccess = true;
115                        } catch (Throwable ignored) {}
116                        return !checkpointSuccess;
117                    }
118                }.start();
119    
120                return;
121            }
122    
123            stopBroker(exception);
124        }
125    
126        private void stopBroker(Exception exception) {
127            LOG.info("Stopping the broker due to exception, " + exception, exception);
128            new Thread("Stopping the broker due to IO exception") {
129                public void run() {
130                    try {
131                        broker.stop();
132                    } catch (Exception e) {
133                        LOG.warn("Failure occurred while stopping broker", e);
134                    }
135                }
136            }.start();
137        }
138    
139        protected boolean hasLockOwnership() throws IOException {
140            return true;
141        }
142    
143        public void setBrokerService(BrokerService broker) {
144            this.broker = broker;
145        }
146    
147        public boolean isIgnoreAllErrors() {
148            return ignoreAllErrors;
149        }
150    
151        public void setIgnoreAllErrors(boolean ignoreAllErrors) {
152            this.ignoreAllErrors = ignoreAllErrors;
153        }
154    
155        public boolean isIgnoreNoSpaceErrors() {
156            return ignoreNoSpaceErrors;
157        }
158    
159        public void setIgnoreNoSpaceErrors(boolean ignoreNoSpaceErrors) {
160            this.ignoreNoSpaceErrors = ignoreNoSpaceErrors;
161        }
162    
163        public String getNoSpaceMessage() {
164            return noSpaceMessage;
165        }
166    
167        public void setNoSpaceMessage(String noSpaceMessage) {
168            this.noSpaceMessage = noSpaceMessage;
169        }
170    
171        public boolean isIgnoreSQLExceptions() {
172            return ignoreSQLExceptions;
173        }
174    
175        public void setIgnoreSQLExceptions(boolean ignoreSQLExceptions) {
176            this.ignoreSQLExceptions = ignoreSQLExceptions;
177        }
178    
179        public String getSqlExceptionMessage() {
180            return sqlExceptionMessage;
181        }
182    
183        public void setSqlExceptionMessage(String sqlExceptionMessage) {
184            this.sqlExceptionMessage = sqlExceptionMessage;
185        }
186    
187        public boolean isStopStartConnectors() {
188            return stopStartConnectors;
189        }
190    
191        public void setStopStartConnectors(boolean stopStartConnectors) {
192            this.stopStartConnectors = stopStartConnectors;
193        }
194    
195        public long getResumeCheckSleepPeriod() {
196            return resumeCheckSleepPeriod;
197        }
198    
199        public void setResumeCheckSleepPeriod(long resumeCheckSleepPeriod) {
200            this.resumeCheckSleepPeriod = resumeCheckSleepPeriod;
201        }
202    }