workaround for the tck since they are broken + making CircuitBreakerTest passing
diff --git a/pom.xml b/pom.xml
index e5889f2..912b7c6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,7 +75,7 @@
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<microprofile-fault-tolerance.version>1.1.3</microprofile-fault-tolerance.version>
- <owb.version>2.0.1</owb.version>
+ <owb.version>2.0.8</owb.version>
<arquillian.version>1.1.14.Final</arquillian.version>
<arquillian-weld-embedded.version>2.0.0.Final</arquillian-weld-embedded.version>
<cdi2-api.version>2.0</cdi2-api.version>
diff --git a/safeguard-impl/src/main/java/org/apache/safeguard/impl/circuitbreaker/CircuitBreakerInterceptor.java b/safeguard-impl/src/main/java/org/apache/safeguard/impl/circuitbreaker/CircuitBreakerInterceptor.java
index f7c54a8..65bb493 100644
--- a/safeguard-impl/src/main/java/org/apache/safeguard/impl/circuitbreaker/CircuitBreakerInterceptor.java
+++ b/safeguard-impl/src/main/java/org/apache/safeguard/impl/circuitbreaker/CircuitBreakerInterceptor.java
@@ -18,9 +18,10 @@
*/
package org.apache.safeguard.impl.circuitbreaker;
+import static java.util.Arrays.asList;
+
import java.io.Serializable;
import java.lang.reflect.Method;
-import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
@@ -59,19 +60,22 @@
circuitBreaker = existing;
}
}
- if (!circuitBreaker.checkState()) {
+
+ final CheckResult state = circuitBreaker.performStateCheck(CheckType.READ_ONLY);
+ if (state == CheckResult.OPEN) {
circuitBreaker.callsPrevented.inc();
throw new CircuitBreakerOpenException(context.getMethod() + " circuit breaker is open");
}
try {
final Object result = context.proceed();
- circuitBreaker.callsSucceeded.inc();
+ if (state != CheckResult.CLOSED_CHANGED) { // a change triggers a reset we want to preserve
+ circuitBreaker.onSuccess();
+ }
return result;
} catch (final Exception e) {
if (circuitBreaker.failOn.length > 0 &&
Stream.of(circuitBreaker.failOn).anyMatch(it -> it.isInstance(e) || it.isInstance(e.getCause()))) {
- circuitBreaker.callsFailed.inc();
- circuitBreaker.incrementAndCheckState(1);
+ circuitBreaker.onFailure();
} else {
circuitBreaker.callsSucceeded.inc();
}
@@ -79,28 +83,80 @@
}
}
+ private static long now() {
+ return System.nanoTime();
+ }
+
+ private enum CheckType {
+ READ_ONLY,
+ FAILURE,
+ SUCCESS
+ }
+
private enum State {
CLOSED {
@Override
- public State oppositeState() {
- return OPEN;
+ public State isStateTransition(final CircuitBreakerImpl breaker,
+ final CheckIntervalData currentData,
+ final CheckIntervalData nextData) {
+ final long now = now();
+ final double currentFailureRatio = getCurrentFailureRatio(nextData);
+ if (nextData.states.length >= breaker.volumeThreshold && currentFailureRatio >= breaker.failureRatio) {
+ breaker.closedDuration.set(now - currentData.checkIntervalStart);
+ breaker.opened.inc();
+ return OPEN;
+ }
+ return this;
+ }
+
+ private double getCurrentFailureRatio(final CheckIntervalData data) {
+ return data.states.length == 0 ? 0 :
+ (Stream.of(data.states).filter(it -> !it).count() / (1. * data.states.length));
}
},
+ HALF_OPEN {
+ @Override
+ public State isStateTransition(final CircuitBreakerImpl breaker,
+ final CheckIntervalData currentData,
+ final CheckIntervalData nextData) {
+ if (Stream.of(nextData.states).anyMatch(it -> !it)) { // a exception was thrown
+ return OPEN;
+ }
+ final long successes = Stream.of(nextData.states).filter(it -> it).count();
+ if (successes == nextData.states.length && successes >= breaker.successThreshold) {
+ breaker.halfOpenDuration.set(now() - currentData.checkIntervalStart);
+ return CLOSED;
+ }
+ return this;
+ }
+ },
OPEN {
@Override
- public State oppositeState() {
- return CLOSED;
+ public State isStateTransition(final CircuitBreakerImpl breaker,
+ final CheckIntervalData currentData,
+ final CheckIntervalData nextData) {
+ if (nextData.checkIntervalStart != currentData.checkIntervalStart) {
+ breaker.openDuration.set(now() - currentData.checkIntervalStart);
+ return breaker.successThreshold == 1 ? CLOSED : HALF_OPEN;
+ }
+ if (Stream.of(nextData.states).filter(it -> it).count() > breaker.successThreshold) {
+ breaker.openDuration.set(now() - currentData.checkIntervalStart);
+ return breaker.successThreshold == 1 ? CLOSED : HALF_OPEN;
+ }
+ return this;
}
};
- /**
- * Returns the opposite state to the represented state. This is useful
- * for flipping the current state.
- *
- * @return the opposite state
- */
- public abstract State oppositeState();
+ private boolean isCheckIntervalFinished(final CircuitBreakerImpl breaker,
+ final CheckIntervalData currentData,
+ final long now) {
+ return (now - currentData.checkIntervalStart) > breaker.delay;
+ }
+
+ public abstract State isStateTransition(CircuitBreakerImpl breaker,
+ CheckIntervalData currentData,
+ CheckIntervalData nextData);
}
@ApplicationScoped
@@ -149,7 +205,7 @@
+ context.getMethod().getName() + ".circuitbreaker.";
final CircuitBreakerImpl circuitBreaker = new CircuitBreakerImpl(volumeThreshold, delay, successThreshold,
- delay, failOn, failureRatio, metrics.counter(metricsNameBase + "callsSucceeded.total",
+ failOn, failureRatio, metrics.counter(metricsNameBase + "callsSucceeded.total",
"Number of calls allowed to run by the circuit breaker that returned successfully"),
metrics.counter(metricsNameBase + "callsFailed.total",
"Number of calls allowed to run by the circuit breaker that then failed"),
@@ -167,16 +223,20 @@
}
}
- // from commons-lang - todo: refine
+ private enum CheckResult {
+ OPEN, CLOSED_CHANGED, CLOSED
+ }
+
public static class CircuitBreakerImpl {
- private static final Map<State, StateStrategy> STRATEGY_MAP = createStrategyMap();
+ private static final Boolean[] EMPTY_ARRAY = new Boolean[0];
+ private static final Boolean[] FIRST_SUCCESS_ARRAY = {Boolean.TRUE};
+ private static final Boolean[] FIRST_FAILURE_ARRAY = {Boolean.FALSE};
private final AtomicReference<State> state = new AtomicReference<>(State.CLOSED);
private final AtomicReference<CheckIntervalData> checkIntervalData;
- private final int openingThreshold;
- private final long openingInterval;
- private final int closingThreshold;
- private final long closingInterval;
+ private final int volumeThreshold;
+ private final long delay;
+ private final int successThreshold;
private final double failureRatio;
private final Class<? extends Throwable>[] failOn;
@@ -188,18 +248,16 @@
private final AtomicLong closedDuration = new AtomicLong();
private final FaultToleranceMetrics.Counter opened;
- CircuitBreakerImpl(final int openingThreshold, final long openingInterval, final int closingThreshold,
- final long closingInterval, final Class<? extends Throwable>[] failOn,
- final double failureRatio,
+ CircuitBreakerImpl(final int volumeThreshold, final long delay, final int successThreshold,
+ final Class<? extends Throwable>[] failOn, final double failureRatio,
final FaultToleranceMetrics.Counter callsSucceeded,
final FaultToleranceMetrics.Counter callsFailed,
final FaultToleranceMetrics.Counter callsPrevented,
final FaultToleranceMetrics.Counter opened) {
- this.checkIntervalData = new AtomicReference<>(new CheckIntervalData(0, 0));
- this.openingThreshold = openingThreshold;
- this.openingInterval = openingInterval;
- this.closingThreshold = closingThreshold;
- this.closingInterval = closingInterval;
+ this.checkIntervalData = new AtomicReference<>(new CheckIntervalData(volumeThreshold, EMPTY_ARRAY, 0));
+ this.volumeThreshold = volumeThreshold;
+ this.delay = delay;
+ this.successThreshold = successThreshold;
this.failOn = failOn;
this.failureRatio = failureRatio;
this.callsSucceeded = callsSucceeded;
@@ -208,23 +266,17 @@
this.opened = opened;
}
- protected static boolean isOpen(final State state) {
- return state == State.OPEN;
+ private void onSuccess() {
+ performStateCheck(CheckType.SUCCESS);
+ callsSucceeded.inc();
}
- protected void changeState(final State newState) {
- state.compareAndSet(newState.oppositeState(), newState);
+ private void onFailure() {
+ performStateCheck(CheckType.FAILURE);
+ callsFailed.inc();
}
- public boolean checkState() {
- return performStateCheck(0);
- }
-
- public boolean incrementAndCheckState(final Integer increment) {
- return performStateCheck(increment);
- }
-
- private boolean performStateCheck(final int increment) {
+ private CheckResult performStateCheck(final CheckType type) {
CheckIntervalData currentData;
CheckIntervalData nextData;
State currentState;
@@ -232,16 +284,15 @@
final long time = now();
currentState = state.get();
currentData = checkIntervalData.get();
- nextData = nextCheckIntervalData(increment, currentData, currentState, time);
+ nextData = nextCheckIntervalData(type, currentData, currentState, time);
} while (!updateCheckIntervalData(currentData, nextData));
- if (stateStrategy(currentState).isStateTransition(this, currentData, nextData)) {
- currentState = currentState.oppositeState();
- if (currentState == State.OPEN) {
- opened.inc();
- }
- changeStateAndStartNewCheckInterval(currentState);
+ final State newState = currentState.isStateTransition(this, currentData, nextData);
+ if (newState != currentState) {
+ state.compareAndSet(currentState, newState);
+ checkIntervalData.set(new CheckIntervalData(volumeThreshold, EMPTY_ARRAY, now()));
+ return newState != State.OPEN ? CheckResult.CLOSED_CHANGED : CheckResult.OPEN;
}
- return !isOpen(currentState);
+ return newState != State.OPEN ? CheckResult.CLOSED : CheckResult.OPEN;
}
private boolean updateCheckIntervalData(final CheckIntervalData currentData,
@@ -250,98 +301,75 @@
|| checkIntervalData.compareAndSet(currentData, nextData);
}
- private void changeStateAndStartNewCheckInterval(final State newState) {
- changeState(newState);
- checkIntervalData.set(new CheckIntervalData(0, now()));
- }
-
- private CheckIntervalData nextCheckIntervalData(final int increment,
- final CheckIntervalData currentData, final State currentState, final long time) {
- CheckIntervalData nextData;
- if (stateStrategy(currentState).isCheckIntervalFinished(this, currentData, time)) {
- nextData = new CheckIntervalData(increment, time);
+ private CheckIntervalData nextCheckIntervalData(final CheckType type,
+ final CheckIntervalData currentData,
+ final State currentState,
+ final long time) {
+ if (currentState.isCheckIntervalFinished(this, currentData, time)) {
+ return toNewData(type, time);
} else {
- nextData = currentData.increment(increment);
- }
- return nextData;
- }
-
- static long now() {
- return System.nanoTime();
- }
-
- private static StateStrategy stateStrategy(final State state) {
- return STRATEGY_MAP.get(state);
- }
-
- private static Map<State, StateStrategy> createStrategyMap() {
- final Map<State, StateStrategy> map = new EnumMap<>(State.class);
- map.put(State.CLOSED, new StateStrategyClosed());
- map.put(State.OPEN, new StateStrategyOpen());
- return map;
- }
-
- private static class CheckIntervalData {
- private final int eventCount;
- private final long checkIntervalStart;
-
- CheckIntervalData(final int count, final long intervalStart) {
- eventCount = count;
- checkIntervalStart = intervalStart;
- }
-
- private CheckIntervalData increment(final int delta) {
- return (delta == 0) ? this : new CheckIntervalData(eventCount + delta, checkIntervalStart);
- }
- }
-
- private abstract static class StateStrategy {
- private boolean isCheckIntervalFinished(final CircuitBreakerImpl breaker,
- final CheckIntervalData currentData, final long now) {
- return now - currentData.checkIntervalStart > fetchCheckInterval(breaker);
- }
-
- public abstract boolean isStateTransition(CircuitBreakerImpl breaker,
- CheckIntervalData currentData, CheckIntervalData nextData);
-
- protected abstract long fetchCheckInterval(CircuitBreakerImpl breaker);
- }
-
- private static class StateStrategyClosed extends StateStrategy {
- @Override
- public boolean isStateTransition(final CircuitBreakerImpl breaker,
- final CheckIntervalData currentData, final CheckIntervalData nextData) {
- final long now = now();
- final boolean result =
- nextData.eventCount >= breaker.openingThreshold || (now != currentData.checkIntervalStart && (currentData.eventCount / (now - currentData.checkIntervalStart)) > breaker.failureRatio);
- if (!result) {
- breaker.closedDuration.set(now - currentData.checkIntervalStart);
+ switch (type) {
+ case FAILURE:
+ return currentData.failure();
+ case SUCCESS:
+ return currentData.success();
+ case READ_ONLY:
+ return currentData;
+ default:
+ throw new IllegalArgumentException("unknown type " + type);
}
- return result;
- }
-
- @Override
- protected long fetchCheckInterval(final CircuitBreakerImpl breaker) {
- return breaker.openingInterval;
}
}
- private static class StateStrategyOpen extends StateStrategy {
- @Override
- public boolean isStateTransition(final CircuitBreakerImpl breaker,
- final CheckIntervalData currentData, final CheckIntervalData nextData) {
- final boolean result =
- nextData.checkIntervalStart != currentData.checkIntervalStart && currentData.eventCount <= breaker.closingThreshold;
- if (!result) {
- breaker.openDuration.set(now() - currentData.checkIntervalStart);
- }
- return result;
+ private CheckIntervalData toNewData(final CheckType type, final long time) {
+ switch (type) {
+ case FAILURE:
+ return new CheckIntervalData(volumeThreshold, FIRST_FAILURE_ARRAY, time);
+ case SUCCESS:
+ return new CheckIntervalData(volumeThreshold, FIRST_SUCCESS_ARRAY, time);
+ case READ_ONLY:
+ return new CheckIntervalData(volumeThreshold, EMPTY_ARRAY, time);
+ default:
+ throw new IllegalArgumentException("unknown type " + type);
}
+ }
+ }
- @Override
- protected long fetchCheckInterval(final CircuitBreakerImpl breaker) {
- return breaker.closingInterval;
+ private static class CheckIntervalData {
+ private final int length;
+ private final Boolean[] states; // todo: revise that
+ private final long checkIntervalStart;
+
+ CheckIntervalData(final int length, final Boolean[] states, final long intervalStart) {
+ this.length = length;
+ this.states = states;
+ this.checkIntervalStart = intervalStart;
+ }
+
+ private CheckIntervalData success() {
+ return new CheckIntervalData(length, nextArray(true), checkIntervalStart);
+ }
+
+ private CheckIntervalData failure() {
+ return new CheckIntervalData(length, nextArray(false), checkIntervalStart);
+ }
+
+ private Boolean[] nextArray(final boolean value) {
+ final Boolean[] array = new Boolean[Math.min(length, states.length + 1)];
+ if (this.states.length > 0) {
+ if (this.states.length < array.length) {
+ System.arraycopy(this.states, 0, array, 0, this.states.length);
+ } else {
+ System.arraycopy(this.states, 1, array, 0, this.states.length - 1);
+ }
}
+ array[array.length - 1] = value;
+ return array;
+ }
+
+ @Override
+ public String toString() {
+ return "CheckIntervalData{states=" + asList(states) + ", checkIntervalStart=" + checkIntervalStart + '}';
}
}
}
diff --git a/safeguard-impl/src/main/java/org/apache/safeguard/impl/metrics/MicroprofileMetricsImpl.java b/safeguard-impl/src/main/java/org/apache/safeguard/impl/metrics/MicroprofileMetricsImpl.java
index 0a2fa9f..dde4a07 100644
--- a/safeguard-impl/src/main/java/org/apache/safeguard/impl/metrics/MicroprofileMetricsImpl.java
+++ b/safeguard-impl/src/main/java/org/apache/safeguard/impl/metrics/MicroprofileMetricsImpl.java
@@ -40,7 +40,7 @@
@Override
public Counter counter(final String name, final String description) {
final org.eclipse.microprofile.metrics.Counter delegate = registry.counter(
- new Metadata(name, name, description, COUNTER, "none"));
+ reusable(new Metadata(name, name, description, COUNTER, "none")));
return new Counter() {
@Override
public void inc() {
@@ -57,14 +57,19 @@
@Override
public void gauge(final String name, final String description, final String unit,
final Supplier<Long> supplier) {
- registry.register(new Metadata(name, name, description, GAUGE, unit),
+ registry.register(reusable(new Metadata(name, name, description, GAUGE, unit)),
(org.eclipse.microprofile.metrics.Gauge<Long>) supplier::get);
}
@Override
public Histogram histogram(final String name, final String description) {
final org.eclipse.microprofile.metrics.Histogram histogram = registry.histogram(
- new Metadata(name, name, description, HISTOGRAM, "none"));
+ reusable(new Metadata(name, name, description, HISTOGRAM, "none")));
return histogram::update;
}
+
+ private Metadata reusable(final Metadata metadata) {
+ metadata.setReusable(true);
+ return metadata;
+ }
}
diff --git a/safeguard-impl/src/test/java/org/apache/safeguard/ft/tck/SafeguardTCKExtension.java b/safeguard-impl/src/test/java/org/apache/safeguard/ft/tck/SafeguardTCKExtension.java
index 858dc0e..c665c52 100644
--- a/safeguard-impl/src/test/java/org/apache/safeguard/ft/tck/SafeguardTCKExtension.java
+++ b/safeguard-impl/src/test/java/org/apache/safeguard/ft/tck/SafeguardTCKExtension.java
@@ -1,30 +1,58 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
package org.apache.safeguard.ft.tck;
+import java.util.stream.Stream;
+
+import javax.enterprise.inject.spi.CDI;
+
+import org.apache.safeguard.impl.circuitbreaker.CircuitBreakerInterceptor;
+import org.apache.webbeans.spi.ContainerLifecycle;
+import org.eclipse.microprofile.metrics.MetricRegistry;
import org.jboss.arquillian.container.test.spi.client.deployment.ApplicationArchiveProcessor;
+import org.jboss.arquillian.core.api.Instance;
+import org.jboss.arquillian.core.api.annotation.Inject;
+import org.jboss.arquillian.core.api.annotation.Observes;
import org.jboss.arquillian.core.spi.LoadableExtension;
+import org.jboss.arquillian.test.spi.event.suite.After;
public class SafeguardTCKExtension implements LoadableExtension {
+
@Override
public void register(final ExtensionBuilder extensionBuilder) {
- extensionBuilder.service(ApplicationArchiveProcessor.class, ArchiveAppender.class);
+ extensionBuilder.service(ApplicationArchiveProcessor.class, ArchiveAppender.class)
+ .observer(LeakingTCKWorkaround.class);
+ }
+
+ public static class LeakingTCKWorkaround {
+
+ @Inject
+ private Instance<ContainerLifecycle> lifecycle;
+
+ public void clearCache(@Observes(precedence = -1) final After event) {
+ final CDI<Object> cdi = CDI.current();
+ final MetricRegistry registry = cdi.select(MetricRegistry.class).get();
+ cdi.select(CircuitBreakerInterceptor.Cache.class).get().getCircuitBreakers().clear();
+ Stream.concat(registry.getGauges().keySet().stream(), registry.getCounters().keySet().stream())
+ .filter(it -> it.startsWith("ft.") && it.startsWith(".circuitbreaker."))
+ .forEach(registry::remove);
+ }
}
}
diff --git a/safeguard-impl/src/test/resources/dev.xml b/safeguard-impl/src/test/resources/dev.xml
index f392d51..687fdb7 100644
--- a/safeguard-impl/src/test/resources/dev.xml
+++ b/safeguard-impl/src/test/resources/dev.xml
@@ -19,12 +19,14 @@
<suite name="Dev Manual Test Run" verbose="2" configfailurepolicy="continue" >
<test name="Manual Run">
<packages> <!-- all TCK -->
- <!--<package name="org.eclipse.microprofile.fault.tolerance.tck" />-->
+ <!--
+ <package name="org.eclipse.microprofile.fault.tolerance.tck" />
+ -->
</packages>
- <classes> <!-- for dev you can filter them out -->
- <class name="org.eclipse.microprofile.fault.tolerance.tck.FallbackTest">
+ <classes>
+ <class name="org.eclipse.microprofile.fault.tolerance.tck.CircuitBreakerTest">
<methods>
- <include name="testClassLevelFallbackSuccess" />
+
</methods>
</class>
</classes>