| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iotdb.db.mpp.plan.execution.config.executor; |
| |
| import org.apache.iotdb.common.rpc.thrift.TFlushReq; |
| import org.apache.iotdb.common.rpc.thrift.TSStatus; |
| import org.apache.iotdb.common.rpc.thrift.TSetTTLReq; |
| import org.apache.iotdb.commons.client.IClientManager; |
| import org.apache.iotdb.commons.cluster.NodeStatus; |
| import org.apache.iotdb.commons.consensus.PartitionRegionId; |
| import org.apache.iotdb.commons.exception.IoTDBException; |
| import org.apache.iotdb.commons.executable.ExecutableManager; |
| import org.apache.iotdb.commons.path.PartialPath; |
| import org.apache.iotdb.commons.path.PathPatternTree; |
| import org.apache.iotdb.commons.trigger.TriggerTable; |
| import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp; |
| import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq; |
| import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq; |
| import org.apache.iotdb.confignode.rpc.thrift.TDeleteStorageGroupsReq; |
| import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq; |
| import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq; |
| import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq; |
| import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp; |
| import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp; |
| import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq; |
| import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp; |
| import org.apache.iotdb.confignode.rpc.thrift.TShowConfigNodesResp; |
| import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp; |
| import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq; |
| import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp; |
| import org.apache.iotdb.confignode.rpc.thrift.TShowStorageGroupResp; |
| import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema; |
| import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchemaResp; |
| import org.apache.iotdb.db.client.ConfigNodeClient; |
| import org.apache.iotdb.db.client.ConfigNodeInfo; |
| import org.apache.iotdb.db.client.DataNodeClientPoolFactory; |
| import org.apache.iotdb.db.localconfignode.LocalConfigNode; |
| import org.apache.iotdb.db.metadata.template.ClusterTemplateManager; |
| import org.apache.iotdb.db.metadata.template.Template; |
| import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult; |
| import org.apache.iotdb.db.mpp.plan.execution.config.metadata.CountStorageGroupTask; |
| import org.apache.iotdb.db.mpp.plan.execution.config.metadata.SetStorageGroupTask; |
| import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowClusterTask; |
| import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowConfigNodesTask; |
| import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowDataNodesTask; |
| import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowRegionTask; |
| import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowStorageGroupTask; |
| import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowTTLTask; |
| import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowTriggersTask; |
| import org.apache.iotdb.db.mpp.plan.execution.config.metadata.template.ShowNodesInSchemaTemplateTask; |
| import org.apache.iotdb.db.mpp.plan.execution.config.metadata.template.ShowPathSetTemplateTask; |
| import org.apache.iotdb.db.mpp.plan.execution.config.metadata.template.ShowSchemaTemplateTask; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTriggerStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteStorageGroupStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteTimeSeriesStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.SetTTLStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowDataNodesStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowRegionStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowStorageGroupStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowTTLStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.template.CreateSchemaTemplateStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.template.SetSchemaTemplateStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowNodesInSchemaTemplateStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowPathSetTemplateStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowSchemaTemplateStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeSinkStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeSinkStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.sys.sync.StartPipeStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.sys.sync.StopPipeStatement; |
| import org.apache.iotdb.rpc.RpcUtils; |
| import org.apache.iotdb.rpc.StatementExecutionException; |
| import org.apache.iotdb.rpc.TSStatusCode; |
| |
| import com.google.common.util.concurrent.SettableFuture; |
| import org.apache.thrift.TException; |
| import org.apache.thrift.transport.TTransportException; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.ByteArrayOutputStream; |
| import java.io.DataOutputStream; |
| import java.io.IOException; |
| import java.net.SocketTimeoutException; |
| import java.nio.ByteBuffer; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.stream.Collectors; |
| |
| public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { |
| |
| private static final Logger LOGGER = LoggerFactory.getLogger(ClusterConfigTaskExecutor.class); |
| |
| private static final IClientManager<PartitionRegionId, ConfigNodeClient> |
| CONFIG_NODE_CLIENT_MANAGER = |
| new IClientManager.Factory<PartitionRegionId, ConfigNodeClient>() |
| .createClientManager(new DataNodeClientPoolFactory.ConfigNodeClientPoolFactory()); |
| |
| private static final IClientManager<PartitionRegionId, ConfigNodeClient> |
| CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER = |
| new IClientManager.Factory<PartitionRegionId, ConfigNodeClient>() |
| .createClientManager( |
| new DataNodeClientPoolFactory.ClusterDeletionConfigNodeClientPoolFactory()); |
| |
| private static final class ClusterConfigTaskExecutorHolder { |
| private static final ClusterConfigTaskExecutor INSTANCE = new ClusterConfigTaskExecutor(); |
| |
| private ClusterConfigTaskExecutorHolder() {} |
| } |
| |
| public static ClusterConfigTaskExecutor getInstance() { |
| return ClusterConfigTaskExecutor.ClusterConfigTaskExecutorHolder.INSTANCE; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> setStorageGroup( |
| SetStorageGroupStatement setStorageGroupStatement) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| // Construct request using statement |
| TStorageGroupSchema storageGroupSchema = |
| SetStorageGroupTask.constructStorageGroupSchema(setStorageGroupStatement); |
| TSetStorageGroupReq req = new TSetStorageGroupReq(storageGroupSchema); |
| try (ConfigNodeClient configNodeClient = |
| CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) { |
| // Send request to some API server |
| TSStatus tsStatus = configNodeClient.setStorageGroup(req); |
| // Get response or throw exception |
| if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) { |
| LOGGER.error( |
| "Failed to execute set storage group {} in config node, status is {}.", |
| setStorageGroupStatement.getStorageGroupPath(), |
| tsStatus); |
| future.setException(new IoTDBException(tsStatus.message, tsStatus.code)); |
| } else { |
| future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); |
| } |
| } catch (TException | IOException e) { |
| future.setException(e); |
| } |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> showStorageGroup( |
| ShowStorageGroupStatement showStorageGroupStatement) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| // Construct request using statement |
| List<String> storageGroupPathPattern = |
| Arrays.asList(showStorageGroupStatement.getPathPattern().getNodes()); |
| try (ConfigNodeClient client = |
| CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) { |
| // Send request to some API server |
| TShowStorageGroupResp resp = client.showStorageGroup(storageGroupPathPattern); |
| // build TSBlock |
| ShowStorageGroupTask.buildTSBlock(resp.getStorageGroupInfoMap(), future); |
| } catch (TException | IOException e) { |
| future.setException(e); |
| } |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> countStorageGroup( |
| CountStorageGroupStatement countStorageGroupStatement) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| int storageGroupNum; |
| List<String> storageGroupPathPattern = |
| Arrays.asList(countStorageGroupStatement.getPathPattern().getNodes()); |
| try (ConfigNodeClient client = |
| CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) { |
| TCountStorageGroupResp resp = client.countMatchedStorageGroups(storageGroupPathPattern); |
| storageGroupNum = resp.getCount(); |
| // build TSBlock |
| CountStorageGroupTask.buildTSBlock(storageGroupNum, future); |
| } catch (TException | IOException e) { |
| future.setException(e); |
| } |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> deleteStorageGroup( |
| DeleteStorageGroupStatement deleteStorageGroupStatement) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| TDeleteStorageGroupsReq req = |
| new TDeleteStorageGroupsReq(deleteStorageGroupStatement.getPrefixPath()); |
| try (ConfigNodeClient client = |
| CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) { |
| TSStatus tsStatus = client.deleteStorageGroups(req); |
| if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) { |
| LOGGER.error( |
| "Failed to execute delete storage group {} in config node, status is {}.", |
| deleteStorageGroupStatement.getPrefixPath(), |
| tsStatus); |
| future.setException(new IoTDBException(tsStatus.getMessage(), tsStatus.getCode())); |
| } else { |
| future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); |
| } |
| } catch (TException | IOException e) { |
| future.setException(e); |
| } |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> createFunction( |
| String udfName, String className, List<String> uris) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| try (ConfigNodeClient client = |
| CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) { |
| final TSStatus executionStatus = |
| client.createFunction(new TCreateFunctionReq(udfName, className, uris)); |
| |
| if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != executionStatus.getCode()) { |
| LOGGER.error( |
| "[{}] Failed to create function {}({}) in config node, URI: {}.", |
| executionStatus, |
| udfName, |
| className, |
| uris); |
| future.setException(new IoTDBException(executionStatus.message, executionStatus.code)); |
| } else { |
| future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); |
| } |
| } catch (TException | IOException e) { |
| future.setException(e); |
| } |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> dropFunction(String udfName) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| try (ConfigNodeClient client = |
| CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) { |
| final TSStatus executionStatus = client.dropFunction(new TDropFunctionReq(udfName)); |
| |
| if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != executionStatus.getCode()) { |
| LOGGER.error("[{}] Failed to drop function {} in config node.", executionStatus, udfName); |
| future.setException(new IoTDBException(executionStatus.message, executionStatus.code)); |
| } else { |
| future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); |
| } |
| } catch (TException | IOException e) { |
| future.setException(e); |
| } |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> createTrigger( |
| CreateTriggerStatement createTriggerStatement) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| try (ConfigNodeClient client = |
| CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) { |
| TCreateTriggerReq tCreateTriggerReq = |
| new TCreateTriggerReq( |
| createTriggerStatement.getTriggerName(), |
| createTriggerStatement.getClassName(), |
| createTriggerStatement.getJarPath(), |
| createTriggerStatement.isUsingURI(), |
| createTriggerStatement.getTriggerEvent().getId(), |
| createTriggerStatement.getTriggerType().getId(), |
| createTriggerStatement.getPathPattern().serialize(), |
| createTriggerStatement.getAttributes()); |
| |
| if (!createTriggerStatement.isUsingURI()) { |
| // If jarPath is a file path, we transfer it to ByteBuffer and send it to ConfigNode. |
| tCreateTriggerReq.setJarFile( |
| ExecutableManager.transferToBytebuffer(createTriggerStatement.getJarPath())); |
| } |
| |
| final TSStatus executionStatus = client.createTrigger(tCreateTriggerReq); |
| |
| if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != executionStatus.getCode()) { |
| LOGGER.error( |
| "[{}] Failed to create trigger {}. TSStatus is {}", |
| executionStatus, |
| createTriggerStatement.getTriggerName(), |
| executionStatus.message); |
| future.setException(new IoTDBException(executionStatus.message, executionStatus.code)); |
| } else { |
| future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); |
| } |
| } catch (TException | IOException e) { |
| future.setException(e); |
| } |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> dropTrigger(String triggerName) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| try (ConfigNodeClient client = |
| CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) { |
| // todo: implementation |
| final TSStatus executionStatus = client.dropTrigger(new TDropTriggerReq(triggerName)); |
| |
| if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != executionStatus.getCode()) { |
| LOGGER.error("[{}] Failed to drop trigger {}.", executionStatus, triggerName); |
| future.setException(new IoTDBException(executionStatus.message, executionStatus.code)); |
| } else { |
| future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); |
| } |
| } catch (TException | IOException e) { |
| future.setException(e); |
| } |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> showTriggers() { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| try (ConfigNodeClient client = |
| CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) { |
| TGetTriggerTableResp getTriggerTableResp = client.getTriggerTable(); |
| if (getTriggerTableResp.getStatus().getCode() |
| != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { |
| future.setException( |
| new IoTDBException( |
| getTriggerTableResp.getStatus().message, getTriggerTableResp.getStatus().code)); |
| return future; |
| } |
| } catch (TException | IOException e) { |
| future.setException(e); |
| } |
| // convert triggerTable and buildTsBlock |
| ShowTriggersTask.buildTsBlock(new TriggerTable(), future); |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> setTTL(SetTTLStatement setTTLStatement, String taskName) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| List<String> storageGroupPathPattern = |
| Arrays.asList(setTTLStatement.getStorageGroupPath().getNodes()); |
| TSetTTLReq setTTLReq = new TSetTTLReq(storageGroupPathPattern, setTTLStatement.getTTL()); |
| try (ConfigNodeClient configNodeClient = |
| CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) { |
| // Send request to some API server |
| TSStatus tsStatus = configNodeClient.setTTL(setTTLReq); |
| // Get response or throw exception |
| if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) { |
| LOGGER.error( |
| "Failed to execute {} {} in config node, status is {}.", |
| taskName, |
| setTTLStatement.getStorageGroupPath(), |
| tsStatus); |
| future.setException(new IoTDBException(tsStatus.getMessage(), tsStatus.getCode())); |
| } else { |
| future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); |
| } |
| } catch (TException | IOException e) { |
| future.setException(e); |
| } |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> merge(boolean onCluster) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| TSStatus tsStatus = new TSStatus(); |
| if (onCluster) { |
| try (ConfigNodeClient client = |
| CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) { |
| // Send request to some API server |
| tsStatus = client.merge(); |
| } catch (IOException | TException e) { |
| future.setException(e); |
| } |
| } else { |
| tsStatus = LocalConfigNode.getInstance().executeMergeOperation(); |
| } |
| if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { |
| future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); |
| } else { |
| future.setException(new IoTDBException(tsStatus.message, tsStatus.code)); |
| } |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> flush(TFlushReq tFlushReq, boolean onCluster) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| TSStatus tsStatus = new TSStatus(); |
| if (onCluster) { |
| try (ConfigNodeClient client = |
| CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) { |
| // Send request to some API server |
| tsStatus = client.flush(tFlushReq); |
| } catch (IOException | TException e) { |
| future.setException(e); |
| } |
| } else { |
| tsStatus = LocalConfigNode.getInstance().executeFlushOperation(tFlushReq); |
| } |
| if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { |
| future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); |
| } else { |
| future.setException(new IoTDBException(tsStatus.message, tsStatus.code)); |
| } |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> clearCache(boolean onCluster) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| TSStatus tsStatus = new TSStatus(); |
| if (onCluster) { |
| try (ConfigNodeClient client = |
| CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) { |
| // Send request to some API server |
| tsStatus = client.clearCache(); |
| } catch (IOException | TException e) { |
| future.setException(e); |
| } |
| } else { |
| tsStatus = LocalConfigNode.getInstance().executeClearCacheOperation(); |
| } |
| if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { |
| future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); |
| } else { |
| future.setException(new IoTDBException(tsStatus.message, tsStatus.code)); |
| } |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> loadConfiguration(boolean onCluster) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| TSStatus tsStatus = new TSStatus(); |
| if (onCluster) { |
| try (ConfigNodeClient client = |
| CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) { |
| // Send request to some API server |
| tsStatus = client.loadConfiguration(); |
| } catch (IOException | TException e) { |
| future.setException(e); |
| } |
| } else { |
| tsStatus = LocalConfigNode.getInstance().executeLoadConfigurationOperation(); |
| } |
| if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { |
| future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); |
| } else { |
| future.setException(new StatementExecutionException(tsStatus)); |
| } |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> setSystemStatus(boolean onCluster, NodeStatus status) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| TSStatus tsStatus = new TSStatus(); |
| if (onCluster) { |
| try (ConfigNodeClient client = |
| CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) { |
| // Send request to some API server |
| tsStatus = client.setSystemStatus(status.getStatus()); |
| } catch (IOException | TException e) { |
| future.setException(e); |
| } |
| } else { |
| tsStatus = LocalConfigNode.getInstance().executeSetSystemStatus(status); |
| } |
| if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { |
| future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); |
| } else { |
| future.setException(new StatementExecutionException(tsStatus)); |
| } |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> showCluster() { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| TShowClusterResp showClusterResp = new TShowClusterResp(); |
| try (ConfigNodeClient client = |
| CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) { |
| showClusterResp = client.showCluster(); |
| } catch (TException | IOException e) { |
| future.setException(e); |
| } |
| // build TSBlock |
| ShowClusterTask.buildTSBlock(showClusterResp, future); |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> showTTL(ShowTTLStatement showTTLStatement) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| List<PartialPath> storageGroupPaths = showTTLStatement.getPaths(); |
| Map<String, Long> storageGroupToTTL = new HashMap<>(); |
| try (ConfigNodeClient client = |
| CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) { |
| if (showTTLStatement.isAll()) { |
| List<String> allStorageGroupPathPattern = Arrays.asList("root", "**"); |
| TStorageGroupSchemaResp resp = |
| client.getMatchedStorageGroupSchemas(allStorageGroupPathPattern); |
| for (Map.Entry<String, TStorageGroupSchema> entry : |
| resp.getStorageGroupSchemaMap().entrySet()) { |
| storageGroupToTTL.put(entry.getKey(), entry.getValue().getTTL()); |
| } |
| } else { |
| for (PartialPath storageGroupPath : storageGroupPaths) { |
| List<String> storageGroupPathPattern = Arrays.asList(storageGroupPath.getNodes()); |
| TStorageGroupSchemaResp resp = |
| client.getMatchedStorageGroupSchemas(storageGroupPathPattern); |
| for (Map.Entry<String, TStorageGroupSchema> entry : |
| resp.getStorageGroupSchemaMap().entrySet()) { |
| if (!storageGroupToTTL.containsKey(entry.getKey())) { |
| storageGroupToTTL.put(entry.getKey(), entry.getValue().getTTL()); |
| } |
| } |
| } |
| } |
| } catch (TException | IOException e) { |
| future.setException(e); |
| } |
| // build TSBlock |
| ShowTTLTask.buildTSBlock(storageGroupToTTL, future); |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> showRegion(ShowRegionStatement showRegionStatement) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| TShowRegionResp showRegionResp = new TShowRegionResp(); |
| TShowRegionReq showRegionReq = new TShowRegionReq(); |
| showRegionReq.setConsensusGroupType(showRegionStatement.getRegionType()); |
| if (showRegionStatement.getStorageGroups() == null) { |
| showRegionReq.setStorageGroups(null); |
| } else { |
| showRegionReq.setStorageGroups( |
| showRegionStatement.getStorageGroups().stream() |
| .map(PartialPath::getFullPath) |
| .collect(Collectors.toList())); |
| } |
| try (ConfigNodeClient client = |
| CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) { |
| showRegionResp = client.showRegion(showRegionReq); |
| if (showRegionResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { |
| future.setException( |
| new IoTDBException( |
| showRegionResp.getStatus().message, showRegionResp.getStatus().code)); |
| return future; |
| } |
| } catch (TException | IOException e) { |
| future.setException(e); |
| } |
| // build TSBlock |
| ShowRegionTask.buildTSBlock(showRegionResp, future); |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> showDataNodes( |
| ShowDataNodesStatement showDataNodesStatement) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| TShowDataNodesResp showDataNodesResp = new TShowDataNodesResp(); |
| try (ConfigNodeClient client = |
| CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) { |
| showDataNodesResp = client.showDataNodes(); |
| if (showDataNodesResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { |
| future.setException( |
| new IoTDBException( |
| showDataNodesResp.getStatus().message, showDataNodesResp.getStatus().code)); |
| return future; |
| } |
| } catch (TException | IOException e) { |
| future.setException(e); |
| } |
| // build TSBlock |
| ShowDataNodesTask.buildTSBlock(showDataNodesResp, future); |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> showConfigNodes() { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| TShowConfigNodesResp showConfigNodesResp = new TShowConfigNodesResp(); |
| try (ConfigNodeClient client = |
| CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) { |
| showConfigNodesResp = client.showConfigNodes(); |
| if (showConfigNodesResp.getStatus().getCode() |
| != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { |
| future.setException( |
| new IoTDBException( |
| showConfigNodesResp.getStatus().message, showConfigNodesResp.getStatus().code)); |
| return future; |
| } |
| } catch (TException | IOException e) { |
| future.setException(e); |
| } |
| // build TSBlock |
| ShowConfigNodesTask.buildTSBlock(showConfigNodesResp, future); |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> createSchemaTemplate( |
| CreateSchemaTemplateStatement createSchemaTemplateStatement) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| // Construct request using statement |
| try { |
| // Send request to some API server |
| TSStatus tsStatus = |
| ClusterTemplateManager.getInstance().createSchemaTemplate(createSchemaTemplateStatement); |
| // Get response or throw exception |
| if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) { |
| LOGGER.error( |
| "Failed to execute create schema template {} in config node, status is {}.", |
| createSchemaTemplateStatement.getName(), |
| tsStatus); |
| future.setException(new IoTDBException(tsStatus.getMessage(), tsStatus.getCode())); |
| } else { |
| future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); |
| } |
| } catch (Exception e) { |
| future.setException(e.getCause()); |
| } |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> showSchemaTemplate( |
| ShowSchemaTemplateStatement showSchemaTemplateStatement) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| try (ConfigNodeClient configNodeClient = |
| CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) { |
| // Send request to some API server |
| List<Template> templateList = ClusterTemplateManager.getInstance().getAllTemplates(); |
| // build TSBlock |
| ShowSchemaTemplateTask.buildTSBlock(templateList, future); |
| } catch (Exception e) { |
| future.setException(e); |
| } |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> showNodesInSchemaTemplate( |
| ShowNodesInSchemaTemplateStatement showNodesInSchemaTemplateStatement) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| String req = showNodesInSchemaTemplateStatement.getTemplateName(); |
| TGetTemplateResp tGetTemplateResp = new TGetTemplateResp(); |
| try (ConfigNodeClient configNodeClient = |
| CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) { |
| // Send request to some API server |
| Template template = ClusterTemplateManager.getInstance().getTemplate(req); |
| // build TSBlock |
| ShowNodesInSchemaTemplateTask.buildTSBlock(template, future); |
| } catch (Exception e) { |
| future.setException(e); |
| } |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> setSchemaTemplate( |
| SetSchemaTemplateStatement setSchemaTemplateStatement) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| String templateName = setSchemaTemplateStatement.getTemplateName(); |
| PartialPath path = setSchemaTemplateStatement.getPath(); |
| try { |
| // Send request to some API server |
| ClusterTemplateManager.getInstance().setSchemaTemplate(templateName, path); |
| // build TSBlock |
| future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); |
| } catch (Exception e) { |
| future.setException(e.getCause()); |
| } |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> showPathSetTemplate( |
| ShowPathSetTemplateStatement showPathSetTemplateStatement) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| String templateName = showPathSetTemplateStatement.getTemplateName(); |
| try (ConfigNodeClient configNodeClient = |
| CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) { |
| // Send request to some API server |
| List<PartialPath> listPath = |
| ClusterTemplateManager.getInstance().getPathsSetTemplate(templateName); |
| // build TSBlock |
| ShowPathSetTemplateTask.buildTSBlock(listPath, future); |
| } catch (Exception e) { |
| future.setException(e); |
| } |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> createPipe(CreatePipeStatement createPipeStatement) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| future.setException( |
| new IoTDBException( |
| "Executing create pipe is not supported", |
| TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())); |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> createPipeSink( |
| CreatePipeSinkStatement createPipeSinkStatement) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| future.setException( |
| new IoTDBException( |
| "Executing create pipesink is not supported", |
| TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())); |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> dropPipe(DropPipeStatement dropPipeStatement) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| future.setException( |
| new IoTDBException( |
| "Executing drop pipe is not supported", |
| TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())); |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> dropPipeSink( |
| DropPipeSinkStatement dropPipeSinkStatement) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| future.setException( |
| new IoTDBException( |
| "Executing drop pipesink is not supported", |
| TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())); |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> showPipe(ShowPipeStatement showPipeStatement) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| future.setException( |
| new IoTDBException( |
| "Executing show pipe is not supported", |
| TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())); |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> showPipeSink( |
| ShowPipeSinkStatement showPipeSinkStatement) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| future.setException( |
| new IoTDBException( |
| "Executing show pipesink is not supported", |
| TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())); |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> startPipe(StartPipeStatement startPipeStatement) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| future.setException( |
| new IoTDBException( |
| "Executing Start pipe is not supported", |
| TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())); |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> stopPipe(StopPipeStatement stopPipeStatement) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| future.setException( |
| new IoTDBException( |
| "Executing stop pipe is not supported", |
| TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())); |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> deleteTimeSeries( |
| String queryId, DeleteTimeSeriesStatement deleteTimeSeriesStatement) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| PathPatternTree patternTree = new PathPatternTree(); |
| for (PartialPath pathPattern : deleteTimeSeriesStatement.getPathPatternList()) { |
| patternTree.appendPathPattern(pathPattern); |
| } |
| patternTree.constructTree(); |
| ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); |
| DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream); |
| try { |
| patternTree.serialize(dataOutputStream); |
| } catch (IOException ignored) { |
| // memory operation, won't happen |
| } |
| TDeleteTimeSeriesReq req = |
| new TDeleteTimeSeriesReq(queryId, ByteBuffer.wrap(byteArrayOutputStream.toByteArray())); |
| try (ConfigNodeClient client = |
| CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient( |
| ConfigNodeInfo.partitionRegionId)) { |
| TSStatus tsStatus = null; |
| do { |
| try { |
| tsStatus = client.deleteTimeSeries(req); |
| } catch (TTransportException e) { |
| if (e.getType() == TTransportException.TIMED_OUT |
| || e.getCause() instanceof SocketTimeoutException) { |
| // time out mainly caused by slow execution, wait until |
| tsStatus = RpcUtils.getStatus(TSStatusCode.STILL_EXECUTING_STATUS); |
| } else { |
| throw e; |
| } |
| } |
| // keep waiting until task ends |
| } while (TSStatusCode.STILL_EXECUTING_STATUS.getStatusCode() == tsStatus.getCode()); |
| |
| if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) { |
| LOGGER.error( |
| "Failed to execute delete timeseries {} in config node, status is {}.", |
| deleteTimeSeriesStatement.getPathPatternList(), |
| tsStatus); |
| future.setException(new IoTDBException(tsStatus.getMessage(), tsStatus.getCode())); |
| } else { |
| future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); |
| } |
| } catch (TException | IOException e) { |
| future.setException(e); |
| } |
| return future; |
| } |
| } |