blob: 0432dae0a4d9a000233f791bffe0eec47e511f19 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.confignode.persistence.partition;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.partition.DataPartitionTable;
import org.apache.iotdb.commons.partition.SchemaPartitionTable;
import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
import org.apache.iotdb.confignode.consensus.request.read.GetDataPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetRegionInfoListPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.write.UpdateRegionLocationPlan;
import org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
import org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.DeleteStorageGroupPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.PreDeleteStorageGroupPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetStorageGroupPlan;
import org.apache.iotdb.confignode.consensus.response.DataPartitionResp;
import org.apache.iotdb.confignode.consensus.response.RegionInfoListResp;
import org.apache.iotdb.confignode.consensus.response.SchemaNodeManagementResp;
import org.apache.iotdb.confignode.consensus.response.SchemaPartitionResp;
import org.apache.iotdb.confignode.exception.StorageGroupNotExistsException;
import org.apache.iotdb.confignode.persistence.metric.PartitionInfoMetrics;
import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.db.service.metrics.MetricService;
import org.apache.iotdb.db.service.metrics.enums.Metric;
import org.apache.iotdb.db.service.metrics.enums.Tag;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TIOStreamTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * The PartitionInfo stores the cluster PartitionTable. The PartitionTable includes: 1. regionMap:
 * location of Region members; 2. schemaPartition: location of schema; 3. dataPartition: location
 * of data.
 *
 * <p>The partition data is kept per StorageGroup in {@link StorageGroupPartitionTable}s. This class
 * also owns the RegionGroup id counter and the list of pending {@link RegionMaintainTask}s. It
 * persists and restores all of them through {@link SnapshotProcessor}.
 */
public class PartitionInfo implements SnapshotProcessor {

  private static final Logger LOGGER = LoggerFactory.getLogger(PartitionInfo.class);

  // For allocating Regions. Holds the largest RegionGroup id generated or observed so far
  // (-1 when none), so the next generated id is this value + 1.
  private final AtomicInteger nextRegionGroupId;

  // Map<StorageGroupName, StorageGroupPartitionTable>
  private final ConcurrentHashMap<String, StorageGroupPartitionTable> storageGroupPartitionTables;

  // For RegionReplicas' asynchronous management. This is a Collections.synchronizedList, so
  // iteration and compound actions must hold the list's own monitor.
  private final List<RegionMaintainTask> regionMaintainTaskList;

  private final String snapshotFileName = "partition_info.bin";

  public PartitionInfo() {
    this.nextRegionGroupId = new AtomicInteger(-1);
    this.storageGroupPartitionTables = new ConcurrentHashMap<>();
    this.regionMaintainTaskList = Collections.synchronizedList(new ArrayList<>());
  }

  /**
   * Atomically allocate a new RegionGroup id.
   *
   * @return The previous largest known id + 1
   */
  public int generateNextRegionGroupId() {
    return nextRegionGroupId.incrementAndGet();
  }

  // ======================================================
  // Consensus read/write interfaces
  // ======================================================

  /**
   * Thread-safely create a new StorageGroupPartitionTable and register its metrics.
   *
   * <p>If a table with the same StorageGroup name already exists, it is replaced.
   *
   * @param plan SetStorageGroupPlan
   * @return SUCCESS_STATUS if the new StorageGroupPartitionTable is created successfully.
   */
  public TSStatus setStorageGroup(SetStorageGroupPlan plan) {
    String storageGroupName = plan.getSchema().getName();
    StorageGroupPartitionTable storageGroupPartitionTable =
        new StorageGroupPartitionTable(storageGroupName);
    storageGroupPartitionTables.put(storageGroupName, storageGroupPartitionTable);

    MetricService.getInstance()
        .addMetricSet(
            new PartitionInfoMetrics.StorageGroupPartitionTableMetrics(storageGroupPartitionTable));

    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
  }

