| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.nifi.state; |
| |
| import org.apache.nifi.annotation.behavior.Stateful; |
| import org.apache.nifi.components.state.Scope; |
| import org.apache.nifi.components.state.StateManager; |
| import org.apache.nifi.components.state.StateMap; |
| import org.junit.jupiter.api.Assertions; |
| |
| import java.io.IOException; |
| import java.util.Collections; |
| import java.util.Map; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| public class MockStateManager implements StateManager { |
| private final AtomicInteger versionIndex = new AtomicInteger(0); |
| |
| private StateMap localStateMap = new MockStateMap(null, -1L); |
| private StateMap clusterStateMap = new MockStateMap(null, -1L); |
| |
| private volatile boolean failToGetLocalState = false; |
| private volatile boolean failToSetLocalState = false; |
| private volatile boolean failToGetClusterState = false; |
| private volatile boolean failToSetClusterState = false; |
| private volatile boolean ignoreAnnotations = false; |
| |
| private final AtomicLong localRetrievedCount = new AtomicLong(0L); |
| private final AtomicLong clusterRetrievedCount = new AtomicLong(0L); |
| |
| private final boolean usesLocalState; |
| private final boolean usesClusterState; |
| |
| public MockStateManager(final Object component) { |
| final Stateful stateful = component.getClass().getAnnotation(Stateful.class); |
| if (stateful == null) { |
| usesLocalState = false; |
| usesClusterState = false; |
| } else { |
| final Scope[] scopes = stateful.scopes(); |
| boolean local = false; |
| boolean cluster = false; |
| |
| for (final Scope scope : scopes) { |
| if (scope == Scope.LOCAL) { |
| local = true; |
| } else if (scope == Scope.CLUSTER) { |
| cluster = true; |
| } |
| } |
| |
| usesLocalState = local; |
| usesClusterState = cluster; |
| } |
| } |
| |
| public void reset() { |
| clusterStateMap = new MockStateMap(null, -1L); |
| localStateMap = new MockStateMap(null, -1L); |
| } |
| |
| @Override |
| public synchronized void setState(final Map<String, String> state, final Scope scope) throws IOException { |
| verifyAnnotation(scope); |
| verifyCanSet(scope); |
| final StateMap stateMap = new MockStateMap(state, versionIndex.incrementAndGet()); |
| |
| if (scope == Scope.CLUSTER) { |
| clusterStateMap = stateMap; |
| } else { |
| localStateMap = stateMap; |
| } |
| } |
| |
| @Override |
| public synchronized StateMap getState(final Scope scope) throws IOException { |
| verifyAnnotation(scope); |
| verifyCanGet(scope); |
| return retrieveState(scope); |
| } |
| |
| private synchronized StateMap retrieveState(final Scope scope) { |
| verifyAnnotation(scope); |
| if (scope == Scope.CLUSTER) { |
| clusterRetrievedCount.incrementAndGet(); |
| return clusterStateMap; |
| } else { |
| localRetrievedCount.incrementAndGet(); |
| return localStateMap; |
| } |
| } |
| |
| public long getRetrievalCount(final Scope scope) { |
| switch (scope) { |
| case CLUSTER: |
| return clusterRetrievedCount.get(); |
| case LOCAL: |
| return localRetrievedCount.get(); |
| default: |
| throw new IllegalArgumentException("Invalid scope: " + scope); |
| } |
| } |
| |
| @Override |
| public synchronized boolean replace(final StateMap oldValue, final Map<String, String> newValue, final Scope scope) throws IOException { |
| verifyAnnotation(scope); |
| if (scope == Scope.CLUSTER) { |
| if (oldValue == clusterStateMap) { |
| verifyCanSet(scope); |
| clusterStateMap = new MockStateMap(newValue, versionIndex.incrementAndGet()); |
| return true; |
| } |
| |
| return false; |
| } else { |
| if (oldValue == localStateMap) { |
| verifyCanSet(scope); |
| localStateMap = new MockStateMap(newValue, versionIndex.incrementAndGet()); |
| return true; |
| } |
| |
| return false; |
| } |
| } |
| |
| @Override |
| public synchronized void clear(final Scope scope) throws IOException { |
| verifyAnnotation(scope); |
| setState(Collections.emptyMap(), scope); |
| } |
| |
| private void verifyCanSet(final Scope scope) throws IOException { |
| final boolean failToSet = (scope == Scope.LOCAL) ? failToSetLocalState : failToSetClusterState; |
| if (failToSet) { |
| throw new IOException("Unit Test configured to throw IOException if " + scope + " State is set"); |
| } |
| } |
| |
| private void verifyCanGet(final Scope scope) throws IOException { |
| final boolean failToGet = (scope == Scope.LOCAL) ? failToGetLocalState : failToGetClusterState; |
| if (failToGet) { |
| throw new IOException("Unit Test configured to throw IOException if " + scope + " State is retrieved"); |
| } |
| } |
| |
| public void setIgnoreAnnotations(final boolean ignore) { |
| this.ignoreAnnotations = ignore; |
| } |
| |
| private void verifyAnnotation(final Scope scope) { |
| if (ignoreAnnotations) { |
| return; |
| } |
| |
| // ensure that the @Stateful annotation is present with the appropriate Scope |
| if ((scope == Scope.LOCAL && !usesLocalState) || (scope == Scope.CLUSTER && !usesClusterState)) { |
| Assertions.fail("Component is attempting to set or retrieve state with a scope of " + scope + " but does not declare that it will use " |
| + scope + " state. A @Stateful annotation should be added to the component with a scope of " + scope); |
| } |
| } |
| |
| private String getValue(final String key, final Scope scope) { |
| final StateMap stateMap; |
| if (scope == Scope.CLUSTER) { |
| stateMap = clusterStateMap; |
| } else { |
| stateMap = localStateMap; |
| } |
| |
| return stateMap.get(key); |
| } |
| |
| // |
| // assertion methods to make unit testing easier |
| // |
| /** |
| * Ensures that the state with the given key and scope is set to the given value, or else the test will fail |
| * |
| * @param key the state key |
| * @param value the expected value |
| * @param scope the scope |
| */ |
| public void assertStateEquals(final String key, final String value, final Scope scope) { |
| Assertions.assertEquals(value, getValue(key, scope)); |
| } |
| |
| /** |
| * Ensures that the state is equal to the given values |
| * |
| * @param stateValues the values expected |
| * @param scope the scope to compare the stateValues against |
| */ |
| public void assertStateEquals(final Map<String, String> stateValues, final Scope scope) { |
| final StateMap stateMap = retrieveState(scope); |
| Assertions.assertEquals(stateValues, stateMap.toMap()); |
| } |
| |
| /** |
| * Ensures that the state is not equal to the given values |
| * |
| * @param stateValues the unexpected values |
| * @param scope the scope to compare the stateValues against |
| */ |
| public void assertStateNotEquals(final Map<String, String> stateValues, final Scope scope) { |
| final StateMap stateMap = retrieveState(scope); |
| Assertions.assertNotSame(stateValues, stateMap.toMap()); |
| } |
| |
| /** |
| * Ensures that the state with the given key and scope is not set to the given value, or else the test will fail |
| * |
| * @param key the state key |
| * @param value the unexpected value |
| * @param scope the scope |
| */ |
| public void assertStateNotEquals(final String key, final String value, final Scope scope) { |
| Assertions.assertNotEquals(value, getValue(key, scope)); |
| } |
| |
| /** |
| * Ensures that some value is set for the given key and scope, or else the test will fail |
| * |
| * @param key the state key |
| * @param scope the scope |
| */ |
| public void assertStateSet(final String key, final Scope scope) { |
| Assertions.assertNotNull(getValue(key, scope), "Expected state to be set for key " + key + " and scope " + scope + ", but it was not set"); |
| } |
| |
| /** |
| * Ensures that no value is set for the given key and scope, or else the test will fail |
| * |
| * @param key the state key |
| * @param scope the scope |
| */ |
| public void assertStateNotSet(final String key, final Scope scope) { |
| Assertions.assertNull(getValue(key, scope), "Expected state not to be set for key " + key + " and scope " + scope + ", but it was set"); |
| } |
| |
| /** |
| * Ensures that the state was set for the given scope, regardless of what the value was. |
| * |
| * @param scope the scope |
| */ |
| public void assertStateSet(final Scope scope) { |
| final StateMap stateMap = (scope == Scope.CLUSTER) ? clusterStateMap : localStateMap; |
| Assertions.assertNotSame(-1L, stateMap.getVersion(), "Expected state to be set for Scope " + scope + ", but it was not set"); |
| } |
| |
| /** |
| * Ensures that state was not set for any scope |
| */ |
| public void assertStateNotSet() { |
| assertStateNotSet(Scope.CLUSTER); |
| assertStateNotSet(Scope.LOCAL); |
| } |
| |
| /** |
| * Ensures that the state was not set for the given scope |
| * |
| * @param scope the scope |
| */ |
| public void assertStateNotSet(final Scope scope) { |
| final StateMap stateMap = (scope == Scope.CLUSTER) ? clusterStateMap : localStateMap; |
| Assertions.assertEquals(-1L, stateMap.getVersion(), "Expected state not to be set for Scope " + scope + ", but it was set"); |
| } |
| |
| /** |
| * Specifies whether or not the State Manager should throw an IOException when state is set for the given scope. |
| * Note that calls to {@link #replace(StateMap, Map, Scope)} will fail only if the state would be set (i.e., if |
| * we call replace and the StateMap does not match the old value, it will not fail). |
| * |
| * Also note that if setting state is set to fail, clearing will also fail, as clearing is thought of as setting the |
| * state to empty |
| * |
| * @param scope the scope that should (or should not) fail |
| * @param fail whether or not setting state should fail |
| */ |
| public void setFailOnStateSet(final Scope scope, final boolean fail) { |
| if (scope == Scope.LOCAL) { |
| failToSetLocalState = fail; |
| } else { |
| failToSetClusterState = fail; |
| } |
| } |
| |
| /** |
| * Specifies whether or not the State Manager should throw an IOException when state is retrieved for the given scope. |
| * |
| * @param scope the scope that should (or should not) fail |
| * @param fail whether or not retrieving state should fail |
| */ |
| public void setFailOnStateGet(final Scope scope, final boolean fail) { |
| if (scope == Scope.LOCAL) { |
| failToGetLocalState = fail; |
| } else { |
| failToGetClusterState = fail; |
| } |
| } |
| } |