/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.hdfs3;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;
import java.nio.charset.Charset;
import java.security.PrivilegedExceptionAction;
import java.util.Collections;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.SocketFactory;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.pulsar.io.hdfs3.sink.HdfsSinkConfig;
/**
 * A simple abstract base class for HDFS connectors.
 * Provides methods for connecting to HDFS.
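 * <p>A minimal usage sketch (the subclass, method name, and wiring below are
 * illustrative only, assuming {@code HdfsSinkConfig} extends
 * {@code AbstractHdfsConfig}; they are not part of this class):
 * <pre>{@code
 * public class MySink extends AbstractHdfsConnector {
 *     protected void connect(HdfsSinkConfig sinkConfig) throws IOException {
 *         connectorConfig = sinkConfig;                      // must be set first;
 *         hdfsResources.set(resetHDFSResources(sinkConfig)); // resetHDFSResources reads it
 *     }
 * }
 * }</pre>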
*/
public abstract class AbstractHdfsConnector {
private static final Object RESOURCES_LOCK = new Object();
// Hadoop Configuration, Filesystem, and UserGroupInformation (optional)
protected final AtomicReference<HdfsResources> hdfsResources = new AtomicReference<>();
protected AbstractHdfsConfig connectorConfig;
protected CompressionCodecFactory compressionCodecFactory;
public AbstractHdfsConnector() {
hdfsResources.set(new HdfsResources(null, null, null));
}
/*
* Reset Hadoop Configuration and FileSystem based on the supplied configuration resources.
*/
protected HdfsResources resetHDFSResources(HdfsSinkConfig hdfsSinkConfig) throws IOException {
Configuration config = new ExtendedConfiguration();
config.setClassLoader(Thread.currentThread().getContextClassLoader());
getConfig(config, connectorConfig.getHdfsConfigResources());
// First check for a timeout on the HDFS connection, because FileSystem.get() has a hard-coded
// 15-minute timeout.
checkHdfsUriForTimeout(config);
/* Disable caching of Configuration and FileSystem objects, else we cannot reconfigure
 * the connector without a complete restart.
 */
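// e.g. with a default URI of hdfs://namenode:8020 this sets fs.hdfs.impl.disable.cache=true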
String disableCacheName = String.format("fs.%s.impl.disable.cache",
FileSystem.getDefaultUri(config).getScheme());
config.set(disableCacheName, "true");
// If Kerberos is enabled, create the file system as the Kerberos principal
// -- use RESOURCES_LOCK to guarantee UserGroupInformation is accessed by only a single thread at a time
FileSystem fs;
UserGroupInformation ugi;
synchronized (RESOURCES_LOCK) {
if (SecurityUtil.isSecurityEnabled(config)) {
ugi = SecurityUtil.loginKerberos(config,
connectorConfig.getKerberosUserPrincipal(), connectorConfig.getKeytab());
fs = getFileSystemAsUser(config, ugi);
} else {
config.set("ipc.client.fallback-to-simple-auth-allowed", "true");
config.set("hadoop.security.authentication", "simple");
ugi = SecurityUtil.loginSimple(config);
fs = getFileSystemAsUser(config, ugi);
}
}
return new HdfsResources(config, fs, ugi);
}
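/*
 * Load the supplied configuration resources (a comma-separated list of paths,
 * e.g. core-site.xml and hdfs-site.xml) into the given Configuration.
 */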
private static Configuration getConfig(final Configuration config, String res) throws IOException {
boolean foundResources = false;
if (null != res) {
String[] resources = res.split(",");
for (String resource : resources) {
config.addResource(new Path(resource.trim()));
foundResources = true;
}
}
if (!foundResources) {
// check that at least 1 non-default resource is available on the classpath
String configStr = config.toString();
for (String resource : configStr.substring(configStr.indexOf(":") + 1).split(",")) {
if (!resource.contains("default") && config.getResource(resource.trim()) != null) {
foundResources = true;
break;
}
}
}
if (!foundResources) {
throw new IOException("Could not find any of the " + res + " on the classpath");
}
return config;
}
/*
 * Fail fast by probing the NameNode with a one-second connect attempt,
 * rather than waiting out the much longer default timeout inside FileSystem.get().
 */
protected void checkHdfsUriForTimeout(Configuration config) throws IOException {
URI hdfsUri = FileSystem.getDefaultUri(config);
String address = hdfsUri.getAuthority();
int port = hdfsUri.getPort();
if (address == null || address.isEmpty() || port < 0) {
return;
}
InetSocketAddress namenode = NetUtils.createSocketAddr(address, port);
SocketFactory socketFactory = NetUtils.getDefaultSocketFactory(config);
Socket socket = null;
try {
socket = socketFactory.createSocket();
NetUtils.connect(socket, namenode, 1000); // 1 second timeout
} finally {
IOUtils.closeQuietly(socket);
}
}
/**
 * This exists to allow unit tests to override it so that they don't take several
 * minutes waiting for UDP packets to be received.
*
* @param config
* the configuration to use
* @return the FileSystem that is created for the given Configuration
* @throws IOException
* if unable to create the FileSystem
*/
protected FileSystem getFileSystem(final Configuration config) throws IOException {
return FileSystem.get(config);
}
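/*
 * Create the FileSystem as the given user: FileSystem.get() is wrapped in a
 * privileged action so it runs with that user's (e.g. Kerberos) credentials.
 */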
protected FileSystem getFileSystemAsUser(final Configuration config, UserGroupInformation ugi) throws IOException {
try {
return ugi.doAs((PrivilegedExceptionAction<FileSystem>) () -> FileSystem.get(config));
} catch (InterruptedException e) {
// Restore the interrupt status and preserve the original exception as the cause.
Thread.currentThread().interrupt();
throw new IOException("Unable to create file system: " + e.getMessage(), e);
}
}
protected Configuration getConfiguration() {
return hdfsResources.get().getConfiguration();
}
protected FileSystem getFileSystem() {
return hdfsResources.get().getFileSystem();
}
protected UserGroupInformation getUserGroupInformation() {
return hdfsResources.get().getUserGroupInformation();
}
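/*
 * Return the configured character encoding, or the JVM default charset when none is set.
 */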
protected String getEncoding() {
return StringUtils.isNotBlank(connectorConfig.getEncoding())
? connectorConfig.getEncoding() : Charset.defaultCharset().name();
}
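/*
 * Resolve the configured compression codec by name; fall back to Hadoop's
 * DefaultCodec when the configured name is not recognized, and return null
 * when compression is not configured at all.
 */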
protected CompressionCodec getCompressionCodec() {
if (connectorConfig.getCompression() == null) {
return null;
}
CompressionCodec codec = getCompressionCodecFactory()
.getCodecByName(connectorConfig.getCompression().name());
return (codec != null) ? codec : new DefaultCodec();
}
protected CompressionCodecFactory getCompressionCodecFactory() {
if (compressionCodecFactory == null) {
compressionCodecFactory = new CompressionCodecFactory(getConfiguration());
}
return compressionCodecFactory;
}
/**
 * Extends Hadoop Configuration to prevent it from caching classes that can't be found. Since users may be
 * adding additional JARs to the classpath, we don't want them to have to restart the JVM to be able to load
 * something that was previously not found but might now be available.
 * Based on the original getClassByNameOrNull from Configuration.
*/
static class ExtendedConfiguration extends Configuration {
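// Per-ClassLoader cache of resolved classes. The WeakHashMap keys and WeakReference
// values allow class loaders and classes to be garbage collected once unloaded.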
private final Map<ClassLoader, Map<String, WeakReference<Class<?>>>> cacheClasses = new WeakHashMap<>();
@Override
public Class<?> getClassByNameOrNull(String name) {
final ClassLoader classLoader = getClassLoader();
Map<String, WeakReference<Class<?>>> map;
synchronized (cacheClasses) {
map = cacheClasses.get(classLoader);
if (map == null) {
map = Collections.synchronizedMap(new WeakHashMap<>());
cacheClasses.put(classLoader, map);
}
}
Class<?> clazz = null;
WeakReference<Class<?>> ref = map.get(name);
if (ref != null) {
clazz = ref.get();
}
if (clazz == null) {
try {
clazz = Class.forName(name, true, classLoader);
} catch (ClassNotFoundException | NoClassDefFoundError e) {
return null;
}
// two putters can race here, but they'll put the same class
map.put(name, new WeakReference<>(clazz));
return clazz;
} else {
// cache hit
return clazz;
}
}
}
}