| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iotdb.db.mpp.plan.execution.config.executor; |
| |
| import org.apache.iotdb.common.rpc.thrift.TFlushReq; |
| import org.apache.iotdb.common.rpc.thrift.TSStatus; |
| import org.apache.iotdb.commons.cluster.NodeStatus; |
| import org.apache.iotdb.commons.exception.IoTDBException; |
| import org.apache.iotdb.commons.exception.MetadataException; |
| import org.apache.iotdb.commons.path.PartialPath; |
| import org.apache.iotdb.commons.udf.service.UDFExecutableManager; |
| import org.apache.iotdb.commons.udf.service.UDFRegistrationService; |
| import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp; |
| import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupInfo; |
| import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema; |
| import org.apache.iotdb.db.localconfignode.LocalConfigNode; |
| import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode; |
| import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult; |
| import org.apache.iotdb.db.mpp.plan.execution.config.metadata.CountStorageGroupTask; |
| import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowStorageGroupTask; |
| import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowTTLTask; |
| import org.apache.iotdb.db.mpp.plan.execution.config.sys.sync.ShowPipeSinkTask; |
| import org.apache.iotdb.db.mpp.plan.execution.config.sys.sync.ShowPipeTask; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTriggerStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteStorageGroupStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteTimeSeriesStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.SetTTLStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowDataNodesStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowRegionStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowStorageGroupStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowTTLStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.template.CreateSchemaTemplateStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.template.SetSchemaTemplateStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowNodesInSchemaTemplateStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowPathSetTemplateStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowSchemaTemplateStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeSinkStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeSinkStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.sys.sync.StartPipeStatement; |
| import org.apache.iotdb.db.mpp.plan.statement.sys.sync.StopPipeStatement; |
| import org.apache.iotdb.db.sync.sender.pipe.PipeSink; |
| import org.apache.iotdb.rpc.StatementExecutionException; |
| import org.apache.iotdb.rpc.TSStatusCode; |
| |
| import com.google.common.util.concurrent.SettableFuture; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| |
| public class StandaloneConfigTaskExecutor implements IConfigTaskExecutor { |
| |
| private static final Logger LOGGER = LoggerFactory.getLogger(StandaloneConfigTaskExecutor.class); |
| |
| private static final class StandaloneConfigTaskExecutorHolder { |
| private static final StandaloneConfigTaskExecutor INSTANCE = new StandaloneConfigTaskExecutor(); |
| |
| private StandaloneConfigTaskExecutorHolder() {} |
| } |
| |
| public static StandaloneConfigTaskExecutor getInstance() { |
| return StandaloneConfigTaskExecutorHolder.INSTANCE; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> setStorageGroup( |
| SetStorageGroupStatement setStorageGroupStatement) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| try { |
| LocalConfigNode localConfigNode = LocalConfigNode.getInstance(); |
| localConfigNode.setStorageGroup(setStorageGroupStatement.getStorageGroupPath()); |
| if (setStorageGroupStatement.getTTL() != null) { |
| localConfigNode.setTTL( |
| setStorageGroupStatement.getStorageGroupPath(), setStorageGroupStatement.getTTL()); |
| } |
| // schemaReplicationFactor, dataReplicationFactor, timePartitionInterval are ignored |
| future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); |
| } catch (Exception e) { |
| LOGGER.error("Failed to set storage group, caused by ", e); |
| future.setException(e); |
| } |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> showStorageGroup( |
| ShowStorageGroupStatement showStorageGroupStatement) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| Map<String, TStorageGroupInfo> storageGroupInfoMap = new HashMap<>(); |
| try { |
| LocalConfigNode localConfigNode = LocalConfigNode.getInstance(); |
| List<PartialPath> partialPaths = |
| localConfigNode.getMatchedStorageGroups( |
| showStorageGroupStatement.getPathPattern(), showStorageGroupStatement.isPrefixPath()); |
| for (PartialPath storageGroupPath : partialPaths) { |
| IStorageGroupMNode storageGroupMNode = |
| localConfigNode.getStorageGroupNodeByPath(storageGroupPath); |
| String storageGroup = storageGroupMNode.getFullPath(); |
| TStorageGroupSchema storageGroupSchema = storageGroupMNode.getStorageGroupSchema(); |
| |
| TStorageGroupInfo storageGroupInfo = new TStorageGroupInfo(); |
| storageGroupInfo.setName(storageGroup); |
| storageGroupInfo.setTTL(storageGroupSchema.getTTL()); |
| storageGroupInfo.setSchemaReplicationFactor( |
| storageGroupSchema.getSchemaReplicationFactor()); |
| storageGroupInfo.setDataReplicationFactor(storageGroupSchema.getDataReplicationFactor()); |
| storageGroupInfo.setTimePartitionInterval(storageGroupSchema.getTimePartitionInterval()); |
| storageGroupInfo.setSchemaRegionNum(1); |
| storageGroupInfo.setDataRegionNum(1); |
| storageGroupInfoMap.put(storageGroup, storageGroupInfo); |
| } |
| // build TSBlock |
| ShowStorageGroupTask.buildTSBlock(storageGroupInfoMap, future); |
| } catch (MetadataException e) { |
| future.setException(e); |
| } |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> countStorageGroup( |
| CountStorageGroupStatement countStorageGroupStatement) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| int storageGroupNum; |
| try { |
| storageGroupNum = |
| LocalConfigNode.getInstance() |
| .getStorageGroupNum( |
| countStorageGroupStatement.getPathPattern(), |
| countStorageGroupStatement.isPrefixPath()); |
| // build TSBlock |
| CountStorageGroupTask.buildTSBlock(storageGroupNum, future); |
| } catch (MetadataException e) { |
| future.setException(e); |
| } |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> deleteStorageGroup( |
| DeleteStorageGroupStatement deleteStorageGroupStatement) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| try { |
| List<PartialPath> deletePathList = new ArrayList<>(); |
| for (String path : deleteStorageGroupStatement.getPrefixPath()) { |
| PartialPath prefixPath = new PartialPath(path); |
| deletePathList.addAll( |
| LocalConfigNode.getInstance().getMatchedStorageGroups(prefixPath, false)); |
| } |
| if (deletePathList.isEmpty()) { |
| future.setException( |
| new IoTDBException( |
| String.format( |
| "Path %s does not exist", |
| Arrays.toString(deleteStorageGroupStatement.getPrefixPath().toArray())), |
| TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode())); |
| return future; |
| } else { |
| LocalConfigNode.getInstance().deleteStorageGroups(deletePathList); |
| } |
| } catch (MetadataException e) { |
| future.setException(e); |
| return future; |
| } |
| future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> createFunction( |
| String udfName, String className, List<String> uris) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| try { |
| UDFRegistrationService.getInstance() |
| .register(udfName, className, uris, UDFExecutableManager.getInstance(), true); |
| future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); |
| } catch (Exception e) { |
| final String message = |
| String.format( |
| "Failed to create function %s(%s), URI: %s, because %s.", |
| udfName, className, uris, e.getMessage()); |
| LOGGER.error(message, e); |
| future.setException( |
| new IoTDBException(message, TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())); |
| } |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> dropFunction(String udfName) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| try { |
| UDFRegistrationService.getInstance().deregister(udfName); |
| future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); |
| } catch (Exception e) { |
| final String message = |
| String.format("Failed to drop function %s, because %s.", udfName, e.getMessage()); |
| LOGGER.error(message, e); |
| future.setException( |
| new IoTDBException(message, TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())); |
| } |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> createTrigger( |
| CreateTriggerStatement createTriggerStatement) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| try { |
| // todo: implementation |
| future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); |
| } catch (Exception e) { |
| final String message = |
| String.format( |
| "Failed to create trigger %s, because %s.", |
| createTriggerStatement.getTriggerName(), e.getMessage()); |
| LOGGER.error(message, e); |
| future.setException( |
| new IoTDBException(message, TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())); |
| } |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> dropTrigger(String triggerName) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| try { |
| // todo: implementation |
| future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); |
| } catch (Exception e) { |
| final String message = |
| String.format("Failed to drop trigger %s, because %s.", triggerName, e.getMessage()); |
| LOGGER.error(message, e); |
| future.setException( |
| new IoTDBException(message, TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())); |
| } |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> showTriggers() { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| future.setException( |
| new IoTDBException( |
| "Executing show triggers in standalone mode is not supported", |
| TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())); |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> setTTL(SetTTLStatement setTTLStatement, String taskName) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| try { |
| LocalConfigNode.getInstance() |
| .setTTL(setTTLStatement.getStorageGroupPath(), setTTLStatement.getTTL()); |
| } catch (MetadataException | IOException e) { |
| future.setException(e); |
| } |
| future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> merge(boolean onCluster) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| TSStatus tsStatus = LocalConfigNode.getInstance().executeMergeOperation(); |
| if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { |
| future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); |
| } else { |
| future.setException(new IoTDBException(tsStatus.message, tsStatus.code)); |
| } |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> flush(TFlushReq tFlushReq, boolean onCluster) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| TSStatus tsStatus = LocalConfigNode.getInstance().executeFlushOperation(tFlushReq); |
| if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { |
| future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); |
| } else { |
| future.setException(new IoTDBException(tsStatus.message, tsStatus.code)); |
| } |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> clearCache(boolean onCluster) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| TSStatus tsStatus = LocalConfigNode.getInstance().executeClearCacheOperation(); |
| if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { |
| future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); |
| } else { |
| future.setException(new IoTDBException(tsStatus.message, tsStatus.code)); |
| } |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> loadConfiguration(boolean onCluster) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| TSStatus tsStatus = LocalConfigNode.getInstance().executeLoadConfigurationOperation(); |
| if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { |
| future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); |
| } else { |
| future.setException(new StatementExecutionException(tsStatus)); |
| } |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> setSystemStatus(boolean onCluster, NodeStatus status) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| TSStatus tsStatus = LocalConfigNode.getInstance().executeSetSystemStatus(status); |
| if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { |
| future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); |
| } else { |
| future.setException(new StatementExecutionException(tsStatus)); |
| } |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> showCluster() { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| future.setException( |
| new IoTDBException( |
| "Executing show cluster in standalone mode is not supported", |
| TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())); |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> showTTL(ShowTTLStatement showTTLStatement) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| List<PartialPath> storageGroupPaths = showTTLStatement.getPaths(); |
| Map<String, Long> storageGroupToTTL = new HashMap<>(); |
| try { |
| Map<PartialPath, Long> allStorageGroupToTTL = |
| LocalConfigNode.getInstance().getStorageGroupsTTL(); |
| if (showTTLStatement.isAll()) { |
| allStorageGroupToTTL |
| .entrySet() |
| .forEach( |
| (entry) -> { |
| storageGroupToTTL.put(entry.getKey().getFullPath(), entry.getValue()); |
| }); |
| } else { |
| for (PartialPath storageGroupPath : storageGroupPaths) { |
| List<PartialPath> matchedStorageGroupPaths = |
| LocalConfigNode.getInstance() |
| .getMatchedStorageGroups(storageGroupPath, showTTLStatement.isPrefixPath()); |
| for (PartialPath matchedStorageGroupPath : matchedStorageGroupPaths) { |
| storageGroupToTTL.put( |
| matchedStorageGroupPath.getFullPath(), |
| allStorageGroupToTTL.get(matchedStorageGroupPath)); |
| } |
| } |
| } |
| } catch (MetadataException e) { |
| future.setException(e); |
| } |
| // build TSBlock |
| ShowTTLTask.buildTSBlock(storageGroupToTTL, future); |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> showRegion(ShowRegionStatement showRegionStatement) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| future.setException( |
| new IoTDBException( |
| "Executing show regions in standalone mode is not supported", |
| TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())); |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> showDataNodes( |
| ShowDataNodesStatement showDataNodesStatement) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| future.setException( |
| new IoTDBException( |
| "Executing show datanodes in standalone mode is not supported", |
| TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())); |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> showConfigNodes() { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| future.setException( |
| new IoTDBException( |
| "Executing show confignodes in standalone mode is not supported", |
| TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())); |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> createSchemaTemplate( |
| CreateSchemaTemplateStatement createSchemaTemplateStatement) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| future.setException( |
| new IoTDBException( |
| "Executing create schema template is not supported", |
| TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())); |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> showSchemaTemplate( |
| ShowSchemaTemplateStatement showSchemaTemplateStatement) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| future.setException( |
| new IoTDBException( |
| "Executing show schema template is not supported", |
| TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())); |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> showNodesInSchemaTemplate( |
| ShowNodesInSchemaTemplateStatement showNodesInSchemaTemplateStatement) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| future.setException( |
| new IoTDBException( |
| "Executing show nodes in schema template is not supported", |
| TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())); |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> setSchemaTemplate( |
| SetSchemaTemplateStatement setSchemaTemplateStatement) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| future.setException( |
| new IoTDBException( |
| "Executing set schema template is not supported", |
| TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())); |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> showPathSetTemplate( |
| ShowPathSetTemplateStatement showPathSetTemplateStatement) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| future.setException( |
| new IoTDBException( |
| "Executing show path set template is not supported", |
| TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())); |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> createPipeSink( |
| CreatePipeSinkStatement createPipeSinkStatement) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| TSStatus tsStatus = LocalConfigNode.getInstance().createPipeSink(createPipeSinkStatement); |
| if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { |
| future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); |
| } else { |
| future.setException(new StatementExecutionException(tsStatus)); |
| } |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> dropPipeSink( |
| DropPipeSinkStatement dropPipeSinkStatement) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| TSStatus tsStatus = |
| LocalConfigNode.getInstance().dropPipeSink(dropPipeSinkStatement.getPipeSinkName()); |
| if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { |
| future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); |
| } else { |
| future.setException(new StatementExecutionException(tsStatus)); |
| } |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> showPipeSink( |
| ShowPipeSinkStatement showPipeSinkStatement) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| List<PipeSink> pipeSinkList = |
| LocalConfigNode.getInstance().showPipeSink(showPipeSinkStatement.getPipeSinkName()); |
| ShowPipeSinkTask.buildTSBlock(pipeSinkList, future); |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> createPipe(CreatePipeStatement createPipeStatement) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| TSStatus tsStatus = LocalConfigNode.getInstance().createPipe(createPipeStatement); |
| if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { |
| future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); |
| } else { |
| future.setException(new StatementExecutionException(tsStatus)); |
| } |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> startPipe(StartPipeStatement startPipeStatement) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| TSStatus tsStatus = LocalConfigNode.getInstance().startPipe(startPipeStatement.getPipeName()); |
| if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { |
| future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); |
| } else { |
| future.setException(new StatementExecutionException(tsStatus)); |
| } |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> stopPipe(StopPipeStatement stopPipeStatement) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| TSStatus tsStatus = LocalConfigNode.getInstance().stopPipe(stopPipeStatement.getPipeName()); |
| if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { |
| future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); |
| } else { |
| future.setException(new StatementExecutionException(tsStatus)); |
| } |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> dropPipe(DropPipeStatement dropPipeStatement) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| TSStatus tsStatus = LocalConfigNode.getInstance().dropPipe(dropPipeStatement.getPipeName()); |
| if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { |
| future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); |
| } else { |
| future.setException(new StatementExecutionException(tsStatus)); |
| } |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> showPipe(ShowPipeStatement showPipeStatement) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| TShowPipeResp showPipeResp = |
| LocalConfigNode.getInstance().showPipe(showPipeStatement.getPipeName()); |
| ShowPipeTask.buildTSBlock(showPipeResp.getPipeInfoList(), future); |
| return future; |
| } |
| |
| @Override |
| public SettableFuture<ConfigTaskResult> deleteTimeSeries( |
| String queryId, DeleteTimeSeriesStatement deleteTimeSeriesStatement) { |
| SettableFuture<ConfigTaskResult> future = SettableFuture.create(); |
| future.setException( |
| new IoTDBException( |
| "Executing delete timeseries is not supported", |
| TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())); |
| return future; |
| } |
| } |