001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.util;
018
019 import java.io.IOException;
020 import java.sql.SQLException;
021 import java.util.concurrent.TimeUnit;
022 import java.util.concurrent.atomic.AtomicBoolean;
023
024 import org.apache.activemq.broker.BrokerService;
025 import org.slf4j.Logger;
026 import org.slf4j.LoggerFactory;
027
028 /**
029 * @org.apache.xbean.XBean
030 */
031 public class DefaultIOExceptionHandler implements IOExceptionHandler {
032
033 private static final Logger LOG = LoggerFactory
034 .getLogger(DefaultIOExceptionHandler.class);
035 protected BrokerService broker;
036 private boolean ignoreAllErrors = false;
037 private boolean ignoreNoSpaceErrors = true;
038 private boolean ignoreSQLExceptions = true;
039 private boolean stopStartConnectors = false;
040 private String noSpaceMessage = "space";
041 private String sqlExceptionMessage = ""; // match all
042 private long resumeCheckSleepPeriod = 5*1000;
043 private AtomicBoolean stopStartInProgress = new AtomicBoolean(false);
044
045 public void handle(IOException exception) {
046 if (ignoreAllErrors) {
047 LOG.info("Ignoring IO exception, " + exception, exception);
048 return;
049 }
050
051 if (ignoreNoSpaceErrors) {
052 Throwable cause = exception;
053 while (cause != null && cause instanceof IOException) {
054 String message = cause.getMessage();
055 if (message != null && message.contains(noSpaceMessage)) {
056 LOG.info("Ignoring no space left exception, " + exception, exception);
057 return;
058 }
059 cause = cause.getCause();
060 }
061 }
062
063 if (ignoreSQLExceptions) {
064 Throwable cause = exception;
065 while (cause != null) {
066 String message = cause.getMessage();
067 if (cause instanceof SQLException && message.contains(sqlExceptionMessage)) {
068 LOG.info("Ignoring SQLException, " + exception, cause);
069 return;
070 }
071 cause = cause.getCause();
072 }
073 }
074
075 if (stopStartConnectors) {
076 if (!stopStartInProgress.compareAndSet(false, true)) {
077 // we are already working on it
078 return;
079 }
080 LOG.info("Initiating stop/restart of broker transport due to IO exception, " + exception, exception);
081
082 new Thread("stop transport connectors on IO exception") {
083 public void run() {
084 try {
085 ServiceStopper stopper = new ServiceStopper();
086 broker.stopAllConnectors(stopper);
087 } catch (Exception e) {
088 LOG.warn("Failure occurred while stopping broker connectors", e);
089 }
090 }
091 }.start();
092
093 // resume again
094 new Thread("restart transport connectors post IO exception") {
095 public void run() {
096 try {
097 while (hasLockOwnership() && isPersistenceAdapterDown()) {
098 LOG.info("waiting for broker persistence adapter checkpoint to succeed before restarting transports");
099 TimeUnit.MILLISECONDS.sleep(resumeCheckSleepPeriod);
100 }
101 broker.startAllConnectors();
102 } catch (Exception e) {
103 LOG.warn("Stopping broker due to failure while restarting broker connectors", e);
104 stopBroker(e);
105 } finally {
106 stopStartInProgress.compareAndSet(true, false);
107 }
108 }
109
110 private boolean isPersistenceAdapterDown() {
111 boolean checkpointSuccess = false;
112 try {
113 broker.getPersistenceAdapter().checkpoint(true);
114 checkpointSuccess = true;
115 } catch (Throwable ignored) {}
116 return !checkpointSuccess;
117 }
118 }.start();
119
120 return;
121 }
122
123 stopBroker(exception);
124 }
125
126 private void stopBroker(Exception exception) {
127 LOG.info("Stopping the broker due to exception, " + exception, exception);
128 new Thread("Stopping the broker due to IO exception") {
129 public void run() {
130 try {
131 broker.stop();
132 } catch (Exception e) {
133 LOG.warn("Failure occurred while stopping broker", e);
134 }
135 }
136 }.start();
137 }
138
139 protected boolean hasLockOwnership() throws IOException {
140 return true;
141 }
142
143 public void setBrokerService(BrokerService broker) {
144 this.broker = broker;
145 }
146
147 public boolean isIgnoreAllErrors() {
148 return ignoreAllErrors;
149 }
150
151 public void setIgnoreAllErrors(boolean ignoreAllErrors) {
152 this.ignoreAllErrors = ignoreAllErrors;
153 }
154
155 public boolean isIgnoreNoSpaceErrors() {
156 return ignoreNoSpaceErrors;
157 }
158
159 public void setIgnoreNoSpaceErrors(boolean ignoreNoSpaceErrors) {
160 this.ignoreNoSpaceErrors = ignoreNoSpaceErrors;
161 }
162
163 public String getNoSpaceMessage() {
164 return noSpaceMessage;
165 }
166
167 public void setNoSpaceMessage(String noSpaceMessage) {
168 this.noSpaceMessage = noSpaceMessage;
169 }
170
171 public boolean isIgnoreSQLExceptions() {
172 return ignoreSQLExceptions;
173 }
174
175 public void setIgnoreSQLExceptions(boolean ignoreSQLExceptions) {
176 this.ignoreSQLExceptions = ignoreSQLExceptions;
177 }
178
179 public String getSqlExceptionMessage() {
180 return sqlExceptionMessage;
181 }
182
183 public void setSqlExceptionMessage(String sqlExceptionMessage) {
184 this.sqlExceptionMessage = sqlExceptionMessage;
185 }
186
187 public boolean isStopStartConnectors() {
188 return stopStartConnectors;
189 }
190
191 public void setStopStartConnectors(boolean stopStartConnectors) {
192 this.stopStartConnectors = stopStartConnectors;
193 }
194
195 public long getResumeCheckSleepPeriod() {
196 return resumeCheckSleepPeriod;
197 }
198
199 public void setResumeCheckSleepPeriod(long resumeCheckSleepPeriod) {
200 this.resumeCheckSleepPeriod = resumeCheckSleepPeriod;
201 }
202 }