blob: 76f51294098061c282aabf29b0074e9092bbf7e6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.test.system;
import java.io.File;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
/**
* Default DaemonProtocolAspect which is used to provide default implementation
* for all the common daemon methods. If a daemon requires more specialized
* version of method, it is responsibility of the DaemonClient to introduce the
* same in woven classes.
*
*/
public aspect DaemonProtocolAspect {
private boolean DaemonProtocol.ready;
@SuppressWarnings("unchecked")
private HashMap<Object, List<ControlAction>> DaemonProtocol.actions =
new HashMap<Object, List<ControlAction>>();
private static final Log LOG = LogFactory.getLog(
DaemonProtocolAspect.class.getName());
private static FsPermission defaultPermission = new FsPermission(
FsAction.READ_WRITE, FsAction.READ_WRITE, FsAction.READ_WRITE);
/**
* Set if the daemon process is ready or not, concrete daemon protocol should
* implement pointcuts to determine when the daemon is ready and use the
* setter to set the ready state.
*
* @param ready
* true if the Daemon is ready.
*/
public void DaemonProtocol.setReady(boolean ready) {
this.ready = ready;
}
/**
* Checks if the daemon process is alive or not.
*
* @throws IOException
* if daemon is not alive.
*/
public void DaemonProtocol.ping() throws IOException {
}
/**
* Checks if the daemon process is ready to accepting RPC connections after it
* finishes initialization. <br/>
*
* @return true if ready to accept connection.
*
* @throws IOException
*/
public boolean DaemonProtocol.isReady() throws IOException {
return ready;
}
/**
* Returns the process related information regarding the daemon process. <br/>
*
* @return process information.
* @throws IOException
*/
public ProcessInfo DaemonProtocol.getProcessInfo() throws IOException {
int activeThreadCount = Thread.activeCount();
long currentTime = System.currentTimeMillis();
long maxmem = Runtime.getRuntime().maxMemory();
long freemem = Runtime.getRuntime().freeMemory();
long totalmem = Runtime.getRuntime().totalMemory();
Map<String, String> envMap = System.getenv();
Properties sysProps = System.getProperties();
Map<String, String> props = new HashMap<String, String>();
for (Map.Entry entry : sysProps.entrySet()) {
props.put((String) entry.getKey(), (String) entry.getValue());
}
ProcessInfo info = new ProcessInfoImpl(activeThreadCount, currentTime,
freemem, maxmem, totalmem, envMap, props);
return info;
}
public void DaemonProtocol.enable(List<Enum<?>> faults) throws IOException {
}
public void DaemonProtocol.disableAll() throws IOException {
}
public abstract Configuration DaemonProtocol.getDaemonConf()
throws IOException;
public FileStatus DaemonProtocol.getFileStatus(String path, boolean local)
throws IOException {
Path p = new Path(path);
FileSystem fs = getFS(p, local);
p.makeQualified(fs);
FileStatus fileStatus = fs.getFileStatus(p);
return cloneFileStatus(fileStatus);
}
/**
* Create a file with given permissions in a file system.
* @param path - source path where the file has to create.
* @param fileName - file name.
* @param permission - file permissions.
* @param local - identifying the path whether its local or not.
* @throws IOException - if an I/O error occurs.
*/
public void DaemonProtocol.createFile(String path, String fileName,
FsPermission permission, boolean local) throws IOException {
Path p = new Path(path);
FileSystem fs = getFS(p, local);
Path filePath = new Path(path, fileName);
fs.create(filePath);
if (permission == null) {
fs.setPermission(filePath, defaultPermission);
} else {
fs.setPermission(filePath, permission);
}
fs.close();
}
/**
* Create a folder with given permissions in a file system.
* @param path - source path where the file has to be creating.
* @param folderName - folder name.
* @param permission - folder permissions.
* @param local - identifying the path whether its local or not.
* @throws IOException - if an I/O error occurs.
*/
public void DaemonProtocol.createFolder(String path, String folderName,
FsPermission permission, boolean local) throws IOException {
Path p = new Path(path);
FileSystem fs = getFS(p, local);
Path folderPath = new Path(path, folderName);
fs.mkdirs(folderPath);
if (permission == null) {
fs.setPermission(folderPath, defaultPermission);
} else {
fs.setPermission(folderPath, permission);
}
fs.close();
}
public FileStatus[] DaemonProtocol.listStatus(String path, boolean local)
throws IOException {
Path p = new Path(path);
FileSystem fs = getFS(p, local);
FileStatus[] status = fs.listStatus(p);
if (status != null) {
FileStatus[] result = new FileStatus[status.length];
int i = 0;
for (FileStatus fileStatus : status) {
result[i++] = cloneFileStatus(fileStatus);
}
return result;
}
return status;
}
/**
* FileStatus object may not be serializable. Clone it into raw FileStatus
* object.
*/
private FileStatus DaemonProtocol.cloneFileStatus(FileStatus fileStatus) {
return new FileStatus(fileStatus.getLen(),
fileStatus.isDir(),
fileStatus.getReplication(),
fileStatus.getBlockSize(),
fileStatus.getModificationTime(),
fileStatus.getAccessTime(),
fileStatus.getPermission(),
fileStatus.getOwner(),
fileStatus.getGroup(),
fileStatus.getPath());
}
private FileSystem DaemonProtocol.getFS(final Path path, final boolean local)
throws IOException {
FileSystem ret = null;
try {
ret = UserGroupInformation.getLoginUser().doAs (
new PrivilegedExceptionAction<FileSystem>() {
public FileSystem run() throws IOException {
FileSystem fs = null;
if (local) {
fs = FileSystem.getLocal(getDaemonConf());
} else {
fs = path.getFileSystem(getDaemonConf());
}
return fs;
}
});
} catch (InterruptedException ie) {
}
return ret;
}
@SuppressWarnings("unchecked")
public ControlAction[] DaemonProtocol.getActions(Writable key)
throws IOException {
synchronized (actions) {
List<ControlAction> actionList = actions.get(key);
if(actionList == null) {
return new ControlAction[0];
} else {
return (ControlAction[]) actionList.toArray(new ControlAction[actionList
.size()]);
}
}
}
@SuppressWarnings("unchecked")
public void DaemonProtocol.sendAction(ControlAction action)
throws IOException {
synchronized (actions) {
List<ControlAction> actionList = actions.get(action.getTarget());
if(actionList == null) {
actionList = new ArrayList<ControlAction>();
actions.put(action.getTarget(), actionList);
}
actionList.add(action);
}
}
@SuppressWarnings("unchecked")
public boolean DaemonProtocol.isActionPending(ControlAction action)
throws IOException{
synchronized (actions) {
List<ControlAction> actionList = actions.get(action.getTarget());
if(actionList == null) {
return false;
} else {
return actionList.contains(action);
}
}
}
@SuppressWarnings("unchecked")
public void DaemonProtocol.removeAction(ControlAction action)
throws IOException {
synchronized (actions) {
List<ControlAction> actionList = actions.get(action.getTarget());
if(actionList == null) {
return;
} else {
actionList.remove(action);
}
}
}
public void DaemonProtocol.clearActions() throws IOException {
synchronized (actions) {
actions.clear();
}
}
public String DaemonProtocol.getFilePattern() {
//We use the environment variable HADOOP_LOGFILE to get the
//pattern to use in the search.
String logDir = System.getProperty("hadoop.log.dir");
String daemonLogPattern = System.getProperty("hadoop.log.file");
if(daemonLogPattern == null && daemonLogPattern.isEmpty()) {
return "*";
}
return logDir+File.separator+daemonLogPattern+"*";
}
public int DaemonProtocol.getNumberOfMatchesInLogFile(String pattern,
String[] list) throws IOException {
StringBuffer filePattern = new StringBuffer(getFilePattern());
String[] cmd = null;
if (list != null) {
StringBuffer filterExpPattern = new StringBuffer();
int index=0;
for (String excludeExp : list) {
if (index++ < list.length -1) {
filterExpPattern.append("grep -v " + excludeExp + " | ");
} else {
filterExpPattern.append("grep -v " + excludeExp + " | wc -l");
}
}
cmd = new String[] {
"bash",
"-c",
"grep "
+ pattern + " " + filePattern + " | "
+ filterExpPattern};
} else {
cmd = new String[] {
"bash",
"-c",
"grep -c "
+ pattern + " " + filePattern
+ " | awk -F: '{s+=$2} END {print s}'" };
}
ShellCommandExecutor shexec = new ShellCommandExecutor(cmd);
shexec.execute();
String output = shexec.getOutput();
return Integer.parseInt(output.replaceAll("\n", "").trim());
}
/**
* This method is used for suspending the process.
* @param pid process id
* @throws IOException if an I/O error occurs.
* @return true if process is suspended otherwise false.
*/
public boolean DaemonProtocol.suspendProcess(String pid) throws IOException {
String suspendCmd = getDaemonConf().get("test.system.hdrc.suspend.cmd",
"kill -SIGSTOP");
String [] command = {"bash", "-c", suspendCmd + " " + pid};
ShellCommandExecutor shexec = new ShellCommandExecutor(command);
try {
shexec.execute();
} catch (Shell.ExitCodeException e) {
LOG.warn("suspended process throws an exitcode "
+ "exception for not being suspended the given process id.");
return false;
}
LOG.info("The suspend process command is :"
+ shexec.toString()
+ " and the output for the command is "
+ shexec.getOutput());
return true;
}
/**
* This method is used for resuming the process
* @param pid process id of suspended process.
* @throws IOException if an I/O error occurs.
* @return true if suspeneded process is resumed otherwise false.
*/
public boolean DaemonProtocol.resumeProcess(String pid) throws IOException {
String resumeCmd = getDaemonConf().get("test.system.hdrc.resume.cmd",
"kill -SIGCONT");
String [] command = {"bash", "-c", resumeCmd + " " + pid};
ShellCommandExecutor shexec = new ShellCommandExecutor(command);
try {
shexec.execute();
} catch(Shell.ExitCodeException e) {
LOG.warn("Resume process throws an exitcode "
+ "exception for not being resumed the given process id.");
return false;
}
LOG.info("The resume process command is :"
+ shexec.toString()
+ " and the output for the command is "
+ shexec.getOutput());
return true;
}
private String DaemonProtocol.user = null;
public String DaemonProtocol.getDaemonUser() {
return user;
}
public void DaemonProtocol.setUser(String user) {
this.user = user;
}
}