blob: 4bb4aa0d8a261eb26295e98e65af33430ecbc562 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.typeutils;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
/**
 * A {@code TypeSerializerSchemaCompatibility} represents information about whether or not a {@link TypeSerializer}
 * can be safely used to read data written by a previous type serializer.
 *
 * <p>Typically, the compatibility of the new serializer is resolved by checking the serializer against the
 * {@link TypeSerializerSnapshot} of the previous serializer. Depending on the type of the
 * resolved compatibility result, migration (i.e., reading bytes with the previous serializer and then writing
 * it again with the new serializer) may be required before the new serializer can be used.
 *
 * <p>Instances cannot be constructed directly; they are obtained through the static factory methods
 * {@link #compatibleAsIs()}, {@link #compatibleAfterMigration()},
 * {@link #compatibleWithReconfiguredSerializer(TypeSerializer)} and {@link #incompatible()}.
 *
 * @param <T> the type of data serialized by the serializer that was being checked.
 *
 * @see TypeSerializer
 * @see TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer)
 */
@PublicEvolving
public class TypeSerializerSchemaCompatibility<T> {

    /**
     * Enum for the type of the compatibility.
     */
    enum Type {

        /** This indicates that the new serializer can continue to be used as is. */
        COMPATIBLE_AS_IS,

        /**
         * This indicates that it is possible to use the new serializer after performing a
         * full-scan migration over all state, by reading bytes with the previous serializer
         * and then writing it again with the new serializer, effectively converting the
         * serialization schema to correspond to the new serializer.
         */
        COMPATIBLE_AFTER_MIGRATION,

        /**
         * This indicates that a reconfigured version of the new serializer
         * is compatible, and should be used instead of the original new serializer.
         */
        COMPATIBLE_WITH_RECONFIGURED_SERIALIZER,

        /**
         * This indicates that the new serializer is incompatible, even with migration.
         * This normally implies that the deserialized Java class can not be commonly recognized
         * by the previous and new serializer.
         */
        INCOMPATIBLE
    }

    /**
     * The type of the compatibility.
     */
    private final Type resultType;

    /**
     * The reconfigured serializer to use in place of the original new serializer.
     * Only set by {@link #compatibleWithReconfiguredSerializer(TypeSerializer)}; every other
     * factory method passes {@code null}.
     */
    @Nullable
    private final TypeSerializer<T> reconfiguredNewSerializer;

    /**
     * Returns a result that indicates that the new serializer is compatible and no migration is required.
     * The new serializer can continue to be used as is.
     *
     * @param <T> the type of data serialized by the serializer that was being checked.
     * @return a result that indicates migration is not required for the new serializer.
     */
    public static <T> TypeSerializerSchemaCompatibility<T> compatibleAsIs() {
        return new TypeSerializerSchemaCompatibility<>(Type.COMPATIBLE_AS_IS, null);
    }

    /**
     * Returns a result that indicates that the new serializer can be used after migrating the written bytes, i.e.
     * reading it with the old serializer and then writing it again with the new serializer.
     *
     * @param <T> the type of data serialized by the serializer that was being checked.
     * @return a result that indicates that the new serializer can be used after migrating the written bytes.
     */
    public static <T> TypeSerializerSchemaCompatibility<T> compatibleAfterMigration() {
        return new TypeSerializerSchemaCompatibility<>(Type.COMPATIBLE_AFTER_MIGRATION, null);
    }

    /**
     * Returns a result that indicates a reconfigured version of the new serializer is compatible, and should be
     * used instead of the original new serializer.
     *
     * <p>The given serializer is validated with {@link Preconditions#checkNotNull(Object)} before
     * the result is created, so a {@code null} argument is rejected.
     *
     * @param reconfiguredSerializer the reconfigured version of the new serializer; must not be {@code null}.
     * @param <T> the type of data serialized by the serializer that was being checked.
     * @return a result that indicates a reconfigured version of the new serializer is compatible, and should be
     *         used instead of the original new serializer.
     */
    public static <T> TypeSerializerSchemaCompatibility<T> compatibleWithReconfiguredSerializer(TypeSerializer<T> reconfiguredSerializer) {
        return new TypeSerializerSchemaCompatibility<>(
            Type.COMPATIBLE_WITH_RECONFIGURED_SERIALIZER,
            Preconditions.checkNotNull(reconfiguredSerializer));
    }

