Merge pull request #4 from fpapon/GERONIMO-6771

[GERONIMO-6771] Reduce memory usage of circuit breaker by using BitSet
diff --git a/safeguard-impl/src/main/java/org/apache/safeguard/impl/circuitbreaker/CircuitBreakerInterceptor.java b/safeguard-impl/src/main/java/org/apache/safeguard/impl/circuitbreaker/CircuitBreakerInterceptor.java
index 1c6bd9a..2365f98 100644
--- a/safeguard-impl/src/main/java/org/apache/safeguard/impl/circuitbreaker/CircuitBreakerInterceptor.java
+++ b/safeguard-impl/src/main/java/org/apache/safeguard/impl/circuitbreaker/CircuitBreakerInterceptor.java
@@ -18,22 +18,6 @@
  */
 package org.apache.safeguard.impl.circuitbreaker;
 
-import static java.util.Arrays.asList;
-
-import java.io.Serializable;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Stream;
-
-import javax.annotation.Priority;
-import javax.enterprise.context.ApplicationScoped;
-import javax.inject.Inject;
-import javax.interceptor.AroundInvoke;
-import javax.interceptor.Interceptor;
-import javax.interceptor.InvocationContext;
-
 import org.apache.safeguard.impl.annotation.AnnotationFinder;
 import org.apache.safeguard.impl.cache.Key;
 import org.apache.safeguard.impl.cache.UnwrappedCache;
@@ -43,6 +27,22 @@
 import org.eclipse.microprofile.faulttolerance.exceptions.CircuitBreakerOpenException;
 import org.eclipse.microprofile.faulttolerance.exceptions.FaultToleranceDefinitionException;
 
+import javax.annotation.Priority;
+import javax.enterprise.context.ApplicationScoped;
+import javax.inject.Inject;
+import javax.interceptor.AroundInvoke;
+import javax.interceptor.Interceptor;
+import javax.interceptor.InvocationContext;
+import java.io.Serializable;
+import java.util.BitSet;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Stream;
+
+import static java.util.Arrays.asList;
+
 @CircuitBreaker
 @Interceptor
 @Priority(Interceptor.Priority.PLATFORM_AFTER + 12)
@@ -110,16 +110,18 @@
                 final long now = now();
                 final double currentFailureRatio = getCurrentFailureRatio(nextData);
                 breaker.closedDuration.set(now - currentData.checkIntervalStart);
