blob: 9751e2df8294dae69c3010bd2dc3eb17a124694e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.typeutils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.testutils.migration.MigrationVersion;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.junit.Ignore;
import org.junit.Test;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.hamcrest.CoreMatchers.not;
import static org.junit.Assert.assertThat;
import static org.junit.Assume.assumeThat;
/**
* A test base for testing {@link TypeSerializer} upgrades.
*
* <p>You can run {@link #generateTestSetupFiles()} on a Flink branch to (re-)generate the test data
* files.
*/
public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedElementT>
extends TestLogger {
public static final MigrationVersion[] MIGRATION_VERSIONS =
MigrationVersion.v1_11.orHigher().toArray(new MigrationVersion[0]);
public static final MigrationVersion CURRENT_VERSION = MigrationVersion.v1_13;
private final TestSpecification<PreviousElementT, UpgradedElementT> testSpecification;
protected TypeSerializerUpgradeTestBase(
TestSpecification<PreviousElementT, UpgradedElementT> testSpecification) {
this.testSpecification = checkNotNull(testSpecification);
}
// ------------------------------------------------------------------------------
// APIs
// ------------------------------------------------------------------------------
/**
* Setup code for a {@link TestSpecification}. This creates the serializer before upgrade and
* test data, that will be written by the created pre-upgrade {@link TypeSerializer}.
*/
public interface PreUpgradeSetup<PreviousElementT> {
/** Creates a pre-upgrade {@link TypeSerializer}. */
TypeSerializer<PreviousElementT> createPriorSerializer();
/** Creates test data that will be written using the pre-upgrade {@link TypeSerializer}. */
PreviousElementT createTestData();
}
/**
* Verification code for a {@link TestSpecification}. This creates the "upgraded" {@link
* TypeSerializer} and provides matchers for comparing the deserialized test data and for the
* {@link TypeSerializerSchemaCompatibility}.
*/
public interface UpgradeVerifier<UpgradedElementT> {
/** Creates a post-upgrade {@link TypeSerializer}. */
TypeSerializer<UpgradedElementT> createUpgradedSerializer();
/** Returns a {@link Matcher} for asserting the deserialized test data. */
Matcher<UpgradedElementT> testDataMatcher();
/**
* Returns a {@link Matcher} for comparing the {@link TypeSerializerSchemaCompatibility}
* that the serializer upgrade produced with an expected {@link
* TypeSerializerSchemaCompatibility}.
*/
Matcher<TypeSerializerSchemaCompatibility<UpgradedElementT>> schemaCompatibilityMatcher(
MigrationVersion version);
}
private static class ClassLoaderSafePreUpgradeSetup<PreviousElementT>
implements PreUpgradeSetup<PreviousElementT> {
private final PreUpgradeSetup<PreviousElementT> delegateSetup;
private final ClassLoader setupClassloader;
ClassLoaderSafePreUpgradeSetup(
Class<? extends PreUpgradeSetup<PreviousElementT>> delegateSetupClass)
throws Exception {
checkNotNull(delegateSetupClass);
Class<? extends PreUpgradeSetup<PreviousElementT>> relocatedDelegateSetupClass =
ClassRelocator.relocate(delegateSetupClass);
this.setupClassloader = relocatedDelegateSetupClass.getClassLoader();
try (ThreadContextClassLoader ignored =
new ThreadContextClassLoader(setupClassloader)) {
this.delegateSetup = relocatedDelegateSetupClass.newInstance();
}
}
@Override
public TypeSerializer<PreviousElementT> createPriorSerializer() {
try (ThreadContextClassLoader ignored =
new ThreadContextClassLoader(setupClassloader)) {
return delegateSetup.createPriorSerializer();
} catch (IOException e) {
throw new RuntimeException(
"Error creating prior serializer via ClassLoaderSafePreUpgradeSetup.", e);
}
}
@Override
public PreviousElementT createTestData() {
try (ThreadContextClassLoader ignored =
new ThreadContextClassLoader(setupClassloader)) {
return delegateSetup.createTestData();
} catch (IOException e) {
throw new RuntimeException(
"Error creating test data via ThreadContextClassLoader.", e);
}
}
}
private static class ClassLoaderSafeUpgradeVerifier<UpgradedElementT>
implements UpgradeVerifier<UpgradedElementT> {
private final UpgradeVerifier<UpgradedElementT> delegateVerifier;
private final ClassLoader verifierClassloader;
ClassLoaderSafeUpgradeVerifier(
Class<? extends UpgradeVerifier<UpgradedElementT>> delegateVerifierClass)
throws Exception {
checkNotNull(delegateVerifierClass);
Class<? extends UpgradeVerifier<UpgradedElementT>> relocatedDelegateVerifierClass =
ClassRelocator.relocate(delegateVerifierClass);
this.verifierClassloader = relocatedDelegateVerifierClass.getClassLoader();
try (ThreadContextClassLoader ignored =
new ThreadContextClassLoader(verifierClassloader)) {
this.delegateVerifier = relocatedDelegateVerifierClass.newInstance();
}
}
@Override
public TypeSerializer<UpgradedElementT> createUpgradedSerializer() {
try (ThreadContextClassLoader ignored =
new ThreadContextClassLoader(verifierClassloader)) {
return delegateVerifier.createUpgradedSerializer();
} catch (IOException e) {
throw new RuntimeException(
"Error creating upgraded serializer via ClassLoaderSafeUpgradeVerifier.",
e);
}
}
@Override
public Matcher<UpgradedElementT> testDataMatcher() {
try (ThreadContextClassLoader ignored =
new ThreadContextClassLoader(verifierClassloader)) {
return delegateVerifier.testDataMatcher();
} catch (IOException e) {
throw new RuntimeException(
"Error creating expected test data via ClassLoaderSafeUpgradeVerifier.", e);
}
}
@Override
public Matcher<TypeSerializerSchemaCompatibility<UpgradedElementT>>
schemaCompatibilityMatcher(MigrationVersion version) {
try (ThreadContextClassLoader ignored =
new ThreadContextClassLoader(verifierClassloader)) {
return delegateVerifier.schemaCompatibilityMatcher(version);
} catch (IOException e) {
throw new RuntimeException(
"Error creating schema compatibility matcher via ClassLoaderSafeUpgradeVerifier.",
e);
}
}
}
/**
* Specification of one test scenario. This mainly needs a {@link PreUpgradeSetup} and {@link
* UpgradeVerifier}.
*/
public static class TestSpecification<PreviousElementT, UpgradedElementT> {
private final String name;
private final MigrationVersion migrationVersion;
private final ClassLoaderSafePreUpgradeSetup<PreviousElementT> setup;
private final ClassLoaderSafeUpgradeVerifier<UpgradedElementT> verifier;
public TestSpecification(
String name,
MigrationVersion migrationVersion,
Class<? extends PreUpgradeSetup<PreviousElementT>> setupClass,
Class<? extends UpgradeVerifier<UpgradedElementT>> verifierClass)
throws Exception {
this.name = checkNotNull(name);
this.migrationVersion = checkNotNull(migrationVersion);
this.setup = new ClassLoaderSafePreUpgradeSetup<>(setupClass);
this.verifier = new ClassLoaderSafeUpgradeVerifier<>(verifierClass);
}
@Override
public String toString() {
return name + " / " + migrationVersion;
}
}
// ------------------------------------------------------------------------------
// Test file generation
// ------------------------------------------------------------------------------
private static final int INITIAL_OUTPUT_BUFFER_SIZE = 64;
/**
* Execute this test to generate test files. Remember to be using the correct branch when
* generating the test files, e.g. to generate test files for {@link MigrationVersion#v1_8}, you
* should be under the release-1.8 branch.
*/
@Test
@Ignore
public void generateTestSetupFiles() throws Exception {
Files.createDirectories(getSerializerSnapshotFilePath().getParent());
try (ThreadContextClassLoader ignored =
new ThreadContextClassLoader(testSpecification.setup.setupClassloader)) {
TypeSerializer<PreviousElementT> priorSerializer =
testSpecification.setup.createPriorSerializer();
// first, use the serializer to write test data
// NOTE: it is important that we write test data first, because some serializers'
// configuration
// mutates only after being used for serialization (e.g. dynamic type
// registrations for Pojo / Kryo)
DataOutputSerializer testDataOut = new DataOutputSerializer(INITIAL_OUTPUT_BUFFER_SIZE);
priorSerializer.serialize(testSpecification.setup.createTestData(), testDataOut);
writeContentsTo(getGenerateDataFilePath(), testDataOut.getCopyOfBuffer());
// ... then write the serializer snapshot
DataOutputSerializer serializerSnapshotOut =
new DataOutputSerializer(INITIAL_OUTPUT_BUFFER_SIZE);
writeSerializerSnapshot(serializerSnapshotOut, priorSerializer, CURRENT_VERSION);
writeContentsTo(
getGenerateSerializerSnapshotFilePath(),
serializerSnapshotOut.getCopyOfBuffer());
}
}
// ------------------------------------------------------------------------------
// Tests
// ------------------------------------------------------------------------------
@Test
public void restoreSerializerIsValid() throws Exception {
try (ThreadContextClassLoader ignored =
new ThreadContextClassLoader(testSpecification.verifier.verifierClassloader)) {
assumeThat(
"This test only applies for test specifications that verify an upgraded serializer that is not incompatible.",
TypeSerializerSchemaCompatibility.incompatible(),
not(
testSpecification.verifier.schemaCompatibilityMatcher(
testSpecification.migrationVersion)));
TypeSerializerSnapshot<UpgradedElementT> restoredSerializerSnapshot =
snapshotUnderTest();
TypeSerializer<UpgradedElementT> restoredSerializer =
restoredSerializerSnapshot.restoreSerializer();
assertSerializerIsValid(
restoredSerializer,
true,
dataUnderTest(),
testSpecification.verifier.testDataMatcher());
}
}
@Test
public void upgradedSerializerHasExpectedSchemaCompatibility() throws Exception {
try (ThreadContextClassLoader ignored =
new ThreadContextClassLoader(testSpecification.verifier.verifierClassloader)) {
TypeSerializerSnapshot<UpgradedElementT> restoredSerializerSnapshot =
snapshotUnderTest();
TypeSerializer<UpgradedElementT> upgradedSerializer =
testSpecification.verifier.createUpgradedSerializer();
TypeSerializerSchemaCompatibility<UpgradedElementT> upgradeCompatibility =
restoredSerializerSnapshot.resolveSchemaCompatibility(upgradedSerializer);
assertThat(
upgradeCompatibility,
testSpecification.verifier.schemaCompatibilityMatcher(
testSpecification.migrationVersion));
}
}
@Test
public void upgradedSerializerIsValidAfterMigration() throws Exception {
try (ThreadContextClassLoader ignored =
new ThreadContextClassLoader(testSpecification.verifier.verifierClassloader)) {
TypeSerializerSnapshot<UpgradedElementT> restoredSerializerSnapshot =
snapshotUnderTest();
TypeSerializer<UpgradedElementT> upgradedSerializer =
testSpecification.verifier.createUpgradedSerializer();
TypeSerializerSchemaCompatibility<UpgradedElementT> upgradeCompatibility =
restoredSerializerSnapshot.resolveSchemaCompatibility(upgradedSerializer);
assumeThat(
"This test only applies for test specifications that verify an upgraded serializer that requires migration to be compatible.",
upgradeCompatibility,
TypeSerializerMatchers.isCompatibleAfterMigration());
// migrate the previous data schema,
TypeSerializer<UpgradedElementT> restoreSerializer =
restoredSerializerSnapshot.restoreSerializer();
DataInputView migratedData =
readAndThenWriteData(
dataUnderTest(),
restoreSerializer,
upgradedSerializer,
testSpecification.verifier.testDataMatcher());
// .. and then assert that the upgraded serializer is valid with the migrated data
assertSerializerIsValid(
upgradedSerializer,
false,
migratedData,
testSpecification.verifier.testDataMatcher());
}
}
@Test
public void upgradedSerializerIsValidAfterReconfiguration() throws Exception {
try (ThreadContextClassLoader ignored =
new ThreadContextClassLoader(testSpecification.verifier.verifierClassloader)) {
TypeSerializerSnapshot<UpgradedElementT> restoredSerializerSnapshot =
snapshotUnderTest();
TypeSerializer<UpgradedElementT> upgradedSerializer =
testSpecification.verifier.createUpgradedSerializer();
TypeSerializerSchemaCompatibility<UpgradedElementT> upgradeCompatibility =
restoredSerializerSnapshot.resolveSchemaCompatibility(upgradedSerializer);
assumeThat(
"This test only applies for test specifications that verify an upgraded serializer that requires reconfiguration to be compatible.",
upgradeCompatibility,
TypeSerializerMatchers.isCompatibleWithReconfiguredSerializer());
TypeSerializer<UpgradedElementT> reconfiguredUpgradedSerializer =
upgradeCompatibility.getReconfiguredSerializer();
assertSerializerIsValid(
reconfiguredUpgradedSerializer,
false,
dataUnderTest(),
testSpecification.verifier.testDataMatcher());
}
}
@Test
public void upgradedSerializerIsValidWhenCompatibleAsIs() throws Exception {
try (ThreadContextClassLoader ignored =
new ThreadContextClassLoader(testSpecification.verifier.verifierClassloader)) {
TypeSerializerSnapshot<UpgradedElementT> restoredSerializerSnapshot =
snapshotUnderTest();
TypeSerializer<UpgradedElementT> upgradedSerializer =
testSpecification.verifier.createUpgradedSerializer();
TypeSerializerSchemaCompatibility<UpgradedElementT> upgradeCompatibility =
restoredSerializerSnapshot.resolveSchemaCompatibility(upgradedSerializer);
assumeThat(
"This test only applies for test specifications that verify an upgraded serializer that is compatible as is.",
upgradeCompatibility,
TypeSerializerMatchers.isCompatibleAsIs());
assertSerializerIsValid(
upgradedSerializer,
false,
dataUnderTest(),
testSpecification.verifier.testDataMatcher());
}
}
/**
* Asserts that a given {@link TypeSerializer} is valid, given a {@link DataInputView} of
* serialized data.
*
* <p>A serializer is valid, iff:
*
* <ul>
* <li>1. The serializer can read and then write again the given serialized data.
* <li>2. The serializer can produce a serializer snapshot which can be written and then read
* back again.
* <li>3. The serializer's produced snapshot is capable of creating a restore serializer.
* <li>4. The restore serializer created from the serializer snapshot can read and then write
* again data written by step 1. Given that the serializer is not a restore serializer
* already.
* </ul>
*/
private static <T> void assertSerializerIsValid(
TypeSerializer<T> serializer,
boolean isRestoreSerializer,
DataInputView dataInput,
Matcher<T> testDataMatcher)
throws Exception {
DataInputView serializedData =
readAndThenWriteData(dataInput, serializer, serializer, testDataMatcher);
if (!isRestoreSerializer) {
TypeSerializerSnapshot<T> snapshot = writeAndThenReadSerializerSnapshot(serializer);
TypeSerializer<T> restoreSerializer = snapshot.restoreSerializer();
readAndThenWriteData(
serializedData, restoreSerializer, restoreSerializer, testDataMatcher);
}
}
// ------------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------------
/** Paths to use during snapshot generation, which should only use the CURRENT_VERSION. */
private Path getGenerateSerializerSnapshotFilePath() {
return Paths.get(getGenerateResourceDirectory() + "/serializer-snapshot");
}
/** Paths to use during snapshot generation, which should only use the CURRENT_VERSION. */
private Path getGenerateDataFilePath() {
return Paths.get(getGenerateResourceDirectory() + "/test-data");
}
/** Paths to use during snapshot generation, which should only use the CURRENT_VERSION. */
private String getGenerateResourceDirectory() {
return System.getProperty("user.dir")
+ "/src/test/resources/"
+ testSpecification.name
+ "-"
+ CURRENT_VERSION;
}
private Path getSerializerSnapshotFilePath() {
return Paths.get(getTestResourceDirectory() + "/serializer-snapshot");
}
private Path getTestDataFilePath() {
return Paths.get(getTestResourceDirectory() + "/test-data");
}
private String getTestResourceDirectory() {
return System.getProperty("user.dir")
+ "/src/test/resources/"
+ testSpecification.name
+ "-"
+ testSpecification.migrationVersion;
}
private TypeSerializerSnapshot<UpgradedElementT> snapshotUnderTest() throws Exception {
return readSerializerSnapshot(
contentsOf(getSerializerSnapshotFilePath()), testSpecification.migrationVersion);
}
private DataInputView dataUnderTest() {
return contentsOf(getTestDataFilePath());
}
private static void writeContentsTo(Path path, byte[] bytes) {
try {
Files.write(path, bytes);
} catch (IOException e) {
throw new RuntimeException("Failed to write to " + path, e);
}
}
private static DataInputView contentsOf(Path path) {
try {
byte[] bytes = Files.readAllBytes(path);
return new DataInputDeserializer(bytes);
} catch (IOException e) {
throw new RuntimeException("Failed to read contents of " + path, e);
}
}
private static <T> void writeSerializerSnapshot(
DataOutputView out, TypeSerializer<T> serializer, MigrationVersion migrationVersion)
throws IOException {
if (migrationVersion.isNewerVersionThan(MigrationVersion.v1_6)) {
writeSerializerSnapshotCurrentFormat(out, serializer);
} else {
writeSerializerSnapshotPre17Format(out, serializer);
}
}
private static <T> void writeSerializerSnapshotCurrentFormat(
DataOutputView out, TypeSerializer<T> serializer) throws IOException {
TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(
out, serializer.snapshotConfiguration(), serializer);
}
@SuppressWarnings("deprecation")
private static <T> void writeSerializerSnapshotPre17Format(
DataOutputView out, TypeSerializer<T> serializer) throws IOException {
TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(
out,
Collections.singletonList(
Tuple2.of(serializer, serializer.snapshotConfiguration())));
}
private static <T> TypeSerializerSnapshot<T> readSerializerSnapshot(
DataInputView in, MigrationVersion migrationVersion) throws IOException {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
if (migrationVersion.isNewerVersionThan(MigrationVersion.v1_6)) {
return readSerializerSnapshotCurrentFormat(in, classLoader);
} else {
return readSerializerSnapshotPre17Format(in, classLoader);
}
}
private static <T> TypeSerializerSnapshot<T> readSerializerSnapshotCurrentFormat(
DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
return TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
in, userCodeClassLoader, null);
}
@SuppressWarnings({"unchecked", "deprecation"})
private static <T> TypeSerializerSnapshot<T> readSerializerSnapshotPre17Format(
DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> serializerSnapshotPair =
TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(
in, userCodeClassLoader);
return (TypeSerializerSnapshot<T>) serializerSnapshotPair.get(0).f1;
}
private static <T> DataInputView readAndThenWriteData(
DataInputView originalDataInput,
TypeSerializer<T> readSerializer,
TypeSerializer<T> writeSerializer,
Matcher<T> testDataMatcher)
throws IOException {
T data = readSerializer.deserialize(originalDataInput);
assertThat(data, testDataMatcher);
DataOutputSerializer out = new DataOutputSerializer(INITIAL_OUTPUT_BUFFER_SIZE);
writeSerializer.serialize(data, out);
return new DataInputDeserializer(out.wrapAsByteBuffer());
}
private static <T> TypeSerializerSnapshot<T> writeAndThenReadSerializerSnapshot(
TypeSerializer<T> serializer) throws IOException {
DataOutputSerializer out = new DataOutputSerializer(INITIAL_OUTPUT_BUFFER_SIZE);
writeSerializerSnapshotCurrentFormat(out, serializer);
DataInputDeserializer in = new DataInputDeserializer(out.wrapAsByteBuffer());
return readSerializerSnapshotCurrentFormat(
in, Thread.currentThread().getContextClassLoader());
}
}