/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.descriptors;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.types.InternalType;
import org.apache.flink.table.utils.EncodingUtils;
import org.apache.flink.table.utils.TypeStringUtils;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * Utility class for having a unified string-based representation of Table API related classes
 * such as TableSchema, TypeInformation, etc.
 *
 * <p>Note to implementers: Please try to reuse key names as much as possible. Key-names
 * should be hierarchical and lower case. Use "-" instead of dots or camel case.
 * E.g., connector.schema.start-from = from-earliest. Try not to use the higher level in a
 * key-name. E.g., instead of connector.kafka.kafka-version use connector.kafka.version.
 *
 * <p>Properties with key normalization enabled contain only lower-case keys.
 */
@Internal
public class DescriptorProperties {

	public static final String TABLE_SCHEMA_NAME = "name";

	public static final String TABLE_SCHEMA_TYPE = "type";

	private static final Consumer<String> EMPTY_CONSUMER = (value) -> {};

	private final boolean normalizeKeys;

	private final Map<String, String> properties;

	public DescriptorProperties(boolean normalizeKeys) {
		this.properties = new HashMap<>();
		this.normalizeKeys = normalizeKeys;
	}

	public DescriptorProperties() {
		this(true);
	}

	/**
	 * Adds a set of properties.
	 */
	public void putProperties(Map<String, String> properties) {
		for (Map.Entry<String, String> property : properties.entrySet()) {
			put(property.getKey(), property.getValue());
		}
	}

	/**
	 * Adds a set of descriptor properties.
	 */
	public void putProperties(DescriptorProperties otherProperties) {
		for (Map.Entry<String, String> otherProperty : otherProperties.properties.entrySet()) {
			put(otherProperty.getKey(), otherProperty.getValue());
		}
	}

	/**
	 * Adds a class under the given key.
	 */
	public void putClass(String key, Class<?> clazz) {
		checkNotNull(key);
		checkNotNull(clazz);
		final String error = InstantiationUtil.checkForInstantiationError(clazz);
		if (error != null) {
			throw new ValidationException("Class '" + clazz.getName() + "' is not supported: " + error);
		}
		put(key, clazz.getName());
	}

	/**
	 * Adds a string under the given key.
	 */
	public void putString(String key, String str) {
		checkNotNull(key);
		checkNotNull(str);
		put(key, str);
	}

	/**
	 * Adds a boolean under the given key.
	 */
	public void putBoolean(String key, boolean b) {
		checkNotNull(key);
		put(key, Boolean.toString(b));
	}

	/**
	 * Adds a long under the given key.
	 */
	public void putLong(String key, long l) {
		checkNotNull(key);
		put(key, Long.toString(l));
	}

	/**
	 * Adds an integer under the given key.
	 */
	public void putInt(String key, int i) {
		checkNotNull(key);
		put(key, Integer.toString(i));
	}

	/**
	 * Adds a character under the given key.
	 */
	public void putCharacter(String key, char c) {
		checkNotNull(key);
		put(key, Character.toString(c));
	}

	/**
	 * Adds a table schema under the given key.
	 */
	public void putTableSchema(String key, TableSchema schema) {
		checkNotNull(key);
		checkNotNull(schema);

		final String[] fieldNames = schema.getFieldNames();
		final InternalType[] fieldTypes = schema.getFieldTypes();

		final List<List<String>> values = new ArrayList<>();
		for (int i = 0; i < schema.getFieldCount(); i++) {
			values.add(Arrays.asList(fieldNames[i], TypeStringUtils.writeDataType(fieldTypes[i])));
		}

		putIndexedFixedProperties(
			key,
			Arrays.asList(TABLE_SCHEMA_NAME, TABLE_SCHEMA_TYPE),
			values);
	}

	/**
	 * Adds a Flink {@link MemorySize} under the given key.
	 */
	public void putMemorySize(String key, MemorySize size) {
		checkNotNull(key);
		checkNotNull(size);
		put(key, size.toString());
	}

	/**
	 * Adds an indexed sequence of properties (with sub-properties) under a common key.
	 *
	 * <p>For example:
	 *
	 * <pre>
	 *     schema.fields.0.type = INT, schema.fields.0.name = test
	 *     schema.fields.1.type = LONG, schema.fields.1.name = test2
	 * </pre>
	 *
	 * <p>The arity of each subKeyValues must match the arity of propertyKeys.
	 */
	public void putIndexedFixedProperties(String key, List<String> subKeys, List<List<String>> subKeyValues) {
		checkNotNull(key);
		checkNotNull(subKeys);
		checkNotNull(subKeyValues);
		for (int idx = 0; idx < subKeyValues.size(); idx++) {
			final List<String> values = subKeyValues.get(idx);
			if (values == null || values.size() != subKeys.size()) {
				throw new ValidationException("Values must have same arity as keys.");
			}
			for (int keyIdx = 0; keyIdx < values.size(); keyIdx++) {
				put(key + '.' + idx + '.' + subKeys.get(keyIdx), values.get(keyIdx));
			}
		}
	}

