blob: bef6e34a760da24beaf9140fd0d4abced76f416f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.engine.server;
import org.apache.seatunnel.api.common.metrics.JobMetrics;
import org.apache.seatunnel.api.common.metrics.RawJobMetrics;
import org.apache.seatunnel.api.event.EventHandler;
import org.apache.seatunnel.api.event.EventProcessor;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.common.utils.StringFormatUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.EngineConfig;
import org.apache.seatunnel.engine.common.config.server.ConnectorJarStorageConfig;
import org.apache.seatunnel.engine.common.exception.JobException;
import org.apache.seatunnel.engine.common.exception.JobNotFoundException;
import org.apache.seatunnel.engine.common.exception.SavePointFailedException;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.JobDAGInfo;
import org.apache.seatunnel.engine.core.job.JobInfo;
import org.apache.seatunnel.engine.core.job.JobResult;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.core.job.PipelineStatus;
import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
import org.apache.seatunnel.engine.server.event.JobEventHttpReportHandler;
import org.apache.seatunnel.engine.server.event.JobEventProcessor;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.master.JobHistoryService;
import org.apache.seatunnel.engine.server.master.JobMaster;
import org.apache.seatunnel.engine.server.metrics.JobMetricsUtil;
import org.apache.seatunnel.engine.server.metrics.SeaTunnelMetricsContext;
import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
import org.apache.seatunnel.engine.server.resourcemanager.ResourceManagerFactory;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
import org.apache.seatunnel.engine.server.service.jar.ConnectorPackageService;
import org.apache.seatunnel.engine.server.task.operation.GetMetricsOperation;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.hazelcast.cluster.Address;
import com.hazelcast.config.Config;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.internal.services.MembershipServiceEvent;
import com.hazelcast.logging.ILogger;
import com.hazelcast.map.IMap;
import com.hazelcast.ringbuffer.Ringbuffer;
import com.hazelcast.spi.impl.NodeEngineImpl;
import lombok.NonNull;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import static org.apache.seatunnel.engine.server.metrics.JobMetricsUtil.toJobMetricsMap;
public class CoordinatorService {
private final NodeEngineImpl nodeEngine;
private final ILogger logger;
private volatile ResourceManager resourceManager;
private JobHistoryService jobHistoryService;
/**
* IMap key is jobId and value is {@link JobInfo}. Tuple2 key is JobMaster init timestamp and
* value is the jobImmutableInformation which is sent by client when submit job
*
* <p>This IMap is used to recovery runningJobInfoIMap in JobMaster when a new master node
* active
*/
private IMap<Long, JobInfo> runningJobInfoIMap;
/**
* IMap key is one of jobId {@link
* org.apache.seatunnel.engine.server.dag.physical.PipelineLocation} and {@link
* org.apache.seatunnel.engine.server.execution.TaskGroupLocation}
*
* <p>The value of IMap is one of {@link JobStatus} {@link PipelineStatus} {@link
* org.apache.seatunnel.engine.server.execution.ExecutionState}
*
* <p>This IMap is used to recovery runningJobStateIMap in JobMaster when a new master node
* active
*/
IMap<Object, Object> runningJobStateIMap;
/**
* IMap key is one of jobId {@link
* org.apache.seatunnel.engine.server.dag.physical.PipelineLocation} and {@link
* org.apache.seatunnel.engine.server.execution.TaskGroupLocation}
*
* <p>The value of IMap is one of {@link
* org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan} stateTimestamps {@link
* org.apache.seatunnel.engine.server.dag.physical.SubPlan} stateTimestamps {@link
* org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex} stateTimestamps
*
* <p>This IMap is used to recovery runningJobStateTimestampsIMap in JobMaster when a new master
* node active
*/
IMap<Object, Long[]> runningJobStateTimestampsIMap;
/**
* key: job id; <br>
* value: job master;
*/
private Map<Long, JobMaster> runningJobMasterMap = new ConcurrentHashMap<>();
/**
* IMap key is {@link PipelineLocation}
*
* <p>The value of IMap is map of {@link TaskGroupLocation} and the {@link SlotProfile} it used.
*
* <p>This IMap is used to recovery ownedSlotProfilesIMap in JobMaster when a new master node
* active
*/
private IMap<PipelineLocation, Map<TaskGroupLocation, SlotProfile>> ownedSlotProfilesIMap;
private IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> metricsImap;
/** If this node is a master node */
private volatile boolean isActive = false;
private ExecutorService executorService;
private final SeaTunnelServer seaTunnelServer;
private final ScheduledExecutorService masterActiveListener;
private final EngineConfig engineConfig;
private ConnectorPackageService connectorPackageService;
private EventProcessor eventProcessor;
public CoordinatorService(
@NonNull NodeEngineImpl nodeEngine,
@NonNull SeaTunnelServer seaTunnelServer,
EngineConfig engineConfig) {
this.nodeEngine = nodeEngine;
this.logger = nodeEngine.getLogger(getClass());
this.executorService =
Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setNameFormat("seatunnel-coordinator-service-%d")
.build());
this.seaTunnelServer = seaTunnelServer;
this.engineConfig = engineConfig;
masterActiveListener = Executors.newSingleThreadScheduledExecutor();
masterActiveListener.scheduleAtFixedRate(
this::checkNewActiveMaster, 0, 100, TimeUnit.MILLISECONDS);
}
private JobEventProcessor createJobEventProcessor(
String reportHttpEndpoint,
Map<String, String> reportHttpHeaders,
NodeEngineImpl nodeEngine) {
List<EventHandler> handlers =
EventProcessor.loadEventHandlers(Thread.currentThread().getContextClassLoader());
if (reportHttpEndpoint != null) {
String ringBufferName = "zeta-job-event";
int maxBufferCapacity = 2000;
nodeEngine
.getHazelcastInstance()
.getConfig()
.addRingBufferConfig(
new Config()
.getRingbufferConfig(ringBufferName)
.setCapacity(maxBufferCapacity)
.setBackupCount(0)
.setAsyncBackupCount(1)
.setTimeToLiveSeconds(0));
Ringbuffer ringbuffer = nodeEngine.getHazelcastInstance().getRingbuffer(ringBufferName);
JobEventHttpReportHandler httpReportHandler =
new JobEventHttpReportHandler(
reportHttpEndpoint, reportHttpHeaders, ringbuffer);
handlers.add(httpReportHandler);
}
logger.info("Loaded event handlers: " + handlers);
JobEventProcessor eventProcessor = new JobEventProcessor(handlers);
return eventProcessor;
}
public JobHistoryService getJobHistoryService() {
return jobHistoryService;
}
public JobMaster getJobMaster(Long jobId) {
return runningJobMasterMap.get(jobId);
}
public EventProcessor getEventProcessor() {
return eventProcessor;
}
// On the new master node
// 1. If runningJobStateIMap.get(jobId) == null and runningJobInfoIMap.get(jobId) != null. We
// will do
// runningJobInfoIMap.remove(jobId)
//
// 2. If runningJobStateIMap.get(jobId) != null and the value equals JobStatus End State. We
// need new a
// JobMaster and generate PhysicalPlan again and then try to remove all of PipelineLocation
// and
// TaskGroupLocation key in the runningJobStateIMap.
//
// 3. If runningJobStateIMap.get(jobId) != null and the value equals JobStatus.SCHEDULED. We
// need cancel the job
// and then call submitJob(long jobId, Data jobImmutableInformation) to resubmit it.
//
// 4. If runningJobStateIMap.get(jobId) != null and the value is CANCELING or RUNNING. We need
// recover the JobMaster
// from runningJobStateIMap and then waiting for it complete.
private void initCoordinatorService() {
runningJobInfoIMap =
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_INFO);
runningJobStateIMap =
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_STATE);
runningJobStateTimestampsIMap =
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_STATE_TIMESTAMPS);
ownedSlotProfilesIMap =
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_OWNED_SLOT_PROFILES);
metricsImap = nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
jobHistoryService =
new JobHistoryService(
runningJobStateIMap,
logger,
runningJobMasterMap,
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_FINISHED_JOB_STATE),
nodeEngine
.getHazelcastInstance()
.getMap(Constant.IMAP_FINISHED_JOB_METRICS),
nodeEngine
.getHazelcastInstance()
.getMap(Constant.IMAP_FINISHED_JOB_VERTEX_INFO),
engineConfig.getHistoryJobExpireMinutes());
eventProcessor =
createJobEventProcessor(
engineConfig.getEventReportHttpApi(),
engineConfig.getEventReportHttpHeaders(),
nodeEngine);
// If the user has configured the connector package service, create it on the master node.
ConnectorJarStorageConfig connectorJarStorageConfig =
engineConfig.getConnectorJarStorageConfig();
if (connectorJarStorageConfig.getEnable()) {
connectorPackageService = new ConnectorPackageService(seaTunnelServer);
}
List<CompletableFuture<Void>> collect =
runningJobInfoIMap.entrySet().stream()
.map(
entry ->
CompletableFuture.runAsync(
() -> {
logger.info(
String.format(
"begin restore job (%s) from master active switch",
entry.getKey()));
try {
restoreJobFromMasterActiveSwitch(
entry.getKey(), entry.getValue());
} catch (Exception e) {
logger.severe(e);
}
logger.info(
String.format(
"restore job (%s) from master active switch finished",
entry.getKey()));
},
executorService))
.collect(Collectors.toList());
try {
CompletableFuture<Void> voidCompletableFuture =
CompletableFuture.allOf(collect.toArray(new CompletableFuture[0]));
voidCompletableFuture.get();
} catch (Exception e) {
throw new SeaTunnelEngineException(e);
}
}
private void restoreJobFromMasterActiveSwitch(@NonNull Long jobId, @NonNull JobInfo jobInfo) {
if (runningJobStateIMap.get(jobId) == null) {
runningJobInfoIMap.remove(jobId);
return;
}
JobStatus jobStatus = (JobStatus) runningJobStateIMap.get(jobId);
JobMaster jobMaster =
new JobMaster(
jobInfo.getJobImmutableInformation(),
nodeEngine,
executorService,
getResourceManager(),
getJobHistoryService(),
runningJobStateIMap,
runningJobStateTimestampsIMap,
ownedSlotProfilesIMap,
runningJobInfoIMap,
metricsImap,
engineConfig,
seaTunnelServer);
try {
jobMaster.init(runningJobInfoIMap.get(jobId).getInitializationTimestamp(), true);
} catch (Exception e) {
throw new SeaTunnelEngineException(String.format("Job id %s init failed", jobId), e);
}
String jobFullName = jobMaster.getPhysicalPlan().getJobFullName();
runningJobMasterMap.put(jobId, jobMaster);
logger.info(
String.format(
"The restore %s is in %s state, restore pipeline and take over this job running",
jobFullName, jobStatus));
CompletableFuture.runAsync(
() -> {
try {
jobMaster
.getPhysicalPlan()
.getPipelineList()
.forEach(SubPlan::restorePipelineState);
jobMaster.run();
} finally {
// voidCompletableFuture will be cancelled when zeta master node
// shutdown to simulate master failure,
// don't update runningJobMasterMap is this case.
if (!jobMaster.getJobMasterCompleteFuture().isCompletedExceptionally()) {
runningJobMasterMap.remove(jobId);
}
}
},
executorService);
}
private void checkNewActiveMaster() {
try {
if (!isActive && this.seaTunnelServer.isMasterNode()) {
logger.info(
"This node become a new active master node, begin init coordinator service");
if (this.executorService.isShutdown()) {
this.executorService =
Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setNameFormat("seatunnel-coordinator-service-%d")
.build());
}
initCoordinatorService();
isActive = true;
} else if (isActive && !this.seaTunnelServer.isMasterNode()) {
isActive = false;
logger.info(
"This node become leave active master node, begin clear coordinator service");
clearCoordinatorService();
}
} catch (Exception e) {
isActive = false;
logger.severe(ExceptionUtils.getMessage(e));
throw new SeaTunnelEngineException("check new active master error, stop loop", e);
}
}
public synchronized void clearCoordinatorService() {
// interrupt all JobMaster
runningJobMasterMap.values().forEach(JobMaster::interrupt);
executorService.shutdownNow();
runningJobMasterMap.clear();
try {
executorService.awaitTermination(20, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new SeaTunnelEngineException("wait clean executor service error", e);
}
if (resourceManager != null) {
resourceManager.close();
}
try {
if (eventProcessor != null) {
eventProcessor.close();
}
} catch (Exception e) {
throw new SeaTunnelEngineException("close event processor error", e);
}
}
/** Lazy load for resource manager */
public ResourceManager getResourceManager() {
if (resourceManager == null) {
synchronized (this) {
if (resourceManager == null) {
ResourceManager manager =
new ResourceManagerFactory(nodeEngine).getResourceManager();
manager.init();
resourceManager = manager;
}
}
}
return resourceManager;
}
/** call by client to submit job */
public PassiveCompletableFuture<Void> submitJob(long jobId, Data jobImmutableInformation) {
CompletableFuture<Void> jobSubmitFuture = new CompletableFuture<>();
// Check if the current jobID is already running. If so, complete the submission
// successfully.
// This avoids potential issues like redundant job restores or other anomalies.
if (getJobMaster(jobId) != null) {
logger.warning(
String.format(
"The job %s is currently running; no need to submit again.", jobId));
jobSubmitFuture.complete(null);
return new PassiveCompletableFuture<>(jobSubmitFuture);
}
JobMaster jobMaster =
new JobMaster(
jobImmutableInformation,
this.nodeEngine,
executorService,
getResourceManager(),
getJobHistoryService(),
runningJobStateIMap,
runningJobStateTimestampsIMap,
ownedSlotProfilesIMap,
runningJobInfoIMap,
metricsImap,
engineConfig,
seaTunnelServer);
executorService.submit(
() -> {
try {
runningJobInfoIMap.put(
jobId,
new JobInfo(System.currentTimeMillis(), jobImmutableInformation));
runningJobMasterMap.put(jobId, jobMaster);
jobMaster.init(
runningJobInfoIMap.get(jobId).getInitializationTimestamp(), false);
// We specify that when init is complete, the submitJob is complete
jobSubmitFuture.complete(null);
} catch (Throwable e) {
logger.severe(
String.format(
"submit job %s error %s ",
jobId, ExceptionUtils.getMessage(e)));
jobSubmitFuture.completeExceptionally(e);
}
if (!jobSubmitFuture.isCompletedExceptionally()) {
try {
jobMaster.run();
} finally {
// voidCompletableFuture will be cancelled when zeta master node
// shutdown to simulate master failure,
// don't update runningJobMasterMap is this case.
if (!jobMaster.getJobMasterCompleteFuture().isCancelled()) {
runningJobMasterMap.remove(jobId);
}
}
} else {
runningJobInfoIMap.remove(jobId);
runningJobMasterMap.remove(jobId);
}
});
return new PassiveCompletableFuture<>(jobSubmitFuture);
}
public PassiveCompletableFuture<Void> savePoint(long jobId) {
CompletableFuture<Void> voidCompletableFuture = new CompletableFuture<>();
if (!runningJobMasterMap.containsKey(jobId)) {
SavePointFailedException exception =
new SavePointFailedException(
"The job with id '" + jobId + "' not running, save point failed");
logger.warning(exception);
voidCompletableFuture.completeExceptionally(exception);
} else {
voidCompletableFuture =
new PassiveCompletableFuture<>(
CompletableFuture.supplyAsync(
() -> {
JobMaster runningJobMaster = runningJobMasterMap.get(jobId);
if (!runningJobMaster.savePoint().join()) {
throw new SavePointFailedException(
"The job with id '"
+ jobId
+ "' save point failed");
}
return null;
},
executorService));
}
return new PassiveCompletableFuture<>(voidCompletableFuture);
}
public PassiveCompletableFuture<JobResult> waitForJobComplete(long jobId) {
JobMaster runningJobMaster = runningJobMasterMap.get(jobId);
if (runningJobMaster == null) {
// Because operations on Imap cannot be performed within Operation.
CompletableFuture<JobHistoryService.JobState> jobStateFuture =
CompletableFuture.supplyAsync(
() -> {
return jobHistoryService.getJobDetailState(jobId);
},
executorService);
JobHistoryService.JobState jobState = null;
try {
jobState = jobStateFuture.get();
} catch (Exception e) {
throw new SeaTunnelEngineException("get job state error", e);
}
CompletableFuture<JobResult> future = new CompletableFuture<>();
if (jobState == null) future.complete(new JobResult(JobStatus.UNKNOWABLE, null));
else
future.complete(new JobResult(jobState.getJobStatus(), jobState.getErrorMessage()));
return new PassiveCompletableFuture<>(future);
} else {
return new PassiveCompletableFuture<>(runningJobMaster.getJobMasterCompleteFuture());
}
}
public PassiveCompletableFuture<Void> cancelJob(long jodId) {
JobMaster runningJobMaster = runningJobMasterMap.get(jodId);
if (runningJobMaster == null) {
CompletableFuture<Void> future = new CompletableFuture<>();
future.complete(null);
return new PassiveCompletableFuture<>(future);
} else {
return new PassiveCompletableFuture<>(
CompletableFuture.supplyAsync(
() -> {
runningJobMaster.cancelJob();
return null;
},
executorService));
}
}
public JobStatus getJobStatus(long jobId) {
JobMaster runningJobMaster = runningJobMasterMap.get(jobId);
if (runningJobMaster == null) {
JobHistoryService.JobState jobDetailState = jobHistoryService.getJobDetailState(jobId);
return null == jobDetailState ? JobStatus.UNKNOWABLE : jobDetailState.getJobStatus();
}
JobStatus jobStatus = runningJobMaster.getJobStatus();
if (jobStatus == null) {
return jobHistoryService.getFinishedJobStateImap().get(jobId).getJobStatus();
}
return jobStatus;
}
public JobMetrics getJobMetrics(long jobId) {
JobMaster runningJobMaster = runningJobMasterMap.get(jobId);
if (runningJobMaster == null) {
return jobHistoryService.getJobMetrics(jobId);
}
JobMetrics jobMetrics = JobMetricsUtil.toJobMetrics(runningJobMaster.getCurrJobMetrics());
JobMetrics jobMetricsImap = jobHistoryService.getJobMetrics(jobId);
return jobMetricsImap != null ? jobMetricsImap.merge(jobMetrics) : jobMetrics;
}
public Map<Long, JobMetrics> getRunningJobMetrics() {
final Set<Long> runningJobIds = runningJobMasterMap.keySet();
Set<Address> addresses = new HashSet<>();
ownedSlotProfilesIMap.forEach(
(pipelineLocation, ownedSlotProfilesIMap) -> {
if (runningJobIds.contains(pipelineLocation.getJobId())) {
ownedSlotProfilesIMap
.values()
.forEach(
ownedSlotProfile -> {
addresses.add(ownedSlotProfile.getWorker());
});
}
});
List<RawJobMetrics> metrics = new ArrayList<>();
addresses.forEach(
address -> {
try {
if (nodeEngine.getClusterService().getMember(address) != null) {
RawJobMetrics rawJobMetrics =
(RawJobMetrics)
NodeEngineUtil.sendOperationToMemberNode(
nodeEngine,
new GetMetricsOperation(runningJobIds),
address)
.get();
metrics.add(rawJobMetrics);
}
}
// HazelcastInstanceNotActiveException. It means that the node is
// offline, so waiting for the taskGroup to restore can be successful
catch (HazelcastInstanceNotActiveException e) {
logger.warning(
String.format(
"get metrics with exception: %s.",
ExceptionUtils.getMessage(e)));
} catch (Exception e) {
throw new SeaTunnelException(e.getMessage());
}
});
Map<Long, JobMetrics> longJobMetricsMap = toJobMetricsMap(metrics);
longJobMetricsMap.forEach(
(jobId, jobMetrics) -> {
JobMetrics jobMetricsImap = jobHistoryService.getJobMetrics(jobId);
if (jobMetricsImap != null) {
longJobMetricsMap.put(jobId, jobMetricsImap.merge(jobMetrics));
}
});
return longJobMetricsMap;
}
public JobDAGInfo getJobInfo(long jobId) {
JobDAGInfo jobInfo = jobHistoryService.getJobDAGInfo(jobId);
if (jobInfo != null) {
return jobInfo;
}
return runningJobMasterMap.get(jobId).getJobDAGInfo();
}
/**
* When TaskGroup ends, it is called by {@link TaskExecutionService} to notify JobMaster the
* TaskGroup's state.
*/
public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
logger.info(
String.format(
"Received task end from execution %s, state %s",
taskExecutionState.getTaskGroupLocation(),
taskExecutionState.getExecutionState()));
TaskGroupLocation taskGroupLocation = taskExecutionState.getTaskGroupLocation();
JobMaster runningJobMaster = runningJobMasterMap.get(taskGroupLocation.getJobId());
if (runningJobMaster == null) {
throw new JobNotFoundException(
String.format("Job %s not running", taskGroupLocation.getJobId()));
}
runningJobMaster.updateTaskExecutionState(taskExecutionState);
}
public void shutdown() {
if (masterActiveListener != null) {
masterActiveListener.shutdownNow();
}
clearCoordinatorService();
}
/** return true if this node is a master node and the coordinator service init finished. */
public boolean isCoordinatorActive() {
return isActive;
}
public void failedTaskOnMemberRemoved(MembershipServiceEvent event) {
Address lostAddress = event.getMember().getAddress();
runningJobMasterMap.forEach(
(aLong, jobMaster) -> {
jobMaster
.getPhysicalPlan()
.getPipelineList()
.forEach(
subPlan -> {
makeTasksFailed(
subPlan.getCoordinatorVertexList(), lostAddress);
makeTasksFailed(
subPlan.getPhysicalVertexList(), lostAddress);
});
});
}
private void makeTasksFailed(
@NonNull List<PhysicalVertex> physicalVertexList, @NonNull Address lostAddress) {
physicalVertexList.forEach(
physicalVertex -> {
Address deployAddress = physicalVertex.getCurrentExecutionAddress();
ExecutionState executionState = physicalVertex.getExecutionState();
if (null != deployAddress
&& deployAddress.equals(lostAddress)
&& (executionState.equals(ExecutionState.DEPLOYING)
|| executionState.equals(ExecutionState.RUNNING)
|| executionState.equals(ExecutionState.CANCELING))) {
TaskGroupLocation taskGroupLocation = physicalVertex.getTaskGroupLocation();
physicalVertex.updateStateByExecutionService(
new TaskExecutionState(
taskGroupLocation,
ExecutionState.FAILED,
new JobException(
String.format(
"The taskGroup(%s) deployed node(%s) offline",
taskGroupLocation, lostAddress))));
}
});
}
public void memberRemoved(MembershipServiceEvent event) {
if (isCoordinatorActive()) {
this.getResourceManager().memberRemoved(event);
}
this.failedTaskOnMemberRemoved(event);
}
public void printExecutionInfo() {
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorService;
int activeCount = threadPoolExecutor.getActiveCount();
int corePoolSize = threadPoolExecutor.getCorePoolSize();
int maximumPoolSize = threadPoolExecutor.getMaximumPoolSize();
int poolSize = threadPoolExecutor.getPoolSize();
long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();
long taskCount = threadPoolExecutor.getTaskCount();
logger.info(
StringFormatUtils.formatTable(
"CoordinatorService Thread Pool Status",
"activeCount",
activeCount,
"corePoolSize",
corePoolSize,
"maximumPoolSize",
maximumPoolSize,
"poolSize",
poolSize,
"completedTaskCount",
completedTaskCount,
"taskCount",
taskCount));
}
public void printJobDetailInfo() {
AtomicLong createdJobCount = new AtomicLong();
AtomicLong scheduledJobCount = new AtomicLong();
AtomicLong runningJobCount = new AtomicLong();
AtomicLong failingJobCount = new AtomicLong();
AtomicLong failedJobCount = new AtomicLong();
AtomicLong cancellingJobCount = new AtomicLong();
AtomicLong canceledJobCount = new AtomicLong();
AtomicLong finishedJobCount = new AtomicLong();
if (runningJobInfoIMap != null) {
runningJobInfoIMap
.keySet()
.forEach(
jobId -> {
if (runningJobStateIMap.get(jobId) != null) {
JobStatus jobStatus =
(JobStatus) runningJobStateIMap.get(jobId);
switch (jobStatus) {
case CREATED:
createdJobCount.addAndGet(1);
break;
case SCHEDULED:
scheduledJobCount.addAndGet(1);
break;
case RUNNING:
runningJobCount.addAndGet(1);
break;
case FAILING:
failingJobCount.addAndGet(1);
break;
case FAILED:
failedJobCount.addAndGet(1);
break;
case CANCELING:
cancellingJobCount.addAndGet(1);
break;
case CANCELED:
canceledJobCount.addAndGet(1);
break;
case FINISHED:
finishedJobCount.addAndGet(1);
break;
default:
}
}
});
}
logger.info(
StringFormatUtils.formatTable(
"Job info detail",
"createdJobCount",
createdJobCount,
"scheduledJobCount",
scheduledJobCount,
"runningJobCount",
runningJobCount,
"failingJobCount",
failingJobCount,
"failedJobCount",
failedJobCount,
"cancellingJobCount",
cancellingJobCount,
"canceledJobCount",
canceledJobCount,
"finishedJobCount",
finishedJobCount));
}
public ConnectorPackageService getConnectorPackageService() {
if (connectorPackageService == null) {
throw new SeaTunnelEngineException(
"The user is not configured to enable connector package service, can not get connector package service service from master node.");
}
return connectorPackageService;
}
}