blob: 4b97fe6994d0b1bafafdeee1efc0e7cbc5ecdc85 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.ha;
import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.service.HAServiceTracker;
import org.apache.tajo.service.ServiceTrackerException;
import org.apache.tajo.service.TajoMasterInfo;
import org.apache.tajo.util.FileUtil;
import org.apache.tajo.util.TUtil;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
/**
* An {@link HAServiceTracker} implementation backed by an HDFS cluster: the status of each master is saved as files on HDFS.
*
*/
@SuppressWarnings("unused")
public class HdfsServiceTracker extends HAServiceTracker {
// Logger used by this tracker.
private static Log LOG = LogFactory.getLog(HdfsServiceTracker.class);
// Configuration from which HA paths, service addresses and the monitor interval are read.
private TajoConf conf;
// File system of the Tajo root directory; assigned in initSystemDirectory().
private FileSystem fs;
// Name of this master: "host-address:port" of its umbilical RPC address (built in the constructor).
private String masterName;
// Tajo root directory.
private Path rootPath;
// System HA directory.
private Path haPath;
// Sub-directory of haPath holding the lock file and the active master entry.
private Path activePath;
// Sub-directory of haPath holding the backup master entries.
private Path backupPath;
// NOTE(review): not read or written by any method visible in this file; isActiveMaster()
// compares currentActiveMaster with masterName instead.
private boolean isActiveMaster = false;
// Thread running PingChecker, which periodically checks whether the current active master
// can still be reached.
private Thread checkerThread;
// Set to true by delete(); the PingChecker loop stops once it observes this flag.
private volatile boolean stopped = false;
// Time passed to Thread.sleep() between two PingChecker rounds.
private int monitorInterval;
// Name ("host-address:port") of the master currently regarded as active; assigned in register().
private String currentActiveMaster;
/**
 * Creates a tracker for the given configuration.
 *
 * The HA directories are prepared first. Afterwards the name of this master
 * ("host-address:port" of the umbilical RPC address) and the monitor interval are read
 * from the configuration.
 *
 * @param conf Tajo configuration
 * @throws IOException if the HA directories cannot be checked or created
 */
public HdfsServiceTracker(TajoConf conf) throws IOException {
  this.conf = conf;
  initSystemDirectory();

  InetSocketAddress umbilical = conf.getSocketAddrVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
  String hostAddress = umbilical.getAddress().getHostAddress();
  int port = umbilical.getPort();

  this.masterName = hostAddress + ":" + port;
  this.monitorInterval = conf.getIntVar(ConfVars.TAJO_MASTER_HA_MONITOR_INTERVAL);
}
/**
 * Resolves the file system of the Tajo root directory and makes sure that the system HA
 * directory and its active and backup sub-directories exist, creating the missing ones.
 *
 * @throws IOException if a directory cannot be checked or created
 */
private void initSystemDirectory() throws IOException {
  // Get Tajo root dir and the file system it belongs to
  this.rootPath = TajoConf.getTajoRootDir(conf);
  this.fs = rootPath.getFileSystem(conf);

  // Check and create Tajo system HA dir
  haPath = TajoConf.getSystemHADir(conf);
  createDirIfAbsent(haPath, "System HA dir");

  activePath = new Path(haPath, TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
  createDirIfAbsent(activePath, "System HA Active dir");

  backupPath = new Path(haPath, TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME);
  createDirIfAbsent(backupPath, "System HA Backup dir");
}

/**
 * Creates {@code dir} with the system resource directory permission unless it already exists.
 *
 * @param dir directory to create
 * @param label human readable name of the directory, used in log and error messages
 * @throws IOException if the directory cannot be checked or created
 */
private void createDirIfAbsent(Path dir, String label) throws IOException {
  if (fs.exists(dir)) {
    return;
  }
  // mkdirs() reports failure through its return value, so it must not be ignored.
  if (!fs.mkdirs(dir, new FsPermission(TajoMaster.SYSTEM_RESOURCE_DIR_PERMISSION))) {
    throw new IOException(label + " '" + dir + "' cannot be created");
  }
  LOG.info(label + " '" + dir + "' is created");
}
/**
 * Starts the "Ping Checker" thread unless one has already been created.
 */
private void startPingChecker() {
  if (checkerThread != null) {
    return;
  }
  Thread checker = new Thread(new PingChecker());
  checker.setName("Ping Checker");
  checkerThread = checker;
  checkerThread.start();
}
/**
 * Registers this master in the HA directories, as the active master when possible and as a
 * backup master otherwise. The ping checker is started whenever this master does not end up
 * as the active one.
 *
 * The master file contains the following underscore-separated string:
 *
 * <pre>
 * {UMBILICAL_RPC_HOST:PORT}_{CLIENT_RPC_HOST:PORT}_{RESOURCE_TRACKER_HOST:PORT}_{CATALOG_HOST:PORT}_{MASTER_WEB_HOST:PORT}
 * </pre>
 *
 * @throws IOException if the lock file or a master file cannot be created, or the active
 *                     directory cannot be listed
 * @throws ServiceTrackerException if the active directory holds more than two entries, or a
 *                                 lock file exists without any active master entry
 */
@Override
public void register() throws IOException {
  // Check lock file
  boolean lockResult = createLockFile();

  String fileName = masterName.replaceAll(":", "_");
  Path activeFile = new Path(activePath, fileName);
  Path backupFile = new Path(backupPath, fileName);

  // Set TajoMasterInfo object which has several rpc server addresses.
  StringBuilder sb = buildMasterInfo();

  // Phase 1: If there is not another active master, this try to become active master.
  if (lockResult) {
    fs.delete(backupFile, false);
    createMasterFile(activeFile, sb);
    currentActiveMaster = masterName;
    writeSystemConf();
    LOG.info(String.format("This is added to active master (%s)", masterName));
  } else {
    // Phase 2: If there is active master information, we need to check its status.
    FileStatus[] files = fs.listStatus(activePath);
    if (files.length > 2) {
      throw new ServiceTrackerException("Three or more than active master entries.");
    }

    Path existingActiveFile = null;
    for (FileStatus eachFile : files) {
      if (!eachFile.getPath().getName().equals(HAConstants.ACTIVE_LOCK_FILE)) {
        existingActiveFile = eachFile.getPath();
      }
    }

    // The lock file may exist without an active master entry next to it. Report that
    // explicitly instead of failing with a NullPointerException below.
    if (existingActiveFile == null) {
      throw new ServiceTrackerException("Active master entry cannot be found in: " + activePath);
    }

    currentActiveMaster = existingActiveFile.getName().replaceAll("_", ":");

    if (!checkConnection(currentActiveMaster)) {
      // Phase 3: If current active master is dead, this master should be active master.
      fs.delete(existingActiveFile, false);
      fs.delete(backupFile, false);
      createMasterFile(activeFile, sb);
      currentActiveMaster = masterName;
      LOG.info(String.format("This is added to active master (%s)", masterName));
    } else if (masterName.equals(currentActiveMaster)) {
      // Phase 4: If current active master is alive, this master need to be backup master,
      // unless it is the active master itself.
      LOG.info(String.format("This has already been added to active master (%s)", masterName));
    } else if (fs.exists(backupFile)) {
      LOG.info(String.format("This has already been added to backup masters (%s)", masterName));
    } else {
      createMasterFile(backupFile, sb);
      LOG.info(String.format("This is added to backup master (%s)", masterName));
    }
  }

  if (!isActiveMaster()) {
    startPingChecker();
  }
}

/**
 * Builds the content of a master file: the umbilical, client, resource tracker, catalog and
 * web info addresses of this master, each formatted as "host-address:port" and joined by "_".
 */
private StringBuilder buildMasterInfo() {
  int[] addressTypes = {
      HAConstants.MASTER_UMBILICAL_RPC_ADDRESS,
      HAConstants.MASTER_CLIENT_RPC_ADDRESS,
      HAConstants.RESOURCE_TRACKER_RPC_ADDRESS,
      HAConstants.CATALOG_ADDRESS,
      HAConstants.MASTER_INFO_ADDRESS
  };

  StringBuilder sb = new StringBuilder();
  for (int i = 0; i < addressTypes.length; i++) {
    if (i > 0) {
      sb.append("_");
    }
    InetSocketAddress address = getHostAddress(addressTypes[i]);
    sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort());
  }
  return sb;
}
/**
 * Writes the current configuration as XML to the system configuration path and then applies
 * the configured replica count to that file.
 *
 * @throws IOException if the file cannot be created or written, or its replication cannot be set
 */
private void writeSystemConf() throws IOException {
  final Path confFile = TajoConf.getSystemConfPath(conf);
  final FsPermission confPermission = new FsPermission(TajoMaster.SYSTEM_CONF_FILE_PERMISSION);

  FSDataOutputStream confStream = FileSystem.create(fs, confFile, confPermission);
  try {
    conf.writeXml(confStream);
  } finally {
    confStream.close();
  }

  short replicaCount = (short) conf.getIntVar(ConfVars.SYSTEM_CONF_REPLICA_COUNT);
  fs.setReplication(confFile, replicaCount);
}
/**
 * Tries to create the active lock file in the active directory.
 *
 * @return true if this call created the lock file, false if the lock file already existed
 * @throws IOException if the lock file cannot be created for any other reason; the original
 *                     exception is kept as the cause
 */
private boolean createLockFile() throws IOException {
  Path lockFile = new Path(activePath, HAConstants.ACTIVE_LOCK_FILE);
  FSDataOutputStream lockOutput = null;

  try {
    // overwrite is disabled so that an existing lock file is never replaced
    lockOutput = fs.create(lockFile, false);
    lockOutput.close();
    fs.deleteOnExit(lockFile);
    return true;
  } catch (FileAlreadyExistsException e) {
    LOG.info(String.format("Lock file already exists at (%s)", lockFile.toString()));
    return false;
  } catch (Exception e) {
    // Keep the cause so that the real failure stays visible in stack traces.
    throw new IOException("Lock file creation is failed - " + e.getMessage(), e);
  } finally {
    FileUtil.cleanup(LOG, lockOutput);
  }
}
/**
 * Creates a new master file at {@code path}, writes the given content to it with
 * {@code writeUTF} and marks the file to be deleted on exit. An existing file is not overwritten.
 *
 * @param path master file to create
 * @param sb content of the master file
 * @throws IOException if the file cannot be created or written; the original exception is
 *                     kept as the cause
 */
private void createMasterFile(Path path, StringBuilder sb) throws IOException {
  FSDataOutputStream out = null;

  try {
    out = fs.create(path, false);
    out.writeUTF(sb.toString());
    out.close();
    fs.deleteOnExit(path);
  } catch (Exception e) {
    // Keep the cause so that the real failure stays visible in stack traces.
    throw new IOException("File creation is failed - " + e.getMessage(), e);
  } finally {
    FileUtil.cleanup(LOG, out);
  }
}
/**
 * Returns the address of one of this master's services. The port is taken from the
 * configuration entry selected by {@code type}; the host part is taken from
 * {@code masterName}.
 *
 * @param type one of the address type constants of {@code HAConstants}
 * @return the socket address, or null if the type is unknown or no address is configured
 */
private InetSocketAddress getHostAddress(int type) {
  final ConfVars addressVar;

  switch (type) {
    case HAConstants.MASTER_UMBILICAL_RPC_ADDRESS:
      addressVar = ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS;
      break;
    case HAConstants.MASTER_CLIENT_RPC_ADDRESS:
      addressVar = ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS;
      break;
    case HAConstants.RESOURCE_TRACKER_RPC_ADDRESS:
      addressVar = ConfVars.RESOURCE_TRACKER_RPC_ADDRESS;
      break;
    case HAConstants.CATALOG_ADDRESS:
      addressVar = ConfVars.CATALOG_ADDRESS;
      break;
    case HAConstants.MASTER_INFO_ADDRESS:
      addressVar = ConfVars.TAJO_MASTER_INFO_ADDRESS;
      break;
    default:
      return null;
  }

  InetSocketAddress configured = conf.getSocketAddrVar(addressVar);
  if (configured == null) {
    return null;
  }

  String host = masterName.split(":")[0];
  return NetUtils.createSocketAddr(host + ":" + configured.getPort());
}
/**
 * Marks this tracker as stopped and, unless a JVM shutdown is in progress, removes this
 * master's active entry, the active lock file and this master's backup entry.
 *
 * @throws IOException if one of the files cannot be deleted
 */
@Override
public synchronized void delete() throws IOException {
  stopped = true;

  if (!ShutdownHookManager.get().isShutdownInProgress()) {
    String fileName = masterName.replaceAll(":", "_");

    Path ownActiveFile = new Path(activePath, fileName);
    fs.delete(ownActiveFile, false);

    Path lockFile = new Path(activePath, HAConstants.ACTIVE_LOCK_FILE);
    fs.delete(lockFile, false);

    Path ownBackupFile = new Path(backupPath, fileName);
    fs.delete(ownBackupFile, false);
  }
}
/**
 * Tells whether this master is the one currently regarded as the active master.
 *
 * @return true if the current active master name equals this master's name; false otherwise,
 *         including when no active master has been recorded yet (before register())
 */
@Override
public boolean isActiveMaster() {
  // masterName is assigned in the constructor, whereas currentActiveMaster stays null until
  // register() runs; comparing in this direction avoids a NullPointerException.
  return masterName.equals(currentActiveMaster);
}
/**
 * Lists all registered masters: first the entries of the active directory (the lock file is
 * skipped), then the entries of the backup directory.
 *
 * @return information about every registered master
 * @throws IOException if a directory cannot be listed or a master file cannot be read
 */
@Override
public List<TajoMasterInfo> getMasters() throws IOException {
  List<TajoMasterInfo> masters = TUtil.newList();

  for (FileStatus activeEntry : fs.listStatus(activePath)) {
    Path entryPath = activeEntry.getPath();
    if (entryPath.getName().equals(HAConstants.ACTIVE_LOCK_FILE)) {
      continue;
    }
    masters.add(getTajoMasterInfo(entryPath, true));
  }

  for (FileStatus backupEntry : fs.listStatus(backupPath)) {
    masters.add(getTajoMasterInfo(backupEntry.getPath(), false));
  }

  return masters;
}
/**
 * Reads a master file and converts it into a {@link TajoMasterInfo}.
 *
 * The file name (with "_" replaced by ":") is used to probe whether the master is reachable;
 * the file content supplies the five service addresses separated by "_".
 *
 * @param path master file to read
 * @param isActive whether the entry comes from the active directory
 * @return the master information
 * @throws IOException if the file cannot be read or holds fewer than five addresses
 */
private TajoMasterInfo getTajoMasterInfo(Path path, boolean isActive) throws IOException {
  String masterAddress = path.getName().replaceAll("_", ":");
  boolean isAlive = checkConnection(masterAddress);

  String data;
  FSDataInputStream stream = fs.open(path);
  try {
    data = stream.readUTF();
  } finally {
    // Close the stream even when reading fails.
    stream.close();
  }

  String[] addresses = data.split("_");
  if (addresses.length < 5) {
    throw new IOException("Invalid master entry '" + path + "': expected 5 addresses but found "
        + addresses.length);
  }

  TajoMasterInfo info = new TajoMasterInfo();
  info.setTajoMasterAddress(NetUtils.createSocketAddr(addresses[0]));
  info.setTajoClientAddress(NetUtils.createSocketAddr(addresses[1]));
  info.setWorkerResourceTrackerAddr(NetUtils.createSocketAddr(addresses[2]));
  info.setCatalogAddress(NetUtils.createSocketAddr(addresses[3]));
  info.setWebServerAddress(NetUtils.createSocketAddr(addresses[4]));
  info.setAvailable(isAlive);
  info.setActive(isActive);
  return info;
}
/**
 * Background task of a non-active master. After every monitor interval it checks whether the
 * current active master can still be reached and, if not, removes the stale active entry and
 * lock file and registers this master again. The loop ends when the tracker is stopped or the
 * thread is interrupted.
 */
private class PingChecker implements Runnable {
  @Override
  public void run() {
    while (!stopped && !Thread.currentThread().isInterrupted()) {
      try {
        Thread.sleep(monitorInterval);
      } catch (InterruptedException e) {
        LOG.info("PingChecker interrupted. - masterName:" + masterName);
        break;
      }

      synchronized (HdfsServiceTracker.this) {
        try {
          takeOverIfActiveMasterIsDead();
        } catch (Exception e) {
          LOG.error(e.getMessage(), e);
        }
      }
    }
  }