	/**
	 * Adds an indexed mapping of properties under a common key.
	 *
	 * <p>For example:
	 *
	 * <pre>
	 *     schema.fields.0.type = INT, schema.fields.0.name = test
	 *                                 schema.fields.1.name = test2
	 * </pre>
	 *
	 * <p>The arity of the subKeyValues can differ.
	 */
	public void putIndexedVariableProperties(String key, List<Map<String, String>> subKeyValues) {
		checkNotNull(key);
		checkNotNull(subKeyValues);
		for (int idx = 0; idx < subKeyValues.size(); idx++) {
			final Map<String, String> values = subKeyValues.get(idx);
			for (Map.Entry<String, String> value : values.entrySet()) {
				put(key + '.' + idx + '.' + value.getKey(), value.getValue());
			}
		}
	}

	// --------------------------------------------------------------------------------------------

	/**
	 * Returns a string value under the given key if it exists.
	 */
	public Optional<String> getOptionalString(String key) {
		return optionalGet(key);
	}

	/**
	 * Returns a string value under the given existing key.
	 */
	public String getString(String key) {
		return getOptionalString(key).orElseThrow(exceptionSupplier(key));
	}

	/**
	 * Returns a character value under the given key if it exists.
	 */
	public Optional<Character> getOptionalCharacter(String key) {
		return optionalGet(key).map((c) -> {
			if (c.length() != 1) {
				throw new ValidationException("The value of '" + key + "' must only contain one character.");
			}
			return c.charAt(0);
		});
	}

	/**
	 * Returns a character value under the given existing key.
	 */
	public Character getCharacter(String key) {
		return getOptionalCharacter(key).orElseThrow(exceptionSupplier(key));
	}

	/**
	 * Returns a class value under the given key if it exists.
	 */
	@SuppressWarnings("unchecked")
	public <T> Optional<Class<T>> getOptionalClass(String key, Class<T> superClass) {
		return optionalGet(key).map((name) -> {
			final Class<?> clazz;
			try {
				clazz = Class.forName(name, true, Thread.currentThread().getContextClassLoader());
				if (!superClass.isAssignableFrom(clazz)) {
					throw new ValidationException(
						"Class '" + name + "' does not extend from the required class '" +
							superClass.getName() + "' for key '" + key + "'.");
				}
				return (Class<T>) clazz;
			} catch (Exception e) {
				throw new ValidationException("Could not get class '" + name + "' for key '" + key + "'.", e);
			}
		});
	}

	/**
	 * Returns a class value under the given existing key.
	 */
	public <T> Class<T> getClass(String key, Class<T> superClass) {
		return getOptionalClass(key, superClass).orElseThrow(exceptionSupplier(key));
	}

	/**
	 * Returns a big decimal value under the given key if it exists.
	 */
	public Optional<BigDecimal> getOptionalBigDecimal(String key) {
		return optionalGet(key).map((value) -> {
			try {
				return new BigDecimal(value);
			} catch (Exception e) {
				throw new ValidationException("Invalid decimal value for key '" + key + "'.", e);
			}
		});
	}

	/**
	 * Returns a big decimal value under the given existing key.
	 */
	public BigDecimal getBigDecimal(String key) {
		return getOptionalBigDecimal(key).orElseThrow(exceptionSupplier(key));
	}

	/**
	 * Returns a boolean value under the given key if it exists.
	 */
	public Optional<Boolean> getOptionalBoolean(String key) {
		return optionalGet(key).map((value) -> {
			try {
				return Boolean.valueOf(value);
			} catch (Exception e) {
				throw new ValidationException("Invalid boolean value for key '" + key + "'.", e);
			}
		});
	}

	/**
	 * Returns a boolean value under the given existing key.
	 */
	public boolean getBoolean(String key) {
		return getOptionalBoolean(key).orElseThrow(exceptionSupplier(key));
	}

	/**
	 * Returns a byte value under the given key if it exists.
	 */
	public Optional<Byte> getOptionalByte(String key) {
		return optionalGet(key).map((value) -> {
			try {
				return Byte.valueOf(value);
			} catch (Exception e) {
				throw new ValidationException("Invalid byte value for key '" + key + "'.", e);
			}
		});
	}

	/**
	 * Returns a byte value under the given existing key.
	 */
	public byte getByte(String key) {
		return getOptionalByte(key).orElseThrow(exceptionSupplier(key));
	}

	/**
	 * Returns a double value under the given key if it exists.
	 */
	public Optional<Double> getOptionalDouble(String key) {
		return optionalGet(key).map((value) -> {
			try {
				return Double.valueOf(value);
			} catch (Exception e) {
				throw new ValidationException("Invalid double value for key '" + key + "'.", e);
			}
		});
	}

