// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.impala.planner;

import static org.junit.Assert.fail;

import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.fs.Path;
import org.apache.impala.analysis.ColumnLineageGraph;
import org.apache.impala.analysis.DescriptorTable;
import org.apache.impala.catalog.Catalog;
import org.apache.impala.catalog.CatalogException;
import org.apache.impala.common.FrontendTestBase;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.RuntimeEnv;
import org.apache.impala.service.Frontend.PlanCtx;
import org.apache.impala.testutil.TestFileParser;
import org.apache.impala.testutil.TestFileParser.Section;
import org.apache.impala.testutil.TestFileParser.TestCase;
import org.apache.impala.testutil.TestUtils;
import org.apache.impala.testutil.TestUtils.ResultFilter;
import org.apache.impala.thrift.ImpalaInternalServiceConstants;
import org.apache.impala.thrift.TDescriptorTable;
import org.apache.impala.thrift.TExecRequest;
import org.apache.impala.thrift.TExplainLevel;
import org.apache.impala.thrift.THBaseKeyRange;
import org.apache.impala.thrift.THdfsFileSplit;
import org.apache.impala.thrift.THdfsPartition;
import org.apache.impala.thrift.THdfsPartitionLocation;
import org.apache.impala.thrift.THdfsScanNode;
import org.apache.impala.thrift.THdfsTable;
import org.apache.impala.thrift.TLineageGraph;
import org.apache.impala.thrift.TPlanExecInfo;
import org.apache.impala.thrift.TPlanFragment;
import org.apache.impala.thrift.TPlanNode;
import org.apache.impala.thrift.TQueryCtx;
import org.apache.impala.thrift.TQueryExecRequest;
import org.apache.impala.thrift.TQueryOptions;
import org.apache.impala.thrift.TScanRangeLocationList;
import org.apache.impala.thrift.TScanRangeSpec;
import org.apache.impala.thrift.TTableDescriptor;
import org.apache.impala.thrift.TTableSink;
import org.apache.impala.thrift.TTupleDescriptor;
import org.apache.impala.thrift.TUpdateExecutorMembershipRequest;
import org.apache.impala.util.ExecutorMembershipSnapshot;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduScanToken;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

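/**
 * Base class for planner tests. Subclasses plan the queries in a .test file
 * (single-node, distributed, and parallel plans) and compare the resulting explain
 * plans, scan range locations, and column lineage against the expected sections of
 * that file. The actual output is also written to a file under
 * IMPALA_FE_TEST_LOGS_DIR so it can be diffed against the expected output.
 */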
public class PlannerTestBase extends FrontendTestBase {
  private static final Logger LOG = LoggerFactory.getLogger(PlannerTestBase.class);
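  // If true, write the actual output of each test file under outDir_ so it can be
  // diffed against the expected .test file.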
  private static final boolean GENERATE_OUTPUT_FILE = true;
  private final java.nio.file.Path testDir_ = Paths.get("functional-planner", "queries",
      "PlannerTest");
  private static java.nio.file.Path outDir_;
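  // Shared Kudu client, created in setUp() only when Kudu is supported; used to
  // stringify Kudu scan tokens when printing scan range locations.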
  private static KuduClient kuduClient_;

  // Map from plan ID (TPlanNodeId) to the plan node with that ID.
  private final Map<Integer, TPlanNode> planMap_ = Maps.newHashMap();
  // Map from tuple ID (TTupleId) to the tuple descriptor with that ID.
  private final Map<Integer, TTupleDescriptor> tupleMap_ = Maps.newHashMap();
  // Map from table ID (TTableId) to the table descriptor with that ID.
  private final Map<Integer, TTableDescriptor> tableMap_ = Maps.newHashMap();

  @BeforeClass
  public static void setUp() throws Exception {
    // Mimic the 3 node test mini-cluster.
    TUpdateExecutorMembershipRequest updateReq = new TUpdateExecutorMembershipRequest();
    updateReq.setIp_addresses(Sets.newHashSet("127.0.0.1"));
    updateReq.setHostnames(Sets.newHashSet("localhost"));
    updateReq.setNum_executors(3);
    ExecutorMembershipSnapshot.update(updateReq);

    if (RuntimeEnv.INSTANCE.isKuduSupported()) {
      kuduClient_ = new KuduClient.KuduClientBuilder("127.0.0.1:7051").build();
    }
    String logDir = System.getenv("IMPALA_FE_TEST_LOGS_DIR");
    if (logDir == null) logDir = "/tmp";
    outDir_ = Paths.get(logDir, "PlannerTest");
  }

  @Before
  public void setUpTest() throws Exception {
    // Reset the RuntimeEnv - individual tests may change it.
    RuntimeEnv.INSTANCE.reset();
    // Use 8 cores for resource estimation.
    RuntimeEnv.INSTANCE.setNumCores(8);
    // Set test env to control the explain level.
    RuntimeEnv.INSTANCE.setTestEnv(true);
  }

  @AfterClass
  public static void cleanUp() throws Exception {
    RuntimeEnv.INSTANCE.reset();

    if (kuduClient_ != null) {
      kuduClient_.close();
      kuduClient_ = null;
    }
  }

  /**
   * Clears the old maps and constructs new maps based on the new execRequest so
   * that findTable() and findPartition() can locate the various thrift metadata
   * structures quickly.
   */
  private void buildMaps(TQueryExecRequest execRequest) {
    // Build maps that will be used by findPartition().
    planMap_.clear();
    tupleMap_.clear();
    tableMap_.clear();
    for (TPlanExecInfo execInfo: execRequest.plan_exec_info) {
      for (TPlanFragment frag: execInfo.fragments) {
        for (TPlanNode node: frag.plan.nodes) {
          planMap_.put(node.node_id, node);
        }
      }
    }
    if (execRequest.query_ctx.isSetDesc_tbl_testonly()) {
      TDescriptorTable descTbl = execRequest.query_ctx.desc_tbl_testonly;
      for (TTupleDescriptor tupleDesc: descTbl.tupleDescriptors) {
        tupleMap_.put(tupleDesc.id, tupleDesc);
      }
      if (descTbl.isSetTableDescriptors()) {
        for (TTableDescriptor tableDesc: descTbl.tableDescriptors) {
          tableMap_.put(tableDesc.id, tableDesc);
        }
      }
    }
  }

  /**
   * Looks up the table corresponding to the plan node (identified by nodeId) by
   * following the chain scan node -> tuple descriptor -> table descriptor.
   */
  private THdfsTable findTable(int nodeId) {
    TPlanNode node = planMap_.get(nodeId);
    Preconditions.checkNotNull(node);
    Preconditions.checkState(node.node_id == nodeId && node.isSetHdfs_scan_node());
    THdfsScanNode scanNode = node.getHdfs_scan_node();
    int tupleId = scanNode.getTuple_id();
    TTupleDescriptor tupleDesc = tupleMap_.get(tupleId);
    Preconditions.checkNotNull(tupleDesc);
    Preconditions.checkState(tupleDesc.id == tupleId);
    TTableDescriptor tableDesc = tableMap_.get(tupleDesc.tableId);
    Preconditions.checkNotNull(tableDesc);
    Preconditions.checkState(tableDesc.id == tupleDesc.tableId &&
        tableDesc.isSetHdfsTable());
    return tableDesc.getHdfsTable();
  }

  /**
   * Look up the partition corresponding to the plan node (identified by
   * nodeId) and a file split.
   */
  private THdfsPartition findPartition(int nodeId, THdfsFileSplit split) {
    THdfsTable hdfsTable = findTable(nodeId);
    THdfsPartition partition = hdfsTable.getPartitions().get(split.partition_id);
    Preconditions.checkNotNull(partition);
    Preconditions.checkState(partition.id == split.partition_id);
    return partition;
  }

  /**
   * Verify that all THdfsPartitions included in the descriptor table are referenced by
   * at least one scan range or are part of the insert target table.
   * printScanRangeLocations() will implicitly verify the converse (it'll fail if a
   * scan range references a table/partition descriptor that is not present).
   */
  private void testHdfsPartitionsReferenced(TQueryExecRequest execRequest,
      String query, StringBuilder errorLog) {
    long insertTableId = -1;
    // Collect all partitions that are referenced by a scan range.
    Set<THdfsPartition> scanRangePartitions = Sets.newHashSet();
    for (TPlanExecInfo execInfo: execRequest.plan_exec_info) {
      if (execInfo.per_node_scan_ranges != null) {
        for (Map.Entry<Integer, TScanRangeSpec> entry :
            execInfo.per_node_scan_ranges.entrySet()) {
          if (entry.getValue() == null || !entry.getValue().isSetConcrete_ranges()) {
            continue;
          }
          for (TScanRangeLocationList locationList : entry.getValue().concrete_ranges) {
            if (locationList.scan_range.isSetHdfs_file_split()) {
              THdfsFileSplit split = locationList.scan_range.getHdfs_file_split();
              THdfsPartition partition = findPartition(entry.getKey(), split);
              scanRangePartitions.add(partition);
            }
          }
        }
      }
    }

    if (execRequest.isSetFinalize_params()) {
      insertTableId = execRequest.getFinalize_params().getTable_id();
    }

    boolean first = true;
    // Iterate through all partitions of the descriptor table and verify all partitions
    // are referenced.
    if (execRequest.query_ctx.isSetDesc_tbl_testonly()
        && execRequest.query_ctx.desc_tbl_testonly.isSetTableDescriptors()) {
      for (TTableDescriptor tableDesc:
          execRequest.query_ctx.desc_tbl_testonly.tableDescriptors) {
        // All partitions of insertTableId are okay.
        if (tableDesc.getId() == insertTableId) continue;
        if (!tableDesc.isSetHdfsTable()) continue;
        THdfsTable hdfsTable = tableDesc.getHdfsTable();
        for (Map.Entry<Long, THdfsPartition> e :
            hdfsTable.getPartitions().entrySet()) {
          THdfsPartition partition = e.getValue();
          if (!scanRangePartitions.contains(partition)) {
            if (first) errorLog.append("query:\n" + query + "\n");
            errorLog.append(
                "  unreferenced partition: HdfsTable: " + tableDesc.getId() +
                " HdfsPartition: " + partition.getId() + "\n");
            first = false;
          }
        }
      }
    }
  }

  /**
   * Construct a string representation of the scan ranges for this request.
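   * The result contains one "NODE <id>:" line per scan node, followed by one indented
   * line per scan range: an HDFS split ("HDFS SPLIT <path> <offset>:<length>"), an
   * HBase key range, or a Kudu scan token.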
   */
  private StringBuilder printScanRangeLocations(TQueryExecRequest execRequest) {
    StringBuilder result = new StringBuilder();
    for (TPlanExecInfo execInfo: execRequest.plan_exec_info) {
      if (execInfo.per_node_scan_ranges == null) continue;
      for (Map.Entry<Integer, TScanRangeSpec> entry :
          execInfo.per_node_scan_ranges.entrySet()) {
        result.append("NODE " + entry.getKey().toString() + ":\n");
        if (entry.getValue() == null || !entry.getValue().isSetConcrete_ranges()) {
          continue;
        }

        for (TScanRangeLocationList locations : entry.getValue().concrete_ranges) {
          // Print the scan range.
          result.append("  ");
          if (locations.scan_range.isSetHdfs_file_split()) {
            THdfsFileSplit split = locations.scan_range.getHdfs_file_split();
            THdfsTable table = findTable(entry.getKey());
            THdfsPartition partition = table.getPartitions().get(split.partition_id);
            THdfsPartitionLocation location = partition.getLocation();
            String file_location = location.getSuffix();
            if (location.prefix_index != -1) {
              file_location =
                  table.getPartition_prefixes().get(location.prefix_index) + file_location;
            }
            Path filePath = new Path(file_location, split.relative_path);
            filePath = cleanseFilePath(filePath);
            result.append("HDFS SPLIT " + filePath.toString() + " "
                + Long.toString(split.offset) + ":" + Long.toString(split.length));
          }
          if (locations.scan_range.isSetHbase_key_range()) {
            THBaseKeyRange keyRange = locations.scan_range.getHbase_key_range();
            result.append("HBASE KEYRANGE ");
            if (keyRange.isSetStartKey()) {
              result.append(HBaseScanNode.printKey(keyRange.getStartKey().getBytes()));
            } else {
              result.append("<unbounded>");
            }
            result.append(":");
            if (keyRange.isSetStopKey()) {
              result.append(HBaseScanNode.printKey(keyRange.getStopKey().getBytes()));
            } else {
              result.append("<unbounded>");
            }
          }

          if (locations.scan_range.isSetKudu_scan_token()) {
            Preconditions.checkNotNull(kuduClient_,
                "Test should not be invoked on platforms that do not support Kudu.");
            try {
              String token = KuduScanToken.stringifySerializedToken(
                  locations.scan_range.kudu_scan_token.array(), kuduClient_);
              // Don't match against the table id as it is nondeterministic.
              token = token.replaceAll(" table-id=.*?,", "");
              result.append(token);
            } catch (IOException e) {
              throw new IllegalStateException("Unable to parse Kudu scan token", e);
            }
          }
          result.append("\n");
        }
      }
    }
    return result;
  }

  /**
   * Normalize components of the given file path, removing any environment- or test-run
   * dependent components. For example, substitutes the unique id portion of Impala
   * generated file names with a fixed literal. Subclasses should override to do
   * filesystem-specific cleansing.
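   * For example, a file name like "544f16c01fa8a0e8-b33e340100000000_42_data" (an
   * illustrative name matching the pattern below) is rewritten to "<UID>_data".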
   */
  protected Path cleanseFilePath(Path path) {
    String fileName = path.getName();
    Pattern pattern = Pattern.compile("\\w{16}-\\w{16}_\\d+_data");
    Matcher matcher = pattern.matcher(fileName);
    fileName = matcher.replaceFirst("<UID>_data");
    return new Path(path.getParent(), fileName);
  }

  /**
   * Extracts and returns the expected error message from expectedPlan.
   * Returns null if expectedPlan is empty or its first element is not an error message.
   * The accepted format for error messages is the exception string. We currently
   * support only NotImplementedException and InternalException.
   */
  private String getExpectedErrorMessage(ArrayList<String> expectedPlan) {
    if (expectedPlan == null || expectedPlan.isEmpty()) return null;
    if (!expectedPlan.get(0).contains("NotImplementedException") &&
        !expectedPlan.get(0).contains("InternalException")) return null;
    return expectedPlan.get(0).trim();
  }

  private void handleException(String query, String expectedErrorMsg,
      StringBuilder errorLog, StringBuilder actualOutput, Throwable e) {
    String actualErrorMsg = e.getClass().getSimpleName() + ": " + e.getMessage();
    actualOutput.append(actualErrorMsg).append("\n");
    if (expectedErrorMsg == null) {
      // The exception is unexpected: log the full stack for debugging.
      errorLog.append(String.format("Query:\n%s\nError Stack:\n%s\n", query,
          ExceptionUtils.getStackTrace(e)));
    } else if (!expectedErrorMsg.isEmpty()) {
      // Compare actual and expected error messages (case-insensitive prefix match).
      if (!actualErrorMsg.toLowerCase().startsWith(expectedErrorMsg.toLowerCase())) {
        errorLog.append("query:\n" + query + "\nExpected error message: '"
            + expectedErrorMsg + "'\nActual error message: '" + actualErrorMsg + "'\n");
      }
    }
  }

  /**
   * Merges the query options of 'b' into 'a' and returns 'a'. Fields that are set in
   * 'b' overwrite the corresponding fields in 'a'; fields unset in 'b' are left as-is.
   */
  protected TQueryOptions mergeQueryOptions(TQueryOptions a, TQueryOptions b) {
    for (TQueryOptions._Fields f : TQueryOptions._Fields.values()) {
      if (b.isSet(f)) {
        a.setFieldValue(f, b.getFieldValue(f));
      }
    }
    return a;
  }

  protected TQueryOptions defaultQueryOptions() {
    TQueryOptions options = new TQueryOptions();
    options.setExplain_level(TExplainLevel.STANDARD);
    options.setExec_single_node_rows_threshold(0);
    return options;
  }

  /**
   * Produces single-node, distributed, and parallel plans for testCase and compares
   * the plan and scan range results. Appends the actual plans as well as the printed
   * scan range locations to actualOutput, along with the requisite section headers,
   * and compares both to the appropriate sections of 'testCase'.
   */
  private void runTestCase(TestCase testCase, StringBuilder errorLog,
      StringBuilder actualOutput, String dbName,
      Set<PlannerTestOption> testOptions) throws CatalogException {
    String query = testCase.getQuery();
    LOG.info("running query " + query);
    if (query.isEmpty()) {
      throw new IllegalStateException("Cannot plan empty query in line: " +
          testCase.getStartingLineNum());
    }
    // Set up the query context. Note that we need to deep copy it before planning each
    // time since planning modifies it.
    TQueryCtx queryCtx = TestUtils.createQueryContext(
        dbName, System.getProperty("user.name"));
    queryCtx.client_request.query_options = testCase.getOptions();
    // Test single node plan, scan range locations, and column lineage.
    TExecRequest singleNodeExecRequest = testPlan(testCase, Section.PLAN,
        queryCtx.deepCopy(), testOptions, errorLog, actualOutput);
    validateTableIds(singleNodeExecRequest);
    if (scanRangeLocationsCheckEnabled()) {
      checkScanRangeLocations(testCase, singleNodeExecRequest, errorLog, actualOutput);
    }
    checkColumnLineage(testCase, singleNodeExecRequest, errorLog, actualOutput);
    checkLimitCardinality(query, singleNodeExecRequest, errorLog);
    // Test distributed plan.
    testPlan(testCase, Section.DISTRIBUTEDPLAN, queryCtx.deepCopy(), testOptions,
        errorLog, actualOutput);
    // Test parallel plans.
    testPlan(testCase, Section.PARALLELPLANS, queryCtx.deepCopy(), testOptions,
        errorLog, actualOutput);
  }

  /**
   * Validates that all tables in the descriptor table of 'request' have a unique id
   * and that they are properly referenced by tuple descriptors and the table sink.
   */
  private void validateTableIds(TExecRequest request) {
    if (request == null || !request.isSetQuery_exec_request()) return;
    TQueryExecRequest execRequest = request.query_exec_request;
    HashSet<Integer> seenTableIds = Sets.newHashSet();
    if (execRequest.query_ctx.isSetDesc_tbl_testonly()) {
      TDescriptorTable descTbl = execRequest.query_ctx.desc_tbl_testonly;
      if (descTbl.isSetTableDescriptors()) {
        for (TTableDescriptor tableDesc: descTbl.tableDescriptors) {
          if (seenTableIds.contains(tableDesc.id)) {
            throw new IllegalStateException("Failed to verify table id for table: " +
                tableDesc.getDbName() + "." + tableDesc.getTableName() +
                ".\nTable id: " + tableDesc.id + " already used.");
          }
          seenTableIds.add(tableDesc.id);
        }
      }

      if (descTbl.isSetTupleDescriptors()) {
        for (TTupleDescriptor tupleDesc: descTbl.tupleDescriptors) {
          if (tupleDesc.isSetTableId() && !seenTableIds.contains(tupleDesc.tableId)) {
            throw new IllegalStateException("Descriptor table does not include the " +
                "table id of:\n" + tupleDesc.toString());
          }
        }
      }
    }

    if (execRequest.isSetPlan_exec_info() && !execRequest.plan_exec_info.isEmpty()) {
      TPlanFragment firstPlanFragment =
          execRequest.plan_exec_info.get(0).fragments.get(0);
      if (firstPlanFragment.isSetOutput_sink()
          && firstPlanFragment.output_sink.isSetTable_sink()) {
        TTableSink tableSink = firstPlanFragment.output_sink.table_sink;
        if (!seenTableIds.contains(tableSink.target_table_id)
            || tableSink.target_table_id != DescriptorTable.TABLE_SINK_ID) {
          throw new IllegalStateException("Table sink id error for target table:\n" +
              tableSink.toString());
        }
      }
    }
  }

  /**
   * Produces the single-node or distributed plan for testCase and compares the
   * actual/expected plans if the corresponding test section exists in testCase.
   *
   * Returns the produced exec request or null if there was an error generating
   * the plan.
   *
   * testOptions control exactly how the plan is generated and compared.
   */
  private TExecRequest testPlan(TestCase testCase, Section section,
      TQueryCtx queryCtx, Set<PlannerTestOption> testOptions,
      StringBuilder errorLog, StringBuilder actualOutput) {
    String query = testCase.getQuery();
    queryCtx.client_request.setStmt(query);
    TQueryOptions queryOptions = queryCtx.client_request.getQuery_options();
    if (section == Section.PLAN) {
      queryOptions.setNum_nodes(1);
    } else {
      // For distributed and parallel execution we want to run on all available nodes.
      queryOptions.setNum_nodes(
          ImpalaInternalServiceConstants.NUM_NODES_ALL);
    }
    if (section == Section.PARALLELPLANS
        && (!queryOptions.isSetMt_dop() || queryOptions.getMt_dop() == 0)) {
      // Set mt_dop to force production of parallel plans.
      queryCtx.client_request.query_options.setMt_dop(2);
    }
    ArrayList<String> expectedPlan = testCase.getSectionContents(section);
    boolean sectionExists = expectedPlan != null && !expectedPlan.isEmpty();
    String expectedErrorMsg = getExpectedErrorMessage(expectedPlan);

    TExecRequest execRequest = null;
    if (sectionExists) actualOutput.append(section.getHeader() + "\n");
    String explainStr = "";
    try {
      PlanCtx planCtx = new PlanCtx(queryCtx);
      planCtx.disableDescTblSerialization();
      execRequest = frontend_.createExecRequest(planCtx);
      explainStr = planCtx.getExplainString();
    } catch (Exception e) {
      if (!sectionExists) return null;
      handleException(query, expectedErrorMsg, errorLog, actualOutput, e);
    }
    // No expected plan was specified for section. Skip expected/actual comparison.
    if (!sectionExists) return execRequest;
    // Failed to produce an exec request.
    if (execRequest == null) return null;

    explainStr = removeExplainHeader(explainStr, testOptions);
    actualOutput.append(explainStr);
    LOG.info(section.toString() + ":" + explainStr);
    if (expectedErrorMsg != null) {
      errorLog.append(String.format(
          "\nExpected failure, but query produced %s.\nQuery:\n%s\n\n%s:\n%s",
          section, query, section, explainStr));
    } else {
      List<ResultFilter> resultFilters =
          Lists.<ResultFilter>newArrayList(TestUtils.FILE_SIZE_FILTER);
      if (!testOptions.contains(PlannerTestOption.VALIDATE_RESOURCES)) {
        resultFilters.addAll(TestUtils.RESOURCE_FILTERS);
      }
      if (!testOptions.contains(PlannerTestOption.VALIDATE_CARDINALITY)) {
        resultFilters.add(TestUtils.ROW_SIZE_FILTER);
        resultFilters.add(TestUtils.CARDINALITY_FILTER);
      }
      if (!testOptions.contains(PlannerTestOption.VALIDATE_SCAN_FS)) {
        resultFilters.add(TestUtils.SCAN_NODE_SCHEME_FILTER);
      }

      String planDiff = TestUtils.compareOutput(
          Lists.newArrayList(explainStr.split("\n")), expectedPlan, true, resultFilters);
      if (!planDiff.isEmpty()) {
        errorLog.append(String.format(
            "\nSection %s of query:\n%s\n\n%s", section, query, planDiff));
        // Append the VERBOSE explain plan because it contains details about
        // tuples/sizes/cardinality for easier debugging.
        String verbosePlan = getVerboseExplainPlan(queryCtx);
        errorLog.append("\nVerbose plan:\n" + verbosePlan);
      }
    }
    return execRequest;
  }

  /**
   * Returns the VERBOSE explain plan for the given queryCtx, or a stack trace
   * if an error occurred while creating the plan.
   */
  private String getVerboseExplainPlan(TQueryCtx queryCtx) {
    String explainStr;
    TExecRequest execRequest = null;
    TExplainLevel origExplainLevel =
        queryCtx.client_request.getQuery_options().getExplain_level();
    try {
      queryCtx.client_request.getQuery_options().setExplain_level(TExplainLevel.VERBOSE);
      PlanCtx planCtx = new PlanCtx(queryCtx);
      planCtx.disableDescTblSerialization();
      execRequest = frontend_.createExecRequest(planCtx);
      explainStr = planCtx.getExplainString();
    } catch (ImpalaException e) {
      return ExceptionUtils.getStackTrace(e);
    } finally {
      queryCtx.client_request.getQuery_options().setExplain_level(origExplainLevel);
    }
    Preconditions.checkNotNull(execRequest);
    return removeExplainHeader(
        explainStr, Collections.<PlannerTestOption>emptySet());
  }

  private void checkScanRangeLocations(TestCase testCase, TExecRequest execRequest,
      StringBuilder errorLog, StringBuilder actualOutput) {
    String query = testCase.getQuery();
    // Query exec request may not be set for DDL, e.g., CTAS.
    String locationsStr = null;
    if (execRequest != null && execRequest.isSetQuery_exec_request()) {
      if (execRequest.query_exec_request.plan_exec_info == null) return;
      buildMaps(execRequest.query_exec_request);
      // If we optimize the partition key scans, we may get all the partition key
      // values from the metadata and may not reference any table. Skip the check in
      // this case.
      TQueryOptions options = execRequest.getQuery_options();
      if (!(options.isSetOptimize_partition_key_scans() &&
          options.optimize_partition_key_scans)) {
        testHdfsPartitionsReferenced(execRequest.query_exec_request, query, errorLog);
      }
      locationsStr =
          printScanRangeLocations(execRequest.query_exec_request).toString();
    }

    // Compare the scan range locations.
| LOG.info("scan range locations: " + locationsStr); |
| ArrayList<String> expectedLocations = |
| testCase.getSectionContents(Section.SCANRANGELOCATIONS); |
| |
| if (expectedLocations.size() > 0 && locationsStr != null) { |
| // Locations' order does not matter. |
| String result = TestUtils.compareOutput( |
| Lists.newArrayList(locationsStr.split("\n")), expectedLocations, false, |
| Collections.<TestUtils.ResultFilter>emptyList()); |
| if (!result.isEmpty()) { |
| errorLog.append("section " + Section.SCANRANGELOCATIONS + " of query:\n" |
| + query + "\n" + result); |
| } |
| actualOutput.append(Section.SCANRANGELOCATIONS.getHeader() + "\n"); |
      // Print the locations out sorted, since their order is random and would
      // otherwise mess up the diffs. The values in locationsStr contain "NODE X"
      // labels as well as paths.
      ArrayList<String> locations = Lists.newArrayList(locationsStr.split("\n"));
      ArrayList<String> perNodeLocations = Lists.newArrayList();

      for (int i = 0; i < locations.size(); ++i) {
        if (locations.get(i).startsWith("NODE")) {
          if (!perNodeLocations.isEmpty()) {
            Collections.sort(perNodeLocations);
            actualOutput.append(Joiner.on("\n").join(perNodeLocations)).append("\n");
            perNodeLocations.clear();
          }
          actualOutput.append(locations.get(i)).append("\n");
        } else {
          perNodeLocations.add(locations.get(i));
        }
      }

      if (!perNodeLocations.isEmpty()) {
        Collections.sort(perNodeLocations);
        actualOutput.append(Joiner.on("\n").join(perNodeLocations)).append("\n");
      }

      // TODO: check that scan range locations are identical in both cases
    }
  }

  /**
   * Checks that limits are accounted for in the cardinality of plan nodes.
   */
  private void checkLimitCardinality(
      String query, TExecRequest execRequest, StringBuilder errorLog) {
    if (execRequest == null) return;
    if (!execRequest.isSetQuery_exec_request()
        || execRequest.query_exec_request == null
        || execRequest.query_exec_request.plan_exec_info == null) {
      return;
    }
    for (TPlanExecInfo execInfo : execRequest.query_exec_request.plan_exec_info) {
      for (TPlanFragment planFragment : execInfo.fragments) {
        if (!planFragment.isSetPlan() || planFragment.plan == null) continue;
        for (TPlanNode node : planFragment.plan.nodes) {
          if (!node.isSetLimit() || -1 == node.limit) continue;
          if (!node.isSetEstimated_stats() || node.estimated_stats == null) continue;
          if (node.limit < node.estimated_stats.cardinality) {
            StringBuilder limitCardinalityError = new StringBuilder();
            limitCardinalityError.append("Query: " + query + "\n");
            limitCardinalityError.append(
                "Expected cardinality estimate less than or equal to LIMIT: "
                + node.limit + "\n");
            limitCardinalityError.append("Actual cardinality estimate: "
                + node.estimated_stats.cardinality + "\n");
            limitCardinalityError.append("In node id " + node.node_id + "\n");
            errorLog.append(limitCardinalityError.toString());
          }
        }
      }
    }
  }

  /**
   * This function plans the given query and fails if the estimated cardinalities are
   * not within the specified bounds [min, max].
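   * For example, a call like checkCardinality(query, 100, 100) (values illustrative)
   * requires every plan node in the query's plan to have a cardinality estimate of
   * exactly 100.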
   */
  protected void checkCardinality(String query, long min, long max)
      throws ImpalaException {
    TQueryCtx queryCtx = TestUtils.createQueryContext(Catalog.DEFAULT_DB,
        System.getProperty("user.name"));
    queryCtx.client_request.setStmt(query);
    PlanCtx planCtx = new PlanCtx(queryCtx);
    planCtx.disableDescTblSerialization();
    TExecRequest execRequest = frontend_.createExecRequest(planCtx);

    if (!execRequest.isSetQuery_exec_request()
        || execRequest.query_exec_request == null
        || execRequest.query_exec_request.plan_exec_info == null) {
      return;
    }
    for (TPlanExecInfo execInfo : execRequest.query_exec_request.plan_exec_info) {
      for (TPlanFragment planFragment : execInfo.fragments) {
        if (!planFragment.isSetPlan() || planFragment.plan == null) continue;
        for (TPlanNode node : planFragment.plan.nodes) {
          if (node.estimated_stats == null) {
            fail("Query: " + query + " has no estimated statistics");
          }
          long cardinality = node.estimated_stats.cardinality;
          if (cardinality < min || cardinality > max) {
            StringBuilder errorLog = new StringBuilder();
            errorLog.append("Query: " + query + "\n");
            errorLog.append(
                "Expected cardinality estimate between " + min + " and " + max + "\n");
            errorLog.append("Actual cardinality estimate: " + cardinality + "\n");
            errorLog.append("In node id " + node.node_id + "\n");
            fail(errorLog.toString());
          }
        }
      }
    }
  }

  private void checkColumnLineage(TestCase testCase, TExecRequest execRequest,
      StringBuilder errorLog, StringBuilder actualOutput) {
    String query = testCase.getQuery();
    ArrayList<String> expectedLineage = testCase.getSectionContents(Section.LINEAGE);
    if (expectedLineage == null || expectedLineage.isEmpty()) return;
    if (execRequest == null) return;
    TLineageGraph lineageGraph = null;
    if (execRequest.isSetQuery_exec_request()) {
      lineageGraph = execRequest.query_exec_request.lineage_graph;
    } else if (execRequest.isSetCatalog_op_request()) {
      lineageGraph = execRequest.catalog_op_request.lineage_graph;
    }
    if (lineageGraph == null) return;
    String serializedGraph = Joiner.on("\n").join(expectedLineage);
    ColumnLineageGraph expectedGraph =
        ColumnLineageGraph.createFromJSON(serializedGraph);
    ColumnLineageGraph outputGraph = ColumnLineageGraph.fromThrift(lineageGraph);
    if (expectedGraph == null || outputGraph == null ||
        !outputGraph.equalsForTests(expectedGraph)) {
      StringBuilder lineageError = new StringBuilder();
      lineageError.append("section " + Section.LINEAGE + " of query:\n"
          + query + "\n");
      lineageError.append("Output:\n");
      if (outputGraph != null) {
        lineageError.append(TestUtils.prettyPrintJson(outputGraph.toJson()) + "\n");
      }
      lineageError.append("\nExpected:\n");
      lineageError.append(serializedGraph + "\n");
      errorLog.append(lineageError.toString());
    }
    if (outputGraph != null) {
      actualOutput.append(Section.LINEAGE.getHeader());
      actualOutput.append(TestUtils.prettyPrintJson(outputGraph.toJson()));
      actualOutput.append("\n");
    }
  }

  /**
   * Strips out all or part of the explain header. This can be used to remove lines
   * containing resource estimates and the warning about missing stats from the given
   * explain plan. 'explain' is the original explain output; 'testOptions' controls
   * which parts of the header to keep.
   */
  private String removeExplainHeader(
      String explain, Set<PlannerTestOption> testOptions) {
    if (testOptions.contains(PlannerTestOption.INCLUDE_EXPLAIN_HEADER)) return explain;
    boolean keepResources =
        testOptions.contains(PlannerTestOption.INCLUDE_RESOURCE_HEADER);
    boolean keepQueryWithImplicitCasts =
        testOptions.contains(PlannerTestOption.INCLUDE_QUERY_WITH_IMPLICIT_CASTS);
    StringBuilder builder = new StringBuilder();
    boolean inHeader = true;
    boolean inImplicitCasts = false;

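    // The header is everything up to the first empty line; the plan body follows.
    // Rebuild the output, keeping only the header lines requested by testOptions.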
    for (String line: explain.split("\n")) {
      if (inHeader) {
        // The first empty line indicates the end of the header.
        if (line.isEmpty()) {
          inHeader = false;
        } else if (keepResources && line.contains("Resource")) {
          builder.append(line).append("\n");
        } else if (keepQueryWithImplicitCasts) {
          inImplicitCasts |= line.contains("Analyzed query:");
          if (inImplicitCasts) {
            // Keep copying the query with implicit casts.
            // This works because this is the last thing in the header.
            builder.append(line).append("\n");
          }
        }
      } else {
        builder.append(line).append("\n");
      }
    }
    return builder.toString();
  }

  /**
   * Assorted binary options that alter the behaviour of planner tests, generally
   * enabling additional, more detailed checks.
   */
  protected enum PlannerTestOption {
    // Generate extended explain plans (default is STANDARD).
    EXTENDED_EXPLAIN,
    // Include the header of the explain plan (default is to strip the explain header).
    INCLUDE_EXPLAIN_HEADER,
    // Include the part of the explain header that has top-level resource consumption.
    // If INCLUDE_EXPLAIN_HEADER is enabled, these are already included.
    INCLUDE_RESOURCE_HEADER,
    // Include the part of the extended explain header that has the query including
    // implicit casts. Equivalent to enabling INCLUDE_EXPLAIN_HEADER and EXTENDED_EXPLAIN.
    INCLUDE_QUERY_WITH_IMPLICIT_CASTS,
    // Validate the resource requirement values within the plan (default is to ignore
    // differences in resource values). Operator- and fragment-level resource
    // requirements are only included if EXTENDED_EXPLAIN is also enabled.
    VALIDATE_RESOURCES,
    // Verify the row size and cardinality fields in the plan. Default is
    // to ignore these values (for backward compatibility). Turn this option
    // on for tests that validate cardinality calculations: joins, scan
    // cardinality, etc.
    VALIDATE_CARDINALITY,
    // Validate the filesystem schemes shown in the scan node plan. An example of a
    // scan node profile that contains filesystem-specific information is:
    //
    //   00:SCAN HDFS [functional.testtbl]
    //      HDFS partitions=1/1 files=0 size=0B
    //      S3 partitions=1/0 files=0 size=0B
    //      ADLS partitions=1/0 files=0 size=0B
    //
    // By default, this flag is disabled, so tests ignore the values of 'HDFS',
    // 'S3', and 'ADLS' in the above explain plan.
    VALIDATE_SCAN_FS,
    // If set, disables the attempt to compute an estimated number of rows in an
    // HDFS table.
    DISABLE_HDFS_NUM_ROWS_ESTIMATE
  }
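
  // Typical usage from a subclass (a sketch; the test file name, database, and
  // options below are illustrative):
  //
  //   @Test
  //   public void testJoins() {
  //     runPlannerTestFile("joins", "functional",
  //         Sets.newHashSet(PlannerTestOption.VALIDATE_CARDINALITY));
  //   }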

  protected void runPlannerTestFile(String testFile, TQueryOptions options) {
    runPlannerTestFile(testFile, "default", options,
        Collections.<PlannerTestOption>emptySet());
  }

  protected void runPlannerTestFile(String testFile, TQueryOptions options,
      Set<PlannerTestOption> testOptions) {
    runPlannerTestFile(testFile, "default", options, testOptions);
  }

  protected void runPlannerTestFile(
      String testFile, Set<PlannerTestOption> testOptions) {
    runPlannerTestFile(testFile, "default", defaultQueryOptions(), testOptions);
  }

  protected void runPlannerTestFile(
      String testFile, String dbName, Set<PlannerTestOption> testOptions) {
    runPlannerTestFile(testFile, dbName, defaultQueryOptions(), testOptions);
  }

  protected void runPlannerTestFile(
      String testFile, String dbName, TQueryOptions options) {
    runPlannerTestFile(testFile, dbName, options,
        Collections.<PlannerTestOption>emptySet());
  }


  private void runPlannerTestFile(String testFile, String dbName, TQueryOptions options,
      Set<PlannerTestOption> testOptions) {
    String fileName = testDir_.resolve(testFile + ".test").toString();
    if (options == null) {
      options = defaultQueryOptions();
    } else {
      options = mergeQueryOptions(defaultQueryOptions(), options);
    }
    if (testOptions.contains(PlannerTestOption.EXTENDED_EXPLAIN)) {
      options.setExplain_level(TExplainLevel.EXTENDED);
    }
    TestFileParser queryFileParser = new TestFileParser(fileName, options);
    StringBuilder actualOutput = new StringBuilder();

    queryFileParser.parseFile();
    StringBuilder errorLog = new StringBuilder();
    for (TestCase testCase : queryFileParser.getTestCases()) {
      actualOutput.append(testCase.getSectionAsString(Section.QUERY, true, "\n"));
      actualOutput.append("\n");

      String neededHiveMajorVersion =
          testCase.getSectionAsString(Section.HIVE_MAJOR_VERSION, false, "");
      if (neededHiveMajorVersion != null && !neededHiveMajorVersion.isEmpty() &&
          Integer.parseInt(neededHiveMajorVersion) != TestUtils.getHiveMajorVersion()) {
        actualOutput.append("Skipping test case (needs Hive major version: ");
        actualOutput.append(neededHiveMajorVersion);
        actualOutput.append(")\n");
        actualOutput.append("====\n");
        continue;
      }

      String queryOptionsSection = testCase.getSectionAsString(
          Section.QUERYOPTIONS, true, "\n");
      if (queryOptionsSection != null && !queryOptionsSection.isEmpty()) {
        actualOutput.append("---- QUERYOPTIONS\n");
        actualOutput.append(queryOptionsSection);
        actualOutput.append("\n");
      }
      try {
        runTestCase(testCase, errorLog, actualOutput, dbName, testOptions);
      } catch (CatalogException e) {
        errorLog.append(String.format("Failed to plan query\n%s\n%s",
            testCase.getQuery(), e.getMessage()));
      }
      actualOutput.append("====\n");
    }

    // Create the actual output file.
    if (GENERATE_OUTPUT_FILE) {
      outDir_.toFile().mkdirs();
      try (FileWriter fw =
          new FileWriter(outDir_.resolve(testFile + ".test").toFile())) {
        fw.write(actualOutput.toString());
      } catch (IOException e) {
        errorLog.append("Unable to create output file: " + e.getMessage());
      }
    }

    if (errorLog.length() != 0) {
      fail(errorLog.toString());
    }
  }

  protected void runPlannerTestFile(String testFile) {
    runPlannerTestFile(testFile, "default", defaultQueryOptions(),
        Collections.<PlannerTestOption>emptySet());
  }

  protected void runPlannerTestFile(String testFile, String dbName) {
    runPlannerTestFile(testFile, dbName, defaultQueryOptions(),
        Collections.<PlannerTestOption>emptySet());
  }

  /**
   * Returns true if {@link #checkScanRangeLocations(TestCase, TExecRequest,
   * StringBuilder, StringBuilder)} should be run; subclasses may override to
   * disable the check.
   */
  protected boolean scanRangeLocationsCheckEnabled() {
    return true;
  }
}