blob: 0284b4346dd274cfee38e8ca7d7f64f72691a413 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.streampark.console.core.watcher;
import org.apache.streampark.common.enums.FlinkExecutionMode;
import org.apache.streampark.console.core.bean.AlertTemplate;
import org.apache.streampark.console.core.component.FlinkCheckpointProcessor;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.enums.FlinkAppStateEnum;
import org.apache.streampark.console.core.enums.OptionStateEnum;
import org.apache.streampark.console.core.metrics.flink.CheckPoints;
import org.apache.streampark.console.core.service.alert.AlertService;
import org.apache.streampark.console.core.service.application.ApplicationInfoService;
import org.apache.streampark.console.core.service.application.ApplicationManageService;
import org.apache.streampark.flink.kubernetes.enums.FlinkJobStateEnum;
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteModeEnum;
import org.apache.streampark.flink.kubernetes.event.FlinkClusterMetricChangeEvent;
import org.apache.streampark.flink.kubernetes.event.FlinkJobCheckpointChangeEvent;
import org.apache.streampark.flink.kubernetes.event.FlinkJobStatusChangeEvent;
import org.apache.streampark.flink.kubernetes.model.FlinkMetricCV;
import org.apache.streampark.flink.kubernetes.model.JobStatusCV;
import org.apache.streampark.flink.kubernetes.model.TrackId;
import org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher;
import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.concurrent.Executor;
import scala.Enumeration;
import static org.apache.streampark.console.core.enums.FlinkAppStateEnum.Bridge.fromK8sFlinkJobState;
import static org.apache.streampark.console.core.enums.FlinkAppStateEnum.Bridge.toK8sFlinkJobState;
/**
* Event Listener for K8sFlinkTrackMonitor。
*
* <p>Use FlinkK8sChangeListenerV2 listeners instead:
*
* @link org.apache.streampark.console.core.watcher.FlinkK8sChangeListenerV2
*/
@Deprecated
@Slf4j
@Component
public class FlinkK8sChangeEventListener {
@Lazy @Autowired private ApplicationManageService applicationManageService;
@Autowired private ApplicationInfoService applicationInfoService;
@Lazy @Autowired private AlertService alertService;
@Lazy @Autowired private FlinkCheckpointProcessor checkpointProcessor;
@Qualifier("streamparkNotifyExecutor")
@Autowired
private Executor executorService;
/**
* Catch FlinkJobStatusChangeEvent then storage it persistently to db. Actually update
* org.apache.streampark.console.core.entity.Application records.
*/
@SuppressWarnings("UnstableApiUsage")
@AllowConcurrentEvents
@Subscribe
public void subscribeJobStatusChange(FlinkJobStatusChangeEvent event) {
JobStatusCV jobStatus = event.jobStatus();
TrackId trackId = event.trackId();
// get pre application record
Application app = applicationManageService.getById(trackId.appId());
if (app == null) {
return;
}
// update application record
setByJobStatusCV(app, jobStatus);
applicationManageService.persistMetrics(app);
// email alerts when necessary
FlinkAppStateEnum state = app.getStateEnum();
if (FlinkAppStateEnum.FAILED == state
|| FlinkAppStateEnum.LOST == state
|| FlinkAppStateEnum.RESTARTING == state
|| FlinkAppStateEnum.FINISHED == state) {
executorService.execute(
() -> {
if (app.getProbing()) {
log.info("application with id {} is probing, don't send alert", app.getId());
return;
}
alertService.alert(app.getAlertId(), AlertTemplate.of(app, state));
});
}
}
/**
* Catch FlinkClusterMetricChangeEvent then storage it persistently to db. Actually update
* org.apache.streampark.console.core.entity.Application records.
*/
@SuppressWarnings("UnstableApiUsage")
@AllowConcurrentEvents
@Subscribe
public void subscribeMetricsChange(FlinkClusterMetricChangeEvent event) {
TrackId trackId = event.trackId();
FlinkExecutionMode mode = FlinkK8sExecuteModeEnum.toExecutionMode(trackId.executeMode());
// discard session mode change
if (FlinkExecutionMode.KUBERNETES_NATIVE_SESSION == mode) {
return;
}
Application app = applicationManageService.getById(trackId.appId());
if (app == null) {
return;
}
FlinkMetricCV metrics = event.metrics();
app.setJmMemory(metrics.totalJmMemory());
app.setTmMemory(metrics.totalTmMemory());
app.setTotalTM(metrics.totalTm());
app.setTotalSlot(metrics.totalSlot());
app.setAvailableSlot(metrics.availableSlot());
applicationManageService.persistMetrics(app);
}
@SuppressWarnings("UnstableApiUsage")
@AllowConcurrentEvents
@Subscribe
public void subscribeCheckpointChange(FlinkJobCheckpointChangeEvent event) {
CheckPoints.CheckPoint completed = new CheckPoints.CheckPoint();
completed.setId(event.checkpoint().id());
completed.setCheckpointType(event.checkpoint().checkpointType());
completed.setExternalPath(event.checkpoint().externalPath());
completed.setIsSavepoint(event.checkpoint().isSavepoint());
completed.setStatus(event.checkpoint().status());
completed.setTriggerTimestamp(event.checkpoint().triggerTimestamp());
CheckPoints.Latest latest = new CheckPoints.Latest();
latest.setCompleted(completed);
CheckPoints checkPoint = new CheckPoints();
checkPoint.setLatest(latest);
checkpointProcessor.process(
applicationManageService.getById(event.trackId().appId()), checkPoint);
}
private void setByJobStatusCV(Application app, JobStatusCV jobStatus) {
// infer the final flink job state
Enumeration.Value state =
FlinkJobStatusWatcher.inferFlinkJobStateFromPersist(
jobStatus.jobState(), toK8sFlinkJobState(app.getStateEnum()));
// corrective start-time / end-time / duration
long preStartTime = app.getStartTime() != null ? app.getStartTime().getTime() : 0;
long startTime = Math.max(jobStatus.jobStartTime(), preStartTime);
long preEndTime = app.getEndTime() != null ? app.getEndTime().getTime() : 0;
long endTime = Math.max(jobStatus.jobEndTime(), preEndTime);
long duration = jobStatus.duration();
if (FlinkJobStateEnum.isEndState(state)) {
if (endTime < startTime) {
endTime = System.currentTimeMillis();
}
if (duration <= 0) {
duration = endTime - startTime;
}
}
app.setState(fromK8sFlinkJobState(state).getValue());
app.setJobId(jobStatus.jobId());
app.setTotalTask(jobStatus.taskTotal());
app.setStartTime(new Date(startTime > 0 ? startTime : 0));
app.setEndTime(endTime > 0 && endTime >= startTime ? new Date(endTime) : null);
app.setDuration(duration > 0 ? duration : 0);
// when a flink job status change event can be received, it means
// that the operation command sent by streampark has been completed.
app.setOptionState(OptionStateEnum.NONE.getValue());
}
}