blob: f078519590123784111fabae430e33b6fda40c9c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.cluster.query;
import org.apache.iotdb.cluster.exception.CheckConsistencyException;
import org.apache.iotdb.cluster.exception.ReaderNotFoundException;
import org.apache.iotdb.cluster.metadata.CMManager;
import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
import org.apache.iotdb.cluster.query.filter.SlotTsFileFilter;
import org.apache.iotdb.cluster.query.manage.ClusterQueryManager;
import org.apache.iotdb.cluster.query.reader.ClusterReaderFactory;
import org.apache.iotdb.cluster.query.reader.mult.IMultBatchReader;
import org.apache.iotdb.cluster.rpc.thrift.GetAggrResultRequest;
import org.apache.iotdb.cluster.rpc.thrift.GroupByRequest;
import org.apache.iotdb.cluster.rpc.thrift.LastQueryRequest;
import org.apache.iotdb.cluster.rpc.thrift.MultSeriesQueryRequest;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.PreviousFillRequest;
import org.apache.iotdb.cluster.rpc.thrift.PullSchemaRequest;
import org.apache.iotdb.cluster.rpc.thrift.PullSchemaResp;
import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest;
import org.apache.iotdb.cluster.server.member.DataGroupMember;
import org.apache.iotdb.cluster.utils.ClusterQueryUtils;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.dataset.ShowDevicesResult;
import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
import org.apache.iotdb.db.query.dataset.groupby.GroupByExecutor;
import org.apache.iotdb.db.query.dataset.groupby.LocalGroupByExecutor;
import org.apache.iotdb.db.query.executor.AggregationExecutor;
import org.apache.iotdb.db.query.executor.LastQueryExecutor;
import org.apache.iotdb.db.query.executor.fill.PreviousFill;
import org.apache.iotdb.db.query.factory.AggregateResultFactory;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.SerializeUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
import org.apache.iotdb.tsfile.read.reader.IBatchReader;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.iotdb.session.Config.DEFAULT_FETCH_SIZE;
public class LocalQueryExecutor {
private static final Logger logger = LoggerFactory.getLogger(LocalQueryExecutor.class);
private DataGroupMember dataGroupMember;
private ClusterReaderFactory readerFactory;
private String name;
private ClusterQueryManager queryManager;
public LocalQueryExecutor(DataGroupMember dataGroupMember) {
this.dataGroupMember = dataGroupMember;
this.readerFactory = new ClusterReaderFactory(dataGroupMember.getMetaGroupMember());
this.name = dataGroupMember.getName();
this.queryManager = dataGroupMember.getQueryManager();
}
private CMManager getCMManager() {
return ((CMManager) IoTDB.metaManager);
}
/** Return the data of the reader whose id is "readerId", using timestamps in "timeBuffer". */
public ByteBuffer fetchSingleSeriesByTimestamps(long readerId, long[] timestamps, int length)
throws ReaderNotFoundException, IOException {
IReaderByTimestamp reader = dataGroupMember.getQueryManager().getReaderByTimestamp(readerId);
if (reader == null) {
throw new ReaderNotFoundException(readerId);
}
Object[] values = reader.getValuesInTimestamps(timestamps, length);
if (values != null) {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
SerializeUtils.serializeObjects(values, dataOutputStream);
return ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
} else {
return ByteBuffer.allocate(0);
}
}
/**
* Fetch a batch from the reader whose id is "readerId".
*
* @param readerId
*/
public ByteBuffer fetchSingleSeries(long readerId) throws ReaderNotFoundException, IOException {
IBatchReader reader = dataGroupMember.getQueryManager().getReader(readerId);
if (reader == null) {
throw new ReaderNotFoundException(readerId);
}
if (reader.hasNextBatch()) {
BatchData batchData = reader.nextBatch();
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
SerializeUtils.serializeBatchData(batchData, dataOutputStream);
logger.debug(
"{}: Send results of reader {}, size:{}",
dataGroupMember.getName(),
readerId,
batchData.length());
return ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
} else {
return ByteBuffer.allocate(0);
}
}
/**
* Fetch a batch from the reader whose id is "readerId".
*
* @param readerId reader id
* @param paths mult series path
*/
public Map<String, ByteBuffer> fetchMultSeries(long readerId, List<String> paths)
throws ReaderNotFoundException, IOException {
IMultBatchReader reader =
(IMultBatchReader) dataGroupMember.getQueryManager().getReader(readerId);
if (reader == null) {
throw new ReaderNotFoundException(readerId);
}
Map<String, ByteBuffer> pathByteBuffers = Maps.newHashMap();
for (String path : paths) {
ByteBuffer byteBuffer = null;
if (reader.hasNextBatch(path)) {
BatchData batchData = reader.nextBatch(path);
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
SerializeUtils.serializeBatchData(batchData, dataOutputStream);
logger.debug(
"{}: Send results of reader {}, size:{}",
dataGroupMember.getName(),
readerId,
batchData.length());
byteBuffer = ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
} else {
byteBuffer = ByteBuffer.allocate(0);
}
pathByteBuffers.put(path, byteBuffer);
}
return pathByteBuffers;
}
/**
* Create an IBatchReader of a path, register it in the query manager to get a reader id for it
* and send the id back to the requester. If the reader does not have any data, an id of -1 will
* be returned.
*
* @param request
*/
public long querySingleSeries(SingleSeriesQueryRequest request)
throws CheckConsistencyException, QueryProcessException, StorageEngineException, IOException {
logger.debug(
"{}: {} is querying {}, queryId: {}",
name,
request.getRequester(),
request.getPath(),
request.getQueryId());
dataGroupMember.syncLeaderWithConsistencyCheck(false);
PartialPath path = null;
try {
path = new PartialPath(request.getPath());
} catch (IllegalPathException e) {
// ignore
}
TSDataType dataType = TSDataType.values()[request.getDataTypeOrdinal()];
Filter timeFilter = null;
Filter valueFilter = null;
if (request.isSetTimeFilterBytes()) {
timeFilter = FilterFactory.deserialize(request.timeFilterBytes);
}
if (request.isSetValueFilterBytes()) {
valueFilter = FilterFactory.deserialize(request.valueFilterBytes);
}
Set<String> deviceMeasurements = request.getDeviceMeasurements();
// the same query from a requester correspond to a context here
RemoteQueryContext queryContext =
queryManager.getQueryContext(
request.getRequester(),
request.getQueryId(),
request.getFetchSize(),
request.getDeduplicatedPathNum());
logger.debug(
"{}: local queryId for {}#{} is {}",
name,
request.getQueryId(),
request.getPath(),
queryContext.getQueryId());
IBatchReader batchReader =
readerFactory.getSeriesBatchReader(
path,
deviceMeasurements,
dataType,
timeFilter,
valueFilter,
queryContext,
dataGroupMember,
request.ascending);
// if the reader contains no data, send a special id of -1 to prevent the requester from
// meaninglessly fetching data
if (batchReader != null && batchReader.hasNextBatch()) {
long readerId = queryManager.registerReader(batchReader);
queryContext.registerLocalReader(readerId);
logger.debug(
"{}: Build a reader of {} for {}#{}, readerId: {}",
name,
path,
request.getRequester(),
request.getQueryId(),
readerId);
return readerId;
} else {
logger.debug(
"{}: There is no data of {} for {}#{}",
name,
path,
request.getRequester(),
request.getQueryId());
if (batchReader != null) {
batchReader.close();
}
return -1;
}
}
/**
* Create an IBatchReader of a path, register it in the query manager to get a reader id for it
* and send the id back to the requester. If the reader does not have any data, an id of -1 will
* be returned.
*
* @param request
*/
public long queryMultSeries(MultSeriesQueryRequest request)
throws CheckConsistencyException, QueryProcessException, StorageEngineException, IOException {
logger.debug(
"{}: {} is querying {}, queryId: {}",
name,
request.getRequester(),
request.getPath(),
request.getQueryId());
dataGroupMember.syncLeaderWithConsistencyCheck(false);
List<PartialPath> paths = Lists.newArrayList();
request
.getPath()
.forEach(
fullPath -> {
try {
paths.add(new PartialPath(fullPath));
} catch (IllegalPathException e) {
logger.warn("Failed to create partial path, fullPath is {}.", fullPath, e);
}
});
List<TSDataType> dataTypes = Lists.newArrayList();
request
.getDataTypeOrdinal()
.forEach(
dataType -> {
dataTypes.add(TSDataType.values()[dataType]);
});
Filter timeFilter = null;
Filter valueFilter = null;
if (request.isSetTimeFilterBytes()) {
timeFilter = FilterFactory.deserialize(request.timeFilterBytes);
}
if (request.isSetValueFilterBytes()) {
valueFilter = FilterFactory.deserialize(request.valueFilterBytes);
}
Map<String, Set<String>> deviceMeasurements = request.getDeviceMeasurements();
// the same query from a requester correspond to a context here
RemoteQueryContext queryContext =
queryManager.getQueryContext(
request.getRequester(),
request.getQueryId(),
request.getFetchSize(),
request.getDeduplicatedPathNum());
logger.debug(
"{}: local queryId for {}#{} is {}",
name,
request.getQueryId(),
request.getPath(),
queryContext.getQueryId());
IBatchReader batchReader =
readerFactory.getMultSeriesBatchReader(
paths,
deviceMeasurements,
dataTypes,
timeFilter,
valueFilter,
queryContext,
dataGroupMember,
request.ascending);
// if the reader contains no data, send a special id of -1 to prevent the requester from
// meaninglessly fetching data
if (batchReader != null && batchReader.hasNextBatch()) {
long readerId = queryManager.registerReader(batchReader);
queryContext.registerLocalReader(readerId);
logger.debug(
"{}: Build a reader of {} for {}#{}, readerId: {}",
name,
paths,
request.getRequester(),
request.getQueryId(),
readerId);
return readerId;
} else {
logger.debug(
"{}: There is no data of {} for {}#{}",
name,
paths,
request.getRequester(),
request.getQueryId());
if (batchReader != null) {
batchReader.close();
}
return -1;
}
}
/**
* Send the timeseries schemas of some prefix paths to the requester. The schemas will be sent in
* the form of a list of MeasurementSchema, but notice the measurements in them are the full
* paths.
*
* @param request
*/
public PullSchemaResp queryTimeSeriesSchema(PullSchemaRequest request)
throws CheckConsistencyException, MetadataException {
// try to synchronize with the leader first in case that some schema logs are accepted but
// not committed yet
dataGroupMember.syncLeaderWithConsistencyCheck(false);
// collect local timeseries schemas and send to the requester
// the measurements in them are the full paths.
List<String> prefixPaths = request.getPrefixPaths();
List<TimeseriesSchema> timeseriesSchemas = new ArrayList<>();
for (String prefixPath : prefixPaths) {
getCMManager().collectTimeseriesSchema(prefixPath, timeseriesSchemas);
}
if (logger.isDebugEnabled()) {
logger.debug(
"{}: Collected {} schemas for {} and other {} paths",
name,
timeseriesSchemas.size(),
prefixPaths.get(0),
prefixPaths.size() - 1);
}
PullSchemaResp resp = new PullSchemaResp();
// serialize the schemas
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
try {
dataOutputStream.writeInt(timeseriesSchemas.size());
for (TimeseriesSchema timeseriesSchema : timeseriesSchemas) {
timeseriesSchema.serializeTo(dataOutputStream);
}
} catch (IOException ignored) {
// unreachable for we are using a ByteArrayOutputStream
}
resp.setSchemaBytes(byteArrayOutputStream.toByteArray());
return resp;
}
/**
* Send the timeseries schemas of some prefix paths to the requester. The schemas will be sent in
* the form of a list of MeasurementSchema, but notice the measurements in them are the full
* paths.
*
* @param request
*/
public PullSchemaResp queryMeasurementSchema(PullSchemaRequest request)
throws CheckConsistencyException, IllegalPathException {
// try to synchronize with the leader first in case that some schema logs are accepted but
// not committed yet
dataGroupMember.syncLeaderWithConsistencyCheck(false);
// collect local timeseries schemas and send to the requester
// the measurements in them are the full paths.
List<String> prefixPaths = request.getPrefixPaths();
List<MeasurementSchema> measurementSchemas = new ArrayList<>();
for (String prefixPath : prefixPaths) {
getCMManager().collectSeries(new PartialPath(prefixPath), measurementSchemas);
}
if (logger.isDebugEnabled()) {
logger.debug(
"{}: Collected {} schemas for {} and other {} paths",
name,
measurementSchemas.size(),
prefixPaths.get(0),
prefixPaths.size() - 1);
}
PullSchemaResp resp = new PullSchemaResp();
// serialize the schemas
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
try {
dataOutputStream.writeInt(measurementSchemas.size());
for (MeasurementSchema timeseriesSchema : measurementSchemas) {
timeseriesSchema.serializeTo(dataOutputStream);
}
} catch (IOException ignored) {
// unreachable for we are using a ByteArrayOutputStream
}
resp.setSchemaBytes(byteArrayOutputStream.toByteArray());
return resp;
}
/**
* Create an IReaderByTime of a path, register it in the query manager to get a reader id for it
* and send the id back to the requester. If the reader does not have any data, an id of -1 will
* be returned.
*
* @param request
*/
public long querySingleSeriesByTimestamp(SingleSeriesQueryRequest request)
throws CheckConsistencyException, QueryProcessException, StorageEngineException {
logger.debug(
"{}: {} is querying {} by timestamp, queryId: {}",
name,
request.getRequester(),
request.getPath(),
request.getQueryId());
dataGroupMember.syncLeaderWithConsistencyCheck(false);
PartialPath path = null;
try {
path = new PartialPath(request.getPath());
} catch (IllegalPathException e) {
// ignore
}
TSDataType dataType = TSDataType.values()[request.dataTypeOrdinal];
Set<String> deviceMeasurements = request.getDeviceMeasurements();
RemoteQueryContext queryContext =
queryManager.getQueryContext(
request.getRequester(),
request.getQueryId(),
request.getFetchSize(),
request.getDeduplicatedPathNum());
logger.debug(
"{}: local queryId for {}#{} is {}",
name,
request.getQueryId(),
request.getPath(),
queryContext.getQueryId());
IReaderByTimestamp readerByTimestamp =
readerFactory.getReaderByTimestamp(
path, deviceMeasurements, dataType, queryContext, dataGroupMember, request.ascending);
if (readerByTimestamp != null) {
long readerId = queryManager.registerReaderByTime(readerByTimestamp);
queryContext.registerLocalReader(readerId);
logger.debug(
"{}: Build a readerByTimestamp of {} for {}, readerId: {}",
name,
path,
request.getRequester(),
readerId);
return readerId;
} else {
logger.debug(
"{}: There is no data {} for {}#{}",
name,
path,
request.getRequester(),
request.getQueryId());
return -1;
}
}
public ByteBuffer getAllMeasurementSchema(ByteBuffer planBuffer)
throws CheckConsistencyException, IOException, MetadataException {
dataGroupMember.syncLeaderWithConsistencyCheck(false);
ShowTimeSeriesPlan plan = (ShowTimeSeriesPlan) PhysicalPlan.Factory.create(planBuffer);
List<ShowTimeSeriesResult> allTimeseriesSchema;
allTimeseriesSchema = getCMManager().showLocalTimeseries(plan, new QueryContext());
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
dataOutputStream.writeInt(allTimeseriesSchema.size());
for (ShowTimeSeriesResult result : allTimeseriesSchema) {
result.serialize(outputStream);
}
}
return ByteBuffer.wrap(outputStream.toByteArray());
}
public ByteBuffer getDevices(ByteBuffer planBuffer)
throws CheckConsistencyException, IOException, MetadataException {
dataGroupMember.syncLeaderWithConsistencyCheck(false);
ShowDevicesPlan plan = (ShowDevicesPlan) PhysicalPlan.Factory.create(planBuffer);
List<ShowDevicesResult> allDevicesResult = getCMManager().getLocalDevices(plan);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
dataOutputStream.writeInt(allDevicesResult.size());
for (ShowDevicesResult result : allDevicesResult) {
result.serialize(outputStream);
}
}
return ByteBuffer.wrap(outputStream.toByteArray());
}
/**
* Execute aggregations over the given path and return the results to the requester.
*
* @param request
*/
public List<ByteBuffer> getAggrResult(GetAggrResultRequest request)
throws StorageEngineException, QueryProcessException, IOException {
logger.debug(
"{}: {} is querying {} by aggregation, queryId: {}",
name,
request.getRequestor(),
request.getPath(),
request.getQueryId());
List<String> aggregations = request.getAggregations();
TSDataType dataType = TSDataType.values()[request.getDataTypeOrdinal()];
String path = request.getPath();
Filter timeFilter = null;
if (request.isSetTimeFilterBytes()) {
timeFilter = FilterFactory.deserialize(request.timeFilterBytes);
}
RemoteQueryContext queryContext =
queryManager.getQueryContext(
request.getRequestor(), request.queryId, DEFAULT_FETCH_SIZE, -1);
Set<String> deviceMeasurements = request.getDeviceMeasurements();
boolean ascending = request.ascending;
// do the aggregations locally
List<AggregateResult> results;
results =
getAggrResult(
aggregations, deviceMeasurements, dataType, path, timeFilter, queryContext, ascending);
logger.trace("{}: aggregation results {}, queryId: {}", name, results, request.getQueryId());
// serialize and send the results
List<ByteBuffer> resultBuffers = new ArrayList<>();
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
for (AggregateResult result : results) {
try {
result.serializeTo(byteArrayOutputStream);
} catch (IOException e) {
// ignore since we are using a ByteArrayOutputStream
}
resultBuffers.add(ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
byteArrayOutputStream.reset();
}
return resultBuffers;
}
/**
* Execute "aggregation" over "path" with "timeFilter". This method currently requires strong
* consistency. Only data managed by this group will be used for aggregation.
*
* @param aggregations aggregation names in SQLConstant
* @param dataType
* @param path
* @param timeFilter nullable
* @param context
* @return
* @throws IOException
* @throws StorageEngineException
* @throws QueryProcessException
*/
public List<AggregateResult> getAggrResult(
List<String> aggregations,
Set<String> allSensors,
TSDataType dataType,
String path,
Filter timeFilter,
QueryContext context,
boolean ascending)
throws IOException, StorageEngineException, QueryProcessException {
try {
dataGroupMember.syncLeaderWithConsistencyCheck(false);
} catch (CheckConsistencyException e) {
throw new QueryProcessException(e.getMessage());
}
ClusterQueryUtils.checkPathExistence(path);
List<AggregateResult> results = new ArrayList<>();
for (String aggregation : aggregations) {
results.add(AggregateResultFactory.getAggrResultByName(aggregation, dataType));
}
List<Integer> nodeSlots =
((SlotPartitionTable) dataGroupMember.getMetaGroupMember().getPartitionTable())
.getNodeSlots(dataGroupMember.getHeader());
try {
if (ascending) {
AggregationExecutor.aggregateOneSeries(
new PartialPath(path),
allSensors,
context,
timeFilter,
dataType,
results,
null,
new SlotTsFileFilter(nodeSlots));
} else {
AggregationExecutor.aggregateOneSeries(
new PartialPath(path),
allSensors,
context,
timeFilter,
dataType,
null,
results,
new SlotTsFileFilter(nodeSlots));
}
} catch (IllegalPathException e) {
// ignore
}
return results;
}
/**
* Check if the given measurements are registered or not
*
* @param timeseriesList
*/
public List<String> getUnregisteredTimeseries(List<String> timeseriesList)
throws CheckConsistencyException {
dataGroupMember.syncLeaderWithConsistencyCheck(true);
List<String> result = new ArrayList<>();
for (String seriesPath : timeseriesList) {
try {
List<PartialPath> path = getCMManager().getAllTimeseriesPath(new PartialPath(seriesPath));
if (path.size() != 1) {
throw new MetadataException(
String.format("Timeseries number of the name [%s] is not 1.", seriesPath));
}
} catch (MetadataException e) {
result.add(seriesPath);
}
}
return result;
}
/**
* Create a local GroupByExecutor that will run aggregations of "aggregationTypes" over "path"
* with "timeFilter". The method currently requires strong consistency.
*
* @param path
* @param dataType
* @param timeFilter nullable
* @param aggregationTypes ordinals of AggregationType
* @param context
* @return
* @throws StorageEngineException
*/
public LocalGroupByExecutor getGroupByExecutor(
PartialPath path,
Set<String> deviceMeasurements,
TSDataType dataType,
Filter timeFilter,
List<Integer> aggregationTypes,
QueryContext context,
boolean ascending)
throws StorageEngineException, QueryProcessException {
// pull the newest data
try {
dataGroupMember.syncLeaderWithConsistencyCheck(false);
} catch (CheckConsistencyException e) {
throw new StorageEngineException(e);
}
ClusterQueryUtils.checkPathExistence(path);
List<Integer> nodeSlots =
((SlotPartitionTable) dataGroupMember.getMetaGroupMember().getPartitionTable())
.getNodeSlots(dataGroupMember.getHeader());
LocalGroupByExecutor executor =
new LocalGroupByExecutor(
path,
deviceMeasurements,
dataType,
context,
timeFilter,
new SlotTsFileFilter(nodeSlots),
ascending);
for (Integer aggregationType : aggregationTypes) {
executor.addAggregateResult(
AggregateResultFactory.getAggrResultByType(
AggregationType.values()[aggregationType], dataType, ascending));
}
return executor;
}
/**
* Create a local GroupByExecutor that will run aggregations of "aggregationTypes" over "path"
* with "timeFilter", register it in the query manager to generate the executor id, and send it
* back to the requester.
*
* @param request
*/
public long getGroupByExecutor(GroupByRequest request)
throws QueryProcessException, StorageEngineException {
PartialPath path;
try {
path = new PartialPath(request.getPath());
} catch (IllegalPathException e) {
throw new QueryProcessException(e);
}
List<Integer> aggregationTypeOrdinals = request.getAggregationTypeOrdinals();
TSDataType dataType = TSDataType.values()[request.getDataTypeOrdinal()];
Filter timeFilter = null;
if (request.isSetTimeFilterBytes()) {
timeFilter = FilterFactory.deserialize(request.timeFilterBytes);
}
long queryId = request.getQueryId();
logger.debug(
"{}: {} is querying {} using group by, queryId: {}",
name,
request.getRequestor(),
path,
queryId);
Set<String> deviceMeasurements = request.getDeviceMeasurements();
boolean ascending = request.ascending;
RemoteQueryContext queryContext =
queryManager.getQueryContext(request.getRequestor(), queryId, DEFAULT_FETCH_SIZE, -1);
LocalGroupByExecutor executor =
getGroupByExecutor(
path,
deviceMeasurements,
dataType,
timeFilter,
aggregationTypeOrdinals,
queryContext,
ascending);
if (!executor.isEmpty()) {
long executorId = queryManager.registerGroupByExecutor(executor);
logger.debug(
"{}: Build a GroupByExecutor of {} for {}, executorId: {}",
name,
path,
request.getRequestor(),
executor);
queryContext.registerLocalGroupByExecutor(executorId);
return executorId;
} else {
logger.debug(
"{}: There is no data {} for {}#{}",
name,
path,
request.getRequestor(),
request.getQueryId());
return -1;
}
}
/**
* Fetch the aggregation results between [startTime, endTime] of the executor whose id is
* "executorId". This method currently requires strong consistency.
*
* @param executorId
* @param startTime
* @param endTime
*/
public List<ByteBuffer> getGroupByResult(long executorId, long startTime, long endTime)
throws ReaderNotFoundException, IOException, QueryProcessException {
GroupByExecutor executor = queryManager.getGroupByExecutor(executorId);
if (executor == null) {
throw new ReaderNotFoundException(executorId);
}
List<AggregateResult> results = executor.calcResult(startTime, endTime);
List<ByteBuffer> resultBuffers = new ArrayList<>();
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
for (AggregateResult result : results) {
result.serializeTo(byteArrayOutputStream);
resultBuffers.add(ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
byteArrayOutputStream.reset();
}
logger.debug(
"{}: Send results of group by executor {}, size:{}", name, executor, resultBuffers.size());
return resultBuffers;
}
public ByteBuffer peekNextNotNullValue(long executorId, long startTime, long endTime)
throws ReaderNotFoundException, IOException {
GroupByExecutor executor = queryManager.getGroupByExecutor(executorId);
if (executor == null) {
throw new ReaderNotFoundException(executorId);
}
Pair<Long, Object> pair = executor.peekNextNotNullValue(startTime, endTime);
ByteBuffer resultBuffer;
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
dataOutputStream.writeLong(pair.left);
SerializeUtils.serializeObject(pair.right, dataOutputStream);
resultBuffer = ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
}
logger.debug(
"{}: Send results of group by executor {}, size:{}", name, executor, resultBuffer.limit());
return resultBuffer;
}
public ByteBuffer previousFill(PreviousFillRequest request)
throws QueryProcessException, StorageEngineException, IOException, IllegalPathException {
PartialPath path = new PartialPath(request.getPath());
TSDataType dataType = TSDataType.values()[request.getDataTypeOrdinal()];
long queryId = request.getQueryId();
long queryTime = request.getQueryTime();
long beforeRange = request.getBeforeRange();
Node requester = request.getRequester();
Set<String> deviceMeasurements = request.getDeviceMeasurements();
RemoteQueryContext queryContext =
queryManager.getQueryContext(requester, queryId, DEFAULT_FETCH_SIZE, -1);
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
TimeValuePair timeValuePair =
localPreviousFill(path, dataType, queryTime, beforeRange, deviceMeasurements, queryContext);
SerializeUtils.serializeTVPair(timeValuePair, dataOutputStream);
return ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
}
/**
* Perform a local previous fill and return the fill result.
*
* @param path
* @param dataType
* @param queryTime
* @param beforeRange
* @param deviceMeasurements
* @param context
* @return
* @throws QueryProcessException
* @throws StorageEngineException
* @throws IOException
*/
public TimeValuePair localPreviousFill(
PartialPath path,
TSDataType dataType,
long queryTime,
long beforeRange,
Set<String> deviceMeasurements,
QueryContext context)
throws QueryProcessException, StorageEngineException, IOException {
try {
dataGroupMember.syncLeaderWithConsistencyCheck(false);
} catch (CheckConsistencyException e) {
throw new QueryProcessException(e.getMessage());
}
PreviousFill previousFill = new PreviousFill(dataType, queryTime, beforeRange);
previousFill.configureFill(path, dataType, queryTime, deviceMeasurements, context);
return previousFill.getFillResult();
}
public int getPathCount(List<String> pathsToQuery, int level)
throws CheckConsistencyException, MetadataException {
dataGroupMember.syncLeaderWithConsistencyCheck(false);
int count = 0;
for (String s : pathsToQuery) {
if (level == -1) {
count += getCMManager().getAllTimeseriesCount(new PartialPath(s));
} else {
count += getCMManager().getNodesCountInGivenLevel(new PartialPath(s), level);
}
}
return count;
}
@SuppressWarnings("java:S1135") // ignore todos
public ByteBuffer last(LastQueryRequest request)
throws CheckConsistencyException, QueryProcessException, IOException, StorageEngineException,
IllegalPathException {
dataGroupMember.syncLeaderWithConsistencyCheck(false);
RemoteQueryContext queryContext =
queryManager.getQueryContext(
request.getRequestor(), request.getQueryId(), DEFAULT_FETCH_SIZE, -1);
List<PartialPath> partialPaths = new ArrayList<>();
for (String path : request.getPaths()) {
partialPaths.add(new PartialPath(path));
}
List<TSDataType> dataTypes = new ArrayList<>(request.dataTypeOrdinals.size());
for (Integer dataTypeOrdinal : request.dataTypeOrdinals) {
dataTypes.add(TSDataType.values()[dataTypeOrdinal]);
}
ClusterQueryUtils.checkPathExistence(partialPaths);
IExpression expression = null;
if (request.isSetFilterBytes()) {
Filter filter = FilterFactory.deserialize(request.filterBytes);
expression = new GlobalTimeExpression(filter);
}
List<Pair<Boolean, TimeValuePair>> timeValuePairs =
LastQueryExecutor.calculateLastPairForSeriesLocally(
partialPaths, dataTypes, queryContext, expression, request.getDeviceMeasurements());
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
for (Pair<Boolean, TimeValuePair> timeValuePair : timeValuePairs) {
SerializeUtils.serializeTVPair(timeValuePair.right, dataOutputStream);
}
return ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
}
}