	/**
	 * Returns a double value under the given existing key.
	 */
	public double getDouble(String key) {
		return getOptionalDouble(key).orElseThrow(exceptionSupplier(key));
	}

	/**
	 * Returns a float value under the given key if it exists.
	 */
	public Optional<Float> getOptionalFloat(String key) {
		return optionalGet(key).map((value) -> {
			try {
				return Float.valueOf(value);
			} catch (Exception e) {
				throw new ValidationException("Invalid float value for key '" + key + "'.", e);
			}
		});
	}

	/**
	 * Returns a float value under the given given existing key.
	 */
	public float getFloat(String key) {
		return getOptionalFloat(key).orElseThrow(exceptionSupplier(key));
	}

	/**
	 * Returns an integer value under the given key if it exists.
	 */
	public Optional<Integer> getOptionalInt(String key) {
		return optionalGet(key).map((value) -> {
			try {
				return Integer.valueOf(value);
			} catch (Exception e) {
				throw new ValidationException("Invalid integer value for key '" + key + "'.", e);
			}
		});
	}

	/**
	 * Returns an integer value under the given existing key.
	 */
	public int getInt(String key) {
		return getOptionalInt(key).orElseThrow(exceptionSupplier(key));
	}

	/**
	 * Returns a long value under the given key if it exists.
	 */
	public Optional<Long> getOptionalLong(String key) {
		return optionalGet(key).map((value) -> {
			try {
				return Long.valueOf(value);
			} catch (Exception e) {
				throw new ValidationException("Invalid long value for key '" + key + "'.", e);
			}
		});
	}

	/**
	 * Returns a long value under the given existing key.
	 */
	public long getLong(String key) {
		return getOptionalLong(key).orElseThrow(exceptionSupplier(key));
	}

	/**
	 * Returns a short value under the given key if it exists.
	 */
	public Optional<Short> getOptionalShort(String key) {
		return optionalGet(key).map((value) -> {
			try {
				return Short.valueOf(value);
			} catch (Exception e) {
				throw new ValidationException("Invalid short value for key '" + key + "'.", e);
			}
		});
	}

	/**
	 * Returns a short value under the given existing key.
	 */
	public short getShort(String key) {
		return getOptionalShort(key).orElseThrow(exceptionSupplier(key));
	}

	/**
	 * Returns the type information under the given key if it exists.
	 */
	public Optional<TypeInformation<?>> getOptionalType(String key) {
		return optionalGet(key).map(TypeStringUtils::readTypeInfo);
	}

	/**
	 * Returns the type information under the given existing key.
	 */
	public TypeInformation<?> getType(String key) {
		return getOptionalType(key).orElseThrow(exceptionSupplier(key));
	}

	/**
	 * Returns a table schema under the given key if it exists.
	 */
	public Optional<TableSchema> getOptionalTableSchema(String key) {
		// filter for number of fields
		final int fieldCount = properties.keySet().stream()
			.filter((k) -> k.startsWith(key) && k.endsWith('.' + TABLE_SCHEMA_NAME))
			.mapToInt((k) -> 1)
			.sum();

		if (fieldCount == 0) {
			return Optional.empty();
		}

		// validate fields and build schema
		final TableSchema.Builder schemaBuilder = TableSchema.builder();
		for (int i = 0; i < fieldCount; i++) {
			final String nameKey = key + '.' + i + '.' + TABLE_SCHEMA_NAME;
			final String typeKey = key + '.' + i + '.' + TABLE_SCHEMA_TYPE;

			final String name = optionalGet(nameKey).orElseThrow(exceptionSupplier(nameKey));

			final InternalType type = optionalGet(typeKey)
				.map(TypeStringUtils::readDataType)
				.orElseThrow(exceptionSupplier(typeKey));

			schemaBuilder.field(name, type);
		}
		return Optional.of(schemaBuilder.build());
	}

	/**
	 * Returns a table schema under the given existing key.
	 */
	public TableSchema getTableSchema(String key) {
		return getOptionalTableSchema(key).orElseThrow(exceptionSupplier(key));
	}

	/**
	 * Returns a Flink {@link MemorySize} under the given key if it exists.
	 */
	public Optional<MemorySize> getOptionalMemorySize(String key) {
		return optionalGet(key).map((value) -> {
			try {
				return MemorySize.parse(value, MemorySize.MemoryUnit.BYTES);
			} catch (Exception e) {
				throw new ValidationException("Invalid memory size value for key '" + key + "'.", e);
			}
		});
	}

	/**
	 * Returns a Flink {@link MemorySize} under the given existing key.
	 */
	public MemorySize getMemorySize(String key) {
		return getOptionalMemorySize(key).orElseThrow(exceptionSupplier(key));
	}

