| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.solr.blob; |
| |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.lang.invoke.MethodHandles; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import org.apache.lucene.store.AlreadyClosedException; |
| import org.apache.lucene.store.Directory; |
| import org.apache.lucene.store.FilterDirectory; |
| import org.apache.lucene.store.IOContext; |
| import org.apache.lucene.store.IndexOutput; |
| import org.apache.solr.common.util.IOUtils; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| public class BlobDirectory extends FilterDirectory { |
| |
| private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); |
| |
| private static final IOContext SYNC_IO_CONTEXT = new IOContext(); |
| |
| private final BlobPusher blobPusher; |
| |
| /** |
| * Map of {@link BlobFileSupplier} for each file created by this directory. Keys are file names. |
| * Each {@link BlobFileSupplier} keeps a reference to the {@link IndexOutput} created for the |
| * file, to provide the checksums on {@link #sync(Collection)}. But it is able to free earlier the |
| * reference each time an {@link IndexOutput} is closed, by getting the checksum at that time. |
| */ |
| private final Map<String, BlobFileSupplier> blobFileSupplierMap; |
| |
| private final Set<String> synchronizedFileNames; |
| |
| private final Collection<String> deletedFileNames; |
| |
| private volatile boolean isOpen; |
| |
| public BlobDirectory(Directory delegate, BlobPusher blobPusher) { |
| super(delegate); |
| this.blobPusher = blobPusher; |
| blobFileSupplierMap = new HashMap<>(); |
| synchronizedFileNames = new HashSet<>(); |
| deletedFileNames = new ArrayList<>(); |
| } |
| |
| @Override |
| public void deleteFile(String name) throws IOException { |
| log.debug("deleteFile {}", name); |
| in.deleteFile(name); |
| deletedFileNames.add(name); |
| } |
| |
| @Override |
| public IndexOutput createOutput(String name, IOContext context) throws IOException { |
| log.debug("createOutput {}", name); |
| IndexOutput indexOutput = in.createOutput(name, context); |
| BlobFileSupplier blobFileSupplier = new BlobFileSupplier(indexOutput); |
| blobFileSupplierMap.put(name, blobFileSupplier); |
| return new BlobIndexOutput(indexOutput, blobFileSupplier); |
| } |
| |
| // createTempOutput(): We don't track tmp files since they are not synced. |
| |
| @Override |
| public void sync(Collection<String> names) throws IOException { |
| log.debug("sync {}", names); |
| in.sync(names); |
| synchronizedFileNames.addAll(names); |
| } |
| |
| @Override |
| public void rename(String source, String dest) throws IOException { |
| log.debug("rename {} to {}", source, dest); |
| in.rename(source, dest); |
| // Also rename the corresponding BlobFile. |
| BlobFileSupplier blobFileSupplier = blobFileSupplierMap.remove(source); |
| if (blobFileSupplier != null) { |
| blobFileSupplier.rename(source, dest); |
| blobFileSupplierMap.put(dest, blobFileSupplier); |
| } |
| // Also rename the tracked synchronized file. |
| if (synchronizedFileNames.remove(source)) { |
| synchronizedFileNames.add(dest); |
| } |
| } |
| |
| @Override |
| public void syncMetaData() throws IOException { |
| log.debug("syncMetaData"); |
| in.syncMetaData(); |
| syncToBlobStore(); |
| } |
| |
| private void syncToBlobStore() throws IOException { |
| log.debug("File names to sync {}", synchronizedFileNames); |
| |
| Collection<BlobFile> writes = new ArrayList<>(synchronizedFileNames.size()); |
| for (String fileName : synchronizedFileNames) { |
| BlobFileSupplier blobFileSupplier = blobFileSupplierMap.get(fileName); |
| if (blobFileSupplier != null) { |
| // Only sync files that were synced since this directory was released. Previous files don't |
| // need to be synced. |
| writes.add(blobFileSupplier.getBlobFile()); |
| } |
| } |
| |
| log.debug("Sync to BlobStore writes={} deleted={}", writes, deletedFileNames); |
| blobPusher.push(writes, this::openInputStream, deletedFileNames); |
| synchronizedFileNames.clear(); |
| deletedFileNames.clear(); |
| } |
| |
| private InputStream openInputStream(BlobFile blobFile) throws IOException { |
| return new IndexInputInputStream(in.openInput(blobFile.fileName(), SYNC_IO_CONTEXT)); |
| } |
| |
| public void release() { |
| log.debug("release"); |
| blobFileSupplierMap.clear(); |
| synchronizedFileNames.clear(); |
| deletedFileNames.clear(); |
| } |
| |
| // obtainLock(): We get the delegate Directory lock. |
| |
| @Override |
| public void close() { |
| log.debug("close"); |
| isOpen = false; |
| IOUtils.closeQuietly(in); |
| IOUtils.closeQuietly(blobPusher); |
| } |
| |
| @Override |
| public String toString() { |
| return getClass().getSimpleName() + "(" + in.toString() + ")"; |
| } |
| |
| @Override |
| protected void ensureOpen() throws AlreadyClosedException { |
| if (!isOpen) { |
| throw new AlreadyClosedException("This Directory is closed"); |
| } |
| } |
| |
| /** |
| * Delegating {@link IndexOutput} that hooks the {@link #close()} method to compute the checksum. |
| * The goal is to free the reference to the delegate {@link IndexOutput} when it is closed because |
| * we only need it to get the checksum. |
| */ |
| private static class BlobIndexOutput extends FilterIndexOutput { |
| |
| private final BlobFileSupplier blobFileSupplier; |
| |
| BlobIndexOutput(IndexOutput delegate, BlobFileSupplier blobFileSupplier) { |
| super("Blob " + delegate.toString(), delegate.getName(), delegate); |
| this.blobFileSupplier = blobFileSupplier; |
| } |
| |
| @Override |
| public void close() throws IOException { |
| blobFileSupplier.getBlobFileFromIndexOutput(); |
| super.close(); |
| } |
| } |
| |
| /** |
| * Supplies the length and checksum of a file created in this directory. Keeps a reference to the |
| * file {@link IndexOutput} to be able to get its final length and checksum. However we try to |
| * free the reference as soon as we can (when the {@link IndexOutput} is closed so we know the |
| * content is final). |
| */ |
| private static class BlobFileSupplier { |
| |
| IndexOutput indexOutput; |
| String name; |
| BlobFile blobFile; |
| |
| BlobFileSupplier(IndexOutput indexOutput) { |
| this.indexOutput = indexOutput; |
| name = indexOutput.getName(); |
| } |
| |
| void rename(String source, String dest) { |
| assert name.equals(source); |
| name = dest; |
| if (blobFile != null) { |
| blobFile = new BlobFile(name, blobFile.size(), blobFile.checksum()); |
| } |
| } |
| |
| BlobFile getBlobFile() throws IOException { |
| if (blobFile == null) { |
| getBlobFileFromIndexOutput(); |
| } |
| return blobFile; |
| } |
| |
| /** |
| * Gets the {@link BlobFile} of the referenced {@link IndexOutput} and then frees the reference. |
| */ |
| void getBlobFileFromIndexOutput() throws IOException { |
| blobFile = new BlobFile(name, indexOutput.getFilePointer(), indexOutput.getChecksum()); |
| // log.debug("Freeing IndexOutput {}", indexOutput); |
| indexOutput = null; // Free the reference since we have the checksum. |
| } |
| } |
| } |