| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.api.common.state; |
| |
| import org.apache.flink.api.common.ExecutionConfig; |
| import org.apache.flink.api.common.typeinfo.TypeInformation; |
| import org.apache.flink.api.common.typeutils.TypeSerializer; |
| import org.apache.flink.api.common.typeutils.base.StringSerializer; |
| import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; |
| import org.apache.flink.core.fs.Path; |
| import org.apache.flink.core.testutils.CommonTestUtils; |
| |
| import org.junit.Test; |
| |
| import java.io.File; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotEquals; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertNotSame; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| /** |
| * Tests for the common/shared functionality of {@link StateDescriptor}. |
| */ |
| public class StateDescriptorTest { |
| |
| // ------------------------------------------------------------------------ |
| // Tests for serializer initialization |
| // ------------------------------------------------------------------------ |
| |
| @Test |
| public void testInitializeWithSerializer() throws Exception { |
| final TypeSerializer<String> serializer = StringSerializer.INSTANCE; |
| final TestStateDescriptor<String> descr = new TestStateDescriptor<>("test", serializer); |
| |
| assertTrue(descr.isSerializerInitialized()); |
| assertNotNull(descr.getSerializer()); |
| assertTrue(descr.getSerializer() instanceof StringSerializer); |
| |
| // this should not have any effect |
| descr.initializeSerializerUnlessSet(new ExecutionConfig()); |
| assertTrue(descr.isSerializerInitialized()); |
| assertNotNull(descr.getSerializer()); |
| assertTrue(descr.getSerializer() instanceof StringSerializer); |
| |
| TestStateDescriptor<String> clone = CommonTestUtils.createCopySerializable(descr); |
| assertTrue(clone.isSerializerInitialized()); |
| assertNotNull(clone.getSerializer()); |
| assertTrue(clone.getSerializer() instanceof StringSerializer); |
| } |
| |
| @Test |
| public void testInitializeSerializerBeforeSerialization() throws Exception { |
| final TestStateDescriptor<String> descr = new TestStateDescriptor<>("test", String.class); |
| |
| assertFalse(descr.isSerializerInitialized()); |
| try { |
| descr.getSerializer(); |
| fail("should fail with an exception"); |
| } catch (IllegalStateException ignored) {} |
| |
| descr.initializeSerializerUnlessSet(new ExecutionConfig()); |
| |
| assertTrue(descr.isSerializerInitialized()); |
| assertNotNull(descr.getSerializer()); |
| assertTrue(descr.getSerializer() instanceof StringSerializer); |
| |
| TestStateDescriptor<String> clone = CommonTestUtils.createCopySerializable(descr); |
| |
| assertTrue(clone.isSerializerInitialized()); |
| assertNotNull(clone.getSerializer()); |
| assertTrue(clone.getSerializer() instanceof StringSerializer); |
| } |
| |
| @Test |
| public void testInitializeSerializerAfterSerialization() throws Exception { |
| final TestStateDescriptor<String> descr = new TestStateDescriptor<>("test", String.class); |
| |
| assertFalse(descr.isSerializerInitialized()); |
| try { |
| descr.getSerializer(); |
| fail("should fail with an exception"); |
| } catch (IllegalStateException ignored) {} |
| |
| TestStateDescriptor<String> clone = CommonTestUtils.createCopySerializable(descr); |
| |
| assertFalse(clone.isSerializerInitialized()); |
| try { |
| clone.getSerializer(); |
| fail("should fail with an exception"); |
| } catch (IllegalStateException ignored) {} |
| |
| clone.initializeSerializerUnlessSet(new ExecutionConfig()); |
| |
| assertTrue(clone.isSerializerInitialized()); |
| assertNotNull(clone.getSerializer()); |
| assertTrue(clone.getSerializer() instanceof StringSerializer); |
| } |
| |
| @Test |
| public void testInitializeSerializerAfterSerializationWithCustomConfig() throws Exception { |
| // guard our test assumptions. |
| assertEquals("broken test assumption", -1, |
| new KryoSerializer<>(String.class, new ExecutionConfig()).getKryo() |
| .getRegistration(File.class).getId()); |
| |
| final ExecutionConfig config = new ExecutionConfig(); |
| config.registerKryoType(File.class); |
| |
| final TestStateDescriptor<Path> original = new TestStateDescriptor<>("test", Path.class); |
| TestStateDescriptor<Path> clone = CommonTestUtils.createCopySerializable(original); |
| |
| clone.initializeSerializerUnlessSet(config); |
| |
| // serialized one (later initialized) carries the registration |
| assertTrue(((KryoSerializer<?>) clone.getSerializer()).getKryo() |
| .getRegistration(File.class).getId() > 0); |
| } |
| |
| // ------------------------------------------------------------------------ |
| // Tests for serializer initialization |
| // ------------------------------------------------------------------------ |
| |
| /** |
| * FLINK-6775, tests that the returned serializer is duplicated. |
| * This allows to share the state descriptor across threads. |
| */ |
| @Test |
| public void testSerializerDuplication() throws Exception { |
| // we need a serializer that actually duplicates for testing (a stateful one) |
| // we use Kryo here, because it meets these conditions |
| TypeSerializer<String> statefulSerializer = new KryoSerializer<>(String.class, new ExecutionConfig()); |
| |
| TestStateDescriptor<String> descr = new TestStateDescriptor<>("foobar", statefulSerializer); |
| |
| TypeSerializer<String> serializerA = descr.getSerializer(); |
| TypeSerializer<String> serializerB = descr.getSerializer(); |
| |
| // check that the retrieved serializers are not the same |
| assertNotSame(serializerA, serializerB); |
| } |
| |
| // ------------------------------------------------------------------------ |
| // Test hashCode() and equals() |
| // ------------------------------------------------------------------------ |
| |
| @Test |
| public void testHashCodeAndEquals() throws Exception { |
| final String name = "testName"; |
| |
| TestStateDescriptor<String> original = new TestStateDescriptor<>(name, String.class); |
| TestStateDescriptor<String> same = new TestStateDescriptor<>(name, String.class); |
| TestStateDescriptor<String> sameBySerializer = new TestStateDescriptor<>(name, StringSerializer.INSTANCE); |
| |
| // test that hashCode() works on state descriptors with initialized and uninitialized serializers |
| assertEquals(original.hashCode(), same.hashCode()); |
| assertEquals(original.hashCode(), sameBySerializer.hashCode()); |
| |
| assertEquals(original, same); |
| assertEquals(original, sameBySerializer); |
| |
| // equality with a clone |
| TestStateDescriptor<String> clone = CommonTestUtils.createCopySerializable(original); |
| assertEquals(original, clone); |
| |
| // equality with an initialized |
| clone.initializeSerializerUnlessSet(new ExecutionConfig()); |
| assertEquals(original, clone); |
| |
| original.initializeSerializerUnlessSet(new ExecutionConfig()); |
| assertEquals(original, same); |
| } |
| |
| @Test |
| public void testEqualsSameNameAndTypeDifferentClass() throws Exception { |
| final String name = "test name"; |
| |
| final TestStateDescriptor<String> descr1 = new TestStateDescriptor<>(name, String.class); |
| final OtherTestStateDescriptor<String> descr2 = new OtherTestStateDescriptor<>(name, String.class); |
| |
| assertNotEquals(descr1, descr2); |
| } |
| |
| // ------------------------------------------------------------------------ |
| // Mock implementations and test types |
| // ------------------------------------------------------------------------ |
| |
| private static class TestStateDescriptor<T> extends StateDescriptor<State, T> { |
| |
| private static final long serialVersionUID = 1L; |
| |
| TestStateDescriptor(String name, TypeSerializer<T> serializer) { |
| super(name, serializer, null); |
| } |
| |
| TestStateDescriptor(String name, TypeInformation<T> typeInfo) { |
| super(name, typeInfo, null); |
| } |
| |
| TestStateDescriptor(String name, Class<T> type) { |
| super(name, type, null); |
| } |
| |
| @Override |
| public State bind(StateBinder stateBinder) throws Exception { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public Type getType() { |
| return Type.VALUE; |
| } |
| } |
| |
| private static class OtherTestStateDescriptor<T> extends StateDescriptor<State, T> { |
| |
| private static final long serialVersionUID = 1L; |
| |
| OtherTestStateDescriptor(String name, TypeSerializer<T> serializer) { |
| super(name, serializer, null); |
| } |
| |
| OtherTestStateDescriptor(String name, TypeInformation<T> typeInfo) { |
| super(name, typeInfo, null); |
| } |
| |
| OtherTestStateDescriptor(String name, Class<T> type) { |
| super(name, type, null); |
| } |
| |
| @Override |
| public State bind(StateBinder stateBinder) throws Exception { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public Type getType() { |
| return Type.VALUE; |
| } |
| } |
| } |