	/**
	 * Returns the property keys of fixed indexed properties.
	 *
	 * <p>For example:
	 *
	 * <pre>
	 *     schema.fields.0.type = INT, schema.fields.0.name = test
	 *     schema.fields.1.type = LONG, schema.fields.1.name = test2
	 * </pre>
	 *
	 * <p>getFixedIndexedProperties("schema.fields", List("type", "name")) leads to:
	 *
	 * <pre>
	 *     0: Map("type" -> "schema.fields.0.type", "name" -> "schema.fields.0.name")
	 *     1: Map("type" -> "schema.fields.1.type", "name" -> "schema.fields.1.name")
	 * </pre>
	 */
	public List<Map<String, String>> getFixedIndexedProperties(String key, List<String> subKeys) {
		// determine max index
		final int maxIndex = extractMaxIndex(key, "\\.(.*)");

		// validate and create result
		final List<Map<String, String>> list = new ArrayList<>();
		for (int i = 0; i <= maxIndex; i++) {
			final Map<String, String> map = new HashMap<>();

			for (String subKey : subKeys) {
				final String fullKey = key + '.' + i + '.' + subKey;
				// check for existence of full key
				if (!containsKey(fullKey)) {
					throw exceptionSupplier(fullKey).get();
				}
				map.put(subKey, fullKey);
			}

			list.add(map);
		}
		return list;
	}

	/**
	 * Returns the property keys of variable indexed properties.
	 *
	 * <p>For example:
	 *
	 * <pre>
	 *     schema.fields.0.type = INT, schema.fields.0.name = test
	 *     schema.fields.1.type = LONG
	 * </pre>
	 *
	 * <p>getFixedIndexedProperties("schema.fields", List("type")) leads to:
	 *
	 * <pre>
	 *     0: Map("type" -> "schema.fields.0.type", "name" -> "schema.fields.0.name")
	 *     1: Map("type" -> "schema.fields.1.type")
	 * </pre>
	 */
	public List<Map<String, String>> getVariableIndexedProperties(String key, List<String> requiredSubKeys) {

		// determine max index
		final int maxIndex = extractMaxIndex(key, "(\\.)?(.*)");

		// determine optional properties
		final String escapedKey = Pattern.quote(key);
		final Pattern pattern = Pattern.compile(escapedKey + "\\.(\\d+)(\\.)?(.*)");
		final Set<String> optionalSubKeys = properties.keySet().stream()
			.flatMap((k) -> {
				final Matcher matcher = pattern.matcher(k);
				if (matcher.find()) {
					return Stream.of(matcher.group(3));
				}
				return Stream.empty();
			})
			.filter((k) -> k.length() > 0)
			.collect(Collectors.toSet());

		// validate and create result
		final List<Map<String, String>> list = new ArrayList<>();
		for (int i = 0; i <= maxIndex; i++) {
			final Map<String, String> map = new HashMap<>();

			// check and add required keys
			for (String subKey : requiredSubKeys) {
				final String fullKey = key + '.' + i + '.' + subKey;
				// check for existence of full key
				if (!containsKey(fullKey)) {
					throw exceptionSupplier(fullKey).get();
				}
				map.put(subKey, fullKey);
			}

			// add optional keys
			for (String subKey : optionalSubKeys) {
				final String fullKey = key + '.' + i + '.' + subKey;
				optionalGet(fullKey).ifPresent(value -> map.put(subKey, fullKey));
			}

			list.add(map);
		}
		return list;
	}

	/**
	 * Returns all properties under a given key that contains an index in between.
	 *
	 * <p>E.g. rowtime.0.name -> returns all rowtime.#.name properties
	 */
	public Map<String, String> getIndexedProperty(String key, String subKey) {
		final String escapedKey = Pattern.quote(key);
		final String escapedSubKey = Pattern.quote(subKey);
		return properties.entrySet().stream()
			.filter(entry -> entry.getKey().matches(escapedKey + "\\.\\d+\\." + escapedSubKey))
			.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
	}

	/**
	 * Returns all array elements under a given key if it exists.
	 *
	 * <p>For example:
	 *
	 * <pre>
	 *     primary-key.0 = field1
	 *     primary-key.1 = field2
	 * </pre>
	 *
	 * <p>leads to: List(field1, field2)
	 *
	 * <p>or:
	 *
	 * <pre>
	 *     primary-key = field1
	 * </pre>
	 *
	 * <p>leads to: List(field1)
	 *
	 * <p>The key mapper gets the key of the current value e.g. "primary-key.1".
	 */
	public <E> Optional<List<E>> getOptionalArray(String key, Function<String, E> keyMapper) {
		// determine max index
		final int maxIndex = extractMaxIndex(key, "");

		if (maxIndex < 0) {
			// check for a single element array
			if (containsKey(key)) {
				return Optional.of(Collections.singletonList(keyMapper.apply(key)));
			} else {
				return Optional.empty();
			}
		} else {
			final List<E> list = new ArrayList<>();
			for (int i = 0; i < maxIndex + 1; i++) {
				final String fullKey = key + '.' + i;
				final E value = keyMapper.apply(fullKey);
				list.add(value);
			}
			return Optional.of(list);
		}
	}

