blob: 4826606b4b24343d630d8d7aac01941ebac029fa [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.statefun.sdk.state;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.flink.statefun.sdk.StatefulFunction;
import org.apache.flink.statefun.sdk.annotations.ForRuntime;
import org.apache.flink.statefun.sdk.annotations.Persisted;
/**
* A {@link PersistedTable} is a table (collection of keys and values) registered within {@link
* StatefulFunction}s and is persisted and maintained by the system for fault-tolerance.
*
* <p>Created persisted tables must be registered by using the {@link Persisted} annotation. Please
* see the class-level Javadoc of {@link StatefulFunction} for an example on how to do that.
*
* @see StatefulFunction
* @param <K> type of the key - Please note that the key must have a meaningful {@link #hashCode()}
* and {@link #equals(Object)} implemented.
* @param <V> type of the value.
*/
public final class PersistedTable<K, V> {
  private final String name;
  private final Class<K> keyType;
  private final Class<V> valueType;
  private final Expiration expiration;

  /**
   * Backing store that every table operation delegates to. It starts out as an in-memory {@link
   * NonFaultTolerantAccessor} and can be swapped through {@link #setAccessor(TableAccessor)}.
   */
  private TableAccessor<K, V> accessor;

  /**
   * Builds a table descriptor together with its initial accessor.
   *
   * @throws NullPointerException if any argument is {@code null}.
   */
  private PersistedTable(
      String tableName,
      Class<K> keyClass,
      Class<V> valueClass,
      Expiration expirationConfig,
      TableAccessor<K, V> initialAccessor) {
    this.name = Objects.requireNonNull(tableName);
    this.keyType = Objects.requireNonNull(keyClass);
    this.valueType = Objects.requireNonNull(valueClass);
    this.expiration = Objects.requireNonNull(expirationConfig);
    this.accessor = Objects.requireNonNull(initialAccessor);
  }

  /**
   * Creates a {@link PersistedTable} that uses {@link Expiration#none()} as its expiration
   * configuration. The table is identified by its unique name, its key type and its value type;
   * these may not change across multiple executions of the application.
   *
   * @param name the unique name of the persisted state.
   * @param keyType the type of the state keys of this {@code PersistedTable}.
   * @param valueType the type of the state values of this {@code PersistedTable}.
   * @param <K> the type of the state keys.
   * @param <V> the type of the state values.
   * @return a {@code PersistedTable} instance.
   * @throws NullPointerException if any argument is {@code null}.
   */
  public static <K, V> PersistedTable<K, V> of(String name, Class<K> keyType, Class<V> valueType) {
    final Expiration noExpiration = Expiration.none();
    return of(name, keyType, valueType, noExpiration);
  }

  /**
   * Creates a {@link PersistedTable} with an explicit expiration configuration. The table is
   * identified by its unique name, its key type and its value type; these may not change across
   * multiple executions of the application.
   *
   * <p>The returned instance is initially backed by an in-memory, non fault-tolerant accessor.
   *
   * @param name the unique name of the persisted state.
   * @param keyType the type of the state keys of this {@code PersistedTable}.
   * @param valueType the type of the state values of this {@code PersistedTable}.
   * @param expiration state expiration configuration.
   * @param <K> the type of the state keys.
   * @param <V> the type of the state values.
   * @return a {@code PersistedTable} instance.
   * @throws NullPointerException if any argument is {@code null}.
   */
  public static <K, V> PersistedTable<K, V> of(
      String name, Class<K> keyType, Class<V> valueType, Expiration expiration) {
    final TableAccessor<K, V> inMemoryAccessor = new NonFaultTolerantAccessor<>();
    return new PersistedTable<>(name, keyType, valueType, expiration, inMemoryAccessor);
  }

  /**
   * Returns the unique name of the persisted table.
   *
   * @return unique name of the persisted table.
   */
  public String name() {
    return this.name;
  }

  /**
   * Returns the type of the persisted table's keys.
   *
   * @return the key type this table was created with.
   */
  public Class<K> keyType() {
    return this.keyType;
  }

  /**
   * Returns the type of the persisted table's values.
   *
   * @return the value type this table was created with.
   */
  public Class<V> valueType() {
    return this.valueType;
  }

  /**
   * Returns the expiration configuration of the persisted table.
   *
   * @return the expiration configuration this table was created with.
   */
  public Expiration expiration() {
    return this.expiration;
  }

  /**
   * Looks up a value in the persisted table.
   *
   * @param key the key whose value should be looked up.
   * @return the persisted table value associated with {@code key}, as reported by the current
   *     accessor.
   */
  public V get(K key) {
    return this.accessor.get(key);
  }

  /**
   * Updates the persisted table by associating {@code value} with {@code key}.
   *
   * @param key the key to associate the value with.
   * @param value the new value.
   */
  public void set(K key, V value) {
    this.accessor.set(key, value);
  }

  /**
   * Removes the value associated with {@code key}.
   *
   * @param key the key to remove.
   */
  public void remove(K key) {
    this.accessor.remove(key);
  }

  /**
   * Gets an {@link Iterable} over all the entries of the persisted table.
   *
   * @return an {@link Iterable} of the key/value entries of the persisted table.
   */
  public Iterable<Map.Entry<K, V>> entries() {
    return this.accessor.entries();
  }

  /**
   * Gets an {@link Iterable} over all keys of the persisted table.
   *
   * @return an {@link Iterable} of keys in the persisted table.
   */
  public Iterable<K> keys() {
    return this.accessor.keys();
  }

  /**
   * Gets an {@link Iterable} over all values of the persisted table.
   *
   * @return an {@link Iterable} of values in the persisted table.
   */
  public Iterable<V> values() {
    return this.accessor.values();
  }

  /** Clears all entries of the persisted table. */
  public void clear() {
    this.accessor.clear();
  }

  /**
   * Describes this table by its name, key type, value type and expiration configuration; the
   * table contents are not included.
   */
  @Override
  public String toString() {
    final String keyTypeName = keyType.getName();
    final String valueTypeName = valueType.getName();
    return String.format(
        "PersistedTable{name=%s, keyType=%s, valueType=%s, expiration=%s}",
        name, keyTypeName, valueTypeName, expiration);
  }

  /**
   * Replaces the accessor that backs this table.
   *
   * @param newAccessor the accessor to delegate to from now on.
   * @throws NullPointerException if {@code newAccessor} is {@code null}; the current accessor is
   *     left in place in that case.
   */
  @ForRuntime
  void setAccessor(TableAccessor<K, V> newAccessor) {
    this.accessor = Objects.requireNonNull(newAccessor);
  }

  /**
   * Default accessor that keeps all entries in a plain in-memory {@link HashMap}. The iterables it
   * hands out are the map's own entry, key and value views rather than copies.
   */
  private static final class NonFaultTolerantAccessor<K, V> implements TableAccessor<K, V> {
    private final Map<K, V> store = new HashMap<>();

    @Override
    public V get(K key) {
      return store.get(key);
    }

    @Override
    public void set(K key, V value) {
      store.put(key, value);
    }

    @Override
    public void remove(K key) {
      store.remove(key);
    }

    @Override
    public void clear() {
      store.clear();
    }

    @Override
    public Iterable<Map.Entry<K, V>> entries() {
      return store.entrySet();
    }

    @Override
    public Iterable<K> keys() {
      return store.keySet();
    }

    @Override
    public Iterable<V> values() {
      return store.values();
    }
  }
}