blob: adddcf5e329ffb9c87561dd9f8138a9eb5c3cc08 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.statefun.flink.core.state;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nonnull;
import org.apache.flink.statefun.flink.core.TestUtils;
import org.apache.flink.statefun.sdk.Address;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.annotations.Persisted;
import org.apache.flink.statefun.sdk.state.Accessor;
import org.apache.flink.statefun.sdk.state.AppendingBufferAccessor;
import org.apache.flink.statefun.sdk.state.PersistedAppendingBuffer;
import org.apache.flink.statefun.sdk.state.PersistedStateRegistry;
import org.apache.flink.statefun.sdk.state.PersistedTable;
import org.apache.flink.statefun.sdk.state.PersistedValue;
import org.apache.flink.statefun.sdk.state.TableAccessor;
import org.junit.Test;
public class PersistedStatesTest {
// test collaborators
private final FakeState state = new FakeState();
// object under test
private final FlinkStateBinder binderUnderTest =
new FlinkStateBinder(state, TestUtils.FUNCTION_TYPE);
@Test
public void exampleUsage() {
PersistedStates.findAndBind(new SanityClass(), binderUnderTest);
assertThat(state.boundNames, hasItems("name", "last"));
}
@Test(expected = IllegalStateException.class)
public void nullValueField() {
PersistedStates.findAndBind(new NullValueClass(), binderUnderTest);
}
@Test
public void nonAnnotatedClass() {
PersistedStates.findAndBind(new IgnoreNonAnnotated(), binderUnderTest);
assertTrue(state.boundNames.isEmpty());
}
@Test
public void extendedClass() {
PersistedStates.findAndBind(new ChildClass(), binderUnderTest);
assertThat(state.boundNames, hasItems("parent", "child"));
}
@Test(expected = IllegalArgumentException.class)
public void staticPersistedFieldsAreNotAllowed() {
PersistedStates.findAndBind(new StaticPersistedValue(), binderUnderTest);
}
@Test
public void bindPersistedTable() {
PersistedStates.findAndBind(new PersistedTableValue(), binderUnderTest);
assertThat(state.boundNames, hasItems("table"));
}
@Test
public void bindPersistedAppendingBuffer() {
PersistedStates.findAndBind(new PersistedAppendingBufferState(), binderUnderTest);
assertThat(state.boundNames, hasItems("buffer"));
}
@Test
public void bindDynamicState() {
DynamicState dynamicState = new DynamicState();
PersistedStates.findAndBind(dynamicState, binderUnderTest);
dynamicState.process();
assertThat(
state.boundNames,
hasItems(
"in-constructor-value",
"in-constructor-table",
"in-constructor-buffer",
"post-constructor-value",
"post-constructor-table",
"post-constructor-buffer"));
}
@Test
public void bindComposedState() {
PersistedStates.findAndBind(new OuterClass(), binderUnderTest);
assertThat(state.boundNames, hasItems("inner"));
}
static final class SanityClass {
@SuppressWarnings("unused")
@Persisted
PersistedValue<String> name = PersistedValue.of("name", String.class);
@Persisted
@SuppressWarnings("unused")
PersistedValue<String> last = PersistedValue.of("last", String.class);
}
static final class NullValueClass {
@SuppressWarnings("unused")
@Persisted
PersistedValue<String> last;
}
abstract static class ParentClass {
@SuppressWarnings("unused")
@Persisted
PersistedValue<String> parent = PersistedValue.of("parent", String.class);
}
static final class ChildClass extends ParentClass {
@SuppressWarnings("unused")
@Persisted
PersistedValue<String> child = PersistedValue.of("child", String.class);
}
static final class IgnoreNonAnnotated {
@SuppressWarnings("unused")
PersistedValue<String> last = PersistedValue.of("last", String.class);
}
static final class StaticPersistedValue {
@Persisted
@SuppressWarnings("unused")
static PersistedValue<String> value = PersistedValue.of("static", String.class);
}
static final class PersistedTableValue {
@Persisted
@SuppressWarnings("unused")
PersistedTable<String, byte[]> value = PersistedTable.of("table", String.class, byte[].class);
}
static final class PersistedAppendingBufferState {
@Persisted
@SuppressWarnings("unused")
PersistedAppendingBuffer<Boolean> value = PersistedAppendingBuffer.of("buffer", Boolean.class);
}
static final class DynamicState {
@Persisted PersistedStateRegistry provider = new PersistedStateRegistry();
DynamicState() {
provider.registerValue(PersistedValue.of("in-constructor-value", String.class));
provider.registerTable(
PersistedTable.of("in-constructor-table", String.class, Integer.class));
provider.registerAppendingBuffer(
PersistedAppendingBuffer.of("in-constructor-buffer", String.class));
}
void process() {
provider.registerValue(PersistedValue.of("post-constructor-value", String.class));
provider.registerTable(
PersistedTable.of("post-constructor-table", String.class, Integer.class));
provider.registerAppendingBuffer(
PersistedAppendingBuffer.of("post-constructor-buffer", String.class));
}
}
static final class InnerClass {
@Persisted
@SuppressWarnings("unused")
PersistedValue<String> value = PersistedValue.of("inner", String.class);
}
static final class OuterClass {
@Persisted
@SuppressWarnings("unused")
InnerClass innerClass = new InnerClass();
}
private static final class FakeState implements State {
Set<String> boundNames = new HashSet<>();
@Override
public <T> Accessor<T> createFlinkStateAccessor(
FunctionType functionType, PersistedValue<T> persistedValue) {
boundNames.add(persistedValue.name());
return new Accessor<T>() {
T value;
@Override
public void set(T value) {
this.value = value;
}
@Override
public T get() {
return value;
}
@Override
public void clear() {
value = null;
}
};
}
@Override
public <K, V> TableAccessor<K, V> createFlinkStateTableAccessor(
FunctionType functionType, PersistedTable<K, V> persistedTable) {
boundNames.add(persistedTable.name());
return new TableAccessor<K, V>() {
Map<K, V> map = new HashMap<>();
@Override
public void set(K key, V value) {
map.put(key, value);
}
@Override
public V get(K key) {
return map.get(key);
}
@Override
public void remove(K key) {
map.remove(key);
}
@Override
public Iterable<Map.Entry<K, V>> entries() {
return map.entrySet();
}
@Override
public Iterable<K> keys() {
return map.keySet();
}
@Override
public Iterable<V> values() {
return map.values();
}
@Override
public void clear() {
map.clear();
}
};
}
@Override
public <E> AppendingBufferAccessor<E> createFlinkStateAppendingBufferAccessor(
FunctionType functionType, PersistedAppendingBuffer<E> persistedAppendingBuffer) {
boundNames.add(persistedAppendingBuffer.name());
return new AppendingBufferAccessor<E>() {
private List<E> list;
@Override
public void append(@Nonnull E element) {
if (list == null) {
list = new ArrayList<>();
}
list.add(element);
}
@Override
public void appendAll(@Nonnull List<E> elements) {
if (list == null) {
list = new ArrayList<>();
}
list.addAll(elements);
}
@Override
public void replaceWith(@Nonnull List<E> elements) {
list = elements;
}
@Nonnull
@Override
public Iterable<E> view() {
return list;
}
@Override
public void clear() {
list = null;
}
};
}
@Override
public void setCurrentKey(Address key) {
throw new UnsupportedOperationException();
}
}
}