// blob: b8ace00dd2f3eb8a4622cf26845c833e1bd3aad2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.pipe.agent.task;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MetaProgressIndex;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.task.PipeTask;
import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.extractor.dataregion.DataRegionListeningFilter;
import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.listener.PipeInsertionDataNodeListener;
import org.apache.iotdb.db.pipe.extractor.schemaregion.SchemaRegionListeningFilter;
import org.apache.iotdb.db.pipe.metric.PipeExtractorMetrics;
import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
import org.apache.iotdb.db.pipe.task.PipeDataNodeTask;
import org.apache.iotdb.db.pipe.task.builder.PipeDataNodeBuilder;
import org.apache.iotdb.db.pipe.task.builder.PipeDataNodeTaskBuilder;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeOperateSchemaQueueNode;
import org.apache.iotdb.db.schemaengine.SchemaEngine;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
public class PipeDataNodeTaskAgent extends PipeTaskAgent {
private static final Logger LOGGER = LoggerFactory.getLogger(PipeDataNodeTaskAgent.class);
protected static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
////////////////////////// Pipe Task Management Entry //////////////////////////
/**
 * Tells the base agent whether it should consider itself shut down.
 *
 * @return whatever {@code PipeAgent.runtime().isShutdown()} reports
 */
@Override
protected boolean isShutdown() {
  final boolean runtimeIsShutdown = PipeAgent.runtime().isShutdown();
  return runtimeIsShutdown;
}
/**
 * Builds the pipe tasks for the given pipe meta by delegating to {@link PipeDataNodeBuilder}.
 *
 * @param pipeMetaFromConfigNode the pipe meta handed to the builder
 * @return the map returned by the builder (presumably keyed by consensus group id - confirm
 *     against {@link PipeDataNodeBuilder})
 * @throws IllegalPathException declared by this method; presumably raised while the builder
 *     parses the pipe's paths
 */
@Override
protected Map<Integer, PipeTask> buildPipeTasks(PipeMeta pipeMetaFromConfigNode)
    throws IllegalPathException {
  final PipeDataNodeBuilder taskBuilder = new PipeDataNodeBuilder(pipeMetaFromConfigNode);
  return taskBuilder.build();
}
///////////////////////// Manage by regionGroupId /////////////////////////
/**
 * Creates and registers the pipe task of one consensus group when this DataNode is the group's
 * leader and the pipe's extractor listens to that region, then records the task meta in the
 * pipe meta keeper regardless of whether a task was created.
 *
 * @param consensusGroupId id of the data or schema region the task belongs to
 * @param pipeStaticMeta static meta of the pipe (supplies the extractor parameters and pipe name)
 * @param pipeTaskMeta task meta to record for the consensus group
 * @throws IllegalPathException declared by this method; presumably raised while the extractor
 *     parameters are parsed
 */
@Override
protected void createPipeTask(
    int consensusGroupId, PipeStaticMeta pipeStaticMeta, PipeTaskMeta pipeTaskMeta)
    throws IllegalPathException {
  final boolean isLeaderOnThisNode = pipeTaskMeta.getLeaderNodeId() == CONFIG.getDataNodeId();

  if (isLeaderOnThisNode) {
    final PipeParameters params = pipeStaticMeta.getExtractorParameters();

    // A data region task is needed only if the id names a local data region AND the extractor
    // listens to data regions.
    final boolean isListenedDataRegion =
        StorageEngine.getInstance()
                .getAllDataRegionIds()
                .contains(new DataRegionId(consensusGroupId))
            && DataRegionListeningFilter.shouldDataRegionBeListened(params);

    // A schema region task is needed only if the id names a local schema region AND the
    // extractor listens to at least one schema plan type.
    final boolean isListenedSchemaRegion =
        SchemaEngine.getInstance()
                .getAllSchemaRegionIds()
                .contains(new SchemaRegionId(consensusGroupId))
            && !SchemaRegionListeningFilter.parseListeningPlanTypeSet(params).isEmpty();

    // The extractor parameters are parsed up front so that irrelevant pipe tasks are never built
    if (isListenedDataRegion || isListenedSchemaRegion) {
      final PipeDataNodeTaskBuilder taskBuilder =
          new PipeDataNodeTaskBuilder(pipeStaticMeta, consensusGroupId, pipeTaskMeta);
      final PipeDataNodeTask createdTask = taskBuilder.build();
      createdTask.create();
      pipeTaskManager.addPipeTask(pipeStaticMeta, consensusGroupId, createdTask);
    }
  }

  // Always remember the task meta, even when no task runs on this node
  final Map<Integer, PipeTaskMeta> groupId2TaskMeta =
      pipeMetaKeeper
          .getPipeMeta(pipeStaticMeta.getPipeName())
          .getRuntimeMeta()
          .getConsensusGroupId2TaskMetaMap();
  groupId2TaskMeta.put(consensusGroupId, pipeTaskMeta);
}
/**
 * Applies the pipe metas pushed by the coordinator through the base implementation and then
 * trims or closes the schema region listening queues accordingly.
 *
 * @param pipeMetaListFromCoordinator the pipe metas pushed by the coordinator
 * @return the exception messages produced by the base implementation; an empty list when the
 *     agent is shut down
 * @throws PipeException if clearing or closing a schema region listening queue fails
 */
@Override
public List<TPushPipeMetaRespExceptionMessage> handlePipeMetaChangesInternal(
    List<PipeMeta> pipeMetaListFromCoordinator) {
  // Nothing to do while the node is removing or already removed
  if (isShutdown()) {
    return Collections.emptyList();
  }

  final List<TPushPipeMetaRespExceptionMessage> pushExceptions =
      super.handlePipeMetaChangesInternal(pipeMetaListFromCoordinator);

  try {
    // Clearing yields the ids of the schema regions that are still in use; every other
    // listening queue becomes a candidate for closing.
    closeSchemaRegionListeningQueueIfNecessary(
        clearSchemaRegionListeningQueueIfNecessary(pipeMetaListFromCoordinator), pushExceptions);
  } catch (Exception e) {
    throw new PipeException("Failed to clear/close schema region listening queue.", e);
  }

  return pushExceptions;
}
/**
 * Computes, for every local schema region that at least one schema-listening pipe has a task
 * on, the new first index of its listening queue and removes everything before that index.
 *
 * <p>For a region the new first index is the smallest {@code MetaProgressIndex + 1} over all
 * pipes; a task whose progress index is not a {@link MetaProgressIndex} sets it to 0.
 *
 * @param pipeMetaListFromCoordinator the pipe metas pushed by the coordinator
 * @return the ids of the schema regions for which a new first index was computed
 * @throws IllegalPathException declared by this method; presumably raised while the extractor
 *     parameters are parsed
 */
private Set<Integer> clearSchemaRegionListeningQueueIfNecessary(
    List<PipeMeta> pipeMetaListFromCoordinator) throws IllegalPathException {
  final Map<Integer, Long> regionId2NewFirstIndex = new HashMap<>();

  for (final PipeMeta pipeMeta : pipeMetaListFromCoordinator) {
    // Pipes that listen to no schema plan type do not constrain any queue
    final boolean listensToNoSchemaPlan =
        SchemaRegionListeningFilter.parseListeningPlanTypeSet(
                pipeMeta.getStaticMeta().getExtractorParameters())
            .isEmpty();
    if (listensToNoSchemaPlan) {
      continue;
    }

    final Map<Integer, PipeTaskMeta> taskMetaByGroupId =
        pipeMeta.getRuntimeMeta().getConsensusGroupId2TaskMetaMap();

    // Walk the local schema regions and look for this pipe's task on each of them
    for (final SchemaRegionId schemaRegionId : SchemaEngine.getInstance().getAllSchemaRegionIds()) {
      final int regionId = schemaRegionId.getId();
      final PipeTaskMeta taskMeta = taskMetaByGroupId.get(regionId);
      if (taskMeta == null) {
        continue;
      }

      final ProgressIndex progress = taskMeta.getProgressIndex();
      if (!(progress instanceof MetaProgressIndex)) {
        // Do not clear "minimumProgressIndex"s related queues to avoid clearing
        // the queue when there are schema tasks just started and transferring
        regionId2NewFirstIndex.put(regionId, 0L);
        continue;
      }

      // Keep the smallest "next index to read" among all pipes on this region
      final long nextIndexToKeep = ((MetaProgressIndex) progress).getIndex() + 1;
      if (nextIndexToKeep < regionId2NewFirstIndex.getOrDefault(regionId, Long.MAX_VALUE)) {
        regionId2NewFirstIndex.put(regionId, nextIndexToKeep);
      }
    }
  }

  // Drop the queue entries that no pipe needs any more
  for (final Map.Entry<Integer, Long> entry : regionId2NewFirstIndex.entrySet()) {
    PipeAgent.runtime()
        .schemaListener(new SchemaRegionId(entry.getKey()))
        .removeBefore(entry.getValue());
  }

  return regionId2NewFirstIndex.keySet();
}
/**
 * Closes the listening queue of every listening schema region that is not in
 * {@code validSchemaRegionIds} and whose schema leader is ready, by writing a
 * {@link PipeOperateSchemaQueueNode} through the schema region consensus layer.
 *
 * <p>Nothing is closed when {@code exceptionMessages} is non-empty.
 *
 * @param validSchemaRegionIds ids of the schema regions whose queues must stay open
 * @param exceptionMessages exception messages collected while handling the pipe meta changes
 * @throws PipeException if the consensus write for a region fails
 */
private void closeSchemaRegionListeningQueueIfNecessary(
    Set<Integer> validSchemaRegionIds,
    List<TPushPipeMetaRespExceptionMessage> exceptionMessages) {
  if (exceptionMessages.isEmpty()) {
    PipeAgent.runtime()
        .listeningSchemaRegionIds()
        .forEach(
            schemaRegionId -> {
              // Skip regions that are still in use, and regions whose leader is not ready
              if (validSchemaRegionIds.contains(schemaRegionId.getId())
                  || !PipeAgent.runtime().isSchemaLeaderReady(schemaRegionId)) {
                return;
              }

              try {
                final PipeOperateSchemaQueueNode closeQueueNode =
                    new PipeOperateSchemaQueueNode(new PlanNodeId(""), false);
                SchemaRegionConsensusImpl.getInstance().write(schemaRegionId, closeQueueNode);
              } catch (ConsensusException e) {
                throw new PipeException(
                    "Failed to close listening queue for SchemaRegion " + schemaRegionId, e);
              }
            });
  }
}
/**
 * Delegates to the base implementation's {@code stopAllPipesWithCriticalException}, passing
 * this DataNode's id as read from the current IoTDB configuration.
 */
public void stopAllPipesWithCriticalException() {
  final int currentDataNodeId = IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
  super.stopAllPipesWithCriticalException(currentDataNodeId);
}
///////////////////////// Heartbeat /////////////////////////
/**
 * Fills the DataNode heartbeat response with the serialized pipe metas, guarded by the read
 * lock. The response is left untouched when the lock cannot be obtained in time.
 *
 * @param resp the heartbeat response to fill
 * @throws TException if collecting the pipe metas fails
 */
public void collectPipeMetaList(TDataNodeHeartbeatResp resp) throws TException {
  // Try the lock with a timeout instead of acquiring it directly, so that the cluster
  // heartbeat is never blocked.
  // 10s is half of the HEARTBEAT_TIMEOUT_TIME defined in class BaseNodeCache in ConfigNode
  final boolean readLockAcquired = tryReadLockWithTimeOut(10);
  if (readLockAcquired) {
    try {
      collectPipeMetaListInternal(resp);
    } finally {
      releaseReadLock();
    }
  }
}
/**
 * Fills the DataNode heartbeat response with the serialized form of every pipe meta kept by
 * this agent.
 *
 * @param resp the heartbeat response to fill
 * @throws TException if a pipe meta cannot be serialized
 */
private void collectPipeMetaListInternal(TDataNodeHeartbeatResp resp) throws TException {
  // Do nothing if data node is removing or removed
  if (PipeAgent.runtime().isShutdown()) {
    return;
  }

  resp.setPipeMetaList(serializePipeMetasForHeartbeat());
}

/**
 * Fills the pipe heartbeat response with the serialized form of every pipe meta kept by this
 * agent, then calls {@code PipeInsertionDataNodeListener#listenToHeartbeat(true)}.
 *
 * @param req the pipe heartbeat request (only its heartbeat id is read, for logging)
 * @param resp the pipe heartbeat response to fill
 * @throws TException if a pipe meta cannot be serialized
 */
@Override
protected void collectPipeMetaListInternal(TPipeHeartbeatReq req, TPipeHeartbeatResp resp)
    throws TException {
  // Do nothing if data node is removing or removed
  if (PipeAgent.runtime().isShutdown()) {
    return;
  }
  LOGGER.info("Received pipe heartbeat request {} from config node.", req.heartbeatId);

  resp.setPipeMetaList(serializePipeMetasForHeartbeat());
  PipeInsertionDataNodeListener.getInstance().listenToHeartbeat(true);
}

/**
 * Serializes every pipe meta kept by this agent. This is the part shared by both heartbeat
 * flavours. Per-pipe report lines go through the logger handed out by
 * {@code PipeResourceManager.log().schedule(...)}, which may be absent.
 *
 * @return the serialized pipe metas, in the order returned by the pipe meta keeper
 * @throws TException wrapping the {@link IOException} raised by a failed serialization
 */
private List<ByteBuffer> serializePipeMetasForHeartbeat() throws TException {
  final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
  try {
    final Optional<Logger> logger =
        PipeResourceManager.log()
            .schedule(
                PipeDataNodeTaskAgent.class,
                PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(),
                PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds(),
                pipeMetaKeeper.getPipeMetaCount());
    for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
      pipeMetaBinaryList.add(pipeMeta.serialize());
      logger.ifPresent(l -> l.info("Reporting pipe meta: {}", pipeMeta.coreReportMessage()));
    }
    LOGGER.info("Reported {} pipe metas.", pipeMetaBinaryList.size());
  } catch (IOException e) {
    throw new TException(e);
  }
  return pipeMetaBinaryList;
}
///////////////////////// Restart Logic /////////////////////////
/**
 * Restarts every pipe that appears to be stuck, guarded by the write lock. Does nothing when
 * the write lock cannot be obtained within the timeout passed to
 * {@code tryWriteLockWithTimeOut} (5; presumably seconds - confirm against the base class).
 */
