blob: 4ec1f77ebe65460db9025ab9495021050e5607bf [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.flink.contrib.streaming.state;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
* {@link MapState} implementation that stores state in RocksDB.
* <p>{@link RocksDBStateBackend} must ensure that we set the
* {@link org.rocksdb.StringAppendOperator} on the column family that we use for our state since
* we use the {@code merge()} call.
* @param <K> The type of the key.
* @param <N> The type of the namespace.
* @param <UK> The type of the keys in the map state.
* @param <UV> The type of the values in the map state.
class RocksDBMapState<K, N, UK, UV>
extends AbstractRocksDBState<K, N, Map<UK, UV>, MapState<UK, UV>>
implements InternalMapState<K, N, UK, UV> {
private static final Logger LOG = LoggerFactory.getLogger(RocksDBMapState.class);
/** Serializer for the keys and values. */
private final TypeSerializer<UK> userKeySerializer;
private final TypeSerializer<UV> userValueSerializer;
* Creates a new {@code RocksDBMapState}.
* @param columnFamily The RocksDB column family that this state is associated to.
* @param namespaceSerializer The serializer for the namespace.
* @param valueSerializer The serializer for the state.
* @param defaultValue The default value for the state.
* @param backend The backend for which this state is bind to.
private RocksDBMapState(
ColumnFamilyHandle columnFamily,
TypeSerializer<N> namespaceSerializer,
TypeSerializer<Map<UK, UV>> valueSerializer,
Map<UK, UV> defaultValue,
RocksDBKeyedStateBackend<K> backend) {
super(columnFamily, namespaceSerializer, valueSerializer, defaultValue, backend);
Preconditions.checkState(valueSerializer instanceof MapSerializer, "Unexpected serializer type.");
MapSerializer<UK, UV> castedMapSerializer = (MapSerializer<UK, UV>) valueSerializer;
this.userKeySerializer = castedMapSerializer.getKeySerializer();
this.userValueSerializer = castedMapSerializer.getValueSerializer();
public TypeSerializer<K> getKeySerializer() {
return backend.getKeySerializer();
public TypeSerializer<N> getNamespaceSerializer() {
return namespaceSerializer;
public TypeSerializer<Map<UK, UV>> getValueSerializer() {
return valueSerializer;
// ------------------------------------------------------------------------
// MapState Implementation
// ------------------------------------------------------------------------
public UV get(UK userKey) throws IOException, RocksDBException {
byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey);
byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes);
return (rawValueBytes == null ? null : deserializeUserValue(rawValueBytes));
public void put(UK userKey, UV userValue) throws IOException, RocksDBException {
byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey);
byte[] rawValueBytes = serializeUserValue(userValue);
backend.db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes);
public void putAll(Map<UK, UV> map) throws IOException, RocksDBException {
if (map == null) {
try (RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(backend.db, writeOptions)) {
for (Map.Entry<UK, UV> entry : map.entrySet()) {
byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(entry.getKey());
byte[] rawValueBytes = serializeUserValue(entry.getValue());
writeBatchWrapper.put(columnFamily, rawKeyBytes, rawValueBytes);
public void remove(UK userKey) throws IOException, RocksDBException {
byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey);
backend.db.delete(columnFamily, writeOptions, rawKeyBytes);
public boolean contains(UK userKey) throws IOException, RocksDBException {
byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey);
byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes);
return (rawValueBytes != null);
public Iterable<Map.Entry<UK, UV>> entries() throws IOException {
final Iterator<Map.Entry<UK, UV>> iterator = iterator();
// Return null to make the behavior consistent with other states.
if (!iterator.hasNext()) {
return null;
} else {
return () -> iterator;
public Iterable<UK> keys() throws IOException {
final byte[] prefixBytes = serializeCurrentKeyAndNamespace();
return () -> new RocksDBMapIterator<UK>(backend.db, prefixBytes, userKeySerializer, userValueSerializer) {
public UK next() {
RocksDBMapEntry entry = nextEntry();
return (entry == null ? null : entry.getKey());
public Iterable<UV> values() throws IOException {
final byte[] prefixBytes = serializeCurrentKeyAndNamespace();
return () -> new RocksDBMapIterator<UV>(backend.db, prefixBytes, userKeySerializer, userValueSerializer) {
public UV next() {
RocksDBMapEntry entry = nextEntry();
return (entry == null ? null : entry.getValue());
public Iterator<Map.Entry<UK, UV>> iterator() throws IOException {
final byte[] prefixBytes = serializeCurrentKeyAndNamespace();
return new RocksDBMapIterator<Map.Entry<UK, UV>>(backend.db, prefixBytes, userKeySerializer, userValueSerializer) {
public Map.Entry<UK, UV> next() {
return nextEntry();
public void clear() {
try {
try (RocksIteratorWrapper iterator = RocksDBKeyedStateBackend.getRocksIterator(backend.db, columnFamily);
WriteBatch writeBatch = new WriteBatch(128)) {
final byte[] keyPrefixBytes = serializeCurrentKeyAndNamespace();;
while (iterator.isValid()) {
byte[] keyBytes = iterator.key();
if (startWithKeyPrefix(keyPrefixBytes, keyBytes)) {
writeBatch.remove(columnFamily, keyBytes);
} else {
backend.db.write(writeOptions, writeBatch);
} catch (Exception e) {
LOG.warn("Error while cleaning the state.", e);
public byte[] getSerializedValue(
final byte[] serializedKeyAndNamespace,
final TypeSerializer<K> safeKeySerializer,
final TypeSerializer<N> safeNamespaceSerializer,
final TypeSerializer<Map<UK, UV>> safeValueSerializer) throws Exception {
//TODO make KvStateSerializer key-group aware to save this round trip and key-group computation
Tuple2<K, N> keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace(
serializedKeyAndNamespace, safeKeySerializer, safeNamespaceSerializer);
int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(keyAndNamespace.f0, backend.getNumberOfKeyGroups());
ByteArrayOutputStreamWithPos outputStream = new ByteArrayOutputStreamWithPos(128);
DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(outputStream);
final byte[] keyPrefixBytes = outputStream.toByteArray();
final MapSerializer<UK, UV> serializer = (MapSerializer<UK, UV>) safeValueSerializer;
final TypeSerializer<UK> dupUserKeySerializer = serializer.getKeySerializer();
final TypeSerializer<UV> dupUserValueSerializer = serializer.getValueSerializer();
final Iterator<Map.Entry<UK, UV>> iterator = new RocksDBMapIterator<Map.Entry<UK, UV>>(
dupUserValueSerializer) {
public Map.Entry<UK, UV> next() {
return nextEntry();
// Return null to make the behavior consistent with other backends
if (!iterator.hasNext()) {
return null;
return KvStateSerializer.serializeMap(() -> iterator, dupUserKeySerializer, dupUserValueSerializer);
// ------------------------------------------------------------------------
// Serialization Methods
// ------------------------------------------------------------------------
private byte[] serializeCurrentKeyAndNamespace() throws IOException {
return keySerializationStream.toByteArray();
private byte[] serializeUserKeyWithCurrentKeyAndNamespace(UK userKey) throws IOException {
userKeySerializer.serialize(userKey, keySerializationDataOutputView);
return keySerializationStream.toByteArray();
private byte[] serializeUserValue(UV userValue) throws IOException {
return serializeUserValue(userValue, userValueSerializer);
private UV deserializeUserValue(byte[] rawValueBytes) throws IOException {
return deserializeUserValue(rawValueBytes, userValueSerializer);
private byte[] serializeUserValue(UV userValue, TypeSerializer<UV> valueSerializer) throws IOException {
if (userValue == null) {
} else {
valueSerializer.serialize(userValue, keySerializationDataOutputView);
return keySerializationStream.toByteArray();
private UK deserializeUserKey(int userKeyOffset, byte[] rawKeyBytes, TypeSerializer<UK> keySerializer) throws IOException {
ByteArrayInputStreamWithPos bais = new ByteArrayInputStreamWithPos(rawKeyBytes);
DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais);
return keySerializer.deserialize(in);
private UV deserializeUserValue(byte[] rawValueBytes, TypeSerializer<UV> valueSerializer) throws IOException {
ByteArrayInputStreamWithPos bais = new ByteArrayInputStreamWithPos(rawValueBytes);
DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais);
boolean isNull = in.readBoolean();
return isNull ? null : valueSerializer.deserialize(in);
private boolean startWithKeyPrefix(byte[] keyPrefixBytes, byte[] rawKeyBytes) {
if (rawKeyBytes.length < keyPrefixBytes.length) {
return false;
for (int i = keyPrefixBytes.length; --i >= backend.getKeyGroupPrefixBytes(); ) {
if (rawKeyBytes[i] != keyPrefixBytes[i]) {
return false;
return true;
// ------------------------------------------------------------------------
// Internal Classes
// ------------------------------------------------------------------------
/** A map entry in RocksDBMapState. */
private class RocksDBMapEntry implements Map.Entry<UK, UV> {
private final RocksDB db;
/** The raw bytes of the key stored in RocksDB. Each user key is stored in RocksDB
* with the format #KeyGroup#Key#Namespace#UserKey. */
private final byte[] rawKeyBytes;
/** The raw bytes of the value stored in RocksDB. */
private byte[] rawValueBytes;
/** True if the entry has been deleted. */
private boolean deleted;
/** The user key and value. The deserialization is performed lazily, i.e. the key
* and the value is deserialized only when they are accessed. */
private UK userKey;
private UV userValue;
/** The offset of User Key offset in raw key bytes. */
private final int userKeyOffset;
private TypeSerializer<UK> keySerializer;
private TypeSerializer<UV> valueSerializer;
@Nonnull final RocksDB db,
@Nonnegative final int userKeyOffset,
@Nonnull final byte[] rawKeyBytes,
@Nonnull final byte[] rawValueBytes,
@Nonnull final TypeSerializer<UK> keySerializer,
@Nonnull final TypeSerializer<UV> valueSerializer) {
this.db = db;
this.userKeyOffset = userKeyOffset;
this.keySerializer = keySerializer;
this.valueSerializer = valueSerializer;
this.rawKeyBytes = rawKeyBytes;
this.rawValueBytes = rawValueBytes;
this.deleted = false;
public void remove() {
deleted = true;
rawValueBytes = null;
try {
db.delete(columnFamily, writeOptions, rawKeyBytes);
} catch (RocksDBException e) {
throw new FlinkRuntimeException("Error while removing data from RocksDB.", e);
public UK getKey() {
if (userKey == null) {
try {
userKey = deserializeUserKey(userKeyOffset, rawKeyBytes, keySerializer);
} catch (IOException e) {
throw new FlinkRuntimeException("Error while deserializing the user key.", e);
return userKey;
public UV getValue() {
if (deleted) {
return null;
} else {
if (userValue == null) {
try {
userValue = deserializeUserValue(rawValueBytes, valueSerializer);
} catch (IOException e) {
throw new FlinkRuntimeException("Error while deserializing the user value.", e);
return userValue;
public UV setValue(UV value) {
if (deleted) {
throw new IllegalStateException("The value has already been deleted.");
UV oldValue = getValue();
try {
userValue = value;
rawValueBytes = serializeUserValue(value, valueSerializer);
db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes);
} catch (IOException | RocksDBException e) {
throw new FlinkRuntimeException("Error while putting data into RocksDB.", e);
return oldValue;
/** An auxiliary utility to scan all entries under the given key. */
private abstract class RocksDBMapIterator<T> implements Iterator<T> {
private static final int CACHE_SIZE_LIMIT = 128;
/** The db where data resides. */
private final RocksDB db;
* The prefix bytes of the key being accessed. All entries under the same key
* have the same prefix, hence we can stop iterating once coming across an
* entry with a different prefix.
private final byte[] keyPrefixBytes;
* True if all entries have been accessed or the iterator has come across an
* entry with a different prefix.
private boolean expired = false;
/** A in-memory cache for the entries in the rocksdb. */
private ArrayList<RocksDBMapEntry> cacheEntries = new ArrayList<>();
private int cacheIndex = 0;
private final TypeSerializer<UK> keySerializer;
private final TypeSerializer<UV> valueSerializer;
final RocksDB db,
final byte[] keyPrefixBytes,
final TypeSerializer<UK> keySerializer,
final TypeSerializer<UV> valueSerializer) {
this.db = db;
this.keyPrefixBytes = keyPrefixBytes;
this.keySerializer = keySerializer;
this.valueSerializer = valueSerializer;
public boolean hasNext() {
return (cacheIndex < cacheEntries.size());
public void remove() {
if (cacheIndex == 0 || cacheIndex > cacheEntries.size()) {
throw new IllegalStateException("The remove operation must be called after a valid next operation.");
RocksDBMapEntry lastEntry = cacheEntries.get(cacheIndex - 1);
final RocksDBMapEntry nextEntry() {
if (cacheIndex == cacheEntries.size()) {
if (!expired) {
throw new IllegalStateException();
return null;
RocksDBMapEntry entry = cacheEntries.get(cacheIndex);
return entry;
private void loadCache() {
if (cacheIndex > cacheEntries.size()) {
throw new IllegalStateException();
// Load cache entries only when the cache is empty and there still exist unread entries
if (cacheIndex < cacheEntries.size() || expired) {
// use try-with-resources to ensure RocksIterator can be release even some runtime exception
// occurred in the below code block.
try (RocksIteratorWrapper iterator = RocksDBKeyedStateBackend.getRocksIterator(db, columnFamily)) {
* The iteration starts from the prefix bytes at the first loading. The cache then is
* reloaded when the next entry to return is the last one in the cache. At that time,
* we will start the iterating from the last returned entry.
RocksDBMapEntry lastEntry = cacheEntries.size() == 0 ? null : cacheEntries.get(cacheEntries.size() - 1);
byte[] startBytes = (lastEntry == null ? keyPrefixBytes : lastEntry.rawKeyBytes);
cacheIndex = 0;;
* If the last returned entry is not deleted, it will be the first entry in the
* iterating. Skip it to avoid redundant access in such cases.
if (lastEntry != null && !lastEntry.deleted) {;
while (true) {
if (!iterator.isValid() || !startWithKeyPrefix(keyPrefixBytes, iterator.key())) {
expired = true;
if (cacheEntries.size() >= CACHE_SIZE_LIMIT) {
RocksDBMapEntry entry = new RocksDBMapEntry(
static <UK, UV, K, N, SV, S extends State, IS extends S> IS create(
StateDescriptor<S, SV> stateDesc,
Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult,
RocksDBKeyedStateBackend<K> backend) {
return (IS) new RocksDBMapState<>(
(TypeSerializer<Map<UK, UV>>) registerResult.f1.getStateSerializer(),
(Map<UK, UV>) stateDesc.getDefaultValue(),