	/**
	 * Returns all array elements under a given existing key.
	 */
	public <E> List<E> getArray(String key, Function<String, E> keyMapper) {
		return getOptionalArray(key, keyMapper).orElseThrow(exceptionSupplier(key));
	}

	/**
	 * Returns if a value under key is exactly equal to the given value.
	 */
	public boolean isValue(String key, String value) {
		return optionalGet(key).orElseThrow(exceptionSupplier(key)).equals(value);
	}

	// --------------------------------------------------------------------------------------------

	/**
	 * Validates a string property.
	 */
	public void validateString(String key, boolean isOptional) {
		validateString(key, isOptional, 0, Integer.MAX_VALUE);
	}

	/**
	 * Validates a string property. The boundaries are inclusive.
	 */
	public void validateString(String key, boolean isOptional, int minLen) {
		validateString(key, isOptional, minLen, Integer.MAX_VALUE);
	}

	/**
	 * Validates a string property. The boundaries are inclusive.
	 */
	public void validateString(String key, boolean isOptional, int minLen, int maxLen) {
		validateOptional(
			key,
			isOptional,
			(value) -> {
				final int length = value.length();
				if (length < minLen || length > maxLen) {
					throw new ValidationException(
						"Property '" + key + "' must have a length between " + minLen + " and " + maxLen + " but was: " + value);
				}
			});
	}

	/**
	 * Validates an integer property.
	 */
	public void validateInt(String key, boolean isOptional) {
		validateInt(key, isOptional, Integer.MIN_VALUE, Integer.MAX_VALUE);
	}

	/**
	 * Validates an integer property. The boundaries are inclusive.
	 */
	public void validateInt(String key, boolean isOptional, int min) {
		validateInt(key, isOptional, min, Integer.MAX_VALUE);
	}

	/**
	 * Validates an integer property. The boundaries are inclusive.
	 */
	public void validateInt(String key, boolean isOptional, int min, int max) {
		validateComparable(key, isOptional, min, max, "integer", Integer::valueOf);
	}

	/**
	 * Validates an long property.
	 */
	public void validateLong(String key, boolean isOptional) {
		validateLong(key, isOptional, Long.MIN_VALUE, Long.MAX_VALUE);
	}

	/**
	 * Validates an long property. The boundaries are inclusive.
	 */
	public void validateLong(String key, boolean isOptional, long min) {
		validateLong(key, isOptional, min, Long.MAX_VALUE);
	}

	/**
	 * Validates an long property. The boundaries are inclusive.
	 */
	public void validateLong(String key, boolean isOptional, long min, long max) {
		validateComparable(key, isOptional, min, max, "long", Long::valueOf);
	}

	/**
	 * Validates that a certain value is present under the given key.
	 */
	public void validateValue(String key, String value, boolean isOptional) {
		validateOptional(
			key,
			isOptional,
			(v) -> {
				if (!v.equals(value)) {
					throw new ValidationException(
						"Could not find required value '" + value + "' for property '" + key + "'.");
				}
			});
	}

	/**
	 * Validates that a boolean value is present under the given key.
	 */
	public void validateBoolean(String key, boolean isOptional) {
		validateOptional(
			key,
			isOptional,
			(value) -> {
				if (!value.equalsIgnoreCase("true") && !value.equalsIgnoreCase("false")) {
					throw new ValidationException(
						"Property '" + key + "' must be a boolean value (true/false) but was: " + value);
				}
			});
	}

	/**
	 * Validates a double property.
	 */
	public void validateDouble(String key, boolean isOptional) {
		validateDouble(key, isOptional, Double.MIN_VALUE, Double.MAX_VALUE);
	}

	/**
	 * Validates a double property. The boundaries are inclusive.
	 */
	public void validateDouble(String key, boolean isOptional, double min) {
		validateDouble(key, isOptional, min, Double.MAX_VALUE);
	}

	/**
	 * Validates a double property. The boundaries are inclusive.
	 */
	public void validateDouble(String key, boolean isOptional, double min, double max) {
		validateComparable(key, isOptional, min, max, "double", Double::valueOf);
	}

	/**
	 * Validates a big decimal property.
	 */
	public void validateBigDecimal(String key, boolean isOptional) {
		validateOptional(
			key,
			isOptional,
			(value) -> {
				try {
					new BigDecimal(value);
				} catch (Exception e) {
					throw new ValidationException(
						"Property '" + key + "' must be a big decimal value but was: " + value);
				}
			});
	}

	/**
	 * Validates a big decimal property. The boundaries are inclusive.
	 */
	public void validateBigDecimal(String key, boolean isOptional, BigDecimal min, BigDecimal max) {
		validateComparable(key, isOptional, min, max, "decimal", BigDecimal::new);
	}