public void restartAllStuckPipes() {
  final boolean writeLockAcquired = tryWriteLockWithTimeOut(5);
  if (writeLockAcquired) {
    try {
      restartAllStuckPipesInternal();
    } finally {
      releaseWriteLock();
    }
  }
}
/**
 * Finds the pipes that may be stuck and restarts them.
 *
 * <p>A pipe is considered possibly stuck when the pinned-memtable or WAL-size threshold is
 * reached and the pipe has at least one data region extractor, its first extractor is in stream
 * mode, and at least one of its extractors has consumed all historical TsFiles.
 */
private void restartAllStuckPipesInternal() {
  // The two thresholds are node-wide and do not depend on any particular pipe, so they are
  // checked once up front: when neither is reached no pipe can be stuck and the extractor scan
  // is skipped entirely.
  if (!mayMemTablePinnedCountReachDangerousThreshold() && !mayWalSizeReachThrottleThreshold()) {
    return;
  }

  final Map<String, IoTDBDataRegionExtractor> taskId2ExtractorMap =
      PipeExtractorMetrics.getInstance().getExtractorMap();

  final Set<PipeMeta> stuckPipes = new HashSet<>();
  for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
    final String pipeName = pipeMeta.getStaticMeta().getPipeName();
    final List<IoTDBDataRegionExtractor> extractors =
        taskId2ExtractorMap.values().stream()
            .filter(e -> e.getPipeName().equals(pipeName))
            .collect(Collectors.toList());

    // Only pipes with extractors, in stream mode, that have finished at least one historical
    // extraction are candidates.
    if (extractors.isEmpty()
        || !extractors.get(0).isStreamMode()
        || extractors.stream()
            .noneMatch(IoTDBDataRegionExtractor::hasConsumedAllHistoricalTsFiles)) {
      continue;
    }

    LOGGER.warn("Pipe {} may be stuck.", pipeMeta.getStaticMeta());
    stuckPipes.add(pipeMeta);
  }

  // Restart all stuck pipes
  stuckPipes.parallelStream().forEach(this::restartStuckPipe);
}
/**
 * Checks whether the pinned WAL count reported by the pipe WAL resource manager has reached ten
 * times the configured maximum allowed pinned memtable count.
 *
 * @return {@code true} if the pinned count is at or above that threshold
 */
