blob: 57c939d54b0d00dc86685df83561e93a33e53db5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.typeutils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.testutils.migration.MigrationVersion;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeMatcher;
import org.junit.Test;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Supplier;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.hamcrest.CoreMatchers.allOf;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
/**
* A test base for verifying {@link TypeSerializerSnapshot} migration.
*
* @param <ElementT> the element being serialized.
*/
public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends TestLogger {
private final TestSpecification<ElementT> testSpecification;
protected TypeSerializerSnapshotMigrationTestBase(TestSpecification<ElementT> testSpecification) {
this.testSpecification = checkNotNull(testSpecification);
}
@Test
public void serializerSnapshotIsSuccessfullyRead() {
TypeSerializerSnapshot<ElementT> snapshot = snapshotUnderTest();
assertThat(snapshot, allOf(
notNullValue(),
instanceOf(TypeSerializerSnapshot.class)
));
}
@Test
public void specifiedNewSerializerHasExpectedCompatibilityResultsWithSnapshot() {
TypeSerializerSnapshot<ElementT> snapshot = snapshotUnderTest();
TypeSerializerSchemaCompatibility<ElementT> result = snapshot.resolveSchemaCompatibility(testSpecification.createSerializer());
assertThat(result, hasSameCompatibilityType(testSpecification.expectedCompatibilityResult));
}
@Test
public void restoredSerializerIsAbleToDeserializePreviousData() throws IOException {
TypeSerializerSnapshot<ElementT> snapshot = snapshotUnderTest();
TypeSerializer<ElementT> serializer = snapshot.restoreSerializer();
DataInputView input = dataUnderTest();
for (int i = 0; i < testSpecification.testDataCount; i++) {
final ElementT result = serializer.deserialize(input);
assertThat(result, notNullValue());
}
}
@SuppressWarnings("deprecation")
@Test
public void movingForward() throws IOException {
TypeSerializerSnapshot<ElementT> previousSnapshot = snapshotUnderTest();
TypeSerializer<ElementT> restoredSerializer = previousSnapshot.restoreSerializer();
TypeSerializerSnapshot<ElementT> nextSnapshot = restoredSerializer.snapshotConfiguration();
assertThat(nextSnapshot, instanceOf(testSpecification.snapshotClass));
TypeSerializerSnapshot<ElementT> nextSnapshotDeserialized = writeAndThenReadTheSnapshot(restoredSerializer, nextSnapshot);
assertThat(nextSnapshotDeserialized, allOf(
notNullValue(),
not(instanceOf(TypeSerializerConfigSnapshot.class))
));
}
// --------------------------------------------------------------------------------------------------------------
// Test Helpers
// --------------------------------------------------------------------------------------------------------------
private TypeSerializerSnapshot<ElementT> writeAndThenReadTheSnapshot(
TypeSerializer<ElementT> serializer,
TypeSerializerSnapshot<ElementT> newSnapshot) throws IOException {
DataOutputSerializer out = new DataOutputSerializer(128);
TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(out, newSnapshot, serializer);
DataInputView in = new DataInputDeserializer(out.wrapAsByteBuffer());
return readSnapshot(in);
}
private TypeSerializerSnapshot<ElementT> snapshotUnderTest() {
DataInputView input = contentsOf(testSpecification.getSnapshotDataLocation());
try {
if (!testSpecification.getTestMigrationVersion().isNewerVersionThan(MigrationVersion.v1_6)) {
return readPre17SnapshotFormat(input);
} else {
return readSnapshot(input);
}
}
catch (IOException e) {
throw new RuntimeException("Unable to read " + testSpecification.getSnapshotDataLocation(), e);
}
}
@SuppressWarnings({"unchecked", "deprecation"})
private TypeSerializerSnapshot<ElementT> readPre17SnapshotFormat(DataInputView input) throws IOException {
final ClassLoader cl = Thread.currentThread().getContextClassLoader();
List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> serializers =
TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(input, cl);
return (TypeSerializerSnapshot<ElementT>) serializers.get(0).f1;
}
private TypeSerializerSnapshot<ElementT> readSnapshot(DataInputView in) throws IOException {
return TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
in, Thread.currentThread().getContextClassLoader(), null);
}
private DataInputView dataUnderTest() {
return contentsOf(testSpecification.getTestDataLocation());
}
// --------------------------------------------------------------------------------------------------------------
// Static Helpers
// --------------------------------------------------------------------------------------------------------------
private static DataInputView contentsOf(Path path) {
try {
byte[] bytes = Files.readAllBytes(path);
return new DataInputDeserializer(bytes);
}
catch (IOException e) {
throw new RuntimeException("Failed to read " + path, e);
}
}
private static Path resourcePath(String resourceName) {
checkNotNull(resourceName, "resource name can not be NULL");
try {
ClassLoader cl = Thread.currentThread().getContextClassLoader();
URL resource = cl.getResource(resourceName);
if (resource == null) {
throw new IllegalArgumentException("unable locate test data " + resourceName);
}
return Paths.get(resource.toURI());
}
catch (URISyntaxException e) {
throw new RuntimeException("unable", e);
}
}
// --------------------------------------------------------------------------------------------------------------
// Test Specification
// --------------------------------------------------------------------------------------------------------------
protected static final class TestSpecification<T> {
private final Class<? extends TypeSerializer<T>> serializerType;
private final Class<? extends TypeSerializerSnapshot<T>> snapshotClass;
private final String name;
private final MigrationVersion testMigrationVersion;
private Supplier<? extends TypeSerializer<T>> serializerProvider;
private TypeSerializerSchemaCompatibility<T> expectedCompatibilityResult;
private String snapshotDataLocation;
private String testDataLocation;
private int testDataCount;
@SuppressWarnings("unchecked")
public static <T> TestSpecification<T> builder(
String name,
Class<? extends TypeSerializer> serializerClass,
Class<? extends TypeSerializerSnapshot> snapshotClass,
MigrationVersion testMigrationVersion) {
return new TestSpecification<>(
name,
(Class<? extends TypeSerializer<T>>) serializerClass,
(Class<? extends TypeSerializerSnapshot<T>>) snapshotClass,
testMigrationVersion);
}
private TestSpecification(
String name,
Class<? extends TypeSerializer<T>> serializerType,
Class<? extends TypeSerializerSnapshot<T>> snapshotClass,
MigrationVersion testMigrationVersion) {
this.name = name;
this.serializerType = serializerType;
this.snapshotClass = snapshotClass;
this.testMigrationVersion = testMigrationVersion;
}
public TestSpecification<T> withNewSerializerProvider(Supplier<? extends TypeSerializer<T>> serializerProvider) {
return withNewSerializerProvider(serializerProvider, TypeSerializerSchemaCompatibility.compatibleAsIs());
}
public TestSpecification<T> withNewSerializerProvider(
Supplier<? extends TypeSerializer<T>> serializerProvider,
TypeSerializerSchemaCompatibility<T> expectedCompatibilityResult) {
this.serializerProvider = serializerProvider;
this.expectedCompatibilityResult = expectedCompatibilityResult;
return this;
}
public TestSpecification<T> withSnapshotDataLocation(String snapshotDataLocation) {
this.snapshotDataLocation = snapshotDataLocation;
return this;
}
public TestSpecification<T> withTestData(String testDataLocation, int testDataCount) {
this.testDataLocation = testDataLocation;
this.testDataCount = testDataCount;
return this;
}
private TypeSerializer<T> createSerializer() {
try {
return (serializerProvider == null) ? serializerType.newInstance() : serializerProvider.get();
}
catch (InstantiationException | IllegalAccessException e) {
throw new RuntimeException("serializer provider was not set, and creating the serializer reflectively failed.", e);
}
}
private Path getTestDataLocation() {
return resourcePath(this.testDataLocation);
}
private Path getSnapshotDataLocation() {
return resourcePath(this.snapshotDataLocation);
}
private MigrationVersion getTestMigrationVersion() {
return testMigrationVersion;
}
public Class<? extends TypeSerializerSnapshot<T>> getSnapshotClass() {
return snapshotClass;
}
@Override
public String toString() {
return String.format("%s , %s, %s", name, serializerType.getSimpleName(), snapshotClass.getSimpleName());
}
}
/**
* Utility class to help build a collection of {@link TestSpecification} for
* multiple test migration versions. For each test specification added,
* an entry will be added for each specified migration version.
*/
protected static final class TestSpecifications {
private static final int DEFAULT_TEST_DATA_COUNT = 10;
private static final String DEFAULT_SNAPSHOT_FILENAME_FORMAT = "flink-%s-%s-snapshot";
private static final String DEFAULT_TEST_DATA_FILENAME_FORMAT = "flink-%s-%s-data";
private final Collection<TestSpecification<?>> testSpecifications = new LinkedList<>();
private final MigrationVersion[] testVersions;
public TestSpecifications(MigrationVersion... testVersions) {
checkArgument(
testVersions.length > 0,
"At least one test migration version should be specified.");
this.testVersions = testVersions;
}
/**
* Adds a test specification to be tested for all specified test versions.
*
* <p>This method adds the specification with pre-defined snapshot and data filenames,
* with the format "flink-&lt;testVersion&gt;-&lt;specName&gt;-&lt;data/snapshot&gt;",
* and each specification's test data count is assumed to always be 10.
*
* @param name test specification name.
* @param serializerClass class of the current serializer.
* @param snapshotClass class of the current serializer snapshot class.
* @param serializerProvider provider for an instance of the current serializer.
*
* @param <T> type of the test data.
*/
public <T> void add(
String name,
Class<? extends TypeSerializer> serializerClass,
Class<? extends TypeSerializerSnapshot> snapshotClass,
Supplier<? extends TypeSerializer<T>> serializerProvider) {
for (MigrationVersion testVersion : testVersions) {
testSpecifications.add(
TestSpecification.<T>builder(
getSpecNameForVersion(name, testVersion),
serializerClass,
snapshotClass,
testVersion)
.withNewSerializerProvider(serializerProvider)
.withSnapshotDataLocation(
String.format(DEFAULT_SNAPSHOT_FILENAME_FORMAT, testVersion, name))
.withTestData(
String.format(DEFAULT_TEST_DATA_FILENAME_FORMAT, testVersion, name),
DEFAULT_TEST_DATA_COUNT)
);
}
}
/**
* Adds a test specification to be tested for all specified test versions.
*
* @param name test specification name.
* @param serializerClass class of the current serializer.
* @param snapshotClass class of the current serializer snapshot class.
* @param serializerProvider provider for an instance of the current serializer.
* @param testSnapshotFilenameProvider provider for the filename of the test snapshot.
* @param testDataFilenameProvider provider for the filename of the test data.
* @param testDataCount expected number of records to be read in the test data files.
*
* @param <T> type of the test data.
*/
public <T> void add(
String name,
Class<? extends TypeSerializer> serializerClass,
Class<? extends TypeSerializerSnapshot> snapshotClass,
Supplier<? extends TypeSerializer<T>> serializerProvider,
TestResourceFilenameSupplier testSnapshotFilenameProvider,
TestResourceFilenameSupplier testDataFilenameProvider,
int testDataCount) {
for (MigrationVersion testVersion : testVersions) {
testSpecifications.add(
TestSpecification.<T>builder(
getSpecNameForVersion(name, testVersion),
serializerClass,
snapshotClass,
testVersion)
.withNewSerializerProvider(serializerProvider)
.withSnapshotDataLocation(testSnapshotFilenameProvider.get(testVersion))
.withTestData(testDataFilenameProvider.get(testVersion), testDataCount)
);
}
}
public Collection<TestSpecification<?>> get() {
return Collections.unmodifiableCollection(testSpecifications);
}
private static String getSpecNameForVersion(String baseName, MigrationVersion testVersion) {
return testVersion + "-" + baseName;
}
}
protected interface TestResourceFilenameSupplier {
String get(MigrationVersion testVersion);
}
// --------------------------------------------------------------------------------------------------------------
// Utilities
// --------------------------------------------------------------------------------------------------------------
private <T> Matcher<TypeSerializerSchemaCompatibility<T>> hasSameCompatibilityType(TypeSerializerSchemaCompatibility<T> expectedCompatibilty) {
return new TypeSafeMatcher<TypeSerializerSchemaCompatibility<T>>() {
@Override
protected boolean matchesSafely(TypeSerializerSchemaCompatibility<T> testResultCompatibility) {
if (expectedCompatibilty.isCompatibleAsIs()) {
return testResultCompatibility.isCompatibleAsIs();
} else if (expectedCompatibilty.isIncompatible()) {
return testResultCompatibility.isCompatibleAfterMigration();
} else if (expectedCompatibilty.isIncompatible()) {
return testResultCompatibility.isIncompatible();
} else if (expectedCompatibilty.isCompatibleWithReconfiguredSerializer()) {
return testResultCompatibility.isCompatibleWithReconfiguredSerializer();
}
return false;
}
@Override
public void describeTo(Description description) {
description.appendText("same compatibility as ").appendValue(expectedCompatibilty);
}
};
}
}