blob: 6684fe6e71c49481fdc57e9b1a3c39d80516105c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.master;
import org.apache.doris.catalog.BinlogConfig;
import org.apache.doris.catalog.ColocateGroupSchema;
import org.apache.doris.catalog.ColocateTableIndex;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Index;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndex.IndexState;
import org.apache.doris.catalog.MaterializedIndexMeta;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Replica.ReplicaState;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.catalog.Resource;
import org.apache.doris.catalog.Resource.ResourceType;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.Tablet.TabletStatus;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.clone.TabletSchedCtx;
import org.apache.doris.common.Config;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.Daemon;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.cooldown.CooldownConf;
import org.apache.doris.metric.GaugeMetric;
import org.apache.doris.metric.Metric.MetricUnit;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.persist.BackendReplicasInfo;
import org.apache.doris.persist.BackendTabletsInfo;
import org.apache.doris.persist.ReplicaPersistInfo;
import org.apache.doris.policy.Policy;
import org.apache.doris.policy.PolicyTypeEnum;
import org.apache.doris.policy.StoragePolicy;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Backend.BackendStatus;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTask;
import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.ClearTransactionTask;
import org.apache.doris.task.CreateReplicaTask;
import org.apache.doris.task.DropReplicaTask;
import org.apache.doris.task.MasterTask;
import org.apache.doris.task.PublishVersionTask;
import org.apache.doris.task.PushCooldownConfTask;
import org.apache.doris.task.PushStoragePolicyTask;
import org.apache.doris.task.StorageMediaMigrationTask;
import org.apache.doris.task.UpdateTabletMetaInfoTask;
import org.apache.doris.thrift.TBackend;
import org.apache.doris.thrift.TDisk;
import org.apache.doris.thrift.TMasterResult;
import org.apache.doris.thrift.TPartitionVersionInfo;
import org.apache.doris.thrift.TReportRequest;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.thrift.TStoragePolicy;
import org.apache.doris.thrift.TStorageResource;
import org.apache.doris.thrift.TStorageType;
import org.apache.doris.thrift.TTablet;
import org.apache.doris.thrift.TTabletInfo;
import org.apache.doris.thrift.TTabletMetaInfo;
import org.apache.doris.thrift.TTaskType;
import com.google.common.base.Preconditions;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.stream.Collectors;
public class ReportHandler extends Daemon {
private static final Logger LOG = LogManager.getLogger(ReportHandler.class);
private BlockingQueue<ReportTask> reportQueue = Queues.newLinkedBlockingQueue();
// Which section(s) a BE report carried. Used only for the receive-log line in
// handleReport(); when a request carries several sections, the last one
// checked (TASK -> DISK -> TABLET) is the one recorded.
private enum ReportType {
    UNKNOWN,
    TASK,
    DISK,
    TABLET
}
public ReportHandler() {
    super("report-thread");
    // Publish the pending-report backlog as a gauge so operators can watch
    // whether the report consumer is keeping up.
    GaugeMetric<Long> queueSizeMetric = new GaugeMetric<Long>(
            "report_queue_size", MetricUnit.NOUNIT, "report queue size") {
        @Override
        public Long getValue() {
            return Long.valueOf(reportQueue.size());
        }
    };
    MetricRepo.DORIS_METRIC_REGISTER.addMetrics(queueSizeMetric);
}
/**
 * Entry point for a BE report RPC. Validates the reporting backend, snapshots
 * the report payload into a ReportTask and enqueues it for asynchronous
 * processing by the report thread.
 *
 * @param request the thrift report request from the BE
 * @return a TMasterResult whose status is OK on successful enqueue, or
 *         INTERNAL_ERROR with error messages when the backend is unknown or
 *         the queue is full
 */
public TMasterResult handleReport(TReportRequest request) throws TException {
    TMasterResult masterResult = new TMasterResult();
    TStatus status = new TStatus(TStatusCode.OK);
    masterResult.setStatus(status);

    // Resolve the reporting backend; reports from unregistered nodes are rejected.
    TBackend reportedBackend = request.getBackend();
    String host = reportedBackend.getHost();
    int bePort = reportedBackend.getBePort();
    Backend backend = Env.getCurrentSystemInfo().getBackendWithBePort(host, bePort);
    if (backend == null) {
        status.setStatusCode(TStatusCode.INTERNAL_ERROR);
        List<String> errorMsgs = Lists.newArrayList();
        errorMsgs.add("backend[" + NetUtils
                .getHostPortInAccessibleFormat(host, bePort) + "] does not exist.");
        status.setErrorMsgs(errorMsgs);
        return masterResult;
    }

    long beId = backend.getId();
    Map<TTaskType, Set<Long>> tasks = null;
    Map<String, TDisk> disks = null;
    Map<Long, TTablet> tablets = null;
    long reportVersion = -1;

    // A single request may carry several sections; the type recorded here is
    // the last section present (in TASK -> DISK -> TABLET order) and is used
    // only for the log line at the end of this method.
    ReportType reportType = ReportType.UNKNOWN;
    if (request.isSetTasks()) {
        tasks = request.getTasks();
        reportType = ReportType.TASK;
    }
    if (request.isSetDisks()) {
        disks = request.getDisks();
        reportType = ReportType.DISK;
    }
    if (request.isSetTablets()) {
        tablets = request.getTablets();
        reportVersion = request.getReportVersion();
        reportType = ReportType.TABLET;
    } else if (request.isSetTabletList()) {
        // the 'tablets' member will be deprecated in future.
        tablets = buildTabletMap(request.getTabletList());
        reportVersion = request.getReportVersion();
        reportType = ReportType.TABLET;
    }

    if (request.isSetTabletMaxCompactionScore()) {
        backend.setTabletMaxCompactionScore(request.getTabletMaxCompactionScore());
    }

    ReportTask reportTask = new ReportTask(beId, tasks, disks, tablets, reportVersion,
            request.getStoragePolicy(), request.getResource(), request.getNumCores(),
            request.getPipelineExecutorSize());
    try {
        putToQueue(reportTask);
    } catch (Exception e) {
        // Queue full (or interrupted): tell the BE so it can retry later.
        status.setStatusCode(TStatusCode.INTERNAL_ERROR);
        List<String> errorMsgs = Lists.newArrayList();
        errorMsgs.add("failed to put report task to queue. queue size: " + reportQueue.size());
        errorMsgs.add("err: " + e.getMessage());
        status.setErrorMsgs(errorMsgs);
        return masterResult;
    }

    LOG.info("receive report from be {}. type: {}, current queue size: {}",
            backend.getId(), reportType, reportQueue.size());
    return masterResult;
}
/**
 * Enqueues a report task, refusing it when the backlog already exceeds the
 * configured limit so a slow consumer cannot grow the queue without bound.
 *
 * @throws Exception when the queue size limit is exceeded
 */
private void putToQueue(ReportTask task) throws Exception {
    int queued = reportQueue.size();
    if (queued <= Config.report_queue_size) {
        reportQueue.put(task);
        return;
    }
    LOG.warn("the report queue size exceeds the limit: {}. current: {}", Config.report_queue_size, queued);
    throw new Exception(
            "the report queue size exceeds the limit: "
                    + Config.report_queue_size + ". current: " + queued);
}
/**
 * Builds a tablet-id -> TTablet map from the list-style tablet report.
 * The tablet id is taken from the first TTabletInfo of each TTablet.
 *
 * Entries whose info list is unset or empty carry no tablet id and are
 * skipped. Fix: the thrift getter may return null for an unset list, which
 * previously caused an NPE on {@code isEmpty()}; the getter is now called
 * once and null-checked.
 *
 * @param tabletList tablets as reported by the BE
 * @return map keyed by tablet id; later duplicates overwrite earlier ones
 */
private Map<Long, TTablet> buildTabletMap(List<TTablet> tabletList) {
    Map<Long, TTablet> tabletMap = Maps.newHashMap();
    for (TTablet tTablet : tabletList) {
        List<TTabletInfo> tabletInfos = tTablet.getTabletInfos();
        // an unset (null) or empty info list has no tablet id to key on
        if (tabletInfos == null || tabletInfos.isEmpty()) {
            continue;
        }
        tabletMap.put(tabletInfos.get(0).getTabletId(), tTablet);
    }
    return tabletMap;
}
/**
 * Immutable snapshot of one BE report, executed asynchronously by the report
 * thread. A field is null when the corresponding section was absent from the
 * original request, and its handler is then skipped in exec().
 */
private class ReportTask extends MasterTask {
    private final long beId;
    private final Map<TTaskType, Set<Long>> tasks;
    private final Map<String, TDisk> disks;
    private final Map<Long, TTablet> tablets;
    // report version the BE attached to the tablet section; -1 when absent
    private final long reportVersion;
    private final List<TStoragePolicy> storagePolicies;
    private final List<TStorageResource> storageResources;
    private final int cpuCores;
    private final int pipelineExecutorSize;

