blob: d70e764fbd1769d34d679f196571bb261b8363b3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.util;
import java.io.IOException;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.broker.BrokerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @org.apache.xbean.XBean
*/
public class DefaultIOExceptionHandler implements IOExceptionHandler {
private static final Logger LOG = LoggerFactory
.getLogger(DefaultIOExceptionHandler.class);
private BrokerService broker;
private boolean ignoreAllErrors = false;
private boolean ignoreNoSpaceErrors = true;
private boolean ignoreSQLExceptions = true;
private boolean stopStartConnectors = false;
private String noSpaceMessage = "space";
private String sqlExceptionMessage = ""; // match all
private long resumeCheckSleepPeriod = 5*1000;
private AtomicBoolean stopStartInProgress = new AtomicBoolean(false);
public void handle(IOException exception) {
if (ignoreAllErrors) {
LOG.info("Ignoring IO exception, " + exception, exception);
return;
}
if (ignoreNoSpaceErrors) {
Throwable cause = exception;
while (cause != null && cause instanceof IOException) {
if (cause.getMessage().contains(noSpaceMessage)) {
LOG.info("Ignoring no space left exception, " + exception, exception);
return;
}
cause = cause.getCause();
}
}
if (ignoreSQLExceptions) {
Throwable cause = exception;
while (cause != null) {
if (cause instanceof SQLException && cause.getMessage().contains(sqlExceptionMessage)) {
LOG.info("Ignoring SQLException, " + exception, cause);
return;
}
cause = cause.getCause();
}
}
if (stopStartConnectors) {
if (!stopStartInProgress.compareAndSet(false, true)) {
// we are already working on it
return;
}
LOG.info("Initiating stop/restart of broker transport due to IO exception, " + exception, exception);
new Thread("stop transport connectors on IO exception") {
public void run() {
try {
ServiceStopper stopper = new ServiceStopper();
broker.stopAllConnectors(stopper);
} catch (Exception e) {
LOG.warn("Failure occurred while stopping broker connectors", e);
}
}
}.start();
// resume again
new Thread("restart transport connectors post IO exception") {
public void run() {
try {
while (isPersistenceAdapterDown()) {
LOG.info("waiting for broker persistence adapter checkpoint to succeed before restarting transports");
TimeUnit.MILLISECONDS.sleep(resumeCheckSleepPeriod);
}
broker.startAllConnectors();
} catch (Exception e) {
LOG.warn("Failure occurred while restarting broker connectors", e);
} finally {
stopStartInProgress.compareAndSet(true, false);
}
}
private boolean isPersistenceAdapterDown() {
boolean checkpointSuccess = false;
try {
broker.getPersistenceAdapter().checkpoint(true);
checkpointSuccess = true;
} catch (Throwable ignored) {}
return !checkpointSuccess;
}
}.start();
return;
}
LOG.info("Stopping the broker due to IO exception, " + exception, exception);
new Thread("Stopping the broker due to IO exception") {
public void run() {
try {
broker.stop();
} catch (Exception e) {
LOG.warn("Failure occurred while stopping broker", e);
}
}
}.start();
}
public void setBrokerService(BrokerService broker) {
this.broker = broker;
}
public boolean isIgnoreAllErrors() {
return ignoreAllErrors;
}
public void setIgnoreAllErrors(boolean ignoreAllErrors) {
this.ignoreAllErrors = ignoreAllErrors;
}
public boolean isIgnoreNoSpaceErrors() {
return ignoreNoSpaceErrors;
}
public void setIgnoreNoSpaceErrors(boolean ignoreNoSpaceErrors) {
this.ignoreNoSpaceErrors = ignoreNoSpaceErrors;
}
public String getNoSpaceMessage() {
return noSpaceMessage;
}
public void setNoSpaceMessage(String noSpaceMessage) {
this.noSpaceMessage = noSpaceMessage;
}
public boolean isIgnoreSQLExceptions() {
return ignoreSQLExceptions;
}
public void setIgnoreSQLExceptions(boolean ignoreSQLExceptions) {
this.ignoreSQLExceptions = ignoreSQLExceptions;
}
public String getSqlExceptionMessage() {
return sqlExceptionMessage;
}
public void setSqlExceptionMessage(String sqlExceptionMessage) {
this.sqlExceptionMessage = sqlExceptionMessage;
}
public boolean isStopStartConnectors() {
return stopStartConnectors;
}
public void setStopStartConnectors(boolean stopStartConnectors) {
this.stopStartConnectors = stopStartConnectors;
}
public long getResumeCheckSleepPeriod() {
return resumeCheckSleepPeriod;
}
public void setResumeCheckSleepPeriod(long resumeCheckSleepPeriod) {
this.resumeCheckSleepPeriod = resumeCheckSleepPeriod;
}
}