blob: d73089875cef09af71f3ace4b2667c980ea1a0d2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.external.util;
import static org.apache.asterix.external.util.ExternalDataConstants.KEY_DELIMITER;
import static org.apache.asterix.external.util.ExternalDataConstants.KEY_ESCAPE;
import static org.apache.asterix.external.util.ExternalDataConstants.KEY_EXCLUDE;
import static org.apache.asterix.external.util.ExternalDataConstants.KEY_EXTERNAL_SCAN_BUFFER_SIZE;
import static org.apache.asterix.external.util.ExternalDataConstants.KEY_INCLUDE;
import static org.apache.asterix.external.util.ExternalDataConstants.KEY_QUOTE;
import static org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_END;
import static org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_START;
import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.validateAzureBlobProperties;
import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.validateAzureDataLakeProperties;
import static org.apache.asterix.external.util.google.gcs.GCSUtils.validateProperties;
import static org.apache.asterix.runtime.evaluators.functions.StringEvaluatorUtils.RESERVED_REGEX_CHARS;
import static org.msgpack.core.MessagePack.Code.ARRAY16;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.functions.ExternalFunctionLanguage;
import org.apache.asterix.common.library.ILibrary;
import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.external.api.IDataParserFactory;
import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
import org.apache.asterix.external.api.IInputStreamFactory;
import org.apache.asterix.external.api.IRecordReaderFactory;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
import org.apache.asterix.external.library.JavaLibrary;
import org.apache.asterix.external.library.msgpack.MessagePackUtils;
import org.apache.asterix.external.util.ExternalDataConstants.ParquetOptions;
import org.apache.asterix.external.util.aws.s3.S3Utils;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.AUnionType;
import org.apache.asterix.om.types.EnumDeserializer;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.om.types.TypeTagUtil;
import org.apache.asterix.runtime.evaluators.common.NumberUtils;
import org.apache.asterix.runtime.projection.DataProjectionInfo;
import org.apache.asterix.runtime.projection.FunctionCallInformation;
import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.hyracks.data.std.primitive.TaggedValuePointable;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.data.parsers.BooleanParserFactory;
import org.apache.hyracks.dataflow.common.data.parsers.DoubleParserFactory;
import org.apache.hyracks.dataflow.common.data.parsers.FloatParserFactory;
import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
import org.apache.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
import org.apache.hyracks.dataflow.common.data.parsers.LongParserFactory;
import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
import org.apache.hyracks.util.StorageUtil;
public class ExternalDataUtils {
// Lookup table from a primitive field type tag to the value parser factory that handles it; consulted by
// getParserFactory(ATypeTag). Tags not registered below have no parser factory.
private static final Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = new EnumMap<>(ATypeTag.class);
// 1 MiB.
private static final int DEFAULT_MAX_ARGUMENT_SZ = 1024 * 1024;
private static final int HEADER_FUDGE = 64;
static {
    // Textual / boolean types
    valueParserFactoryMap.put(ATypeTag.BOOLEAN, BooleanParserFactory.INSTANCE);
    valueParserFactoryMap.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
    // Integral types
    valueParserFactoryMap.put(ATypeTag.INTEGER, IntegerParserFactory.INSTANCE);
    valueParserFactoryMap.put(ATypeTag.BIGINT, LongParserFactory.INSTANCE);
    // Floating-point types
    valueParserFactoryMap.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
    valueParserFactoryMap.put(ATypeTag.DOUBLE, DoubleParserFactory.INSTANCE);
}
// Static utility holder: instances are never needed.
private ExternalDataUtils() {
}
/**
 * Returns the external scan buffer size configured under {@code KEY_EXTERNAL_SCAN_BUFFER_SIZE}, falling back to
 * {@code ExternalDataConstants.DEFAULT_BUFFER_SIZE} when the key is absent.
 *
 * @param configuration
 *            external data configuration
 * @return the configured or default buffer size
 * @throws NumberFormatException
 *             if the configured value is not a valid {@code int}
 */
public static int getOrDefaultBufferSize(Map<String, String> configuration) {
    String configured = configuration.get(KEY_EXTERNAL_SCAN_BUFFER_SIZE);
    if (configured == null) {
        return ExternalDataConstants.DEFAULT_BUFFER_SIZE;
    }
    return Integer.parseInt(configured);
}
/**
 * Reads and validates the single-character field delimiter.
 *
 * @param configuration
 *            external data configuration
 * @return the configured delimiter, or the first character of {@code ExternalDataConstants.DEFAULT_DELIMITER}
 * @throws HyracksDataException
 *             if the configured value is not exactly one character long
 */
public static char validateGetDelimiter(Map<String, String> configuration) throws HyracksDataException {
    char fallback = ExternalDataConstants.DEFAULT_DELIMITER.charAt(0);
    return validateCharOrDefault(configuration, KEY_DELIMITER, fallback);
}
/**
 * Reads and validates the single-character quote. The delimiter is passed in so the two can be checked for a
 * clash; sharing the same character is rejected.
 *
 * @param configuration
 *            external data configuration
 * @param delimiter
 *            the already-resolved field delimiter
 * @return the configured quote, or the first character of {@code ExternalDataConstants.DEFAULT_QUOTE}
 * @throws HyracksDataException
 *             if the value is not one character long, or equals the delimiter
 */
public static char validateGetQuote(Map<String, String> configuration, char delimiter) throws HyracksDataException {
    char fallback = ExternalDataConstants.DEFAULT_QUOTE.charAt(0);
    char quoteChar = validateCharOrDefault(configuration, KEY_QUOTE, fallback);
    validateDelimiterAndQuote(delimiter, quoteChar);
    return quoteChar;
}
/**
 * Reads and validates the single-character escape, defaulting to {@code ExternalDataConstants.ESCAPE}.
 */
public static char validateGetEscape(Map<String, String> configuration) throws HyracksDataException {
    char fallback = ExternalDataConstants.ESCAPE;
    return validateCharOrDefault(configuration, KEY_ESCAPE, fallback);
}
/**
 * Reads and validates the single-character record start, defaulting to
 * {@code ExternalDataConstants.DEFAULT_RECORD_START}.
 */
public static char validateGetRecordStart(Map<String, String> configuration) throws HyracksDataException {
    char fallback = ExternalDataConstants.DEFAULT_RECORD_START;
    return validateCharOrDefault(configuration, KEY_RECORD_START, fallback);
}
/**
 * Reads and validates the single-character record end, defaulting to
 * {@code ExternalDataConstants.DEFAULT_RECORD_END}.
 */
public static char validateGetRecordEnd(Map<String, String> configuration) throws HyracksDataException {
    char fallback = ExternalDataConstants.DEFAULT_RECORD_END;
    return validateCharOrDefault(configuration, KEY_RECORD_END, fallback);
}
/**
 * Ensures that a parser is configured, through either {@code KEY_FORMAT} or {@code KEY_PARSER_FACTORY}.
 *
 * @param configuration
 *            external data configuration
 * @throws AsterixException
 *             PARAMETERS_REQUIRED when neither key has a value
 */
public static void validateDataParserParameters(Map<String, String> configuration) throws AsterixException {
    boolean parserMissing = configuration.get(ExternalDataConstants.KEY_FORMAT) == null
            && configuration.get(ExternalDataConstants.KEY_PARSER_FACTORY) == null;
    if (parserMissing) {
        throw AsterixException.create(ErrorCode.PARAMETERS_REQUIRED,
                ExternalDataConstants.KEY_FORMAT + " or " + ExternalDataConstants.KEY_PARSER_FACTORY);
    }
}
/**
 * Ensures that {@code KEY_READER} is present in the configuration.
 *
 * @param configuration
 *            external data configuration
 * @throws AsterixException
 *             PARAMETERS_REQUIRED when the reader is not set
 */
public static void validateDataSourceParameters(Map<String, String> configuration) throws AsterixException {
    if (configuration.get(ExternalDataConstants.KEY_READER) != null) {
        return;
    }
    throw AsterixException.create(ErrorCode.PARAMETERS_REQUIRED, ExternalDataConstants.KEY_READER);
}
/**
 * Classifies the data source. The reader {@code ExternalDataConstants.READER_STREAM} yields STREAM; any other
 * value, or no reader at all, yields RECORDS.
 *
 * @param configuration
 *            external data configuration
 * @return the data source type
 */
public static DataSourceType getDataSourceType(Map<String, String> configuration) {
    String readerName = configuration.get(ExternalDataConstants.KEY_READER);
    boolean streamReader = ExternalDataConstants.READER_STREAM.equals(readerName);
    return streamReader ? DataSourceType.STREAM : DataSourceType.RECORDS;
}
/**
 * Checks whether the string names an external library entity. It must be non-null, contain
 * {@code ExternalDataConstants.EXTERNAL_LIBRARY_SEPARATOR}, and be longer than one character once trimmed.
 *
 * @param aString
 *            candidate name
 * @return {@code true} if the string looks like a library-qualified name
 */
public static boolean isExternal(String aString) {
    if (aString == null || !aString.contains(ExternalDataConstants.EXTERNAL_LIBRARY_SEPARATOR)) {
        return false;
    }
    return aString.trim().length() > 1;
}
/**
 * Returns the part of the trimmed string before the first {@code LIBRARY_NAME_SEPARATOR}. The separator is
 * applied as a regex by {@link String#split(String)}.
 */
public static String getLibraryName(String aString) {
    String[] parts = aString.trim().split(FeedConstants.NamingConstants.LIBRARY_NAME_SEPARATOR);
    return parts[0];
}
/**
 * Returns the second {@code LIBRARY_NAME_SEPARATOR}-delimited part of the trimmed string.
 * Throws {@link ArrayIndexOutOfBoundsException} when no separator is present.
 */
public static String getExternalClassName(String aString) {
    String[] parts = aString.trim().split(FeedConstants.NamingConstants.LIBRARY_NAME_SEPARATOR);
    return parts[1];
}
/**
 * Instantiates a user-provided {@link IInputStreamFactory} class from a Java library.
 *
 * @param libraryManager
 *            manager used to look up the library
 * @param dataverse
 *            dataverse that owns the library
 * @param stream
 *            library-qualified class name; split by {@link #getLibraryName(String)} and
 *            {@link #getExternalClassName(String)}
 * @return a new instance of the named class, created through its no-argument constructor
 * @throws HyracksDataException
 *             if the library lookup fails or the library is not a Java library; reflective failures (missing
 *             class or constructor, inaccessible constructor, constructor throwing) are reported as a
 *             RuntimeDataException with UTIL_EXTERNAL_DATA_UTILS_FAIL_CREATE_STREAM_FACTORY
 */
public static IInputStreamFactory createExternalInputStreamFactory(ILibraryManager libraryManager,
        DataverseName dataverse, String stream) throws HyracksDataException {
    try {
        String libraryName = getLibraryName(stream);
        String className = getExternalClassName(stream);
        ILibrary lib = libraryManager.getLibrary(dataverse, libraryName);
        if (lib.getLanguage() != ExternalFunctionLanguage.JAVA) {
            throw new HyracksDataException("Unexpected library language: " + lib.getLanguage());
        }
        ClassLoader classLoader = ((JavaLibrary) lib).getClassLoader();
        // Class.newInstance() is deprecated because it rethrows any checked exception from the constructor
        // unwrapped, so the catch below could miss it. Constructor.newInstance() wraps it in
        // InvocationTargetException instead.
        return (IInputStreamFactory) classLoader.loadClass(className).getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
        throw new RuntimeDataException(ErrorCode.UTIL_EXTERNAL_DATA_UTILS_FAIL_CREATE_STREAM_FACTORY, e);
    }
}
/**
 * Parses the dataverse stored under {@code KEY_DATASET_DATAVERSE}, which is expected in canonical form.
 *
 * @param configuration
 *            external data configuration
 * @return the parsed dataverse name
 * @throws AsterixException
 *             propagated from {@link DataverseName#createFromCanonicalForm(String)}
 */
public static DataverseName getDatasetDataverse(Map<String, String> configuration) throws AsterixException {
    String canonicalName = configuration.get(ExternalDataConstants.KEY_DATASET_DATAVERSE);
    return DataverseName.createFromCanonicalForm(canonicalName);
}
/**
 * Resolves the parser name from the configuration.
 *
 * <p>The keys are checked in order of precedence: {@code KEY_PARSER}, then {@code KEY_FORMAT}, then
 * {@code KEY_PARSER_FACTORY}.
 *
 * @param configuration
 *            external data configuration
 * @return the first non-null value, or {@code null} if none of the keys has a value
 */
public static String getParserFactory(Map<String, String> configuration) {
    String[] keysByPrecedence = { ExternalDataConstants.KEY_PARSER, ExternalDataConstants.KEY_FORMAT,
            ExternalDataConstants.KEY_PARSER_FACTORY };
    for (String key : keysByPrecedence) {
        String candidate = configuration.get(key);
        if (candidate != null) {
            return candidate;
        }
    }
    return null;
}
/**
 * Builds one value parser factory per field of the record type, in field order. A nullable/missable UNION field is
 * parsed as its actual (non-unknown) type.
 *
 * @param recordType
 *            the record type whose fields need parsers
 * @return parser factories aligned with {@code recordType.getFieldTypes()}
 * @throws NotImplementedException
 *             for a non-optional UNION field, or a field type without a registered parser
 */
public static IValueParserFactory[] getValueParserFactories(ARecordType recordType) {
    IAType[] fieldTypes = recordType.getFieldTypes();
    IValueParserFactory[] factories = new IValueParserFactory[fieldTypes.length];
    for (int idx = 0; idx < fieldTypes.length; idx++) {
        IAType fieldType = fieldTypes[idx];
        ATypeTag tag;
        if (fieldType.getTypeTag() != ATypeTag.UNION) {
            tag = fieldType.getTypeTag();
        } else {
            AUnionType union = (AUnionType) fieldType;
            if (!union.isUnknownableType()) {
                throw new NotImplementedException("Non-optional UNION type is not supported.");
            }
            tag = union.getActualType().getTypeTag();
        }
        if (tag == null) {
            throw new NotImplementedException("Failed to get the type information for field " + idx + ".");
        }
        factories[idx] = getParserFactory(tag);
    }
    return factories;
}
/**
 * Looks up the value parser factory registered for a type tag.
 *
 * @param tag
 *            field type tag
 * @return the registered parser factory
 * @throws NotImplementedException
 *             if no factory is registered for {@code tag}
 */
public static IValueParserFactory getParserFactory(ATypeTag tag) {
    IValueParserFactory factory = valueParserFactoryMap.get(tag);
    if (factory != null) {
        return factory;
    }
    throw new NotImplementedException("No value parser factory for fields of type " + tag);
}
/**
 * Returns whether the {@code KEY_HEADER} parameter is set to a boolean {@code true} value.
 *
 * @param configuration
 *            external data configuration
 * @return {@code true} if the header flag is enabled
 */
public static boolean hasHeader(Map<String, String> configuration) {
    final String headerKey = ExternalDataConstants.KEY_HEADER;
    return isTrue(configuration, headerKey);
}
/**
 * Returns whether {@code key} maps to the string "true", compared case-insensitively. An absent or null value
 * counts as {@code false}.
 *
 * @param configuration
 *            configuration map
 * @param key
 *            key to inspect
 * @return the boolean interpretation of the value
 */
public static boolean isTrue(Map<String, String> configuration, String key) {
    // Boolean.parseBoolean(null) is false, which covers the absent-key case.
    return Boolean.parseBoolean(configuration.get(key));
}
/**
 * Instantiates a user-provided {@link IRecordReaderFactory} named by the {@code KEY_READER_FACTORY} parameter.
 * According to the validation error below, the expected form is
 * {@code DataverseName.LibraryName#ReaderFactoryFullyQualifiedName}.
 *
 * <p>Currently not used.
 *
 * @param libraryManager
 *            manager used to look up the library
 * @param configuration
 *            external data configuration
 * @return a new reader factory, created through its no-argument constructor
 * @throws AsterixException
 *             if the parameter is missing or malformed, the library cannot be loaded or is not a Java library,
 *             or the class cannot be instantiated reflectively
 */
public static IRecordReaderFactory<?> createExternalRecordReaderFactory(ILibraryManager libraryManager,
        Map<String, String> configuration) throws AsterixException {
    String readerFactory = configuration.get(ExternalDataConstants.KEY_READER_FACTORY);
    if (readerFactory == null) {
        throw new AsterixException("to use " + ExternalDataConstants.EXTERNAL + " reader, the parameter "
                + ExternalDataConstants.KEY_READER_FACTORY + " must be specified.");
    }
    String[] libraryAndFactory = readerFactory.split(ExternalDataConstants.EXTERNAL_LIBRARY_SEPARATOR); //TODO(MULTI_PART_DATAVERSE_NAME):REVISIT
    if (libraryAndFactory.length != 2) {
        throw new AsterixException("The parameter " + ExternalDataConstants.KEY_READER_FACTORY
                + " must follow the format \"DataverseName.LibraryName#ReaderFactoryFullyQualifiedName\"");
    }
    String[] dataverseAndLibrary = libraryAndFactory[0].split("\\.");
    if (dataverseAndLibrary.length != 2) {
        throw new AsterixException("The parameter " + ExternalDataConstants.KEY_READER_FACTORY
                + " must follow the format \"DataverseName.LibraryName#ReaderFactoryFullyQualifiedName\"");
    }
    DataverseName dataverseName = DataverseName.createSinglePartName(dataverseAndLibrary[0]); //TODO(MULTI_PART_DATAVERSE_NAME):REVISIT
    String libraryName = dataverseAndLibrary[1];
    ILibrary lib;
    try {
        lib = libraryManager.getLibrary(dataverseName, libraryName);
    } catch (HyracksDataException e) {
        throw new AsterixException("Cannot load library", e);
    }
    if (lib.getLanguage() != ExternalFunctionLanguage.JAVA) {
        throw new AsterixException("Unexpected library language: " + lib.getLanguage());
    }
    ClassLoader classLoader = ((JavaLibrary) lib).getClassLoader();
    try {
        // Constructor.newInstance() instead of the deprecated Class.newInstance(), so that checked exceptions
        // thrown by the constructor are wrapped (InvocationTargetException) and caught here.
        return (IRecordReaderFactory<?>) classLoader.loadClass(libraryAndFactory[1]).getDeclaredConstructor()
                .newInstance();
    } catch (ReflectiveOperationException e) {
        throw new AsterixException("Failed to create record reader factory", e);
    }
}
/**
 * Instantiates a user-provided {@link IDataParserFactory}. {@code parserFactoryName} is split at the first
 * {@code ExternalDataConstants.EXTERNAL_LIBRARY_SEPARATOR}: the part before it is the library name and the part
 * after it is the class name.
 *
 * <p>Currently not used.
 *
 * @param libraryManager
 *            manager used to look up the library
 * @param dataverse
 *            dataverse that owns the library
 * @param parserFactoryName
 *            library-qualified parser factory class name
 * @return a new parser factory, created through its no-argument constructor
 * @throws AsterixException
 *             if the name has no separator, the library cannot be loaded or is not a Java library, or the class
 *             cannot be instantiated reflectively
 */
public static IDataParserFactory createExternalParserFactory(ILibraryManager libraryManager,
        DataverseName dataverse, String parserFactoryName) throws AsterixException {
    String separator = ExternalDataConstants.EXTERNAL_LIBRARY_SEPARATOR;
    int separatorIndex = parserFactoryName.indexOf(separator);
    if (separatorIndex < 0) {
        // Previously surfaced as StringIndexOutOfBoundsException from substring(0, -1).
        throw new AsterixException("Invalid external parser factory name: " + parserFactoryName);
    }
    String library = parserFactoryName.substring(0, separatorIndex);
    // Skip the whole separator, not just one character.
    String className = parserFactoryName.substring(separatorIndex + separator.length());
    ILibrary lib;
    try {
        lib = libraryManager.getLibrary(dataverse, library);
    } catch (HyracksDataException e) {
        throw new AsterixException("Cannot load library", e);
    }
    if (lib.getLanguage() != ExternalFunctionLanguage.JAVA) {
        throw new AsterixException("Unexpected library language: " + lib.getLanguage());
    }
    ClassLoader classLoader = ((JavaLibrary) lib).getClassLoader();
    try {
        // Constructor.newInstance() instead of the deprecated Class.newInstance(): constructor exceptions are
        // wrapped in InvocationTargetException and reported below.
        return (IDataParserFactory) classLoader.loadClass(className).getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
        throw new AsterixException("Failed to create an external parser factory", e);
    }
}
/**
 * Returns whether {@code KEY_IS_FEED} is present and parses as boolean {@code true}.
 *
 * @param configuration
 *            external data configuration
 * @return {@code true} if the configuration describes a feed
 */
public static boolean isFeed(Map<String, String> configuration) {
    // An absent key yields null, and Boolean.parseBoolean(null) is false.
    String feedFlag = configuration.get(ExternalDataConstants.KEY_IS_FEED);
    return Boolean.parseBoolean(feedFlag);
}
/**
 * Marks the configuration as a feed and records the feed's dataverse (canonical form) and name.
 * {@code KEY_IS_FEED} is set to TRUE only if the key is not already present; the other two keys are always
 * overwritten.
 *
 * @param configuration
 *            configuration to mutate
 * @param dataverseName
 *            the feed's dataverse
 * @param feedName
 *            the feed's name
 */
public static void prepareFeed(Map<String, String> configuration, DataverseName dataverseName, String feedName) {
    boolean feedFlagMissing = !configuration.containsKey(ExternalDataConstants.KEY_IS_FEED);
    if (feedFlagMissing) {
        configuration.put(ExternalDataConstants.KEY_IS_FEED, ExternalDataConstants.TRUE);
    }
    String canonicalDataverse = dataverseName.getCanonicalForm();
    configuration.put(ExternalDataConstants.KEY_DATASET_DATAVERSE, canonicalDataverse);
    configuration.put(ExternalDataConstants.KEY_FEED_NAME, feedName);
}
/**
 * Returns whether the data source should stay open. The default is {@code true} when {@code KEY_WAIT_FOR_DATA} is
 * absent; otherwise the key's value is parsed as a boolean.
 *
 * @param configuration
 *            external data configuration
 * @return {@code true} to keep the source open
 */
public static boolean keepDataSourceOpen(Map<String, String> configuration) {
    String key = ExternalDataConstants.KEY_WAIT_FOR_DATA;
    return !configuration.containsKey(key) || Boolean.parseBoolean(configuration.get(key));
}
/** Returns the value of {@code KEY_FEED_NAME}, or {@code null} if absent. */
public static String getFeedName(Map<String, String> configuration) {
    String feedName = configuration.get(ExternalDataConstants.KEY_FEED_NAME);
    return feedName;
}
/** Returns whether {@code KEY_META_TYPE_NAME} is present in the configuration. */
public static boolean isRecordWithMeta(Map<String, String> configuration) {
    // NOTE(review): this reads KEY_META_TYPE_NAME, while setRecordWithMeta writes FORMAT_RECORD_WITH_METADATA;
    // confirm that the two keys are intentionally different.
    String metaTypeKey = ExternalDataConstants.KEY_META_TYPE_NAME;
    return configuration.containsKey(metaTypeKey);
}
/** Stores {@code booleanString} under {@code FORMAT_RECORD_WITH_METADATA}. */
public static void setRecordWithMeta(Map<String, String> configuration, String booleanString) {
    String withMetaKey = ExternalDataConstants.FORMAT_RECORD_WITH_METADATA;
    configuration.put(withMetaKey, booleanString);
}
/** Returns whether {@code KEY_IS_CHANGE_FEED} parses as boolean {@code true} (absent means {@code false}). */
public static boolean isChangeFeed(Map<String, String> configuration) {
    String changeFeedFlag = configuration.get(ExternalDataConstants.KEY_IS_CHANGE_FEED);
    return Boolean.parseBoolean(changeFeedFlag);
}
/** Returns whether {@code KEY_IS_INSERT_FEED} parses as boolean {@code true} (absent means {@code false}). */
public static boolean isInsertFeed(Map<String, String> configuration) {
    String insertFeedFlag = configuration.get(ExternalDataConstants.KEY_IS_INSERT_FEED);
    return Boolean.parseBoolean(insertFeedFlag);
}
/**
 * Counts the comma-separated entries in {@code KEY_KEY_INDEXES}.
 *
 * @param configuration
 *            external data configuration
 * @return number of key indexes
 * @throws AsterixException
 *             PARAMETERS_REQUIRED when {@code KEY_KEY_INDEXES} is absent
 */
public static int getNumberOfKeys(Map<String, String> configuration) throws AsterixException {
    String keyIndexes = configuration.get(ExternalDataConstants.KEY_KEY_INDEXES);
    if (keyIndexes != null) {
        return keyIndexes.split(",").length;
    }
    throw AsterixException.create(ErrorCode.PARAMETERS_REQUIRED, ExternalDataConstants.KEY_KEY_INDEXES);
}
/** Stores the key count, as a decimal string, under {@code KEY_KEY_SIZE}. */
public static void setNumberOfKeys(Map<String, String> configuration, int value) {
    String decimal = Integer.toString(value);
    configuration.put(ExternalDataConstants.KEY_KEY_SIZE, decimal);
}
/** Stores {@code booleanString} under {@code KEY_IS_CHANGE_FEED}. */
public static void setChangeFeed(Map<String, String> configuration, String booleanString) {
    String changeFeedKey = ExternalDataConstants.KEY_IS_CHANGE_FEED;
    configuration.put(changeFeedKey, booleanString);
}
/**
 * Parses {@code KEY_KEY_INDEXES} as a comma-separated list of integers.
 *
 * @param configuration
 *            external data configuration (the key must be present)
 * @return the primary-key field indexes, in listed order
 * @throws NumberFormatException
 *             if an entry is not an integer
 */
public static int[] getPKIndexes(Map<String, String> configuration) {
    String keyIndexes = configuration.get(ExternalDataConstants.KEY_KEY_INDEXES);
    return Arrays.stream(keyIndexes.split(",")).mapToInt(Integer::parseInt).toArray();
}
/**
 * Parses {@code KEY_KEY_INDICATORS} as a comma-separated list of integers.
 *
 * @param configuration
 *            external data configuration (the key must be present)
 * @return the primary-key source indicators, in listed order
 * @throws NumberFormatException
 *             if an entry is not an integer
 */
public static int[] getPKSourceIndicators(Map<String, String> configuration) {
    String keyIndicators = configuration.get(ExternalDataConstants.KEY_KEY_INDICATORS);
    return Arrays.stream(keyIndicators.split(",")).mapToInt(Integer::parseInt).toArray();
}
/**
 * Fills in default delimiter, quote and escape values for the CSV and TSV formats. Entries the user already set
 * are left untouched; other formats, or a missing format, are ignored.
 *
 * <ul>
 * <li>CSV: delimiter {@code DEFAULT_DELIMITER}; quote and escape both {@code DEFAULT_QUOTE}</li>
 * <li>TSV: delimiter {@code TAB_STR}; quote and escape both {@code NULL_STR}</li>
 * </ul>
 *
 * @param configuration
 *            external data configuration
 */
public static void defaultConfiguration(Map<String, String> configuration) {
    String format = configuration.get(ExternalDataConstants.KEY_FORMAT);
    String delimiter;
    String quote;
    String escape;
    if (ExternalDataConstants.FORMAT_CSV.equals(format)) {
        delimiter = ExternalDataConstants.DEFAULT_DELIMITER;
        quote = ExternalDataConstants.DEFAULT_QUOTE;
        // the CSV escape character defaults to the quote character itself
        escape = ExternalDataConstants.DEFAULT_QUOTE;
    } else if (ExternalDataConstants.FORMAT_TSV.equals(format)) {
        delimiter = ExternalDataConstants.TAB_STR;
        quote = ExternalDataConstants.NULL_STR;
        escape = ExternalDataConstants.NULL_STR;
    } else {
        return;
    }
    configuration.putIfAbsent(KEY_DELIMITER, delimiter);
    configuration.putIfAbsent(KEY_QUOTE, quote);
    configuration.putIfAbsent(KEY_ESCAPE, escape);
}
/**
 * Completes the configuration with the information adapters and parsers need.
 *
 * <ol>
 * <li>If no reader is set, the adapter name is used as the reader.</li>
 * <li>For Parquet input, the parser is forced to {@code FORMAT_NOOP} and the format to {@code FORMAT_PARQUET},
 * because Parquet data needs no parsing (binary-to-binary conversion).</li>
 * <li>If no parser is set but a format is, the format is used as the parser.</li>
 * </ol>
 *
 * @param adapterName
 *            adapter name
 * @param configuration
 *            external data configuration (mutated)
 */
public static void prepare(String adapterName, Map<String, String> configuration) {
    boolean readerMissing = !configuration.containsKey(ExternalDataConstants.KEY_READER);
    if (readerMissing) {
        configuration.put(ExternalDataConstants.KEY_READER, adapterName);
    }
    boolean parquetInput =
            ExternalDataConstants.INPUT_FORMAT_PARQUET.equals(configuration.get(ExternalDataConstants.KEY_INPUT_FORMAT));
    if (parquetInput) {
        configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_NOOP);
        configuration.put(ExternalDataConstants.KEY_FORMAT, ExternalDataConstants.FORMAT_PARQUET);
    }
    boolean parserMissing = !configuration.containsKey(ExternalDataConstants.KEY_PARSER);
    if (parserMissing && configuration.containsKey(ExternalDataConstants.KEY_FORMAT)) {
        String format = configuration.get(ExternalDataConstants.KEY_FORMAT);
        configuration.put(ExternalDataConstants.KEY_PARSER, format);
    }
}
/**
 * Normalizes the values of certain parameters of the adapter configuration. This should happen before persisting
 * the metadata (e.g. when creating external datasets or feeds) and when creating an adapter factory.
 *
 * <p>The {@code format} value is lower-cased and trimmed, and replaced only if the result is one of
 * {@code ExternalDataConstants.ALL_FORMATS}. The {@code header} and {@code redact-warnings} values are lower-cased
 * and trimmed whenever present.
 *
 * <p>Lower-casing uses {@link Locale#ROOT} so that the result does not depend on the JVM default locale. Under a
 * Turkish default locale, for example, {@code "I".toLowerCase()} yields a dotless 'ı'.
 *
 * @param configuration
 *            external data configuration
 */
