blob: e004fa17e8b80421f9bf1b85f9ecd345175e5bd9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.typeutils;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import java.io.IOException;
/**
 * A {@code TypeSerializerSnapshot} is a point-in-time view of a {@link TypeSerializer}'s configuration.
 * The configuration snapshot of a serializer is persisted within checkpoints
 * as a single source of meta information about the schema of serialized data in the checkpoint.
 * This serves three purposes:
 *
 * <ul>
 *   <li><strong>Capturing serializer parameters and schema:</strong> a serializer's configuration snapshot
 *   represents information about the parameters, state, and schema of a serializer.
 *   This is explained in more detail below.</li>
 *
 *   <li><strong>Compatibility checks for new serializers:</strong> when new serializers are available,
 *   they need to be checked for whether or not they are compatible to read the data written by the previous
 *   serializer. This is performed by providing the new serializer to the corresponding serializer configuration
 *   snapshots in checkpoints.</li>
 *
 *   <li><strong>Factory for a read serializer when schema conversion is required:</strong> in the case that new
 *   serializers are not compatible to read previous data, a schema conversion process executed across all data
 *   is required before the new serializer can continue to be used. This conversion process requires a compatible
 *   read serializer to restore serialized bytes as objects, which are then written back again using the new
 *   serializer. In this scenario, the serializer configuration snapshots in checkpoints double as a factory for
 *   the read serializer of the conversion process.</li>
 * </ul>
 *
 * <h2>Serializer Configuration and Schema</h2>
 *
 * <p>Since serializer configuration snapshots need to be used to ensure serialization compatibility
 * for the same managed state, as well as to serve as a factory for compatible read serializers, the configuration
 * snapshot should encode sufficient information about:
 *
 * <ul>
 *   <li><strong>Parameter settings of the serializer:</strong> parameters of the serializer include settings
 *   required to set up the serializer, or the state of the serializer if it is stateful. If the serializer
 *   has nested serializers, then the configuration snapshot should also contain the parameters of the nested
 *   serializers.</li>
 *
 *   <li><strong>Serialization schema of the serializer:</strong> the binary format used by the serializer, or
 *   in other words, the schema of data written by the serializer.</li>
 * </ul>
 *
 * <p>NOTE: Implementations must contain the default empty nullary constructor. This is required to be able to
 * deserialize the configuration snapshot from its binary form.
 *
 * @param <T> The data type that the originating serializer of this configuration serializes.
 */
@PublicEvolving
public interface TypeSerializerSnapshot<T> {

	/**
	 * Returns the version of the current snapshot's written binary format.
	 *
	 * <p>{@link #writeVersionedSnapshot(DataOutputView, TypeSerializerSnapshot)} writes this value ahead of the
	 * snapshot data. {@link #readVersionedSnapshot(DataInputView, ClassLoader)} reads it back and passes it
	 * as the {@code readVersion} argument of {@link #readSnapshot(int, DataInputView, ClassLoader)}.
	 *
	 * @return the version of the current snapshot's written binary format.
	 */
	int getCurrentVersion();

	/**
	 * Writes the serializer snapshot to the provided {@link DataOutputView}.
	 * The current version of the written serializer snapshot's binary format
	 * is specified by the {@link #getCurrentVersion()} method.
	 *
	 * <p>This method writes only the snapshot's own data. Neither the snapshot class name nor the version
	 * is written here. Use {@link #writeVersionedSnapshot(DataOutputView, TypeSerializerSnapshot)} to write
	 * a snapshot together with that information.
	 *
	 * @param out the {@link DataOutputView} to write the snapshot to.
	 *
	 * @throws IOException Thrown if the snapshot data could not be written.
	 */
	void writeSnapshot(DataOutputView out) throws IOException;

	/**
	 * Reads the serializer snapshot from the provided {@link DataInputView}.
	 * The version of the binary format that the serializer snapshot was written
	 * with is provided. This version can be used to determine how the serializer
	 * snapshot should be read.
	 *
	 * @param readVersion version of the serializer snapshot's written binary format
	 * @param in the {@link DataInputView} to read the snapshot from.
	 * @param userCodeClassLoader the user code classloader
	 *
	 * @throws IOException Thrown if the snapshot data could not be read or parsed.
	 */
	void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException;