  /**
   * Performs one monitoring round. Nothing happens when this master is the active one, when
   * the tracker has been stopped, or when the active master answers.
   */
  private void takeOverIfActiveMasterIsDead() throws Exception {
    if (currentActiveMaster.equals(masterName)) {
      return;
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("currentActiveMaster:" + currentActiveMaster + ", thisMasterName:" + masterName);
    }

    if (stopped || checkConnection(currentActiveMaster)) {
      return;
    }

    // The active master is dead: clean up its entry and the lock file, then register again
    // so that this master can replace the previous active master.
    Path staleActiveFile = new Path(activePath, currentActiveMaster.replaceAll(":", "_"));
    fs.delete(staleActiveFile, false);

    Path staleLockFile = new Path(activePath, HAConstants.ACTIVE_LOCK_FILE);
    fs.delete(staleLockFile, false);

    register();
  }
}
// Positions of the service addresses inside the list returned by getAddressElements().
private final static int MASTER_UMBILICAL_RPC_ADDRESS = 0;
private final static int MASTER_CLIENT_RPC_ADDRESS = 1;
private final static int RESOURCE_TRACKER_RPC_ADDRESS = 2;
private final static int CATALOG_ADDRESS = 3;
private final static int MASTER_HTTP_INFO = 4;
// Most recently resolved service addresses. Each getter below keeps its cached value as long
// as checkConnection() succeeds for it and otherwise re-reads the addresses via
// getAddressElements().
private volatile InetSocketAddress umbilicalRpcAddr;
private volatile InetSocketAddress clientRpcAddr;
private volatile InetSocketAddress resourceTrackerRpcAddr;
private volatile InetSocketAddress catalogAddr;
private volatile InetSocketAddress masterHttpInfoAddr;
/**
 * Returns the umbilical RPC address of the active master. The cached address is returned
 * while a connection check on it succeeds; otherwise it is refreshed from the active master
 * entry first.
 */
@Override
public InetSocketAddress getUmbilicalAddress() {
  if (checkConnection(umbilicalRpcAddr)) {
    return umbilicalRpcAddr;
  }
  List<String> elements = getAddressElements();
  umbilicalRpcAddr = NetUtils.createSocketAddr(elements.get(MASTER_UMBILICAL_RPC_ADDRESS));
  return umbilicalRpcAddr;
}
/**
 * Returns the client RPC address of the active master. The cached address is returned while
 * a connection check on it succeeds; otherwise it is refreshed from the active master entry
 * first.
 */
@Override
public InetSocketAddress getClientServiceAddress() {
  if (checkConnection(clientRpcAddr)) {
    return clientRpcAddr;
  }
  List<String> elements = getAddressElements();
  clientRpcAddr = NetUtils.createSocketAddr(elements.get(MASTER_CLIENT_RPC_ADDRESS));
  return clientRpcAddr;
}
/**
 * Returns the resource tracker RPC address of the active master. The cached address is
 * returned while a connection check on it succeeds; otherwise it is refreshed from the active
 * master entry first.
 */
@Override
public InetSocketAddress getResourceTrackerAddress() {
  if (checkConnection(resourceTrackerRpcAddr)) {
    return resourceTrackerRpcAddr;
  }
  List<String> elements = getAddressElements();
  resourceTrackerRpcAddr = NetUtils.createSocketAddr(elements.get(RESOURCE_TRACKER_RPC_ADDRESS));
  return resourceTrackerRpcAddr;
}
/**
 * Returns the catalog address of the active master. The cached address is returned while a
 * connection check on it succeeds; otherwise it is refreshed from the active master entry
 * first.
 */
@Override
public InetSocketAddress getCatalogAddress() {
  if (checkConnection(catalogAddr)) {
    return catalogAddr;
  }
  List<String> elements = getAddressElements();
  catalogAddr = NetUtils.createSocketAddr(elements.get(CATALOG_ADDRESS));
  return catalogAddr;
}
/**
 * Returns the HTTP info address of the active master. The cached address is returned while a
 * connection check on it succeeds; otherwise it is refreshed from the active master entry
 * first.
 *
 * @throws ServiceTrackerException if the active master entry cannot be read
 */
@Override
public InetSocketAddress getMasterHttpInfo() throws ServiceTrackerException {
  if (checkConnection(masterHttpInfoAddr)) {
    return masterHttpInfoAddr;
  }
  List<String> elements = getAddressElements();
  masterHttpInfoAddr = NetUtils.createSocketAddr(elements.get(MASTER_HTTP_INFO));
  return masterHttpInfoAddr;
}
/**
 * Reads the active master entry stored in HDFS and returns all service addresses it contains.
 *
 * The active directory is polled until a non-empty entry other than the lock file shows up,
 * at most {@code TAJO_MASTER_HA_CLIENT_RETRY_MAX_NUM} times, pausing
 * {@code TAJO_MASTER_HA_CLIENT_RETRY_PAUSE_TIME} between two attempts.
 *
 * @return the five service addresses of the active master
 * @throws ServiceTrackerException if the active directory is missing or invalid, no active
 *                                 master entry appears within the allowed attempts, or the
 *                                 entry cannot be read
 */
private synchronized List<String> getAddressElements() throws ServiceTrackerException {
  try {
    FileSystem fs = getFileSystem(conf);
    Path activeMasterBaseDir = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);

    if (!fs.exists(activeMasterBaseDir)) {
      throw new ServiceTrackerException("No such active master base path: " + activeMasterBaseDir);
    }
    if (!fs.isDirectory(activeMasterBaseDir)) {
      throw new ServiceTrackerException("Active master base path must be a directory.");
    }

    /* wait for active master from HDFS */
    int pause = conf.getIntVar(ConfVars.TAJO_MASTER_HA_CLIENT_RETRY_PAUSE_TIME);
    int maxRetry = conf.getIntVar(ConfVars.TAJO_MASTER_HA_CLIENT_RETRY_MAX_NUM);

    Path activeMasterEntry = null;
    FileStatus[] files = null;

    // The attempt counter must advance on every round; otherwise the loop never terminates
    // while no active master entry exists.
    loop:
    for (int retry = 0; retry < maxRetry; retry++) {
      files = fs.listStatus(activeMasterBaseDir);
      for (FileStatus eachFile : files) {
        //check if active file is written
        if (!eachFile.getPath().getName().equals(HAConstants.ACTIVE_LOCK_FILE) && eachFile.getLen() > 0) {
          activeMasterEntry = eachFile.getPath();
          break loop;
        }
      }

      // Pause only when another attempt follows. wait(0) would block indefinitely, so a
      // non-positive pause is skipped.
      if (pause > 0 && retry + 1 < maxRetry) {
        try {
          this.wait(pause);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new ServiceTrackerException(e);
        }
      }
    }

    if (activeMasterEntry == null) {
      LOG.error("Exceeded the maximum retry (" + maxRetry + ") to read TajoMaster address from HDFS");
      throw new ServiceTrackerException("Active master entry cannot be found in: " + activeMasterBaseDir);
    }

    if (files.length > 2) {
      throw new ServiceTrackerException("Three or more than active master entries.");
    }

    if (!fs.isFile(activeMasterEntry)) {
      throw new ServiceTrackerException("Active master entry must be a file, but it is a directory.");
    }

    String data;
    FSDataInputStream stream = fs.open(activeMasterEntry);
    try {
      data = stream.readUTF();
    } finally {
      // Close the stream even when reading fails.
      stream.close();
    }

    List<String> addressElements = TUtil.newList();
    addressElements.addAll(TUtil.newList(data.split("_"))); // Add remains entries to elements

    // ensure the number of entries
    Preconditions.checkState(addressElements.size() == 5, "Fewer service addresses than necessary.");
    return addressElements;
  } catch (ServiceTrackerException e) {
    // Already carries a specific message; do not wrap it a second time.
    throw e;
  } catch (Throwable t) {
    throw new ServiceTrackerException(t);
  }
}
/**
 * Looks up the HA state of the given master in the HA directories.
 *
 * @param masterName master name in "host:port" form
 * @param conf configuration used to locate the HA directories
 * @return 0 if the master is registered as a backup master, 1 if it is registered as the
 *         active master, -2 if it is registered as neither
 * @throws ServiceTrackerException if the HA directories cannot be read
 */