public static void normalize(Map<String, String> configuration) {
    // normalize the "format" parameter
    String paramValue = configuration.get(ExternalDataConstants.KEY_FORMAT);
    if (paramValue != null) {
        String lowerCaseFormat = paramValue.toLowerCase(Locale.ROOT).trim();
        if (ExternalDataConstants.ALL_FORMATS.contains(lowerCaseFormat)) {
            configuration.put(ExternalDataConstants.KEY_FORMAT, lowerCaseFormat);
        }
    }
    // normalize "header" parameter
    putToLowerIfExists(configuration, ExternalDataConstants.KEY_HEADER);
    // normalize "redact-warnings" parameter
    putToLowerIfExists(configuration, ExternalDataConstants.KEY_REDACT_WARNINGS);
}
/**
 * Validates the parameter values of the adapter configuration. Call this after
 * {@link #normalize(Map)}.
 *
 * <p>Checks, in order:
 * <ol>
 * <li>{@code header} is present when the format is CSV or TSV;</li>
 * <li>if present, {@code header} is "true" or "false";</li>
 * <li>delimiter, quote and escape are single characters, and the quote differs from the delimiter;</li>
 * <li>if present, {@code redact-warnings} is "true" or "false".</li>
 * </ol>
 *
 * @param configuration
 *            external data configuration
 * @throws HyracksDataException
 *             on the first failed check
 */