    public ReportTask(long beId, Map<TTaskType, Set<Long>> tasks,
            Map<String, TDisk> disks,
            Map<Long, TTablet> tablets, long reportVersion,
            List<TStoragePolicy> storagePolicies, List<TStorageResource> storageResources, int cpuCores,
            int pipelineExecutorSize) {
        this.beId = beId;
        this.tasks = tasks;
        this.disks = disks;
        this.tablets = tablets;
        this.reportVersion = reportVersion;
        this.storagePolicies = storagePolicies;
        this.storageResources = storageResources;
        this.cpuCores = cpuCores;
        this.pipelineExecutorSize = pipelineExecutorSize;
    }

    @Override
    protected void exec() {
        if (tasks != null) {
            ReportHandler.taskReport(beId, tasks);
        }
        if (disks != null) {
            // cpu info rides along with the disk report
            ReportHandler.diskReport(beId, disks);
            ReportHandler.cpuReport(beId, cpuCores, pipelineExecutorSize);
        }
        if (Config.enable_storage_policy && storagePolicies != null && storageResources != null) {
            storagePolicyReport(beId, storagePolicies, storageResources);
        }
        if (tablets != null) {
            // Drop stale tablet reports: a newer report from this BE has
            // already bumped the backend's report version.
            long backendReportVersion = Env.getCurrentSystemInfo().getBackendReportVersion(beId);
            if (reportVersion < backendReportVersion) {
                LOG.warn("out of date report version {} from backend[{}]. current report version[{}]",
                        reportVersion, beId, backendReportVersion);
            } else {
                ReportHandler.tabletReport(beId, tablets, reportVersion);
            }
        }
    }
}
/**
 * Sends cooldown configs to a BE, sliced into fixed-size batches so that a
 * single RPC payload stays bounded.
 */
private static void handlePushCooldownConf(long backendId, List<CooldownConf> cooldownConfToPush) {
    final int pushBatchSize = 1024;
    AgentBatchTask batchTask = new AgentBatchTask();
    int total = cooldownConfToPush.size();
    int from = 0;
    while (from < total) {
        int to = Math.min(from + pushBatchSize, total);
        batchTask.addTask(new PushCooldownConfTask(backendId, cooldownConfToPush.subList(from, to)));
        from = to;
    }
    AgentTaskExecutor.submit(batchTask);
}
/**
 * Ships storage-policy changes (adds/updates, resources, and drops) to a BE
 * as a single batched agent task.
 */
private static void handlePushStoragePolicy(long backendId, List<Policy> policyToPush,
        List<Resource> resourceToPush, List<Long> policyToDrop) {
    AgentBatchTask batch = new AgentBatchTask();
    batch.addTask(new PushStoragePolicyTask(backendId, policyToPush, resourceToPush, policyToDrop));
    AgentTaskExecutor.submit(batch);
}
/**
 * Diffs the storage policies and resources a BE reported against FE metadata,
 * then pushes anything the BE is missing or holds stale, and drops policies
 * the FE no longer knows about. No RPC is sent when everything is in sync.
 *
 * Fix: the log-formatting lambdas were missing the closing ')' on the
 * "StoragePolicy(...)" and "Resource(...)" strings, producing malformed log
 * output.
 */
private static void storagePolicyReport(long backendId,
        List<TStoragePolicy> storagePoliciesInBe,
        List<TStorageResource> storageResourcesInBe) {
    LOG.info("backend[{}] reports policies {}, report resources: {}",
            backendId, storagePoliciesInBe, storageResourcesInBe);
    // do the diff. find out (intersection) / (be - meta) / (meta - be)
    List<Policy> policiesInFe = Env.getCurrentEnv().getPolicyMgr().getCopiedPoliciesByType(PolicyTypeEnum.STORAGE);
    List<Resource> resourcesInFe = Env.getCurrentEnv().getResourceMgr().getResource(ResourceType.S3);
    resourcesInFe.addAll(Env.getCurrentEnv().getResourceMgr().getResource(ResourceType.HDFS));

    List<Resource> resourceToPush = new ArrayList<>();
    List<Policy> policyToPush = new ArrayList<>();
    List<Long> policyToDrop = new ArrayList<>();
    diffPolicy(storagePoliciesInBe, policiesInFe, policyToPush, policyToDrop);
    diffResource(storageResourcesInBe, resourcesInFe, resourceToPush);

    if (policyToPush.isEmpty() && resourceToPush.isEmpty() && policyToDrop.isEmpty()) {
        return; // BE is already in sync; nothing to push
    }
    LOG.info("after diff policy, policyToPush {}, policyToDrop {}, and resourceToPush {}",
            policyToPush.stream()
                    .map(p -> "StoragePolicy(name=" + p.getPolicyName() + " id=" + p.getId() + " version="
                            + p.getVersion() + ")").collect(Collectors.toList()),
            policyToDrop, resourceToPush.stream()
                    .map(r -> "Resource(name=" + r.getName() + " id=" + r.getId() + " version="
                            + r.getVersion() + ")").collect(Collectors.toList()));
    // send push rpc
    handlePushStoragePolicy(backendId, policyToPush, resourceToPush, policyToDrop);
}
/**
 * Computes the storage-policy diff between FE metadata and a BE report.
 *
 * FE-side pass: any valid FE policy the BE lacks, or holds at an older
 * version, is added to policyToPush. BE-side pass: any BE policy unknown to
 * the FE is added to policyToDrop.
 */
private static void diffPolicy(List<TStoragePolicy> storagePoliciesInBe, List<Policy> policiesInFe,
        List<Policy> policyToPush, List<Long> policyToDrop) {
    // fe - be: find what the BE must receive
    for (Policy policy : policiesInFe) {
        if (policy.getId() <= 0 || ((StoragePolicy) policy).getStorageResource() == null) {
            continue; // ignore policy with invalid id or storage resource
        }
        TStoragePolicy match = null;
        for (TStoragePolicy tStoragePolicy : storagePoliciesInBe) {
            if (policy.getId() == tStoragePolicy.getId()) {
                match = tStoragePolicy;
                break;
            }
        }
        if (match == null) {
            // BE does not know this policy at all
            policyToPush.add(policy);
        } else if (policy.getVersion() > match.getVersion()) {
            // BE holds a stale version
            policyToPush.add(policy);
        } else if (policy.getVersion() < match.getVersion()) {
            // BE should never be ahead of FE
            LOG.warn("fe policy version {} litter than be {}, impossible",
                    policy.getVersion(), match.getVersion());
        }
        // equal versions: already in sync, nothing to do
    }
    // be - fe: find what the BE must drop
    for (TStoragePolicy tStoragePolicy : storagePoliciesInBe) {
        boolean knownToFe = false;
        for (Policy policy : policiesInFe) {
            if (policy.getId() == tStoragePolicy.getId()) {
                knownToFe = true;
                break;
            }
        }
        if (!knownToFe) {
            policyToDrop.add(tStoragePolicy.getId());
        }
    }
}
/**
 * Computes the storage-resource diff between FE metadata and a BE report:
 * any valid FE resource the BE lacks, or holds at an older version, is added
 * to resourceToPush. (Resources are never dropped from the BE here.)
 */
private static void diffResource(List<TStorageResource> storageResourcesInBe, List<Resource> resourcesInFe,
        List<Resource> resourceToPush) {
    // fe - be: find what the BE must receive
    for (Resource resource : resourcesInFe) {
        if (resource.getId() <= 0) {
            continue; // ignore resource with invalid id
        }
        TStorageResource match = null;
        for (TStorageResource tStorageResource : storageResourcesInBe) {
            if (resource.getId() == tStorageResource.getId()) {
                match = tStorageResource;
                break;
            }
        }
        if (match == null) {
            // BE does not know this resource at all
            resourceToPush.add(resource);
        } else if (resource.getVersion() > match.getVersion()) {
            // BE holds a stale version
            resourceToPush.add(resource);
        } else if (resource.getVersion() < match.getVersion()) {
            // BE should never be ahead of FE
            LOG.warn("fe resource version {} litter than be {}, impossible",
                    resource.getVersion(), match.getVersion());
        }
        // equal versions: in sync, nothing to do
    }
}
// public for fe ut
/**
 * Handles a full tablet report from one BE.
 *
 * First diffs the reported tablets against the FE's inverted index, then
 * repairs every category of mismatch in a fixed order: sync replica versions,
 * delete stale FE meta, drop tablets unknown to FE from the BE, migrate
 * storage medium, clear/republish transactions, recover tablets, update
 * tablet meta, and push/refresh cooldown conf.
 *
 * @param backendId            id of the reporting BE
 * @param backendTablets       tablet id -> reported tablet state
 * @param backendReportVersion report version attached by the BE; used to
 *                             discard work when a newer report supersedes it
 */
public static void tabletReport(long backendId, Map<Long, TTablet> backendTablets, long backendReportVersion) {
    long start = System.currentTimeMillis();
    LOG.info("backend[{}] reports {} tablet(s). report version: {}",
            backendId, backendTablets.size(), backendReportVersion);

    // storage medium map (empty when the medium check is disabled, which
    // suppresses migration scheduling below)
    HashMap<Long, TStorageMedium> storageMediumMap = Config.disable_storage_medium_check
            ? Maps.newHashMap() : Env.getCurrentEnv().getPartitionIdToStorageMediumMap();

    // db id -> tablet id
    ListMultimap<Long, Long> tabletSyncMap = LinkedListMultimap.create();
    // db id -> tablet id
    ListMultimap<Long, Long> tabletDeleteFromMeta = LinkedListMultimap.create();
    // tablet ids which both in fe and be
    Set<Long> tabletFoundInMeta = Sets.newConcurrentHashSet();
    // storage medium -> tablet id
    ListMultimap<TStorageMedium, Long> tabletMigrationMap = LinkedListMultimap.create();

    // dbid -> txn id -> [partition info]
    Map<Long, ListMultimap<Long, TPartitionVersionInfo>> transactionsToPublish = Maps.newHashMap();
    ListMultimap<Long, Long> transactionsToClear = LinkedListMultimap.create();

    // db id -> tablet id
    ListMultimap<Long, Long> tabletRecoveryMap = LinkedListMultimap.create();

    List<TTabletMetaInfo> tabletToUpdate = Lists.newArrayList();

    List<CooldownConf> cooldownConfToPush = new LinkedList<>();
    List<CooldownConf> cooldownConfToUpdate = new LinkedList<>();

    // 1. do the diff. find out (intersection) / (be - meta) / (meta - be)
    // the inverted index fills all of the collections declared above
    Env.getCurrentInvertedIndex().tabletReport(backendId, backendTablets, storageMediumMap,
            tabletSyncMap,
            tabletDeleteFromMeta,
            tabletFoundInMeta,
            tabletMigrationMap,
            transactionsToPublish,
            transactionsToClear,
            tabletRecoveryMap,
            tabletToUpdate,
            cooldownConfToPush,
            cooldownConfToUpdate);

    // 2. sync
    if (!tabletSyncMap.isEmpty()) {
        sync(backendTablets, tabletSyncMap, backendId, backendReportVersion);
    }

    // 3. delete (meta - be)
    // BE will automatically drop defective tablets. these tablets should also be dropped in catalog
    if (!tabletDeleteFromMeta.isEmpty()) {
        deleteFromMeta(tabletDeleteFromMeta, backendId, backendReportVersion);
    }

    // 4. handle (be - meta)
    if (tabletFoundInMeta.size() != backendTablets.size()) {
        deleteFromBackend(backendTablets, tabletFoundInMeta, backendId);
    }

    // 5. migration (ssd <-> hdd)
    if (!Config.disable_storage_medium_check && !tabletMigrationMap.isEmpty()) {
        handleMigration(tabletMigrationMap, backendId);
    }

    // 6. send clear transactions to be
    if (!transactionsToClear.isEmpty()) {
        handleClearTransactions(transactionsToClear, backendId);
    }

    // 7. send publish version request to be
    if (!transactionsToPublish.isEmpty()) {
        handleRepublishVersionInfo(transactionsToPublish, backendId);
    }

    // 8. send recover request to be
    if (!tabletRecoveryMap.isEmpty()) {
        handleRecoverTablet(tabletRecoveryMap, backendTablets, backendId);
    }

    // 9. send tablet meta to be for updating
    if (!tabletToUpdate.isEmpty()) {
        handleUpdateTabletMeta(backendId, tabletToUpdate);
    }

    // handle cooldown conf
    if (!cooldownConfToPush.isEmpty()) {
        handlePushCooldownConf(backendId, cooldownConfToPush);
    }
    if (!cooldownConfToUpdate.isEmpty()) {
        Env.getCurrentEnv().getCooldownConfHandler().addCooldownConfToUpdate(cooldownConfToUpdate);
    }

    // record the time of the last successful tablet report on the backend
    final SystemInfoService currentSystemInfo = Env.getCurrentSystemInfo();
    Backend reportBackend = currentSystemInfo.getBackend(backendId);
    if (reportBackend != null) {
        BackendStatus backendStatus = reportBackend.getBackendStatus();
        backendStatus.lastSuccessReportTabletsTime = TimeUtils.longToTimeString(start);
    }

    long end = System.currentTimeMillis();
    LOG.info("finished to handle tablet report from backend[{}] cost: {} ms", backendId, (end - start));
}
/**
 * Handles a task report from one BE: computes tasks that exist in the FE's
 * AgentTaskQueue but are not running on the BE, and resends those that are
 * due for a retry.
 *
 * Fix: removed a redundant nested {@code LOG.isDebugEnabled()} check inside a
 * branch already guarded by the same check, and corrected the stale comment
 * (it listed "SYNC DELETE" and a misspelled task type, neither of which
 * matched the code).
 *
 * @param backendId    id of the reporting BE
 * @param runningTasks task type -> signatures of tasks running on the BE
 */
private static void taskReport(long backendId, Map<TTaskType, Set<Long>> runningTasks) {
    if (LOG.isDebugEnabled()) {
        LOG.debug("begin to handle task report from backend {}", backendId);
    }
    long start = System.currentTimeMillis();
    if (LOG.isDebugEnabled()) {
        for (TTaskType type : runningTasks.keySet()) {
            Set<Long> taskSet = runningTasks.get(type);
            if (!taskSet.isEmpty()) {
                String signatures = StringUtils.join(taskSet, ", ");
                LOG.debug("backend task[{}]: {}", type.name(), signatures);
            }
        }
    }

    List<AgentTask> diffTasks = AgentTaskQueue.getDiffTasks(backendId, runningTasks);

    AgentBatchTask batchTask = new AgentBatchTask();
    long taskReportTime = System.currentTimeMillis();
    for (AgentTask task : diffTasks) {
        // these task types are never resent via the diff:
        // 1. CREATE
        // 2. CHECK_CONSISTENCY
        // 3. STORAGE_MEDIUM_MIGRATE
        if (task.getTaskType() == TTaskType.CREATE
                || task.getTaskType() == TTaskType.CHECK_CONSISTENCY
                || task.getTaskType() == TTaskType.STORAGE_MEDIUM_MIGRATE) {
            continue;
        }

        // to escape sending duplicate agent task to be
        if (task.shouldResend(taskReportTime)) {
            batchTask.addTask(task);
        }
    }

    if (LOG.isDebugEnabled()) {
        LOG.debug("get {} diff task(s) to resend", batchTask.getTaskNum());
    }
    if (batchTask.getTaskNum() > 0) {
        AgentTaskExecutor.submit(batchTask);
    }
    LOG.info("finished to handle task report from backend {}, diff task num: {}. cost: {} ms",
            backendId, batchTask.getTaskNum(), (System.currentTimeMillis() - start));
}
/**
 * Handles a disk report from one BE: records bad (unused) disks for logging
 * and updates the backend's disk info in memory.
 *
 * Fix: the missing-backend warning now uses parameterized logging instead of
 * string concatenation, consistent with the rest of this class.
 */
private static void diskReport(long backendId, Map<String, TDisk> backendDisks) {
    LOG.info("begin to handle disk report from backend {}", backendId);
    long start = System.currentTimeMillis();
    Backend backend = Env.getCurrentSystemInfo().getBackend(backendId);
    if (backend == null) {
        LOG.warn("backend doesn't exist. id: {}", backendId);
        return;
    }
    // collect unusable disks purely for the summary log line below
    List<String> badDisks = backendDisks.values().stream().filter(disk -> !disk.isUsed())
            .map(disk -> "path=" + disk.getRootPath() + ", path hash=" + disk.getPathHash())
            .collect(Collectors.toList());

    backend.updateDisks(backendDisks);
    LOG.info("finished to handle disk report from backend: {}, disk size: {}, bad disk: {}, cost: {} ms",
            backendId, backendDisks.size(), badDisks, (System.currentTimeMillis() - start));
}
/**
 * Handles the cpu section of a BE report: updates the backend's core count
 * and pipeline executor size, and persists the change via the edit log when
 * anything actually changed.
 *
 * Fix: the missing-backend warning now uses parameterized logging instead of
 * string concatenation, consistent with the rest of this class.
 */
private static void cpuReport(long backendId, int cpuCores, int pipelineExecutorSize) {
    LOG.info("begin to handle cpu report from backend {}", backendId);
    long start = System.currentTimeMillis();
    Backend backend = Env.getCurrentSystemInfo().getBackend(backendId);
    if (backend == null) {
        LOG.warn("backend doesn't exist. id: {}", backendId);
        return;
    }
    if (backend.updateCpuInfo(cpuCores, pipelineExecutorSize)) {
        // cpu info is changed
        LOG.info("new cpu info. backendId: {}, cpucores: {}, pipelineExecutorSize: {}", backendId, cpuCores,
                pipelineExecutorSize);
        // log change so followers/replay see the new cpu info
        Env.getCurrentEnv().getEditLog().logBackendStateChange(backend);
    }
    LOG.info("finished to handle cpu report from backend {}, cost: {} ms",
            backendId, (System.currentTimeMillis() - start));
}
/**
 * Syncs FE replica version info with what the BE reported, for tablets that
 * exist on both sides (tabletSyncMap, grouped by db id).
 *
 * Each tablet is processed under its table's write lock. If a newer report
 * from the same BE arrives mid-way (detected via the backend report version),
 * the whole sync is aborted (labeled break) since this report is stale.
 * Only replicas in NORMAL state are touched; replicas that become healthy
 * (last failed version < 0) are persisted via an edit log record.
 *
 * @param backendTablets       tablet id -> state reported by the BE
 * @param tabletSyncMap        db id -> tablet ids to sync
 * @param backendId            id of the reporting BE
 * @param backendReportVersion version of this report, for staleness checks
 */
private static void sync(Map<Long, TTablet> backendTablets, ListMultimap<Long, Long> tabletSyncMap,
        long backendId, long backendReportVersion) {
    TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
    OUTER:
    for (Long dbId : tabletSyncMap.keySet()) {
        Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
        if (db == null) {
            continue;
        }
        int syncCounter = 0;
        List<Long> tabletIds = tabletSyncMap.get(dbId);
        LOG.info("before sync tablets in db[{}]. report num: {}. backend[{}]",
                dbId, tabletIds.size(), backendId);
        List<TabletMeta> tabletMetaList = invertedIndex.getTabletMetaList(tabletIds);
        for (int i = 0; i < tabletMetaList.size(); i++) {
            TabletMeta tabletMeta = tabletMetaList.get(i);
            if (tabletMeta == TabletInvertedIndex.NOT_EXIST_TABLET_META) {
                continue;
            }
            long tabletId = tabletIds.get(i);
            long tableId = tabletMeta.getTableId();
            OlapTable olapTable = (OlapTable) db.getTableNullable(tableId);
            // skip tables that vanished or whose lock cannot be taken
            if (olapTable == null || !olapTable.writeLockIfExist()) {
                continue;
            }
            try {
                // a newer report from this BE supersedes the one being
                // processed; abort everything (not just this db)
                if (backendReportVersion < Env.getCurrentSystemInfo().getBackendReportVersion(backendId)) {
                    break OUTER;
                }

                long partitionId = tabletMeta.getPartitionId();
                Partition partition = olapTable.getPartition(partitionId);
                if (partition == null) {
                    continue;
                }

                long indexId = tabletMeta.getIndexId();
                MaterializedIndex index = partition.getIndex(indexId);
                if (index == null) {
                    continue;
                }
                int schemaHash = olapTable.getSchemaHashByIndexId(indexId);

                Tablet tablet = index.getTablet(tabletId);
                if (tablet == null) {
                    continue;
                }

                Replica replica = tablet.getReplicaByBackendId(backendId);
                if (replica == null) {
                    continue;
                }
                // yiguolei: it is very important here, if the replica is under schema change or rollup
                // should ignore the report.
                // eg.
                // original replica import successfully, but the dest schema change replica failed
                // the fe will sync the replica with the original replica, but ignore the schema change replica.
                // if the last failed version is changed, then fe will think schema change successfully.
                // this is an fatal error.
                if (replica.getState() == ReplicaState.NORMAL) {
                    long metaVersion = replica.getVersion();
                    long backendVersion = -1L;
                    long rowCount = -1L;
                    long dataSize = -1L;
                    long remoteDataSize = -1L;
                    // schema change maybe successfully in fe, but not inform be,
                    // then be will report two schema hash
                    // just select the dest schema hash
                    for (TTabletInfo tabletInfo : backendTablets.get(tabletId).getTabletInfos()) {
                        if (tabletInfo.getSchemaHash() == schemaHash) {
                            backendVersion = tabletInfo.getVersion();
                            rowCount = tabletInfo.getRowCount();
                            dataSize = tabletInfo.getDataSize();
                            remoteDataSize = tabletInfo.getRemoteDataSize();
                            break;
                        }
                    }
                    // no reported info matched the dest schema hash
                    if (backendVersion == -1L) {
                        continue;
                    }

                    boolean needSync = false;
                    if (metaVersion < backendVersion) {
                        needSync = true;
                    } else if (metaVersion == backendVersion) {
                        if (replica.isBad()) {
                            needSync = true;
                        }
                        if (replica.getVersion() >= partition.getCommittedVersion()
                                && replica.getLastFailedVersion() > partition.getCommittedVersion()) {
                            LOG.info("sync replica {} of tablet {} in backend {} in db {}. replica last failed"
                                    + " version change to -1 because last failed version > replica's committed"
                                    + " version {}",
                                    replica, tabletId, backendId, dbId, partition.getCommittedVersion());
                            replica.updateLastFailedVersion(-1L);
                            needSync = true;
                        }
                    }

                    if (needSync) {
                        // happens when
                        // 1. PUSH finished in BE but failed or not yet report to FE
                        // 2. repair for VERSION_INCOMPLETE finished in BE, but failed or not yet report to FE
                        replica.updateVersionInfo(backendVersion, dataSize, remoteDataSize, rowCount);

                        if (replica.getLastFailedVersion() < 0) {
                            if (replica.setBad(false)) {
                                LOG.info("sync replica {} of tablet {} in backend {} in db {}. "
                                        + "replica change from bad to good.",
                                        replica, tabletId, backendId, dbId);
                            }
                            // last failed version < 0 means this replica becomes health after sync,
                            // so we write an edit log to sync this operation
                            ReplicaPersistInfo info = ReplicaPersistInfo.createForClone(dbId, tableId,
                                    partitionId, indexId, tabletId, backendId, replica.getId(),
                                    replica.getVersion(), schemaHash,
                                    dataSize, remoteDataSize, rowCount,
                                    replica.getLastFailedVersion(),
                                    replica.getLastSuccessVersion());
                            Env.getCurrentEnv().getEditLog().logUpdateReplica(info);
                        }

                        ++syncCounter;
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("sync replica {} of tablet {} in backend {} in db {}. report version: {}",
                                    replica.getId(), tabletId, backendId, dbId, backendReportVersion);
                        }
                    } else {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("replica {} of tablet {} in backend {} version is changed"
                                    + " between check and real sync. meta[{}]. backend[{}]",
                                    replica.getId(), tabletId, backendId, metaVersion,
                                    backendVersion);
                        }
                    }
                }
            } finally {
                olapTable.writeUnlock();
            }
        }
        LOG.info("sync {} tablets in db[{}]. backend[{}]", syncCounter, dbId, backendId);
    } // end for dbs
}
/**
 * Removes from FE metadata the replicas that exist in catalog but not on the
 * reporting BE (the meta-minus-be side of the diff).
 *
 * Special case: when the tablet would be left with one or zero replicas, the
 * replica is NOT deleted. Instead, depending on Config.recover_with_empty_tablet,
 * either an empty replica is force-created on the BE (recover) or the lone
 * replica is marked bad and that state is persisted via the edit log.
 *
 * @param tabletDeleteFromMeta db id -> tablet ids whose replica on this BE
 *                             is missing from the report
 * @param backendId            id of the reporting BE
 * @param backendReportVersion version of this report; deletion is skipped if
 *                             a newer report has arrived meanwhile
 */