-                if (nextData.states.length >= breaker.volumeThreshold && currentFailureRatio >= breaker.failureRatio) {
-                    breaker.opened.inc();
+                if (nextData.count >= breaker.volumeThreshold && currentFailureRatio >= breaker.failureRatio) {
+                    if (breaker.opened != null) {
+                        breaker.opened.inc();
+                    }
                     return OPEN;
                 }
                 return this;
             }
 
             private double getCurrentFailureRatio(final CheckIntervalData data) {
-                return data.states.length == 0 ?  0 :
-                        (Stream.of(data.states).filter(it -> !it).count() / (1. * data.states.length));
+                final int length = data.count;
+                return length == 0 ? 0 : ((length - data.states.cardinality()) * 1. / length);
             }
         },
         HALF_OPEN {
@@ -128,12 +130,13 @@
                                            final CheckIntervalData currentData,
                                            final CheckIntervalData nextData) {
                 breaker.halfOpenDuration.set(now() - currentData.checkIntervalStart);
-                if (Stream.of(nextData.states).anyMatch(it -> !it)) { // a exception was thrown
+                final int falseCount = nextData.count - nextData.states.cardinality();
+                if (falseCount > 0) { // an error occurred
                     return OPEN;
                 }
 
-                final long successes = Stream.of(nextData.states).filter(it -> it).count();
-                if (successes == nextData.states.length && successes >= breaker.successThreshold) {
+                final int cardinality = nextData.states.cardinality();
+                if (cardinality == nextData.count && cardinality >= breaker.successThreshold) {
                     return CLOSED;
                 }
                 return this;
@@ -148,7 +151,7 @@
                 if (nextData.checkIntervalStart != currentData.checkIntervalStart) {
                     return breaker.successThreshold == 1 ? CLOSED : HALF_OPEN;
                 }
-                if (Stream.of(nextData.states).filter(it -> it).count() > breaker.successThreshold) {
+                if (nextData.states.cardinality() > breaker.successThreshold) {
                     return breaker.successThreshold == 1 ? CLOSED : HALF_OPEN;
                 }
                 return this;
@@ -250,9 +253,14 @@
     }
 
     public static class CircuitBreakerImpl {
-        private static final Boolean[] EMPTY_ARRAY = new Boolean[0];
-        private static final Boolean[] FIRST_SUCCESS_ARRAY = {Boolean.TRUE};
-        private static final Boolean[] FIRST_FAILURE_ARRAY = {Boolean.FALSE};
+        private static final BitSet EMPTY_ARRAY = new BitSet();
+        private static final BitSet FIRST_SUCCESS_ARRAY = new BitSet();
+        private static final BitSet FIRST_FAILURE_ARRAY = new BitSet();
+
+        static {
+            FIRST_FAILURE_ARRAY.set(0);
+            FIRST_FAILURE_ARRAY.clear(0);
+        }
 
         private final boolean disabled;
         private final AtomicReference<State> state = new AtomicReference<>(State.CLOSED);
@@ -279,7 +287,7 @@
                            final FaultToleranceMetrics.Counter callsPrevented,
                            final FaultToleranceMetrics.Counter opened) {
             this.disabled = disabled;
-            this.checkIntervalData = new AtomicReference<>(new CheckIntervalData(volumeThreshold, EMPTY_ARRAY, 0));
+            this.checkIntervalData = new AtomicReference<>(new CheckIntervalData(volumeThreshold, 0, 0, EMPTY_ARRAY, 0));
             this.volumeThreshold = volumeThreshold;
             this.delay = delay;
             this.successThreshold = successThreshold;
@@ -313,7 +321,7 @@
             final State newState = currentState.isStateTransition(this, currentData, nextData);
             if (newState != currentState) {
                 state.compareAndSet(currentState, newState);
-                checkIntervalData.set(new CheckIntervalData(volumeThreshold, EMPTY_ARRAY, now()));
+                checkIntervalData.set(new CheckIntervalData(volumeThreshold, 0, 0, EMPTY_ARRAY, now()));
                 return newState != State.OPEN ? CheckResult.CLOSED_CHANGED : CheckResult.OPEN;
             }
             return newState != State.OPEN ? CheckResult.CLOSED : CheckResult.OPEN;
@@ -348,11 +356,11 @@
         private CheckIntervalData toNewData(final CheckType type, final long time) {
             switch (type) {
                 case FAILURE:
-                    return new CheckIntervalData(volumeThreshold, FIRST_FAILURE_ARRAY, time);
+                    return new CheckIntervalData(volumeThreshold, 1, 1, FIRST_FAILURE_ARRAY, time);
                 case SUCCESS:
-                    return new CheckIntervalData(volumeThreshold, FIRST_SUCCESS_ARRAY, time);
+                    return new CheckIntervalData(volumeThreshold, 1, 1, FIRST_SUCCESS_ARRAY, time);
                 case READ_ONLY:
-                    return new CheckIntervalData(volumeThreshold, EMPTY_ARRAY, time);
+                    return new CheckIntervalData(volumeThreshold, 0, 0, EMPTY_ARRAY, time);
                 default:
                     throw new IllegalArgumentException("unknown type " + type);
             }
@@ -361,34 +369,36 @@
 
     private static class CheckIntervalData {
         private final int length;
-        private final Boolean[] states; // todo: revise that but seems the spec sucks
+        private final int currentIdx;
+        private final int count;
+        private final BitSet states; // todo: revise that but seems the spec sucks
         private final long checkIntervalStart;
 
-        CheckIntervalData(final int length, final Boolean[] states, final long intervalStart) {
+        CheckIntervalData(final int length, final int currentIdx, final int count, final BitSet states, final long intervalStart) {
             this.length = length;
+            this.currentIdx = currentIdx;
+            this.count = count;
             this.states = states;
             this.checkIntervalStart = intervalStart;
         }
 
         private CheckIntervalData success() {
-            return new CheckIntervalData(length, nextArray(true), checkIntervalStart);
+            return next(true);
         }
 
         private CheckIntervalData failure() {
-            return new CheckIntervalData(length, nextArray(false), checkIntervalStart);
+            return next(false);
         }
 
-        private Boolean[] nextArray(final boolean value) {
-            final Boolean[] array = new Boolean[Math.min(length, states.length + 1)];
-            if (this.states.length > 0) {
-                if (this.states.length < array.length) {
-                    System.arraycopy(this.states, 0, array, 0, this.states.length);
-                } else {
-                    System.arraycopy(this.states, 1, array, 0, this.states.length - 1);
-                }
+        private CheckIntervalData next(final boolean value) {
+            final BitSet bitSet = new BitSet(length);
+            bitSet.or(states);
+            if (value) {
+                bitSet.set(currentIdx);
+            } else {
+                bitSet.clear(currentIdx);
             }
-            array[array.length - 1] = value;
-            return array;
+            return new CheckIntervalData(length, (currentIdx + 1) % length, Math.min(count + 1, length), bitSet, checkIntervalStart);
         }
 
         @Override