SOLR-15051: Initial WIP

Co-authored-by: Bruno Roustant <broustant@salesforce.com>
diff --git a/gradle/maven/defaults-maven.gradle b/gradle/maven/defaults-maven.gradle
index 2662a69..f64ddf0 100644
--- a/gradle/maven/defaults-maven.gradle
+++ b/gradle/maven/defaults-maven.gradle
@@ -61,6 +61,7 @@
         ":solr:solrj",
         ":solr:contrib:analysis-extras",
         ":solr:contrib:analytics",
+        ":solr:contrib:blob-directory",
         ":solr:contrib:clustering",
         ":solr:contrib:extraction",
         ":solr:contrib:langid",
diff --git a/settings.gradle b/settings.gradle
index be2c09c..a683ddf 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -60,6 +60,7 @@
 include "solr:server"
 include "solr:contrib:analysis-extras"
 include "solr:contrib:analytics"
+include "solr:contrib:blob-directory"
 include "solr:contrib:clustering"
 include "solr:contrib:extraction"
 include "solr:contrib:langid"
diff --git a/solr/contrib/blob-directory/build.gradle b/solr/contrib/blob-directory/build.gradle
new file mode 100644
index 0000000..5fd53c5
--- /dev/null
+++ b/solr/contrib/blob-directory/build.gradle
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+apply plugin: 'java-library'
+
+description = 'A shared storage approach based on Lucene\'s Directory abstraction'
+
+dependencies {
+  implementation project(':solr:core')
+
+  testImplementation project(':solr:test-framework')
+}
diff --git a/solr/contrib/blob-directory/codeDiagram.png b/solr/contrib/blob-directory/codeDiagram.png
new file mode 100644
index 0000000..7afe3af
--- /dev/null
+++ b/solr/contrib/blob-directory/codeDiagram.png
Binary files differ
diff --git a/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobDirectory.java b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobDirectory.java
new file mode 100644
index 0000000..a94af92
--- /dev/null
+++ b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobDirectory.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.blob;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FilterDirectory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.solr.common.util.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BlobDirectory extends FilterDirectory {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private static final IOContext SYNC_IO_CONTEXT = new IOContext();
+
+  private final BlobPusher blobPusher;
+
+  /**
+   * Map of {@link BlobFileSupplier} for each file created by this directory. Keys are file names.
+   * Each {@link BlobFileSupplier} keeps a reference to the {@link IndexOutput} created for the
+   * file, to provide its checksum when files are pushed on {@link #syncMetaData()}. The reference
+   * is freed earlier, when the {@link IndexOutput} is closed, by getting the checksum then.
+   */
+  private final Map<String, BlobFileSupplier> blobFileSupplierMap;
+
+  private final Set<String> synchronizedFileNames;
+
+  private final Collection<String> deletedFileNames;
+
+  private volatile boolean isOpen = true; // Cleared by close(); checked by ensureOpen().
+
+  public BlobDirectory(Directory delegate, BlobPusher blobPusher) {
+    super(delegate);
+    this.blobPusher = blobPusher;
+    blobFileSupplierMap = new HashMap<>();
+    synchronizedFileNames = new HashSet<>();
+    deletedFileNames = new ArrayList<>();
+  }
+
+  @Override
+  public void deleteFile(String name) throws IOException {
+    log.debug("deleteFile {}", name);
+    in.deleteFile(name); // Delete from the delegate first; on failure nothing is recorded.
+    deletedFileNames.add(name); // Passed to the BlobPusher by the next syncToBlobStore().
+  }
+
+  @Override
+  public IndexOutput createOutput(String name, IOContext context) throws IOException {
+    log.debug("createOutput {}", name);
+    IndexOutput indexOutput = in.createOutput(name, context);
+    BlobFileSupplier blobFileSupplier = new BlobFileSupplier(indexOutput);
+    blobFileSupplierMap.put(name, blobFileSupplier);
+    return new BlobIndexOutput(indexOutput, blobFileSupplier);
+  }
+
+  // createTempOutput(): We don't track tmp files since they are not synced.
+
+  @Override
+  public void sync(Collection<String> names) throws IOException {
+    log.debug("sync {}", names);
+    in.sync(names);
+    synchronizedFileNames.addAll(names);
+  }
+
+  @Override
+  public void rename(String source, String dest) throws IOException {
+    log.debug("rename {} to {}", source, dest);
+    in.rename(source, dest);
+    // Also rename the corresponding BlobFile.
+    BlobFileSupplier blobFileSupplier = blobFileSupplierMap.remove(source);
+    if (blobFileSupplier != null) {
+      blobFileSupplier.rename(source, dest);
+      blobFileSupplierMap.put(dest, blobFileSupplier);
+    }
+    // Also rename the tracked synchronized file.
+    if (synchronizedFileNames.remove(source)) {
+      synchronizedFileNames.add(dest);
+    }
+  }
+
+  @Override
+  public void syncMetaData() throws IOException {
+    log.debug("syncMetaData");
+    in.syncMetaData();
+    syncToBlobStore();
+  }
+
+  /** Pushes files synced since the last push, and pending deletions, through the BlobPusher. */
+  private void syncToBlobStore() throws IOException {
+    log.debug("File names to sync {}", synchronizedFileNames);
+
+    Collection<BlobFile> writes = new ArrayList<>(synchronizedFileNames.size());
+    for (String syncedName : synchronizedFileNames) {
+      BlobFileSupplier supplier = blobFileSupplierMap.get(syncedName);
+      if (supplier == null) {
+        continue; // Not created by this directory since the last release(): nothing to push.
+      }
+      writes.add(supplier.getBlobFile());
+    }
+
+    log.debug("Sync to BlobStore writes={} deleted={}", writes, deletedFileNames);
+    blobPusher.push(writes, this::openInputStream, deletedFileNames);
+    synchronizedFileNames.clear();
+    deletedFileNames.clear();
+  }
+
+  private InputStream openInputStream(BlobFile blobFile) throws IOException {
+    return new IndexInputInputStream(in.openInput(blobFile.fileName(), SYNC_IO_CONTEXT));
+  }
+
+  public void release() {
+    log.debug("release");
+    blobFileSupplierMap.clear();
+    synchronizedFileNames.clear();
+    deletedFileNames.clear();
+  }
+
+  // obtainLock(): We get the delegate Directory lock.
+
+  @Override
+  public void close() {
+    log.debug("close");
+    isOpen = false;
+    IOUtils.closeQuietly(in);
+    IOUtils.closeQuietly(blobPusher);
+  }
+
+  @Override
+  public String toString() {
+    return getClass().getSimpleName() + "(" + in.toString() + ")";
+  }
+
+  @Override
+  protected void ensureOpen() throws AlreadyClosedException {
+    if (!isOpen) {
+      throw new AlreadyClosedException("This Directory is closed");
+    }
+  }
+
+  /** Delegating {@link IndexOutput} that captures the length and checksum on {@link #close()}. */
+  private static class BlobIndexOutput extends FilterIndexOutput {
+    private final BlobFileSupplier blobFileSupplier;
+    private boolean closed; // Guards against a second close() reaching the freed reference.
+    BlobIndexOutput(IndexOutput delegate, BlobFileSupplier blobFileSupplier) {
+      super("Blob " + delegate.toString(), delegate.getName(), delegate);
+      this.blobFileSupplier = blobFileSupplier;
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (closed) return; // Closing again must be a no-op.
+      closed = true;
+      try {
+        blobFileSupplier.getBlobFileFromIndexOutput();
+      } finally {
+        super.close(); // Always close the delegate, even if reading the checksum failed.
+      }
+    }
+  }
+
+  /**
+   * Supplies the length and checksum of a file created in this directory. Keeps a reference to the
+   * file {@link IndexOutput} to be able to get its final length and checksum. However we try to
+   * free the reference as soon as we can (when the {@link IndexOutput} is closed so we know the
+   * content is final).
+   */
+  private static class BlobFileSupplier {
+
+    IndexOutput indexOutput;
+    String name;
+    BlobFile blobFile;
+
+    BlobFileSupplier(IndexOutput indexOutput) {
+      this.indexOutput = indexOutput;
+      name = indexOutput.getName();
+    }
+
+    void rename(String source, String dest) {
+      assert name.equals(source);
+      name = dest;
+      if (blobFile != null) {
+        blobFile = new BlobFile(name, blobFile.size(), blobFile.checksum());
+      }
+    }
+
+    BlobFile getBlobFile() throws IOException {
+      if (blobFile == null) {
+        getBlobFileFromIndexOutput();
+      }
+      return blobFile;
+    }
+
+    /**
+     * Gets the {@link BlobFile} of the referenced {@link IndexOutput} and then frees the reference.
+     */
+    void getBlobFileFromIndexOutput() throws IOException {
+      blobFile = new BlobFile(name, indexOutput.getFilePointer(), indexOutput.getChecksum());
+      // log.debug("Freeing IndexOutput {}", indexOutput);
+      indexOutput = null; // Free the reference since we have the checksum.
+    }
+  }
+}
diff --git a/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobDirectoryFactory.java b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobDirectoryFactory.java
new file mode 100644
index 0000000..24b6cac
--- /dev/null
+++ b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobDirectoryFactory.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.blob;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.LockFactory;
+import org.apache.lucene.store.MMapDirectory;
+import org.apache.lucene.store.NativeFSLockFactory;
+import org.apache.lucene.store.NoLockFactory;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.CachingDirectoryFactory;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.CoreDescriptor;
+import org.apache.solr.core.DirectoryFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO now: tests are still failing (run CoreSynonymLoadTest, no SolrCloud) because we have
+// to support removing directories and old indexes in BlobStore.
+// See all the "TODO now" comments.
+
+public class BlobDirectoryFactory extends CachingDirectoryFactory {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private DirectoryFactory delegateFactory;
+  private String delegateLockType;
+  private String blobPath;
+
+  // Parameters for MMapDirectory.
+  // TODO: Change DirectoryFactory.get() upstream to allow us to provide a
+  // Function<Directory, Directory> that wraps the directory when it is created.
+  // This would unblock the delegation of DirectoryFactory here,
+  // and we could get rid of these params:
+  // we could simply delegate to a delegateFactory instead.
+  private boolean unmapHack;
+  private boolean preload;
+  private int maxChunk;
+
+  @Override
+  public void initCoreContainer(CoreContainer cc) {
+    super.initCoreContainer(cc);
+    if (delegateFactory != null) {
+      delegateFactory.initCoreContainer(cc);
+    }
+    //        blobListingManager = BlobListingManager.getInstance(cc, "/blobDirListings");
+  }
+
+  /** Reads the init args; "delegateFactory", "delegateLockType" and "blobPath" are required. */
+  @Override
+  public void init(NamedList args) {
+    super.init(args);
+    SolrParams params = args.toSolrParams();
+    String delegateFactoryClass = params.get("delegateFactory");
+    if (delegateFactoryClass == null) {
+      throw new IllegalArgumentException("delegateFactory class is required");
+    }
+    delegateFactory =
+        coreContainer.getResourceLoader().newInstance(delegateFactoryClass, DirectoryFactory.class);
+    delegateFactory.initCoreContainer(coreContainer);
+    delegateFactory.init(args);
+
+    delegateLockType = params.get("delegateLockType");
+    if (delegateLockType == null) {
+      throw new IllegalArgumentException("delegateLockType is required");
+    }
+
+    blobPath = params.get("blobPath");
+    if (blobPath == null) {
+      throw new IllegalArgumentException("blobPath is required");
+    }
+
+    maxChunk = params.getInt("maxChunkSize", MMapDirectory.DEFAULT_MAX_CHUNK_SIZE);
+    if (maxChunk <= 0) {
+      throw new IllegalArgumentException("maxChunkSize must be greater than 0");
+    }
+    unmapHack = params.getBool("unmap", true);
+    preload = params.getBool("preload", false); // default turn-off
+  }
+
+  @Override
+  public void doneWithDirectory(Directory directory) throws IOException {
+    log.debug("doneWithDirectory {}", directory);
+    ((BlobDirectory) directory).release();
+    // TODO delegateFactory.doneWithDirectory(directory);
+    super.doneWithDirectory(directory);
+  }
+
+  @Override
+  public void close() throws IOException {
+    log.debug("close");
+    // TODO delegateFactory.close();
+    super.close();
+  }
+
+  @Override
+  protected LockFactory createLockFactory(String rawLockType) throws IOException {
+    // Constant-first equals: a null (unconfigured) lock type falls back to the native lock.
+    return DirectoryFactory.LOCK_TYPE_NONE.equals(rawLockType)
+        ? NoLockFactory.INSTANCE
+        : NativeFSLockFactory.INSTANCE;
+    // TODO: use the delegate factory's LockFactory instead of NativeFSLockFactory.
+  }
+
+  @Override
+  protected Directory create(String path, LockFactory lockFactory, DirContext dirContext)
+      throws IOException {
+    log.debug("Create Directory {}", path);
+    MMapDirectory mapDirectory = new MMapDirectory(new File(path).toPath(), lockFactory, maxChunk);
+    try {
+      mapDirectory.setUseUnmap(unmapHack);
+    } catch (IllegalArgumentException e) {
+      log.warn("Unmap not supported on this JVM, continuing on without setting unmap", e);
+    }
+    mapDirectory.setPreload(preload);
+    Directory delegateDirectory = mapDirectory;
+    // TODO
+    // String delegateLockType = lockFactory == NoLockFactory.INSTANCE ?
+    // DirectoryFactory.LOCK_TYPE_NONE : this.delegateLockType;
+    // Directory delegateDirectory = delegateFactory.get(path, dirContext, delegateLockType);
+    BlobPusher blobPusher =
+        null; // nocommit new BlobPusher(new BlobStore(blobPath, path));//TODO now: Reuse BlobPusher
+    return new BlobDirectory(delegateDirectory, blobPusher);
+  }
+
+  @Override
+  public boolean exists(String path) throws IOException {
+    boolean exists = super.exists(path);
+    log.debug("exists {} = {}", path, exists);
+    return exists;
+    // TODO return delegateFactory.exists(path);
+  }
+
+  @Override
+  protected void removeDirectory(CacheValue cacheValue) throws IOException {
+    log.debug("removeDirectory {}", cacheValue);
+    File dirFile = new File(cacheValue.path);
+    FileUtils.deleteDirectory(dirFile);
+    // TODO delegateFactory.remove(cacheValue.path);
+    BlobPusher blobPusher =
+        null; // nocommit new BlobPusher(new BlobStore(blobPath, cacheValue.path));//TODO now: Reuse
+    // BlobPusher
+    // TODO now: blobPusher.deleteDirectory();
+  }
+
+  @Override
+  public void move(Directory fromDir, Directory toDir, String fileName, IOContext ioContext)
+      throws IOException {
+    // TODO: override for efficiency?
+    log.debug("move {} {} to {}", fromDir, fileName, toDir);
+    super.move(fromDir, toDir, fileName, ioContext);
+  }
+
+  @Override
+  public void renameWithOverwrite(Directory dir, String fileName, String toName)
+      throws IOException {
+    // TODO: override to perform an atomic rename if possible?
+    log.debug("renameWithOverwrite {} {} to {}", dir, fileName, toName);
+    super.renameWithOverwrite(dir, fileName, toName);
+  }
+
+  @Override
+  public boolean isPersistent() {
+    return true;
+  }
+
+  @Override
+  public boolean isSharedStorage() {
+    return true;
+  }
+
+  @Override
+  public void release(Directory directory) throws IOException {
+    log.debug("release {}", directory);
+    ((BlobDirectory) directory).release();
+    // TODO delegateFactory.release(directory);
+    super.release(directory);
+  }
+
+  @Override
+  public boolean isAbsolute(String path) {
+    boolean isAbsolute = new File(path).isAbsolute();
+    log.debug("isAbsolute {} = {}", path, isAbsolute);
+    return isAbsolute;
+    // TODO return delegateFactory.isAbsolute(path);
+  }
+
+  @Override
+  public boolean searchersReserveCommitPoints() {
+    return false; // TODO: double check
+  }
+
+  @Override
+  public String getDataHome(CoreDescriptor cd) throws IOException {
+    String dataHome = super.getDataHome(cd);
+    log.debug("getDataHome {}", dataHome);
+    return dataHome;
+  }
+
+  @Override
+  public void cleanupOldIndexDirectories(
+      final String dataDirPath, final String currentIndexDirPath, boolean afterCoreReload) {
+    log.debug("cleanupOldIndexDirectories {} {}", dataDirPath, currentIndexDirPath);
+    super.cleanupOldIndexDirectories(dataDirPath, currentIndexDirPath, afterCoreReload);
+    // TODO now: cleanup with BlobPusher
+  }
+}
diff --git a/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobFile.java b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobFile.java
new file mode 100644
index 0000000..6763483
--- /dev/null
+++ b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobFile.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.blob;
+
+import java.util.Objects;
+
+/** A file in Blob, consisting of a name, size, and checksum. */
+public class BlobFile {
+  protected final String fileName;
+  protected final long size;
+  protected final long checksum;
+
+  public BlobFile(String fileName, long size, long checksum) {
+    this.fileName = fileName;
+    this.size = size;
+    this.checksum = checksum;
+  }
+
+  public String fileName() {
+    return fileName;
+  }
+
+  public long size() {
+    return size;
+  }
+
+  public long checksum() {
+    return checksum;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (!(o instanceof BlobFile)) return false;
+    BlobFile impl = (BlobFile) o;
+    return size == impl.size && checksum == impl.checksum && fileName.equals(impl.fileName);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(fileName, size, checksum);
+  }
+
+  @Override
+  public String toString() {
+    return fileName + " size=" + size + " chk=" + checksum;
+  }
+}
diff --git a/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobListing.java b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobListing.java
new file mode 100644
index 0000000..bacf616
--- /dev/null
+++ b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobListing.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.blob;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/** A listing of files with references to files in other listings. Lucene/Solr agnostic. */
+public class BlobListing {
+
+  public static final class LocalEntry {
+    private final BlobFile blobFile;
+    /**
+     * Set of relative paths to other listings that refer to this file. The current listing is "."
+     * and is always first if present.
+     */
+    private final List<String> references;
+
+    public LocalEntry(BlobFile blobFile, List<String> references) {
+      this.blobFile = blobFile;
+      this.references = references;
+    }
+
+    public BlobFile getBlobFile() {
+      return blobFile;
+    }
+
+    public boolean localDeleted() {
+      return references.get(0).equals("."); // NOTE(review): true if "." is listed - inverted?
+    }
+
+    public List<String> references() {
+      return references;
+    }
+
+    @Override
+    public String toString() {
+      return "LocalEntry{" + "blobFile=" + blobFile + ", references=" + references + '}';
+    }
+
+    /** Returns a copy of this entry with {@code path} added at its sorted position. */
+    public LocalEntry copyWithRef(String path) {
+      final int idx = Collections.binarySearch(references, path);
+      assert idx < 0;
+      List<String> newList = new ArrayList<>(references); // Start from the existing references.
+      newList.add(-idx - 1, path); // binarySearch returns -(insertionPoint) - 1 when absent.
+      return new LocalEntry(blobFile, newList);
+    }
+  }
+
+  public static final class RefEntry {
+    private final String fileName;
+    private final String sourcePath;
+
+    public RefEntry(String fileName, String sourcePath) {
+      this.fileName = fileName;
+      this.sourcePath = sourcePath;
+    }
+
+    public String fileName() {
+      return fileName;
+    }
+
+    public String sourcePath() {
+      return sourcePath;
+    }
+
+    @Override
+    public String toString() {
+      return "RefEntry{"
+          + "fileName='"
+          + fileName
+          + '\''
+          + ", sourcePath='"
+          + sourcePath
+          + '\''
+          + '}';
+    }
+  }
+
+  public static BlobListing fromJson(byte[] bytes) {
+    throw new UnsupportedOperationException("TODO"); // TODO
+  }
+
+  public byte[] toJson() {
+    throw new UnsupportedOperationException("TODO"); // TODO
+  }
+
+  private final Map<BlobFile, LocalEntry> localFiles;
+  private final Map<String, RefEntry> refFiles;
+
+  public BlobListing(Map<BlobFile, LocalEntry> localFiles, Map<String, RefEntry> refFiles) {
+    this.localFiles = localFiles;
+    this.refFiles = refFiles;
+
+    final Set<String> refFileSet = refFiles.keySet();
+    assert localFiles.keySet().stream().noneMatch(bf -> refFileSet.contains(bf.fileName()));
+    // TODO assert sorted
+  }
+
+  public LocalEntry lookupLocalEntry(BlobFile blobFile) {
+    return localFiles.get(blobFile);
+  }
+
+  public BlobFile lookupLocalBlobFile(String fileName) {
+    for (BlobFile blobFile : localFiles.keySet()) {
+      if (blobFile.fileName().equals(fileName)) {
+        return blobFile;
+      }
+    }
+    return null;
+  }
+
+  public RefEntry lookupRemoteEntry(String fileName) {
+    return refFiles.get(fileName);
+  }
+}
diff --git a/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobPusher.java b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobPusher.java
new file mode 100644
index 0000000..324ca99
--- /dev/null
+++ b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobPusher.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.blob;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.invoke.MethodHandles;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+import org.apache.lucene.util.IOUtils;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Pushes a set of files to Blob, and works with listings. */
+public class BlobPusher implements Closeable {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  // WORK IN PROGRESS!!
+
+  private final BlobStore blobStore;
+  private final ExecutorService executor = ExecutorUtil.newMDCAwareCachedThreadPool("blobPusher");
+
+  public BlobPusher(BlobStore blobStore) {
+    this.blobStore = blobStore;
+  }
+
+  public void push(
+      Collection<BlobFile> writes,
+      IOUtils.IOFunction<BlobFile, InputStream> inputStreamSupplier,
+      Collection<String> deletes)
+      throws IOException {
+
+    // update "foreign" listings
+    //      TODO David
+
+    // send files to BlobStore and delete our files too
+    log.debug("Pushing {}", writes);
+    executeAll(pushFiles(writes, inputStreamSupplier));
+    log.debug("Deleting {}", deletes);
+    deleteFiles(deletes);
+
+    // update "our" listing
+    //      TODO David
+  }
+
+  private void executeAll(List<Callable<Void>> actions) throws IOException {
+    try {
+      for (Future<Void> future : executor.invokeAll(actions)) { // Waits for all actions.
+        future.get(); // Rethrows a failed action's exception wrapped in ExecutionException.
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // Restore the flag before translating the exception.
+      throw new IOException(e);
+    } catch (ExecutionException e) {
+      throw new IOException(e); // The action's own exception is the cause of e.
+    }
+  }
+
+  private List<Callable<Void>> pushFiles(
+      Collection<BlobFile> blobFiles,
+      IOUtils.IOFunction<BlobFile, InputStream> inputStreamSupplier) {
+    return blobFiles.stream()
+        .map(
+            (blobFile) ->
+                (Callable<Void>)
+                    () -> {
+                      try (InputStream in = inputStreamSupplier.apply(blobFile)) {
+                        blobStore.create(blobFile.fileName(), in, blobFile.size());
+                      }
+                      return null;
+                    })
+        .collect(Collectors.toList());
+  }
+
+  private void deleteFiles(Collection<String> fileNames) throws IOException {
+    blobStore.delete(fileNames);
+  }
+
+  @Override
+  public void close() {
+    ExecutorUtil.shutdownAndAwaitTermination(executor); // Stop the "blobPusher" pool threads.
+  }
+}
diff --git a/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobStore.java b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobStore.java
new file mode 100644
index 0000000..b411d6d
--- /dev/null
+++ b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobStore.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.blob;
+
+import java.io.Closeable;
+import java.io.InputStream;
+import java.util.Collection;
+import java.util.List;
+
+public abstract class BlobStore implements Closeable {
+  /** Intended to store the stream as {@code fileName}; contentLength presumably in bytes. */
+  public abstract void create(String fileName, InputStream inputStream, long contentLength);
+
+  /** Delete the given files. NOTE(review): plain file names or full blob paths? Confirm. */
+  public abstract void delete(Collection<String> fileNames);
+
+  /** Read stream from blob path */
+  public abstract InputStream read(String blobPath);
+
+  /** List blob files */
+  public abstract List<String> list(String blobPath);
+}
diff --git a/solr/contrib/blob-directory/src/java/org/apache/solr/blob/FilterIndexOutput.java b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/FilterIndexOutput.java
new file mode 100644
index 0000000..ff9331d
--- /dev/null
+++ b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/FilterIndexOutput.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.blob;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.lucene.store.DataInput;
+import org.apache.lucene.store.IndexOutput;
+
+/**
+ * An {@link IndexOutput} that forwards every call it overrides to a wrapped {@link IndexOutput}.
+ * Subclasses can override individual methods to intercept or augment specific operations.
+ */
+public class FilterIndexOutput extends IndexOutput {
+
+  /** The wrapped output that receives all forwarded calls. */
+  protected final IndexOutput delegate;
+
+  /**
+   * Creates a filter around {@code delegate}.
+   *
+   * @param resourceDescription passed through to the {@link IndexOutput} constructor
+   * @param name passed through to the {@link IndexOutput} constructor
+   * @param delegate the output to forward to
+   */
+  public FilterIndexOutput(String resourceDescription, String name, IndexOutput delegate) {
+    super(resourceDescription, name);
+    this.delegate = delegate;
+  }
+
+  // ---- State and lifecycle ----
+
+  @Override
+  public long getFilePointer() {
+    return this.delegate.getFilePointer();
+  }
+
+  @Override
+  public long getChecksum() throws IOException {
+    return this.delegate.getChecksum();
+  }
+
+  /** Closes the wrapped output. */
+  @Override
+  public void close() throws IOException {
+    this.delegate.close();
+  }
+
+  // ---- Primitive writes ----
+
+  @Override
+  public void writeByte(final byte b) throws IOException {
+    this.delegate.writeByte(b);
+  }
+
+  @Override
+  public void writeShort(final short i) throws IOException {
+    this.delegate.writeShort(i);
+  }
+
+  @Override
+  public void writeInt(final int i) throws IOException {
+    this.delegate.writeInt(i);
+  }
+
+  @Override
+  public void writeLong(final long i) throws IOException {
+    this.delegate.writeLong(i);
+  }
+
+  // ---- Bulk and structured writes ----
+
+  @Override
+  public void writeBytes(final byte[] b, final int length) throws IOException {
+    this.delegate.writeBytes(b, length);
+  }
+
+  @Override
+  public void writeBytes(final byte[] b, final int offset, final int length) throws IOException {
+    this.delegate.writeBytes(b, offset, length);
+  }
+
+  @Override
+  public void copyBytes(final DataInput input, final long numBytes) throws IOException {
+    this.delegate.copyBytes(input, numBytes);
+  }
+
+  @Override
+  public void writeString(final String s) throws IOException {
+    this.delegate.writeString(s);
+  }
+
+  @Override
+  public void writeMapOfStrings(final Map<String, String> map) throws IOException {
+    this.delegate.writeMapOfStrings(map);
+  }
+
+  @Override
+  public void writeSetOfStrings(final Set<String> set) throws IOException {
+    this.delegate.writeSetOfStrings(set);
+  }
+}
diff --git a/solr/contrib/blob-directory/src/java/org/apache/solr/blob/IndexInputInputStream.java b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/IndexInputInputStream.java
new file mode 100644
index 0000000..4dc1adc
--- /dev/null
+++ b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/IndexInputInputStream.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.blob;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.lucene.store.IndexInput;
+
+/**
+ * An {@link InputStream} which wraps an {@link IndexInput} and exposes the bytes between the
+ * input's file pointer at construction time and its end. Adapted from {@code
+ * org.apache.lucene.replicator.IndexInputInputStream}, with fixes so that the stream follows the
+ * {@link InputStream} contract: single-byte reads are unsigned, {@link #available()} reflects what
+ * has already been consumed, and {@link #skip(long)} never returns a negative count.
+ */
+public final class IndexInputInputStream extends InputStream {
+
+  private final IndexInput in;
+
+  /** Number of bytes not yet read or skipped. */
+  private long remaining;
+
+  /**
+   * Wraps {@code in}, streaming from its current file pointer to its end.
+   *
+   * @param in the input to read from; it is closed by {@link #close()}
+   */
+  public IndexInputInputStream(IndexInput in) {
+    this.in = in;
+    // Measure from the current position so an already-advanced input is never read past its end.
+    remaining = in.length() - in.getFilePointer();
+  }
+
+  /**
+   * Reads the next byte.
+   *
+   * @return the byte as a value in {@code 0..255}, or {@code -1} once the input is exhausted
+   */
+  @Override
+  public int read() throws IOException {
+    if (remaining == 0) {
+      return -1;
+    }
+    --remaining;
+    // readByte() is signed: mask it so bytes >= 0x80 are not mistaken for end-of-stream (-1).
+    return in.readByte() & 0xFF;
+  }
+
+  /** Returns the number of bytes left to read, capped at {@link Integer#MAX_VALUE}. */
+  @Override
+  public int available() throws IOException {
+    return (int) Math.min(remaining, Integer.MAX_VALUE);
+  }
+
+  /** Closes the wrapped {@link IndexInput}. */
+  @Override
+  public void close() throws IOException {
+    in.close();
+  }
+
+  /** Equivalent to {@code read(b, 0, b.length)}. */
+  @Override
+  public int read(byte[] b) throws IOException {
+    return read(b, 0, b.length);
+  }
+
+  /**
+   * Reads up to {@code len} bytes into {@code b}, starting at index {@code off}.
+   *
+   * @return the number of bytes read, {@code 0} when {@code len} is zero, or {@code -1} when the
+   *     input is exhausted
+   * @throws IndexOutOfBoundsException if {@code off} and {@code len} do not denote a range of
+   *     {@code b}
+   */
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    if (off < 0 || len < 0 || len > b.length - off) {
+      throw new IndexOutOfBoundsException("off=" + off + ", len=" + len + ", size=" + b.length);
+    }
+    if (len == 0) {
+      return 0; // per the InputStream contract, even at end of stream
+    }
+    if (remaining == 0) {
+      return -1;
+    }
+    if (remaining < len) {
+      len = (int) remaining;
+    }
+    in.readBytes(b, off, len);
+    remaining -= len;
+    return len;
+  }
+
+  /**
+   * Skips up to {@code n} bytes by seeking the wrapped input forward.
+   *
+   * @return the number of bytes skipped; {@code 0} when {@code n} is not positive or the input is
+   *     exhausted
+   */
+  @Override
+  public long skip(long n) throws IOException {
+    if (n <= 0 || remaining == 0) {
+      return 0;
+    }
+    long skipped = Math.min(n, remaining);
+    in.seek(in.getFilePointer() + skipped);
+    remaining -= skipped;
+    return skipped;
+  }
+}
diff --git a/solr/packaging/build.gradle b/solr/packaging/build.gradle
index 27e8e3a..d3aacde 100644
--- a/solr/packaging/build.gradle
+++ b/solr/packaging/build.gradle
@@ -46,6 +46,7 @@
 
   [":solr:contrib:analysis-extras",
    ":solr:contrib:analytics",
+   ":solr:contrib:blob-directory",
    ":solr:contrib:extraction",
    ":solr:contrib:clustering",
    ":solr:contrib:jaegertracer-configurator",