| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iotdb.confignode.manager; |
| |
| import org.apache.iotdb.common.rpc.thrift.TSStatus; |
| import org.apache.iotdb.commons.exception.IoTDBException; |
| import org.apache.iotdb.commons.path.PathPatternTree; |
| import org.apache.iotdb.commons.utils.StatusUtils; |
| import org.apache.iotdb.confignode.conf.ConfigNodeConfig; |
| import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; |
| import org.apache.iotdb.confignode.consensus.request.write.RemoveDataNodePlan; |
| import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan; |
| import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan; |
| import org.apache.iotdb.confignode.manager.partition.PartitionManager; |
| import org.apache.iotdb.confignode.persistence.ProcedureInfo; |
| import org.apache.iotdb.confignode.procedure.Procedure; |
| import org.apache.iotdb.confignode.procedure.ProcedureExecutor; |
| import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; |
| import org.apache.iotdb.confignode.procedure.impl.AddConfigNodeProcedure; |
| import org.apache.iotdb.confignode.procedure.impl.CreateRegionGroupsProcedure; |
| import org.apache.iotdb.confignode.procedure.impl.DeleteStorageGroupProcedure; |
| import org.apache.iotdb.confignode.procedure.impl.DeleteTimeSeriesProcedure; |
| import org.apache.iotdb.confignode.procedure.impl.RegionMigrateProcedure; |
| import org.apache.iotdb.confignode.procedure.impl.RemoveConfigNodeProcedure; |
| import org.apache.iotdb.confignode.procedure.impl.RemoveDataNodeProcedure; |
| import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler; |
| import org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler; |
| import org.apache.iotdb.confignode.procedure.store.ConfigProcedureStore; |
| import org.apache.iotdb.confignode.procedure.store.IProcedureStore; |
| import org.apache.iotdb.confignode.procedure.store.ProcedureFactory; |
| import org.apache.iotdb.confignode.procedure.store.ProcedureStore; |
| import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq; |
| import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq; |
| import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq; |
| import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema; |
| import org.apache.iotdb.rpc.RpcUtils; |
| import org.apache.iotdb.rpc.TSStatusCode; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.concurrent.TimeUnit; |
| |
| public class ProcedureManager { |
| private static final Logger LOGGER = LoggerFactory.getLogger(ProcedureManager.class); |
| |
| private static final ConfigNodeConfig CONFIG_NODE_CONFIG = |
| ConfigNodeDescriptor.getInstance().getConf(); |
| |
| private static final int PROCEDURE_WAIT_TIME_OUT = 30; |
| private static final int PROCEDURE_WAIT_RETRY_TIMEOUT = 250; |
| |
| private final ConfigManager configManager; |
| private ProcedureExecutor<ConfigNodeProcedureEnv> executor; |
| private ProcedureScheduler scheduler; |
| private IProcedureStore store; |
| private ConfigNodeProcedureEnv env; |
| |
| public ProcedureManager(ConfigManager configManager, ProcedureInfo procedureInfo) { |
| this.configManager = configManager; |
| this.scheduler = new SimpleProcedureScheduler(); |
| this.store = new ConfigProcedureStore(configManager, procedureInfo); |
| this.env = new ConfigNodeProcedureEnv(configManager, scheduler); |
| this.executor = new ProcedureExecutor<>(env, store, scheduler); |
| } |
| |
| public void shiftExecutor(boolean running) { |
| if (running) { |
| if (!executor.isRunning()) { |
| executor.init(CONFIG_NODE_CONFIG.getProcedureCoreWorkerThreadsSize()); |
| executor.startWorkers(); |
| executor.startCompletedCleaner( |
| CONFIG_NODE_CONFIG.getProcedureCompletedCleanInterval(), |
| CONFIG_NODE_CONFIG.getProcedureCompletedEvictTTL()); |
| store.start(); |
| LOGGER.info("ProcedureManager is started successfully."); |
| } |
| } else { |
| if (executor.isRunning()) { |
| executor.stop(); |
| if (!executor.isRunning()) { |
| executor.join(); |
| store.stop(); |
| LOGGER.info("ProcedureManager is stopped successfully."); |
| } |
| } |
| } |
| } |
| |
| public TSStatus deleteStorageGroups(ArrayList<TStorageGroupSchema> deleteSgSchemaList) { |
| List<Long> procedureIds = new ArrayList<>(); |
| for (TStorageGroupSchema storageGroupSchema : deleteSgSchemaList) { |
| DeleteStorageGroupProcedure deleteStorageGroupProcedure = |
| new DeleteStorageGroupProcedure(storageGroupSchema); |
| long procedureId = this.executor.submitProcedure(deleteStorageGroupProcedure); |
| procedureIds.add(procedureId); |
| } |
| List<TSStatus> procedureStatus = new ArrayList<>(); |
| boolean isSucceed = waitingProcedureFinished(procedureIds, procedureStatus); |
| // clear the previously deleted regions |
| final PartitionManager partitionManager = getConfigManager().getPartitionManager(); |
| partitionManager.getRegionMaintainer().submit(partitionManager::maintainRegionReplicas); |
| if (isSucceed) { |
| return StatusUtils.OK; |
| } else { |
| return RpcUtils.getStatus(procedureStatus); |
| } |
| } |
| |
| public TSStatus deleteTimeSeries(TDeleteTimeSeriesReq req) { |
| String queryId = req.getQueryId(); |
| PathPatternTree patternTree = |
| PathPatternTree.deserialize(ByteBuffer.wrap(req.getPathPatternTree())); |
| long procedureId = -1; |
| synchronized (this) { |
| boolean hasOverlappedTask = false; |
| ProcedureFactory.ProcedureType type; |
| DeleteTimeSeriesProcedure deleteTimeSeriesProcedure; |
| for (Procedure<?> procedure : executor.getProcedures().values()) { |
| type = ProcedureFactory.getProcedureType(procedure); |
| if (type == null |
| || !type.equals(ProcedureFactory.ProcedureType.DELETE_TIMESERIES_PROCEDURE)) { |
| continue; |
| } |
| deleteTimeSeriesProcedure = ((DeleteTimeSeriesProcedure) procedure); |
| if (queryId.equals(deleteTimeSeriesProcedure.getQueryId())) { |
| procedureId = deleteTimeSeriesProcedure.getProcId(); |
| break; |
| } |
| if (patternTree.isOverlapWith(deleteTimeSeriesProcedure.getPatternTree())) { |
| hasOverlappedTask = true; |
| break; |
| } |
| } |
| |
| if (procedureId == -1) { |
| if (hasOverlappedTask) { |
| return RpcUtils.getStatus( |
| TSStatusCode.OVERLAP_WITH_EXISTING_DELETE_TIMESERIES_TASK, |
| "Some other task is deleting some target timeseries."); |
| } |
| procedureId = |
| this.executor.submitProcedure(new DeleteTimeSeriesProcedure(queryId, patternTree)); |
| } |
| } |
| List<TSStatus> procedureStatus = new ArrayList<>(); |
| boolean isSucceed = |
| waitingProcedureFinished(Collections.singletonList(procedureId), procedureStatus); |
| if (isSucceed) { |
| return StatusUtils.OK; |
| } else { |
| return procedureStatus.get(0); |
| } |
| } |
| |
| /** Generate a AddConfigNodeProcedure, and serially execute all the AddConfigNodeProcedure */ |
| public void addConfigNode(TConfigNodeRegisterReq req) { |
| AddConfigNodeProcedure addConfigNodeProcedure = |
| new AddConfigNodeProcedure(req.getConfigNodeLocation()); |
| this.executor.submitProcedure(addConfigNodeProcedure); |
| } |
| |
| /** |
| * Generate a RemoveConfigNodeProcedure, and serially execute all the RemoveConfigNodeProcedure |
| */ |
| public void removeConfigNode(RemoveConfigNodePlan removeConfigNodePlan) { |
| RemoveConfigNodeProcedure removeConfigNodeProcedure = |
| new RemoveConfigNodeProcedure(removeConfigNodePlan.getConfigNodeLocation()); |
| this.executor.submitProcedure(removeConfigNodeProcedure); |
| LOGGER.info("Submit to remove ConfigNode, {}", removeConfigNodePlan); |
| } |
| |
| /** Generate RemoveDataNodeProcedures, and serially execute all the RemoveDataNodeProcedure */ |
| public boolean removeDataNode(RemoveDataNodePlan removeDataNodePlan) { |
| removeDataNodePlan |
| .getDataNodeLocations() |
| .forEach( |
| tDataNodeLocation -> { |
| this.executor.submitProcedure(new RemoveDataNodeProcedure(tDataNodeLocation)); |
| LOGGER.info("Submit to remove data node procedure, {}", tDataNodeLocation); |
| }); |
| return true; |
| } |
| |
| /** |
| * Generate CreateRegionGroupsProcedure and wait for it finished |
| * |
| * @return SUCCESS_STATUS if all RegionGroups created successfully, CREATE_REGION_ERROR otherwise |
| */ |
| public TSStatus createRegionGroups(CreateRegionGroupsPlan createRegionGroupsPlan) { |
| long procedureId = |
| executor.submitProcedure(new CreateRegionGroupsProcedure(createRegionGroupsPlan)); |
| List<TSStatus> statusList = new ArrayList<>(); |
| boolean isSucceed = |
| waitingProcedureFinished(Collections.singletonList(procedureId), statusList); |
| if (isSucceed) { |
| return RpcUtils.SUCCESS_STATUS; |
| } else { |
| return new TSStatus(TSStatusCode.CREATE_REGION_ERROR.getStatusCode()) |
| .setMessage(statusList.get(0).getMessage()); |
| } |
| } |
| |
| /** |
| * Waiting until the specific procedures finished |
| * |
| * @param procedureIds The specific procedures' index |
| * @param statusList The corresponding running results of these procedures |
| * @return True if all Procedures finished successfully, false otherwise |
| */ |
| private boolean waitingProcedureFinished(List<Long> procedureIds, List<TSStatus> statusList) { |
| boolean isSucceed = true; |
| for (long procedureId : procedureIds) { |
| long startTimeForCurrentProcedure = System.currentTimeMillis(); |
| while (executor.isRunning() |
| && !executor.isFinished(procedureId) |
| && TimeUnit.MILLISECONDS.toSeconds( |
| System.currentTimeMillis() - startTimeForCurrentProcedure) |
| < PROCEDURE_WAIT_TIME_OUT) { |
| sleepWithoutInterrupt(PROCEDURE_WAIT_RETRY_TIMEOUT); |
| } |
| Procedure<ConfigNodeProcedureEnv> finishedProcedure = |
| executor.getResultOrProcedure(procedureId); |
| if (!finishedProcedure.isFinished()) { |
| // the procedure is still executing |
| statusList.add(RpcUtils.getStatus(TSStatusCode.STILL_EXECUTING_STATUS)); |
| isSucceed = false; |
| continue; |
| } |
| if (finishedProcedure.isSuccess()) { |
| statusList.add(StatusUtils.OK); |
| } else { |
| if (finishedProcedure.getException().getCause() instanceof IoTDBException) { |
| IoTDBException e = (IoTDBException) finishedProcedure.getException().getCause(); |
| statusList.add(RpcUtils.getStatus(e.getErrorCode(), e.getMessage())); |
| } else { |
| statusList.add( |
| StatusUtils.EXECUTE_STATEMENT_ERROR.setMessage( |
| finishedProcedure.getException().getMessage())); |
| } |
| isSucceed = false; |
| } |
| } |
| return isSucceed; |
| } |
| |
| public static void sleepWithoutInterrupt(final long timeToSleep) { |
| long currentTime = System.currentTimeMillis(); |
| long endTime = timeToSleep + currentTime; |
| boolean interrupted = false; |
| while (currentTime < endTime) { |
| try { |
| Thread.sleep(endTime - currentTime); |
| } catch (InterruptedException e) { |
| interrupted = true; |
| } |
| currentTime = System.currentTimeMillis(); |
| } |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| |
| // ====================================================== |
| /* |
| GET-SET Region |
| */ |
| // ====================================================== |
| public IManager getConfigManager() { |
| return configManager; |
| } |
| |
| public ProcedureExecutor<ConfigNodeProcedureEnv> getExecutor() { |
| return executor; |
| } |
| |
| public void setExecutor(ProcedureExecutor<ConfigNodeProcedureEnv> executor) { |
| this.executor = executor; |
| } |
| |
| public ProcedureScheduler getScheduler() { |
| return scheduler; |
| } |
| |
| public void setScheduler(ProcedureScheduler scheduler) { |
| this.scheduler = scheduler; |
| } |
| |
| public IProcedureStore getStore() { |
| return store; |
| } |
| |
| public void setStore(ProcedureStore store) { |
| this.store = store; |
| } |
| |
| public ConfigNodeProcedureEnv getEnv() { |
| return env; |
| } |
| |
| public void setEnv(ConfigNodeProcedureEnv env) { |
| this.env = env; |
| } |
| |
| public void reportRegionMigrateResult(TRegionMigrateResultReportReq req) { |
| |
| this.executor |
| .getProcedures() |
| .values() |
| .forEach( |
| procedure -> { |
| if (procedure instanceof RegionMigrateProcedure) { |
| RegionMigrateProcedure regionMigrateProcedure = (RegionMigrateProcedure) procedure; |
| if (regionMigrateProcedure.getConsensusGroupId().equals(req.getRegionId())) { |
| regionMigrateProcedure.notifyTheRegionMigrateFinished(req); |
| } |
| } |
| }); |
| } |
| } |