public static void validate(Map<String, String> configuration) throws HyracksDataException {
    String format = configuration.get(ExternalDataConstants.KEY_FORMAT);
    String header = configuration.get(ExternalDataConstants.KEY_HEADER);
    if (header == null) {
        if (format != null && isHeaderRequiredFor(format)) {
            throw new RuntimeDataException(ErrorCode.PARAMETERS_REQUIRED, ExternalDataConstants.KEY_HEADER);
        }
    } else if (!isBoolean(header)) {
        throw new RuntimeDataException(ErrorCode.INVALID_REQ_PARAM_VAL, ExternalDataConstants.KEY_HEADER, header);
    }
    validateGetQuote(configuration, validateGetDelimiter(configuration));
    validateGetEscape(configuration);
    String redactWarnings = configuration.get(ExternalDataConstants.KEY_REDACT_WARNINGS);
    if (redactWarnings == null || isBoolean(redactWarnings)) {
        return;
    }
    throw new RuntimeDataException(ErrorCode.INVALID_REQ_PARAM_VAL, ExternalDataConstants.KEY_REDACT_WARNINGS,
            redactWarnings);
}
// CSV and TSV are the formats that require an explicit header parameter. Expects a non-null format.
private static boolean isHeaderRequiredFor(String format) {
    if (format.equals(ExternalDataConstants.FORMAT_CSV)) {
        return true;
    }
    return format.equals(ExternalDataConstants.FORMAT_TSV);
}
// Accepts exactly ExternalDataConstants.TRUE or FALSE (case-sensitive). Expects a non-null value.
private static boolean isBoolean(String value) {
    if (value.equals(ExternalDataConstants.TRUE)) {
        return true;
    }
    return value.equals(ExternalDataConstants.FALSE);
}
// Rejects a configuration where the quote and the delimiter are the same character.
private static void validateDelimiterAndQuote(char delimiter, char quote) throws RuntimeDataException {
    if (quote != delimiter) {
        return;
    }
    throw new RuntimeDataException(ErrorCode.QUOTE_DELIMITER_MISMATCH, quote, delimiter);
}
// Returns the single character configured under key, or defaultValue when the key has no value.
private static char validateCharOrDefault(Map<String, String> configuration, String key, char defaultValue)
        throws HyracksDataException {
    String value = configuration.get(key);
    if (value != null) {
        validateChar(value, key);
        return value.charAt(0);
    }
    return defaultValue;
}
/**
 * Ensures that a parameter value is exactly one character long.
 *
 * @param parameterValue
 *            the value to check (must be non-null)
 * @param parameterName
 *            parameter name, used in the error
 * @throws RuntimeDataException
 *             INVALID_CHAR_LENGTH if the length is not 1
 */