private static void deleteFromMeta(ListMultimap<Long, Long> tabletDeleteFromMeta, long backendId,
        long backendReportVersion) {
    AgentBatchTask createReplicaBatchTask = new AgentBatchTask();
    TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
    for (Long dbId : tabletDeleteFromMeta.keySet()) {
        Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
        if (db == null) {
            continue;
        }
        int deleteCounter = 0;
        List<Long> tabletIds = tabletDeleteFromMeta.get(dbId);
        List<TabletMeta> tabletMetaList = invertedIndex.getTabletMetaList(tabletIds);
        for (int i = 0; i < tabletMetaList.size(); i++) {
            TabletMeta tabletMeta = tabletMetaList.get(i);
            if (tabletMeta == TabletInvertedIndex.NOT_EXIST_TABLET_META) {
                continue;
            }
            long tabletId = tabletIds.get(i);
            long tableId = tabletMeta.getTableId();
            OlapTable olapTable = (OlapTable) db.getTableNullable(tableId);
            // skip tables that vanished or whose lock cannot be taken
            if (olapTable == null || !olapTable.writeLockIfExist()) {
                continue;
            }
            try {
                long partitionId = tabletMeta.getPartitionId();
                Partition partition = olapTable.getPartition(partitionId);
                if (partition == null) {
                    continue;
                }
                // expected replica count, used only for the error log below
                short replicationNum = olapTable.getPartitionInfo()
                        .getReplicaAllocation(partition.getId()).getTotalReplicaNum();
                long indexId = tabletMeta.getIndexId();
                MaterializedIndex index = partition.getIndex(indexId);
                if (index == null) {
                    continue;
                }
                if (index.getState() == IndexState.SHADOW) {
                    // This index is under schema change or rollup, tablet may not be created on BE.
                    // ignore it.
                    continue;
                }
                Tablet tablet = index.getTablet(tabletId);
                if (tablet == null) {
                    continue;
                }
                Replica replica = tablet.getReplicaByBackendId(backendId);
                if (replica == null) {
                    continue;
                }
                // check report version again: a newer report supersedes this one,
                // so do not delete based on stale information
                long currentBackendReportVersion = Env.getCurrentSystemInfo()
                        .getBackendReportVersion(backendId);
                if (backendReportVersion < currentBackendReportVersion) {
                    continue;
                }
                BinlogConfig binlogConfig = new BinlogConfig(olapTable.getBinlogConfig());
                ReplicaState state = replica.getState();
                if (state == ReplicaState.NORMAL || state == ReplicaState.SCHEMA_CHANGE) {
                    // if state is PENDING / ROLLUP / CLONE
                    // it's normal that the replica is not created in BE but exists in meta.
                    // so we do not delete it.
                    List<Replica> replicas = tablet.getReplicas();
                    if (replicas.size() <= 1) {
                        LOG.error("backend [{}] invalid situation. tablet[{}] has few replica[{}], "
                                + "replica num setting is [{}]",
                                backendId, tabletId, replicas.size(), replicationNum);
                        // there is a replica in FE, but not in BE and there is only one replica in this tablet
                        // in this case, it means data is lost.
                        // should generate a create replica request to BE to create a replica forcibly.
                        if (replicas.size() == 1) {
                            if (Config.recover_with_empty_tablet) {
                                // only create this task if force recovery is true
                                LOG.warn("tablet {} has only one replica {} on backend {}"
                                        + " and it is lost. create an empty replica to recover it",
                                        tabletId, replica.getId(), backendId);
                                MaterializedIndexMeta indexMeta = olapTable.getIndexMetaByIndexId(indexId);
                                Set<String> bfColumns = olapTable.getCopiedBfColumns();
                                double bfFpp = olapTable.getBfFpp();
                                List<Index> indexes = indexId == olapTable.getBaseIndexId()
                                        ? olapTable.getCopiedIndexes() : null;
                                CreateReplicaTask createReplicaTask = new CreateReplicaTask(backendId, dbId,
                                        tableId, partitionId, indexId, tabletId, replica.getId(),
                                        indexMeta.getShortKeyColumnCount(),
                                        indexMeta.getSchemaHash(), partition.getVisibleVersion(),
                                        indexMeta.getKeysType(),
                                        TStorageType.COLUMN,
                                        TStorageMedium.HDD, indexMeta.getSchema(), bfColumns, bfFpp, null,
                                        indexes,
                                        olapTable.isInMemory(),
                                        olapTable.getPartitionInfo().getTabletType(partitionId),
                                        null,
                                        olapTable.getCompressionType(),
                                        olapTable.getEnableUniqueKeyMergeOnWrite(), olapTable.getStoragePolicy(),
                                        olapTable.disableAutoCompaction(),
                                        olapTable.enableSingleReplicaCompaction(),
                                        olapTable.skipWriteIndexOnLoad(), olapTable.getCompactionPolicy(),
                                        olapTable.getTimeSeriesCompactionGoalSizeMbytes(),
                                        olapTable.getTimeSeriesCompactionFileCountThreshold(),
                                        olapTable.getTimeSeriesCompactionTimeThresholdSeconds(),
                                        olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold(),
                                        olapTable.getTimeSeriesCompactionLevelThreshold(),
                                        olapTable.storeRowColumn(),
                                        binlogConfig);
                                createReplicaTask.setIsRecoverTask(true);
                                createReplicaBatchTask.addTask(createReplicaTask);
                            } else {
                                // just set this replica as bad
                                if (replica.setBad(true)) {
                                    LOG.warn("tablet {} has only one replica {} on backend {}"
                                            + " and it is lost, set it as bad",
                                            tabletId, replica.getId(), backendId);
                                    BackendTabletsInfo tabletsInfo = new BackendTabletsInfo(backendId);
                                    tabletsInfo.setBad(true);
                                    ReplicaPersistInfo replicaPersistInfo = ReplicaPersistInfo.createForReport(dbId,
                                            tableId, partitionId, indexId, tabletId, backendId, replica.getId());
                                    tabletsInfo.addReplicaInfo(replicaPersistInfo);
                                    Env.getCurrentEnv().getEditLog().logBackendTabletsInfo(tabletsInfo);
                                }
                            }
                        }
                        continue;
                    }

                    tablet.deleteReplicaByBackendId(backendId);
                    ++deleteCounter;

                    // remove replica related tasks
                    AgentTaskQueue.removeReplicaRelatedTasks(backendId, tabletId);

                    // write edit log
                    ReplicaPersistInfo info = ReplicaPersistInfo.createForDelete(dbId, tableId, partitionId,
                            indexId, tabletId, backendId);

                    Env.getCurrentEnv().getEditLog().logDeleteReplica(info);
                    LOG.warn("delete replica[{}] in tablet[{}] from meta. backend[{}], report version: {}"
                            + ", current report version: {}",
                            replica.getId(), tabletId, backendId, backendReportVersion,
                            currentBackendReportVersion);

                    // check for clone
                    replicas = tablet.getReplicas();
                    if (replicas.size() == 0) {
                        LOG.error("invalid situation. tablet[{}] is empty", tabletId);
                    }
                }
            } finally {
                olapTable.writeUnlock();
            }
        }
        LOG.info("delete {} replica(s) from catalog in db[{}]", deleteCounter, dbId);
    } // end for dbs

    if (Config.recover_with_empty_tablet && createReplicaBatchTask.getTaskNum() > 0) {
        // must add to queue, so that when task finish report, the task can be found in queue.
        // the task will be eventually removed from queue by task report, so no need to worry
        // about the residuals.
        AgentTaskQueue.addBatchTask(createReplicaBatchTask);
        AgentTaskExecutor.submit(createReplicaBatchTask);
    }
}
/**
 * Handles the be-minus-meta side of the diff: for every reported tablet that
 * FE does not know, either adopt the replica into FE meta (when healthy and
 * the tablet meta still exists in the inverted index) or tell the BE to drop
 * it via a DropReplicaTask.
 *
 * @param backendTablets   all tablets reported by the BE
 * @param tabletFoundInMeta tablet ids present on both FE and BE
 * @param backendId        id of the reporting BE
 */