  /**
   * Thread-safely cache the allocation result of new RegionGroups.
   *
   * <p>Also raises nextRegionGroupId to at least the largest RegionGroup id in the plan, so a
   * node that only applies the plan still generates non-conflicting ids later.
   *
   * @param plan CreateRegionGroupsPlan
   * @return SUCCESS_STATUS
   */
  public TSStatus createRegionGroups(CreateRegionGroupsPlan plan) {
    AtomicInteger maxRegionId = new AtomicInteger(Integer.MIN_VALUE);

    plan.getRegionGroupMap()
        .forEach(
            (storageGroup, regionReplicaSets) -> {
              // NOTE(review): assumes the StorageGroup exists when the plan is applied; a
              // missing table results in a NullPointerException here.
              storageGroupPartitionTables.get(storageGroup).createRegionGroups(regionReplicaSets);
              regionReplicaSets.forEach(
                  regionReplicaSet ->
                      maxRegionId.accumulateAndGet(
                          regionReplicaSet.getRegionId().getId(), Math::max));
            });

    // ConfigNode-followers may process CreateRegionGroupsPlan concurrently with each other
    // and with generateNextRegionGroupId(). A single atomic max() guarantees that
    // nextRegionGroupId never moves backwards, without needing a separate lock.
    nextRegionGroupId.accumulateAndGet(maxRegionId.get(), Math::max);

    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
  }

  /**
   * Offer a batch of RegionMaintainTasks for the RegionMaintainer.
   *
   * @param offerRegionMaintainTasksPlan Carries the tasks to append, in order
   * @return SUCCESS_STATUS
   */
  public TSStatus offerRegionMaintainTasks(
      OfferRegionMaintainTasksPlan offerRegionMaintainTasksPlan) {
    synchronized (regionMaintainTaskList) {
      regionMaintainTaskList.addAll(offerRegionMaintainTasksPlan.getRegionMaintainTaskList());
      return RpcUtils.SUCCESS_STATUS;
    }
  }

  /**
   * Poll the head of regionMaintainTaskList after that task has been executed successfully.
   *
   * <p>Polling an empty list is a logged no-op instead of an IndexOutOfBoundsException.
   *
   * @return SUCCESS_STATUS
   */
  public TSStatus pollRegionMaintainTask() {
    synchronized (regionMaintainTaskList) {
      if (regionMaintainTaskList.isEmpty()) {
        LOGGER.warn("Ignore polling RegionMaintainTask because the task list is empty.");
      } else {
        regionMaintainTaskList.remove(0);
      }
      return RpcUtils.SUCCESS_STATUS;
    }
  }

  /**
   * Get a copy of regionMaintainTaskList for the RegionMaintainer to maintain cluster
   * RegionReplicas.
   *
   * <p>The returned list is a new, independent list (a shallow copy): later additions or removals
   * do not affect it, but it shares the same RegionMaintainTask instances.
   *
   * @return A shallow copy of regionMaintainTaskList
   */
  public List<RegionMaintainTask> getRegionMaintainEntryList() {
    synchronized (regionMaintainTaskList) {
      return new ArrayList<>(regionMaintainTaskList);
    }
  }

  /**
   * Thread-safely pre-delete the specific StorageGroup, or roll back a pre-deletion.
   *
   * <p>A pre-deleted StorageGroup is hidden from partition queries and creations. A missing
   * StorageGroup is treated as success.
   *
   * @param preDeleteStorageGroupPlan PreDeleteStorageGroupPlan
   * @return SUCCESS_STATUS
   */
  public TSStatus preDeleteStorageGroup(PreDeleteStorageGroupPlan preDeleteStorageGroupPlan) {
    final PreDeleteStorageGroupPlan.PreDeleteType preDeleteType =
        preDeleteStorageGroupPlan.getPreDeleteType();
    final String storageGroup = preDeleteStorageGroupPlan.getStorageGroup();
    StorageGroupPartitionTable storageGroupPartitionTable =
        storageGroupPartitionTables.get(storageGroup);
    if (storageGroupPartitionTable == null) {
      return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
    }
    switch (preDeleteType) {
      case EXECUTE:
        storageGroupPartitionTable.setPredeleted(true);
        break;
      case ROLLBACK:
        storageGroupPartitionTable.setPredeleted(false);
        break;
    }
    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
  }

  /**
   * Thread-safely delete the StorageGroup's partition table.
   *
   * @param plan DeleteStorageGroupPlan
   */
  public void deleteStorageGroup(DeleteStorageGroupPlan plan) {
    // Clean the StorageGroupTable cache.
    // NOTE(review): the metric set registered in setStorageGroup is not removed here; confirm
    // whether it is cleaned up elsewhere.
    storageGroupPartitionTables.remove(plan.getName());
  }

