blob: 03de72d9e3f4084d92792867c07db23fd3661a8a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.statefun.sdk.java.storage;
import static org.apache.flink.statefun.sdk.java.storage.StateValueContexts.StateValueContext;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.apache.flink.statefun.sdk.java.AddressScopedStorage;
import org.apache.flink.statefun.sdk.java.ValueSpec;
import org.apache.flink.statefun.sdk.java.types.Type;
import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction;
import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
import org.apache.flink.statefun.sdk.shaded.com.google.protobuf.ByteString;
import org.junit.Test;
public class ConcurrentAddressScopedStorageTest {
@Test
public void exampleUsage() {
final ValueSpec<Integer> stateSpec1 = ValueSpec.named("state_1").withIntType();
final ValueSpec<Boolean> stateSpec2 = ValueSpec.named("state_2").withBooleanType();
final List<StateValueContext<?>> testStateValues =
testStateValues(stateValue(stateSpec1, 91), stateValue(stateSpec2, true));
final AddressScopedStorage storage = new ConcurrentAddressScopedStorage(testStateValues);
assertThat(storage.get(stateSpec1), is(Optional.of(91)));
assertThat(storage.get(stateSpec2), is(Optional.of(true)));
}
@Test
public void getNullValueCell() {
final ValueSpec<Integer> stateSpec = ValueSpec.named("state").withIntType();
final List<StateValueContext<?>> testStateValues = testStateValues(stateValue(stateSpec, null));
final AddressScopedStorage storage = new ConcurrentAddressScopedStorage(testStateValues);
assertThat(storage.get(stateSpec), is(Optional.empty()));
}
@Test
public void setCell() {
final ValueSpec<Integer> stateSpec = ValueSpec.named("state").withIntType();
final List<StateValueContext<?>> testStateValues = testStateValues(stateValue(stateSpec, 91));
final AddressScopedStorage storage = new ConcurrentAddressScopedStorage(testStateValues);
storage.set(stateSpec, 1111);
assertThat(storage.get(stateSpec), is(Optional.of(1111)));
}
@Test
public void setMutableTypeCell() {
final ValueSpec<TestMutableType.Type> stateSpec =
ValueSpec.named("state").withCustomType(new TestMutableType());
final List<StateValueContext<?>> testStateValues =
testStateValues(stateValue(stateSpec, new TestMutableType.Type("hello")));
final AddressScopedStorage storage = new ConcurrentAddressScopedStorage(testStateValues);
final TestMutableType.Type newValue = new TestMutableType.Type("hello again!");
storage.set(stateSpec, newValue);
// mutations after a set should not have any effect
newValue.mutate("this value should not be written to storage!");
assertThat(storage.get(stateSpec), is(Optional.of(new TestMutableType.Type("hello again!"))));
}
@Test
public void clearCell() {
final ValueSpec<Integer> stateSpec = ValueSpec.named("state").withIntType();
List<StateValueContext<?>> testStateValues = testStateValues(stateValue(stateSpec, 91));
final AddressScopedStorage storage = new ConcurrentAddressScopedStorage(testStateValues);
storage.remove(stateSpec);
assertThat(storage.get(stateSpec), is(Optional.empty()));
}
@Test
public void clearMutableTypeCell() {
final ValueSpec<TestMutableType.Type> stateSpec =
ValueSpec.named("state").withCustomType(new TestMutableType());
List<StateValueContext<?>> testStateValues =
testStateValues(stateValue(stateSpec, new TestMutableType.Type("hello")));
final AddressScopedStorage storage = new ConcurrentAddressScopedStorage(testStateValues);
storage.remove(stateSpec);
assertThat(storage.get(stateSpec), is(Optional.empty()));
}
@Test(expected = IllegalStorageAccessException.class)
public void getNonExistingCell() {
final AddressScopedStorage storage =
new ConcurrentAddressScopedStorage(Collections.emptyList());
storage.get(ValueSpec.named("does_not_exist").withIntType());
}
@Test(expected = IllegalStorageAccessException.class)
public void setNonExistingCell() {
final AddressScopedStorage storage =
new ConcurrentAddressScopedStorage(Collections.emptyList());
storage.set(ValueSpec.named("does_not_exist").withIntType(), 999);
}
@Test(expected = IllegalStorageAccessException.class)
public void clearNonExistingCell() {
final AddressScopedStorage storage =
new ConcurrentAddressScopedStorage(Collections.emptyList());
storage.remove(ValueSpec.named("does_not_exist").withIntType());
}
@Test(expected = IllegalStorageAccessException.class)
public void setToNull() {
final ValueSpec<Integer> stateSpec = ValueSpec.named("state").withIntType();
List<StateValueContext<?>> testStateValues = testStateValues(stateValue(stateSpec, 91));
final AddressScopedStorage storage = new ConcurrentAddressScopedStorage(testStateValues);
storage.set(stateSpec, null);
}
@Test(expected = IllegalStorageAccessException.class)
public void getWithWrongType() {
final ValueSpec<Integer> stateSpec = ValueSpec.named("state").withIntType();
final List<StateValueContext<?>> testStateValues = testStateValues(stateValue(stateSpec, 91));
final AddressScopedStorage storage = new ConcurrentAddressScopedStorage(testStateValues);
storage.get(ValueSpec.named("state").withBooleanType());
}
@Test(expected = IllegalStorageAccessException.class)
public void setWithWrongType() {
final ValueSpec<Integer> stateSpec = ValueSpec.named("state").withIntType();
final List<StateValueContext<?>> testStateValues = testStateValues(stateValue(stateSpec, 91));
final AddressScopedStorage storage = new ConcurrentAddressScopedStorage(testStateValues);
storage.set(ValueSpec.named("state").withBooleanType(), true);
}
@Test(expected = IllegalStorageAccessException.class)
public void clearWithWrongType() {
final ValueSpec<Integer> stateSpec = ValueSpec.named("state").withIntType();
final List<StateValueContext<?>> testStateValues = testStateValues(stateValue(stateSpec, 91));
final AddressScopedStorage storage = new ConcurrentAddressScopedStorage(testStateValues);
storage.remove(ValueSpec.named("state").withBooleanType());
}
private static List<StateValueContext<?>> testStateValues(StateValueContext<?>... testValues) {
return Arrays.asList(testValues);
}
private static <T> StateValueContext<T> stateValue(ValueSpec<T> spec, T value) {
final ToFunction.PersistedValue protocolValue =
ToFunction.PersistedValue.newBuilder()
.setStateName(spec.name())
.setStateValue(
TypedValue.newBuilder()
.setTypename(value == null ? "" : spec.type().typeName().asTypeNameString())
.setHasValue(value != null)
.setValue(toByteString(spec.type(), value)))
.build();
return new StateValueContext<>(spec, protocolValue);
}
private static <T> ByteString toByteString(Type<T> type, T value) {
if (value == null) {
return ByteString.EMPTY;
}
return ByteString.copyFrom(type.typeSerializer().serialize(value).toByteArray());
}
}