private static void deleteFromBackend(Map<Long, TTablet> backendTablets,
        Set<Long> tabletFoundInMeta,
        long backendId) {
    int deleteFromBackendCounter = 0;
    int addToMetaCounter = 0;
    AgentBatchTask batchTask = new AgentBatchTask();
    TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
    for (Long tabletId : backendTablets.keySet()) {
        TTabletInfo reportedInfo = backendTablets.get(tabletId).getTabletInfos().get(0);
        TabletMeta tabletMeta = invertedIndex.getTabletMeta(tabletId);
        if (LOG.isDebugEnabled()) {
            LOG.debug("process tablet [{}], backend[{}]", tabletId, backendId);
        }

        boolean needDelete = false;
        if (!tabletFoundInMeta.contains(tabletId)) {
            if (!isBackendReplicaHealthy(reportedInfo)) {
                // an unhealthy replica unknown to FE is simply dropped
                needDelete = true;
            } else if (tabletMeta != null && addReplica(tabletId, tabletMeta, reportedInfo, backendId)) {
                // tablet meta still exists in the inverted index: adopt the replica
                ++addToMetaCounter;
                if (LOG.isDebugEnabled()) {
                    LOG.debug("add to meta. tablet[{}], backend[{}]", tabletId, backendId);
                }
            } else {
                LOG.info("failed add to meta. tablet[{}], backend[{}]", tabletId, backendId);
                needDelete = true;
            }
        }

        if (needDelete) {
            // drop replica
            long replicaId = reportedInfo.getReplicaId();
            // If no such tablet meta, this indicates that the tablet belongs to a dropped table or partition
            boolean isDropTableOrPartition = tabletMeta == null;
            batchTask.addTask(new DropReplicaTask(backendId, tabletId, replicaId,
                    reportedInfo.getSchemaHash(), isDropTableOrPartition));
            LOG.info("delete tablet[{}] from backend[{}] because not found in meta", tabletId, backendId);
            ++deleteFromBackendCounter;
        }
    } // end for backendTabletIds

    if (batchTask.getTaskNum() != 0) {
        AgentTaskExecutor.submit(batchTask);
    }
    LOG.info("delete {} tablet(s) and add {} replica(s) to meta from backend[{}]",
            deleteFromBackendCounter, addToMetaCounter, backendId);
}
// A backend replica is considered healthy when it is not explicitly marked
// unused and it does not report any missing versions.
private static boolean isBackendReplicaHealthy(TTabletInfo backendTabletInfo) {
    boolean markedUnused = backendTabletInfo.isSetUsed() && !backendTabletInfo.isUsed();
    boolean missesVersion = backendTabletInfo.isSetVersionMiss() && backendTabletInfo.isVersionMiss();
    return !markedUnused && !missesVersion;
}
/**
 * Builds and submits StorageMediaMigrationTasks for tablets whose desired
 * storage medium differs from where they currently reside on the backend.
 * Media with no matching disk on the backend are skipped with a warning.
 */