    /**
     * Returns a result that indicates there is no possible way for the new serializer to be use-able.
     * This normally indicates that there is no common Java class between what the previous bytes can be
     * deserialized into and what can be written by the new serializer.
     *
     * <p>In this case, there is no possible way for the new serializer to continue to be used, even with
     * migration. Recovery of the Flink job will fail.
     *
     * @param <T> the type of data serialized by the serializer that was being checked.
     * @return a result that indicates incompatibility between the new and previous serializer.
     */
    public static <T> TypeSerializerSchemaCompatibility<T> incompatible() {
        return new TypeSerializerSchemaCompatibility<>(Type.INCOMPATIBLE, null);
    }

    /**
     * Creates a compatibility result. Private so that instances are only created through the
     * static factory methods, which pair each result type with the matching serializer argument.
     *
     * @param resultType the type of the compatibility; must not be {@code null}.
     * @param reconfiguredNewSerializer the reconfigured serializer, or {@code null} if the result
     *                                  type does not carry one.
     */
    private TypeSerializerSchemaCompatibility(Type resultType, @Nullable TypeSerializer<T> reconfiguredNewSerializer) {
        this.resultType = Preconditions.checkNotNull(resultType);
        this.reconfiguredNewSerializer = reconfiguredNewSerializer;
    }

    /**
     * Returns whether or not the type of the compatibility is {@link Type#COMPATIBLE_AS_IS}.
     *
     * @return whether or not the type of the compatibility is {@link Type#COMPATIBLE_AS_IS}.
     */
    public boolean isCompatibleAsIs() {
        return resultType == Type.COMPATIBLE_AS_IS;
    }

    /**
     * Returns whether or not the type of the compatibility is {@link Type#COMPATIBLE_AFTER_MIGRATION}.
     *
     * @return whether or not the type of the compatibility is {@link Type#COMPATIBLE_AFTER_MIGRATION}.
     */
    public boolean isCompatibleAfterMigration() {
        return resultType == Type.COMPATIBLE_AFTER_MIGRATION;
    }

    /**
     * Returns whether or not the type of the compatibility is {@link Type#COMPATIBLE_WITH_RECONFIGURED_SERIALIZER}.
     *
     * @return whether or not the type of the compatibility is {@link Type#COMPATIBLE_WITH_RECONFIGURED_SERIALIZER}.
     */
    public boolean isCompatibleWithReconfiguredSerializer() {
        return resultType == Type.COMPATIBLE_WITH_RECONFIGURED_SERIALIZER;
    }

    /**
     * Gets the reconfigured serializer. This throws an exception if
     * {@link #isCompatibleWithReconfiguredSerializer()} is {@code false}.
     *
     * @return the reconfigured serializer that should be used instead of the original new serializer.
     * @throws IllegalStateException if the type of the compatibility is not
     *                               {@link Type#COMPATIBLE_WITH_RECONFIGURED_SERIALIZER}.
     */
    public TypeSerializer<T> getReconfiguredSerializer() {
        Preconditions.checkState(
            isCompatibleWithReconfiguredSerializer(),
            "It is only possible to get a reconfigured serializer if the compatibility type is %s, but the type is %s",
            Type.COMPATIBLE_WITH_RECONFIGURED_SERIALIZER, resultType);
        return reconfiguredNewSerializer;
    }

    /**
     * Returns whether or not the type of the compatibility is {@link Type#INCOMPATIBLE}.
     *
     * @return whether or not the type of the compatibility is {@link Type#INCOMPATIBLE}.
     */
    public boolean isIncompatible() {
        return resultType == Type.INCOMPATIBLE;
    }

    /**
     * Returns a human-readable description of this result, containing the compatibility type and
     * the reconfigured serializer (printed as {@code null} when none is set). Intended for log and
     * exception messages, where the default identity-based representation would hide the result type.
     *
     * @return a string representation of this compatibility result.
     */
    @Override
    public String toString() {
        return "TypeSerializerSchemaCompatibility{" +
            "resultType=" + resultType +
            ", reconfiguredNewSerializer=" + reconfiguredNewSerializer +
            '}';
    }
}