blob: 4fe327f04d43147693f690c701d11b55ca1f5dc6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.query.executor;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.idtable.IDTable;
import org.apache.iotdb.db.metadata.idtable.entry.TimeseriesID;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
import org.apache.iotdb.db.metadata.utils.ResourceByPathUtils;
import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.dataset.ListDataSet;
import org.apache.iotdb.db.query.executor.fill.LastPointReader;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.operator.Gt;
import org.apache.iotdb.tsfile.read.filter.operator.GtEq;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TIMESERIES;
import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TIMESERIES_DATATYPE;
import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_VALUE;
import static org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryUtil.satisfyFilter;
public class LastQueryExecutor {
private List<PartialPath> selectedSeries;
private List<TSDataType> dataTypes;
protected IExpression expression;
private static final boolean CACHE_ENABLED =
IoTDBDescriptor.getInstance().getConfig().isLastCacheEnabled();
private static final Logger DEBUG_LOGGER = LoggerFactory.getLogger("QUERY_DEBUG");
// for test to reload this parameter after restart, it can't be final
private static boolean ID_TABLE_ENABLED =
IoTDBDescriptor.getInstance().getConfig().isEnableIDTable();
private static boolean ascending;
private static final Logger logger = LoggerFactory.getLogger(LastQueryExecutor.class);
public LastQueryExecutor(LastQueryPlan lastQueryPlan) {
this.selectedSeries = lastQueryPlan.getDeduplicatedPaths();
this.dataTypes = lastQueryPlan.getDeduplicatedDataTypes();
this.expression = lastQueryPlan.getExpression();
this.ascending = lastQueryPlan.isAscending();
}
public LastQueryExecutor(List<PartialPath> selectedSeries, List<TSDataType> dataTypes) {
this.selectedSeries = selectedSeries;
this.dataTypes = dataTypes;
}
/**
* execute last function
*
* @param context query context
*/
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
public QueryDataSet execute(QueryContext context, LastQueryPlan lastQueryPlan)
throws StorageEngineException, IOException, QueryProcessException {
ListDataSet dataSet =
new ListDataSet(
Arrays.asList(
new PartialPath(COLUMN_TIMESERIES, false),
new PartialPath(COLUMN_VALUE, false),
new PartialPath(COLUMN_TIMESERIES_DATATYPE, false)),
Arrays.asList(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT));
List<TimeValuePair> lastPairList =
calculateLastPairForSeries(selectedSeries, dataTypes, context, expression, lastQueryPlan);
for (int i = 0; i < lastPairList.size(); i++) {
if (lastPairList.get(i) != null && lastPairList.get(i).getValue() != null) {
TimeValuePair lastTimeValuePair = lastPairList.get(i);
RowRecord resultRecord = new RowRecord(lastTimeValuePair.getTimestamp());
Field pathField = new Field(TSDataType.TEXT);
pathField.setBinaryV(
new Binary(lastQueryPlan.getResultColumns().get(i).getResultColumnName()));
resultRecord.addField(pathField);
Field valueField = new Field(TSDataType.TEXT);
valueField.setBinaryV(new Binary(lastTimeValuePair.getValue().getStringValue()));
resultRecord.addField(valueField);
Field typeField = new Field(TSDataType.TEXT);
typeField.setBinaryV(new Binary(lastTimeValuePair.getValue().getDataType().name()));
resultRecord.addField(typeField);
dataSet.putRecord(resultRecord);
}
}
if (!lastQueryPlan.isAscending()) {
dataSet.sortByTime();
}
return dataSet;
}
protected List<TimeValuePair> calculateLastPairForSeries(
List<PartialPath> seriesPaths,
List<TSDataType> dataTypes,
QueryContext context,
IExpression expression,
RawDataQueryPlan lastQueryPlan)
throws QueryProcessException, StorageEngineException, IOException {
return calculateLastPairForSeriesLocally(
seriesPaths, dataTypes, context, expression, lastQueryPlan.getDeviceToMeasurements());
}
public static List<TimeValuePair> calculateLastPairForSeriesLocally(
List<PartialPath> seriesPaths,
List<TSDataType> dataTypes,
QueryContext context,
IExpression expression,
Map<String, Set<String>> deviceMeasurementsMap)
throws QueryProcessException, StorageEngineException, IOException {
Filter filter = (expression == null) ? null : ((GlobalTimeExpression) expression).getFilter();
if (CACHE_ENABLED) {
List<LastCacheAccessor> cacheAccessors = new ArrayList<>();
for (PartialPath path : seriesPaths) {
if (ID_TABLE_ENABLED) {
cacheAccessors.add(new IDTableLastCacheAccessor(path));
} else {
cacheAccessors.add(new SchemaProcessorLastCacheAccessor(path));
}
}
List<TimeValuePair> lastPairs =
readLastPairsFromCache(seriesPaths, cacheAccessors, context.isDebug());
List<Integer> nonCachedIndices = new ArrayList<>();
List<PartialPath> nonCachedPaths = new ArrayList<>();
List<TSDataType> nonCachedDataTypes = new ArrayList<>();
for (int i = 0; i < lastPairs.size(); i++) {
TimeValuePair lastPair = lastPairs.get(i);
if (lastPair == null) {
nonCachedPaths.add(((MeasurementPath) seriesPaths.get(i)).transformToExactPath());
nonCachedDataTypes.add(dataTypes.get(i));
nonCachedIndices.add(i);
} else if (!satisfyFilter(filter, lastPair)) {
lastPairs.set(i, null);
boolean isFilterGtOrGe = (filter instanceof Gt || filter instanceof GtEq);
if (!isFilterGtOrGe) {
nonCachedPaths.add(((MeasurementPath) seriesPaths.get(i)).transformToExactPath());
nonCachedDataTypes.add(dataTypes.get(i));
nonCachedIndices.add(i);
}
}
}
List<TimeValuePair> nonCachedLastPairs =
readLastPairsFromStorage(
nonCachedPaths, nonCachedDataTypes, filter, context, deviceMeasurementsMap);
for (int i = 0; i < nonCachedLastPairs.size(); i++) {
// Update the cache only when,
// 1. the last value cache doesn't exist
// 2. the actual last value is not null
// 3. last value cache is enabled
// 4. the filter is gt (greater than) or ge (greater than or equal to)
if (lastPairs.get(nonCachedIndices.get(i)) == null
&& nonCachedLastPairs.get(i) != null
&& ((filter instanceof GtEq) || (filter instanceof Gt))) {
cacheAccessors.get(nonCachedIndices.get(i)).write(nonCachedLastPairs.get(i));
}
lastPairs.set(nonCachedIndices.get(i), nonCachedLastPairs.get(i));
}
return lastPairs;
} else {
return readLastPairsFromStorage(
seriesPaths.stream()
.map(p -> ((MeasurementPath) p).transformToExactPath())
.collect(Collectors.toList()),
dataTypes,
filter,
context,
deviceMeasurementsMap);
}
}
/**
* Get the last values of given timeseries from the cache.
*
* @return A list of {@link TimeValuePair}. The null elements indicate that the last value of
* corresponding timeseries is not cached.
*/
private static List<TimeValuePair> readLastPairsFromCache(
List<PartialPath> seriesPaths, List<LastCacheAccessor> cacheAccessors, boolean debugOn) {
List<TimeValuePair> ret = new ArrayList<>();
for (int i = 0; i < cacheAccessors.size(); i++) {
TimeValuePair tvPair = cacheAccessors.get(i).read();
ret.add(tvPair);
if (tvPair != null && debugOn) {
DEBUG_LOGGER.info(
"[LastQueryExecutor] Last cache hit for path: {} with timestamp: {}",
seriesPaths.get(i),
tvPair.getTimestamp());
}
}
return ret;
}
/**
* Get the last values of given timeseries from the storage.
*
* @return A list of {@link TimeValuePair}. The null elements indicate the last value of
* corresponding timeseries does not exist.
*/
private static List<TimeValuePair> readLastPairsFromStorage(
List<PartialPath> seriesPaths,
List<TSDataType> dataTypes,
Filter filter,
QueryContext context,
Map<String, Set<String>> deviceMeasurementsMap)
throws StorageEngineException, QueryProcessException, IOException {
// Acquire query resources for the rest series paths
Pair<List<DataRegion>, Map<DataRegion, List<PartialPath>>> lockListAndProcessorToSeriesMapPair =
StorageEngine.getInstance().mergeLock(seriesPaths);
List<DataRegion> lockList = lockListAndProcessorToSeriesMapPair.left;
Map<DataRegion, List<PartialPath>> processorToSeriesMap =
lockListAndProcessorToSeriesMapPair.right;
try {
// init QueryDataSource Cache
QueryResourceManager.getInstance()
.initQueryDataSourceCache(processorToSeriesMap, context, filter);
} catch (Exception e) {
logger.error("Meet error when init QueryDataSource ", e);
throw new QueryProcessException("Meet error when init QueryDataSource.", e);
} finally {
StorageEngine.getInstance().mergeUnLock(lockList);
}
List<LastPointReader> readers = new ArrayList<>();
for (int i = 0; i < seriesPaths.size(); i++) {
QueryDataSource dataSource =
QueryResourceManager.getInstance()
.getQueryDataSource(seriesPaths.get(i), context, filter, ascending);
LastPointReader lastReader =
ResourceByPathUtils.getResourceInstance(seriesPaths.get(i))
.createLastPointReader(
dataTypes.get(i),
deviceMeasurementsMap.getOrDefault(
seriesPaths.get(i).getDevice(), new HashSet<>()),
context,
dataSource,
Long.MAX_VALUE,
filter);
readers.add(lastReader);
}
List<TimeValuePair> lastPairs = new ArrayList<>(seriesPaths.size());
for (LastPointReader reader : readers) {
lastPairs.add(reader.readLastPoint());
}
return lastPairs;
}
private interface LastCacheAccessor {
TimeValuePair read();
void write(TimeValuePair pair);
}
private static class SchemaProcessorLastCacheAccessor implements LastCacheAccessor {
private final MeasurementPath path;
private IMeasurementMNode node;
SchemaProcessorLastCacheAccessor(PartialPath seriesPath) {
this.path = (MeasurementPath) seriesPath;
}
public TimeValuePair read() {
try {
node = IoTDB.schemaProcessor.getMeasurementMNode(path);
} catch (MetadataException e) {
// cluster mode may not get remote node
TimeValuePair timeValuePair;
timeValuePair = IoTDB.schemaProcessor.getLastCache(path);
if (timeValuePair != null) {
return timeValuePair;
}
}
if (node == null) {
return null;
}
return IoTDB.schemaProcessor.getLastCache(node);
}
public void write(TimeValuePair pair) {
if (node == null) {
IoTDB.schemaProcessor.updateLastCache(path, pair, false, Long.MIN_VALUE);
} else {
IoTDB.schemaProcessor.updateLastCache(node, pair, false, Long.MIN_VALUE);
}
}
}
private static class IDTableLastCacheAccessor implements LastCacheAccessor {
private PartialPath fullPath;
IDTableLastCacheAccessor(PartialPath seriesPath) {
fullPath = seriesPath;
}
@Override
public TimeValuePair read() {
try {
IDTable table =
StorageEngine.getInstance().getProcessor(fullPath.getDevicePath()).getIdTable();
return table.getLastCache(new TimeseriesID(fullPath));
} catch (StorageEngineException | MetadataException e) {
logger.error("last query can't find storage group: path is: " + fullPath);
}
return null;
}
@Override
public void write(TimeValuePair pair) {
try {
IDTable table =
StorageEngine.getInstance().getProcessor(fullPath.getDevicePath()).getIdTable();
table.updateLastCache(new TimeseriesID(fullPath), pair, false, Long.MIN_VALUE);
} catch (MetadataException | StorageEngineException e) {
logger.error("last query can't find storage group: path is: " + fullPath);
}
}
}
public static void clear() {
ID_TABLE_ENABLED = IoTDBDescriptor.getInstance().getConfig().isEnableIDTable();
}
}