private static void handleMigration(ListMultimap<TStorageMedium, Long> tabletMetaMigrationMap,
        long backendId) {
    TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
    SystemInfoService infoService = Env.getCurrentSystemInfo();
    Backend be = infoService.getBackend(backendId);
    if (be == null) {
        return;
    }
    AgentBatchTask migrationBatch = new AgentBatchTask();
    for (TStorageMedium medium : tabletMetaMigrationMap.keySet()) {
        List<Long> tabletIds = tabletMetaMigrationMap.get(medium);
        if (!be.hasSpecifiedStorageMedium(medium)) {
            LOG.warn("no specified storage medium {} on backend {}, skip storage migration."
                    + " sample tablet id: {}", medium, backendId, tabletIds.isEmpty()
                    ? "-1" : tabletIds.get(0));
            continue;
        }
        List<TabletMeta> metas = invertedIndex.getTabletMetaList(tabletIds);
        for (int idx = 0; idx < metas.size(); idx++) {
            TabletMeta meta = metas.get(idx);
            if (meta == TabletInvertedIndex.NOT_EXIST_TABLET_META) {
                continue;
            }
            // always use the old schema hash as the effective one
            migrationBatch.addTask(new StorageMediaMigrationTask(backendId, tabletIds.get(idx),
                    meta.getOldSchemaHash(), medium));
        }
    }
    AgentTaskExecutor.submit(migrationBatch);
}
/**
 * Re-sends publish-version tasks to the backend for transactions it has not
 * yet acknowledged, so their versions can eventually become visible.
 */
