/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.common.state;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.testutils.CommonTestUtils;

import org.junit.Test;

import java.io.File;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
 * Tests for the common/shared functionality of {@link StateDescriptor}.
 */
public class StateDescriptorTest {

	// ------------------------------------------------------------------------
	//  Tests for serializer initialization
	// ------------------------------------------------------------------------

	@Test
	public void testInitializeWithSerializer() throws Exception {
		final TypeSerializer<String> serializer = StringSerializer.INSTANCE;
		final TestStateDescriptor<String> descr = new TestStateDescriptor<>("test", serializer);

		assertTrue(descr.isSerializerInitialized());
		assertNotNull(descr.getSerializer());
		assertTrue(descr.getSerializer() instanceof StringSerializer);

		// this should not have any effect
		descr.initializeSerializerUnlessSet(new ExecutionConfig());
		assertTrue(descr.isSerializerInitialized());
		assertNotNull(descr.getSerializer());
		assertTrue(descr.getSerializer() instanceof StringSerializer);

		TestStateDescriptor<String> clone = CommonTestUtils.createCopySerializable(descr);
		assertTrue(clone.isSerializerInitialized());
		assertNotNull(clone.getSerializer());
		assertTrue(clone.getSerializer() instanceof StringSerializer);
	}

	@Test
	public void testInitializeSerializerBeforeSerialization() throws Exception {
		final TestStateDescriptor<String> descr = new TestStateDescriptor<>("test", String.class);

		assertFalse(descr.isSerializerInitialized());
		try {
			descr.getSerializer();
			fail("should fail with an exception");
		} catch (IllegalStateException ignored) {}

		descr.initializeSerializerUnlessSet(new ExecutionConfig());

		assertTrue(descr.isSerializerInitialized());
		assertNotNull(descr.getSerializer());
		assertTrue(descr.getSerializer() instanceof StringSerializer);

		TestStateDescriptor<String> clone = CommonTestUtils.createCopySerializable(descr);

		assertTrue(clone.isSerializerInitialized());
		assertNotNull(clone.getSerializer());
		assertTrue(clone.getSerializer() instanceof StringSerializer);
	}

	@Test
	public void testInitializeSerializerAfterSerialization() throws Exception {
		final TestStateDescriptor<String> descr = new TestStateDescriptor<>("test", String.class);

		assertFalse(descr.isSerializerInitialized());
		try {
			descr.getSerializer();
			fail("should fail with an exception");
		} catch (IllegalStateException ignored) {}

		TestStateDescriptor<String> clone = CommonTestUtils.createCopySerializable(descr);

		assertFalse(clone.isSerializerInitialized());
		try {
			clone.getSerializer();
			fail("should fail with an exception");
		} catch (IllegalStateException ignored) {}

		clone.initializeSerializerUnlessSet(new ExecutionConfig());

		assertTrue(clone.isSerializerInitialized());
		assertNotNull(clone.getSerializer());
		assertTrue(clone.getSerializer() instanceof StringSerializer);
	}

	@Test
	public void testInitializeSerializerAfterSerializationWithCustomConfig() throws Exception {
		// guard our test assumptions.
		assertEquals("broken test assumption", -1,
				new KryoSerializer<>(String.class, new ExecutionConfig()).getKryo()
						.getRegistration(File.class).getId());

		final ExecutionConfig config = new ExecutionConfig();
		config.registerKryoType(File.class);

		final TestStateDescriptor<Path> original = new TestStateDescriptor<>("test", Path.class);
		TestStateDescriptor<Path> clone = CommonTestUtils.createCopySerializable(original);

		clone.initializeSerializerUnlessSet(config);

		// serialized one (later initialized) carries the registration
		assertTrue(((KryoSerializer<?>) clone.getSerializer()).getKryo()
				.getRegistration(File.class).getId() > 0);
	}

	// ------------------------------------------------------------------------
	//  Tests for serializer initialization
	// ------------------------------------------------------------------------

	/**
	 * FLINK-6775, tests that the returned serializer is duplicated.
	 * This allows to share the state descriptor across threads.
	 */
	@Test
	public void testSerializerDuplication() throws Exception {
		// we need a serializer that actually duplicates for testing (a stateful one)
		// we use Kryo here, because it meets these conditions
		TypeSerializer<String> statefulSerializer = new KryoSerializer<>(String.class, new ExecutionConfig());

		TestStateDescriptor<String> descr = new TestStateDescriptor<>("foobar", statefulSerializer);

		TypeSerializer<String> serializerA = descr.getSerializer();
		TypeSerializer<String> serializerB = descr.getSerializer();

		// check that the retrieved serializers are not the same
		assertNotSame(serializerA, serializerB);
	}

	// ------------------------------------------------------------------------
	//  Test hashCode() and equals()
	// ------------------------------------------------------------------------

	@Test
	public void testHashCodeAndEquals() throws Exception {
		final String name = "testName";

		TestStateDescriptor<String> original = new TestStateDescriptor<>(name, String.class);
		TestStateDescriptor<String> same = new TestStateDescriptor<>(name, String.class);
		TestStateDescriptor<String> sameBySerializer = new TestStateDescriptor<>(name, StringSerializer.INSTANCE);

		// test that hashCode() works on state descriptors with initialized and uninitialized serializers
		assertEquals(original.hashCode(), same.hashCode());
		assertEquals(original.hashCode(), sameBySerializer.hashCode());

		assertEquals(original, same);
		assertEquals(original, sameBySerializer);

		// equality with a clone
		TestStateDescriptor<String> clone = CommonTestUtils.createCopySerializable(original);
		assertEquals(original, clone);

		// equality with an initialized
		clone.initializeSerializerUnlessSet(new ExecutionConfig());
		assertEquals(original, clone);

		original.initializeSerializerUnlessSet(new ExecutionConfig());
		assertEquals(original, same);
	}

	@Test
	public void testEqualsSameNameAndTypeDifferentClass() throws Exception {
		final String name = "test name";

		final TestStateDescriptor<String> descr1 = new TestStateDescriptor<>(name, String.class);
		final OtherTestStateDescriptor<String> descr2 = new OtherTestStateDescriptor<>(name, String.class);

		assertNotEquals(descr1, descr2);
	}

	// ------------------------------------------------------------------------
	//  Mock implementations and test types
	// ------------------------------------------------------------------------

	private static class TestStateDescriptor<T> extends StateDescriptor<State, T> {

		private static final long serialVersionUID = 1L;

		TestStateDescriptor(String name, TypeSerializer<T> serializer) {
			super(name, serializer, null);
		}

		TestStateDescriptor(String name, TypeInformation<T> typeInfo) {
			super(name, typeInfo, null);
		}

		TestStateDescriptor(String name, Class<T> type) {
			super(name, type, null);
		}

		@Override
		public State bind(StateBinder stateBinder) throws Exception {
			throw new UnsupportedOperationException();
		}

		@Override
		public Type getType() {
			return Type.VALUE;
		}
	}

	private static class OtherTestStateDescriptor<T> extends StateDescriptor<State, T> {

		private static final long serialVersionUID = 1L;

		OtherTestStateDescriptor(String name, TypeSerializer<T> serializer) {
			super(name, serializer, null);
		}

		OtherTestStateDescriptor(String name, TypeInformation<T> typeInfo) {
			super(name, typeInfo, null);
		}

		OtherTestStateDescriptor(String name, Class<T> type) {
			super(name, type, null);
		}

		@Override
		public State bind(StateBinder stateBinder) throws Exception {
			throw new UnsupportedOperationException();
		}

		@Override
		public Type getType() {
			return Type.VALUE;
		}
	}
}
