/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.lib.service.hadoop;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.lib.server.BaseService;
import org.apache.hadoop.lib.server.ServiceException;
import org.apache.hadoop.lib.service.FileSystemAccess;
import org.apache.hadoop.lib.service.FileSystemAccessException;
import org.apache.hadoop.lib.service.Instrumentation;
import org.apache.hadoop.lib.service.Scheduler;
import org.apache.hadoop.lib.util.Check;
import org.apache.hadoop.lib.util.ConfigurationUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.VersionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
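/**
 * Server service that provides access to Hadoop FileSystem instances on
 * behalf of proxied end users. FileSystem instances are cached per user;
 * cached instances are closed on release or, when a purge timeout is
 * configured, by a scheduled purger once they have been idle long enough.
 * <p>
 * Illustrative usage sketch (how the Server instance is obtained is
 * deployment specific and assumed here):
 * <pre>{@code
 *   FileSystemAccess access = server.get(FileSystemAccess.class);
 *   Configuration conf = access.getFileSystemConfiguration();
 *   Long len = access.execute("alice", conf,
 *       new FileSystemAccess.FileSystemExecutor<Long>() {
 *         @Override
 *         public Long execute(FileSystem fs) throws IOException {
 *           return fs.getFileStatus(new Path("/tmp")).getLen();
 *         }
 *       });
 * }</pre>
 */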
@InterfaceAudience.Private
public class FileSystemAccessService extends BaseService implements FileSystemAccess {
private static final Logger LOG = LoggerFactory.getLogger(FileSystemAccessService.class);
public static final String PREFIX = "hadoop";
private static final String INSTRUMENTATION_GROUP = "hadoop";
public static final String AUTHENTICATION_TYPE = "authentication.type";
public static final String KERBEROS_KEYTAB = "authentication.kerberos.keytab";
public static final String KERBEROS_PRINCIPAL = "authentication.kerberos.principal";
public static final String FS_CACHE_PURGE_FREQUENCY = "filesystem.cache.purge.frequency";
public static final String FS_CACHE_PURGE_TIMEOUT = "filesystem.cache.purge.timeout";
public static final String NAME_NODE_WHITELIST = "name.node.whitelist";
public static final String HADOOP_CONF_DIR = "config.dir";
private static final String[] HADOOP_CONF_FILES = {"core-site.xml", "hdfs-site.xml"};
private static final String FILE_SYSTEM_SERVICE_CREATED = "FileSystemAccessService.created";
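  /**
   * Reference counted wrapper for a cached FileSystem. The filesystem is
   * created lazily on first use and closed either on release (when the
   * purge timeout is zero) or by the purger once idle past the timeout.
   */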
private static class CachedFileSystem {
private FileSystem fs;
private long lastUse;
private long timeout;
private int count;
public CachedFileSystem(long timeout) {
this.timeout = timeout;
lastUse = -1;
count = 0;
}
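    // Lazily creates the FileSystem on first use. While the filesystem is
    // in use (count > 0) lastUse stays -1 so the purger will not close it.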
    synchronized FileSystem getFileSystem(Configuration conf)
        throws IOException {
if (fs == null) {
fs = FileSystem.get(conf);
}
lastUse = -1;
count++;
return fs;
}
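    // Decrements the use count. With a zero timeout the filesystem is
    // closed as soon as it is unused; otherwise the idle clock starts.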
synchronized void release() throws IOException {
count--;
if (count == 0) {
if (timeout == 0) {
fs.close();
fs = null;
lastUse = -1;
}
else {
lastUse = System.currentTimeMillis();
}
}
}
    // To avoid race conditions when adding/removing entries in the cache map,
    // an entry remains in the cache forever; it just closes/opens its
    // filesystem based on utilization. Worst case scenario, the penalty we
    // pay is that the number of entries in the cache equals the total
    // number of users in HDFS (which seems a reasonable overhead).
synchronized boolean purgeIfIdle() throws IOException {
boolean ret = false;
if (count == 0 && lastUse != -1 &&
(System.currentTimeMillis() - lastUse) > timeout) {
fs.close();
fs = null;
lastUse = -1;
ret = true;
}
return ret;
}
}
public FileSystemAccessService() {
super(PREFIX);
}
private Collection<String> nameNodeWhitelist;
Configuration serviceHadoopConf;
private AtomicInteger unmanagedFileSystems = new AtomicInteger();
private ConcurrentHashMap<String, CachedFileSystem> fsCache =
new ConcurrentHashMap<String, CachedFileSystem>();
private long purgeTimeout;
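  /**
   * Initializes the service: sets up Kerberos or simple authentication for
   * the process login user, loads the Hadoop configuration from the
   * configured directory and reads the namenode whitelist.
   *
   * @throws ServiceException thrown if authentication setup or
   * configuration loading fails.
   */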
@Override
protected void init() throws ServiceException {
LOG.info("Using FileSystemAccess JARs version [{}]", VersionInfo.getVersion());
String security = getServiceConfig().get(AUTHENTICATION_TYPE, "simple").trim();
if (security.equals("kerberos")) {
String defaultName = getServer().getName();
String keytab = System.getProperty("user.home") + "/" + defaultName + ".keytab";
keytab = getServiceConfig().get(KERBEROS_KEYTAB, keytab).trim();
if (keytab.length() == 0) {
throw new ServiceException(FileSystemAccessException.ERROR.H01, KERBEROS_KEYTAB);
}
String principal = defaultName + "/localhost@LOCALHOST";
principal = getServiceConfig().get(KERBEROS_PRINCIPAL, principal).trim();
if (principal.length() == 0) {
throw new ServiceException(FileSystemAccessException.ERROR.H01, KERBEROS_PRINCIPAL);
}
Configuration conf = new Configuration();
conf.set("hadoop.security.authentication", "kerberos");
UserGroupInformation.setConfiguration(conf);
try {
UserGroupInformation.loginUserFromKeytab(principal, keytab);
} catch (IOException ex) {
throw new ServiceException(FileSystemAccessException.ERROR.H02, ex.getMessage(), ex);
}
LOG.info("Using FileSystemAccess Kerberos authentication, principal [{}] keytab [{}]", principal, keytab);
} else if (security.equals("simple")) {
Configuration conf = new Configuration();
conf.set("hadoop.security.authentication", "simple");
UserGroupInformation.setConfiguration(conf);
LOG.info("Using FileSystemAccess simple/pseudo authentication, principal [{}]", System.getProperty("user.name"));
} else {
throw new ServiceException(FileSystemAccessException.ERROR.H09, security);
}
String hadoopConfDirProp = getServiceConfig().get(HADOOP_CONF_DIR, getServer().getConfigDir());
    File hadoopConfDir = new File(hadoopConfDirProp).getAbsoluteFile();
if (!hadoopConfDir.exists()) {
throw new ServiceException(FileSystemAccessException.ERROR.H10, hadoopConfDir);
}
try {
serviceHadoopConf = loadHadoopConf(hadoopConfDir);
} catch (IOException ex) {
throw new ServiceException(FileSystemAccessException.ERROR.H11, ex.toString(), ex);
}
LOG.debug("FileSystemAccess FileSystem configuration:");
    for (Map.Entry<String, String> entry : serviceHadoopConf) {
LOG.debug(" {} = {}", entry.getKey(), entry.getValue());
}
setRequiredServiceHadoopConf(serviceHadoopConf);
nameNodeWhitelist = toLowerCase(getServiceConfig().getTrimmedStringCollection(NAME_NODE_WHITELIST));
}
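  /**
   * Loads core-site.xml and hdfs-site.xml (when present) from the given
   * directory into a fresh Configuration, skipping the default resources.
   */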
private Configuration loadHadoopConf(File dir) throws IOException {
Configuration hadoopConf = new Configuration(false);
for (String file : HADOOP_CONF_FILES) {
File f = new File(dir, file);
if (f.exists()) {
hadoopConf.addResource(new Path(f.getAbsolutePath()));
}
}
return hadoopConf;
}
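  /**
   * Registers instrumentation for the unmanaged filesystem count and, when
   * a positive purge timeout is configured, schedules the cache purger.
   */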
@Override
public void postInit() throws ServiceException {
super.postInit();
Instrumentation instrumentation = getServer().get(Instrumentation.class);
instrumentation.addVariable(INSTRUMENTATION_GROUP, "unmanaged.fs", new Instrumentation.Variable<Integer>() {
@Override
public Integer getValue() {
return unmanagedFileSystems.get();
}
});
instrumentation.addSampler(INSTRUMENTATION_GROUP, "unmanaged.fs", 60, new Instrumentation.Variable<Long>() {
@Override
public Long getValue() {
return (long) unmanagedFileSystems.get();
}
});
Scheduler scheduler = getServer().get(Scheduler.class);
int purgeInterval = getServiceConfig().getInt(FS_CACHE_PURGE_FREQUENCY, 60);
purgeTimeout = getServiceConfig().getLong(FS_CACHE_PURGE_TIMEOUT, 60);
purgeTimeout = (purgeTimeout > 0) ? purgeTimeout : 0;
if (purgeTimeout > 0) {
scheduler.schedule(new FileSystemCachePurger(),
purgeInterval, purgeInterval, TimeUnit.SECONDS);
}
}
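  /**
   * Scheduled task that closes cached filesystems that have been idle for
   * longer than the purge timeout. Cache entries themselves are never
   * removed; only their underlying filesystem is closed.
   */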
private class FileSystemCachePurger implements Runnable {
@Override
public void run() {
int count = 0;
for (CachedFileSystem cacheFs : fsCache.values()) {
try {
count += cacheFs.purgeIfIdle() ? 1 : 0;
} catch (Throwable ex) {
LOG.warn("Error while purging filesystem, " + ex.toString(), ex);
}
}
LOG.debug("Purged [{}} filesystem instances", count);
}
}
private Set<String> toLowerCase(Collection<String> collection) {
Set<String> set = new HashSet<String>();
for (String value : collection) {
set.add(value.toLowerCase());
}
return set;
}
@Override
  public Class<?> getInterface() {
return FileSystemAccess.class;
}
@Override
  public Class<?>[] getServiceDependencies() {
    return new Class<?>[]{Instrumentation.class, Scheduler.class};
}
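  /**
   * Returns a proxy-user UGI for the given user, backed by the service
   * login user established at init time.
   */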
protected UserGroupInformation getUGI(String user) throws IOException {
return UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
}
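  /**
   * Disables the Hadoop FileSystem cache; this service manages its own
   * per-user cache and must control filesystem lifecycle itself.
   */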
protected void setRequiredServiceHadoopConf(Configuration conf) {
conf.set("fs.hdfs.impl.disable.cache", "true");
}
private static final String HTTPFS_FS_USER = "httpfs.fs.user";
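  /**
   * Obtains a filesystem for the current (proxied) user from the per-user
   * cache, creating the cache entry on first access. The user name is
   * stamped into the configuration so closeFileSystem can locate the entry.
   */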
protected FileSystem createFileSystem(Configuration namenodeConf)
throws IOException {
String user = UserGroupInformation.getCurrentUser().getShortUserName();
CachedFileSystem newCachedFS = new CachedFileSystem(purgeTimeout);
CachedFileSystem cachedFS = fsCache.putIfAbsent(user, newCachedFS);
if (cachedFS == null) {
cachedFS = newCachedFS;
}
Configuration conf = new Configuration(namenodeConf);
conf.set(HTTPFS_FS_USER, user);
    return cachedFS.getFileSystem(conf);
}
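  /**
   * Releases a filesystem obtained via createFileSystem, decrementing the
   * use count of its cache entry so the purger may eventually close it.
   */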
protected void closeFileSystem(FileSystem fs) throws IOException {
if (fsCache.containsKey(fs.getConf().get(HTTPFS_FS_USER))) {
fsCache.get(fs.getConf().get(HTTPFS_FS_USER)).release();
}
}
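  /**
   * Validates the namenode authority against the whitelist. An empty
   * whitelist or a "*" entry allows any namenode.
   */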
protected void validateNamenode(String namenode) throws FileSystemAccessException {
if (nameNodeWhitelist.size() > 0 && !nameNodeWhitelist.contains("*")) {
if (!nameNodeWhitelist.contains(namenode.toLowerCase())) {
throw new FileSystemAccessException(FileSystemAccessException.ERROR.H05, namenode, "not in whitelist");
}
}
}
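  /**
   * Hook for subclasses to verify namenode health before an operation is
   * executed; the default implementation is a no-op.
   */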
protected void checkNameNodeHealth(FileSystem fileSystem) throws FileSystemAccessException {
}
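  /**
   * Runs the given executor against a cached filesystem as the specified
   * proxy user, timing the execution with a Cron and always releasing the
   * filesystem when done.
   */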
@Override
public <T> T execute(String user, final Configuration conf, final FileSystemExecutor<T> executor)
throws FileSystemAccessException {
Check.notEmpty(user, "user");
Check.notNull(conf, "conf");
Check.notNull(executor, "executor");
if (!conf.getBoolean(FILE_SYSTEM_SERVICE_CREATED, false)) {
throw new FileSystemAccessException(FileSystemAccessException.ERROR.H04);
}
if (conf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY) == null ||
conf.getTrimmed(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY).length() == 0) {
throw new FileSystemAccessException(FileSystemAccessException.ERROR.H06,
CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY);
}
try {
validateNamenode(
new URI(conf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY)).
getAuthority());
UserGroupInformation ugi = getUGI(user);
return ugi.doAs(new PrivilegedExceptionAction<T>() {
@Override
public T run() throws Exception {
FileSystem fs = createFileSystem(conf);
Instrumentation instrumentation = getServer().get(Instrumentation.class);
Instrumentation.Cron cron = instrumentation.createCron();
try {
checkNameNodeHealth(fs);
cron.start();
return executor.execute(fs);
} finally {
cron.stop();
instrumentation.addCron(INSTRUMENTATION_GROUP, executor.getClass().getSimpleName(), cron);
closeFileSystem(fs);
}
}
});
} catch (FileSystemAccessException ex) {
throw ex;
} catch (Exception ex) {
throw new FileSystemAccessException(FileSystemAccessException.ERROR.H03, ex);
}
}
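  /**
   * Creates (or fetches from the per-user cache) a filesystem for the
   * given user without touching the unmanaged filesystem counter.
   */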
public FileSystem createFileSystemInternal(String user, final Configuration conf)
throws IOException, FileSystemAccessException {
Check.notEmpty(user, "user");
Check.notNull(conf, "conf");
if (!conf.getBoolean(FILE_SYSTEM_SERVICE_CREATED, false)) {
throw new FileSystemAccessException(FileSystemAccessException.ERROR.H04);
}
try {
validateNamenode(
new URI(conf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY)).getAuthority());
UserGroupInformation ugi = getUGI(user);
return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
@Override
public FileSystem run() throws Exception {
return createFileSystem(conf);
}
});
} catch (IOException ex) {
throw ex;
} catch (FileSystemAccessException ex) {
throw ex;
} catch (Exception ex) {
throw new FileSystemAccessException(FileSystemAccessException.ERROR.H08, ex.getMessage(), ex);
}
}
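  /**
   * Returns a filesystem the caller manages directly; it is tracked only
   * by the unmanaged counter and must be returned via releaseFileSystem.
   */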
@Override
public FileSystem createFileSystem(String user, final Configuration conf) throws IOException,
FileSystemAccessException {
unmanagedFileSystems.incrementAndGet();
return createFileSystemInternal(user, conf);
}
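  /**
   * Releases a filesystem obtained via createFileSystem(String,
   * Configuration), decrementing the unmanaged counter.
   */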
@Override
public void releaseFileSystem(FileSystem fs) throws IOException {
unmanagedFileSystems.decrementAndGet();
closeFileSystem(fs);
}
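  /**
   * Returns a copy of the service Hadoop configuration, stamped so that
   * execute and createFileSystem accept it as service-created.
   */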
@Override
public Configuration getFileSystemConfiguration() {
Configuration conf = new Configuration(true);
ConfigurationUtils.copy(serviceHadoopConf, conf);
conf.setBoolean(FILE_SYSTEM_SERVICE_CREATED, true);
return conf;
}
}