private static void handleRepublishVersionInfo(
        Map<Long, ListMultimap<Long, TPartitionVersionInfo>> transactionsToPublish, long backendId) {
    AgentBatchTask republishBatch = new AgentBatchTask();
    long taskCreateTime = System.currentTimeMillis();
    for (Map.Entry<Long, ListMultimap<Long, TPartitionVersionInfo>> entry : transactionsToPublish.entrySet()) {
        long dbId = entry.getKey();
        ListMultimap<Long, TPartitionVersionInfo> txnToPartitions = entry.getValue();
        for (long txnId : txnToPartitions.keySet()) {
            PublishVersionTask task = new PublishVersionTask(backendId, txnId, dbId,
                    txnToPartitions.get(txnId), taskCreateTime);
            republishBatch.addTask(task);
            // register in AgentTaskQueue so the finish report can locate this task.
            AgentTaskQueue.addTask(task);
        }
    }
    AgentTaskExecutor.submit(republishBatch);
}
/**
 * Handles tablets the backend reported as bad or missing versions. For each
 * such tablet, under the owning table's write lock, either marks the FE
 * replica as bad (BE reports the tablet unused) or records a missing-version
 * last-failed version on the replica. Collected changes are persisted once at
 * the end via a single BackendReplicasInfo edit log entry, so other FEs see
 * the same replica states.
 *
 * @param tabletRecoveryMap db id -> tablet ids needing recovery on this backend
 * @param backendTablets    full tablet report from the backend, keyed by tablet id
 * @param backendId         id of the reporting backend
 */
