blob: 57015c78be0af6d152a386530254e0151a97eecc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.typeutils;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.apache.commons.lang3.SerializationException;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.junit.Test;
/**
* Abstract test base for serializers.
*
* We have a toString() call on all deserialized
* values because this is further evidence that the deserialized value is actually correct.
* (JodaTime DataTime) with the default KryoSerializer used to pass this test but the
* internal state would be corrupt, which becomes evident when toString is called.
*/
public abstract class SerializerTestBase<T> extends TestLogger {
protected abstract TypeSerializer<T> createSerializer();
/**
* Gets the expected length for the serializer's {@link TypeSerializer#getLength()} method.
*
* <p>The expected length should be positive, for fix-length data types, or {@code -1} for
* variable-length types.
*/
protected abstract int getLength();
protected abstract Class<T> getTypeClass();
protected abstract T[] getTestData();
// --------------------------------------------------------------------------------------------
@Test
public void testInstantiate() {
try {
TypeSerializer<T> serializer = getSerializer();
if(serializer.getClass().getName().endsWith("KryoSerializer")) {
// the kryo serializer will return null. We ignore this test for Kryo.
return;
}
T instance = serializer.createInstance();
assertNotNull("The created instance must not be null.", instance);
Class<T> type = getTypeClass();
assertNotNull("The test is corrupt: type class is null.", type);
if (!type.isAssignableFrom(instance.getClass())) {
fail("Type of the instantiated object is wrong. " +
"Expected Type: " + type + " present type " + instance.getClass());
}
}
catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
fail("Exception in test: " + e.getMessage());
}
}
@Test
public void testConfigSnapshotInstantiation() {
TypeSerializerConfigSnapshot configSnapshot = getSerializer().snapshotConfiguration();
InstantiationUtil.instantiate(configSnapshot.getClass());
}
@Test
public void testSnapshotConfigurationAndReconfigure() throws Exception {
final TypeSerializerConfigSnapshot configSnapshot = getSerializer().snapshotConfiguration();
byte[] serializedConfig;
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(
new DataOutputViewStreamWrapper(out), configSnapshot);
serializedConfig = out.toByteArray();
}
TypeSerializerConfigSnapshot restoredConfig;
try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
restoredConfig = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
}
CompatibilityResult strategy = getSerializer().ensureCompatibility(restoredConfig);
assertFalse(strategy.isRequiresMigration());
// also verify that the serializer's reconfigure implementation detects incompatibility
strategy = getSerializer().ensureCompatibility(new TestIncompatibleSerializerConfigSnapshot());
assertTrue(strategy.isRequiresMigration());
}
@Test
public void testGetLength() {
final int len = getLength();
if (len == 0) {
fail("Broken serializer test base - zero length cannot be the expected length");
}
try {
TypeSerializer<T> serializer = getSerializer();
assertEquals(len, serializer.getLength());
}
catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
fail("Exception in test: " + e.getMessage());
}
}
@Test
public void testCopy() {
try {
TypeSerializer<T> serializer = getSerializer();
T[] testData = getData();
for (T datum : testData) {
T copy = serializer.copy(datum);
copy.toString();
deepEquals("Copied element is not equal to the original element.", datum, copy);
}
}
catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
fail("Exception in test: " + e.getMessage());
}
}
@Test
public void testCopyIntoNewElements() {
try {
TypeSerializer<T> serializer = getSerializer();
T[] testData = getData();
for (T datum : testData) {
T copy = serializer.copy(datum, serializer.createInstance());
copy.toString();
deepEquals("Copied element is not equal to the original element.", datum, copy);
}
}
catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
fail("Exception in test: " + e.getMessage());
}
}
@Test
public void testCopyIntoReusedElements() {
try {
TypeSerializer<T> serializer = getSerializer();
T[] testData = getData();
T target = serializer.createInstance();
for (T datum : testData) {
T copy = serializer.copy(datum, target);
copy.toString();
deepEquals("Copied element is not equal to the original element.", datum, copy);
target = copy;
}
}
catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
fail("Exception in test: " + e.getMessage());
}
}
@Test
public void testSerializeIndividually() {
try {
TypeSerializer<T> serializer = getSerializer();
T[] testData = getData();
for (T value : testData) {
TestOutputView out = new TestOutputView();
serializer.serialize(value, out);
TestInputView in = out.getInputView();
assertTrue("No data available during deserialization.", in.available() > 0);
T deserialized = serializer.deserialize(serializer.createInstance(), in);
deserialized.toString();
deepEquals("Deserialized value if wrong.", value, deserialized);
assertTrue("Trailing data available after deserialization.", in.available() == 0);
}
}
catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
fail("Exception in test: " + e.getMessage());
}
}
@Test
public void testSerializeIndividuallyReusingValues() {
try {
TypeSerializer<T> serializer = getSerializer();
T[] testData = getData();
T reuseValue = serializer.createInstance();
for (T value : testData) {
TestOutputView out = new TestOutputView();
serializer.serialize(value, out);
TestInputView in = out.getInputView();
assertTrue("No data available during deserialization.", in.available() > 0);
T deserialized = serializer.deserialize(reuseValue, in);
deserialized.toString();
deepEquals("Deserialized value if wrong.", value, deserialized);
assertTrue("Trailing data available after deserialization.", in.available() == 0);
reuseValue = deserialized;
}
}
catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
fail("Exception in test: " + e.getMessage());
}
}
@Test
public void testSerializeAsSequenceNoReuse() {
try {
TypeSerializer<T> serializer = getSerializer();
T[] testData = getData();
TestOutputView out = new TestOutputView();
for (T value : testData) {
serializer.serialize(value, out);
}
TestInputView in = out.getInputView();
int num = 0;
while (in.available() > 0) {
T deserialized = serializer.deserialize(in);
deserialized.toString();
deepEquals("Deserialized value if wrong.", testData[num], deserialized);
num++;
}
assertEquals("Wrong number of elements deserialized.", testData.length, num);
}
catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
fail("Exception in test: " + e.getMessage());
}
}
@Test
public void testSerializeAsSequenceReusingValues() {
try {
TypeSerializer<T> serializer = getSerializer();
T[] testData = getData();
TestOutputView out = new TestOutputView();
for (T value : testData) {
serializer.serialize(value, out);
}
TestInputView in = out.getInputView();
T reuseValue = serializer.createInstance();
int num = 0;
while (in.available() > 0) {
T deserialized = serializer.deserialize(reuseValue, in);
deserialized.toString();
deepEquals("Deserialized value if wrong.", testData[num], deserialized);
reuseValue = deserialized;
num++;
}
assertEquals("Wrong number of elements deserialized.", testData.length, num);
}
catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
fail("Exception in test: " + e.getMessage());
}
}
@Test
public void testSerializedCopyIndividually() {
try {
TypeSerializer<T> serializer = getSerializer();
T[] testData = getData();
for (T value : testData) {
TestOutputView out = new TestOutputView();
serializer.serialize(value, out);
TestInputView source = out.getInputView();
TestOutputView target = new TestOutputView();
serializer.copy(source, target);
TestInputView toVerify = target.getInputView();
assertTrue("No data available copying.", toVerify.available() > 0);
T deserialized = serializer.deserialize(serializer.createInstance(), toVerify);
deserialized.toString();
deepEquals("Deserialized value if wrong.", value, deserialized);
assertTrue("Trailing data available after deserialization.", toVerify.available() == 0);
}
}
catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
fail("Exception in test: " + e.getMessage());
}
}
@Test
public void testSerializedCopyAsSequence() {
try {
TypeSerializer<T> serializer = getSerializer();
T[] testData = getData();
TestOutputView out = new TestOutputView();
for (T value : testData) {
serializer.serialize(value, out);
}
TestInputView source = out.getInputView();
TestOutputView target = new TestOutputView();
for (int i = 0; i < testData.length; i++) {
serializer.copy(source, target);
}
TestInputView toVerify = target.getInputView();
int num = 0;
while (toVerify.available() > 0) {
T deserialized = serializer.deserialize(serializer.createInstance(), toVerify);
deserialized.toString();
deepEquals("Deserialized value if wrong.", testData[num], deserialized);
num++;
}
assertEquals("Wrong number of elements copied.", testData.length, num);
}
catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
fail("Exception in test: " + e.getMessage());
}
}
@Test
public void testSerializabilityAndEquals() {
try {
TypeSerializer<T> ser1 = getSerializer();
TypeSerializer<T> ser2;
try {
ser2 = SerializationUtils.clone(ser1);
} catch (SerializationException e) {
fail("The serializer is not serializable: " + e);
return;
}
assertEquals("The copy of the serializer is not equal to the original one.", ser1, ser2);
}
catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
fail("Exception in test: " + e.getMessage());
}
}
// --------------------------------------------------------------------------------------------
protected void deepEquals(String message, T should, T is) {
if (should.getClass().isArray()) {
if (should instanceof boolean[]) {
Assert.assertTrue(message, Arrays.equals((boolean[]) should, (boolean[]) is));
}
else if (should instanceof byte[]) {
assertArrayEquals(message, (byte[]) should, (byte[]) is);
}
else if (should instanceof short[]) {
assertArrayEquals(message, (short[]) should, (short[]) is);
}
else if (should instanceof int[]) {
assertArrayEquals(message, (int[]) should, (int[]) is);
}
else if (should instanceof long[]) {
assertArrayEquals(message, (long[]) should, (long[]) is);
}
else if (should instanceof float[]) {
assertArrayEquals(message, (float[]) should, (float[]) is, 0.0f);
}
else if (should instanceof double[]) {
assertArrayEquals(message, (double[]) should, (double[]) is, 0.0);
}
else if (should instanceof char[]) {
assertArrayEquals(message, (char[]) should, (char[]) is);
}
else {
assertArrayEquals(message, (Object[]) should, (Object[]) is);
}
}
else if (should instanceof Throwable) {
assertEquals(((Throwable)should).getMessage(), ((Throwable)is).getMessage());
}
else {
assertEquals(message, should, is);
}
}
// --------------------------------------------------------------------------------------------
protected TypeSerializer<T> getSerializer() {
TypeSerializer<T> serializer = createSerializer();
if (serializer == null) {
throw new RuntimeException("Test case corrupt. Returns null as serializer.");
}
return serializer;
}
private T[] getData() {
T[] data = getTestData();
if (data == null) {
throw new RuntimeException("Test case corrupt. Returns null as test data.");
}
return data;
}
// --------------------------------------------------------------------------------------------
private static final class TestOutputView extends DataOutputStream implements DataOutputView {
public TestOutputView() {
super(new ByteArrayOutputStream(4096));
}
public TestInputView getInputView() {
ByteArrayOutputStream baos = (ByteArrayOutputStream) out;
return new TestInputView(baos.toByteArray());
}
@Override
public void skipBytesToWrite(int numBytes) throws IOException {
for (int i = 0; i < numBytes; i++) {
write(0);
}
}
@Override
public void write(DataInputView source, int numBytes) throws IOException {
byte[] buffer = new byte[numBytes];
source.readFully(buffer);
write(buffer);
}
}
private static final class TestInputView extends DataInputStream implements DataInputView {
public TestInputView(byte[] data) {
super(new ByteArrayInputStream(data));
}
@Override
public void skipBytesToRead(int numBytes) throws IOException {
while (numBytes > 0) {
int skipped = skipBytes(numBytes);
numBytes -= skipped;
}
}
}
public static final class TestIncompatibleSerializerConfigSnapshot extends TypeSerializerConfigSnapshot {
@Override
public int getVersion() {
return 0;
}
@Override
public boolean equals(Object obj) {
return obj instanceof TestIncompatibleSerializerConfigSnapshot;
}
@Override
public int hashCode() {
return getClass().hashCode();
}
}
}