	/**
	 * Validates a byte property.
	 */
	public void validateByte(String key, boolean isOptional) {
		validateByte(key, isOptional, Byte.MIN_VALUE, Byte.MAX_VALUE);
	}

	/**
	 * Validates a byte property. The boundaries are inclusive.
	 */
	public void validateByte(String key, boolean isOptional, byte min) {
		validateByte(key, isOptional, min, Byte.MAX_VALUE);
	}

	/**
	 * Validates a byte property. The boundaries are inclusive.
	 */
	public void validateByte(String key, boolean isOptional, byte min, byte max) {
		validateComparable(key, isOptional, min, max, "byte", Byte::valueOf);
	}

	/**
	 * Validates a float property.
	 */
	public void validateFloat(String key, boolean isOptional) {
		validateFloat(key, isOptional, Float.MIN_VALUE, Float.MAX_VALUE);
	}

	/**
	 * Validates a float property. The boundaries are inclusive.
	 */
	public void validateFloat(String key, boolean isOptional, float min) {
		validateFloat(key, isOptional, min, Float.MAX_VALUE);
	}

	/**
	 * Validates a float property. The boundaries are inclusive.
	 */
	public void validateFloat(String key, boolean isOptional, float min, float max) {
		validateComparable(key, isOptional, min, max, "float", Float::valueOf);
	}

	/**
	 * Validates a short property.
	 */
	public void validateShort(String key, boolean isOptional) {
		validateShort(key, isOptional, Short.MIN_VALUE, Short.MAX_VALUE);
	}

	/**
	 * Validates a short property. The boundaries are inclusive.
	 */
	public void validateShort(String key, boolean isOptional, short min) {
		validateShort(key, isOptional, min, Short.MAX_VALUE);
	}

	/**
	 * Validates a short property. The boundaries are inclusive.
	 */
	public void validateShort(String key, boolean isOptional, short min, short max) {
		validateComparable(key, isOptional, min, max, "short", Short::valueOf);
	}

	/**
	 * Validation for fixed indexed properties.
	 *
	 * <p>For example:
	 *
	 * <pre>
	 *     schema.fields.0.type = INT, schema.fields.0.name = test
	 *     schema.fields.1.type = LONG, schema.fields.1.name = test2
	 * </pre>
	 *
	 * <p>The subKeyValidation map must define e.g. "type" and "name" and a validation logic for the given full key.
	 */
	public void validateFixedIndexedProperties(String key, boolean allowEmpty, Map<String, Consumer<String>> subKeyValidation) {
		// determine max index
		final int maxIndex = extractMaxIndex(key, "\\.(.*)");

		if (maxIndex < 0 && !allowEmpty) {
			throw new ValidationException("Property key '" + key + "' must not be empty.");
		}

		// validate
		for (int i = 0; i < maxIndex; i++) {
			for (Map.Entry<String, Consumer<String>> subKey : subKeyValidation.entrySet()) {
				final String fullKey = key + '.' + i + '.' + subKey.getKey();
				if (properties.containsKey(fullKey)) {
					// run validation logic
					subKey.getValue().accept(fullKey);
				} else {
					throw new ValidationException("Required property key '" + fullKey + "' is missing.");
				}
			}
		}
	}

	/**
	 * Validates a table schema property.
	 */
	public void validateTableSchema(String key, boolean isOptional) {
		final Consumer<String> nameValidation = (name) -> validateString(name, false, 1);
		final Consumer<String> typeValidation = (name) -> validateType(name, false, false);

		final Map<String, Consumer<String>> subKeys = new HashMap<>();
		subKeys.put(TABLE_SCHEMA_NAME, nameValidation);
		subKeys.put(TABLE_SCHEMA_TYPE, typeValidation);

		validateFixedIndexedProperties(
			key,
			isOptional,
			subKeys
		);
	}

	/**
	 * Validates a Flink {@link MemorySize}.
	 *
	 * <p>The precision defines the allowed minimum unit in bytes (e.g. 1024 would only allow KB).
	 */
	public void validateMemorySize(String key, boolean isOptional, int precision) {
		validateMemorySize(key, isOptional, precision, 0L, Long.MAX_VALUE);
	}

	/**
	 * Validates a Flink {@link MemorySize}. The boundaries are inclusive and in bytes.
	 *
	 * <p>The precision defines the allowed minimum unit in bytes (e.g. 1024 would only allow KB).
	 */
	public void validateMemorySize(String key, boolean isOptional, int precision, long min) {
		validateMemorySize(key, isOptional, precision, min, Long.MAX_VALUE);
	}