public static void validateChar(String parameterValue, String parameterName) throws RuntimeDataException {
    boolean singleChar = parameterValue.length() == 1;
    if (!singleChar) {
        throw new RuntimeDataException(ErrorCode.INVALID_CHAR_LENGTH, parameterValue, parameterName);
    }
}
/**
 * Replaces the value of {@code key}, if present, with its lower-cased and trimmed form. Lower-casing uses
 * {@link Locale#ROOT} so that the result does not depend on the JVM default locale (e.g. the Turkish dotless-i
 * mapping of 'I').
 *
 * @param configuration
 *            configuration to mutate
 * @param key
 *            key whose value should be normalized
 */
private static void putToLowerIfExists(Map<String, String> configuration, String key) {
    String paramValue = configuration.get(key);
    if (paramValue != null) {
        configuration.put(key, paramValue.toLowerCase(Locale.ROOT).trim());
    }
}
/**
 * Validates adapter-specific external dataset properties. The validator is chosen by
 * {@code KEY_EXTERNAL_SOURCE_TYPE}: AWS S3, Azure Blob, Azure Data Lake or GCS. Any other source type, or no
 * source type at all, needs no extra validation.
 *
 * @param configuration
 *            properties
 * @param srcLoc
 *            source location forwarded to the adapter-specific validator
 * @param collector
 *            warning collector forwarded to the adapter-specific validator
 * @param appCtx
 *            application context forwarded to the Azure validators
 * @throws CompilationException
 *             propagated from the adapter-specific validator
 */
