| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iotdb.db.queryengine.execution.operator; |
| |
| import org.apache.iotdb.common.rpc.thrift.TEndPoint; |
| import org.apache.iotdb.commons.client.IClientManager; |
| import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; |
| import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; |
| import org.apache.iotdb.commons.conf.IoTDBConstant; |
| import org.apache.iotdb.db.exception.mpp.FragmentInstanceFetchException; |
| import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; |
| import org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator; |
| import org.apache.iotdb.db.queryengine.plan.Coordinator; |
| import org.apache.iotdb.db.queryengine.plan.execution.QueryExecution; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance; |
| import org.apache.iotdb.db.queryengine.statistics.FragmentInstanceStatisticsDrawer; |
| import org.apache.iotdb.db.queryengine.statistics.QueryStatisticsFetcher; |
| import org.apache.iotdb.db.queryengine.statistics.StatisticLine; |
| import org.apache.iotdb.db.utils.SetThreadName; |
| import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStatisticsResp; |
| import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; |
| import org.apache.iotdb.tsfile.read.common.block.TsBlock; |
| import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder; |
| import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder; |
| import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder; |
| import org.apache.iotdb.tsfile.utils.Binary; |
| |
| import com.google.common.util.concurrent.ListenableFuture; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.ScheduledFuture; |
| import java.util.concurrent.TimeUnit; |
| |
| public class ExplainAnalyzeOperator implements ProcessOperator { |
| private final OperatorContext operatorContext; |
| private final Operator child; |
| private final boolean verbose; |
| private boolean outputResult = false; |
| private final List<FragmentInstance> instances; |
| private static final Logger logger = |
| LoggerFactory.getLogger(IoTDBConstant.EXPLAIN_ANALYZE_LOGGER_NAME); |
| private final FragmentInstanceStatisticsDrawer fragmentInstanceStatisticsDrawer = |
| new FragmentInstanceStatisticsDrawer(); |
| private static final String LOG_TITLE = |
| "---------------------Intermediate Results of EXPLAIN ANALYZE---------------------:"; |
| private final ScheduledFuture<?> logRecordTask; |
| private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> clientManager; |
| |
| public ExplainAnalyzeOperator( |
| OperatorContext operatorContext, |
| Operator child, |
| long queryId, |
| boolean verbose, |
| long timeout) { |
| this.operatorContext = operatorContext; |
| this.child = child; |
| this.verbose = verbose; |
| Coordinator coordinator = Coordinator.getInstance(); |
| |
| this.clientManager = coordinator.getInternalServiceClientManager(); |
| |
| QueryExecution queryExecution = (QueryExecution) coordinator.getQueryExecution(queryId); |
| this.instances = queryExecution.getDistributedPlan().getInstances(); |
| fragmentInstanceStatisticsDrawer.renderPlanStatistics(queryExecution.getContext()); |
| |
| // The time interval guarantees the result of EXPLAIN ANALYZE will be printed at least three |
| // times. |
| // And the maximum time interval is 15s. |
| long logIntervalInMs = Math.min(timeout / 3, 15000); |
| this.logRecordTask = |
| ScheduledExecutorUtil.safelyScheduleAtFixedRate( |
| queryExecution.getScheduledExecutor(), |
| this::logIntermediateResultIfTimeout, |
| logIntervalInMs, |
| logIntervalInMs, |
| TimeUnit.MILLISECONDS); |
| } |
| |
| @Override |
| public OperatorContext getOperatorContext() { |
| return operatorContext; |
| } |
| |
| @Override |
| public TsBlock next() throws Exception { |
| if (child.hasNextWithTimer()) { |
| child.nextWithTimer(); |
| return null; |
| } |
| |
| // fetch statics from all fragment instances |
| TsBlock result = buildResult(); |
| outputResult = true; |
| return result; |
| } |
| |
| private List<String> buildFragmentInstanceStatistics( |
| List<FragmentInstance> instances, boolean verbose) throws FragmentInstanceFetchException { |
| |
| Map<FragmentInstanceId, TFetchFragmentInstanceStatisticsResp> allStatistics = |
| QueryStatisticsFetcher.fetchAllStatistics(instances, clientManager); |
| List<StatisticLine> statisticLines = |
| fragmentInstanceStatisticsDrawer.renderFragmentInstances(instances, allStatistics, verbose); |
| |
| List<String> analyzeResult = new ArrayList<>(); |
| for (StatisticLine line : statisticLines) { |
| StringBuilder sb = new StringBuilder(); |
| sb.append(line.getValue()); |
| for (int i = 0; |
| i < fragmentInstanceStatisticsDrawer.getMaxLineLength() - line.getValue().length(); |
| i++) { |
| sb.append(" "); |
| } |
| analyzeResult.add(sb.toString()); |
| } |
| return analyzeResult; |
| } |
| |
| // We will log the intermediate result of analyze if timeout |
| // It can be used to analyze deadlock problem. |
| private void logIntermediateResultIfTimeout() { |
| try (SetThreadName ignored = |
| new SetThreadName( |
| String.format( |
| "%s-Explain-Analyze-Logger", |
| operatorContext.getInstanceContext().getId().getQueryId()))) { |
| List<String> analyzeResult = buildFragmentInstanceStatistics(instances, verbose); |
| |
| StringBuilder logContent = new StringBuilder(); |
| logContent.append("\n").append(LOG_TITLE).append("\n"); |
| for (String line : analyzeResult) { |
| logContent.append(line).append("\n"); |
| } |
| String res = logContent.toString(); |
| logger.info(res); |
| } catch (Exception e) { |
| logger.error("Error occurred when logging intermediate result of analyze.", e); |
| } |
| } |
| |
| private TsBlock buildResult() throws FragmentInstanceFetchException { |
| |
| List<String> analyzeResult = buildFragmentInstanceStatistics(instances, verbose); |
| |
| TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(TSDataType.TEXT)); |
| TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder(); |
| ColumnBuilder columnBuilder = builder.getColumnBuilder(0); |
| |
| for (String line : analyzeResult) { |
| timeColumnBuilder.writeLong(0); |
| columnBuilder.writeBinary(new Binary(line.getBytes())); |
| builder.declarePosition(); |
| } |
| return builder.build(); |
| } |
| |
| @Override |
| public boolean hasNext() throws Exception { |
| return child.hasNext() || !outputResult; |
| } |
| |
| @Override |
| public ListenableFuture<?> isBlocked() { |
| return child.isBlocked(); |
| } |
| |
| @Override |
| public void close() throws Exception { |
| child.close(); |
| |
| if (logRecordTask != null) { |
| boolean cancelResult = logRecordTask.cancel(true); |
| if (!cancelResult) { |
| logger.debug("cancel state tracking task failed. {}", logRecordTask.isCancelled()); |
| } |
| } else { |
| logger.debug("trackTask not started"); |
| } |
| } |
| |
| @Override |
| public boolean isFinished() throws Exception { |
| return !child.hasNext() && outputResult; |
| } |
| |
| @Override |
| public long calculateMaxPeekMemory() { |
| return 0; |
| } |
| |
| @Override |
| public long calculateMaxReturnSize() { |
| return 0; |
| } |
| |
| @Override |
| public long calculateRetainedSizeAfterCallingNext() { |
| return 0; |
| } |
| } |