SOLR-15051: Initial WIP
Co-authored-by: Bruno Roustant <broustant@salesforce.com>
diff --git a/gradle/maven/defaults-maven.gradle b/gradle/maven/defaults-maven.gradle
index 2662a69..f64ddf0 100644
--- a/gradle/maven/defaults-maven.gradle
+++ b/gradle/maven/defaults-maven.gradle
@@ -61,6 +61,7 @@
":solr:solrj",
":solr:contrib:analysis-extras",
":solr:contrib:analytics",
+ ":solr:contrib:blob-directory",
":solr:contrib:clustering",
":solr:contrib:extraction",
":solr:contrib:langid",
diff --git a/settings.gradle b/settings.gradle
index be2c09c..a683ddf 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -60,6 +60,7 @@
include "solr:server"
include "solr:contrib:analysis-extras"
include "solr:contrib:analytics"
+include "solr:contrib:blob-directory"
include "solr:contrib:clustering"
include "solr:contrib:extraction"
include "solr:contrib:langid"
diff --git a/solr/contrib/blob-directory/build.gradle b/solr/contrib/blob-directory/build.gradle
new file mode 100644
index 0000000..5fd53c5
--- /dev/null
+++ b/solr/contrib/blob-directory/build.gradle
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Build script for the blob-directory contrib module (included in settings.gradle).
+apply plugin: 'java-library'
+
+description = 'A shared storage approach based on Lucene\'s Directory abstraction'
+
+dependencies {
+ implementation project(':solr:core')
+
+ testImplementation project(':solr:test-framework')
+}
diff --git a/solr/contrib/blob-directory/codeDiagram.png b/solr/contrib/blob-directory/codeDiagram.png
new file mode 100644
index 0000000..7afe3af
--- /dev/null
+++ b/solr/contrib/blob-directory/codeDiagram.png
Binary files differ
diff --git a/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobDirectory.java b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobDirectory.java
new file mode 100644
index 0000000..a94af92
--- /dev/null
+++ b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobDirectory.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.blob;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FilterDirectory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.solr.common.util.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link FilterDirectory} that tracks the files created, synced, renamed and deleted through it,
+ * and pushes those changes to a blob store with a {@link BlobPusher} on {@link #syncMetaData()}.
+ *
+ * <p>Lucene's IndexWriter may call these methods from several threads (e.g. segment flushes
+ * creating outputs while a commit syncs or deletes files), so the tracking state uses concurrent
+ * collections.
+ */
+public class BlobDirectory extends FilterDirectory {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /** Context used to re-read synced files when streaming them to the blob store. */
+  private static final IOContext SYNC_IO_CONTEXT = new IOContext();
+
+  private final BlobPusher blobPusher;
+
+  /**
+   * {@link BlobFileSupplier} for each file created by this directory since its last {@link
+   * #release()}, keyed by file name. Each supplier keeps a reference to the file's {@link
+   * IndexOutput} to provide its length and checksum when the file is pushed, and frees that
+   * reference as soon as the {@link IndexOutput} is closed.
+   */
+  private final Map<String, BlobFileSupplier> blobFileSupplierMap = new ConcurrentHashMap<>();
+
+  /** Names of the files synced since the last push to the blob store. */
+  private final Set<String> synchronizedFileNames = ConcurrentHashMap.newKeySet();
+
+  /** Names of the files deleted since the last push to the blob store. */
+  private final Set<String> deletedFileNames = ConcurrentHashMap.newKeySet();
+
+  /** Cleared by {@link #close()}; must start true or {@link #ensureOpen()} always fails. */
+  private volatile boolean isOpen = true;
+
+  public BlobDirectory(Directory delegate, BlobPusher blobPusher) {
+    super(delegate);
+    this.blobPusher = blobPusher;
+  }
+
+  @Override
+  public void deleteFile(String name) throws IOException {
+    log.debug("deleteFile {}", name);
+    in.deleteFile(name);
+    // The local file is gone: stop tracking it so the next push does not try to read it.
+    blobFileSupplierMap.remove(name);
+    synchronizedFileNames.remove(name);
+    deletedFileNames.add(name);
+  }
+
+  @Override
+  public IndexOutput createOutput(String name, IOContext context) throws IOException {
+    log.debug("createOutput {}", name);
+    IndexOutput indexOutput = in.createOutput(name, context);
+    BlobFileSupplier blobFileSupplier = new BlobFileSupplier(indexOutput);
+    blobFileSupplierMap.put(name, blobFileSupplier);
+    return new BlobIndexOutput(indexOutput, blobFileSupplier);
+  }
+
+  // createTempOutput(): We don't track tmp files since they are not synced.
+
+  @Override
+  public void sync(Collection<String> names) throws IOException {
+    log.debug("sync {}", names);
+    in.sync(names);
+    synchronizedFileNames.addAll(names);
+  }
+
+  @Override
+  public void rename(String source, String dest) throws IOException {
+    log.debug("rename {} to {}", source, dest);
+    in.rename(source, dest);
+    // Also rename the corresponding BlobFile.
+    BlobFileSupplier blobFileSupplier = blobFileSupplierMap.remove(source);
+    if (blobFileSupplier != null) {
+      blobFileSupplier.rename(source, dest);
+      blobFileSupplierMap.put(dest, blobFileSupplier);
+    }
+    // Also rename the tracked synchronized file.
+    if (synchronizedFileNames.remove(source)) {
+      synchronizedFileNames.add(dest);
+    }
+  }
+
+  @Override
+  public void syncMetaData() throws IOException {
+    log.debug("syncMetaData");
+    in.syncMetaData();
+    syncToBlobStore();
+  }
+
+  /**
+   * Pushes the synced files and the deletions recorded since the last push. Works on a snapshot of
+   * the pending names and only forgets those it pushed: names recorded concurrently, or all names
+   * if the push fails, stay pending for the next call.
+   */
+  private void syncToBlobStore() throws IOException {
+    Collection<String> syncedNames = new ArrayList<>(synchronizedFileNames);
+    Collection<String> deletedNames = new ArrayList<>(deletedFileNames);
+    log.debug("File names to sync {}", syncedNames);
+
+    Collection<BlobFile> writes = new ArrayList<>(syncedNames.size());
+    for (String fileName : syncedNames) {
+      BlobFileSupplier blobFileSupplier = blobFileSupplierMap.get(fileName);
+      if (blobFileSupplier != null) {
+        // Only push files created by this directory since its last release(). Files created
+        // before don't need to be pushed.
+        writes.add(blobFileSupplier.getBlobFile());
+      }
+    }
+
+    log.debug("Sync to BlobStore writes={} deleted={}", writes, deletedNames);
+    blobPusher.push(writes, this::openInputStream, deletedNames);
+    synchronizedFileNames.removeAll(syncedNames);
+    deletedFileNames.removeAll(deletedNames);
+  }
+
+  private InputStream openInputStream(BlobFile blobFile) throws IOException {
+    return new IndexInputInputStream(in.openInput(blobFile.fileName(), SYNC_IO_CONTEXT));
+  }
+
+  /**
+   * Forgets all tracked files and pending deletions. {@link BlobDirectoryFactory} calls this from
+   * its release(Directory) and doneWithDirectory(Directory).
+   */
+  public void release() {
+    log.debug("release");
+    blobFileSupplierMap.clear();
+    synchronizedFileNames.clear();
+    deletedFileNames.clear();
+  }
+
+  // obtainLock(): We get the delegate Directory lock.
+
+  @Override
+  public void close() {
+    log.debug("close");
+    isOpen = false;
+    IOUtils.closeQuietly(in);
+    IOUtils.closeQuietly(blobPusher);
+  }
+
+  @Override
+  public String toString() {
+    return getClass().getSimpleName() + "(" + in.toString() + ")";
+  }
+
+  @Override
+  protected void ensureOpen() throws AlreadyClosedException {
+    if (!isOpen) {
+      throw new AlreadyClosedException("This Directory is closed");
+    }
+  }
+
+  /**
+   * Delegating {@link IndexOutput} that hooks the {@link #close()} method to compute the checksum.
+   * The goal is to free the reference to the delegate {@link IndexOutput} when it is closed because
+   * we only need it to get the checksum.
+   */
+  private static class BlobIndexOutput extends FilterIndexOutput {
+
+    private final BlobFileSupplier blobFileSupplier;
+
+    BlobIndexOutput(IndexOutput delegate, BlobFileSupplier blobFileSupplier) {
+      super("Blob " + delegate.toString(), delegate.getName(), delegate);
+      this.blobFileSupplier = blobFileSupplier;
+    }
+
+    @Override
+    public void close() throws IOException {
+      try {
+        // Capture the length and checksum while the delegate is still open.
+        blobFileSupplier.getBlobFileFromIndexOutput();
+      } finally {
+        // Always close the delegate, even if computing the checksum failed.
+        super.close();
+      }
+    }
+  }
+
+  /**
+   * Supplies the length and checksum of a file created in this directory. Keeps a reference to the
+   * file {@link IndexOutput} to be able to get its final length and checksum. However we try to
+   * free the reference as soon as we can (when the {@link IndexOutput} is closed so we know the
+   * content is final). Methods are synchronized because the output may be closed by a writing
+   * thread while a commit thread reads the {@link BlobFile}.
+   */
+  private static class BlobFileSupplier {
+
+    private IndexOutput indexOutput;
+    private String name;
+    private BlobFile blobFile;
+
+    BlobFileSupplier(IndexOutput indexOutput) {
+      this.indexOutput = indexOutput;
+      name = indexOutput.getName();
+    }
+
+    synchronized void rename(String source, String dest) {
+      assert name.equals(source);
+      name = dest;
+      if (blobFile != null) {
+        blobFile = new BlobFile(name, blobFile.size(), blobFile.checksum());
+      }
+    }
+
+    synchronized BlobFile getBlobFile() throws IOException {
+      if (blobFile == null) {
+        getBlobFileFromIndexOutput();
+      }
+      return blobFile;
+    }
+
+    /**
+     * Gets the {@link BlobFile} of the referenced {@link IndexOutput} and then frees the reference.
+     * Does nothing once the reference is freed, so a second close() or a close() after {@link
+     * #getBlobFile()} does not fail.
+     */
+    synchronized void getBlobFileFromIndexOutput() throws IOException {
+      if (indexOutput == null) {
+        return; // BlobFile already computed.
+      }
+      blobFile = new BlobFile(name, indexOutput.getFilePointer(), indexOutput.getChecksum());
+      indexOutput = null; // Free the reference since we have the checksum.
+    }
+  }
+}
diff --git a/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobDirectoryFactory.java b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobDirectoryFactory.java
new file mode 100644
index 0000000..24b6cac
--- /dev/null
+++ b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobDirectoryFactory.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.blob;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.LockFactory;
+import org.apache.lucene.store.MMapDirectory;
+import org.apache.lucene.store.NativeFSLockFactory;
+import org.apache.lucene.store.NoLockFactory;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.CachingDirectoryFactory;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.CoreDescriptor;
+import org.apache.solr.core.DirectoryFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO now: still failing tests (run CoreSynonymLoadTest no SolrCloud) because we have to support
+// removing dir and old indexes in BlobStore. See all "TODO now"
+
+/**
+ * {@link CachingDirectoryFactory} that creates {@link BlobDirectory} instances wrapping an {@link
+ * MMapDirectory}.
+ *
+ * <p>Required init parameters: "delegateFactory", "delegateLockType" and "blobPath". Optional
+ * MMapDirectory parameters: "maxChunkSize", "unmap" (default true) and "preload" (default false).
+ * Work in progress: the delegate factory is initialized but not yet used to create directories,
+ * and no {@link BlobPusher} is provided to the created directories yet.
+ */
+public class BlobDirectoryFactory extends CachingDirectoryFactory {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private DirectoryFactory delegateFactory;
+  private String delegateLockType;
+  private String blobPath;
+
+  // Parameters for MMapDirectory
+  // TODO: Change DirectoryFactory.get() upstream to allow us to provide a Function<Directory,
+  // Directory> to wrap the directory when it is created. This would unblock the delegation of
+  // DirectoryFactory here. And we could get rid of these params, we could simply delegate to a
+  // delegateFactory instead.
+  private boolean unmapHack;
+  private boolean preload;
+  private int maxChunk;
+
+  @Override
+  public void initCoreContainer(CoreContainer cc) {
+    super.initCoreContainer(cc);
+    if (delegateFactory != null) {
+      delegateFactory.initCoreContainer(cc);
+    }
+    // blobListingManager = BlobListingManager.getInstance(cc, "/blobDirListings");
+  }
+
+  @Override
+  public void init(NamedList args) {
+    super.init(args);
+    SolrParams params = args.toSolrParams();
+
+    // Validate all the parameters before instantiating the delegate factory, so that a bad
+    // configuration fails before any delegate is created and initialized.
+    String delegateFactoryClass = params.get("delegateFactory");
+    if (delegateFactoryClass == null) {
+      throw new IllegalArgumentException("delegateFactory class is required");
+    }
+
+    delegateLockType = params.get("delegateLockType");
+    if (delegateLockType == null) {
+      throw new IllegalArgumentException("delegateLockType is required");
+    }
+
+    blobPath = params.get("blobPath");
+    if (blobPath == null) {
+      throw new IllegalArgumentException("blobPath is required");
+    }
+
+    maxChunk = params.getInt("maxChunkSize", MMapDirectory.DEFAULT_MAX_CHUNK_SIZE);
+    if (maxChunk <= 0) {
+      throw new IllegalArgumentException("maxChunk must be greater than 0");
+    }
+    unmapHack = params.getBool("unmap", true);
+    preload = params.getBool("preload", false); // default turn-off
+
+    delegateFactory =
+        coreContainer.getResourceLoader().newInstance(delegateFactoryClass, DirectoryFactory.class);
+    delegateFactory.initCoreContainer(coreContainer);
+    delegateFactory.init(args);
+  }
+
+  @Override
+  public void doneWithDirectory(Directory directory) throws IOException {
+    log.debug("doneWithDirectory {}", directory);
+    ((BlobDirectory) directory).release();
+    // TODO delegateFactory.doneWithDirectory(directory);
+    super.doneWithDirectory(directory);
+  }
+
+  /** Closes the cached directories, then the delegate factory created in init(NamedList). */
+  @Override
+  public void close() throws IOException {
+    log.debug("close");
+    try {
+      super.close();
+    } finally {
+      // init() created and initialized the delegate factory, so it must be closed too.
+      if (delegateFactory != null) {
+        delegateFactory.close();
+      }
+    }
+  }
+
+  @Override
+  protected LockFactory createLockFactory(String rawLockType) throws IOException {
+    // TODO return rawLockType.equals(DirectoryFactory.LOCK_TYPE_NONE) ? NoLockFactory.INSTANCE :
+    // DELEGATE_LOCK_FACTORY;
+    return rawLockType.equals(DirectoryFactory.LOCK_TYPE_NONE)
+        ? NoLockFactory.INSTANCE
+        : NativeFSLockFactory.INSTANCE;
+  }
+
+  @Override
+  protected Directory create(String path, LockFactory lockFactory, DirContext dirContext)
+      throws IOException {
+    log.debug("Create Directory {}", path);
+    MMapDirectory mapDirectory = new MMapDirectory(new File(path).toPath(), lockFactory, maxChunk);
+    try {
+      mapDirectory.setUseUnmap(unmapHack);
+    } catch (IllegalArgumentException e) {
+      log.warn("Unmap not supported on this JVM, continuing on without setting unmap", e);
+    }
+    mapDirectory.setPreload(preload);
+    Directory delegateDirectory = mapDirectory;
+    // TODO
+    // String delegateLockType = lockFactory == NoLockFactory.INSTANCE ?
+    // DirectoryFactory.LOCK_TYPE_NONE : this.delegateLockType;
+    // Directory delegateDirectory = delegateFactory.get(path, dirContext, delegateLockType);
+    BlobPusher blobPusher =
+        null; // nocommit new BlobPusher(new BlobStore(blobPath, path));//TODO now: Reuse BlobPusher
+    return new BlobDirectory(delegateDirectory, blobPusher);
+  }
+
+  @Override
+  public boolean exists(String path) throws IOException {
+    boolean exists = super.exists(path);
+    log.debug("exists {} = {}", path, exists);
+    return exists;
+    // TODO return delegateFactory.exists(path);
+  }
+
+  @Override
+  protected void removeDirectory(CacheValue cacheValue) throws IOException {
+    log.debug("removeDirectory {}", cacheValue);
+    File dirFile = new File(cacheValue.path);
+    FileUtils.deleteDirectory(dirFile);
+    // TODO delegateFactory.remove(cacheValue.path);
+    // nocommit BlobPusher blobPusher = new BlobPusher(new BlobStore(blobPath, cacheValue.path));
+    // TODO now: Reuse BlobPusher, then blobPusher.deleteDirectory();
+  }
+
+  @Override
+  public void move(Directory fromDir, Directory toDir, String fileName, IOContext ioContext)
+      throws IOException {
+    // TODO: override for efficiency?
+    log.debug("move {} {} to {}", fromDir, fileName, toDir);
+    super.move(fromDir, toDir, fileName, ioContext);
+  }
+
+  @Override
+  public void renameWithOverwrite(Directory dir, String fileName, String toName)
+      throws IOException {
+    // TODO: override to perform an atomic rename if possible?
+    log.debug("renameWithOverwrite {} {} to {}", dir, fileName, toName);
+    super.renameWithOverwrite(dir, fileName, toName);
+  }
+
+  @Override
+  public boolean isPersistent() {
+    return true;
+  }
+
+  @Override
+  public boolean isSharedStorage() {
+    return true;
+  }
+
+  @Override
+  public void release(Directory directory) throws IOException {
+    log.debug("release {}", directory);
+    // NOTE(review): this clears all the file state tracked by the BlobDirectory on every release,
+    // presumably even while other holders of the cached directory still use it; confirm intended.
+    ((BlobDirectory) directory).release();
+    // TODO delegateFactory.release(directory);
+    super.release(directory);
+  }
+
+  @Override
+  public boolean isAbsolute(String path) {
+    boolean isAbsolute = new File(path).isAbsolute();
+    log.debug("isAbsolute {} = {}", path, isAbsolute);
+    return isAbsolute;
+    // TODO return delegateFactory.isAbsolute(path);
+  }
+
+  @Override
+  public boolean searchersReserveCommitPoints() {
+    return false; // TODO: double check
+  }
+
+  @Override
+  public String getDataHome(CoreDescriptor cd) throws IOException {
+    String dataHome = super.getDataHome(cd);
+    log.debug("getDataHome {}", dataHome);
+    return dataHome;
+  }
+
+  @Override
+  public void cleanupOldIndexDirectories(
+      final String dataDirPath, final String currentIndexDirPath, boolean afterCoreReload) {
+    log.debug("cleanupOldIndexDirectories {} {}", dataDirPath, currentIndexDirPath);
+    super.cleanupOldIndexDirectories(dataDirPath, currentIndexDirPath, afterCoreReload);
+    // TODO now: cleanup with BlobPusher
+  }
+}
diff --git a/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobFile.java b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobFile.java
new file mode 100644
index 0000000..6763483
--- /dev/null
+++ b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobFile.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.blob;
+
+import java.util.Objects;
+
+/**
+ * Describes a file in Blob by its name, size and checksum. Two instances are equal when all three
+ * values are equal.
+ */
+public class BlobFile {
+  protected final String fileName;
+  protected final long size;
+  protected final long checksum;
+
+  public BlobFile(String fileName, long size, long checksum) {
+    this.fileName = fileName;
+    this.size = size;
+    this.checksum = checksum;
+  }
+
+  /** Returns the name of the file. */
+  public String fileName() {
+    return fileName;
+  }
+
+  /** Returns the size given at construction. */
+  public long size() {
+    return size;
+  }
+
+  /** Returns the checksum given at construction. */
+  public long checksum() {
+    return checksum;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (o instanceof BlobFile) {
+      BlobFile other = (BlobFile) o;
+      // Compare the primitive fields first; the name comparison is the most expensive.
+      return size == other.size && checksum == other.checksum && fileName.equals(other.fileName);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(fileName, size, checksum);
+  }
+
+  @Override
+  public String toString() {
+    return new StringBuilder()
+        .append(fileName)
+        .append(" size=")
+        .append(size)
+        .append(" chk=")
+        .append(checksum)
+        .toString();
+  }
+}
diff --git a/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobListing.java b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobListing.java
new file mode 100644
index 0000000..bacf616
--- /dev/null
+++ b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobListing.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.blob;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/** A listing of files with references to files in other listings. Lucene/Solr agnostic. */
+public class BlobListing {
+
+  public static final class LocalEntry {
+    private final BlobFile blobFile;
+    /**
+     * Sorted list of relative paths to the listings that refer to this file. The current listing is
+     * "." and is always first if present.
+     */
+    private final List<String> references;
+
+    public LocalEntry(BlobFile blobFile, List<String> references) {
+      this.blobFile = blobFile;
+      this.references = references;
+    }
+
+    public BlobFile getBlobFile() {
+      return blobFile;
+    }
+
+    /**
+     * Returns true if the current listing no longer refers to this file, i.e. "." (which would be
+     * first) is not among the references.
+     */
+    public boolean localDeleted() {
+      return references.isEmpty() || !references.get(0).equals(".");
+    }
+
+    public List<String> references() {
+      return references;
+    }
+
+    @Override
+    public String toString() {
+      return "LocalEntry{" + "blobFile=" + blobFile + ", references=" + references + '}';
+    }
+
+    /**
+     * Returns a copy of this entry with {@code path} inserted at its sorted position among the
+     * existing references.
+     */
+    public LocalEntry copyWithRef(String path) {
+      final int idx = Collections.binarySearch(references, path);
+      assert idx < 0 : "Already referenced: " + path;
+      if (idx >= 0) {
+        return this; // Already referenced; only reachable with assertions disabled.
+      }
+      final int insertionPoint = -idx - 1;
+      // Copy the existing references, then insert the new path to keep the list sorted.
+      List<String> newList = new ArrayList<>(references.size() + 1);
+      newList.addAll(references);
+      newList.add(insertionPoint, path);
+      return new LocalEntry(blobFile, newList);
+    }
+  }
+
+  public static final class RefEntry {
+    private final String fileName;
+    private final String sourcePath;
+
+    public RefEntry(String fileName, String sourcePath) {
+      this.fileName = fileName;
+      this.sourcePath = sourcePath;
+    }
+
+    public String fileName() {
+      return fileName;
+    }
+
+    public String sourcePath() {
+      return sourcePath;
+    }
+
+    @Override
+    public String toString() {
+      return "RefEntry{"
+          + "fileName='"
+          + fileName
+          + '\''
+          + ", sourcePath='"
+          + sourcePath
+          + '\''
+          + '}';
+    }
+  }
+
+  public static BlobListing fromJson(byte[] bytes) {
+    throw new UnsupportedOperationException("TODO"); // TODO
+  }
+
+  public byte[] toJson() {
+    throw new UnsupportedOperationException("TODO"); // TODO
+  }
+
+  private final Map<BlobFile, LocalEntry> localFiles;
+  private final Map<String, RefEntry> refFiles;
+
+  public BlobListing(Map<BlobFile, LocalEntry> localFiles, Map<String, RefEntry> refFiles) {
+    this.localFiles = localFiles;
+    this.refFiles = refFiles;
+
+    // A file name is either local or a reference to another listing, never both.
+    final Set<String> refFileSet = refFiles.keySet();
+    assert localFiles.keySet().stream().noneMatch(bf -> refFileSet.contains(bf.fileName()));
+    // TODO assert sorted
+  }
+
+  public LocalEntry lookupLocalEntry(BlobFile blobFile) {
+    return localFiles.get(blobFile);
+  }
+
+  /** Returns the local {@link BlobFile} with the given name, or null if there is none. */
+  public BlobFile lookupLocalBlobFile(String fileName) {
+    for (BlobFile blobFile : localFiles.keySet()) {
+      if (blobFile.fileName().equals(fileName)) {
+        return blobFile;
+      }
+    }
+    return null;
+  }
+
+  public RefEntry lookupRemoteEntry(String fileName) {
+    return refFiles.get(fileName);
+  }
+}
diff --git a/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobPusher.java b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobPusher.java
new file mode 100644
index 0000000..324ca99
--- /dev/null
+++ b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobPusher.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.blob;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.invoke.MethodHandles;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+import org.apache.lucene.util.IOUtils;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Pushes a set of files to Blob, and works with listings. */
+public class BlobPusher implements Closeable {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  // WORK IN PROGRESS!!
+
+  private final BlobStore blobStore;
+  private final ExecutorService executor = ExecutorUtil.newMDCAwareCachedThreadPool("blobPusher");
+
+  public BlobPusher(BlobStore blobStore) {
+    this.blobStore = blobStore;
+  }
+
+  /**
+   * Uploads {@code writes} to the {@link BlobStore} in parallel, then deletes {@code deletes} from
+   * it.
+   *
+   * @param writes files to upload.
+   * @param inputStreamSupplier opens the content of a file to upload; each stream is closed once
+   *     uploaded.
+   * @param deletes names of the files to delete; nothing is deleted if empty.
+   * @throws IOException if an upload fails or the thread is interrupted while waiting for uploads.
+   */
+  public void push(
+      Collection<BlobFile> writes,
+      IOUtils.IOFunction<BlobFile, InputStream> inputStreamSupplier,
+      Collection<String> deletes)
+      throws IOException {
+
+    // update "foreign" listings
+    // TODO David
+
+    // send files to BlobStore and delete our files too
+    log.debug("Pushing {}", writes);
+    executeAll(pushFiles(writes, inputStreamSupplier));
+    if (!deletes.isEmpty()) {
+      log.debug("Deleting {}", deletes);
+      deleteFiles(deletes);
+    }
+
+    // update "our" listing
+    // TODO David
+  }
+
+  /** Runs the actions on the executor, waits for all of them, and rethrows the first failure. */
+  private void executeAll(List<Callable<Void>> actions) throws IOException {
+    try {
+      for (Future<Void> future : executor.invokeAll(actions)) {
+        future.get();
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IOException(e);
+    } catch (ExecutionException e) {
+      // Rethrow an IOException as is rather than hiding it behind two levels of wrapping.
+      Throwable cause = e.getCause();
+      if (cause instanceof IOException) {
+        throw (IOException) cause;
+      }
+      throw new IOException(cause);
+    }
+  }
+
+  /** Creates one upload action per file. */
+  private List<Callable<Void>> pushFiles(
+      Collection<BlobFile> blobFiles,
+      IOUtils.IOFunction<BlobFile, InputStream> inputStreamSupplier) {
+    return blobFiles.stream()
+        .map(blobFile -> pushFile(blobFile, inputStreamSupplier))
+        .collect(Collectors.toList());
+  }
+
+  /** Returns an action that opens the file content, uploads it, and closes the stream. */
+  private Callable<Void> pushFile(
+      BlobFile blobFile, IOUtils.IOFunction<BlobFile, InputStream> inputStreamSupplier) {
+    return () -> {
+      try (InputStream in = inputStreamSupplier.apply(blobFile)) {
+        blobStore.create(blobFile.fileName(), in, blobFile.size());
+      }
+      return null;
+    };
+  }
+
+  private void deleteFiles(Collection<String> fileNames) throws IOException {
+    blobStore.delete(fileNames);
+  }
+
+  /** Shuts down the upload executor and waits for it to terminate. Does not close the BlobStore. */
+  @Override
+  public void close() {
+    // TODO: close the BlobStore too if this pusher owns it.
+    ExecutorUtil.shutdownAndAwaitTermination(executor);
+  }
+}
diff --git a/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobStore.java b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobStore.java
new file mode 100644
index 0000000..b411d6d
--- /dev/null
+++ b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobStore.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.blob;
+
+import java.io.Closeable;
+import java.io.InputStream;
+import java.util.Collection;
+import java.util.List;
+
+/** Storage of blobs that {@link BlobPusher} uploads files to and deletes files from. */
+public abstract class BlobStore implements Closeable {
+
+  /**
+   * Creates a blob named {@code fileName} with the content read from {@code inputStream}.
+   *
+   * @param fileName name of the blob to create.
+   * @param inputStream content of the blob. {@link BlobPusher} closes it after this call returns.
+   * @param contentLength expected length of the content; {@link BlobPusher} passes the local file
+   *     size.
+   */
+  public abstract void create(String fileName, InputStream inputStream, long contentLength);
+
+  /**
+   * Deletes the blobs with the given names.
+   *
+   * @param fileNames names of the blobs to delete.
+   */
+  public abstract void delete(Collection<String> fileNames);
+
+  /** Opens a stream to read the blob at {@code blobPath}. */
+  public abstract InputStream read(String blobPath);
+
+  /** Lists the blob file names for {@code blobPath}. */
+  public abstract List<String> list(String blobPath);
+}
diff --git a/solr/contrib/blob-directory/src/java/org/apache/solr/blob/FilterIndexOutput.java b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/FilterIndexOutput.java
new file mode 100644
index 0000000..ff9331d
--- /dev/null
+++ b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/FilterIndexOutput.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.blob;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.lucene.store.DataInput;
+import org.apache.lucene.store.IndexOutput;
+
+/**
+ * An {@link IndexOutput} whose methods forward to a wrapped {@link IndexOutput}.
+ *
+ * <p>Subclasses can override individual methods to intercept calls; the implementations here pass
+ * each call through unchanged to {@link #delegate}.
+ */
+public class FilterIndexOutput extends IndexOutput {
+
+  /** The wrapped output that receives the forwarded calls. */
+  protected final IndexOutput delegate;
+
+  /**
+   * Creates a filter around {@code delegate}.
+   *
+   * @param resourceDescription resource description given to the {@link IndexOutput} super
+   *     constructor (not taken from the delegate).
+   * @param name name given to the {@link IndexOutput} super constructor (not taken from the
+   *     delegate).
+   * @param delegate the output that receives every forwarded call.
+   */
+  public FilterIndexOutput(String resourceDescription, String name, IndexOutput delegate) {
+    super(resourceDescription, name);
+    this.delegate = delegate;
+  }
+
+  // ---- single values ----
+
+  @Override
+  public void writeByte(byte value) throws IOException {
+    delegate.writeByte(value);
+  }
+
+  @Override
+  public void writeShort(short value) throws IOException {
+    delegate.writeShort(value);
+  }
+
+  @Override
+  public void writeInt(int value) throws IOException {
+    delegate.writeInt(value);
+  }
+
+  @Override
+  public void writeLong(long value) throws IOException {
+    delegate.writeLong(value);
+  }
+
+  @Override
+  public void writeString(String value) throws IOException {
+    delegate.writeString(value);
+  }
+
+  // ---- bulk data ----
+
+  @Override
+  public void writeBytes(byte[] bytes, int offset, int count) throws IOException {
+    delegate.writeBytes(bytes, offset, count);
+  }
+
+  @Override
+  public void writeBytes(byte[] bytes, int count) throws IOException {
+    delegate.writeBytes(bytes, count);
+  }
+
+  @Override
+  public void copyBytes(DataInput source, long byteCount) throws IOException {
+    delegate.copyBytes(source, byteCount);
+  }
+
+  @Override
+  public void writeMapOfStrings(Map<String, String> entries) throws IOException {
+    delegate.writeMapOfStrings(entries);
+  }
+
+  @Override
+  public void writeSetOfStrings(Set<String> values) throws IOException {
+    delegate.writeSetOfStrings(values);
+  }
+
+  // ---- state and lifecycle ----
+
+  @Override
+  public long getFilePointer() {
+    return delegate.getFilePointer();
+  }
+
+  @Override
+  public long getChecksum() throws IOException {
+    return delegate.getChecksum();
+  }
+
+  @Override
+  public void close() throws IOException {
+    delegate.close();
+  }
+}
diff --git a/solr/contrib/blob-directory/src/java/org/apache/solr/blob/IndexInputInputStream.java b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/IndexInputInputStream.java
new file mode 100644
index 0000000..4dc1adc
--- /dev/null
+++ b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/IndexInputInputStream.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.blob;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.lucene.store.IndexInput;
+
+/**
+ * An {@link InputStream} which wraps an {@link IndexInput}, reading from the input's file pointer
+ * at construction time up to its end.
+ *
+ * <p>Derived from {@code org.apache.lucene.replicator.IndexInputInputStream}, with fixes so that it
+ * honors the {@link InputStream} contract: {@link #read()} returns unsigned byte values, {@link
+ * #available()} reports the bytes remaining, and {@link #skip(long)} never returns a negative
+ * count nor moves backwards.
+ */
+public final class IndexInputInputStream extends InputStream {
+
+  private final IndexInput in;
+
+  /** Number of bytes left before end of stream. */
+  private long remaining;
+
+  /**
+   * Creates a stream over the remaining content of {@code in}.
+   *
+   * @param in the input to read; reading starts at its current file pointer. Closing this stream
+   *     closes {@code in}.
+   */
+  public IndexInputInputStream(IndexInput in) {
+    this.in = in;
+    // Start from the current position rather than assuming 0, so that a pre-positioned input is
+    // not read past its end.
+    remaining = in.length() - in.getFilePointer();
+  }
+
+  /**
+   * Reads the next byte.
+   *
+   * @return the next byte as an unsigned value in 0..255, or -1 at end of stream.
+   */
+  @Override
+  public int read() throws IOException {
+    if (remaining <= 0) {
+      return -1;
+    }
+    byte value = in.readByte();
+    remaining--;
+    // Mask to 0..255: a signed byte such as 0xFF would otherwise be returned as -1, which callers
+    // interpret as end of stream.
+    return value & 0xFF;
+  }
+
+  /** Returns the number of bytes left to read, capped at {@link Integer#MAX_VALUE}. */
+  @Override
+  public int available() throws IOException {
+    return (int) Math.min(remaining, Integer.MAX_VALUE);
+  }
+
+  /** Closes the wrapped {@link IndexInput}. */
+  @Override
+  public void close() throws IOException {
+    in.close();
+  }
+
+  @Override
+  public int read(byte[] b) throws IOException {
+    return read(b, 0, b.length);
+  }
+
+  /**
+   * Reads up to {@code len} bytes into {@code b} starting at {@code off}.
+   *
+   * @return the number of bytes read, 0 if {@code len} is 0, or -1 at end of stream.
+   * @throws IndexOutOfBoundsException if {@code off} or {@code len} is negative, or {@code len} is
+   *     greater than {@code b.length - off}.
+   */
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    if (off < 0 || len < 0 || len > b.length - off) {
+      throw new IndexOutOfBoundsException(
+          "off=" + off + ", len=" + len + ", array length=" + b.length);
+    }
+    if (len == 0) {
+      return 0;
+    }
+    if (remaining <= 0) {
+      return -1;
+    }
+    int toRead = (int) Math.min(len, remaining);
+    in.readBytes(b, off, toRead);
+    remaining -= toRead;
+    return toRead;
+  }
+
+  /**
+   * Skips up to {@code n} bytes by seeking the wrapped input forward.
+   *
+   * @return the number of bytes skipped; 0 if {@code n <= 0} or at end of stream.
+   */
+  @Override
+  public long skip(long n) throws IOException {
+    if (n <= 0 || remaining <= 0) {
+      return 0;
+    }
+    long toSkip = Math.min(n, remaining);
+    in.seek(in.getFilePointer() + toSkip);
+    remaining -= toSkip;
+    return toSkip;
+  }
+}
diff --git a/solr/packaging/build.gradle b/solr/packaging/build.gradle
index 27e8e3a..d3aacde 100644
--- a/solr/packaging/build.gradle
+++ b/solr/packaging/build.gradle
@@ -46,6 +46,7 @@
[":solr:contrib:analysis-extras",
":solr:contrib:analytics",
+ ":solr:contrib:blob-directory",
":solr:contrib:extraction",
":solr:contrib:clustering",
":solr:contrib:jaegertracer-configurator",