blob: 8d2a5ffc643c35ae190d20fef2be614495a4558e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.descriptors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.types.InternalType;
import org.apache.flink.table.utils.EncodingUtils;
import org.apache.flink.table.utils.TypeStringUtils;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Utility class for having a unified string-based representation of Table API related classes
* such as TableSchema, TypeInformation, etc.
*
* <p>Note to implementers: Please try to reuse key names as much as possible. Key-names
* should be hierarchical and lower case. Use "-" instead of dots or camel case.
* E.g., connector.schema.start-from = from-earliest. Try not to use the higher level in a
* key-name. E.g., instead of connector.kafka.kafka-version use connector.kafka.version.
*
* <p>Properties with key normalization enabled contain only lower-case keys.
*/
@Internal
public class DescriptorProperties {
public static final String TABLE_SCHEMA_NAME = "name";
public static final String TABLE_SCHEMA_TYPE = "type";
private static final Consumer<String> EMPTY_CONSUMER = (value) -> {};
private final boolean normalizeKeys;
private final Map<String, String> properties;
public DescriptorProperties(boolean normalizeKeys) {
this.properties = new HashMap<>();
this.normalizeKeys = normalizeKeys;
}
public DescriptorProperties() {
this(true);
}
/**
* Adds a set of properties.
*/
public void putProperties(Map<String, String> properties) {
for (Map.Entry<String, String> property : properties.entrySet()) {
put(property.getKey(), property.getValue());
}
}
/**
* Adds a set of descriptor properties.
*/
public void putProperties(DescriptorProperties otherProperties) {
for (Map.Entry<String, String> otherProperty : otherProperties.properties.entrySet()) {
put(otherProperty.getKey(), otherProperty.getValue());
}
}
/**
* Adds a class under the given key.
*/
public void putClass(String key, Class<?> clazz) {
checkNotNull(key);
checkNotNull(clazz);
final String error = InstantiationUtil.checkForInstantiationError(clazz);
if (error != null) {
throw new ValidationException("Class '" + clazz.getName() + "' is not supported: " + error);
}
put(key, clazz.getName());
}
/**
* Adds a string under the given key.
*/
public void putString(String key, String str) {
checkNotNull(key);
checkNotNull(str);
put(key, str);
}
/**
* Adds a boolean under the given key.
*/
public void putBoolean(String key, boolean b) {
checkNotNull(key);
put(key, Boolean.toString(b));
}
/**
* Adds a long under the given key.
*/
public void putLong(String key, long l) {
checkNotNull(key);
put(key, Long.toString(l));
}
/**
* Adds an integer under the given key.
*/
public void putInt(String key, int i) {
checkNotNull(key);
put(key, Integer.toString(i));
}
/**
* Adds a character under the given key.
*/
public void putCharacter(String key, char c) {
checkNotNull(key);
put(key, Character.toString(c));
}
/**
* Adds a table schema under the given key.
*/
public void putTableSchema(String key, TableSchema schema) {
checkNotNull(key);
checkNotNull(schema);
final String[] fieldNames = schema.getFieldNames();
final InternalType[] fieldTypes = schema.getFieldTypes();
final List<List<String>> values = new ArrayList<>();
for (int i = 0; i < schema.getFieldCount(); i++) {
values.add(Arrays.asList(fieldNames[i], TypeStringUtils.writeDataType(fieldTypes[i])));
}
putIndexedFixedProperties(
key,
Arrays.asList(TABLE_SCHEMA_NAME, TABLE_SCHEMA_TYPE),
values);
}
/**
* Adds a Flink {@link MemorySize} under the given key.
*/
public void putMemorySize(String key, MemorySize size) {
checkNotNull(key);
checkNotNull(size);
put(key, size.toString());
}
/**
* Adds an indexed sequence of properties (with sub-properties) under a common key.
*
* <p>For example:
*
* <pre>
* schema.fields.0.type = INT, schema.fields.0.name = test
* schema.fields.1.type = LONG, schema.fields.1.name = test2
* </pre>
*
* <p>The arity of each subKeyValues must match the arity of propertyKeys.
*/
public void putIndexedFixedProperties(String key, List<String> subKeys, List<List<String>> subKeyValues) {
checkNotNull(key);
checkNotNull(subKeys);
checkNotNull(subKeyValues);
for (int idx = 0; idx < subKeyValues.size(); idx++) {
final List<String> values = subKeyValues.get(idx);
if (values == null || values.size() != subKeys.size()) {
throw new ValidationException("Values must have same arity as keys.");
}
for (int keyIdx = 0; keyIdx < values.size(); keyIdx++) {
put(key + '.' + idx + '.' + subKeys.get(keyIdx), values.get(keyIdx));
}
}
}
/**
* Adds an indexed mapping of properties under a common key.
*
* <p>For example:
*
* <pre>
* schema.fields.0.type = INT, schema.fields.0.name = test
* schema.fields.1.name = test2
* </pre>
*
* <p>The arity of the subKeyValues can differ.
*/
public void putIndexedVariableProperties(String key, List<Map<String, String>> subKeyValues) {
checkNotNull(key);
checkNotNull(subKeyValues);
for (int idx = 0; idx < subKeyValues.size(); idx++) {
final Map<String, String> values = subKeyValues.get(idx);
for (Map.Entry<String, String> value : values.entrySet()) {
put(key + '.' + idx + '.' + value.getKey(), value.getValue());
}
}
}
// --------------------------------------------------------------------------------------------
/**
* Returns a string value under the given key if it exists.
*/
public Optional<String> getOptionalString(String key) {
return optionalGet(key);
}
/**
* Returns a string value under the given existing key.
*/
public String getString(String key) {
return getOptionalString(key).orElseThrow(exceptionSupplier(key));
}
/**
* Returns a character value under the given key if it exists.
*/
public Optional<Character> getOptionalCharacter(String key) {
return optionalGet(key).map((c) -> {
if (c.length() != 1) {
throw new ValidationException("The value of '" + key + "' must only contain one character.");
}
return c.charAt(0);
});
}
/**
* Returns a character value under the given existing key.
*/
public Character getCharacter(String key) {
return getOptionalCharacter(key).orElseThrow(exceptionSupplier(key));
}
/**
* Returns a class value under the given key if it exists.
*/
@SuppressWarnings("unchecked")
public <T> Optional<Class<T>> getOptionalClass(String key, Class<T> superClass) {
return optionalGet(key).map((name) -> {
final Class<?> clazz;
try {
clazz = Class.forName(name, true, Thread.currentThread().getContextClassLoader());
if (!superClass.isAssignableFrom(clazz)) {
throw new ValidationException(
"Class '" + name + "' does not extend from the required class '" +
superClass.getName() + "' for key '" + key + "'.");
}
return (Class<T>) clazz;
} catch (Exception e) {
throw new ValidationException("Could not get class '" + name + "' for key '" + key + "'.", e);
}
});
}
/**
* Returns a class value under the given existing key.
*/
public <T> Class<T> getClass(String key, Class<T> superClass) {
return getOptionalClass(key, superClass).orElseThrow(exceptionSupplier(key));
}
/**
* Returns a big decimal value under the given key if it exists.
*/
public Optional<BigDecimal> getOptionalBigDecimal(String key) {
return optionalGet(key).map((value) -> {
try {
return new BigDecimal(value);
} catch (Exception e) {
throw new ValidationException("Invalid decimal value for key '" + key + "'.", e);
}
});
}
/**
* Returns a big decimal value under the given existing key.
*/
public BigDecimal getBigDecimal(String key) {
return getOptionalBigDecimal(key).orElseThrow(exceptionSupplier(key));
}
/**
* Returns a boolean value under the given key if it exists.
*/
public Optional<Boolean> getOptionalBoolean(String key) {
return optionalGet(key).map((value) -> {
try {
return Boolean.valueOf(value);
} catch (Exception e) {
throw new ValidationException("Invalid boolean value for key '" + key + "'.", e);
}
});
}
/**
* Returns a boolean value under the given existing key.
*/
public boolean getBoolean(String key) {
return getOptionalBoolean(key).orElseThrow(exceptionSupplier(key));
}
/**
* Returns a byte value under the given key if it exists.
*/
public Optional<Byte> getOptionalByte(String key) {
return optionalGet(key).map((value) -> {
try {
return Byte.valueOf(value);
} catch (Exception e) {
throw new ValidationException("Invalid byte value for key '" + key + "'.", e);
}
});
}
/**
* Returns a byte value under the given existing key.
*/
public byte getByte(String key) {
return getOptionalByte(key).orElseThrow(exceptionSupplier(key));
}
/**
* Returns a double value under the given key if it exists.
*/
public Optional<Double> getOptionalDouble(String key) {
return optionalGet(key).map((value) -> {
try {
return Double.valueOf(value);
} catch (Exception e) {
throw new ValidationException("Invalid double value for key '" + key + "'.", e);
}
});
}
/**
* Returns a double value under the given existing key.
*/
public double getDouble(String key) {
return getOptionalDouble(key).orElseThrow(exceptionSupplier(key));
}
/**
* Returns a float value under the given key if it exists.
*/
public Optional<Float> getOptionalFloat(String key) {
return optionalGet(key).map((value) -> {
try {
return Float.valueOf(value);
} catch (Exception e) {
throw new ValidationException("Invalid float value for key '" + key + "'.", e);
}
});
}
/**
* Returns a float value under the given given existing key.
*/
public float getFloat(String key) {
return getOptionalFloat(key).orElseThrow(exceptionSupplier(key));
}
/**
* Returns an integer value under the given key if it exists.
*/
public Optional<Integer> getOptionalInt(String key) {
return optionalGet(key).map((value) -> {
try {
return Integer.valueOf(value);
} catch (Exception e) {
throw new ValidationException("Invalid integer value for key '" + key + "'.", e);
}
});
}
/**
* Returns an integer value under the given existing key.
*/
public int getInt(String key) {
return getOptionalInt(key).orElseThrow(exceptionSupplier(key));
}
/**
* Returns a long value under the given key if it exists.
*/
public Optional<Long> getOptionalLong(String key) {
return optionalGet(key).map((value) -> {
try {
return Long.valueOf(value);
} catch (Exception e) {
throw new ValidationException("Invalid long value for key '" + key + "'.", e);
}
});
}
/**
* Returns a long value under the given existing key.
*/
public long getLong(String key) {
return getOptionalLong(key).orElseThrow(exceptionSupplier(key));
}
/**
* Returns a short value under the given key if it exists.
*/
public Optional<Short> getOptionalShort(String key) {
return optionalGet(key).map((value) -> {
try {
return Short.valueOf(value);
} catch (Exception e) {
throw new ValidationException("Invalid short value for key '" + key + "'.", e);
}
});
}
/**
* Returns a short value under the given existing key.
*/
public short getShort(String key) {
return getOptionalShort(key).orElseThrow(exceptionSupplier(key));
}
/**
* Returns the type information under the given key if it exists.
*/
public Optional<TypeInformation<?>> getOptionalType(String key) {
return optionalGet(key).map(TypeStringUtils::readTypeInfo);
}
/**
* Returns the type information under the given existing key.
*/
public TypeInformation<?> getType(String key) {
return getOptionalType(key).orElseThrow(exceptionSupplier(key));
}
/**
* Returns a table schema under the given key if it exists.
*/
public Optional<TableSchema> getOptionalTableSchema(String key) {
// filter for number of fields
final int fieldCount = properties.keySet().stream()
.filter((k) -> k.startsWith(key) && k.endsWith('.' + TABLE_SCHEMA_NAME))
.mapToInt((k) -> 1)
.sum();
if (fieldCount == 0) {
return Optional.empty();
}
// validate fields and build schema
final TableSchema.Builder schemaBuilder = TableSchema.builder();
for (int i = 0; i < fieldCount; i++) {
final String nameKey = key + '.' + i + '.' + TABLE_SCHEMA_NAME;
final String typeKey = key + '.' + i + '.' + TABLE_SCHEMA_TYPE;
final String name = optionalGet(nameKey).orElseThrow(exceptionSupplier(nameKey));
final InternalType type = optionalGet(typeKey)
.map(TypeStringUtils::readDataType)
.orElseThrow(exceptionSupplier(typeKey));
schemaBuilder.field(name, type);
}
return Optional.of(schemaBuilder.build());
}
/**
* Returns a table schema under the given existing key.
*/
public TableSchema getTableSchema(String key) {
return getOptionalTableSchema(key).orElseThrow(exceptionSupplier(key));
}
/**
* Returns a Flink {@link MemorySize} under the given key if it exists.
*/
public Optional<MemorySize> getOptionalMemorySize(String key) {
return optionalGet(key).map((value) -> {
try {
return MemorySize.parse(value, MemorySize.MemoryUnit.BYTES);
} catch (Exception e) {
throw new ValidationException("Invalid memory size value for key '" + key + "'.", e);
}
});
}
/**
* Returns a Flink {@link MemorySize} under the given existing key.
*/
public MemorySize getMemorySize(String key) {
return getOptionalMemorySize(key).orElseThrow(exceptionSupplier(key));
}
/**
* Returns the property keys of fixed indexed properties.
*
* <p>For example:
*
* <pre>
* schema.fields.0.type = INT, schema.fields.0.name = test
* schema.fields.1.type = LONG, schema.fields.1.name = test2
* </pre>
*
* <p>getFixedIndexedProperties("schema.fields", List("type", "name")) leads to:
*
* <pre>
* 0: Map("type" -> "schema.fields.0.type", "name" -> "schema.fields.0.name")
* 1: Map("type" -> "schema.fields.1.type", "name" -> "schema.fields.1.name")
* </pre>
*/
public List<Map<String, String>> getFixedIndexedProperties(String key, List<String> subKeys) {
// determine max index
final int maxIndex = extractMaxIndex(key, "\\.(.*)");
// validate and create result
final List<Map<String, String>> list = new ArrayList<>();
for (int i = 0; i <= maxIndex; i++) {
final Map<String, String> map = new HashMap<>();
for (String subKey : subKeys) {
final String fullKey = key + '.' + i + '.' + subKey;
// check for existence of full key
if (!containsKey(fullKey)) {
throw exceptionSupplier(fullKey).get();
}
map.put(subKey, fullKey);
}
list.add(map);
}
return list;
}
/**
* Returns the property keys of variable indexed properties.
*
* <p>For example:
*
* <pre>
* schema.fields.0.type = INT, schema.fields.0.name = test
* schema.fields.1.type = LONG
* </pre>
*
* <p>getFixedIndexedProperties("schema.fields", List("type")) leads to:
*
* <pre>
* 0: Map("type" -> "schema.fields.0.type", "name" -> "schema.fields.0.name")
* 1: Map("type" -> "schema.fields.1.type")
* </pre>
*/
public List<Map<String, String>> getVariableIndexedProperties(String key, List<String> requiredSubKeys) {
// determine max index
final int maxIndex = extractMaxIndex(key, "(\\.)?(.*)");
// determine optional properties
final String escapedKey = Pattern.quote(key);
final Pattern pattern = Pattern.compile(escapedKey + "\\.(\\d+)(\\.)?(.*)");
final Set<String> optionalSubKeys = properties.keySet().stream()
.flatMap((k) -> {
final Matcher matcher = pattern.matcher(k);
if (matcher.find()) {
return Stream.of(matcher.group(3));
}
return Stream.empty();
})
.filter((k) -> k.length() > 0)
.collect(Collectors.toSet());
// validate and create result
final List<Map<String, String>> list = new ArrayList<>();
for (int i = 0; i <= maxIndex; i++) {
final Map<String, String> map = new HashMap<>();
// check and add required keys
for (String subKey : requiredSubKeys) {
final String fullKey = key + '.' + i + '.' + subKey;
// check for existence of full key
if (!containsKey(fullKey)) {
throw exceptionSupplier(fullKey).get();
}
map.put(subKey, fullKey);
}
// add optional keys
for (String subKey : optionalSubKeys) {
final String fullKey = key + '.' + i + '.' + subKey;
optionalGet(fullKey).ifPresent(value -> map.put(subKey, fullKey));
}
list.add(map);
}
return list;
}
/**
* Returns all properties under a given key that contains an index in between.
*
* <p>E.g. rowtime.0.name -> returns all rowtime.#.name properties
*/
public Map<String, String> getIndexedProperty(String key, String subKey) {
final String escapedKey = Pattern.quote(key);
final String escapedSubKey = Pattern.quote(subKey);
return properties.entrySet().stream()
.filter(entry -> entry.getKey().matches(escapedKey + "\\.\\d+\\." + escapedSubKey))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
/**
* Returns all array elements under a given key if it exists.
*
* <p>For example:
*
* <pre>
* primary-key.0 = field1
* primary-key.1 = field2
* </pre>
*
* <p>leads to: List(field1, field2)
*
* <p>or:
*
* <pre>
* primary-key = field1
* </pre>
*
* <p>leads to: List(field1)
*
* <p>The key mapper gets the key of the current value e.g. "primary-key.1".
*/
public <E> Optional<List<E>> getOptionalArray(String key, Function<String, E> keyMapper) {
// determine max index
final int maxIndex = extractMaxIndex(key, "");
if (maxIndex < 0) {
// check for a single element array
if (containsKey(key)) {
return Optional.of(Collections.singletonList(keyMapper.apply(key)));
} else {
return Optional.empty();
}
} else {
final List<E> list = new ArrayList<>();
for (int i = 0; i < maxIndex + 1; i++) {
final String fullKey = key + '.' + i;
final E value = keyMapper.apply(fullKey);
list.add(value);
}
return Optional.of(list);
}
}
/**
* Returns all array elements under a given existing key.
*/
public <E> List<E> getArray(String key, Function<String, E> keyMapper) {
return getOptionalArray(key, keyMapper).orElseThrow(exceptionSupplier(key));
}
/**
* Returns if a value under key is exactly equal to the given value.
*/
public boolean isValue(String key, String value) {
return optionalGet(key).orElseThrow(exceptionSupplier(key)).equals(value);
}
// --------------------------------------------------------------------------------------------
/**
* Validates a string property.
*/
public void validateString(String key, boolean isOptional) {
validateString(key, isOptional, 0, Integer.MAX_VALUE);
}
/**
* Validates a string property. The boundaries are inclusive.
*/
public void validateString(String key, boolean isOptional, int minLen) {
validateString(key, isOptional, minLen, Integer.MAX_VALUE);
}
/**
* Validates a string property. The boundaries are inclusive.
*/
public void validateString(String key, boolean isOptional, int minLen, int maxLen) {
validateOptional(
key,
isOptional,
(value) -> {
final int length = value.length();
if (length < minLen || length > maxLen) {
throw new ValidationException(
"Property '" + key + "' must have a length between " + minLen + " and " + maxLen + " but was: " + value);
}
});
}
/**
* Validates an integer property.
*/
public void validateInt(String key, boolean isOptional) {
validateInt(key, isOptional, Integer.MIN_VALUE, Integer.MAX_VALUE);
}
/**
* Validates an integer property. The boundaries are inclusive.
*/
public void validateInt(String key, boolean isOptional, int min) {
validateInt(key, isOptional, min, Integer.MAX_VALUE);
}
/**
* Validates an integer property. The boundaries are inclusive.
*/
public void validateInt(String key, boolean isOptional, int min, int max) {
validateComparable(key, isOptional, min, max, "integer", Integer::valueOf);
}
/**
* Validates an long property.
*/
public void validateLong(String key, boolean isOptional) {
validateLong(key, isOptional, Long.MIN_VALUE, Long.MAX_VALUE);
}
/**
* Validates an long property. The boundaries are inclusive.
*/
public void validateLong(String key, boolean isOptional, long min) {
validateLong(key, isOptional, min, Long.MAX_VALUE);
}
/**
* Validates an long property. The boundaries are inclusive.
*/
public void validateLong(String key, boolean isOptional, long min, long max) {
validateComparable(key, isOptional, min, max, "long", Long::valueOf);
}
/**
* Validates that a certain value is present under the given key.
*/
public void validateValue(String key, String value, boolean isOptional) {
validateOptional(
key,
isOptional,
(v) -> {
if (!v.equals(value)) {
throw new ValidationException(
"Could not find required value '" + value + "' for property '" + key + "'.");
}
});
}
/**
* Validates that a boolean value is present under the given key.
*/
public void validateBoolean(String key, boolean isOptional) {
validateOptional(
key,
isOptional,
(value) -> {
if (!value.equalsIgnoreCase("true") && !value.equalsIgnoreCase("false")) {
throw new ValidationException(
"Property '" + key + "' must be a boolean value (true/false) but was: " + value);
}
});
}
/**
* Validates a double property.
*/
public void validateDouble(String key, boolean isOptional) {
validateDouble(key, isOptional, Double.MIN_VALUE, Double.MAX_VALUE);
}
/**
* Validates a double property. The boundaries are inclusive.
*/
public void validateDouble(String key, boolean isOptional, double min) {
validateDouble(key, isOptional, min, Double.MAX_VALUE);
}
/**
* Validates a double property. The boundaries are inclusive.
*/
public void validateDouble(String key, boolean isOptional, double min, double max) {
validateComparable(key, isOptional, min, max, "double", Double::valueOf);
}
/**
* Validates a big decimal property.
*/
public void validateBigDecimal(String key, boolean isOptional) {
validateOptional(
key,
isOptional,
(value) -> {
try {
new BigDecimal(value);
} catch (Exception e) {
throw new ValidationException(
"Property '" + key + "' must be a big decimal value but was: " + value);
}
});
}
/**
* Validates a big decimal property. The boundaries are inclusive.
*/
public void validateBigDecimal(String key, boolean isOptional, BigDecimal min, BigDecimal max) {
validateComparable(key, isOptional, min, max, "decimal", BigDecimal::new);
}
/**
* Validates a byte property.
*/
public void validateByte(String key, boolean isOptional) {
validateByte(key, isOptional, Byte.MIN_VALUE, Byte.MAX_VALUE);
}
/**
* Validates a byte property. The boundaries are inclusive.
*/
public void validateByte(String key, boolean isOptional, byte min) {
validateByte(key, isOptional, min, Byte.MAX_VALUE);
}
/**
* Validates a byte property. The boundaries are inclusive.
*/
public void validateByte(String key, boolean isOptional, byte min, byte max) {
validateComparable(key, isOptional, min, max, "byte", Byte::valueOf);
}
/**
* Validates a float property.
*/
public void validateFloat(String key, boolean isOptional) {
validateFloat(key, isOptional, Float.MIN_VALUE, Float.MAX_VALUE);
}
/**
* Validates a float property. The boundaries are inclusive.
*/
public void validateFloat(String key, boolean isOptional, float min) {
validateFloat(key, isOptional, min, Float.MAX_VALUE);
}
/**
* Validates a float property. The boundaries are inclusive.
*/
public void validateFloat(String key, boolean isOptional, float min, float max) {
validateComparable(key, isOptional, min, max, "float", Float::valueOf);
}
/**
* Validates a short property.
*/
public void validateShort(String key, boolean isOptional) {
validateShort(key, isOptional, Short.MIN_VALUE, Short.MAX_VALUE);
}
/**
* Validates a short property. The boundaries are inclusive.
*/
public void validateShort(String key, boolean isOptional, short min) {
validateShort(key, isOptional, min, Short.MAX_VALUE);
}
/**
* Validates a short property. The boundaries are inclusive.
*/
public void validateShort(String key, boolean isOptional, short min, short max) {
validateComparable(key, isOptional, min, max, "short", Short::valueOf);
}
/**
* Validation for fixed indexed properties.
*
* <p>For example:
*
* <pre>
* schema.fields.0.type = INT, schema.fields.0.name = test
* schema.fields.1.type = LONG, schema.fields.1.name = test2
* </pre>
*
* <p>The subKeyValidation map must define e.g. "type" and "name" and a validation logic for the given full key.
*/
public void validateFixedIndexedProperties(String key, boolean allowEmpty, Map<String, Consumer<String>> subKeyValidation) {
// determine max index
final int maxIndex = extractMaxIndex(key, "\\.(.*)");
if (maxIndex < 0 && !allowEmpty) {
throw new ValidationException("Property key '" + key + "' must not be empty.");
}
// validate
for (int i = 0; i < maxIndex; i++) {
for (Map.Entry<String, Consumer<String>> subKey : subKeyValidation.entrySet()) {
final String fullKey = key + '.' + i + '.' + subKey.getKey();
if (properties.containsKey(fullKey)) {
// run validation logic
subKey.getValue().accept(fullKey);
} else {
throw new ValidationException("Required property key '" + fullKey + "' is missing.");
}
}
}
}
/**
* Validates a table schema property.
*/
public void validateTableSchema(String key, boolean isOptional) {
final Consumer<String> nameValidation = (name) -> validateString(name, false, 1);
final Consumer<String> typeValidation = (name) -> validateType(name, false, false);
final Map<String, Consumer<String>> subKeys = new HashMap<>();
subKeys.put(TABLE_SCHEMA_NAME, nameValidation);
subKeys.put(TABLE_SCHEMA_TYPE, typeValidation);
validateFixedIndexedProperties(
key,
isOptional,
subKeys
);
}
/**
* Validates a Flink {@link MemorySize}.
*
* <p>The precision defines the allowed minimum unit in bytes (e.g. 1024 would only allow KB).
*/
public void validateMemorySize(String key, boolean isOptional, int precision) {
validateMemorySize(key, isOptional, precision, 0L, Long.MAX_VALUE);
}
/**
* Validates a Flink {@link MemorySize}. The boundaries are inclusive and in bytes.
*
* <p>The precision defines the allowed minimum unit in bytes (e.g. 1024 would only allow KB).
*/
public void validateMemorySize(String key, boolean isOptional, int precision, long min) {
validateMemorySize(key, isOptional, precision, min, Long.MAX_VALUE);
}
/**
* Validates a Flink {@link MemorySize}. The boundaries are inclusive and in bytes.
*
* <p>The precision defines the allowed minimum unit in bytes (e.g. 1024 would only allow KB).
*/
public void validateMemorySize(String key, boolean isOptional, int precision, long min, long max) {
Preconditions.checkArgument(precision > 0);
validateComparable(
key,
isOptional,
min,
max,
"memory size (in bytes)",
(value) -> {
final long bytes = MemorySize.parse(value, MemorySize.MemoryUnit.BYTES).getBytes();
if (bytes % precision != 0) {
throw new ValidationException(
"Memory size for key '" + key + "' must be a multiple of " + precision + " bytes but was: " + value);
}
return bytes;
}
);
}
/**
* Validates an enum property with a set of validation logic for each enum value.
*/
public void validateEnum(String key, boolean isOptional, Map<String, Consumer<String>> enumValidation) {
validateOptional(
key,
isOptional,
(value) -> {
if (!enumValidation.containsKey(value)) {
throw new ValidationException(
"Unknown value for property '" + key + "'.\n" +
"Supported values are " + enumValidation.keySet() + " but was: " + value);
} else {
// run validation logic
enumValidation.get(value).accept(key);
}
});
}
/**
* Validates an enum property with a set of enum values.
*/
public void validateEnumValues(String key, boolean isOptional, List<String> values) {
validateEnum(
key,
isOptional,
values.stream().collect(Collectors.toMap(v -> v, v -> noValidation())));
}
/**
* Validates a type property.
*/
public void validateType(String key, boolean isOptional, boolean requireRow) {
validateOptional(
key,
isOptional,
(value) -> {
// we don't validate the string but let the parser do the work for us
// it throws a validation exception
final TypeInformation<?> typeInfo = TypeStringUtils.readTypeInfo(value);
if (requireRow && !(typeInfo instanceof RowTypeInfo)) {
throw new ValidationException(
"Row type information expected for key '" + key + "' but was: " + value);
}
});
}
/**
* Validates an array of values.
*
* <p>For example:
*
* <pre>
* primary-key.0 = field1
* primary-key.1 = field2
* </pre>
*
* <p>leads to: List(field1, field2)
*
* <p>or:
*
* <pre>
* primary-key = field1
* </pre>
*
* <p>The validation consumer gets the key of the current value e.g. "primary-key.1".
*/
public void validateArray(String key, Consumer<String> elementValidation, int minLength) {
validateArray(key, elementValidation, minLength, Integer.MAX_VALUE);
}
/**
* Validates an array of values.
*
* <p>For example:
*
* <pre>
* primary-key.0 = field1
* primary-key.1 = field2
* </pre>
*
* <p>leads to: List(field1, field2)
*
* <p>or:
*
* <pre>
* primary-key = field1
* </pre>
*
* <p>The validation consumer gets the key of the current value e.g. "primary-key.1".
*/
public void validateArray(String key, Consumer<String> elementValidation, int minLength, int maxLength) {
// determine max index
final int maxIndex = extractMaxIndex(key, "");
if (maxIndex < 0) {
// check for a single element array
if (properties.containsKey(key)) {
elementValidation.accept(key);
} else if (minLength > 0) {
throw new ValidationException("Could not find required property array for key '" + key + "'.");
}
} else {
// do not allow a single element array
if (properties.containsKey(key)) {
throw new ValidationException("Invalid property array for key '" + key + "'.");
}
final int size = maxIndex + 1;
if (size < minLength) {
throw new ValidationException(
"Array for key '" + key + "' must not have less than " + minLength + " elements but was: " + size);
}
if (size > maxLength) {
throw new ValidationException(
"Array for key '" + key + "' must not have more than " + maxLength + " elements but was: " + size);
}
}
// validate array elements
for (int i = 0; i < maxIndex; i++) {
final String fullKey = key + '.' + i;
if (properties.containsKey(fullKey)) {
// run validation logic
elementValidation.accept(fullKey);
} else {
throw new ValidationException(
"Required array element at index '" + i + "' for key '" + key + "' is missing.");
}
}
}
/**
* Validates that the given prefix is not included in these properties.
*/
public void validatePrefixExclusion(String prefix) {
properties.keySet().stream()
.filter(k -> k.startsWith(prefix))
.findFirst()
.ifPresent((k) -> {
throw new ValidationException(
"Properties with prefix '" + prefix + "' are not allowed in this context. " +
"But property '" + k + "' was found.");
});
}
/**
* Validates that the given key is not included in these properties.
*/
public void validateExclusion(String key) {
if (properties.containsKey(key)) {
throw new ValidationException("Property '" + key + "' is not allowed in this context.");
}
}
// --------------------------------------------------------------------------------------------
/**
* Returns if the given key is contained.
*/
public boolean containsKey(String key) {
return properties.containsKey(key);
}
/**
* Returns if a given prefix exists in the properties.
*/
public boolean hasPrefix(String prefix) {
return properties.keySet().stream().anyMatch(k -> k.startsWith(prefix));
}
/**
* Returns the properties as a map copy.
*/
public Map<String, String> asMap() {
final Map<String, String> copy = new HashMap<>(properties);
return Collections.unmodifiableMap(copy);
}
/**
* Returns the properties as a map copy with a prefix key.
*/
public Map<String, String> asPrefixedMap(String prefix) {
return properties.entrySet().stream()
.collect(Collectors.toMap(e -> prefix + e.getKey(), Map.Entry::getValue));
}
/**
* Returns a new properties instance with the given keys removed.
*/
public DescriptorProperties withoutKeys(List<String> keys) {
final Set<String> keySet = new HashSet<>(keys);
final DescriptorProperties copy = new DescriptorProperties(normalizeKeys);
properties.entrySet().stream()
.filter(e -> !keySet.contains(e.getKey()))
.forEach(e -> copy.properties.put(e.getKey(), e.getValue()));
return copy;
}
@Override
public String toString() {
return toString(properties);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DescriptorProperties that = (DescriptorProperties) o;
return Objects.equals(properties, that.properties);
}
@Override
public int hashCode() {
return Objects.hash(properties);
}
// --------------------------------------------------------------------------------------------
private void put(String key, String value) {
if (properties.containsKey(key)) {
throw new ValidationException("Property already present: " + key);
}
if (normalizeKeys) {
properties.put(key.toLowerCase(), value);
} else {
properties.put(key, value);
}
}
private Optional<String> optionalGet(String key) {
return Optional.ofNullable(properties.get(key));
}
private void validateOptional(String key, boolean isOptional, Consumer<String> valueValidation) {
if (!properties.containsKey(key)) {
if (!isOptional) {
throw new ValidationException("Could not find required property '" + key + "'.");
}
} else {
final String value = properties.get(key);
valueValidation.accept(value);
}
}
private Supplier<TableException> exceptionSupplier(String key) {
return () -> {
throw new TableException(
"Property with key '" + key + "' could not be found. " +
"This is a bug because the validation logic should have checked that before.");
};
}
private int extractMaxIndex(String key, String suffixPattern) {
// extract index and property keys
final String escapedKey = Pattern.quote(key);
final Pattern pattern = Pattern.compile(escapedKey + "\\.(\\d+)" + suffixPattern);
final IntStream indexes = properties.keySet().stream()
.flatMapToInt(k -> {
final Matcher matcher = pattern.matcher(k);
if (matcher.find()) {
return IntStream.of(Integer.valueOf(matcher.group(1)));
}
return IntStream.empty();
});
// determine max index
return indexes.max().orElse(-1);
}
/**
* Validates a property by first parsing the string value to a comparable object. The boundaries are inclusive.
*/
private <T extends Comparable<T>> void validateComparable(
String key,
boolean isOptional,
T min,
T max,
String typeName,
Function<String, T> parseFunction) {
if (!properties.containsKey(key)) {
if (!isOptional) {
throw new ValidationException("Could not find required property '" + key + "'.");
}
} else {
final String value = properties.get(key);
try {
final T parsed = parseFunction.apply(value);
if (parsed.compareTo(min) < 0 || parsed.compareTo(max) > 0) {
throw new ValidationException(
"Property '" + key + "' must be a " + typeName + " value between " + min + " and " + max + " but was: " + parsed);
}
} catch (Exception e) {
throw new ValidationException("Property '" + key + "' must be a " + typeName + " value but was: " + value);
}
}
}
// --------------------------------------------------------------------------------------------
/**
* Returns an empty validation logic.
*/
public static Consumer<String> noValidation() {
return EMPTY_CONSUMER;
}
public static String toString(String str) {
return EncodingUtils.escapeJava(str);
}
public static String toString(String key, String value) {
return toString(key) + '=' + toString(value);
}
public static String toString(Map<String, String> propertyMap) {
return propertyMap.entrySet().stream()
.map(e -> toString(e.getKey(), e.getValue()))
.sorted()
.collect(Collectors.joining("\n"));
}
}