blob: 2365f980682d9df37a055f97a43bbfcb92c0635b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.safeguard.impl.circuitbreaker;
import org.apache.safeguard.impl.annotation.AnnotationFinder;
import org.apache.safeguard.impl.cache.Key;
import org.apache.safeguard.impl.cache.UnwrappedCache;
import org.apache.safeguard.impl.config.ConfigurationMapper;
import org.apache.safeguard.impl.metrics.FaultToleranceMetrics;
import org.eclipse.microprofile.faulttolerance.CircuitBreaker;
import org.eclipse.microprofile.faulttolerance.exceptions.CircuitBreakerOpenException;
import org.eclipse.microprofile.faulttolerance.exceptions.FaultToleranceDefinitionException;
import javax.annotation.Priority;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.interceptor.AroundInvoke;
import javax.interceptor.Interceptor;
import javax.interceptor.InvocationContext;
import java.io.Serializable;
import java.util.BitSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import static java.util.Arrays.asList;
@CircuitBreaker
@Interceptor
@Priority(Interceptor.Priority.PLATFORM_AFTER + 12)
public class CircuitBreakerInterceptor implements Serializable {
@Inject
private Cache cache;
@AroundInvoke
public Object ifNotOpen(final InvocationContext context) throws Exception {
final Map<Key, CircuitBreakerImpl> circuitBreakers = cache.getCircuitBreakers();
final Key key = new Key(context, cache.getUnwrappedCache().getUnwrappedCache());
CircuitBreakerImpl circuitBreaker = circuitBreakers.get(key);
if (circuitBreaker == null) {
circuitBreaker = cache.create(context);
final CircuitBreakerImpl existing = circuitBreakers.putIfAbsent(key, circuitBreaker);
if (existing != null) {
circuitBreaker = existing;
} else {
cache.postCreate(circuitBreaker, context);
}
}
if (circuitBreaker.disabled) {
return context.proceed();
}
final CheckResult state = circuitBreaker.performStateCheck(CheckType.READ_ONLY);
if (state == CheckResult.OPEN) {
circuitBreaker.callsPrevented.inc();
throw new CircuitBreakerOpenException(context.getMethod() + " circuit breaker is open");
}
try {
final Object result = context.proceed();
if (state != CheckResult.CLOSED_CHANGED) { // a change triggers a reset we want to preserve
circuitBreaker.onSuccess();
}
circuitBreaker.callsSucceeded.inc();
return result;
} catch (final Exception e) {
if (circuitBreaker.failOn.length > 0 &&
Stream.of(circuitBreaker.failOn).anyMatch(it -> it.isInstance(e) || it.isInstance(e.getCause()))) {
circuitBreaker.onFailure();
} else {
circuitBreaker.callsSucceeded.inc();
}
throw e;
}
}
private static long now() {
return System.nanoTime();
}
private enum CheckType {
READ_ONLY,
FAILURE,
SUCCESS
}
private enum State {
CLOSED {
@Override
public State isStateTransition(final CircuitBreakerImpl breaker,
final CheckIntervalData currentData,
final CheckIntervalData nextData) {
final long now = now();
final double currentFailureRatio = getCurrentFailureRatio(nextData);
breaker.closedDuration.set(now - currentData.checkIntervalStart);
if (nextData.count >= breaker.volumeThreshold && currentFailureRatio >= breaker.failureRatio) {
if (breaker.opened != null) {
breaker.opened.inc();
}
return OPEN;
}
return this;
}
private double getCurrentFailureRatio(final CheckIntervalData data) {
final int length = data.count;
return length == 0 ? 0 : ((length - data.states.cardinality()) * 1. / length);
}
},
HALF_OPEN {
@Override
public State isStateTransition(final CircuitBreakerImpl breaker,
final CheckIntervalData currentData,
final CheckIntervalData nextData) {
breaker.halfOpenDuration.set(now() - currentData.checkIntervalStart);
final int falseCount = nextData.count - nextData.states.cardinality();
if (falseCount > 0) { // an error occurred
return OPEN;
}
final int cardinality = nextData.states.cardinality();
if (cardinality == nextData.count && cardinality >= breaker.successThreshold) {
return CLOSED;
}
return this;
}
},
OPEN {
@Override
public State isStateTransition(final CircuitBreakerImpl breaker,
final CheckIntervalData currentData,
final CheckIntervalData nextData) {
breaker.openDuration.set(now() - currentData.checkIntervalStart);
if (nextData.checkIntervalStart != currentData.checkIntervalStart) {
return breaker.successThreshold == 1 ? CLOSED : HALF_OPEN;
}
if (nextData.states.cardinality() > breaker.successThreshold) {
return breaker.successThreshold == 1 ? CLOSED : HALF_OPEN;
}
return this;
}
};
private boolean isCheckIntervalFinished(final CircuitBreakerImpl breaker,
final CheckIntervalData currentData,
final long now) {
return (now - currentData.checkIntervalStart) > breaker.delay;
}
public abstract State isStateTransition(CircuitBreakerImpl breaker,
CheckIntervalData currentData,
CheckIntervalData nextData);
}
@ApplicationScoped
public static class Cache {
private final Map<Key, CircuitBreakerImpl> circuitBreakers = new ConcurrentHashMap<>();
@Inject
private AnnotationFinder finder;
@Inject
private ConfigurationMapper mapper;
@Inject
private FaultToleranceMetrics metrics;
@Inject
private UnwrappedCache unwrappedCache;
public UnwrappedCache getUnwrappedCache() {
return unwrappedCache;
}
public Map<Key, CircuitBreakerImpl> getCircuitBreakers() {
return circuitBreakers;
}
public CircuitBreakerImpl create(final InvocationContext context) {
final CircuitBreaker definition = mapper.map(
finder.findAnnotation(CircuitBreaker.class, context), context.getMethod(), CircuitBreaker.class);
final long delay = definition.delayUnit().getDuration().toNanos() * definition.delay();
if (delay < 0) {
throw new FaultToleranceDefinitionException("CircuitBreaker delay can't be < 0");
}
final Class<? extends Throwable>[] failOn = definition.failOn();
final double failureRatio = definition.failureRatio();
if (failureRatio < 0 || failureRatio > 1) {
throw new FaultToleranceDefinitionException("CircuitBreaker failure ratio can't be < 0 and > 1");
}
final int volumeThreshold = definition.requestVolumeThreshold();
if (volumeThreshold < 1) {
throw new FaultToleranceDefinitionException("CircuitBreaker volume threshold can't be < 0");
}
final int successThreshold = definition.successThreshold();
if (successThreshold <= 0) {
throw new FaultToleranceDefinitionException("CircuitBreaker success threshold can't be <= 0");
}
final String metricsNameBase = getBaseMetricsName(context);
return new CircuitBreakerImpl(
!mapper.isEnabled(context.getMethod(), CircuitBreaker.class),
volumeThreshold, delay, successThreshold,
failOn, failureRatio, metrics.counter(metricsNameBase + "callsSucceeded.total",
"Number of calls allowed to run by the circuit breaker that returned successfully"),
metrics.counter(metricsNameBase + "callsFailed.total",
"Number of calls allowed to run by the circuit breaker that then failed"),
metrics.counter(metricsNameBase + "callsPrevented.total",
"Number of calls prevented from running by an open circuit breaker"),
metrics.counter(metricsNameBase + "opened.total",
"Number of times the circuit breaker has moved from closed state to open state"));
}
private String getBaseMetricsName(final InvocationContext context) {
return "ft." + context.getMethod().getDeclaringClass().getCanonicalName() + "."
+ context.getMethod().getName() + ".circuitbreaker.";
}
public void postCreate(final CircuitBreakerImpl circuitBreaker, final InvocationContext context) {
final String metricsNameBase = getBaseMetricsName(context);
metrics.gauge(metricsNameBase + "open.total", "Amount of time the circuit breaker has spent in open state", "nanoseconds",
circuitBreaker.openDuration::get);
metrics.gauge(metricsNameBase + "halfOpen.total", "Amount of time the circuit breaker has spent in half-open state", "nanoseconds",
circuitBreaker.halfOpenDuration::get);
metrics.gauge(metricsNameBase + "closed.total", "Amount of time the circuit breaker has spent in closed state", "nanoseconds",
circuitBreaker.closedDuration::get);
}
}
private enum CheckResult {
OPEN, CLOSED_CHANGED, CLOSED
}
public static class CircuitBreakerImpl {
private static final BitSet EMPTY_ARRAY = new BitSet();
private static final BitSet FIRST_SUCCESS_ARRAY = new BitSet();
private static final BitSet FIRST_FAILURE_ARRAY = new BitSet();
static {
FIRST_FAILURE_ARRAY.set(0);
FIRST_FAILURE_ARRAY.clear(0);
}
private final boolean disabled;
private final AtomicReference<State> state = new AtomicReference<>(State.CLOSED);
private final AtomicReference<CheckIntervalData> checkIntervalData;
private final int volumeThreshold;
private final long delay;
private final int successThreshold;
private final double failureRatio;
private final Class<? extends Throwable>[] failOn;
private final AtomicLong openDuration = new AtomicLong();
private final FaultToleranceMetrics.Counter callsSucceeded;
private final FaultToleranceMetrics.Counter callsFailed;
private final FaultToleranceMetrics.Counter callsPrevented;
private final AtomicLong halfOpenDuration = new AtomicLong();
private final AtomicLong closedDuration = new AtomicLong();
private final FaultToleranceMetrics.Counter opened;
CircuitBreakerImpl(final boolean disabled,
final int volumeThreshold, final long delay, final int successThreshold,
final Class<? extends Throwable>[] failOn, final double failureRatio,
final FaultToleranceMetrics.Counter callsSucceeded,
final FaultToleranceMetrics.Counter callsFailed,
final FaultToleranceMetrics.Counter callsPrevented,
final FaultToleranceMetrics.Counter opened) {
this.disabled = disabled;
this.checkIntervalData = new AtomicReference<>(new CheckIntervalData(volumeThreshold, 0, 0, EMPTY_ARRAY, 0));
this.volumeThreshold = volumeThreshold;
this.delay = delay;
this.successThreshold = successThreshold;
this.failOn = failOn;
this.failureRatio = failureRatio;
this.callsSucceeded = callsSucceeded;
this.callsFailed = callsFailed;
this.callsPrevented = callsPrevented;
this.opened = opened;
}
private void onSuccess() {
performStateCheck(CheckType.SUCCESS);
}
private void onFailure() {
performStateCheck(CheckType.FAILURE);
callsFailed.inc();
}
private CheckResult performStateCheck(final CheckType type) {
CheckIntervalData currentData;
CheckIntervalData nextData;
State currentState;
do {
final long time = now();
currentState = state.get();
currentData = checkIntervalData.get();
nextData = nextCheckIntervalData(type, currentData, currentState, time);
} while (!updateCheckIntervalData(currentData, nextData));
final State newState = currentState.isStateTransition(this, currentData, nextData);
if (newState != currentState) {
state.compareAndSet(currentState, newState);
checkIntervalData.set(new CheckIntervalData(volumeThreshold, 0, 0, EMPTY_ARRAY, now()));
return newState != State.OPEN ? CheckResult.CLOSED_CHANGED : CheckResult.OPEN;
}
return newState != State.OPEN ? CheckResult.CLOSED : CheckResult.OPEN;
}
private boolean updateCheckIntervalData(final CheckIntervalData currentData,
final CheckIntervalData nextData) {
return currentData == nextData
|| checkIntervalData.compareAndSet(currentData, nextData);
}
private CheckIntervalData nextCheckIntervalData(final CheckType type,
final CheckIntervalData currentData,
final State currentState,
final long time) {
if (currentState.isCheckIntervalFinished(this, currentData, time)) {
return toNewData(type, time);
} else {
switch (type) {
case FAILURE:
return currentData.failure();
case SUCCESS:
return currentData.success();
case READ_ONLY:
return currentData;
default:
throw new IllegalArgumentException("unknown type " + type);
}
}
}
private CheckIntervalData toNewData(final CheckType type, final long time) {
switch (type) {
case FAILURE:
return new CheckIntervalData(volumeThreshold, 1, 1, FIRST_FAILURE_ARRAY, time);
case SUCCESS:
return new CheckIntervalData(volumeThreshold, 1, 1, FIRST_SUCCESS_ARRAY, time);
case READ_ONLY:
return new CheckIntervalData(volumeThreshold, 0, 0, EMPTY_ARRAY, time);
default:
throw new IllegalArgumentException("unknown type " + type);
}
}
}
private static class CheckIntervalData {
private final int length;
private final int currentIdx;
private final int count;
private final BitSet states; // todo: revise that but seems the spec sucks
private final long checkIntervalStart;
CheckIntervalData(final int length, final int currentIdx, final int count, final BitSet states, final long intervalStart) {
this.length = length;
this.currentIdx = currentIdx;
this.count = count;
this.states = states;
this.checkIntervalStart = intervalStart;
}
private CheckIntervalData success() {
return next(true);
}
private CheckIntervalData failure() {
return next(false);
}
private CheckIntervalData next(final boolean value) {
final BitSet bitSet = new BitSet(length);
bitSet.or(states);
if (value) {
bitSet.set(currentIdx);
} else {
bitSet.clear(currentIdx);
}
return new CheckIntervalData(length, (currentIdx + 1) % length, Math.min(count + 1, length), bitSet, checkIntervalStart);
}
@Override
public String toString() {
return "CheckIntervalData{states=" + asList(states) + ", checkIntervalStart=" + checkIntervalStart + '}';
}
}
}