blob: f68d3a3a52228f32bd228db5dd2b05cd9320a3e0 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.oozie.action.hadoop;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Trash;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.dependency.FSURIHandler;
import org.apache.oozie.dependency.URIHandler;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.UserGroupInformationService;
import org.apache.oozie.service.URIHandlerService;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;
* File system action executor. <p> This executes the file system mkdir, move and delete commands
public class FsActionExecutor extends ActionExecutor {
// Type name of this action.
public static final String ACTION_TYPE = "fs";
// Names of the file system commands; doOperations switches on the name of each command element.
public static final String FS_OP_MKDIR = "mkdir";
public static final String FS_OP_DELETE = "delete";
public static final String FS_OP_MOVE = "move";
public static final String FS_OP_CHMOD = "chmod";
public static final String FS_OP_TOUCHZ = "touchz";
public static final String FS_OP_CHGRP = "chgrp";
public static final String FS_OP_SETREP = "setrep";
// Attribute names read from a command element ("recursive" is looked up as a child element instead).
public static final String FS_TAG_PATH = "path";
public static final String FS_TAG_SOURCE = "source";
public static final String FS_TAG_TARGET = "target";
public static final String FS_TAG_RECURSIVE = "recursive";
public static final String FS_TAG_DIRFILES = "dir-files";
public static final String FS_TAG_SKIPTRASH = "skip-trash";
public static final String FS_TAG_PERMISSIONS = "permissions";
// Upper bound on the number of glob matches; read from configuration in the constructor, checked by checkGlobMax.
private final int maxGlobCount;
// Per-instance logger bound to the runtime class.
private final XLog LOG = XLog.getLog(getClass());
/**
 * Creates the executor for the {@link #ACTION_TYPE} action and reads the maximum number of glob
 * matches from the {@code LauncherAMUtils.CONF_OOZIE_ACTION_FS_GLOB_MAX} configuration property.
 */
public FsActionExecutor() {
    // ActionExecutor must be told which action type this executor serves.
    super(ACTION_TYPE);
    maxGlobCount = ConfigurationService.getInt(LauncherAMUtils.CONF_OOZIE_ACTION_FS_GLOB_MAX);
}
* Initialize Action.
public void initActionType() {
registerError(AccessControlException.class.getName(), ActionExecutorException.ErrorType.ERROR, "FS014");
Path getPath(Element element, String attribute) {
String str = element.getAttributeValue(attribute).trim();
return new Path(str);
void validatePath(Path path, boolean withScheme) throws ActionExecutorException {
try {
String scheme = path.toUri().getScheme();
if (withScheme) {
if (scheme == null) {
throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS001",
"Missing scheme in path [{0}]", path);
else {
else {
if (scheme != null) {
throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS002",
"Scheme [{0}] not allowed in path [{1}]", scheme, path);
catch (HadoopAccessorException hex) {
throw convertException(hex);
Path resolveToFullPath(Path nameNode, Path path, boolean withScheme) throws ActionExecutorException {
Path fullPath;
// If no nameNode is given, validate the path as-is and return it as-is
if (nameNode == null) {
validatePath(path, withScheme);
fullPath = path;
} else {
// If the path doesn't have a scheme or authority, use the nameNode which should have already been verified earlier
String pathScheme = path.toUri().getScheme();
String pathAuthority = path.toUri().getAuthority();
if (pathScheme == null || pathAuthority == null) {
if (path.isAbsolute()) {
String nameNodeSchemeAuthority = nameNode.toUri().getScheme() + "://" + nameNode.toUri().getAuthority();
fullPath = new Path(nameNodeSchemeAuthority + path.toString());
} else {
throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS011",
"Path [{0}] cannot be relative", path);
} else {
// If the path has a scheme and authority, but its not the nameNode then validate the path as-is and return it as-is
// If it is the nameNode, then it should have already been verified earlier so return it as-is
if (!nameNode.toUri().getScheme().equals(pathScheme) || !nameNode.toUri().getAuthority().equals(pathAuthority)) {
validatePath(path, withScheme);
fullPath = path;
return fullPath;
void validateSameNN(Path source, Path dest) throws ActionExecutorException {
Path destPath = new Path(source, dest);
String t = destPath.toUri().getScheme() + destPath.toUri().getAuthority();
String s = source.toUri().getScheme() + source.toUri().getAuthority();
//checking whether NN prefix of source and target is same. can modify this to adjust for a set of multiple whitelisted NN
if(!t.equals(s)) {
throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS007",
"move, target NN URI different from that of source", dest);
void doOperations(Context context, Element element) throws ActionExecutorException {
try {
FileSystem fs = context.getAppFileSystem();
boolean recovery = fs.exists(getRecoveryPath(context));
if (!recovery) {
Path nameNodePath = null;
Element nameNodeElement = element.getChild("name-node", element.getNamespace());
if (nameNodeElement != null) {
String nameNode = nameNodeElement.getTextTrim();
if (nameNode != null) {
nameNodePath = new Path(nameNode);
// Verify the name node now
validatePath(nameNodePath, true);
XConfiguration fsConf = new XConfiguration();
Path appPath = new Path(context.getWorkflow().getAppPath());
// app path could be a file
if (fs.isFile(appPath)) {
appPath = appPath.getParent();
JavaActionExecutor.parseJobXmlAndConfiguration(context, element, appPath, fsConf);
for (Element commandElement : (List<Element>) element.getChildren()) {
String command = commandElement.getName();
switch (command) {
doMkdirOperation(context, fsConf, nameNodePath, commandElement);
doDeleteOperation(context, fsConf, nameNodePath, commandElement);
case FS_OP_MOVE:
doMoveOperation(context, fsConf, nameNodePath, commandElement, recovery);
doChmodOperation(context, fsConf, nameNodePath, commandElement);
doTouchzOperation(context, fsConf, nameNodePath, commandElement);
doChgrpOperation(context, fsConf, nameNodePath, commandElement);
doSetrepOperation(context, commandElement);
LOG.warn("Fs Action don't support [{0}] operation for the time being ", command);
} catch (Exception ex) {
throw convertException(ex);
private void doMkdirOperation(Context context, XConfiguration fsConf, Path nameNodePath,
Element commandElement) throws ActionExecutorException {
Path path = getPath(commandElement, FS_TAG_PATH);
mkdir(context, fsConf, nameNodePath, path);
private void doDeleteOperation(Context context, XConfiguration fsConf, Path nameNodePath,
Element commandElement) throws ActionExecutorException {
Path path = getPath(commandElement, FS_TAG_PATH);
boolean skipTrash = true;
String skipTrashTag = commandElement.getAttributeValue(FS_TAG_SKIPTRASH);
if (skipTrashTag != null && skipTrashTag.equals("false")) {
skipTrash = false;
delete(context, fsConf, nameNodePath, path, skipTrash);
private void doMoveOperation(Context context, XConfiguration fsConf, Path nameNodePath,
Element commandElement, boolean recovery) throws ActionExecutorException {
Path source = getPath(commandElement, FS_TAG_SOURCE);
Path target = getPath(commandElement, FS_TAG_TARGET);
move(context, fsConf, nameNodePath, source, target, recovery);
private void doChmodOperation(Context context, XConfiguration fsConf, Path nameNodePath,
Element commandElement) throws ActionExecutorException {
Path path = getPath(commandElement, FS_TAG_PATH);
String permissionsTag = commandElement.getAttributeValue(FS_TAG_PERMISSIONS).trim();
String dirFilesTag = commandElement.getAttributeValue(FS_TAG_DIRFILES);
boolean dirFiles = (dirFilesTag == null) || Boolean.parseBoolean(dirFilesTag);
boolean recursive = commandElement.getChild(FS_TAG_RECURSIVE, commandElement.getNamespace()) != null;
chmod(context, fsConf, nameNodePath, path, permissionsTag, dirFiles, recursive);
private void doTouchzOperation(Context context, XConfiguration fsConf, Path nameNodePath,
Element commandElement) throws ActionExecutorException {
Path path = getPath(commandElement, FS_TAG_PATH);
touchz(context, fsConf, nameNodePath, path);
private void doChgrpOperation(Context context, XConfiguration fsConf, Path nameNodePath,
Element commandElement) throws ActionExecutorException {
Path path = getPath(commandElement, FS_TAG_PATH);
String groupTag = commandElement.getAttributeValue("group");
String dirFilesTag = commandElement.getAttributeValue(FS_TAG_DIRFILES);
boolean dirFiles = (dirFilesTag == null) || Boolean.parseBoolean(dirFilesTag);
boolean recursive = commandElement.getChild(FS_TAG_RECURSIVE, commandElement.getNamespace()) != null;
chgrp(context, fsConf, nameNodePath, path, context.getWorkflow().getUser(),
groupTag, dirFiles, recursive);
private void doSetrepOperation(Context context, Element commandElement)
throws ActionExecutorException, HadoopAccessorException {
Path path = getPath(commandElement, FS_TAG_PATH);
String replicationFactor = commandElement.getAttributeValue("replication-factor");
if (replicationFactor != null) {
setrep(context, path, Short.parseShort(replicationFactor));
void chgrp(Context context, XConfiguration fsConf, Path nameNodePath, Path path, String user, String group,
boolean dirFiles, boolean recursive) throws ActionExecutorException {"Setting ownership for [{0}] to group: [{1}], user: [{2}]. Recursive mode: [{3}]", path, group, user, recursive);
HashMap<String, String> argsMap = new HashMap<String, String>();
argsMap.put("user", user);
argsMap.put("group", group);
try {
FileSystem fs = getFileSystemFor(path, context, fsConf);
path = resolveToFullPath(nameNodePath, path, true);
Path[] pathArr = FileUtil.stat2Paths(fs.globStatus(path));
if (pathArr == null || pathArr.length == 0) {
throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS009", "chgrp"
+ ", path(s) that matches [{0}] does not exist", path);
for (Path p : pathArr) {
recursiveFsOperation("chgrp", fs, nameNodePath, p, argsMap, dirFiles, recursive, true);
catch (Exception ex) {
throw convertException(ex);
private void recursiveFsOperation(String op, FileSystem fs, Path nameNodePath, Path path,
Map<String, String> argsMap, boolean dirFiles, boolean recursive, boolean isRoot)
throws ActionExecutorException {
try {
FileStatus pathStatus = fs.getFileStatus(path);
List<Path> paths = new ArrayList<Path>();
if (dirFiles && pathStatus.isDirectory()) {
if (isRoot) {
FileStatus[] filesStatus = fs.listStatus(path);
for (int i = 0; i < filesStatus.length; i++) {
Path p = filesStatus[i].getPath();
if (recursive && filesStatus[i].isDirectory()) {
recursiveFsOperation(op, fs, null, p, argsMap, dirFiles, recursive, false);
else {
for (Path p : paths) {
doFsOperation(op, fs, p, argsMap);
catch (Exception ex) {
throw convertException(ex);
private void doFsOperation(String op, FileSystem fs, Path p, Map<String, String> argsMap)
throws ActionExecutorException, IOException {
if (op.equals(FS_OP_CHMOD)) {
String permissions = argsMap.get(FS_TAG_PERMISSIONS);
FsPermission newFsPermission = createShortPermission(permissions, p);
fs.setPermission(p, newFsPermission);
else if (op.equals(FS_OP_CHGRP)) {
String user = argsMap.get("user");
String group = argsMap.get("group");
fs.setOwner(p, user, group);
* @param path file path
* @param context executor context
* @param fsConf file system configuration
* @return FileSystem
* @throws HadoopAccessorException if FS is not accessible
private FileSystem getFileSystemFor(Path path, Context context, XConfiguration fsConf) throws HadoopAccessorException {
String user = context.getWorkflow().getUser();
HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
Configuration conf = has.createConfiguration(path.toUri().getAuthority());
XConfiguration.copy(context.getProtoActionConf(), conf);
if (fsConf != null) {
XConfiguration.copy(fsConf, conf);
return has.createFileSystem(user, path.toUri(), conf);
* @param path file path
* @param user user
* @return FileSystem
* @throws HadoopAccessorException if FS is not accessible
private FileSystem getFileSystemFor(Path path, String user) throws HadoopAccessorException {
HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
Configuration jobConf = has.createConfiguration(path.toUri().getAuthority());
return has.createFileSystem(user, path.toUri(), jobConf);
void mkdir(Context context, Path path) throws ActionExecutorException {
mkdir(context, null, null, path);
void mkdir(Context context, XConfiguration fsConf, Path nameNodePath, Path path) throws ActionExecutorException {"Creating directory [{0}]", path);
try {
path = resolveToFullPath(nameNodePath, path, true);
FileSystem fs = getFileSystemFor(path, context, fsConf);
if (!fs.exists(path)) {
if (!fs.mkdirs(path)) {
throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS004",
"mkdir, path [{0}] could not create directory", path);
} else {"[{0}] already exist, no need for creation", path);
catch (Exception ex) {
throw convertException(ex);
* Delete path
* @param context executor context
* @param path file path
* @throws ActionExecutorException if FS error occurs
public void delete(Context context, Path path) throws ActionExecutorException {
delete(context, null, null, path, true);
* Delete path
* @param context executor context
* @param fsConf file system configuration
* @param nameNodePath configured name node path
* @param path file path
* @param skipTrash flag to skip the trash.
* @throws ActionExecutorException if FS error occurs
public void delete(Context context, XConfiguration fsConf, Path nameNodePath, Path path, boolean skipTrash)
throws ActionExecutorException {"Deleting [{0}]. Skipping trash: [{1}]", path, skipTrash);
URI uri = path.toUri();
URIHandler handler;
org.apache.oozie.dependency.URIHandler.Context hcatContext = null;
try {
handler = Services.get().get(URIHandlerService.class).getURIHandler(uri);
if (handler instanceof FSURIHandler) {
// Use legacy code to handle hdfs partition deletion
path = resolveToFullPath(nameNodePath, path, true);
final FileSystem fs = getFileSystemFor(path, context, fsConf);
Path[] pathArr = FileUtil.stat2Paths(fs.globStatus(path));
if (pathArr != null && pathArr.length > 0) {
for (final Path p : pathArr) {
if (fs.exists(p)) {
if (!skipTrash) {
// Moving directory/file to trash of user.
UserGroupInformationService ugiService = Services.get().get(UserGroupInformationService.class);
UserGroupInformation ugi = ugiService.getProxyUser(fs.getConf().get(OozieClient.USER_NAME));
ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
public FileSystem run() throws Exception {
Trash trash = new Trash(fs.getConf());
if (!trash.moveToTrash(p)) {
throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005",
"Could not move path [{0}] to trash on delete", p);
return null;
else if (!fs.delete(p, true)) {
throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005",
"delete, path [{0}] could not delete path", p);
} else {
hcatContext = handler.getContext(uri, fsConf, context.getWorkflow().getUser(), false);
handler.delete(uri, hcatContext);
catch (Exception ex) {
throw convertException(ex);
if (hcatContext != null) {
* Delete path
* @param user user
* @param group group
* @param path file path
* @throws ActionExecutorException if FS error occurs
public void delete(String user, String group, Path path) throws ActionExecutorException {
try {
validatePath(path, true);
FileSystem fs = getFileSystemFor(path, user);
if (fs.exists(path)) {
if (!fs.delete(path, true)) {
throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005",
"delete, path [{0}] could not delete path", path);
catch (Exception ex) {
throw convertException(ex);
* Move source to target
* @param context executor context
* @param source source path
* @param target target path
* @param recovery set to true if in recovery mode
* @throws ActionExecutorException if FS error occurs
public void move(Context context, Path source, Path target, boolean recovery) throws ActionExecutorException {
move(context, null, null, source, target, recovery);
* Move source to target
* @param context executor context
* @param fsConf file system configuration
* @param nameNodePath configured name node path
* @param source source path
* @param target target path
* @param recovery set to true if in recovery mode
* @throws ActionExecutorException if FS error occurs
public void move(Context context, XConfiguration fsConf, Path nameNodePath, Path source, Path target, boolean recovery)
throws ActionExecutorException {"Moving [{0}] to [{1}]", source, target);
try {
source = resolveToFullPath(nameNodePath, source, true);
validateSameNN(source, target);
FileSystem fs = getFileSystemFor(source, context, fsConf);
Path[] pathArr = FileUtil.stat2Paths(fs.globStatus(source));
if (( pathArr == null || pathArr.length == 0 ) ){
if (!recovery) {
throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS006",
"move, source path [{0}] does not exist", source);
} else {
if (pathArr.length > 1 && (!fs.exists(target) || fs.isFile(target))) {
if(!recovery) {
throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS012",
"move, could not rename multiple sources to the same target name");
} else {
for (Path p : pathArr) {
if (!fs.rename(p, target) && !recovery) {
throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS008",
"move, could not move [{0}] to [{1}]", p, target);
catch (Exception ex) {
throw convertException(ex);
void chmod(Context context, Path path, String permissions, boolean dirFiles, boolean recursive) throws ActionExecutorException {
chmod(context, null, null, path, permissions, dirFiles, recursive);
void chmod(Context context, XConfiguration fsConf, Path nameNodePath, Path path, String permissions,
boolean dirFiles, boolean recursive) throws ActionExecutorException {"Setting permissions [{0}] on [{1}]. Recursive mode: [{2}]", permissions, path, recursive);
HashMap<String, String> argsMap = new HashMap<String, String>();
argsMap.put(FS_TAG_PERMISSIONS, permissions);
try {
FileSystem fs = getFileSystemFor(path, context, fsConf);
path = resolveToFullPath(nameNodePath, path, true);
Path[] pathArr = FileUtil.stat2Paths(fs.globStatus(path));
if (pathArr == null || pathArr.length == 0) {
throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS009", "chmod"
+ ", path(s) that matches [{0}] does not exist", path);
for (Path p : pathArr) {
recursiveFsOperation(FS_OP_CHMOD, fs, nameNodePath, p, argsMap, dirFiles, recursive, true);
catch (Exception ex) {
throw convertException(ex);
void touchz(Context context, Path path) throws ActionExecutorException {
touchz(context, null, null, path);
void touchz(Context context, XConfiguration fsConf, Path nameNodePath, Path path) throws ActionExecutorException { ("Performing touch on [{0}]", path);
try {
path = resolveToFullPath(nameNodePath, path, true);
FileSystem fs = getFileSystemFor(path, context, fsConf);
FileStatus st;
if (fs.exists(path)) {
st = fs.getFileStatus(path);
if (st.isDirectory()) {
throw new Exception(path.toString() + " is a directory");
} else if (st.getLen() != 0) {
throw new Exception(path.toString() + " must be a zero-length file");
FSDataOutputStream out = fs.create(path);
catch (Exception ex) {
throw convertException(ex);
FsPermission createShortPermission(String permissions, Path path) throws ActionExecutorException {
if (permissions.length() == 3) {
char user = permissions.charAt(0);
char group = permissions.charAt(1);
char other = permissions.charAt(2);
int useri = user - '0';
int groupi = group - '0';
int otheri = other - '0';
int mask = useri * 100 + groupi * 10 + otheri;
short omask = Short.parseShort(Integer.toString(mask), 8);
return new FsPermission(omask);
else {
if (permissions.length() == 10) {
return FsPermission.valueOf(permissions);
else {
throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS010",
"chmod, path [{0}] invalid permissions mask [{1}]", path, permissions);
public void check(Context context, WorkflowAction action) throws ActionExecutorException {
public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
public void start(Context context, WorkflowAction action) throws ActionExecutorException {"Starting action");
try {
context.setStartData("-", "-", "-");
Element actionXml = XmlUtils.parseXml(action.getConf());
doOperations(context, actionXml);
context.setExecutionData("OK", null);
catch (Exception ex) {
throw convertException(ex);
public void end(Context context, WorkflowAction action) throws ActionExecutorException {
String externalStatus = action.getExternalStatus();
WorkflowAction.Status status = externalStatus.equals("OK") ? WorkflowAction.Status.OK :
context.setEndData(status, getActionSignal(status));
if (!context.getProtoActionConf().getBoolean(WorkflowXCommand.KEEP_WF_ACTION_DIR, false)) {
try {
FileSystem fs = context.getAppFileSystem();
fs.delete(context.getActionDir(), true);
catch (Exception ex) {
throw convertException(ex);
}"Action ended with external status [{0}]", action.getExternalStatus());
public boolean isCompleted(String externalStatus) {
return true;
* @param context executor context
* @return Path returns recovery path
* @throws HadoopAccessorException if accessing file system fails
* @throws IOException in case of IO error
* @throws URISyntaxException if the processed uri is not a valid URI
public Path getRecoveryPath(Context context) throws HadoopAccessorException, IOException, URISyntaxException {
return new Path(context.getActionDir(), "fs-" + context.getRecoveryId());
private void checkGlobMax(Path[] pathArr) throws ActionExecutorException {
if(pathArr.length > maxGlobCount) {
throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS013",
"too many globbed files/dirs to do FS operation");
void setrep(Context context, Path path, short replicationFactor)
throws ActionExecutorException, HadoopAccessorException {"Setting replication factor: [{0}] for [{1}]", replicationFactor, path);
try {
path = resolveToFullPath(null, path, true);
FileSystem fs = getFileSystemFor(path, context, null);
if (fs.isFile(path)) {
fs.setReplication(path, replicationFactor);
} catch (IOException ex) {
public boolean supportsConfigurationJobXML() {
return true;