| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.test.system; |
| |
| import java.io.IOException; |
| import java.util.*; |
| |
| import org.junit.Assert; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.test.system.process.RemoteProcess; |
| |
| import javax.management.*; |
| import javax.management.remote.JMXConnector; |
| import javax.management.remote.JMXConnectorFactory; |
| import javax.management.remote.JMXServiceURL; |
| |
| /** |
| * Abstract class which encapsulates the DaemonClient which is used in the |
| * system tests.<br/> |
| * |
| * @param PROXY the proxy implementation of a specific Daemon |
| */ |
| public abstract class AbstractDaemonClient<PROXY extends DaemonProtocol> { |
| private Configuration conf; |
| private Boolean jmxEnabled = null; |
| private MBeanServerConnection connection; |
| private int jmxPortNumber = -1; |
| private RemoteProcess process; |
| private boolean connected; |
| |
| private static final Log LOG = LogFactory.getLog(AbstractDaemonClient.class); |
| private static final String HADOOP_JMX_DOMAIN = "Hadoop"; |
| private static final String HADOOP_OPTS_ENV = "HADOOP_OPTS"; |
| |
| /** |
| * Create a Daemon client.<br/> |
| * |
| * @param conf client to be used by proxy to connect to Daemon. |
| * @param process the Daemon process to manage the particular daemon. |
| * |
| * @throws IOException on RPC error |
| */ |
| public AbstractDaemonClient(Configuration conf, RemoteProcess process) |
| throws IOException { |
| this.conf = conf; |
| this.process = process; |
| } |
| |
| /** |
| * Gets if the client is connected to the Daemon <br/> |
| * |
| * @return true if connected. |
| */ |
| public boolean isConnected() { |
| return connected; |
| } |
| |
| protected void setConnected(boolean connected) { |
| this.connected = connected; |
| } |
| |
| /** |
| * Create an RPC proxy to the daemon <br/> |
| * |
| * @throws IOException on RPC error |
| */ |
| public abstract void connect() throws IOException; |
| |
| /** |
| * Disconnect the underlying RPC proxy to the daemon.<br/> |
| * @throws IOException in case of communication errors |
| */ |
| public abstract void disconnect() throws IOException; |
| |
| /** |
| * Get the proxy to connect to a particular service Daemon.<br/> |
| * |
| * @return proxy to connect to a particular service Daemon. |
| */ |
| protected abstract PROXY getProxy(); |
| |
| /** |
| * Gets the daemon level configuration.<br/> |
| * |
| * @return configuration using which daemon is running |
| */ |
| public Configuration getConf() { |
| return conf; |
| } |
| |
| /** |
| * Gets the host on which Daemon is currently running. <br/> |
| * |
| * @return hostname |
| */ |
| public String getHostName() { |
| return process.getHostName(); |
| } |
| |
| /** |
| * Gets if the Daemon is ready to accept RPC connections. <br/> |
| * |
| * @return true if daemon is ready. |
| * @throws IOException on RPC error |
| */ |
| public boolean isReady() throws IOException { |
| return getProxy().isReady(); |
| } |
| |
| /** |
| * Kills the Daemon process <br/> |
| * @throws IOException on RPC error |
| */ |
| public void kill() throws IOException { |
| process.kill(); |
| } |
| |
| /** |
| * Checks if the Daemon process is alive or not <br/> |
| * @throws IOException on RPC error |
| */ |
| public void ping() throws IOException { |
| getProxy().ping(); |
| } |
| |
| /** |
| * Start up the Daemon process. <br/> |
| * @throws IOException on RPC error |
| */ |
| public void start() throws IOException { |
| process.start(); |
| } |
| |
| /** |
| * Get system level view of the Daemon process. |
| * |
| * @return returns system level view of the Daemon process. |
| * |
| * @throws IOException on RPC error. |
| */ |
| public ProcessInfo getProcessInfo() throws IOException { |
| return getProxy().getProcessInfo(); |
| } |
| |
| /** |
| * Abstract method to retrieve the name of a daemon specific env. var |
| * @return name of Hadoop environment variable containing a daemon options |
| */ |
| abstract public String getHadoopOptsEnvName (); |
| |
| /** |
| * Checks remote daemon process info to see if certain JMX sys. properties |
| * are available and reckon if the JMX service is enabled on the remote side |
| * |
| * @return <code>boolean</code> code indicating availability of remote JMX |
| * @throws IOException is throws in case of communication errors |
| */ |
| public boolean isJmxEnabled() throws IOException { |
| return isJmxEnabled(HADOOP_OPTS_ENV) || |
| isJmxEnabled(getHadoopOptsEnvName()); |
| } |
| |
| /** |
| * Checks remote daemon process info to see if certain JMX sys. properties |
| * are available and reckon if the JMX service is enabled on the remote side |
| * |
| * @param envivar name of an evironment variable to be searched |
| * @return <code>boolean</code> code indicating availability of remote JMX |
| * @throws IOException is throws in case of communication errors |
| */ |
| protected boolean isJmxEnabled(String envivar) throws IOException { |
| if (jmxEnabled != null) return jmxEnabled; |
| boolean ret = false; |
| String jmxRemoteString = "-Dcom.sun.management.jmxremote"; |
| String hadoopOpts = getProcessInfo().getEnv().get(envivar); |
| LOG.debug("Looking into " + hadoopOpts + " from " + envivar); |
| List<String> options = Arrays.asList(hadoopOpts.split(" ")); |
| ret = options.contains(jmxRemoteString); |
| jmxEnabled = ret; |
| return ret; |
| } |
| |
| /** |
| * Checks remote daemon process info to find remote JMX server port number |
| * By default this method will look into "HADOOP_OPTS" variable only. |
| * @return number of remote JMX server or -1 if it can't be found |
| * @throws IOException is throws in case of communication errors |
| * @throws IllegalArgumentException if non-integer port is set |
| * in the remote process info |
| */ |
| public int getJmxPortNumber() throws IOException, IllegalArgumentException { |
| int portNo = getJmxPortNumber(HADOOP_OPTS_ENV); |
| return portNo != -1 ? portNo : getJmxPortNumber(getHadoopOptsEnvName()); |
| } |
| |
| /** |
| * Checks remote daemon process info to find remote JMX server port number |
| * |
| * @param envivar name of the env. var. to look for JMX specific settings |
| * @return number of remote JMX server or -1 if it can't be found |
| * @throws IOException is throws in case of communication errors |
| * @throws IllegalArgumentException if non-integer port is set |
| * in the remote process info |
| */ |
| protected int getJmxPortNumber(final String envivar) throws |
| IOException, IllegalArgumentException { |
| if (jmxPortNumber != -1) return jmxPortNumber; |
| String jmxPortString = "-Dcom.sun.management.jmxremote.port"; |
| |
| String hadoopOpts = getProcessInfo().getEnv().get(envivar); |
| int portNumber = -1; |
| boolean found = false; |
| String[] options = hadoopOpts.split(" "); |
| for (String option : options) { |
| if (option.startsWith(jmxPortString)) { |
| found = true; |
| try { |
| portNumber = Integer.parseInt(option.split("=")[1]); |
| } catch (NumberFormatException e) { |
| throw new IllegalArgumentException("JMX port number isn't integer"); |
| } |
| break; |
| } |
| } |
| if (!found) |
| throw new IllegalArgumentException("Can't detect JMX port number"); |
| jmxPortNumber = portNumber; |
| return jmxPortNumber; |
| } |
| |
| /** |
| * Return a file status object that represents the path. |
| * @param path |
| * given path |
| * @param local |
| * whether the path is local or not |
| * @return a FileStatus object |
| * @throws IOException see specific implementation |
| */ |
| public FileStatus getFileStatus(String path, boolean local) throws IOException { |
| return getProxy().getFileStatus(path, local); |
| } |
| |
| /** |
| * Create a file with full permissions in a file system. |
| * @param path - source path where the file has to create. |
| * @param fileName - file name |
| * @param local - identifying the path whether its local or not. |
| * @throws IOException - if an I/O error occurs. |
| */ |
| public void createFile(String path, String fileName, |
| boolean local) throws IOException { |
| getProxy().createFile(path, fileName, null, local); |
| } |
| |
| /** |
| * Create a file with given permissions in a file system. |
| * @param path - source path where the file has to create. |
| * @param fileName - file name. |
| * @param permission - file permissions. |
| * @param local - identifying the path whether its local or not. |
| * @throws IOException - if an I/O error occurs. |
| */ |
| public void createFile(String path, String fileName, |
| FsPermission permission, boolean local) throws IOException { |
| getProxy().createFile(path, fileName, permission, local); |
| } |
| |
| /** |
| * Create a folder with default permissions in a file system. |
| * @param path - source path where the file has to be creating. |
| * @param folderName - folder name. |
| * @param local - identifying the path whether its local or not. |
| * @throws IOException - if an I/O error occurs. |
| */ |
| public void createFolder(String path, String folderName, |
| boolean local) throws IOException { |
| getProxy().createFolder(path, folderName, null, local); |
| } |
| |
| /** |
| * Create a folder with given permissions in a file system. |
| * @param path - source path where the file has to be creating. |
| * @param folderName - folder name. |
| * @param permission - folder permissions. |
| * @param local - identifying the path whether its local or not. |
| * @throws IOException - if an I/O error occurs. |
| */ |
| public void createFolder(String path, String folderName, |
| FsPermission permission, boolean local) throws IOException { |
| getProxy().createFolder(path, folderName, permission, local); |
| } |
| |
| /** |
| * List the statuses of the files/directories in the given path if the path is |
| * a directory. |
| * |
| * @param path |
| * given path |
| * @param local |
| * whether the path is local or not |
| * @return the statuses of the files/directories in the given patch |
| * @throws IOException on RPC error. |
| */ |
| public FileStatus[] listStatus(String path, boolean local) |
| throws IOException { |
| return getProxy().listStatus(path, local); |
| } |
| |
| /** |
| * List the statuses of the files/directories in the given path if the path is |
| * a directory recursive/nonrecursively depending on parameters |
| * |
| * @param path |
| * given path |
| * @param local |
| * whether the path is local or not |
| * @param recursive |
| * whether to recursively get the status |
| * @return the statuses of the files/directories in the given patch |
| * @throws IOException is thrown on RPC error. |
| */ |
| public FileStatus[] listStatus(String path, boolean local, boolean recursive) |
| throws IOException { |
| List<FileStatus> status = new ArrayList<FileStatus>(); |
| addStatus(status, path, local, recursive); |
| return status.toArray(new FileStatus[0]); |
| } |
| |
| private void addStatus(List<FileStatus> status, String f, |
| boolean local, boolean recursive) |
| throws IOException { |
| FileStatus[] fs = listStatus(f, local); |
| if (fs != null) { |
| for (FileStatus fileStatus : fs) { |
| if (!f.equals(fileStatus.getPath().toString())) { |
| status.add(fileStatus); |
| if (recursive) { |
| addStatus(status, fileStatus.getPath().toString(), local, recursive); |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * Gets number of times FATAL log messages where logged in Daemon logs. |
| * <br/> |
| * Pattern used for searching is FATAL. <br/> |
| * @param excludeExpList list of exception to exclude |
| * @return number of occurrence of fatal message. |
| * @throws IOException in case of communication errors |
| */ |
| public int getNumberOfFatalStatementsInLog(String [] excludeExpList) |
| throws IOException { |
| DaemonProtocol proxy = getProxy(); |
| String pattern = "FATAL"; |
| return proxy.getNumberOfMatchesInLogFile(pattern, excludeExpList); |
| } |
| |
| /** |
| * Gets number of times ERROR log messages where logged in Daemon logs. |
| * <br/> |
| * Pattern used for searching is ERROR. <br/> |
| * @param excludeExpList list of exception to exclude |
| * @return number of occurrence of error message. |
| * @throws IOException is thrown on RPC error. |
| */ |
| public int getNumberOfErrorStatementsInLog(String[] excludeExpList) |
| throws IOException { |
| DaemonProtocol proxy = getProxy(); |
| String pattern = "ERROR"; |
| return proxy.getNumberOfMatchesInLogFile(pattern, excludeExpList); |
| } |
| |
| /** |
| * Gets number of times Warning log messages where logged in Daemon logs. |
| * <br/> |
| * Pattern used for searching is WARN. <br/> |
| * @param excludeExpList list of exception to exclude |
| * @return number of occurrence of warning message. |
| * @throws IOException thrown on RPC error. |
| */ |
| public int getNumberOfWarnStatementsInLog(String[] excludeExpList) |
| throws IOException { |
| DaemonProtocol proxy = getProxy(); |
| String pattern = "WARN"; |
| return proxy.getNumberOfMatchesInLogFile(pattern, excludeExpList); |
| } |
| |
| /** |
| * Gets number of time given Exception were present in log file. <br/> |
| * |
| * @param e exception class. |
| * @param excludeExpList list of exceptions to exclude. |
| * @return number of exceptions in log |
| * @throws IOException is thrown on RPC error. |
| */ |
| public int getNumberOfExceptionsInLog(Exception e, |
| String[] excludeExpList) throws IOException { |
| DaemonProtocol proxy = getProxy(); |
| String pattern = e.getClass().getSimpleName(); |
| return proxy.getNumberOfMatchesInLogFile(pattern, excludeExpList); |
| } |
| |
| /** |
| * Number of times ConcurrentModificationException present in log file. |
| * <br/> |
| * @param excludeExpList list of exceptions to exclude. |
| * @return number of times exception in log file. |
| * @throws IOException is thrown on RPC error. |
| */ |
| public int getNumberOfConcurrentModificationExceptionsInLog( |
| String[] excludeExpList) throws IOException { |
| return getNumberOfExceptionsInLog(new ConcurrentModificationException(), |
| excludeExpList); |
| } |
| |
| private int errorCount; |
| private int fatalCount; |
| private int concurrentExceptionCount; |
| |
| /** |
| * Populate the initial exception counts to be used to assert once a testcase |
| * is done there was no exception in the daemon when testcase was run. |
| * @param excludeExpList list of exceptions to exclude |
| * @throws IOException is thrown on RPC error. |
| */ |
| protected void populateExceptionCount(String [] excludeExpList) |
| throws IOException { |
| errorCount = getNumberOfErrorStatementsInLog(excludeExpList); |
| LOG.info("Number of error messages in logs : " + errorCount); |
| fatalCount = getNumberOfFatalStatementsInLog(excludeExpList); |
| LOG.info("Number of fatal statement in logs : " + fatalCount); |
| concurrentExceptionCount = |
| getNumberOfConcurrentModificationExceptionsInLog(excludeExpList); |
| LOG.info("Number of concurrent modification in logs : " |
| + concurrentExceptionCount); |
| } |
| |
| /** |
| * Assert if the new exceptions were logged into the log file. |
| * <br/> |
| * <b><i> |
| * Pre-req for the method is that populateExceptionCount() has |
| * to be called before calling this method.</b></i> |
| * @param excludeExpList list of exceptions to exclude |
| * @throws IOException is thrown on RPC error. |
| */ |
| protected void assertNoExceptionsOccurred(String [] excludeExpList) |
| throws IOException { |
| int newerrorCount = getNumberOfErrorStatementsInLog(excludeExpList); |
| LOG.info("Number of error messages while asserting :" + newerrorCount); |
| int newfatalCount = getNumberOfFatalStatementsInLog(excludeExpList); |
| LOG.info("Number of fatal messages while asserting : " + newfatalCount); |
| int newconcurrentExceptionCount = |
| getNumberOfConcurrentModificationExceptionsInLog(excludeExpList); |
| LOG.info("Number of concurrentmodification exception while asserting :" |
| + newconcurrentExceptionCount); |
| Assert.assertEquals( |
| "New Error Messages logged in the log file", errorCount, newerrorCount); |
| Assert.assertEquals( |
| "New Fatal messages logged in the log file", fatalCount, newfatalCount); |
| Assert.assertEquals( |
| "New ConcurrentModificationException in log file", |
| concurrentExceptionCount, newconcurrentExceptionCount); |
| } |
| |
| /** |
| * Builds correct name of JMX object name from given domain, service name, type |
| * @param domain JMX domain name |
| * @param serviceName of the service where MBean is registered (NameNode) |
| * @param typeName of the MXBean class |
| * @return ObjectName for requested MXBean of <code>null</code> if one wasn't |
| * found |
| * @throws java.io.IOException in if object name is malformed |
| */ |
| protected ObjectName getJmxBeanName(String domain, String serviceName, |
| String typeName) throws IOException { |
| if (domain == null) |
| domain = HADOOP_JMX_DOMAIN; |
| |
| ObjectName jmxBean; |
| try { |
| jmxBean = new ObjectName(domain + ":service=" + serviceName + |
| ",name=" + typeName); |
| } catch (MalformedObjectNameException e) { |
| LOG.debug(e.getStackTrace()); |
| throw new IOException(e); |
| } |
| return jmxBean; |
| } |
| |
| /** |
| * Create connection with the remote JMX server at given host and port |
| * @param host name of the remote JMX server host |
| * @param port port number of the remote JXM server host |
| * @return instance of MBeanServerConnection or <code>null</code> if one |
| * hasn't been established |
| * @throws IOException in case of comminication errors |
| */ |
| protected MBeanServerConnection establishJmxConnection(String host, int port) |
| throws IOException { |
| if (connection != null) return connection; |
| String urlPattern = null; |
| try { |
| urlPattern = "service:jmx:rmi:///jndi/rmi://" + |
| host + ":" + port + |
| "/jmxrmi"; |
| JMXServiceURL url = new JMXServiceURL(urlPattern); |
| JMXConnector connector = JMXConnectorFactory.connect(url, null); |
| connection = connector.getMBeanServerConnection(); |
| } catch (java.net.MalformedURLException badURLExc) { |
| LOG.debug("bad url: " + urlPattern, badURLExc); |
| throw new IOException(badURLExc); |
| } |
| return connection; |
| } |
| |
| Hashtable<String, ObjectName> jmxObjectNames = |
| new Hashtable<String, ObjectName>(); |
| |
| /** |
| * Method implements all logic for receiving a bean's attribute. |
| * If any initializations such as establishing bean server connections, etc. |
| * are need it will do it. |
| * @param serviceName name of the service where MBean is registered (NameNode) |
| * @param type name of the MXBean class |
| * @param attributeName name of the attribute to be retrieved |
| * @return Object value of the attribute or <code>null</code> if not found |
| * @throws IOException is thrown in case of any errors |
| */ |
| protected Object getJmxAttribute (String serviceName, |
| String type, |
| String attributeName) |
| throws IOException { |
| Object retAttribute = null; |
| String domain = null; |
| if (isJmxEnabled()) { |
| try { |
| MBeanServerConnection conn = |
| establishJmxConnection(getHostName(), |
| getJmxPortNumber(HADOOP_OPTS_ENV)); |
| for (String d : conn.getDomains()) { |
| if (d != null && d.startsWith(HADOOP_JMX_DOMAIN)) |
| domain = d; |
| } |
| if (!jmxObjectNames.containsKey(type)) |
| jmxObjectNames.put(type, getJmxBeanName(domain, serviceName, type)); |
| retAttribute = |
| conn.getAttribute(jmxObjectNames.get(type), attributeName); |
| } catch (MBeanException e) { |
| LOG.debug(e.getStackTrace()); |
| throw new IOException(e); |
| } catch (AttributeNotFoundException e) { |
| LOG.warn(e.getStackTrace()); |
| throw new IOException(e); |
| } catch (InstanceNotFoundException e) { |
| LOG.warn(e.getStackTrace()); |
| throw new IOException(e); |
| } catch (ReflectionException e) { |
| LOG.debug(e.getStackTrace()); |
| throw new IOException(e); |
| } |
| } |
| return retAttribute; |
| } |
| |
| /** |
| * This method has to be implemented by appropriate concrete daemon client |
| * e.g. DNClient, NNClient, etc. |
| * Concrete implementation has to provide names of the service and bean type |
| * @param attributeName name of the attribute to be retrieved |
| * @return Object value of the given attribute |
| * @throws IOException is thrown in case of communication errors |
| */ |
| public abstract Object getDaemonAttribute (String attributeName) |
| throws IOException; |
| } |