| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.eagle.jpm.spark.history.crawl; |
| |
| import org.apache.eagle.jpm.spark.entity.*; |
| import org.apache.eagle.jpm.spark.history.SparkHistoryJobAppConfig; |
| import org.apache.eagle.jpm.util.*; |
| import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; |
| import org.apache.eagle.service.client.EagleServiceClientException; |
| import org.apache.eagle.service.client.impl.EagleServiceBaseClient; |
| import org.apache.eagle.service.client.impl.EagleServiceClientImpl; |
| import com.typesafe.config.Config; |
| import com.typesafe.config.ConfigFactory; |
| import org.apache.commons.lang.ArrayUtils; |
| import org.json.simple.JSONArray; |
| import org.json.simple.JSONObject; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.util.*; |
| |
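| /** |
| * Replays a Spark history event log, one JSON event per call to {@link #read(JSONObject)}, |
| * building Eagle entities for the application and its jobs, stages, tasks, and executors, |
| * and flushing them to the Eagle service in batches. |
| */ |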
| public class JHFSparkEventReader { |
| private static final Logger LOG = LoggerFactory.getLogger(JHFSparkEventReader.class); |
| |
| // Legacy default batch size; the effective flush threshold comes from config.eagleInfo.flushLimit. |
| private static final int FLUSH_LIMIT = 500; |
| private long firstTaskLaunchTime; |
| private long lastEventTime; |
| |
| private Map<String, SparkExecutor> executors; |
| private SparkApp app; |
| private Map<Integer, SparkJob> jobs; |
| private Map<String, SparkStage> stages; |
| private Map<Integer, Set<String>> jobStageMap; |
| private Map<Long, SparkTask> tasks; |
| private EagleServiceClientImpl client; |
| private Map<String, Map<Integer, Boolean>> stageTaskStatusMap; |
| |
| private List<TaggedLogAPIEntity> createEntities; |
| |
| private SparkHistoryJobAppConfig config; |
| private Config conf; |
| |
| public JHFSparkEventReader(SparkHistoryJobAppConfig config, Map<String, String> baseTags, SparkApplicationInfo info) { |
| app = new SparkApp(); |
| app.setTags(new HashMap<>(baseTags)); |
| app.setYarnState(info.getState()); |
| app.setYarnStatus(info.getFinalStatus()); |
| createEntities = new ArrayList<>(); |
| jobs = new HashMap<>(); |
| stages = new HashMap<>(); |
| jobStageMap = new HashMap<>(); |
| tasks = new HashMap<>(); |
| executors = new HashMap<>(); |
| stageTaskStatusMap = new HashMap<>(); |
| conf = config.getConfig(); |
| this.config = config; |
| this.initiateClient(); |
| } |
| |
| public SparkApp getApp() { |
| return this.app; |
| } |
| |
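| /** |
| * Dispatches a single Spark history event to its handler, keyed by the "Event" field. |
| * Unrecognized event types are logged and skipped. |
| */ |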
| public void read(JSONObject eventObj) { |
| String eventType = (String) eventObj.get("Event"); |
| if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerApplicationStart.toString())) { |
| handleAppStarted(eventObj); |
| } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerEnvironmentUpdate.toString())) { |
| handleEnvironmentSet(eventObj); |
| } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerExecutorAdded.toString())) { |
| handleExecutorAdd(eventObj); |
| } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerBlockManagerAdded.toString())) { |
| handleBlockManagerAdd(eventObj); |
| } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerJobStart.toString())) { |
| handleJobStart(eventObj); |
| } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerStageSubmitted.toString())) { |
| handleStageSubmit(eventObj); |
| } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerTaskStart.toString())) { |
| handleTaskStart(eventObj); |
| } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerTaskEnd.toString())) { |
| handleTaskEnd(eventObj); |
| } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerStageCompleted.toString())) { |
| handleStageComplete(eventObj); |
| } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerJobEnd.toString())) { |
| handleJobEnd(eventObj); |
| } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerExecutorRemoved.toString())) { |
| handleExecutorRemoved(eventObj); |
| } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerApplicationEnd.toString())) { |
| handleAppEnd(eventObj); |
| } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerBlockManagerRemoved.toString())) { |
| // nothing to do for now |
| } else { |
| LOG.info("Unregistered event type: {}", eventType); |
| } |
| } |
| |
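| // Captures a fixed whitelist of Spark properties, plus any extras listed in |
| // "spark.jobConf.additional.info", into the application's JobConfig. |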
| private void handleEnvironmentSet(JSONObject event) { |
| app.setConfig(new JobConfig()); |
| JSONObject sparkProps = (JSONObject) event.get("Spark Properties"); |
| |
| String[] props = {"spark.yarn.app.id", "spark.executor.memory", "spark.driver.host", "spark.driver.port", |
| "spark.driver.memory", "spark.scheduler.pool", "spark.executor.cores", "spark.yarn.am.memory", |
| "spark.yarn.am.cores", "spark.yarn.executor.memoryOverhead", "spark.yarn.driver.memoryOverhead", "spark.yarn.am.memoryOverhead", "spark.master"}; |
| |
| String[] additionalJobConf = null; |
| if (conf.hasPath("spark.jobConf.additional.info")) { |
| additionalJobConf = conf.getString("spark.jobConf.additional.info").split(",\\s*"); |
| } |
| |
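| // ArrayUtils.addAll tolerates a null first array, so this also works when no additional conf is configured. |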
| String[] jobConf = (String[]) ArrayUtils.addAll(additionalJobConf, props); |
| for (String prop : jobConf) { |
| if (sparkProps.containsKey(prop)) { |
| app.getConfig().getConfig().put(prop, (String) sparkProps.get(prop)); |
| } |
| } |
| } |
| |
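| // Looks up a captured config value, falling back to "spark.defaultVal.<configName>" |
| // from the application configuration when the job did not set it. |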
| private Object getConfigVal(JobConfig config, String configName, String type) { |
| if (config.getConfig().containsKey(configName)) { |
| Object val = config.getConfig().get(configName); |
| if (type.equalsIgnoreCase(Integer.class.getName())) { |
| return Integer.parseInt((String) val); |
| } else { |
| return val; |
| } |
| } else { |
| if (type.equalsIgnoreCase(Integer.class.getName())) { |
| return conf.getInt("spark.defaultVal." + configName); |
| } else { |
| return conf.getString("spark.defaultVal." + configName); |
| } |
| } |
| } |
| |
| private boolean isClientMode(JobConfig config) { |
| // Null-safe: "spark.master" may be absent from the captured config. |
| return "yarn-client".equalsIgnoreCase(config.getConfig().get("spark.master")); |
| } |
| |
| private void handleAppStarted(JSONObject event) { |
| // Update the tags of all entities created before the application-start event arrived. |
| List<TaggedLogAPIEntity> entities = new ArrayList<>(); |
| entities.addAll(this.executors.values()); |
| entities.add(this.app); |
| |
| long appStartTime = JSONUtils.getLong(event, "Timestamp", lastEventTime); |
| for (TaggedLogAPIEntity entity : entities) { |
| entity.getTags().put(SparkJobTagName.SPARK_APP_ID.toString(), JSONUtils.getString(event, "App ID")); |
| entity.getTags().put(SparkJobTagName.SPARK_APP_NAME.toString(), JSONUtils.getString(event, "App Name")); |
| // In yarn-client mode, attemptId is not available in the log, so we set attemptId = 1. |
| String attemptId = isClientMode(this.app.getConfig()) ? "1" : JSONUtils.getString(event, "App Attempt ID"); |
| entity.getTags().put(SparkJobTagName.SPARK_APP_ATTEMPT_ID.toString(), attemptId); |
| // The second argument of getNormalizedName() is set to null because the original code contained sensitive text; |
| // the original second argument looked like: this.app.getConfig().getConfig().get("xxx"), where "xxx" is the sensitive key. |
| entity.getTags().put(SparkJobTagName.SPARK_APP_NORM_NAME.toString(), this.getNormalizedName(JSONUtils.getString(event, "App Name"), null)); |
| entity.getTags().put(SparkJobTagName.SPARK_USER.toString(), JSONUtils.getString(event, "User")); |
| |
| entity.setTimestamp(appStartTime); |
| } |
| |
| this.app.setStartTime(appStartTime); |
| this.lastEventTime = appStartTime; |
| } |
| |
| private void handleExecutorAdd(JSONObject event) { |
| String executorID = (String) event.get("Executor ID"); |
| long executorAddTime = JSONUtils.getLong(event, "Timestamp", lastEventTime); |
| this.lastEventTime = executorAddTime; |
| // Register (or fetch) the executor entity; its host/port and memory are filled in |
| // later by handleBlockManagerAdd, and "Executor Info" carries nothing we persist yet. |
| this.initiateExecutor(executorID, executorAddTime); |
| } |
| |
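| // A BlockManagerAdded event supplies the executor's host:port and maximum memory. |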
| private void handleBlockManagerAdd(JSONObject event) { |
| long maxMemory = JSONUtils.getLong(event, "Maximum Memory"); |
| long timestamp = JSONUtils.getLong(event, "Timestamp", lastEventTime); |
| this.lastEventTime = timestamp; |
| JSONObject blockInfo = JSONUtils.getJSONObject(event, "Block Manager ID"); |
| String executorID = JSONUtils.getString(blockInfo, "Executor ID"); |
| String hostAndPort = JSONUtils.getString(blockInfo, "Host") + ":" + JSONUtils.getLong(blockInfo, "Port"); |
| |
| SparkExecutor executor = this.initiateExecutor(executorID, timestamp); |
| executor.setMaxMemory(maxMemory); |
| executor.setHostPort(hostAndPort); |
| } |
| |
| private void handleTaskStart(JSONObject event) { |
| this.initializeTask(event); |
| } |
| |
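| // Fills in the task's metrics from a TaskEnd event, then folds the task into its |
| // stage and executor aggregates and schedules it for flushing. |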
| private void handleTaskEnd(JSONObject event) { |
| JSONObject taskInfo = JSONUtils.getJSONObject(event, "Task Info"); |
| long taskId = JSONUtils.getLong(taskInfo, "Task ID"); |
| SparkTask task = tasks.get(taskId); |
| if (task == null) { |
| return; |
| } |
| |
| task.setFailed(JSONUtils.getBoolean(taskInfo, "Failed")); |
| JSONObject taskMetrics = JSONUtils.getJSONObject(event, "Task Metrics"); |
| if (null != taskMetrics) { |
| // These metrics are elapsed durations, not timestamps, so they default to 0 when absent. |
| task.setExecutorDeserializeTime(JSONUtils.getLong(taskMetrics, "Executor Deserialize Time", 0L)); |
| task.setExecutorRunTime(JSONUtils.getLong(taskMetrics, "Executor Run Time", 0L)); |
| task.setJvmGcTime(JSONUtils.getLong(taskMetrics, "JVM GC Time", 0L)); |
| task.setResultSize(JSONUtils.getLong(taskMetrics, "Result Size")); |
| task.setResultSerializationTime(JSONUtils.getLong(taskMetrics, "Result Serialization Time", 0L)); |
| task.setMemoryBytesSpilled(JSONUtils.getLong(taskMetrics, "Memory Bytes Spilled")); |
| task.setDiskBytesSpilled(JSONUtils.getLong(taskMetrics, "Disk Bytes Spilled")); |
| |
| JSONObject inputMetrics = JSONUtils.getJSONObject(taskMetrics, "Input Metrics"); |
| if (null != inputMetrics) { |
| task.setInputBytes(JSONUtils.getLong(inputMetrics, "Bytes Read")); |
| task.setInputRecords(JSONUtils.getLong(inputMetrics, "Records Read")); |
| } |
| |
| JSONObject outputMetrics = JSONUtils.getJSONObject(taskMetrics, "Output Metrics"); |
| if (null != outputMetrics) { |
| task.setOutputBytes(JSONUtils.getLong(outputMetrics, "Bytes Written")); |
| task.setOutputRecords(JSONUtils.getLong(outputMetrics, "Records Written")); |
| } |
| |
| JSONObject shuffleWriteMetrics = JSONUtils.getJSONObject(taskMetrics, "Shuffle Write Metrics"); |
| if (null != shuffleWriteMetrics) { |
| task.setShuffleWriteBytes(JSONUtils.getLong(shuffleWriteMetrics, "Shuffle Bytes Written")); |
| task.setShuffleWriteRecords(JSONUtils.getLong(shuffleWriteMetrics, "Shuffle Records Written")); |
| } |
| |
| JSONObject shuffleReadMetrics = JSONUtils.getJSONObject(taskMetrics, "Shuffle Read Metrics"); |
| if (null != shuffleReadMetrics) { |
| task.setShuffleReadLocalBytes(JSONUtils.getLong(shuffleReadMetrics, "Local Bytes Read")); |
| task.setShuffleReadRemoteBytes(JSONUtils.getLong(shuffleReadMetrics, "Remote Bytes Read")); |
| task.setShuffleReadRecords(JSONUtils.getLong(shuffleReadMetrics, "Total Records Read")); |
| } |
| } else { |
| // A successful task without metrics has nothing to aggregate yet; keep it in the |
| // map so clearReader() can flush it at the end if nothing else arrives. |
| if (!task.isFailed()) { |
| return; |
| } |
| } |
| |
| aggregateToStage(task); |
| aggregateToExecutor(task); |
| tasks.remove(taskId); |
| this.flushEntities(task, false); |
| } |
| |
| |
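| // Builds a SparkTask from a TaskStart event and indexes it by task ID until its |
| // TaskEnd event (or clearReader()) consumes it. |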
| private SparkTask initializeTask(JSONObject event) { |
| SparkTask task = new SparkTask(); |
| task.setTags(new HashMap<>(this.app.getTags())); |
| task.setTimestamp(app.getTimestamp()); |
| |
| task.getTags().put(SparkJobTagName.SPARK_SATGE_ID.toString(), Long.toString(JSONUtils.getLong(event, "Stage ID"))); |
| task.getTags().put(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString(), Long.toString(JSONUtils.getLong(event, "Stage Attempt ID"))); |
| |
| JSONObject taskInfo = JSONUtils.getJSONObject(event, "Task Info"); |
| long taskId = JSONUtils.getLong(taskInfo, "Task ID"); |
| task.setTaskId(taskId); |
| |
| task.getTags().put(SparkJobTagName.SPARK_TASK_INDEX.toString(), Long.toString(JSONUtils.getLong(taskInfo, "Index"))); |
| task.getTags().put(SparkJobTagName.SPARK_TASK_ATTEMPT_ID.toString(), Integer.toString(JSONUtils.getInt(taskInfo, "Attempt"))); |
| long launchTime = JSONUtils.getLong(taskInfo, "Launch Time", lastEventTime); |
| this.lastEventTime = launchTime; |
| if (taskId == 0) { |
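| // Remember the first task's launch time as a fallback stage submission time. |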
| this.setFirstTaskLaunchTime(launchTime); |
| } |
| task.setLaunchTime(launchTime); |
| task.setExecutorId(JSONUtils.getString(taskInfo, "Executor ID")); |
| task.setHost(JSONUtils.getString(taskInfo, "Host")); |
| task.setTaskLocality(JSONUtils.getString(taskInfo, "Locality")); |
| task.setSpeculative(JSONUtils.getBoolean(taskInfo, "Speculative")); |
| |
| tasks.put(task.getTaskId(), task); |
| return task; |
| } |
| |
| private void setFirstTaskLaunchTime(long launchTime) { |
| this.firstTaskLaunchTime = launchTime; |
| } |
| |
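| // A JobStart event carries the full list of stage infos, so stages are registered |
| // here before any StageSubmitted events arrive. |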
| private void handleJobStart(JSONObject event) { |
| SparkJob job = new SparkJob(); |
| job.setTags(new HashMap<>(this.app.getTags())); |
| job.setTimestamp(app.getTimestamp()); |
| |
| int jobId = JSONUtils.getInt(event, "Job ID"); |
| job.getTags().put(SparkJobTagName.SPARK_JOB_ID.toString(), Integer.toString(jobId)); |
| long submissionTime = JSONUtils.getLong(event, "Submission Time", lastEventTime); |
| job.setSubmissionTime(submissionTime); |
| this.lastEventTime = submissionTime; |
| |
| //for complete application, no active stages/tasks |
| job.setNumActiveStages(0); |
| job.setNumActiveTasks(0); |
| |
| this.jobs.put(jobId, job); |
| this.jobStageMap.put(jobId, new HashSet<>()); |
| |
| JSONArray stageInfos = JSONUtils.getJSONArray(event, "Stage Infos"); |
| int stagesSize = (stageInfos == null ? 0 : stageInfos.size()); |
| job.setNumStages(stagesSize); |
| for (int i = 0; i < stagesSize; i++) { |
| JSONObject stageInfo = (JSONObject) stageInfos.get(i); |
| int stageId = JSONUtils.getInt(stageInfo, "Stage ID"); |
| int stageAttemptId = JSONUtils.getInt(stageInfo, "Stage Attempt ID"); |
| String stageName = JSONUtils.getString(stageInfo, "Stage Name"); |
| int numTasks = JSONUtils.getInt(stageInfo, "Number of Tasks"); |
| this.initiateStage(jobId, stageId, stageAttemptId, stageName, numTasks); |
| } |
| } |
| |
| private void handleStageSubmit(JSONObject event) { |
| JSONObject stageInfo = JSONUtils.getJSONObject(event, "Stage Info"); |
| int stageId = JSONUtils.getInt(stageInfo, "Stage ID"); |
| int stageAttemptId = JSONUtils.getInt(stageInfo, "Stage Attempt ID"); |
| String key = this.generateStageKey(Integer.toString(stageId), Integer.toString(stageAttemptId)); |
| stageTaskStatusMap.put(key, new HashMap<>()); |
| |
| if (!stages.containsKey(key)) { |
| // may be a further attempt of an existing stage; recover the job id from attempt 0 |
| String baseAttempt = this.generateStageKey(Integer.toString(stageId), "0"); |
| if (stages.containsKey(baseAttempt)) { |
| SparkStage stage = stages.get(baseAttempt); |
| String jobId = stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString()); |
| |
| // "Stage Name" lives inside "Stage Info", not at the top level of the event. |
| String stageName = JSONUtils.getString(stageInfo, "Stage Name"); |
| int numTasks = JSONUtils.getInt(stageInfo, "Number of Tasks"); |
| this.initiateStage(Integer.parseInt(jobId), stageId, stageAttemptId, stageName, numTasks); |
| } |
| } |
| } |
| |
| private void handleStageComplete(JSONObject event) { |
| JSONObject stageInfo = JSONUtils.getJSONObject(event, "Stage Info"); |
| int stageId = JSONUtils.getInt(stageInfo, "Stage ID"); |
| int stageAttemptId = JSONUtils.getInt(stageInfo, "Stage Attempt ID"); |
| String key = this.generateStageKey(Integer.toString(stageId), Integer.toString(stageAttemptId)); |
| SparkStage stage = stages.get(key); |
| |
| // If "Submission Time" is not available, use the "Launch Time" of "Task ID" = 0. |
| long submissionTime = JSONUtils.getLong(stageInfo, "Submission Time", firstTaskLaunchTime); |
| stage.setSubmitTime(submissionTime); |
| |
| long completeTime = JSONUtils.getLong(stageInfo, "Completion Time", lastEventTime); |
| stage.setCompleteTime(completeTime); |
| this.lastEventTime = completeTime; |
| |
| // stageInfo was already dereferenced above, so only the key check is needed here. |
| if (stageInfo.containsKey("Failure Reason")) { |
| stage.setStatus(SparkEntityConstant.SparkStageStatus.FAILED.toString()); |
| } else { |
| stage.setStatus(SparkEntityConstant.SparkStageStatus.COMPLETE.toString()); |
| } |
| } |
| |
| private void handleExecutorRemoved(JSONObject event) { |
| String executorID = JSONUtils.getString(event, "Executor ID"); |
| long removedTime = JSONUtils.getLong(event, "Timestamp", lastEventTime); |
| this.lastEventTime = removedTime; |
| SparkExecutor executor = executors.get(executorID); |
| if (executor != null) { |
| executor.setEndTime(removedTime); |
| } |
| } |
| |
| private void handleJobEnd(JSONObject event) { |
| int jobId = JSONUtils.getInt(event, "Job ID"); |
| SparkJob job = jobs.get(jobId); |
| |
| long completionTime = JSONUtils.getLong(event, "Completion Time", lastEventTime); |
| job.setCompletionTime(completionTime); |
| this.lastEventTime = completionTime; |
| |
| JSONObject jobResult = JSONUtils.getJSONObject(event, "Job Result"); |
| String result = JSONUtils.getString(jobResult, "Result"); |
| if (result.equalsIgnoreCase("JobSucceeded")) { |
| job.setStatus(SparkEntityConstant.SparkJobStatus.SUCCEEDED.toString()); |
| } else { |
| job.setStatus(SparkEntityConstant.SparkJobStatus.FAILED.toString()); |
| } |
| } |
| |
| private void handleAppEnd(JSONObject event) { |
| long endTime = JSONUtils.getLong(event, "Timestamp", lastEventTime); |
| app.setEndTime(endTime); |
| this.lastEventTime = endTime; |
| } |
| |
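| /** |
| * Finalizes the model once the event log is exhausted: leftover tasks are marked |
| * failed, stage metrics are rolled up into jobs and the application, resource |
| * settings are resolved from the captured config, and everything is force-flushed. |
| */ |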
| public void clearReader() throws Exception { |
| // Tasks still buffered here never received a TaskEnd event with metrics; treat them as failed. |
| for (SparkTask task : tasks.values()) { |
| LOG.info("Task {} has no TaskEnd event or no task metrics; marking it as failed.", task.getTaskId()); |
| task.setFailed(true); |
| aggregateToStage(task); |
| aggregateToExecutor(task); |
| this.flushEntities(task, false); |
| } |
| |
| List<SparkStage> needStoreStages = new ArrayList<>(); |
| for (SparkStage stage : this.stages.values()) { |
| int jobId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString())); |
| if (stage.getSubmitTime() == 0 || stage.getCompleteTime() == 0) { |
| SparkJob job = this.jobs.get(jobId); |
| job.setNumSkippedStages(job.getNumSkippedStages() + 1); |
| job.setNumSkippedTasks(job.getNumSkippedTasks() + stage.getNumTasks()); |
| } else { |
| this.aggregateToJob(stage); |
| this.aggregateStageToApp(stage); |
| needStoreStages.add(stage); |
| } |
| String stageId = stage.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString()); |
| String stageAttemptId = stage.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString()); |
| this.jobStageMap.get(jobId).remove(this.generateStageKey(stageId, stageAttemptId)); |
| } |
| |
| this.flushEntities(needStoreStages, false); |
| for (SparkJob job : jobs.values()) { |
| this.aggregateJobToApp(job); |
| } |
| this.flushEntities(jobs.values(), false); |
| |
| app.setExecutors(executors.size()); |
| |
| long executorMemory = Utils.parseMemory((String) this.getConfigVal(this.app.getConfig(), "spark.executor.memory", String.class.getName())); |
| long driverMemory = Utils.parseMemory(this.isClientMode(app.getConfig()) |
| ? (String) this.getConfigVal(this.app.getConfig(), "spark.yarn.am.memory", String.class.getName()) |
| : (String) this.getConfigVal(app.getConfig(), "spark.driver.memory", String.class.getName())); |
| |
| int executorCore = (Integer) this.getConfigVal(app.getConfig(), "spark.executor.cores", Integer.class.getName()); |
| int driverCore = this.isClientMode(app.getConfig()) |
| ? (Integer) this.getConfigVal(app.getConfig(), "spark.yarn.am.cores", Integer.class.getName()) |
| : (Integer) this.getConfigVal(app.getConfig(), "spark.driver.cores", Integer.class.getName()); |
| |
| long executorMemoryOverhead = this.getMemoryOverhead(app.getConfig(), executorMemory, "spark.yarn.executor.memoryOverhead"); |
| long driverMemoryOverhead = this.isClientMode(app.getConfig()) |
| ? this.getMemoryOverhead(app.getConfig(), driverMemory, "spark.yarn.am.memoryOverhead") |
| : this.getMemoryOverhead(app.getConfig(), driverMemory, "spark.yarn.driver.memoryOverhead"); |
| |
| app.setExecMemoryBytes(executorMemory); |
| app.setDriveMemoryBytes(driverMemory); |
| app.setExecutorCores(executorCore); |
| app.setDriverCores(driverCore); |
| app.setExecutorMemoryOverhead(executorMemoryOverhead); |
| app.setDriverMemoryOverhead(driverMemoryOverhead); |
| |
| // Fix up the application end time once, before deriving executor end times from it. |
| if (app.getEndTime() <= 0L) { |
| app.setEndTime(this.lastEventTime); |
| } |
| for (SparkExecutor executor : executors.values()) { |
| String executorID = executor.getTags().get(SparkJobTagName.SPARK_EXECUTOR_ID.toString()); |
| if (executorID.equalsIgnoreCase("driver")) { |
| executor.setExecMemoryBytes(driverMemory); |
| executor.setCores(driverCore); |
| executor.setMemoryOverhead(driverMemoryOverhead); |
| } else { |
| executor.setExecMemoryBytes(executorMemory); |
| executor.setCores(executorCore); |
| executor.setMemoryOverhead(executorMemoryOverhead); |
| } |
| if (executor.getEndTime() <= 0L) { |
| executor.setEndTime(app.getEndTime()); |
| } |
| this.aggregateExecutorToApp(executor); |
| } |
| this.flushEntities(executors.values(), false); |
| // Matches Spark's own accounting here, which is admittedly tricky: skipped tasks are |
| // reported as the completed-task count. |
| app.setSkippedTasks(app.getCompleteTasks()); |
| this.flushEntities(app, true); |
| } |
| |
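| // Resolves a spark.yarn.*.memoryOverhead setting: an explicit value wins (a bare number |
| // is treated as MB); otherwise use max(configured minimum, a configured percentage of the heap). |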
| private long getMemoryOverhead(JobConfig config, long memoryBytes, String fieldName) { |
| long result = 0L; |
| String fieldValue = config.getConfig().get(fieldName); |
| if (fieldValue != null) { |
| try { |
| // a bare number is interpreted as MB |
| result = Utils.parseMemory(fieldValue + "m"); |
| } catch (Exception e) { |
| // otherwise the value already carries its own unit suffix |
| result = Utils.parseMemory(fieldValue); |
| } |
| } |
| |
| if (result == 0L) { |
| result = Math.max( |
| Utils.parseMemory(conf.getString("spark.defaultVal.spark.yarn.overhead.min")), |
| memoryBytes * conf.getInt("spark.defaultVal." + fieldName + ".factor") / 100); |
| } |
| return result; |
| } |
| |
| private void aggregateExecutorToApp(SparkExecutor executor) { |
| long totalExecutorTime = app.getTotalExecutorTime() + executor.getEndTime() - executor.getStartTime(); |
| if (totalExecutorTime < 0L) { |
| totalExecutorTime = 0L; |
| } |
| app.setTotalExecutorTime(totalExecutorTime); |
| } |
| |
| private void aggregateJobToApp(SparkJob job) { |
| //aggregate job level metrics |
| app.setNumJobs(app.getNumJobs() + 1); |
| app.setTotalTasks(app.getTotalTasks() + job.getNumTask()); |
| app.setCompleteTasks(app.getCompleteTasks() + job.getNumCompletedTasks()); |
| app.setSkippedTasks(app.getSkippedTasks() + job.getNumSkippedTasks()); |
| app.setFailedTasks(app.getFailedTasks() + job.getNumFailedTasks()); |
| app.setTotalStages(app.getTotalStages() + job.getNumStages()); |
| app.setFailedStages(app.getFailedStages() + job.getNumFailedStages()); |
| app.setSkippedStages(app.getSkippedStages() + job.getNumSkippedStages()); |
| } |
| |
| private void aggregateStageToApp(SparkStage stage) { |
| //aggregate task level metrics |
| app.setDiskBytesSpilled(app.getDiskBytesSpilled() + stage.getDiskBytesSpilled()); |
| app.setMemoryBytesSpilled(app.getMemoryBytesSpilled() + stage.getMemoryBytesSpilled()); |
| app.setExecutorRunTime(app.getExecutorRunTime() + stage.getExecutorRunTime()); |
| app.setJvmGcTime(app.getJvmGcTime() + stage.getJvmGcTime()); |
| app.setExecutorDeserializeTime(app.getExecutorDeserializeTime() + stage.getExecutorDeserializeTime()); |
| app.setResultSerializationTime(app.getResultSerializationTime() + stage.getResultSerializationTime()); |
| app.setResultSize(app.getResultSize() + stage.getResultSize()); |
| app.setInputRecords(app.getInputRecords() + stage.getInputRecords()); |
| app.setInputBytes(app.getInputBytes() + stage.getInputBytes()); |
| app.setOutputRecords(app.getOutputRecords() + stage.getOutputRecords()); |
| app.setOutputBytes(app.getOutputBytes() + stage.getOutputBytes()); |
| app.setShuffleWriteRecords(app.getShuffleWriteRecords() + stage.getShuffleWriteRecords()); |
| app.setShuffleWriteBytes(app.getShuffleWriteBytes() + stage.getShuffleWriteBytes()); |
| app.setShuffleReadRecords(app.getShuffleReadRecords() + stage.getShuffleReadRecords()); |
| app.setShuffleReadBytes(app.getShuffleReadBytes() + stage.getShuffleReadBytes()); |
| } |
| |
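| // Adds a finished task's metrics to its stage and tracks per-task-index success so |
| // that retried tasks are only counted once. |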
| private void aggregateToStage(SparkTask task) { |
| String stageId = task.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString()); |
| String stageAttemptId = task.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString()); |
| String key = this.generateStageKey(stageId, stageAttemptId); |
| SparkStage stage = stages.get(key); |
| |
| stage.setDiskBytesSpilled(stage.getDiskBytesSpilled() + task.getDiskBytesSpilled()); |
| stage.setMemoryBytesSpilled(stage.getMemoryBytesSpilled() + task.getMemoryBytesSpilled()); |
| stage.setExecutorRunTime(stage.getExecutorRunTime() + task.getExecutorRunTime()); |
| stage.setJvmGcTime(stage.getJvmGcTime() + task.getJvmGcTime()); |
| stage.setExecutorDeserializeTime(stage.getExecutorDeserializeTime() + task.getExecutorDeserializeTime()); |
| stage.setResultSerializationTime(stage.getResultSerializationTime() + task.getResultSerializationTime()); |
| stage.setResultSize(stage.getResultSize() + task.getResultSize()); |
| stage.setInputRecords(stage.getInputRecords() + task.getInputRecords()); |
| stage.setInputBytes(stage.getInputBytes() + task.getInputBytes()); |
| stage.setOutputRecords(stage.getOutputRecords() + task.getOutputRecords()); |
| stage.setOutputBytes(stage.getOutputBytes() + task.getOutputBytes()); |
| stage.setShuffleWriteRecords(stage.getShuffleWriteRecords() + task.getShuffleWriteRecords()); |
| stage.setShuffleWriteBytes(stage.getShuffleWriteBytes() + task.getShuffleWriteBytes()); |
| stage.setShuffleReadRecords(stage.getShuffleReadRecords() + task.getShuffleReadRecords()); |
| long taskShuffleReadBytes = task.getShuffleReadLocalBytes() + task.getShuffleReadRemoteBytes(); |
| stage.setShuffleReadBytes(stage.getShuffleReadBytes() + taskShuffleReadBytes); |
| |
| boolean success = !task.isFailed(); |
| |
| Integer taskIndex = Integer.parseInt(task.getTags().get(SparkJobTagName.SPARK_TASK_INDEX.toString())); |
| if (stageTaskStatusMap.get(key).containsKey(taskIndex)) { |
| // A previous attempt of this task index already ran in this stage; the task |
| // counts as completed if any of its attempts succeeded. |
| boolean previousResult = stageTaskStatusMap.get(key).get(taskIndex); |
| success = previousResult || success; |
| if (previousResult != success) { |
| // the earlier attempt failed but this one succeeded, so reclassify the task |
| stage.setNumFailedTasks(stage.getNumFailedTasks() - 1); |
| stage.setNumCompletedTasks(stage.getNumCompletedTasks() + 1); |
| stageTaskStatusMap.get(key).put(taskIndex, success); |
| } |
| } else { |
| if (success) { |
| stage.setNumCompletedTasks(stage.getNumCompletedTasks() + 1); |
| } else { |
| stage.setNumFailedTasks(stage.getNumFailedTasks() + 1); |
| } |
| stageTaskStatusMap.get(key).put(taskIndex, success); |
| } |
| } |
| |
| private void aggregateToExecutor(SparkTask task) { |
| String executorId = task.getExecutorId(); |
| SparkExecutor executor = executors.get(executorId); |
| |
| if (null != executor) { |
| executor.setTotalTasks(executor.getTotalTasks() + 1); |
| if (task.isFailed()) { |
| executor.setFailedTasks(executor.getFailedTasks() + 1); |
| } else { |
| executor.setCompletedTasks(executor.getCompletedTasks() + 1); |
| } |
| long taskShuffleReadBytes = task.getShuffleReadLocalBytes() + task.getShuffleReadRemoteBytes(); |
| executor.setTotalShuffleRead(executor.getTotalShuffleRead() + taskShuffleReadBytes); |
| executor.setTotalDuration(executor.getTotalDuration() + task.getExecutorRunTime()); |
| executor.setTotalInputBytes(executor.getTotalInputBytes() + task.getInputBytes()); |
| executor.setTotalShuffleWrite(executor.getTotalShuffleWrite() + task.getShuffleWriteBytes()); |
| } |
| } |
| |
| private void aggregateToJob(SparkStage stage) { |
| int jobId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString())); |
| SparkJob job = jobs.get(jobId); |
| job.setNumCompletedTasks(job.getNumCompletedTasks() + stage.getNumCompletedTasks()); |
| job.setNumFailedTasks(job.getNumFailedTasks() + stage.getNumFailedTasks()); |
| job.setNumTask(job.getNumTask() + stage.getNumTasks()); |
| |
| if (stage.getStatus().equalsIgnoreCase(SparkEntityConstant.SparkStageStatus.COMPLETE.toString())) { |
| //if multiple attempts succeed, just count one |
| if (!hasStagePriorAttemptSuccess(stage)) { |
| job.setNumCompletedStages(job.getNumCompletedStages() + 1); |
| } |
| } else { |
| job.setNumFailedStages(job.getNumFailedStages() + 1); |
| } |
| } |
| |
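| // True if any earlier attempt of this stage has already completed successfully. |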
| private boolean hasStagePriorAttemptSuccess(SparkStage stage) { |
| int stageAttemptId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString())); |
| for (int i = 0; i < stageAttemptId; i++) { |
| SparkStage previousStage = stages.get(this.generateStageKey( |
| stage.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString()), Integer.toString(i))); |
| if (previousStage != null |
| && previousStage.getStatus().equalsIgnoreCase(SparkEntityConstant.SparkStageStatus.COMPLETE.toString())) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| |
| private String generateStageKey(String stageId, String stageAttemptId) { |
| return stageId + "-" + stageAttemptId; |
| } |
| |
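| // Creates a stage entity tagged with its job/stage/attempt ids and registers it in the stage maps. |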
| private void initiateStage(int jobId, int stageId, int stageAttemptId, String name, int numTasks) { |
| SparkStage stage = new SparkStage(); |
| stage.setTags(new HashMap<>(this.app.getTags())); |
| stage.setTimestamp(app.getTimestamp()); |
| stage.getTags().put(SparkJobTagName.SPARK_JOB_ID.toString(), Integer.toString(jobId)); |
| stage.getTags().put(SparkJobTagName.SPARK_SATGE_ID.toString(), Integer.toString(stageId)); |
| stage.getTags().put(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString(), Integer.toString(stageAttemptId)); |
| stage.setName(name); |
| stage.setNumActiveTasks(0); |
| stage.setNumTasks(numTasks); |
| stage.setSchedulingPool(this.app.getConfig().getConfig().get("spark.scheduler.pool") == null |
| ? "default" : this.app.getConfig().getConfig().get("spark.scheduler.pool")); |
| |
| String stageKey = this.generateStageKey(Integer.toString(stageId), Integer.toString(stageAttemptId)); |
| stages.put(stageKey, stage); |
| this.jobStageMap.get(jobId).add(stageKey); |
| } |
| |
| |
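| // Get-or-create by executor ID, since ExecutorAdded and BlockManagerAdded events |
| // can arrive in either order for the same executor. |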
| private SparkExecutor initiateExecutor(String executorID, long startTime) { |
| if (!executors.containsKey(executorID)) { |
| SparkExecutor executor = new SparkExecutor(); |
| executor.setTags(new HashMap<>(this.app.getTags())); |
| executor.getTags().put(SparkJobTagName.SPARK_EXECUTOR_ID.toString(), executorID); |
| executor.setStartTime(startTime); |
| executor.setTimestamp(app.getTimestamp()); |
| |
| this.executors.put(executorID, executor); |
| } |
| |
| return this.executors.get(executorID); |
| } |
| |
| private String getNormalizedName(String jobName, String assignedName) { |
| if (null != assignedName) { |
| return assignedName; |
| } else { |
| return JobNameNormalization.getInstance(this.config.getConfig()).normalize(jobName); |
| } |
| } |
| |
| private void flushEntities(TaggedLogAPIEntity entity, boolean forceFlush) { |
| this.flushEntities(Collections.singletonList(entity), forceFlush); |
| } |
| |
| private void flushEntities(Collection<? extends TaggedLogAPIEntity> entities, boolean forceFlush) { |
| this.createEntities.addAll(entities); |
| |
| if (forceFlush || this.createEntities.size() >= config.eagleInfo.flushLimit) { |
| try { |
| this.doFlush(this.createEntities); |
| this.createEntities.clear(); |
| } catch (Exception e) { |
| LOG.error("Failed to flush entities", e); |
| } |
| } |
| } |
| |
| private EagleServiceBaseClient initiateClient() { |
| client = new EagleServiceClientImpl(config.eagleInfo.host, |
| config.eagleInfo.port, |
| config.eagleInfo.basePath, |
| config.eagleInfo.username, |
| config.eagleInfo.password); |
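| // The configured timeout is in seconds; setReadTimeout expects milliseconds. |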
| int timeout = config.eagleInfo.timeout; |
| client.setReadTimeout(timeout * 1000); |
| |
| return client; |
| } |
| |
| private void doFlush(List<TaggedLogAPIEntity> entities) throws IOException, EagleServiceClientException { |
| client.create(entities); |
| LOG.info("Finished flushing {} entities.", entities.size()); |
| } |
| } |