/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.nar.hadoop;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.hadoop.SecurityUtil;
import org.apache.nifi.nar.NarProvider;
import org.apache.nifi.nar.NarProviderInitializationContext;
import org.apache.nifi.nar.hadoop.util.ExtensionFilter;
import org.apache.nifi.processors.hadoop.ExtendedConfiguration;
import org.apache.nifi.processors.hadoop.HdfsResources;
import org.apache.nifi.security.krb.KerberosKeytabUser;
import org.apache.nifi.security.krb.KerberosPasswordUser;
import org.apache.nifi.security.krb.KerberosUser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.net.SocketFactory;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
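
/**
 * A {@link NarProvider} implementation that lists and fetches NAR files stored in HDFS, so that
 * extensions can be loaded from a Hadoop cluster. Configuration resources, the source directory,
 * an optional storage location (overriding fs.defaultFS) and optional Kerberos credentials
 * (principal plus keytab or password) are supplied through the
 * {@link NarProviderInitializationContext} properties.
 *
 * <p>An illustrative nifi.properties configuration (a sketch only; the "hdfs" provider name and
 * the "nifi.nar.library.provider" prefix are assumptions based on NiFi's provider-property
 * convention, not something defined in this file):</p>
 *
 * <pre>
 * nifi.nar.library.provider.hdfs.implementation=org.apache.nifi.nar.hadoop.HDFSNarProvider
 * nifi.nar.library.provider.hdfs.resources=/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml
 * nifi.nar.library.provider.hdfs.source.directory=/nifi/nars
 * </pre>
 */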
@RequiresInstanceClassLoading(cloneAncestorResources = true)
public class HDFSNarProvider implements NarProvider {
private static final Logger LOGGER = LoggerFactory.getLogger(HDFSNarProvider.class);
private static final String RESOURCES_PARAMETER = "resources";
private static final String SOURCE_DIRECTORY_PARAMETER = "source.directory";
private static final String STORAGE_LOCATION = "storage.location";
private static final String KERBEROS_PRINCIPAL_PARAMETER = "kerberos.principal";
private static final String KERBEROS_KEYTAB_PARAMETER = "kerberos.keytab";
private static final String KERBEROS_PASSWORD_PARAMETER = "kerberos.password";
private static final String NAR_EXTENSION = "nar";
private static final String DELIMITER = "/";
private static final int BUFFER_SIZE_DEFAULT = 4096;
private static final Object RESOURCES_LOCK = new Object();
private static final String STORAGE_LOCATION_PROPERTY = "fs.defaultFS";
private volatile List<String> resources = null;
private volatile Path sourceDirectory = null;
private volatile String storageLocation = null;
private volatile NarProviderInitializationContext context;
private volatile boolean initialized = false;
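
    /**
     * Initializes the provider from the context properties: a comma-separated list of Hadoop
     * configuration resources (required), the source directory holding the NARs (required), and
     * an optional storage location that overrides fs.defaultFS from the configuration resources.
     */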
    @Override
    public void initialize(final NarProviderInitializationContext context) {
        resources = Arrays.stream(Objects.requireNonNull(context.getProperties().get(RESOURCES_PARAMETER)).split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toList());
if (resources.isEmpty()) {
throw new IllegalArgumentException("At least one HDFS configuration resource is necessary");
}
final String sourceDirectory = context.getProperties().get(SOURCE_DIRECTORY_PARAMETER);
if (sourceDirectory == null || sourceDirectory.isEmpty()) {
throw new IllegalArgumentException("Provider needs the source directory to be set");
}
this.sourceDirectory = new Path(sourceDirectory);
this.storageLocation = context.getProperties().get(STORAGE_LOCATION);
this.context = context;
this.initialized = true;
}
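
    /**
     * Lists the names of the ".nar" files found directly within the source directory, executing
     * the HDFS calls as the configured user.
     */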
@Override
public Collection<String> listNars() throws IOException {
        if (!initialized) {
            throw new IllegalStateException("Provider is not initialized");
        }
final HdfsResources hdfsResources = getHdfsResources();
try {
final FileStatus[] fileStatuses = hdfsResources.getUserGroupInformation()
.doAs((PrivilegedExceptionAction<FileStatus[]>) () -> hdfsResources.getFileSystem().listStatus(sourceDirectory, new ExtensionFilter(NAR_EXTENSION)));
            final List<String> result = Arrays.stream(fileStatuses)
                    .filter(FileStatus::isFile)
                    .map(fileStatus -> fileStatus.getPath().getName())
                    .collect(Collectors.toList());
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("The following NARs were found: {}", String.join(", ", result));
            }
return result;
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Provider cannot list NARs", e);
}
}
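
    /**
     * Opens a stream to the given NAR file, where {@code location} is a file name relative to
     * the source directory. Fails with an {@link IOException} when the file does not exist.
     */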
@Override
public InputStream fetchNarContents(final String location) throws IOException {
        if (!initialized) {
            throw new IllegalStateException("Provider is not initialized");
        }
final Path path = getNarLocation(location);
final HdfsResources hdfsResources = getHdfsResources();
try {
if (hdfsResources.getUserGroupInformation().doAs((PrivilegedExceptionAction<Boolean>) () -> !hdfsResources.getFileSystem().exists(path))) {
throw new IOException("Provider cannot find " + location);
}
return hdfsResources.getUserGroupInformation()
.doAs((PrivilegedExceptionAction<FSDataInputStream>) () -> hdfsResources.getFileSystem().open(path, BUFFER_SIZE_DEFAULT));
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Error during acquiring file", e);
}
}
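
    /**
     * Resolves a NAR file name against the source directory, appending the path delimiter when needed.
     */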
private Path getNarLocation(final String location) {
String result = sourceDirectory.toString();
if (!result.endsWith(DELIMITER)) {
result += DELIMITER;
}
return new Path(result + location);
}
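
    /**
     * Assembles the Hadoop configuration, verifies that the NameNode is reachable, disables
     * FileSystem caching for the target scheme, and logs in either with Kerberos (keytab or
     * password based) or with simple authentication before acquiring the FileSystem.
     */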
private HdfsResources getHdfsResources() throws IOException {
final Configuration config = new ExtendedConfiguration(LOGGER);
config.setClassLoader(Thread.currentThread().getContextClassLoader());
for (final String resource : resources) {
config.addResource(new Path(resource));
}
        // If the storage location property is set, it overrides fs.defaultFS from the configuration resources
if (storageLocation != null) {
config.set(STORAGE_LOCATION_PROPERTY, storageLocation);
}
        // First check for a timeout on the HDFS connection, because FileSystem has a hard-coded 15-minute timeout
checkHdfsUriForTimeout(config);
// disable caching of Configuration and FileSystem objects, else we cannot reconfigure the processor without a complete restart
final String disableCacheName = String.format("fs.%s.impl.disable.cache", FileSystem.getDefaultUri(config).getScheme());
config.set(disableCacheName, "true");
        // If Kerberos is enabled, create the file system as the Kerberos principal
        // -- use RESOURCES_LOCK to guarantee UserGroupInformation is accessed by only a single thread at a time
FileSystem fs;
UserGroupInformation ugi;
KerberosUser kerberosUser;
synchronized (RESOURCES_LOCK) {
if (SecurityUtil.isSecurityEnabled(config)) {
final String principal = context.getProperties().get(KERBEROS_PRINCIPAL_PARAMETER);
final String keyTab = context.getProperties().get(KERBEROS_KEYTAB_PARAMETER);
final String password = context.getProperties().get(KERBEROS_PASSWORD_PARAMETER);
if (keyTab != null) {
kerberosUser = new KerberosKeytabUser(principal, keyTab);
} else if (password != null) {
kerberosUser = new KerberosPasswordUser(principal, password);
} else {
throw new IOException("Unable to authenticate with Kerberos, no keytab or password was provided");
}
ugi = SecurityUtil.getUgiForKerberosUser(config, kerberosUser);
} else {
config.set("ipc.client.fallback-to-simple-auth-allowed", "true");
config.set("hadoop.security.authentication", "simple");
ugi = SecurityUtil.loginSimple(config);
kerberosUser = null;
}
fs = getFileSystemAsUser(config, ugi);
}
LOGGER.debug("resetHDFSResources UGI [{}], KerberosUser [{}]", new Object[]{ugi, kerberosUser});
final Path workingDir = fs.getWorkingDirectory();
LOGGER.debug("Initialized a new HDFS File System with working dir: {} default block size: {} default replication: {} config: {}",
new Object[]{workingDir, fs.getDefaultBlockSize(workingDir), fs.getDefaultReplication(workingDir), config.toString()});
if (!fs.exists(sourceDirectory)) {
throw new IllegalArgumentException("Source directory is not existing");
}
return new HdfsResources(config, fs, ugi, kerberosUser);
}
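
    /**
     * Fails fast when the NameNode is not reachable: attempts a raw socket connection with a
     * one-second timeout instead of relying on FileSystem's much longer hard-coded timeout.
     */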
private void checkHdfsUriForTimeout(final Configuration config) throws IOException {
final URI hdfsUri = FileSystem.getDefaultUri(config);
final String address = hdfsUri.getAuthority();
final int port = hdfsUri.getPort();
if (address == null || address.isEmpty() || port < 0) {
return;
}
final InetSocketAddress namenode = NetUtils.createSocketAddr(address, port);
final SocketFactory socketFactory = NetUtils.getDefaultSocketFactory(config);
Socket socket = null;
try {
socket = socketFactory.createSocket();
NetUtils.connect(socket, namenode, 1000); // 1 second timeout
} finally {
IOUtils.closeQuietly(socket);
}
}
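
    /**
     * Acquires the FileSystem within the security context of the given user.
     */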
    private FileSystem getFileSystemAsUser(final Configuration config, final UserGroupInformation ugi) throws IOException {
        try {
            return ugi.doAs((PrivilegedExceptionAction<FileSystem>) () -> FileSystem.get(config));
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Unable to create file system: " + e.getMessage(), e);
        }
    }
}