public static void validateAdapterSpecificProperties(Map<String, String> configuration, SourceLocation srcLoc,
        IWarningCollector collector, IApplicationContext appCtx) throws CompilationException {
    String type = configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE);
    if (type == null) {
        // A switch on a null String throws NullPointerException. With no source type there is nothing
        // adapter-specific to validate, the same as the default branch.
        return;
    }
    switch (type) {
        case ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3:
            S3Utils.validateProperties(configuration, srcLoc, collector);
            break;
        case ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_BLOB:
            validateAzureBlobProperties(configuration, srcLoc, collector, appCtx);
            break;
        case ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_DATA_LAKE:
            validateAzureDataLakeProperties(configuration, srcLoc, collector, appCtx);
            break;
        case ExternalDataConstants.KEY_ADAPTER_NAME_GCS:
            validateProperties(configuration, srcLoc, collector);
            break;
        default:
            // Nothing needs to be done
            break;
    }
}
/**
 * Checks whether the path fully matches at least one of the given regex matchers.
 *
 * <p>Each matcher is {@link Matcher#reset(CharSequence) reset} to {@code path}, so the matchers are mutated and
 * must not be shared across threads while this runs.
 *
 * @param matchers
 *            pre-compiled matchers to try, in order
 * @param path
 *            path to check against
 * @return {@code true} if ANY matcher matches the entire path; {@code false} if none does, including when
 *         {@code matchers} is empty
 */
