| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.api.common.typeutils; |
| |
| import org.apache.flink.annotation.Internal; |
| import org.apache.flink.api.java.tuple.Tuple2; |
| import org.apache.flink.core.memory.DataInputView; |
| import org.apache.flink.core.memory.DataOutputView; |
| |
| import java.io.IOException; |
| import java.util.Arrays; |
| import java.util.List; |
| |
| import static org.apache.flink.util.Preconditions.checkArgument; |
| |
| /** |
| * A NestedSerializersSnapshotDelegate represents the snapshots of multiple serializers that are used |
| * by an outer serializer. Examples would be tuples, where the outer serializer is the tuple |
| * format serializer, and the NestedSerializersSnapshotDelegate holds the serializers for the |
| * different tuple fields. |
| * |
| * <p>The NestedSerializersSnapshotDelegate does not implement the {@link TypeSerializerSnapshot} interface. |
| * It is not meant to be inherited from, but to be composed with a serializer snapshot implementation. |
| * |
| * <p>The NestedSerializersSnapshotDelegate has its own versioning internally, it does not couple its |
| * versioning to the versioning of the TypeSerializerSnapshot that builds on top of this class. |
| * That way, the NestedSerializersSnapshotDelegate and enclosing TypeSerializerSnapshot the can evolve |
| * their formats independently. |
| */ |
| @Internal |
| public class NestedSerializersSnapshotDelegate { |
| |
| /** Magic number for integrity checks during deserialization. */ |
| private static final int MAGIC_NUMBER = 1333245; |
| |
| /** Current version of the new serialization format. */ |
| private static final int VERSION = 1; |
| |
| /** The snapshots from the serializer that make up this composition. */ |
| private final TypeSerializerSnapshot<?>[] nestedSnapshots; |
| |
| /** |
| * Constructor to create a snapshot for writing. |
| */ |
| public NestedSerializersSnapshotDelegate(TypeSerializer<?>... serializers) { |
| this.nestedSnapshots = TypeSerializerUtils.snapshotBackwardsCompatible(serializers); |
| } |
| |
| /** |
| * Constructor to create a snapshot during deserialization. |
| */ |
| private NestedSerializersSnapshotDelegate(TypeSerializerSnapshot<?>[] snapshots) { |
| this.nestedSnapshots = snapshots; |
| } |
| |
| // ------------------------------------------------------------------------ |
| // Nested Serializers and Compatibility |
| // ------------------------------------------------------------------------ |
| |
| /** |
| * Produces a restore serializer from each contained serializer configuration snapshot. |
| * The serializers are returned in the same order as the snapshots are stored. |
| */ |
| public TypeSerializer<?>[] getRestoredNestedSerializers() { |
| return snapshotsToRestoreSerializers(nestedSnapshots); |
| } |
| |
| /** |
| * Creates the restore serializer from the pos-th config snapshot. |
| */ |
| public <T> TypeSerializer<T> getRestoredNestedSerializer(int pos) { |
| checkArgument(pos < nestedSnapshots.length); |
| |
| @SuppressWarnings("unchecked") |
| TypeSerializerSnapshot<T> snapshot = (TypeSerializerSnapshot<T>) nestedSnapshots[pos]; |
| |
| return snapshot.restoreSerializer(); |
| } |
| |
| /** |
| * Returns the snapshots of the nested serializers. |
| * |
| * @return the snapshots of the nested serializers. |
| */ |
| public TypeSerializerSnapshot<?>[] getNestedSerializerSnapshots() { |
| return nestedSnapshots; |
| } |
| |
| /** |
| * Resolves the compatibility of the nested serializer snapshots with the nested |
| * serializers of the new outer serializer. |
| * |
| * @deprecated this no method will be removed in the future. Resolving compatibility for nested |
| * serializers is now handled by {@link CompositeTypeSerializerSnapshot}. |
| */ |
| @Deprecated |
| public <T> TypeSerializerSchemaCompatibility<T> resolveCompatibilityWithNested( |
| TypeSerializerSchemaCompatibility<?> outerCompatibility, |
| TypeSerializer<?>... newNestedSerializers) { |
| |
| checkArgument(newNestedSerializers.length == nestedSnapshots.length, |
| "Different number of new serializers and existing serializer configuration snapshots"); |
| |
| // compatibility of the outer serializer's format |
| if (outerCompatibility.isIncompatible()) { |
| return TypeSerializerSchemaCompatibility.incompatible(); |
| } |
| |
| // check nested serializers for compatibility |
| boolean nestedSerializerRequiresMigration = false; |
| for (int i = 0; i < nestedSnapshots.length; i++) { |
| TypeSerializerSchemaCompatibility<?> compatibility = |
| resolveCompatibility(newNestedSerializers[i], nestedSnapshots[i]); |
| |
| if (compatibility.isIncompatible()) { |
| return TypeSerializerSchemaCompatibility.incompatible(); |
| } |
| if (compatibility.isCompatibleAfterMigration()) { |
| nestedSerializerRequiresMigration = true; |
| } |
| } |
| |
| return (nestedSerializerRequiresMigration || !outerCompatibility.isCompatibleAsIs()) ? |
| TypeSerializerSchemaCompatibility.compatibleAfterMigration() : |
| TypeSerializerSchemaCompatibility.compatibleAsIs(); |
| } |
| |
| // ------------------------------------------------------------------------ |
| // Serialization |
| // ------------------------------------------------------------------------ |
| |
| /** |
| * Writes the composite snapshot of all the contained serializers. |
| */ |
| public final void writeNestedSerializerSnapshots(DataOutputView out) throws IOException { |
| out.writeInt(MAGIC_NUMBER); |
| out.writeInt(VERSION); |
| |
| out.writeInt(nestedSnapshots.length); |
| for (TypeSerializerSnapshot<?> snap : nestedSnapshots) { |
| TypeSerializerSnapshot.writeVersionedSnapshot(out, snap); |
| } |
| } |
| |
| /** |
| * Reads the composite snapshot of all the contained serializers. |
| */ |
| public static NestedSerializersSnapshotDelegate readNestedSerializerSnapshots(DataInputView in, ClassLoader cl) throws IOException { |
| final int magicNumber = in.readInt(); |
| if (magicNumber != MAGIC_NUMBER) { |
| throw new IOException(String.format("Corrupt data, magic number mismatch. Expected %8x, found %8x", |
| MAGIC_NUMBER, magicNumber)); |
| } |
| |
| final int version = in.readInt(); |
| if (version != VERSION) { |
| throw new IOException("Unrecognized version: " + version); |
| } |
| |
| final int numSnapshots = in.readInt(); |
| final TypeSerializerSnapshot<?>[] nestedSnapshots = new TypeSerializerSnapshot<?>[numSnapshots]; |
| |
| for (int i = 0; i < numSnapshots; i++) { |
| nestedSnapshots[i] = TypeSerializerSnapshot.readVersionedSnapshot(in, cl); |
| } |
| |
| return new NestedSerializersSnapshotDelegate(nestedSnapshots); |
| } |
| |
| /** |
| * Reads the composite snapshot of all the contained serializers in a way that is compatible |
| * with Version 1 of the deprecated {@link CompositeTypeSerializerConfigSnapshot}. |
| */ |
| public static NestedSerializersSnapshotDelegate legacyReadNestedSerializerSnapshots(DataInputView in, ClassLoader cl) throws IOException { |
| @SuppressWarnings("deprecation") |
| final List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> serializersAndSnapshots = |
| TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, cl); |
| |
| final TypeSerializerSnapshot<?>[] nestedSnapshots = serializersAndSnapshots.stream() |
| .map(t -> t.f1) |
| .toArray(TypeSerializerSnapshot<?>[]::new); |
| |
| return new NestedSerializersSnapshotDelegate(nestedSnapshots); |
| } |
| |
| // ------------------------------------------------------------------------ |
| // Utilities |
| // ------------------------------------------------------------------------ |
| |
| /** |
| * Utility method to conjure up a new scope for the generic parameters. |
| */ |
| @SuppressWarnings("unchecked") |
| private static <E> TypeSerializerSchemaCompatibility<E> resolveCompatibility( |
| TypeSerializer<?> serializer, |
| TypeSerializerSnapshot<?> snapshot) { |
| |
| TypeSerializer<E> typedSerializer = (TypeSerializer<E>) serializer; |
| TypeSerializerSnapshot<E> typedSnapshot = (TypeSerializerSnapshot<E>) snapshot; |
| |
| return typedSnapshot.resolveSchemaCompatibility(typedSerializer); |
| } |
| |
| private static TypeSerializer<?>[] snapshotsToRestoreSerializers(TypeSerializerSnapshot<?>... snapshots) { |
| return Arrays.stream(snapshots) |
| .map(TypeSerializerSnapshot::restoreSerializer) |
| .toArray(TypeSerializer[]::new); |
| } |
| } |