| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.samza.sql.planner; |
| |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Scanner; |
| import java.util.regex.Matcher; |
| import java.util.regex.Pattern; |
| import java.util.stream.Collectors; |
| import org.apache.calcite.rel.RelRoot; |
| import org.apache.calcite.rel.logical.LogicalProject; |
| import org.apache.calcite.rel.type.RelDataType; |
| import org.apache.calcite.rel.type.RelDataTypeFactoryImpl; |
| import org.apache.calcite.rel.type.RelDataTypeField; |
| import org.apache.calcite.rel.type.RelRecordType; |
| import org.apache.calcite.rex.RexNode; |
| import org.apache.calcite.sql.type.SqlTypeName; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.commons.lang3.Validate; |
| import org.apache.samza.SamzaException; |
| import org.apache.samza.config.Config; |
| import org.apache.samza.sql.data.SamzaSqlRelMessage; |
| import org.apache.samza.sql.dsl.SamzaSqlDslConverter; |
| import org.apache.samza.sql.interfaces.RelSchemaProvider; |
| import org.apache.samza.sql.interfaces.SamzaRelConverter; |
| import org.apache.samza.sql.interfaces.SamzaSqlJavaTypeFactoryImpl; |
| import org.apache.samza.sql.runner.SamzaSqlApplicationConfig; |
| import org.apache.samza.sql.schema.SqlFieldSchema; |
| import org.apache.samza.sql.schema.SqlSchema; |
| import org.apache.samza.sql.util.SamzaSqlQueryParser; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * SamzaSqlValidator that uses calcite engine to convert the sql query to relational graph and validates the query |
| * including the output. |
| */ |
| public class SamzaSqlValidator { |
private static final Logger LOG = LoggerFactory.getLogger(SamzaSqlValidator.class);

// Configuration from which the sql config of every validated statement is derived.
private final Config config;

/**
 * Creates a validator bound to the given configuration.
 *
 * @param config configuration used to build the sql config for each sql statement that is validated
 */
public SamzaSqlValidator(Config config) {
  this.config = config;
}
| |
| /** |
| * Validate a list of sql statements |
| * @param sqlStmts list of sql statements |
| * @throws SamzaSqlValidatorException exception for sql validation |
| */ |
| public void validate(List<String> sqlStmts) throws SamzaSqlValidatorException { |
| for (String sql: sqlStmts) { |
| SamzaSqlApplicationConfig sqlConfig = SamzaSqlDslConverter.getSqlConfig(Collections.singletonList(sql), config); |
| QueryPlanner planner = SamzaSqlDslConverter.getQueryPlanner(sqlConfig); |
| |
| // we always pass only select query to the planner for samza sql. The reason is that samza sql supports |
| // schema evolution where source and destination could up to an extent have independent schema evolution while |
| // calcite expects strict conformance of the destination schema with that of the fields in the select query. |
| SamzaSqlQueryParser.QueryInfo qinfo = SamzaSqlQueryParser.parseQuery(sql); |
| RelRoot relRoot; |
| try { |
| relRoot = planner.plan(qinfo.getSelectQuery()); |
| } catch (SamzaException e) { |
| throw new SamzaSqlValidatorException(String.format("Validation failed for sql stmt:\n%s\n with the following" |
| + " error: \n%s\n", sql, e), e); |
| } |
| |
| // Now that we have logical plan, validate different aspects. |
| String sink = qinfo.getSink(); |
| validate(relRoot, sink, sqlConfig.getRelSchemaProviders().get(sink), sqlConfig.getSamzaRelConverters().get(sink)); |
| } |
| } |
| |
| /** |
| * Determine if validation needs to be done on Calcite plan based on the schema provider and schema converter. |
| * @param relRoot |
| * @param sink |
| * @param outputSchemaProvider |
| * @param ouputRelSchemaConverter |
| * @return if the validation needs to be skipped |
| */ |
| protected boolean skipOutputValidation(RelRoot relRoot, String sink, RelSchemaProvider outputSchemaProvider, |
| SamzaRelConverter ouputRelSchemaConverter) { |
| return false; |
| } |
| |
| // TODO: Remove this API. This API is introduced to take care of cases where RelSchemaProviders have a complex |
| // mechanism to determine if a given output field is optional. We will need system specific validators to take |
| // care of such cases and once that is introduced, we can get rid of the below API. |
| protected boolean isOptional(RelSchemaProvider outputRelSchemaProvider, String outputFieldName, |
| RelRecordType projectRecord) { |
| return false; |
| } |
| |
| private void validate(RelRoot relRoot, String sink, RelSchemaProvider outputSchemaProvider, |
| SamzaRelConverter outputRelSchemaConverter) throws SamzaSqlValidatorException { |
| if (!skipOutputValidation(relRoot, sink, outputSchemaProvider, outputRelSchemaConverter)) { |
| // Validate select fields (including Udf return types) with output schema |
| validateOutput(relRoot, outputSchemaProvider); |
| } |
| |
| // TODO: |
| // 1. SAMZA-2314: Validate Udf arguments. |
| // 2. SAMZA-2315: Validate operators. These are the operators that are supported by Calcite but not by Samza Sql. |
| // Eg: LogicalAggregate with sum function is not supported by Samza Sql. |
| } |
| |
| private void validateOutput(RelRoot relRoot, RelSchemaProvider outputRelSchemaProvider) |
| throws SamzaSqlValidatorException { |
| LogicalProject project = (LogicalProject) relRoot.rel; |
| |
| RelRecordType projectRecord = (RelRecordType) project.getRowType(); |
| RelRecordType outputRecord = (RelRecordType) QueryPlanner.getSourceRelSchema(outputRelSchemaProvider, |
| new RelSchemaConverter()); |
| |
| // Handle any DELETE ops. |
| if (projectRecord.getFieldList().stream().anyMatch(f -> f.getName().equalsIgnoreCase(SamzaSqlRelMessage.OP_NAME))) { |
| validateDeleteOp(relRoot); |
| return; |
| } |
| |
| // Get Samza Sql schema along with Calcite schema. The reason is that the Calcite schema does not have a way |
| // to represent optional fields while Samza Sql schema can represent optional fields. This is the reason that |
| // we use SqlSchema in validating output. |
| SqlSchema outputSqlSchema = QueryPlanner.getSourceSqlSchema(outputRelSchemaProvider); |
| |
| validateOutputRecords(outputSqlSchema, outputRecord, projectRecord, outputRelSchemaProvider); |
| LOG.info("Samza Sql Validation finished successfully."); |
| } |
| |
| private void validateDeleteOp(RelRoot relRoot) throws SamzaSqlValidatorException { |
| LogicalProject project = (LogicalProject) relRoot.rel; |
| RelRecordType projectRecord = (RelRecordType) project.getRowType(); |
| |
| // In the case of DELETE op, only the key and DELETE op are required. |
| |
| if (projectRecord.getFieldCount() != 2) { |
| throw new SamzaSqlValidatorException(String.format("Only two select query fields are expected for DELETE op." |
| + " But there are %d fields given in the query.", projectRecord.getFieldCount())); |
| } |
| |
| RelDataTypeField keyField = projectRecord.getField(SamzaSqlRelMessage.KEY_NAME, true, true); |
| if (keyField == null) { |
| throw new SamzaSqlValidatorException(String.format("Select query needs to specify '%s' field while using DELETE" |
| + " op. Eg: 'SELECT myKey AS %s, '%s' AS %s FROM myTable'", SamzaSqlRelMessage.KEY_NAME, |
| SamzaSqlRelMessage.KEY_NAME, SamzaSqlRelMessage.DELETE_OP, SamzaSqlRelMessage.OP_NAME)); |
| } |
| int keyIdx = projectRecord.getFieldList().indexOf(keyField); |
| // Get the node corresponding to the special op. |
| RexNode node = project.getProjects().get(1 - keyIdx); |
| if (!node.toString().equals(String.format("'%s'", SamzaSqlRelMessage.DELETE_OP))) { |
| throw new SamzaSqlValidatorException(String.format("%s op is not supported. Please note that only '%s' op is" |
| + " currently supported. Eg:'SELECT myKey AS %s, '%s' AS %s FROM myStream'", node.toString(), |
| SamzaSqlRelMessage.DELETE_OP, SamzaSqlRelMessage.KEY_NAME, SamzaSqlRelMessage.DELETE_OP, |
| SamzaSqlRelMessage.OP_NAME)); |
| } |
| } |
| |
| private void validateOutputRecords(SqlSchema outputSqlSchema, RelRecordType outputRecord, |
| RelRecordType projectRecord, RelSchemaProvider outputRelSchemaProvider) |
| throws SamzaSqlValidatorException { |
| Map<String, RelDataType> outputRecordMap = outputRecord.getFieldList().stream().collect( |
| Collectors.toMap(RelDataTypeField::getName, RelDataTypeField::getType)); |
| Map<String, SqlFieldSchema> outputFieldSchemaMap = outputSqlSchema.getFields().stream().collect( |
| Collectors.toMap(SqlSchema.SqlField::getFieldName, SqlSchema.SqlField::getFieldSchema)); |
| Map<String, RelDataType> projectedRecordMap = projectRecord.getFieldList().stream().collect( |
| Collectors.toMap(RelDataTypeField::getName, RelDataTypeField::getType)); |
| |
| // Ensure that all fields from sql statement exist in the output schema and are of the same type. |
| for (Map.Entry<String, RelDataType> entry : projectedRecordMap.entrySet()) { |
| String projectedFieldName = entry.getKey(); |
| RelDataType outputFieldType = outputRecordMap.get(projectedFieldName); |
| SqlFieldSchema outputSqlFieldSchema = outputFieldSchemaMap.get(projectedFieldName); |
| |
| if (outputFieldType == null) { |
| // If the field names are specified more than once in the select query, calcite appends 'n' as suffix to the |
| // dup fields based on the order they are specified, where 'n' starts from 0 for the first dup field. |
| // Take the following example: SELECT id as str, secondaryId as str, tertiaryId as str FROM store.myTable |
| // Calcite renames the projected fieldNames in select query as str, str0, str1 respectively. |
| // Samza Sql allows a field name to be specified up to 2 times. Do the validation accordingly. |
| |
| // This type of pattern is typically followed when users want to just modify one field in the input table while |
| // keeping rest of the fields the same. Eg: SELECT myUdf(id) as id, * from store.myTable |
| if (projectedFieldName.endsWith("0")) { |
| projectedFieldName = StringUtils.chop(projectedFieldName); |
| outputFieldType = outputRecordMap.get(projectedFieldName); |
| outputSqlFieldSchema = outputFieldSchemaMap.get(projectedFieldName); |
| } |
| |
| if (outputFieldType == null) { |
| String errMsg = String.format("Field '%s' in select query does not match any field in output schema.", entry.getKey()); |
| LOG.error(errMsg); |
| throw new SamzaSqlValidatorException(errMsg); |
| } |
| } |
| |
| Validate.notNull(outputFieldType); |
| Validate.notNull(outputSqlFieldSchema); |
| |
| RelDataType calciteSqlType = getCalciteSqlFieldType(entry.getValue()); |
| if (!compareFieldTypes(outputFieldType, outputSqlFieldSchema, calciteSqlType, outputRelSchemaProvider)) { |
| String errMsg = String.format("Field '%s' with type '%s' (calciteSqlType:'%s') in select query does not match " |
| + "the field type '%s' in output schema.", entry.getKey(), entry.getValue(), calciteSqlType, |
| outputSqlFieldSchema.getFieldType()); |
| LOG.error(errMsg); |
| throw new SamzaSqlValidatorException(errMsg); |
| } |
| } |
| |
| // Ensure that all non-optional fields in output schema are set in the sql query and are of the |
| // same type. |
| for (Map.Entry<String, RelDataType> entry : outputRecordMap.entrySet()) { |
| RelDataType projectedFieldType = projectedRecordMap.get(entry.getKey()); |
| SqlFieldSchema outputSqlFieldSchema = outputFieldSchemaMap.get(entry.getKey()); |
| |
| if (projectedFieldType == null) { |
| // If an output schema field is not found in the sql query, ignore it if the field is optional. |
| // Otherwise, throw an error. |
| if (outputSqlFieldSchema.isOptional() || isOptional(outputRelSchemaProvider, entry.getKey(), projectRecord)) { |
| continue; |
| } |
| String errMsg = String.format("Non-optional field '%s' in output schema is missing in projected fields of " |
| + "select query.", entry.getKey()); |
| LOG.error(errMsg); |
| throw new SamzaSqlValidatorException(errMsg); |
| } else { |
| RelDataType calciteSqlType = getCalciteSqlFieldType(projectedFieldType); |
| if (!compareFieldTypes(entry.getValue(), outputSqlFieldSchema, calciteSqlType, outputRelSchemaProvider)) { |
| String errMsg = String.format("Field '%s' with type '%s' in output schema does not match the field" |
| + " type '%s' (calciteType:'%s') in projected fields.", entry.getKey(), |
| outputSqlFieldSchema.getFieldType(), projectedFieldType, calciteSqlType); |
| LOG.error(errMsg); |
| throw new SamzaSqlValidatorException(errMsg); |
| } |
| } |
| } |
| } |
| |
| private RelDataType getCalciteSqlFieldType(RelDataType fieldType) { |
| RelDataType sqlFieldType; |
| |
| // JavaTypes are relevant for Udf argument and return types |
| // TODO: Support UDF argument validation. Currently, only return types are validated and argument types are |
| // validated during run-time. |
| if (fieldType instanceof RelDataTypeFactoryImpl.JavaType) { |
| sqlFieldType = new SamzaSqlJavaTypeFactoryImpl().toSql(fieldType); |
| } else { |
| sqlFieldType = fieldType; |
| } |
| |
| return sqlFieldType; |
| } |
| |
| private boolean compareFieldTypes(RelDataType outputFieldType, SqlFieldSchema sqlFieldSchema, |
| RelDataType selectQueryFieldType, RelSchemaProvider outputRelSchemaProvider) { |
| |
| SqlTypeName outputSqlType = outputFieldType.getSqlTypeName(); |
| SqlTypeName projectSqlType = selectQueryFieldType.getSqlTypeName(); |
| |
| if (projectSqlType == SqlTypeName.ANY || outputSqlType == SqlTypeName.ANY) { |
| return true; |
| } else if (outputSqlType != SqlTypeName.ROW && outputSqlType == projectSqlType) { |
| return true; |
| } |
| |
| switch (outputSqlType) { |
| case CHAR: |
| return projectSqlType == SqlTypeName.VARCHAR; |
| case VARCHAR: |
| return projectSqlType == SqlTypeName.CHAR; |
| case BIGINT: |
| return projectSqlType == SqlTypeName.INTEGER; |
| case INTEGER: |
| return projectSqlType == SqlTypeName.BIGINT; |
| case FLOAT: |
| return projectSqlType == SqlTypeName.DOUBLE; |
| case DOUBLE: |
| return projectSqlType == SqlTypeName.FLOAT; |
| case ROW: |
| try { |
| validateOutputRecords(sqlFieldSchema.getRowSchema(), (RelRecordType) outputFieldType, |
| (RelRecordType) selectQueryFieldType, outputRelSchemaProvider); |
| } catch (SamzaSqlValidatorException e) { |
| LOG.error("A field in select query does not match with the output schema.", e); |
| return false; |
| } |
| return true; |
| default: |
| return false; |
| } |
| } |
| |
| // -- All Static Methods below -- |
| |
| /** |
| * Format the Calcite exception to a more readable form. |
| * |
| * As an example, consider the below sql query which fails calcite validation due to a non existing field : |
| * "Insert into testavro.outputTopic(id) select non_existing_name, name as string_value" |
| * + " from testavro.level1.level2.SIMPLE1 as s where s.id = 1" |
| * |
| * This function takes in the above multi-line sql query and the below sample exception as input: |
| * "org.apache.calcite.runtime.CalciteContextException: From line 1, column 8 to line 1, column 26: Column |
| * 'non_existing_name' not found in any table" |
| * |
| * And returns the following string: |
| * 2019-08-30 09:05:08 ERROR QueryPlanner:174 - Failed with exception for the following sql statement: |
| * |
| * Sql syntax error: |
| * |
| * SELECT `non_existing_name`, `name` AS `string_value` |
| * -------^^^^^^^^^^^^^^^^^^^-------------------------- |
| * FROM `testavro`.`level1`.`level2`.`SIMPLE1` AS `s` |
| * WHERE `s`.`id` = 1 |
| * |
| * @param query sql query |
| * @param e Exception returned by Calcite |
| * @return formatted error string |
| */ |
| public static String formatErrorString(String query, Exception e) { |
| Pattern pattern = Pattern.compile("line [0-9]+, column [0-9]+"); |
| Matcher matcher = pattern.matcher(e.getMessage()); |
| String[] queryLines = query.split("\\n"); |
| StringBuilder result = new StringBuilder(); |
| int startColIdx; |
| int endColIdx; |
| int startLineIdx; |
| int endLineIdx; |
| |
| try { |
| if (matcher.find()) { |
| String match = matcher.group(); |
| LOG.info(match); |
| startLineIdx = getIdxFromString(match, "line "); |
| startColIdx = getIdxFromString(match, "column "); |
| if (matcher.find()) { |
| match = matcher.group(); |
| LOG.info(match); |
| endLineIdx = getIdxFromString(match, "line "); |
| endColIdx = getIdxFromString(match, "column "); |
| } else { |
| endColIdx = startColIdx; |
| endLineIdx = startLineIdx; |
| } |
| int lineLen = endLineIdx - startLineIdx; |
| int colLen = endColIdx - startColIdx + 1; |
| |
| // Error spanning across multiple lines is not supported yet. |
| if (lineLen > 0) { |
| throw new SamzaException("lineLen formatting validation error: error cannot span across multiple lines."); |
| } |
| |
| int lineIdx = 0; |
| for (String line : queryLines) { |
| result.append(line) |
| .append("\n"); |
| if (lineIdx == startLineIdx) { |
| String lineStr = getStringWithRepeatedChars('-', line.length() - 1); |
| String pointerStr = getStringWithRepeatedChars('^', colLen); |
| String errorMarkerStr = |
| new StringBuilder(lineStr).replace(startColIdx, endColIdx, pointerStr).toString(); |
| result.append(errorMarkerStr) |
| .append("\n"); |
| } |
| lineIdx++; |
| } |
| } |
| |
| String[] errorMsgParts = e.getMessage().split("Exception:"); |
| result.append("\n") |
| .append(errorMsgParts[errorMsgParts.length - 1].trim()); |
| return String.format("Sql syntax error:\n\n%s\n", |
| result); |
| } catch (Exception ex) { |
| // Ignore any formatting errors. |
| LOG.error("Formatting error (Not the actual error. Look for the logs for actual error)", ex); |
| return String.format("Failed with formatting exception (not the actual error) for the following sql" |
| + " statement:\n\"%s\"\n\n%s", query, e.getMessage()); |
| } |
| } |
| |
| private static int getIdxFromString(String inputString, String delimiterStr) { |
| String[] splitStr = inputString.split(delimiterStr); |
| Scanner in = new Scanner(splitStr[1]).useDelimiter("[^0-9]+"); |
| return in.nextInt() - 1; |
| } |
| |
| private static String getStringWithRepeatedChars(char ch, int len) { |
| char[] chars = new char[len]; |
| Arrays.fill(chars, ch); |
| return new String(chars); |
| } |
| } |