blob: e82b25f1f17b8b73aa21010b782e6fc5175ebbd1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.service.thrift.impl;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
import org.apache.iotdb.db.engine.StorageEngineV2;
import org.apache.iotdb.db.exception.DataRegionException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
 * This class takes the responsibility of managing regions, executing PlanNode of write type on
 * according regions and controlling the execution concurrency.
 *
 * <p>Concurrency control: each region is guarded by a {@link ReentrantReadWriteLock}. Ordinary
 * inserts take the region's read lock (they may run concurrently with each other), while the
 * delete-data step of delete-timeseries takes the write lock so it is exclusive with inserts.
 */
public class DataNodeRegionManager {

  private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeRegionManager.class);

  private final SchemaEngine schemaEngine;
  private final StorageEngineV2 storageEngine;

  // Per-region locks. Entries are added when a region is created and removed when it is deleted.
  private final Map<SchemaRegionId, ReentrantReadWriteLock> schemaRegionLockMap =
      new ConcurrentHashMap<>();
  private final Map<DataRegionId, ReentrantReadWriteLock> dataRegionLockMap =
      new ConcurrentHashMap<>();

  public DataNodeRegionManager(SchemaEngine schemaEngine, StorageEngineV2 storageEngine) {
    this.schemaEngine = schemaEngine;
    this.storageEngine = storageEngine;
    // Pre-populate locks for all regions that already exist on this DataNode at startup.
    schemaEngine
        .getAllSchemaRegions()
        .forEach(
            schemaRegion ->
                schemaRegionLockMap.put(
                    schemaRegion.getSchemaRegionId(), new ReentrantReadWriteLock(false)));
    storageEngine
        .getAllDataRegionIds()
        .forEach(
            dataRegionId -> dataRegionLockMap.put(dataRegionId, new ReentrantReadWriteLock(false)));
  }

  /**
   * Returns the lock guarding the given data region, creating it on demand. Creating on demand
   * avoids a NullPointerException when a write request races with region creation or deletion
   * (the previous {@code map.get(...).lock()} pattern would throw if the entry was absent).
   */
  private ReentrantReadWriteLock getDataRegionLock(DataRegionId dataRegionId) {
    return dataRegionLockMap.computeIfAbsent(
        dataRegionId, id -> new ReentrantReadWriteLock(false));
  }

  /**
   * Executes a write-type PlanNode on the region identified by groupId.
   *
   * @param groupId the region the plan node should be written to
   * @param planNode the write-type plan node to execute
   * @return a response whose accepted flag reflects whether the consensus write succeeded
   */
  public TSendPlanNodeResp executePlanNode(ConsensusGroupId groupId, PlanNode planNode) {
    if (planNode instanceof InsertNode) {
      // Inserts additionally need schema validation and partial-insert handling.
      return executeDataInsert((DataRegionId) groupId, (InsertNode) planNode);
    }
    TSendPlanNodeResp response = new TSendPlanNodeResp();
    ConsensusWriteResponse writeResponse = executePlanNodeInConsensusLayer(groupId, planNode);
    // TODO need consider more status
    if (writeResponse.getStatus() != null) {
      response.setAccepted(
          TSStatusCode.SUCCESS_STATUS.getStatusCode() == writeResponse.getStatus().getCode());
      response.setMessage(writeResponse.getStatus().message);
      response.setStatus(writeResponse.getStatus());
    } else {
      // A null status means the consensus layer itself failed; surface its exception message.
      LOGGER.error(
          "Something wrong happened while calling consensus layer's write API.",
          writeResponse.getException());
      response.setAccepted(false);
      response.setMessage(writeResponse.getException().getMessage());
    }
    return response;
  }

  /** Routes the write to the data- or schema-region consensus implementation by groupId type. */
  private ConsensusWriteResponse executePlanNodeInConsensusLayer(
      ConsensusGroupId groupId, PlanNode planNode) {
    if (groupId instanceof DataRegionId) {
      return DataRegionConsensusImpl.getInstance().write(groupId, planNode);
    } else {
      return SchemaRegionConsensusImpl.getInstance().write(groupId, planNode);
    }
  }

  /**
   * Validates the schema of an InsertNode and writes it to the data region under the region's
   * read lock, so inserts can run concurrently with each other but not with delete-timeseries.
   */
  private TSendPlanNodeResp executeDataInsert(DataRegionId dataRegionId, InsertNode insertNode) {
    TSendPlanNodeResp response = new TSendPlanNodeResp();
    // Keep a reference to the lock object itself: looking it up again in finally could return
    // null if deleteDataRegion removed the map entry in the meantime, which would both throw
    // from finally and leak the held read lock.
    ReentrantReadWriteLock regionLock = getDataRegionLock(dataRegionId);
    regionLock.readLock().lock();
    try {
      try {
        SchemaValidator.validate(insertNode);
      } catch (SemanticException e) {
        response.setAccepted(false);
        response.setStatus(
            RpcUtils.getStatus(TSStatusCode.METADATA_ERROR.getStatusCode(), e.getMessage()));
        response.setMessage(e.getMessage());
        return response;
      }
      // Some measurements may have failed validation; the remaining ones are still inserted and
      // the response reports a partial-insert error.
      boolean hasFailedMeasurement = insertNode.hasFailedMeasurements();
      String partialInsertMessage = null;
      if (hasFailedMeasurement) {
        partialInsertMessage =
            String.format(
                "Fail to insert measurements %s caused by %s",
                insertNode.getFailedMeasurements(), insertNode.getFailedMessages());
        LOGGER.warn(partialInsertMessage);
      }
      ConsensusWriteResponse writeResponse =
          executePlanNodeInConsensusLayer(dataRegionId, insertNode);
      // TODO need consider more status
      if (writeResponse.getStatus() != null) {
        // Accepted only when the consensus write succeeded AND no measurement failed validation.
        response.setAccepted(
            !hasFailedMeasurement
                && TSStatusCode.SUCCESS_STATUS.getStatusCode()
                    == writeResponse.getStatus().getCode());
        if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != writeResponse.getStatus().getCode()) {
          response.setMessage(writeResponse.getStatus().message);
          response.setStatus(writeResponse.getStatus());
        } else if (hasFailedMeasurement) {
          response.setMessage(partialInsertMessage);
          response.setStatus(
              RpcUtils.getStatus(
                  TSStatusCode.METADATA_ERROR.getStatusCode(), partialInsertMessage));
        } else {
          response.setMessage(writeResponse.getStatus().message);
        }
      } else {
        LOGGER.error(
            "Something wrong happened while calling consensus layer's write API.",
            writeResponse.getException());
        response.setAccepted(false);
        response.setMessage(writeResponse.getException().getMessage());
      }
      return response;
    } finally {
      regionLock.readLock().unlock();
    }
  }

  /**
   * Writes a schema plan node to its schema region via the consensus layer.
   *
   * @return the consensus status, or EXECUTE_STATEMENT_ERROR if the consensus call itself failed
   */
  public TSStatus executeSchemaPlanNode(SchemaRegionId schemaRegionId, PlanNode planNode) {
    ConsensusWriteResponse writeResponse =
        executePlanNodeInConsensusLayer(schemaRegionId, planNode);
    TSStatus status = writeResponse.getStatus();
    if (status == null) {
      LOGGER.error(
          "Something wrong happened while calling consensus layer's write API.",
          writeResponse.getException());
      return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
    }
    return status;
  }

  /**
   * Deletes the data belonging to a timeseries that is being deleted. Takes the region's write
   * lock so the deletion is exclusive with concurrent inserts on the same region.
   */
  public TSStatus executeDeleteDataForDeleteTimeSeries(
      DataRegionId dataRegionId, PlanNode planNode) {
    // As in executeDataInsert: unlock the exact lock instance that was locked, instead of
    // re-fetching it from the map (which may have lost the entry to a concurrent deletion).
    ReentrantReadWriteLock regionLock = getDataRegionLock(dataRegionId);
    regionLock.writeLock().lock();
    try {
      ConsensusWriteResponse writeResponse =
          executePlanNodeInConsensusLayer(dataRegionId, planNode);
      TSStatus status = writeResponse.getStatus();
      if (status == null) {
        LOGGER.error(
            "Something wrong happened while calling consensus layer's write API.",
            writeResponse.getException());
        return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
      }
      return status;
    } finally {
      regionLock.writeLock().unlock();
    }
  }

  /**
   * Creates a local schema region for the given storage group and registers its consensus peers.
   *
   * @param regionReplicaSet the replica set describing the new region and its peers
   * @param storageGroup the storage group the region belongs to
   * @return SUCCESS_STATUS on success, otherwise PATH_ILLEGAL or CREATE_REGION_ERROR
   */
  public TSStatus createSchemaRegion(TRegionReplicaSet regionReplicaSet, String storageGroup) {
    TSStatus tsStatus;
    try {
      PartialPath storageGroupPartitionPath = new PartialPath(storageGroup);
      SchemaRegionId schemaRegionId = new SchemaRegionId(regionReplicaSet.getRegionId().getId());
      schemaEngine.createSchemaRegion(storageGroupPartitionPath, schemaRegionId);
      schemaRegionLockMap.put(schemaRegionId, new ReentrantReadWriteLock(false));
      List<Peer> peers = new ArrayList<>();
      for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
        TEndPoint endpoint =
            new TEndPoint(
                dataNodeLocation.getSchemaRegionConsensusEndPoint().getIp(),
                dataNodeLocation.getSchemaRegionConsensusEndPoint().getPort());
        peers.add(new Peer(schemaRegionId, endpoint));
      }
      ConsensusGenericResponse consensusGenericResponse =
          SchemaRegionConsensusImpl.getInstance().createPeer(schemaRegionId, peers);
      if (consensusGenericResponse.isSuccess()) {
        tsStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
      } else {
        tsStatus = new TSStatus(TSStatusCode.CREATE_REGION_ERROR.getStatusCode());
        tsStatus.setMessage(consensusGenericResponse.getException().getMessage());
      }
    } catch (IllegalPathException e1) {
      // Pass the throwable as the last argument so SLF4J logs the stack trace.
      LOGGER.error("Create Schema Region {} failed because path is illegal.", storageGroup, e1);
      tsStatus = new TSStatus(TSStatusCode.PATH_ILLEGAL.getStatusCode());
      tsStatus.setMessage("Create Schema Region failed because storageGroup path is illegal.");
    } catch (MetadataException e2) {
      LOGGER.error("Create Schema Region {} failed because {}", storageGroup, e2.getMessage(), e2);
      tsStatus = new TSStatus(TSStatusCode.CREATE_REGION_ERROR.getStatusCode());
      tsStatus.setMessage(
          String.format("Create Schema Region failed because of %s", e2.getMessage()));
    }
    return tsStatus;
  }

  /**
   * Creates a local data region for the given storage group and registers its consensus peers.
   *
   * @param regionReplicaSet the replica set describing the new region and its peers
   * @param storageGroup the storage group the region belongs to
   * @param ttl time-to-live passed through to the storage engine
   * @return SUCCESS_STATUS on success, otherwise CREATE_REGION_ERROR
   */
  public TSStatus createDataRegion(
      TRegionReplicaSet regionReplicaSet, String storageGroup, long ttl) {
    TSStatus tsStatus;
    try {
      DataRegionId dataRegionId = new DataRegionId(regionReplicaSet.getRegionId().getId());
      storageEngine.createDataRegion(dataRegionId, storageGroup, ttl);
      dataRegionLockMap.put(dataRegionId, new ReentrantReadWriteLock(false));
      List<Peer> peers = new ArrayList<>();
      for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
        TEndPoint endpoint =
            new TEndPoint(
                dataNodeLocation.getDataRegionConsensusEndPoint().getIp(),
                dataNodeLocation.getDataRegionConsensusEndPoint().getPort());
        peers.add(new Peer(dataRegionId, endpoint));
      }
      ConsensusGenericResponse consensusGenericResponse =
          DataRegionConsensusImpl.getInstance().createPeer(dataRegionId, peers);
      if (consensusGenericResponse.isSuccess()) {
        tsStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
      } else {
        tsStatus = new TSStatus(TSStatusCode.CREATE_REGION_ERROR.getStatusCode());
        tsStatus.setMessage(consensusGenericResponse.getException().getMessage());
      }
    } catch (DataRegionException e) {
      // Pass the throwable as the last argument so SLF4J logs the stack trace.
      LOGGER.error("Create Data Region {} failed because {}", storageGroup, e.getMessage(), e);
      tsStatus = new TSStatus(TSStatusCode.CREATE_REGION_ERROR.getStatusCode());
      tsStatus.setMessage(String.format("Create Data Region failed because of %s", e.getMessage()));
    }
    return tsStatus;
  }

  /**
   * Creates a new local region (data or schema, decided by the runtime type of regionId) without
   * registering consensus peers.
   *
   * @param regionId the id of the region to create; its concrete type selects the engine
   * @param storageGroup the storage group the region belongs to
   * @param ttl time-to-live, only used for data regions
   * @return SUCCESS_STATUS on success, otherwise CREATE_REGION_ERROR
   */
  public TSStatus createNewRegion(ConsensusGroupId regionId, String storageGroup, long ttl) {
    TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
    LOGGER.info("start to create new region {}", regionId);
    try {
      if (regionId instanceof DataRegionId) {
        DataRegionId dataRegionId = (DataRegionId) regionId;
        storageEngine.createDataRegion(dataRegionId, storageGroup, ttl);
        dataRegionLockMap.put(dataRegionId, new ReentrantReadWriteLock(false));
      } else {
        SchemaRegionId schemaRegionId = (SchemaRegionId) regionId;
        schemaEngine.createSchemaRegion(new PartialPath(storageGroup), schemaRegionId);
        schemaRegionLockMap.put(schemaRegionId, new ReentrantReadWriteLock(false));
      }
    } catch (Exception e) {
      LOGGER.error("create new region {} error", regionId, e);
      status.setCode(TSStatusCode.CREATE_REGION_ERROR.getStatusCode());
      // Fixed the missing spaces around "error" in the returned message.
      status.setMessage("create new region " + regionId + " error, exception: " + e.getMessage());
      return status;
    }
    status.setMessage("create new region " + regionId + " succeed");
    LOGGER.info("succeed to create new region {}", regionId);
    return status;
  }

  /** Deletes a data region and drops its lock entry. */
  public TSStatus deleteDataRegion(DataRegionId dataRegionId) {
    storageEngine.deleteDataRegion(dataRegionId);
    dataRegionLockMap.remove(dataRegionId);
    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully");
  }

  /** Deletes a schema region and drops its lock entry. */
  public TSStatus deleteSchemaRegion(SchemaRegionId schemaRegionId) {
    try {
      schemaEngine.deleteSchemaRegion(schemaRegionId);
      schemaRegionLockMap.remove(schemaRegionId);
      return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully");
    } catch (MetadataException e) {
      LOGGER.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
      return RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, e.getMessage());
    }
  }
}