public static boolean matchPatterns(List<Matcher> matchers, String path) {
    for (Matcher matcher : matchers) {
        if (matcher.reset(path).matches()) {
            // first hit is enough: the semantics is "any", not "all"
            return true;
        }
    }
    return false;
}
/**
* Converts a glob-style wildcard pattern into an equivalent Java regular expression.
*
* <p>Translation rules:
* <ul>
* <li>{@code *} becomes {@code .*} (matches any run of characters, including '/')</li>
* <li>{@code ?} becomes {@code .} (matches exactly one character)</li>
* <li>{@code [...]} becomes a regex character class; a leading {@code !} is turned into {@code ^} (negation)</li>
* <li>a {@code [} without a usable closing {@code ]} is emitted as a literal {@code \[}</li>
* <li>any other character listed in RESERVED_REGEX_CHARS is backslash-escaped</li>
* </ul>
* Membership is tested with {@link Arrays#binarySearch(char[], char)}, so RESERVED_REGEX_CHARS must be sorted.
*
* @param pattern
* wildcard pattern to convert
* @return regex expression
*/
public static String patternToRegex(String pattern) {
int charPosition = 0;
int patternLength = pattern.length();
// Reused scratch buffer for building the body of each character class.
StringBuilder stuffBuilder = new StringBuilder();
StringBuilder result = new StringBuilder();
while (charPosition < patternLength) {
char c = pattern.charAt(charPosition);
charPosition++;
switch (c) {
case '*':
result.append(".*");
break;
case '?':
result.append(".");
break;
case '[':
// charPosition now points just past '['; scan ahead for the matching ']'.
int closingBracketPosition = charPosition;
if (closingBracketPosition < patternLength && pattern.charAt(closingBracketPosition) == '!') {
closingBracketPosition++;
}
// 2 cases can happen here:
// 1- Empty character class [] which is invalid for java, so treat ] as literal and find another
// closing bracket, if no closing bracket is found, the whole thing is a literal
// 2- Negated empty class [!] converted to [^] which is invalid for java, so treat ] as literal and
// find another closing bracket, if no closing bracket is found, the whole thing is a literal
if (closingBracketPosition < patternLength && pattern.charAt(closingBracketPosition) == ']') {
closingBracketPosition++;
}
// Any leading '!' and/or ']' has been consumed as class content; now search for the real closing bracket.
while (closingBracketPosition < patternLength && pattern.charAt(closingBracketPosition) != ']') {
closingBracketPosition++;
}
// No closing bracket found (including a bare "[]" or "[!]" with nothing after it): emit '[' as a literal.
// charPosition is not advanced further, so the following characters are translated normally.
if (closingBracketPosition >= patternLength) {
result.append("\\[");
} else {
// Found the closing bracket: take the class content between "[" and "]". It is never empty, because
// a ']' directly after "[" or "[!" was skipped above as literal content.
String stuff = pattern.substring(charPosition, closingBracketPosition);
stuffBuilder.setLength(0);
int stuffCharPos = 0;
// If first character in the character class is "!" then convert it to "^"
if (stuff.charAt(0) == '!') {
stuffBuilder.append('^');
stuffCharPos++; // ignore first character when escaping metacharacters next step
}
// Escape reserved regex characters inside the class, except '-' so that ranges like a-z keep working.
for (; stuffCharPos < stuff.length(); stuffCharPos++) {
char stuffChar = stuff.charAt(stuffCharPos);
if (stuffChar != '-' && Arrays.binarySearch(RESERVED_REGEX_CHARS, stuffChar) >= 0) {
stuffBuilder.append("\\");
}
stuffBuilder.append(stuffChar);
}
String stuffEscaped = stuffBuilder.toString();
// Escape the set operations: "&&" is Java's class-intersection operator. The other doubled sequences
// are escaped as well, presumably as a defensive measure against other regex set syntaxes.
stuffEscaped = stuffEscaped.replace("&&", "\\&\\&").replace("~~", "\\~\\~")
.replace("||", "\\|\\|").replace("--", "\\-\\-");
result.append("[").append(stuffEscaped).append("]");
// Resume translation right after the closing ']'.
charPosition = closingBracketPosition + 1;
}
break;
default:
// Literal character: escape it if it has a special meaning in regex.
if (Arrays.binarySearch(RESERVED_REGEX_CHARS, c) >= 0) {
result.append("\\");
}
result.append(c);
break;
}
}
return result.toString();
}
/**
 * Returns the object-store prefix built from {@code DEFINITION_FIELD_NAME} and {@code SUBPATH}, with a trailing
 * slash. Equivalent to {@code getPrefix(configuration, true)}.
 *
 * @param configuration
 *            configuration
 * @return the combined prefix
 */
public static String getPrefix(Map<String, String> configuration) {
    final boolean withTrailingSlash = true;
    return getPrefix(configuration, withTrailingSlash);
}
/**
 * Builds the prefix from the definition and the sub-path.
 *
 * <ul>
 * <li>Neither set (null or empty): returns {@code ""}.</li>
 * <li>Only the definition: returns it, with a '/' appended if {@code appendSlash} and it does not already end
 * with one.</li>
 * <li>Only the sub-path: returns it without one leading '/'.</li>
 * <li>Both: joins them with exactly one '/' at the seam.</li>
 * </ul>
 * When a sub-path is present and {@code appendSlash} is set, a trailing '/' is ensured on the result.
 *
 * @param configuration
 *            configuration
 * @param appendSlash
 *            whether to ensure a trailing '/'
 * @return the combined prefix
 */
