// blob: a94af9244aff092daa487472ff39377ba01a04eb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.blob;
import java.io.IOException;
import java.io.InputStream;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.apache.solr.common.util.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * {@link FilterDirectory} that records which files are created, synced, renamed and deleted
 * through it, and hands the accumulated changes to a {@link BlobPusher} each time {@link
 * #syncMetaData()} is called.
 *
 * <p>The tracking collections are plain (non-concurrent) collections; this class adds no
 * synchronization of its own around them.
 */
public class BlobDirectory extends FilterDirectory {

  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  /** {@link IOContext} used when re-opening a local file to push its content. */
  private static final IOContext SYNC_IO_CONTEXT = new IOContext();

  private final BlobPusher blobPusher;

  /**
   * Map of {@link BlobFileSupplier} for each file created by this directory. Keys are file names.
   * Each {@link BlobFileSupplier} keeps a reference to the {@link IndexOutput} created for the
   * file, to provide the checksums on {@link #sync(Collection)}. But it is able to free earlier the
   * reference each time an {@link IndexOutput} is closed, by getting the checksum at that time.
   */
  private final Map<String, BlobFileSupplier> blobFileSupplierMap;

  /** Names passed to {@link #sync(Collection)} since the last push (or {@link #release()}). */
  private final Set<String> synchronizedFileNames;

  /** Names passed to {@link #deleteFile(String)} since the last push (or {@link #release()}). */
  private final Collection<String> deletedFileNames;

  /** Whether this directory is open; checked by {@link #ensureOpen()}. */
  private volatile boolean isOpen;

  /**
   * Creates a directory that wraps {@code delegate} and pushes changes with {@code blobPusher}.
   *
   * @param delegate the directory that actually stores the files
   * @param blobPusher receives the written and deleted files on {@link #syncMetaData()}; it is
   *     closed when this directory is closed
   */
  public BlobDirectory(Directory delegate, BlobPusher blobPusher) {
    super(delegate);
    this.blobPusher = blobPusher;
    blobFileSupplierMap = new HashMap<>();
    synchronizedFileNames = new HashSet<>();
    deletedFileNames = new ArrayList<>();
    // A boolean field defaults to false: without this assignment ensureOpen() would report a
    // freshly created directory as already closed.
    isOpen = true;
  }

  /**
   * Deletes the file in the delegate and records the name so the deletion is included in the next
   * push. Any pending write tracking for that name is dropped.
   */
  @Override
  public void deleteFile(String name) throws IOException {
    log.debug("deleteFile {}", name);
    in.deleteFile(name);
    // The file is gone from the delegate: forget its supplier and its pending-sync entry so the
    // next push does not try to read it, and so the supplier is not retained until release().
    blobFileSupplierMap.remove(name);
    synchronizedFileNames.remove(name);
    deletedFileNames.add(name);
  }

  /**
   * Creates the output in the delegate and registers a {@link BlobFileSupplier} for it.
   *
   * @return a wrapper around the delegate output that captures size and checksum when closed
   */
  @Override
  public IndexOutput createOutput(String name, IOContext context) throws IOException {
    log.debug("createOutput {}", name);
    IndexOutput indexOutput = in.createOutput(name, context);
    BlobFileSupplier blobFileSupplier = new BlobFileSupplier(indexOutput);
    blobFileSupplierMap.put(name, blobFileSupplier);
    return new BlobIndexOutput(indexOutput, blobFileSupplier);
  }

  // createTempOutput(): We don't track tmp files since they are not synced.

  /** Syncs the files in the delegate and remembers their names for the next push. */
  @Override
  public void sync(Collection<String> names) throws IOException {
    log.debug("sync {}", names);
    in.sync(names);
    synchronizedFileNames.addAll(names);
  }

  /** Renames the file in the delegate and moves the tracking entries to the new name. */
  @Override
  public void rename(String source, String dest) throws IOException {
    log.debug("rename {} to {}", source, dest);
    in.rename(source, dest);
    // Also rename the corresponding BlobFile.
    BlobFileSupplier blobFileSupplier = blobFileSupplierMap.remove(source);
    if (blobFileSupplier != null) {
      blobFileSupplier.rename(source, dest);
      blobFileSupplierMap.put(dest, blobFileSupplier);
    }
    // Also rename the tracked synchronized file.
    if (synchronizedFileNames.remove(source)) {
      synchronizedFileNames.add(dest);
    }
  }

  /** Syncs the delegate metadata, then pushes the pending writes and deletions. */
  @Override
  public void syncMetaData() throws IOException {
    log.debug("syncMetaData");
    in.syncMetaData();
    syncToBlobStore();
  }

  /**
   * Pushes every synced file that has a registered supplier, together with the recorded
   * deletions, then clears both pending collections. If the push throws, the pending collections
   * are left untouched.
   */
  private void syncToBlobStore() throws IOException {
    log.debug("File names to sync {}", synchronizedFileNames);
    Collection<BlobFile> writes = new ArrayList<>(synchronizedFileNames.size());
    for (String fileName : synchronizedFileNames) {
      BlobFileSupplier blobFileSupplier = blobFileSupplierMap.get(fileName);
      if (blobFileSupplier != null) {
        // Only sync files that were synced since this directory was released. Previous files don't
        // need to be synced.
        writes.add(blobFileSupplier.getBlobFile());
      }
    }
    log.debug("Sync to BlobStore writes={} deleted={}", writes, deletedFileNames);
    blobPusher.push(writes, this::openInputStream, deletedFileNames);
    synchronizedFileNames.clear();
    deletedFileNames.clear();
  }

  /** Opens the delegate's copy of {@code blobFile} as an {@link InputStream}. */
  private InputStream openInputStream(BlobFile blobFile) throws IOException {
    return new IndexInputInputStream(in.openInput(blobFile.fileName(), SYNC_IO_CONTEXT));
  }

  /** Forgets all tracked suppliers, pending synced names and pending deletions. */
  public void release() {
    log.debug("release");
    blobFileSupplierMap.clear();
    synchronizedFileNames.clear();
    deletedFileNames.clear();
  }

  // obtainLock(): We get the delegate Directory lock.

  /** Marks this directory closed and quietly closes the delegate and the {@link BlobPusher}. */
  @Override
  public void close() {
    log.debug("close");
    isOpen = false;
    IOUtils.closeQuietly(in);
    IOUtils.closeQuietly(blobPusher);
  }

  @Override
  public String toString() {
    return getClass().getSimpleName() + "(" + in.toString() + ")";
  }

  /**
   * @throws AlreadyClosedException if {@link #close()} has been called on this directory
   */
  @Override
  protected void ensureOpen() throws AlreadyClosedException {
    if (!isOpen) {
      throw new AlreadyClosedException("This Directory is closed");
    }
  }

  /**
   * Delegating {@link IndexOutput} that hooks the {@link #close()} method to compute the checksum.
   * The goal is to free the reference to the delegate {@link IndexOutput} when it is closed because
   * we only need it to get the checksum.
   */
  private static class BlobIndexOutput extends FilterIndexOutput {

    private final BlobFileSupplier blobFileSupplier;

    BlobIndexOutput(IndexOutput delegate, BlobFileSupplier blobFileSupplier) {
      super("Blob " + delegate.toString(), delegate.getName(), delegate);
      this.blobFileSupplier = blobFileSupplier;
    }

    /**
     * Captures the size and checksum of the output, then closes it. The capture happens first
     * because it reads from the still-open delegate; {@code super.close()} runs even if the
     * capture throws.
     */
    @Override
    public void close() throws IOException {
      try {
        blobFileSupplier.getBlobFileFromIndexOutput();
      } finally {
        super.close();
      }
    }
  }

  /**
   * Supplies the length and checksum of a file created in this directory. Keeps a reference to the
   * file {@link IndexOutput} to be able to get its final length and checksum. However we try to
   * free the reference as soon as we can (when the {@link IndexOutput} is closed so we know the
   * content is final).
   */
  private static class BlobFileSupplier {

    /** Output to read size and checksum from; {@code null} once they have been captured. */
    IndexOutput indexOutput;

    /** Current file name; updated by {@link #rename(String, String)}. */
    String name;

    /** Captured file description; {@code null} until the first capture. */
    BlobFile blobFile;

    BlobFileSupplier(IndexOutput indexOutput) {
      this.indexOutput = indexOutput;
      name = indexOutput.getName();
    }

    /** Switches to the new name, rebuilding the captured {@link BlobFile} if there is one. */
    void rename(String source, String dest) {
      assert name.equals(source);
      name = dest;
      if (blobFile != null) {
        blobFile = new BlobFile(name, blobFile.size(), blobFile.checksum());
      }
    }

    /** Returns the captured {@link BlobFile}, capturing it from the output first if needed. */
    BlobFile getBlobFile() throws IOException {
      if (blobFile == null) {
        getBlobFileFromIndexOutput();
      }
      return blobFile;
    }

    /**
     * Gets the {@link BlobFile} of the referenced {@link IndexOutput} and then frees the reference.
     * Does nothing if the reference was already freed, so closing the same output more than once
     * does not fail here.
     */
    void getBlobFileFromIndexOutput() throws IOException {
      if (indexOutput == null) {
        return; // Already captured; keep the existing BlobFile.
      }
      blobFile = new BlobFile(name, indexOutput.getFilePointer(), indexOutput.getChecksum());
      indexOutput = null; // Free the reference since we have the checksum.
    }
  }
}