  /**
   * Thread-safely get SchemaPartition.
   *
   * @param plan SchemaPartitionPlan with partitionSlotsMap. An empty map requests every
   *     SchemaPartition of every StorageGroup that is not pre-deleted.
   * @return SchemaPartitionDataSet that contains only existing SchemaPartition
   */
  public DataSet getSchemaPartition(GetSchemaPartitionPlan plan) {
    AtomicBoolean isAllPartitionsExist = new AtomicBoolean(true);
    // TODO: Replace this map with new SchemaPartition
    Map<String, SchemaPartitionTable> schemaPartition = new ConcurrentHashMap<>();

    if (plan.getPartitionSlotsMap().isEmpty()) {
      // Return all SchemaPartitions when the queried PartitionSlots are empty
      storageGroupPartitionTables.forEach(
          (storageGroup, storageGroupPartitionTable) -> {
            if (!storageGroupPartitionTable.isPredeleted()) {
              SchemaPartitionTable schemaPartitionTable = new SchemaPartitionTable();
              // Passing an empty slot list is how this method requests all SchemaPartitions
              storageGroupPartitionTable.getSchemaPartition(
                  new ArrayList<>(), schemaPartitionTable);
              // Skip empty Maps
              if (!schemaPartitionTable.getSchemaPartitionMap().isEmpty()) {
                schemaPartition.put(storageGroup, schemaPartitionTable);
              }
            }
          });
    } else {
      // Return the SchemaPartition for each StorageGroup
      plan.getPartitionSlotsMap()
          .forEach(
              (storageGroup, partitionSlots) -> {
                StorageGroupPartitionTable storageGroupPartitionTable =
                    getActiveStorageGroupPartitionTable(storageGroup);
                if (storageGroupPartitionTable == null) {
                  return;
                }
                SchemaPartitionTable schemaPartitionTable = new SchemaPartitionTable();
                if (!storageGroupPartitionTable.getSchemaPartition(
                    partitionSlots, schemaPartitionTable)) {
                  isAllPartitionsExist.set(false);
                }
                // Skip empty Maps
                if (!schemaPartitionTable.getSchemaPartitionMap().isEmpty()) {
                  schemaPartition.put(storageGroup, schemaPartitionTable);
                }
              });
    }

    return new SchemaPartitionResp(
        new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()),
        isAllPartitionsExist.get(),
        schemaPartition);
  }

  /**
   * Thread-safely get DataPartition.
   *
   * @param plan DataPartitionPlan with partitionSlotsMap
   * @return DataPartitionDataSet that contains only existing DataPartition
   */
  public DataSet getDataPartition(GetDataPartitionPlan plan) {
    AtomicBoolean isAllPartitionsExist = new AtomicBoolean(true);
    // TODO: Replace this map with new DataPartition
    Map<String, DataPartitionTable> dataPartition = new ConcurrentHashMap<>();

    plan.getPartitionSlotsMap()
        .forEach(
            (storageGroup, partitionSlots) -> {
              StorageGroupPartitionTable storageGroupPartitionTable =
                  getActiveStorageGroupPartitionTable(storageGroup);
              if (storageGroupPartitionTable == null) {
                return;
              }
              DataPartitionTable dataPartitionTable = new DataPartitionTable();
              if (!storageGroupPartitionTable.getDataPartition(
                  partitionSlots, dataPartitionTable)) {
                isAllPartitionsExist.set(false);
              }
              // Skip empty Maps
              if (!dataPartitionTable.getDataPartitionMap().isEmpty()) {
                dataPartition.put(storageGroup, dataPartitionTable);
              }
            });

    return new DataPartitionResp(
        new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()),
        isAllPartitionsExist.get(),
        dataPartition);
  }

  /**
   * Check whether the specified DataPartition has a predecessor, and return it if so.
   *
   * @param storageGroup StorageGroupName
   * @param seriesPartitionSlot Corresponding SeriesPartitionSlot
   * @param timePartitionSlot Corresponding TimePartitionSlot
   * @param timePartitionInterval Time partition interval
   * @return The specific DataPartition's predecessor if it exists, null otherwise (including
   *     when the StorageGroup has no partition table)
   */
  public TConsensusGroupId getPrecededDataPartition(
      String storageGroup,
      TSeriesPartitionSlot seriesPartitionSlot,
      TTimePartitionSlot timePartitionSlot,
      long timePartitionInterval) {
    // Single lookup: a containsKey/get pair could race with a concurrent deletion
    StorageGroupPartitionTable storageGroupPartitionTable =
        storageGroupPartitionTables.get(storageGroup);
    if (storageGroupPartitionTable == null) {
      return null;
    }
    return storageGroupPartitionTable.getPrecededDataPartition(
        seriesPartitionSlot, timePartitionSlot, timePartitionInterval);
  }

  /** @return true if the StorageGroup has a partition table and is not pre-deleted */
  private boolean isStorageGroupExisted(String storageGroup) {
    return getActiveStorageGroupPartitionTable(storageGroup) != null;
  }

  /**
   * Look up a StorageGroup's partition table with a single map read, so the existence check and
   * the later use cannot be separated by a concurrent deletion.
   *
   * @return The table, or null if it is missing or pre-deleted
   */
  private StorageGroupPartitionTable getActiveStorageGroupPartitionTable(String storageGroup) {
    StorageGroupPartitionTable storageGroupPartitionTable =
        storageGroupPartitionTables.get(storageGroup);
    if (storageGroupPartitionTable == null || storageGroupPartitionTable.isPredeleted()) {
      return null;
    }
    return storageGroupPartitionTable;
  }

  /**
   * Create SchemaPartition. StorageGroups that are missing or pre-deleted are skipped.
   *
   * @param plan CreateSchemaPartitionPlan with SchemaPartition assigned result
   * @return TSStatusCode.SUCCESS_STATUS
   */
  public TSStatus createSchemaPartition(CreateSchemaPartitionPlan plan) {
    plan.getAssignedSchemaPartition()
        .forEach(
            (storageGroup, schemaPartitionTable) -> {
              StorageGroupPartitionTable storageGroupPartitionTable =
                  getActiveStorageGroupPartitionTable(storageGroup);
              if (storageGroupPartitionTable != null) {
                storageGroupPartitionTable.createSchemaPartition(schemaPartitionTable);
              }
            });

    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
  }

  /**
   * Create DataPartition. StorageGroups that are missing or pre-deleted are skipped.
   *
   * @param plan CreateDataPartitionPlan with DataPartition assigned result
   * @return TSStatusCode.SUCCESS_STATUS
   */
  public TSStatus createDataPartition(CreateDataPartitionPlan plan) {
    plan.getAssignedDataPartition()
        .forEach(
            (storageGroup, dataPartitionTable) -> {
              StorageGroupPartitionTable storageGroupPartitionTable =
                  getActiveStorageGroupPartitionTable(storageGroup);
              if (storageGroupPartitionTable != null) {
                storageGroupPartitionTable.createDataPartition(dataPartitionTable);
              }
            });

    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
  }

  /**
   * Get every SchemaPartition of the matched StorageGroups.
   *
   * <p>StorageGroups that are missing or pre-deleted, or that have no SchemaPartition, are
   * omitted.
   */
  public DataSet getSchemaNodeManagementPartition(List<String> matchedStorageGroups) {
    SchemaNodeManagementResp schemaNodeManagementResp = new SchemaNodeManagementResp();
    Map<String, SchemaPartitionTable> schemaPartitionMap = new ConcurrentHashMap<>();

    for (String storageGroup : matchedStorageGroups) {
      StorageGroupPartitionTable storageGroupPartitionTable =
          getActiveStorageGroupPartitionTable(storageGroup);
      if (storageGroupPartitionTable == null) {
        continue;
      }
      SchemaPartitionTable schemaPartitionTable = new SchemaPartitionTable();
      // Passing an empty slot list requests all SchemaPartitions of the StorageGroup
      storageGroupPartitionTable.getSchemaPartition(new ArrayList<>(), schemaPartitionTable);
      // Skip empty Maps
      if (!schemaPartitionTable.getSchemaPartitionMap().isEmpty()) {
        schemaPartitionMap.put(storageGroup, schemaPartitionTable);
      }
    }

    schemaNodeManagementResp.setSchemaPartition(schemaPartitionMap);
    schemaNodeManagementResp.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
    return schemaNodeManagementResp;
  }

  /**
   * Get region information, sorted by RegionGroup id and then by DataNode id.
   *
   * @param regionsInfoPlan Optionally restricts the result to the StorageGroups listed in its
   *     TShowRegionReq. A null request or a null StorageGroup list means all StorageGroups.
   */
  public DataSet getRegionInfoList(GetRegionInfoListPlan regionsInfoPlan) {
    RegionInfoListResp regionResp = new RegionInfoListResp();
    List<TRegionInfo> regionInfoList = new Vector<>();
    if (storageGroupPartitionTables.isEmpty()) {
      regionResp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
      regionResp.setRegionInfoList(new ArrayList<>());
      return regionResp;
    }
    TShowRegionReq showRegionReq = regionsInfoPlan.getShowRegionReq();
    final List<String> storageGroups =
        showRegionReq != null ? showRegionReq.getStorageGroups() : null;
    storageGroupPartitionTables.forEach(
        (storageGroup, storageGroupPartitionTable) -> {
          if (storageGroups != null && !storageGroups.contains(storageGroup)) {
            return;
          }
          regionInfoList.addAll(storageGroupPartitionTable.getRegionInfoList(regionsInfoPlan));
        });
    regionInfoList.sort(
        (o1, o2) -> {
          // Integer.compare instead of subtraction: a - b overflows for far-apart values
          int regionIdOrder =
              Integer.compare(
                  o1.getConsensusGroupId().getId(), o2.getConsensusGroupId().getId());
          return regionIdOrder != 0
              ? regionIdOrder
              : Integer.compare(o1.getDataNodeId(), o2.getDataNodeId());
        });
    regionResp.setRegionInfoList(regionInfoList);
    regionResp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
    return regionResp;
  }

  /**
   * Update a region's location in every StorageGroupPartitionTable.
   *
   * @param req UpdateRegionLocationReq
   * @return TSStatus
   */
  public TSStatus updateRegionLocation(UpdateRegionLocationPlan req) {
    TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
    TConsensusGroupId regionId = req.getRegionId();
    TDataNodeLocation oldNode = req.getOldNode();
    TDataNodeLocation newNode = req.getNewNode();
    storageGroupPartitionTables
        .values()
        .forEach(s -> s.updateRegionLocation(regionId, oldNode, newNode));

    return status;
  }

  /**
   * Get the StorageGroup that contains the region.
   *
   * @param regionId regionId
   * @return The StorageGroup name, or null if no StorageGroup contains the region
   */
  public String getRegionStorageGroup(TConsensusGroupId regionId) {
    Optional<StorageGroupPartitionTable> sgPartitionTableOptional =
        storageGroupPartitionTables.values().stream()
            .filter(s -> s.containRegion(regionId))
            .findFirst();
    return sgPartitionTableOptional
        .map(StorageGroupPartitionTable::getStorageGroupName)
        .orElse(null);
  }

  // ======================================================
  // Leader scheduling interfaces
  // ======================================================

  /**
   * Only the Leader uses this interface. Filter unassigned SchemaPartitionSlots.
   *
   * @param partitionSlotsMap Map<StorageGroupName, List<TSeriesPartitionSlot>>
   * @return Map<StorageGroupName, List<TSeriesPartitionSlot>>, SchemaPartitionSlots in
   *     partitionSlotsMap that are not yet assigned. Missing or pre-deleted StorageGroups are
   *     omitted.
   */
  public Map<String, List<TSeriesPartitionSlot>> filterUnassignedSchemaPartitionSlots(
      Map<String, List<TSeriesPartitionSlot>> partitionSlotsMap) {
    Map<String, List<TSeriesPartitionSlot>> result = new ConcurrentHashMap<>();

    partitionSlotsMap.forEach(
        (storageGroup, partitionSlots) -> {
          StorageGroupPartitionTable storageGroupPartitionTable =
              getActiveStorageGroupPartitionTable(storageGroup);
          if (storageGroupPartitionTable != null) {
            result.put(
                storageGroup,
                storageGroupPartitionTable.filterUnassignedSchemaPartitionSlots(partitionSlots));
          }
        });

    return result;
  }

  /**
   * Only the Leader uses this interface. Filter unassigned DataPartitionSlots.
   *
   * @param partitionSlotsMap Map<StorageGroupName, Map<TSeriesPartitionSlot,
   *     List<TTimePartitionSlot>>>
   * @return Map<StorageGroupName, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>>,
   *     DataPartitionSlots in partitionSlotsMap that are not yet assigned. Missing or pre-deleted
   *     StorageGroups are omitted.
   */
  public Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>>
      filterUnassignedDataPartitionSlots(
          Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> partitionSlotsMap) {
    Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> result =
        new ConcurrentHashMap<>();

    partitionSlotsMap.forEach(
        (storageGroup, partitionSlots) -> {
          StorageGroupPartitionTable storageGroupPartitionTable =
              getActiveStorageGroupPartitionTable(storageGroup);
          if (storageGroupPartitionTable != null) {
            result.put(
                storageGroup,
                storageGroupPartitionTable.filterUnassignedDataPartitionSlots(partitionSlots));
          }
        });

    return result;
  }

  /**
   * Only the leader uses this interface.
   *
   * @return All Regions' RegionReplicaSet
   */
  public List<TRegionReplicaSet> getAllReplicaSets() {
    List<TRegionReplicaSet> result = new ArrayList<>();
    storageGroupPartitionTables
        .values()
        .forEach(
            storageGroupPartitionTable ->
                result.addAll(storageGroupPartitionTable.getAllReplicaSets()));
    return result;
  }

  /**
   * Only the leader uses this interface.
   *
   * @param storageGroup The specified StorageGroup
   * @return All Regions' RegionReplicaSet of the specified StorageGroup, or an empty list if it
   *     has no partition table
   */
  public List<TRegionReplicaSet> getAllReplicaSets(String storageGroup) {
    // Single lookup: a containsKey/get pair could race with a concurrent deletion
    StorageGroupPartitionTable storageGroupPartitionTable =
        storageGroupPartitionTables.get(storageGroup);
    if (storageGroupPartitionTable == null) {
      return new ArrayList<>();
    }
    return storageGroupPartitionTable.getAllReplicaSets();
  }

  /**
   * Only the leader uses this interface. Get the number of Regions currently owned by the
   * specific StorageGroup.
   *
   * @param storageGroup StorageGroupName
   * @param type SchemaRegion or DataRegion
   * @return Number of Regions currently owned by the specific StorageGroup
   * @throws StorageGroupNotExistsException When the specific StorageGroup doesn't exist or is
   *     pre-deleted
   */
  public int getRegionCount(String storageGroup, TConsensusGroupType type)
      throws StorageGroupNotExistsException {
    StorageGroupPartitionTable storageGroupPartitionTable =
        getActiveStorageGroupPartitionTable(storageGroup);
    if (storageGroupPartitionTable == null) {
      throw new StorageGroupNotExistsException(storageGroup);
    }
    return storageGroupPartitionTable.getRegionGroupCount(type);
  }

  /**
   * Get the number of SeriesPartitionSlots already assigned in the StorageGroup.
   *
   * <p>NOTE: throws NullPointerException if the StorageGroup has no partition table; callers are
   * expected to check existence first.
   */
  public int getAssignedSeriesPartitionSlotsCount(String storageGroup) {
    return storageGroupPartitionTables.get(storageGroup).getAssignedSeriesPartitionSlotsCount();
  }

  /**
   * Get the DataNodes that contain the specific StorageGroup's Schema or Data.
   *
   * <p>NOTE: throws NullPointerException if the StorageGroup has no partition table.
   *
   * @param storageGroup The specific StorageGroup's name
   * @param type SchemaRegion or DataRegion
   * @return Set<TDataNodeLocation>, the related DataNodes
   */
  public Set<TDataNodeLocation> getStorageGroupRelatedDataNodes(
      String storageGroup, TConsensusGroupType type) {
    return storageGroupPartitionTables.get(storageGroup).getStorageGroupRelatedDataNodes(type);
  }

  /**
   * Only the leader uses this interface.
   *
   * <p>NOTE: throws NullPointerException if the StorageGroup has no partition table.
   *
   * @param storageGroup StorageGroupName
   * @param type SchemaRegion or DataRegion
   * @return The specific StorageGroup's Regions, sorted by the number of allocated slots
   */
  public List<Pair<Long, TConsensusGroupId>> getSortedRegionSlotsCounter(
      String storageGroup, TConsensusGroupType type) {
    return storageGroupPartitionTables.get(storageGroup).getSortedRegionGroupSlotsCounter(type);
  }

  /**
   * Update the RegionGroup-related metric. Publishes, per DataNode client endpoint, how many
   * Regions of the given type it hosts.
   *
   * @param type SchemaRegion or DataRegion
   * @return the number of SchemaRegion or DataRegion groups
   */
  public int updateRegionGroupMetric(TConsensusGroupType type) {
    Set<RegionGroup> regionGroups = new HashSet<>();
    for (StorageGroupPartitionTable storageGroupPartitionTable :
        storageGroupPartitionTables.values()) {
      regionGroups.addAll(storageGroupPartitionTable.getRegionGroups(type));
    }
    int result = regionGroups.size();

    // DataNode location -> number of Regions of this type hosted on it
    Map<TDataNodeLocation, Integer> dataNodeLocationIntegerMap = new HashMap<>();
    for (RegionGroup regionGroup : regionGroups) {
      TRegionReplicaSet regionReplicaSet = regionGroup.getReplicaSet();
      for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
        dataNodeLocationIntegerMap.merge(dataNodeLocation, 1, Integer::sum);
      }
    }

    for (Map.Entry<TDataNodeLocation, Integer> entry : dataNodeLocationIntegerMap.entrySet()) {
      TDataNodeLocation dataNodeLocation = entry.getKey();
      String name =
          "EndPoint("
              + dataNodeLocation.getClientRpcEndPoint().ip
              + ":"
              + dataNodeLocation.getClientRpcEndPoint().port
              + ")";
      // TODO: this metric can be optimized
      MetricService.getInstance()
          .getOrCreateGauge(
              Metric.REGION.toString(),
              MetricLevel.IMPORTANT,
              Tag.NAME.toString(),
              name,
              Tag.TYPE.toString(),
              type.toString())
          .set(entry.getValue());
    }
    return result;
  }

  /**
   * Serialize nextRegionGroupId, every StorageGroupPartitionTable and regionMaintainTaskList into
   * snapshotDir/partition_info.bin.
   *
   * <p>The data is written to a uniquely named temporary file first and then renamed. The
   * temporary file is always cleaned up.
   *
   * @return false if the snapshot file already exists or the rename fails
   */
  @Override
  public boolean processTakeSnapshot(File snapshotDir) throws TException, IOException {
    File snapshotFile = new File(snapshotDir, snapshotFileName);
    if (snapshotFile.exists() && snapshotFile.isFile()) {
      LOGGER.error(
          "Failed to take snapshot, because snapshot file [{}] is already exist.",
          snapshotFile.getAbsolutePath());
      return false;
    }

    // Copy the shared containers first. Each element count written below must match the
    // elements that follow it, even if the containers are modified concurrently. The task list
    // copy is taken under the list's monitor, as a synchronizedList requires for iteration.
    Map<String, StorageGroupPartitionTable> storageGroupPartitionTablesSnapshot =
        new HashMap<>(storageGroupPartitionTables);
    List<RegionMaintainTask> regionMaintainTasksSnapshot = getRegionMaintainEntryList();

    // A unique temporary name means a damaged, undeletable temporary file cannot break the
    // next snapshot operation.
    File tmpFile = new File(snapshotFile.getAbsolutePath() + "-" + UUID.randomUUID());

    try (FileOutputStream fileOutputStream = new FileOutputStream(tmpFile);
        TIOStreamTransport tioStreamTransport = new TIOStreamTransport(fileOutputStream)) {
      TProtocol protocol = new TBinaryProtocol(tioStreamTransport);

      // serialize nextRegionGroupId
      ReadWriteIOUtils.write(nextRegionGroupId.get(), fileOutputStream);

      // serialize StorageGroupPartitionTable
      ReadWriteIOUtils.write(storageGroupPartitionTablesSnapshot.size(), fileOutputStream);
      for (Map.Entry<String, StorageGroupPartitionTable> storageGroupPartitionTableEntry :
          storageGroupPartitionTablesSnapshot.entrySet()) {
        ReadWriteIOUtils.write(storageGroupPartitionTableEntry.getKey(), fileOutputStream);
        storageGroupPartitionTableEntry.getValue().serialize(fileOutputStream, protocol);
      }

      // serialize regionMaintainTaskList
      ReadWriteIOUtils.write(regionMaintainTasksSnapshot.size(), fileOutputStream);
      for (RegionMaintainTask task : regionMaintainTasksSnapshot) {
        task.serialize(fileOutputStream, protocol);
      }

      // Flush and close before renaming so the renamed file is complete
      fileOutputStream.flush();
      fileOutputStream.close();

      // rename file
      return tmpFile.renameTo(snapshotFile);
    } finally {
      // with or without success, delete temporary files anyway
      for (int retry = 0; retry < 5; retry++) {
        if (!tmpFile.exists() || tmpFile.delete()) {
          break;
        } else {
          LOGGER.warn(
              "Can't delete temporary snapshot file: {}, retrying...", tmpFile.getAbsolutePath());
        }
      }
    }
  }

  /**
   * Replace all in-memory state with the content of snapshotDir/partition_info.bin.
   *
   * <p>Logs an error and keeps the current state if the file is missing.
   *
   * @throws IOException if a StorageGroup name in the snapshot is null, or on read failures
   */
  @Override
  public void processLoadSnapshot(File snapshotDir) throws TException, IOException {
    File snapshotFile = new File(snapshotDir, snapshotFileName);
    if (!snapshotFile.exists() || !snapshotFile.isFile()) {
      LOGGER.error(
          "Failed to load snapshot,snapshot file [{}] is not exist.",
          snapshotFile.getAbsolutePath());
      return;
    }

    try (FileInputStream fileInputStream = new FileInputStream(snapshotFile);
        TIOStreamTransport tioStreamTransport = new TIOStreamTransport(fileInputStream)) {
      TProtocol protocol = new TBinaryProtocol(tioStreamTransport);
      // before restoring a snapshot, clear all old data
      clear();

      // start to restore
      nextRegionGroupId.set(ReadWriteIOUtils.readInt(fileInputStream));

      // restore StorageGroupPartitionTable
      // NOTE(review): unlike setStorageGroup, no metric set is registered for restored
      // StorageGroups here; confirm whether that happens elsewhere.
      int length = ReadWriteIOUtils.readInt(fileInputStream);
      for (int i = 0; i < length; i++) {
        String storageGroup = ReadWriteIOUtils.readString(fileInputStream);
        if (storageGroup == null) {
          throw new IOException("Failed to load snapshot because get null StorageGroup name");
        }
        StorageGroupPartitionTable storageGroupPartitionTable =
            new StorageGroupPartitionTable(storageGroup);
        storageGroupPartitionTable.deserialize(fileInputStream, protocol);
        storageGroupPartitionTables.put(storageGroup, storageGroupPartitionTable);
      }

      // restore regionMaintainTaskList
      length = ReadWriteIOUtils.readInt(fileInputStream);
      for (int i = 0; i < length; i++) {
        RegionMaintainTask task = RegionMaintainTask.Factory.create(fileInputStream, protocol);
        regionMaintainTaskList.add(task);
      }
    }
  }

  /** @return The number of StorageGroups that currently have a partition table */
  public int getStorageGroupPartitionTableSize() {
    return storageGroupPartitionTables.size();
  }

  /** Reset nextRegionGroupId to -1 and drop all partition tables and pending maintain tasks. */
  public void clear() {
    nextRegionGroupId.set(-1);
    storageGroupPartitionTables.clear();
    regionMaintainTaskList.clear();
  }

  /**
   * Two PartitionInfos are equal when their partition tables and maintain task lists are equal.
   * nextRegionGroupId is not compared.
   */
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    PartitionInfo that = (PartitionInfo) o;
    return storageGroupPartitionTables.equals(that.storageGroupPartitionTables)
        && regionMaintainTaskList.equals(that.regionMaintainTaskList);
  }

  @Override
  public int hashCode() {
    return Objects.hash(storageGroupPartitionTables, regionMaintainTaskList);
  }
}