blob: b806df190e1ec332b1b3cdfeab6c5115a2883ed1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.statefun.sdk.state;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.StatefulFunction;
import org.apache.flink.statefun.sdk.annotations.ForRuntime;
import org.apache.flink.statefun.sdk.annotations.Persisted;
/**
* A {@link PersistedStateRegistry} can be used to register persisted state, such as a {@link
* PersistedValue} or {@link PersistedTable}, etc. All state that is registered via this registry is
* persisted and maintained by the system for fault-tolerance.
*
* <p>Created state registries must be bound to the system by using the {@link Persisted}
* annotation. Please see the class-level Javadoc of {@link StatefulFunction} for an example on how
* to do that.
*
* @see StatefulFunction
*/
public final class PersistedStateRegistry {
private final Map<String, Object> registeredStates = new HashMap<>();
private StateBinder stateBinder;
/**
* The type of the function that this registry is bound to. This is {@code NULL} if this registry
* is not bounded.
*/
@Nullable private FunctionType functionType;
public PersistedStateRegistry() {
this.stateBinder = new NonFaultTolerantStateBinder();
}
/**
* Registers a {@link PersistedValue}. If a registered state already exists for the specified name
* of the value, the registration fails.
*
* @param valueState the value state to register.
* @param <T> the type of the value.
* @throws IllegalStateException if a previous registration exists for the given state name.
*/
public <T> void registerValue(PersistedValue<T> valueState) {
acceptRegistrationOrThrowIfPresent(valueState.name(), valueState);
}
/**
* Registers a {@link PersistedTable}. If a registered state already exists for the specified name
* of the table, the registration fails.
*
* @param tableState the table state to register.
* @param <K> the type of the keys.
* @param <V> the type of the values.
* @throws IllegalStateException if a previous registration exists for the given state name.
*/
public <K, V> void registerTable(PersistedTable<K, V> tableState) {
acceptRegistrationOrThrowIfPresent(tableState.name(), tableState);
}
/**
* Registers a {@link PersistedAppendingBuffer}. If a registered state already exists for the
* specified name of the table, the registration fails.
*
* @param bufferState the appending buffer to register.
* @param <E> the type of the buffer elements.
* @throws IllegalStateException if a previous registration exists for the given state name.
*/
public <E> void registerAppendingBuffer(PersistedAppendingBuffer<E> bufferState) {
acceptRegistrationOrThrowIfPresent(bufferState.name(), bufferState);
}
/**
* Binds this state registry to a given function. All existing registered state in this registry
* will also be bound to the system.
*
* @param stateBinder the new fault-tolerant state binder to use.
* @param functionType the type of the function that this registry is being bound to.
* @throws IllegalStateException if the registry was attempted to be bound more than once.
*/
@ForRuntime
void bind(StateBinder stateBinder, FunctionType functionType) {
if (this.functionType != null) {
throw new IllegalStateException(
"This registry was already bound to function type: "
+ this.functionType
+ ", attempting to rebind to function type: "
+ functionType);
}
this.stateBinder = Objects.requireNonNull(stateBinder);
this.functionType = Objects.requireNonNull(functionType);
registeredStates.values().forEach(state -> stateBinder.bind(state, functionType));
}
private void acceptRegistrationOrThrowIfPresent(String stateName, Object newStateObject) {
final Object previousRegistration = registeredStates.get(stateName);
if (previousRegistration != null) {
throw new IllegalStateException(
String.format(
"State name '%s' was registered twice; previous registered state object with the same name was a %s, attempting to register a new %s under the same name.",
stateName, previousRegistration, newStateObject));
}
registeredStates.put(stateName, newStateObject);
stateBinder.bind(newStateObject, functionType);
}
private static final class NonFaultTolerantStateBinder extends StateBinder {
@Override
public void bindValue(PersistedValue<?> persistedValue, FunctionType functionType) {}
@Override
public void bindTable(PersistedTable<?, ?> persistedTable, FunctionType functionType) {}
@Override
public void bindAppendingBuffer(
PersistedAppendingBuffer<?> persistedAppendingBuffer, FunctionType functionType) {}
}
}