/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.zeppelin.flink;

import com.mashape.unirest.http.JsonNode;
import com.mashape.unirest.http.Unirest;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.execution.JobClient;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.json.JSONArray;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
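
/**
 * Tracks the Flink jobs launched from Zeppelin paragraphs. Each submitted job is
 * mapped to its paragraph, and a dedicated {@link FlinkJobProgressPoller} thread
 * polls the Flink REST API to report job progress and checkpoint info back to Zeppelin.
 */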
public class JobManager {

  private static final Logger LOGGER = LoggerFactory.getLogger(JobManager.class);

  public static final String LATEST_CHECKPOINT_PATH = "latest_checkpoint_path";
  public static final String SAVEPOINT_PATH = "savepoint_path";
  public static final String RESUME_FROM_SAVEPOINT = "resumeFromSavepoint";
  public static final String RESUME_FROM_CHECKPOINT = "resumeFromLatestCheckpoint";
  public static final String SAVEPOINT_DIR = "savepointDir";

  // paragraphId -> JobClient of the job launched from that paragraph
  private Map<String, JobClient> jobs = new HashMap<>();
  // jobId -> poller thread that tracks that job's progress via the Flink REST API
  private ConcurrentHashMap<JobID, FlinkJobProgressPoller> jobProgressPollerMap =
          new ConcurrentHashMap<>();
  private String flinkWebUrl;
  private String displayedFlinkWebUrl;
  private Properties properties;

  public JobManager(String flinkWebUrl,
                    String displayedFlinkWebUrl,
                    Properties properties) {
    this.flinkWebUrl = flinkWebUrl;
    this.displayedFlinkWebUrl = displayedFlinkWebUrl;
    this.properties = properties;
    LOGGER.info("Creating JobManager at flinkWebUrl: {}, displayedFlinkWebUrl: {}",
            flinkWebUrl, displayedFlinkWebUrl);
  }
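
  /**
   * Registers the job launched by the given paragraph and starts a poller thread
   * that tracks its progress. If another job is already associated with the
   * paragraph, the new JobClient replaces it and no extra poller is started.
   */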
  public void addJob(InterpreterContext context, JobClient jobClient) {
    String paragraphId = context.getParagraphId();
    JobClient previousJobClient = this.jobs.put(paragraphId, jobClient);
    if (previousJobClient != null) {
      LOGGER.warn("There's another Job {} that is associated with paragraph {}",
              previousJobClient.getJobID(), paragraphId);
      return;
    }
    long checkInterval = Long.parseLong(
            properties.getProperty("zeppelin.flink.job.check_interval", "1000"));
    FlinkJobProgressPoller thread = new FlinkJobProgressPoller(
            flinkWebUrl, jobClient.getJobID(), context, checkInterval);
    thread.setName("JobProgressPoller-Thread-" + paragraphId);
    thread.start();
    this.jobProgressPollerMap.put(jobClient.getJobID(), thread);
  }
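
  /**
   * Removes the job associated with the given paragraph and stops its poller thread.
   */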
  public void removeJob(String paragraphId) {
    LOGGER.info("Removing job in paragraph: {}", paragraphId);
    JobClient jobClient = this.jobs.remove(paragraphId);
    if (jobClient == null) {
      LOGGER.warn("Unable to remove job, because no job is associated with paragraph: {}",
              paragraphId);
      return;
    }
    FlinkJobProgressPoller jobProgressPoller =
            this.jobProgressPollerMap.remove(jobClient.getJobID());
    if (jobProgressPoller == null) {
      LOGGER.warn("Unable to remove poller, because no poller is associated with paragraph: {}",
              paragraphId);
      return;
    }
    jobProgressPoller.cancel();
    jobProgressPoller.interrupt();
  }
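
  /**
   * Sends the Flink web UI url of the paragraph's job to the Zeppelin frontend,
   * so that the paragraph can display a "FLINK JOB" link.
   */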
  public void sendFlinkJobUrl(InterpreterContext context) {
    JobClient jobClient = jobs.get(context.getParagraphId());
    if (jobClient != null) {
      String jobUrl = displayedFlinkWebUrl + "#/job/" + jobClient.getJobID();
      Map<String, String> infos = new HashMap<>();
      infos.put("jobUrl", jobUrl);
      infos.put("label", "FLINK JOB");
      infos.put("tooltip", "View in Flink web UI");
      infos.put("noteId", context.getNoteId());
      infos.put("paraId", context.getParagraphId());
      context.getIntpEventClient().onParaInfosReceived(infos);
    } else {
      LOGGER.warn("No job is associated with paragraph: {}", context.getParagraphId());
    }
  }
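
  /**
   * Returns the progress (0-100) of the job associated with the given paragraph,
   * or 0 if no job or no poller is associated with it.
   */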
  public int getJobProgress(String paragraphId) {
    JobClient jobClient = this.jobs.get(paragraphId);
    if (jobClient == null) {
      LOGGER.warn("Unable to get job progress for paragraph: {}, " +
              "because no job is associated with this paragraph", paragraphId);
      return 0;
    }
    FlinkJobProgressPoller jobProgressPoller = this.jobProgressPollerMap.get(jobClient.getJobID());
    if (jobProgressPoller == null) {
      LOGGER.warn("Unable to get job progress for paragraph: {}, " +
              "because no poller is associated with jobId: {}", paragraphId, jobClient.getJobID());
      return 0;
    }
    return jobProgressPoller.getProgress();
  }
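
  /**
   * Cancels the job associated with the given paragraph. If the paragraph's local
   * property "savepointDir" is set, the job is stopped with a savepoint (falling
   * back to a plain cancel on failure) and the savepoint path is saved into the
   * paragraph's config under "savepoint_path".
   */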
  public void cancelJob(InterpreterContext context) throws InterpreterException {
    LOGGER.info("Canceling job associated with paragraph: {}", context.getParagraphId());
    JobClient jobClient = this.jobs.get(context.getParagraphId());
    if (jobClient == null) {
      LOGGER.warn("Unable to cancel job for paragraph {}, because no job is associated with this paragraph",
              context.getParagraphId());
      return;
    }
    boolean cancelled = false;
    try {
      String savePointDir = context.getLocalProperties().get(SAVEPOINT_DIR);
      if (StringUtils.isBlank(savePointDir)) {
        LOGGER.info("Trying to cancel job of paragraph {}", context.getParagraphId());
        jobClient.cancel();
      } else {
        LOGGER.info("Trying to stop job of paragraph {} with savepoint dir: {}",
                context.getParagraphId(), savePointDir);
        try {
          // Stop with a savepoint and record the savepoint path in the paragraph's
          // config, so the job can later be resumed from it.
          String savePointPath = jobClient.stopWithSavepoint(true, savePointDir).get();
          Map<String, String> config = new HashMap<>();
          config.put(SAVEPOINT_PATH, savePointPath);
          context.getIntpEventClient().updateParagraphConfig(
                  context.getNoteId(), context.getParagraphId(), config);
          LOGGER.info("Job {} of paragraph {} is stopped with savepoint path: {}",
                  jobClient.getJobID(), context.getParagraphId(), savePointPath);
        } catch (Exception e) {
          LOGGER.warn("Failed to stop job of paragraph {} with savepoint, cancelling it without savepoint",
                  context.getParagraphId(), e);
          jobClient.cancel();
        }
      }
      cancelled = true;
    } catch (Exception e) {
      String errorMessage = String.format("Failed to cancel job %s that is associated " +
              "with paragraph %s", jobClient.getJobID(), context.getParagraphId());
      LOGGER.warn(errorMessage, e);
      throw new InterpreterException(errorMessage, e);
    } finally {
      if (cancelled) {
        LOGGER.info("Cancellation succeeded, removing the associated FlinkJobProgressPoller of paragraph: {}",
                context.getParagraphId());
        FlinkJobProgressPoller jobProgressPoller = jobProgressPollerMap.remove(jobClient.getJobID());
        if (jobProgressPoller != null) {
          jobProgressPoller.cancel();
          jobProgressPoller.interrupt();
        }
        this.jobs.remove(context.getParagraphId());
      }
    }
  }
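
  /**
   * Signals all running progress poller threads to stop.
   */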
  public void shutdown() {
    for (FlinkJobProgressPoller jobProgressPoller : jobProgressPollerMap.values()) {
      jobProgressPoller.cancel();
    }
  }
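
  /**
   * A thread that periodically polls the Flink REST API ({@code /jobs/:jobid} and
   * {@code /jobs/:jobid/checkpoints}) to compute the job's progress, push a
   * streaming duration display to the frontend, and record the latest
   * externalized checkpoint path in the paragraph's config.
   */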
  class FlinkJobProgressPoller extends Thread {

    private String flinkWebUrl;
    private JobID jobId;
    private InterpreterContext context;
    private boolean isStreamingInsertInto;
    private int progress;
    private AtomicBoolean running = new AtomicBoolean(true);
    private boolean isFirstPoll = true;
    private long checkInterval;
    private String latestCheckpointPath;

    FlinkJobProgressPoller(String flinkWebUrl, JobID jobId, InterpreterContext context,
                           long checkInterval) {
      this.flinkWebUrl = flinkWebUrl;
      this.jobId = jobId;
      this.context = context;
      this.isStreamingInsertInto =
              context.getLocalProperties().containsKey("flink.streaming.insert_into");
      this.checkInterval = checkInterval;
    }

    @Override
    public void run() {
      while (!Thread.currentThread().isInterrupted() && running.get()) {
        JsonNode rootNode = null;
        try {
          synchronized (running) {
            // Wait instead of sleep, so that cancel() can wake this thread up early.
            running.wait(checkInterval);
          }
          rootNode = Unirest.get(flinkWebUrl + "/jobs/" + jobId.toString())
                  .asJson().getBody();
          JSONArray vertices = rootNode.getObject().getJSONArray("vertices");
          int totalTasks = 0;
          int finishedTasks = 0;
          for (int i = 0; i < vertices.length(); ++i) {
            JSONObject vertex = vertices.getJSONObject(i);
            totalTasks += vertex.getInt("parallelism");
            finishedTasks += vertex.getJSONObject("tasks").getInt("FINISHED");
          }
          LOGGER.debug("Total tasks: {}", totalTasks);
          LOGGER.debug("Finished tasks: {}", finishedTasks);
          if (finishedTasks != 0) {
            this.progress = finishedTasks * 100 / totalTasks;
            LOGGER.debug("Progress: {}", this.progress);
          }
          String jobState = rootNode.getObject().getString("state");
          if (jobState.equalsIgnoreCase("finished")) {
            break;
          }
          long duration = rootNode.getObject().getLong("duration") / 1000;
          if (isStreamingInsertInto) {
            if (isFirstPoll) {
              // On the first poll, render a duration counter via the angular display system.
              StringBuilder builder = new StringBuilder("%angular ");
              builder.append("<h1>Duration: {{duration}} </h1>");
              builder.append("\n%text ");
              context.out.clear(false);
              context.out.write(builder.toString());
              context.out.flush();
              isFirstPoll = false;
            }
            context.getAngularObjectRegistry().add("duration",
                    toRichTimeDuration(duration),
                    context.getNoteId(),
                    context.getParagraphId());
          }
          // Fetch checkpoint info and save the latest externalized checkpoint path
          // into the paragraph's config.
          rootNode = Unirest.get(flinkWebUrl + "/jobs/" + jobId.toString() + "/checkpoints")
                  .asJson().getBody();
          if (rootNode.getObject().has("latest")) {
            JSONObject latestObject = rootNode.getObject().getJSONObject("latest");
            if (latestObject.has("completed") && latestObject.get("completed") instanceof JSONObject) {
              JSONObject completedObject = latestObject.getJSONObject("completed");
              if (completedObject.has("external_path")) {
                String checkpointPath = completedObject.getString("external_path");
                LOGGER.debug("Latest checkpoint path: {}", checkpointPath);
                if (!StringUtils.isBlank(checkpointPath) && !checkpointPath.equals(latestCheckpointPath)) {
                  Map<String, String> config = new HashMap<>();
                  config.put(LATEST_CHECKPOINT_PATH, checkpointPath);
                  LOGGER.info("Updating latest checkpoint path: {}", checkpointPath);
                  context.getIntpEventClient().updateParagraphConfig(
                          context.getNoteId(), context.getParagraphId(), config);
                  latestCheckpointPath = checkpointPath;
                }
              }
            }
          }
        } catch (Exception e) {
          LOGGER.error("Failed to poll Flink job progress via REST API", e);
        }
      }
    }

    public void cancel() {
      this.running.set(false);
      synchronized (running) {
        // Wake the poller thread up, so that it can exit promptly.
        running.notify();
      }
    }

    public int getProgress() {
      return progress;
    }
  }

  /**
   * Converts a duration in seconds to a human-readable format,
   * e.g. toRichTimeDuration(93784) returns "1 days 2 hours 3 minutes 4 seconds".
   *
   * @param duration duration in seconds
   * @return the formatted duration string
   */
  static String toRichTimeDuration(long duration) {
    long days = TimeUnit.SECONDS.toDays(duration);
    duration -= TimeUnit.DAYS.toSeconds(days);
    long hours = TimeUnit.SECONDS.toHours(duration);
    duration -= TimeUnit.HOURS.toSeconds(hours);
    long minutes = TimeUnit.SECONDS.toMinutes(duration);
    duration -= TimeUnit.MINUTES.toSeconds(minutes);
    long seconds = duration;
    StringBuilder builder = new StringBuilder();
    if (days != 0) {
      builder.append(days).append(" days ");
    }
    if (days != 0 || hours != 0) {
      builder.append(hours).append(" hours ");
    }
    if (days != 0 || hours != 0 || minutes != 0) {
      builder.append(minutes).append(" minutes ");
    }
    builder.append(seconds).append(" seconds");
    return builder.toString();
  }
}