/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.solr.core;

import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.NRTCachingDirectory;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.store.blockcache.BlockCache;
import org.apache.solr.store.blockcache.BlockDirectory;
import org.apache.solr.store.blockcache.BlockDirectoryCache;
import org.apache.solr.store.blockcache.BufferStore;
import org.apache.solr.store.blockcache.Cache;
import org.apache.solr.store.blockcache.Metrics;
import org.apache.solr.store.hdfs.HdfsDirectory;
import org.apache.solr.util.HdfsUtil;
import org.apache.solr.util.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
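
/**
 * A {@link CachingDirectoryFactory} implementation that stores index data in
 * HDFS, optionally fronted by an off-heap block cache and an
 * {@link NRTCachingDirectory}.
 *
 * An illustrative solrconfig.xml entry (the boolean/int values shown are the
 * defaults this class falls back to; the HDFS home URI is only an example):
 *
 * <pre>
 * &lt;directoryFactory name="DirectoryFactory" class="solr.HdfsDirectoryFactory"&gt;
 *   &lt;str name="solr.hdfs.home"&gt;hdfs://namenode:8020/solr&lt;/str&gt;
 *   &lt;bool name="solr.hdfs.blockcache.enabled"&gt;true&lt;/bool&gt;
 *   &lt;int name="solr.hdfs.blockcache.slab.count"&gt;1&lt;/int&gt;
 *   &lt;bool name="solr.hdfs.nrtcachingdirectory.enable"&gt;true&lt;/bool&gt;
 * &lt;/directoryFactory&gt;
 * </pre>
 */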
public class HdfsDirectoryFactory extends CachingDirectoryFactory {
  public static Logger LOG = LoggerFactory
      .getLogger(HdfsDirectoryFactory.class);

  public static final String BLOCKCACHE_SLAB_COUNT = "solr.hdfs.blockcache.slab.count";
  public static final String BLOCKCACHE_DIRECT_MEMORY_ALLOCATION = "solr.hdfs.blockcache.direct.memory.allocation";
  public static final String BLOCKCACHE_ENABLED = "solr.hdfs.blockcache.enabled";
  public static final String BLOCKCACHE_READ_ENABLED = "solr.hdfs.blockcache.read.enabled";
  public static final String BLOCKCACHE_WRITE_ENABLED = "solr.hdfs.blockcache.write.enabled";

  public static final String NRTCACHINGDIRECTORY_ENABLE = "solr.hdfs.nrtcachingdirectory.enable";
  public static final String NRTCACHINGDIRECTORY_MAXMERGESIZEMB = "solr.hdfs.nrtcachingdirectory.maxmergesizemb";
  public static final String NRTCACHINGDIRECTORY_MAXCACHEMB = "solr.hdfs.nrtcachingdirectory.maxcachedmb";
  public static final String NUMBEROFBLOCKSPERBANK = "solr.hdfs.blockcache.blocksperbank";

  public static final String KERBEROS_ENABLED = "solr.hdfs.security.kerberos.enabled";
  public static final String KERBEROS_KEYTAB = "solr.hdfs.security.kerberos.keytabfile";
  public static final String KERBEROS_PRINCIPAL = "solr.hdfs.security.kerberos.principal";

  public static final String HDFS_HOME = "solr.hdfs.home";
  public static final String CONFIG_DIRECTORY = "solr.hdfs.confdir";

  private SolrParams params;
  private String hdfsDataDir;
  private String confDir;

  public static Metrics metrics;
  private static Boolean kerberosInit;

  @Override
  public void init(NamedList args) {
    params = SolrParams.toSolrParams(args);
    this.hdfsDataDir = params.get(HDFS_HOME);
    if (this.hdfsDataDir != null && this.hdfsDataDir.length() == 0) {
      this.hdfsDataDir = null;
    }
    boolean kerberosEnabled = params.getBool(KERBEROS_ENABLED, false);
    LOG.info("Solr Kerberos Authentication "
        + (kerberosEnabled ? "enabled" : "disabled"));
    if (kerberosEnabled) {
      initKerberos();
    }
  }

  @Override
  protected Directory create(String path, DirContext dirContext)
      throws IOException {
    LOG.info("creating directory factory for path {}", path);
    Configuration conf = getConf();

    if (metrics == null) {
      metrics = new Metrics(conf);
    }

    boolean blockCacheEnabled = params.getBool(BLOCKCACHE_ENABLED, true);
    boolean blockCacheReadEnabled = params.getBool(BLOCKCACHE_READ_ENABLED, true);
    boolean blockCacheWriteEnabled = params.getBool(BLOCKCACHE_WRITE_ENABLED, true);

    Directory dir = null;
    if (blockCacheEnabled && dirContext != DirContext.META_DATA) {
      int numberOfBlocksPerBank = params.getInt(NUMBEROFBLOCKSPERBANK, 16384);
      int blockSize = BlockDirectory.BLOCK_SIZE;
      int bankCount = params.getInt(BLOCKCACHE_SLAB_COUNT, 1);
      boolean directAllocation = params.getBool(
          BLOCKCACHE_DIRECT_MEMORY_ALLOCATION, true);

      BlockCache blockCache;
      int slabSize = numberOfBlocksPerBank * blockSize;
      LOG.info(
          "Number of slabs of block cache [{}] with direct memory allocation set to [{}]",
          bankCount, directAllocation);
      LOG.info(
          "Block cache target memory usage, slab size of [{}] will allocate [{}] slabs and use ~[{}] bytes",
          new Object[] {slabSize, bankCount,
              ((long) bankCount * (long) slabSize)});

      int _1024Size = params.getInt("solr.hdfs.blockcache.bufferstore.1024", 8192);
      int _8192Size = params.getInt("solr.hdfs.blockcache.bufferstore.8192", 8192);
      BufferStore.init(_1024Size, _8192Size, metrics);

      long totalMemory = (long) bankCount * (long) numberOfBlocksPerBank
          * (long) blockSize;
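      // Sizing sketch with the defaults above (and assuming an 8 KB
      // BlockDirectory.BLOCK_SIZE): 1 slab * 16384 blocks * 8192 bytes
      // = ~128 MB of cache, allocated off-heap when direct memory
      // allocation is enabled.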
      try {
        blockCache = new BlockCache(metrics, directAllocation, totalMemory,
            slabSize, blockSize);
      } catch (OutOfMemoryError e) {
        throw new RuntimeException(
            "The max direct memory is likely too low. Either increase it (by adding -XX:MaxDirectMemorySize=<size>g -XX:+UseLargePages to your containers startup args)"
                + " or disable direct allocation using solr.hdfs.blockcache.direct.memory.allocation=false in solrconfig.xml. If you are putting the block cache on the heap,"
                + " your java heap size might not be large enough."
                + " Failed allocating ~" + totalMemory / 1000000.0 + " MB.", e);
      }
      Cache cache = new BlockDirectoryCache(blockCache, metrics);
      HdfsDirectory hdfsDirectory = new HdfsDirectory(new Path(path), conf);
      dir = new BlockDirectory("solrcore", hdfsDirectory, cache, null,
          blockCacheReadEnabled, blockCacheWriteEnabled);
    } else {
      dir = new HdfsDirectory(new Path(path), conf);
    }
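
    // Optionally wrap the directory in an NRTCachingDirectory so small, newly
    // flushed segments are cached in RAM, which speeds up near-real-time reopens.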
    boolean nrtCachingDirectory = params.getBool(NRTCACHINGDIRECTORY_ENABLE, true);
    if (nrtCachingDirectory) {
      double nrtCacheMaxMergeSizeMB = params.getInt(
          NRTCACHINGDIRECTORY_MAXMERGESIZEMB, 16);
      double nrtCacheMaxCacheMB = params.getInt(NRTCACHINGDIRECTORY_MAXCACHEMB, 192);

      return new NRTCachingDirectory(dir, nrtCacheMaxMergeSizeMB, nrtCacheMaxCacheMB);
    }
    return dir;
  }

  @Override
  public boolean exists(String path) {
    Path hdfsDirPath = new Path(path);
    Configuration conf = getConf();
    FileSystem fileSystem = null;
    try {
      fileSystem = FileSystem.newInstance(hdfsDirPath.toUri(), conf);
      return fileSystem.exists(hdfsDirPath);
    } catch (IOException e) {
      LOG.error("Error checking if hdfs path exists", e);
      throw new RuntimeException("Error checking if hdfs path exists", e);
    } finally {
      IOUtils.closeQuietly(fileSystem);
    }
  }

  private Configuration getConf() {
    Configuration conf = new Configuration();
    confDir = params.get(CONFIG_DIRECTORY, null);
    HdfsUtil.addHdfsResources(conf, confDir);
    return conf;
  }

  protected synchronized void removeDirectory(CacheValue cacheValue)
      throws IOException {
    Configuration conf = getConf();
    FileSystem fileSystem = null;
    try {
      fileSystem = FileSystem.newInstance(new URI(cacheValue.path), conf);
      boolean success = fileSystem.delete(new Path(cacheValue.path), true);
      if (!success) {
        throw new RuntimeException("Could not remove directory");
      }
    } catch (Exception e) {
      LOG.error("Could not remove directory", e);
      throw new SolrException(ErrorCode.SERVER_ERROR,
          "Could not remove directory", e);
    } finally {
      IOUtils.closeQuietly(fileSystem);
    }
  }

  @Override
  public boolean isAbsolute(String path) {
    return path.startsWith("hdfs:/");
  }

  @Override
  public boolean isPersistent() {
    return true;
  }

  @Override
  public boolean searchersReserveCommitPoints() {
    return true;
  }
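
  /**
   * Computes the data directory under {@code solr.hdfs.home}. In SolrCloud
   * mode the resulting layout is roughly
   * {@code <solr.hdfs.home>/<collection>/<coreNodeName>/<dataDir>}; otherwise
   * the core name is used in place of the collection/node pair.
   */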
  @Override
  public String getDataHome(CoreDescriptor cd) throws IOException {
    if (hdfsDataDir == null) {
      throw new SolrException(ErrorCode.SERVER_ERROR, "You must set the "
          + this.getClass().getSimpleName() + " param " + HDFS_HOME
          + " for relative dataDir paths to work");
    }

    // by default, we go off the instance directory
    String path;
    if (cd.getCloudDescriptor() != null) {
      path = URLEncoder.encode(cd.getCloudDescriptor().getCollectionName(), "UTF-8")
          + "/"
          + URLEncoder.encode(cd.getCloudDescriptor().getCoreNodeName(), "UTF-8");
    } else {
      path = cd.getName();
    }

    return normalize(SolrResourceLoader.normalizeDir(ZkController
        .trimLeadingAndTrailingSlashes(hdfsDataDir)
        + "/" + path + "/" + cd.getDataDir()));
  }

  public String getConfDir() {
    return confDir;
  }
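
  /**
   * Performs a one-time (per JVM) Kerberos login using the configured keytab
   * and principal. Both {@code solr.hdfs.security.kerberos.keytabfile} and
   * {@code solr.hdfs.security.kerberos.principal} must be set when
   * {@code solr.hdfs.security.kerberos.enabled} is true.
   */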
  private void initKerberos() {
    String keytabFile = params.get(KERBEROS_KEYTAB, "").trim();
    if (keytabFile.length() == 0) {
      throw new IllegalArgumentException(KERBEROS_KEYTAB + " is required because "
          + KERBEROS_ENABLED + " is set to true");
    }
    String principal = params.get(KERBEROS_PRINCIPAL, "");
    if (principal.length() == 0) {
      throw new IllegalArgumentException(KERBEROS_PRINCIPAL
          + " is required because " + KERBEROS_ENABLED + " is set to true");
    }
    synchronized (HdfsDirectoryFactory.class) {
      if (kerberosInit == null) {
        kerberosInit = Boolean.TRUE;
        Configuration conf = new Configuration();
        conf.set("hadoop.security.authentication", "kerberos");
        UserGroupInformation.setConfiguration(conf);
        LOG.info(
            "Attempting to acquire kerberos ticket with keytab: {}, principal: {} ",
            keytabFile, principal);
        try {
          UserGroupInformation.loginUserFromKeytab(principal, keytabFile);
        } catch (IOException ioe) {
          throw new RuntimeException(ioe);
        }
        LOG.info("Got Kerberos ticket");
      }
    }
  }
}