private static void handleRecoverTablet(ListMultimap<Long, Long> tabletRecoveryMap,
        Map<Long, TTablet> backendTablets, long backendId) {
    // print a warn log here to indicate the exceptions on the backend
    LOG.warn("find {} tablets on backend {} which is bad or misses versions that need clone or force recovery",
            tabletRecoveryMap.size(), backendId);
    TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
    BackendReplicasInfo backendReplicasInfo = new BackendReplicasInfo(backendId);
    for (Long dbId : tabletRecoveryMap.keySet()) {
        Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
        if (db == null) {
            continue;
        }
        List<Long> tabletIds = tabletRecoveryMap.get(dbId);
        List<TabletMeta> tabletMetaList = invertedIndex.getTabletMetaList(tabletIds);
        for (int i = 0; i < tabletMetaList.size(); i++) {
            TabletMeta tabletMeta = tabletMetaList.get(i);
            if (tabletMeta == TabletInvertedIndex.NOT_EXIST_TABLET_META) {
                continue;
            }
            long tabletId = tabletIds.get(i);
            long tableId = tabletMeta.getTableId();
            OlapTable olapTable = (OlapTable) db.getTableNullable(tableId);
            // skip (rather than block) if the table is gone or its lock cannot be taken
            if (olapTable == null || !olapTable.writeLockIfExist()) {
                continue;
            }
            try {
                // resolve partition -> index -> tablet -> replica; any missing link
                // means the meta changed since the report, so skip this tablet
                long partitionId = tabletMeta.getPartitionId();
                Partition partition = olapTable.getPartition(partitionId);
                if (partition == null) {
                    continue;
                }
                long indexId = tabletMeta.getIndexId();
                MaterializedIndex index = partition.getIndex(indexId);
                if (index == null) {
                    continue;
                }
                int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
                Tablet tablet = index.getTablet(tabletId);
                if (tablet == null) {
                    continue;
                }
                Replica replica = tablet.getReplicaByBackendId(backendId);
                if (replica == null) {
                    continue;
                }
                // only the reported tablet info matching the current schema hash is considered
                for (TTabletInfo tTabletInfo : backendTablets.get(tabletId).getTabletInfos()) {
                    if (tTabletInfo.getSchemaHash() == schemaHash) {
                        if (tTabletInfo.isSetUsed() && !tTabletInfo.isUsed()) {
                            // BE reports the tablet as unusable: mark FE replica bad
                            // (setBad returns false if it was already bad)
                            if (replica.setBad(true)) {
                                LOG.warn("set bad for replica {} of tablet {} on backend {}",
                                        replica.getId(), tabletId, backendId);
                                backendReplicasInfo.addBadReplica(tabletId);
                            }
                            break;
                        }
                        if ((tTabletInfo.isSetVersionMiss() && tTabletInfo.isVersionMiss())
                                || replica.checkVersionRegressive(tTabletInfo.getVersion())) {
                            // If the origin last failed version is larger than 0, not change it.
                            // Otherwise, we set last failed version to replica'version + 1.
                            // Because last failed version should always larger than replica's version.
                            long newLastFailedVersion = replica.getLastFailedVersion();
                            if (newLastFailedVersion < 0) {
                                newLastFailedVersion = replica.getVersion() + 1;
                                replica.updateLastFailedVersion(newLastFailedVersion);
                                LOG.warn("set missing version for replica {} of tablet {} on backend {}, "
                                        + "version in fe {}, version in be {}, be missing {}",
                                        replica.getId(), tabletId, backendId, replica.getVersion(),
                                        tTabletInfo.getVersion(), tTabletInfo.isVersionMiss());
                            }
                            backendReplicasInfo.addMissingVersionReplica(tabletId, newLastFailedVersion);
                            break;
                        }
                        break;
                    }
                }
            } finally {
                olapTable.writeUnlock();
            }
        }
    } // end for recovery map
    if (!backendReplicasInfo.isEmpty()) {
        // write a single edit log entry to sync the bad/missing-version info to other FEs
        Env.getCurrentEnv().getEditLog().logBackendReplicasInfo(backendReplicasInfo);
    }
}
/**
 * Sends tablet meta updates to the backend, splitting the list into chunks of
 * at most 4096 entries per UpdateTabletMetaInfoTask.
 */
private static void handleUpdateTabletMeta(long backendId, List<TTabletMetaInfo> tabletToUpdate) {
    final int updateBatchSize = 4096;
    AgentBatchTask batchTask = new AgentBatchTask();
    int total = tabletToUpdate.size();
    int begin = 0;
    while (begin < total) {
        int end = Math.min(begin + updateBatchSize, total);
        batchTask.addTask(new UpdateTabletMetaInfoTask(backendId, tabletToUpdate.subList(begin, end)));
        begin = end;
    }
    AgentTaskExecutor.submit(batchTask);
}
/**
 * Asks the backend to clean up leftover state for transactions that the FE has
 * already finished with, one ClearTransactionTask per transaction.
 */
private static void handleClearTransactions(ListMultimap<Long, Long> transactionsToClear, long backendId) {
    AgentBatchTask clearBatch = new AgentBatchTask();
    transactionsToClear.keySet().forEach(txnId ->
            clearBatch.addTask(new ClearTransactionTask(backendId, txnId, transactionsToClear.get(txnId))));
    AgentTaskExecutor.submit(clearBatch);
}
/**
 * Tries to add a backend-reported replica back into FE meta for the given tablet.
 * Called when a backend reports a tablet replica that FE meta does not track.
 *
 * The replica is added only when doing so improves tablet health (replica
 * missing, version incomplete, colocate backend mismatch, or a safe
 * FORCE_REDUNDANT case); otherwise the caller is expected to drop the replica
 * from the backend. All checks and the add are done under the table write lock,
 * and a successful add is persisted via the edit log.
 *
 * @return false if the replica was not (and should not be) added; true if it
 *         was added, or was already present in meta for this backend
 */
