| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.test.system; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.security.PrivilegedExceptionAction; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.ArrayList; |
| import java.util.Map; |
| import java.util.Properties; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.io.Writable; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.util.Shell.ShellCommandExecutor; |
| import org.apache.hadoop.util.Shell; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.permission.FsAction; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.conf.Configuration; |
| |
/**
 * Default DaemonProtocolAspect, which provides a default implementation
 * of all the common daemon methods. If a daemon requires a more specialized
 * version of a method, it is the responsibility of the DaemonClient to
 * introduce it in the woven classes.
 */
| public aspect DaemonProtocolAspect { |
| |
/** Ready flag introduced on DaemonProtocol; written by setReady, read by isReady. */
private boolean DaemonProtocol.ready;

/**
 * Control actions introduced on DaemonProtocol, grouped under the value each
 * action returns from getTarget(). The action accessors synchronize on this
 * map before touching it.
 */
@SuppressWarnings("unchecked")
private HashMap<Object, List<ControlAction>> DaemonProtocol.actions =
    new HashMap<Object, List<ControlAction>>();

private static final Log LOG = LogFactory.getLog(
    DaemonProtocolAspect.class.getName());

/**
 * Permission applied by createFile/createFolder when the caller passes a
 * null permission: READ_WRITE for user, group and other. Declared final
 * because it is a shared constant that is never reassigned.
 */
private static final FsPermission defaultPermission = new FsPermission(
    FsAction.READ_WRITE, FsAction.READ_WRITE, FsAction.READ_WRITE);
| |
/**
 * Records whether the daemon process is ready. Concrete daemon protocols are
 * expected to define pointcuts that detect readiness and then call this
 * setter.
 *
 * @param ready
 *          true once the daemon is ready, false otherwise.
 */
public void DaemonProtocol.setReady(boolean ready) {
  this.ready = ready;
}
| |
/**
 * Liveness probe for the daemon process. This default implementation has an
 * empty body; presumably the mere fact that the call returns tells the
 * caller the daemon is reachable (confirm against the client side).
 *
 * @throws IOException
 *           declared for callers; never thrown by this default body.
 */
public void DaemonProtocol.ping() throws IOException {
  // Intentionally empty.
}
| |
/**
 * Reports whether the daemon has been flagged as ready, i.e. the value last
 * passed to setReady (false until then).
 *
 * @return true if the daemon has been marked ready to accept connections.
 *
 * @throws IOException
 *           declared for callers; never thrown by this default body.
 */
public boolean DaemonProtocol.isReady() throws IOException {
  return this.ready;
}
| |
/**
 * Returns a snapshot of process-level information about the daemon JVM:
 * active thread count, current time, free/max/total memory, the process
 * environment and the JVM system properties.
 *
 * @return process information.
 * @throws IOException
 *           declared for callers; never thrown by this default body.
 */
public ProcessInfo DaemonProtocol.getProcessInfo() throws IOException {
  Runtime runtime = Runtime.getRuntime();
  int activeThreadCount = Thread.activeCount();
  long currentTime = System.currentTimeMillis();
  long maxmem = runtime.maxMemory();
  long freemem = runtime.freeMemory();
  long totalmem = runtime.totalMemory();
  Map<String, String> envMap = System.getenv();
  Properties sysProps = System.getProperties();
  Map<String, String> props = new HashMap<String, String>();
  // stringPropertyNames() returns a snapshot limited to String keys with
  // String values, so a non-String property can no longer trigger a
  // ClassCastException and the live Properties table is not iterated
  // directly.
  for (String name : sysProps.stringPropertyNames()) {
    props.put(name, sysProps.getProperty(name));
  }
  return new ProcessInfoImpl(activeThreadCount, currentTime,
      freemem, maxmem, totalmem, envMap, props);
}
| |
/**
 * Default implementation with an empty body: the supplied faults are
 * ignored. Presumably specialized daemons override this to switch the given
 * faults on (confirm against the woven classes).
 *
 * @param faults
 *          faults to enable; unused here.
 * @throws IOException
 *           declared for callers; never thrown by this default body.
 */
public void DaemonProtocol.enable(List<Enum<?>> faults) throws IOException {
  // Intentionally empty.
}
| |
/**
 * Default implementation with an empty body; it changes no state.
 *
 * @throws IOException
 *           declared for callers; never thrown by this default body.
 */
public void DaemonProtocol.disableAll() throws IOException {
  // Intentionally empty.
}
| |
/**
 * Returns the configuration of the daemon. Declared abstract here, so each
 * concrete daemon has to supply it; the file-system and process-control
 * helpers in this aspect read their settings through it.
 *
 * @return the daemon's configuration.
 * @throws IOException
 *           if the configuration cannot be obtained.
 */
public abstract Configuration DaemonProtocol.getDaemonConf()
    throws IOException;
| |
/**
 * Looks up the status of a path on the local or the path's own file system
 * and returns it as a plain FileStatus copy.
 *
 * @param path
 *          path to inspect.
 * @param local
 *          true to resolve the path on the local file system, false to use
 *          the file system the path itself resolves to.
 * @return a plain FileStatus copy describing the path.
 * @throws IOException
 *           if the file system or the status cannot be obtained.
 */
public FileStatus DaemonProtocol.getFileStatus(String path, boolean local)
    throws IOException {
  Path p = new Path(path);
  FileSystem fs = getFS(p, local);
  // Path is immutable: makeQualified returns the qualified path rather than
  // modifying the receiver, so its result has to be the one that is used.
  Path qualified = p.makeQualified(fs);
  return cloneFileStatus(fs.getFileStatus(qualified));
}
| |
/**
 * Creates an empty file with the given permissions in a file system.
 *
 * @param path - parent path under which the file is created.
 * @param fileName - file name.
 * @param permission - file permissions; when null, defaultPermission is used.
 * @param local - true to use the local file system, false to use the file
 *          system the path resolves to.
 * @throws IOException - if an I/O error occurs.
 */
public void DaemonProtocol.createFile(String path, String fileName,
    FsPermission permission, boolean local) throws IOException {
  FileSystem fs = getFS(new Path(path), local);
  Path filePath = new Path(path, fileName);
  // create() returns an open output stream. Close it immediately so the
  // empty file is finalized and the handle is not leaked.
  fs.create(filePath).close();
  fs.setPermission(filePath,
      permission == null ? defaultPermission : permission);
  // NOTE(review): the FileSystem obtained here may be a cached instance
  // shared with the rest of the daemon - confirm that closing it is intended.
  fs.close();
}
| |
/**
 * Creates a folder with the given permissions in a file system.
 *
 * @param path - parent path under which the folder is created.
 * @param folderName - folder name.
 * @param permission - folder permissions; when null, defaultPermission is
 *          used.
 * @param local - true to use the local file system, false to use the file
 *          system the path resolves to.
 * @throws IOException - if an I/O error occurs or the folder cannot be
 *           created.
 */
public void DaemonProtocol.createFolder(String path, String folderName,
    FsPermission permission, boolean local) throws IOException {
  FileSystem fs = getFS(new Path(path), local);
  Path folderPath = new Path(path, folderName);
  // mkdirs reports failure through its return value; surface it instead of
  // carrying on to set permissions on a folder that was not created.
  if (!fs.mkdirs(folderPath)) {
    throw new IOException("Failed to create folder " + folderPath);
  }
  fs.setPermission(folderPath,
      permission == null ? defaultPermission : permission);
  // NOTE(review): the FileSystem obtained here may be a cached instance
  // shared with the rest of the daemon - confirm that closing it is intended.
  fs.close();
}
| |
/**
 * Lists the entries under a path and returns each one as a plain FileStatus
 * copy.
 *
 * @param path
 *          path to list.
 * @param local
 *          true to use the local file system, false to use the file system
 *          the path resolves to.
 * @return copies of the listed statuses, or null when the file system
 *         returns null for the listing.
 * @throws IOException
 *           if the file system or the listing cannot be obtained.
 */
public FileStatus[] DaemonProtocol.listStatus(String path, boolean local)
    throws IOException {
  Path dir = new Path(path);
  FileStatus[] listed = getFS(dir, local).listStatus(dir);
  if (listed == null) {
    return null;
  }
  FileStatus[] copies = new FileStatus[listed.length];
  for (int idx = 0; idx < listed.length; idx++) {
    copies[idx] = cloneFileStatus(listed[idx]);
  }
  return copies;
}
| |
/**
 * Copies every attribute of the given status into a new, plain FileStatus.
 * The FileStatus handed back by a file system may not be serializable, so
 * callers return this copy instead.
 *
 * @param fileStatus
 *          status to copy.
 * @return a new FileStatus carrying the same attributes.
 */
private FileStatus DaemonProtocol.cloneFileStatus(FileStatus fileStatus) {
  long length = fileStatus.getLen();
  boolean directory = fileStatus.isDir();
  int replication = fileStatus.getReplication();
  long blockSize = fileStatus.getBlockSize();
  long modificationTime = fileStatus.getModificationTime();
  long accessTime = fileStatus.getAccessTime();
  FsPermission permission = fileStatus.getPermission();
  String owner = fileStatus.getOwner();
  String group = fileStatus.getGroup();
  Path location = fileStatus.getPath();
  return new FileStatus(length, directory, replication, blockSize,
      modificationTime, accessTime, permission, owner, group, location);
}
| |
/**
 * Obtains a FileSystem as the login user, using the daemon configuration.
 *
 * @param path
 *          path whose file system is wanted when local is false.
 * @param local
 *          true to return the local file system, false to return the file
 *          system the path resolves to.
 * @return the requested file system; never null.
 * @throws IOException
 *           if the file system cannot be obtained, or if the calling thread
 *           is interrupted while obtaining it.
 */
private FileSystem DaemonProtocol.getFS(final Path path, final boolean local)
    throws IOException {
  try {
    return UserGroupInformation.getLoginUser().doAs(
        new PrivilegedExceptionAction<FileSystem>() {
          public FileSystem run() throws IOException {
            return local
                ? FileSystem.getLocal(getDaemonConf())
                : path.getFileSystem(getDaemonConf());
          }
        });
  } catch (InterruptedException ie) {
    // Do not swallow the interrupt and hand back null (callers would fail
    // with a NullPointerException): restore the flag and report the failure.
    Thread.currentThread().interrupt();
    throw new IOException(
        "Interrupted while obtaining the file system for " + path, ie);
  }
}
| |
/**
 * Returns the control actions currently stored under the given key.
 *
 * @param key
 *          key the actions were stored under (the action target).
 * @return an array copy of the stored actions; empty when nothing is stored
 *         under the key.
 * @throws IOException
 *           declared for callers; never thrown by this default body.
 */
@SuppressWarnings("unchecked")
public ControlAction[] DaemonProtocol.getActions(Writable key)
    throws IOException {
  synchronized (actions) {
    List<ControlAction> stored = actions.get(key);
    if (stored != null) {
      return stored.toArray(new ControlAction[stored.size()]);
    }
    return new ControlAction[0];
  }
}
| |
| |
/**
 * Stores a control action under its target, creating the per-target list on
 * first use.
 *
 * @param action
 *          action to store.
 * @throws IOException
 *           declared for callers; never thrown by this default body.
 */
@SuppressWarnings("unchecked")
public void DaemonProtocol.sendAction(ControlAction action)
    throws IOException {
  synchronized (actions) {
    Object target = action.getTarget();
    List<ControlAction> stored = actions.get(target);
    if (stored == null) {
      stored = new ArrayList<ControlAction>();
      actions.put(target, stored);
    }
    stored.add(action);
  }
}
| |
/**
 * Tells whether the given action is still stored under its target.
 *
 * @param action
 *          action to look for.
 * @return true if the list stored for the action's target contains it;
 *         false if it does not or no list exists for that target.
 * @throws IOException
 *           declared for callers; never thrown by this default body.
 */
@SuppressWarnings("unchecked")
public boolean DaemonProtocol.isActionPending(ControlAction action)
    throws IOException {
  synchronized (actions) {
    List<ControlAction> stored = actions.get(action.getTarget());
    return stored != null && stored.contains(action);
  }
}
| |
| |
/**
 * Removes the given action from the list stored under its target; does
 * nothing when no list exists for that target.
 *
 * @param action
 *          action to remove.
 * @throws IOException
 *           declared for callers; never thrown by this default body.
 */
@SuppressWarnings("unchecked")
public void DaemonProtocol.removeAction(ControlAction action)
    throws IOException {
  synchronized (actions) {
    List<ControlAction> stored = actions.get(action.getTarget());
    if (stored != null) {
      stored.remove(action);
    }
  }
}
| |
/**
 * Drops every stored control action, for all targets.
 *
 * @throws IOException
 *           declared for callers; never thrown by this default body.
 */
public void DaemonProtocol.clearActions() throws IOException {
  synchronized (actions) {
    actions.clear();
  }
}
| |
/**
 * Builds the glob used to locate the daemon's log files from the
 * "hadoop.log.dir" and "hadoop.log.file" system properties.
 *
 * @return "*" when "hadoop.log.file" is unset or empty; otherwise the log
 *         directory, the file separator, the log file name and a trailing
 *         "*".
 */
public String DaemonProtocol.getFilePattern() {
  // The pattern comes from JVM system properties, not environment variables.
  String logDir = System.getProperty("hadoop.log.dir");
  String daemonLogPattern = System.getProperty("hadoop.log.file");
  // Must be "||": with "&&" an unset property caused a NullPointerException
  // and the fallback could never be taken.
  if (daemonLogPattern == null || daemonLogPattern.isEmpty()) {
    return "*";
  }
  return logDir + File.separator + daemonLogPattern + "*";
}
| |
/**
 * Counts the lines in the daemon log files (see getFilePattern) that match
 * a grep pattern, optionally discarding lines that match any of the given
 * exclusion expressions. The count is produced by a
 * "grep ... | grep -v ... | wc -l" pipeline run through "bash -c".
 *
 * @param pattern
 *          grep expression to search for.
 * @param list
 *          expressions whose matching lines are excluded; null or empty
 *          means no exclusions.
 * @return number of matching lines that survive the exclusions.
 * @throws IOException
 *           if the shell command fails.
 */
public int DaemonProtocol.getNumberOfMatchesInLogFile(String pattern,
    String[] list) throws IOException {
  // NOTE(review): pattern and the exclusion expressions are spliced into the
  // shell command unescaped; callers must pass trusted, shell-safe values.
  StringBuilder pipeline = new StringBuilder("grep ");
  pipeline.append(pattern).append(' ').append(getFilePattern());
  if (list != null) {
    for (String excludeExp : list) {
      pipeline.append(" | grep -v ").append(excludeExp);
    }
  }
  // A single "wc -l" tail serves every case: it always prints a number, so
  // an empty exclusion list no longer yields a dangling "|", and the count
  // no longer depends on "grep -c" prefixing each count with a file name
  // (which it omits when only one file is searched).
  pipeline.append(" | wc -l");
  String[] cmd = new String[] {"bash", "-c", pipeline.toString()};
  ShellCommandExecutor shexec = new ShellCommandExecutor(cmd);
  shexec.execute();
  String output = shexec.getOutput();
  return Integer.parseInt(output.replaceAll("\n", "").trim());
}
| |
/**
 * Suspends a process by running the command configured under
 * "test.system.hdrc.suspend.cmd" (default "kill -SIGSTOP") followed by the
 * process id, through "bash -c".
 *
 * @param pid process id
 * @throws IOException if an I/O error occurs.
 * @return true if the command completed; false if it ended with a non-zero
 *         exit code.
 */
public boolean DaemonProtocol.suspendProcess(String pid) throws IOException {
  String suspendCmd = getDaemonConf().get("test.system.hdrc.suspend.cmd",
      "kill -SIGSTOP");
  String[] command = {"bash", "-c", suspendCmd + " " + pid};
  ShellCommandExecutor shexec = new ShellCommandExecutor(command);
  try {
    shexec.execute();
  } catch (Shell.ExitCodeException e) {
    // Pass the exception to the logger so the exit code and the command's
    // error output are not lost.
    LOG.warn("suspended process throws an exitcode "
        + "exception for not being suspended the given process id.", e);
    return false;
  }
  LOG.info("The suspend process command is :"
      + shexec.toString()
      + " and the output for the command is "
      + shexec.getOutput());
  return true;
}
| |
/**
 * Resumes a suspended process by running the command configured under
 * "test.system.hdrc.resume.cmd" (default "kill -SIGCONT") followed by the
 * process id, through "bash -c".
 *
 * @param pid process id of suspended process.
 * @throws IOException if an I/O error occurs.
 * @return true if the command completed; false if it ended with a non-zero
 *         exit code.
 */
public boolean DaemonProtocol.resumeProcess(String pid) throws IOException {
  String resumeCmd = getDaemonConf().get("test.system.hdrc.resume.cmd",
      "kill -SIGCONT");
  String[] command = {"bash", "-c", resumeCmd + " " + pid};
  ShellCommandExecutor shexec = new ShellCommandExecutor(command);
  try {
    shexec.execute();
  } catch (Shell.ExitCodeException e) {
    // Pass the exception to the logger so the exit code and the command's
    // error output are not lost.
    LOG.warn("Resume process throws an exitcode "
        + "exception for not being resumed the given process id.", e);
    return false;
  }
  LOG.info("The resume process command is :"
      + shexec.toString()
      + " and the output for the command is "
      + shexec.getOutput());
  return true;
}
| |
/** User name recorded for the daemon; null until setUser is called. */
private String DaemonProtocol.user = null;

/**
 * Returns the user name recorded for the daemon.
 *
 * @return the value last passed to setUser, or null if it was never set.
 */
public String DaemonProtocol.getDaemonUser() {
  return this.user;
}

/**
 * Records the user name for the daemon.
 *
 * @param user
 *          user name to record.
 */
public void DaemonProtocol.setUser(String user) {
  this.user = user;
}
| } |
| |