	/**
	 * Recreates a serializer instance from this snapshot. The returned
	 * serializer can be safely used to read data written by the prior serializer
	 * (i.e., the serializer that created this snapshot).
	 *
	 * @return a serializer instance restored from this serializer snapshot.
	 */
	TypeSerializer<T> restoreSerializer();

	/**
	 * Checks a new serializer's compatibility to read data written by the prior serializer.
	 *
	 * <p>When a checkpoint/savepoint is restored, this method checks whether the serialization
	 * format of the data in the checkpoint/savepoint is compatible with the format of the serializer used by the
	 * program that restores the checkpoint/savepoint. There are four possible outcomes:
	 *
	 * <ul>
	 *   <li>the serialization format is compatible;</li>
	 *   <li>the program's serializer needs to reconfigure itself, meaning it must incorporate some
	 *   information from the TypeSerializerSnapshot to be compatible;</li>
	 *   <li>the format is outright incompatible;</li>
	 *   <li>a migration is needed.</li>
	 * </ul>
	 *
	 * <p>In the migration case, the TypeSerializerSnapshot produces a serializer to deserialize the data, and
	 * the restoring program's serializer re-serializes the data. This converts the format during the restore
	 * operation.
	 *
	 * @param newSerializer the new serializer to check.
	 *
	 * @return the serializer compatibility result.
	 */
	TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializer<T> newSerializer);

	// ------------------------------------------------------------------------
	//  read / write utilities
	// ------------------------------------------------------------------------

	/**
	 * Writes the given snapshot to the out stream. One should always use this method to write
	 * snapshots out, rather than directly calling {@link #writeSnapshot(DataOutputView)}.
	 *
	 * <p>The data is written in this order:
	 *
	 * <ol>
	 *   <li>the snapshot's class name, as returned by {@link Class#getName()}, written with {@code writeUTF};</li>
	 *   <li>the snapshot's {@link #getCurrentVersion() current version}, written as an {@code int};</li>
	 *   <li>the data written by {@link #writeSnapshot(DataOutputView)}.</li>
	 * </ol>
	 *
	 * <p>The snapshot written with this method can be read via {@link #readVersionedSnapshot(DataInputView, ClassLoader)}.
	 *
	 * @param out the {@link DataOutputView} to write the snapshot to.
	 * @param snapshot the snapshot to write.
	 *
	 * @throws IOException Thrown if the snapshot data could not be written.
	 */
	static void writeVersionedSnapshot(DataOutputView out, TypeSerializerSnapshot<?> snapshot) throws IOException {
		// The class name lets the reader instantiate the right snapshot type.
		out.writeUTF(snapshot.getClass().getName());
		// The version lets that snapshot choose how to parse the payload that follows.
		out.writeInt(snapshot.getCurrentVersion());
		snapshot.writeSnapshot(out);
	}

	/**
	 * Reads a snapshot from the stream that was written by
	 * {@link #writeVersionedSnapshot(DataOutputView, TypeSerializerSnapshot)}.
	 *
	 * <p>The method reads the stream in three steps:
	 *
	 * <ol>
	 *   <li>it obtains the snapshot instance from
	 *   {@code TypeSerializerSnapshotSerializationUtil.readAndInstantiateSnapshotClass(in, cl)};</li>
	 *   <li>it reads the version of the snapshot's binary format as an {@code int};</li>
	 *   <li>it calls {@link #readSnapshot(int, DataInputView, ClassLoader)} on the instance with that version.</li>
	 * </ol>
	 *
	 * @param in the {@link DataInputView} to read the snapshot from.
	 * @param cl the user code class loader. It is passed both to the snapshot class instantiation and
	 *           to {@link #readSnapshot(int, DataInputView, ClassLoader)}.
	 * @param <T> the data type of the serializer whose snapshot is read.
	 *
	 * @return the snapshot read from the stream.
	 *
	 * @throws IOException Thrown if the snapshot could not be read.
	 */
	static <T> TypeSerializerSnapshot<T> readVersionedSnapshot(DataInputView in, ClassLoader cl) throws IOException {
		final TypeSerializerSnapshot<T> snapshot =
			TypeSerializerSnapshotSerializationUtil.readAndInstantiateSnapshotClass(in, cl);

		// Mirrors the writeInt(getCurrentVersion()) in writeVersionedSnapshot.
		final int version = in.readInt();
		snapshot.readSnapshot(version, in, cl);

		return snapshot;
	}
}