blob: 7e6d08990d5c84062710124f431821a6ea9548ad [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.util;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.SuppressReplyException;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.ActiveMQDestination;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Default {@link IOExceptionHandler}. For each {@link IOException} passed to
 * {@link #handle(IOException)} it either ignores the error, stops and later
 * restarts the broker's connectors, or stops the broker, depending on the
 * configuration flags of this bean.
 *
 * @org.apache.xbean.XBean
 */
public class DefaultIOExceptionHandler implements IOExceptionHandler {

    private static final Logger LOG = LoggerFactory
            .getLogger(DefaultIOExceptionHandler.class);

    /** Broker this handler acts on; injected through {@link #setBrokerService(BrokerService)}. */
    protected BrokerService broker;

    private boolean ignoreAllErrors = false;
    private boolean ignoreNoSpaceErrors = true;
    private boolean ignoreSQLExceptions = true;
    private boolean stopStartConnectors = false;
    private String noSpaceMessage = "space";
    private String sqlExceptionMessage = ""; // match all
    // Milliseconds slept between persistence adapter checkpoint attempts.
    private long resumeCheckSleepPeriod = 5 * 1000;
    // Set while a connector restart or a broker stop is in progress, so that
    // concurrent IO errors do not start a second one.
    private final AtomicBoolean handlingException = new AtomicBoolean(false);
    private boolean systemExitOnShutdown = false;

    /**
     * Handles an IO exception. The checks are applied in this order:
     * <ol>
     * <li>the broker is not started, or {@code ignoreAllErrors} is set: IO
     * resumption is allowed and the exception is ignored;</li>
     * <li>{@code ignoreNoSpaceErrors} is set and the exception is a "no space"
     * error: IO resumption is allowed and the exception is ignored;</li>
     * <li>{@code ignoreSQLExceptions} is set and the cause chain contains a
     * matching {@link SQLException}: the exception is ignored;</li>
     * <li>{@code stopStartConnectors} is set: the connectors are stopped and
     * restarted asynchronously;</li>
     * <li>otherwise the broker is stopped asynchronously.</li>
     * </ol>
     *
     * @param exception the IO exception to handle
     * @throws SuppressReplyException whenever the exception is not ignored,
     *         i.e. in the last two cases above
     */
    @Override
    public void handle(IOException exception) {
        if (!broker.isStarted() || ignoreAllErrors) {
            allowIOResumption();
            LOG.info("Ignoring IO exception, " + exception, exception);
            return;
        }

        if (ignoreNoSpaceErrors && isNoSpaceError(exception)) {
            LOG.info("Ignoring no space left exception, " + exception, exception);
            allowIOResumption();
            return;
        }

        if (ignoreSQLExceptions) {
            Throwable sqlCause = findMatchingSqlException(exception);
            if (sqlCause != null) {
                // NOTE(review): unlike the two ignore paths above, this one does not
                // call allowIOResumption() -- confirm that this is intended.
                LOG.info("Ignoring SQLException, " + exception, sqlCause);
                return;
            }
        }

        if (stopStartConnectors) {
            // Only the caller that wins the flag starts the stop/restart threads;
            // every caller still gets the SuppressReplyException below.
            if (handlingException.compareAndSet(false, true)) {
                LOG.info("Initiating stop/restart of transports on " + broker + " due to IO exception, " + exception, exception);
                stopThenRestartConnectors();
            }
            throw new SuppressReplyException("Stop/RestartTransportsInitiated", exception);
        }

        // The flag is never reset on this path: the broker is being stopped.
        if (handlingException.compareAndSet(false, true)) {
            stopBroker(exception);
        }

        // we don't want to propagate the exception back to the client
        // They will see a delay till they see a disconnect via socket.close
        // at which point failover: can kick in.
        throw new SuppressReplyException("ShutdownBrokerInitiated", exception);
    }

    /**
     * Walks the cause chain for as long as the causes are {@link IOException}s
     * and reports whether any of their messages contains {@code noSpaceMessage}.
     */
    private boolean isNoSpaceError(IOException exception) {
        Throwable cause = exception;
        // instanceof is false for null, so this also ends the walk at the end of the chain
        while (cause instanceof IOException) {
            String message = cause.getMessage();
            if (message != null && message.contains(noSpaceMessage)) {
                return true;
            }
            cause = cause.getCause();
        }
        return false;
    }

    /**
     * Returns the first {@link SQLException} in the cause chain whose message
     * (a {@code null} message is treated as empty) contains
     * {@code sqlExceptionMessage}, or {@code null} when there is none.
     */
    private Throwable findMatchingSqlException(IOException exception) {
        Throwable cause = exception;
        while (cause != null) {
            if (cause instanceof SQLException) {
                String message = cause.getMessage();
                if (message == null) {
                    message = "";
                }
                if (message.contains(sqlExceptionMessage)) {
                    return cause;
                }
            }
            cause = cause.getCause();
        }
        return null;
    }

    /**
     * Starts a thread that stops all broker connectors and then, whether or not
     * the stop succeeded, starts the restart thread.
     */
    private void stopThenRestartConnectors() {
        new Thread("IOExceptionHandler: stop transports") {
            @Override
            public void run() {
                try {
                    ServiceStopper stopper = new ServiceStopper();
                    broker.stopAllConnectors(stopper);
                    LOG.info("Successfully stopped transports on " + broker);
                } catch (Exception e) {
                    LOG.warn("Failure occurred while stopping broker connectors", e);
                } finally {
                    // resume again
                    restartConnectorsWhenPersistenceRecovers();
                }
            }
        }.start();
    }

    /**
     * Starts a thread that waits until a persistence adapter checkpoint
     * succeeds and then restarts the connectors, provided
     * {@link #hasLockOwnership()} still holds. Any exception while doing so
     * stops the broker. The {@code handlingException} flag is cleared when the
     * thread finishes.
     */
    private void restartConnectorsWhenPersistenceRecovers() {
        new Thread("IOExceptionHandler: restart transports") {
            @Override
            public void run() {
                try {
                    allowIOResumption();
                    while (hasLockOwnership() && isPersistenceAdapterDown()) {
                        LOG.info("waiting for broker persistence adapter checkpoint to succeed before restarting transports");
                        TimeUnit.MILLISECONDS.sleep(resumeCheckSleepPeriod);
                    }
                    if (hasLockOwnership()) {
                        // Clear pending messages of every queue that reports a reset is needed
                        // before clients can reconnect.
                        Map<ActiveMQDestination, Destination> destinations = ((RegionBroker)broker.getRegionBroker()).getDestinationMap();
                        for (Destination destination : destinations.values()) {
                            if (destination instanceof Queue) {
                                Queue queue = (Queue)destination;
                                if (queue.isResetNeeded()) {
                                    queue.clearPendingMessages(0);
                                }
                            }
                        }
                        broker.startAllConnectors();
                        LOG.info("Successfully restarted transports on " + broker);
                    }
                } catch (Exception e) {
                    LOG.warn("Stopping " + broker + " due to failure restarting transports", e);
                    stopBroker(e);
                } finally {
                    handlingException.compareAndSet(true, false);
                }
            }
        }.start();
    }

    /**
     * Attempts a persistence adapter checkpoint and reports whether it failed.
     * The failure itself is deliberately not propagated (the caller simply
     * retries), but it is logged at debug level so the reason stays visible.
     */
    private boolean isPersistenceAdapterDown() {
        try {
            broker.getPersistenceAdapter().checkpoint(true);
            return false;
        } catch (Throwable t) {
            LOG.debug("Persistence adapter checkpoint failed", t);
            return true;
        }
    }

    /**
     * Calls {@code allowIOResumption()} on the broker's persistence adapter,
     * if there is one. An {@link IOException} while doing so is logged and
     * otherwise ignored.
     */
    protected void allowIOResumption() {
        try {
            if (broker.getPersistenceAdapter() != null) {
                broker.getPersistenceAdapter().allowIOResumption();
            }
        } catch (IOException e) {
            LOG.warn("Failed to allow IO resumption", e);
        }
    }

    /**
     * Stops the broker on a separate thread. A restart is requested first when
     * the broker allows it, and the broker's system-exit-on-shutdown flag is
     * set from this handler's setting before the stop.
     *
     * @param exception the exception that caused the stop; only logged
     */
    private void stopBroker(Exception exception) {
        LOG.info("Stopping " + broker + " due to exception, " + exception, exception);
        new Thread("IOExceptionHandler: stopping " + broker) {
            @Override
            public void run() {
                try {
                    if (broker.isRestartAllowed()) {
                        broker.requestRestart();
                    }
                    broker.setSystemExitOnShutdown(isSystemExitOnShutdown());
                    broker.stop();
                } catch (Exception e) {
                    LOG.warn("Failure occurred while stopping broker", e);
                }
            }
        }.start();
    }

    /**
     * Consulted before and after waiting for the persistence adapter when
     * restarting connectors; the connectors are only restarted while this
     * returns {@code true}. This implementation always returns {@code true}.
     *
     * @return {@code true}
     * @throws IOException never thrown by this implementation
     */
    protected boolean hasLockOwnership() throws IOException {
        return true;
    }

    /**
     * @param broker the broker this handler acts on
     */
    @Override
    public void setBrokerService(BrokerService broker) {
        this.broker = broker;
    }

    /**
     * @return whether every IO exception is ignored (default {@code false})
     */
    public boolean isIgnoreAllErrors() {
        return ignoreAllErrors;
    }

    /**
     * @param ignoreAllErrors {@code true} to ignore every IO exception
     */
    public void setIgnoreAllErrors(boolean ignoreAllErrors) {
        this.ignoreAllErrors = ignoreAllErrors;
    }

    /**
     * @return whether IO exceptions whose message contains
     *         {@link #getNoSpaceMessage()} are ignored (default {@code true})
     */
    public boolean isIgnoreNoSpaceErrors() {
        return ignoreNoSpaceErrors;
    }

    /**
     * @param ignoreNoSpaceErrors {@code true} to ignore "no space" IO exceptions
     */
    public void setIgnoreNoSpaceErrors(boolean ignoreNoSpaceErrors) {
        this.ignoreNoSpaceErrors = ignoreNoSpaceErrors;
    }

    /**
     * @return the substring that identifies a "no space" IO exception message
     *         (default {@code "space"})
     */
    public String getNoSpaceMessage() {
        return noSpaceMessage;
    }

    /**
     * @param noSpaceMessage substring that identifies a "no space" IO exception message
     */
    public void setNoSpaceMessage(String noSpaceMessage) {
        this.noSpaceMessage = noSpaceMessage;
    }

    /**
     * @return whether IO exceptions caused by a matching {@link SQLException}
     *         are ignored (default {@code true})
     */
    public boolean isIgnoreSQLExceptions() {
        return ignoreSQLExceptions;
    }

    /**
     * @param ignoreSQLExceptions {@code true} to ignore IO exceptions caused by
     *        a matching {@link SQLException}
     */
    public void setIgnoreSQLExceptions(boolean ignoreSQLExceptions) {
        this.ignoreSQLExceptions = ignoreSQLExceptions;
    }

    /**
     * @return the substring a {@link SQLException} message must contain to be
     *         ignored; the default empty string matches every message
     */
    public String getSqlExceptionMessage() {
        return sqlExceptionMessage;
    }

    /**
     * @param sqlExceptionMessage substring a {@link SQLException} message must
     *        contain to be ignored
     */
    public void setSqlExceptionMessage(String sqlExceptionMessage) {
        this.sqlExceptionMessage = sqlExceptionMessage;
    }

    /**
     * @return whether a non-ignored IO exception stops and restarts the
     *         connectors instead of stopping the broker (default {@code false})
     */
    public boolean isStopStartConnectors() {
        return stopStartConnectors;
    }

    /**
     * @param stopStartConnectors {@code true} to stop and restart the connectors
     *        instead of stopping the broker
     */
    public void setStopStartConnectors(boolean stopStartConnectors) {
        this.stopStartConnectors = stopStartConnectors;
    }

    /**
     * @return milliseconds to sleep between persistence adapter checkpoint
     *         attempts while waiting to restart the connectors (default 5000)
     */
    public long getResumeCheckSleepPeriod() {
        return resumeCheckSleepPeriod;
    }

    /**
     * @param resumeCheckSleepPeriod milliseconds to sleep between persistence
     *        adapter checkpoint attempts
     */
    public void setResumeCheckSleepPeriod(long resumeCheckSleepPeriod) {
        this.resumeCheckSleepPeriod = resumeCheckSleepPeriod;
    }

    /**
     * @param systemExitOnShutdown value passed to the broker's
     *        {@code setSystemExitOnShutdown} before this handler stops it
     */
    public void setSystemExitOnShutdown(boolean systemExitOnShutdown) {
        this.systemExitOnShutdown = systemExitOnShutdown;
    }

    /**
     * @return the value passed to the broker's {@code setSystemExitOnShutdown}
     *         before this handler stops it (default {@code false})
     */
    public boolean isSystemExitOnShutdown() {
        return systemExitOnShutdown;
    }
}