private boolean mayMemTablePinnedCountReachDangerousThreshold() {
  // 10L forces the product into long arithmetic. With plain int multiplication a very large
  // configured limit would wrap around to a negative threshold that is always "reached"
  // (if the getter already returns a long, the suffix changes nothing).
  return PipeResourceManager.wal().getPinnedWalCount()
      >= 10L * PipeConfig.getInstance().getPipeMaxAllowedPinnedMemTableCount();
}
/**
 * Checks whether the total WAL disk usage exceeds two thirds of the configured throttle
 * threshold (written as {@code 3 * usage > 2 * threshold}, so no division is involved).
 *
 * @return {@code true} if the WAL disk usage is above two thirds of the throttle threshold
 */
private boolean mayWalSizeReachThrottleThreshold() {
  final boolean aboveTwoThirdsOfThreshold =
      3 * WALManager.getInstance().getTotalDiskUsage()
          > 2 * IoTDBDescriptor.getInstance().getConfig().getThrottleThreshold();
  return aboveTwoThirdsOfThreshold;
}
/**
 * Restarts one stuck pipe: first rebuilds or stops its tasks and marks it STOPPED through
 * {@code changePipeStatusBeforeRestart}, then re-applies the pipe meta through
 * {@code handleSinglePipeMetaChangesInternal}. The elapsed time is logged.
 *
 * @param pipeMeta the meta of the pipe to restart
 */