	/**
	 * Validates a Flink {@link MemorySize}. The boundaries are inclusive and in bytes.
	 *
	 * <p>The precision defines the allowed minimum unit in bytes (e.g. 1024 would only allow KB).
	 */
	public void validateMemorySize(String key, boolean isOptional, int precision, long min, long max) {
		Preconditions.checkArgument(precision > 0);

		validateComparable(
			key,
			isOptional,
			min,
			max,
			"memory size (in bytes)",
			(value) -> {
				final long bytes = MemorySize.parse(value, MemorySize.MemoryUnit.BYTES).getBytes();
				if (bytes % precision != 0) {
					throw new ValidationException(
						"Memory size for key '" + key + "' must be a multiple of " + precision + " bytes but was: " + value);
				}
				return bytes;
			}
		);
	}

	/**
	 * Validates an enum property with a set of validation logic for each enum value.
	 */
	public void validateEnum(String key, boolean isOptional, Map<String, Consumer<String>> enumValidation) {
		validateOptional(
			key,
			isOptional,
			(value) -> {
				if (!enumValidation.containsKey(value)) {
					throw new ValidationException(
						"Unknown value for property '" + key + "'.\n" +
						"Supported values are " + enumValidation.keySet() + " but was: " + value);
				} else {
					// run validation logic
					enumValidation.get(value).accept(key);
				}
			});
	}

	/**
	 * Validates an enum property with a set of enum values.
	 */
	public void validateEnumValues(String key, boolean isOptional, List<String> values) {
		validateEnum(
			key,
			isOptional,
			values.stream().collect(Collectors.toMap(v -> v, v -> noValidation())));
	}

	/**
	 * Validates a type property.
	 */
	public void validateType(String key, boolean isOptional, boolean requireRow) {
		validateOptional(
			key,
			isOptional,
			(value) -> {
				// we don't validate the string but let the parser do the work for us
				// it throws a validation exception
				final TypeInformation<?> typeInfo = TypeStringUtils.readTypeInfo(value);
				if (requireRow && !(typeInfo instanceof RowTypeInfo)) {
					throw new ValidationException(
						"Row type information expected for key '" + key + "' but was: " + value);
				}
			});
	}

	/**
	 * Validates an array of values.
	 *
	 * <p>For example:
	 *
	 * <pre>
	 *     primary-key.0 = field1
	 *     primary-key.1 = field2
	 * </pre>
	 *
	 * <p>leads to: List(field1, field2)
	 *
	 * <p>or:
	 *
	 * <pre>
	 *     primary-key = field1
	 * </pre>
	 *
	 * <p>The validation consumer gets the key of the current value e.g. "primary-key.1".
	 */
	public void validateArray(String key, Consumer<String> elementValidation, int minLength) {
		validateArray(key, elementValidation, minLength, Integer.MAX_VALUE);
	}

	/**
	 * Validates an array of values.
	 *
	 * <p>For example:
	 *
	 * <pre>
	 *     primary-key.0 = field1
	 *     primary-key.1 = field2
	 * </pre>
	 *
	 * <p>leads to: List(field1, field2)
	 *
	 * <p>or:
	 *
	 * <pre>
	 *     primary-key = field1
	 * </pre>
	 *
	 * <p>The validation consumer gets the key of the current value e.g. "primary-key.1".
	 */
	public void validateArray(String key, Consumer<String> elementValidation, int minLength, int maxLength) {

		// determine max index
		final int maxIndex = extractMaxIndex(key, "");

		if (maxIndex < 0) {
			// check for a single element array
			if (properties.containsKey(key)) {
				elementValidation.accept(key);
			} else if (minLength > 0) {
				throw new ValidationException("Could not find required property array for key '" + key + "'.");
			}
		} else {
			// do not allow a single element array
			if (properties.containsKey(key)) {
				throw new ValidationException("Invalid property array for key '" + key + "'.");
			}

			final int size = maxIndex + 1;
			if (size < minLength) {
				throw new ValidationException(
					"Array for key '" + key + "' must not have less than " + minLength + " elements but was: " + size);
			}

			if (size > maxLength) {
				throw new ValidationException(
					"Array for key '" + key + "' must not have more than " + maxLength + " elements but was: " + size);
			}
		}

		// validate array elements
		for (int i = 0; i < maxIndex; i++) {
			final String fullKey = key + '.' + i;
			if (properties.containsKey(fullKey)) {
				// run validation logic
				elementValidation.accept(fullKey);
			} else {
				throw new ValidationException(
					"Required array element at index '" + i + "' for key '" + key + "' is missing.");
			}
		}
	}

	/**
	 * Validates that the given prefix is not included in these properties.
	 */
	public void validatePrefixExclusion(String prefix) {
		properties.keySet().stream()
			.filter(k -> k.startsWith(prefix))
			.findFirst()
			.ifPresent((k) -> {
				throw new ValidationException(
					"Properties with prefix '" + prefix + "' are not allowed in this context. " +
					"But property '" + k + "' was found.");
			});
	}

	/**
	 * Validates that the given key is not included in these properties.
	 */
	public void validateExclusion(String key) {
		if (properties.containsKey(key)) {
			throw new ValidationException("Property '" + key + "' is not allowed in this context.");
		}
	}