@Override
public int getState(String masterName, TajoConf conf) throws ServiceTrackerException {
  String targetMaster = masterName.replaceAll(":", "_");

  try {
    FileSystem fs = getFileSystem(conf);
    Path haDir = TajoConf.getSystemHADir(conf);
    Path activePath = new Path(haDir, TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
    Path backupPath = new Path(haDir, TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME);

    // Check backup masters
    for (FileStatus status : fs.listStatus(backupPath)) {
      if (status.getPath().getName().equals(targetMaster)) {
        return 0;
      }
    }

    // Check active master. The active directory also holds the lock file, so every entry is
    // inspected and the lock file is skipped instead of requiring exactly one entry.
    for (FileStatus status : fs.listStatus(activePath)) {
      String entryName = status.getPath().getName();
      if (entryName.equals(HAConstants.ACTIVE_LOCK_FILE)) {
        continue;
      }
      if (entryName.equals(targetMaster)) {
        return 1;
      }
    }

    return -2;
  } catch (Exception e) {
    throw new ServiceTrackerException("Cannot get HA state - ERROR:" + e.getMessage());
  }
}
/**
 * Deletes the system HA directory, provided that none of the registered masters is reachable.
 *
 * @param conf configuration used to locate the HA directories
 * @return 0 if at least one registered master is alive (nothing is deleted), 1 if the HA
 *         directory has been deleted
 * @throws ServiceTrackerException if the HA directories cannot be read or deleted
 */
@Override
public int formatHA(TajoConf conf) throws ServiceTrackerException{
  try {
    FileSystem fs = getFileSystem(conf);
    Path activePath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
    Path backupPath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME);

    int aliveMasters = 0;

    // Check backup masters
    for (FileStatus backupEntry : fs.listStatus(backupPath)) {
      String entryName = backupEntry.getPath().getName();
      if (checkConnection(entryName, "_")) {
        aliveMasters++;
      }
    }

    // Check active master
    for (FileStatus activeEntry : fs.listStatus(activePath)) {
      String entryName = activeEntry.getPath().getName();
      if (entryName.equals(HAConstants.ACTIVE_LOCK_FILE)) {
        continue;
      }
      if (checkConnection(entryName, "_")) {
        aliveMasters++;
      }
    }

    // If there is any alive master, users can't format storage.
    if (aliveMasters > 0) {
      return 0;
    }

    // delete ha path.
    fs.delete(TajoConf.getSystemHADir(conf), true);
    return 1;
  } catch (Exception e) {
    throw new ServiceTrackerException("Cannot format HA directories - ERROR:" + e.getMessage());
  }
}
/**
 * Lists the names ("host:port") of all registered masters: the backup masters first, then
 * the active master.
 *
 * @param conf configuration used to locate the HA directories
 * @return names of the registered masters
 * @throws ServiceTrackerException if the HA directories cannot be read
 */
