| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.falcon.logging; |
| |
| import org.apache.commons.cli.CommandLine; |
| import org.apache.commons.cli.GnuParser; |
| import org.apache.commons.cli.Option; |
| import org.apache.commons.cli.Options; |
| import org.apache.commons.cli.ParseException; |
| import org.apache.falcon.entity.v0.EntityType; |
| import org.apache.hadoop.conf.Configured; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.IOUtils; |
| import org.apache.hadoop.mapred.JobClient; |
| import org.apache.hadoop.mapred.JobConf; |
| import org.apache.hadoop.mapred.JobID; |
| import org.apache.hadoop.mapred.RunningJob; |
| import org.apache.hadoop.mapred.TaskCompletionEvent; |
| import org.apache.hadoop.util.Tool; |
| import org.apache.hadoop.util.ToolRunner; |
| import org.apache.log4j.Logger; |
| import org.apache.oozie.client.OozieClient; |
| import org.apache.oozie.client.OozieClientException; |
| import org.apache.oozie.client.WorkflowAction; |
| import org.apache.oozie.client.WorkflowJob; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.net.URL; |
| import java.net.URLConnection; |
| import java.util.List; |
| |
| /** |
| * Utitlity called in the post process of oozie workflow to move oozie action executor log. |
| */ |
| public class LogMover extends Configured implements Tool { |
| |
| private static final Logger LOG = Logger.getLogger(LogMover.class); |
| |
| /** |
| * Args to the command. |
| */ |
| private static class ARGS { |
| private String oozieUrl; |
| private String subflowId; |
| private String runId; |
| private String logDir; |
| private String entityType; |
| } |
| |
| public static void main(String[] args) throws Exception { |
| ToolRunner.run(new LogMover(), args); |
| } |
| |
| @Override |
| public int run(String[] arguments) throws Exception { |
| try { |
| ARGS args = new ARGS(); |
| setupArgs(arguments, args); |
| OozieClient client = new OozieClient(args.oozieUrl); |
| WorkflowJob jobInfo; |
| try { |
| jobInfo = client.getJobInfo(args.subflowId); |
| } catch (OozieClientException e) { |
| LOG.error("Error getting jobinfo for: " + args.subflowId, e); |
| return 0; |
| } |
| Path path = new Path(args.logDir + "/" |
| + String.format("%03d", Integer.parseInt(args.runId))); |
| |
| FileSystem fs = path.getFileSystem(getConf()); |
| |
| if (args.entityType.equalsIgnoreCase(EntityType.FEED.name())) { |
| // if replication wf |
| copyOozieLog(client, fs, path, jobInfo.getId()); |
| copyTTlogs(fs, path, jobInfo.getActions().get(2)); |
| } else { |
| // if process wf |
| String subflowId = jobInfo.getExternalId(); |
| copyOozieLog(client, fs, path, subflowId); |
| WorkflowJob subflowInfo = client.getJobInfo(subflowId); |
| List<WorkflowAction> actions = subflowInfo.getActions(); |
| for (WorkflowAction action : actions) { |
| if (action.getType().equals("pig") |
| || action.getType().equals("java")) { |
| copyTTlogs(fs, path, action); |
| } else { |
| LOG.info("Ignoring hadoop TT log for non-pig and non-java action:" |
| + action.getName()); |
| } |
| } |
| |
| } |
| |
| } catch (Exception e) { |
| LOG.error("Exception in log mover:", e); |
| } |
| return 0; |
| } |
| |
| private void copyOozieLog(OozieClient client, FileSystem fs, Path path, |
| String id) throws OozieClientException, IOException { |
| InputStream in = new ByteArrayInputStream(client.getJobLog(id).getBytes()); |
| OutputStream out = fs.create(new Path(path, "oozie.log")); |
| IOUtils.copyBytes(in, out, 4096, true); |
| LOG.info("Copied oozie log to " + path); |
| } |
| |
| private void copyTTlogs(FileSystem fs, Path path, |
| WorkflowAction action) throws Exception { |
| String ttLogURL = getTTlogURL(action.getExternalId()); |
| if (ttLogURL != null) { |
| LOG.info("Fetching log for action: " + action.getExternalId() |
| + " from url: " + ttLogURL); |
| InputStream in = getURLinputStream(new URL(ttLogURL)); |
| OutputStream out = fs.create(new Path(path, action.getName() + "_" |
| + getMappedStatus(action.getStatus()) + ".log")); |
| IOUtils.copyBytes(in, out, 4096, true); |
| LOG.info("Copied log to " + path); |
| } |
| } |
| |
| private String getMappedStatus(WorkflowAction.Status status) { |
| if (status == WorkflowAction.Status.FAILED |
| || status == WorkflowAction.Status.KILLED |
| || status == WorkflowAction.Status.ERROR) { |
| return "FAILED"; |
| } else { |
| return "SUCCEEDED"; |
| } |
| } |
| |
| private void setupArgs(String[] arguments, ARGS args) throws ParseException { |
| Options options = new Options(); |
| Option opt; |
| opt = new Option("workflowEngineUrl", true, |
| "url of workflow engine, ex:oozie"); |
| opt.setRequired(true); |
| options.addOption(opt); |
| opt = new Option("subflowId", true, "external id of userworkflow"); |
| opt.setRequired(true); |
| options.addOption(opt); |
| opt = new Option("runId", true, "current workflow's runid"); |
| opt.setRequired(true); |
| options.addOption(opt); |
| opt = new Option("logDir", true, "log dir where job logs are stored"); |
| opt.setRequired(true); |
| options.addOption(opt); |
| opt = new Option("status", true, "user workflow status"); |
| opt.setRequired(true); |
| options.addOption(opt); |
| opt = new Option("entityType", true, "entity type feed or process"); |
| opt.setRequired(true); |
| options.addOption(opt); |
| |
| CommandLine cmd = new GnuParser().parse(options, arguments); |
| |
| args.oozieUrl = cmd.getOptionValue("workflowEngineUrl"); |
| args.subflowId = cmd.getOptionValue("subflowId"); |
| args.runId = cmd.getOptionValue("runId"); |
| args.logDir = cmd.getOptionValue("logDir"); |
| args.entityType = cmd.getOptionValue("entityType"); |
| } |
| |
| private String getTTlogURL(String jobId) throws Exception { |
| JobConf jobConf = new JobConf(getConf()); |
| JobClient jobClient = new JobClient(jobConf); |
| RunningJob job = jobClient.getJob(JobID.forName(jobId)); |
| if (job == null) { |
| LOG.warn("No running job for job id: " + jobId); |
| return null; |
| } |
| TaskCompletionEvent[] tasks = job.getTaskCompletionEvents(0); |
| // 0th even is setup, 1 event is launcher, 2 event is cleanup |
| if (tasks != null && tasks.length == 3 && tasks[1] != null) { |
| return tasks[1].getTaskTrackerHttp() + "/tasklog?attemptid=" |
| + tasks[1].getTaskAttemptId() + "&all=true"; |
| } else { |
| LOG.warn("No running task for job: " + jobId); |
| } |
| return null; |
| } |
| |
| private InputStream getURLinputStream(URL url) throws IOException { |
| URLConnection connection = url.openConnection(); |
| connection.setDoOutput(true); |
| connection.connect(); |
| return connection.getInputStream(); |
| } |
| } |