/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.core;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.LockFactory;
import org.apache.lucene.store.NRTCachingDirectory;
import org.apache.lucene.store.NoLockFactory;
import org.apache.lucene.store.SingleInstanceLockFactory;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.metrics.SolrMetricProducer;
import org.apache.solr.metrics.SolrMetricsContext;
import org.apache.solr.store.blockcache.BlockCache;
import org.apache.solr.store.blockcache.BlockDirectory;
import org.apache.solr.store.blockcache.BlockDirectoryCache;
import org.apache.solr.store.blockcache.BufferStore;
import org.apache.solr.store.blockcache.Cache;
import org.apache.solr.store.blockcache.Metrics;
import org.apache.solr.store.hdfs.HdfsDirectory;
import org.apache.solr.store.hdfs.HdfsLocalityReporter;
import org.apache.solr.store.hdfs.HdfsLockFactory;
import org.apache.solr.util.HdfsUtil;
import org.apache.solr.util.plugin.SolrCoreAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
/**
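 * A {@link CachingDirectoryFactory} that stores index data in HDFS. Each directory is backed by an
 * {@link HdfsDirectory}, optionally wrapped in a {@link BlockDirectory} for block-level read caching
 * and in an {@link NRTCachingDirectory} that keeps small, newly flushed files in memory. Settings are
 * read from the factory's init args in solrconfig.xml, with JVM system properties as a fallback.
 *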
* @deprecated since 8.6; see <a href="https://cwiki.apache.org/confluence/display/SOLR/Deprecations">Deprecations</a>
*/
public class HdfsDirectoryFactory extends CachingDirectoryFactory implements SolrCoreAware, SolrMetricProducer {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final String BLOCKCACHE_SLAB_COUNT = "solr.hdfs.blockcache.slab.count";
public static final String BLOCKCACHE_DIRECT_MEMORY_ALLOCATION = "solr.hdfs.blockcache.direct.memory.allocation";
public static final String BLOCKCACHE_ENABLED = "solr.hdfs.blockcache.enabled";
public static final String BLOCKCACHE_GLOBAL = "solr.hdfs.blockcache.global";
public static final String BLOCKCACHE_READ_ENABLED = "solr.hdfs.blockcache.read.enabled";
public static final String BLOCKCACHE_WRITE_ENABLED = "solr.hdfs.blockcache.write.enabled"; // currently buggy and disabled
public static final String NRTCACHINGDIRECTORY_ENABLE = "solr.hdfs.nrtcachingdirectory.enable";
public static final String NRTCACHINGDIRECTORY_MAXMERGESIZEMB = "solr.hdfs.nrtcachingdirectory.maxmergesizemb";
public static final String NRTCACHINGDIRECTORY_MAXCACHEMB = "solr.hdfs.nrtcachingdirectory.maxcachedmb";
public static final String NUMBEROFBLOCKSPERBANK = "solr.hdfs.blockcache.blocksperbank";
public static final String LOCALITYMETRICS_ENABLED = "solr.hdfs.locality.metrics.enabled";
public static final String KERBEROS_ENABLED = "solr.hdfs.security.kerberos.enabled";
public static final String KERBEROS_KEYTAB = "solr.hdfs.security.kerberos.keytabfile";
public static final String KERBEROS_PRINCIPAL = "solr.hdfs.security.kerberos.principal";
public static final String HDFS_HOME = "solr.hdfs.home";
public static final String CONFIG_DIRECTORY = "solr.hdfs.confdir";
public static final String CACHE_MERGES = "solr.hdfs.blockcache.cachemerges";
public static final String CACHE_READONCE = "solr.hdfs.blockcache.cachereadonce";
private SolrParams params;
private String hdfsDataDir;
private String confDir;
private boolean cacheReadOnce;
private boolean cacheMerges;
private static BlockCache globalBlockCache;
public static Metrics metrics;
private static Boolean kerberosInit;
// we use this cache for FileSystem instances when we don't have access to a long lived instance
private com.google.common.cache.Cache<String,FileSystem> tmpFsCache = CacheBuilder.newBuilder()
.concurrencyLevel(10)
.maximumSize(1000)
.expireAfterAccess(5, TimeUnit.MINUTES).removalListener(new RemovalListener<String,FileSystem>() {
@Override
public void onRemoval(RemovalNotification<String,FileSystem> rn) {
IOUtils.closeQuietly(rn.getValue());
}
})
.build();
private final static class MetricsHolder {
// [JCIP SE, Goetz, 16.6] Lazy initialization
// Won't load until MetricsHolder is referenced
public static final Metrics metrics = new Metrics();
}
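  /**
   * Closes this factory: the superclass resources, every cached {@link FileSystem} instance,
   * the block cache metrics, and the HDFS locality reporter.
   */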
@Override
public void close() throws IOException {
super.close();
Collection<FileSystem> values = tmpFsCache.asMap().values();
for (FileSystem fs : values) {
IOUtils.closeQuietly(fs);
}
tmpFsCache.invalidateAll();
tmpFsCache.cleanUp();
try {
SolrMetricProducer.super.close();
MetricsHolder.metrics.close();
LocalityHolder.reporter.close();
} catch (Exception e) {
throw new IOException(e);
}
}
private final static class LocalityHolder {
public static final HdfsLocalityReporter reporter = new HdfsLocalityReporter();
}
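  /**
   * Reads the HDFS settings (data home, cache flags, Kerberos options) from the init args,
   * falling back to system properties, and performs the Kerberos login when enabled.
   */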
@Override
public void init(@SuppressWarnings("rawtypes") NamedList args) {
super.init(args);
params = args.toSolrParams();
this.hdfsDataDir = getConfig(HDFS_HOME, null);
if (this.hdfsDataDir != null && this.hdfsDataDir.length() == 0) {
this.hdfsDataDir = null;
} else {
log.info("{}={}", HDFS_HOME, this.hdfsDataDir);
}
cacheMerges = getConfig(CACHE_MERGES, false);
cacheReadOnce = getConfig(CACHE_READONCE, false);
boolean kerberosEnabled = getConfig(KERBEROS_ENABLED, false);
if (log.isInfoEnabled()) {
log.info("Solr Kerberos Authentication {}", (kerberosEnabled ? "enabled" : "disabled"));
}
if (kerberosEnabled) {
initKerberos();
}
}
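  /**
   * Maps the configured lockType to a lock factory: "hdfs" (also the default when unset),
   * "single", or "none".
   */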
@Override
protected LockFactory createLockFactory(String rawLockType) throws IOException {
if (null == rawLockType) {
rawLockType = DirectoryFactory.LOCK_TYPE_HDFS;
log.warn("No lockType configured, assuming '{}'.", rawLockType);
}
final String lockType = rawLockType.toLowerCase(Locale.ROOT).trim();
switch (lockType) {
case DirectoryFactory.LOCK_TYPE_HDFS:
return HdfsLockFactory.INSTANCE;
case DirectoryFactory.LOCK_TYPE_SINGLE:
return new SingleInstanceLockFactory();
case DirectoryFactory.LOCK_TYPE_NONE:
return NoLockFactory.INSTANCE;
default:
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Unrecognized lockType: " + rawLockType);
}
}
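  /**
   * Builds the directory chain for a core path: an {@link HdfsDirectory}, wrapped in a
   * {@link BlockDirectory} when the block cache is enabled (and the context is not metadata),
   * and finally in an {@link NRTCachingDirectory} when NRT caching is enabled.
   */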
@Override
@SuppressWarnings("resource")
protected Directory create(String path, LockFactory lockFactory, DirContext dirContext) throws IOException {
assert params != null : "init must be called before create";
log.info("creating directory factory for path {}", path);
Configuration conf = getConf(new Path(path));
if (metrics == null) {
metrics = MetricsHolder.metrics;
}
boolean blockCacheEnabled = getConfig(BLOCKCACHE_ENABLED, true);
boolean blockCacheGlobal = getConfig(BLOCKCACHE_GLOBAL, true);
boolean blockCacheReadEnabled = getConfig(BLOCKCACHE_READ_ENABLED, true);
final HdfsDirectory hdfsDir;
final Directory dir;
if (blockCacheEnabled && dirContext != DirContext.META_DATA) {
int numberOfBlocksPerBank = getConfig(NUMBEROFBLOCKSPERBANK, 16384);
int blockSize = BlockDirectory.BLOCK_SIZE;
int bankCount = getConfig(BLOCKCACHE_SLAB_COUNT, 1);
boolean directAllocation = getConfig(BLOCKCACHE_DIRECT_MEMORY_ALLOCATION, true);
int slabSize = numberOfBlocksPerBank * blockSize;
log.info(
"Number of slabs of block cache [{}] with direct memory allocation set to [{}]",
bankCount, directAllocation);
if (log.isInfoEnabled()) {
log.info(
"Block cache target memory usage, slab size of [{}] will allocate [{}] slabs and use ~[{}] bytes",
new Object[]{slabSize, bankCount,
((long) bankCount * (long) slabSize)});
}
int bsBufferSize = params.getInt("solr.hdfs.blockcache.bufferstore.buffersize", blockSize);
int bsBufferCount = params.getInt("solr.hdfs.blockcache.bufferstore.buffercount", 0); // this is actually total size
BlockCache blockCache = getBlockDirectoryCache(numberOfBlocksPerBank,
blockSize, bankCount, directAllocation, slabSize,
bsBufferSize, bsBufferCount, blockCacheGlobal);
Cache cache = new BlockDirectoryCache(blockCache, path, metrics, blockCacheGlobal);
int readBufferSize = params.getInt("solr.hdfs.blockcache.read.buffersize", blockSize);
hdfsDir = new HdfsDirectory(new Path(path), lockFactory, conf, readBufferSize);
dir = new BlockDirectory(path, hdfsDir, cache, null, blockCacheReadEnabled, false, cacheMerges, cacheReadOnce);
} else {
hdfsDir = new HdfsDirectory(new Path(path), conf);
dir = hdfsDir;
}
if (params.getBool(LOCALITYMETRICS_ENABLED, false)) {
LocalityHolder.reporter.registerDirectory(hdfsDir);
}
boolean nrtCachingDirectory = getConfig(NRTCACHINGDIRECTORY_ENABLE, true);
if (nrtCachingDirectory) {
double nrtCacheMaxMergeSizeMB = getConfig(NRTCACHINGDIRECTORY_MAXMERGESIZEMB, 16);
double nrtCacheMaxCacheMB = getConfig(NRTCACHINGDIRECTORY_MAXCACHEMB, 192);
return new NRTCachingDirectory(dir, nrtCacheMaxMergeSizeMB, nrtCacheMaxCacheMB);
}
return dir;
}
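  // Config lookup helpers: a value from the factory's init args (solrconfig.xml) wins,
  // then the JVM system property of the same name, then the supplied default.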
boolean getConfig(String name, boolean defaultValue) {
Boolean value = params.getBool(name);
if (value == null) {
String sysValue = System.getProperty(name);
if (sysValue != null) {
value = Boolean.valueOf(sysValue);
}
}
return value == null ? defaultValue : value;
}
int getConfig(String name, int defaultValue) {
Integer value = params.getInt(name);
if (value == null) {
String sysValue = System.getProperty(name);
if (sysValue != null) {
value = Integer.parseInt(sysValue);
}
}
return value == null ? defaultValue : value;
}
String getConfig(String name, String defaultValue) {
String value = params.get(name);
if (value == null) {
value = System.getProperty(name);
}
return value == null ? defaultValue : value;
}
private BlockCache getBlockDirectoryCache(int numberOfBlocksPerBank, int blockSize, int bankCount,
boolean directAllocation, int slabSize, int bufferSize, int bufferCount, boolean staticBlockCache) {
if (!staticBlockCache) {
log.info("Creating new single instance HDFS BlockCache");
return createBlockCache(numberOfBlocksPerBank, blockSize, bankCount, directAllocation, slabSize, bufferSize, bufferCount);
}
synchronized (HdfsDirectoryFactory.class) {
if (globalBlockCache == null) {
log.info("Creating new global HDFS BlockCache");
globalBlockCache = createBlockCache(numberOfBlocksPerBank, blockSize, bankCount,
directAllocation, slabSize, bufferSize, bufferCount);
}
}
return globalBlockCache;
}
private BlockCache createBlockCache(int numberOfBlocksPerBank, int blockSize,
int bankCount, boolean directAllocation, int slabSize, int bufferSize,
int bufferCount) {
BufferStore.initNewBuffer(bufferSize, bufferCount, metrics);
long totalMemory = (long) bankCount * (long) numberOfBlocksPerBank
* (long) blockSize;
BlockCache blockCache;
try {
blockCache = new BlockCache(metrics, directAllocation, totalMemory, slabSize, blockSize);
} catch (OutOfMemoryError e) {
throw new RuntimeException(
"The max direct memory is likely too low. Either increase it (by adding -XX:MaxDirectMemorySize=<size>g -XX:+UseLargePages to your containers startup args)"
+ " or disable direct allocation using solr.hdfs.blockcache.direct.memory.allocation=false in solrconfig.xml. If you are putting the block cache on the heap,"
+ " your java heap size might not be large enough."
+ " Failed allocating ~" + totalMemory / 1000000.0 + " MB.",
e);
}
return blockCache;
}
@Override
public boolean exists(String path) {
final Path hdfsDirPath = new Path(path);
FileSystem fileSystem = getCachedFileSystem(path);
try {
return fileSystem.exists(hdfsDirPath);
} catch (IOException e) {
log.error("Error checking if hdfs path exists", e);
throw new RuntimeException("Error checking if hdfs path exists", e);
}
}
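  /**
   * Builds a Hadoop {@link Configuration}, adding any resources found under solr.hdfs.confdir
   * and disabling Hadoop's own FileSystem cache for the path's scheme so this factory can
   * manage FileSystem lifecycles itself.
   */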
public Configuration getConf(Path path) {
Configuration conf = new Configuration();
confDir = getConfig(CONFIG_DIRECTORY, null);
HdfsUtil.addHdfsResources(conf, confDir);
if(path != null) {
String fsScheme = path.toUri().getScheme();
if(fsScheme != null) {
conf.setBoolean("fs." + fsScheme + ".impl.disable.cache", true);
}
}
return conf;
}
protected synchronized void removeDirectory(final CacheValue cacheValue) {
FileSystem fileSystem = getCachedFileSystem(cacheValue.path);
try {
boolean success = fileSystem.delete(new Path(cacheValue.path), true);
if (!success) {
throw new RuntimeException("Could not remove directory");
}
} catch (Exception e) {
log.error("Could not remove directory", e);
throw new SolrException(ErrorCode.SERVER_ERROR,
"Could not remove directory", e);
}
}
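  // Note: a path with a leading '/' is not reported as absolute here; only scheme-qualified
  // Hadoop paths (e.g. hdfs://...) count as absolute.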
@Override
public boolean isAbsolute(String path) {
if(path.startsWith("/")) {
return false;
}
return new Path(path).isAbsolute();
}
@Override
public boolean isPersistent() {
return true;
}
@Override
public boolean isSharedStorage() {
return true;
}
@Override
public boolean searchersReserveCommitPoints() {
return true;
}
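  /**
   * Computes the HDFS data home for a core under solr.hdfs.home: cloud cores use the
   * URL-encoded collection name and core node name plus the dataDir, non-cloud cores use the
   * core name plus the dataDir.
   */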
@Override
public String getDataHome(CoreDescriptor cd) throws IOException {
if (hdfsDataDir == null) {
throw new SolrException(ErrorCode.SERVER_ERROR, "You must set the "
+ this.getClass().getSimpleName() + " param " + HDFS_HOME
+ " for relative dataDir paths to work");
}
// by default, we go off the instance directory
String path;
if (cd.getCloudDescriptor() != null) {
path = URLEncoder.encode(cd.getCloudDescriptor().getCollectionName(),
"UTF-8")
+ "/"
+ URLEncoder.encode(cd.getCloudDescriptor().getCoreNodeName(),
"UTF-8");
} else {
path = cd.getName();
}
return normalize(SolrPaths.normalizeDir(ZkController
.trimLeadingAndTrailingSlashes(hdfsDataDir)
+ "/"
+ path
+ "/"
+ cd.getDataDir()));
}
/**
* @param directory to calculate size of
* @return size in bytes
* @throws IOException on low level IO error
*/
@Override
public long size(Directory directory) throws IOException {
String hdfsDirPath = getPath(directory);
return size(hdfsDirPath);
}
/**
* @param path to calculate size of
* @return size in bytes
* @throws IOException on low level IO error
*/
@Override
public long size(String path) throws IOException {
Path hdfsDirPath = new Path(path);
    // the FileSystem is owned by tmpFsCache, which closes it on removal, so it is not closed here
    FileSystem fileSystem = getCachedFileSystem(path);
    try {
      return fileSystem.getContentSummary(hdfsDirPath).getLength();
    } catch (IOException e) {
      log.error("Error getting the size of hdfs path {}", path, e);
      throw new SolrException(ErrorCode.SERVER_ERROR, "Error getting the size of hdfs path", e);
    }
}
private FileSystem getCachedFileSystem(String pathStr) {
try {
// no need to close the fs, the cache will do it
Path path = new Path(pathStr);
return tmpFsCache.get(pathStr, () -> FileSystem.get(path.toUri(), getConf(path)));
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}
public String getConfDir() {
return confDir;
}
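  /**
   * Performs a one-time, JVM-wide Kerberos keytab login via {@link UserGroupInformation},
   * using the configured keytab file and principal.
   */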
private void initKerberos() {
String keytabFile = getConfig(KERBEROS_KEYTAB, "").trim();
if (keytabFile.length() == 0) {
throw new IllegalArgumentException(KERBEROS_KEYTAB + " required because "
+ KERBEROS_ENABLED + " set to true");
}
String principal = getConfig(KERBEROS_PRINCIPAL, "");
if (principal.length() == 0) {
throw new IllegalArgumentException(KERBEROS_PRINCIPAL
+ " required because " + KERBEROS_ENABLED + " set to true");
}
synchronized (HdfsDirectoryFactory.class) {
if (kerberosInit == null) {
kerberosInit = Boolean.TRUE;
final Configuration conf = getConf(null);
final String authVal = conf.get(HADOOP_SECURITY_AUTHENTICATION);
final String kerberos = "kerberos";
if (authVal != null && !authVal.equals(kerberos)) {
throw new IllegalArgumentException(HADOOP_SECURITY_AUTHENTICATION
+ " set to: " + authVal + ", not kerberos, but attempting to "
+ " connect to HDFS via kerberos");
}
// let's avoid modifying the supplied configuration, just to be conservative
final Configuration ugiConf = new Configuration(getConf(null));
ugiConf.set(HADOOP_SECURITY_AUTHENTICATION, kerberos);
UserGroupInformation.setConfiguration(ugiConf);
log.info(
"Attempting to acquire kerberos ticket with keytab: {}, principal: {} ",
keytabFile, principal);
try {
UserGroupInformation.loginUserFromKeytab(principal, keytabFile);
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
log.info("Got Kerberos ticket");
}
}
}
@Override
public void initializeMetrics(SolrMetricsContext parentContext, String scope) {
MetricsHolder.metrics.initializeMetrics(parentContext, scope);
LocalityHolder.reporter.initializeMetrics(parentContext, scope);
}
@Override
public void inform(SolrCore core) {
setHost(core.getCoreContainer().getHostName());
}
@VisibleForTesting
void setHost(String hostname) {
LocalityHolder.reporter.setHost(hostname);
}
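  /**
   * Deletes old index directories (the plain "index" dir or timestamped index dirs) found under
   * the core's data dir, skipping the current index directory, any directory still referenced
   * by the cache, and, after a reload, the most recent old directory.
   */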
@Override
public void cleanupOldIndexDirectories(final String dataDir, final String currentIndexDir, boolean afterReload) {
// Get the FileSystem object
final Path dataDirPath = new Path(dataDir);
FileSystem fileSystem = getCachedFileSystem(dataDir);
boolean pathExists = false;
try {
pathExists = fileSystem.exists(dataDirPath);
} catch (IOException e) {
log.error("Error checking if hdfs path exists {}", dataDir, e);
}
if (!pathExists) {
log.warn("{} does not point to a valid data directory; skipping clean-up of old index directories.", dataDir);
return;
}
final Path currentIndexDirPath = new Path(currentIndexDir); // make sure we don't delete the current
final FileSystem fs = fileSystem;
FileStatus[] oldIndexDirs = null;
try {
oldIndexDirs = fileSystem.listStatus(dataDirPath, new PathFilter() {
@Override
public boolean accept(Path path) {
boolean accept = false;
String pathName = path.getName();
try {
accept = fs.getFileStatus(path).isDirectory() && !path.equals(currentIndexDirPath) &&
(pathName.equals("index") || pathName.matches(INDEX_W_TIMESTAMP_REGEX));
} catch (IOException e) {
log.error("Error checking if path {} is an old index directory, caused by: {}", path, e);
}
return accept;
}
});
} catch (FileNotFoundException fnfe) {
// already deleted - ignore
log.debug("Old index directory already deleted - skipping...", fnfe);
} catch (IOException ioExc) {
log.error("Error checking for old index directories to clean-up.", ioExc);
}
if (oldIndexDirs == null || oldIndexDirs.length == 0)
return; // nothing to clean-up
List<Path> oldIndexPaths = new ArrayList<>(oldIndexDirs.length);
for (FileStatus ofs : oldIndexDirs) {
oldIndexPaths.add(ofs.getPath());
}
Collections.sort(oldIndexPaths, Collections.reverseOrder());
Set<String> livePaths = getLivePaths();
int i = 0;
if (afterReload) {
log.info("Will not remove most recent old directory on reload {}", oldIndexDirs[0]);
i = 1;
}
log.info("Found {} old index directories to clean-up under {} afterReload={}", oldIndexDirs.length - i, dataDirPath, afterReload);
for (; i < oldIndexPaths.size(); i++) {
Path oldDirPath = oldIndexPaths.get(i);
if (livePaths.contains(oldDirPath.toString())) {
log.warn("Cannot delete directory {} because it is still being referenced in the cache.", oldDirPath);
} else {
try {
if (fileSystem.delete(oldDirPath, true)) {
log.info("Deleted old index directory {}", oldDirPath);
} else {
log.warn("Failed to delete old index directory {}", oldDirPath);
}
} catch (IOException e) {
log.error("Failed to delete old index directory {} due to: {}", oldDirPath, e);
}
}
}
}
// perform an atomic rename if possible
public void renameWithOverwrite(Directory dir, String fileName, String toName) throws IOException {
String hdfsDirPath = getPath(dir);
FileContext fileContext = FileContext.getFileContext(getConf(new Path(hdfsDirPath)));
fileContext.rename(new Path(hdfsDirPath, fileName), new Path(hdfsDirPath, toName), Options.Rename.OVERWRITE);
}
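  /**
   * Moves a file between directories; when both are HDFS-backed this is a single
   * {@link FileContext#rename}, otherwise it falls back to the superclass copy-and-delete.
   */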
@Override
public void move(Directory fromDir, Directory toDir, String fileName, IOContext ioContext) throws IOException {
Directory baseFromDir = getBaseDir(fromDir);
Directory baseToDir = getBaseDir(toDir);
if (baseFromDir instanceof HdfsDirectory && baseToDir instanceof HdfsDirectory) {
Path dir1 = ((HdfsDirectory) baseFromDir).getHdfsDirPath();
Path dir2 = ((HdfsDirectory) baseToDir).getHdfsDirPath();
Path file1 = new Path(dir1, fileName);
Path file2 = new Path(dir2, fileName);
FileContext fileContext = FileContext.getFileContext(getConf(dir1));
fileContext.rename(file1, file2);
return;
}
super.move(fromDir, toDir, fileName, ioContext);
}
}