blob: b092469f018916ca06ddad73fcf8dec8a2598b4c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.qp.executor;
import org.apache.iotdb.db.auth.AuthException;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.auth.authorizer.BasicAuthorizer;
import org.apache.iotdb.db.auth.authorizer.IAuthorizer;
import org.apache.iotdb.db.auth.entity.PathPrivilege;
import org.apache.iotdb.db.auth.entity.Role;
import org.apache.iotdb.db.auth.entity.User;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.cq.ContinuousQueryService;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.cache.ChunkCache;
import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
import org.apache.iotdb.db.engine.flush.pool.FlushTaskPoolManager;
import org.apache.iotdb.db.engine.merge.manage.MergeManager;
import org.apache.iotdb.db.engine.merge.manage.MergeManager.TaskStatus;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.TimePartitionFilter;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.trigger.service.TriggerRegistrationService;
import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.exception.ContinuousQueryException;
import org.apache.iotdb.db.exception.QueryIdNotExsitException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.TriggerExecutionException;
import org.apache.iotdb.db.exception.TriggerManagementException;
import org.apache.iotdb.db.exception.UDFRegistrationException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.metadata.mnode.IMNode;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.monitor.StatMonitor;
import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
import org.apache.iotdb.db.qp.logical.sys.AuthorOperator;
import org.apache.iotdb.db.qp.logical.sys.AuthorOperator.AuthorType;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan;
import org.apache.iotdb.db.qp.physical.crud.CreateTemplatePlan;
import org.apache.iotdb.db.qp.physical.crud.DeletePartitionPlan;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.GroupByTimeFillPlan;
import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryIndexPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.SetSchemaTemplatePlan;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
import org.apache.iotdb.db.qp.physical.sys.CountPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateContinuousQueryPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateFunctionPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTriggerPlan;
import org.apache.iotdb.db.qp.physical.sys.DataAuthPlan;
import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.DropContinuousQueryPlan;
import org.apache.iotdb.db.qp.physical.sys.DropFunctionPlan;
import org.apache.iotdb.db.qp.physical.sys.DropTriggerPlan;
import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
import org.apache.iotdb.db.qp.physical.sys.KillQueryPlan;
import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan;
import org.apache.iotdb.db.qp.physical.sys.MergePlan;
import org.apache.iotdb.db.qp.physical.sys.OperateFilePlan;
import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowChildNodesPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowChildPathsPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowFunctionsPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowLockInfoPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowStorageGroupPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowTTLPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.StartTriggerPlan;
import org.apache.iotdb.db.qp.physical.sys.StopTriggerPlan;
import org.apache.iotdb.db.qp.physical.sys.TracingPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryTimeManager;
import org.apache.iotdb.db.query.control.QueryTimeManager.QueryInfo;
import org.apache.iotdb.db.query.control.TracingManager;
import org.apache.iotdb.db.query.dataset.AlignByDeviceDataSet;
import org.apache.iotdb.db.query.dataset.ListDataSet;
import org.apache.iotdb.db.query.dataset.ShowContinuousQueriesResult;
import org.apache.iotdb.db.query.dataset.ShowDevicesDataSet;
import org.apache.iotdb.db.query.dataset.ShowTimeseriesDataSet;
import org.apache.iotdb.db.query.dataset.SingleDataSet;
import org.apache.iotdb.db.query.executor.IQueryRouter;
import org.apache.iotdb.db.query.executor.QueryRouter;
import org.apache.iotdb.db.query.udf.service.UDFRegistrationInformation;
import org.apache.iotdb.db.query.udf.service.UDFRegistrationService;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.tools.TsFileRewriteTool;
import org.apache.iotdb.db.utils.AuthUtils;
import org.apache.iotdb.db.utils.FileLoaderUtils;
import org.apache.iotdb.db.utils.TypeInferenceUtils;
import org.apache.iotdb.db.utils.UpgradeUtils;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.read.query.dataset.EmptyDataSet;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CANCELLED;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CHILD_NODES;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CHILD_PATHS;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_COLUMN;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_EVERY_INTERVAL;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_FOR_INTERVAL;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_NAME;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_QUERY_SQL;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_TARGET_PATH;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_COUNT;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CREATED_TIME;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_DEVICES;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_DONE;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_FUNCTION_CLASS;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_FUNCTION_NAME;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_FUNCTION_TYPE;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ITEM;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_LOCK_INFO;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PRIVILEGE;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PROGRESS;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ROLE;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_STORAGE_GROUP;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TASK_NAME;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TTL;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_USER;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_VALUE;
import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_BUILTIN_UDAF;
import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_BUILTIN_UDTF;
import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_EXTERNAL_UDAF;
import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_EXTERNAL_UDTF;
import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_NATIVE;
import static org.apache.iotdb.db.conf.IoTDBConstant.QUERY_ID;
import static org.apache.iotdb.db.conf.IoTDBConstant.STATEMENT;
import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
public class PlanExecutor implements IPlanExecutor {
private static final Logger logger = LoggerFactory.getLogger(PlanExecutor.class);
private static final Logger AUDIT_LOGGER =
LoggerFactory.getLogger(IoTDBConstant.AUDIT_LOGGER_NAME);
// for data query
protected IQueryRouter queryRouter;
// for administration
private IAuthorizer authorizer;
private static final String INSERT_MEASUREMENTS_FAILED_MESSAGE = "failed to insert measurements ";
public PlanExecutor() throws QueryProcessException {
queryRouter = new QueryRouter();
try {
authorizer = BasicAuthorizer.getInstance();
} catch (AuthException e) {
throw new QueryProcessException(e.getMessage());
}
}
@Override
public QueryDataSet processQuery(PhysicalPlan queryPlan, QueryContext context)
throws IOException, StorageEngineException, QueryFilterOptimizationException,
QueryProcessException, MetadataException, InterruptedException {
if (queryPlan instanceof QueryPlan) {
return processDataQuery((QueryPlan) queryPlan, context);
} else if (queryPlan instanceof AuthorPlan) {
return processAuthorQuery((AuthorPlan) queryPlan);
} else if (queryPlan instanceof ShowPlan) {
return processShowQuery((ShowPlan) queryPlan, context);
} else {
throw new QueryProcessException(String.format("Unrecognized query plan %s", queryPlan));
}
}
@Override
public boolean processNonQuery(PhysicalPlan plan)
throws QueryProcessException, StorageGroupNotSetException, StorageEngineException {
switch (plan.getOperatorType()) {
case DELETE:
delete((DeletePlan) plan);
return true;
case INSERT:
insert((InsertRowPlan) plan);
return true;
case BATCH_INSERT_ONE_DEVICE:
insert((InsertRowsOfOneDevicePlan) plan);
return true;
case BATCH_INSERT_ROWS:
insert((InsertRowsPlan) plan);
return true;
case BATCH_INSERT:
insertTablet((InsertTabletPlan) plan);
return true;
case MULTI_BATCH_INSERT:
insertTablet((InsertMultiTabletPlan) plan);
return true;
case CREATE_ROLE:
case DELETE_ROLE:
case CREATE_USER:
case REVOKE_USER_ROLE:
case REVOKE_ROLE_PRIVILEGE:
case REVOKE_USER_PRIVILEGE:
case GRANT_ROLE_PRIVILEGE:
case GRANT_USER_PRIVILEGE:
case GRANT_USER_ROLE:
case MODIFY_PASSWORD:
case DELETE_USER:
AuthorPlan author = (AuthorPlan) plan;
return operateAuthor(author);
case GRANT_WATERMARK_EMBEDDING:
return operateWatermarkEmbedding(((DataAuthPlan) plan).getUsers(), true);
case REVOKE_WATERMARK_EMBEDDING:
return operateWatermarkEmbedding(((DataAuthPlan) plan).getUsers(), false);
case DELETE_TIMESERIES:
return deleteTimeSeries((DeleteTimeSeriesPlan) plan);
case CREATE_TIMESERIES:
return createTimeSeries((CreateTimeSeriesPlan) plan);
case CREATE_ALIGNED_TIMESERIES:
return createAlignedTimeSeries((CreateAlignedTimeSeriesPlan) plan);
case CREATE_MULTI_TIMESERIES:
return createMultiTimeSeries((CreateMultiTimeSeriesPlan) plan);
case ALTER_TIMESERIES:
return alterTimeSeries((AlterTimeSeriesPlan) plan);
case SET_STORAGE_GROUP:
return setStorageGroup((SetStorageGroupPlan) plan);
case DELETE_STORAGE_GROUP:
return deleteStorageGroups((DeleteStorageGroupPlan) plan);
case TTL:
operateTTL((SetTTLPlan) plan);
return true;
case LOAD_CONFIGURATION:
loadConfiguration((LoadConfigurationPlan) plan);
return true;
case LOAD_FILES:
operateLoadFiles((OperateFilePlan) plan);
return true;
case REMOVE_FILE:
operateRemoveFile((OperateFilePlan) plan);
return true;
case MOVE_FILE:
operateMoveFile((OperateFilePlan) plan);
return true;
case FLUSH:
operateFlush((FlushPlan) plan);
return true;
case MERGE:
case FULL_MERGE:
operateMerge((MergePlan) plan);
return true;
case TRACING:
operateTracing((TracingPlan) plan);
return true;
case CLEAR_CACHE:
operateClearCache();
return true;
case DELETE_PARTITION:
DeletePartitionPlan p = (DeletePartitionPlan) plan;
TimePartitionFilter filter =
(storageGroupName, partitionId) ->
storageGroupName.equals(
((DeletePartitionPlan) plan).getStorageGroupName().getFullPath())
&& p.getPartitionId().contains(partitionId);
StorageEngine.getInstance()
.removePartitions(((DeletePartitionPlan) plan).getStorageGroupName(), filter);
return true;
case CREATE_SCHEMA_SNAPSHOT:
operateCreateSnapshot();
return true;
case CREATE_FUNCTION:
return operateCreateFunction((CreateFunctionPlan) plan);
case DROP_FUNCTION:
return operateDropFunction((DropFunctionPlan) plan);
case CREATE_TRIGGER:
return operateCreateTrigger((CreateTriggerPlan) plan);
case DROP_TRIGGER:
return operateDropTrigger((DropTriggerPlan) plan);
case START_TRIGGER:
return operateStartTrigger((StartTriggerPlan) plan);
case STOP_TRIGGER:
return operateStopTrigger((StopTriggerPlan) plan);
case CREATE_INDEX:
throw new QueryProcessException("Create index hasn't been supported yet");
case DROP_INDEX:
throw new QueryProcessException("Drop index hasn't been supported yet");
case KILL:
try {
operateKillQuery((KillQueryPlan) plan);
} catch (QueryIdNotExsitException e) {
throw new QueryProcessException(e.getMessage());
}
return true;
case CREATE_TEMPLATE:
return createSchemaTemplate((CreateTemplatePlan) plan);
case SET_DEVICE_TEMPLATE:
return setSchemaTemplate((SetSchemaTemplatePlan) plan);
case CREATE_CONTINUOUS_QUERY:
return operateCreateContinuousQuery((CreateContinuousQueryPlan) plan);
case DROP_CONTINUOUS_QUERY:
return operateDropContinuousQuery((DropContinuousQueryPlan) plan);
default:
throw new UnsupportedOperationException(
String.format("operation %s is not supported", plan.getOperatorType()));
}
}
private boolean createSchemaTemplate(CreateTemplatePlan createTemplatePlan)
throws QueryProcessException {
try {
IoTDB.metaManager.createSchemaTemplate(createTemplatePlan);
} catch (MetadataException e) {
throw new QueryProcessException(e);
}
return true;
}
private boolean setSchemaTemplate(SetSchemaTemplatePlan setSchemaTemplatePlan)
throws QueryProcessException {
try {
IoTDB.metaManager.setSchemaTemplate(setSchemaTemplatePlan);
} catch (MetadataException e) {
throw new QueryProcessException(e);
}
return true;
}
private boolean operateCreateFunction(CreateFunctionPlan plan) throws UDFRegistrationException {
UDFRegistrationService.getInstance()
.register(plan.getUdfName(), plan.getClassName(), plan.isTemporary(), true);
return true;
}
private boolean operateDropFunction(DropFunctionPlan plan) throws UDFRegistrationException {
UDFRegistrationService.getInstance().deregister(plan.getUdfName());
return true;
}
private boolean operateCreateTrigger(CreateTriggerPlan plan)
throws TriggerManagementException, TriggerExecutionException {
TriggerRegistrationService.getInstance().register(plan);
return true;
}
private boolean operateDropTrigger(DropTriggerPlan plan) throws TriggerManagementException {
TriggerRegistrationService.getInstance().deregister(plan);
return true;
}
private boolean operateStartTrigger(StartTriggerPlan plan)
throws TriggerManagementException, TriggerExecutionException {
TriggerRegistrationService.getInstance().activate(plan);
return true;
}
private boolean operateStopTrigger(StopTriggerPlan plan) throws TriggerManagementException {
TriggerRegistrationService.getInstance().inactivate(plan);
return true;
}
private void operateMerge(MergePlan plan) throws StorageEngineException {
if (plan.getOperatorType() == OperatorType.FULL_MERGE) {
StorageEngine.getInstance().mergeAll(true);
} else {
StorageEngine.getInstance().mergeAll(false);
}
}
private void operateClearCache() {
ChunkCache.getInstance().clear();
TimeSeriesMetadataCache.getInstance().clear();
}
private void operateCreateSnapshot() {
IoTDB.metaManager.createMTreeSnapshot();
}
private void operateKillQuery(KillQueryPlan killQueryPlan) throws QueryIdNotExsitException {
QueryTimeManager queryTimeManager = QueryTimeManager.getInstance();
long killQueryId = killQueryPlan.getQueryId();
if (killQueryId != -1) {
if (queryTimeManager.getQueryInfoMap().get(killQueryId) != null) {
queryTimeManager.killQuery(killQueryId);
} else {
throw new QueryIdNotExsitException(
String.format(
"Query Id %d is not exist, please check it.", killQueryPlan.getQueryId()));
}
} else {
// if queryId is not specified, kill all running queries
if (!queryTimeManager.getQueryInfoMap().isEmpty()) {
synchronized (queryTimeManager.getQueryInfoMap()) {
List<Long> queryIdList = new ArrayList<>(queryTimeManager.getQueryInfoMap().keySet());
for (Long queryId : queryIdList) {
queryTimeManager.killQuery(queryId);
}
}
}
}
}
/** when tracing off need Close the stream */
private void operateTracing(TracingPlan plan) {
IoTDBDescriptor.getInstance().getConfig().setEnablePerformanceTracing(plan.isTracingOn());
if (!plan.isTracingOn()) {
TracingManager.getInstance().close();
} else {
if (!TracingManager.getInstance().getWriterStatus()) {
TracingManager.getInstance().openTracingWriteStream();
}
}
}
private void operateFlush(FlushPlan plan) throws StorageGroupNotSetException {
if (plan.getPaths().isEmpty()) {
StorageEngine.getInstance().syncCloseAllProcessor();
} else {
flushSpecifiedStorageGroups(plan);
}
if (!plan.getPaths().isEmpty()) {
List<PartialPath> noExistSg = checkStorageGroupExist(plan.getPaths());
if (!noExistSg.isEmpty()) {
StringBuilder sb = new StringBuilder();
noExistSg.forEach(storageGroup -> sb.append(storageGroup.getFullPath()).append(","));
throw new StorageGroupNotSetException(sb.subSequence(0, sb.length() - 1).toString(), true);
}
}
}
private boolean operateCreateContinuousQuery(CreateContinuousQueryPlan plan)
throws ContinuousQueryException {
return ContinuousQueryService.getInstance().register(plan, true);
}
private boolean operateDropContinuousQuery(DropContinuousQueryPlan plan)
throws ContinuousQueryException {
return ContinuousQueryService.getInstance().deregister(plan);
}
public static void flushSpecifiedStorageGroups(FlushPlan plan)
throws StorageGroupNotSetException {
Map<PartialPath, List<Pair<Long, Boolean>>> storageGroupMap =
plan.getStorageGroupPartitionIds();
for (Entry<PartialPath, List<Pair<Long, Boolean>>> entry : storageGroupMap.entrySet()) {
PartialPath storageGroupName = entry.getKey();
// normal flush
if (entry.getValue() == null) {
if (plan.isSeq() == null) {
StorageEngine.getInstance()
.closeStorageGroupProcessor(storageGroupName, true, plan.isSync());
StorageEngine.getInstance()
.closeStorageGroupProcessor(storageGroupName, false, plan.isSync());
} else {
StorageEngine.getInstance()
.closeStorageGroupProcessor(storageGroupName, plan.isSeq(), plan.isSync());
}
}
// partition specified flush, for snapshot flush plan
else {
List<Pair<Long, Boolean>> partitionIdSequencePairs = entry.getValue();
for (Pair<Long, Boolean> pair : partitionIdSequencePairs) {
StorageEngine.getInstance()
.closeStorageGroupProcessor(storageGroupName, pair.left, pair.right, true);
}
}
}
}
protected QueryDataSet processDataQuery(QueryPlan queryPlan, QueryContext context)
throws StorageEngineException, QueryFilterOptimizationException, QueryProcessException,
IOException, InterruptedException {
QueryDataSet queryDataSet;
if (queryPlan instanceof AlignByDevicePlan) {
queryDataSet = getAlignByDeviceDataSet((AlignByDevicePlan) queryPlan, context, queryRouter);
} else {
if (queryPlan.getPaths() == null || queryPlan.getPaths().isEmpty()) {
// no time series are selected, return EmptyDataSet
return new EmptyDataSet();
} else if (queryPlan instanceof UDTFPlan) {
UDTFPlan udtfPlan = (UDTFPlan) queryPlan;
queryDataSet = queryRouter.udtfQuery(udtfPlan, context);
} else if (queryPlan instanceof GroupByTimeFillPlan) {
GroupByTimeFillPlan groupByFillPlan = (GroupByTimeFillPlan) queryPlan;
queryDataSet = queryRouter.groupByFill(groupByFillPlan, context);
} else if (queryPlan instanceof GroupByTimePlan) {
GroupByTimePlan groupByTimePlan = (GroupByTimePlan) queryPlan;
queryDataSet = queryRouter.groupBy(groupByTimePlan, context);
} else if (queryPlan instanceof QueryIndexPlan) {
throw new QueryProcessException("Query index hasn't been supported yet");
} else if (queryPlan instanceof AggregationPlan) {
AggregationPlan aggregationPlan = (AggregationPlan) queryPlan;
queryDataSet = queryRouter.aggregate(aggregationPlan, context);
} else if (queryPlan instanceof FillQueryPlan) {
FillQueryPlan fillQueryPlan = (FillQueryPlan) queryPlan;
queryDataSet = queryRouter.fill(fillQueryPlan, context);
} else if (queryPlan instanceof LastQueryPlan) {
queryDataSet = queryRouter.lastQuery((LastQueryPlan) queryPlan, context);
} else {
queryDataSet = queryRouter.rawDataQuery((RawDataQueryPlan) queryPlan, context);
}
}
queryDataSet.setRowLimit(queryPlan.getRowLimit());
queryDataSet.setRowOffset(queryPlan.getRowOffset());
queryDataSet.setWithoutAllNull(queryPlan.isWithoutAllNull());
queryDataSet.setWithoutAnyNull(queryPlan.isWithoutAnyNull());
return queryDataSet;
}
protected AlignByDeviceDataSet getAlignByDeviceDataSet(
AlignByDevicePlan plan, QueryContext context, IQueryRouter router) {
return new AlignByDeviceDataSet(plan, context, router);
}
protected QueryDataSet processShowQuery(ShowPlan showPlan, QueryContext context)
throws QueryProcessException, MetadataException {
switch (showPlan.getShowContentType()) {
case TTL:
return processShowTTLQuery((ShowTTLPlan) showPlan);
case FLUSH_TASK_INFO:
return processShowFlushTaskInfo();
case VERSION:
return processShowVersion();
case TIMESERIES:
return processShowTimeseries((ShowTimeSeriesPlan) showPlan, context);
case STORAGE_GROUP:
return processShowStorageGroup((ShowStorageGroupPlan) showPlan);
case LOCK_INFO:
return processShowLockInfo((ShowLockInfoPlan) showPlan);
case DEVICES:
return processShowDevices((ShowDevicesPlan) showPlan);
case CHILD_PATH:
return processShowChildPaths((ShowChildPathsPlan) showPlan);
case CHILD_NODE:
return processShowChildNodes((ShowChildNodesPlan) showPlan);
case COUNT_TIMESERIES:
return processCountTimeSeries((CountPlan) showPlan);
case COUNT_NODE_TIMESERIES:
return processCountNodeTimeSeries((CountPlan) showPlan);
case COUNT_DEVICES:
return processCountDevices((CountPlan) showPlan);
case COUNT_STORAGE_GROUP:
return processCountStorageGroup((CountPlan) showPlan);
case COUNT_NODES:
return processCountNodes((CountPlan) showPlan);
case MERGE_STATUS:
return processShowMergeStatus();
case QUERY_PROCESSLIST:
return processShowQueryProcesslist();
case FUNCTIONS:
return processShowFunctions((ShowFunctionsPlan) showPlan);
case TRIGGERS:
return processShowTriggers();
case CONTINUOUS_QUERY:
return processShowContinuousQueries();
default:
throw new QueryProcessException(String.format("Unrecognized show plan %s", showPlan));
}
}
private QueryDataSet processCountNodes(CountPlan countPlan) throws MetadataException {
int num = getNodesNumInGivenLevel(countPlan.getPath(), countPlan.getLevel());
return createSingleDataSet(COLUMN_COUNT, TSDataType.INT32, num);
}
private QueryDataSet processCountNodeTimeSeries(CountPlan countPlan) throws MetadataException {
// get the nodes that need to group by first
List<PartialPath> nodes = getNodesList(countPlan.getPath(), countPlan.getLevel());
ListDataSet listDataSet =
new ListDataSet(
Arrays.asList(
new PartialPath(COLUMN_COLUMN, false), new PartialPath(COLUMN_COUNT, false)),
Arrays.asList(TSDataType.TEXT, TSDataType.INT32));
for (PartialPath columnPath : nodes) {
RowRecord record = new RowRecord(0);
Field field = new Field(TSDataType.TEXT);
field.setBinaryV(new Binary(columnPath.getFullPath()));
Field field1 = new Field(TSDataType.INT32);
// get the count of every group
field1.setIntV(getPathsNum(columnPath));
record.addField(field);
record.addField(field1);
listDataSet.putRecord(record);
}
return listDataSet;
}
private QueryDataSet processCountDevices(CountPlan countPlan) throws MetadataException {
int num = getDevicesNum(countPlan.getPath());
return createSingleDataSet(COLUMN_DEVICES, TSDataType.INT32, num);
}
private QueryDataSet processCountStorageGroup(CountPlan countPlan) throws MetadataException {
int num = getStorageGroupNum(countPlan.getPath());
return createSingleDataSet(COLUMN_STORAGE_GROUP, TSDataType.INT32, num);
}
private QueryDataSet createSingleDataSet(String columnName, TSDataType columnType, Object val) {
SingleDataSet singleDataSet =
new SingleDataSet(
Collections.singletonList(new PartialPath(columnName, false)),
Collections.singletonList(columnType));
Field field = new Field(columnType);
switch (columnType) {
case TEXT:
field.setBinaryV(((Binary) val));
break;
case FLOAT:
field.setFloatV(((float) val));
break;
case INT32:
field.setIntV(((int) val));
break;
case INT64:
field.setLongV(((long) val));
break;
case DOUBLE:
field.setDoubleV(((double) val));
break;
case BOOLEAN:
field.setBoolV(((boolean) val));
break;
default:
throw new UnSupportedDataTypeException("Unsupported data type" + columnType);
}
RowRecord record = new RowRecord(0);
record.addField(field);
singleDataSet.setRecord(record);
return singleDataSet;
}
private int getDevicesNum(PartialPath path) throws MetadataException {
return IoTDB.metaManager.getDevicesNum(path);
}
private int getStorageGroupNum(PartialPath path) throws MetadataException {
return IoTDB.metaManager.getStorageGroupNum(path);
}
protected int getPathsNum(PartialPath path) throws MetadataException {
return IoTDB.metaManager.getAllTimeseriesCount(path);
}
protected int getNodesNumInGivenLevel(PartialPath path, int level) throws MetadataException {
return IoTDB.metaManager.getNodesCountInGivenLevel(path, level);
}
protected List<PartialPath> getPathsName(PartialPath path) throws MetadataException {
return IoTDB.metaManager.getAllTimeseriesPath(path);
}
protected List<PartialPath> getNodesList(PartialPath schemaPattern, int level)
throws MetadataException {
return IoTDB.metaManager.getNodesList(schemaPattern, level);
}
private QueryDataSet processCountTimeSeries(CountPlan countPlan) throws MetadataException {
int num = getPathsNum(countPlan.getPath());
return createSingleDataSet(COLUMN_COUNT, TSDataType.INT32, num);
}
private QueryDataSet processShowDevices(ShowDevicesPlan showDevicesPlan)
throws MetadataException {
return new ShowDevicesDataSet(showDevicesPlan);
}
protected Set<PartialPath> getDevices(PartialPath path) throws MetadataException {
return IoTDB.metaManager.getDevices(path);
}
private QueryDataSet processShowChildPaths(ShowChildPathsPlan showChildPathsPlan)
throws MetadataException {
Set<String> childPathsList = getPathNextChildren(showChildPathsPlan.getPath());
ListDataSet listDataSet =
new ListDataSet(
Collections.singletonList(new PartialPath(COLUMN_CHILD_PATHS, false)),
Collections.singletonList(TSDataType.TEXT));
for (String s : childPathsList) {
RowRecord record = new RowRecord(0);
Field field = new Field(TSDataType.TEXT);
field.setBinaryV(new Binary(s));
record.addField(field);
listDataSet.putRecord(record);
}
return listDataSet;
}
protected Set<String> getPathNextChildren(PartialPath path) throws MetadataException {
return IoTDB.metaManager.getChildNodePathInNextLevel(path);
}
private QueryDataSet processShowChildNodes(ShowChildNodesPlan showChildNodesPlan)
throws MetadataException {
// getNodeNextChildren
Set<String> childNodesList = getNodeNextChildren(showChildNodesPlan.getPath());
ListDataSet listDataSet =
new ListDataSet(
Collections.singletonList(new PartialPath(COLUMN_CHILD_NODES, false)),
Collections.singletonList(TSDataType.TEXT));
for (String s : childNodesList) {
RowRecord record = new RowRecord(0);
Field field = new Field(TSDataType.TEXT);
field.setBinaryV(new Binary(s));
record.addField(field);
listDataSet.putRecord(record);
}
return listDataSet;
}
protected Set<String> getNodeNextChildren(PartialPath path) throws MetadataException {
return IoTDB.metaManager.getChildNodeInNextLevel(path);
}
protected List<PartialPath> getStorageGroupNames(PartialPath path) throws MetadataException {
return IoTDB.metaManager.getStorageGroupPaths(path);
}
private QueryDataSet processShowStorageGroup(ShowStorageGroupPlan showStorageGroupPlan)
throws MetadataException {
ListDataSet listDataSet =
new ListDataSet(
Collections.singletonList(new PartialPath(COLUMN_STORAGE_GROUP, false)),
Collections.singletonList(TSDataType.TEXT));
List<PartialPath> storageGroupList = getStorageGroupNames(showStorageGroupPlan.getPath());
addToDataSet(storageGroupList, listDataSet);
return listDataSet;
}
private void addToDataSet(Collection<PartialPath> paths, ListDataSet dataSet) {
for (PartialPath s : paths) {
RowRecord record = new RowRecord(0);
Field field = new Field(TSDataType.TEXT);
field.setBinaryV(new Binary(s.getFullPath()));
record.addField(field);
dataSet.putRecord(record);
}
}
private QueryDataSet processShowLockInfo(ShowLockInfoPlan showLockInfoPlan)
throws MetadataException {
ListDataSet listDataSet =
new ListDataSet(
Arrays.asList(
new PartialPath(COLUMN_STORAGE_GROUP, false),
new PartialPath(COLUMN_LOCK_INFO, false)),
Arrays.asList(TSDataType.TEXT, TSDataType.TEXT));
try {
List<PartialPath> storageGroupList = getStorageGroupNames(showLockInfoPlan.getPath());
List<String> lockHolderList = StorageEngine.getInstance().getLockInfo(storageGroupList);
addLockInfoToDataSet(storageGroupList, lockHolderList, listDataSet);
} catch (StorageEngineException e) {
throw new MetadataException(e);
}
return listDataSet;
}
private void addLockInfoToDataSet(
List<PartialPath> paths, List<String> lockHolderList, ListDataSet dataSet) {
for (int i = 0; i < paths.size(); i++) {
RowRecord record = new RowRecord(0);
Field field = new Field(TSDataType.TEXT);
field.setBinaryV(new Binary(paths.get(i).getFullPath()));
record.addField(field);
field = new Field(TSDataType.TEXT);
field.setBinaryV(new Binary(lockHolderList.get(i)));
record.addField(field);
dataSet.putRecord(record);
}
}
private QueryDataSet processShowTimeseries(
ShowTimeSeriesPlan showTimeSeriesPlan, QueryContext context) throws MetadataException {
return new ShowTimeseriesDataSet(showTimeSeriesPlan, context);
}
protected List<IStorageGroupMNode> getAllStorageGroupNodes() {
return IoTDB.metaManager.getAllStorageGroupNodes();
}
private QueryDataSet processShowTTLQuery(ShowTTLPlan showTTLPlan) {
ListDataSet listDataSet =
new ListDataSet(
Arrays.asList(
new PartialPath(COLUMN_STORAGE_GROUP, false), new PartialPath(COLUMN_TTL, false)),
Arrays.asList(TSDataType.TEXT, TSDataType.INT64));
List<PartialPath> selectedSgs = showTTLPlan.getStorageGroups();
List<IStorageGroupMNode> storageGroups = getAllStorageGroupNodes();
int timestamp = 0;
for (IStorageGroupMNode mNode : storageGroups) {
PartialPath sgName = mNode.getPartialPath();
if (!selectedSgs.isEmpty() && !selectedSgs.contains(sgName)) {
continue;
}
RowRecord rowRecord = new RowRecord(timestamp++);
Field sg = new Field(TSDataType.TEXT);
Field ttl;
sg.setBinaryV(new Binary(sgName.getFullPath()));
if (mNode.getDataTTL() != Long.MAX_VALUE) {
ttl = new Field(TSDataType.INT64);
ttl.setLongV(mNode.getDataTTL());
} else {
ttl = null;
}
rowRecord.addField(sg);
rowRecord.addField(ttl);
listDataSet.putRecord(rowRecord);
}
return listDataSet;
}
private QueryDataSet processShowVersion() {
SingleDataSet singleDataSet =
new SingleDataSet(
Collections.singletonList(new PartialPath(IoTDBConstant.COLUMN_VERSION, false)),
Collections.singletonList(TSDataType.TEXT));
Field field = new Field(TSDataType.TEXT);
field.setBinaryV(new Binary(IoTDBConstant.VERSION));
RowRecord rowRecord = new RowRecord(0);
rowRecord.addField(field);
singleDataSet.setRecord(rowRecord);
return singleDataSet;
}
private QueryDataSet processShowFlushTaskInfo() {
ListDataSet listDataSet =
new ListDataSet(
Arrays.asList(
new PartialPath(COLUMN_ITEM, false), new PartialPath(COLUMN_VALUE, false)),
Arrays.asList(TSDataType.TEXT, TSDataType.TEXT));
int timestamp = 0;
addRowRecordForShowQuery(
listDataSet,
timestamp++,
"total number of flush tasks",
Integer.toString(FlushTaskPoolManager.getInstance().getTotalTasks()));
addRowRecordForShowQuery(
listDataSet,
timestamp++,
"number of working flush tasks",
Integer.toString(FlushTaskPoolManager.getInstance().getWorkingTasksNumber()));
addRowRecordForShowQuery(
listDataSet,
timestamp,
"number of waiting flush tasks",
Integer.toString(FlushTaskPoolManager.getInstance().getWaitingTasksNumber()));
return listDataSet;
}
private QueryDataSet processShowFunctions(ShowFunctionsPlan showPlan)
throws QueryProcessException {
ListDataSet listDataSet =
new ListDataSet(
Arrays.asList(
new PartialPath(COLUMN_FUNCTION_NAME, false),
new PartialPath(COLUMN_FUNCTION_TYPE, false),
new PartialPath(COLUMN_FUNCTION_CLASS, false)),
Arrays.asList(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT));
appendUDFs(listDataSet, showPlan);
appendNativeFunctions(listDataSet, showPlan);
listDataSet.sort(
(r1, r2) ->
String.CASE_INSENSITIVE_ORDER.compare(
r1.getFields().get(0).getStringValue(), r2.getFields().get(0).getStringValue()));
return listDataSet;
}
@SuppressWarnings("squid:S3776")
private void appendUDFs(ListDataSet listDataSet, ShowFunctionsPlan showPlan)
throws QueryProcessException {
for (UDFRegistrationInformation info :
UDFRegistrationService.getInstance().getRegistrationInformation()) {
if (showPlan.showTemporary() && !info.isTemporary()) {
continue;
}
RowRecord rowRecord = new RowRecord(0); // ignore timestamp
rowRecord.addField(Binary.valueOf(info.getFunctionName()), TSDataType.TEXT);
String functionType = "";
try {
if (info.isBuiltin()) {
if (info.isUDTF()) {
functionType = FUNCTION_TYPE_BUILTIN_UDTF;
} else if (info.isUDAF()) {
functionType = FUNCTION_TYPE_BUILTIN_UDAF;
}
} else {
if (info.isUDTF()) {
functionType = FUNCTION_TYPE_EXTERNAL_UDTF;
} else if (info.isUDAF()) {
functionType = FUNCTION_TYPE_EXTERNAL_UDAF;
}
}
} catch (InstantiationException
| InvocationTargetException
| NoSuchMethodException
| IllegalAccessException e) {
throw new QueryProcessException(e.toString());
}
rowRecord.addField(Binary.valueOf(functionType), TSDataType.TEXT);
rowRecord.addField(Binary.valueOf(info.getClassName()), TSDataType.TEXT);
listDataSet.putRecord(rowRecord);
}
}
private QueryDataSet processShowContinuousQueries() {
ListDataSet listDataSet =
new ListDataSet(
Arrays.asList(
new PartialPath(COLUMN_CONTINUOUS_QUERY_NAME, false),
new PartialPath(COLUMN_CONTINUOUS_QUERY_EVERY_INTERVAL, false),
new PartialPath(COLUMN_CONTINUOUS_QUERY_FOR_INTERVAL, false),
new PartialPath(COLUMN_CONTINUOUS_QUERY_QUERY_SQL, false),
new PartialPath(COLUMN_CONTINUOUS_QUERY_TARGET_PATH, false)),
Arrays.asList(
TSDataType.TEXT,
TSDataType.INT64,
TSDataType.INT64,
TSDataType.TEXT,
TSDataType.TEXT));
List<ShowContinuousQueriesResult> continuousQueriesList =
ContinuousQueryService.getInstance().getShowContinuousQueriesResultList();
for (ShowContinuousQueriesResult result : continuousQueriesList) {
RowRecord record = new RowRecord(0);
record.addField(Binary.valueOf(result.getContinuousQueryName()), TSDataType.TEXT);
record.addField(result.getEveryInterval(), TSDataType.INT64);
record.addField(result.getForInterval(), TSDataType.INT64);
record.addField(Binary.valueOf(result.getQuerySql()), TSDataType.TEXT);
record.addField(Binary.valueOf(result.getTargetPath().getFullPath()), TSDataType.TEXT);
listDataSet.putRecord(record);
}
return listDataSet;
}
private void appendNativeFunctions(ListDataSet listDataSet, ShowFunctionsPlan showPlan) {
if (showPlan.showTemporary()) {
return;
}
final Binary functionType = Binary.valueOf(FUNCTION_TYPE_NATIVE);
final Binary className = Binary.valueOf("");
for (String functionName : SQLConstant.getNativeFunctionNames()) {
RowRecord rowRecord = new RowRecord(0); // ignore timestamp
rowRecord.addField(Binary.valueOf(functionName.toUpperCase()), TSDataType.TEXT);
rowRecord.addField(functionType, TSDataType.TEXT);
rowRecord.addField(className, TSDataType.TEXT);
listDataSet.putRecord(rowRecord);
}
}
private QueryDataSet processShowTriggers() {
return TriggerRegistrationService.getInstance().show();
}
private void addRowRecordForShowQuery(
ListDataSet listDataSet, int timestamp, String item, String value) {
RowRecord rowRecord = new RowRecord(timestamp);
Field itemField = new Field(TSDataType.TEXT);
itemField.setBinaryV(new Binary(item));
Field valueField = new Field(TSDataType.TEXT);
valueField.setBinaryV(new Binary(value));
rowRecord.addField(itemField);
rowRecord.addField(valueField);
listDataSet.putRecord(rowRecord);
}
@Override
public void delete(DeletePlan deletePlan) throws QueryProcessException {
AUDIT_LOGGER.info(
"delete data from {} in [{},{}]",
deletePlan.getPaths(),
deletePlan.getDeleteStartTime(),
deletePlan.getDeleteEndTime());
for (PartialPath path : deletePlan.getPaths()) {
delete(
path,
deletePlan.getDeleteStartTime(),
deletePlan.getDeleteEndTime(),
deletePlan.getIndex());
}
}
private void operateLoadFiles(OperateFilePlan plan) throws QueryProcessException {
File file = plan.getFile();
if (!file.exists()) {
throw new QueryProcessException(
String.format("File path %s doesn't exists.", file.getPath()));
}
if (file.isDirectory()) {
recursionFileDir(file, plan);
} else {
loadFile(file, plan);
}
}
private void recursionFileDir(File curFile, OperateFilePlan plan) throws QueryProcessException {
File[] files = curFile.listFiles();
for (File file : files) {
if (file.isDirectory()) {
recursionFileDir(file, plan);
} else {
loadFile(file, plan);
}
}
}
private void loadFile(File file, OperateFilePlan plan) throws QueryProcessException {
if (!file.getName().endsWith(TSFILE_SUFFIX)) {
return;
}
TsFileResource tsFileResource = new TsFileResource(file);
tsFileResource.setClosed(true);
try {
// check file
RestorableTsFileIOWriter restorableTsFileIOWriter = new RestorableTsFileIOWriter(file);
if (restorableTsFileIOWriter.hasCrashed()) {
restorableTsFileIOWriter.close();
throw new QueryProcessException(
String.format(
"Cannot load file %s because the file has crashed.", file.getAbsolutePath()));
}
Map<Path, IMeasurementSchema> schemaMap = new HashMap<>();
List<ChunkGroupMetadata> chunkGroupMetadataList = new ArrayList<>();
try (TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath(), false)) {
reader.selfCheck(schemaMap, chunkGroupMetadataList, false);
if (plan.getVerifyMetadata()) {
loadNewTsFileVerifyMetadata(reader);
}
} catch (IOException e) {
logger.warn("can not get timeseries metadata from {}.", file.getAbsoluteFile());
throw new QueryProcessException(e.getMessage());
}
FileLoaderUtils.checkTsFileResource(tsFileResource);
if (UpgradeUtils.isNeedUpgrade(tsFileResource)) {
throw new QueryProcessException(
String.format(
"Cannot load file %s because the file's version is old which needs to be upgraded.",
file.getAbsolutePath()));
}
// create schemas if they doesn't exist
if (plan.isAutoCreateSchema()) {
createSchemaAutomatically(chunkGroupMetadataList, schemaMap, plan.getSgLevel());
}
List<TsFileResource> splitResources = new ArrayList();
if (tsFileResource.isSpanMultiTimePartitions()) {
logger.info(
"try to split the tsFile={} du to it spans multi partitions",
tsFileResource.getTsFile().getPath());
TsFileRewriteTool.rewriteTsFile(tsFileResource, splitResources);
logger.info(
"after split, the old tsFile was split to {} new tsFiles", splitResources.size());
}
if (splitResources.isEmpty()) {
splitResources.add(tsFileResource);
}
for (TsFileResource resource : splitResources) {
StorageEngine.getInstance().loadNewTsFile(resource);
}
} catch (Exception e) {
throw new QueryProcessException(
String.format("Cannot load file %s because %s", file.getAbsolutePath(), e.getMessage()));
}
}
private void loadNewTsFileVerifyMetadata(TsFileSequenceReader tsFileSequenceReader)
throws MetadataException, QueryProcessException, IOException {
Map<String, List<TimeseriesMetadata>> metadataSet =
tsFileSequenceReader.getAllTimeseriesMetadata();
for (Map.Entry<String, List<TimeseriesMetadata>> entry : metadataSet.entrySet()) {
String deviceId = entry.getKey();
PartialPath devicePath = new PartialPath(deviceId);
if (!IoTDB.metaManager.isPathExist(devicePath)) {
continue;
}
for (TimeseriesMetadata metadata : entry.getValue()) {
PartialPath fullPath =
new PartialPath(deviceId + TsFileConstant.PATH_SEPARATOR + metadata.getMeasurementId());
if (IoTDB.metaManager.isPathExist(fullPath)) {
TSDataType dataType = IoTDB.metaManager.getSeriesSchema(fullPath).getType();
if (dataType != metadata.getTSDataType()) {
throw new QueryProcessException(
fullPath.getFullPath()
+ " is "
+ metadata.getTSDataType().name()
+ " in the loading TsFile but is "
+ dataType.name()
+ " in IoTDB.");
}
}
}
}
}
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
private void createSchemaAutomatically(
List<ChunkGroupMetadata> chunkGroupMetadataList,
Map<Path, IMeasurementSchema> knownSchemas,
int sgLevel)
throws QueryProcessException, MetadataException, IOException {
if (chunkGroupMetadataList.isEmpty()) {
return;
}
Set<PartialPath> registeredSeries = new HashSet<>();
for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
String device = chunkGroupMetadata.getDevice();
IMNode node =
IoTDB.metaManager.getDeviceNodeWithAutoCreate(
new PartialPath(device), true, true, sgLevel)
.left;
for (ChunkMetadata chunkMetadata : chunkGroupMetadata.getChunkMetadataList()) {
PartialPath series =
new PartialPath(
chunkGroupMetadata.getDevice()
+ TsFileConstant.PATH_SEPARATOR
+ chunkMetadata.getMeasurementUid());
if (!registeredSeries.contains(series)) {
registeredSeries.add(series);
IMeasurementSchema schema =
knownSchemas.get(new Path(series.getDevice(), series.getMeasurement()));
if (schema == null) {
throw new MetadataException(
String.format(
"Can not get the schema of measurement [%s]",
chunkMetadata.getMeasurementUid()));
}
if (!node.hasChild(chunkMetadata.getMeasurementUid())) {
IoTDB.metaManager.createTimeseries(
series,
schema.getType(),
schema.getEncodingType(),
schema.getCompressor(),
Collections.emptyMap());
} else if (!(node.getChild(chunkMetadata.getMeasurementUid())
instanceof MeasurementMNode)) {
throw new QueryProcessException(
String.format("Current Path is not leaf node. %s", series));
}
}
}
}
}
private void operateRemoveFile(OperateFilePlan plan) throws QueryProcessException {
try {
if (!StorageEngine.getInstance().deleteTsfile(plan.getFile())) {
throw new QueryProcessException(
String.format("File %s doesn't exist.", plan.getFile().getName()));
}
} catch (StorageEngineException | IllegalPathException e) {
throw new QueryProcessException(
String.format("Cannot remove file because %s", e.getMessage()));
}
}
private void operateMoveFile(OperateFilePlan plan) throws QueryProcessException {
if (!plan.getTargetDir().exists() || !plan.getTargetDir().isDirectory()) {
throw new QueryProcessException(
String.format("Target dir %s is invalid.", plan.getTargetDir().getPath()));
}
try {
if (!StorageEngine.getInstance().moveTsfile(plan.getFile(), plan.getTargetDir())) {
throw new QueryProcessException(
String.format("File %s doesn't exist.", plan.getFile().getName()));
}
} catch (StorageEngineException | IllegalPathException e) {
throw new QueryProcessException(
String.format(
"Cannot move file %s to target directory %s because %s",
plan.getFile().getPath(), plan.getTargetDir().getPath(), e.getMessage()));
}
}
private void operateTTL(SetTTLPlan plan) throws QueryProcessException {
try {
List<PartialPath> storageGroupPaths =
IoTDB.metaManager.getStorageGroupPaths(plan.getStorageGroup());
for (PartialPath storagePath : storageGroupPaths) {
IoTDB.metaManager.setTTL(storagePath, plan.getDataTTL());
StorageEngine.getInstance().setTTL(storagePath, plan.getDataTTL());
}
} catch (MetadataException e) {
throw new QueryProcessException(e);
} catch (IOException e) {
throw new QueryProcessException(e.getMessage());
}
}
@Override
public void update(PartialPath path, long startTime, long endTime, String value) {
throw new UnsupportedOperationException("update is not supported now");
}
@Override
public void delete(PartialPath path, long startTime, long endTime, long planIndex)
throws QueryProcessException {
try {
StorageEngine.getInstance().delete(path, startTime, endTime, planIndex);
} catch (StorageEngineException e) {
throw new QueryProcessException(e);
}
}
protected IMNode getSeriesSchemas(InsertPlan insertPlan) throws MetadataException {
try {
return IoTDB.metaManager.getSeriesSchemasAndReadLockDevice(insertPlan);
} catch (IOException e) {
throw new MetadataException(e);
}
}
private void checkFailedMeasurments(InsertPlan plan)
throws PathNotExistException, StorageEngineException {
// check if all path not exist exceptions
List<String> failedPaths = plan.getFailedMeasurements();
List<Exception> exceptions = plan.getFailedExceptions();
boolean isPathNotExistException = true;
for (Exception e : exceptions) {
Throwable curException = e;
while (curException.getCause() != null) {
curException = curException.getCause();
}
if (!(curException instanceof PathNotExistException)) {
isPathNotExistException = false;
break;
}
}
if (isPathNotExistException) {
throw new PathNotExistException(failedPaths);
} else {
throw new StorageEngineException(
INSERT_MEASUREMENTS_FAILED_MESSAGE
+ plan.getFailedMeasurements()
+ (!exceptions.isEmpty() ? (" caused by " + exceptions.get(0).getMessage()) : ""));
}
}
@Override
public void insert(InsertRowsOfOneDevicePlan insertRowsOfOneDevicePlan)
throws QueryProcessException {
if (insertRowsOfOneDevicePlan.getRowPlans().length == 0) {
return;
}
try {
for (InsertRowPlan plan : insertRowsOfOneDevicePlan.getRowPlans()) {
plan.setMeasurementMNodes(new IMeasurementMNode[plan.getMeasurements().length]);
// check whether types are match
getSeriesSchemas(plan);
// we do not need to infer data type for insertRowsOfOneDevicePlan
if (plan.isAligned()) {
plan.setPrefixPath(plan.getPrefixPath().getDevicePath());
}
}
// ok, we can begin to write data into the engine..
StorageEngine.getInstance().insert(insertRowsOfOneDevicePlan);
List<String> notExistedPaths = null;
List<String> failedMeasurements = null;
// If there are some exceptions, we assume they caused by the same reason.
Exception exception = null;
for (InsertRowPlan plan : insertRowsOfOneDevicePlan.getRowPlans()) {
if (plan.getFailedMeasurements() != null) {
if (notExistedPaths == null) {
notExistedPaths = new ArrayList<>();
failedMeasurements = new ArrayList<>();
}
// check if all path not exist exceptions
List<String> failedPaths = plan.getFailedMeasurements();
List<Exception> exceptions = plan.getFailedExceptions();
boolean isPathNotExistException = true;
for (Exception e : exceptions) {
exception = e;
Throwable curException = e;
while (curException.getCause() != null) {
curException = curException.getCause();
}
if (!(curException instanceof PathNotExistException)) {
isPathNotExistException = false;
break;
}
}
if (isPathNotExistException) {
notExistedPaths.addAll(failedPaths);
} else {
failedMeasurements.addAll(plan.getFailedMeasurements());
}
}
}
if (notExistedPaths != null && !notExistedPaths.isEmpty()) {
throw new PathNotExistException(notExistedPaths);
} else if (notExistedPaths != null && !failedMeasurements.isEmpty()) {
throw new StorageEngineException(
"failed to insert points "
+ failedMeasurements
+ (exception != null ? (" caused by " + exception.getMessage()) : ""));
}
} catch (StorageEngineException | MetadataException e) {
throw new QueryProcessException(e);
}
}
@Override
public void insert(InsertRowsPlan plan) throws QueryProcessException {
for (int i = 0; i < plan.getInsertRowPlanList().size(); i++) {
if (plan.getResults().containsKey(i) || plan.isExecuted(i)) {
continue;
}
try {
insert(plan.getInsertRowPlanList().get(i));
} catch (QueryProcessException e) {
plan.getResults().put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
}
}
if (!plan.getResults().isEmpty()) {
throw new BatchProcessException(plan.getFailingStatus());
}
}
@Override
public void insert(InsertRowPlan insertRowPlan) throws QueryProcessException {
try {
insertRowPlan.setMeasurementMNodes(
new IMeasurementMNode[insertRowPlan.getMeasurements().length]);
// When insert data with sql statement, the data types will be null here.
// We need to predicted the data types first
if (insertRowPlan.getDataTypes()[0] == null) {
for (int i = 0; i < insertRowPlan.getDataTypes().length; i++) {
insertRowPlan.getDataTypes()[i] =
TypeInferenceUtils.getPredictedDataType(
insertRowPlan.getValues()[i], insertRowPlan.isNeedInferType());
}
}
// check whether types are match
getSeriesSchemas(insertRowPlan);
if (insertRowPlan.isAligned()) {
insertRowPlan.setPrefixPathForAlignTimeSeries(
insertRowPlan.getPrefixPath().getDevicePath());
}
insertRowPlan.transferType();
StorageEngine.getInstance().insert(insertRowPlan);
if (insertRowPlan.getFailedMeasurements() != null) {
checkFailedMeasurments(insertRowPlan);
}
} catch (StorageEngineException | MetadataException e) {
if (IoTDBDescriptor.getInstance().getConfig().isEnableStatMonitor()) {
StatMonitor.getInstance().updateFailedStatValue();
}
throw new QueryProcessException(e);
} catch (Exception e) {
// update failed statistics
if (IoTDBDescriptor.getInstance().getConfig().isEnableStatMonitor()) {
StatMonitor.getInstance().updateFailedStatValue();
}
throw e;
}
}
@Override
public void insertTablet(InsertMultiTabletPlan insertMultiTabletPlan)
throws QueryProcessException {
for (int i = 0; i < insertMultiTabletPlan.getInsertTabletPlanList().size(); i++) {
if (insertMultiTabletPlan.getResults().containsKey(i)
|| insertMultiTabletPlan.isExecuted(i)) {
continue;
}
insertTablet(insertMultiTabletPlan.getInsertTabletPlanList().get(i));
}
}
@Override
public void insertTablet(InsertTabletPlan insertTabletPlan) throws QueryProcessException {
if (insertTabletPlan.getRowCount() == 0) {
return;
}
try {
insertTabletPlan.setMeasurementMNodes(
new IMeasurementMNode[insertTabletPlan.getMeasurements().length]);
getSeriesSchemas(insertTabletPlan);
if (insertTabletPlan.isAligned()) {
insertTabletPlan.setPrefixPath(insertTabletPlan.getPrefixPath().getDevicePath());
}
StorageEngine.getInstance().insertTablet(insertTabletPlan);
if (insertTabletPlan.getFailedMeasurements() != null) {
checkFailedMeasurments(insertTabletPlan);
}
} catch (StorageEngineException | MetadataException e) {
if (IoTDBDescriptor.getInstance().getConfig().isEnableStatMonitor()) {
StatMonitor.getInstance().updateFailedStatValue();
}
throw new QueryProcessException(e);
} catch (Exception e) {
// update failed statistics
if (IoTDBDescriptor.getInstance().getConfig().isEnableStatMonitor()) {
StatMonitor.getInstance().updateFailedStatValue();
}
throw e;
}
}
private boolean operateAuthor(AuthorPlan author) throws QueryProcessException {
AuthorOperator.AuthorType authorType = author.getAuthorType();
String userName = author.getUserName();
String roleName = author.getRoleName();
String password = author.getPassword();
String newPassword = author.getNewPassword();
Set<Integer> permissions = author.getPermissions();
PartialPath nodeName = author.getNodeName();
try {
switch (authorType) {
case UPDATE_USER:
authorizer.updateUserPassword(userName, newPassword);
break;
case CREATE_USER:
authorizer.createUser(userName, password);
break;
case CREATE_ROLE:
authorizer.createRole(roleName);
break;
case DROP_USER:
authorizer.deleteUser(userName);
break;
case DROP_ROLE:
authorizer.deleteRole(roleName);
break;
case GRANT_ROLE:
for (int i : permissions) {
authorizer.grantPrivilegeToRole(roleName, nodeName.getFullPath(), i);
}
break;
case GRANT_USER:
for (int i : permissions) {
authorizer.grantPrivilegeToUser(userName, nodeName.getFullPath(), i);
}
break;
case GRANT_ROLE_TO_USER:
authorizer.grantRoleToUser(roleName, userName);
break;
case REVOKE_USER:
for (int i : permissions) {
authorizer.revokePrivilegeFromUser(userName, nodeName.getFullPath(), i);
}
break;
case REVOKE_ROLE:
for (int i : permissions) {
authorizer.revokePrivilegeFromRole(roleName, nodeName.getFullPath(), i);
}
break;
case REVOKE_ROLE_FROM_USER:
authorizer.revokeRoleFromUser(roleName, userName);
break;
default:
throw new QueryProcessException("Unsupported operation " + authorType);
}
} catch (AuthException e) {
throw new QueryProcessException(e.getMessage(), true);
}
return true;
}
private boolean operateWatermarkEmbedding(List<String> users, boolean useWatermark)
throws QueryProcessException {
try {
for (String user : users) {
authorizer.setUserUseWaterMark(user, useWatermark);
}
} catch (AuthException e) {
throw new QueryProcessException(e.getMessage());
}
return true;
}
private boolean createTimeSeries(CreateTimeSeriesPlan createTimeSeriesPlan)
throws QueryProcessException {
try {
IoTDB.metaManager.createTimeseries(createTimeSeriesPlan);
} catch (MetadataException e) {
throw new QueryProcessException(e);
}
return true;
}
private boolean createAlignedTimeSeries(CreateAlignedTimeSeriesPlan createAlignedTimeSeriesPlan)
throws QueryProcessException {
try {
IoTDB.metaManager.createAlignedTimeSeries(createAlignedTimeSeriesPlan);
} catch (MetadataException e) {
throw new QueryProcessException(e);
}
return true;
}
@SuppressWarnings("squid:S3776") // high Cognitive Complexity
private boolean createMultiTimeSeries(CreateMultiTimeSeriesPlan multiPlan)
throws BatchProcessException {
int dataTypeIdx = 0;
for (int i = 0; i < multiPlan.getPaths().size(); i++) {
if (multiPlan.getResults().containsKey(i) || multiPlan.isExecuted(i)) {
continue;
}
PartialPath path = multiPlan.getPaths().get(i);
String measurement = path.getMeasurement();
CreateTimeSeriesPlan plan =
new CreateTimeSeriesPlan(
multiPlan.getPaths().get(i),
multiPlan.getDataTypes().get(i),
multiPlan.getEncodings().get(i),
multiPlan.getCompressors().get(i),
multiPlan.getProps() == null ? null : multiPlan.getProps().get(i),
multiPlan.getTags() == null ? null : multiPlan.getTags().get(i),
multiPlan.getAttributes() == null ? null : multiPlan.getAttributes().get(i),
multiPlan.getAlias() == null ? null : multiPlan.getAlias().get(i));
dataTypeIdx++;
try {
createTimeSeries(plan);
} catch (QueryProcessException e) {
multiPlan.getResults().put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
}
}
if (!multiPlan.getResults().isEmpty()) {
throw new BatchProcessException(multiPlan.getFailingStatus());
}
return true;
}
protected boolean deleteTimeSeries(DeleteTimeSeriesPlan deleteTimeSeriesPlan)
throws QueryProcessException {
AUDIT_LOGGER.info("delete timeseries {}", deleteTimeSeriesPlan.getPaths());
List<PartialPath> deletePathList = deleteTimeSeriesPlan.getPaths();
for (int i = 0; i < deletePathList.size(); i++) {
PartialPath path = deletePathList.get(i);
try {
StorageEngine.getInstance().deleteTimeseries(path, deleteTimeSeriesPlan.getIndex());
String failed = IoTDB.metaManager.deleteTimeseries(path);
if (failed != null) {
deleteTimeSeriesPlan
.getResults()
.put(i, RpcUtils.getStatus(TSStatusCode.NODE_DELETE_FAILED_ERROR, failed));
}
} catch (StorageEngineException | MetadataException e) {
deleteTimeSeriesPlan
.getResults()
.put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
}
}
if (!deleteTimeSeriesPlan.getResults().isEmpty()) {
throw new BatchProcessException(deleteTimeSeriesPlan.getFailingStatus());
}
return true;
}
private boolean alterTimeSeries(AlterTimeSeriesPlan alterTimeSeriesPlan)
throws QueryProcessException {
PartialPath path = alterTimeSeriesPlan.getPath();
Map<String, String> alterMap = alterTimeSeriesPlan.getAlterMap();
try {
switch (alterTimeSeriesPlan.getAlterType()) {
case RENAME:
String beforeName = alterMap.keySet().iterator().next();
String currentName = alterMap.get(beforeName);
IoTDB.metaManager.renameTagOrAttributeKey(beforeName, currentName, path);
break;
case SET:
IoTDB.metaManager.setTagsOrAttributesValue(alterMap, path);
break;
case DROP:
IoTDB.metaManager.dropTagsOrAttributes(alterMap.keySet(), path);
break;
case ADD_TAGS:
IoTDB.metaManager.addTags(alterMap, path);
break;
case ADD_ATTRIBUTES:
IoTDB.metaManager.addAttributes(alterMap, path);
break;
case UPSERT:
IoTDB.metaManager.upsertTagsAndAttributes(
alterTimeSeriesPlan.getAlias(),
alterTimeSeriesPlan.getTagsMap(),
alterTimeSeriesPlan.getAttributesMap(),
path);
break;
}
} catch (MetadataException e) {
throw new QueryProcessException(e);
} catch (IOException e) {
throw new QueryProcessException(
String.format(
"Something went wrong while read/write the [%s]'s tag/attribute info.",
path.getFullPath()));
}
return true;
}
public boolean setStorageGroup(SetStorageGroupPlan setStorageGroupPlan)
throws QueryProcessException {
AUDIT_LOGGER.info("set storage group to {}", setStorageGroupPlan.getPaths());
PartialPath path = setStorageGroupPlan.getPath();
try {
IoTDB.metaManager.setStorageGroup(path);
} catch (MetadataException e) {
throw new QueryProcessException(e);
}
return true;
}
protected boolean deleteStorageGroups(DeleteStorageGroupPlan deleteStorageGroupPlan)
throws QueryProcessException {
AUDIT_LOGGER.info("delete storage group {}", deleteStorageGroupPlan.getPaths());
List<PartialPath> deletePathList = new ArrayList<>();
try {
for (PartialPath storageGroupPath : deleteStorageGroupPlan.getPaths()) {
List<PartialPath> allRelatedStorageGroupPath =
IoTDB.metaManager.getStorageGroupPaths(storageGroupPath);
if (allRelatedStorageGroupPath.isEmpty()) {
throw new PathNotExistException(storageGroupPath.getFullPath(), true);
}
for (PartialPath path : allRelatedStorageGroupPath) {
StorageEngine.getInstance().deleteStorageGroup(path);
deletePathList.add(path);
}
}
IoTDB.metaManager.deleteStorageGroups(deletePathList);
} catch (MetadataException e) {
throw new QueryProcessException(e);
}
return true;
}
protected QueryDataSet processAuthorQuery(AuthorPlan plan) throws QueryProcessException {
AuthorType authorType = plan.getAuthorType();
String userName = plan.getUserName();
String roleName = plan.getRoleName();
PartialPath path = plan.getNodeName();
ListDataSet dataSet;
try {
switch (authorType) {
case LIST_ROLE:
dataSet = executeListRole(plan);
break;
case LIST_USER:
dataSet = executeListUser(plan);
break;
case LIST_ROLE_USERS:
dataSet = executeListRoleUsers(roleName);
break;
case LIST_USER_ROLES:
dataSet = executeListUserRoles(userName);
break;
case LIST_ROLE_PRIVILEGE:
dataSet = executeListRolePrivileges(roleName, path);
break;
case LIST_USER_PRIVILEGE:
dataSet = executeListUserPrivileges(userName, path);
break;
default:
throw new QueryProcessException("Unsupported operation " + authorType);
}
} catch (AuthException e) {
throw new QueryProcessException(e.getMessage());
}
return dataSet;
}
private ListDataSet executeListRole(AuthorPlan plan) throws AuthException {
ListDataSet dataSet =
new ListDataSet(
Collections.singletonList(new PartialPath(COLUMN_ROLE, false)),
Collections.singletonList(TSDataType.TEXT));
// check if current user is granted list_role privilege
boolean hasListRolePrivilege =
AuthorityChecker.check(
plan.getLoginUserName(),
Collections.emptyList(),
plan.getOperatorType(),
plan.getLoginUserName());
if (!hasListRolePrivilege) {
return dataSet;
}
List<String> roleList = authorizer.listAllRoles();
addToDataSet(roleList, dataSet);
return dataSet;
}
private void addToDataSet(List<String> strResults, ListDataSet dataSet) {
int index = 0;
for (String role : strResults) {
RowRecord record = new RowRecord(index++);
Field field = new Field(TSDataType.TEXT);
field.setBinaryV(new Binary(role));
record.addField(field);
dataSet.putRecord(record);
}
}
private ListDataSet executeListUser(AuthorPlan plan) throws AuthException {
ListDataSet dataSet =
new ListDataSet(
Collections.singletonList(new PartialPath(COLUMN_USER, false)),
Collections.singletonList(TSDataType.TEXT));
// check if current user is granted list_user privilege
boolean hasListUserPrivilege =
AuthorityChecker.check(
plan.getLoginUserName(),
Collections.singletonList((plan.getNodeName())),
plan.getOperatorType(),
plan.getLoginUserName());
if (!hasListUserPrivilege) {
return dataSet;
}
List<String> userList = authorizer.listAllUsers();
addToDataSet(userList, dataSet);
return dataSet;
}
private ListDataSet executeListRoleUsers(String roleName) throws AuthException {
Role role = authorizer.getRole(roleName);
if (role == null) {
throw new AuthException("No such role : " + roleName);
}
ListDataSet dataSet =
new ListDataSet(
Collections.singletonList(new PartialPath(COLUMN_USER, false)),
Collections.singletonList(TSDataType.TEXT));
List<String> userList = authorizer.listAllUsers();
int index = 0;
for (String userN : userList) {
User userObj = authorizer.getUser(userN);
if (userObj != null && userObj.hasRole(roleName)) {
RowRecord record = new RowRecord(index++);
Field field = new Field(TSDataType.TEXT);
field.setBinaryV(new Binary(userN));
record.addField(field);
dataSet.putRecord(record);
}
}
return dataSet;
}
private ListDataSet executeListUserRoles(String userName) throws AuthException {
User user = authorizer.getUser(userName);
if (user != null) {
ListDataSet dataSet =
new ListDataSet(
Collections.singletonList(new PartialPath(COLUMN_ROLE, false)),
Collections.singletonList(TSDataType.TEXT));
int index = 0;
for (String roleN : user.getRoleList()) {
RowRecord record = new RowRecord(index++);
Field field = new Field(TSDataType.TEXT);
field.setBinaryV(new Binary(roleN));
record.addField(field);
dataSet.putRecord(record);
}
return dataSet;
} else {
throw new AuthException("No such user : " + userName);
}
}
private ListDataSet executeListRolePrivileges(String roleName, PartialPath path)
throws AuthException {
Role role = authorizer.getRole(roleName);
if (role != null) {
List<PartialPath> headerList = new ArrayList<>();
List<TSDataType> typeList = new ArrayList<>();
headerList.add(new PartialPath(COLUMN_PRIVILEGE, false));
typeList.add(TSDataType.TEXT);
ListDataSet dataSet = new ListDataSet(headerList, typeList);
int index = 0;
for (PathPrivilege pathPrivilege : role.getPrivilegeList()) {
if (path == null || AuthUtils.pathBelongsTo(path.getFullPath(), pathPrivilege.getPath())) {
RowRecord record = new RowRecord(index++);
Field field = new Field(TSDataType.TEXT);
field.setBinaryV(new Binary(pathPrivilege.toString()));
record.addField(field);
dataSet.putRecord(record);
}
}
return dataSet;
} else {
throw new AuthException("No such role : " + roleName);
}
}
private ListDataSet executeListUserPrivileges(String userName, PartialPath path)
throws AuthException {
User user = authorizer.getUser(userName);
if (user == null) {
throw new AuthException("No such user : " + userName);
}
List<PartialPath> headerList = new ArrayList<>();
List<TSDataType> typeList = new ArrayList<>();
headerList.add(new PartialPath(COLUMN_ROLE, false));
headerList.add(new PartialPath(COLUMN_PRIVILEGE, false));
typeList.add(TSDataType.TEXT);
typeList.add(TSDataType.TEXT);
ListDataSet dataSet = new ListDataSet(headerList, typeList);
int index = 0;
for (PathPrivilege pathPrivilege : user.getPrivilegeList()) {
if (path == null || AuthUtils.pathBelongsTo(path.getFullPath(), pathPrivilege.getPath())) {
RowRecord record = new RowRecord(index++);
Field roleF = new Field(TSDataType.TEXT);
roleF.setBinaryV(new Binary(""));
record.addField(roleF);
Field privilegeF = new Field(TSDataType.TEXT);
privilegeF.setBinaryV(new Binary(pathPrivilege.toString()));
record.addField(privilegeF);
dataSet.putRecord(record);
}
}
for (String roleN : user.getRoleList()) {
Role role = authorizer.getRole(roleN);
if (role == null) {
continue;
}
for (PathPrivilege pathPrivilege : role.getPrivilegeList()) {
if (path == null || AuthUtils.pathBelongsTo(path.getFullPath(), pathPrivilege.getPath())) {
RowRecord record = new RowRecord(index++);
Field roleF = new Field(TSDataType.TEXT);
roleF.setBinaryV(new Binary(roleN));
record.addField(roleF);
Field privilegeF = new Field(TSDataType.TEXT);
privilegeF.setBinaryV(new Binary(pathPrivilege.toString()));
record.addField(privilegeF);
dataSet.putRecord(record);
}
}
}
return dataSet;
}
protected String deleteTimeSeries(PartialPath path) throws MetadataException {
return IoTDB.metaManager.deleteTimeseries(path);
}
@SuppressWarnings("unused") // for the distributed version
protected void loadConfiguration(LoadConfigurationPlan plan) throws QueryProcessException {
IoTDBDescriptor.getInstance().loadHotModifiedProps();
}
private QueryDataSet processShowMergeStatus() {
List<PartialPath> headerList = new ArrayList<>();
List<TSDataType> typeList = new ArrayList<>();
headerList.add(new PartialPath(COLUMN_STORAGE_GROUP, false));
headerList.add(new PartialPath(COLUMN_TASK_NAME, false));
headerList.add(new PartialPath(COLUMN_CREATED_TIME, false));
headerList.add(new PartialPath(COLUMN_PROGRESS, false));
headerList.add(new PartialPath(COLUMN_CANCELLED, false));
headerList.add(new PartialPath(COLUMN_DONE, false));
typeList.add(TSDataType.TEXT);
typeList.add(TSDataType.TEXT);
typeList.add(TSDataType.TEXT);
typeList.add(TSDataType.TEXT);
typeList.add(TSDataType.BOOLEAN);
typeList.add(TSDataType.BOOLEAN);
ListDataSet dataSet = new ListDataSet(headerList, typeList);
Map<String, List<TaskStatus>>[] taskStatus = MergeManager.getINSTANCE().collectTaskStatus();
for (Map<String, List<TaskStatus>> statusMap : taskStatus) {
for (Entry<String, List<TaskStatus>> stringListEntry : statusMap.entrySet()) {
for (TaskStatus status : stringListEntry.getValue()) {
dataSet.putRecord(toRowRecord(status, stringListEntry.getKey()));
}
}
}
return dataSet;
}
public RowRecord toRowRecord(TaskStatus status, String storageGroup) {
RowRecord record = new RowRecord(0);
record.addField(new Binary(storageGroup), TSDataType.TEXT);
record.addField(new Binary(status.getTaskName()), TSDataType.TEXT);
record.addField(new Binary(status.getCreatedTime()), TSDataType.TEXT);
record.addField(new Binary(status.getProgress()), TSDataType.TEXT);
record.addField(status.isCancelled(), TSDataType.BOOLEAN);
record.addField(status.isDone(), TSDataType.BOOLEAN);
return record;
}
private QueryDataSet processShowQueryProcesslist() {
ListDataSet listDataSet =
new ListDataSet(
Arrays.asList(new PartialPath(QUERY_ID, false), new PartialPath(STATEMENT, false)),
Arrays.asList(TSDataType.INT64, TSDataType.TEXT));
QueryTimeManager queryTimeManager = QueryTimeManager.getInstance();
for (Entry<Long, QueryInfo> queryInfo : queryTimeManager.getQueryInfoMap().entrySet()) {
RowRecord record = new RowRecord(queryInfo.getValue().getStartTime());
record.addField(queryInfo.getKey(), TSDataType.INT64);
record.addField(new Binary(queryInfo.getValue().getStatement()), TSDataType.TEXT);
listDataSet.putRecord(record);
}
return listDataSet;
}
/**
* @param storageGroups the storage groups to check
* @return List of PartialPath the storage groups that not exist
*/
List<PartialPath> checkStorageGroupExist(List<PartialPath> storageGroups) {
List<PartialPath> noExistSg = new ArrayList<>();
if (storageGroups == null) {
return noExistSg;
}
for (PartialPath storageGroup : storageGroups) {
if (!IoTDB.metaManager.isStorageGroup(storageGroup)) {
noExistSg.add(storageGroup);
}
}
return noExistSg;
}
}