public static String getPrefix(Map<String, String> configuration, boolean appendSlash) {
    String def = configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
    String sub = configuration.get(ExternalDataConstants.SUBPATH);
    boolean defPresent = def != null && !def.isEmpty();
    boolean subPresent = sub != null && !sub.isEmpty();
    if (!subPresent) {
        if (!defPresent) {
            return "";
        }
        if (!appendSlash || def.endsWith("/")) {
            return def;
        }
        return def + "/";
    }
    String joined;
    if (!defPresent) {
        joined = sub.startsWith("/") ? sub.substring(1) : sub;
    } else {
        boolean defEndsWithSlash = def.endsWith("/");
        boolean subStartsWithSlash = sub.startsWith("/");
        if (defEndsWithSlash && subStartsWithSlash) {
            joined = def + sub.substring(1);
        } else if (!defEndsWithSlash && !subStartsWithSlash) {
            joined = def + "/" + sub;
        } else {
            joined = def + sub;
        }
    }
    if (appendSlash && !joined.endsWith("/")) {
        joined = joined + "/";
    }
    return joined;
}
/**
 * Validates the include/exclude path-filter parameters.
 *
 * <p>Accepted keys are {@code KEY_INCLUDE} and {@code KEY_EXCLUDE}, either bare or suffixed with {@code #N}, where
 * {@code N} is an integer string (e.g. include, include#1, include#2). Any other key starting with one of these
 * prefixes is rejected. Include and exclude entries may not be combined.
 *
 * @param configuration
 *            configuration map
 * @throws CompilationException
 *             INVALID_PROPERTY_FORMAT for a malformed include/exclude key, or
 *             PARAMETERS_NOT_ALLOWED_AT_SAME_TIME when both kinds are present
 */
public static void validateIncludeExclude(Map<String, String> configuration) throws CompilationException {
    List<Map.Entry<String, String>> includeEntries = new ArrayList<>();
    List<Map.Entry<String, String>> excludeEntries = new ArrayList<>();
    for (Map.Entry<String, String> entry : configuration.entrySet()) {
        String key = entry.getKey();
        if (key.equals(ExternalDataConstants.KEY_INCLUDE)) {
            includeEntries.add(entry);
            continue;
        }
        if (key.equals(ExternalDataConstants.KEY_EXCLUDE)) {
            excludeEntries.add(entry);
            continue;
        }
        boolean looksLikeFilter = key.startsWith(ExternalDataConstants.KEY_INCLUDE)
                || key.startsWith(ExternalDataConstants.KEY_EXCLUDE);
        if (!looksLikeFilter) {
            continue;
        }
        // A numbered key must split on "#" into exactly <prefix> and an integer suffix.
        String[] parts = key.split("#");
        if (parts.length == 2 && parts[0].equals(ExternalDataConstants.KEY_INCLUDE)
                && NumberUtils.isIntegerNumericString(parts[1])) {
            includeEntries.add(entry);
        } else if (parts.length == 2 && parts[0].equals(ExternalDataConstants.KEY_EXCLUDE)
                && NumberUtils.isIntegerNumericString(parts[1])) {
            excludeEntries.add(entry);
        } else {
            throw new CompilationException(ErrorCode.INVALID_PROPERTY_FORMAT, key);
        }
    }
    boolean bothKinds = !includeEntries.isEmpty() && !excludeEntries.isEmpty();
    if (bothKinds) {
        throw new CompilationException(ErrorCode.PARAMETERS_NOT_ALLOWED_AT_SAME_TIME,
                ExternalDataConstants.KEY_INCLUDE, ExternalDataConstants.KEY_EXCLUDE);
    }
}
/**
 * Compiles the include/exclude wildcard patterns into an {@link IncludeExcludeMatcher}.
 *
 * <p>Each pattern under a key starting with {@code KEY_INCLUDE} or {@code KEY_EXCLUDE} is converted with
 * {@link #patternToRegex(String)}. The resulting matcher works as follows:
 * <ul>
 * <li>with include patterns, a path is accepted when it matches any of them;</li>
 * <li>otherwise, with exclude patterns, a path is accepted when it matches none of them;</li>
 * <li>otherwise, every path is accepted.</li>
 * </ul>
 *
 * @param configuration
 *            configuration map
 * @return the matcher
 * @throws CompilationException
 *             INVALID_REGEX_PATTERN, carrying the offending pattern, if a converted pattern fails to compile
 */
public static IncludeExcludeMatcher getIncludeExcludeMatchers(Map<String, String> configuration)
        throws CompilationException {
    List<Matcher> includeMatchers = new ArrayList<>();
    List<Matcher> excludeMatchers = new ArrayList<>();
    // Holds the pattern being compiled, so the error can report it.
    String currentPattern = null;
    try {
        for (Map.Entry<String, String> entry : configuration.entrySet()) {
            String key = entry.getKey();
            List<Matcher> target;
            if (key.startsWith(KEY_INCLUDE)) {
                target = includeMatchers;
            } else if (key.startsWith(KEY_EXCLUDE)) {
                target = excludeMatchers;
            } else {
                continue;
            }
            currentPattern = entry.getValue();
            target.add(Pattern.compile(patternToRegex(currentPattern)).matcher(""));
        }
    } catch (PatternSyntaxException ex) {
        throw new CompilationException(ErrorCode.INVALID_REGEX_PATTERN, currentPattern);
    }
    if (!includeMatchers.isEmpty()) {
        return new IncludeExcludeMatcher(includeMatchers, (patterns, path) -> matchPatterns(patterns, path));
    }
    if (!excludeMatchers.isEmpty()) {
        return new IncludeExcludeMatcher(excludeMatchers, (patterns, path) -> !matchPatterns(patterns, path));
    }
    return new IncludeExcludeMatcher(Collections.emptyList(), (patterns, path) -> true);
}
/**
 * Returns whether the configured data source supports pushdown. Currently only Apache Parquet does.
 *
 * @param properties
 *            external dataset configuration
 * @return {@code true} for Parquet, {@code false} otherwise
 */
public static boolean supportsPushdown(Map<String, String> properties) {
    final boolean parquet = isParquetFormat(properties);
    return parquet;
}
/**
 * Checks the declared type and configuration of a Parquet external dataset. Non-Parquet configurations are accepted
 * without any check.
 *
 * @param properties
 *            external dataset configuration
 * @param datasetRecordType
 *            dataset declared type; for Parquet it must declare no fields
 * @throws CompilationException
 *             if the dataset is Parquet and its type declares fields, or if a time zone is configured that is not
 *             listed in {@code ParquetOptions.VALID_TIME_ZONES}
 */
public static void validateParquetTypeAndConfiguration(Map<String, String> properties,
        ARecordType datasetRecordType) throws CompilationException {
    if (!isParquetFormat(properties)) {
        return;
    }
    if (datasetRecordType.getFieldTypes().length != 0) {
        throw new CompilationException(ErrorCode.UNSUPPORTED_TYPE_FOR_PARQUET, datasetRecordType.getTypeName());
    }
    if (properties.containsKey(ParquetOptions.TIMEZONE)) {
        // The configured time zone id must be one the Parquet reader recognizes
        String timeZone = properties.get(ParquetOptions.TIMEZONE);
        if (!ParquetOptions.VALID_TIME_ZONES.contains(timeZone)) {
            throw new CompilationException(ErrorCode.INVALID_TIMEZONE, timeZone);
        }
    }
}
/**
 * Determines whether the configuration describes a Parquet dataset. This is the case when the input-format property
 * is either the Parquet input-format class name or the Parquet input-format alias, or when the format property
 * equals the Parquet format name.
 *
 * @param properties
 *            external dataset configuration
 * @return {@code true} if any of the Parquet markers is present
 */
