blob: 0f62613542933180e9667106bec33b5d9374e995 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.state.changelog;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.state.InternalKeyContext;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters;
import org.apache.flink.util.function.ThrowingConsumer;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import static org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot.BackendStateType.KEY_VALUE;
import static org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE;
import static org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters.CURRENT_STATE_META_INFO_SNAPSHOT_VERSION;
import static org.apache.flink.state.changelog.StateChangeOperation.ADD;
import static org.apache.flink.state.changelog.StateChangeOperation.ADD_ELEMENT;
import static org.apache.flink.state.changelog.StateChangeOperation.ADD_OR_UPDATE_ELEMENT;
import static org.apache.flink.state.changelog.StateChangeOperation.CLEAR;
import static org.apache.flink.state.changelog.StateChangeOperation.METADATA;
import static org.apache.flink.state.changelog.StateChangeOperation.REMOVE_ELEMENT;
import static org.apache.flink.state.changelog.StateChangeOperation.SET;
import static org.apache.flink.state.changelog.StateChangeOperation.SET_INTERNAL;
import static org.apache.flink.util.Preconditions.checkNotNull;
abstract class AbstractStateChangeLogger<Key, Value, Ns>
implements StateChangeLogger<Value, Ns>, Closeable {
protected final StateChangelogWriter<?> stateChangelogWriter;
protected final InternalKeyContext<Key> keyContext;
protected RegisteredStateMetaInfoBase metaInfo;
private final StateMetaInfoSnapshot.BackendStateType stateType;
private final DataOutputSerializer out = new DataOutputSerializer(128);
private boolean metaDataWritten = false;
private final short stateShortId;
public AbstractStateChangeLogger(
StateChangelogWriter<?> stateChangelogWriter,
InternalKeyContext<Key> keyContext,
RegisteredStateMetaInfoBase metaInfo,
short stateId) {
this.stateChangelogWriter = checkNotNull(stateChangelogWriter);
this.keyContext = checkNotNull(keyContext);
this.metaInfo = checkNotNull(metaInfo);
if (metaInfo instanceof RegisteredKeyValueStateBackendMetaInfo) {
this.stateType = KEY_VALUE;
} else if (metaInfo instanceof RegisteredPriorityQueueStateBackendMetaInfo) {
this.stateType = PRIORITY_QUEUE;
} else {
throw new IllegalArgumentException("Unsupported state type: " + metaInfo);
}
this.stateShortId = stateId;
}
@Override
public void valueUpdated(Value newValue, Ns ns) throws IOException {
if (newValue == null) {
valueCleared(ns);
} else {
log(SET, out -> serializeValue(newValue, out), ns);
}
}
@Override
public void valueUpdatedInternal(Value newValue, Ns ns) throws IOException {
if (newValue == null) {
valueCleared(ns);
} else {
log(SET_INTERNAL, out -> serializeValue(newValue, out), ns);
}
}
protected abstract void serializeValue(Value value, DataOutputView out) throws IOException;
@Override
public void valueAdded(Value addedValue, Ns ns) throws IOException {
log(ADD, out -> serializeValue(addedValue, out), ns);
}
@Override
public void valueCleared(Ns ns) throws IOException {
log(CLEAR, ns);
}
@Override
public void valueElementAdded(
ThrowingConsumer<DataOutputView, IOException> dataSerializer, Ns ns)
throws IOException {
log(ADD_ELEMENT, dataSerializer, ns);
}
@Override
public void valueElementAddedOrUpdated(
ThrowingConsumer<DataOutputView, IOException> dataSerializer, Ns ns)
throws IOException {
log(ADD_OR_UPDATE_ELEMENT, dataSerializer, ns);
}
@Override
public void valueElementRemoved(
ThrowingConsumer<DataOutputView, IOException> dataSerializer, Ns ns)
throws IOException {
log(REMOVE_ELEMENT, dataSerializer, ns);
}
@Override
public void resetWritingMetaFlag() {
metaDataWritten = false;
}
protected AbstractStateChangeLogger<Key, Value, Ns> setMetaInfo(
RegisteredStateMetaInfoBase metaInfo) {
this.metaInfo = metaInfo;
return this;
}
protected void log(StateChangeOperation op, Ns ns) throws IOException {
logMetaIfNeeded();
stateChangelogWriter.append(keyContext.getCurrentKeyGroupIndex(), serialize(op, ns, null));
}
protected void log(
StateChangeOperation op,
@Nullable ThrowingConsumer<DataOutputView, IOException> dataWriter,
Ns ns)
throws IOException {
logMetaIfNeeded();
stateChangelogWriter.append(
keyContext.getCurrentKeyGroupIndex(), serialize(op, ns, dataWriter));
}
private void logMetaIfNeeded() throws IOException {
if (!metaDataWritten) {
stateChangelogWriter.appendMeta(
serializeRaw(
out -> {
out.writeByte(METADATA.getCode());
out.writeInt(CURRENT_STATE_META_INFO_SNAPSHOT_VERSION);
StateMetaInfoSnapshotReadersWriters.getWriter()
.writeStateMetaInfoSnapshot(metaInfo.snapshot(), out);
writeDefaultValueAndTtl(out);
out.writeShort(stateShortId);
out.writeByte(stateType.getCode());
}));
metaDataWritten = true;
}
}
protected void writeDefaultValueAndTtl(DataOutputView out) throws IOException {}
private byte[] serialize(
StateChangeOperation op,
Ns ns,
@Nullable ThrowingConsumer<DataOutputView, IOException> dataWriter)
throws IOException {
return serializeRaw(
wrapper -> {
wrapper.writeByte(op.getCode());
wrapper.writeShort(stateShortId);
serializeScope(ns, wrapper);
if (dataWriter != null) {
dataWriter.accept(wrapper);
}
});
}
protected abstract void serializeScope(Ns ns, DataOutputView out) throws IOException;
private byte[] serializeRaw(ThrowingConsumer<DataOutputView, IOException> dataWriter)
throws IOException {
dataWriter.accept(out);
byte[] bytes = out.getCopyOfBuffer();
out.clear();
return bytes;
}
@Override
public void close() throws IOException {
// do nothing
}
}