merge lmh/groupByTest (#9848)
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java index 344bf05..d4ac9d7 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
@@ -95,6 +95,7 @@ public static final String AUDIT_LOGGER_NAME = "IoTDB_AUDIT_LOGGER"; public static final String SLOW_SQL_LOGGER_NAME = "SLOW_SQL"; public static final String COMPACTION_LOGGER_NAME = "COMPACTION"; + public static final String QUERY_STATISTICS_LOGGER_NAME = "QUERY_STATISTICS"; public static final String IOTDB_JMX_LOCAL = "iotdb.jmx.local"; public static final String IOTDB_JMX_PORT = "com.sun.management.jmxremote.port";
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/cache/ChunkCache.java b/server/src/main/java/org/apache/iotdb/db/engine/cache/ChunkCache.java index 5564df7..0258522 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/cache/ChunkCache.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/cache/ChunkCache.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.mpp.metric.ChunkCacheMetrics; import org.apache.iotdb.db.mpp.metric.QueryMetricsManager; +import org.apache.iotdb.db.mpp.statistics.QueryStatistics; import org.apache.iotdb.db.query.control.FileReaderManager; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; import org.apache.iotdb.tsfile.read.TsFileSequenceReader; @@ -136,7 +137,9 @@ chunkMetaData.getDeleteIntervalList(), chunkMetaData.getStatistics()); } finally { - QUERY_METRICS.recordSeriesScanCost(READ_CHUNK_ALL, System.nanoTime() - startTime); + long costTime = System.nanoTime() - startTime; + QUERY_METRICS.recordSeriesScanCost(READ_CHUNK_ALL, costTime); + QueryStatistics.getInstance().addCost(QueryStatistics.LOAD_CHUNK, costTime); } }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java index e359ce8..3842d26 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
@@ -19,9 +19,9 @@ package org.apache.iotdb.db.mpp.aggregation; -import org.apache.iotdb.db.mpp.metric.QueryMetricsManager; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation; +import org.apache.iotdb.db.mpp.statistics.QueryStatistics; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.block.TsBlock; @@ -33,8 +33,7 @@ import java.util.List; import static com.google.common.base.Preconditions.checkArgument; -import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.AGGREGATION_FROM_RAW_DATA; -import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.AGGREGATION_FROM_STATISTICS; +import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.AGGREGATOR_PROCESS_TSBLOCK; public class Aggregator { @@ -43,8 +42,6 @@ protected List<InputLocation[]> inputLocationList; protected final AggregationStep step; - protected final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance(); - // Used for SeriesAggregateScanOperator public Aggregator(Accumulator accumulator, AggregationStep step) { this.accumulator = accumulator; @@ -78,34 +75,30 @@ accumulator.addInput(timeAndValueColumn, bitMap, lastIndex); } } finally { - QUERY_METRICS.recordExecutionCost(AGGREGATION_FROM_RAW_DATA, System.nanoTime() - startTime); + QueryStatistics.getInstance() + .addCost(AGGREGATOR_PROCESS_TSBLOCK, System.nanoTime() - startTime); } } // Used for AggregateOperator public void processTsBlocks(TsBlock[] tsBlock) { - long startTime = System.nanoTime(); - try { - checkArgument(!step.isInputRaw(), "Step in AggregateOperator cannot process raw input"); - if (step.isInputFinal()) { - checkArgument(inputLocationList.size() == 1, "Final output can only be single column"); - Column finalResult = - 
tsBlock[inputLocationList.get(0)[0].getTsBlockIndex()].getColumn( - inputLocationList.get(0)[0].getValueColumnIndex()); - accumulator.setFinal(finalResult); - } else { - for (InputLocation[] inputLocations : inputLocationList) { - Column[] columns = new Column[inputLocations.length]; - for (int i = 0; i < inputLocations.length; i++) { - columns[i] = - tsBlock[inputLocations[i].getTsBlockIndex()].getColumn( - inputLocations[i].getValueColumnIndex()); - } - accumulator.addIntermediate(columns); + checkArgument(!step.isInputRaw(), "Step in AggregateOperator cannot process raw input"); + if (step.isInputFinal()) { + checkArgument(inputLocationList.size() == 1, "Final output can only be single column"); + Column finalResult = + tsBlock[inputLocationList.get(0)[0].getTsBlockIndex()].getColumn( + inputLocationList.get(0)[0].getValueColumnIndex()); + accumulator.setFinal(finalResult); + } else { + for (InputLocation[] inputLocations : inputLocationList) { + Column[] columns = new Column[inputLocations.length]; + for (int i = 0; i < inputLocations.length; i++) { + columns[i] = + tsBlock[inputLocations[i].getTsBlockIndex()].getColumn( + inputLocations[i].getValueColumnIndex()); } + accumulator.addIntermediate(columns); } - } finally { - QUERY_METRICS.recordExecutionCost(AGGREGATION_FROM_RAW_DATA, System.nanoTime() - startTime); } } @@ -119,14 +112,9 @@ /** Used for SeriesAggregateScanOperator. */ public void processStatistics(Statistics[] statistics) { - long startTime = System.nanoTime(); - try { - for (InputLocation[] inputLocations : inputLocationList) { - int valueIndex = inputLocations[0].getValueColumnIndex(); - accumulator.addStatistics(statistics[valueIndex]); - } - } finally { - QUERY_METRICS.recordExecutionCost(AGGREGATION_FROM_STATISTICS, System.nanoTime() - startTime); + for (InputLocation[] inputLocations : inputLocationList) { + int valueIndex = inputLocations[0].getValueColumnIndex(); + accumulator.addStatistics(statistics[valueIndex]); } }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java index 7fe07b9..01c1c35 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.mpp.execution.operator.Operator; import org.apache.iotdb.db.mpp.execution.operator.source.DataSourceOperator; +import org.apache.iotdb.db.mpp.statistics.QueryStatistics; import com.google.common.util.concurrent.SettableFuture; @@ -30,6 +31,7 @@ import java.util.List; import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.QUERY_RESOURCE_INIT; +import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.INIT_SOURCE_OP; /** * One dataDriver is responsible for one FragmentInstance which is for data query, which may @@ -81,6 +83,7 @@ // And it's safe for us to throw this exception here in such case. throw new IllegalStateException("QueryDataSource should never be null!"); } + long start = System.nanoTime(); sourceOperators.forEach( sourceOperator -> { // construct QueryDataSource for source operator @@ -91,11 +94,18 @@ sourceOperator.initQueryDataSource(queryDataSource); }); + driverContext + .getFragmentInstanceContext() + .addOperationTime(INIT_SOURCE_OP, System.nanoTime() - start); } this.init = true; } finally { - QUERY_METRICS.recordExecutionCost(QUERY_RESOURCE_INIT, System.nanoTime() - startTime); + long costTime = System.nanoTime() - startTime; + QUERY_METRICS.recordExecutionCost(QUERY_RESOURCE_INIT, costTime); + driverContext + .getFragmentInstanceContext() + .addOperationTime(QueryStatistics.QUERY_RESOURCE_INIT, costTime); } }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java index a214522..5a81807 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTaskId; import org.apache.iotdb.db.mpp.metric.QueryMetricsManager; +import org.apache.iotdb.db.mpp.statistics.QueryStatistics; import org.apache.iotdb.tsfile.read.common.block.TsBlock; import com.google.common.collect.ImmutableList; @@ -51,6 +52,8 @@ import static java.lang.Boolean.TRUE; import static org.apache.iotdb.db.mpp.execution.operator.Operator.NOT_BLOCKED; import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.DRIVER_INTERNAL_PROCESS; +import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.SEND_TSBLOCK; +import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.SET_NO_MORE_TSBLOCK; public abstract class Driver implements IDriver { @@ -223,7 +226,11 @@ if (root.hasNextWithTimer()) { TsBlock tsBlock = root.nextWithTimer(); if (tsBlock != null && !tsBlock.isEmpty()) { + long startTime = System.nanoTime(); sink.send(tsBlock); + driverContext + .getFragmentInstanceContext() + .addOperationTime(SEND_TSBLOCK, System.nanoTime() - startTime); } } return NOT_BLOCKED; @@ -244,8 +251,11 @@ driverContext.failed(newException); throw newException; } finally { - QUERY_METRICS.recordExecutionCost( - DRIVER_INTERNAL_PROCESS, System.nanoTime() - startTimeNanos); + long costTime = System.nanoTime() - startTimeNanos; + QUERY_METRICS.recordExecutionCost(DRIVER_INTERNAL_PROCESS, costTime); + driverContext + .getFragmentInstanceContext() + .addOperationTime(QueryStatistics.DRIVER_INTERNAL_PROCESS, costTime); } } @@ -374,6 +384,7 @@ Throwable inFlightException = null; try { + long startTime = System.nanoTime(); root.close(); if (driverContext.mayHaveTmpFile()) { @@ -381,6 +392,9 @@ } sink.setNoMoreTsBlocks(); + driverContext + .getFragmentInstanceContext() + .addOperationTime(SET_NO_MORE_TSBLOCK, System.nanoTime() - startTime); // record operator execution statistics to metrics List<OperatorContext> 
operatorContexts = driverContext.getOperatorContexts();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java index 6e48426..eca396e 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.mpp.execution.exchange.sink.LocalSinkChannel; import org.apache.iotdb.db.mpp.execution.exchange.source.LocalSourceHandle; import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager; +import org.apache.iotdb.db.mpp.statistics.QueryStatistics; import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId; import org.apache.iotdb.tsfile.read.common.block.TsBlock; import org.apache.iotdb.tsfile.utils.Pair; @@ -41,11 +42,17 @@ import static com.google.common.util.concurrent.Futures.immediateVoidFuture; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; +import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.FREE_MEM; +import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.NOTIFY_END; +import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.NOTIFY_NEW_TSBLOCK; +import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.RESERVE_MEMORY; /** This is not thread safe class, the caller should ensure multi-threads safety. 
*/ @NotThreadSafe public class SharedTsBlockQueue { + private static final QueryStatistics QUERY_STATISTICS = QueryStatistics.getInstance(); + private static final Logger LOGGER = LoggerFactory.getLogger(SharedTsBlockQueue.class); private final TFragmentInstanceId localFragmentInstanceId; @@ -157,7 +164,9 @@ } this.noMoreTsBlocks = noMoreTsBlocks; if (!blocked.isDone()) { + long startTime = System.nanoTime(); blocked.set(null); + QUERY_STATISTICS.addCost(NOTIFY_END, System.nanoTime() - startTime); } if (this.sourceHandle != null) { this.sourceHandle.checkAndInvokeOnFinished(); @@ -173,6 +182,7 @@ throw new IllegalStateException("queue has been destroyed"); } TsBlock tsBlock = queue.remove(); + long startTime = System.nanoTime(); localMemoryManager .getQueryPool() .free( @@ -180,6 +190,7 @@ fullFragmentInstanceId, localPlanNodeId, tsBlock.getRetainedSizeInBytes()); + QUERY_STATISTICS.addCost(FREE_MEM, System.nanoTime() - startTime); bufferRetainedSizeInBytes -= tsBlock.getRetainedSizeInBytes(); // Every time LocalSourceHandle consumes a TsBlock, it needs to send the event to // corresponding LocalSinkChannel. 
@@ -212,17 +223,24 @@ localFragmentInstanceId.queryId, fullFragmentInstanceId, localPlanNodeId); alreadyRegistered = true; } - Pair<ListenableFuture<Void>, Boolean> pair = - localMemoryManager - .getQueryPool() - .reserve( - localFragmentInstanceId.getQueryId(), - fullFragmentInstanceId, - localPlanNodeId, - tsBlock.getRetainedSizeInBytes(), - maxBytesCanReserve); - blockedOnMemory = pair.left; - bufferRetainedSizeInBytes += tsBlock.getRetainedSizeInBytes(); + + long startTime = System.nanoTime(); + Pair<ListenableFuture<Void>, Boolean> pair; + try { + pair = + localMemoryManager + .getQueryPool() + .reserve( + localFragmentInstanceId.getQueryId(), + fullFragmentInstanceId, + localPlanNodeId, + tsBlock.getRetainedSizeInBytes(), + maxBytesCanReserve); + blockedOnMemory = pair.left; + bufferRetainedSizeInBytes += tsBlock.getRetainedSizeInBytes(); + } finally { + QUERY_STATISTICS.addCost(RESERVE_MEMORY, System.nanoTime() - startTime); + } // reserve memory failed, we should wait until there is enough memory if (!pair.right) { @@ -239,7 +257,9 @@ } else { // reserve memory succeeded, add the TsBlock directly queue.add(tsBlock); if (!blocked.isDone()) { + startTime = System.nanoTime(); blocked.set(null); + QUERY_STATISTICS.addCost(NOTIFY_NEW_TSBLOCK, System.nanoTime() - startTime); } }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkChannel.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkChannel.java index aae1f8f..a209837 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkChannel.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkChannel.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkListener; import org.apache.iotdb.db.mpp.execution.exchange.SharedTsBlockQueue; import org.apache.iotdb.db.mpp.metric.QueryMetricsManager; +import org.apache.iotdb.db.mpp.statistics.QueryStatistics; import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId; import org.apache.iotdb.tsfile.read.common.block.TsBlock; @@ -35,9 +36,14 @@ import static com.google.common.util.concurrent.Futures.immediateVoidFuture; import static com.google.common.util.concurrent.Futures.nonCancellationPropagating; import static org.apache.iotdb.db.mpp.metric.DataExchangeCostMetricSet.SINK_HANDLE_SEND_TSBLOCK_LOCAL; +import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.CHECK_AND_INVOKE_ON_FINISHED; +import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.SINK_HANDLE_END_LISTENER; +import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.SINK_HANDLE_FINISH_LISTENER; public class LocalSinkChannel implements ISinkChannel { + private static final QueryStatistics QUERY_STATISTICS = QueryStatistics.getInstance(); + private static final Logger LOGGER = LoggerFactory.getLogger(LocalSinkChannel.class); private TFragmentInstanceId localFragmentInstanceId; @@ -109,8 +115,10 @@ if (isFinished()) { synchronized (this) { if (!invokedOnFinished) { + long start = System.nanoTime(); sinkListener.onFinish(this); invokedOnFinished = true; + QUERY_STATISTICS.addCost(SINK_HANDLE_FINISH_LISTENER, System.nanoTime() - start); } } } @@ -156,10 +164,15 @@ return; } queue.setNoMoreTsBlocks(true); + long startTime = System.nanoTime(); sinkListener.onEndOfBlocks(this); + QUERY_STATISTICS.addCost(SINK_HANDLE_END_LISTENER, System.nanoTime() - startTime); } } + + long startTime = System.nanoTime(); checkAndInvokeOnFinished(); + QUERY_STATISTICS.addCost(CHECK_AND_INVOKE_ON_FINISHED, System.nanoTime() - startTime); LOGGER.debug("[EndSetNoMoreTsBlocksOnLocal]"); }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/LocalSourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/LocalSourceHandle.java index 9ffe6e0..8c9015e 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/LocalSourceHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/LocalSourceHandle.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SourceHandleListener; import org.apache.iotdb.db.mpp.execution.exchange.SharedTsBlockQueue; import org.apache.iotdb.db.mpp.metric.QueryMetricsManager; +import org.apache.iotdb.db.mpp.statistics.QueryStatistics; import org.apache.iotdb.db.utils.SetThreadName; import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId; import org.apache.iotdb.rpc.TSStatusCode; @@ -40,11 +41,15 @@ import static org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.createFullIdFrom; import static org.apache.iotdb.db.mpp.metric.DataExchangeCostMetricSet.SOURCE_HANDLE_DESERIALIZE_TSBLOCK_LOCAL; import static org.apache.iotdb.db.mpp.metric.DataExchangeCostMetricSet.SOURCE_HANDLE_GET_TSBLOCK_LOCAL; +import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.LOCAL_SOURCE_HANDLE_GET_TSBLOCK; +import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.LOCAL_SOURCE_HANDLE_SER_TSBLOCK; public class LocalSourceHandle implements ISourceHandle { private static final Logger LOGGER = LoggerFactory.getLogger(LocalSourceHandle.class); + private static final QueryStatistics QUERY_STATISTICS = QueryStatistics.getInstance(); + private TFragmentInstanceId localFragmentInstanceId; private String localPlanNodeId; private final SourceHandleListener sourceHandleListener; @@ -121,8 +126,9 @@ checkAndInvokeOnFinished(); return tsBlock; } finally { - QUERY_METRICS.recordDataExchangeCost( - SOURCE_HANDLE_GET_TSBLOCK_LOCAL, System.nanoTime() - startTime); + long costTime = System.nanoTime() - startTime; + QUERY_METRICS.recordDataExchangeCost(SOURCE_HANDLE_GET_TSBLOCK_LOCAL, costTime); + QUERY_STATISTICS.addCost(LOCAL_SOURCE_HANDLE_GET_TSBLOCK, costTime); } } @@ -136,8 +142,9 @@ } catch (Exception e) { throw new IoTDBException(e, TSStatusCode.TSBLOCK_SERIALIZE_ERROR.getStatusCode()); } finally { - QUERY_METRICS.recordDataExchangeCost( - SOURCE_HANDLE_DESERIALIZE_TSBLOCK_LOCAL, System.nanoTime() - 
startTime); + long costTime = System.nanoTime() - startTime; + QUERY_METRICS.recordDataExchangeCost(SOURCE_HANDLE_DESERIALIZE_TSBLOCK_LOCAL, costTime); + QUERY_STATISTICS.addCost(LOCAL_SOURCE_HANDLE_SER_TSBLOCK, costTime); } } else { return null;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java index 0188249..29c142d 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.metadata.idtable.IDTable; import org.apache.iotdb.db.mpp.common.FragmentInstanceId; import org.apache.iotdb.db.mpp.common.SessionInfo; +import org.apache.iotdb.db.mpp.statistics.QueryStatistics; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.FileReaderManager; import org.apache.iotdb.tsfile.read.filter.basic.Filter; @@ -45,6 +46,9 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.ADD_REFERENCE; +import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.QUERY_RESOURCE_LIST; + public class FragmentInstanceContext extends QueryContext { private static final Logger LOGGER = LoggerFactory.getLogger(FragmentInstanceContext.class); @@ -85,6 +89,8 @@ // private final AtomicLong endFullGcCount = new AtomicLong(-1); // private final AtomicLong endFullGcTimeNanos = new AtomicLong(-1); + private static final QueryStatistics QUERY_STATISTICS = QueryStatistics.getInstance(); + public static FragmentInstanceContext createFragmentInstanceContext( FragmentInstanceId id, FragmentInstanceStateMachine stateMachine, SessionInfo sessionInfo) { FragmentInstanceContext instanceContext = @@ -281,6 +287,7 @@ selectedDeviceIdSet.add(translatedPath.getDevice()); } + long startTime = System.nanoTime(); this.sharedQueryDataSource = dataRegion.query( pathList, @@ -289,13 +296,16 @@ selectedDeviceIdSet.size() == 1 ? selectedDeviceIdSet.iterator().next() : null, this, timeFilter != null ? 
timeFilter.copy() : null); + addOperationTime(QUERY_RESOURCE_LIST, System.nanoTime() - startTime); // used files should be added before mergeLock is unlocked, or they may be deleted by // running merge if (sharedQueryDataSource != null) { closedFilePaths = new HashSet<>(); unClosedFilePaths = new HashSet<>(); + startTime = System.nanoTime(); addUsedFilesForQuery(sharedQueryDataSource); + addOperationTime(ADD_REFERENCE, System.nanoTime() - startTime); } } finally { dataRegion.readUnlock(); @@ -385,4 +395,8 @@ public boolean mayHaveTmpFile() { return mayHaveTmpFile; } + + public void addOperationTime(String key, long costTimeInNanos) { + QUERY_STATISTICS.addCost(key, costTimeInNanos); + } }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java index d563f9c..ae0bb57 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.mpp.plan.planner.LocalExecutionPlanner; import org.apache.iotdb.db.mpp.plan.planner.PipelineDriverFactory; import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance; +import org.apache.iotdb.db.mpp.statistics.QueryStatistics; import org.apache.iotdb.db.utils.SetThreadName; import io.airlift.stats.CounterStat; @@ -52,6 +53,8 @@ import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceExecution.createFragmentInstanceExecution; import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.LOCAL_EXECUTION_PLANNER; +import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.CREATE_FI_CONTEXT; +import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.CREATE_FI_EXEC; public class FragmentInstanceManager { @@ -76,6 +79,7 @@ private final ExecutorService intoOperationExecutor; private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance(); + private static final QueryStatistics QUERY_STATISTICS = QueryStatistics.getInstance(); public static FragmentInstanceManager getInstance() { return FragmentInstanceManager.InstanceHolder.INSTANCE; @@ -118,6 +122,7 @@ FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); + long start = System.nanoTime(); FragmentInstanceContext context = instanceContext.computeIfAbsent( instanceId, @@ -128,6 +133,7 @@ instance.getSessionInfo(), dataRegion, instance.getTimeFilter())); + QUERY_STATISTICS.addCost(CREATE_FI_CONTEXT, System.nanoTime() - start); try { List<PipelineDriverFactory> driverFactories = @@ -141,6 +147,7 @@ // get the sink of last driver ISink sink = drivers.get(drivers.size() - 1).getSink(); + start = System.nanoTime(); return createFragmentInstanceExecution( scheduler, instanceId, @@ -154,6 +161,8 @@ logger.warn("error when create 
FragmentInstanceExecution.", t); stateMachine.failed(t); return null; + } finally { + QUERY_STATISTICS.addCost(CREATE_FI_EXEC, System.nanoTime() - start); } }); @@ -171,7 +180,9 @@ return createFailedInstanceInfo(instanceId); } } finally { - QUERY_METRICS.recordExecutionCost(LOCAL_EXECUTION_PLANNER, System.nanoTime() - startTime); + long endTime = System.nanoTime() - startTime; + QUERY_METRICS.recordExecutionCost(LOCAL_EXECUTION_PLANNER, endTime); + QUERY_STATISTICS.addCost(QueryStatistics.LOCAL_EXECUTION_PLANNER, endTime); } }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java index c5c0db7..7a55772 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.mpp.execution.driver.DriverContext; import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.mpp.statistics.QueryStatistics; import io.airlift.units.Duration; @@ -45,6 +46,8 @@ private long totalExecutionTimeInNanos = 0L; private long nextCalledCount = 0L; + private static final QueryStatistics QUERY_STATISTICS = QueryStatistics.getInstance(); + public OperatorContext( int operatorId, PlanNodeId planNodeId, String operatorType, DriverContext driverContext) { this.operatorId = operatorId; @@ -65,6 +68,10 @@ this.driverContext = new DriverContext(fragmentInstanceContext, 0); } + public void addOperatorTime(String key, long costTimeInNanos) { + QUERY_STATISTICS.addCost(key, costTimeInNanos); + } + public int getOperatorId() { return operatorId; }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/sink/IdentitySinkOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/sink/IdentitySinkOperator.java index 80ac16a..ec10b22 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/sink/IdentitySinkOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/sink/IdentitySinkOperator.java
@@ -57,7 +57,7 @@ public boolean hasNext() throws Exception { int currentIndex = downStreamChannelIndex.getCurrentIndex(); boolean currentChannelClosed = sinkHandle.isChannelClosed(currentIndex); - if (!currentChannelClosed && children.get(currentIndex).hasNext()) { + if (!currentChannelClosed && children.get(currentIndex).hasNextWithTimer()) { return true; } else if (currentChannelClosed) { // we close the child directly. The child could be an ExchangeOperator which is the downstream @@ -96,7 +96,7 @@ needToReturnNull = false; return null; } - return children.get(downStreamChannelIndex.getCurrentIndex()).next(); + return children.get(downStreamChannelIndex.getCurrentIndex()).nextWithTimer(); } @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java index 67b42a4..e200869 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
@@ -22,8 +22,10 @@ import org.apache.iotdb.db.mpp.aggregation.Aggregator; import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; +import org.apache.iotdb.db.mpp.metric.QueryMetricsManager; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter; +import org.apache.iotdb.db.mpp.statistics.QueryStatistics; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; @@ -41,6 +43,17 @@ import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult; import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.calculateAggregationFromRawData; import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.isAllAggregatorsHasFinalResult; +import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.AGGREGATION_FROM_RAW_DATA; +import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.AGGREGATION_FROM_STATISTICS; +import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.AGG_SCAN_OPERATOR; +import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.BUILD_AGG_RES; +import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.CAL_AGG_FROM_CHUNK; +import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.CAL_AGG_FROM_CHUNK_STAT; +import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.CAL_AGG_FROM_FILE; +import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.CAL_AGG_FROM_FILE_STAT; +import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.CAL_AGG_FROM_PAGE; +import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.CAL_AGG_FROM_PAGE_STAT; +import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.CAL_NEXT_AGG_RES; public abstract 
class AbstractSeriesAggregationScanOperator extends AbstractDataSourceOperator { @@ -122,28 +135,33 @@ // start stopwatch long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS); long start = System.nanoTime(); + try { + while (System.nanoTime() - start < maxRuntime + && timeRangeIterator.hasNextTimeRange() + && !resultTsBlockBuilder.isFull()) { + // move to next time window + curTimeRange = timeRangeIterator.nextTimeRange(); - while (System.nanoTime() - start < maxRuntime - && timeRangeIterator.hasNextTimeRange() - && !resultTsBlockBuilder.isFull()) { - // move to next time window - curTimeRange = timeRangeIterator.nextTimeRange(); + // clear previous aggregation result + for (Aggregator aggregator : aggregators) { + aggregator.reset(); + } - // clear previous aggregation result - for (Aggregator aggregator : aggregators) { - aggregator.reset(); + long startTime = System.nanoTime(); + // calculate aggregation result on current time window + calculateNextAggregationResult(); + operatorContext.addOperatorTime(CAL_NEXT_AGG_RES, System.nanoTime() - startTime); } - // calculate aggregation result on current time window - calculateNextAggregationResult(); - } - - if (resultTsBlockBuilder.getPositionCount() > 0) { - TsBlock resultTsBlock = resultTsBlockBuilder.build(); - resultTsBlockBuilder.reset(); - return resultTsBlock; - } else { - return null; + if (resultTsBlockBuilder.getPositionCount() > 0) { + TsBlock resultTsBlock = resultTsBlockBuilder.build(); + resultTsBlockBuilder.reset(); + return resultTsBlock; + } else { + return null; + } + } finally { + operatorContext.addOperatorTime(AGG_SCAN_OPERATOR, System.nanoTime() - start); } } @@ -184,8 +202,10 @@ } protected void updateResultTsBlock() { + long startTime = System.nanoTime(); appendAggregationResult( resultTsBlockBuilder, aggregators, timeRangeIterator.currentOutputTime()); + operatorContext.addOperatorTime(BUILD_AGG_RES, System.nanoTime() - startTime); } protected boolean 
calcFromCachedData() { @@ -193,139 +213,205 @@ } private boolean calcFromRawData(TsBlock tsBlock) { - Pair<Boolean, TsBlock> calcResult = - calculateAggregationFromRawData(tsBlock, aggregators, curTimeRange, ascending); - inputTsBlock = calcResult.getRight(); - return calcResult.getLeft(); + long startTime = System.nanoTime(); + try { + Pair<Boolean, TsBlock> calcResult = + calculateAggregationFromRawData(tsBlock, aggregators, curTimeRange, ascending); + inputTsBlock = calcResult.getRight(); + return calcResult.getLeft(); + } finally { + operatorContext.addOperatorTime( + QueryStatistics.CAL_AGG_FROM_RAW_DATA, System.nanoTime() - startTime); + QueryMetricsManager.getInstance() + .recordExecutionCost(AGGREGATION_FROM_RAW_DATA, System.nanoTime() - startTime); + } } protected void calcFromStatistics(Statistics[] statistics) { - for (Aggregator aggregator : aggregators) { - if (aggregator.hasFinalResult()) { - continue; + long startTime = System.nanoTime(); + try { + for (Aggregator aggregator : aggregators) { + if (aggregator.hasFinalResult()) { + continue; + } + aggregator.processStatistics(statistics); } - aggregator.processStatistics(statistics); + } finally { + QueryMetricsManager.getInstance() + .recordExecutionCost(AGGREGATION_FROM_STATISTICS, System.nanoTime() - startTime); + operatorContext.addOperatorTime( + QueryStatistics.CAL_AGG_FROM_STAT, System.nanoTime() - startTime); } } protected boolean readAndCalcFromFile() throws IOException { - while (seriesScanUtil.hasNextFile()) { - if (canUseCurrentFileStatistics()) { - Statistics fileTimeStatistics = seriesScanUtil.currentFileTimeStatistics(); - if (fileTimeStatistics.getStartTime() > curTimeRange.getMax()) { - if (ascending) { - return true; - } else { - seriesScanUtil.skipCurrentFile(); - continue; + long startTime = System.nanoTime(); + long chunkCostTime = 0L; + try { + while (seriesScanUtil.hasNextFile()) { + long start = System.nanoTime(); + try { + if (canUseCurrentFileStatistics()) { + Statistics 
fileTimeStatistics = seriesScanUtil.currentFileTimeStatistics(); + if (fileTimeStatistics.getStartTime() > curTimeRange.getMax()) { + if (ascending) { + return true; + } else { + seriesScanUtil.skipCurrentFile(); + continue; + } + } + // calc from fileMetaData + if (curTimeRange.contains( + fileTimeStatistics.getStartTime(), fileTimeStatistics.getEndTime())) { + Statistics[] statisticsList = new Statistics[subSensorSize]; + for (int i = 0; i < subSensorSize; i++) { + statisticsList[i] = seriesScanUtil.currentFileStatistics(i); + } + calcFromStatistics(statisticsList); + seriesScanUtil.skipCurrentFile(); + if (isAllAggregatorsHasFinalResult(aggregators) && !isGroupByQuery) { + return true; + } else { + continue; + } + } } + } finally { + operatorContext.addOperatorTime(CAL_AGG_FROM_FILE_STAT, System.nanoTime() - start); } - // calc from fileMetaData - if (curTimeRange.contains( - fileTimeStatistics.getStartTime(), fileTimeStatistics.getEndTime())) { - Statistics[] statisticsList = new Statistics[subSensorSize]; - for (int i = 0; i < subSensorSize; i++) { - statisticsList[i] = seriesScanUtil.currentFileStatistics(i); - } - calcFromStatistics(statisticsList); - seriesScanUtil.skipCurrentFile(); - if (isAllAggregatorsHasFinalResult(aggregators) && !isGroupByQuery) { - return true; - } else { - continue; - } - } - } - // read chunk - if (readAndCalcFromChunk()) { - return true; + // read chunk + start = System.nanoTime(); + try { + if (readAndCalcFromChunk()) { + return true; + } + } finally { + chunkCostTime += System.nanoTime() - start; + } } + return false; + } finally { + operatorContext.addOperatorTime( + CAL_AGG_FROM_FILE, System.nanoTime() - startTime - chunkCostTime); } - - return false; } protected boolean readAndCalcFromChunk() throws IOException { - while (seriesScanUtil.hasNextChunk()) { - if (canUseCurrentChunkStatistics()) { - Statistics chunkTimeStatistics = seriesScanUtil.currentChunkTimeStatistics(); - if (chunkTimeStatistics.getStartTime() > 
curTimeRange.getMax()) { - if (ascending) { - return true; - } else { - seriesScanUtil.skipCurrentChunk(); - continue; + long startTime = System.nanoTime(); + long pageCostTime = 0L; + try { + while (seriesScanUtil.hasNextChunk()) { + long start = System.nanoTime(); + try { + if (canUseCurrentChunkStatistics()) { + Statistics chunkTimeStatistics = seriesScanUtil.currentChunkTimeStatistics(); + if (chunkTimeStatistics.getStartTime() > curTimeRange.getMax()) { + if (ascending) { + return true; + } else { + seriesScanUtil.skipCurrentChunk(); + continue; + } + } + // calc from chunkMetaData + if (curTimeRange.contains( + chunkTimeStatistics.getStartTime(), chunkTimeStatistics.getEndTime())) { + // calc from chunkMetaData + Statistics[] statisticsList = new Statistics[subSensorSize]; + for (int i = 0; i < subSensorSize; i++) { + statisticsList[i] = seriesScanUtil.currentChunkStatistics(i); + } + calcFromStatistics(statisticsList); + seriesScanUtil.skipCurrentChunk(); + if (isAllAggregatorsHasFinalResult(aggregators) && !isGroupByQuery) { + return true; + } else { + continue; + } + } } + } finally { + operatorContext.addOperatorTime(CAL_AGG_FROM_CHUNK_STAT, System.nanoTime() - start); } - // calc from chunkMetaData - if (curTimeRange.contains( - chunkTimeStatistics.getStartTime(), chunkTimeStatistics.getEndTime())) { - // calc from chunkMetaData - Statistics[] statisticsList = new Statistics[subSensorSize]; - for (int i = 0; i < subSensorSize; i++) { - statisticsList[i] = seriesScanUtil.currentChunkStatistics(i); - } - calcFromStatistics(statisticsList); - seriesScanUtil.skipCurrentChunk(); - if (isAllAggregatorsHasFinalResult(aggregators) && !isGroupByQuery) { - return true; - } else { - continue; - } - } - } - // read page - if (readAndCalcFromPage()) { - return true; + // read page + start = System.nanoTime(); + try { + if (readAndCalcFromPage()) { + return true; + } + } finally { + pageCostTime += System.nanoTime() - start; + } } + return false; + } finally { + 
operatorContext.addOperatorTime( + CAL_AGG_FROM_CHUNK, System.nanoTime() - startTime - pageCostTime); } - return false; } protected boolean readAndCalcFromPage() throws IOException { - while (seriesScanUtil.hasNextPage()) { - if (canUseCurrentPageStatistics()) { - Statistics pageTimeStatistics = seriesScanUtil.currentPageTimeStatistics(); - // There is no more eligible points in current time range - if (pageTimeStatistics.getStartTime() > curTimeRange.getMax()) { - if (ascending) { - return true; - } else { - seriesScanUtil.skipCurrentPage(); - continue; + long startTime = System.nanoTime(); + long rawDataCostTime = 0L; + try { + while (seriesScanUtil.hasNextPage()) { + long start = System.nanoTime(); + try { + if (canUseCurrentPageStatistics()) { + Statistics pageTimeStatistics = seriesScanUtil.currentPageTimeStatistics(); + // There is no more eligible points in current time range + if (pageTimeStatistics.getStartTime() > curTimeRange.getMax()) { + if (ascending) { + return true; + } else { + seriesScanUtil.skipCurrentPage(); + continue; + } + } + // can use pageHeader + if (curTimeRange.contains( + pageTimeStatistics.getStartTime(), pageTimeStatistics.getEndTime())) { + Statistics[] statisticsList = new Statistics[subSensorSize]; + for (int i = 0; i < subSensorSize; i++) { + statisticsList[i] = seriesScanUtil.currentPageStatistics(i); + } + calcFromStatistics(statisticsList); + seriesScanUtil.skipCurrentPage(); + if (isAllAggregatorsHasFinalResult(aggregators) && !isGroupByQuery) { + return true; + } else { + continue; + } + } } + } finally { + operatorContext.addOperatorTime(CAL_AGG_FROM_PAGE_STAT, System.nanoTime() - start); } - // can use pageHeader - if (curTimeRange.contains( - pageTimeStatistics.getStartTime(), pageTimeStatistics.getEndTime())) { - Statistics[] statisticsList = new Statistics[subSensorSize]; - for (int i = 0; i < subSensorSize; i++) { - statisticsList[i] = seriesScanUtil.currentPageStatistics(i); - } - calcFromStatistics(statisticsList); - 
seriesScanUtil.skipCurrentPage(); - if (isAllAggregatorsHasFinalResult(aggregators) && !isGroupByQuery) { + + // calc from page data + TsBlock tsBlock = seriesScanUtil.nextPage(); + if (tsBlock == null || tsBlock.isEmpty()) { + continue; + } + + // calc from raw data + start = System.nanoTime(); + try { + if (calcFromRawData(tsBlock)) { return true; - } else { - continue; } + } finally { + rawDataCostTime += System.nanoTime() - start; } } - - // calc from page data - TsBlock tsBlock = seriesScanUtil.nextPage(); - if (tsBlock == null || tsBlock.isEmpty()) { - continue; - } - - // calc from raw data - if (calcFromRawData(tsBlock)) { - return true; - } + return false; + } finally { + operatorContext.addOperatorTime( + CAL_AGG_FROM_PAGE, System.nanoTime() - startTime - rawDataCostTime); } - return false; } protected boolean canUseCurrentFileStatistics() throws IOException {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java index ebfe5f8..1e1cdd0 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.SeriesScanOptions; import org.apache.iotdb.db.mpp.plan.statement.component.Ordering; +import org.apache.iotdb.db.mpp.statistics.QueryStatistics; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.reader.universal.AlignedDescPriorityMergeReader; import org.apache.iotdb.db.query.reader.universal.AlignedPriorityMergeReader; @@ -130,68 +131,87 @@ @Override protected void filterFirstTimeSeriesMetadata() throws IOException { - if (firstTimeSeriesMetadata != null - && !isFileOverlapped() - && !firstTimeSeriesMetadata.isModified()) { - Filter queryFilter = scanOptions.getQueryFilter(); - if (queryFilter != null) { - // TODO accept valueStatisticsList to filter - if (!queryFilter.satisfy(firstTimeSeriesMetadata.getStatistics())) { - skipCurrentFile(); - } - } else { - // For aligned series, When we only query some measurements under an aligned device, if the - // values of these queried measurements at a timestamp are all null, the timestamp will not - // be selected. - // NOTE: if we change the query semantic in the future for aligned series, we need to remove - // this check here. 
- long rowCount = - ((AlignedTimeSeriesMetadata) firstTimeSeriesMetadata).getTimeStatistics().getCount(); - for (Statistics statistics : - ((AlignedTimeSeriesMetadata) firstTimeSeriesMetadata).getValueStatisticsList()) { - if (statistics == null || statistics.hasNullValue(rowCount)) { - return; + long startTime = System.nanoTime(); + try { + if (firstTimeSeriesMetadata != null + && !isFileOverlapped() + && !firstTimeSeriesMetadata.isModified()) { + Filter queryFilter = scanOptions.getQueryFilter(); + if (queryFilter != null) { + // TODO accept valueStatisticsList to filter + if (!queryFilter.satisfy(firstTimeSeriesMetadata.getStatistics())) { + skipCurrentFile(); + } + } else { + // For aligned series, When we only query some measurements under an aligned device, if + // the + // values of these queried measurements at a timestamp are all null, the timestamp will + // not + // be selected. + // NOTE: if we change the query semantic in the future for aligned series, we need to + // remove + // this check here. + long rowCount = + ((AlignedTimeSeriesMetadata) firstTimeSeriesMetadata).getTimeStatistics().getCount(); + for (Statistics statistics : + ((AlignedTimeSeriesMetadata) firstTimeSeriesMetadata).getValueStatisticsList()) { + if (statistics == null || statistics.hasNullValue(rowCount)) { + return; + } + } + // When the number of points in all value chunk groups is the same as that in the time + // chunk + // group, it means that there is no null value, and all timestamps will be selected. + if (paginationController.hasCurOffset(rowCount)) { + skipCurrentFile(); + paginationController.consumeOffset(rowCount); } } - // When the number of points in all value chunk groups is the same as that in the time chunk - // group, it means that there is no null value, and all timestamps will be selected. 
- if (paginationController.hasCurOffset(rowCount)) { - skipCurrentFile(); - paginationController.consumeOffset(rowCount); - } } + } finally { + QueryStatistics.getInstance() + .addCost(QueryStatistics.FILTER_FIRST_TIMESERIES_METADATA, System.nanoTime() - startTime); } } @Override protected void filterFirstChunkMetadata() throws IOException { - if (firstChunkMetadata != null && !isChunkOverlapped() && !firstChunkMetadata.isModified()) { - Filter queryFilter = scanOptions.getQueryFilter(); - if (queryFilter != null) { - // TODO accept valueStatisticsList to filter - if (!queryFilter.satisfy(firstChunkMetadata.getStatistics())) { - skipCurrentChunk(); - } - } else { - // For aligned series, When we only query some measurements under an aligned device, if the - // values of these queried measurements at a timestamp are all null, the timestamp will not - // be selected. - // NOTE: if we change the query semantic in the future for aligned series, we need to remove - // this check here. - long rowCount = firstChunkMetadata.getStatistics().getCount(); - for (Statistics statistics : - ((AlignedChunkMetadata) firstChunkMetadata).getValueStatisticsList()) { - if (statistics == null || statistics.hasNullValue(rowCount)) { - return; + long startTime = System.nanoTime(); + try { + if (firstChunkMetadata != null && !isChunkOverlapped() && !firstChunkMetadata.isModified()) { + Filter queryFilter = scanOptions.getQueryFilter(); + if (queryFilter != null) { + // TODO accept valueStatisticsList to filter + if (!queryFilter.satisfy(firstChunkMetadata.getStatistics())) { + skipCurrentChunk(); + } + } else { + // For aligned series, When we only query some measurements under an aligned device, if + // the + // values of these queried measurements at a timestamp are all null, the timestamp will + // not + // be selected. + // NOTE: if we change the query semantic in the future for aligned series, we need to + // remove + // this check here. 
+ long rowCount = firstChunkMetadata.getStatistics().getCount(); + for (Statistics statistics : + ((AlignedChunkMetadata) firstChunkMetadata).getValueStatisticsList()) { + if (statistics == null || statistics.hasNullValue(rowCount)) { + return; + } + } + // When the number of points in all value chunks is the same as that in the time chunk, it + // means that there is no null value, and all timestamps will be selected. + if (paginationController.hasCurOffset(rowCount)) { + skipCurrentChunk(); + paginationController.consumeOffset(rowCount); } } - // When the number of points in all value chunks is the same as that in the time chunk, it - // means that there is no null value, and all timestamps will be selected. - if (paginationController.hasCurOffset(rowCount)) { - skipCurrentChunk(); - paginationController.consumeOffset(rowCount); - } } + } finally { + QueryStatistics.getInstance() + .addCost(QueryStatistics.FILTER_FIRST_CHUNK_METADATA, System.nanoTime() - startTime); } } }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java index d3988be..2191571 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.mpp.metric.QueryMetricsManager; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.SeriesScanOptions; import org.apache.iotdb.db.mpp.plan.statement.component.Ordering; +import org.apache.iotdb.db.mpp.statistics.QueryStatistics; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.reader.chunk.MemAlignedPageReader; import org.apache.iotdb.db.query.reader.chunk.MemPageReader; @@ -66,6 +67,7 @@ import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.BUILD_TSBLOCK_FROM_PAGE_READER_ALIGNED_MEM; import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.BUILD_TSBLOCK_FROM_PAGE_READER_NONALIGNED_DISK; import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.BUILD_TSBLOCK_FROM_PAGE_READER_NONALIGNED_MEM; +import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.PAGE_READER; public class SeriesScanUtil { @@ -174,42 +176,48 @@ } public boolean hasNextFile() throws IOException { - if (!paginationController.hasCurLimit()) { - return false; - } + long startTime = System.nanoTime(); + try { + if (!paginationController.hasCurLimit()) { + return false; + } - if (!unSeqPageReaders.isEmpty() - || firstPageReader != null - || mergeReader.hasNextTimeValuePair()) { - throw new IOException( - "all cached pages should be consumed first unSeqPageReaders.isEmpty() is " - + unSeqPageReaders.isEmpty() - + " firstPageReader != null is " - + (firstPageReader != null) - + " mergeReader.hasNextTimeValuePair() = " - + mergeReader.hasNextTimeValuePair()); - } + if (!unSeqPageReaders.isEmpty() + || firstPageReader != null + || mergeReader.hasNextTimeValuePair()) { + throw new IOException( + "all cached pages should be consumed first unSeqPageReaders.isEmpty() is " + + unSeqPageReaders.isEmpty() + + " firstPageReader != null is " + + (firstPageReader != null) + + " mergeReader.hasNextTimeValuePair() = " + + mergeReader.hasNextTimeValuePair()); + } - if 
(firstChunkMetadata != null || !cachedChunkMetadata.isEmpty()) { - throw new IOException("all cached chunks should be consumed first"); - } + if (firstChunkMetadata != null || !cachedChunkMetadata.isEmpty()) { + throw new IOException("all cached chunks should be consumed first"); + } - if (firstTimeSeriesMetadata != null) { - return true; - } + if (firstTimeSeriesMetadata != null) { + return true; + } - while (firstTimeSeriesMetadata == null - && (orderUtils.hasNextSeqResource() - || orderUtils.hasNextUnseqResource() - || !seqTimeSeriesMetadata.isEmpty() - || !unSeqTimeSeriesMetadata.isEmpty())) { - // init first time series metadata whose startTime is minimum - tryToUnpackAllOverlappedFilesToTimeSeriesMetadata(); - // filter file based on push-down conditions - filterFirstTimeSeriesMetadata(); - } + while (firstTimeSeriesMetadata == null + && (orderUtils.hasNextSeqResource() + || orderUtils.hasNextUnseqResource() + || !seqTimeSeriesMetadata.isEmpty() + || !unSeqTimeSeriesMetadata.isEmpty())) { + // init first time series metadata whose startTime is minimum + tryToUnpackAllOverlappedFilesToTimeSeriesMetadata(); + // filter file based on push-down conditions + filterFirstTimeSeriesMetadata(); + } - return firstTimeSeriesMetadata != null; + return firstTimeSeriesMetadata != null; + } finally { + QueryStatistics.getInstance() + .addCost(QueryStatistics.HAS_NEXT_FILE, System.nanoTime() - startTime); + } } boolean isFileOverlapped() throws IOException { @@ -254,50 +262,62 @@ * overlapped chunks are consumed */ public boolean hasNextChunk() throws IOException { - if (!paginationController.hasCurLimit()) { - return false; - } + long startTime = System.nanoTime(); + try { + if (!paginationController.hasCurLimit()) { + return false; + } - if (!unSeqPageReaders.isEmpty() - || firstPageReader != null - || mergeReader.hasNextTimeValuePair()) { - throw new IOException( - "all cached pages should be consumed first unSeqPageReaders.isEmpty() is " - + unSeqPageReaders.isEmpty() - 
+ " firstPageReader != null is " - + (firstPageReader != null) - + " mergeReader.hasNextTimeValuePair() = " - + mergeReader.hasNextTimeValuePair()); - } + if (!unSeqPageReaders.isEmpty() + || firstPageReader != null + || mergeReader.hasNextTimeValuePair()) { + throw new IOException( + "all cached pages should be consumed first unSeqPageReaders.isEmpty() is " + + unSeqPageReaders.isEmpty() + + " firstPageReader != null is " + + (firstPageReader != null) + + " mergeReader.hasNextTimeValuePair() = " + + mergeReader.hasNextTimeValuePair()); + } - if (firstChunkMetadata != null) { - return true; - // hasNextFile() has not been invoked - } else if (firstTimeSeriesMetadata == null && cachedChunkMetadata.isEmpty()) { - return false; - } + if (firstChunkMetadata != null) { + return true; + // hasNextFile() has not been invoked + } else if (firstTimeSeriesMetadata == null && cachedChunkMetadata.isEmpty()) { + return false; + } - while (firstChunkMetadata == null && (!cachedChunkMetadata.isEmpty() || hasNextFile())) { - initFirstChunkMetadata(); - // filter chunk based on push-down conditions - filterFirstChunkMetadata(); + while (firstChunkMetadata == null && (!cachedChunkMetadata.isEmpty() || hasNextFile())) { + initFirstChunkMetadata(); + // filter chunk based on push-down conditions + filterFirstChunkMetadata(); + } + return firstChunkMetadata != null; + } finally { + QueryStatistics.getInstance() + .addCost(QueryStatistics.HAS_NEXT_CHUNK, System.nanoTime() - startTime); } - return firstChunkMetadata != null; } protected void filterFirstChunkMetadata() throws IOException { - if (firstChunkMetadata != null && !isChunkOverlapped() && !firstChunkMetadata.isModified()) { - Filter queryFilter = scanOptions.getQueryFilter(); - Statistics statistics = firstChunkMetadata.getStatistics(); - if (queryFilter == null || queryFilter.allSatisfy(statistics)) { - long rowCount = statistics.getCount(); - if (paginationController.hasCurOffset(rowCount)) { + long startTime = 
System.nanoTime(); + try { + if (firstChunkMetadata != null && !isChunkOverlapped() && !firstChunkMetadata.isModified()) { + Filter queryFilter = scanOptions.getQueryFilter(); + Statistics statistics = firstChunkMetadata.getStatistics(); + if (queryFilter == null || queryFilter.allSatisfy(statistics)) { + long rowCount = statistics.getCount(); + if (paginationController.hasCurOffset(rowCount)) { + skipCurrentChunk(); + paginationController.consumeOffset(rowCount); + } + } else if (!queryFilter.satisfy(statistics)) { skipCurrentChunk(); - paginationController.consumeOffset(rowCount); } - } else if (!queryFilter.satisfy(statistics)) { - skipCurrentChunk(); } + } finally { + QueryStatistics.getInstance() + .addCost(QueryStatistics.FILTER_FIRST_CHUNK_METADATA, System.nanoTime() - startTime); } } @@ -401,60 +421,67 @@ @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning public boolean hasNextPage() throws IOException { - if (!paginationController.hasCurLimit()) { - return false; - } + long startTime = System.nanoTime(); + try { + if (!paginationController.hasCurLimit()) { + return false; + } - /* - * has overlapped data before - */ - if (hasCachedNextOverlappedPage) { - return true; - } else if (mergeReader.hasNextTimeValuePair() || firstPageOverlapped()) { - if (hasNextOverlappedPage()) { - cachedTsBlock = nextOverlappedPage(); - if (cachedTsBlock != null && !cachedTsBlock.isEmpty()) { - hasCachedNextOverlappedPage = true; - return true; + /* + * has overlapped data before + */ + if (hasCachedNextOverlappedPage) { + return true; + } else if (mergeReader.hasNextTimeValuePair() || firstPageOverlapped()) { + if (hasNextOverlappedPage()) { + cachedTsBlock = nextOverlappedPage(); + if (cachedTsBlock != null && !cachedTsBlock.isEmpty()) { + hasCachedNextOverlappedPage = true; + return true; + } } } - } - if (firstPageReader != null) { - return true; - } + if (firstPageReader != null) { + return true; + } - /* - * construct first page reader - */ - 
if (firstChunkMetadata != null) { /* - * try to unpack all overlapped ChunkMetadata to cachedPageReaders + * construct first page reader */ - unpackAllOverlappedChunkMetadataToPageReaders( - orderUtils.getOverlapCheckTime(firstChunkMetadata.getStatistics()), true); - } else { - /* - * first chunk metadata is already unpacked, consume cached pages - */ - initFirstPageReader(); - } - - if (isExistOverlappedPage()) { - return true; - } - - // make sure firstPageReader won't be null while the unSeqPageReaders has more cached page - // readers - while (firstPageReader == null && (!seqPageReaders.isEmpty() || !unSeqPageReaders.isEmpty())) { - - initFirstPageReader(); + if (firstChunkMetadata != null) { + /* + * try to unpack all overlapped ChunkMetadata to cachedPageReaders + */ + unpackAllOverlappedChunkMetadataToPageReaders( + orderUtils.getOverlapCheckTime(firstChunkMetadata.getStatistics()), true); + } else { + /* + * first chunk metadata is already unpacked, consume cached pages + */ + initFirstPageReader(); + } if (isExistOverlappedPage()) { return true; } + + // make sure firstPageReader won't be null while the unSeqPageReaders has more cached page + // readers + while (firstPageReader == null + && (!seqPageReaders.isEmpty() || !unSeqPageReaders.isEmpty())) { + + initFirstPageReader(); + + if (isExistOverlappedPage()) { + return true; + } + } + return firstPageReader != null; + } finally { + QueryStatistics.getInstance() + .addCost(QueryStatistics.HAS_NEXT_PAGE, System.nanoTime() - startTime); } - return firstPageReader != null; } private boolean isExistOverlappedPage() throws IOException { @@ -765,66 +792,73 @@ */ timeValuePair = mergeReader.nextTimeValuePair(); - Object valueForFilter = timeValuePair.getValue().getValue(); + long st = System.nanoTime(); + try { + Object valueForFilter = timeValuePair.getValue().getValue(); - // TODO fix value filter firstNotNullObject, currently, if it's a value filter, it will - // only accept AlignedPath with only one sub 
sensor - if (timeValuePair.getValue().getDataType() == TSDataType.VECTOR) { - for (TsPrimitiveType tsPrimitiveType : timeValuePair.getValue().getVector()) { - if (tsPrimitiveType != null) { - valueForFilter = tsPrimitiveType.getValue(); - break; + // TODO fix value filter firstNotNullObject, currently, if it's a value filter, it + // will + // only accept AlignedPath with only one sub sensor + if (timeValuePair.getValue().getDataType() == TSDataType.VECTOR) { + for (TsPrimitiveType tsPrimitiveType : timeValuePair.getValue().getVector()) { + if (tsPrimitiveType != null) { + valueForFilter = tsPrimitiveType.getValue(); + break; + } } } - } - Filter queryFilter = scanOptions.getQueryFilter(); - if (queryFilter != null - && !queryFilter.satisfy(timeValuePair.getTimestamp(), valueForFilter)) { - continue; - } - if (paginationController.hasCurOffset()) { - paginationController.consumeOffset(); - continue; - } - if (paginationController.hasCurLimit()) { - timeBuilder.writeLong(timeValuePair.getTimestamp()); - switch (dataType) { - case BOOLEAN: - builder.getColumnBuilder(0).writeBoolean(timeValuePair.getValue().getBoolean()); - break; - case INT32: - builder.getColumnBuilder(0).writeInt(timeValuePair.getValue().getInt()); - break; - case INT64: - builder.getColumnBuilder(0).writeLong(timeValuePair.getValue().getLong()); - break; - case FLOAT: - builder.getColumnBuilder(0).writeFloat(timeValuePair.getValue().getFloat()); - break; - case DOUBLE: - builder.getColumnBuilder(0).writeDouble(timeValuePair.getValue().getDouble()); - break; - case TEXT: - builder.getColumnBuilder(0).writeBinary(timeValuePair.getValue().getBinary()); - break; - case VECTOR: - TsPrimitiveType[] values = timeValuePair.getValue().getVector(); - for (int i = 0; i < values.length; i++) { - if (values[i] == null) { - builder.getColumnBuilder(i).appendNull(); - } else { - builder.getColumnBuilder(i).writeTsPrimitiveType(values[i]); - } - } - break; - default: - throw new 
UnSupportedDataTypeException(String.valueOf(dataType)); + Filter queryFilter = scanOptions.getQueryFilter(); + if (queryFilter != null + && !queryFilter.satisfy(timeValuePair.getTimestamp(), valueForFilter)) { + continue; } - builder.declarePosition(); - paginationController.consumeLimit(); - } else { - break; + if (paginationController.hasCurOffset()) { + paginationController.consumeOffset(); + continue; + } + if (paginationController.hasCurLimit()) { + timeBuilder.writeLong(timeValuePair.getTimestamp()); + switch (dataType) { + case BOOLEAN: + builder.getColumnBuilder(0).writeBoolean(timeValuePair.getValue().getBoolean()); + break; + case INT32: + builder.getColumnBuilder(0).writeInt(timeValuePair.getValue().getInt()); + break; + case INT64: + builder.getColumnBuilder(0).writeLong(timeValuePair.getValue().getLong()); + break; + case FLOAT: + builder.getColumnBuilder(0).writeFloat(timeValuePair.getValue().getFloat()); + break; + case DOUBLE: + builder.getColumnBuilder(0).writeDouble(timeValuePair.getValue().getDouble()); + break; + case TEXT: + builder.getColumnBuilder(0).writeBinary(timeValuePair.getValue().getBinary()); + break; + case VECTOR: + TsPrimitiveType[] values = timeValuePair.getValue().getVector(); + for (int i = 0; i < values.length; i++) { + if (values[i] == null) { + builder.getColumnBuilder(i).appendNull(); + } else { + builder.getColumnBuilder(i).writeTsPrimitiveType(values[i]); + } + } + break; + default: + throw new UnSupportedDataTypeException(String.valueOf(dataType)); + } + builder.declarePosition(); + paginationController.consumeLimit(); + } else { + break; + } + } finally { + QueryStatistics.getInstance() + .addCost(QueryStatistics.MERGE_READER_BUILD_RES, System.nanoTime() - st); } } hasCachedNextOverlappedPage = !builder.isEmpty(); @@ -843,11 +877,13 @@ } } } finally { + long costTime = System.nanoTime() - startTime; QUERY_METRICS.recordSeriesScanCost( isAligned ? 
BUILD_TSBLOCK_FROM_MERGE_READER_ALIGNED : BUILD_TSBLOCK_FROM_MERGE_READER_NONALIGNED, - System.nanoTime() - startTime); + costTime); + QueryStatistics.getInstance().addCost(QueryStatistics.HAS_NEXT_OVERLAPPED_PAGE, costTime); } } @@ -992,18 +1028,23 @@ * find end time of the first TimeSeriesMetadata */ long endTime = -1L; - if (!seqTimeSeriesMetadata.isEmpty() && unSeqTimeSeriesMetadata.isEmpty()) { - // only has seq - endTime = orderUtils.getOverlapCheckTime(seqTimeSeriesMetadata.get(0).getStatistics()); - } else if (seqTimeSeriesMetadata.isEmpty() && !unSeqTimeSeriesMetadata.isEmpty()) { - // only has unseq - endTime = orderUtils.getOverlapCheckTime(unSeqTimeSeriesMetadata.peek().getStatistics()); - } else if (!seqTimeSeriesMetadata.isEmpty()) { - // has seq and unseq - endTime = - orderUtils.getCurrentEndPoint( - seqTimeSeriesMetadata.get(0).getStatistics(), - unSeqTimeSeriesMetadata.peek().getStatistics()); + long t1 = System.nanoTime(); + try { + if (!seqTimeSeriesMetadata.isEmpty() && unSeqTimeSeriesMetadata.isEmpty()) { + // only has seq + endTime = orderUtils.getOverlapCheckTime(seqTimeSeriesMetadata.get(0).getStatistics()); + } else if (seqTimeSeriesMetadata.isEmpty() && !unSeqTimeSeriesMetadata.isEmpty()) { + // only has unseq + endTime = orderUtils.getOverlapCheckTime(unSeqTimeSeriesMetadata.peek().getStatistics()); + } else if (!seqTimeSeriesMetadata.isEmpty()) { + // has seq and unseq + endTime = + orderUtils.getCurrentEndPoint( + seqTimeSeriesMetadata.get(0).getStatistics(), + unSeqTimeSeriesMetadata.peek().getStatistics()); + } + } finally { + QueryStatistics.getInstance().addCost(QueryStatistics.FIND_END_TIME, System.nanoTime() - t1); } /* @@ -1016,39 +1057,51 @@ /* * update the first TimeSeriesMetadata */ - if (!seqTimeSeriesMetadata.isEmpty() && unSeqTimeSeriesMetadata.isEmpty()) { - // only has seq - firstTimeSeriesMetadata = seqTimeSeriesMetadata.remove(0); - } else if (seqTimeSeriesMetadata.isEmpty() && !unSeqTimeSeriesMetadata.isEmpty()) { - 
// only has unseq - firstTimeSeriesMetadata = unSeqTimeSeriesMetadata.poll(); - } else if (!seqTimeSeriesMetadata.isEmpty()) { - // has seq and unseq - if (orderUtils.isTakeSeqAsFirst( - seqTimeSeriesMetadata.get(0).getStatistics(), - unSeqTimeSeriesMetadata.peek().getStatistics())) { + t1 = System.nanoTime(); + try { + if (!seqTimeSeriesMetadata.isEmpty() && unSeqTimeSeriesMetadata.isEmpty()) { + // only has seq firstTimeSeriesMetadata = seqTimeSeriesMetadata.remove(0); - } else { + } else if (seqTimeSeriesMetadata.isEmpty() && !unSeqTimeSeriesMetadata.isEmpty()) { + // only has unseq firstTimeSeriesMetadata = unSeqTimeSeriesMetadata.poll(); + } else if (!seqTimeSeriesMetadata.isEmpty()) { + // has seq and unseq + if (orderUtils.isTakeSeqAsFirst( + seqTimeSeriesMetadata.get(0).getStatistics(), + unSeqTimeSeriesMetadata.peek().getStatistics())) { + firstTimeSeriesMetadata = seqTimeSeriesMetadata.remove(0); + } else { + firstTimeSeriesMetadata = unSeqTimeSeriesMetadata.poll(); + } } + } finally { + QueryStatistics.getInstance() + .addCost(QueryStatistics.PICK_FIRST_TIMESERIES_METADATA, System.nanoTime() - t1); } } protected void filterFirstTimeSeriesMetadata() throws IOException { - if (firstTimeSeriesMetadata != null - && !isFileOverlapped() - && !firstTimeSeriesMetadata.isModified()) { - Filter queryFilter = scanOptions.getQueryFilter(); - Statistics statistics = firstTimeSeriesMetadata.getStatistics(); - if (queryFilter == null || queryFilter.allSatisfy(statistics)) { - long rowCount = statistics.getCount(); - if (paginationController.hasCurOffset(rowCount)) { + long startTime = System.nanoTime(); + try { + if (firstTimeSeriesMetadata != null + && !isFileOverlapped() + && !firstTimeSeriesMetadata.isModified()) { + Filter queryFilter = scanOptions.getQueryFilter(); + Statistics statistics = firstTimeSeriesMetadata.getStatistics(); + if (queryFilter == null || queryFilter.allSatisfy(statistics)) { + long rowCount = statistics.getCount(); + if 
(paginationController.hasCurOffset(rowCount)) { + skipCurrentFile(); + paginationController.consumeOffset(rowCount); + } + } else if (!queryFilter.satisfy(statistics)) { skipCurrentFile(); - paginationController.consumeOffset(rowCount); } - } else if (!queryFilter.satisfy(statistics)) { - skipCurrentFile(); } + } finally { + QueryStatistics.getInstance() + .addCost(QueryStatistics.FILTER_FIRST_TIMESERIES_METADATA, System.nanoTime() - startTime); } } @@ -1116,7 +1169,7 @@ return scanOptions.getGlobalTimeFilter(); } - protected static class VersionPageReader { + protected class VersionPageReader { private final PriorityMergeReader.MergeReaderPriority version; private final IPageReader data; @@ -1160,6 +1213,7 @@ } return tsBlock; } finally { + long costTime = System.nanoTime() - startTime; QUERY_METRICS.recordSeriesScanCost( isAligned ? (isMem @@ -1168,7 +1222,8 @@ : (isMem ? BUILD_TSBLOCK_FROM_PAGE_READER_NONALIGNED_MEM : BUILD_TSBLOCK_FROM_PAGE_READER_NONALIGNED_DISK), - System.nanoTime() - startTime); + costTime); + QueryStatistics.getInstance().addCost(PAGE_READER, costTime); } }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java index 4b4a89ff4..2044964 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -134,6 +134,7 @@ import org.apache.iotdb.db.mpp.plan.statement.sys.ShowQueriesStatement; import org.apache.iotdb.db.mpp.plan.statement.sys.ShowVersionStatement; import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeSinkTypeStatement; +import org.apache.iotdb.db.mpp.statistics.QueryStatistics; import org.apache.iotdb.db.query.control.SessionManager; import org.apache.iotdb.db.utils.FileLoaderUtils; import org.apache.iotdb.db.utils.TimePartitionUtils; @@ -247,8 +248,10 @@ } else { schemaTree = schemaFetcher.fetchSchema(patternTree, context); } - QueryMetricsManager.getInstance() - .recordPlanCost(SCHEMA_FETCHER, System.nanoTime() - startTime); + long endTime = System.nanoTime() - startTime; + QueryMetricsManager.getInstance().recordPlanCost(SCHEMA_FETCHER, endTime); + QueryStatistics.getInstance().addCost(QueryStatistics.SCHEMA_FETCHER, endTime); + logger.debug("[EndFetchSchema]"); // If there is no leaf node in the schema tree, the query should be completed immediately @@ -1549,8 +1552,9 @@ return partitionFetcher.getDataPartition(sgNameToQueryParamsMap); } } finally { - QueryMetricsManager.getInstance() - .recordPlanCost(PARTITION_FETCHER, System.nanoTime() - startTime); + long costTime = System.nanoTime() - startTime; + QueryMetricsManager.getInstance().recordPlanCost(PARTITION_FETCHER, costTime); + QueryStatistics.getInstance().addCost(QueryStatistics.PARTITION_FETCHER, costTime); } }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java index 99ebd6f..61e1166 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher; import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher; import org.apache.iotdb.db.mpp.plan.statement.Statement; +import org.apache.iotdb.db.mpp.statistics.QueryStatistics; import static org.apache.iotdb.db.mpp.common.QueryId.mockQueryId; import static org.apache.iotdb.db.mpp.metric.QueryPlanCostMetricSet.ANALYZER; @@ -48,7 +49,9 @@ new AnalyzeVisitor(partitionFetcher, schemaFetcher).process(statement, context); if (statement.isQuery()) { - QueryMetricsManager.getInstance().recordPlanCost(ANALYZER, System.nanoTime() - startTime); + long costTime = System.nanoTime() - startTime; + QueryMetricsManager.getInstance().recordPlanCost(ANALYZER, costTime); + QueryStatistics.getInstance().addCost(QueryStatistics.ANALYZER, costTime); } return analysis; }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java index 715cf29..eafa3a6 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -63,6 +63,7 @@ import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement; import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement; import org.apache.iotdb.db.mpp.plan.statement.crud.LoadTsFileStatement; +import org.apache.iotdb.db.mpp.statistics.QueryStatistics; import org.apache.iotdb.db.utils.SetThreadName; import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId; import org.apache.iotdb.rpc.RpcUtils; @@ -140,6 +141,7 @@ private long totalExecutionTime; private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance(); + private static final QueryStatistics QUERY_STATISTICS = QueryStatistics.getInstance(); private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS = PerformanceOverviewMetrics.getInstance(); @@ -327,7 +329,10 @@ syncInternalServiceClientManager, asyncInternalServiceClientManager); this.scheduler.start(); - PERFORMANCE_OVERVIEW_METRICS.recordScheduleCost(System.nanoTime() - startTime); + + long endTime = System.nanoTime() - startTime; + PERFORMANCE_OVERVIEW_METRICS.recordScheduleCost(endTime); + QueryStatistics.getInstance().addCost(QueryStatistics.DISPATCHER, endTime); } // Use LogicalPlanner to do the logical query plan and logical optimization @@ -349,7 +354,9 @@ this.distributedPlan = planner.planFragments(); if (rawStatement.isQuery()) { - QUERY_METRICS.recordPlanCost(DISTRIBUTION_PLANNER, System.nanoTime() - startTime); + long endTime = System.nanoTime() - startTime; + QUERY_METRICS.recordPlanCost(DISTRIBUTION_PLANNER, endTime); + QueryStatistics.getInstance().addCost(QueryStatistics.DISTRIBUTION_PLANNER, endTime); } if (isQuery() && logger.isDebugEnabled()) { logger.debug( @@ -482,7 +489,9 @@ ListenableFuture<?> blocked = resultHandle.isBlocked(); blocked.get(); } finally { - QUERY_METRICS.recordExecutionCost(WAIT_FOR_RESULT, System.nanoTime() - startTime); + long costTime = System.nanoTime() - startTime; + QUERY_METRICS.recordExecutionCost(WAIT_FOR_RESULT, 
costTime); + QUERY_STATISTICS.addCost(QueryStatistics.WAIT_FOR_RESULT, costTime); } if (!resultHandle.isFinished()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java index 30a982b..b1189dc 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.mpp.execution.operator.Operator; import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.mpp.statistics.QueryStatistics; import org.apache.iotdb.db.utils.SetThreadName; import org.apache.iotdb.rpc.TSStatusCode; @@ -38,6 +39,9 @@ import java.util.ArrayList; import java.util.List; +import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.CHECK_MEMORY; +import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.NODE_TO_OPERATOR; + /** * Used to plan a fragment instance. Currently, we simply change it from PlanNode to executable * Operator tree, but in the future, we may split one fragment instance into multiple pipeline to @@ -45,6 +49,8 @@ */ public class LocalExecutionPlanner { + private static final QueryStatistics QUERY_STATISTICS = QueryStatistics.getInstance(); + private static final Logger LOGGER = LoggerFactory.getLogger(LocalExecutionPlanner.class); /** allocated memory for operator execution */ @@ -62,12 +68,17 @@ // Generate pipelines, return the last pipeline data structure // TODO Replace operator with operatorFactory to build multiple driver for one pipeline + long startTime = System.nanoTime(); Operator root = plan.accept(new OperatorTreeGenerator(), context); + long endTime = System.nanoTime(); + QUERY_STATISTICS.addCost(NODE_TO_OPERATOR, endTime - startTime); + startTime = endTime; // check whether current free memory is enough to execute current query long estimatedMemorySize = checkMemory(root, instanceContext.getStateMachine()); - context.addPipelineDriverFactory(root, context.getDriverContext(), estimatedMemorySize); + endTime = System.nanoTime(); + QUERY_STATISTICS.addCost(CHECK_MEMORY, endTime - startTime); instanceContext.setSourcePaths(collectSourcePaths(context));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java index f55d695..f3298af 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.mpp.plan.optimization.PlanOptimizer; import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.mpp.statistics.QueryStatistics; import java.util.List; @@ -46,8 +47,9 @@ // optimize the query logical plan if (analysis.getStatement().isQuery()) { - QueryMetricsManager.getInstance() - .recordPlanCost(LOGICAL_PLANNER, System.nanoTime() - startTime); + long endTime = System.nanoTime() - startTime; + QueryMetricsManager.getInstance().recordPlanCost(LOGICAL_PLANNER, endTime); + QueryStatistics.getInstance().addCost(QueryStatistics.LOGICAL_PLANNER, endTime); for (PlanOptimizer optimizer : optimizers) { rootNode = optimizer.optimize(rootNode, analysis, context);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java index cb64aca..4e1ca48 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.mpp.plan.analyze.QueryType; import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.mpp.statistics.QueryStatistics; import org.apache.iotdb.db.utils.SetThreadName; import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance; import org.apache.iotdb.mpp.rpc.thrift.TPlanNode; @@ -124,7 +125,9 @@ RpcUtils.getStatus( TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + t.getMessage()))); } finally { - QUERY_METRICS.recordExecutionCost(DISPATCH_READ, System.nanoTime() - startTime); + long endTime = System.nanoTime() - startTime; + QUERY_METRICS.recordExecutionCost(DISPATCH_READ, endTime); + QueryStatistics.getInstance().addCost(QueryStatistics.DISPATCH_READ, endTime); } } return immediateFuture(new FragInstanceDispatchResult(true));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/statistics/QueryStatistics.java b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/QueryStatistics.java new file mode 100644 index 0000000..70492f7 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/QueryStatistics.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.statistics;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.text.DecimalFormat;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Process-wide accumulator of per-operation query time costs.
+ *
+ * <p>Callers report a cost through {@link #addCost(String, long)} using one of the operation-name
+ * constants below; a single background task periodically logs every accumulated total as a tree.
+ */
+@ThreadSafe
+public class QueryStatistics {
+
+  private static final long QUERY_STATISTICS_PRINT_INTERVAL_IN_MS = 100_000;
+
+  private static final Logger QUERY_STATISTICS_LOGGER =
+      LoggerFactory.getLogger(IoTDBConstant.QUERY_STATISTICS_LOGGER_NAME);
+
+  // DecimalFormat is not thread-safe; within this file it is only reached through
+  // OperationStatistic#toString, which the single printer thread invokes.
+  private static final DecimalFormat format = new DecimalFormat("#,###");
+
+  // When false, addCost and the periodic report are both no-ops. Never changed in this class.
+  private final AtomicBoolean tracing = new AtomicBoolean(true);
+
+  private final Map<String, OperationStatistic> operationStatistics = new ConcurrentHashMap<>();
+
+  // Operation names accepted by addCost and used as keys of operationStatistics.
+  public static final String LOCAL_EXECUTION_PLANNER = "LocalExecutionPlanner";
+  public static final String CREATE_FI_CONTEXT = "CreateFIContext";
+  public static final String CREATE_FI_EXEC = "CreateFIExec";
+  public static final String NODE_TO_OPERATOR = "ToOpTree";
+  public static final String CHECK_MEMORY = "CheckMem";
+  public static final String QUERY_RESOURCE_INIT = "QueryResourceInit";
+  public static final String INIT_SOURCE_OP = "InitSourceOp";
+  public static final String QUERY_RESOURCE_LIST = "TsFileList";
+  public static final String ADD_REFERENCE = "AddRef";
+
+  public static final String LOCAL_SOURCE_HANDLE_GET_TSBLOCK = "LocalSourceHandleGetTsBlock";
+  public static final String LOCAL_SOURCE_HANDLE_SER_TSBLOCK = "LocalSourceHandleSerializeTsBlock";
+  public static final String WAIT_FOR_RESULT = "WaitForResult";
+
+  public static final String AGG_SCAN_OPERATOR = "AbstractSeriesAggregationScanOperator";
+  public static final String CAL_NEXT_AGG_RES = "CalcNextAggRes";
+  public static final String CAL_AGG_FROM_RAW_DATA = "CalcAggFromRawData";
+  public static final String CAL_AGG_FROM_STAT = "CalcAggFromStat";
+  public static final String AGGREGATOR_PROCESS_TSBLOCK = "AggProcTsBlock";
+  public static final String CAL_AGG_FROM_PAGE = "CalcAggFromPage";
+  public static final String CAL_AGG_FROM_CHUNK = "CalcAggFromChunk";
+  public static final String CAL_AGG_FROM_FILE = "CalcAggFromFile";
+  public static final String CAL_AGG_FROM_FILE_STAT = "CalcAggFromFileStat";
+  public static final String CAL_AGG_FROM_CHUNK_STAT = "CalcAggFromChunkStat";
+  public static final String CAL_AGG_FROM_PAGE_STAT = "CalcAggFromPageStat";
+  public static final String BUILD_AGG_RES = "BuildAggRes";
+
+  public static final String PARSER = "Parser";
+  public static final String CREATE_QUERY_EXEC = "CreateQueryExec";
+  public static final String SERIALIZE_TSBLOCK = "SerTsBlock";
+  public static final String ANALYZER = "Analyzer";
+  public static final String SCHEMA_FETCHER = "SchemaFetcher";
+  public static final String PARTITION_FETCHER = "PartitionFetcher";
+  public static final String LOGICAL_PLANNER = "LogicalPlanner";
+  public static final String DISTRIBUTION_PLANNER = "DistributionPlanner";
+  public static final String DISPATCHER = "Dispatcher";
+  public static final String DISPATCH_READ = "DispatchRead";
+
+  public static final String DRIVER_CLOSE = "CloseDriver";
+  public static final String DRIVER_INTERNAL_PROCESS = "DriverInternalProcess";
+  public static final String SEND_TSBLOCK = "SendTsBlock";
+  public static final String RESERVE_MEMORY = "ReserveMem";
+  public static final String NOTIFY_NEW_TSBLOCK = "NotifyNewTsBlock";
+  public static final String NOTIFY_END = "NotifyEnd";
+  public static final String FREE_MEM = "FreeMem";
+  public static final String SINK_HANDLE_END_LISTENER = "SinkHandleEndListener";
+  public static final String SINK_HANDLE_FINISH_LISTENER = "SinkHandleFinishListener";
+  public static final String CHECK_AND_INVOKE_ON_FINISHED = "CheckAndInvokeOnFinished";
+  public static final String SET_NO_MORE_TSBLOCK = "SetNoMoreTsBlock";
+  public static final String SERVER_RPC_RT = "ServerRpcRT";
+
+  public static final String LOAD_TIME_SERIES_METADATA = "loadTimeSeriesMetadata";
+  public static final String LOAD_CHUNK_METADATA_LIST = "loadChunkMetadataList";
+  public static final String LOAD_PAGE_READER_LIST = "loadPageReaderList";
+  public static final String LOAD_CHUNK = "loadChunk";
+  public static final String INIT_PAGE_READERS = "initAllPageReaders";
+  public static final String PAGE_READER = "IPageReader";
+
+  public static final String HAS_NEXT_FILE = "hasNextFile";
+  public static final String FILTER_FIRST_TIMESERIES_METADATA = "filterFirstTimeSeriesMetadata";
+  public static final String FIND_END_TIME = "findEndTime";
+  public static final String PICK_FIRST_TIMESERIES_METADATA = "pickFirstTimeSeriesMetadata";
+
+  public static final String HAS_NEXT_CHUNK = "hasNextChunk";
+  public static final String FILTER_FIRST_CHUNK_METADATA = "filterFirstChunkMetadata";
+
+  public static final String HAS_NEXT_PAGE = "hasNextPage";
+  public static final String HAS_NEXT_OVERLAPPED_PAGE = "hasNextOverlappedPage";
+  public static final String MERGE_READER_ADD_READER = "mergeReader#addReader";
+  public static final String MERGE_READER_NEXT = "mergeReader#nextTimeValuePair";
+  public static final String MERGE_READER_UPDATE_HEAP = "mergeReader#updateHeap";
+  public static final String MERGE_READER_FILL_NULL_VALUE = "mergeReader#fillNullValue";
+  public static final String MERGE_READER_BUILD_RES = "mergeReader#buildRes";
+
+  /** Starts the single-threaded scheduler that periodically logs the accumulated statistics. */
+  private QueryStatistics() {
+    ScheduledExecutorService scheduledExecutor =
+        IoTDBThreadPoolFactory.newScheduledThreadPool(1, "Query-Statistics-Print");
+    ScheduledExecutorUtil.safelyScheduleAtFixedRate(
+        scheduledExecutor,
+        this::printQueryStatistics,
+        0,
+        QUERY_STATISTICS_PRINT_INTERVAL_IN_MS,
+        TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Logs every known operation's accumulated statistics as an indented tree. An operation that has
+   * never been reported is rendered as {@code null}.
+   */
+  private void printQueryStatistics() {
+    if (!tracing.get()) {
+      return;
+    }
+
+    StringBuilder sb = new StringBuilder(System.lineSeparator());
+
+    appendSectionTitle(sb, "Client Connection Thread:");
+    appendStat(sb, "ServerRpcRT ", SERVER_RPC_RT);
+    appendStat(sb, "|___CreateQueryExec ", CREATE_QUERY_EXEC);
+    appendStat(sb, "| |___Parser ", PARSER);
+    appendStat(sb, "| |___Analyzer ", ANALYZER);
+    appendStat(sb, "| | |___PartitionFetcher ", PARTITION_FETCHER);
+    appendStat(sb, "| | |___SchemaFetcher ", SCHEMA_FETCHER);
+    appendStat(sb, "| |___LogicalPlanner ", LOGICAL_PLANNER);
+    appendStat(sb, "| |___DistributionPlanner ", DISTRIBUTION_PLANNER);
+    appendStat(sb, "| |___Dispatcher ", DISPATCHER);
+    appendStat(sb, "| |___DispatchRead ", DISPATCH_READ);
+    appendStat(sb, "| |___LocalExecPlanner ", LOCAL_EXECUTION_PLANNER);
+    appendStat(sb, "| |___FIContext ", CREATE_FI_CONTEXT);
+    appendStat(sb, "| |___ToOpTree ", NODE_TO_OPERATOR);
+    appendStat(sb, "| |___CheckMem ", CHECK_MEMORY);
+    appendStat(sb, "| |___FIExec ", CREATE_FI_EXEC);
+    appendStat(sb, "|___SerTsBlock ", SERIALIZE_TSBLOCK);
+    appendStat(sb, " |___WaitForResult ", WAIT_FOR_RESULT);
+    appendStat(sb, " |___GetTsBlock ", LOCAL_SOURCE_HANDLE_GET_TSBLOCK);
+    appendStat(sb, " |___FreeMem ", FREE_MEM);
+
+    appendSectionTitle(sb, "Query Execution Thread:");
+    appendStat(sb, "|___QueryResourceInit ", QUERY_RESOURCE_INIT);
+    appendStat(sb, "| |___TsFileList ", QUERY_RESOURCE_LIST);
+    appendStat(sb, "| |___AddRef ", ADD_REFERENCE);
+    appendStat(sb, "| |___InitSourceOp ", INIT_SOURCE_OP);
+    appendStat(sb, "|___DriverInternalProcess ", DRIVER_INTERNAL_PROCESS);
+    appendStat(sb, "| |___AggScanOperator ", AGG_SCAN_OPERATOR);
+
+    appendGroupTitle(sb, "| | |[FileLoaderInterface]");
+    appendStat(sb, "| | |___loadTSMetadata ", LOAD_TIME_SERIES_METADATA);
+    appendStat(sb, "| | |___loadChunkMetaList ", LOAD_CHUNK_METADATA_LIST);
+    appendStat(sb, "| | |___loadPageReaderList ", LOAD_PAGE_READER_LIST);
+    appendStat(sb, "| | | |___loadChunk ", LOAD_CHUNK);
+    appendStat(sb, "| | | |___initPageReaders ", INIT_PAGE_READERS);
+    appendStat(sb, "| | |___pageReader ", PAGE_READER);
+
+    appendGroupTitle(sb, "| | |[AggregatorInterface]");
+    appendStat(sb, "| | |___AggFromStat ", CAL_AGG_FROM_STAT);
+    appendStat(sb, "| | |___AggFromRawData ", CAL_AGG_FROM_RAW_DATA);
+    appendStat(sb, "| | | |___AggProcTsBlock ", AGGREGATOR_PROCESS_TSBLOCK);
+
+    appendGroupTitle(sb, "| | |[OperatorMethods]");
+    appendStat(sb, "| | |___CalcNextAggRes ", CAL_NEXT_AGG_RES);
+    appendStat(sb, "| | |___AggFromFile ", CAL_AGG_FROM_FILE);
+    appendStat(sb, "| | | |___TryAggFromFileStat ", CAL_AGG_FROM_FILE_STAT);
+    appendStat(sb, "| | |___AggFromChunk ", CAL_AGG_FROM_CHUNK);
+    appendStat(sb, "| | | |___TryAggFromChunkStat ", CAL_AGG_FROM_CHUNK_STAT);
+    appendStat(sb, "| | |___AggFromPage ", CAL_AGG_FROM_PAGE);
+    appendStat(sb, "| | | |___TryAggFromPageStat ", CAL_AGG_FROM_PAGE_STAT);
+    appendStat(sb, "| | |___BuildAggRes ", BUILD_AGG_RES);
+
+    appendGroupTitle(sb, "| | |[SeriesScanUtilCost]");
+    appendStat(sb, "| | |___hasNextFile ", HAS_NEXT_FILE);
+    appendStat(sb, "| | | |___findEndTime ", FIND_END_TIME);
+    appendStat(sb, "| | | |___pickFirstTimeSeriesMetadata ", PICK_FIRST_TIMESERIES_METADATA);
+    appendStat(sb, "| | | |___filterFirstTimeSeriesMetadata ", FILTER_FIRST_TIMESERIES_METADATA);
+    appendStat(sb, "| | |___hasNextChunk ", HAS_NEXT_CHUNK);
+    appendStat(sb, "| | | |___filterFirstChunkMetadata ", FILTER_FIRST_CHUNK_METADATA);
+    appendStat(sb, "| | |___hasNextPage ", HAS_NEXT_PAGE);
+    appendStat(sb, "| | |___hasNextOverlappedPage ", HAS_NEXT_OVERLAPPED_PAGE);
+    appendStat(sb, "| | | |___mergeReader#nextTimeValuePair ", MERGE_READER_NEXT);
+    appendStat(sb, "| | | |___mergeReader#updateHeap ", MERGE_READER_UPDATE_HEAP);
+    appendStat(sb, "| | | |___mergeReader#fillNullValue ", MERGE_READER_FILL_NULL_VALUE);
+    appendStat(sb, "| | | |___buildTsblock ", MERGE_READER_BUILD_RES);
+
+    appendStat(sb, "| |___SendTsBlock ", SEND_TSBLOCK);
+    appendStat(sb, "| |___ReserveMem ", RESERVE_MEMORY);
+    appendStat(sb, "| |___NotifyNewTsBlock ", NOTIFY_NEW_TSBLOCK);
+    appendStat(sb, "|___SetNoMoreTsBlock ", SET_NO_MORE_TSBLOCK);
+    appendStat(sb, " |___NotifyEnd ", NOTIFY_END);
+    appendStat(sb, " |___EndListener ", SINK_HANDLE_END_LISTENER);
+    appendStat(sb, " |___CkAndInvOnFinished ", CHECK_AND_INVOKE_ON_FINISHED);
+    appendStat(sb, " |___FinishListener ", SINK_HANDLE_FINISH_LISTENER);
+
+    QUERY_STATISTICS_LOGGER.info(sb.toString());
+    QUERY_STATISTICS_LOGGER.info("");
+  }
+
+  /** Appends a section title followed by an empty line. */
+  private static void appendSectionTitle(StringBuilder sb, String title) {
+    sb.append(title).append(System.lineSeparator()).append(System.lineSeparator());
+  }
+
+  /** Appends a group caption on a line of its own. */
+  private static void appendGroupTitle(StringBuilder sb, String title) {
+    sb.append(title).append(System.lineSeparator());
+  }
+
+  /**
+   * Appends one report line: the display label followed by the statistic stored under {@code key},
+   * or {@code null} when nothing has been recorded for that key yet.
+   */
+  private void appendStat(StringBuilder sb, String label, String key) {
+    sb.append(label).append(operationStatistics.get(key)).append(System.lineSeparator());
+  }
+
+  /** Returns the process-wide instance, created lazily on first use. */
+  public static QueryStatistics getInstance() {
+    return QueryStatisticsHolder.INSTANCE;
+  }
+
+  /**
+   * Adds one occurrence of the given operation together with its elapsed time.
+   *
+   * @param key operation name, normally one of the constants declared in this class
+   * @param costTimeInNanos elapsed time of this occurrence, in nanoseconds
+   */
+  public void addCost(String key, long costTimeInNanos) {
+    if (tracing.get()) {
+      operationStatistics
+          .computeIfAbsent(key, k -> new OperationStatistic())
+          .addTimeCost(costTimeInNanos);
+    }
+  }
+
+  /** Running total of time and invocation count for a single operation. */
+  private static class OperationStatistic {
+    // accumulated operation time in ns
+    private final AtomicLong totalTime;
+    private final AtomicLong totalCount;
+
+    public OperationStatistic() {
+      this.totalTime = new AtomicLong(0);
+      this.totalCount = new AtomicLong(0);
+    }
+
+    public void addTimeCost(long costTimeInNanos) {
+      totalTime.addAndGet(costTimeInNanos);
+      totalCount.incrementAndGet();
+    }
+
+    @Override
+    public String toString() {
+      long time = totalTime.get() / 1_000;
+      long count = totalCount.get();
+      // The printer thread can see an entry once computeIfAbsent has published it but before
+      // the first addTimeCost has bumped the counter; avoid dividing by zero in that window.
+      long avgTime = count == 0 ? 0 : time / count;
+      return "{"
+          + "totalTime="
+          + format.format(time)
+          + "us"
+          + ", totalCount="
+          + format.format(count)
+          + ", avgOpTime="
+          + format.format(avgTime)
+          + "us"
+          + '}';
+    }
+  }
+
+  private static class QueryStatisticsHolder {
+
+    private static final QueryStatistics INSTANCE = new QueryStatistics();
+
+    private QueryStatisticsHolder() {}
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskAlignedChunkLoader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskAlignedChunkLoader.java index b433641..efb0ca8 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskAlignedChunkLoader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskAlignedChunkLoader.java
@@ -20,6 +20,7 @@ import org.apache.iotdb.db.engine.cache.ChunkCache; import org.apache.iotdb.db.mpp.metric.QueryMetricsManager; +import org.apache.iotdb.db.mpp.statistics.QueryStatistics; import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; @@ -72,7 +73,9 @@ long t2 = System.nanoTime(); IChunkReader chunkReader = new AlignedChunkReader(timeChunk, valueChunkList, timeFilter); - QUERY_METRICS.recordSeriesScanCost(INIT_CHUNK_READER_ALIGNED_DISK, System.nanoTime() - t2); + long costTime = System.nanoTime() - t2; + QUERY_METRICS.recordSeriesScanCost(INIT_CHUNK_READER_ALIGNED_DISK, costTime); + QueryStatistics.getInstance().addCost(QueryStatistics.INIT_PAGE_READERS, costTime); return chunkReader; } finally {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskChunkLoader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskChunkLoader.java index 85ee83b..b038737 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskChunkLoader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskChunkLoader.java
@@ -21,6 +21,7 @@ import org.apache.iotdb.db.engine.cache.ChunkCache; import org.apache.iotdb.db.mpp.metric.QueryMetricsManager; +import org.apache.iotdb.db.mpp.statistics.QueryStatistics; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; import org.apache.iotdb.tsfile.read.common.Chunk; @@ -65,7 +66,9 @@ long t2 = System.nanoTime(); IChunkReader chunkReader = new ChunkReader(chunk, timeFilter); - QUERY_METRICS.recordSeriesScanCost(INIT_CHUNK_READER_NONALIGNED_DISK, System.nanoTime() - t2); + long costTime = System.nanoTime() - t2; + QUERY_METRICS.recordSeriesScanCost(INIT_CHUNK_READER_NONALIGNED_DISK, costTime); + QueryStatistics.getInstance().addCost(QueryStatistics.INIT_PAGE_READERS, costTime); return chunkReader; } finally {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/AlignedPriorityMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/AlignedPriorityMergeReader.java index 9d5875a..801aaf8 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/AlignedPriorityMergeReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/AlignedPriorityMergeReader.java
@@ -18,6 +18,7 @@ */ package org.apache.iotdb.db.query.reader.universal; +import org.apache.iotdb.db.mpp.statistics.QueryStatistics; import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.utils.TsPrimitiveType; @@ -35,7 +36,13 @@ */ @Override protected void fillNullValue(TimeValuePair v, TimeValuePair c) { - fillNullValueInAligned(v, c); + long startTime = System.nanoTime(); + try { + fillNullValueInAligned(v, c); + } finally { + QueryStatistics.getInstance() + .addCost(QueryStatistics.MERGE_READER_FILL_NULL_VALUE, System.nanoTime() - startTime); + } } static void fillNullValueInAligned(TimeValuePair v, TimeValuePair c) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java index 471090b..475b0d0 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
@@ -18,12 +18,12 @@ */ package org.apache.iotdb.db.query.reader.universal; +import org.apache.iotdb.db.mpp.statistics.QueryStatistics; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.read.reader.IPointReader; import java.io.IOException; -import java.util.List; import java.util.Objects; import java.util.PriorityQueue; @@ -47,21 +47,6 @@ }); } - // only used in external sort, need to refactor later - public PriorityMergeReader(List<IPointReader> prioritySeriesReaders, int startPriority) - throws IOException { - heap = - new PriorityQueue<>( - (o1, o2) -> { - int timeCompare = - Long.compare(o1.timeValuePair.getTimestamp(), o2.timeValuePair.getTimestamp()); - return timeCompare != 0 ? timeCompare : o2.priority.compareTo(o1.priority); - }); - for (IPointReader reader : prioritySeriesReaders) { - addReader(reader, startPriority++); - } - } - public void addReader(IPointReader reader, long priority) throws IOException { if (reader.hasNextTimeValuePair()) { heap.add( @@ -74,11 +59,17 @@ public void addReader( IPointReader reader, MergeReaderPriority priority, long endTime, QueryContext context) throws IOException { - if (reader.hasNextTimeValuePair()) { - heap.add(new Element(reader, reader.nextTimeValuePair(), priority)); - currentReadStopTime = Math.max(currentReadStopTime, endTime); - } else { - reader.close(); + long startTime = System.nanoTime(); + try { + if (reader.hasNextTimeValuePair()) { + heap.add(new Element(reader, reader.nextTimeValuePair(), priority)); + currentReadStopTime = Math.max(currentReadStopTime, endTime); + } else { + reader.close(); + } + } finally { + QueryStatistics.getInstance() + .addCost(QueryStatistics.MERGE_READER_ADD_READER, System.nanoTime() - startTime); } } @@ -93,19 +84,25 @@ @Override public TimeValuePair nextTimeValuePair() throws IOException { - Element top = heap.poll(); - TimeValuePair ret = top.getTimeValuePair(); - TimeValuePair topNext = 
null; - if (top.hasNext()) { - top.next(); - topNext = top.currPair(); + long startTime = System.nanoTime(); + try { + Element top = heap.poll(); + TimeValuePair ret = top.getTimeValuePair(); + TimeValuePair topNext = null; + if (top.hasNext()) { + top.next(); + topNext = top.currPair(); + } + updateHeap(ret, topNext); + if (topNext != null) { + top.timeValuePair = topNext; + heap.add(top); + } + return ret; + } finally { + QueryStatistics.getInstance() + .addCost(QueryStatistics.MERGE_READER_NEXT, System.nanoTime() - startTime); } - updateHeap(ret, topNext); - if (topNext != null) { - top.timeValuePair = topNext; - heap.add(top); - } - return ret; } @Override @@ -119,29 +116,35 @@ * TimeValuePair */ protected void updateHeap(TimeValuePair ret, TimeValuePair topNext) throws IOException { - long topTime = ret.getTimestamp(); - long topNextTime = (topNext == null ? Long.MAX_VALUE : topNext.getTimestamp()); - while (!heap.isEmpty() && heap.peek().currTime() == topTime) { - Element e = heap.poll(); - fillNullValue(ret, e.getTimeValuePair()); - if (!e.hasNext()) { - e.reader.close(); - continue; - } - e.next(); - if (e.currTime() == topNextTime) { - // if the next value of the peek will be overwritten by the next of the top, skip it - fillNullValue(topNext, e.getTimeValuePair()); - if (e.hasNext()) { - e.next(); - heap.add(e); - } else { - // the chunk is end - e.close(); + long startTime = System.nanoTime(); + try { + long topTime = ret.getTimestamp(); + long topNextTime = (topNext == null ? 
Long.MAX_VALUE : topNext.getTimestamp()); + while (!heap.isEmpty() && heap.peek().currTime() == topTime) { + Element e = heap.poll(); + fillNullValue(ret, e.getTimeValuePair()); + if (!e.hasNext()) { + e.reader.close(); + continue; } - } else { - heap.add(e); + e.next(); + if (e.currTime() == topNextTime) { + // if the next value of the peek will be overwritten by the next of the top, skip it + fillNullValue(topNext, e.getTimeValuePair()); + if (e.hasNext()) { + e.next(); + heap.add(e); + } else { + // the chunk is end + e.close(); + } + } else { + heap.add(e); + } } + } finally { + QueryStatistics.getInstance() + .addCost(QueryStatistics.MERGE_READER_UPDATE_HEAP, System.nanoTime() - startTime); } }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java index 5d14569..b695b29 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -62,6 +62,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.template.DropSchemaTemplateStatement; import org.apache.iotdb.db.mpp.plan.statement.metadata.template.SetSchemaTemplateStatement; import org.apache.iotdb.db.mpp.plan.statement.metadata.template.UnsetSchemaTemplateStatement; +import org.apache.iotdb.db.mpp.statistics.QueryStatistics; import org.apache.iotdb.db.query.control.SessionManager; import org.apache.iotdb.db.query.control.clientsession.IClientSession; import org.apache.iotdb.db.quotas.DataNodeThrottleQuotaManager; @@ -135,6 +136,7 @@ import java.util.Optional; import java.util.concurrent.TimeUnit; +import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.SERVER_RPC_RT; import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException; import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException; import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException; @@ -166,16 +168,22 @@ private static final SelectResult SELECT_RESULT = (resp, queryExecution, fetchSize) -> { + long startTime = System.nanoTime(); Pair<List<ByteBuffer>, Boolean> pair = QueryDataSetUtils.convertQueryResultByFetchSize(queryExecution, fetchSize); + QueryStatistics.getInstance() + .addCost(QueryStatistics.SERIALIZE_TSBLOCK, System.nanoTime() - startTime); resp.setQueryResult(pair.left); return pair.right; }; private static final SelectResult OLD_SELECT_RESULT = (resp, queryExecution, fetchSize) -> { + long startTime = System.nanoTime(); Pair<TSQueryDataSet, Boolean> pair = QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution, fetchSize); + QueryStatistics.getInstance() + .addCost(QueryStatistics.SERIALIZE_TSBLOCK, System.nanoTime() - startTime); resp.setQueryDataSet(pair.left); return pair.right; }; @@ -198,6 +206,7 @@ } long startTime = System.currentTimeMillis(); + long startTimeInNano = System.nanoTime(); StatementType statementType = null; Throwable t = null; try { @@ -208,6 +217,12 @@ 
RpcUtils.getStatus( TSStatusCode.SQL_PARSE_ERROR, "This operation type is not supported")); } + + if (s.isQuery()) { + QueryStatistics.getInstance() + .addCost(QueryStatistics.PARSER, System.nanoTime() - startTimeInNano); + } + // permission check TSStatus status = AuthorityChecker.checkAuthority(s, clientSession); if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { @@ -223,6 +238,7 @@ } queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId); + long start = System.nanoTime(); // create and cache dataset ExecutionResult result = COORDINATOR.execute( @@ -233,6 +249,10 @@ partitionFetcher, schemaFetcher, req.getTimeout()); + if (s.isQuery()) { + QueryStatistics.getInstance() + .addCost(QueryStatistics.CREATE_QUERY_EXEC, System.nanoTime() - start); + } if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode() && result.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { @@ -527,7 +547,12 @@ @Override public TSExecuteStatementResp executeQueryStatementV2(TSExecuteStatementReq req) { - return executeStatementV2(req); + long startTime = System.nanoTime(); + try { + return executeStatementV2(req); + } finally { + QueryStatistics.getInstance().addCost(SERVER_RPC_RT, System.nanoTime() - startTime); + } } @Override @@ -557,6 +582,7 @@ @Override public TSFetchResultsResp fetchResultsV2(TSFetchResultsReq req) { + long startTimeNanos = System.nanoTime(); long startTime = System.currentTimeMillis(); boolean finished = false; StatementType statementType = null; @@ -608,6 +634,7 @@ COORDINATOR.cleanupQueryExecution(req.queryId, t); } SESSION_MANAGER.updateIdleTime(); + QueryStatistics.getInstance().addCost(SERVER_RPC_RT, System.nanoTime() - startTimeNanos); } } @@ -1071,7 +1098,12 @@ @Override public TSExecuteStatementResp executeQueryStatement(TSExecuteStatementReq req) { - return executeStatement(req); + long startTime = System.nanoTime(); + try { + return executeStatement(req); + } finally { + 
QueryStatistics.getInstance().addCost(SERVER_RPC_RT, System.nanoTime() - startTime); + } } @Override @@ -1082,6 +1114,7 @@ @Override public TSFetchResultsResp fetchResults(TSFetchResultsReq req) { boolean finished = false; + long startTimeNanos = System.nanoTime(); long startTime = System.currentTimeMillis(); StatementType statementType = null; Throwable t = null; @@ -1132,6 +1165,7 @@ COORDINATOR.cleanupQueryExecution(req.queryId, t); } SESSION_MANAGER.updateIdleTime(); + QueryStatistics.getInstance().addCost(SERVER_RPC_RT, System.nanoTime() - startTimeNanos); } }
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java index 1dc6500..6cd3a6d 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus; import org.apache.iotdb.db.mpp.metric.QueryMetricsManager; +import org.apache.iotdb.db.mpp.statistics.QueryStatistics; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.reader.chunk.metadata.DiskAlignedChunkMetadataLoader; import org.apache.iotdb.db.query.reader.chunk.metadata.DiskChunkMetadataLoader; @@ -175,11 +176,12 @@ } return timeSeriesMetadata; } finally { + long costTime = System.nanoTime() - t1; QUERY_METRICS.recordSeriesScanCost( loadFromMem ? LOAD_TIMESERIES_METADATA_NONALIGNED_MEM : LOAD_TIMESERIES_METADATA_NONALIGNED_DISK, - System.nanoTime() - t1); + costTime); } } @@ -286,11 +288,13 @@ } return alignedTimeSeriesMetadata; } finally { + long costTime = System.nanoTime() - t1; QUERY_METRICS.recordSeriesScanCost( loadFromMem ? LOAD_TIMESERIES_METADATA_ALIGNED_MEM : LOAD_TIMESERIES_METADATA_ALIGNED_DISK, - System.nanoTime() - t1); + costTime); + QueryStatistics.getInstance().addCost(QueryStatistics.LOAD_TIME_SERIES_METADATA, costTime); } } @@ -301,7 +305,11 @@ */ public static List<IChunkMetadata> loadChunkMetadataList(ITimeSeriesMetadata timeSeriesMetadata) throws IOException { - return timeSeriesMetadata.loadChunkMetadataList(); + long startTime = System.nanoTime(); + List<IChunkMetadata> chunkMetadataList = timeSeriesMetadata.loadChunkMetadataList(); + QueryStatistics.getInstance() + .addCost(QueryStatistics.LOAD_CHUNK_METADATA_LIST, System.nanoTime() - startTime); + return chunkMetadataList; } /** @@ -312,11 +320,17 @@ */ public static List<IPageReader> loadPageReaderList( IChunkMetadata chunkMetaData, Filter timeFilter) throws IOException { - if (chunkMetaData == null) { - throw new IOException("Can't init null chunkMeta"); + long startTime = System.nanoTime(); + try { + if (chunkMetaData == null) { + throw new IOException("Can't init null chunkMeta"); + } + 
IChunkLoader chunkLoader = chunkMetaData.getChunkLoader(); + IChunkReader chunkReader = chunkLoader.getChunkReader(chunkMetaData, timeFilter); + return chunkReader.loadPageReaderList(); + } finally { + QueryStatistics.getInstance() + .addCost(QueryStatistics.LOAD_PAGE_READER_LIST, System.nanoTime() - startTime); } - IChunkLoader chunkLoader = chunkMetaData.getChunkLoader(); - IChunkReader chunkReader = chunkLoader.getChunkReader(chunkMetaData, timeFilter); - return chunkReader.loadPageReaderList(); } }