private static boolean addReplica(long tabletId, TabletMeta tabletMeta, TTabletInfo backendTabletInfo,
        long backendId) {
    long dbId = tabletMeta.getDbId();
    long tableId = tabletMeta.getTableId();
    long partitionId = tabletMeta.getPartitionId();
    long indexId = tabletMeta.getIndexId();
    int schemaHash = backendTabletInfo.getSchemaHash();
    long version = backendTabletInfo.getVersion();
    long dataSize = backendTabletInfo.getDataSize();
    long remoteDataSize = backendTabletInfo.getRemoteDataSize();
    long rowCount = backendTabletInfo.getRowCount();
    Database db;
    OlapTable olapTable;
    try {
        // resolve db/table and take the table write lock; any of these may throw
        // MetaNotFoundException if the meta has been dropped concurrently
        db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
        olapTable = (OlapTable) db.getTableOrMetaException(tableId, Table.TableType.OLAP);
        olapTable.writeLockOrMetaException();
    } catch (MetaNotFoundException e) {
        LOG.warn(e);
        return false;
    }
    try {
        Partition partition = olapTable.getPartition(partitionId);
        if (partition == null) {
            LOG.warn("partition[{}] does not exist", partitionId);
            return false;
        }
        ReplicaAllocation replicaAlloc = olapTable.getPartitionInfo().getReplicaAllocation(partition.getId());
        MaterializedIndex materializedIndex = partition.getIndex(indexId);
        if (materializedIndex == null) {
            LOG.warn("index[{}] does not exist", indexId);
            return false;
        }
        Tablet tablet = materializedIndex.getTablet(tabletId);
        if (tablet == null) {
            LOG.warn("tablet[{}] does not exist", tabletId);
            return false;
        }
        // check replica id
        long replicaId = backendTabletInfo.getReplicaId();
        if (replicaId <= 0) {
            LOG.warn("replica id is invalid");
            return false;
        }
        long visibleVersion = partition.getVisibleVersion();
        // check replica version: a replica behind the visible version is useless
        if (version < visibleVersion) {
            LOG.warn("version is invalid. tablet[{}], visible[{}]", version, visibleVersion);
            return false;
        }
        // check schema hash
        if (schemaHash != olapTable.getSchemaHashByIndexId(indexId)) {
            LOG.warn("schema hash is diff[{}-{}]", schemaHash, olapTable.getSchemaHashByIndexId(indexId));
            return false;
        }
        // colocate table will delete Replica in meta when balance
        // but we need to rely on MetaNotFoundException to decide whether delete the tablet in backend
        // if the tablet is healthy, delete it.
        boolean isColocateBackend = false;
        ColocateTableIndex colocateTableIndex = Env.getCurrentColocateIndex();
        if (colocateTableIndex.isColocateTable(olapTable.getId())) {
            ColocateTableIndex.GroupId groupId = colocateTableIndex.getGroup(tableId);
            Preconditions.checkState(groupId != null,
                    "can not get colocate group for %s", tableId);
            int tabletOrderIdx = materializedIndex.getTabletOrderIdx(tabletId);
            Preconditions.checkState(tabletOrderIdx != -1, "get tablet materializedIndex for %s fail", tabletId);
            Set<Long> backendsSet = colocateTableIndex.getTabletBackendsByGroup(groupId, tabletOrderIdx);
            ColocateGroupSchema groupSchema = colocateTableIndex.getGroupSchema(groupId);
            if (groupSchema != null) {
                // colocate group schema overrides the partition's replica allocation
                replicaAlloc = groupSchema.getReplicaAlloc();
            }
            TabletStatus status =
                    tablet.getColocateHealthStatus(visibleVersion, replicaAlloc, backendsSet);
            if (status == TabletStatus.HEALTHY) {
                // colocate tablet is already healthy: no need to add this replica back
                return false;
            }
            if (backendsSet.contains(backendId)) {
                // this backend is one of the colocate group's designated backends
                isColocateBackend = true;
            }
        }
        SystemInfoService infoService = Env.getCurrentSystemInfo();
        List<Long> aliveBeIds = infoService.getAllBackendIds(true);
        Pair<TabletStatus, TabletSchedCtx.Priority> status = tablet.getHealthStatusWithPriority(infoService,
                visibleVersion, replicaAlloc, aliveBeIds);
        // FORCE_REDUNDANT is a specific missing case.
        // So it can add replica when it's in FORCE_REDUNDANT.
        // But must be careful to avoid: delete a replica then add it back, then repeat forever.
        // If this replica is sched available and existing another replica is sched unavailable,
        // it's safe to add this replica.
        // Because if the tablet scheduler want to delete a replica, it will choose the sched
        // unavailable replica and avoid the repeating loop as above.
        boolean canAddForceRedundant = status.first == TabletStatus.FORCE_REDUNDANT
                && infoService.checkBackendScheduleAvailable(backendId)
                && tablet.getReplicas().stream().anyMatch(
                        r -> !infoService.checkBackendScheduleAvailable(r.getBackendId()));
        if (isColocateBackend
                || canAddForceRedundant
                || status.first == TabletStatus.VERSION_INCOMPLETE
                || status.first == TabletStatus.REPLICA_MISSING
                || status.first == TabletStatus.UNRECOVERABLE) {
            long lastFailedVersion = -1L;
            // For some partition created by old version's Doris
            // The init partition's version in FE is (1-0), the tablet's version in BE is (2-0)
            // If the BE report version is (2-0) and partition's version is (1-0),
            // we should add the tablet to meta.
            // But old version doris is too old, we should not consider them any more,
            // just throw exception in this case
            if (version > partition.getNextVersion() - 1) {
                // this is a fatal error
                LOG.warn("version is invalid. tablet[{}], partition's max version [{}]", version,
                        partition.getNextVersion() - 1);
                return false;
            } else if (version < partition.getCommittedVersion()) {
                // replica is behind the committed version: record it as last failed
                lastFailedVersion = partition.getCommittedVersion();
            }
            if (backendTabletInfo.isSetCooldownMetaId()) {
                // replica has cooldowned data; the do-while(false) below is a
                // structured goto: "break" means the cooldowned data is safe to keep
                do {
                    Pair<Long, Long> cooldownConf = tablet.getCooldownConf();
                    if (backendTabletInfo.getCooldownTerm() > cooldownConf.second) {
                        // should not be here
                        LOG.warn("report cooldownTerm({}) > cooldownTerm in TabletMeta({}), tabletId={}",
                                backendTabletInfo.getCooldownTerm(), cooldownConf.second, tabletId);
                        return false;
                    }
                    if (backendTabletInfo.getReplicaId() == cooldownConf.first) {
                        // this replica is true cooldown replica, so replica's cooldowned data must not be deleted
                        break;
                    }
                    List<Replica> replicas = Env.getCurrentInvertedIndex().getReplicas(tabletId);
                    if (backendTabletInfo.getCooldownTerm() <= 0) {
                        if (replicas.stream().anyMatch(
                                r -> backendTabletInfo.getCooldownMetaId().equals(r.getCooldownMetaId()))) {
                            // this backend is just restarted, and shares same cooldowned data with others replica,
                            // so replica's cooldowned data must not be deleted
                            break;
                        }
                    }
                    long minCooldownTerm = Long.MAX_VALUE;
                    for (Replica r : replicas) {
                        minCooldownTerm = Math.min(r.getCooldownTerm(), minCooldownTerm);
                    }
                    if (backendTabletInfo.getCooldownTerm() >= minCooldownTerm) {
                        if (replicas.stream().anyMatch(
                                r -> backendTabletInfo.getCooldownMetaId().equals(r.getCooldownMetaId()))) {
                            // this replica shares same cooldowned data with others replica, and won't follow data
                            // of lower cooldown term, so replica's cooldowned data must not be deleted
                            break;
                        }
                    }
                    LOG.warn("replica's cooldowned data may have been deleted. tabletId={}, replicaId={}", tabletId,
                            replicaId);
                    return false;
                } while (false);
            }
            // use replicaId reported by BE to maintain replica meta consistent between FE and BE
            Replica replica = new Replica(replicaId, backendId, version, schemaHash,
                    dataSize, remoteDataSize, rowCount, ReplicaState.NORMAL,
                    lastFailedVersion, version);
            tablet.addReplica(replica);
            // write edit log
            ReplicaPersistInfo info = ReplicaPersistInfo.createForAdd(dbId, tableId, partitionId, indexId,
                    tabletId, backendId, replicaId,
                    version, schemaHash, dataSize, remoteDataSize, rowCount,
                    lastFailedVersion,
                    version);
            Env.getCurrentEnv().getEditLog().logAddReplica(info);
            LOG.info("add replica[{}-{}] to catalog. backend[{}], tablet status {}, tablet size {}, "
                    + "is colocate backend {}",
                    tabletId, replicaId, backendId, status.first.name(), tablet.getReplicas().size(),
                    isColocateBackend);
            return true;
        } else {
            // replica is enough. check if this tablet is already in meta
            // (status changed between 'tabletReport()' and 'addReplica()')
            for (Replica replica : tablet.getReplicas()) {
                if (replica.getBackendId() == backendId) {
                    // tablet is already in meta. return true
                    return true;
                }
            }
            LOG.warn("no add replica [{}-{}] cause it is enough[{}-{}], tablet status {}",
                    tabletId, replicaId, tablet.getReplicas().size(), replicaAlloc.toCreateStmt(),
                    status.first.name());
            return false;
        }
    } finally {
        olapTable.writeUnlock();
    }
}
/**
 * Daemon loop: blocks on the report queue and executes each queued report
 * task in turn. Runs forever; an interrupt only aborts the current
 * take()/exec() and the loop continues waiting for the next report.
 */
@Override
protected void runOneCycle() {
    while (true) {
        ReportTask task = null;
        try {
            task = reportQueue.take();
            task.exec();
        } catch (InterruptedException e) {
            // deliberately keep looping: this daemon must outlive spurious interrupts
            LOG.warn("got interrupted exception when executing report", e);
        }
    }
}
}