| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.api.common.typeutils; |
| |
| import org.apache.flink.api.common.typeutils.base.DoubleSerializer; |
| import org.apache.flink.api.common.typeutils.base.IntSerializer; |
| import org.apache.flink.api.java.tuple.Tuple2; |
| import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; |
| import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; |
| import org.apache.flink.core.memory.DataInputView; |
| import org.apache.flink.core.memory.DataInputViewStreamWrapper; |
| import org.apache.flink.core.memory.DataOutputView; |
| import org.apache.flink.core.memory.DataOutputViewStreamWrapper; |
| import org.apache.flink.testutils.ArtificialCNFExceptionThrowingClassLoader; |
| import org.apache.flink.util.InstantiationUtil; |
| import org.junit.Assert; |
| import org.junit.ClassRule; |
| import org.junit.Test; |
| import org.junit.rules.TemporaryFolder; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.ByteArrayOutputStream; |
| import java.io.IOException; |
| import java.io.InvalidClassException; |
| import java.io.ObjectStreamClass; |
| import java.io.Serializable; |
| import java.net.URL; |
| import java.net.URLClassLoader; |
| import java.nio.ByteBuffer; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Set; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.fail; |
| |
| /** |
| * Unit tests for {@link TypeSerializerSerializationUtil}. |
| */ |
| public class TypeSerializerSerializationUtilTest implements Serializable { |
| |
| @ClassRule |
| public static TemporaryFolder temporaryFolder = new TemporaryFolder(); |
| |
| /** |
| * Verifies that reading and writing serializers work correctly. |
| */ |
| @Test |
| public void testSerializerSerialization() throws Exception { |
| |
| TypeSerializer<?> serializer = IntSerializer.INSTANCE; |
| |
| byte[] serialized; |
| try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) { |
| TypeSerializerSerializationUtil.writeSerializer(new DataOutputViewStreamWrapper(out), serializer); |
| serialized = out.toByteArray(); |
| } |
| |
| TypeSerializer<?> deserializedSerializer; |
| try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) { |
| deserializedSerializer = TypeSerializerSerializationUtil.tryReadSerializer( |
| new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader()); |
| } |
| |
| Assert.assertEquals(serializer, deserializedSerializer); |
| } |
| |
| /** |
| * Verifies deserialization failure cases when reading a serializer from bytes, in the |
| * case of a {@link ClassNotFoundException}. |
| */ |
| @Test |
| public void testSerializerSerializationWithClassNotFound() throws Exception { |
| |
| TypeSerializer<?> serializer = IntSerializer.INSTANCE; |
| |
| byte[] serialized; |
| try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) { |
| TypeSerializerSerializationUtil.writeSerializer(new DataOutputViewStreamWrapper(out), serializer); |
| serialized = out.toByteArray(); |
| } |
| |
| TypeSerializer<?> deserializedSerializer; |
| |
| try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) { |
| deserializedSerializer = TypeSerializerSerializationUtil.tryReadSerializer( |
| new DataInputViewStreamWrapper(in), |
| new ArtificialCNFExceptionThrowingClassLoader( |
| Thread.currentThread().getContextClassLoader(), |
| Collections.singleton(IntSerializer.class.getName())), |
| true); |
| } |
| Assert.assertTrue(deserializedSerializer instanceof UnloadableDummyTypeSerializer); |
| |
| Assert.assertArrayEquals( |
| InstantiationUtil.serializeObject(serializer), |
| ((UnloadableDummyTypeSerializer<?>) deserializedSerializer).getActualBytes()); |
| } |
| |
| /** |
| * Verifies deserialization failure cases when reading a serializer from bytes, in the |
| * case of a {@link InvalidClassException}. |
| */ |
| @Test |
| public void testSerializerSerializationWithInvalidClass() throws Exception { |
| |
| TypeSerializer<?> serializer = IntSerializer.INSTANCE; |
| |
| byte[] serialized; |
| try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) { |
| TypeSerializerSerializationUtil.writeSerializer(new DataOutputViewStreamWrapper(out), serializer); |
| serialized = out.toByteArray(); |
| } |
| |
| TypeSerializer<?> deserializedSerializer; |
| |
| try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) { |
| deserializedSerializer = TypeSerializerSerializationUtil.tryReadSerializer( |
| new DataInputViewStreamWrapper(in), |
| new ArtificialCNFExceptionThrowingClassLoader( |
| Thread.currentThread().getContextClassLoader(), |
| Collections.singleton(IntSerializer.class.getName())), |
| true); |
| } |
| Assert.assertTrue(deserializedSerializer instanceof UnloadableDummyTypeSerializer); |
| } |
| |
| /** |
| * Verifies that reading and writing configuration snapshots work correctly. |
| */ |
| @Test |
| public void testSerializeConfigurationSnapshots() throws Exception { |
| TypeSerializerSerializationUtilTest.TestConfigSnapshot configSnapshot1 = |
| new TypeSerializerSerializationUtilTest.TestConfigSnapshot(1, "foo"); |
| |
| TypeSerializerSerializationUtilTest.TestConfigSnapshot configSnapshot2 = |
| new TypeSerializerSerializationUtilTest.TestConfigSnapshot(2, "bar"); |
| |
| byte[] serializedConfig; |
| try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { |
| TypeSerializerSerializationUtil.writeSerializerConfigSnapshots( |
| new DataOutputViewStreamWrapper(out), |
| configSnapshot1, |
| configSnapshot2); |
| |
| serializedConfig = out.toByteArray(); |
| } |
| |
| TypeSerializerConfigSnapshot[] restoredConfigs; |
| try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) { |
| restoredConfigs = TypeSerializerSerializationUtil.readSerializerConfigSnapshots( |
| new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader()); |
| } |
| |
| assertEquals(2, restoredConfigs.length); |
| assertEquals(configSnapshot1, restoredConfigs[0]); |
| assertEquals(configSnapshot2, restoredConfigs[1]); |
| } |
| |
| /** |
| * Verifies that deserializing config snapshots fail if the config class could not be found. |
| */ |
| @Test |
| public void testFailsWhenConfigurationSnapshotClassNotFound() throws Exception { |
| byte[] serializedConfig; |
| try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { |
| TypeSerializerSerializationUtil.writeSerializerConfigSnapshot( |
| new DataOutputViewStreamWrapper(out), new TypeSerializerSerializationUtilTest.TestConfigSnapshot(123, "foobar")); |
| serializedConfig = out.toByteArray(); |
| } |
| |
| try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) { |
| // read using a dummy classloader |
| TypeSerializerSerializationUtil.readSerializerConfigSnapshot( |
| new DataInputViewStreamWrapper(in), new URLClassLoader(new URL[0], null)); |
| fail("Expected a ClassNotFoundException wrapped in IOException"); |
| } catch (IOException expected) { |
| // test passes |
| } |
| } |
| |
| /** |
| * Verifies resilience to serializer deserialization failures when writing and reading |
| * serializer and config snapshot pairs. |
| */ |
| @Test |
| public void testSerializerAndConfigPairsSerializationWithSerializerDeserializationFailures() throws Exception { |
| List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> serializersAndConfigs = Arrays.asList( |
| new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>( |
| IntSerializer.INSTANCE, IntSerializer.INSTANCE.snapshotConfiguration()), |
| new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>( |
| DoubleSerializer.INSTANCE, DoubleSerializer.INSTANCE.snapshotConfiguration())); |
| |
| byte[] serializedSerializersAndConfigs; |
| try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { |
| TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience( |
| new DataOutputViewStreamWrapper(out), serializersAndConfigs); |
| serializedSerializersAndConfigs = out.toByteArray(); |
| } |
| |
| Set<String> cnfThrowingClassnames = new HashSet<>(); |
| cnfThrowingClassnames.add(IntSerializer.class.getName()); |
| cnfThrowingClassnames.add(DoubleSerializer.class.getName()); |
| |
| List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> restored; |
| try (ByteArrayInputStream in = new ByteArrayInputStream(serializedSerializersAndConfigs)) { |
| restored = TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience( |
| new DataInputViewStreamWrapper(in), |
| new ArtificialCNFExceptionThrowingClassLoader( |
| Thread.currentThread().getContextClassLoader(), |
| cnfThrowingClassnames)); |
| } |
| |
| Assert.assertEquals(2, restored.size()); |
| Assert.assertTrue(restored.get(0).f0 instanceof UnloadableDummyTypeSerializer); |
| Assert.assertEquals(IntSerializer.INSTANCE.snapshotConfiguration(), restored.get(0).f1); |
| Assert.assertTrue(restored.get(1).f0 instanceof UnloadableDummyTypeSerializer); |
| Assert.assertEquals(DoubleSerializer.INSTANCE.snapshotConfiguration(), restored.get(1).f1); |
| } |
| |
| /** |
| * Verifies that serializers of anonymous classes can be deserialized, even if serialVersionUID changes. |
| */ |
| @Test |
| public void testAnonymousSerializerClassWithChangedSerialVersionUID() throws Exception { |
| |
| TypeSerializer anonymousClassSerializer = new AbstractIntSerializer() {}; |
| // assert that our assumption holds |
| Assert.assertTrue(anonymousClassSerializer.getClass().isAnonymousClass()); |
| |
| byte[] anonymousSerializerBytes; |
| try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { |
| TypeSerializerSerializationUtil.writeSerializer(new DataOutputViewStreamWrapper(out), anonymousClassSerializer); |
| anonymousSerializerBytes = out.toByteArray(); |
| } |
| |
| long newSerialVersionUID = 1234567L; |
| // assert that we're actually modifying to a different serialVersionUID |
| Assert.assertNotEquals(ObjectStreamClass.lookup(anonymousClassSerializer.getClass()).getSerialVersionUID(), newSerialVersionUID); |
| modifySerialVersionUID(anonymousSerializerBytes, anonymousClassSerializer.getClass().getName(), newSerialVersionUID); |
| |
| try (ByteArrayInputStream in = new ByteArrayInputStream(anonymousSerializerBytes)) { |
| anonymousClassSerializer = TypeSerializerSerializationUtil.tryReadSerializer(new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader()); |
| } |
| |
| // serializer should have been deserialized despite serialVersionUID mismatch |
| Assert.assertNotNull(anonymousClassSerializer); |
| Assert.assertTrue(anonymousClassSerializer.getClass().isAnonymousClass()); |
| } |
| |
| public static class TestConfigSnapshot extends TypeSerializerConfigSnapshot { |
| |
| static final int VERSION = 1; |
| |
| private int val; |
| private String msg; |
| |
| public TestConfigSnapshot() {} |
| |
| public TestConfigSnapshot(int val, String msg) { |
| this.val = val; |
| this.msg = msg; |
| } |
| |
| @Override |
| public void write(DataOutputView out) throws IOException { |
| super.write(out); |
| out.writeInt(val); |
| out.writeUTF(msg); |
| } |
| |
| @Override |
| public void read(DataInputView in) throws IOException { |
| super.read(in); |
| val = in.readInt(); |
| msg = in.readUTF(); |
| } |
| |
| @Override |
| public int getVersion() { |
| return VERSION; |
| } |
| |
| @Override |
| public boolean equals(Object obj) { |
| if (obj == this) { |
| return true; |
| } |
| |
| if (obj == null) { |
| return false; |
| } |
| |
| if (obj instanceof TypeSerializerSerializationUtilTest.TestConfigSnapshot) { |
| return val == ((TypeSerializerSerializationUtilTest.TestConfigSnapshot) obj).val |
| && msg.equals(((TypeSerializerSerializationUtilTest.TestConfigSnapshot) obj).msg); |
| } else { |
| return false; |
| } |
| } |
| |
| @Override |
| public int hashCode() { |
| return 31 * val + msg.hashCode(); |
| } |
| } |
| |
| private static void modifySerialVersionUID(byte[] objectBytes, String classname, long newSerialVersionUID) throws Exception { |
| byte[] classnameBytes = classname.getBytes(); |
| |
| // serialVersionUID follows directly after classname in the object byte stream; |
| // advance serialVersionUIDPosition until end of classname in stream |
| int serialVersionUIDOffset; |
| boolean foundClass = false; |
| int numMatchedBytes = 0; |
| for (serialVersionUIDOffset = 0; serialVersionUIDOffset < objectBytes.length; serialVersionUIDOffset++) { |
| if (objectBytes[serialVersionUIDOffset] == classnameBytes[numMatchedBytes]) { |
| numMatchedBytes++; |
| foundClass = true; |
| } else { |
| if (objectBytes[serialVersionUIDOffset] == classnameBytes[0]) { |
| numMatchedBytes = 1; |
| } else { |
| numMatchedBytes = 0; |
| foundClass = false; |
| } |
| } |
| |
| if (numMatchedBytes == classnameBytes.length) { |
| break; |
| } |
| } |
| |
| if (!foundClass) { |
| throw new RuntimeException("Could not find class " + classname + " in object byte stream."); |
| } |
| |
| byte[] newUIDBytes = ByteBuffer.allocate(Long.SIZE / Byte.SIZE).putLong(newSerialVersionUID).array(); |
| |
| // replace original serialVersionUID bytes with new serialVersionUID bytes |
| for (int uidIndex = 0; uidIndex < newUIDBytes.length; uidIndex++) { |
| objectBytes[serialVersionUIDOffset + 1 + uidIndex] = newUIDBytes[uidIndex]; |
| } |
| } |
| |
| public static abstract class AbstractIntSerializer extends TypeSerializer<Integer> { |
| |
| public static final long serialVersionUID = 1; |
| |
| @Override |
| public Integer createInstance() { |
| return IntSerializer.INSTANCE.createInstance(); |
| } |
| |
| @Override |
| public boolean isImmutableType() { |
| return IntSerializer.INSTANCE.isImmutableType(); |
| } |
| |
| @Override |
| public Integer copy(Integer from) { |
| return IntSerializer.INSTANCE.copy(from); |
| } |
| |
| @Override |
| public Integer copy(Integer from, Integer reuse) { |
| return IntSerializer.INSTANCE.copy(from, reuse); |
| } |
| |
| @Override |
| public void copy(DataInputView source, DataOutputView target) throws IOException { |
| IntSerializer.INSTANCE.copy(source, target); |
| } |
| |
| @Override |
| public Integer deserialize(DataInputView source) throws IOException { |
| return IntSerializer.INSTANCE.deserialize(source); |
| } |
| |
| @Override |
| public Integer deserialize(Integer reuse, DataInputView source) throws IOException { |
| return IntSerializer.INSTANCE.deserialize(reuse, source); |
| } |
| |
| @Override |
| public void serialize(Integer record, DataOutputView target) throws IOException { |
| IntSerializer.INSTANCE.serialize(record, target); |
| } |
| |
| @Override |
| public TypeSerializer<Integer> duplicate() { |
| return IntSerializer.INSTANCE.duplicate(); |
| } |
| |
| @Override |
| public TypeSerializerConfigSnapshot snapshotConfiguration() { |
| return IntSerializer.INSTANCE.snapshotConfiguration(); |
| } |
| |
| @Override |
| public CompatibilityResult<Integer> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { |
| return IntSerializer.INSTANCE.ensureCompatibility(configSnapshot); |
| } |
| |
| @Override |
| public int getLength() { |
| return IntSerializer.INSTANCE.getLength(); |
| } |
| |
| @Override |
| public boolean canEqual(Object obj) { |
| return IntSerializer.INSTANCE.canEqual(obj); |
| } |
| |
| @Override |
| public boolean equals(Object obj) { |
| return IntSerializer.INSTANCE.equals(obj); |
| } |
| |
| @Override |
| public int hashCode() { |
| return IntSerializer.INSTANCE.hashCode(); |
| } |
| } |
| } |