blob: 24b6cac4243f5438c5d5c8109419ca50ce365eb9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.blob;
import java.io.File;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import org.apache.commons.io.FileUtils;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.LockFactory;
import org.apache.lucene.store.MMapDirectory;
import org.apache.lucene.store.NativeFSLockFactory;
import org.apache.lucene.store.NoLockFactory;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CachingDirectoryFactory;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.DirectoryFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// TODO now: still failing tests (run CoreSynonymLoadTest no SolrCloud) because we have to support
// removing dir and old
// indexes in BlobStore. See all "TODO now"
public class BlobDirectoryFactory extends CachingDirectoryFactory {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private DirectoryFactory delegateFactory;
private String delegateLockType;
private String blobPath;
// Parameters for MMapDirectory
// TODO: Change DirectoryFactory.get() upstream to allow us to provide a Function<Directory,
// Directory> to wrap the
// directory when it is created. This would unblock the delegation of DirectoryFactory here. And
// we could get rid
// of these params, we could simply delegate to a delegateFactory instead.
private boolean unmapHack;
private boolean preload;
private int maxChunk;
@Override
public void initCoreContainer(CoreContainer cc) {
super.initCoreContainer(cc);
if (delegateFactory != null) {
delegateFactory.initCoreContainer(cc);
}
// blobListingManager = BlobListingManager.getInstance(cc, "/blobDirListings");
}
@Override
public void init(NamedList args) {
super.init(args);
SolrParams params = args.toSolrParams();
String delegateFactoryClass = params.get("delegateFactory");
if (delegateFactoryClass == null) {
throw new IllegalArgumentException("delegateFactory class is required");
}
delegateFactory =
coreContainer.getResourceLoader().newInstance(delegateFactoryClass, DirectoryFactory.class);
delegateFactory.initCoreContainer(coreContainer);
delegateFactory.init(args);
delegateLockType = params.get("delegateLockType");
if (delegateLockType == null) {
throw new IllegalArgumentException("delegateLockType is required");
}
blobPath = params.get("blobPath");
if (blobPath == null) {
throw new IllegalArgumentException("blobPath is required");
}
maxChunk = params.getInt("maxChunkSize", MMapDirectory.DEFAULT_MAX_CHUNK_SIZE);
if (maxChunk <= 0) {
throw new IllegalArgumentException("maxChunk must be greater than 0");
}
unmapHack = params.getBool("unmap", true);
preload = params.getBool("preload", false); // default turn-off
}
@Override
public void doneWithDirectory(Directory directory) throws IOException {
log.debug("doneWithDirectory {}", directory);
((BlobDirectory) directory).release();
// TODO delegateFactory.doneWithDirectory(directory);
super.doneWithDirectory(directory);
}
@Override
public void close() throws IOException {
log.debug("close");
// TODO delegateFactory.close();
super.close();
}
@Override
protected LockFactory createLockFactory(String rawLockType) throws IOException {
return rawLockType.equals(DirectoryFactory.LOCK_TYPE_NONE)
? NoLockFactory.INSTANCE
: NativeFSLockFactory.INSTANCE;
// TODO return rawLockType.equals(DirectoryFactory.LOCK_TYPE_NONE) ? NoLockFactory.INSTANCE :
// DELEGATE_LOCK_FACTORY;
}
@Override
protected Directory create(String path, LockFactory lockFactory, DirContext dirContext)
throws IOException {
log.debug("Create Directory {}", path);
MMapDirectory mapDirectory = new MMapDirectory(new File(path).toPath(), lockFactory, maxChunk);
try {
mapDirectory.setUseUnmap(unmapHack);
} catch (IllegalArgumentException e) {
log.warn("Unmap not supported on this JVM, continuing on without setting unmap", e);
}
mapDirectory.setPreload(preload);
Directory delegateDirectory = mapDirectory;
// TODO
// String delegateLockType = lockFactory == NoLockFactory.INSTANCE ?
// DirectoryFactory.LOCK_TYPE_NONE : this.delegateLockType;
// Directory delegateDirectory = delegateFactory.get(path, dirContext, delegateLockType);
BlobPusher blobPusher =
null; // nocommit new BlobPusher(new BlobStore(blobPath, path));//TODO now: Reuse BlobPusher
return new BlobDirectory(delegateDirectory, blobPusher);
}
@Override
public boolean exists(String path) throws IOException {
boolean exists = super.exists(path);
log.debug("exists {} = {}", path, exists);
return exists;
// TODO return delegateFactory.exists(path);
}
@Override
protected void removeDirectory(CacheValue cacheValue) throws IOException {
log.debug("removeDirectory {}", cacheValue);
File dirFile = new File(cacheValue.path);
FileUtils.deleteDirectory(dirFile);
// TODO delegateFactory.remove(cacheValue.path);
BlobPusher blobPusher =
null; // nocommit new BlobPusher(new BlobStore(blobPath, cacheValue.path));//TODO now: Reuse
// BlobPusher
// TODO now: blobPusher.deleteDirectory();
}
@Override
public void move(Directory fromDir, Directory toDir, String fileName, IOContext ioContext)
throws IOException {
// TODO: override for efficiency?
log.debug("move {} {} to {}", fromDir, fileName, toDir);
super.move(fromDir, toDir, fileName, ioContext);
}
@Override
public void renameWithOverwrite(Directory dir, String fileName, String toName)
throws IOException {
// TODO: override to perform an atomic rename if possible?
log.debug("renameWithOverwrite {} {} to {}", dir, fileName, toName);
super.renameWithOverwrite(dir, fileName, toName);
}
@Override
public boolean isPersistent() {
return true;
}
@Override
public boolean isSharedStorage() {
return true;
}
@Override
public void release(Directory directory) throws IOException {
log.debug("release {}", directory);
((BlobDirectory) directory).release();
// TODO delegateFactory.release(directory);
super.release(directory);
}
@Override
public boolean isAbsolute(String path) {
boolean isAbsolute = new File(path).isAbsolute();
log.debug("isAbsolute {} = {}", path, isAbsolute);
return isAbsolute;
// TODO return delegateFactory.isAbsolute(path);
}
@Override
public boolean searchersReserveCommitPoints() {
return false; // TODO: double check
}
@Override
public String getDataHome(CoreDescriptor cd) throws IOException {
String dataHome = super.getDataHome(cd);
log.debug("getDataHome {}", dataHome);
return dataHome;
}
@Override
public void cleanupOldIndexDirectories(
final String dataDirPath, final String currentIndexDirPath, boolean afterCoreReload) {
log.debug("cleanupOldIndexDirectories {} {}", dataDirPath, currentIndexDirPath);
super.cleanupOldIndexDirectories(dataDirPath, currentIndexDirPath, afterCoreReload);
// TODO now: cleanup with BlobPusher
}
}