| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.table.descriptors; |
| |
| import org.apache.flink.annotation.Internal; |
| import org.apache.flink.api.common.typeinfo.TypeInformation; |
| import org.apache.flink.api.java.typeutils.RowTypeInfo; |
| import org.apache.flink.configuration.MemorySize; |
| import org.apache.flink.table.api.TableException; |
| import org.apache.flink.table.api.TableSchema; |
| import org.apache.flink.table.api.ValidationException; |
| import org.apache.flink.table.api.types.InternalType; |
| import org.apache.flink.table.utils.EncodingUtils; |
| import org.apache.flink.table.utils.TypeStringUtils; |
| import org.apache.flink.util.InstantiationUtil; |
| import org.apache.flink.util.Preconditions; |
| |
| import java.math.BigDecimal; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.function.Consumer; |
| import java.util.function.Function; |
| import java.util.function.Supplier; |
| import java.util.regex.Matcher; |
| import java.util.regex.Pattern; |
| import java.util.stream.Collectors; |
| import java.util.stream.IntStream; |
| import java.util.stream.Stream; |
| |
| import static org.apache.flink.util.Preconditions.checkNotNull; |
| |
| /** |
| * Utility class for having a unified string-based representation of Table API related classes |
| * such as TableSchema, TypeInformation, etc. |
| * |
| * <p>Note to implementers: Please try to reuse key names as much as possible. Key-names |
| * should be hierarchical and lower case. Use "-" instead of dots or camel case. |
| * E.g., connector.schema.start-from = from-earliest. Try not to use the higher level in a |
| * key-name. E.g., instead of connector.kafka.kafka-version use connector.kafka.version. |
| * |
| * <p>Properties with key normalization enabled contain only lower-case keys. |
| */ |
| @Internal |
| public class DescriptorProperties { |
| |
| public static final String TABLE_SCHEMA_NAME = "name"; |
| |
| public static final String TABLE_SCHEMA_TYPE = "type"; |
| |
| private static final Consumer<String> EMPTY_CONSUMER = (value) -> {}; |
| |
| private final boolean normalizeKeys; |
| |
| private final Map<String, String> properties; |
| |
| public DescriptorProperties(boolean normalizeKeys) { |
| this.properties = new HashMap<>(); |
| this.normalizeKeys = normalizeKeys; |
| } |
| |
| public DescriptorProperties() { |
| this(true); |
| } |
| |
| /** |
| * Adds a set of properties. |
| */ |
| public void putProperties(Map<String, String> properties) { |
| for (Map.Entry<String, String> property : properties.entrySet()) { |
| put(property.getKey(), property.getValue()); |
| } |
| } |
| |
| /** |
| * Adds a set of descriptor properties. |
| */ |
| public void putProperties(DescriptorProperties otherProperties) { |
| for (Map.Entry<String, String> otherProperty : otherProperties.properties.entrySet()) { |
| put(otherProperty.getKey(), otherProperty.getValue()); |
| } |
| } |
| |
| /** |
| * Adds a class under the given key. |
| */ |
| public void putClass(String key, Class<?> clazz) { |
| checkNotNull(key); |
| checkNotNull(clazz); |
| final String error = InstantiationUtil.checkForInstantiationError(clazz); |
| if (error != null) { |
| throw new ValidationException("Class '" + clazz.getName() + "' is not supported: " + error); |
| } |
| put(key, clazz.getName()); |
| } |
| |
| /** |
| * Adds a string under the given key. |
| */ |
| public void putString(String key, String str) { |
| checkNotNull(key); |
| checkNotNull(str); |
| put(key, str); |
| } |
| |
| /** |
| * Adds a boolean under the given key. |
| */ |
| public void putBoolean(String key, boolean b) { |
| checkNotNull(key); |
| put(key, Boolean.toString(b)); |
| } |
| |
| /** |
| * Adds a long under the given key. |
| */ |
| public void putLong(String key, long l) { |
| checkNotNull(key); |
| put(key, Long.toString(l)); |
| } |
| |
| /** |
| * Adds an integer under the given key. |
| */ |
| public void putInt(String key, int i) { |
| checkNotNull(key); |
| put(key, Integer.toString(i)); |
| } |
| |
| /** |
| * Adds a character under the given key. |
| */ |
| public void putCharacter(String key, char c) { |
| checkNotNull(key); |
| put(key, Character.toString(c)); |
| } |
| |
| /** |
| * Adds a table schema under the given key. |
| */ |
| public void putTableSchema(String key, TableSchema schema) { |
| checkNotNull(key); |
| checkNotNull(schema); |
| |
| final String[] fieldNames = schema.getFieldNames(); |
| final InternalType[] fieldTypes = schema.getFieldTypes(); |
| |
| final List<List<String>> values = new ArrayList<>(); |
| for (int i = 0; i < schema.getFieldCount(); i++) { |
| values.add(Arrays.asList(fieldNames[i], TypeStringUtils.writeDataType(fieldTypes[i]))); |
| } |
| |
| putIndexedFixedProperties( |
| key, |
| Arrays.asList(TABLE_SCHEMA_NAME, TABLE_SCHEMA_TYPE), |
| values); |
| } |
| |
| /** |
| * Adds a Flink {@link MemorySize} under the given key. |
| */ |
| public void putMemorySize(String key, MemorySize size) { |
| checkNotNull(key); |
| checkNotNull(size); |
| put(key, size.toString()); |
| } |
| |
| /** |
| * Adds an indexed sequence of properties (with sub-properties) under a common key. |
| * |
| * <p>For example: |
| * |
| * <pre> |
| * schema.fields.0.type = INT, schema.fields.0.name = test |
| * schema.fields.1.type = LONG, schema.fields.1.name = test2 |
| * </pre> |
| * |
| * <p>The arity of each subKeyValues must match the arity of propertyKeys. |
| */ |
| public void putIndexedFixedProperties(String key, List<String> subKeys, List<List<String>> subKeyValues) { |
| checkNotNull(key); |
| checkNotNull(subKeys); |
| checkNotNull(subKeyValues); |
| for (int idx = 0; idx < subKeyValues.size(); idx++) { |
| final List<String> values = subKeyValues.get(idx); |
| if (values == null || values.size() != subKeys.size()) { |
| throw new ValidationException("Values must have same arity as keys."); |
| } |
| for (int keyIdx = 0; keyIdx < values.size(); keyIdx++) { |
| put(key + '.' + idx + '.' + subKeys.get(keyIdx), values.get(keyIdx)); |
| } |
| } |
| } |
| |
| /** |
| * Adds an indexed mapping of properties under a common key. |
| * |
| * <p>For example: |
| * |
| * <pre> |
| * schema.fields.0.type = INT, schema.fields.0.name = test |
| * schema.fields.1.name = test2 |
| * </pre> |
| * |
| * <p>The arity of the subKeyValues can differ. |
| */ |
| public void putIndexedVariableProperties(String key, List<Map<String, String>> subKeyValues) { |
| checkNotNull(key); |
| checkNotNull(subKeyValues); |
| for (int idx = 0; idx < subKeyValues.size(); idx++) { |
| final Map<String, String> values = subKeyValues.get(idx); |
| for (Map.Entry<String, String> value : values.entrySet()) { |
| put(key + '.' + idx + '.' + value.getKey(), value.getValue()); |
| } |
| } |
| } |
| |
| // -------------------------------------------------------------------------------------------- |
| |
| /** |
| * Returns a string value under the given key if it exists. |
| */ |
| public Optional<String> getOptionalString(String key) { |
| return optionalGet(key); |
| } |
| |
| /** |
| * Returns a string value under the given existing key. |
| */ |
| public String getString(String key) { |
| return getOptionalString(key).orElseThrow(exceptionSupplier(key)); |
| } |
| |
| /** |
| * Returns a character value under the given key if it exists. |
| */ |
| public Optional<Character> getOptionalCharacter(String key) { |
| return optionalGet(key).map((c) -> { |
| if (c.length() != 1) { |
| throw new ValidationException("The value of '" + key + "' must only contain one character."); |
| } |
| return c.charAt(0); |
| }); |
| } |
| |
| /** |
| * Returns a character value under the given existing key. |
| */ |
| public Character getCharacter(String key) { |
| return getOptionalCharacter(key).orElseThrow(exceptionSupplier(key)); |
| } |
| |
| /** |
| * Returns a class value under the given key if it exists. |
| */ |
| @SuppressWarnings("unchecked") |
| public <T> Optional<Class<T>> getOptionalClass(String key, Class<T> superClass) { |
| return optionalGet(key).map((name) -> { |
| final Class<?> clazz; |
| try { |
| clazz = Class.forName(name, true, Thread.currentThread().getContextClassLoader()); |
| if (!superClass.isAssignableFrom(clazz)) { |
| throw new ValidationException( |
| "Class '" + name + "' does not extend from the required class '" + |
| superClass.getName() + "' for key '" + key + "'."); |
| } |
| return (Class<T>) clazz; |
| } catch (Exception e) { |
| throw new ValidationException("Could not get class '" + name + "' for key '" + key + "'.", e); |
| } |
| }); |
| } |
| |
| /** |
| * Returns a class value under the given existing key. |
| */ |
| public <T> Class<T> getClass(String key, Class<T> superClass) { |
| return getOptionalClass(key, superClass).orElseThrow(exceptionSupplier(key)); |
| } |
| |
| /** |
| * Returns a big decimal value under the given key if it exists. |
| */ |
| public Optional<BigDecimal> getOptionalBigDecimal(String key) { |
| return optionalGet(key).map((value) -> { |
| try { |
| return new BigDecimal(value); |
| } catch (Exception e) { |
| throw new ValidationException("Invalid decimal value for key '" + key + "'.", e); |
| } |
| }); |
| } |
| |
| /** |
| * Returns a big decimal value under the given existing key. |
| */ |
| public BigDecimal getBigDecimal(String key) { |
| return getOptionalBigDecimal(key).orElseThrow(exceptionSupplier(key)); |
| } |
| |
| /** |
| * Returns a boolean value under the given key if it exists. |
| */ |
| public Optional<Boolean> getOptionalBoolean(String key) { |
| return optionalGet(key).map((value) -> { |
| try { |
| return Boolean.valueOf(value); |
| } catch (Exception e) { |
| throw new ValidationException("Invalid boolean value for key '" + key + "'.", e); |
| } |
| }); |
| } |
| |
| /** |
| * Returns a boolean value under the given existing key. |
| */ |
| public boolean getBoolean(String key) { |
| return getOptionalBoolean(key).orElseThrow(exceptionSupplier(key)); |
| } |
| |
| /** |
| * Returns a byte value under the given key if it exists. |
| */ |
| public Optional<Byte> getOptionalByte(String key) { |
| return optionalGet(key).map((value) -> { |
| try { |
| return Byte.valueOf(value); |
| } catch (Exception e) { |
| throw new ValidationException("Invalid byte value for key '" + key + "'.", e); |
| } |
| }); |
| } |
| |
| /** |
| * Returns a byte value under the given existing key. |
| */ |
| public byte getByte(String key) { |
| return getOptionalByte(key).orElseThrow(exceptionSupplier(key)); |
| } |
| |
| /** |
| * Returns a double value under the given key if it exists. |
| */ |
| public Optional<Double> getOptionalDouble(String key) { |
| return optionalGet(key).map((value) -> { |
| try { |
| return Double.valueOf(value); |
| } catch (Exception e) { |
| throw new ValidationException("Invalid double value for key '" + key + "'.", e); |
| } |
| }); |
| } |
| |
| /** |
| * Returns a double value under the given existing key. |
| */ |
| public double getDouble(String key) { |
| return getOptionalDouble(key).orElseThrow(exceptionSupplier(key)); |
| } |
| |
| /** |
| * Returns a float value under the given key if it exists. |
| */ |
| public Optional<Float> getOptionalFloat(String key) { |
| return optionalGet(key).map((value) -> { |
| try { |
| return Float.valueOf(value); |
| } catch (Exception e) { |
| throw new ValidationException("Invalid float value for key '" + key + "'.", e); |
| } |
| }); |
| } |
| |
| /** |
| * Returns a float value under the given given existing key. |
| */ |
| public float getFloat(String key) { |
| return getOptionalFloat(key).orElseThrow(exceptionSupplier(key)); |
| } |
| |
| /** |
| * Returns an integer value under the given key if it exists. |
| */ |
| public Optional<Integer> getOptionalInt(String key) { |
| return optionalGet(key).map((value) -> { |
| try { |
| return Integer.valueOf(value); |
| } catch (Exception e) { |
| throw new ValidationException("Invalid integer value for key '" + key + "'.", e); |
| } |
| }); |
| } |
| |
| /** |
| * Returns an integer value under the given existing key. |
| */ |
| public int getInt(String key) { |
| return getOptionalInt(key).orElseThrow(exceptionSupplier(key)); |
| } |
| |
| /** |
| * Returns a long value under the given key if it exists. |
| */ |
| public Optional<Long> getOptionalLong(String key) { |
| return optionalGet(key).map((value) -> { |
| try { |
| return Long.valueOf(value); |
| } catch (Exception e) { |
| throw new ValidationException("Invalid long value for key '" + key + "'.", e); |
| } |
| }); |
| } |
| |
| /** |
| * Returns a long value under the given existing key. |
| */ |
| public long getLong(String key) { |
| return getOptionalLong(key).orElseThrow(exceptionSupplier(key)); |
| } |
| |
| /** |
| * Returns a short value under the given key if it exists. |
| */ |
| public Optional<Short> getOptionalShort(String key) { |
| return optionalGet(key).map((value) -> { |
| try { |
| return Short.valueOf(value); |
| } catch (Exception e) { |
| throw new ValidationException("Invalid short value for key '" + key + "'.", e); |
| } |
| }); |
| } |
| |
| /** |
| * Returns a short value under the given existing key. |
| */ |
| public short getShort(String key) { |
| return getOptionalShort(key).orElseThrow(exceptionSupplier(key)); |
| } |
| |
| /** |
| * Returns the type information under the given key if it exists. |
| */ |
| public Optional<TypeInformation<?>> getOptionalType(String key) { |
| return optionalGet(key).map(TypeStringUtils::readTypeInfo); |
| } |
| |
| /** |
| * Returns the type information under the given existing key. |
| */ |
| public TypeInformation<?> getType(String key) { |
| return getOptionalType(key).orElseThrow(exceptionSupplier(key)); |
| } |
| |
| /** |
| * Returns a table schema under the given key if it exists. |
| */ |
| public Optional<TableSchema> getOptionalTableSchema(String key) { |
| // filter for number of fields |
| final int fieldCount = properties.keySet().stream() |
| .filter((k) -> k.startsWith(key) && k.endsWith('.' + TABLE_SCHEMA_NAME)) |
| .mapToInt((k) -> 1) |
| .sum(); |
| |
| if (fieldCount == 0) { |
| return Optional.empty(); |
| } |
| |
| // validate fields and build schema |
| final TableSchema.Builder schemaBuilder = TableSchema.builder(); |
| for (int i = 0; i < fieldCount; i++) { |
| final String nameKey = key + '.' + i + '.' + TABLE_SCHEMA_NAME; |
| final String typeKey = key + '.' + i + '.' + TABLE_SCHEMA_TYPE; |
| |
| final String name = optionalGet(nameKey).orElseThrow(exceptionSupplier(nameKey)); |
| |
| final InternalType type = optionalGet(typeKey) |
| .map(TypeStringUtils::readDataType) |
| .orElseThrow(exceptionSupplier(typeKey)); |
| |
| schemaBuilder.field(name, type); |
| } |
| return Optional.of(schemaBuilder.build()); |
| } |
| |
| /** |
| * Returns a table schema under the given existing key. |
| */ |
| public TableSchema getTableSchema(String key) { |
| return getOptionalTableSchema(key).orElseThrow(exceptionSupplier(key)); |
| } |
| |
| /** |
| * Returns a Flink {@link MemorySize} under the given key if it exists. |
| */ |
| public Optional<MemorySize> getOptionalMemorySize(String key) { |
| return optionalGet(key).map((value) -> { |
| try { |
| return MemorySize.parse(value, MemorySize.MemoryUnit.BYTES); |
| } catch (Exception e) { |
| throw new ValidationException("Invalid memory size value for key '" + key + "'.", e); |
| } |
| }); |
| } |
| |
| /** |
| * Returns a Flink {@link MemorySize} under the given existing key. |
| */ |
| public MemorySize getMemorySize(String key) { |
| return getOptionalMemorySize(key).orElseThrow(exceptionSupplier(key)); |
| } |
| |
| /** |
| * Returns the property keys of fixed indexed properties. |
| * |
| * <p>For example: |
| * |
| * <pre> |
| * schema.fields.0.type = INT, schema.fields.0.name = test |
| * schema.fields.1.type = LONG, schema.fields.1.name = test2 |
| * </pre> |
| * |
| * <p>getFixedIndexedProperties("schema.fields", List("type", "name")) leads to: |
| * |
| * <pre> |
| * 0: Map("type" -> "schema.fields.0.type", "name" -> "schema.fields.0.name") |
| * 1: Map("type" -> "schema.fields.1.type", "name" -> "schema.fields.1.name") |
| * </pre> |
| */ |
| public List<Map<String, String>> getFixedIndexedProperties(String key, List<String> subKeys) { |
| // determine max index |
| final int maxIndex = extractMaxIndex(key, "\\.(.*)"); |
| |
| // validate and create result |
| final List<Map<String, String>> list = new ArrayList<>(); |
| for (int i = 0; i <= maxIndex; i++) { |
| final Map<String, String> map = new HashMap<>(); |
| |
| for (String subKey : subKeys) { |
| final String fullKey = key + '.' + i + '.' + subKey; |
| // check for existence of full key |
| if (!containsKey(fullKey)) { |
| throw exceptionSupplier(fullKey).get(); |
| } |
| map.put(subKey, fullKey); |
| } |
| |
| list.add(map); |
| } |
| return list; |
| } |
| |
| /** |
| * Returns the property keys of variable indexed properties. |
| * |
| * <p>For example: |
| * |
| * <pre> |
| * schema.fields.0.type = INT, schema.fields.0.name = test |
| * schema.fields.1.type = LONG |
| * </pre> |
| * |
| * <p>getFixedIndexedProperties("schema.fields", List("type")) leads to: |
| * |
| * <pre> |
| * 0: Map("type" -> "schema.fields.0.type", "name" -> "schema.fields.0.name") |
| * 1: Map("type" -> "schema.fields.1.type") |
| * </pre> |
| */ |
| public List<Map<String, String>> getVariableIndexedProperties(String key, List<String> requiredSubKeys) { |
| |
| // determine max index |
| final int maxIndex = extractMaxIndex(key, "(\\.)?(.*)"); |
| |
| // determine optional properties |
| final String escapedKey = Pattern.quote(key); |
| final Pattern pattern = Pattern.compile(escapedKey + "\\.(\\d+)(\\.)?(.*)"); |
| final Set<String> optionalSubKeys = properties.keySet().stream() |
| .flatMap((k) -> { |
| final Matcher matcher = pattern.matcher(k); |
| if (matcher.find()) { |
| return Stream.of(matcher.group(3)); |
| } |
| return Stream.empty(); |
| }) |
| .filter((k) -> k.length() > 0) |
| .collect(Collectors.toSet()); |
| |
| // validate and create result |
| final List<Map<String, String>> list = new ArrayList<>(); |
| for (int i = 0; i <= maxIndex; i++) { |
| final Map<String, String> map = new HashMap<>(); |
| |
| // check and add required keys |
| for (String subKey : requiredSubKeys) { |
| final String fullKey = key + '.' + i + '.' + subKey; |
| // check for existence of full key |
| if (!containsKey(fullKey)) { |
| throw exceptionSupplier(fullKey).get(); |
| } |
| map.put(subKey, fullKey); |
| } |
| |
| // add optional keys |
| for (String subKey : optionalSubKeys) { |
| final String fullKey = key + '.' + i + '.' + subKey; |
| optionalGet(fullKey).ifPresent(value -> map.put(subKey, fullKey)); |
| } |
| |
| list.add(map); |
| } |
| return list; |
| } |
| |
| /** |
| * Returns all properties under a given key that contains an index in between. |
| * |
| * <p>E.g. rowtime.0.name -> returns all rowtime.#.name properties |
| */ |
| public Map<String, String> getIndexedProperty(String key, String subKey) { |
| final String escapedKey = Pattern.quote(key); |
| final String escapedSubKey = Pattern.quote(subKey); |
| return properties.entrySet().stream() |
| .filter(entry -> entry.getKey().matches(escapedKey + "\\.\\d+\\." + escapedSubKey)) |
| .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); |
| } |
| |
| /** |
| * Returns all array elements under a given key if it exists. |
| * |
| * <p>For example: |
| * |
| * <pre> |
| * primary-key.0 = field1 |
| * primary-key.1 = field2 |
| * </pre> |
| * |
| * <p>leads to: List(field1, field2) |
| * |
| * <p>or: |
| * |
| * <pre> |
| * primary-key = field1 |
| * </pre> |
| * |
| * <p>leads to: List(field1) |
| * |
| * <p>The key mapper gets the key of the current value e.g. "primary-key.1". |
| */ |
| public <E> Optional<List<E>> getOptionalArray(String key, Function<String, E> keyMapper) { |
| // determine max index |
| final int maxIndex = extractMaxIndex(key, ""); |
| |
| if (maxIndex < 0) { |
| // check for a single element array |
| if (containsKey(key)) { |
| return Optional.of(Collections.singletonList(keyMapper.apply(key))); |
| } else { |
| return Optional.empty(); |
| } |
| } else { |
| final List<E> list = new ArrayList<>(); |
| for (int i = 0; i < maxIndex + 1; i++) { |
| final String fullKey = key + '.' + i; |
| final E value = keyMapper.apply(fullKey); |
| list.add(value); |
| } |
| return Optional.of(list); |
| } |
| } |
| |
| /** |
| * Returns all array elements under a given existing key. |
| */ |
| public <E> List<E> getArray(String key, Function<String, E> keyMapper) { |
| return getOptionalArray(key, keyMapper).orElseThrow(exceptionSupplier(key)); |
| } |
| |
| /** |
| * Returns if a value under key is exactly equal to the given value. |
| */ |
| public boolean isValue(String key, String value) { |
| return optionalGet(key).orElseThrow(exceptionSupplier(key)).equals(value); |
| } |
| |
| // -------------------------------------------------------------------------------------------- |
| |
| /** |
| * Validates a string property. |
| */ |
| public void validateString(String key, boolean isOptional) { |
| validateString(key, isOptional, 0, Integer.MAX_VALUE); |
| } |
| |
| /** |
| * Validates a string property. The boundaries are inclusive. |
| */ |
| public void validateString(String key, boolean isOptional, int minLen) { |
| validateString(key, isOptional, minLen, Integer.MAX_VALUE); |
| } |
| |
| /** |
| * Validates a string property. The boundaries are inclusive. |
| */ |
| public void validateString(String key, boolean isOptional, int minLen, int maxLen) { |
| validateOptional( |
| key, |
| isOptional, |
| (value) -> { |
| final int length = value.length(); |
| if (length < minLen || length > maxLen) { |
| throw new ValidationException( |
| "Property '" + key + "' must have a length between " + minLen + " and " + maxLen + " but was: " + value); |
| } |
| }); |
| } |
| |
| /** |
| * Validates an integer property. |
| */ |
| public void validateInt(String key, boolean isOptional) { |
| validateInt(key, isOptional, Integer.MIN_VALUE, Integer.MAX_VALUE); |
| } |
| |
| /** |
| * Validates an integer property. The boundaries are inclusive. |
| */ |
| public void validateInt(String key, boolean isOptional, int min) { |
| validateInt(key, isOptional, min, Integer.MAX_VALUE); |
| } |
| |
| /** |
| * Validates an integer property. The boundaries are inclusive. |
| */ |
| public void validateInt(String key, boolean isOptional, int min, int max) { |
| validateComparable(key, isOptional, min, max, "integer", Integer::valueOf); |
| } |
| |
| /** |
| * Validates an long property. |
| */ |
| public void validateLong(String key, boolean isOptional) { |
| validateLong(key, isOptional, Long.MIN_VALUE, Long.MAX_VALUE); |
| } |
| |
| /** |
| * Validates an long property. The boundaries are inclusive. |
| */ |
| public void validateLong(String key, boolean isOptional, long min) { |
| validateLong(key, isOptional, min, Long.MAX_VALUE); |
| } |
| |
| /** |
| * Validates an long property. The boundaries are inclusive. |
| */ |
| public void validateLong(String key, boolean isOptional, long min, long max) { |
| validateComparable(key, isOptional, min, max, "long", Long::valueOf); |
| } |
| |
| /** |
| * Validates that a certain value is present under the given key. |
| */ |
| public void validateValue(String key, String value, boolean isOptional) { |
| validateOptional( |
| key, |
| isOptional, |
| (v) -> { |
| if (!v.equals(value)) { |
| throw new ValidationException( |
| "Could not find required value '" + value + "' for property '" + key + "'."); |
| } |
| }); |
| } |
| |
| /** |
| * Validates that a boolean value is present under the given key. |
| */ |
| public void validateBoolean(String key, boolean isOptional) { |
| validateOptional( |
| key, |
| isOptional, |
| (value) -> { |
| if (!value.equalsIgnoreCase("true") && !value.equalsIgnoreCase("false")) { |
| throw new ValidationException( |
| "Property '" + key + "' must be a boolean value (true/false) but was: " + value); |
| } |
| }); |
| } |
| |
| /** |
| * Validates a double property. |
| */ |
| public void validateDouble(String key, boolean isOptional) { |
| validateDouble(key, isOptional, Double.MIN_VALUE, Double.MAX_VALUE); |
| } |
| |
| /** |
| * Validates a double property. The boundaries are inclusive. |
| */ |
| public void validateDouble(String key, boolean isOptional, double min) { |
| validateDouble(key, isOptional, min, Double.MAX_VALUE); |
| } |
| |
| /** |
| * Validates a double property. The boundaries are inclusive. |
| */ |
| public void validateDouble(String key, boolean isOptional, double min, double max) { |
| validateComparable(key, isOptional, min, max, "double", Double::valueOf); |
| } |
| |
| /** |
| * Validates a big decimal property. |
| */ |
| public void validateBigDecimal(String key, boolean isOptional) { |
| validateOptional( |
| key, |
| isOptional, |
| (value) -> { |
| try { |
| new BigDecimal(value); |
| } catch (Exception e) { |
| throw new ValidationException( |
| "Property '" + key + "' must be a big decimal value but was: " + value); |
| } |
| }); |
| } |
| |
| /** |
| * Validates a big decimal property. The boundaries are inclusive. |
| */ |
| public void validateBigDecimal(String key, boolean isOptional, BigDecimal min, BigDecimal max) { |
| validateComparable(key, isOptional, min, max, "decimal", BigDecimal::new); |
| } |
| |
| /** |
| * Validates a byte property. |
| */ |
| public void validateByte(String key, boolean isOptional) { |
| validateByte(key, isOptional, Byte.MIN_VALUE, Byte.MAX_VALUE); |
| } |
| |
| /** |
| * Validates a byte property. The boundaries are inclusive. |
| */ |
| public void validateByte(String key, boolean isOptional, byte min) { |
| validateByte(key, isOptional, min, Byte.MAX_VALUE); |
| } |
| |
| /** |
| * Validates a byte property. The boundaries are inclusive. |
| */ |
| public void validateByte(String key, boolean isOptional, byte min, byte max) { |
| validateComparable(key, isOptional, min, max, "byte", Byte::valueOf); |
| } |
| |
| /** |
| * Validates a float property. |
| */ |
| public void validateFloat(String key, boolean isOptional) { |
| validateFloat(key, isOptional, Float.MIN_VALUE, Float.MAX_VALUE); |
| } |
| |
| /** |
| * Validates a float property. The boundaries are inclusive. |
| */ |
| public void validateFloat(String key, boolean isOptional, float min) { |
| validateFloat(key, isOptional, min, Float.MAX_VALUE); |
| } |
| |
| /** |
| * Validates a float property. The boundaries are inclusive. |
| */ |
| public void validateFloat(String key, boolean isOptional, float min, float max) { |
| validateComparable(key, isOptional, min, max, "float", Float::valueOf); |
| } |
| |
| /** |
| * Validates a short property. |
| */ |
| public void validateShort(String key, boolean isOptional) { |
| validateShort(key, isOptional, Short.MIN_VALUE, Short.MAX_VALUE); |
| } |
| |
| /** |
| * Validates a short property. The boundaries are inclusive. |
| */ |
| public void validateShort(String key, boolean isOptional, short min) { |
| validateShort(key, isOptional, min, Short.MAX_VALUE); |
| } |
| |
| /** |
| * Validates a short property. The boundaries are inclusive. |
| */ |
| public void validateShort(String key, boolean isOptional, short min, short max) { |
| validateComparable(key, isOptional, min, max, "short", Short::valueOf); |
| } |
| |
| /** |
| * Validation for fixed indexed properties. |
| * |
| * <p>For example: |
| * |
| * <pre> |
| * schema.fields.0.type = INT, schema.fields.0.name = test |
| * schema.fields.1.type = LONG, schema.fields.1.name = test2 |
| * </pre> |
| * |
| * <p>The subKeyValidation map must define e.g. "type" and "name" and a validation logic for the given full key. |
| */ |
| public void validateFixedIndexedProperties(String key, boolean allowEmpty, Map<String, Consumer<String>> subKeyValidation) { |
| // determine max index |
| final int maxIndex = extractMaxIndex(key, "\\.(.*)"); |
| |
| if (maxIndex < 0 && !allowEmpty) { |
| throw new ValidationException("Property key '" + key + "' must not be empty."); |
| } |
| |
| // validate |
| for (int i = 0; i < maxIndex; i++) { |
| for (Map.Entry<String, Consumer<String>> subKey : subKeyValidation.entrySet()) { |
| final String fullKey = key + '.' + i + '.' + subKey.getKey(); |
| if (properties.containsKey(fullKey)) { |
| // run validation logic |
| subKey.getValue().accept(fullKey); |
| } else { |
| throw new ValidationException("Required property key '" + fullKey + "' is missing."); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Validates a table schema property. |
| */ |
| public void validateTableSchema(String key, boolean isOptional) { |
| final Consumer<String> nameValidation = (name) -> validateString(name, false, 1); |
| final Consumer<String> typeValidation = (name) -> validateType(name, false, false); |
| |
| final Map<String, Consumer<String>> subKeys = new HashMap<>(); |
| subKeys.put(TABLE_SCHEMA_NAME, nameValidation); |
| subKeys.put(TABLE_SCHEMA_TYPE, typeValidation); |
| |
| validateFixedIndexedProperties( |
| key, |
| isOptional, |
| subKeys |
| ); |
| } |
| |
| /** |
| * Validates a Flink {@link MemorySize}. |
| * |
| * <p>The precision defines the allowed minimum unit in bytes (e.g. 1024 would only allow KB). |
| */ |
| public void validateMemorySize(String key, boolean isOptional, int precision) { |
| validateMemorySize(key, isOptional, precision, 0L, Long.MAX_VALUE); |
| } |
| |
| /** |
| * Validates a Flink {@link MemorySize}. The boundaries are inclusive and in bytes. |
| * |
| * <p>The precision defines the allowed minimum unit in bytes (e.g. 1024 would only allow KB). |
| */ |
| public void validateMemorySize(String key, boolean isOptional, int precision, long min) { |
| validateMemorySize(key, isOptional, precision, min, Long.MAX_VALUE); |
| } |
| |
| /** |
| * Validates a Flink {@link MemorySize}. The boundaries are inclusive and in bytes. |
| * |
| * <p>The precision defines the allowed minimum unit in bytes (e.g. 1024 would only allow KB). |
| */ |
| public void validateMemorySize(String key, boolean isOptional, int precision, long min, long max) { |
| Preconditions.checkArgument(precision > 0); |
| |
| validateComparable( |
| key, |
| isOptional, |
| min, |
| max, |
| "memory size (in bytes)", |
| (value) -> { |
| final long bytes = MemorySize.parse(value, MemorySize.MemoryUnit.BYTES).getBytes(); |
| if (bytes % precision != 0) { |
| throw new ValidationException( |
| "Memory size for key '" + key + "' must be a multiple of " + precision + " bytes but was: " + value); |
| } |
| return bytes; |
| } |
| ); |
| } |
| |
| /** |
| * Validates an enum property with a set of validation logic for each enum value. |
| */ |
| public void validateEnum(String key, boolean isOptional, Map<String, Consumer<String>> enumValidation) { |
| validateOptional( |
| key, |
| isOptional, |
| (value) -> { |
| if (!enumValidation.containsKey(value)) { |
| throw new ValidationException( |
| "Unknown value for property '" + key + "'.\n" + |
| "Supported values are " + enumValidation.keySet() + " but was: " + value); |
| } else { |
| // run validation logic |
| enumValidation.get(value).accept(key); |
| } |
| }); |
| } |
| |
| /** |
| * Validates an enum property with a set of enum values. |
| */ |
| public void validateEnumValues(String key, boolean isOptional, List<String> values) { |
| validateEnum( |
| key, |
| isOptional, |
| values.stream().collect(Collectors.toMap(v -> v, v -> noValidation()))); |
| } |
| |
| /** |
| * Validates a type property. |
| */ |
| public void validateType(String key, boolean isOptional, boolean requireRow) { |
| validateOptional( |
| key, |
| isOptional, |
| (value) -> { |
| // we don't validate the string but let the parser do the work for us |
| // it throws a validation exception |
| final TypeInformation<?> typeInfo = TypeStringUtils.readTypeInfo(value); |
| if (requireRow && !(typeInfo instanceof RowTypeInfo)) { |
| throw new ValidationException( |
| "Row type information expected for key '" + key + "' but was: " + value); |
| } |
| }); |
| } |
| |
| /** |
| * Validates an array of values. |
| * |
| * <p>For example: |
| * |
| * <pre> |
| * primary-key.0 = field1 |
| * primary-key.1 = field2 |
| * </pre> |
| * |
| * <p>leads to: List(field1, field2) |
| * |
| * <p>or: |
| * |
| * <pre> |
| * primary-key = field1 |
| * </pre> |
| * |
| * <p>The validation consumer gets the key of the current value e.g. "primary-key.1". |
| */ |
| public void validateArray(String key, Consumer<String> elementValidation, int minLength) { |
| validateArray(key, elementValidation, minLength, Integer.MAX_VALUE); |
| } |
| |
| /** |
| * Validates an array of values. |
| * |
| * <p>For example: |
| * |
| * <pre> |
| * primary-key.0 = field1 |
| * primary-key.1 = field2 |
| * </pre> |
| * |
| * <p>leads to: List(field1, field2) |
| * |
| * <p>or: |
| * |
| * <pre> |
| * primary-key = field1 |
| * </pre> |
| * |
| * <p>The validation consumer gets the key of the current value e.g. "primary-key.1". |
| */ |
| public void validateArray(String key, Consumer<String> elementValidation, int minLength, int maxLength) { |
| |
| // determine max index |
| final int maxIndex = extractMaxIndex(key, ""); |
| |
| if (maxIndex < 0) { |
| // check for a single element array |
| if (properties.containsKey(key)) { |
| elementValidation.accept(key); |
| } else if (minLength > 0) { |
| throw new ValidationException("Could not find required property array for key '" + key + "'."); |
| } |
| } else { |
| // do not allow a single element array |
| if (properties.containsKey(key)) { |
| throw new ValidationException("Invalid property array for key '" + key + "'."); |
| } |
| |
| final int size = maxIndex + 1; |
| if (size < minLength) { |
| throw new ValidationException( |
| "Array for key '" + key + "' must not have less than " + minLength + " elements but was: " + size); |
| } |
| |
| if (size > maxLength) { |
| throw new ValidationException( |
| "Array for key '" + key + "' must not have more than " + maxLength + " elements but was: " + size); |
| } |
| } |
| |
| // validate array elements |
| for (int i = 0; i < maxIndex; i++) { |
| final String fullKey = key + '.' + i; |
| if (properties.containsKey(fullKey)) { |
| // run validation logic |
| elementValidation.accept(fullKey); |
| } else { |
| throw new ValidationException( |
| "Required array element at index '" + i + "' for key '" + key + "' is missing."); |
| } |
| } |
| } |
| |
| /** |
| * Validates that the given prefix is not included in these properties. |
| */ |
| public void validatePrefixExclusion(String prefix) { |
| properties.keySet().stream() |
| .filter(k -> k.startsWith(prefix)) |
| .findFirst() |
| .ifPresent((k) -> { |
| throw new ValidationException( |
| "Properties with prefix '" + prefix + "' are not allowed in this context. " + |
| "But property '" + k + "' was found."); |
| }); |
| } |
| |
| /** |
| * Validates that the given key is not included in these properties. |
| */ |
| public void validateExclusion(String key) { |
| if (properties.containsKey(key)) { |
| throw new ValidationException("Property '" + key + "' is not allowed in this context."); |
| } |
| } |
| |
| // -------------------------------------------------------------------------------------------- |
| |
| /** |
| * Returns if the given key is contained. |
| */ |
| public boolean containsKey(String key) { |
| return properties.containsKey(key); |
| } |
| |
| /** |
| * Returns if a given prefix exists in the properties. |
| */ |
| public boolean hasPrefix(String prefix) { |
| return properties.keySet().stream().anyMatch(k -> k.startsWith(prefix)); |
| } |
| |
| /** |
| * Returns the properties as a map copy. |
| */ |
| public Map<String, String> asMap() { |
| final Map<String, String> copy = new HashMap<>(properties); |
| return Collections.unmodifiableMap(copy); |
| } |
| |
| /** |
| * Returns the properties as a map copy with a prefix key. |
| */ |
| public Map<String, String> asPrefixedMap(String prefix) { |
| return properties.entrySet().stream() |
| .collect(Collectors.toMap(e -> prefix + e.getKey(), Map.Entry::getValue)); |
| } |
| |
| /** |
| * Returns a new properties instance with the given keys removed. |
| */ |
| public DescriptorProperties withoutKeys(List<String> keys) { |
| final Set<String> keySet = new HashSet<>(keys); |
| final DescriptorProperties copy = new DescriptorProperties(normalizeKeys); |
| properties.entrySet().stream() |
| .filter(e -> !keySet.contains(e.getKey())) |
| .forEach(e -> copy.properties.put(e.getKey(), e.getValue())); |
| return copy; |
| } |
| |
| @Override |
| public String toString() { |
| return toString(properties); |
| } |
| |
| @Override |
| public boolean equals(Object o) { |
| if (this == o) { |
| return true; |
| } |
| if (o == null || getClass() != o.getClass()) { |
| return false; |
| } |
| DescriptorProperties that = (DescriptorProperties) o; |
| return Objects.equals(properties, that.properties); |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hash(properties); |
| } |
| |
| // -------------------------------------------------------------------------------------------- |
| |
| private void put(String key, String value) { |
| if (properties.containsKey(key)) { |
| throw new ValidationException("Property already present: " + key); |
| } |
| if (normalizeKeys) { |
| properties.put(key.toLowerCase(), value); |
| } else { |
| properties.put(key, value); |
| } |
| } |
| |
| private Optional<String> optionalGet(String key) { |
| return Optional.ofNullable(properties.get(key)); |
| } |
| |
| private void validateOptional(String key, boolean isOptional, Consumer<String> valueValidation) { |
| if (!properties.containsKey(key)) { |
| if (!isOptional) { |
| throw new ValidationException("Could not find required property '" + key + "'."); |
| } |
| } else { |
| final String value = properties.get(key); |
| valueValidation.accept(value); |
| } |
| } |
| |
| private Supplier<TableException> exceptionSupplier(String key) { |
| return () -> { |
| throw new TableException( |
| "Property with key '" + key + "' could not be found. " + |
| "This is a bug because the validation logic should have checked that before."); |
| }; |
| } |
| |
| private int extractMaxIndex(String key, String suffixPattern) { |
| // extract index and property keys |
| final String escapedKey = Pattern.quote(key); |
| final Pattern pattern = Pattern.compile(escapedKey + "\\.(\\d+)" + suffixPattern); |
| final IntStream indexes = properties.keySet().stream() |
| .flatMapToInt(k -> { |
| final Matcher matcher = pattern.matcher(k); |
| if (matcher.find()) { |
| return IntStream.of(Integer.valueOf(matcher.group(1))); |
| } |
| return IntStream.empty(); |
| }); |
| |
| // determine max index |
| return indexes.max().orElse(-1); |
| } |
| |
| /** |
| * Validates a property by first parsing the string value to a comparable object. The boundaries are inclusive. |
| */ |
| private <T extends Comparable<T>> void validateComparable( |
| String key, |
| boolean isOptional, |
| T min, |
| T max, |
| String typeName, |
| Function<String, T> parseFunction) { |
| if (!properties.containsKey(key)) { |
| if (!isOptional) { |
| throw new ValidationException("Could not find required property '" + key + "'."); |
| } |
| } else { |
| final String value = properties.get(key); |
| try { |
| final T parsed = parseFunction.apply(value); |
| if (parsed.compareTo(min) < 0 || parsed.compareTo(max) > 0) { |
| throw new ValidationException( |
| "Property '" + key + "' must be a " + typeName + " value between " + min + " and " + max + " but was: " + parsed); |
| } |
| } catch (Exception e) { |
| throw new ValidationException("Property '" + key + "' must be a " + typeName + " value but was: " + value); |
| } |
| } |
| } |
| |
| // -------------------------------------------------------------------------------------------- |
| |
| /** |
| * Returns an empty validation logic. |
| */ |
| public static Consumer<String> noValidation() { |
| return EMPTY_CONSUMER; |
| } |
| |
| public static String toString(String str) { |
| return EncodingUtils.escapeJava(str); |
| } |
| |
| public static String toString(String key, String value) { |
| return toString(key) + '=' + toString(value); |
| } |
| |
| public static String toString(Map<String, String> propertyMap) { |
| return propertyMap.entrySet().stream() |
| .map(e -> toString(e.getKey(), e.getValue())) |
| .sorted() |
| .collect(Collectors.joining("\n")); |
| } |
| } |