private void restartStuckPipe(PipeMeta pipeMeta) {
  final PipeStaticMeta staticMeta = pipeMeta.getStaticMeta();
  LOGGER.warn("Pipe {} will be restarted because of stuck.", staticMeta);

  final long restartBeginMillis = System.currentTimeMillis();
  final String pipeName = staticMeta.getPipeName();
  changePipeStatusBeforeRestart(pipeName);
  handleSinglePipeMetaChangesInternal(pipeMeta);

  LOGGER.warn(
      "Pipe {} was restarted because of stuck, time cost: {} ms.",
      staticMeta,
      System.currentTimeMillis() - restartBeginMillis);
}
/**
 * Prepares a pipe for restart: drops and re-creates its data region tasks, stops the tasks
 * that remain registered (the schema region ones), and finally sets the pipe's runtime status
 * to {@link PipeStatus#STOPPED}.
 *
 * @param pipeName name of the pipe to prepare; it is looked up in the pipe meta keeper
 */
private void changePipeStatusBeforeRestart(String pipeName) {
  final PipeMeta targetPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
  final PipeStaticMeta staticMeta = targetPipeMeta.getStaticMeta();

  // Snapshot the region ids first, because tasks are removed from the manager below
  final Set<Integer> registeredRegionIds =
      new HashSet<>(pipeTaskManager.getPipeTasks(staticMeta).keySet());
  final Set<Integer> localDataRegionIds =
      StorageEngine.getInstance().getAllDataRegionIds().stream()
          .map(DataRegionId::getId)
          .collect(Collectors.toSet());

  // Unregister the tasks that live on data regions; removals that yield nothing are ignored
  final Set<PipeTask> removedDataRegionTasks = new HashSet<>();
  for (final Integer regionId : registeredRegionIds) {
    if (!localDataRegionIds.contains(regionId)) {
      continue;
    }
    final PipeTask removedTask = pipeTaskManager.removePipeTask(staticMeta, regionId);
    if (removedTask != null) {
      removedDataRegionTasks.add(removedTask);
    }
  }

  // Drop data region tasks
  removedDataRegionTasks.parallelStream().forEach(PipeTask::drop);

  // Stop schema region tasks (the only ones still registered at this point)
  pipeTaskManager.getPipeTasks(staticMeta).values().parallelStream().forEach(PipeTask::stop);

  // Re-create data region tasks from the task metas recorded in the runtime meta
  removedDataRegionTasks
      .parallelStream()
      .forEach(
          oldTask -> {
            final int regionId = ((PipeDataNodeTask) oldTask).getRegionId();
            final PipeTask recreatedTask =
                new PipeDataNodeTaskBuilder(
                        staticMeta,
                        regionId,
                        targetPipeMeta
                            .getRuntimeMeta()
                            .getConsensusGroupId2TaskMetaMap()
                            .get(regionId))
                    .build();
            recreatedTask.create();
            pipeTaskManager.addPipeTask(staticMeta, regionId, recreatedTask);
          });

  // Set pipe meta status to STOPPED
  targetPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
}
///////////////////////// Utils /////////////////////////
/**
 * Returns the ids of the consensus groups that have a task meta recorded for the given pipe.
 *
 * @param pipeName name of the pipe
 * @param creationTime creation time the pipe must have; guards against a different pipe that
 *     reuses the same name
 * @return the key set of the pipe's consensus-group-to-task-meta map (the map's own key-set
 *     view, not a copy), or an empty set if the pipe is unknown or its creation time differs
 */
public Set<Integer> getPipeTaskRegionIdSet(String pipeName, long creationTime) {
  final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);

  if (pipeMeta != null && pipeMeta.getStaticMeta().getCreationTime() == creationTime) {
    return pipeMeta.getRuntimeMeta().getConsensusGroupId2TaskMetaMap().keySet();
  }
  return Collections.emptySet();
}
}