public static boolean isParquetFormat(Map<String, String> properties) {
    String declaredInputFormat = properties.get(ExternalDataConstants.KEY_INPUT_FORMAT);
    if (ExternalDataConstants.CLASS_NAME_PARQUET_INPUT_FORMAT.equals(declaredInputFormat)
            || ExternalDataConstants.INPUT_FORMAT_PARQUET.equals(declaredInputFormat)) {
        return true;
    }
    String declaredFormat = properties.get(ExternalDataConstants.KEY_FORMAT);
    return ExternalDataConstants.FORMAT_PARQUET.equals(declaredFormat);
}
/**
 * Serializes the projection information into {@code properties}. The requested-fields type is stored under
 * {@code KEY_REQUESTED_FIELDS}, and the function-call information map is stored under
 * {@code KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION}.
 *
 * @param projectionInfo
 *            projection information to serialize
 * @param properties
 *            mutable configuration that receives the serialized entries
 * @throws IOException
 *             if serialization fails; entries written before the failure remain in {@code properties}
 */
public static void setExternalDataProjectionInfo(DataProjectionInfo projectionInfo, Map<String, String> properties)
        throws IOException {
    String requestedFields = serializeExpectedTypeToString(projectionInfo.getProjectionInfo());
    properties.put(ExternalDataConstants.KEY_REQUESTED_FIELDS, requestedFields);
    String functionCallInfo = serializeFunctionCallInfoToString(projectionInfo.getFunctionCallInfoMap());
    properties.put(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION, functionCallInfo);
}
/**
 * Encodes an {@link ARecordType} as a Base64 string so it can be passed through
 * {@link org.apache.hadoop.conf.Configuration}.
 * <p>
 * The sentinel types {@code DataProjectionInfo.EMPTY_TYPE} and {@code DataProjectionInfo.ALL_FIELDS_TYPE} (compared
 * by identity) are not serialized; their type name is returned instead.
 *
 * @param expectedType
 *            expected type
 * @return the sentinel's type name, or the Base64 encoding of the serialized type
 * @throws IOException
 *             if writing the type fails
 */
private static String serializeExpectedTypeToString(ARecordType expectedType) throws IOException {
    boolean isSentinel =
            expectedType == DataProjectionInfo.EMPTY_TYPE || expectedType == DataProjectionInfo.ALL_FIELDS_TYPE;
    if (isSentinel) {
        return expectedType.getTypeName();
    }
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    DataProjectionInfo.writeTypeField(expectedType, out);
    out.flush();
    return Base64.getEncoder().encodeToString(bytes.toByteArray());
}
/**
 * Encodes a {@link FunctionCallInformation} map as a Base64 string so it can be passed through
 * {@link org.apache.hadoop.conf.Configuration}.
 *
 * @param functionCallInfoMap
 *            function information map
 * @return the Base64 encoding of the serialized map
 * @throws IOException
 *             if writing the map fails
 */
static String serializeFunctionCallInfoToString(Map<String, FunctionCallInformation> functionCallInfoMap)
        throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    DataProjectionInfo.writeFunctionCallInformationMapField(functionCallInfoMap, out);
    out.flush();
    return Base64.getEncoder().encodeToString(bytes.toByteArray());
}
/**
 * Rounds {@code size} up to the smallest multiple of {@code framesize} that can hold it.
 * <p>
 * A size that is already an exact multiple of {@code framesize} is returned unchanged. Non-positive sizes yield a
 * single frame, so the result is always at least one frame.
 *
 * @param size
 *            number of bytes that must fit
 * @param framesize
 *            frame size in bytes; must be positive
 * @return a positive multiple of {@code framesize} that is {@code >= size}
 * @throws IllegalArgumentException
 *             if {@code framesize} is not positive
 * @throws ArithmeticException
 *             if the rounded-up size does not fit in an {@code int}
 */
public static int roundUpToNearestFrameSize(int size, int framesize) {
    if (framesize <= 0) {
        throw new IllegalArgumentException("framesize must be positive: " + framesize);
    }
    if (size <= 0) {
        // Always hand back at least one frame
        return framesize;
    }
    // Widen to long so the ceiling division and the product cannot silently wrap around
    long frames = ((long) size + framesize - 1) / framesize;
    return Math.toIntExact(frames * framesize);
}
/**
 * Returns the argument buffer size. This is {@code DEFAULT_MAX_ARGUMENT_SZ + HEADER_FUDGE}, unless the system
 * property {@code udf.buf.size} is set.
 * <p>
 * When the property is set, it is parsed with {@code StorageUtil.getByteValue} and {@code HEADER_FUDGE} is added.
 * The parsed value is used only when the result is strictly between 0 and {@code Integer.MAX_VALUE}; otherwise the
 * default is returned.
 *
 * @return the buffer size in bytes
 */
public static int getArgBufferSize() {
    String configured = System.getProperty("udf.buf.size");
    if (configured == null) {
        return DEFAULT_MAX_ARGUMENT_SZ + HEADER_FUDGE;
    }
    long requested = StorageUtil.getByteValue(configured) + HEADER_FUDGE;
    boolean fitsInInt = requested > 0 && requested < Integer.MAX_VALUE;
    return fitsInInt ? (int) requested : DEFAULT_MAX_ARGUMENT_SZ + HEADER_FUDGE;
}
/**
 * Finds the first of {@code parameters} that maps to a non-null value in {@code configuration}.
 *
 * @param configuration
 *            properties to look up
 * @param parameters
 *            candidate keys, checked in order
 * @return the first key (not its value) whose value is non-null, or an empty {@link Optional} if none qualifies
 */
public static Optional<String> getFirstNotNull(Map<String, String> configuration, String... parameters) {
    for (String parameter : parameters) {
        if (configuration.get(parameter) != null) {
            return Optional.of(parameter);
        }
    }
    return Optional.empty();
}
/**
 * Resolves the type tag used for an argument by delegating to {@code MessagePackUtils.peekUnknown}.
 * <p>
 * When the declared {@code type} is {@code ANY}, {@code valueReference} is loaded into {@code pointy}. Its runtime
 * tag is then mapped to the corresponding built-in type, and that type is used instead of the declared one.
 *
 * @param type
 *            declared argument type
 * @param valueReference
 *            serialized argument value; only read when {@code type} is {@code ANY}
 * @param pointy
 *            scratch pointable that is reset to {@code valueReference} when {@code type} is {@code ANY}
 * @return the tag produced by {@code MessagePackUtils.peekUnknown} for the effective type
 * @throws HyracksDataException
 *             propagated from the type lookup helpers
 */
public static ATypeTag peekArgument(IAType type, IValueReference valueReference, TaggedValuePointable pointy)
        throws HyracksDataException {
    IAType effectiveType = type;
    if (type.getTypeTag() == ATypeTag.ANY) {
        pointy.set(valueReference);
        ATypeTag runtimeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(pointy.getTag());
        effectiveType = TypeTagUtil.getBuiltinTypeByTag(runtimeTag);
    }
    return MessagePackUtils.peekUnknown(effectiveType);
}
/**
 * Appends an empty MessagePack array to {@code argHolder}. This is the {@code ARRAY16} header byte followed by a
 * 16-bit element count of zero.
 *
 * @param argHolder
 *            storage that receives the encoded bytes
 * @throws IOException
 *             if writing to the holder's output fails
 */
public static void setVoidArgument(ArrayBackedValueStorage argHolder) throws IOException {
    java.io.DataOutput out = argHolder.getDataOutput();
    out.writeByte(ARRAY16);
    out.writeShort((short) 0);
}
}