@Override
public List<String> getMasters(TajoConf conf) throws ServiceTrackerException {
  List<String> list = new ArrayList<String>();

  try {
    FileSystem fs = getFileSystem(conf);
    Path haDir = TajoConf.getSystemHADir(conf);
    Path activePath = new Path(haDir, TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
    Path backupPath = new Path(haDir, TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME);

    // Check backup masters
    for (FileStatus status : fs.listStatus(backupPath)) {
      list.add(status.getPath().getName().replaceAll("_", ":"));
    }

    // Check active master. The active directory also holds the lock file, so every entry is
    // inspected and the lock file is skipped instead of requiring exactly one entry.
    for (FileStatus status : fs.listStatus(activePath)) {
      String entryName = status.getPath().getName();
      if (!entryName.equals(HAConstants.ACTIVE_LOCK_FILE)) {
        list.add(entryName.replaceAll("_", ":"));
      }
    }
  } catch (Exception e) {
    throw new ServiceTrackerException("Cannot get master lists - ERROR:" + e.getMessage());
  }

  return list;
}
/**
 * Returns the file system that hosts the Tajo root directory of the given configuration.
 *
 * @param conf Tajo configuration
 * @throws IOException if the file system cannot be obtained
 */
private static FileSystem getFileSystem(TajoConf conf) throws IOException {
  return TajoConf.getTajoRootDir(conf).getFileSystem(conf);
}
}