blob: a470b02b9da35f673248a6a2c349c80f23fd6c60 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.queryengine.plan.execution.config.executor;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
import org.apache.iotdb.common.rpc.thrift.TSetThrottleQuotaReq;
import org.apache.iotdb.common.rpc.thrift.TSpaceQuota;
import org.apache.iotdb.common.rpc.thrift.TThrottleQuota;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.ConfigRegionId;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.executable.ExecutableManager;
import org.apache.iotdb.commons.executable.ExecutableResource;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
import org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoader;
import org.apache.iotdb.commons.pipe.plugin.service.PipePluginExecutableManager;
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression;
import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
import org.apache.iotdb.commons.udf.service.UDFClassLoader;
import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
import org.apache.iotdb.confignode.rpc.thrift.TAlterLogicalViewReq;
import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TAlterSchemaTemplateReq;
import org.apache.iotdb.confignode.rpc.thrift.TCountDatabaseResp;
import org.apache.iotdb.confignode.rpc.thrift.TCountTimeSlotListReq;
import org.apache.iotdb.confignode.rpc.thrift.TCountTimeSlotListResp;
import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipePluginReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateTopicReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchemaResp;
import org.apache.iotdb.confignode.rpc.thrift.TDeactivateSchemaTemplateReq;
import org.apache.iotdb.confignode.rpc.thrift.TDeleteDatabasesReq;
import org.apache.iotdb.confignode.rpc.thrift.TDeleteLogicalViewReq;
import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetDatabaseReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TPipeConfigTransferReq;
import org.apache.iotdb.confignode.rpc.thrift.TPipeConfigTransferResp;
import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowConfigNodesResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowThrottleReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowTopicReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowTopicResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowVariablesResp;
import org.apache.iotdb.confignode.rpc.thrift.TSpaceQuotaResp;
import org.apache.iotdb.confignode.rpc.thrift.TThrottleQuotaResp;
import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import org.apache.iotdb.db.protocol.client.DataNodeClientPoolFactory;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
import org.apache.iotdb.db.queryengine.plan.analyze.Analyzer;
import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.CountDatabaseTask;
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.CountTimeSlotListTask;
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.DatabaseSchemaTask;
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.GetRegionIdTask;
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.GetSeriesSlotListTask;
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.GetTimeSlotListTask;
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowClusterDetailsTask;
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowClusterIdTask;
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowClusterTask;
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowConfigNodesTask;
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowContinuousQueriesTask;
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowDataNodesTask;
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowFunctionsTask;
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowPipePluginsTask;
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowRegionTask;
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowTTLTask;
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowTriggersTask;
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowVariablesTask;
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.template.ShowNodesInSchemaTemplateTask;
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.template.ShowPathSetTemplateTask;
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.template.ShowSchemaTemplateTask;
import org.apache.iotdb.db.queryengine.plan.execution.config.sys.pipe.ShowPipeTask;
import org.apache.iotdb.db.queryengine.plan.execution.config.sys.quota.ShowSpaceQuotaTask;
import org.apache.iotdb.db.queryengine.plan.execution.config.sys.quota.ShowThrottleQuotaTask;
import org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.ShowSubscriptionTask;
import org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.ShowTopicsTask;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.expression.visitor.TransformToViewExpressionVisitor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.AlterLogicalViewNode;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.CountDatabaseStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.CountTimeSlotListStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateContinuousQueryStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateFunctionStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateTriggerStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.DatabaseSchemaStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.DeleteDatabaseStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.DeleteTimeSeriesStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.GetRegionIdStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.GetSeriesSlotListStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.GetTimeSlotListStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.MigrateRegionStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.SetTTLStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowClusterStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowDataNodesStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowDatabaseStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowRegionStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowTTLStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.AlterPipeStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.CreatePipePluginStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.CreatePipeStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.DropPipeStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.ShowPipesStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.StartPipeStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.StopPipeStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.CreateTopicStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropTopicStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.ShowSubscriptionsStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.ShowTopicsStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.template.AlterSchemaTemplateStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.template.CreateSchemaTemplateStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.template.DeactivateTemplateStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.template.DropSchemaTemplateStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.template.SetSchemaTemplateStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.template.ShowNodesInSchemaTemplateStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.template.ShowPathSetTemplateStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.template.ShowSchemaTemplateStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.template.UnsetSchemaTemplateStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.view.AlterLogicalViewStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.view.CreateLogicalViewStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.view.DeleteLogicalViewStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.view.RenameLogicalViewStatement;
import org.apache.iotdb.db.queryengine.plan.statement.sys.KillQueryStatement;
import org.apache.iotdb.db.queryengine.plan.statement.sys.quota.SetSpaceQuotaStatement;
import org.apache.iotdb.db.queryengine.plan.statement.sys.quota.SetThrottleQuotaStatement;
import org.apache.iotdb.db.queryengine.plan.statement.sys.quota.ShowSpaceQuotaStatement;
import org.apache.iotdb.db.queryengine.plan.statement.sys.quota.ShowThrottleQuotaStatement;
import org.apache.iotdb.db.schemaengine.SchemaEngine;
import org.apache.iotdb.db.schemaengine.rescon.DataNodeSchemaQuotaManager;
import org.apache.iotdb.db.schemaengine.template.ClusterTemplateManager;
import org.apache.iotdb.db.schemaengine.template.Template;
import org.apache.iotdb.db.schemaengine.template.TemplateAlterOperationType;
import org.apache.iotdb.db.schemaengine.template.alter.TemplateAlterOperationUtil;
import org.apache.iotdb.db.schemaengine.template.alter.TemplateExtendInfo;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.compaction.repair.RepairTaskStatus;
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleTaskManager;
import org.apache.iotdb.db.trigger.service.TriggerClassLoader;
import org.apache.iotdb.pipe.api.PipePlugin;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.iotdb.trigger.api.Trigger;
import org.apache.iotdb.trigger.api.enums.FailureStrategy;
import org.apache.iotdb.udf.api.UDF;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.iotdb.db.protocol.client.ConfigNodeClient.MSG_RECONNECTION_FAIL;
public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(ClusterConfigTaskExecutor.class);
private static final IClientManager<ConfigRegionId, ConfigNodeClient> CONFIG_NODE_CLIENT_MANAGER =
ConfigNodeClientManager.getInstance();
/** FIXME Consolidate this clientManager with the upper one. */
private static final IClientManager<ConfigRegionId, ConfigNodeClient>
CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER =
new IClientManager.Factory<ConfigRegionId, ConfigNodeClient>()
.createClientManager(
new DataNodeClientPoolFactory.ClusterDeletionConfigNodeClientPoolFactory());
private static final class ClusterConfigTaskExecutorHolder {
private static final ClusterConfigTaskExecutor INSTANCE = new ClusterConfigTaskExecutor();
private ClusterConfigTaskExecutorHolder() {}
}
public static ClusterConfigTaskExecutor getInstance() {
return ClusterConfigTaskExecutor.ClusterConfigTaskExecutorHolder.INSTANCE;
}
@Override
public SettableFuture<ConfigTaskResult> setDatabase(
DatabaseSchemaStatement databaseSchemaStatement) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
// Construct request using statement
TDatabaseSchema databaseSchema =
DatabaseSchemaTask.constructDatabaseSchema(databaseSchemaStatement);
try (ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
// Send request to some API server
TSStatus tsStatus = configNodeClient.setDatabase(databaseSchema);
// Get response or throw exception
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
// If database already exists when loading, we do not throw exceptions to avoid printing too
// many logs
if (TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode() == tsStatus.getCode()
&& !databaseSchemaStatement.getEnablePrintExceptionLog()) {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
} else {
LOGGER.warn(
"Failed to execute create database {} in config node, status is {}.",
databaseSchemaStatement.getDatabasePath(),
tsStatus);
future.setException(new IoTDBException(tsStatus.message, tsStatus.code));
}
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
} catch (ClientManagerException | TException e) {
future.setException(e);
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> alterDatabase(
DatabaseSchemaStatement databaseSchemaStatement) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
// Construct request using statement
TDatabaseSchema databaseSchema =
DatabaseSchemaTask.constructDatabaseSchema(databaseSchemaStatement);
try (ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
// Send request to some API server
TSStatus tsStatus = configNodeClient.alterDatabase(databaseSchema);
// Get response or throw exception
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
if (databaseSchemaStatement.getEnablePrintExceptionLog()) {
LOGGER.warn(
"Failed to execute alter database {} in config node, status is {}.",
databaseSchemaStatement.getDatabasePath(),
tsStatus);
}
future.setException(new IoTDBException(tsStatus.message, tsStatus.code));
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
} catch (ClientManagerException | TException e) {
future.setException(e);
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> showDatabase(
ShowDatabaseStatement showDatabaseStatement) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
// Construct request using statement
List<String> databasePathPattern =
Arrays.asList(showDatabaseStatement.getPathPattern().getNodes());
try (ConfigNodeClient client =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
// Send request to some API server
TGetDatabaseReq req =
new TGetDatabaseReq(
databasePathPattern, showDatabaseStatement.getAuthorityScope().serialize());
TShowDatabaseResp resp = client.showDatabase(req);
// build TSBlock
showDatabaseStatement.buildTSBlock(resp.getDatabaseInfoMap(), future);
} catch (IOException | ClientManagerException | TException e) {
future.setException(e);
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> countDatabase(
CountDatabaseStatement countDatabaseStatement) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
int databaseNum;
List<String> databasePathPattern =
Arrays.asList(countDatabaseStatement.getPathPattern().getNodes());
try (ConfigNodeClient client =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
TGetDatabaseReq req =
new TGetDatabaseReq(
databasePathPattern, countDatabaseStatement.getAuthorityScope().serialize());
TCountDatabaseResp resp = client.countMatchedDatabases(req);
databaseNum = resp.getCount();
// build TSBlock
CountDatabaseTask.buildTSBlock(databaseNum, future);
} catch (IOException | ClientManagerException | TException e) {
future.setException(e);
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> deleteDatabase(
DeleteDatabaseStatement deleteDatabaseStatement) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
TDeleteDatabasesReq req = new TDeleteDatabasesReq(deleteDatabaseStatement.getPrefixPath());
try (ConfigNodeClient client =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
TSStatus tsStatus = client.deleteDatabases(req);
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
LOGGER.warn(
"Failed to execute delete database {} in config node, status is {}.",
deleteDatabaseStatement.getPrefixPath(),
tsStatus);
if (tsStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
future.setException(
new BatchProcessException(tsStatus.subStatus.toArray(new TSStatus[0])));
} else {
future.setException(new IoTDBException(tsStatus.message, tsStatus.getCode()));
}
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
} catch (ClientManagerException | TException e) {
future.setException(e);
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> createFunction(
CreateFunctionStatement createFunctionStatement) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
String udfName = createFunctionStatement.getUdfName();
String className = createFunctionStatement.getClassName();
try (ConfigNodeClient client =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
TCreateFunctionReq tCreateFunctionReq = new TCreateFunctionReq(udfName, className, false);
String libRoot = UDFExecutableManager.getInstance().getLibRoot();
String jarFileName;
ByteBuffer jarFile;
String jarMd5;
if (createFunctionStatement.isUsingURI()) {
String uriString = createFunctionStatement.getUriString();
if (uriString == null || uriString.isEmpty()) {
future.setException(
new IoTDBException(
"URI is empty, please specify the URI.",
TSStatusCode.UDF_DOWNLOAD_ERROR.getStatusCode()));
return future;
}
jarFileName = new File(uriString).getName();
try {
URI uri = new URI(uriString);
if (uri.getScheme() == null) {
future.setException(
new IoTDBException(
"The scheme of URI is not set, please specify the scheme of URI.",
TSStatusCode.UDF_DOWNLOAD_ERROR.getStatusCode()));
return future;
}
if (!uri.getScheme().equals("file")) {
// Download executable
ExecutableResource resource =
UDFExecutableManager.getInstance().request(Collections.singletonList(uriString));
String jarFilePathUnderTempDir =
UDFExecutableManager.getInstance()
.getDirStringUnderTempRootByRequestId(resource.getRequestId())
+ File.separator
+ jarFileName;
// libRoot should be the path of the specified jar
libRoot = jarFilePathUnderTempDir;
jarFile = ExecutableManager.transferToBytebuffer(jarFilePathUnderTempDir);
jarMd5 = DigestUtils.md5Hex(Files.newInputStream(Paths.get(jarFilePathUnderTempDir)));
} else {
// libRoot should be the path of the specified jar
libRoot = new File(new URI(uriString)).getAbsolutePath();
// If jarPath is a file path on datanode, we transfer it to ByteBuffer and send it to
// ConfigNode.
jarFile = ExecutableManager.transferToBytebuffer(libRoot);
// Set md5 of the jar file
jarMd5 = DigestUtils.md5Hex(Files.newInputStream(Paths.get(libRoot)));
}
} catch (IOException | URISyntaxException e) {
LOGGER.warn(
"Failed to get executable for UDF({}) using URI: {}, the cause is: {}",
createFunctionStatement.getUdfName(),
createFunctionStatement.getUriString(),
e);
future.setException(
new IoTDBException(
"Failed to get executable for UDF '"
+ createFunctionStatement.getUdfName()
+ "', please check the URI.",
TSStatusCode.TRIGGER_DOWNLOAD_ERROR.getStatusCode()));
return future;
}
// modify req
tCreateFunctionReq.setJarFile(jarFile);
tCreateFunctionReq.setJarMD5(jarMd5);
tCreateFunctionReq.setIsUsingURI(true);
tCreateFunctionReq.setJarName(
String.format(
"%s-%s.%s",
jarFileName.substring(0, jarFileName.lastIndexOf(".")),
jarMd5,
jarFileName.substring(jarFileName.lastIndexOf(".") + 1)));
}
// try to create instance, this request will fail if creation is not successful
try (UDFClassLoader classLoader = new UDFClassLoader(libRoot)) {
// ensure that jar file contains the class and the class is a UDF
Class<?> clazz = Class.forName(createFunctionStatement.getClassName(), true, classLoader);
UDF udf = (UDF) clazz.getDeclaredConstructor().newInstance();
} catch (ClassNotFoundException
| NoSuchMethodException
| InstantiationException
| IllegalAccessException
| InvocationTargetException
| ClassCastException e) {
LOGGER.warn(
"Failed to create function when try to create UDF({}) instance first, the cause is: {}",
createFunctionStatement.getUdfName(),
e);
future.setException(
new IoTDBException(
"Failed to load class '"
+ createFunctionStatement.getClassName()
+ "', because it's not found in jar file: "
+ createFunctionStatement.getUriString(),
TSStatusCode.UDF_LOAD_CLASS_ERROR.getStatusCode()));
return future;
}
final TSStatus executionStatus = client.createFunction(tCreateFunctionReq);
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != executionStatus.getCode()) {
LOGGER.warn(
"Failed to create function {}({}) because {}",
udfName,
className,
executionStatus.getMessage());
future.setException(new IoTDBException(executionStatus.message, executionStatus.code));
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
} catch (ClientManagerException | IOException | TException e) {
future.setException(e);
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> dropFunction(String udfName) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
try (ConfigNodeClient client =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final TSStatus executionStatus = client.dropFunction(new TDropFunctionReq(udfName));
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != executionStatus.getCode()) {
LOGGER.warn("[{}] Failed to drop function {}.", executionStatus, udfName);
future.setException(new IoTDBException(executionStatus.message, executionStatus.code));
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
} catch (ClientManagerException | TException e) {
future.setException(e);
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> showFunctions() {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
try (ConfigNodeClient client =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
TGetUDFTableResp getUDFTableResp = client.getUDFTable();
if (getUDFTableResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
future.setException(
new IoTDBException(
getUDFTableResp.getStatus().message, getUDFTableResp.getStatus().code));
return future;
}
// convert UDFTable and buildTsBlock
ShowFunctionsTask.buildTsBlock(getUDFTableResp.getAllUDFInformation(), future);
} catch (ClientManagerException | TException e) {
future.setException(e);
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> createTrigger(
CreateTriggerStatement createTriggerStatement) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
try (ConfigNodeClient client =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
TCreateTriggerReq tCreateTriggerReq =
new TCreateTriggerReq(
createTriggerStatement.getTriggerName(),
createTriggerStatement.getClassName(),
createTriggerStatement.getTriggerEvent().getId(),
createTriggerStatement.getTriggerType().getId(),
createTriggerStatement.getPathPattern().serialize(),
createTriggerStatement.getAttributes(),
FailureStrategy.OPTIMISTIC.getId(),
createTriggerStatement.isUsingURI()); // set default strategy
String libRoot = TriggerExecutableManager.getInstance().getLibRoot();
String jarFileName;
ByteBuffer jarFile;
String jarMd5;
if (createTriggerStatement.isUsingURI()) {
String uriString = createTriggerStatement.getUriString();
if (uriString == null || uriString.isEmpty()) {
future.setException(
new IoTDBException(
"URI is empty, please specify the URI.",
TSStatusCode.TRIGGER_DOWNLOAD_ERROR.getStatusCode()));
return future;
}
jarFileName = new File(uriString).getName();
try {
URI uri = new URI(uriString);
if (uri.getScheme() == null) {
future.setException(
new IoTDBException(
"The scheme of URI is not set, please specify the scheme of URI.",
TSStatusCode.TRIGGER_DOWNLOAD_ERROR.getStatusCode()));
return future;
}
if (!uri.getScheme().equals("file")) {
// download executable
ExecutableResource resource =
TriggerExecutableManager.getInstance()
.request(Collections.singletonList(uriString));
String jarFilePathUnderTempDir =
TriggerExecutableManager.getInstance()
.getDirStringUnderTempRootByRequestId(resource.getRequestId())
+ File.separator
+ jarFileName;
// libRoot should be the path of the specified jar
libRoot = jarFilePathUnderTempDir;
jarFile = ExecutableManager.transferToBytebuffer(jarFilePathUnderTempDir);
jarMd5 = DigestUtils.md5Hex(Files.newInputStream(Paths.get(jarFilePathUnderTempDir)));
} else {
// libRoot should be the path of the specified jar
libRoot = new File(new URI(uriString)).getAbsolutePath();
// If jarPath is a file path on datanode, we transfer it to ByteBuffer and send it to
// ConfigNode.
jarFile = ExecutableManager.transferToBytebuffer(libRoot);
// set md5 of the jar file
jarMd5 = DigestUtils.md5Hex(Files.newInputStream(Paths.get(libRoot)));
}
} catch (IOException | URISyntaxException e) {
LOGGER.warn(
"Failed to get executable for Trigger({}) using URI: {}, the cause is: {}",
createTriggerStatement.getTriggerName(),
createTriggerStatement.getUriString(),
e);
future.setException(
new IoTDBException(
"Failed to get executable for Trigger '"
+ createTriggerStatement.getUriString()
+ "', please check the URI.",
TSStatusCode.TRIGGER_DOWNLOAD_ERROR.getStatusCode()));
return future;
}
// modify req
tCreateTriggerReq.setJarFile(jarFile);
tCreateTriggerReq.setJarMD5(jarMd5);
tCreateTriggerReq.setIsUsingURI(true);
tCreateTriggerReq.setJarName(
String.format(
"%s-%s.%s",
jarFileName.substring(0, jarFileName.lastIndexOf(".")),
jarMd5,
jarFileName.substring(jarFileName.lastIndexOf(".") + 1)));
}
// try to create instance, this request will fail if creation is not successful
try (TriggerClassLoader classLoader = new TriggerClassLoader(libRoot)) {
Class<?> triggerClass =
Class.forName(createTriggerStatement.getClassName(), true, classLoader);
Trigger trigger = (Trigger) triggerClass.getDeclaredConstructor().newInstance();
tCreateTriggerReq.setFailureStrategy(trigger.getFailureStrategy().getId());
} catch (ClassNotFoundException
| NoSuchMethodException
| InstantiationException
| IllegalAccessException
| InvocationTargetException
| ClassCastException e) {
LOGGER.warn(
"Failed to create trigger when try to create trigger({}) instance first, the cause is: {}",
createTriggerStatement.getTriggerName(),
e);
future.setException(
new IoTDBException(
"Failed to load class '"
+ createTriggerStatement.getClassName()
+ "', because it's not found in jar file: "
+ createTriggerStatement.getUriString(),
TSStatusCode.TRIGGER_LOAD_CLASS_ERROR.getStatusCode()));
return future;
}
final TSStatus executionStatus = client.createTrigger(tCreateTriggerReq);
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != executionStatus.getCode()) {
LOGGER.warn(
"[{}] Failed to create trigger {}. TSStatus is {}",
executionStatus,
createTriggerStatement.getTriggerName(),
executionStatus.message);
future.setException(new IoTDBException(executionStatus.message, executionStatus.code));
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
} catch (ClientManagerException | TException | IOException e) {
future.setException(e);
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> dropTrigger(String triggerName) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
try (ConfigNodeClient client =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final TSStatus executionStatus = client.dropTrigger(new TDropTriggerReq(triggerName));
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != executionStatus.getCode()) {
LOGGER.warn("[{}] Failed to drop trigger {}.", executionStatus, triggerName);
future.setException(new IoTDBException(executionStatus.message, executionStatus.code));
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
} catch (ClientManagerException | TException e) {
future.setException(e);
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> showTriggers() {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
try (ConfigNodeClient client =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
TGetTriggerTableResp getTriggerTableResp = client.getTriggerTable();
if (getTriggerTableResp.getStatus().getCode()
!= TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
future.setException(
new IoTDBException(
getTriggerTableResp.getStatus().message, getTriggerTableResp.getStatus().code));
return future;
}
// convert triggerTable and buildTsBlock
ShowTriggersTask.buildTsBlock(getTriggerTableResp.getAllTriggerInformation(), future);
} catch (ClientManagerException | TException e) {
future.setException(e);
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> createPipePlugin(
CreatePipePluginStatement createPipePluginStatement) {
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
final String pluginName = createPipePluginStatement.getPluginName();
final String className = createPipePluginStatement.getClassName();
final String uriString = createPipePluginStatement.getUriString();
if (uriString == null || uriString.isEmpty()) {
future.setException(
new IoTDBException(
"Failed to create pipe plugin, because the URI is empty.",
TSStatusCode.PIPE_PLUGIN_DOWNLOAD_ERROR.getStatusCode()));
return future;
}
try (final ConfigNodeClient client =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
String libRoot;
ByteBuffer jarFile;
String jarMd5;
final String jarFileName = new File(uriString).getName();
try {
URI uri = new URI(uriString);
if (uri.getScheme() == null) {
future.setException(
new IoTDBException(
"The scheme of URI is not set, please specify the scheme of URI.",
TSStatusCode.PIPE_PLUGIN_DOWNLOAD_ERROR.getStatusCode()));
return future;
}
if (!uri.getScheme().equals("file")) {
// Download executable
ExecutableResource resource =
PipePluginExecutableManager.getInstance()
.request(Collections.singletonList(uriString));
String jarFilePathUnderTempDir =
PipePluginExecutableManager.getInstance()
.getDirStringUnderTempRootByRequestId(resource.getRequestId())
+ File.separator
+ jarFileName;
// libRoot should be the path of the specified jar
libRoot = jarFilePathUnderTempDir;
jarFile = ExecutableManager.transferToBytebuffer(jarFilePathUnderTempDir);
jarMd5 = DigestUtils.md5Hex(Files.newInputStream(Paths.get(jarFilePathUnderTempDir)));
} else {
// libRoot should be the path of the specified jar
libRoot = new File(new URI(uriString)).getAbsolutePath();
// If jarPath is a file path on datanode, we transfer it to ByteBuffer and send it to
// ConfigNode.
jarFile = ExecutableManager.transferToBytebuffer(libRoot);
// Set md5 of the jar file
jarMd5 = DigestUtils.md5Hex(Files.newInputStream(Paths.get(libRoot)));
}
} catch (IOException | URISyntaxException e) {
LOGGER.warn(
"Failed to get executable for PipePlugin({}) using URI: {}, the cause is: {}",
createPipePluginStatement.getPluginName(),
createPipePluginStatement.getUriString(),
e);
future.setException(
new IoTDBException(
"Failed to get executable for PipePlugin"
+ createPipePluginStatement.getPluginName()
+ "', please check the URI.",
TSStatusCode.PIPE_PLUGIN_DOWNLOAD_ERROR.getStatusCode()));
return future;
}
// try to create instance, this request will fail if creation is not successful
try (PipePluginClassLoader classLoader = new PipePluginClassLoader(libRoot)) {
// ensure that jar file contains the class and the class is a pipe plugin
Class<?> clazz = Class.forName(createPipePluginStatement.getClassName(), true, classLoader);
PipePlugin ignored = (PipePlugin) clazz.getDeclaredConstructor().newInstance();
} catch (ClassNotFoundException
| NoSuchMethodException
| InstantiationException
| IllegalAccessException
| InvocationTargetException
| ClassCastException e) {
LOGGER.warn(
"Failed to create function when try to create PipePlugin({}) instance first, the cause is: {}",
createPipePluginStatement.getPluginName(),
e);
future.setException(
new IoTDBException(
"Failed to load class '"
+ createPipePluginStatement.getClassName()
+ "', because it's not found in jar file: "
+ createPipePluginStatement.getUriString(),
TSStatusCode.PIPE_PLUGIN_LOAD_CLASS_ERROR.getStatusCode()));
return future;
}
final TSStatus executionStatus =
client.createPipePlugin(
new TCreatePipePluginReq()
.setPluginName(pluginName)
.setClassName(className)
.setJarFile(jarFile)
.setJarMD5(jarMd5)
.setJarName(
String.format(
"%s-%s.%s",
jarFileName.substring(0, jarFileName.lastIndexOf(".")),
jarMd5,
jarFileName.substring(jarFileName.lastIndexOf(".") + 1))));
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != executionStatus.getCode()) {
LOGGER.warn(
"Failed to create PipePlugin {}({}) because {}",
pluginName,
className,
executionStatus.getMessage());
future.setException(new IoTDBException(executionStatus.message, executionStatus.code));
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
} catch (ClientManagerException | TException | IOException e) {
future.setException(e);
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> dropPipePlugin(String pluginName) {
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
try (final ConfigNodeClient client =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final TSStatus executionStatus = client.dropPipePlugin(new TDropPipePluginReq(pluginName));
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != executionStatus.getCode()) {
LOGGER.warn("[{}] Failed to drop pipe plugin {}.", executionStatus, pluginName);
future.setException(new IoTDBException(executionStatus.message, executionStatus.code));
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
} catch (ClientManagerException | TException e) {
future.setException(e);
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> showPipePlugins() {
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
try (final ConfigNodeClient client =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
TGetPipePluginTableResp getPipePluginTableResp = client.getPipePluginTable();
if (getPipePluginTableResp.getStatus().getCode()
!= TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
future.setException(
new IoTDBException(
getPipePluginTableResp.getStatus().message,
getPipePluginTableResp.getStatus().code));
return future;
}
// convert PipePluginTable and buildTsBlock
ShowPipePluginsTask.buildTsBlock(getPipePluginTableResp.getAllPipePluginMeta(), future);
} catch (ClientManagerException | TException e) {
future.setException(e);
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> setTTL(SetTTLStatement setTTLStatement, String taskName) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
List<String> databasePathPattern = Arrays.asList(setTTLStatement.getDatabasePath().getNodes());
TSetTTLReq setTTLReq = new TSetTTLReq(databasePathPattern, setTTLStatement.getTTL());
try (ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
// Send request to some API server
TSStatus tsStatus = configNodeClient.setTTL(setTTLReq);
// Get response or throw exception
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
LOGGER.warn(
"Failed to execute {} {} in config node, status is {}.",
taskName,
setTTLStatement.getDatabasePath(),
tsStatus);
future.setException(new IoTDBException(tsStatus.getMessage(), tsStatus.getCode()));
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
} catch (ClientManagerException | TException e) {
future.setException(e);
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> merge(boolean onCluster) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
TSStatus tsStatus = new TSStatus();
if (onCluster) {
try (ConfigNodeClient client =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
// Send request to some API server
tsStatus = client.merge();
} catch (ClientManagerException | TException e) {
future.setException(e);
}
} else {
try {
StorageEngine.getInstance().mergeAll();
tsStatus = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
} catch (StorageEngineException e) {
tsStatus = RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage());
}
}
if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
} else {
future.setException(new IoTDBException(tsStatus.message, tsStatus.code));
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> flush(TFlushReq tFlushReq, boolean onCluster) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
TSStatus tsStatus = new TSStatus();
if (onCluster) {
try (ConfigNodeClient client =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
// Send request to some API server
tsStatus = client.flush(tFlushReq);
} catch (ClientManagerException | TException e) {
future.setException(e);
}
} else {
try {
StorageEngine.getInstance().operateFlush(tFlushReq);
tsStatus = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
} catch (Exception e) {
tsStatus = RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage());
}
}
if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
} else {
future.setException(new IoTDBException(tsStatus.message, tsStatus.code));
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> clearCache(boolean onCluster) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
TSStatus tsStatus = new TSStatus();
if (onCluster) {
try (ConfigNodeClient client =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
// Send request to some API server
tsStatus = client.clearCache();
} catch (ClientManagerException | TException e) {
future.setException(e);
}
} else {
try {
StorageEngine.getInstance().clearCache();
tsStatus = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
} catch (Exception e) {
tsStatus = RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage());
}
}
if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
} else {
future.setException(new IoTDBException(tsStatus.message, tsStatus.code));
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> startRepairData(boolean onCluster) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
TSStatus tsStatus = new TSStatus();
if (onCluster) {
try (ConfigNodeClient client =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
// Send request to some API server
tsStatus = client.startRepairData();
} catch (ClientManagerException | TException e) {
future.setException(e);
}
} else {
if (!StorageEngine.getInstance().isAllSgReady()) {
future.setException(
new IoTDBException(
"not all sg is ready", TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()));
return future;
}
IoTDBConfig iotdbConfig = IoTDBDescriptor.getInstance().getConfig();
if (!iotdbConfig.isEnableSeqSpaceCompaction()
|| !iotdbConfig.isEnableUnseqSpaceCompaction()) {
future.setException(
new IoTDBException(
"cannot start repair task because inner space compaction is not enabled",
TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()));
return future;
}
try {
if (StorageEngine.getInstance().repairData()) {
tsStatus = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
} else {
if (CompactionScheduleTaskManager.getRepairTaskManagerInstance().getRepairTaskStatus()
== RepairTaskStatus.STOPPING) {
tsStatus =
RpcUtils.getStatus(
TSStatusCode.EXECUTE_STATEMENT_ERROR, "previous repair task is still stopping");
} else {
tsStatus =
RpcUtils.getStatus(
TSStatusCode.EXECUTE_STATEMENT_ERROR, "already have a running repair task");
}
}
} catch (Exception e) {
tsStatus = RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage());
}
}
if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
} else {
future.setException(new IoTDBException(tsStatus.message, tsStatus.code));
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> stopRepairData(boolean onCluster) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
TSStatus tsStatus = new TSStatus();
if (onCluster) {
try (ConfigNodeClient client =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
// Send request to some API server
tsStatus = client.stopRepairData();
} catch (ClientManagerException | TException e) {
future.setException(e);
}
} else {
try {
StorageEngine.getInstance().stopRepairData();
tsStatus = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
} catch (StorageEngineException e) {
tsStatus = RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage());
}
}
if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
} else {
future.setException(new IoTDBException(tsStatus.message, tsStatus.code));
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> loadConfiguration(boolean onCluster) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
TSStatus tsStatus = new TSStatus();
if (onCluster) {
try (ConfigNodeClient client =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
// Send request to some API server
tsStatus = client.loadConfiguration();
} catch (ClientManagerException | TException e) {
future.setException(e);
}
} else {
try {
IoTDBDescriptor.getInstance().loadHotModifiedProps();
tsStatus = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
} catch (Exception e) {
tsStatus = RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage());
}
}
if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
} else {
future.setException(new StatementExecutionException(tsStatus));
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> setSystemStatus(boolean onCluster, NodeStatus status) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
TSStatus tsStatus = new TSStatus();
if (onCluster) {
try (ConfigNodeClient client =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
// Send request to some API server
tsStatus = client.setSystemStatus(status.getStatus());
} catch (ClientManagerException | TException e) {
future.setException(e);
}
} else {
try {
CommonDescriptor.getInstance().getConfig().setNodeStatus(status);
tsStatus = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
} catch (Exception e) {
tsStatus = RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage());
}
}
if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
} else {
future.setException(new StatementExecutionException(tsStatus));
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> killQuery(KillQueryStatement killQueryStatement) {
int dataNodeId = -1;
String queryId = killQueryStatement.getQueryId();
if (!killQueryStatement.isKillAll()) {
String[] splits = queryId.split("_");
try {
// We just judge the input queryId has three '_' and the DataNodeId from it is non-negative
// here
if (splits.length != 4 || ((dataNodeId = Integer.parseInt(splits[3])) < 0)) {
throw new SemanticException("Please ensure your input <queryId> is correct");
}
} catch (NumberFormatException e) {
throw new SemanticException("Please ensure your input <queryId> is correct");
}
}
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
try (ConfigNodeClient client =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final TSStatus executionStatus = client.killQuery(queryId, dataNodeId);
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != executionStatus.getCode()) {
LOGGER.warn("Failed to kill query [{}], because {}", queryId, executionStatus.message);
future.setException(new IoTDBException(executionStatus.message, executionStatus.code));
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
} catch (ClientManagerException | TException e) {
future.setException(e);
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> showCluster(ShowClusterStatement showClusterStatement) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
TShowClusterResp showClusterResp = new TShowClusterResp();
try (ConfigNodeClient client =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
showClusterResp = client.showCluster();
} catch (ClientManagerException | TException e) {
if (showClusterResp.getConfigNodeList() == null) {
future.setException(new TException(MSG_RECONNECTION_FAIL));
} else {
future.setException(e);
}
return future;
}
// build TSBlock
if (showClusterStatement.isDetails()) {
ShowClusterDetailsTask.buildTSBlock(showClusterResp, future);
} else {
ShowClusterTask.buildTsBlock(showClusterResp, future);
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> showClusterParameters() {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
TShowVariablesResp showVariablesResp = new TShowVariablesResp();
try (ConfigNodeClient client =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
showVariablesResp = client.showVariables();
} catch (ClientManagerException | TException e) {
future.setException(e);
}
// build TSBlock
ShowVariablesTask.buildTSBlock(showVariablesResp, future);
return future;
}
@Override
public SettableFuture<ConfigTaskResult> showClusterId() {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
ShowClusterIdTask.buildTSBlock(
IoTDBDescriptor.getInstance().getConfig().getClusterId(), future);
return future;
}
@Override
public SettableFuture<ConfigTaskResult> showTTL(ShowTTLStatement showTTLStatement) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
List<PartialPath> databasePaths = showTTLStatement.getPaths();
Map<String, Long> databaseToTTL = new HashMap<>();
try (ConfigNodeClient client =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
ByteBuffer scope = showTTLStatement.getAuthorityScope().serialize();
if (showTTLStatement.isAll()) {
List<String> allStorageGroupPathPattern = Arrays.asList("root", "**");
TGetDatabaseReq req = new TGetDatabaseReq(allStorageGroupPathPattern, scope);
TDatabaseSchemaResp resp = client.getMatchedDatabaseSchemas(req);
for (Map.Entry<String, TDatabaseSchema> entry : resp.getDatabaseSchemaMap().entrySet()) {
databaseToTTL.put(entry.getKey(), entry.getValue().getTTL());
}
} else {
for (PartialPath databasePath : databasePaths) {
List<String> databasePathPattern = Arrays.asList(databasePath.getNodes());
TGetDatabaseReq req = new TGetDatabaseReq(databasePathPattern, scope);
TDatabaseSchemaResp resp = client.getMatchedDatabaseSchemas(req);
for (Map.Entry<String, TDatabaseSchema> entry : resp.getDatabaseSchemaMap().entrySet()) {
if (!databaseToTTL.containsKey(entry.getKey())) {
databaseToTTL.put(entry.getKey(), entry.getValue().getTTL());
}
}
}
}
} catch (IOException | ClientManagerException | TException e) {
future.setException(e);
}
// build TSBlock
ShowTTLTask.buildTSBlock(databaseToTTL, future);
return future;
}
@Override
public SettableFuture<ConfigTaskResult> showRegion(ShowRegionStatement showRegionStatement) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
TShowRegionResp showRegionResp = new TShowRegionResp();
TShowRegionReq showRegionReq = new TShowRegionReq();
showRegionReq.setConsensusGroupType(showRegionStatement.getRegionType());
if (showRegionStatement.getStorageGroups() == null) {
showRegionReq.setDatabases(null);
} else {
showRegionReq.setDatabases(
showRegionStatement.getStorageGroups().stream()
.map(PartialPath::getFullPath)
.collect(Collectors.toList()));
}
try (ConfigNodeClient client =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
showRegionResp = client.showRegion(showRegionReq);
if (showRegionResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
future.setException(
new IoTDBException(
showRegionResp.getStatus().message, showRegionResp.getStatus().code));
return future;
}
} catch (ClientManagerException | TException e) {
future.setException(e);
}
// filter the regions by nodeId
if (showRegionStatement.getNodeIds() != null) {
List<TRegionInfo> regionInfos = showRegionResp.getRegionInfoList();
regionInfos =
regionInfos.stream()
.filter(
regionInfo ->
showRegionStatement.getNodeIds().contains(regionInfo.getDataNodeId()))
.collect(Collectors.toList());
showRegionResp.setRegionInfoList(regionInfos);
}
// build TSBlock
ShowRegionTask.buildTSBlock(showRegionResp, future);
return future;
}
@Override
public SettableFuture<ConfigTaskResult> showDataNodes(
ShowDataNodesStatement showDataNodesStatement) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
TShowDataNodesResp showDataNodesResp = new TShowDataNodesResp();
try (ConfigNodeClient client =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
showDataNodesResp = client.showDataNodes();
if (showDataNodesResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
future.setException(
new IoTDBException(
showDataNodesResp.getStatus().message, showDataNodesResp.getStatus().code));
return future;
}
} catch (ClientManagerException | TException e) {
future.setException(e);
}
// build TSBlock
ShowDataNodesTask.buildTSBlock(showDataNodesResp, future);
return future;
}
@Override
public SettableFuture<ConfigTaskResult> showConfigNodes() {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
TShowConfigNodesResp showConfigNodesResp = new TShowConfigNodesResp();
try (ConfigNodeClient client =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
showConfigNodesResp = client.showConfigNodes();
if (showConfigNodesResp.getStatus().getCode()
!= TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
future.setException(
new IoTDBException(
showConfigNodesResp.getStatus().message, showConfigNodesResp.getStatus().code));
return future;
}
} catch (ClientManagerException | TException e) {
future.setException(e);
}
// build TSBlock
ShowConfigNodesTask.buildTSBlock(showConfigNodesResp, future);
return future;
}
@Override
public SettableFuture<ConfigTaskResult> createSchemaTemplate(
CreateSchemaTemplateStatement createSchemaTemplateStatement) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
// Construct request using statement
try {
// Send request to some API server
TSStatus tsStatus =
ClusterTemplateManager.getInstance().createSchemaTemplate(createSchemaTemplateStatement);
// Get response or throw exception
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
LOGGER.warn(
"Failed to execute create device template {} in config node, status is {}.",
createSchemaTemplateStatement.getName(),
tsStatus);
future.setException(new IoTDBException(tsStatus.getMessage(), tsStatus.getCode()));
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
} catch (Exception e) {
future.setException(e.getCause());
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> showSchemaTemplate(
ShowSchemaTemplateStatement showSchemaTemplateStatement) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
try {
// Send request to some API server
List<Template> templateList = ClusterTemplateManager.getInstance().getAllTemplates();
// build TSBlock
ShowSchemaTemplateTask.buildTSBlock(templateList, future);
} catch (Exception e) {
future.setException(e);
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> showNodesInSchemaTemplate(
ShowNodesInSchemaTemplateStatement showNodesInSchemaTemplateStatement) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
String req = showNodesInSchemaTemplateStatement.getTemplateName();
try {
// Send request to some API server
Template template = ClusterTemplateManager.getInstance().getTemplate(req);
// Build TSBlock
ShowNodesInSchemaTemplateTask.buildTSBlock(template, future);
} catch (Exception e) {
future.setException(e);
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> setSchemaTemplate(
String queryId, SetSchemaTemplateStatement setSchemaTemplateStatement) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
String templateName = setSchemaTemplateStatement.getTemplateName();
PartialPath path = setSchemaTemplateStatement.getPath();
try {
// Send request to some API server
ClusterTemplateManager.getInstance().setSchemaTemplate(queryId, templateName, path);
// build TSBlock
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
} catch (Throwable e) {
if (e.getCause() instanceof IoTDBException) {
future.setException(e.getCause());
} else {
future.setException(e);
}
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> showPathSetTemplate(
ShowPathSetTemplateStatement showPathSetTemplateStatement) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
try {
// Send request to some API server
List<PartialPath> listPath =
ClusterTemplateManager.getInstance()
.getPathsSetTemplate(
showPathSetTemplateStatement.getTemplateName(),
showPathSetTemplateStatement.getAuthorityScope());
// Build TSBlock
ShowPathSetTemplateTask.buildTSBlock(listPath, future);
} catch (Exception e) {
future.setException(e);
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> deactivateSchemaTemplate(
String queryId, DeactivateTemplateStatement deactivateTemplateStatement) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
TDeactivateSchemaTemplateReq req = new TDeactivateSchemaTemplateReq();
req.setQueryId(queryId);
req.setTemplateName(deactivateTemplateStatement.getTemplateName());
req.setPathPatternTree(
serializePatternListToByteBuffer(deactivateTemplateStatement.getPathPatternList()));
try (ConfigNodeClient client =
CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
TSStatus tsStatus;
do {
try {
tsStatus = client.deactivateSchemaTemplate(req);
} catch (TTransportException e) {
if (e.getType() == TTransportException.TIMED_OUT
|| e.getCause() instanceof SocketTimeoutException) {
// Time out mainly caused by slow execution, just wait
tsStatus = RpcUtils.getStatus(TSStatusCode.OVERLAP_WITH_EXISTING_TASK);
} else {
throw e;
}
}
// Keep waiting until task ends
} while (TSStatusCode.OVERLAP_WITH_EXISTING_TASK.getStatusCode() == tsStatus.getCode());
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
LOGGER.warn(
"Failed to execute deactivate device template {} from {} in config node, status is {}.",
deactivateTemplateStatement.getTemplateName(),
deactivateTemplateStatement.getPathPatternList(),
tsStatus);
future.setException(new IoTDBException(tsStatus.getMessage(), tsStatus.getCode()));
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
} catch (ClientManagerException | TException e) {
future.setException(e);
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> dropSchemaTemplate(
DropSchemaTemplateStatement dropSchemaTemplateStatement) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
try (ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
// Send request to some API server
TSStatus tsStatus =
configNodeClient.dropSchemaTemplate(dropSchemaTemplateStatement.getTemplateName());
// Get response or throw exception
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
LOGGER.warn(
"Failed to execute drop device template {} in config node, status is {}.",
dropSchemaTemplateStatement.getTemplateName(),
tsStatus);
future.setException(new IoTDBException(tsStatus.message, tsStatus.code));
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
} catch (ClientManagerException | TException e) {
future.setException(e);
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> alterSchemaTemplate(
String queryId, AlterSchemaTemplateStatement alterSchemaTemplateStatement) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
if (alterSchemaTemplateStatement
.getOperationType()
.equals(TemplateAlterOperationType.EXTEND_TEMPLATE)) {
// check duplicate measurement
TemplateExtendInfo templateExtendInfo =
(TemplateExtendInfo) alterSchemaTemplateStatement.getTemplateAlterInfo();
String duplicateMeasurement = templateExtendInfo.getFirstDuplicateMeasurement();
if (duplicateMeasurement != null) {
future.setException(
new MetadataException(
String.format(
"Duplicated measurement [%s] in device template alter request",
duplicateMeasurement)));
return future;
}
// check schema quota
long localNeedQuota =
(long) templateExtendInfo.getMeasurements().size()
* SchemaEngine.getInstance()
.getSchemaEngineStatistics()
.getTemplateUsingNumber(templateExtendInfo.getTemplateName());
if (localNeedQuota != 0) {
try {
DataNodeSchemaQuotaManager.getInstance().check(localNeedQuota, 0);
} catch (SchemaQuotaExceededException e) {
future.setException(e);
return future;
}
}
}
TAlterSchemaTemplateReq req = new TAlterSchemaTemplateReq();
req.setQueryId(queryId);
req.setTemplateAlterInfo(
TemplateAlterOperationUtil.generateExtendTemplateReqInfo(
alterSchemaTemplateStatement.getOperationType(),
alterSchemaTemplateStatement.getTemplateAlterInfo()));
try (ConfigNodeClient client =
CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
TSStatus tsStatus;
do {
try {
tsStatus = client.alterSchemaTemplate(req);
} catch (TTransportException e) {
if (e.getType() == TTransportException.TIMED_OUT
|| e.getCause() instanceof SocketTimeoutException) {
// Time out mainly caused by slow execution, just wait
tsStatus = RpcUtils.getStatus(TSStatusCode.OVERLAP_WITH_EXISTING_TASK);
} else {
throw e;
}
}
// Keep waiting until task ends
} while (TSStatusCode.OVERLAP_WITH_EXISTING_TASK.getStatusCode() == tsStatus.getCode());
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
LOGGER.warn(
"Failed to alter device template {} in config node, status is {}.",
alterSchemaTemplateStatement.getTemplateAlterInfo().getTemplateName(),
tsStatus);
future.setException(new IoTDBException(tsStatus.getMessage(), tsStatus.getCode()));
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
} catch (ClientManagerException | TException e) {
future.setException(e);
}
return future;
}
private ByteBuffer serializePatternListToByteBuffer(List<PartialPath> patternList) {
PathPatternTree patternTree = new PathPatternTree();
for (PartialPath pathPattern : patternList) {
patternTree.appendPathPattern(pathPattern);
}
patternTree.constructTree();
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
try {
patternTree.serialize(dataOutputStream);
} catch (IOException ignored) {
// memory operation, won't happen
}
return ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
}
@Override
public SettableFuture<ConfigTaskResult> unsetSchemaTemplate(
String queryId, UnsetSchemaTemplateStatement unsetSchemaTemplateStatement) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
TUnsetSchemaTemplateReq req = new TUnsetSchemaTemplateReq();
req.setQueryId(queryId);
req.setTemplateName(unsetSchemaTemplateStatement.getTemplateName());
req.setPath(unsetSchemaTemplateStatement.getPath().getFullPath());
try (ConfigNodeClient client =
CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
TSStatus tsStatus;
do {
try {
tsStatus = client.unsetSchemaTemplate(req);
} catch (TTransportException e) {
if (e.getType() == TTransportException.TIMED_OUT
|| e.getCause() instanceof SocketTimeoutException) {
// time out mainly caused by slow execution, wait until
tsStatus = RpcUtils.getStatus(TSStatusCode.OVERLAP_WITH_EXISTING_TASK);
} else {
throw e;
}
}
// keep waiting until task ends
} while (TSStatusCode.OVERLAP_WITH_EXISTING_TASK.getStatusCode() == tsStatus.getCode());
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
LOGGER.warn(
"Failed to execute unset device template {} from {} in config node, status is {}.",
unsetSchemaTemplateStatement.getTemplateName(),
unsetSchemaTemplateStatement.getPath(),
tsStatus);
future.setException(new IoTDBException(tsStatus.getMessage(), tsStatus.getCode()));
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
} catch (ClientManagerException | TException e) {
future.setException(e);
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> createPipe(CreatePipeStatement createPipeStatement) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
// Validate pipe name
if (createPipeStatement.getPipeName().startsWith(PipeStaticMeta.SYSTEM_PIPE_PREFIX)) {
String exceptionMessage =
String.format(
"Failed to create pipe %s in config node, pipe name starting with \"%s\" are not allowed to be created",
createPipeStatement.getPipeName(), PipeStaticMeta.SYSTEM_PIPE_PREFIX);
LOGGER.warn(exceptionMessage);
future.setException(
new IoTDBException(exceptionMessage, TSStatusCode.PIPE_ERROR.getStatusCode()));
return future;
}
// Validate pipe plugin before creation
try {
PipeAgent.plugin()
.validate(
createPipeStatement.getPipeName(),
createPipeStatement.getExtractorAttributes(),
createPipeStatement.getProcessorAttributes(),
createPipeStatement.getConnectorAttributes());
} catch (Exception e) {
LOGGER.info("Failed to validate pipe statement, because {}", e.getMessage(), e);
future.setException(
new IoTDBException(e.getMessage(), TSStatusCode.PIPE_ERROR.getStatusCode()));
return future;
}
try (ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
TCreatePipeReq req =
new TCreatePipeReq()
.setPipeName(createPipeStatement.getPipeName())
.setExtractorAttributes(createPipeStatement.getExtractorAttributes())
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
.setConnectorAttributes(createPipeStatement.getConnectorAttributes());
TSStatus tsStatus = configNodeClient.createPipe(req);
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
LOGGER.warn(
"Failed to create pipe {} in config node, status is {}.",
createPipeStatement.getPipeName(),
tsStatus);
future.setException(new IoTDBException(tsStatus.message, tsStatus.code));
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
} catch (Exception e) {
future.setException(e);
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> alterPipe(AlterPipeStatement alterPipeStatement) {
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
// Validate pipe name
if (alterPipeStatement.getPipeName().startsWith(PipeStaticMeta.SYSTEM_PIPE_PREFIX)) {
String exceptionMessage =
String.format(
"Failed to alter pipe %s in config node, pipe name starting with \"%s\" are not allowed to be altered",
alterPipeStatement.getPipeName(), PipeStaticMeta.SYSTEM_PIPE_PREFIX);
LOGGER.warn(exceptionMessage);
future.setException(
new IoTDBException(exceptionMessage, TSStatusCode.PIPE_ERROR.getStatusCode()));
return future;
}
// Validate pipe plugin before alteration - only validate replace mode
final String pipeName = alterPipeStatement.getPipeName();
try {
if (!alterPipeStatement.getProcessorAttributes().isEmpty()
&& alterPipeStatement.isReplaceAllProcessorAttributes()) {
PipeAgent.plugin().validateProcessor(alterPipeStatement.getProcessorAttributes());
}
if (!alterPipeStatement.getConnectorAttributes().isEmpty()
&& alterPipeStatement.isReplaceAllConnectorAttributes()) {
PipeAgent.plugin().validateConnector(pipeName, alterPipeStatement.getConnectorAttributes());
}
} catch (Exception e) {
LOGGER.info("Failed to validate pipe statement, because {}", e.getMessage(), e);
future.setException(
new IoTDBException(e.getMessage(), TSStatusCode.PIPE_ERROR.getStatusCode()));
return future;
}
try (ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final TAlterPipeReq req =
new TAlterPipeReq(
pipeName,
alterPipeStatement.getProcessorAttributes(),
alterPipeStatement.getConnectorAttributes(),
alterPipeStatement.isReplaceAllProcessorAttributes(),
alterPipeStatement.isReplaceAllConnectorAttributes());
final TSStatus tsStatus = configNodeClient.alterPipe(req);
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
LOGGER.warn("Failed to alter pipe {} in config node, status is {}.", pipeName, tsStatus);
future.setException(new IoTDBException(tsStatus.message, tsStatus.code));
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
} catch (Exception e) {
future.setException(e);
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> startPipe(StartPipeStatement startPipeStatement) {
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
// Validate pipe name
if (startPipeStatement.getPipeName().startsWith(PipeStaticMeta.SYSTEM_PIPE_PREFIX)) {
String exceptionMessage =
String.format(
"Failed to start pipe %s in config node, pipe name starting with \"%s\" are not allowed to be started",
startPipeStatement.getPipeName(), PipeStaticMeta.SYSTEM_PIPE_PREFIX);
LOGGER.warn(exceptionMessage);
future.setException(
new IoTDBException(exceptionMessage, TSStatusCode.PIPE_ERROR.getStatusCode()));
return future;
}
try (ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
TSStatus tsStatus = configNodeClient.startPipe(startPipeStatement.getPipeName());
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
LOGGER.warn(
"Failed to start pipe {}, status is {}.", startPipeStatement.getPipeName(), tsStatus);
future.setException(new IoTDBException(tsStatus.message, tsStatus.code));
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
} catch (Exception e) {
future.setException(e);
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> dropPipe(DropPipeStatement dropPipeStatement) {
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
// Validate pipe name
if (dropPipeStatement.getPipeName().startsWith(PipeStaticMeta.SYSTEM_PIPE_PREFIX)) {
String exceptionMessage =
String.format(
"Failed to drop pipe %s in config node, pipe name starting with \"%s\" are not allowed to be dropped",
dropPipeStatement.getPipeName(), PipeStaticMeta.SYSTEM_PIPE_PREFIX);
LOGGER.warn(exceptionMessage);
future.setException(
new IoTDBException(exceptionMessage, TSStatusCode.PIPE_ERROR.getStatusCode()));
return future;
}
try (ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final TSStatus tsStatus = configNodeClient.dropPipe(dropPipeStatement.getPipeName());
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
LOGGER.warn(
"Failed to drop pipe {}, status is {}.", dropPipeStatement.getPipeName(), tsStatus);
future.setException(new IoTDBException(tsStatus.message, tsStatus.code));
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
} catch (Exception e) {
future.setException(e);
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> stopPipe(StopPipeStatement stopPipeStatement) {
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
// Validate pipe name
if (stopPipeStatement.getPipeName().startsWith(PipeStaticMeta.SYSTEM_PIPE_PREFIX)) {
String exceptionMessage =
String.format(
"Failed to stop pipe %s in config node, pipe name starting with \"%s\" are not allowed to be stopped",
stopPipeStatement.getPipeName(), PipeStaticMeta.SYSTEM_PIPE_PREFIX);
LOGGER.warn(exceptionMessage);
future.setException(
new IoTDBException(exceptionMessage, TSStatusCode.PIPE_ERROR.getStatusCode()));
return future;
}
try (ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final TSStatus tsStatus = configNodeClient.stopPipe(stopPipeStatement.getPipeName());
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
LOGGER.warn(
"Failed to stop pipe {}, status is {}.", stopPipeStatement.getPipeName(), tsStatus);
future.setException(new IoTDBException(tsStatus.message, tsStatus.code));
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
} catch (Exception e) {
future.setException(e);
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> showPipes(ShowPipesStatement showPipesStatement) {
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
try (ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
TShowPipeReq tShowPipeReq = new TShowPipeReq();
if (showPipesStatement.getPipeName() != null) {
tShowPipeReq.setPipeName(showPipesStatement.getPipeName());
}
if (showPipesStatement.getWhereClause()) {
tShowPipeReq.setWhereClause(true);
}
final List<TShowPipeInfo> tShowPipeInfoList =
configNodeClient.showPipe(tShowPipeReq).getPipeInfoList();
ShowPipeTask.buildTSBlock(tShowPipeInfoList, future);
} catch (Exception e) {
future.setException(e);
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> showSubscriptions(
ShowSubscriptionsStatement showSubscriptionsStatement) {
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
try (final ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final TShowSubscriptionReq showSubscriptionReq = new TShowSubscriptionReq();
if (showSubscriptionsStatement.getTopicName() != null) {
showSubscriptionReq.setTopicName(showSubscriptionsStatement.getTopicName());
}
final TShowSubscriptionResp showSubscriptionResp =
configNodeClient.showSubscription(showSubscriptionReq);
if (showSubscriptionResp.getStatus().getCode()
!= TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
future.setException(
new IoTDBException(
showSubscriptionResp.getStatus().getMessage(),
showSubscriptionResp.getStatus().getCode()));
return future;
}
ShowSubscriptionTask.buildTSBlock(
showSubscriptionResp.isSetSubscriptionInfoList()
? showSubscriptionResp.getSubscriptionInfoList()
: Collections.emptyList(),
future);
} catch (Exception e) {
future.setException(e);
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> createTopic(CreateTopicStatement createTopicStatement) {
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
try (final ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final TCreateTopicReq req =
new TCreateTopicReq()
.setTopicName(createTopicStatement.getTopicName())
.setTopicAttributes(createTopicStatement.getTopicAttributes());
final TSStatus tsStatus = configNodeClient.createTopic(req);
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
LOGGER.warn(
"Failed to create topic {} in config node, status is {}.",
createTopicStatement.getTopicName(),
tsStatus);
future.setException(new IoTDBException(tsStatus.message, tsStatus.code));
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
} catch (Exception e) {
future.setException(e);
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> dropTopic(DropTopicStatement dropTopicStatement) {
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
try (ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final TSStatus tsStatus = configNodeClient.dropTopic(dropTopicStatement.getTopicName());
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
LOGGER.warn(
"Failed to drop topic {}, status is {}.", dropTopicStatement.getTopicName(), tsStatus);
future.setException(new IoTDBException(tsStatus.message, tsStatus.code));
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
} catch (Exception e) {
future.setException(e);
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> showTopics(ShowTopicsStatement showTopicsStatement) {
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
try (final ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final TShowTopicReq showTopicReq = new TShowTopicReq();
if (showTopicsStatement.getTopicName() != null) {
showTopicReq.setTopicName(showTopicsStatement.getTopicName());
}
final TShowTopicResp showTopicResp = configNodeClient.showTopic(showTopicReq);
if (showTopicResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
future.setException(
new IoTDBException(
showTopicResp.getStatus().getMessage(), showTopicResp.getStatus().getCode()));
return future;
}
ShowTopicsTask.buildTSBlock(
showTopicResp.isSetTopicInfoList()
? showTopicResp.getTopicInfoList()
: Collections.emptyList(),
future);
} catch (Exception e) {
future.setException(e);
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> deleteTimeSeries(
String queryId, DeleteTimeSeriesStatement deleteTimeSeriesStatement) {
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
final TDeleteTimeSeriesReq req =
new TDeleteTimeSeriesReq(
queryId,
serializePatternListToByteBuffer(deleteTimeSeriesStatement.getPathPatternList()));
try (ConfigNodeClient client =
CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
TSStatus tsStatus;
do {
try {
tsStatus = client.deleteTimeSeries(req);
} catch (TTransportException e) {
if (e.getType() == TTransportException.TIMED_OUT
|| e.getCause() instanceof SocketTimeoutException) {
// time out mainly caused by slow execution, wait until
tsStatus = RpcUtils.getStatus(TSStatusCode.OVERLAP_WITH_EXISTING_TASK);
} else {
throw e;
}
}
// keep waiting until task ends
} while (TSStatusCode.OVERLAP_WITH_EXISTING_TASK.getStatusCode() == tsStatus.getCode());
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
LOGGER.warn(
"Failed to execute delete timeseries {} in config node, status is {}.",
deleteTimeSeriesStatement.getPathPatternList(),
tsStatus);
if (tsStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
future.setException(
new BatchProcessException(tsStatus.subStatus.toArray(new TSStatus[0])));
} else {
future.setException(new IoTDBException(tsStatus.getMessage(), tsStatus.getCode()));
}
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
} catch (ClientManagerException | TException e) {
future.setException(e);
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> deleteLogicalView(
String queryId, DeleteLogicalViewStatement deleteLogicalViewStatement) {
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
final TDeleteLogicalViewReq req =
new TDeleteLogicalViewReq(
queryId,
serializePatternListToByteBuffer(deleteLogicalViewStatement.getPathPatternList()));
try (ConfigNodeClient client =
CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
TSStatus tsStatus;
do {
try {
tsStatus = client.deleteLogicalView(req);
} catch (TTransportException e) {
if (e.getType() == TTransportException.TIMED_OUT
|| e.getCause() instanceof SocketTimeoutException) {
// time out mainly caused by slow execution, wait until
tsStatus = RpcUtils.getStatus(TSStatusCode.OVERLAP_WITH_EXISTING_TASK);
} else {
throw e;
}
}
// keep waiting until task ends
} while (TSStatusCode.OVERLAP_WITH_EXISTING_TASK.getStatusCode() == tsStatus.getCode());
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
LOGGER.warn(
"Failed to execute delete view {}, status is {}.",
deleteLogicalViewStatement.getPathPatternList(),
tsStatus);
future.setException(new IoTDBException(tsStatus.getMessage(), tsStatus.getCode()));
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
} catch (ClientManagerException | TException e) {
future.setException(e);
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> renameLogicalView(
String queryId, RenameLogicalViewStatement renameLogicalViewStatement) {
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
// check path
final PartialPath oldName = renameLogicalViewStatement.getOldName();
if (oldName.hasWildcard()) {
future.setException(
new MetadataException("Rename view doesn't support path pattern with wildcard."));
return future;
}
// fetch viewExpression
final PathPatternTree patternTree = new PathPatternTree();
patternTree.appendFullPath(oldName);
patternTree.constructTree();
final ISchemaTree schemaTree =
ClusterSchemaFetcher.getInstance().fetchSchema(patternTree, true, null);
final List<MeasurementPath> measurementPathList =
schemaTree.searchMeasurementPaths(oldName).left;
if (measurementPathList.isEmpty()) {
future.setException(new PathNotExistException(oldName.getFullPath()));
return future;
}
final LogicalViewSchema logicalViewSchema =
(LogicalViewSchema) measurementPathList.get(0).getMeasurementSchema();
final ViewExpression viewExpression = logicalViewSchema.getExpression();
// create new view
final CreateLogicalViewStatement createLogicalViewStatement = new CreateLogicalViewStatement();
createLogicalViewStatement.setTargetFullPaths(
Collections.singletonList(renameLogicalViewStatement.getNewName()));
createLogicalViewStatement.setViewExpressions(Collections.singletonList(viewExpression));
final ExecutionResult executionResult =
Coordinator.getInstance()
.executeForTreeModel(
createLogicalViewStatement,
0,
null,
"",
ClusterPartitionFetcher.getInstance(),
ClusterSchemaFetcher.getInstance(),
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold());
if (executionResult.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
future.setException(
new IoTDBException(
executionResult.status.getMessage(), executionResult.status.getCode()));
return future;
}
// delete old view
final TDeleteLogicalViewReq req =
new TDeleteLogicalViewReq(
queryId, serializePatternListToByteBuffer(Collections.singletonList(oldName)));
try (ConfigNodeClient client =
CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
TSStatus tsStatus;
do {
try {
tsStatus = client.deleteLogicalView(req);
} catch (TTransportException e) {
if (e.getType() == TTransportException.TIMED_OUT
|| e.getCause() instanceof SocketTimeoutException) {
// time out mainly caused by slow execution, wait until
tsStatus = RpcUtils.getStatus(TSStatusCode.OVERLAP_WITH_EXISTING_TASK);
} else {
throw e;
}
}
// keep waiting until task ends
} while (TSStatusCode.OVERLAP_WITH_EXISTING_TASK.getStatusCode() == tsStatus.getCode());
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
LOGGER.warn("Failed to execute delete view {}, status is {}.", oldName, tsStatus);
future.setException(new IoTDBException(tsStatus.getMessage(), tsStatus.getCode()));
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
} catch (ClientManagerException | TException e) {
future.setException(e);
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> alterLogicalView(
AlterLogicalViewStatement alterLogicalViewStatement, MPPQueryContext context) {
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
final CreateLogicalViewStatement createLogicalViewStatement = new CreateLogicalViewStatement();
createLogicalViewStatement.setTargetPaths(alterLogicalViewStatement.getTargetPaths());
createLogicalViewStatement.setSourcePaths(alterLogicalViewStatement.getSourcePaths());
createLogicalViewStatement.setQueryStatement(alterLogicalViewStatement.getQueryStatement());
final Analysis analysis = Analyzer.analyze(createLogicalViewStatement, context);
if (analysis.isFailed()) {
future.setException(
new IoTDBException(
analysis.getFailStatus().getMessage(), analysis.getFailStatus().getCode()));
return future;
}
// Transform all Expressions into ViewExpressions.
final TransformToViewExpressionVisitor transformToViewExpressionVisitor =
new TransformToViewExpressionVisitor();
final List<Expression> expressionList = alterLogicalViewStatement.getSourceExpressionList();
final List<ViewExpression> viewExpressionList = new ArrayList<>();
for (Expression expression : expressionList) {
viewExpressionList.add(transformToViewExpressionVisitor.process(expression, null));
}
final List<PartialPath> viewPathList = alterLogicalViewStatement.getTargetPathList();
final ByteArrayOutputStream stream = new ByteArrayOutputStream();
try {
ReadWriteIOUtils.write(viewPathList.size(), stream);
for (int i = 0; i < viewPathList.size(); i++) {
viewPathList.get(i).serialize(stream);
ViewExpression.serialize(viewExpressionList.get(i), stream);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
final TAlterLogicalViewReq req =
new TAlterLogicalViewReq(
context.getQueryId().getId(), ByteBuffer.wrap(stream.toByteArray()));
try (final ConfigNodeClient client =
CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
TSStatus tsStatus;
do {
try {
tsStatus = client.alterLogicalView(req);
} catch (TTransportException e) {
if (e.getType() == TTransportException.TIMED_OUT
|| e.getCause() instanceof SocketTimeoutException) {
// time out mainly caused by slow execution, wait until
tsStatus = RpcUtils.getStatus(TSStatusCode.OVERLAP_WITH_EXISTING_TASK);
} else {
throw e;
}
}
// keep waiting until task ends
} while (TSStatusCode.OVERLAP_WITH_EXISTING_TASK.getStatusCode() == tsStatus.getCode());
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
LOGGER.warn(
"Failed to execute alter view {}, status is {}.",
alterLogicalViewStatement.getTargetPathList(),
tsStatus);
if (tsStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
future.setException(
new BatchProcessException(tsStatus.subStatus.toArray(new TSStatus[0])));
} else {
future.setException(new IoTDBException(tsStatus.getMessage(), tsStatus.getCode()));
}
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
return future;
} catch (ClientManagerException | TException e) {
future.setException(e);
return future;
}
}
@Override
public TSStatus alterLogicalViewByPipe(AlterLogicalViewNode alterLogicalViewNode) {
final Map<PartialPath, ViewExpression> viewPathToSourceMap =
alterLogicalViewNode.getViewPathToSourceMap();
final ByteArrayOutputStream stream = new ByteArrayOutputStream();
try {
ReadWriteIOUtils.write(viewPathToSourceMap.size(), stream);
for (Map.Entry<PartialPath, ViewExpression> entry : viewPathToSourceMap.entrySet()) {
entry.getKey().serialize(stream);
ViewExpression.serialize(entry.getValue(), stream);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
final TAlterLogicalViewReq req =
new TAlterLogicalViewReq(
Coordinator.getInstance().createQueryId().getId(),
ByteBuffer.wrap(stream.toByteArray()))
.setIsGeneratedByPipe(true);
TSStatus tsStatus;
try (ConfigNodeClient client =
CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
do {
try {
tsStatus = client.alterLogicalView(req);
} catch (TTransportException e) {
if (e.getType() == TTransportException.TIMED_OUT
|| e.getCause() instanceof SocketTimeoutException) {
// time out mainly caused by slow execution, wait until
tsStatus = RpcUtils.getStatus(TSStatusCode.OVERLAP_WITH_EXISTING_TASK);
} else {
throw e;
}
}
// keep waiting until task ends
} while (TSStatusCode.OVERLAP_WITH_EXISTING_TASK.getStatusCode() == tsStatus.getCode());
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
LOGGER.warn(
"Failed to execute alter view {} by pipe, status is {}.",
viewPathToSourceMap,
tsStatus);
}
} catch (ClientManagerException | TException e) {
tsStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
tsStatus.setMessage(e.toString());
}
return tsStatus;
}
@Override
public SettableFuture<ConfigTaskResult> getRegionId(GetRegionIdStatement getRegionIdStatement) {
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
TGetRegionIdResp resp = new TGetRegionIdResp();
try (ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final TGetRegionIdReq tGetRegionIdReq =
new TGetRegionIdReq(getRegionIdStatement.getPartitionType());
if (getRegionIdStatement.getDevice() != null) {
tGetRegionIdReq.setDevice(getRegionIdStatement.getDevice());
} else {
tGetRegionIdReq.setDatabase(getRegionIdStatement.getDatabase());
}
tGetRegionIdReq.setStartTimeSlot(
TimePartitionUtils.getTimePartitionSlot(getRegionIdStatement.getStartTimeStamp()));
tGetRegionIdReq.setEndTimeSlot(
TimePartitionUtils.getTimePartitionSlot(getRegionIdStatement.getEndTimeStamp()));
resp = configNodeClient.getRegionId(tGetRegionIdReq);
if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
future.setException(new IoTDBException(resp.getStatus().message, resp.getStatus().code));
return future;
}
} catch (Exception e) {
future.setException(e);
}
GetRegionIdTask.buildTsBlock(resp, future);
return future;
}
@Override
public SettableFuture<ConfigTaskResult> getSeriesSlotList(
GetSeriesSlotListStatement getSeriesSlotListStatement) {
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
TGetSeriesSlotListResp resp = new TGetSeriesSlotListResp();
try (ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
TGetSeriesSlotListReq tGetSeriesSlotListReq =
new TGetSeriesSlotListReq(
getSeriesSlotListStatement.getDatabase(),
getSeriesSlotListStatement.getPartitionType());
resp = configNodeClient.getSeriesSlotList(tGetSeriesSlotListReq);
if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
future.setException(new IoTDBException(resp.getStatus().message, resp.getStatus().code));
return future;
}
} catch (Exception e) {
future.setException(e);
}
GetSeriesSlotListTask.buildTsBlock(resp, future);
return future;
}
@Override
public SettableFuture<ConfigTaskResult> getTimeSlotList(
GetTimeSlotListStatement getTimeSlotListStatement) {
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
TGetTimeSlotListResp resp = new TGetTimeSlotListResp();
try (ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final TGetTimeSlotListReq tGetTimeSlotListReq = new TGetTimeSlotListReq();
if (getTimeSlotListStatement.getDatabase() != null) {
tGetTimeSlotListReq.setDatabase(getTimeSlotListStatement.getDatabase());
} else if (getTimeSlotListStatement.getDevice() != null) {
tGetTimeSlotListReq.setDevice(getTimeSlotListStatement.getDevice());
} else if (getTimeSlotListStatement.getRegionId() != -1) {
tGetTimeSlotListReq.setRegionId(getTimeSlotListStatement.getRegionId());
}
if (getTimeSlotListStatement.getStartTime() != -1) {
tGetTimeSlotListReq.setStartTime(getTimeSlotListStatement.getStartTime());
}
if (getTimeSlotListStatement.getEndTime() != -1) {
tGetTimeSlotListReq.setEndTime(getTimeSlotListStatement.getEndTime());
}
resp = configNodeClient.getTimeSlotList(tGetTimeSlotListReq);
if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
future.setException(new IoTDBException(resp.getStatus().message, resp.getStatus().code));
return future;
}
} catch (Exception e) {
future.setException(e);
}
GetTimeSlotListTask.buildTSBlock(resp, future);
return future;
}
@Override
public SettableFuture<ConfigTaskResult> countTimeSlotList(
CountTimeSlotListStatement countTimeSlotListStatement) {
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
TCountTimeSlotListResp resp = new TCountTimeSlotListResp();
try (ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final TCountTimeSlotListReq tCountTimeSlotListReq = new TCountTimeSlotListReq();
if (countTimeSlotListStatement.getDatabase() != null) {
tCountTimeSlotListReq.setDatabase(countTimeSlotListStatement.getDatabase());
} else if (countTimeSlotListStatement.getDevice() != null) {
tCountTimeSlotListReq.setDevice(countTimeSlotListStatement.getDevice());
} else if (countTimeSlotListStatement.getRegionId() != -1) {
tCountTimeSlotListReq.setRegionId(countTimeSlotListStatement.getRegionId());
}
if (countTimeSlotListStatement.getStartTime() != -1) {
tCountTimeSlotListReq.setStartTime(countTimeSlotListStatement.getStartTime());
}
if (countTimeSlotListStatement.getEndTime() != -1) {
tCountTimeSlotListReq.setEndTime(countTimeSlotListStatement.getEndTime());
}
resp = configNodeClient.countTimeSlotList(tCountTimeSlotListReq);
if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
future.setException(new IoTDBException(resp.getStatus().message, resp.getStatus().code));
return future;
}
} catch (Exception e) {
future.setException(e);
}
CountTimeSlotListTask.buildTSBlock(resp, future);
return future;
}
@Override
public SettableFuture<ConfigTaskResult> migrateRegion(
MigrateRegionStatement migrateRegionStatement) {
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
try (ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final TMigrateRegionReq tMigrateRegionReq =
new TMigrateRegionReq(
migrateRegionStatement.getRegionId(),
migrateRegionStatement.getFromId(),
migrateRegionStatement.getToId());
final TSStatus status = configNodeClient.migrateRegion(tMigrateRegionReq);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
future.setException(new IoTDBException(status.message, status.code));
return future;
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
} catch (Exception e) {
future.setException(e);
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> createContinuousQuery(
CreateContinuousQueryStatement createContinuousQueryStatement, MPPQueryContext context) {
createContinuousQueryStatement.semanticCheck();
final String queryBody = createContinuousQueryStatement.getQueryBody();
// TODO Do not modify Statement in Analyzer
Analyzer.analyze(createContinuousQueryStatement.getQueryBodyStatement(), context);
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
try (ConfigNodeClient client =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final TCreateCQReq tCreateCQReq =
new TCreateCQReq(
createContinuousQueryStatement.getCqId(),
createContinuousQueryStatement.getEveryInterval(),
createContinuousQueryStatement.getBoundaryTime(),
createContinuousQueryStatement.getStartTimeOffset(),
createContinuousQueryStatement.getEndTimeOffset(),
createContinuousQueryStatement.getTimeoutPolicy().getType(),
queryBody,
context.getSql(),
context.getZoneId().getId(),
context.getSession() == null ? null : context.getSession().getUserName());
final TSStatus executionStatus = client.createCQ(tCreateCQReq);
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != executionStatus.getCode()) {
LOGGER.warn(
"[{}] Failed to create continuous query {}. TSStatus is {}",
executionStatus,
createContinuousQueryStatement.getCqId(),
executionStatus.message);
future.setException(new IoTDBException(executionStatus.message, executionStatus.code));
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
} catch (ClientManagerException | TException e) {
future.setException(e);
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> dropContinuousQuery(String cqId) {
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
try (ConfigNodeClient client =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final TSStatus executionStatus = client.dropCQ(new TDropCQReq(cqId));
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != executionStatus.getCode()) {
LOGGER.warn("[{}] Failed to drop continuous query {}.", executionStatus, cqId);
future.setException(new IoTDBException(executionStatus.message, executionStatus.code));
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
} catch (ClientManagerException | TException e) {
future.setException(e);
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> showContinuousQueries() {
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
try (ConfigNodeClient client =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final TShowCQResp showCQResp = client.showCQ();
if (showCQResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
future.setException(
new IoTDBException(showCQResp.getStatus().message, showCQResp.getStatus().code));
return future;
}
// convert cqList and buildTsBlock
ShowContinuousQueriesTask.buildTsBlock(showCQResp.getCqList(), future);
} catch (ClientManagerException | TException e) {
future.setException(e);
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> setSpaceQuota(
SetSpaceQuotaStatement setSpaceQuotaStatement) {
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
TSStatus tsStatus = new TSStatus();
final TSetSpaceQuotaReq req = new TSetSpaceQuotaReq();
req.setDatabase(setSpaceQuotaStatement.getPrefixPathList());
final TSpaceQuota spaceQuota = new TSpaceQuota();
spaceQuota.setDeviceNum(setSpaceQuotaStatement.getDeviceNum());
spaceQuota.setTimeserieNum(setSpaceQuotaStatement.getTimeSeriesNum());
spaceQuota.setDiskSize(setSpaceQuotaStatement.getDiskSize());
req.setSpaceLimit(spaceQuota);
try (ConfigNodeClient client =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
// Send request to some API server
tsStatus = client.setSpaceQuota(req);
} catch (Exception e) {
future.setException(e);
}
if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
} else {
future.setException(new IoTDBException(tsStatus.message, tsStatus.code));
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> showSpaceQuota(
ShowSpaceQuotaStatement showSpaceQuotaStatement) {
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
try (ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final List<String> databases = new ArrayList<>();
if (showSpaceQuotaStatement.getDatabases() != null) {
showSpaceQuotaStatement
.getDatabases()
.forEach(database -> databases.add(database.toString()));
}
// Send request to some API server
final TSpaceQuotaResp showSpaceQuotaResp = configNodeClient.showSpaceQuota(databases);
// build TSBlock
ShowSpaceQuotaTask.buildTsBlock(showSpaceQuotaResp, future);
} catch (Exception e) {
future.setException(e);
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> setThrottleQuota(
SetThrottleQuotaStatement setThrottleQuotaStatement) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
TSStatus tsStatus = new TSStatus();
TSetThrottleQuotaReq req = new TSetThrottleQuotaReq();
req.setUserName(setThrottleQuotaStatement.getUserName());
TThrottleQuota throttleQuota = new TThrottleQuota();
throttleQuota.setThrottleLimit(setThrottleQuotaStatement.getThrottleLimit());
throttleQuota.setMemLimit(setThrottleQuotaStatement.getMemLimit());
throttleQuota.setCpuLimit(setThrottleQuotaStatement.getCpuLimit());
req.setThrottleQuota(throttleQuota);
try (ConfigNodeClient client =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
// Send request to some API server
tsStatus = client.setThrottleQuota(req);
} catch (Exception e) {
future.setException(e);
}
if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
} else {
future.setException(new IoTDBException(tsStatus.message, tsStatus.code));
}
return future;
}
@Override
public SettableFuture<ConfigTaskResult> showThrottleQuota(
ShowThrottleQuotaStatement showThrottleQuotaStatement) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
try (ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
// Send request to some API server
TShowThrottleReq req = new TShowThrottleReq();
req.setUserName(showThrottleQuotaStatement.getUserName());
TThrottleQuotaResp throttleQuotaResp = configNodeClient.showThrottleQuota(req);
// build TSBlock
ShowThrottleQuotaTask.buildTSBlock(throttleQuotaResp, future);
} catch (Exception e) {
future.setException(e);
}
return future;
}
@Override
public TThrottleQuotaResp getThrottleQuota() {
TThrottleQuotaResp throttleQuotaResp = new TThrottleQuotaResp();
try (ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
// Send request to some API server
throttleQuotaResp = configNodeClient.getThrottleQuota();
} catch (Exception e) {
LOGGER.error(e.getMessage());
}
return throttleQuotaResp;
}
@Override
public TSpaceQuotaResp getSpaceQuota() {
TSpaceQuotaResp spaceQuotaResp = new TSpaceQuotaResp();
try (ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
// Send request to some API server
spaceQuotaResp = configNodeClient.getSpaceQuota();
} catch (Exception e) {
LOGGER.error(e.getMessage());
}
return spaceQuotaResp;
}
@Override
public TPipeTransferResp handleTransferConfigPlan(String clientId, TPipeTransferReq req) {
final TPipeConfigTransferReq configTransferReq =
new TPipeConfigTransferReq(
req.version,
req.type,
req.body,
req instanceof AirGapPseudoTPipeTransferRequest,
clientId);
try (final ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final TPipeConfigTransferResp pipeConfigTransferResp =
configNodeClient.handleTransferConfigPlan(configTransferReq);
if (TSStatusCode.SUCCESS_STATUS.getStatusCode()
!= pipeConfigTransferResp.getStatus().getCode()) {
LOGGER.warn("Failed to handleTransferConfigPlan, status is {}.", pipeConfigTransferResp);
}
return new TPipeTransferResp(pipeConfigTransferResp.status)
.setBody(pipeConfigTransferResp.body);
} catch (Exception e) {
return new TPipeTransferResp(
new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode())
.setMessage(e.toString()));
}
}
@Override
public void handlePipeConfigClientExit(String clientId) {
try (final ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final TSStatus status = configNodeClient.handlePipeConfigClientExit(clientId);
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != status.getCode()) {
LOGGER.warn("Failed to handlePipeConfigClientExit, status is {}.", status);
}
} catch (Exception e) {
LOGGER.warn("Failed to handlePipeConfigClientExit.", e);
}
}
}