	// --------------------------------------------------------------------------------------------

	/**
	 * Returns if the given key is contained.
	 */
	public boolean containsKey(String key) {
		return properties.containsKey(key);
	}

	/**
	 * Returns if a given prefix exists in the properties.
	 */
	public boolean hasPrefix(String prefix) {
		return properties.keySet().stream().anyMatch(k -> k.startsWith(prefix));
	}

	/**
	 * Returns the properties as a map copy.
	 */
	public Map<String, String> asMap() {
		final Map<String, String> copy = new HashMap<>(properties);
		return Collections.unmodifiableMap(copy);
	}

	/**
	 * Returns the properties as a map copy with a prefix key.
	 */
	public Map<String, String> asPrefixedMap(String prefix) {
		return properties.entrySet().stream()
			.collect(Collectors.toMap(e -> prefix + e.getKey(), Map.Entry::getValue));
	}

	/**
	 * Returns a new properties instance with the given keys removed.
	 */
	public DescriptorProperties withoutKeys(List<String> keys) {
		final Set<String> keySet = new HashSet<>(keys);
		final DescriptorProperties copy = new DescriptorProperties(normalizeKeys);
		properties.entrySet().stream()
			.filter(e -> !keySet.contains(e.getKey()))
			.forEach(e -> copy.properties.put(e.getKey(), e.getValue()));
		return copy;
	}

	@Override
	public String toString() {
		return toString(properties);
	}

	@Override
	public boolean equals(Object o) {
		if (this == o) {
			return true;
		}
		if (o == null || getClass() != o.getClass()) {
			return false;
		}
		DescriptorProperties that = (DescriptorProperties) o;
		return Objects.equals(properties, that.properties);
	}

	@Override
	public int hashCode() {
		return Objects.hash(properties);
	}

	// --------------------------------------------------------------------------------------------

	private void put(String key, String value) {
		if (properties.containsKey(key)) {
			throw new ValidationException("Property already present: " + key);
		}
		if (normalizeKeys) {
			properties.put(key.toLowerCase(), value);
		} else {
			properties.put(key, value);
		}
	}

	private Optional<String> optionalGet(String key) {
		return Optional.ofNullable(properties.get(key));
	}

	private void validateOptional(String key, boolean isOptional, Consumer<String> valueValidation) {
		if (!properties.containsKey(key)) {
			if (!isOptional) {
				throw new ValidationException("Could not find required property '" + key + "'.");
			}
		} else {
			final String value = properties.get(key);
			valueValidation.accept(value);
		}
	}

	private Supplier<TableException> exceptionSupplier(String key) {
		return () -> {
			throw new TableException(
				"Property with key '" + key + "' could not be found. " +
				"This is a bug because the validation logic should have checked that before.");
		};
	}

	private int extractMaxIndex(String key, String suffixPattern) {
		// extract index and property keys
		final String escapedKey = Pattern.quote(key);
		final Pattern pattern = Pattern.compile(escapedKey + "\\.(\\d+)" + suffixPattern);
		final IntStream indexes = properties.keySet().stream()
			.flatMapToInt(k -> {
				final Matcher matcher = pattern.matcher(k);
				if (matcher.find()) {
					return IntStream.of(Integer.valueOf(matcher.group(1)));
				}
				return IntStream.empty();
			});

		// determine max index
		return indexes.max().orElse(-1);
	}

	/**
	 * Validates a property by first parsing the string value to a comparable object. The boundaries are inclusive.
	 */
	private <T extends Comparable<T>> void validateComparable(
			String key,
			boolean isOptional,
			T min,
			T max,
			String typeName,
			Function<String, T> parseFunction) {
		if (!properties.containsKey(key)) {
			if (!isOptional) {
				throw new ValidationException("Could not find required property '" + key + "'.");
			}
		} else {
			final String value = properties.get(key);
			try {
				final T parsed = parseFunction.apply(value);
				if (parsed.compareTo(min) < 0 || parsed.compareTo(max) > 0) {
					throw new ValidationException(
						"Property '" + key + "' must be a " + typeName + " value between " + min + " and " + max + " but was: " + parsed);
				}
			} catch (Exception e) {
				throw new ValidationException("Property '" + key + "' must be a " + typeName + " value but was: " + value);
			}
		}
	}

	// --------------------------------------------------------------------------------------------

	/**
	 * Returns an empty validation logic.
	 */
	public static Consumer<String> noValidation() {
		return EMPTY_CONSUMER;
	}

	public static String toString(String str) {
		return EncodingUtils.escapeJava(str);
	}

	public static String toString(String key, String value) {
		return toString(key) + '=' + toString(value);
	}

	public static String toString(Map<String, String> propertyMap) {
		return propertyMap.entrySet().stream()
			.map(e -> toString(e.getKey(), e.getValue()))
			.sorted()
			.collect(Collectors.joining("\n"));
	}
}
