blob: 758c5c25982b94e984d172707c4636bae62448c7 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.command.coord;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.StringReader;
import java.util.Date;
import com.google.common.base.Charsets;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.ConfigUtils;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XmlUtils;
import org.eclipse.jgit.diff.DiffFormatter;
import org.eclipse.jgit.diff.EditList;
import org.eclipse.jgit.diff.HistogramDiff;
import org.eclipse.jgit.diff.RawText;
import org.eclipse.jgit.diff.RawTextComparator;
import org.jdom.Element;
/**
* This class provides the functionalities to update coordinator job XML and properties. It uses CoordSubmitXCommand
* functionality to validate XML and resolve all the variables or properties using job configurations.
*/
public class CoordUpdateXCommand extends CoordSubmitXCommand {
private final String jobId;
private boolean showDiff = true;
private boolean isConfChange = false;
//This properties are set in coord jobs by bundle. An update command should not overide it.
final static String[] bundleConfList = new String[] { OozieClient.BUNDLE_ID };
StringBuffer diff = new StringBuffer();
CoordinatorJobBean oldCoordJob = null;
public CoordUpdateXCommand(boolean dryrun, Configuration conf, String jobId) {
super(dryrun, conf);
this.jobId = jobId;
isConfChange = conf.size() != 0;
}
public CoordUpdateXCommand(boolean dryrun, Configuration conf, String jobId, boolean showDiff) {
super(dryrun, conf);
this.jobId = jobId;
this.showDiff = showDiff;
isConfChange = conf.size() != 0;
}
@Override
protected String storeToDB(String xmlElement, Element eJob, CoordinatorJobBean coordJob) throws CommandException {
check(oldCoordJob, coordJob);
ConfigUtils.checkAndSetDisallowedProperties(conf,
this.oldCoordJob.getUser(),
new CommandException(ErrorCode.E1003,
String.format("%s=%s", OozieClient.USER_NAME, conf.get(OozieClient.USER_NAME))),
true);
computeDiff(eJob);
oldCoordJob.setAppPath(conf.get(OozieClient.COORDINATOR_APP_PATH));
if (isConfChange) {
oldCoordJob.setConf(XmlUtils.prettyPrint(conf).toString());
}
oldCoordJob.setMatThrottling(coordJob.getMatThrottling());
oldCoordJob.setOrigJobXml(xmlElement);
oldCoordJob.setConcurrency(coordJob.getConcurrency());
oldCoordJob.setExecution(coordJob.getExecution());
oldCoordJob.setTimeout(coordJob.getTimeout());
oldCoordJob.setJobXml(XmlUtils.prettyPrint(eJob).toString());
if (!dryrun) {
oldCoordJob.setLastModifiedTime(new Date());
// Should log the changes, this should be useful for debugging.
LOG.info("Coord update changes : " + diff.toString());
try {
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, oldCoordJob);
}
catch (JPAExecutorException jpaee) {
throw new CommandException(jpaee);
}
}
return jobId;
}
@Override
protected void loadState() throws CommandException {
super.loadState();
jpaService = Services.get().get(JPAService.class);
if (jpaService == null) {
throw new CommandException(ErrorCode.E0610);
}
coordJob = new CoordinatorJobBean();
try {
oldCoordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, jobId);
}
catch (JPAExecutorException e) {
throw new CommandException(e);
}
LogUtils.setLogInfo(oldCoordJob);
if (!isConfChange || StringUtils.isEmpty(conf.get(OozieClient.COORDINATOR_APP_PATH))) {
try {
XConfiguration jobConf = new XConfiguration(new StringReader(oldCoordJob.getConf()));
if (!isConfChange) {
conf = jobConf;
}
else {
for (String bundleConfKey : bundleConfList) {
if (jobConf.get(bundleConfKey) != null) {
conf.set(bundleConfKey, jobConf.get(bundleConfKey));
}
}
if (StringUtils.isEmpty(conf.get(OozieClient.COORDINATOR_APP_PATH))) {
conf.set(OozieClient.COORDINATOR_APP_PATH, jobConf.get(OozieClient.COORDINATOR_APP_PATH));
}
}
}
catch (Exception e) {
throw new CommandException(ErrorCode.E1023, e.getMessage(), e);
}
}
coordJob.setConf(XmlUtils.prettyPrint(conf).toString());
setJob(coordJob);
LogUtils.setLogInfo(coordJob);
}
@Override
protected void verifyPrecondition() throws CommandException {
if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED
|| coordJob.getStatus() == CoordinatorJob.Status.DONEWITHERROR) {
LOG.info("Can't update coord job. Job has finished processing");
throw new CommandException(ErrorCode.E1023, "Can't update coord job. Job has finished processing");
}
}
/**
* Gets the difference of job definition and properties.
*
* @param eJob the e job
* @return the diff
*/
private void computeDiff(Element eJob) {
try {
diff.append("**********Job definition changes**********").append(System.getProperty("line.separator"));
diff.append(getDiffinGitFormat(oldCoordJob.getJobXml(), XmlUtils.prettyPrint(eJob).toString()));
diff.append("******************************************").append(System.getProperty("line.separator"));
diff.append("**********Job conf changes****************").append(System.getProperty("line.separator"));
if (isConfChange) {
diff.append(getDiffinGitFormat(oldCoordJob.getConf(), XmlUtils.prettyPrint(conf).toString()));
}
else {
diff.append("No conf update requested").append(System.getProperty("line.separator"));
}
diff.append("******************************************").append(System.getProperty("line.separator"));
}
catch (IOException e) {
diff.append("Error computing diff. Error " + e.getMessage());
LOG.warn("Error computing diff.", e);
}
}
/**
* Get the differences in git format.
*
* @param string1 the string1
* @param string2 the string2
* @return the diff
* @throws IOException Signals that an I/O exception has occurred.
*/
private String getDiffinGitFormat(String string1, String string2) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
RawText rt1 = new RawText(string1.getBytes(Charsets.UTF_8));
RawText rt2 = new RawText(string2.getBytes(Charsets.UTF_8));
EditList diffList = new EditList();
diffList.addAll(new HistogramDiff().diff(RawTextComparator.DEFAULT, rt1, rt2));
new DiffFormatter(out).format(diffList, rt1, rt2);
return out.toString(Charsets.UTF_8.name());
}
@Override
protected String submit() throws CommandException {
LOG.info("STARTED Coordinator update");
submitJob();
LOG.info("ENDED Coordinator update");
if (showDiff) {
return diff.toString();
}
else {
return "";
}
}
/**
* Check. Frequency can't be changed. EndTime can't be changed. StartTime can't be changed. AppName can't be changed
* Timeunit can't be changed. Timezone can't be changed
*
* @param oldCoord the old coord
* @param newCoord the new coord
* @throws CommandException the command exception
*/
public void check(CoordinatorJobBean oldCoord, CoordinatorJobBean newCoord) throws CommandException {
if (!oldCoord.getFrequency().equals(newCoord.getFrequency())) {
throw new CommandException(ErrorCode.E1023, "Frequency can't be changed. Old frequency = "
+ oldCoord.getFrequency() + " new frequency = " + newCoord.getFrequency());
}
if (!oldCoord.getEndTime().equals(newCoord.getEndTime())) {
throw new CommandException(ErrorCode.E1023, "End time can't be changed. Old end time = "
+ oldCoord.getEndTime() + " new end time = " + newCoord.getEndTime());
}
if (!oldCoord.getStartTime().equals(newCoord.getStartTime())) {
throw new CommandException(ErrorCode.E1023, "Start time can't be changed. Old start time = "
+ oldCoord.getStartTime() + " new start time = " + newCoord.getStartTime());
}
if (!oldCoord.getAppName().equals(newCoord.getAppName())) {
throw new CommandException(ErrorCode.E1023, "Coord name can't be changed. Old name = "
+ oldCoord.getAppName() + " new name = " + newCoord.getAppName());
}
if (!oldCoord.getTimeUnitStr().equals(newCoord.getTimeUnitStr())) {
throw new CommandException(ErrorCode.E1023, "Timeunit can't be changed. Old Timeunit = "
+ oldCoord.getTimeUnitStr() + " new Timeunit = " + newCoord.getTimeUnitStr());
}
if (!oldCoord.getTimeZone().equals(newCoord.getTimeZone())) {
throw new CommandException(ErrorCode.E1023, "TimeZone can't be changed. Old timeZone = "
+ oldCoord.getTimeZone() + " new timeZone = " + newCoord.getTimeZone());
}
}
@Override
protected void queueMaterializeTransitionXCommand(String jobId) {
}
@Override
public void notifyParent() throws CommandException {
}
@Override
protected boolean isLockRequired() {
return true;
}
@Override
public String getEntityKey() {
return jobId;
}
@Override
public void transitToNext() {
}
@Override
public String getKey() {
return getName() + "_" + jobId;
}
@Override
public String getDryRun(CoordinatorJobBean job) throws Exception{
return super.getDryRun(oldCoordJob);
}
}