blob: b1277a0ea3a33177939cbbdd56b7bf08709fa047 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.test.system;
import java.io.IOException;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileInputStream;
import java.io.DataInputStream;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Iterator;
import java.util.Enumeration;
import java.util.Arrays;
import java.util.Hashtable;
import java.net.URI;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.test.system.process.ClusterProcessManager;
import org.apache.hadoop.test.system.process.RemoteProcess;
/**
* Abstract class which represent the cluster having multiple daemons.
*/
@SuppressWarnings("unchecked")
public abstract class AbstractDaemonCluster {
private static final Log LOG = LogFactory.getLog(AbstractDaemonCluster.class);
private String [] excludeExpList ;
private Configuration conf;
protected ClusterProcessManager clusterManager;
private Map<Enum<?>, List<AbstractDaemonClient>> daemons =
new LinkedHashMap<Enum<?>, List<AbstractDaemonClient>>();
private String newConfDir = null;
private static final String CONF_HADOOP_LOCAL_DIR =
"test.system.hdrc.hadoop.local.confdir";
private final static Object waitLock = new Object();
/**
* Constructor to create a cluster client.<br/>
*
* @param conf
* Configuration to be used while constructing the cluster.
* @param rcluster
* process manger instance to be used for managing the daemons.
*
* @throws IOException
*/
public AbstractDaemonCluster(Configuration conf,
ClusterProcessManager rcluster) throws IOException {
this.conf = conf;
this.clusterManager = rcluster;
createAllClients();
}
/**
* The method returns the cluster manager. The system test cases require an
* instance of HadoopDaemonRemoteCluster to invoke certain operation on the
* daemon.
*
* @return instance of clusterManager
*/
public ClusterProcessManager getClusterManager() {
return clusterManager;
}
protected void createAllClients() throws IOException {
for (RemoteProcess p : clusterManager.getAllProcesses()) {
List<AbstractDaemonClient> dms = daemons.get(p.getRole());
if (dms == null) {
dms = new ArrayList<AbstractDaemonClient>();
daemons.put(p.getRole(), dms);
}
dms.add(createClient(p));
}
}
/**
* Method to create the daemon client.<br/>
*
* @param process
* to manage the daemon.
* @return instance of the daemon client
*
* @throws IOException
*/
protected abstract AbstractDaemonClient<DaemonProtocol>
createClient(RemoteProcess process) throws IOException;
/**
* Get the global cluster configuration which was used to create the
* cluster. <br/>
*
* @return global configuration of the cluster.
*/
public Configuration getConf() {
return conf;
}
/**
*
/**
* Return the client handle of all the Daemons.<br/>
*
* @return map of role to daemon clients' list.
*/
public Map<Enum<?>, List<AbstractDaemonClient>> getDaemons() {
return daemons;
}
/**
* Checks if the cluster is ready for testing. <br/>
* Algorithm for checking is as follows : <br/>
* <ul>
* <li> Wait for Daemon to come up </li>
* <li> Check if daemon is ready </li>
* <li> If one of the daemon is not ready, return false </li>
* </ul>
*
* @return true if whole cluster is ready.
*
* @throws IOException
*/
public boolean isReady() throws IOException {
for (List<AbstractDaemonClient> set : daemons.values()) {
for (AbstractDaemonClient daemon : set) {
waitForDaemon(daemon);
if (!daemon.isReady()) {
return false;
}
}
}
return true;
}
protected void waitForDaemon(AbstractDaemonClient d) {
final int TEN_SEC = 10000;
while(true) {
try {
LOG.info("Waiting for daemon at " + d.getHostName() + " to come up.");
LOG.info("Daemon might not be " +
"ready or the call to setReady() method hasn't been " +
"injected to " + d.getClass() + " ");
d.connect();
break;
} catch (IOException e) {
try {
Thread.sleep(TEN_SEC);
} catch (InterruptedException ie) {
}
}
}
}
/**
* Starts the cluster daemons.
* @throws IOException
*/
public void start() throws IOException {
clusterManager.start();
}
/**
* Stops the cluster daemons.
* @throws IOException
*/
public void stop() throws IOException {
clusterManager.stop();
}
/**
* Connect to daemon RPC ports.
* @throws IOException
*/
public void connect() throws IOException {
for (List<AbstractDaemonClient> set : daemons.values()) {
for (AbstractDaemonClient daemon : set) {
daemon.connect();
}
}
}
/**
* Disconnect to daemon RPC ports.
* @throws IOException
*/
public void disconnect() throws IOException {
for (List<AbstractDaemonClient> set : daemons.values()) {
for (AbstractDaemonClient daemon : set) {
daemon.disconnect();
}
}
}
/**
* Ping all the daemons of the cluster.
* @throws IOException
*/
public void ping() throws IOException {
for (List<AbstractDaemonClient> set : daemons.values()) {
for (AbstractDaemonClient daemon : set) {
LOG.info("Daemon is : " + daemon.getHostName() + " pinging....");
daemon.ping();
}
}
}
/**
* Connect to the cluster and ensure that it is clean to run tests.
* @throws Exception
*/
public void setUp() throws Exception {
while (!isReady()) {
Thread.sleep(1000);
}
connect();
ping();
clearAllControlActions();
ensureClean();
populateExceptionCounts();
}
/**
* This is mainly used for the test cases to set the list of exceptions
* that will be excluded.
* @param excludeExpList list of exceptions to exclude
*/
public void setExcludeExpList(String [] excludeExpList) {
this.excludeExpList = excludeExpList;
}
public void clearAllControlActions() throws IOException {
for (List<AbstractDaemonClient> set : daemons.values()) {
for (AbstractDaemonClient daemon : set) {
LOG.info("Daemon is : " + daemon.getHostName() + " pinging....");
daemon.getProxy().clearActions();
}
}
}
/**
* Ensure that the cluster is clean to run tests.
* @throws IOException
*/
public void ensureClean() throws IOException {
}
/**
* Ensure that cluster is clean. Disconnect from the RPC ports of the daemons.
* @throws IOException
*/
public void tearDown() throws IOException {
ensureClean();
clearAllControlActions();
assertNoExceptionMessages();
disconnect();
}
/**
* Populate the exception counts in all the daemons so that it can be checked when
* the testcase has finished running.<br/>
* @throws IOException
*/
protected void populateExceptionCounts() throws IOException {
for(List<AbstractDaemonClient> lst : daemons.values()) {
for(AbstractDaemonClient d : lst) {
d.populateExceptionCount(excludeExpList);
}
}
}
/**
* Assert no exception has been thrown during the sequence of the actions.
* <br/>
* @throws IOException
*/
protected void assertNoExceptionMessages() throws IOException {
for(List<AbstractDaemonClient> lst : daemons.values()) {
for(AbstractDaemonClient d : lst) {
d.assertNoExceptionsOccurred(excludeExpList);
}
}
}
/**
* Get the proxy user definitions from cluster from configuration.
* @return ProxyUserDefinitions - proxy users data like groups and hosts.
* @throws Exception - if no proxy users found in config.
*/
public ProxyUserDefinitions getHadoopProxyUsers() throws
Exception {
Iterator itr = conf.iterator();
ArrayList<String> proxyUsers = new ArrayList<String>();
while (itr.hasNext()) {
if (itr.next().toString().indexOf("hadoop.proxyuser") >= 0 &&
itr.next().toString().indexOf("groups=") >= 0) {
proxyUsers.add(itr.next().toString().split("\\.")[2]);
}
}
if (proxyUsers.size() == 0) {
LOG.error("No proxy users found in the configuration.");
throw new Exception("No proxy users found in the configuration.");
}
ProxyUserDefinitions pud = new ProxyUserDefinitions() {
@Override
public boolean writeToFile(URI filePath) throws IOException {
throw new UnsupportedOperationException("No such method exists.");
};
};
for (String userName : proxyUsers) {
List<String> groups = Arrays.asList(conf.get("hadoop.proxyuser." +
userName + ".groups").split("//,"));
List<String> hosts = Arrays.asList(conf.get("hadoop.proxyuser." +
userName + ".hosts").split("//,"));
ProxyUserDefinitions.GroupsAndHost definitions =
pud.new GroupsAndHost();
definitions.setGroups(groups);
definitions.setHosts(hosts);
pud.addProxyUser(userName, definitions);
}
return pud;
}
/**
* It's a local folder where the config file stores temporarily
* while serializing the object.
* @return String temporary local folder path for configuration.
*/
private String getHadoopLocalConfDir() {
String hadoopLocalConfDir = conf.get(CONF_HADOOP_LOCAL_DIR);
if (hadoopLocalConfDir == null || hadoopLocalConfDir.isEmpty()) {
LOG.error("No configuration "
+ "for the CONF_HADOOP_LOCAL_DIR passed");
throw new IllegalArgumentException(
"No Configuration passed for hadoop conf local directory");
}
return hadoopLocalConfDir;
}
/**
* It uses to restart the cluster with new configuration at runtime.<br/>
* @param props attributes for new configuration.
* @param configFile configuration file.
* @throws IOException if an I/O error occurs.
*/
public void restartClusterWithNewConfig(Hashtable<String,?> props,
String configFile) throws IOException {
String mapredConf = null;
String localDirPath = null;
File localFolderObj = null;
File xmlFileObj = null;
String confXMLFile = null;
Configuration initConf = new Configuration(getConf());
Enumeration<String> e = props.keys();
while (e.hasMoreElements()) {
String propKey = e.nextElement();
Object propValue = props.get(propKey);
initConf.set(propKey,propValue.toString());
}
localDirPath = getHadoopLocalConfDir();
localFolderObj = new File(localDirPath);
if (!localFolderObj.exists()) {
localFolderObj.mkdir();
}
confXMLFile = localDirPath + File.separator + configFile;
xmlFileObj = new File(confXMLFile);
initConf.writeXml(new FileOutputStream(xmlFileObj));
newConfDir = clusterManager.pushConfig(localDirPath);
stop();
waitForClusterToStop();
clusterManager.start(newConfDir);
waitForClusterToStart();
localFolderObj.delete();
}
/**
* It uses to restart the cluster with default configuration.<br/>
* @throws IOException if an I/O error occurs.
*/
public void restart() throws
IOException {
stop();
waitForClusterToStop();
start();
waitForClusterToStart();
cleanupNewConf(newConfDir);
}
/**
* It uses to delete the new configuration folder.
* @param path - configuration directory path.
* @throws IOException if an I/O error occurs.
*/
public void cleanupNewConf(String path) throws IOException {
File file = new File(path);
file.delete();
}
/**
* It uses to wait until the cluster is stopped.<br/>
* @throws IOException if an I/O error occurs.
*/
public void waitForClusterToStop() throws
IOException {
List<Thread> chkDaemonStop = new ArrayList<Thread>();
for (List<AbstractDaemonClient> set : daemons.values()) {
for (AbstractDaemonClient daemon : set) {
DaemonStopThread dmStop = new DaemonStopThread(daemon);
chkDaemonStop.add(dmStop);
dmStop.start();
}
}
for (Thread daemonThread : chkDaemonStop){
try {
daemonThread.join();
} catch(InterruptedException intExp) {
LOG.warn("Interrupted while thread is joining." + intExp.getMessage());
}
}
}
/**
* It uses to wait until the cluster is started.<br/>
* @throws IOException if an I/O error occurs.
*/
public void waitForClusterToStart() throws
IOException {
List<Thread> chkDaemonStart = new ArrayList<Thread>();
for (List<AbstractDaemonClient> set : daemons.values()) {
for (AbstractDaemonClient daemon : set) {
DaemonStartThread dmStart = new DaemonStartThread(daemon);
chkDaemonStart.add(dmStart);;
dmStart.start();
}
}
for (Thread daemonThread : chkDaemonStart){
try {
daemonThread.join();
} catch(InterruptedException intExp) {
LOG.warn("Interrupted while thread is joining" + intExp.getMessage());
}
}
}
/**
* It waits for specified amount of time.
* @param duration time in milliseconds.
* @throws InterruptedException if any thread interrupted the current
* thread while it is waiting for a notification.
*/
public void waitFor(long duration) {
try {
synchronized (waitLock) {
waitLock.wait(duration);
}
} catch (InterruptedException intExp) {
LOG.warn("Interrrupeted while thread is waiting" + intExp.getMessage());
}
}
class DaemonStartThread extends Thread {
private AbstractDaemonClient daemon;
public DaemonStartThread(AbstractDaemonClient daemon) {
this.daemon = daemon;
}
public void run(){
LOG.info("Waiting for Daemon " + daemon.getHostName()
+ " to come up.....");
while (true) {
try {
daemon.ping();
LOG.info("Daemon is : " + daemon.getHostName() + " pinging...");
break;
} catch (Exception exp) {
if(LOG.isDebugEnabled()) {
LOG.debug(daemon.getHostName() + " is waiting to come up.");
}
waitFor(60000);
}
}
}
}
class DaemonStopThread extends Thread {
private AbstractDaemonClient daemon;
public DaemonStopThread(AbstractDaemonClient daemon) {
this.daemon = daemon;
}
public void run() {
LOG.info("Waiting for Daemon " + daemon.getHostName()
+ " to stop.....");
while (true) {
try {
daemon.ping();
if(LOG.isDebugEnabled()) {
LOG.debug(daemon.getHostName() +" is waiting state to stop.");
}
waitFor(60000);
} catch (Exception exp) {
LOG.info("Daemon is : " + daemon.getHostName() + " stopped...");
break;
}
}
}
}
}