| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.safeguard.impl.circuitbreaker; |
| |
| import org.apache.safeguard.impl.annotation.AnnotationFinder; |
| import org.apache.safeguard.impl.cache.Key; |
| import org.apache.safeguard.impl.cache.UnwrappedCache; |
| import org.apache.safeguard.impl.config.ConfigurationMapper; |
| import org.apache.safeguard.impl.metrics.FaultToleranceMetrics; |
| import org.eclipse.microprofile.faulttolerance.CircuitBreaker; |
| import org.eclipse.microprofile.faulttolerance.exceptions.CircuitBreakerOpenException; |
| import org.eclipse.microprofile.faulttolerance.exceptions.FaultToleranceDefinitionException; |
| |
| import javax.annotation.Priority; |
| import javax.enterprise.context.ApplicationScoped; |
| import javax.inject.Inject; |
| import javax.interceptor.AroundInvoke; |
| import javax.interceptor.Interceptor; |
| import javax.interceptor.InvocationContext; |
| import java.io.Serializable; |
| import java.util.BitSet; |
| import java.util.Map; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.stream.Stream; |
| |
| import static java.util.Arrays.asList; |
| |
| @CircuitBreaker |
| @Interceptor |
| @Priority(Interceptor.Priority.PLATFORM_AFTER + 12) |
| public class CircuitBreakerInterceptor implements Serializable { |
| @Inject |
| private Cache cache; |
| |
| @AroundInvoke |
| public Object ifNotOpen(final InvocationContext context) throws Exception { |
| final Map<Key, CircuitBreakerImpl> circuitBreakers = cache.getCircuitBreakers(); |
| final Key key = new Key(context, cache.getUnwrappedCache().getUnwrappedCache()); |
| CircuitBreakerImpl circuitBreaker = circuitBreakers.get(key); |
| if (circuitBreaker == null) { |
| circuitBreaker = cache.create(context); |
| final CircuitBreakerImpl existing = circuitBreakers.putIfAbsent(key, circuitBreaker); |
| if (existing != null) { |
| circuitBreaker = existing; |
| } else { |
| cache.postCreate(circuitBreaker, context); |
| } |
| } |
| if (circuitBreaker.disabled) { |
| return context.proceed(); |
| } |
| |
| final CheckResult state = circuitBreaker.performStateCheck(CheckType.READ_ONLY); |
| if (state == CheckResult.OPEN) { |
| circuitBreaker.callsPrevented.inc(); |
| throw new CircuitBreakerOpenException(context.getMethod() + " circuit breaker is open"); |
| } |
| try { |
| final Object result = context.proceed(); |
| if (state != CheckResult.CLOSED_CHANGED) { // a change triggers a reset we want to preserve |
| circuitBreaker.onSuccess(); |
| } |
| circuitBreaker.callsSucceeded.inc(); |
| return result; |
| } catch (final Exception e) { |
| if (circuitBreaker.failOn.length > 0 && |
| Stream.of(circuitBreaker.failOn).anyMatch(it -> it.isInstance(e) || it.isInstance(e.getCause()))) { |
| circuitBreaker.onFailure(); |
| } else { |
| circuitBreaker.callsSucceeded.inc(); |
| } |
| throw e; |
| } |
| } |
| |
| private static long now() { |
| return System.nanoTime(); |
| } |
| |
| private enum CheckType { |
| READ_ONLY, |
| FAILURE, |
| SUCCESS |
| } |
| |
| private enum State { |
| CLOSED { |
| @Override |
| public State isStateTransition(final CircuitBreakerImpl breaker, |
| final CheckIntervalData currentData, |
| final CheckIntervalData nextData) { |
| final long now = now(); |
| final double currentFailureRatio = getCurrentFailureRatio(nextData); |
| breaker.closedDuration.set(now - currentData.checkIntervalStart); |
| if (nextData.count >= breaker.volumeThreshold && currentFailureRatio >= breaker.failureRatio) { |
| if (breaker.opened != null) { |
| breaker.opened.inc(); |
| } |
| return OPEN; |
| } |
| return this; |
| } |
| |
| private double getCurrentFailureRatio(final CheckIntervalData data) { |
| final int length = data.count; |
| return length == 0 ? 0 : ((length - data.states.cardinality()) * 1. / length); |
| } |
| }, |
| HALF_OPEN { |
| @Override |
| public State isStateTransition(final CircuitBreakerImpl breaker, |
| final CheckIntervalData currentData, |
| final CheckIntervalData nextData) { |
| breaker.halfOpenDuration.set(now() - currentData.checkIntervalStart); |
| final int falseCount = nextData.count - nextData.states.cardinality(); |
| if (falseCount > 0) { // an error occurred |
| return OPEN; |
| } |
| |
| final int cardinality = nextData.states.cardinality(); |
| if (cardinality == nextData.count && cardinality >= breaker.successThreshold) { |
| return CLOSED; |
| } |
| return this; |
| } |
| }, |
| OPEN { |
| @Override |
| public State isStateTransition(final CircuitBreakerImpl breaker, |
| final CheckIntervalData currentData, |
| final CheckIntervalData nextData) { |
| breaker.openDuration.set(now() - currentData.checkIntervalStart); |
| if (nextData.checkIntervalStart != currentData.checkIntervalStart) { |
| return breaker.successThreshold == 1 ? CLOSED : HALF_OPEN; |
| } |
| if (nextData.states.cardinality() > breaker.successThreshold) { |
| return breaker.successThreshold == 1 ? CLOSED : HALF_OPEN; |
| } |
| return this; |
| } |
| }; |
| |
| private boolean isCheckIntervalFinished(final CircuitBreakerImpl breaker, |
| final CheckIntervalData currentData, |
| final long now) { |
| return (now - currentData.checkIntervalStart) > breaker.delay; |
| } |
| |
| public abstract State isStateTransition(CircuitBreakerImpl breaker, |
| CheckIntervalData currentData, |
| CheckIntervalData nextData); |
| } |
| |
| @ApplicationScoped |
| public static class Cache { |
| private final Map<Key, CircuitBreakerImpl> circuitBreakers = new ConcurrentHashMap<>(); |
| |
| @Inject |
| private AnnotationFinder finder; |
| |
| @Inject |
| private ConfigurationMapper mapper; |
| |
| @Inject |
| private FaultToleranceMetrics metrics; |
| |
| @Inject |
| private UnwrappedCache unwrappedCache; |
| |
| public UnwrappedCache getUnwrappedCache() { |
| return unwrappedCache; |
| } |
| |
| public Map<Key, CircuitBreakerImpl> getCircuitBreakers() { |
| return circuitBreakers; |
| } |
| |
| public CircuitBreakerImpl create(final InvocationContext context) { |
| final CircuitBreaker definition = mapper.map( |
| finder.findAnnotation(CircuitBreaker.class, context), context.getMethod(), CircuitBreaker.class); |
| final long delay = definition.delayUnit().getDuration().toNanos() * definition.delay(); |
| if (delay < 0) { |
| throw new FaultToleranceDefinitionException("CircuitBreaker delay can't be < 0"); |
| } |
| |
| final Class<? extends Throwable>[] failOn = definition.failOn(); |
| |
| final double failureRatio = definition.failureRatio(); |
| if (failureRatio < 0 || failureRatio > 1) { |
| throw new FaultToleranceDefinitionException("CircuitBreaker failure ratio can't be < 0 and > 1"); |
| } |
| |
| final int volumeThreshold = definition.requestVolumeThreshold(); |
| if (volumeThreshold < 1) { |
| throw new FaultToleranceDefinitionException("CircuitBreaker volume threshold can't be < 0"); |
| } |
| |
| final int successThreshold = definition.successThreshold(); |
| if (successThreshold <= 0) { |
| throw new FaultToleranceDefinitionException("CircuitBreaker success threshold can't be <= 0"); |
| } |
| |
| final String metricsNameBase = getBaseMetricsName(context); |
| return new CircuitBreakerImpl( |
| !mapper.isEnabled(context.getMethod(), CircuitBreaker.class), |
| volumeThreshold, delay, successThreshold, |
| failOn, failureRatio, metrics.counter(metricsNameBase + "callsSucceeded.total", |
| "Number of calls allowed to run by the circuit breaker that returned successfully"), |
| metrics.counter(metricsNameBase + "callsFailed.total", |
| "Number of calls allowed to run by the circuit breaker that then failed"), |
| metrics.counter(metricsNameBase + "callsPrevented.total", |
| "Number of calls prevented from running by an open circuit breaker"), |
| metrics.counter(metricsNameBase + "opened.total", |
| "Number of times the circuit breaker has moved from closed state to open state")); |
| } |
| |
| private String getBaseMetricsName(final InvocationContext context) { |
| return "ft." + context.getMethod().getDeclaringClass().getCanonicalName() + "." |
| + context.getMethod().getName() + ".circuitbreaker."; |
| } |
| |
| public void postCreate(final CircuitBreakerImpl circuitBreaker, final InvocationContext context) { |
| final String metricsNameBase = getBaseMetricsName(context); |
| metrics.gauge(metricsNameBase + "open.total", "Amount of time the circuit breaker has spent in open state", "nanoseconds", |
| circuitBreaker.openDuration::get); |
| metrics.gauge(metricsNameBase + "halfOpen.total", "Amount of time the circuit breaker has spent in half-open state", "nanoseconds", |
| circuitBreaker.halfOpenDuration::get); |
| metrics.gauge(metricsNameBase + "closed.total", "Amount of time the circuit breaker has spent in closed state", "nanoseconds", |
| circuitBreaker.closedDuration::get); |
| } |
| } |
| |
| private enum CheckResult { |
| OPEN, CLOSED_CHANGED, CLOSED |
| } |
| |
| public static class CircuitBreakerImpl { |
| private static final BitSet EMPTY_ARRAY = new BitSet(); |
| private static final BitSet FIRST_SUCCESS_ARRAY = new BitSet(); |
| private static final BitSet FIRST_FAILURE_ARRAY = new BitSet(); |
| |
| static { |
| FIRST_FAILURE_ARRAY.set(0); |
| FIRST_FAILURE_ARRAY.clear(0); |
| } |
| |
| private final boolean disabled; |
| private final AtomicReference<State> state = new AtomicReference<>(State.CLOSED); |
| private final AtomicReference<CheckIntervalData> checkIntervalData; |
| private final int volumeThreshold; |
| private final long delay; |
| private final int successThreshold; |
| private final double failureRatio; |
| private final Class<? extends Throwable>[] failOn; |
| |
| private final AtomicLong openDuration = new AtomicLong(); |
| private final FaultToleranceMetrics.Counter callsSucceeded; |
| private final FaultToleranceMetrics.Counter callsFailed; |
| private final FaultToleranceMetrics.Counter callsPrevented; |
| private final AtomicLong halfOpenDuration = new AtomicLong(); |
| private final AtomicLong closedDuration = new AtomicLong(); |
| private final FaultToleranceMetrics.Counter opened; |
| |
| CircuitBreakerImpl(final boolean disabled, |
| final int volumeThreshold, final long delay, final int successThreshold, |
| final Class<? extends Throwable>[] failOn, final double failureRatio, |
| final FaultToleranceMetrics.Counter callsSucceeded, |
| final FaultToleranceMetrics.Counter callsFailed, |
| final FaultToleranceMetrics.Counter callsPrevented, |
| final FaultToleranceMetrics.Counter opened) { |
| this.disabled = disabled; |
| this.checkIntervalData = new AtomicReference<>(new CheckIntervalData(volumeThreshold, 0, 0, EMPTY_ARRAY, 0)); |
| this.volumeThreshold = volumeThreshold; |
| this.delay = delay; |
| this.successThreshold = successThreshold; |
| this.failOn = failOn; |
| this.failureRatio = failureRatio; |
| this.callsSucceeded = callsSucceeded; |
| this.callsFailed = callsFailed; |
| this.callsPrevented = callsPrevented; |
| this.opened = opened; |
| } |
| |
| private void onSuccess() { |
| performStateCheck(CheckType.SUCCESS); |
| } |
| |
| private void onFailure() { |
| performStateCheck(CheckType.FAILURE); |
| callsFailed.inc(); |
| } |
| |
| private CheckResult performStateCheck(final CheckType type) { |
| CheckIntervalData currentData; |
| CheckIntervalData nextData; |
| State currentState; |
| do { |
| final long time = now(); |
| currentState = state.get(); |
| currentData = checkIntervalData.get(); |
| nextData = nextCheckIntervalData(type, currentData, currentState, time); |
| } while (!updateCheckIntervalData(currentData, nextData)); |
| final State newState = currentState.isStateTransition(this, currentData, nextData); |
| if (newState != currentState) { |
| state.compareAndSet(currentState, newState); |
| checkIntervalData.set(new CheckIntervalData(volumeThreshold, 0, 0, EMPTY_ARRAY, now())); |
| return newState != State.OPEN ? CheckResult.CLOSED_CHANGED : CheckResult.OPEN; |
| } |
| return newState != State.OPEN ? CheckResult.CLOSED : CheckResult.OPEN; |
| } |
| |
| private boolean updateCheckIntervalData(final CheckIntervalData currentData, |
| final CheckIntervalData nextData) { |
| return currentData == nextData |
| || checkIntervalData.compareAndSet(currentData, nextData); |
| } |
| |
| private CheckIntervalData nextCheckIntervalData(final CheckType type, |
| final CheckIntervalData currentData, |
| final State currentState, |
| final long time) { |
| if (currentState.isCheckIntervalFinished(this, currentData, time)) { |
| return toNewData(type, time); |
| } else { |
| switch (type) { |
| case FAILURE: |
| return currentData.failure(); |
| case SUCCESS: |
| return currentData.success(); |
| case READ_ONLY: |
| return currentData; |
| default: |
| throw new IllegalArgumentException("unknown type " + type); |
| } |
| } |
| } |
| |
| private CheckIntervalData toNewData(final CheckType type, final long time) { |
| switch (type) { |
| case FAILURE: |
| return new CheckIntervalData(volumeThreshold, 1, 1, FIRST_FAILURE_ARRAY, time); |
| case SUCCESS: |
| return new CheckIntervalData(volumeThreshold, 1, 1, FIRST_SUCCESS_ARRAY, time); |
| case READ_ONLY: |
| return new CheckIntervalData(volumeThreshold, 0, 0, EMPTY_ARRAY, time); |
| default: |
| throw new IllegalArgumentException("unknown type " + type); |
| } |
| } |
| } |
| |
| private static class CheckIntervalData { |
| private final int length; |
| private final int currentIdx; |
| private final int count; |
| private final BitSet states; // todo: revise that but seems the spec sucks |
| private final long checkIntervalStart; |
| |
| CheckIntervalData(final int length, final int currentIdx, final int count, final BitSet states, final long intervalStart) { |
| this.length = length; |
| this.currentIdx = currentIdx; |
| this.count = count; |
| this.states = states; |
| this.checkIntervalStart = intervalStart; |
| } |
| |
| private CheckIntervalData success() { |
| return next(true); |
| } |
| |
| private CheckIntervalData failure() { |
| return next(false); |
| } |
| |
| private CheckIntervalData next(final boolean value) { |
| final BitSet bitSet = new BitSet(length); |
| bitSet.or(states); |
| if (value) { |
| bitSet.set(currentIdx); |
| } else { |
| bitSet.clear(currentIdx); |
| } |
| return new CheckIntervalData(length, (currentIdx + 1) % length, Math.min(count + 1, length), bitSet, checkIntervalStart); |
| } |
| |
| @Override |
| public String toString() { |
| return "CheckIntervalData{states=" + asList(states) + ", checkIntervalStart=" + checkIntervalStart + '}'; |
| } |
| } |
| } |