| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iotdb.cluster.coordinator; |
| |
| import org.apache.iotdb.cluster.client.async.AsyncDataClient; |
| import org.apache.iotdb.cluster.client.sync.SyncDataClient; |
| import org.apache.iotdb.cluster.config.ClusterConstant; |
| import org.apache.iotdb.cluster.config.ClusterDescriptor; |
| import org.apache.iotdb.cluster.exception.ChangeMembershipException; |
| import org.apache.iotdb.cluster.exception.CheckConsistencyException; |
| import org.apache.iotdb.cluster.exception.UnknownLogTypeException; |
| import org.apache.iotdb.cluster.exception.UnsupportedPlanException; |
| import org.apache.iotdb.cluster.log.Log; |
| import org.apache.iotdb.cluster.metadata.CMManager; |
| import org.apache.iotdb.cluster.partition.PartitionGroup; |
| import org.apache.iotdb.cluster.query.ClusterPlanRouter; |
| import org.apache.iotdb.cluster.rpc.thrift.Node; |
| import org.apache.iotdb.cluster.rpc.thrift.RaftNode; |
| import org.apache.iotdb.cluster.rpc.thrift.RaftService; |
| import org.apache.iotdb.cluster.server.RaftServer; |
| import org.apache.iotdb.cluster.server.member.MetaGroupMember; |
| import org.apache.iotdb.cluster.server.monitor.Timer; |
| import org.apache.iotdb.cluster.utils.PartitionUtils; |
| import org.apache.iotdb.cluster.utils.StatusUtils; |
| import org.apache.iotdb.db.conf.IoTDBConstant; |
| import org.apache.iotdb.db.exception.metadata.IllegalPathException; |
| import org.apache.iotdb.db.exception.metadata.MetadataException; |
| import org.apache.iotdb.db.exception.metadata.PathNotExistException; |
| import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException; |
| import org.apache.iotdb.db.exception.query.QueryProcessException; |
| import org.apache.iotdb.db.metadata.PartialPath; |
| import org.apache.iotdb.db.qp.physical.BatchPlan; |
| import org.apache.iotdb.db.qp.physical.PhysicalPlan; |
| import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan; |
| import org.apache.iotdb.db.qp.physical.crud.InsertPlan; |
| import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan; |
| import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; |
| import org.apache.iotdb.db.qp.physical.crud.SetSchemaTemplatePlan; |
| import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan; |
| import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan; |
| import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan; |
| import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan; |
| import org.apache.iotdb.db.service.IoTDB; |
| import org.apache.iotdb.rpc.RpcUtils; |
| import org.apache.iotdb.rpc.TSStatusCode; |
| import org.apache.iotdb.service.rpc.thrift.EndPoint; |
| import org.apache.iotdb.service.rpc.thrift.TSStatus; |
| |
| import org.apache.thrift.TException; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| import java.util.concurrent.CountDownLatch; |
| |
| /** Coordinator of client non-query requests. */ |
| public class Coordinator { |
| |
| private static final Logger logger = LoggerFactory.getLogger(Coordinator.class); |
| |
| private MetaGroupMember metaGroupMember; |
| |
| private String name; |
| private Node thisNode; |
| /** The router calculates the partition groups that a partitioned plan should be sent to. */ |
| private ClusterPlanRouter router; |
| |
| private static final String MSG_MULTIPLE_ERROR = |
| "The following errors occurred when executing " |
| + "the query, please retry or contact the DBA: "; |
| |
| public Coordinator(MetaGroupMember metaGroupMember) { |
| this.metaGroupMember = metaGroupMember; |
| this.name = metaGroupMember.getName(); |
| this.thisNode = metaGroupMember.getThisNode(); |
| } |
| |
| public Coordinator() {} |
| |
| public void setMetaGroupMember(MetaGroupMember metaGroupMember) { |
| this.metaGroupMember = metaGroupMember; |
| this.name = metaGroupMember.getName(); |
| this.thisNode = metaGroupMember.getThisNode(); |
| } |
| |
| public void setRouter(ClusterPlanRouter router) { |
| this.router = router; |
| } |
| |
| /** |
| * Execute a non-query plan. According to the type of the plan, the plan will be executed on all |
| * nodes (like timeseries deletion) or the nodes that belong to certain groups (like data |
| * ingestion). |
| * |
| * @param plan a non-query plan. |
| */ |
| public TSStatus executeNonQueryPlan(PhysicalPlan plan) { |
| TSStatus result; |
| long startTime = Timer.Statistic.COORDINATOR_EXECUTE_NON_QUERY.getOperationStartTime(); |
| if (PartitionUtils.isLocalNonQueryPlan(plan)) { |
| // run locally |
| result = executeNonQueryLocally(plan); |
| } else if (PartitionUtils.isGlobalMetaPlan(plan)) { |
| // forward the plan to all meta group nodes |
| result = metaGroupMember.processNonPartitionedMetaPlan(plan); |
| } else if (PartitionUtils.isGlobalDataPlan(plan)) { |
| // forward the plan to all data group nodes |
| result = processNonPartitionedDataPlan(plan); |
| } else { |
| // split the plan and forward them to some PartitionGroups |
| try { |
| result = processPartitionedPlan(plan); |
| } catch (UnsupportedPlanException e) { |
| return StatusUtils.getStatus(StatusUtils.UNSUPPORTED_OPERATION, e.getMessage()); |
| } |
| } |
| Timer.Statistic.COORDINATOR_EXECUTE_NON_QUERY.calOperationCostTimeFromStart(startTime); |
| return result; |
| } |
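| |
| /* |
| * A minimal caller-side sketch (illustrative only; how the MetaGroupMember, the router and the |
| * plan are obtained here is an assumption, not something this class prescribes): |
| * |
| * Coordinator coordinator = new Coordinator(metaGroupMember); |
| * coordinator.setRouter(router); |
| * TSStatus status = coordinator.executeNonQueryPlan(plan); |
| * if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { |
| * // error messages of failing groups are compacted in the form [code@group:message] |
| * } |
| */ |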
| |
| /** Execute a non-query plan that does not need to be executed on other nodes. */ |
| private TSStatus executeNonQueryLocally(PhysicalPlan plan) { |
| boolean execRet; |
| try { |
| execRet = metaGroupMember.getLocalExecutor().processNonQuery(plan); |
| } catch (QueryProcessException e) { |
| if (e.getErrorCode() != TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()) { |
| logger.debug("meet error while processing non-query. ", e); |
| } else { |
| logger.warn("meet error while processing non-query. ", e); |
| } |
| return RpcUtils.getStatus(e.getErrorCode(), e.getMessage()); |
| } catch (Exception e) { |
| logger.error("{}: server Internal Error: ", IoTDBConstant.GLOBAL_DB_NAME, e); |
| return RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage()); |
| } |
| |
| return execRet |
| ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully") |
| : RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR); |
| } |
| |
| /** |
| * A non-partitioned plan (like DeleteData) should be executed on all data group nodes, so the |
| * data group leader should take the responsibility to make sure that every node receives the |
| * plan. Thus the plan will be processed locally only by the data group leader, while non-leader |
| * nodes forward it to their leader. |
| */ |
| private TSStatus processNonPartitionedDataPlan(PhysicalPlan plan) { |
| try { |
| if (plan instanceof DeleteTimeSeriesPlan) { |
| // as delete-related plans may contain abstract paths (paths with wildcards), we convert |
| // them to full paths so that the executor nodes will not need to query the metadata |
| // holders, eliminating the risk that the timeseries has already been deleted by the time |
| // they query the metadata holders |
| ((CMManager) IoTDB.metaManager).convertToFullPaths(plan); |
| } else { |
| // convertToFullPaths has already synced with the leader, so an explicit sync is only needed |
| // on this branch |
| metaGroupMember.syncLeaderWithConsistencyCheck(true); |
| } |
| } catch (PathNotExistException e) { |
| if (plan.getPaths().isEmpty()) { |
| // only reports an error when there is no matching path |
| return StatusUtils.getStatus(StatusUtils.TIMESERIES_NOT_EXIST_ERROR, e.getMessage()); |
| } |
| } catch (CheckConsistencyException e) { |
| logger.debug( |
| "Forwarding global data plan {} to meta leader {}", plan, metaGroupMember.getLeader()); |
| metaGroupMember.waitLeader(); |
| return metaGroupMember.forwardPlan(plan, metaGroupMember.getLeader(), null); |
| } |
| try { |
| createSchemaIfNecessary(plan); |
| } catch (MetadataException | CheckConsistencyException e) { |
| logger.error("{}: Cannot find storage groups for {}", name, plan); |
| return StatusUtils.NO_STORAGE_GROUP; |
| } |
| List<PartitionGroup> globalGroups = metaGroupMember.getPartitionTable().getGlobalGroups(); |
| logger.debug("Forwarding global data plan {} to {} groups", plan, globalGroups.size()); |
| return forwardPlan(globalGroups, plan); |
| } |
| |
| public void createSchemaIfNecessary(PhysicalPlan plan) |
| throws MetadataException, CheckConsistencyException { |
| if (plan instanceof SetSchemaTemplatePlan) { |
| try { |
| IoTDB.metaManager.getStorageGroupPath( |
| new PartialPath(((SetSchemaTemplatePlan) plan).getPrefixPath())); |
| } catch (IllegalPathException e) { |
| // the prefix path has already been checked when the plan was created, so this should not happen |
| } catch (StorageGroupNotSetException e) { |
| ((CMManager) IoTDB.metaManager).createSchema(plan); |
| } |
| } |
| } |
| |
| /** |
| * A partitioned plan (like batch insertion) will be split into several sub-plans, each of which |
| * belongs to one data group. These sub-plans will be sent to and executed on the corresponding |
| * groups separately. |
| */ |
| public TSStatus processPartitionedPlan(PhysicalPlan plan) throws UnsupportedPlanException { |
| logger.debug("{}: Received a partitioned plan {}", name, plan); |
| if (metaGroupMember.getPartitionTable() == null) { |
| logger.debug("{}: Partition table is not ready", name); |
| return StatusUtils.PARTITION_TABLE_NOT_READY; |
| } |
| |
| if (!checkPrivilegeForBatchExecution(plan)) { |
| return concludeFinalStatus( |
| plan, plan.getPaths().size(), true, null, false, null, Collections.emptyList()); |
| } |
| |
| // split the plan into sub-plans that each only involve one data group |
| Map<PhysicalPlan, PartitionGroup> planGroupMap; |
| try { |
| planGroupMap = splitPlan(plan); |
| } catch (CheckConsistencyException checkConsistencyException) { |
| return StatusUtils.getStatus( |
| StatusUtils.CONSISTENCY_FAILURE, checkConsistencyException.getMessage()); |
| } |
| |
| // the storage group is not found locally |
| if (planGroupMap == null || planGroupMap.isEmpty()) { |
| if ((plan instanceof InsertPlan |
| || plan instanceof CreateTimeSeriesPlan |
| || plan instanceof CreateAlignedTimeSeriesPlan |
| || plan instanceof CreateMultiTimeSeriesPlan) |
| && ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema()) { |
| |
| logger.debug("{}: No associated storage group found for {}, auto-creating", name, plan); |
| try { |
| ((CMManager) IoTDB.metaManager).createSchema(plan); |
| return processPartitionedPlan(plan); |
| } catch (MetadataException | CheckConsistencyException e) { |
| logger.error("Failed to set storage group or create timeseries", e); |
| } |
| } |
| logger.error("{}: Cannot find storage groups for {}", name, plan); |
| return StatusUtils.NO_STORAGE_GROUP; |
| } |
| logger.debug("{}: The data groups of {} are {}", name, plan, planGroupMap); |
| return forwardPlan(planGroupMap, plan); |
| } |
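| |
| /* |
| * For example (values hypothetical), an insertion touching two storage groups may be split so |
| * that planGroupMap holds one sub-plan per data group: |
| * |
| * {subPlan(root.sg1...) -> group(header=nodeA), subPlan(root.sg2...) -> group(header=nodeB)} |
| * |
| * forwardPlan(planGroupMap, plan) then executes each sub-plan locally or remotely and merges |
| * the per-group results back into a single TSStatus. |
| */ |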
| |
| /** |
| * Check whether a batch execution plan still has privilege on at least one storage group. |
| * |
| * @param plan the batch plan, whose rejected paths have been recorded in its results |
| * @return true if at least one path may still be executed, false if every path has been rejected |
| */ |
| private boolean checkPrivilegeForBatchExecution(PhysicalPlan plan) { |
| if (plan instanceof BatchPlan) { |
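| // the plan's results map records one TSStatus per rejected path, so if every path has been |
| // rejected by the privilege check (the sizes are equal), there is nothing left to execute |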
| return ((BatchPlan) plan).getResults().size() != plan.getPaths().size(); |
| } else { |
| return true; |
| } |
| } |
| |
| /** |
| * Forward a plan to all DataGroupMember groups. Only when all nodes time out will a TIME_OUT be |
| * returned. The error messages from each group (if any) will be compacted into one string. |
| * |
| * @param partitionGroups the data groups the plan should be executed in |
| * @param plan a non-query plan |
| */ |
| private TSStatus forwardPlan(List<PartitionGroup> partitionGroups, PhysicalPlan plan) { |
| // the error codes from the groups that cannot execute the plan |
| TSStatus status; |
| List<String> errorCodePartitionGroups = new ArrayList<>(); |
| for (PartitionGroup partitionGroup : partitionGroups) { |
| if (partitionGroup.contains(thisNode)) { |
| // the plan should be handled by a group the local node is in; handle it within the group |
| status = |
| metaGroupMember |
| .getLocalDataMember(partitionGroup.getHeader()) |
| .executeNonQueryPlan(plan); |
| logger.debug( |
| "Execute {} in a local group of {} with status {}", |
| plan, |
| partitionGroup.getHeader(), |
| status); |
| } else { |
| // forward the plan to the group that should handle it |
| status = forwardPlan(plan, partitionGroup); |
| logger.debug( |
| "Forward {} to a remote group of {} with status {}", |
| plan, |
| partitionGroup.getHeader(), |
| status); |
| } |
| if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() |
| && !(plan instanceof SetSchemaTemplatePlan |
| && status.getCode() == TSStatusCode.DUPLICATED_TEMPLATE.getStatusCode()) |
| && !(plan instanceof DeleteTimeSeriesPlan |
| && status.getCode() == TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode())) { |
| // execution failed, record the error message |
| errorCodePartitionGroups.add( |
| String.format( |
| "[%s@%s:%s]", status.getCode(), partitionGroup.getHeader(), status.getMessage())); |
| } |
| } |
| if (errorCodePartitionGroups.isEmpty()) { |
| status = StatusUtils.OK; |
| } else { |
| status = |
| StatusUtils.getStatus( |
| StatusUtils.EXECUTE_STATEMENT_ERROR, MSG_MULTIPLE_ERROR + errorCodePartitionGroups); |
| } |
| logger.debug("{}: executed {} with answer {}", name, plan, status); |
| return status; |
| } |
| |
| public void sendLogToAllDataGroups(Log log) throws ChangeMembershipException { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Send log {} to all data groups: start", log); |
| } |
| |
| Map<PhysicalPlan, PartitionGroup> planGroupMap = router.splitAndRouteChangeMembershipLog(log); |
| List<String> errorCodePartitionGroups = new CopyOnWriteArrayList<>(); |
| CountDownLatch counter = new CountDownLatch(planGroupMap.size()); |
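| // each submitted forwardChangeMembershipPlan counts the latch down in its finally block, so |
| // counter.await() below returns once every group has been attempted; the error list is a |
| // CopyOnWriteArrayList because the submitted tasks append to it concurrently |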
| for (Map.Entry<PhysicalPlan, PartitionGroup> entry : planGroupMap.entrySet()) { |
| metaGroupMember |
| .getAppendLogThreadPool() |
| .submit(() -> forwardChangeMembershipPlan(log, entry, errorCodePartitionGroups, counter)); |
| } |
| try { |
| counter.await(); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| throw new ChangeMembershipException( |
| String.format("Can not wait all data groups to apply %s", log)); |
| } |
| if (!errorCodePartitionGroups.isEmpty()) { |
| throw new ChangeMembershipException( |
| String.format("Apply %s failed with status {%s}", log, errorCodePartitionGroups)); |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("Send log {} to all data groups: end", log); |
| } |
| } |
| |
| private void forwardChangeMembershipPlan( |
| Log log, |
| Map.Entry<PhysicalPlan, PartitionGroup> entry, |
| List<String> errorCodePartitionGroups, |
| CountDownLatch counter) { |
| int retryTime = 0; |
| long startTime = System.currentTimeMillis(); |
| try { |
| while (true) { |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "Send change membership log {} to data group {}, retry time: {}", |
| log, |
| entry.getValue(), |
| retryTime); |
| } |
| try { |
| TSStatus status = forwardToSingleGroup(entry); |
| if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "Success to send change membership log {} to data group {}", |
| log, |
| entry.getValue()); |
| } |
| return; |
| } |
| long cost = System.currentTimeMillis() - startTime; |
| if (cost > ClusterDescriptor.getInstance().getConfig().getWriteOperationTimeoutMS()) { |
| errorCodePartitionGroups.add( |
| String.format( |
| "Forward change membership log %s to data group %s", log, entry.getValue())); |
| return; |
| } |
| Thread.sleep(ClusterConstant.RETRY_WAIT_TIME_MS); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| errorCodePartitionGroups.add(e.getMessage()); |
| return; |
| } |
| retryTime++; |
| } |
| } finally { |
| counter.countDown(); |
| } |
| } |
| |
| /** Split a plan into several sub-plans, each of which belongs to only one data group. */ |
| private Map<PhysicalPlan, PartitionGroup> splitPlan(PhysicalPlan plan) |
| throws UnsupportedPlanException, CheckConsistencyException { |
| Map<PhysicalPlan, PartitionGroup> planGroupMap = null; |
| try { |
| planGroupMap = router.splitAndRoutePlan(plan); |
| } catch (StorageGroupNotSetException e) { |
| // synchronize with the leader to see if this node has unpulled storage groups |
| metaGroupMember.syncLeaderWithConsistencyCheck(true); |
| try { |
| planGroupMap = router.splitAndRoutePlan(plan); |
| } catch (MetadataException | UnknownLogTypeException ex) { |
| // ignore; planGroupMap stays null so the caller will auto-create the schema or report |
| // NO_STORAGE_GROUP |
| } |
| } catch (MetadataException | UnknownLogTypeException e) { |
| logger.error("Cannot route plan {}", plan, e); |
| } |
| logger.debug("route plan {} with partitionGroup {}", plan, planGroupMap); |
| return planGroupMap; |
| } |
| |
| /** |
| * Forward plans to the DataGroupMember of one node in the corresponding group. Only when all |
| * nodes time out will a TIME_OUT be returned. |
| * |
| * @param planGroupMap sub-plan -> the data group it belongs to |
| */ |
| private TSStatus forwardPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap, PhysicalPlan plan) { |
| TSStatus status; |
| |
| // InsertTabletPlan, InsertMultiTabletPlan, InsertRowsPlan and CreateMultiTimeSeriesPlan |
| // contain many rows, each of which corresponds to a TSStatus as its execution result. |
| // As the plan is split and the sub-plans may have interleaving ranges, we must ensure |
| // that each TSStatus of such a multi-plan is placed at the right position. |
| // e.g., an InsertTabletPlan contains 3 rows; row1 and row3 belong to NodeA and row2 |
| // belongs to NodeB. When NodeA returns a success while NodeB returns a failure, the |
| // failure and success should be placed into the proper positions of TSStatus.subStatus. |
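| // Schematically (with hypothetical statuses): |
| // subStatus before merging: [SUCCESS, SUCCESS, SUCCESS] |
| // NodeA's answer fills indexes 0 and 2, NodeB's answer fills index 1 |
| // subStatus after merging: [SUCCESS, FAILURE, SUCCESS] |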
| if (plan instanceof InsertTabletPlan |
| || plan instanceof InsertMultiTabletPlan |
| || plan instanceof CreateMultiTimeSeriesPlan |
| || plan instanceof InsertRowsPlan) { |
| status = forwardMultiSubPlan(planGroupMap, plan); |
| } else if (planGroupMap.size() == 1) { |
| status = forwardToSingleGroup(planGroupMap.entrySet().iterator().next()); |
| } else { |
| status = forwardToMultipleGroup(planGroupMap); |
| } |
| boolean hasCreated = false; |
| try { |
| if (plan instanceof InsertPlan |
| && status.getCode() == TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode() |
| && ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema()) { |
| hasCreated = createTimeseriesForFailedInsertion(((InsertPlan) plan)); |
| } |
| } catch (MetadataException | CheckConsistencyException e) { |
| logger.error("{}: Cannot auto-create timeseries for {}", name, plan, e); |
| return StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, e.getMessage()); |
| } |
| if (hasCreated) { |
| status = forwardPlan(planGroupMap, plan); |
| } |
| if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() |
| && status.isSetRedirectNode()) { |
| status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode()); |
| } |
| logger.debug("{}: executed {} with answer {}", name, plan, status); |
| return status; |
| } |
| |
| private boolean createTimeseriesForFailedInsertion(InsertPlan plan) |
| throws CheckConsistencyException, IllegalPathException { |
| // try to create timeseries |
| if (plan.getFailedMeasurements() != null) { |
| plan.getPlanFromFailed(); |
| } |
| return ((CMManager) IoTDB.metaManager).createTimeseries(plan); |
| } |
| |
| private TSStatus forwardToSingleGroup(Map.Entry<PhysicalPlan, PartitionGroup> entry) { |
| TSStatus result; |
| if (entry.getValue().contains(thisNode)) { |
| // the plan should be handled by a group the local node is in; handle it within the group |
| long startTime = |
| Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP |
| .getOperationStartTime(); |
| logger.debug( |
| "Execute {} in a local group of {}", entry.getKey(), entry.getValue().getHeader()); |
| result = |
| metaGroupMember |
| .getLocalDataMember(entry.getValue().getHeader()) |
| .executeNonQueryPlan(entry.getKey()); |
| Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP |
| .calOperationCostTimeFromStart(startTime); |
| } else { |
| // forward the plan to the group that should handle it |
| long startTime = |
| Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_REMOTE_GROUP |
| .getOperationStartTime(); |
| logger.debug( |
| "Forward {} to a remote group of {}", entry.getKey(), entry.getValue().getHeader()); |
| result = forwardPlan(entry.getKey(), entry.getValue()); |
| Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_REMOTE_GROUP |
| .calOperationCostTimeFromStart(startTime); |
| } |
| return result; |
| } |
| |
| /** |
| * Forward each sub-plan to its corresponding data group. If any group goes wrong, the error |
| * messages from the groups will be compacted into one string. |
| * |
| * @param planGroupMap sub-plan -> the data group it belongs to |
| */ |
| private TSStatus forwardToMultipleGroup(Map<PhysicalPlan, PartitionGroup> planGroupMap) { |
| List<String> errorCodePartitionGroups = new ArrayList<>(); |
| TSStatus tmpStatus; |
| boolean allRedirect = true; |
| EndPoint endPoint = null; |
| for (Map.Entry<PhysicalPlan, PartitionGroup> entry : planGroupMap.entrySet()) { |
| tmpStatus = forwardToSingleGroup(entry); |
| if (tmpStatus.isSetRedirectNode()) { |
| endPoint = tmpStatus.getRedirectNode(); |
| } else { |
| allRedirect = false; |
| } |
| if (tmpStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { |
| // execution failed, record the error message |
| errorCodePartitionGroups.add( |
| String.format( |
| "[%s@%s:%s]", |
| tmpStatus.getCode(), entry.getValue().getHeader(), tmpStatus.getMessage())); |
| } |
| } |
| TSStatus status; |
| if (errorCodePartitionGroups.isEmpty()) { |
| if (allRedirect) { |
| status = StatusUtils.getStatus(TSStatusCode.NEED_REDIRECTION, endPoint); |
| } else { |
| status = StatusUtils.OK; |
| } |
| } else { |
| status = |
| StatusUtils.getStatus( |
| StatusUtils.EXECUTE_STATEMENT_ERROR, MSG_MULTIPLE_ERROR + errorCodePartitionGroups); |
| } |
| return status; |
| } |
| |
| /** |
| * Forward each sub-plan to the data group it belongs to, and combine the responses from the |
| * groups. |
| * |
| * @param planGroupMap sub-plan -> the data group it belongs to |
| */ |
| @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning |
| private TSStatus forwardMultiSubPlan( |
| Map<PhysicalPlan, PartitionGroup> planGroupMap, PhysicalPlan parentPlan) { |
| List<String> errorCodePartitionGroups = new ArrayList<>(); |
| TSStatus tmpStatus; |
| TSStatus[] subStatus = null; |
| boolean noFailure = true; |
| boolean isBatchFailure = false; |
| EndPoint endPoint = null; |
| int totalRowNum = parentPlan.getPaths().size(); |
| // send each sub-plan to its data group and collect the results |
| for (Map.Entry<PhysicalPlan, PartitionGroup> entry : planGroupMap.entrySet()) { |
| tmpStatus = forwardToSingleGroup(entry); |
| logger.debug("{}: from {},{},{}", name, entry.getKey(), entry.getValue(), tmpStatus); |
| noFailure = (tmpStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) && noFailure; |
| isBatchFailure = |
| (tmpStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) || isBatchFailure; |
| if (tmpStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) { |
| if (parentPlan instanceof InsertTabletPlan) { |
| totalRowNum = ((InsertTabletPlan) parentPlan).getRowCount(); |
| } else if (parentPlan instanceof InsertMultiTabletPlan) { |
| // subStatus is a two-dimensional structure: the first dimension is the number of |
| // InsertTabletPlans, and the second dimension is the number of rows per InsertTabletPlan |
| totalRowNum = ((InsertMultiTabletPlan) parentPlan).getTabletsSize(); |
| } else if (parentPlan instanceof CreateMultiTimeSeriesPlan) { |
| totalRowNum = parentPlan.getPaths().size(); |
| } else if (parentPlan instanceof InsertRowsPlan) { |
| totalRowNum = ((InsertRowsPlan) parentPlan).getRowCount(); |
| } |
| |
| if (subStatus == null) { |
| subStatus = new TSStatus[totalRowNum]; |
| Arrays.fill(subStatus, RpcUtils.SUCCESS_STATUS); |
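| // pre-fill every position with SUCCESS; groups that answered MULTIPLE_ERROR overwrite their |
| // rows' positions below, while rows of groups that answered a plain SUCCESS keep the |
| // pre-filled entry |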
| } |
| // set the status from one group to the proper positions of the overall status |
| if (parentPlan instanceof InsertMultiTabletPlan) { |
| InsertMultiTabletPlan tmpMultiTabletPlan = ((InsertMultiTabletPlan) entry.getKey()); |
| for (int i = 0; i < tmpMultiTabletPlan.getInsertTabletPlanList().size(); i++) { |
| InsertTabletPlan tmpInsertTabletPlan = tmpMultiTabletPlan.getInsertTabletPlan(i); |
| int parentIndex = tmpMultiTabletPlan.getParentIndex(i); |
| int parentPlanRowCount = ((InsertMultiTabletPlan) parentPlan).getRowCount(parentIndex); |
| if (subStatus[parentIndex].subStatus == null) { |
| TSStatus[] tmpSubTsStatus = new TSStatus[parentPlanRowCount]; |
| Arrays.fill(tmpSubTsStatus, RpcUtils.SUCCESS_STATUS); |
| subStatus[parentIndex].subStatus = Arrays.asList(tmpSubTsStatus); |
| } |
| TSStatus[] reorderTsStatus = |
| subStatus[parentIndex].subStatus.toArray(new TSStatus[] {}); |
| |
| PartitionUtils.reordering( |
| tmpInsertTabletPlan, |
| reorderTsStatus, |
| tmpStatus.subStatus.toArray(new TSStatus[] {})); |
| subStatus[parentIndex].subStatus = Arrays.asList(reorderTsStatus); |
| } |
| } else if (parentPlan instanceof InsertTabletPlan) { |
| PartitionUtils.reordering( |
| (InsertTabletPlan) entry.getKey(), |
| subStatus, |
| tmpStatus.subStatus.toArray(new TSStatus[] {})); |
| } else if (parentPlan instanceof CreateMultiTimeSeriesPlan) { |
| CreateMultiTimeSeriesPlan subPlan = (CreateMultiTimeSeriesPlan) entry.getKey(); |
| for (int i = 0; i < subPlan.getIndexes().size(); i++) { |
| subStatus[subPlan.getIndexes().get(i)] = tmpStatus.subStatus.get(i); |
| } |
| } else if (parentPlan instanceof InsertRowsPlan) { |
| InsertRowsPlan subPlan = (InsertRowsPlan) entry.getKey(); |
| for (int i = 0; i < subPlan.getInsertRowPlanIndexList().size(); i++) { |
| subStatus[subPlan.getInsertRowPlanIndexList().get(i)] = tmpStatus.subStatus.get(i); |
| } |
| } |
| } |
| |
| if (tmpStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { |
| // execution failed, record the error message |
| errorCodePartitionGroups.add( |
| String.format( |
| "[%s@%s:%s:%s]", |
| tmpStatus.getCode(), |
| entry.getValue().getHeader(), |
| tmpStatus.getMessage(), |
| tmpStatus.subStatus)); |
| } |
| |
| if (tmpStatus.isSetRedirectNode()) { |
| boolean isLastInsertTabletPlan = |
| parentPlan instanceof InsertTabletPlan |
| && ((InsertTabletPlan) entry.getKey()).getMaxTime() |
| == ((InsertTabletPlan) parentPlan).getMaxTime(); |
| |
| boolean isLastInsertMultiTabletPlan = |
| parentPlan instanceof InsertMultiTabletPlan |
| && ((InsertMultiTabletPlan) entry.getKey()).getMaxTime() |
| == ((InsertMultiTabletPlan) parentPlan).getMaxTime(); |
| |
| if (isLastInsertTabletPlan || isLastInsertMultiTabletPlan) { |
| endPoint = tmpStatus.getRedirectNode(); |
| } |
| } |
| } |
| return concludeFinalStatus( |
| parentPlan, |
| totalRowNum, |
| noFailure, |
| endPoint, |
| isBatchFailure, |
| subStatus, |
| errorCodePartitionGroups); |
| } |
| |
| private TSStatus concludeFinalStatus( |
| PhysicalPlan parentPlan, |
| int totalRowNum, |
| boolean noFailure, |
| EndPoint endPoint, |
| boolean isBatchFailure, |
| TSStatus[] subStatus, |
| List<String> errorCodePartitionGroups) { |
| if (parentPlan instanceof InsertMultiTabletPlan |
| && !((InsertMultiTabletPlan) parentPlan).getResults().isEmpty()) { |
| if (subStatus == null) { |
| subStatus = new TSStatus[totalRowNum]; |
| Arrays.fill(subStatus, RpcUtils.SUCCESS_STATUS); |
| } |
| noFailure = false; |
| isBatchFailure = true; |
| for (Map.Entry<Integer, TSStatus> integerTSStatusEntry : |
| ((InsertMultiTabletPlan) parentPlan).getResults().entrySet()) { |
| subStatus[integerTSStatusEntry.getKey()] = integerTSStatusEntry.getValue(); |
| } |
| } |
| |
| if (parentPlan instanceof CreateMultiTimeSeriesPlan |
| && !((CreateMultiTimeSeriesPlan) parentPlan).getResults().isEmpty()) { |
| if (subStatus == null) { |
| subStatus = new TSStatus[totalRowNum]; |
| Arrays.fill(subStatus, RpcUtils.SUCCESS_STATUS); |
| } |
| noFailure = false; |
| isBatchFailure = true; |
| for (Map.Entry<Integer, TSStatus> integerTSStatusEntry : |
| ((CreateMultiTimeSeriesPlan) parentPlan).getResults().entrySet()) { |
| subStatus[integerTSStatusEntry.getKey()] = integerTSStatusEntry.getValue(); |
| } |
| } |
| |
| if (parentPlan instanceof InsertRowsPlan |
| && !((InsertRowsPlan) parentPlan).getResults().isEmpty()) { |
| if (subStatus == null) { |
| subStatus = new TSStatus[totalRowNum]; |
| Arrays.fill(subStatus, RpcUtils.SUCCESS_STATUS); |
| } |
| noFailure = false; |
| isBatchFailure = true; |
| for (Map.Entry<Integer, TSStatus> integerTSStatusEntry : |
| ((InsertRowsPlan) parentPlan).getResults().entrySet()) { |
| subStatus[integerTSStatusEntry.getKey()] = integerTSStatusEntry.getValue(); |
| } |
| } |
| |
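| // outcome precedence: no failure at all -> OK (carrying the redirect endpoint if one was |
| // set); any batch failure -> a status aggregated from the per-row subStatus array; otherwise |
| // a single EXECUTE_STATEMENT_ERROR that compacts the per-group error messages |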
| TSStatus status; |
| if (noFailure) { |
| status = StatusUtils.OK; |
| if (endPoint != null) { |
| status = StatusUtils.getStatus(status, endPoint); |
| } |
| } else if (isBatchFailure) { |
| status = RpcUtils.getStatus(Arrays.asList(subStatus)); |
| } else { |
| status = |
| StatusUtils.getStatus( |
| StatusUtils.EXECUTE_STATEMENT_ERROR, MSG_MULTIPLE_ERROR + errorCodePartitionGroups); |
| } |
| return status; |
| } |
| |
| /** |
| * Forward a plan to the DataGroupMember of one node in the group. Only when all nodes time out |
| * will a TIME_OUT be returned. |
| */ |
| private TSStatus forwardPlan(PhysicalPlan plan, PartitionGroup group) { |
| for (Node node : group) { |
| TSStatus status; |
| try { |
| // only data plans are partitioned, so they must be processed by the data server instead of |
| // the meta server |
| if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) { |
| status = forwardDataPlanAsync(plan, node, group.getHeader()); |
| } else { |
| status = forwardDataPlanSync(plan, node, group.getHeader()); |
| } |
| } catch (IOException e) { |
| status = StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, e.getMessage()); |
| } |
| if (!StatusUtils.TIME_OUT.equals(status)) { |
| if (!status.isSetRedirectNode()) { |
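| // record which node actually served the request so that the client may send follow-up |
| // requests for the same partition to it directly |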
| status.setRedirectNode(new EndPoint(node.getClientIp(), node.getClientPort())); |
| } |
| return status; |
| } else { |
| logger.warn("Forward {} to {} timed out", plan, node); |
| } |
| } |
| logger.warn("Forward {} to {} timed out", plan, group); |
| return StatusUtils.TIME_OUT; |
| } |
| |
| /** |
| * Forward a non-query plan to the data port of "receiver" |
| * |
| * @param plan a non-query plan |
| * @param receiver the node the plan is forwarded to |
| * @param header to determine which DataGroupMember of "receiver" will process the request. |
| * @return a TSStatus indicating whether the forwarding is successful. |
| */ |
| private TSStatus forwardDataPlanAsync(PhysicalPlan plan, Node receiver, RaftNode header) |
| throws IOException { |
| RaftService.AsyncClient client = |
| metaGroupMember |
| .getClientProvider() |
| .getAsyncDataClient(receiver, RaftServer.getWriteOperationTimeoutMS()); |
| return this.metaGroupMember.forwardPlanAsync(plan, receiver, header, client); |
| } |
| |
| private TSStatus forwardDataPlanSync(PhysicalPlan plan, Node receiver, RaftNode header) |
| throws IOException { |
| RaftService.Client client; |
| try { |
| client = |
| metaGroupMember |
| .getClientProvider() |
| .getSyncDataClient(receiver, RaftServer.getWriteOperationTimeoutMS()); |
| } catch (TException e) { |
| throw new IOException(e); |
| } |
| return this.metaGroupMember.forwardPlanSync(plan, receiver, header, client); |
| } |
| |
| /** |
| * Get a thrift client that will connect to "node" using the data port. |
| * |
| * @param node the node to be connected |
| * @param timeout timeout threshold of the connection |
| */ |
| public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws IOException { |
| return metaGroupMember.getClientProvider().getAsyncDataClient(node, timeout); |
| } |
| |
| public Node getThisNode() { |
| return thisNode; |
| } |
| |
| /** |
| * Get a thrift client that will connect to "node" using the data port. |
| * |
| * @param node the node to be connected |
| * @param timeout timeout threshold of the connection |
| */ |
| public SyncDataClient getSyncDataClient(Node node, int timeout) throws TException { |
| return metaGroupMember.getClientProvider().getSyncDataClient(node, timeout); |
| } |
| } |