Add encryption module.

diff --git a/ENCRYPTION.md b/ENCRYPTION.md
new file mode 100644
index 0000000..d9caeb1
--- /dev/null
+++ b/ENCRYPTION.md
@@ -0,0 +1,149 @@
+# Solr Encryption: Getting Started
+
+**A Java-level encryption-at-rest solution for Apache Solr.**
+
+## Overview
+
+This solution provides the encryption of the Lucene index files at the Java level.
+It encrypts all (or some) the files in a given index with a provided encryption key.
+It stores the id of the encryption key in the commit metadata (and obviously the
+key secret is never stored). It is possible to define a different key per Solr Core.
+This module also provides an EncryptionRequestHandler so that a client can trigger
+the (re)encryption of a Solr Core index. The (re)encryption is done concurrently
+while the Solr Core can continue to serve update and query requests.
+
+Comparing with an OS-level encryption:
+
+- OS-level encryption [1][2] is more performant and more adapted to let Lucene
+leverage the OS memory cache. It can manage encryption at block or filesystem
+level in the OS. This makes it possible to encrypt with different keys per-directory,
+making multi-tenant use-cases possible.
+If you can use OS-level encryption, prefer it and skip this Java-level encryption.
+
+- Java-level encryption can be used when the OS-level encryption management is
+not possible (e.g. host machine managed by a cloud provider). It has an impact
+on performance: expect -20% on most queries, -60% on multi-term queries.
+
+[1] https://wiki.archlinux.org/title/Fscrypt
+
+[2] https://www.kernel.org/doc/html/latest/filesystems/fscrypt.html
+
+## Limitations and Work In Progress
+
+- Currently, this encryption module does not encrypt TLogs.
+That means the update requests data that are stored in these logs are cleartext.
+
+- Currently, EncryptionMergePolicy does not fully work, so it is disabled.
+That means a call to /admin/encrypt to re-encrypt a Solr Core index will trigger
+an optimized commit which merges all index segments into one. This works but is
+heavyweight.
+
+## Installing and Configuring the Encryption Plug-In
+
+1. Configure the sharedLib directory in solr.xml (e.g. sharedLIb=lib) and place
+the Encryption plug-in jar file into the specified folder.
+
+**solr.xml**
+
+```xml
+<solr>
+
+    <str name="sharedLib">${solr.sharedLib:}</str>
+
+</solr>
+```
+
+2. Configure the Encryption classes in solrconfig.xml.
+
+**solrconfig.xml**
+
+```xml
+<config>
+
+    <directoryFactory name="DirectoryFactory"
+                      class="org.apache.solr.encryption.EncryptionDirectoryFactory">
+        <str name="keyManagerSupplier">com.yourApp.YourKeyManager$Supplier</str>
+        <str name="encrypterFactory">org.apache.solr.encryption.crypto.CipherAesCtrEncrypter$Factory</str>
+    </directoryFactory>
+
+    <updateHandler class="org.apache.solr.encryption.EncryptionUpdateHandler">
+    </updateHandler>
+
+    <requestHandler name="/admin/encrypt" class="org.apache.solr.encryption.EncryptionRequestHandler"/>
+
+    <mergePolicyFactory class="org.apache.solr.encryption.EncryptionMergePolicyFactory">
+        <str name="wrapped.prefix">delegate</str>
+        <str name="delegate.class">org.apache.solr.index.TieredMergePolicyFactory</str>
+    </mergePolicyFactory>
+
+</config>
+```
+
+`EncryptionDirectoryFactory` is the DirectoryFactory that encrypts/decrypts all (or some) the index files.
+
+`keyManagerSupplier` is a required parameter to specify your implementation of
+`org.apache.solr.encryption.KeyManager.Supplier`. This class is used to get your `KeyManager`.
+
+`encrypterFactory` is an optional parameter to specify the `org.apache.solr.encryption.crypto.AesCtrEncrypterFactory`
+to use. By default `CipherAesCtrEncrypter$Factory` is used. You can change to `LightAesCtrEncrypter$Factory` for a
+more lightweight and efficient implementation (+10% perf), but it calls an internal com.sun.crypto.provider.AESCrypt()
+constructor which logs a JDK warning (Illegal reflective access).
+
+`EncryptionUpdateHandler` replaces the standard `DirectUpdateHandler2` (which it extends) to store persistently the
+encryption key id in the commit metadata. It supports all the configuration parameters of `DirectUpdateHandler2`.
+
+`EncryptionRequestHandler` receives (re)encryption requests. See its dedicated section below for its usage.
+
+`EncryptionMergePolicyFactory` is a wrapper above a delegate MergePolicyFactory (e.g. the standard
+`TieredMergePolicyFactory`) to ensure all index segments are re-written (re-encrypted).
+
+## Calling EncryptionRequestHandler
+
+Once Solr is set up, it is ready to encrypt. To set the encryption key id to use, the Solr client
+calls the `EncryptionRequestHandler` at `/admin/encrypt`.
+
+`EncryptionRequestHandler` handles an encryption request for a specific Solr core.
+
+The caller provides the mandatory `encryptionKeyId` request parameter to define the encryption
+key id to use to encrypt the index files. To decrypt the index to cleartext, the special parameter
+value `no_key_id` must be provided.
+
+The encryption processing is asynchronous. The request returns immediately with two response parameters.
+- `encryptionState` parameter with value either `pending`, `complete`, or `busy`.
+- `status` parameter with values either `success` or `failure`.
+
+The expected usage of this handler is to first send an encryption request with a key id, and to receive
+a response with `status`=`success` and `encryptionState`=`pending`. If the caller needs to know when the
+encryption is complete, it can (optionally) repeatedly send the same encryption request with the same key id,
+until it receives a response with `status`=`success` and `encryptionState`=`complete`.
+
+If the handler returns a response with `encryptionState`=`busy`, it means that another encryption for a
+different key id is ongoing on the same Solr core. It cannot start a new encryption until it finishes.
+
+If the handler returns a response with `status`=`failure`, it means the request did not succeed and should be
+retried by the caller (there should be error logs).
+
+## Encryption Algorithm
+
+This encryption module implements AES-CTR.
+
+AES-CTR compared to AES-XTS:
+Lucene produces read-only files per index segment. Since we have a new random IV per file, we don't repeat
+the same AES encrypted blocks. So we are in a safe write-once case where AES-XTS and AES-CTR have the same
+strength [1][2]. CTR was chosen because it is simpler.
+
+[1] https://crypto.stackexchange.com/questions/64556/aes-xts-vs-aes-ctr-for-write-once-storage
+
+[2] https://crypto.stackexchange.com/questions/14628/why-do-we-use-xts-over-ctr-for-disk-encryption
+
+## Performance Notes
+
+The performance benchmark was run in LUCENE-9379. Here is the summary:
+
+- An OS-level encryption is better and faster.
+- If really it’s not possible, expect an average of -20% perf impact on most queries, -60% on multi-term queries.
+- You can use the `LightAesCtrEncrypter$Factory` to get +10% perf. This is a simple config change. See the
+solrconfig.xml configuration section above.
+- You can make the Lucene Codec store its FST on heap and expect +15% perf, at the price of more Java heap usage.
+This requires a code change. See `org.apache.lucene.util.fst.FSTStore` implementations and usage in
+`org.apache.lucene.codecs.lucene90.blocktree.FieldReader`.
\ No newline at end of file
diff --git a/build.gradle b/build.gradle
index 79f252e..9a38422 100644
--- a/build.gradle
+++ b/build.gradle
@@ -28,4 +28,5 @@
 
 subprojects {
     group "org.apache.solr.crossdc"
+    group "org.apache.solr.encryption"
 }
diff --git a/encryption/build.gradle b/encryption/build.gradle
new file mode 100644
index 0000000..a08adb5
--- /dev/null
+++ b/encryption/build.gradle
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+plugins {
+    id 'java'
+}
+
+description = 'Index Encryption At-Rest package'
+
+repositories {
+    mavenCentral()
+}
+
+configurations {
+    provided
+}
+
+sourceSets {
+    main { compileClasspath += configurations.provided }
+}
+
+dependencies {
+    implementation 'org.apache.solr:solr-core:9.1.1'
+    implementation 'com.google.code.findbugs:jsr305:3.0.2'
+
+    // Remove when removing DirectUpdateHandler2Copy.
+    implementation 'javax.servlet:javax.servlet-api:3.1.0'
+
+    testImplementation group: 'org.apache.lucene', name: 'lucene-test-framework', version: '9.3.0'
+    testImplementation group: 'org.apache.solr', name: 'solr-test-framework', version: '9.1.0'
+}
+
+test {
+    jvmArgs '-Djava.security.egd=file:/dev/./urandom'
+}
diff --git a/encryption/gradle.properties b/encryption/gradle.properties
new file mode 100644
index 0000000..0df7afe
--- /dev/null
+++ b/encryption/gradle.properties
@@ -0,0 +1,2 @@
+group=org.apache.solr
+version=0.1-SNAPSHOT
\ No newline at end of file
diff --git a/encryption/src/main/java/org/apache/solr/encryption/CommitUtil.java b/encryption/src/main/java/org/apache/solr/encryption/CommitUtil.java
new file mode 100644
index 0000000..b483869
--- /dev/null
+++ b/encryption/src/main/java/org/apache/solr/encryption/CommitUtil.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.encryption;
+
+import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.store.Directory;
+import org.apache.solr.core.DirectoryFactory;
+import org.apache.solr.core.SolrCore;
+
+import java.io.IOException;
+
+/**
+ * Utility method to access to commit data.
+ */
+public class CommitUtil {
+
+  /**
+   * Reads the latest commit of the given {@link SolrCore}.
+   */
+  public static SegmentInfos readLatestCommit(SolrCore core) throws IOException {
+    DirectoryFactory directoryFactory = core.getDirectoryFactory();
+    Directory indexDir = directoryFactory.get(core.getIndexDir(), DirectoryFactory.DirContext.DEFAULT, DirectoryFactory.LOCK_TYPE_NONE);
+    try {
+      return SegmentInfos.readLatestCommit(indexDir);
+    } finally {
+      directoryFactory.release(indexDir);
+    }
+  }
+}
diff --git a/encryption/src/main/java/org/apache/solr/encryption/EncryptionDirectory.java b/encryption/src/main/java/org/apache/solr/encryption/EncryptionDirectory.java
new file mode 100644
index 0000000..84cc9a2
--- /dev/null
+++ b/encryption/src/main/java/org/apache/solr/encryption/EncryptionDirectory.java
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.encryption;
+
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.index.SegmentCommitInfo;
+import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.store.DataInput;
+import org.apache.lucene.store.DataOutput;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FilterDirectory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.util.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.solr.encryption.crypto.AesCtrEncrypter;
+import org.apache.solr.encryption.crypto.AesCtrEncrypterFactory;
+import org.apache.solr.encryption.crypto.DecryptingIndexInput;
+import org.apache.solr.encryption.crypto.EncryptingIndexOutput;
+
+import javax.annotation.Nullable;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.nio.file.NoSuchFileException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.apache.solr.encryption.EncryptionUtil.*;
+
+/**
+ * {@link FilterDirectory} that wraps a delegate {@link Directory} to encrypt/decrypt files on the fly.
+ * <p>
+ * When opening an {@link IndexOutput} for writing:
+ * <br>If {@link KeyManager#isEncryptable(String)} returns true, and if there is an
+ * {@link EncryptionUtil#getActiveKeyRefFromCommit(Map) active encryption key} defined in the latest
+ * commit user data, then the output is wrapped with a {@link EncryptingIndexOutput} to be encrypted
+ * on the fly. In this case an {@link #ENCRYPTION_MAGIC} header is written at the beginning of the output,
+ * followed by the key reference number.
+ * Otherwise, the {@link IndexOutput} created by the delegate is directly provided without encryption.
+ * <p>
+ * When opening an {@link IndexInput} for reading:
+ * <br>If the input header is the {@link #ENCRYPTION_MAGIC}, then the key reference number that follows
+ * is used to {@link EncryptionUtil#getKeyIdFromCommit get} the key id from the latest commit user data.
+ * In this case the input is wrapped with a {@link DecryptingIndexInput} to be decrypted on the fly.
+ * Otherwise, the {@link IndexInput} created by the delegate is directly provided without decryption.
+ *
+ * @see EncryptingIndexOutput
+ * @see DecryptingIndexInput
+ */
+public class EncryptionDirectory extends FilterDirectory {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * Constant to identify the start of an encrypted file.
+   * It is different from {@link CodecUtil#CODEC_MAGIC} to detect when a file is encrypted.
+   */
+  public static final int ENCRYPTION_MAGIC = 0x2E5BF271; // 777777777 in decimal
+
+  protected final AesCtrEncrypterFactory encrypterFactory;
+
+  protected final KeyManager keyManager;
+
+  /** Cache of the latest commit user data. */
+  protected volatile CommitUserData commitUserData;
+
+  /** Optimization flag to avoid checking encryption when reading a file if we know the index is cleartext. */
+  protected volatile boolean shouldCheckEncryptionWhenReading;
+
+  /** Optimization flag to only read the commit user data once after a commit. */
+  protected volatile boolean shouldReadCommitUserData;
+
+  /**
+   * Creates an {@link EncryptionDirectory} which wraps a delegate {@link Directory} to encrypt/decrypt
+   * files on the fly.
+   *
+   * @param encrypterFactory creates {@link AesCtrEncrypter}.
+   * @param keyManager      provides key secrets and determines which files are encryptable.
+   */
+  public EncryptionDirectory(Directory delegate, AesCtrEncrypterFactory encrypterFactory, KeyManager keyManager)
+    throws IOException {
+    super(delegate);
+    this.encrypterFactory = encrypterFactory;
+    this.keyManager = keyManager;
+    commitUserData = readLatestCommitUserData();
+
+    // If there is no encryption key id parameter in the latest commit user data, then we know the index
+    // is cleartext, so we can skip fast any encryption check. This flag becomes true indefinitely if we
+    // detect an encryption key when opening a file for writing.
+    shouldCheckEncryptionWhenReading = hasKeyIdInCommit(commitUserData.data);
+  }
+
+  @Override
+  public IndexOutput createOutput(String fileName, IOContext context) throws IOException {
+    return maybeWrapOutput(in.createOutput(fileName, context));
+  }
+
+  @Override
+  public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) throws IOException {
+    return maybeWrapOutput(in.createTempOutput(prefix, suffix, context));
+  }
+
+  /**
+   * Maybe wraps the {@link IndexOutput} created by the delegate {@link Directory} with an
+   * {@link EncryptingIndexOutput}.
+   */
+  protected IndexOutput maybeWrapOutput(IndexOutput indexOutput) throws IOException {
+    String fileName = indexOutput.getName();
+    assert !fileName.startsWith(IndexFileNames.SEGMENTS);
+    if (fileName.startsWith(IndexFileNames.PENDING_SEGMENTS)) {
+      // The pending_segments file should not be encrypted. Do not wrap the IndexOutput.
+      // It also means a commit has started, so set the flag to read the commit user data
+      // next time we need it.
+      shouldReadCommitUserData = true;
+      return indexOutput;
+    }
+    if (!keyManager.isEncryptable(fileName)) {
+      // The file should not be encrypted, based on its name. Do not wrap the IndexOutput.
+      return indexOutput;
+    }
+    boolean success = false;
+    try {
+      String keyRef = getKeyRefForWriting(indexOutput);
+      if (keyRef != null) {
+        // The IndexOutput has to be wrapped to be encrypted with the key.
+        indexOutput = new EncryptingIndexOutput(indexOutput, getKeySecret(keyRef), encrypterFactory);
+      }
+      success = true;
+    } finally {
+      if (!success) {
+        // Something went wrong. Close the IndexOutput before the exception continues.
+        IOUtils.closeWhileHandlingException(indexOutput);
+      }
+    }
+    return indexOutput;
+  }
+
+  /**
+   * Gets the active key reference number for writing an index output.
+   * <p>
+   * The active key ref is defined in the user data of the latest commit. If it is present, then this method
+   * writes to the output the {@link #ENCRYPTION_MAGIC} header, followed by the key reference number as a
+   * 4B big-endian int.
+   *
+   * @return the key reference number; or null if none.
+   */
+  protected String getKeyRefForWriting(IndexOutput indexOutput) throws IOException {
+    String keyRef;
+    if ((keyRef = getActiveKeyRefFromCommit(getLatestCommitData().data)) == null) {
+      return null;
+    }
+    shouldCheckEncryptionWhenReading = true;
+    // Write the encryption magic header and the key reference number.
+    writeBEInt(indexOutput, ENCRYPTION_MAGIC);
+    writeBEInt(indexOutput, Integer.parseInt(keyRef));
+    return keyRef;
+  }
+
+  /** Write int value on header / footer with big endian order. See readBEInt. */
+  private static void writeBEInt(DataOutput out, int i) throws IOException {
+    out.writeByte((byte) (i >> 24));
+    out.writeByte((byte) (i >> 16));
+    out.writeByte((byte) (i >> 8));
+    out.writeByte((byte) i);
+  }
+
+  /**
+   * Gets the user data from the latest commit, potentially reading the latest commit if the cache is stale.
+   */
+  protected CommitUserData getLatestCommitData() throws IOException {
+    if (shouldReadCommitUserData) {
+      synchronized (this) {
+        if (shouldReadCommitUserData) {
+          CommitUserData newCommitUserData = readLatestCommitUserData();
+          if (newCommitUserData != commitUserData) {
+            commitUserData = newCommitUserData;
+            shouldReadCommitUserData = false;
+          }
+        }
+      }
+    }
+    return commitUserData;
+  }
+
+  /**
+   * Reads the user data from the latest commit, or keeps the cached value if the segments file name has
+   * not changed.
+   */
+  protected CommitUserData readLatestCommitUserData() throws IOException {
+    try {
+      return new SegmentInfos.FindSegmentsFile<CommitUserData>(this) {
+        protected CommitUserData doBody(String segmentFileName) throws IOException {
+          if (commitUserData != null && commitUserData.segmentFileName.equals(segmentFileName)) {
+            // If the segments file is the same, then keep the same commit user data.
+            return commitUserData;
+          }
+          // New segments file, so we have to read it.
+          SegmentInfos segmentInfos = SegmentInfos.readCommit(EncryptionDirectory.this, segmentFileName);
+          return new CommitUserData(segmentFileName, segmentInfos.getUserData());
+        }
+      }.run();
+    } catch (NoSuchFileException | FileNotFoundException e) {
+      // No commit yet, so no encryption key.
+      return CommitUserData.EMPTY;
+    }
+  }
+
+  /**
+   * Gets the key secret from the provided key reference number.
+   * First, gets the key id corresponding to the key reference based on the mapping defined in the latest
+   * commit user data. Then, calls the {@link KeyManager} to get the corresponding key secret.
+   */
+  protected byte[] getKeySecret(String keyRef) throws IOException {
+    String keyId = getKeyIdFromCommit(keyRef, getLatestCommitData().data);
+    return keyManager.getKeySecret(keyId, keyRef, this::getKeyCookie);
+  }
+
+  /**
+   * Gets the key cookie to provide to the {@link KeyManager} to get the key secret.
+   *
+   * @return the key cookie bytes; or null if none.
+   */
+  @Nullable
+  protected byte[] getKeyCookie(String keyRef) {
+    return getKeyCookieFromCommit(keyRef, commitUserData.data);
+  }
+
+  @Override
+  public IndexInput openInput(String fileName, IOContext context) throws IOException {
+    IndexInput indexInput = in.openInput(fileName, context);
+    if (!shouldCheckEncryptionWhenReading) {
+      // Return the IndexInput directly as we know it is not encrypted.
+      return indexInput;
+    }
+    boolean success = false;
+    try {
+      String keyRef = getKeyRefForReading(indexInput);
+      if (keyRef != null) {
+        // The IndexInput has to be wrapped to be decrypted with the key.
+        indexInput = new DecryptingIndexInput(indexInput, getKeySecret(keyRef), encrypterFactory);
+      }
+      success = true;
+    } finally {
+      if (!success) {
+        // Something went wrong. Close the IndexInput before the exception continues.
+        IOUtils.closeWhileHandlingException(indexInput);
+      }
+    }
+    return indexInput;
+  }
+
+  /**
+   * Gets the key reference number for reading an index input.
+   * <p>
+   * If the file is ciphered, it starts with the {@link #ENCRYPTION_MAGIC} header, followed by the reference
+   * number as a 4B big-endian int.
+   * If the file is cleartext, it starts with the {@link CodecUtil#CODEC_MAGIC} header.
+   *
+   * @return the key reference number; or null if none.
+   */
+  protected String getKeyRefForReading(IndexInput indexInput) throws IOException {
+    long filePointer = indexInput.getFilePointer();
+    int magic = readBEInt(indexInput);
+    if (magic == ENCRYPTION_MAGIC) {
+      // This file is encrypted.
+      // Read the key reference that follows.
+      return Integer.toString(readBEInt(indexInput));
+    } else {
+      // This file is cleartext.
+      // Restore the file pointer.
+      indexInput.seek(filePointer);
+      return null;
+    }
+  }
+
+  /**
+   * Read int value from header / footer with big endian order.
+   * We force big endian order when reading a codec. See CodecUtil.readBEInt in Lucene 9.0 or above.
+   */
+  private static int readBEInt(DataInput in) throws IOException {
+    return ((in.readByte() & 0xFF) << 24)
+      | ((in.readByte() & 0xFF) << 16)
+      | ((in.readByte() & 0xFF) << 8)
+      | (in.readByte() & 0xFF);
+  }
+
+  /**
+   * Returns the segments having an encryption key id different from the active one.
+   *
+   * @param activeKeyId the current active key id, or null if none.
+   * @return the segments with old key ids, or an empty list if none.
+   */
+  public List<SegmentCommitInfo> getSegmentsWithOldKeyId(SegmentInfos segmentInfos, String activeKeyId)
+    throws IOException {
+    List<SegmentCommitInfo> segmentsWithOldKeyId = null;
+    if (log.isDebugEnabled()) {
+      log.debug("reading segments {} for key ids different from {}",
+                segmentInfos.asList().stream().map(i -> i.info.name).collect(Collectors.toList()),
+                activeKeyId);
+    }
+    for (SegmentCommitInfo segmentCommitInfo : segmentInfos) {
+      for (String fileName : segmentCommitInfo.files()) {
+        if (keyManager.isEncryptable(fileName)) {
+          try (IndexInput fileInput = in.openInput(fileName, IOContext.READ)) {
+            String keyRef = getKeyRefForReading(fileInput);
+            String keyId = keyRef == null ? null : getKeyIdFromCommit(keyRef, segmentInfos.getUserData());
+            log.debug("reading file {} of segment {} => keyId={}", fileName, segmentCommitInfo.info.name, keyId);
+            if (!Objects.equals(keyId, activeKeyId)) {
+              if (segmentsWithOldKeyId == null) {
+                segmentsWithOldKeyId = new ArrayList<>();
+              }
+              segmentsWithOldKeyId.add(segmentCommitInfo);
+            }
+          }
+          break;
+        }
+      }
+    }
+    return segmentsWithOldKeyId == null ? Collections.emptyList() : segmentsWithOldKeyId;
+  }
+
+  /**
+   * Keeps the {@link SegmentInfos commit} file name and user data.
+   */
+  protected static class CommitUserData {
+
+    protected static final CommitUserData EMPTY = new CommitUserData("", Collections.emptyMap());
+
+    public final String segmentFileName;
+    public final Map<String, String> data;
+
+    protected CommitUserData(String segmentFileName, Map<String, String> data) {
+      this.segmentFileName = segmentFileName;
+      this.data = data;
+    }
+  }
+}
diff --git a/encryption/src/main/java/org/apache/solr/encryption/EncryptionDirectoryFactory.java b/encryption/src/main/java/org/apache/solr/encryption/EncryptionDirectoryFactory.java
new file mode 100644
index 0000000..010ecc3
--- /dev/null
+++ b/encryption/src/main/java/org/apache/solr/encryption/EncryptionDirectoryFactory.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.encryption;
+
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.LockFactory;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.MMapDirectoryFactory;
+import org.apache.solr.encryption.crypto.AesCtrEncrypterFactory;
+import org.apache.solr.encryption.crypto.CipherAesCtrEncrypter;
+
+import java.io.IOException;
+
+/**
+ * Creates an {@link EncryptionDirectory} delegating to a {@link org.apache.lucene.store.MMapDirectory}.
+ * <p/>
+ * To be configured with two parameters:
+ * <ul>
+ *   <li>{@link #PARAM_KEY_MANAGER_SUPPLIER} defines the {@link KeyManager.Supplier} to use.
+ *   Required.</li>
+ *   <li>{@link #PARAM_ENCRYPTER_FACTORY} defines which {@link AesCtrEncrypterFactory} to use.
+ *   Default is {@link CipherAesCtrEncrypter.Factory}.</li>
+ * </ul>
+ * <pre>
+ *   <directoryFactory name="DirectoryFactory"
+ *       class="${solr.directoryFactory:org.apache.solr.encryption.EncryptionDirectoryFactory}">
+ *     <str name="keyManagerSupplier">${solr.keyManagerSupplier:com.myproject.MyKeyManagerSupplier}</str>
+ *     <str name="encrypterFactory">${solr.encrypterFactory:org.apache.solr.encryption.crypto.LightAesCtrEncrypter$Factory}</str>
+ *   </directoryFactory>
+ * </pre>
+ */
+public class EncryptionDirectoryFactory extends MMapDirectoryFactory {
+
+  // TODO: Ideally EncryptionDirectoryFactory would extend a DelegatingDirectoryFactory to delegate
+  //  to any other DirectoryFactory. There is a waiting Jira issue SOLR-15060 for that because
+  //  a DelegatingDirectoryFactory is not straightforward. There is the tricky case of the
+  //  CachingDirectoryFactory (extended by most DirectoryFactory implementations) that currently
+  //  creates the Directory itself, so it would not be our delegating Directory.
+  //  Right now, EncryptionDirectoryFactory extends MMapDirectoryFactory. And we hope we will
+  //  refactor later.
+
+  public static final String PARAM_KEY_MANAGER_SUPPLIER = "keyManagerSupplier";
+  public static final String PARAM_ENCRYPTER_FACTORY = "encrypterFactory";
+  /**
+   * Visible for tests only - Property defining the class name of the inner encryption directory factory.
+   */
+  static final String PROPERTY_INNER_ENCRYPTION_DIRECTORY_FACTORY = "innerEncryptionDirectoryFactory";
+
+  private KeyManager keyManager;
+  private AesCtrEncrypterFactory encrypterFactory;
+  private InnerFactory innerFactory;
+
+  @Override
+  public void init(NamedList<?> args) {
+    super.init(args);
+    SolrParams params = args.toSolrParams();
+
+    String keyManagerSupplierClass = params.get(PARAM_KEY_MANAGER_SUPPLIER);
+    if (keyManagerSupplierClass == null) {
+      throw new IllegalArgumentException("Missing " + PARAM_KEY_MANAGER_SUPPLIER + " argument for " + getClass().getName());
+    }
+    KeyManager.Supplier keyManagerSupplier = coreContainer.getResourceLoader().newInstance(keyManagerSupplierClass,
+                                                                                           KeyManager.Supplier.class);
+    keyManagerSupplier.init(args);
+    keyManager = keyManagerSupplier.getKeyManager();
+
+    String encrypterFactoryClass = params.get(PARAM_ENCRYPTER_FACTORY, CipherAesCtrEncrypter.Factory.class.getName());
+    encrypterFactory = coreContainer.getResourceLoader().newInstance(encrypterFactoryClass,
+                                                                     AesCtrEncrypterFactory.class);
+    if (!encrypterFactory.isSupported()) {
+      throw new UnsupportedOperationException(getClass().getName() + " cannot create an encrypterFactory of type "
+                                                + encrypterFactory.getClass().getName(),
+                                              encrypterFactory.getUnsupportedCause());
+    }
+
+    innerFactory = createInnerFactory();
+  }
+
+  private InnerFactory createInnerFactory() {
+    String factoryClassName = System.getProperty(PROPERTY_INNER_ENCRYPTION_DIRECTORY_FACTORY);
+    if (factoryClassName == null) {
+      return EncryptionDirectory::new;
+    }
+    try {
+      return (InnerFactory) EncryptionDirectoryFactory.class.getClassLoader()
+        .loadClass(factoryClassName).getDeclaredConstructor().newInstance();
+    } catch (Exception e) {
+      throw new RuntimeException("Cannot load custom inner directory factory " + factoryClassName, e);
+    }
+  }
+
+  /** Gets the {@link KeyManager} used by this factory and all the encryption directories it creates. */
+  public KeyManager getKeyManager() {
+    return keyManager;
+  }
+
+  @Override
+  protected Directory create(String path, LockFactory lockFactory, DirContext dirContext) throws IOException {
+    return innerFactory.create(super.create(path, lockFactory, dirContext), encrypterFactory, getKeyManager());
+  }
+
+  /**
+   * Visible for tests only - Inner factory that creates {@link EncryptionDirectory} instances.
+   */
+  interface InnerFactory {
+    EncryptionDirectory create(Directory delegate, AesCtrEncrypterFactory encrypterFactory, KeyManager keyManager)
+      throws IOException;
+  }
+}
diff --git a/encryption/src/main/java/org/apache/solr/encryption/EncryptionMergePolicy.java b/encryption/src/main/java/org/apache/solr/encryption/EncryptionMergePolicy.java
new file mode 100644
index 0000000..7f5a581
--- /dev/null
+++ b/encryption/src/main/java/org/apache/solr/encryption/EncryptionMergePolicy.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.encryption;
+
+import org.apache.lucene.index.FilterMergePolicy;
+import org.apache.lucene.index.MergePolicy;
+import org.apache.lucene.index.SegmentCommitInfo;
+import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.store.Directory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.solr.encryption.EncryptionUtil.*;
+
+/**
+ * Encrypts each segment individually with a forced merge.
+ * <p>
+ * Delegates all methods, but intercepts
+ * {@link #findForcedMerges(SegmentInfos, int, Map, MergeContext)}
+ * and if the requested max segment count is {@link Integer#MAX_VALUE} (a trigger
+ * which means "keep all segments"), then force-merges individually each segment
+ * which is not encrypted with the latest active key id.
+ */
+public class EncryptionMergePolicy extends FilterMergePolicy {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  public EncryptionMergePolicy(MergePolicy in) {
+    super(in);
+  }
+
+  @Override
+  public MergeSpecification findForcedMerges(SegmentInfos segmentInfos,
+                                             int maxSegmentCount,
+                                             Map<SegmentCommitInfo,Boolean> segmentsToMerge,
+                                             MergeContext mergeContext) throws IOException {
+    if (maxSegmentCount != Integer.MAX_VALUE) {
+      return super.findForcedMerges(segmentInfos, maxSegmentCount, segmentsToMerge, mergeContext);
+    }
+    if (segmentInfos.size() == 0) {
+      return null;
+    }
+
+    //TODO: this does not seem to work correctly under heavy concurrent load.
+
+    Directory dir = segmentInfos.info(0).info.dir;
+    if (!(dir instanceof EncryptionDirectory)) {
+      // This may happen if the DirectoryFactory configured is not the EncryptionDirectoryFactory,
+      // but this is a misconfiguration. Let's log a warning.
+      log.warn("{} is configured whereas {} is not set; check the DirectoryFactory configuration",
+               getClass().getName(), EncryptionDirectoryFactory.class.getName());
+      return super.findForcedMerges(segmentInfos, maxSegmentCount, segmentsToMerge, mergeContext);
+    }
+    String keyRef = getActiveKeyRefFromCommit(segmentInfos.getUserData());
+    String activeKeyId = keyRef == null ? null : getKeyIdFromCommit(keyRef, segmentInfos.getUserData());
+    List<SegmentCommitInfo> segmentsWithOldKeyId = ((EncryptionDirectory) dir).getSegmentsWithOldKeyId(segmentInfos, activeKeyId);
+    if (segmentsWithOldKeyId.isEmpty()) {
+      return null;
+    }
+    // The goal is to rewrite each segment encrypted with an old key, so that it is re-encrypted
+    // with the latest active encryption key.
+    // Create a MergeSpecification containing multiple OneMerge, a OneMerge for each segment.
+    MergeSpecification spec = new MergeSpecification();
+    for (SegmentCommitInfo segmentInfo : segmentsWithOldKeyId) {
+      spec.add(new OneMerge(Collections.singletonList(segmentInfo)));
+    }
+    return spec;
+  }
+
+  MergePolicy getDelegate() {
+    return in;
+  }
+}
diff --git a/encryption/src/main/java/org/apache/solr/encryption/EncryptionMergePolicyFactory.java b/encryption/src/main/java/org/apache/solr/encryption/EncryptionMergePolicyFactory.java
new file mode 100644
index 0000000..274f8aa
--- /dev/null
+++ b/encryption/src/main/java/org/apache/solr/encryption/EncryptionMergePolicyFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.encryption;
+
+import org.apache.lucene.index.MergePolicy;
+import org.apache.solr.core.SolrResourceLoader;
+import org.apache.solr.index.MergePolicyFactoryArgs;
+import org.apache.solr.index.WrapperMergePolicyFactory;
+import org.apache.solr.schema.IndexSchema;
+
+/**
+ * Factory for {@link EncryptionMergePolicy} which delegates to another
+ * wrapped {@link MergePolicy}.
+ */
+public class EncryptionMergePolicyFactory extends WrapperMergePolicyFactory {
+
+  public EncryptionMergePolicyFactory(SolrResourceLoader resourceLoader,
+                                      MergePolicyFactoryArgs args,
+                                      IndexSchema schema) {
+    super(resourceLoader, args, schema);
+  }
+
+  @Override
+  protected MergePolicy getMergePolicyInstance(MergePolicy wrappedMP) {
+    return new EncryptionMergePolicy(wrappedMP);
+  }
+}
diff --git a/encryption/src/main/java/org/apache/solr/encryption/EncryptionRequestHandler.java b/encryption/src/main/java/org/apache/solr/encryption/EncryptionRequestHandler.java
new file mode 100644
index 0000000..a4427c4
--- /dev/null
+++ b/encryption/src/main/java/org/apache/solr/encryption/EncryptionRequestHandler.java
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.encryption;
+
+import org.apache.lucene.index.SegmentCommitInfo;
+import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.store.Directory;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.core.DirectoryFactory;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.RequestHandlerBase;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.security.AuthorizationContext;
+import org.apache.solr.security.PermissionNameProvider;
+import org.apache.solr.update.CommitUpdateCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadFactory;
+
+import static org.apache.solr.encryption.CommitUtil.readLatestCommit;
+import static org.apache.solr.encryption.EncryptionUtil.*;
+
+/**
+ * Handles an encryption request for a specific {@link SolrCore}.
+ * <p>
+ * The caller provides the mandatory {@link #PARAM_KEY_ID} request parameter to define the encryption
+ * key id to use to encrypt the index files. To decrypt the index to cleartext, the special parameter
+ * value {@link #NO_KEY_ID} must be provided.
+ * <p>
+ * The encryption processing is asynchronous. The request returns immediately with two response
+ * parameters. {@link #ENCRYPTION_STATE} parameter with values {@link #STATE_PENDING},
+ * {@link #STATE_COMPLETE}, or {@link #STATE_BUSY}. And {@link #STATUS} parameter with values
+ * {@link #STATUS_SUCCESS} or {@link #STATUS_FAILURE}.
+ * <p>
+ * The expected usage of this handler is to first send an encryption request with a key id, and
+ * receive a response with {@link #STATUS_SUCCESS} and a {@link #STATE_PENDING}. If the caller needs
+ * to know when the encryption is complete, it can (optionally) repeatedly send the same encryption
+ * request with the same key id, until it receives a response with {@link #STATUS_SUCCESS} and a
+ * {@link #STATE_COMPLETE}.
+ * <p>
+ * If the handler returns a response with {@link #STATE_BUSY}, it means that another encryption for a
+ * different key id is ongoing on the same Solr core. It cannot start a new encryption until it finishes.
+ * <p>
+ * If the handler returns a response with {@link #STATUS_FAILURE}, it means the request did not succeed
+ * and should be retried by the caller (there should be error logs).
+ */
+public class EncryptionRequestHandler extends RequestHandlerBase {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * Key id request parameter.
+   * Its value should be {@link #NO_KEY_ID} for no key.
+   */
+  public static final String PARAM_KEY_ID = "encryptionKeyId";
+  /**
+   * Value of the key id parameter meaning there is no key and no encryption.
+   */
+  public static final String NO_KEY_ID = "no_key_id";
+
+  /**
+   * Encryption pending boolean parameter, in the commit user data.
+   * When present and true, this means the encryption process is still pending.
+   * It includes the {@link EncryptionUpdateHandler#TRANSFERABLE_COMMIT_DATA} prefix to be transferred from a
+   * commit to the next one automatically.
+   */
+  private static final String COMMIT_ENCRYPTION_PENDING = COMMIT_CRYPTO + "encryptionPending";
+
+  /**
+   * Status of the request.
+   */
+  public static final String STATUS = "status";
+  /**
+   * One of {@link #STATUS} values: the request was processed successfully. Get additional information
+   * with the {@link #ENCRYPTION_STATE} response parameter.
+   */
+  public static final String STATUS_SUCCESS = "success";
+  /**
+   * One of {@link #STATUS} values: the request was not processed correctly, an error occurred.
+   */
+  public static final String STATUS_FAILURE = "failure";
+
+  /**
+   * Response parameter name to provide the status of the encryption.
+   */
+  public static final String ENCRYPTION_STATE = "encryptionState";
+  /**
+   * One of {@link #ENCRYPTION_STATE} values: the encryption with the provided key id is ongoing and pending.
+   */
+  public static final String STATE_PENDING = "pending";
+  /**
+   * One of {@link #ENCRYPTION_STATE} values: the encryption with the provided key id is complete.
+   */
+  public static final String STATE_COMPLETE = "complete";
+  /**
+   * One of {@link #ENCRYPTION_STATE} values: another encryption for a different key id is ongoing
+   * on the same Solr core; cannot start a new encryption until it finishes.
+   */
+  public static final String STATE_BUSY = "busy";
+
+  private static final Object pendingEncryptionLock = new Object();
+  private static final Map<String, PendingKeyId> pendingEncryptions = new HashMap<>();
+
+  private final ExecutorService executor = ExecutorUtil.newMDCAwareFixedThreadPool(4, new ThreadFactory() {
+    private int threadNum;
+
+    @Override
+    public synchronized Thread newThread(Runnable r) {
+      Thread t = new Thread(r, "Encryption-" + threadNum++);
+      t.setDaemon(true);
+      return t;
+    }
+  });
+
+  @Override
+  public void close() throws IOException {
+    try {
+      ExecutorUtil.shutdownAndAwaitTermination(executor);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    } finally {
+      super.close();
+    }
+  }
+
+  @Override
+  public String getDescription() {
+    return "Handles encryption requests";
+  }
+
+  @Override
+  public Name getPermissionName(AuthorizationContext request) {
+    return PermissionNameProvider.Name.UPDATE_PERM;
+  }
+
+  @Override
+  public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
+    long startTimeMs = System.currentTimeMillis();
+    String keyId = req.getParams().get(PARAM_KEY_ID);
+    if (keyId == null || keyId.isEmpty()) {
+      rsp.add(STATUS, STATUS_FAILURE);
+      throw new IOException("Parameter " + PARAM_KEY_ID + " must be present and not empty."
+                              + " Use [" + PARAM_KEY_ID + "=\"" + NO_KEY_ID + "\"] for explicit decryption.");
+    } else if (keyId.equals(NO_KEY_ID)) {
+      keyId = null;
+    }
+    boolean success = false;
+    String encryptionState = STATE_PENDING;
+    try {
+      SegmentInfos segmentInfos = readLatestCommit(req.getCore());
+      if (segmentInfos.size() == 0) {
+        commitEmptyIndexForEncryption(keyId, segmentInfos, req);
+        encryptionState = STATE_COMPLETE;
+        success = true;
+        return;
+      }
+
+      boolean encryptionComplete = false;
+      if (isCommitActiveKeyId(keyId, segmentInfos)) {
+        log.debug("provided keyId={} is the current active key id", keyId);
+        if (Boolean.parseBoolean(segmentInfos.getUserData().get(COMMIT_ENCRYPTION_PENDING))) {
+          encryptionComplete = areAllSegmentsEncryptedWithKeyId(keyId, req.getCore(), segmentInfos);
+          if (encryptionComplete) {
+            commitEncryptionComplete(keyId, segmentInfos, req);
+          }
+        } else {
+          encryptionComplete = true;
+        }
+      }
+      if (encryptionComplete) {
+        encryptionState = STATE_COMPLETE;
+        success = true;
+        return;
+      }
+
+      synchronized (pendingEncryptionLock) {
+        PendingKeyId pendingKeyId = pendingEncryptions.get(req.getCore().getName());
+        if (pendingKeyId != null) {
+          if (Objects.equals(pendingKeyId.keyId, keyId)) {
+            log.debug("ongoing encryption for keyId={}", keyId);
+            encryptionState = STATE_PENDING;
+            success = true;
+          } else {
+            log.debug("core busy encrypting for keyId={} different than requested keyId={}", pendingKeyId.keyId, keyId);
+            encryptionState = STATE_BUSY;
+          }
+          return;
+        }
+        pendingEncryptions.put(req.getCore().getName(), new PendingKeyId(keyId));
+      }
+      try {
+        commitEncryptionStart(keyId, segmentInfos, req);
+        encryptAsync(req, startTimeMs);
+        success = true;
+      } finally {
+        if (!success) {
+          synchronized (pendingEncryptionLock) {
+            pendingEncryptions.remove(req.getCore().getName());
+          }
+        }
+      }
+
+    } finally {
+      if (success) {
+        rsp.add(STATUS, STATUS_SUCCESS);
+      } else {
+        rsp.add(STATUS, STATUS_FAILURE);
+      }
+      log.debug("responding encryption state={} success={} for keyId={}", encryptionState, success, keyId);
+      rsp.add(ENCRYPTION_STATE, encryptionState);
+    }
+  }
+
+  private void commitEmptyIndexForEncryption(@Nullable String keyId,
+                                             SegmentInfos segmentInfos,
+                                             SolrQueryRequest req) throws IOException {
+    // Commit no change, with the new active key id in the commit user data.
+    log.debug("commit on empty index for keyId={}", keyId);
+    CommitUpdateCommand commitCmd = new CommitUpdateCommand(req, false);
+    commitCmd.commitData = new HashMap<>(segmentInfos.getUserData());
+    commitCmd.commitData.remove(COMMIT_ENCRYPTION_PENDING);
+    setNewActiveKeyIdInCommit(keyId, commitCmd, req);
+    assert !commitCmd.commitData.isEmpty();
+    req.getCore().getUpdateHandler().commit(commitCmd);
+  }
+
+  private void setNewActiveKeyIdInCommit(String keyId, CommitUpdateCommand commitCmd, SolrQueryRequest req)
+    throws IOException {
+    if (keyId == null) {
+      removeActiveKeyRefFromCommit(commitCmd.commitData);
+      ensureNonEmptyCommitDataForEmptyCommit(commitCmd.commitData);
+    } else {
+      byte[] keyCookie = getKeyManager(req).getKeyCookie(keyId);
+      EncryptionUtil.setNewActiveKeyIdInCommit(keyId, keyCookie, commitCmd.commitData);
+    }
+  }
+
+  private KeyManager getKeyManager(SolrQueryRequest req) {
+    try {
+      return ((EncryptionDirectoryFactory) req.getCore().getDirectoryFactory()).getKeyManager();
+    } catch (ClassCastException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
+                              "DirectoryFactory class must be set to " + EncryptionDirectoryFactory.class.getName() + " to use " + getClass().getSimpleName(),
+                              e);
+    }
+  }
+
+  private void commitEncryptionComplete(String keyId,
+                                        SegmentInfos segmentInfos,
+                                        SolrQueryRequest req) throws IOException {
+    assert isCommitActiveKeyId(keyId, segmentInfos);
+    log.debug("commit encryption complete for keyId={}", keyId);
+    CommitUpdateCommand commitCmd = new CommitUpdateCommand(req, false);
+    commitCmd.commitData = new HashMap<>(segmentInfos.getUserData());
+    commitCmd.commitData.remove(COMMIT_ENCRYPTION_PENDING);
+    // All segments are encrypted with the key id,
+    // clear the oldest inactive key ids from the commit user data.
+    clearOldInactiveKeyIdsFromCommit(commitCmd.commitData);
+    ensureNonEmptyCommitDataForEmptyCommit(commitCmd.commitData);
+    req.getCore().getUpdateHandler().commit(commitCmd);
+  }
+
+  private void ensureNonEmptyCommitDataForEmptyCommit(Map<String, String> commitData) {
+    if (commitData.isEmpty()) {
+      // Ensure that there is some data in the commit user data so that an empty commit
+      // (with no change) is allowed.
+      commitData.put("crypto.cleartext", "true");
+    }
+  }
+
+  private void commitEncryptionStart(String keyId,
+                                     SegmentInfos segmentInfos,
+                                     SolrQueryRequest req) throws IOException {
+    log.debug("commit encryption starting for keyId={}", keyId);
+    CommitUpdateCommand commitCmd = new CommitUpdateCommand(req, false);
+    commitCmd.commitData = new HashMap<>(segmentInfos.getUserData());
+    commitCmd.commitData.put(COMMIT_ENCRYPTION_PENDING, "true");
+    setNewActiveKeyIdInCommit(keyId, commitCmd, req);
+    req.getCore().getUpdateHandler().commit(commitCmd);
+  }
+
+  private void encryptAsync(SolrQueryRequest req, long startTimeMs) {
+    log.debug("submitting async encryption");
+    executor.submit(() -> {
+      try {
+        log.debug("running async encryption");
+        CommitUpdateCommand commitCmd = new CommitUpdateCommand(req, true);
+        // Trigger EncryptionMergePolicy.findForcedMerges() to re-encrypt
+        // each segment which is not encrypted with the latest active key id.
+        // TODO: Set maxOptimizeSegments to Integer.MAX_VALUE to trigger EncryptionMergePolicy
+        //  when EncryptionHeavyLoadTest passes with it.
+        commitCmd.maxOptimizeSegments = 1;
+        req.getCore().getUpdateHandler().commit(commitCmd);
+        log.info("Successfully encrypted the index in " + elapsedTime(startTimeMs));
+      } catch (IOException e) {
+        log.error("Exception while encrypting the index after " + elapsedTime(startTimeMs), e);
+      } finally {
+        synchronized (pendingEncryptionLock) {
+          pendingEncryptions.remove(req.getCore().getName());
+        }
+      }
+      return null;
+    });
+  }
+
+  public static boolean areAllSegmentsEncryptedWithKeyId(@Nullable String keyId,
+                                                         SolrCore core,
+                                                         SegmentInfos segmentInfos) throws IOException {
+    DirectoryFactory directoryFactory = core.getDirectoryFactory();
+    Directory indexDir = directoryFactory.get(core.getIndexDir(),
+                                              DirectoryFactory.DirContext.DEFAULT,
+                                              DirectoryFactory.LOCK_TYPE_NONE);
+    try {
+      EncryptionDirectory dir = (EncryptionDirectory) indexDir;
+      List<SegmentCommitInfo> segmentsWithOldKeyId = dir.getSegmentsWithOldKeyId(segmentInfos, keyId);
+      log.debug("encryption is pending; {} segments do not have keyId={}", segmentsWithOldKeyId.size(), keyId);
+      return segmentsWithOldKeyId.isEmpty();
+    } finally {
+      directoryFactory.release(indexDir);
+    }
+  }
+
+  private boolean isCommitActiveKeyId(String keyId, SegmentInfos segmentInfos) {
+    String keyRef = getActiveKeyRefFromCommit(segmentInfos.getUserData());
+    String activeKeyId = keyRef == null ? null : getKeyIdFromCommit(keyRef, segmentInfos.getUserData());
+    return Objects.equals(keyId, activeKeyId);
+  }
+
+  private static String elapsedTime(long startTimeMs) {
+    return (System.currentTimeMillis() - startTimeMs) + " ms";
+  }
+
+  /**
+   * Wraps a nullable key id (null key id means cleartext).
+   */
+  private static class PendingKeyId {
+    @Nullable
+    final String keyId;
+
+    PendingKeyId(@Nullable String keyId) {
+      this.keyId = keyId;
+    }
+  }
+}
diff --git a/encryption/src/main/java/org/apache/solr/encryption/EncryptionUpdateHandler.java b/encryption/src/main/java/org/apache/solr/encryption/EncryptionUpdateHandler.java
new file mode 100644
index 0000000..95d1c99
--- /dev/null
+++ b/encryption/src/main/java/org/apache/solr/encryption/EncryptionUpdateHandler.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.encryption;
+
+import org.apache.lucene.index.IndexWriter;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.update.CommitUpdateCommand;
+import org.apache.solr.update.DirectUpdateHandler2Copy;
+import org.apache.solr.update.UpdateHandler;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.solr.encryption.CommitUtil.readLatestCommit;
+
+/**
+ * Extends {@link org.apache.solr.update.DirectUpdateHandler2} and adds the capability
+ * to transfer some transferable user data from the previous commit to the next one.
+ */
+//TODO: extend DirectUpdateHandler2 once Solr 9.2 is available
+public class EncryptionUpdateHandler extends DirectUpdateHandler2Copy {
+
+  /**
+   * Parameter prefix to state that this parameter should be transferred from a commit
+   * user data to the next commit user data automatically.
+   */
+  public static final String TRANSFERABLE_COMMIT_DATA = "#transfer.";
+
+  public EncryptionUpdateHandler(SolrCore core) {
+    super(core);
+  }
+
+  public EncryptionUpdateHandler(SolrCore core, UpdateHandler updateHandler) {
+    super(core, updateHandler);
+  }
+
+  /**
+   * Transfers commit-transferable data just before the effective call to {@link IndexWriter#commit()}.
+   * This method is atomically protected with the commit lock.
+   */
+  @Override
+  protected boolean shouldCommit(CommitUpdateCommand cmd, IndexWriter writer) throws IOException {
+    if (!super.shouldCommit(cmd, writer)) {
+      return false;
+    }
+    // Two cases:
+    // - If cmd.commitData is null, then transfer all the latest commit transferable
+    //   data to the current commit.
+    // - If cmd.commitData is not null, nothing is transferred. It is the caller
+    //   responsibility to include all the required user data from the latest commit.
+    //   That way, the caller can remove some entries.
+    if (cmd.commitData == null) {
+      Map<String, String> latestCommitData = readLatestCommit(core).getUserData();
+      Map<String, String> commitData = null;
+      for (Map.Entry<String, String> latestCommitEntry : latestCommitData.entrySet()) {
+        if (latestCommitEntry.getKey().startsWith(TRANSFERABLE_COMMIT_DATA)) {
+          if (commitData == null) {
+            commitData = new HashMap<>();
+          }
+          commitData.put(latestCommitEntry.getKey(), latestCommitEntry.getValue());
+        }
+      }
+      cmd.commitData = commitData;
+    }
+    return true;
+  }
+}
diff --git a/encryption/src/main/java/org/apache/solr/encryption/EncryptionUtil.java b/encryption/src/main/java/org/apache/solr/encryption/EncryptionUtil.java
new file mode 100644
index 0000000..244ce2d
--- /dev/null
+++ b/encryption/src/main/java/org/apache/solr/encryption/EncryptionUtil.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.encryption;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+/**
+ * Constants and methods for encryption.
+ */
+public class EncryptionUtil {
+
+  /**
+   * Crypto parameter prefix, in the commit user data.
+   * It includes the {@link EncryptionUpdateHandler#TRANSFERABLE_COMMIT_DATA} prefix to be transferred from a
+   * commit to the next one automatically.
+   */
+  public static final String COMMIT_CRYPTO = EncryptionUpdateHandler.TRANSFERABLE_COMMIT_DATA + "crypto.";
+
+  /**
+   * Active encryption key ref parameter, in the commit user data.
+   * If this parameter is not present, it means the index is in cleartext and has never been encrypted.
+   * This parameter value is the reference number of the key in the {@link #COMMIT_KEY_ID} and
+   * {@link #COMMIT_KEY_COOKIE} mappings.
+   */
+  public static final String COMMIT_ACTIVE_KEY = COMMIT_CRYPTO + "active";
+
+  /**
+   * Commit user data parameter that maps a key reference number to its corresponding key id.
+   * The complete parameter name is the concatenation of this parameter prefix and the key reference number.
+   */
+  public static final String COMMIT_KEY_ID = COMMIT_CRYPTO + "id.";
+
+  /**
+   * Commit user data parameter that maps a key reference number to its corresponding crypto cookie.
+   * The complete parameter name is the concatenation of this parameter prefix and the key reference number.
+   */
+  public static final String COMMIT_KEY_COOKIE = COMMIT_CRYPTO + "cookie.";
+
+  /**
+   * Number of inactive key ids to keep when clearing the old inactive key ids.
+   * @see #clearOldInactiveKeyIdsFromCommit
+   */
+  private static final int INACTIVE_KEY_IDS_TO_KEEP = 5;
+
+  /**
+   * Sets the new active encryption key id, and its optional cookie in the provided commit user data.
+   * New index files will be encrypted using this new key.
+   *
+   * @param keyId          the new active encryption key id; must not be null.
+   * @param keyCookie      may be null if none.
+   * @param commitUserData read to retrieve the current active key ref, and then updated with the new
+   *                       active key ref.
+   */
+  public static void setNewActiveKeyIdInCommit(String keyId,
+                                               @Nullable byte[] keyCookie,
+                                               Map<String, String> commitUserData) {
+    // Key references are integers stored as strings. They are ordered by the natural ordering of
+    // integers. This method is the only location where key references are created. Outside, key
+    // references are simply considered as strings, except clearOldInactiveKeyIdsFromCommit() which
+    // sorts key references by the integer ordering.
+    assert keyId != null;
+    String oldKeyRef = getActiveKeyRefFromCommit(commitUserData);
+    String newKeyRef = oldKeyRef == null ? "0" : Integer.toString(Integer.parseInt(oldKeyRef) + 1);
+    commitUserData.put(COMMIT_ACTIVE_KEY, newKeyRef);
+    commitUserData.put(COMMIT_KEY_ID + newKeyRef, keyId);
+    if (keyCookie != null) {
+      commitUserData.put(COMMIT_KEY_COOKIE + newKeyRef, Base64.getEncoder().encodeToString(keyCookie));
+    }
+  }
+
+  /**
+   * Removes the active encryption key id.
+   * New index files will be cleartext.
+   *
+   * @param commitUserData updated to remove the active key ref.
+   */
+  public static void removeActiveKeyRefFromCommit(Map<String, String> commitUserData) {
+    commitUserData.remove(COMMIT_ACTIVE_KEY);
+  }
+
+  /**
+   * Gets the reference number of the currently active encryption key, from the provided commit user data.
+   *
+   * @return the reference number of the active encryption key; or null if none, which means cleartext.
+   */
+  @Nullable
+  public static String getActiveKeyRefFromCommit(Map<String, String> commitUserData) {
+    return commitUserData.get(COMMIT_ACTIVE_KEY);
+  }
+
+  /**
+   * Gets the key id from the provided commit user data, for the given key reference number.
+   */
+  public static String getKeyIdFromCommit(String keyRef, Map<String, String> commitUserData) {
+    String keyId = commitUserData.get(COMMIT_KEY_ID + keyRef);
+    if (keyId == null) {
+      throw new NoSuchElementException("No key id for key ref=" + keyRef);
+    }
+    return keyId;
+  }
+
+  /**
+   * Gets the key cookie from the provided commit user data, for the given key reference number.
+   *
+   * @return the key cookie bytes; or null if none.
+   */
+  @Nullable
+  public static byte[] getKeyCookieFromCommit(String keyRef, Map<String, String> commitUserData) {
+    String cookieString = commitUserData.get(COMMIT_KEY_COOKIE + keyRef);
+    return cookieString == null ? null : Base64.getDecoder().decode(cookieString);
+  }
+
+  /**
+   * @return Whether the provided commit user data contain some encryption key ids (active or not).
+   */
+  public static boolean hasKeyIdInCommit(Map<String, String> commitUserData) {
+    for (String entryKey : commitUserData.keySet()) {
+      if (entryKey.startsWith(COMMIT_KEY_ID)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Clear the oldest inactive key ids to keep only the most recent ones.
+   * We don't clear all the inactive key ids just in the improbable case there would be pending
+   * segment creations using previous key id(s) still in flight. This is really to be safe.
+   */
+  public static void clearOldInactiveKeyIdsFromCommit(Map<String, String> commitUserData) {
+    // List the inactive key references.
+    String activeKeyRef = getActiveKeyRefFromCommit(commitUserData);
+    List<Integer> inactiveKeyRefs = new ArrayList<>();
+    for (String dataKey : commitUserData.keySet()) {
+      if (dataKey.startsWith(COMMIT_KEY_ID)) {
+        String keyRef = dataKey.substring(COMMIT_KEY_ID.length());
+        if (!keyRef.equals(activeKeyRef)) {
+          inactiveKeyRefs.add(Integer.parseInt(keyRef));
+        }
+      }
+    }
+    // Clear them except the most recent ones.
+    if (inactiveKeyRefs.size() > INACTIVE_KEY_IDS_TO_KEEP) {
+      inactiveKeyRefs.sort(Comparator.naturalOrder());
+      for (Integer keyRef : inactiveKeyRefs.subList(0, inactiveKeyRefs.size() - INACTIVE_KEY_IDS_TO_KEEP)) {
+        commitUserData.remove(COMMIT_KEY_ID + keyRef);
+        commitUserData.remove(COMMIT_KEY_COOKIE + keyRef);
+      }
+    }
+  }
+}
diff --git a/encryption/src/main/java/org/apache/solr/encryption/KeyManager.java b/encryption/src/main/java/org/apache/solr/encryption/KeyManager.java
new file mode 100644
index 0000000..27a381c
--- /dev/null
+++ b/encryption/src/main/java/org/apache/solr/encryption/KeyManager.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.encryption;
+
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.solr.common.util.NamedList;
+
+import java.io.IOException;
+import java.util.function.Function;
+
+/**
+ * Manages encryption keys and defines which index files to encrypt.
+ * Supplies the encryption key secrets corresponding to provided key ids.
+ */
+public interface KeyManager {
+
+  /**
+   * Indicates whether the provided file is encryptable based on its name.
+   * <p/>
+   * Segments files ({@link IndexFileNames#SEGMENTS} or {@link IndexFileNames#PENDING_SEGMENTS}) are never
+   * passed as parameter because they are filtered before calling this method (they must not be encrypted).
+   */
+  boolean isEncryptable(String fileName);
+
+  /**
+   * Gets the cookie corresponding to a given key.
+   * The cookie is an additional binary data to provide to get the key secret.
+   *
+   * @throws java.util.NoSuchElementException if the key is unknown.
+   */
+  byte[] getKeyCookie(String keyId) throws IOException;
+
+  /**
+   * Gets the encryption key secret corresponding to the provided key id.
+   *
+   * @param keyId          Key id which identifies uniquely the encryption key.
+   * @param keyRef         Key internal reference number to provide to the cookie supplier to retrieve the
+   *                       corresponding cookie, if any.
+   * @param cookieSupplier Takes the key reference number as input and supplies an additional binary data
+   *                       cookie required to get the key secret. This supplier may not be called if the
+   *                       key secret is in the transient memory cache. It may return null if there are no
+   *                       cookies.
+   * @return The key secret bytes. It must be either 16, 24 or 32 bytes long. The caller is not permitted
+   * to modify its content. Returns null if the key is known but has no secret bytes, in this case the data
+   * is not encrypted.
+   * @throws java.util.NoSuchElementException if the key is unknown.
+   */
+  byte[] getKeySecret(String keyId, String keyRef, Function<String, byte[]> cookieSupplier) throws IOException;
+
+  /**
+   * Supplies the {@link KeyManager}.
+   */
+  interface Supplier {
+
+    /** This supplier may be configured with parameters defined in solrconfig.xml. */
+    void init(NamedList<?> args);
+
+    /** Gets the {@link KeyManager}. */
+    KeyManager getKeyManager();
+  }
+}
diff --git a/encryption/src/main/java/org/apache/solr/encryption/crypto/AesCtrEncrypter.java b/encryption/src/main/java/org/apache/solr/encryption/crypto/AesCtrEncrypter.java
new file mode 100644
index 0000000..16fc33e
--- /dev/null
+++ b/encryption/src/main/java/org/apache/solr/encryption/crypto/AesCtrEncrypter.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.encryption.crypto;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Stateful Encrypter specialized for AES algorithm in CTR (counter) mode with no padding.
+ * <p>In CTR mode, encryption and decryption are actually the same operation, so this API does not require specifying
+ * whether it is used to encrypt or decrypt.
+ * <p>An {@link AesCtrEncrypter} must be first {@link #init(long) initialized} before it can be used to
+ * {@link #process(ByteBuffer, ByteBuffer) encrypt/decrypt}.
+ * <p>Not thread safe.
+ */
+public interface AesCtrEncrypter extends Cloneable {
+
+  /**
+   * Initializes this encrypter at the provided CTR block counter (counter of blocks of size {@link AesCtrUtil#AES_BLOCK_SIZE}).
+   * <p>For example, the data byte at index i is inside the block at counter = i / {@link AesCtrUtil#AES_BLOCK_SIZE}.
+   * CTR mode computes an IV for this block based on the initial IV (at counter 0) and the provided counter. This allows
+   * efficient random access to encrypted data. Only the target block needs to be decrypted.
+   * <p>This method must be called first. Then the next call to {@link #process(ByteBuffer, ByteBuffer)} will start at the
+   * beginning of the block: the first byte of data at input buffer {@link ByteBuffer#position()} must be the first byte
+   * of the block.
+   */
+  void init(long counter);
+
+  /**
+   * Encrypts/decrypts the provided input buffer data and stores the encrypted/decrypted data in an output buffer.
+   * In CTR mode, encryption and decryption are actually the same operation.
+   * <p>Both buffers must be backed by array ({@link ByteBuffer#hasArray()} returns true), and must not share the same
+   * array.
+   * <p>Do not call this method when this {@link AesCtrEncrypter} is not {@link #init(long) initialized}.
+   * <p>This method takes care of incrementing the CTR counter while encrypting/decrypting the data. It can be called
+   * repeatedly without calling {@link #init(long)} again. {@link #init(long)} is called only to jump to a given block.
+   *
+   * @param inBuffer  Input data from {@link ByteBuffer#position()} (inclusive) to {@link ByteBuffer#limit()} (exclusive).
+   * @param outBuffer Output data to be stored at {@link ByteBuffer#position()}. outBuffer {@link ByteBuffer#remaining()}
+   *                  must be greater than or equal to inBuffer {@link ByteBuffer#remaining()}.
+   */
+  void process(ByteBuffer inBuffer, ByteBuffer outBuffer);
+
+  /**
+   * Clones this {@link AesCtrEncrypter} for efficiency as it clones the internal encryption key and IV.
+   * The returned clone must be initialized by calling {@link #init(long)} first.
+   */
+  AesCtrEncrypter clone();
+}
\ No newline at end of file
diff --git a/encryption/src/main/java/org/apache/solr/encryption/crypto/AesCtrEncrypterFactory.java b/encryption/src/main/java/org/apache/solr/encryption/crypto/AesCtrEncrypterFactory.java
new file mode 100644
index 0000000..aa6557a
--- /dev/null
+++ b/encryption/src/main/java/org/apache/solr/encryption/crypto/AesCtrEncrypterFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.encryption.crypto;
+
+/**
+ * {@link AesCtrEncrypter} factory.
+ */
+public interface AesCtrEncrypterFactory {
+
+  /**
+   * Creates a new {@link AesCtrEncrypter} instance.
+   *
+   * @param key The encryption key. It is cloned internally, its content is not modified, and no reference to it is kept.
+   * @param iv  The Initialization Vector (IV) for the CTR mode. It MUST be random for the effectiveness of the encryption.
+   *            It can be public (for example stored clear at the beginning of the encrypted file). It is cloned internally,
+   *            its content is not modified, and no reference to it is kept.
+   */
+  AesCtrEncrypter create(byte[] key, byte[] iv);
+
+  /**
+   * Returns whether this factory is supported.
+   */
+  boolean isSupported();
+
+  /**
+   * Returns why this factory is not supported.
+   * @return the cause; or null if none.
+   */
+  Throwable getUnsupportedCause();
+}
diff --git a/encryption/src/main/java/org/apache/solr/encryption/crypto/AesCtrUtil.java b/encryption/src/main/java/org/apache/solr/encryption/crypto/AesCtrUtil.java
new file mode 100644
index 0000000..7546a9c
--- /dev/null
+++ b/encryption/src/main/java/org/apache/solr/encryption/crypto/AesCtrUtil.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.encryption.crypto;
+
+import java.security.SecureRandom;
+
+/**
+ * Methods for AES/CTR encryption.
+ */
+public class AesCtrUtil {
+
+  /**
+   * AES block has a fixed length of 16 bytes (128 bits).
+   */
+  public static final int AES_BLOCK_SIZE = 16;
+
+  /**
+   * AES/CTR IV length. It is equal to {@link #AES_BLOCK_SIZE}. It is defined separately mainly for code clarity.
+   */
+  public static final int IV_LENGTH = AES_BLOCK_SIZE;
+
+  /**
+   * Checks a key for AES. Its length must be either 16, 24 or 32 bytes.
+   */
+  public static void checkAesKey(byte[] key) {
+    if (key.length != 16 && key.length != 24 && key.length != 32) {
+      // AES requires either 128, 192 or 256 bits keys.
+      throw new IllegalArgumentException("Invalid AES key length; it must be either 128, 192 or 256 bits long");
+    }
+  }
+
+  /**
+   * Checks the CTR counter. It must be positive or null.
+   */
+  public static void checkCtrCounter(long counter) {
+    if (counter < 0) {
+      throw new IllegalArgumentException("Illegal counter=" + counter);
+    }
+  }
+
+  /**
+   * Generates a random IV for AES/CTR of length {@link #IV_LENGTH}.
+   */
+  public static byte[] generateRandomAesCtrIv(SecureRandom secureRandom) {
+    // IV length must be the AES block size.
+    // IV must be random for the CTR mode. It starts with counter 0, so it's simply IV.
+    byte[] iv = new byte[IV_LENGTH];
+    do {
+      secureRandom.nextBytes(iv);
+      // Ensure that we have enough bits left to allow the 8 bytes counter to add with the carry.
+      // The high-order byte is at index 0.
+      // We check that there is at least one unset bit in the 3 highest bytes. It guarantees
+      // that we can add with the carry at least 5 bytes of the counter, which means we handle
+      // files of at least 2^(5*8) * 2 * 16 B = 35,000 GB.
+    } while (iv[0] == -1 && iv[1] == -1 && iv[2] == -1);
+    return iv;
+  }
+
+  /**
+   * Builds an AES/CTR IV based on the provided counter and an initial IV.
+   * The built IV is the same as with {@code com.sun.crypto.provider.CounterMode.increment()}.
+   */
+  public static void buildAesCtrIv(byte[] initialIv, long counter, byte[] iv) {
+    assert initialIv.length == IV_LENGTH && iv.length == IV_LENGTH;
+    int ivIndex = iv.length;
+    int counterIndex = 0;
+    int sum = 0;
+    while (ivIndex-- > 0) {
+      // (sum >>> Byte.SIZE) is the carry for counter addition.
+      sum = (initialIv[ivIndex] & 0xff) + (sum >>> Byte.SIZE);
+      // Add long counter.
+      if (counterIndex++ < 8) {
+        sum += (byte) counter & 0xff;
+        counter >>>= 8;
+      }
+      iv[ivIndex] = (byte) sum;
+    }
+  }
+}
\ No newline at end of file
diff --git a/encryption/src/main/java/org/apache/solr/encryption/crypto/CipherAesCtrEncrypter.java b/encryption/src/main/java/org/apache/solr/encryption/crypto/CipherAesCtrEncrypter.java
new file mode 100644
index 0000000..d1bfa90
--- /dev/null
+++ b/encryption/src/main/java/org/apache/solr/encryption/crypto/CipherAesCtrEncrypter.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.encryption.crypto;
+
+import javax.crypto.Cipher;
+import javax.crypto.NoSuchPaddingException;
+import javax.crypto.ShortBufferException;
+import javax.crypto.spec.IvParameterSpec;
+import javax.crypto.spec.SecretKeySpec;
+import java.nio.ByteBuffer;
+import java.security.InvalidAlgorithmParameterException;
+import java.security.InvalidKeyException;
+import java.security.Key;
+import java.security.NoSuchAlgorithmException;
+
+import static org.apache.solr.encryption.crypto.AesCtrUtil.*;
+
+/**
+ * {@link AesCtrEncrypter} backed by a {@link javax.crypto.Cipher} with the "AES/CTR/NoPadding" transformation.
+ * <p>This encrypter loads the internal {@link javax.crypto.CipherSpi} implementation from the classpath, see
+ * {@link Cipher#getInstance(String)}. It is heavy to create, to initialize and to clone, but the
+ * {@link #process(ByteBuffer, ByteBuffer) encryption} is extremely fast thanks to {@code HotSpotIntrinsicCandidate}
+ * annotation in com.sun.crypto.provider.CounterMode.
+ */
+public class CipherAesCtrEncrypter implements AesCtrEncrypter {
+
+  /**
+   * {@link CipherAesCtrEncrypter} factory.
+   */
+  public static final AesCtrEncrypterFactory FACTORY = new Factory();
+
+  // Most fields are not final for the clone() method.
+  private final Key key;
+  private final byte[] initialIv;
+  private byte[] iv;
+  private ReusableIvParameterSpec ivParameterSpec;
+  private Cipher cipher;
+  private long counter;
+
+  /**
+   * @param key The encryption key. It is cloned internally, its content is not modified, and no reference to it is kept.
+   * @param iv  The Initialization Vector (IV) for the CTR mode. It MUST be random for the effectiveness of the encryption.
+   *            It can be public (for example stored clear at the beginning of the encrypted file). It is cloned internally,
+   *            its content is not modified, and no reference to it is kept.
+   */
+  public CipherAesCtrEncrypter(byte[] key, byte[] iv) {
+    checkAesKey(key);
+    this.key = new SecretKeySpec(key, "AES");
+    this.initialIv = iv.clone();
+    this.iv = iv.clone();
+    ivParameterSpec = new ReusableIvParameterSpec(this.iv);
+    cipher = createAesCtrCipher();
+  }
+
+  @Override
+  public void init(long counter) {
+    checkCtrCounter(counter);
+    if (counter != this.counter) {
+      this.counter = counter;
+      buildAesCtrIv(initialIv, counter, iv);
+    }
+    try {
+      cipher.init(Cipher.ENCRYPT_MODE, key, ivParameterSpec, SecureRandomProvider.get());
+    } catch (InvalidKeyException | InvalidAlgorithmParameterException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void process(ByteBuffer inBuffer, ByteBuffer outBuffer) {
+    try {
+      int inputSize = inBuffer.remaining();
+      int numEncryptedBytes = cipher.update(inBuffer, outBuffer);
+      if (numEncryptedBytes < inputSize) {
+        throw new UnsupportedOperationException(Cipher.class.getSimpleName() + " implementation does not maintain an encryption context; this is not supported");
+      }
+    } catch (ShortBufferException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public CipherAesCtrEncrypter clone() {
+    CipherAesCtrEncrypter clone;
+    try {
+      clone = (CipherAesCtrEncrypter) super.clone();
+    } catch (CloneNotSupportedException e) {
+      throw new Error("This cannot happen: Failing to clone " + CipherAesCtrEncrypter.class.getSimpleName(), e);
+    }
+    // key and initialIv are the same references.
+    clone.iv = initialIv.clone();
+    clone.ivParameterSpec = new ReusableIvParameterSpec(clone.iv);
+    clone.cipher = createAesCtrCipher();
+    clone.counter = 0;
+    return clone;
+  }
+
+  private static Cipher createAesCtrCipher() {
+    try {
+      Cipher cipher = Cipher.getInstance("AES/CTR/NoPadding");
+      assert cipher.getBlockSize() == AES_BLOCK_SIZE : "Invalid AES block size: " + cipher.getBlockSize();
+      return cipher;
+    } catch (NoSuchAlgorithmException | NoSuchPaddingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Avoids cloning the IV in the constructor each time we need an {@link IvParameterSpec}.
+   */
+  private static class ReusableIvParameterSpec extends IvParameterSpec {
+
+    static final byte[] EMPTY_BYTES = new byte[0];
+
+    final byte[] iv;
+
+    ReusableIvParameterSpec(byte[] iv) {
+      super(EMPTY_BYTES);
+      this.iv = iv;
+    }
+
+    public byte[] getIV() {
+      return iv.clone();
+    }
+  }
+
+  /**
+   * {@link CipherAesCtrEncrypter} factory.
+   */
+  public static class Factory implements AesCtrEncrypterFactory {
+    @Override
+    public AesCtrEncrypter create(byte[] key, byte[] iv) {
+      return new CipherAesCtrEncrypter(key, iv);
+    }
+
+    @Override
+    public boolean isSupported() {
+      return true;
+    }
+
+    @Override
+    public Throwable getUnsupportedCause() {
+      return null;
+    }
+  }
+}
\ No newline at end of file
diff --git a/encryption/src/main/java/org/apache/solr/encryption/crypto/DecryptingIndexInput.java b/encryption/src/main/java/org/apache/solr/encryption/crypto/DecryptingIndexInput.java
new file mode 100644
index 0000000..eff5fe4
--- /dev/null
+++ b/encryption/src/main/java/org/apache/solr/encryption/crypto/DecryptingIndexInput.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.encryption.crypto;
+
+import org.apache.lucene.store.IndexInput;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static org.apache.solr.encryption.crypto.AesCtrUtil.*;
+
+/**
+ * {@link IndexInput} that reads from a delegate {@link IndexInput} and decrypts data on the fly.
+ * <p>It decrypts with the AES algorithm in CTR (counter) mode with no padding. It is appropriate for random access of
+ * the read-only index files. It can decrypt data previously encrypted with an {@link EncryptingIndexOutput}.
+ * <p>It first reads the CTR Initialization Vector (IV). This random IV is not encrypted. Then it can decrypt the rest
+ * of the file, which probably contains a header and footer, with random access.
+ *
+ * @see EncryptingIndexOutput
+ * @see AesCtrEncrypter
+ */
+public class DecryptingIndexInput extends IndexInput {
+
+  /**
+   * Must be a multiple of {@link AesCtrUtil#AES_BLOCK_SIZE}.
+   * Benchmarks showed that 6 x {@link AesCtrUtil#AES_BLOCK_SIZE} is a good buffer size.
+   */
+  private static final int BUFFER_CAPACITY = 6 * AES_BLOCK_SIZE; // 96 B
+
+  private static final long AES_BLOCK_SIZE_MOD_MASK = AES_BLOCK_SIZE - 1;
+
+  // Most fields are not final for the clone() method.
+  private boolean isClone;
+  private final long delegateOffset;
+  private final long sliceOffset;
+  private final long sliceEnd;
+  private IndexInput indexInput;
+  private AesCtrEncrypter encrypter;
+  private ByteBuffer inBuffer;
+  private ByteBuffer outBuffer;
+  private byte[] inArray;
+  private byte[] oneByteBuf;
+  private int padding;
+  private boolean closed;
+
+  /**
+   * @param indexInput The delegate {@link IndexInput} to read and decrypt data from. Its current file pointer may be
+   *                   greater than or equal to zero, this allows for example the caller to first read some special
+   *                   encryption header followed by a key id to retrieve the key secret.
+   * @param key        The encryption key secret. It is cloned internally, its content is not modified, and no
+   *                   reference to it is kept.
+   * @param factory    The factory to use to create one instance of {@link AesCtrEncrypter}. This instance may be cloned.
+   */
+  public DecryptingIndexInput(IndexInput indexInput, byte[] key, AesCtrEncrypterFactory factory) throws IOException {
+    this("Decrypting " + indexInput.toString(),
+      indexInput.getFilePointer() + IV_LENGTH,
+      indexInput.getFilePointer() + IV_LENGTH,
+      indexInput.length() - indexInput.getFilePointer() - IV_LENGTH,
+      false,
+      indexInput,
+      createEncrypter(indexInput, key, factory));
+  }
+
+  private DecryptingIndexInput(String resourceDescription,
+                               long delegateOffset,
+                               long sliceOffset,
+                               long sliceLength,
+                               boolean isClone,
+                               IndexInput indexInput,
+                               AesCtrEncrypter encrypter) {
+    super(resourceDescription);
+    assert delegateOffset >= 0 && sliceOffset >= 0 && sliceLength >= 0;
+    this.delegateOffset = delegateOffset;
+    this.sliceOffset = sliceOffset;
+    this.sliceEnd = sliceOffset + sliceLength;
+    this.isClone = isClone;
+    this.indexInput = indexInput;
+    this.encrypter = encrypter;
+    encrypter.init(0);
+    inBuffer = ByteBuffer.allocate(getBufferCapacity());
+    outBuffer = ByteBuffer.allocate(getBufferCapacity() + AES_BLOCK_SIZE);
+    outBuffer.limit(0);
+    assert inBuffer.hasArray() && outBuffer.hasArray();
+    assert inBuffer.arrayOffset() == 0;
+    inArray = inBuffer.array();
+    oneByteBuf = new byte[1];
+  }
+
+  /**
+   * Creates the {@link AesCtrEncrypter} based on the secret key and the IV at the beginning of the index input.
+   */
+  private static AesCtrEncrypter createEncrypter(IndexInput indexInput,
+                                                 byte[] key,
+                                                 AesCtrEncrypterFactory factory)
+    throws IOException {
+    byte[] iv = new byte[IV_LENGTH];
+    indexInput.readBytes(iv, 0, iv.length, false);
+    return factory.create(key, iv);
+  }
+
+  /**
+   * Gets the buffer capacity. It must be a multiple of {@link AesCtrUtil#AES_BLOCK_SIZE}.
+   */
+  protected int getBufferCapacity() {
+    return BUFFER_CAPACITY;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (!closed) {
+      closed = true;
+      if (!isClone) {
+        indexInput.close();
+      }
+    }
+  }
+
+  @Override
+  public long getFilePointer() {
+    return getPosition() - sliceOffset;
+  }
+
+  /**
+   * Gets the current internal position in the delegate {@link IndexInput}. It includes IV length.
+   */
+  private long getPosition() {
+    return indexInput.getFilePointer() - outBuffer.remaining();
+  }
+
+  @Override
+  public void seek(long position) throws IOException {
+    if (position < 0) {
+      throw new IllegalArgumentException("Invalid position=" + position);
+    }
+    if (position > length()) {
+      throw new EOFException("Seek beyond EOF (position=" + position + ", length=" + length() + ") in " + this);
+    }
+    long targetPosition = position + sliceOffset;
+    long delegatePosition = indexInput.getFilePointer();
+    long currentPosition = delegatePosition - outBuffer.remaining();
+    if (targetPosition >= currentPosition && targetPosition <= delegatePosition) {
+      // The target position is within the buffered output. Just move the output buffer position.
+      outBuffer.position(outBuffer.position() + (int) (targetPosition - currentPosition));
+      assert targetPosition == delegatePosition - outBuffer.remaining();
+    } else {
+      indexInput.seek(targetPosition);
+      setPosition(targetPosition);
+    }
+  }
+
+  private void setPosition(long position) {
+    inBuffer.clear();
+    outBuffer.clear();
+    outBuffer.limit(0);
+    // Compute the counter by ignoring the IV and the delegate offset, if any.
+    long delegatePosition = position - delegateOffset;
+    long counter = delegatePosition / AES_BLOCK_SIZE;
+    encrypter.init(counter);
+    padding = (int) (delegatePosition & AES_BLOCK_SIZE_MOD_MASK);
+    inBuffer.position(padding);
+  }
+
+  /**
+   * Returns the number of encrypted/decrypted bytes in the file.
+   * <p>It is the logical length of the file, not the physical length. It excludes the IV added artificially to manage
+   * the encryption. It includes only and all the encrypted bytes (probably a header, content, and a footer).
+   * <p>With AES/CTR/NoPadding encryption, the length of the encrypted data is identical to the length of the decrypted data.
+   */
+  @Override
+  public long length() {
+    return sliceEnd - sliceOffset;
+  }
+
+  @Override
+  public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
+    if (offset < 0 || length < 0 || offset + length > length()) {
+      throw new IllegalArgumentException("Slice \"" + sliceDescription + "\" out of bounds (offset=" + offset
+        + ", sliceLength=" + length + ", fileLength=" + length() + ") of " + this);
+    }
+    DecryptingIndexInput slice = new DecryptingIndexInput(getFullSliceDescription(sliceDescription),
+      delegateOffset, sliceOffset + offset, length, true, indexInput.clone(), encrypter.clone());
+    slice.seek(0);
+    return slice;
+  }
+
+  @Override
+  public byte readByte() throws IOException {
+    readBytes(oneByteBuf, 0, 1);
+    return oneByteBuf[0];
+  }
+
+  @Override
+  public void readBytes(byte[] b, int offset, int length) throws IOException {
+    if (offset < 0 || length < 0 || offset + length > b.length) {
+      throw new IllegalArgumentException("Invalid read buffer parameters (offset=" + offset + ", length=" + length
+        + ", arrayLength=" + b.length + ")");
+    }
+    if (getPosition() + length > sliceEnd) {
+      throw new EOFException("Read beyond EOF (position=" + (getPosition() - sliceOffset) + ", arrayLength=" + length
+        + ", fileLength=" + length() + ") in " + this);
+    }
+    while (length > 0) {
+      // Transfer decrypted bytes from outBuffer.
+      int outRemaining = outBuffer.remaining();
+      if (outRemaining > 0) {
+        if (length <= outRemaining) {
+          outBuffer.get(b, offset, length);
+          return;
+        }
+        outBuffer.get(b, offset, outRemaining);
+        assert outBuffer.remaining() == 0;
+        offset += outRemaining;
+        length -= outRemaining;
+      }
+
+      readToFillBuffer(length);
+      decryptBuffer();
+    }
+  }
+
+  private void readToFillBuffer(int length) throws IOException {
+    assert length > 0;
+    int inRemaining = inBuffer.remaining();
+    if (inRemaining > 0) {
+      int position = inBuffer.position();
+      int numBytesToRead = Math.min(inRemaining, length);
+      indexInput.readBytes(inArray, position, numBytesToRead);
+      inBuffer.position(position + numBytesToRead);
+    }
+  }
+
+  private void decryptBuffer() {
+    assert inBuffer.position() > padding : "position=" + inBuffer.position() + ", padding=" + padding;
+    inBuffer.flip();
+    outBuffer.clear();
+    encrypter.process(inBuffer, outBuffer);
+    inBuffer.clear();
+    outBuffer.flip();
+    if (padding > 0) {
+      outBuffer.position(padding);
+      padding = 0;
+    }
+  }
+
+  @Override
+  public DecryptingIndexInput clone() {
+    DecryptingIndexInput clone = (DecryptingIndexInput) super.clone();
+    clone.isClone = true;
+    clone.indexInput = indexInput.clone();
+    assert clone.indexInput.getFilePointer() == indexInput.getFilePointer();
+    clone.encrypter = encrypter.clone();
+    clone.inBuffer = ByteBuffer.allocate(getBufferCapacity());
+    clone.outBuffer = ByteBuffer.allocate(getBufferCapacity() + AES_BLOCK_SIZE);
+    clone.inArray = clone.inBuffer.array();
+    clone.oneByteBuf = new byte[1];
+    // The clone must be initialized.
+    clone.setPosition(getPosition());
+    return clone;
+  }
+}
\ No newline at end of file
diff --git a/encryption/src/main/java/org/apache/solr/encryption/crypto/EncryptingIndexOutput.java b/encryption/src/main/java/org/apache/solr/encryption/crypto/EncryptingIndexOutput.java
new file mode 100644
index 0000000..4a14e01
--- /dev/null
+++ b/encryption/src/main/java/org/apache/solr/encryption/crypto/EncryptingIndexOutput.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.encryption.crypto;
+
+import org.apache.lucene.store.BufferedChecksum;
+import org.apache.lucene.store.IndexOutput;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
+
+import static org.apache.solr.encryption.crypto.AesCtrUtil.*;
+
+/**
+ * {@link IndexOutput} that encrypts data and writes to a delegate {@link IndexOutput} on the fly.
+ * <p>It encrypts with the AES algorithm in CTR (counter) mode with no padding. It is appropriate for random access of
+ * the read-only index files. Use a {@link DecryptingIndexInput} to decrypt this file.
+ * <p>It generates a cryptographically strong random CTR Initialization Vector (IV). This random IV is not encrypted and
+ * is skipped by any {@link DecryptingIndexInput} reading the written data. Then it can encrypt the rest of the file
+ * which probably contains a header and footer.
+ *
+ * @see DecryptingIndexInput
+ * @see AesCtrEncrypter
+ */
+public class EncryptingIndexOutput extends IndexOutput {
+
+  /**
+   * Must be a multiple of {@link AesCtrUtil#AES_BLOCK_SIZE}.
+   */
+  private static final int BUFFER_CAPACITY = 64 * AES_BLOCK_SIZE; // 1024
+
+  private final IndexOutput indexOutput;
+  private final AesCtrEncrypter encrypter;
+  private final ByteBuffer inBuffer;
+  private final ByteBuffer outBuffer;
+  private final byte[] outArray;
+  private final byte[] oneByteBuf;
+  private final Checksum clearChecksum;
+  private long filePointer;
+  private boolean closed;
+
+  /**
+   * @param indexOutput The delegate {@link IndexOutput} to write encrypted data to. Its current file pointer may be
+   *                    greater than or equal to zero, this allows for example the caller to first write some special
+   *                    encryption header followed by a key id, to identify the key secret used to encrypt.
+   * @param key         The encryption key secret. It is cloned internally, its content is not modified, and no
+   *                    reference to it is kept.
+   * @param factory     The factory to use to create one instance of {@link AesCtrEncrypter}. This instance may be cloned.
+   */
+  public EncryptingIndexOutput(IndexOutput indexOutput, byte[] key, AesCtrEncrypterFactory factory) throws IOException {
+    super("Encrypting " + indexOutput.toString(), indexOutput.getName());
+    this.indexOutput = indexOutput;
+
+    byte[] iv = generateRandomIv();
+    encrypter = factory.create(key, iv);
+    encrypter.init(0);
+    // IV is written at the beginning of the index output. It's public.
+    // Even if the delegate indexOutput is positioned after the initial IV, this index output file pointer is 0 initially.
+    indexOutput.writeBytes(iv, 0, iv.length);
+
+    inBuffer = ByteBuffer.allocate(getBufferCapacity());
+    outBuffer = ByteBuffer.allocate(getBufferCapacity() + AES_BLOCK_SIZE);
+    assert inBuffer.hasArray() && outBuffer.hasArray();
+    assert outBuffer.arrayOffset() == 0;
+    outArray = outBuffer.array();
+    oneByteBuf = new byte[1];
+
+    // Compute the checksum to skip the initial IV, because an external checksum checker will not see it.
+    clearChecksum = new BufferedChecksum(new CRC32());
+  }
+
+  /**
+   * Generates a cryptographically strong CTR random IV of length {@link AesCtrUtil#IV_LENGTH}.
+   */
+  protected byte[] generateRandomIv() {
+    return generateRandomAesCtrIv(SecureRandomProvider.get());
+  }
+
+  /**
+   * Gets the buffer capacity. It must be a multiple of {@link AesCtrUtil#AES_BLOCK_SIZE}.
+   */
+  protected int getBufferCapacity() {
+    return BUFFER_CAPACITY;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (!closed) {
+      closed = true;
+      try {
+        if (inBuffer.position() != 0) {
+          encryptBufferAndWrite();
+        }
+      } finally {
+        indexOutput.close();
+      }
+    }
+  }
+
+  @Override
+  public long getFilePointer() {
+    // With AES/CTR/NoPadding, the encrypted and decrypted data have the same length.
+    // We return here the file pointer excluding the initial IV length at the beginning of the file.
+    return filePointer;
+  }
+
+  @Override
+  public long getChecksum() {
+    // The checksum is computed on the clear data, excluding the initial IV.
+    return clearChecksum.getValue();
+  }
+
+  @Override
+  public void writeByte(byte b) throws IOException {
+    oneByteBuf[0] = b;
+    writeBytes(oneByteBuf, 0, oneByteBuf.length);
+  }
+
+  @Override
+  public void writeBytes(byte[] b, int offset, int length) throws IOException {
+    if (offset < 0 || length < 0 || offset + length > b.length) {
+      throw new IllegalArgumentException("Invalid write buffer parameters (offset=" + offset + ", length=" + length + ", arrayLength=" + b.length + ")");
+    }
+    clearChecksum.update(b, offset, length);
+    filePointer += length;
+    while (length > 0) {
+      int remaining = inBuffer.remaining();
+      if (length < remaining) {
+        inBuffer.put(b, offset, length);
+        break;
+      } else {
+        inBuffer.put(b, offset, remaining);
+        offset += remaining;
+        length -= remaining;
+        encryptBufferAndWrite();
+      }
+    }
+  }
+
+  private void encryptBufferAndWrite() throws IOException {
+    assert inBuffer.position() != 0;
+    inBuffer.flip();
+    outBuffer.clear();
+    encrypter.process(inBuffer, outBuffer);
+    inBuffer.clear();
+    outBuffer.flip();
+    indexOutput.writeBytes(outArray, 0, outBuffer.limit());
+  }
+}
\ No newline at end of file
diff --git a/encryption/src/main/java/org/apache/solr/encryption/crypto/LightAesCtrEncrypter.java b/encryption/src/main/java/org/apache/solr/encryption/crypto/LightAesCtrEncrypter.java
new file mode 100644
index 0000000..76b4c15
--- /dev/null
+++ b/encryption/src/main/java/org/apache/solr/encryption/crypto/LightAesCtrEncrypter.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.encryption.crypto;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+
+import org.apache.lucene.util.SuppressForbidden;
+
+import static org.apache.solr.encryption.crypto.AesCtrUtil.*;
+
+/**
+ * Hack {@link AesCtrEncrypter} equivalent to {@link javax.crypto.Cipher} with "AES/CTR/NoPadding" but more efficient.
+ * <p>The {@link #LightAesCtrEncrypter(byte[], byte[]) constructor} and the {@link #init(long)} operations are lighter and
+ * faster; {@link #clone()} is much faster. But it needs to call internal private {@code com.sun.crypto.provider.CounterMode}
+ * with reflection. It may not be {@link #isSupported() supported}.
+ * <p>Why do we need to access private {@code com.sun.crypto.provider.CounterMode} and {@code com.sun.crypto.provider.AESCrypt}?
+ * Because they contain the special JVM annotation @HotSpotIntrinsicCandidate that makes their encryption method extremely
+ * fast. If we copy the code in pure Java, it runs 30x slower.
+ */
+public class LightAesCtrEncrypter implements AesCtrEncrypter {
+
+  /**
+   * {@link LightAesCtrEncrypter} factory.
+   */
+  public static final AesCtrEncrypterFactory FACTORY = new Factory();
+
+  private static final Constructor<?> AES_CRYPT_CONSTRUCTOR;
+  private static final Method AES_CRYPT_INIT_METHOD;
+  private static final Constructor<?> COUNTER_MODE_CONSTRUCTOR;
+  private static final Field COUNTER_MODE_IV_FIELD;
+  private static final Method COUNTER_MODE_RESET_METHOD;
+  private static final Method COUNTER_MODE_CRYPT_METHOD;
+  private static final Throwable HACK_FAILURE;
+  static {
+    Hack hack = AccessController.doPrivileged((PrivilegedAction<Hack>) LightAesCtrEncrypter::hack);
+    AES_CRYPT_CONSTRUCTOR = hack.aesCryptConstructor;
+    AES_CRYPT_INIT_METHOD = hack.aesCryptInitMethod;
+    COUNTER_MODE_CONSTRUCTOR = hack.counterModeConstructor;
+    COUNTER_MODE_IV_FIELD = hack.counterIvField;
+    COUNTER_MODE_RESET_METHOD = hack.counterModeResetMethod;
+    COUNTER_MODE_CRYPT_METHOD = hack.counterModeCryptMethod;
+    HACK_FAILURE = hack.hackFailure;
+  }
+
+  private final Object aesCrypt;
+  private final byte[] initialIv;
+  private Object counterMode;
+  private byte[] iv;
+
+  /**
+   * Indicates whether the {@link LightAesCtrEncrypter} hack is supported.
+   * If it is not supported, then {@link LightAesCtrEncrypter} constructor throws an {@link UnsupportedOperationException}
+   * (with the hack failure cause).
+   */
+  public static boolean isSupported() {
+    return HACK_FAILURE == null;
+  }
+
+  /**
+   * @param key The encryption key. It is cloned internally, its content is not modified, and no reference to it is kept.
+   * @param iv  The Initialization Vector (IV) for the CTR mode. It MUST be random for the effectiveness of the encryption.
+   *            It can be public (for example stored clear at the beginning of the encrypted file). It is cloned internally,
+   *            its content is not modified, and no reference to it is kept.
+   * @throws UnsupportedOperationException If the hack is not {@link #isSupported() supported}.
+   */
+  public LightAesCtrEncrypter(byte[] key, byte[] iv) {
+    if (HACK_FAILURE != null) {
+      throw new UnsupportedOperationException(HACK_FAILURE);
+    }
+    checkAesKey(key);
+    try {
+      aesCrypt = AES_CRYPT_CONSTRUCTOR.newInstance();
+      AES_CRYPT_INIT_METHOD.invoke(aesCrypt, false, "AES", key);
+      counterMode = COUNTER_MODE_CONSTRUCTOR.newInstance(aesCrypt);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    this.initialIv = iv.clone();
+    this.iv = iv.clone();
+  }
+
+  @Override
+  public void init(long counter) {
+    checkCtrCounter(counter);
+    buildAesCtrIv(initialIv, counter, iv);
+    try {
+      COUNTER_MODE_IV_FIELD.set(counterMode, iv);
+      COUNTER_MODE_RESET_METHOD.invoke(counterMode);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void process(ByteBuffer inBuffer, ByteBuffer outBuffer) {
+    assert inBuffer.array() != outBuffer.array() : "Input and output buffers must not be backed by the same array";
+    int length = inBuffer.remaining();
+    if (length > outBuffer.remaining()) {
+      throw new IllegalArgumentException("Output buffer does not have enough remaining space (needs " + length + " B)");
+    }
+    int outPos = outBuffer.position();
+    try {
+      COUNTER_MODE_CRYPT_METHOD.invoke(counterMode, inBuffer.array(), inBuffer.arrayOffset() + inBuffer.position(),
+        length, outBuffer.array(), outBuffer.arrayOffset() + outPos);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    inBuffer.position(inBuffer.limit());
+    outBuffer.position(outPos + length);
+  }
+
+  @Override
+  public LightAesCtrEncrypter clone() {
+    LightAesCtrEncrypter clone;
+    try {
+      clone = (LightAesCtrEncrypter) super.clone();
+    } catch (CloneNotSupportedException e) {
+      throw new Error("Failed to clone " + LightAesCtrEncrypter.class.getSimpleName() + "; this should not happen");
+    }
+    // aesCrypt and initialIv are the same references.
+    try {
+      clone.counterMode = COUNTER_MODE_CONSTRUCTOR.newInstance(aesCrypt);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    clone.iv = initialIv.clone();
+    return clone;
+  }
+
+  @SuppressForbidden(reason = "Needs access to private APIs in com.sun.crypto.provider.CounterMode and com.sun.crypto.provider.AESCrypt to enable the hack")
+  private static Hack hack() {
+    Hack hack = new Hack();
+    try {
+      Class<?> aesCryptClass = Class.forName("com.sun.crypto.provider.AESCrypt");
+      hack.aesCryptConstructor = aesCryptClass.getDeclaredConstructor();
+      hack.aesCryptConstructor.setAccessible(true);
+      hack.aesCryptInitMethod = aesCryptClass.getDeclaredMethod("init", boolean.class, String.class, byte[].class);
+      hack.aesCryptInitMethod.setAccessible(true);
+      Class<?> counterModeClass = Class.forName("com.sun.crypto.provider.CounterMode");
+      Class<?> symmetricCipherClass = Class.forName("com.sun.crypto.provider.SymmetricCipher");
+      hack.counterModeConstructor = counterModeClass.getDeclaredConstructor(symmetricCipherClass);
+      hack.counterModeConstructor.setAccessible(true);
+      Class<?> feedbackCipherClass = Class.forName("com.sun.crypto.provider.FeedbackCipher");
+      hack.counterIvField = feedbackCipherClass.getDeclaredField("iv");
+      hack.counterIvField.setAccessible(true);
+      hack.counterModeResetMethod = counterModeClass.getDeclaredMethod("reset");
+      hack.counterModeResetMethod.setAccessible(true);
+      hack.counterModeCryptMethod = counterModeClass.getDeclaredMethod("implCrypt", byte[].class, int.class, int.class, byte[].class, int.class);
+      hack.counterModeCryptMethod.setAccessible(true);
+    } catch (SecurityException se) {
+      hack.hackFailure = new UnsupportedOperationException(LightAesCtrEncrypter.class.getName() + " is not supported"
+        + " because not all required permissions are given to the Encryption JAR file: " + se +
+        " [To support it, grant at least the following permissions:" +
+        " RuntimePermission(\"accessClassInPackage.com.sun.crypto.provider\") " +
+        " and ReflectPermission(\"suppressAccessChecks\")]", se);
+    } catch (ReflectiveOperationException | RuntimeException e) {
+      hack.hackFailure = new UnsupportedOperationException(LightAesCtrEncrypter.class.getName() + " is not supported"
+        + " on this platform because internal Java APIs are not compatible with this Solr version: " + e, e);
+    }
+    if (hack.hackFailure != null) {
+      hack.aesCryptConstructor = null;
+      hack.aesCryptInitMethod = null;
+      hack.counterModeConstructor = null;
+      hack.counterIvField = null;
+      hack.counterModeResetMethod = null;
+      hack.counterModeCryptMethod = null;
+    }
+    return hack;
+  }
+
+  private static class Hack {
+    Constructor<?> aesCryptConstructor;
+    Method aesCryptInitMethod;
+    Constructor<?> counterModeConstructor;
+    Field counterIvField;
+    Method counterModeResetMethod;
+    Method counterModeCryptMethod;
+    UnsupportedOperationException hackFailure;
+  }
+
+  /**
+   * {@link LightAesCtrEncrypter} factory.
+   */
+  public static class Factory implements AesCtrEncrypterFactory {
+    @Override
+    public AesCtrEncrypter create(byte[] key, byte[] iv) {
+      return new LightAesCtrEncrypter(key, iv);
+    }
+
+    @Override
+    public boolean isSupported() {
+      return LightAesCtrEncrypter.isSupported();
+    }
+
+    @Override
+    public Throwable getUnsupportedCause() {
+      return LightAesCtrEncrypter.HACK_FAILURE;
+    }
+  }
+}
\ No newline at end of file
diff --git a/encryption/src/main/java/org/apache/solr/encryption/crypto/SecureRandomProvider.java b/encryption/src/main/java/org/apache/solr/encryption/crypto/SecureRandomProvider.java
new file mode 100644
index 0000000..6acce24
--- /dev/null
+++ b/encryption/src/main/java/org/apache/solr/encryption/crypto/SecureRandomProvider.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.encryption.crypto;
+
+import java.security.SecureRandom;
+
+/**
+ * Provides a {@link SecureRandom} singleton.
+ */
+public class SecureRandomProvider {
+
+  /**
+   * Gets the {@link SecureRandom} singleton.
+   */
+  public static SecureRandom get() {
+    return Holder.SECURE_RANDOM;
+  }
+
+  private static class Holder {
+    static final SecureRandom SECURE_RANDOM = new SecureRandom();
+  }
+}
diff --git a/encryption/src/main/java/org/apache/solr/update/DirectUpdateHandler2Copy.java b/encryption/src/main/java/org/apache/solr/update/DirectUpdateHandler2Copy.java
new file mode 100644
index 0000000..a186db6
--- /dev/null
+++ b/encryption/src/main/java/org/apache/solr/update/DirectUpdateHandler2Copy.java
@@ -0,0 +1,1110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.update;
+
+import com.codahale.metrics.Meter;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.index.CodecReader;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexableField;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.SlowCodecReaderWrapper;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.queries.function.ValueSource;
+import org.apache.lucene.search.BooleanClause.Occur;
+import org.apache.lucene.search.BooleanQuery;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.BytesRefHash;
+import org.apache.solr.cloud.ZkController;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.SolrConfig.UpdateHandlerInfo;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.metrics.SolrMetricProducer;
+import org.apache.solr.metrics.SolrMetricsContext;
+import org.apache.solr.request.LocalSolrQueryRequest;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.request.SolrRequestInfo;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.schema.IndexSchema;
+import org.apache.solr.schema.SchemaField;
+import org.apache.solr.search.FunctionRangeQuery;
+import org.apache.solr.search.QParser;
+import org.apache.solr.search.QueryUtils;
+import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.search.SyntaxError;
+import org.apache.solr.search.function.ValueSourceRangeFilter;
+import org.apache.solr.util.RefCounted;
+import org.apache.solr.util.TestInjection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+//TODO: remove this DirectUpdateHandler2 copy once Solr 9.2 is available.
+// Its only modification is the shouldCommit() method that will be available in 9.2.
+/**
+ * <code>DirectUpdateHandler2</code> implements an UpdateHandler where documents are added directly
+ * to the main Lucene index as opposed to adding to a separate smaller index.
+ */
+public class DirectUpdateHandler2Copy extends UpdateHandler
+  implements SolrCoreState.IndexWriterCloser, SolrMetricProducer {
+
+  private static final int NO_FILE_SIZE_UPPER_BOUND_PLACEHOLDER = -1;
+
+  protected final SolrCoreState solrCoreState;
+
+  // stats
+  LongAdder addCommands = new LongAdder();
+  Meter addCommandsCumulative;
+  LongAdder deleteByIdCommands = new LongAdder();
+  Meter deleteByIdCommandsCumulative;
+  LongAdder deleteByQueryCommands = new LongAdder();
+  Meter deleteByQueryCommandsCumulative;
+  Meter expungeDeleteCommands;
+  Meter mergeIndexesCommands;
+  Meter commitCommands;
+  Meter splitCommands;
+  Meter optimizeCommands;
+  Meter rollbackCommands;
+  LongAdder numDocsPending = new LongAdder();
+  LongAdder numErrors = new LongAdder();
+  Meter numErrorsCumulative;
+
+  // tracks when auto-commit should occur
+  protected final CommitTracker commitTracker;
+  protected final CommitTracker softCommitTracker;
+
+  protected boolean commitWithinSoftCommit;
+  /**
+   * package access for testing
+   *
+   * @lucene.internal
+   */
+  void setCommitWithinSoftCommit(boolean value) {
+    this.commitWithinSoftCommit = value;
+  }
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  public DirectUpdateHandler2Copy(SolrCore core) {
+    super(core);
+
+    solrCoreState = core.getSolrCoreState();
+
+    UpdateHandlerInfo updateHandlerInfo = core.getSolrConfig().getUpdateHandlerInfo();
+    int docsUpperBound = updateHandlerInfo.autoCommmitMaxDocs;
+    int timeUpperBound = updateHandlerInfo.autoCommmitMaxTime;
+    long fileSizeUpperBound = updateHandlerInfo.autoCommitMaxSizeBytes;
+    commitTracker =
+      new CommitTracker(
+        "Hard",
+        core,
+        docsUpperBound,
+        timeUpperBound,
+        fileSizeUpperBound,
+        updateHandlerInfo.openSearcher,
+        false);
+
+    int softCommitDocsUpperBound = updateHandlerInfo.autoSoftCommmitMaxDocs;
+    int softCommitTimeUpperBound = updateHandlerInfo.autoSoftCommmitMaxTime;
+    softCommitTracker =
+      new CommitTracker(
+        "Soft",
+        core,
+        softCommitDocsUpperBound,
+        softCommitTimeUpperBound,
+        NO_FILE_SIZE_UPPER_BOUND_PLACEHOLDER,
+        true,
+        true);
+
+    commitWithinSoftCommit = updateHandlerInfo.commitWithinSoftCommit;
+
+    ZkController zkController = core.getCoreContainer().getZkController();
+    if (zkController != null
+      && core.getCoreDescriptor().getCloudDescriptor().getReplicaType() == Replica.Type.TLOG) {
+      commitWithinSoftCommit = false;
+      commitTracker.setOpenSearcher(true);
+    }
+  }
+
+  public DirectUpdateHandler2Copy(SolrCore core, UpdateHandler updateHandler) {
+    super(core, updateHandler.getUpdateLog());
+    solrCoreState = core.getSolrCoreState();
+
+    UpdateHandlerInfo updateHandlerInfo = core.getSolrConfig().getUpdateHandlerInfo();
+    int docsUpperBound = updateHandlerInfo.autoCommmitMaxDocs;
+    int timeUpperBound = updateHandlerInfo.autoCommmitMaxTime;
+    long fileSizeUpperBound = updateHandlerInfo.autoCommitMaxSizeBytes;
+    commitTracker =
+      new CommitTracker(
+        "Hard",
+        core,
+        docsUpperBound,
+        timeUpperBound,
+        fileSizeUpperBound,
+        updateHandlerInfo.openSearcher,
+        false);
+
+    int softCommitDocsUpperBound = updateHandlerInfo.autoSoftCommmitMaxDocs;
+    int softCommitTimeUpperBound = updateHandlerInfo.autoSoftCommmitMaxTime;
+    softCommitTracker =
+      new CommitTracker(
+        "Soft",
+        core,
+        softCommitDocsUpperBound,
+        softCommitTimeUpperBound,
+        NO_FILE_SIZE_UPPER_BOUND_PLACEHOLDER,
+        updateHandlerInfo.openSearcher,
+        true);
+
+    commitWithinSoftCommit = updateHandlerInfo.commitWithinSoftCommit;
+
+    UpdateLog existingLog = updateHandler.getUpdateLog();
+    if (this.ulog != null && this.ulog == existingLog) {
+      // If we are reusing the existing update log, inform the log that its update handler has
+      // changed. We do this as late as possible.
+      this.ulog.init(this, core);
+    }
+  }
+
+  @Override
+  public void initializeMetrics(SolrMetricsContext parentContext, String scope) {
+    solrMetricsContext = parentContext.getChildContext(this);
+    commitCommands = solrMetricsContext.meter("commits", getCategory().toString(), scope);
+    solrMetricsContext.gauge(
+      () -> commitTracker.getCommitCount(), true, "autoCommits", getCategory().toString(), scope);
+    solrMetricsContext.gauge(
+      () -> softCommitTracker.getCommitCount(),
+      true,
+      "softAutoCommits",
+      getCategory().toString(),
+      scope);
+    if (commitTracker.getDocsUpperBound() > 0) {
+      solrMetricsContext.gauge(
+        () -> commitTracker.getDocsUpperBound(),
+        true,
+        "autoCommitMaxDocs",
+        getCategory().toString(),
+        scope);
+    }
+    if (commitTracker.getTimeUpperBound() > 0) {
+      solrMetricsContext.gauge(
+        () -> "" + commitTracker.getTimeUpperBound() + "ms",
+        true,
+        "autoCommitMaxTime",
+        getCategory().toString(),
+        scope);
+    }
+    if (commitTracker.getTLogFileSizeUpperBound() > 0) {
+      solrMetricsContext.gauge(
+        () -> commitTracker.getTLogFileSizeUpperBound(),
+        true,
+        "autoCommitMaxSize",
+        getCategory().toString(),
+        scope);
+    }
+    if (softCommitTracker.getDocsUpperBound() > 0) {
+      solrMetricsContext.gauge(
+        () -> softCommitTracker.getDocsUpperBound(),
+        true,
+        "softAutoCommitMaxDocs",
+        getCategory().toString(),
+        scope);
+    }
+    if (softCommitTracker.getTimeUpperBound() > 0) {
+      solrMetricsContext.gauge(
+        () -> "" + softCommitTracker.getTimeUpperBound() + "ms",
+        true,
+        "softAutoCommitMaxTime",
+        getCategory().toString(),
+        scope);
+    }
+    optimizeCommands = solrMetricsContext.meter("optimizes", getCategory().toString(), scope);
+    rollbackCommands = solrMetricsContext.meter("rollbacks", getCategory().toString(), scope);
+    splitCommands = solrMetricsContext.meter("splits", getCategory().toString(), scope);
+    mergeIndexesCommands = solrMetricsContext.meter("merges", getCategory().toString(), scope);
+    expungeDeleteCommands =
+      solrMetricsContext.meter("expungeDeletes", getCategory().toString(), scope);
+    solrMetricsContext.gauge(
+      () -> numDocsPending.longValue(), true, "docsPending", getCategory().toString(), scope);
+    solrMetricsContext.gauge(
+      () -> addCommands.longValue(), true, "adds", getCategory().toString(), scope);
+    solrMetricsContext.gauge(
+      () -> deleteByIdCommands.longValue(), true, "deletesById", getCategory().toString(), scope);
+    solrMetricsContext.gauge(
+      () -> deleteByQueryCommands.longValue(),
+      true,
+      "deletesByQuery",
+      getCategory().toString(),
+      scope);
+    solrMetricsContext.gauge(
+      () -> numErrors.longValue(), true, "errors", getCategory().toString(), scope);
+
+    addCommandsCumulative =
+      solrMetricsContext.meter("cumulativeAdds", getCategory().toString(), scope);
+    deleteByIdCommandsCumulative =
+      solrMetricsContext.meter("cumulativeDeletesById", getCategory().toString(), scope);
+    deleteByQueryCommandsCumulative =
+      solrMetricsContext.meter("cumulativeDeletesByQuery", getCategory().toString(), scope);
+    numErrorsCumulative =
+      solrMetricsContext.meter("cumulativeErrors", getCategory().toString(), scope);
+  }
+
+  private void deleteAll() throws IOException {
+    log.info("REMOVING ALL DOCUMENTS FROM INDEX");
+    RefCounted<IndexWriter> iw = solrCoreState.getIndexWriter(core);
+    try {
+      iw.get().deleteAll();
+    } finally {
+      iw.decref();
+    }
+  }
+
+  protected void rollbackWriter() throws IOException {
+    numDocsPending.reset();
+    solrCoreState.rollbackIndexWriter(core);
+  }
+
+  @Override
+  public int addDoc(AddUpdateCommand cmd) throws IOException {
+    TestInjection.injectDirectUpdateLatch();
+    try {
+      return addDoc0(cmd);
+    } catch (SolrException e) {
+      throw e;
+    } catch (AlreadyClosedException e) {
+      throw new SolrException(
+        SolrException.ErrorCode.SERVER_ERROR,
+        String.format(
+          Locale.ROOT,
+          "Server error writing document id %s to the index",
+          cmd.getPrintableId()),
+        e);
+    } catch (IllegalArgumentException iae) {
+      throw new SolrException(
+        SolrException.ErrorCode.BAD_REQUEST,
+        String.format(
+          Locale.ROOT,
+          "Exception writing document id %s to the index; possible analysis error: "
+            + iae.getMessage()
+            + (iae.getCause() instanceof BytesRefHash.MaxBytesLengthExceededException
+            ? ". Perhaps the document has an indexed string field (solr.StrField) which is too large"
+            : ""),
+          cmd.getPrintableId()),
+        iae);
+    } catch (RuntimeException t) {
+      throw new SolrException(
+        SolrException.ErrorCode.BAD_REQUEST,
+        String.format(
+          Locale.ROOT,
+          "Exception writing document id %s to the index; possible analysis error.",
+          cmd.getPrintableId()),
+        t);
+    }
+  }
+
+  /**
+   * This is the implementation of {@link #addDoc(AddUpdateCommand)}. It is factored out to allow an
+   * exception handler to decorate RuntimeExceptions with information about the document being
+   * handled.
+   *
+   * @param cmd the command.
+   * @return the count.
+   */
+  private int addDoc0(AddUpdateCommand cmd) throws IOException {
+    int rc = -1;
+
+    addCommands.increment();
+    addCommandsCumulative.mark();
+
+    // if there is no ID field, don't overwrite
+    if (idField == null) {
+      cmd.overwrite = false;
+    }
+    try {
+      if ((cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) != 0) {
+        if (ulog != null) ulog.add(cmd);
+        return 1;
+      }
+
+      if (cmd.overwrite) {
+        // Check for delete by query commands newer (i.e. reordered). This
+        // should always be null on a leader
+        List<UpdateLog.DBQ> deletesAfter = null;
+        if (ulog != null && cmd.version > 0) {
+          deletesAfter = ulog.getDBQNewer(cmd.version);
+        }
+
+        if (deletesAfter != null) {
+          addAndDelete(cmd, deletesAfter);
+        } else {
+          doNormalUpdate(cmd);
+        }
+      } else {
+        allowDuplicateUpdate(cmd);
+      }
+
+      if ((cmd.getFlags() & UpdateCommand.IGNORE_AUTOCOMMIT) == 0) {
+        long currentTlogSize = getCurrentTLogSize();
+        if (commitWithinSoftCommit) {
+          commitTracker.addedDocument(-1, currentTlogSize);
+          softCommitTracker.addedDocument(cmd.commitWithin);
+        } else {
+          softCommitTracker.addedDocument(-1);
+          commitTracker.addedDocument(cmd.commitWithin, currentTlogSize);
+        }
+      }
+
+      rc = 1;
+    } finally {
+      if (rc != 1) {
+        numErrors.increment();
+        numErrorsCumulative.mark();
+      } else {
+        numDocsPending.increment();
+      }
+    }
+
+    return rc;
+  }
+
+  private void allowDuplicateUpdate(AddUpdateCommand cmd) throws IOException {
+    RefCounted<IndexWriter> iw = solrCoreState.getIndexWriter(core);
+    try {
+      IndexWriter writer = iw.get();
+      writer.addDocuments(cmd.makeLuceneDocs());
+      if (ulog != null) ulog.add(cmd);
+
+    } finally {
+      iw.decref();
+    }
+  }
+
+  private void doNormalUpdate(AddUpdateCommand cmd) throws IOException {
+    RefCounted<IndexWriter> iw = solrCoreState.getIndexWriter(core);
+    try {
+      IndexWriter writer = iw.get();
+
+      updateDocOrDocValues(cmd, writer);
+
+      // Add to the transaction log *after* successfully adding to the
+      // index, if there was no error.
+      // This ordering ensures that if we log it, it's definitely been
+      // added to the the index.
+      // This also ensures that if a commit sneaks in-between, that we
+      // know everything in a particular
+      // log version was definitely committed.
+      if (ulog != null) ulog.add(cmd);
+
+    } finally {
+      iw.decref();
+    }
+  }
+
+  private void addAndDelete(AddUpdateCommand cmd, List<UpdateLog.DBQ> deletesAfter)
+    throws IOException {
+    // this logic is different enough from doNormalUpdate that it's separate
+    log.info("Reordered DBQs detected.  Update={} DBQs={}", cmd, deletesAfter);
+    List<Query> dbqList = new ArrayList<>(deletesAfter.size());
+    for (UpdateLog.DBQ dbq : deletesAfter) {
+      try {
+        DeleteUpdateCommand tmpDel = new DeleteUpdateCommand(cmd.req);
+        tmpDel.query = dbq.q;
+        tmpDel.version = -dbq.version;
+        dbqList.add(getQuery(tmpDel));
+      } catch (Exception e) {
+        log.error("Exception parsing reordered query : {}", dbq, e);
+      }
+    }
+
+    RefCounted<IndexWriter> iw = solrCoreState.getIndexWriter(core);
+    try {
+      IndexWriter writer = iw.get();
+
+      // see comment in deleteByQuery
+      synchronized (solrCoreState.getUpdateLock()) {
+        updateDocOrDocValues(cmd, writer);
+
+        if (cmd.isInPlaceUpdate() && ulog != null) {
+          ulog.openRealtimeSearcher(); // This is needed due to LUCENE-7344.
+        }
+        for (Query q : dbqList) {
+          writer.deleteDocuments(new DeleteByQueryWrapper(q, core.getLatestSchema()));
+        }
+        if (ulog != null) ulog.add(cmd, true); // this needs to be protected by update lock
+      }
+    } finally {
+      iw.decref();
+    }
+  }
+
+  private void updateDeleteTrackers(DeleteUpdateCommand cmd) {
+    if ((cmd.getFlags() & UpdateCommand.IGNORE_AUTOCOMMIT) == 0) {
+      if (commitWithinSoftCommit) {
+        softCommitTracker.deletedDocument(cmd.commitWithin);
+      } else {
+        commitTracker.deletedDocument(cmd.commitWithin);
+      }
+
+      if (commitTracker.getTimeUpperBound() > 0) {
+        commitTracker.scheduleCommitWithin(commitTracker.getTimeUpperBound());
+      }
+
+      long currentTlogSize = getCurrentTLogSize();
+      commitTracker.scheduleMaxSizeTriggeredCommitIfNeeded(currentTlogSize);
+
+      if (softCommitTracker.getTimeUpperBound() > 0) {
+        softCommitTracker.scheduleCommitWithin(softCommitTracker.getTimeUpperBound());
+      }
+    }
+  }
+
+  // we don't return the number of docs deleted because it's not always possible to quickly know
+  // that info.
+  @Override
+  public void delete(DeleteUpdateCommand cmd) throws IOException {
+    TestInjection.injectDirectUpdateLatch();
+    deleteByIdCommands.increment();
+    deleteByIdCommandsCumulative.mark();
+
+    if ((cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) != 0) {
+      if (ulog != null) ulog.delete(cmd);
+      return;
+    }
+
+    Term deleteTerm = getIdTerm(cmd.getIndexedId());
+    // SolrCore.verbose("deleteDocuments",deleteTerm,writer);
+    RefCounted<IndexWriter> iw = solrCoreState.getIndexWriter(core);
+    try {
+      iw.get().deleteDocuments(deleteTerm);
+    } finally {
+      iw.decref();
+    }
+    // SolrCore.verbose("deleteDocuments",deleteTerm,"DONE");
+
+    if (ulog != null) ulog.delete(cmd);
+
+    updateDeleteTrackers(cmd);
+  }
+
+  public void clearIndex() throws IOException {
+    deleteAll();
+    if (ulog != null) {
+      ulog.deleteAll();
+    }
+  }
+
+  private Query getQuery(DeleteUpdateCommand cmd) {
+    Query q;
+    try {
+      // move this higher in the stack?
+      QParser parser = QParser.getParser(cmd.getQuery(), cmd.req);
+      q = parser.getQuery();
+      q = QueryUtils.makeQueryable(q);
+
+      // Make sure not to delete newer versions
+      if (ulog != null && cmd.getVersion() != 0 && cmd.getVersion() != -Long.MAX_VALUE) {
+        BooleanQuery.Builder bq = new BooleanQuery.Builder();
+        bq.add(q, Occur.MUST);
+        SchemaField sf = ulog.getVersionInfo().getVersionField();
+        ValueSource vs = sf.getType().getValueSource(sf, null);
+        ValueSourceRangeFilter filt =
+          new ValueSourceRangeFilter(
+            vs, Long.toString(Math.abs(cmd.getVersion())), null, true, true);
+        FunctionRangeQuery range = new FunctionRangeQuery(filt);
+        // formulated in the "MUST_NOT" sense so we can delete docs w/o a version (some tests depend
+        // on this...)
+        bq.add(range, Occur.MUST_NOT);
+        q = bq.build();
+      }
+
+      return q;
+
+    } catch (SyntaxError e) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
+    }
+  }
+
+  // we don't return the number of docs deleted because it's not always possible to quickly know
+  // that info.
+  @Override
+  public void deleteByQuery(DeleteUpdateCommand cmd) throws IOException {
+    TestInjection.injectDirectUpdateLatch();
+    deleteByQueryCommands.increment();
+    deleteByQueryCommandsCumulative.mark();
+    boolean madeIt = false;
+    try {
+      if ((cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) != 0) {
+        if (ulog != null) ulog.deleteByQuery(cmd);
+        madeIt = true;
+        return;
+      }
+      Query q = getQuery(cmd);
+
+      boolean delAll = MatchAllDocsQuery.class == q.getClass();
+
+      // currently for testing purposes.  Do a delete of complete index w/o worrying about versions,
+      // don't log, clean up most state in update log, etc
+      if (delAll && cmd.getVersion() == -Long.MAX_VALUE) {
+        synchronized (solrCoreState.getUpdateLock()) {
+          deleteAll();
+          ulog.deleteAll();
+          return;
+        }
+      }
+
+      //
+      // synchronized to prevent deleteByQuery from running during the "open new searcher"
+      // part of a commit.  DBQ needs to signal that a fresh reader will be needed for
+      // a realtime view of the index.  When a new searcher is opened after a DBQ, that
+      // flag can be cleared.  If those thing happen concurrently, it's not thread safe.
+      // Also, ulog.deleteByQuery clears caches and is thus not safe to be called between
+      // preSoftCommit/postSoftCommit and thus we use the updateLock to prevent this (just
+      // as we use around ulog.preCommit... also see comments in ulog.postSoftCommit)
+      //
+      synchronized (solrCoreState.getUpdateLock()) {
+
+        // We are reopening a searcher before applying the deletes to overcome LUCENE-7344.
+        // Once LUCENE-7344 is resolved, we can consider removing this.
+        if (ulog != null) ulog.openRealtimeSearcher();
+
+        if (delAll) {
+          deleteAll();
+        } else {
+          RefCounted<IndexWriter> iw = solrCoreState.getIndexWriter(core);
+          try {
+            iw.get().deleteDocuments(new DeleteByQueryWrapper(q, core.getLatestSchema()));
+          } finally {
+            iw.decref();
+          }
+        }
+
+        if (ulog != null) ulog.deleteByQuery(cmd); // this needs to be protected by the update lock
+      }
+
+      madeIt = true;
+
+      updateDeleteTrackers(cmd);
+
+    } finally {
+      if (!madeIt) {
+        numErrors.increment();
+        numErrorsCumulative.mark();
+      }
+    }
+  }
+
+  @Override
+  public int mergeIndexes(MergeIndexesCommand cmd) throws IOException {
+    TestInjection.injectDirectUpdateLatch();
+    mergeIndexesCommands.mark();
+    int rc;
+
+    log.info("start {}", cmd);
+
+    List<DirectoryReader> readers = cmd.readers;
+    if (readers != null && readers.size() > 0) {
+      List<CodecReader> mergeReaders = new ArrayList<>();
+      for (DirectoryReader reader : readers) {
+        for (LeafReaderContext leaf : reader.leaves()) {
+          mergeReaders.add(SlowCodecReaderWrapper.wrap(leaf.reader()));
+        }
+      }
+      RefCounted<IndexWriter> iw = solrCoreState.getIndexWriter(core);
+      try {
+        iw.get().addIndexes(mergeReaders.toArray(new CodecReader[mergeReaders.size()]));
+      } finally {
+        iw.decref();
+      }
+      rc = 1;
+    } else {
+      rc = 0;
+    }
+    log.info("end_mergeIndexes");
+
+    // TODO: consider soft commit issues
+    if (rc == 1 && commitTracker.getTimeUpperBound() > 0) {
+      commitTracker.scheduleCommitWithin(commitTracker.getTimeUpperBound());
+    } else if (rc == 1 && softCommitTracker.getTimeUpperBound() > 0) {
+      softCommitTracker.scheduleCommitWithin(softCommitTracker.getTimeUpperBound());
+    }
+
+    return rc;
+  }
+
+  public void prepareCommit(CommitUpdateCommand cmd) throws IOException {
+
+    boolean error = true;
+
+    try {
+      log.debug("start {}", cmd);
+      RefCounted<IndexWriter> iw = solrCoreState.getIndexWriter(core);
+      try {
+        SolrIndexWriter.setCommitData(iw.get(), cmd.getVersion(), cmd.commitData);
+        iw.get().prepareCommit();
+      } finally {
+        iw.decref();
+      }
+
+      log.debug("end_prepareCommit");
+
+      error = false;
+    } finally {
+      if (error) {
+        numErrors.increment();
+        numErrorsCumulative.mark();
+      }
+    }
+  }
+
+  @Override
+  public void commit(CommitUpdateCommand cmd) throws IOException {
+    TestInjection.injectDirectUpdateLatch();
+    if (cmd.prepareCommit) {
+      prepareCommit(cmd);
+      return;
+    }
+
+    if (cmd.optimize) {
+      optimizeCommands.mark();
+    } else {
+      commitCommands.mark();
+      if (cmd.expungeDeletes) expungeDeleteCommands.mark();
+    }
+
+    @SuppressWarnings("unchecked")
+    Future<Void>[] waitSearcher =
+      cmd.waitSearcher ? (Future<Void>[]) Array.newInstance(Future.class, 1) : null;
+
+    boolean error = true;
+    try {
+      // only allow one hard commit to proceed at once
+      if (!cmd.softCommit) {
+        solrCoreState.getCommitLock().lock();
+      }
+
+      log.debug("start {}", cmd);
+
+      // We must cancel pending commits *before* we actually execute the commit.
+
+      if (cmd.openSearcher) {
+        // we can cancel any pending soft commits if this commit will open a new searcher
+        softCommitTracker.cancelPendingCommit();
+      }
+      if (!cmd.softCommit && (cmd.openSearcher || !commitTracker.getOpenSearcher())) {
+        // cancel a pending hard commit if this commit is of equal or greater "strength"...
+        // If the autoCommit has openSearcher=true, then this commit must have openSearcher=true
+        // to cancel.
+        commitTracker.cancelPendingCommit();
+      }
+
+      RefCounted<IndexWriter> iw = solrCoreState.getIndexWriter(core);
+      try {
+        IndexWriter writer = iw.get();
+        if (cmd.optimize) {
+          writer.forceMerge(cmd.maxOptimizeSegments);
+        } else if (cmd.expungeDeletes) {
+          writer.forceMergeDeletes();
+        }
+
+        if (!cmd.softCommit) {
+          synchronized (solrCoreState.getUpdateLock()) {
+            // sync is currently needed to prevent preCommit from being called between preSoft and
+            // postSoft... see postSoft comments.
+            if (ulog != null) ulog.preCommit(cmd);
+          }
+
+          // SolrCore.verbose("writer.commit() start writer=",writer);
+
+          if (shouldCommit(cmd, writer)) {
+            SolrIndexWriter.setCommitData(writer, cmd.getVersion(), cmd.commitData);
+            writer.commit();
+          } else {
+            log.debug("No uncommitted changes. Skipping IW.commit.");
+          }
+
+          // SolrCore.verbose("writer.commit() end");
+          numDocsPending.reset();
+          callPostCommitCallbacks();
+        }
+      } finally {
+        iw.decref();
+      }
+
+      if (cmd.optimize) {
+        callPostOptimizeCallbacks();
+      }
+
+      if (cmd.softCommit) {
+        // ulog.preSoftCommit();
+        synchronized (solrCoreState.getUpdateLock()) {
+          if (ulog != null) ulog.preSoftCommit(cmd);
+          core.getSearcher(true, false, waitSearcher, true);
+          if (ulog != null) ulog.postSoftCommit(cmd);
+        }
+        callPostSoftCommitCallbacks();
+      } else {
+        synchronized (solrCoreState.getUpdateLock()) {
+          if (ulog != null) ulog.preSoftCommit(cmd);
+          if (cmd.openSearcher) {
+            core.getSearcher(true, false, waitSearcher);
+          } else {
+            // force open a new realtime searcher so realtime-get and versioning code can see the
+            // latest
+            RefCounted<SolrIndexSearcher> searchHolder = core.openNewSearcher(true, true);
+            searchHolder.decref();
+          }
+          if (ulog != null) ulog.postSoftCommit(cmd);
+        }
+        if (ulog != null) ulog.postCommit(cmd); // postCommit currently means new searcher has
+        // also been opened
+      }
+
+      // reset commit tracking
+
+      if (cmd.softCommit) {
+        softCommitTracker.didCommit();
+      } else {
+        commitTracker.didCommit();
+      }
+
+      log.debug("end_commit_flush");
+
+      error = false;
+    } finally {
+      if (!cmd.softCommit) {
+        solrCoreState.getCommitLock().unlock();
+      }
+
+      addCommands.reset();
+      deleteByIdCommands.reset();
+      deleteByQueryCommands.reset();
+      if (error) {
+        numErrors.increment();
+        numErrorsCumulative.mark();
+      }
+    }
+
+    // if we are supposed to wait for the searcher to be registered, then we should do it
+    // outside any synchronized block so that other update operations can proceed.
+    if (waitSearcher != null && waitSearcher[0] != null) {
+      try {
+        waitSearcher[0].get();
+      } catch (InterruptedException | ExecutionException e) {
+        SolrException.log(log, e);
+      }
+    }
+  }
+
+  /**
+   * Determines whether the commit command should effectively trigger a commit on the index writer.
+   * This method is called with the commit lock and is the last step before effectively calling
+   * {@link IndexWriter#commit()}.
+   */
+  protected boolean shouldCommit(CommitUpdateCommand cmd, IndexWriter writer) throws IOException {
+    return writer.hasUncommittedChanges() || (cmd.commitData != null && !cmd.commitData.isEmpty());
+  }
+
+  @Override
+  public void newIndexWriter(boolean rollback) throws IOException {
+    solrCoreState.newIndexWriter(core, rollback);
+  }
+
+  /**
+   * @since Solr 1.4
+   */
+  @Override
+  public void rollback(RollbackUpdateCommand cmd) throws IOException {
+    TestInjection.injectDirectUpdateLatch();
+    if (core.getCoreContainer().isZooKeeperAware()) {
+      throw new UnsupportedOperationException(
+        "Rollback is currently not supported in SolrCloud mode. (SOLR-4895)");
+    }
+
+    rollbackCommands.mark();
+
+    boolean error = true;
+
+    try {
+      log.info("start {}", cmd);
+
+      rollbackWriter();
+
+      // callPostRollbackCallbacks();
+
+      // reset commit tracking
+      commitTracker.didRollback();
+      softCommitTracker.didRollback();
+
+      log.info("end_rollback");
+
+      error = false;
+    } finally {
+      addCommandsCumulative.mark(-addCommands.sumThenReset());
+      deleteByIdCommandsCumulative.mark(-deleteByIdCommands.sumThenReset());
+      deleteByQueryCommandsCumulative.mark(-deleteByQueryCommands.sumThenReset());
+      if (error) {
+        numErrors.increment();
+        numErrorsCumulative.mark();
+      }
+    }
+  }
+
+  @Override
+  public UpdateLog getUpdateLog() {
+    return ulog;
+  }
+
+  @Override
+  public void close() throws IOException {
+    log.debug("closing {}", this);
+
+    commitTracker.close();
+    softCommitTracker.close();
+
+    numDocsPending.reset();
+    try {
+      super.close();
+    } catch (Exception e) {
+      throw new IOException("Error closing", e);
+    }
+  }
+
+  // IndexWriterCloser interface method - called from solrCoreState.decref(this)
+  @Override
+  public void closeWriter(IndexWriter writer) throws IOException {
+    log.trace("closeWriter({}): ulog={}", writer, ulog);
+
+    assert TestInjection.injectNonGracefullClose(core.getCoreContainer());
+
+    boolean clearRequestInfo = false;
+
+    SolrQueryRequest req = new LocalSolrQueryRequest(core, new ModifiableSolrParams());
+    SolrQueryResponse rsp = new SolrQueryResponse();
+    if (SolrRequestInfo.getRequestInfo() == null) {
+      clearRequestInfo = true;
+      SolrRequestInfo.setRequestInfo(new SolrRequestInfo(req, rsp)); // important for debugging
+    }
+    try {
+
+      if (TestInjection.injectSkipIndexWriterCommitOnClose(writer)) {
+        // if this TestInjection triggers, we do some simple rollback()
+        // (which closes the underlying IndexWriter) and then return immediately
+        log.warn("Skipping commit for IndexWriter.close() due to TestInjection");
+        if (writer != null) {
+          writer.rollback();
+        }
+        // we shouldn't close the transaction logs either, but leaving them open
+        // means we can't delete them on windows (needed for tests)
+        if (ulog != null) ulog.close(false);
+
+        return;
+      }
+
+      // do a commit before we quit?
+      boolean tryToCommit =
+        writer != null
+          && ulog != null
+          && ulog.hasUncommittedChanges()
+          && ulog.getState() == UpdateLog.State.ACTIVE;
+
+      // be tactical with this lock! closing the updatelog can deadlock when it tries to commit
+      solrCoreState.getCommitLock().lock();
+      try {
+        try {
+          if (log.isInfoEnabled()) {
+            log.info(
+              "Committing on IndexWriter.close() {}.",
+              (tryToCommit ? "" : " ... SKIPPED (unnecessary)"));
+          }
+          if (tryToCommit) {
+            CommitUpdateCommand cmd = new CommitUpdateCommand(req, false);
+            cmd.openSearcher = false;
+            cmd.waitSearcher = false;
+            cmd.softCommit = false;
+
+            // TODO: keep other commit callbacks from being called?
+            // this.commit(cmd); // too many test failures using this method... is it because of
+            // callbacks?
+
+            synchronized (solrCoreState.getUpdateLock()) {
+              ulog.preCommit(cmd);
+            }
+
+            // todo: refactor this shared code (or figure out why a real CommitUpdateCommand can't
+            // be used)
+            SolrIndexWriter.setCommitData(writer, cmd.getVersion(), null);
+            writer.commit();
+
+            synchronized (solrCoreState.getUpdateLock()) {
+              ulog.postCommit(cmd);
+            }
+          }
+        } catch (Throwable th) {
+          log.error("Error in final commit", th);
+          if (th instanceof OutOfMemoryError) {
+            throw (OutOfMemoryError) th;
+          }
+        }
+
+      } finally {
+        solrCoreState.getCommitLock().unlock();
+      }
+    } finally {
+      if (clearRequestInfo) SolrRequestInfo.clearRequestInfo();
+    }
+    // we went through the normal process to commit, so we don't have to artificially
+    // cap any ulog files.
+    try {
+      if (ulog != null) ulog.close(false);
+    } catch (Throwable th) {
+      log.error("Error closing log files", th);
+      if (th instanceof OutOfMemoryError) {
+        throw (OutOfMemoryError) th;
+      }
+    }
+
+    if (writer != null) {
+      writer.close();
+    }
+  }
+
+  @Override
+  public void split(SplitIndexCommand cmd) throws IOException {
+    commit(new CommitUpdateCommand(cmd.req, false));
+    SolrIndexSplitter splitter = new SolrIndexSplitter(cmd);
+    splitCommands.mark();
+    NamedList<Object> results = new NamedList<>();
+    try {
+      splitter.split(results);
+      cmd.rsp.addResponse(results);
+    } catch (IOException e) {
+      numErrors.increment();
+      numErrorsCumulative.mark();
+      throw e;
+    }
+  }
+
+  /**
+   * Calls either {@link IndexWriter#updateDocValues} or <code>IndexWriter#updateDocument</code>(s)
+   * as needed based on {@link AddUpdateCommand#isInPlaceUpdate}.
+   *
+   * <p>If the this is an UPDATE_INPLACE cmd, then all fields included in {@link
+   * AddUpdateCommand#makeLuceneDocForInPlaceUpdate} must either be the uniqueKey field, or be
+   * DocValue only fields.
+   *
+   * @param cmd - cmd apply to IndexWriter
+   * @param writer - IndexWriter to use
+   */
+  private void updateDocOrDocValues(AddUpdateCommand cmd, IndexWriter writer) throws IOException {
+    // this code path requires an idField in order to potentially replace a doc
+    assert idField != null;
+    boolean hasUpdateTerm = cmd.updateTerm != null; // AKA dedupe
+
+    if (cmd.isInPlaceUpdate()) {
+      if (hasUpdateTerm) {
+        throw new IllegalStateException(
+          "cmd.updateTerm/dedupe is not compatible with in-place updates");
+      }
+      // we don't support the solrInputDoc with nested child docs either but we'll throw an
+      // exception if attempted
+
+      // can't use cmd.getIndexedId because it will be a root doc if this doc is a child
+      Term updateTerm =
+        new Term(
+          idField.getName(),
+          core.getLatestSchema().indexableUniqueKey(cmd.getSelfOrNestedDocIdStr()));
+      // skips uniqueKey and _root_
+      List<IndexableField> fields = cmd.makeLuceneDocForInPlaceUpdate().getFields();
+      log.debug("updateDocValues({})", cmd);
+      writer.updateDocValues(updateTerm, fields.toArray(new Field[fields.size()]));
+
+    } else { // more normal path
+
+      Iterable<Document> nestedDocs = cmd.makeLuceneDocs();
+      Term idTerm = getIdTerm(cmd.getIndexedId());
+      Term updateTerm = hasUpdateTerm ? cmd.updateTerm : idTerm;
+
+      log.debug("updateDocuments({})", cmd);
+      writer.updateDocuments(updateTerm, nestedDocs);
+
+      // If hasUpdateTerm, then delete any existing documents with the same ID other than the one
+      // added above (used in near-duplicate replacement)
+      if (hasUpdateTerm) { // rare
+        BooleanQuery.Builder bq = new BooleanQuery.Builder();
+        // don't want the one we added above (will be unique)
+        bq.add(new TermQuery(updateTerm), Occur.MUST_NOT);
+        bq.add(new TermQuery(idTerm), Occur.MUST); // same ID
+        writer.deleteDocuments(new DeleteByQueryWrapper(bq.build(), core.getLatestSchema()));
+      }
+    }
+  }
+
+  private Term getIdTerm(BytesRef termVal) {
+    boolean useRootId = core.getLatestSchema().isUsableForChildDocs();
+    return new Term(useRootId ? IndexSchema.ROOT_FIELD_NAME : idField.getName(), termVal);
+  }
+
+  /////////////////////////////////////////////////////////////////////
+  // SolrInfoBean stuff: Statistics and Module Info
+  /////////////////////////////////////////////////////////////////////
+
+  @Override
+  public String getName() {
+    return DirectUpdateHandler2.class.getName();
+  }
+
+  @Override
+  public String getDescription() {
+    return "Update handler that efficiently directly updates the on-disk main lucene index";
+  }
+
+  @Override
+  public SolrCoreState getSolrCoreState() {
+    return solrCoreState;
+  }
+
+  private long getCurrentTLogSize() {
+    return ulog != null && ulog.hasUncommittedChanges() ? ulog.getCurrentLogSizeFromStream() : -1;
+  }
+
+  // allow access for tests
+  public CommitTracker getCommitTracker() {
+    return commitTracker;
+  }
+
+  // allow access for tests
+  public CommitTracker getSoftCommitTracker() {
+    return softCommitTracker;
+  }
+}
diff --git a/encryption/src/test/java/org/apache/solr/encryption/EncryptionDirectoryTest.java b/encryption/src/test/java/org/apache/solr/encryption/EncryptionDirectoryTest.java
new file mode 100644
index 0000000..77ba050
--- /dev/null
+++ b/encryption/src/test/java/org/apache/solr/encryption/EncryptionDirectoryTest.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.encryption;
+
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.apache.solr.encryption.crypto.AesCtrEncrypterFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Tests {@link EncryptionDirectory}.
+ * <p>
+ * This test class ignores the DirectoryFactory defined in solrconfig.xml to use
+ * {@link EncryptionDirectoryFactory}.
+ */
+public class EncryptionDirectoryTest extends SolrCloudTestCase {
+
+  private static final String COLLECTION_PREFIX = EncryptionDirectoryTest.class.getSimpleName() + "-collection-";
+
+  private static MockEncryptionDirectory mockDir;
+
+  private String collectionName;
+  private CloudSolrClient solrClient;
+  private TestUtil testUtil;
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    System.setProperty(EncryptionDirectoryFactory.PROPERTY_INNER_ENCRYPTION_DIRECTORY_FACTORY, MockFactory.class.getName());
+    TestUtil.setInstallDirProperty();
+    cluster = new MiniSolrCloudCluster.Builder(1, createTempDir())
+      .addConfig("config", TestUtil.getConfigPath("collection1"))
+      .configure();
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    System.clearProperty(EncryptionDirectoryFactory.PROPERTY_INNER_ENCRYPTION_DIRECTORY_FACTORY);
+    cluster.shutdown();
+  }
+
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    collectionName = COLLECTION_PREFIX + UUID.randomUUID();
+    solrClient = cluster.getSolrClient();
+    solrClient.setDefaultCollection(collectionName);
+    CollectionAdminRequest.createCollection(collectionName, 1, 1).process(solrClient);
+    cluster.waitForActiveCollection(collectionName, 1, 1);
+    testUtil = new TestUtil(solrClient, collectionName);
+  }
+
+  @Override
+  public void tearDown() throws Exception {
+    mockDir.clearMockValues();
+    CollectionAdminRequest.deleteCollection(collectionName).process(solrClient);
+    super.tearDown();
+  }
+
+  /**
+   * Verifies the encryption of an index, moving from no keys to one key.
+   */
+  @Test
+  public void testEncryptionFromNoKeysToOneKey() throws Exception {
+    indexAndEncryptOneSegment();
+  }
+
+  /**
+   * Starts from an empty index, indexes two documents in two segments, then encrypt the index
+   * with {@link TestingKeyManager#KEY_ID_1}. The resulting encrypted index is composed of one segment.
+   */
+  private void indexAndEncryptOneSegment() throws Exception {
+    // Start with no key ids defined in the latest commit metadata.
+    mockDir.clearMockValues();
+    // Create 2 index segments without encryption.
+    testUtil.indexDocsAndCommit("weather broadcast");
+    testUtil.indexDocsAndCommit("sunny weather");
+    testUtil.assertQueryReturns("weather", 2);
+
+    // Verify that without key id, we can reload the index because it is not encrypted.
+    testUtil.reloadCore();
+    testUtil.assertQueryReturns("weather", 2);
+
+    // Set the encryption key id in the commit user data,
+    // and run an optimized commit to rewrite the index, now encrypted.
+    mockDir.setKeysInCommitUserData(TestingKeyManager.KEY_ID_1);
+    solrClient.optimize();
+
+    // Verify that without key id, we cannot decrypt the index anymore.
+    mockDir.forceClearText = true;
+    testUtil.assertCannotReloadCore();
+    // Verify that with a wrong key id, we cannot decrypt the index.
+    mockDir.forceClearText = false;
+    mockDir.forceKeySecret = TestingKeyManager.KEY_SECRET_2;
+    testUtil.assertCannotReloadCore();
+    // Verify that with the right key id, we can decrypt the index and search it.
+    mockDir.forceKeySecret = null;
+    mockDir.expectedKeySecret = TestingKeyManager.KEY_SECRET_1;
+    testUtil.reloadCore();
+    testUtil.assertQueryReturns("weather", 2);
+    testUtil.assertQueryReturns("sunny", 1);
+    mockDir.clearMockValues();
+  }
+
+  /**
+   * Verifies an encrypted index cannot be loaded without the right encryption key,
+   * and that we can search the index if we have the right encryption key.
+   */
+  @Test
+  public void testIndexingAndQueryingWithEncryption() throws Exception {
+    indexAndEncryptTwoSegments();
+  }
+
+  /**
+   * Creates an index encrypted with {@link TestingKeyManager#KEY_ID_1} and containing two segments.
+   */
+  private void indexAndEncryptTwoSegments() throws Exception {
+    // Prepare an encrypted index with one segment.
+    indexAndEncryptOneSegment();
+
+    // Create 1 new segment with the same encryption key id.
+    mockDir.setKeysInCommitUserData(TestingKeyManager.KEY_ID_1);
+    testUtil.indexDocsAndCommit("foggy weather");
+
+    // Verify that without key id, we cannot decrypt the index.
+    mockDir.forceClearText = true;
+    testUtil.assertCannotReloadCore();
+    // Verify that with a wrong key id, we cannot decrypt the index.
+    mockDir.forceClearText = false;
+    mockDir.forceKeySecret = TestingKeyManager.KEY_SECRET_2;
+    testUtil.assertCannotReloadCore();
+    // Verify that with the right key id, we can decrypt the index and search it.
+    mockDir.forceKeySecret = null;
+    mockDir.expectedKeySecret = TestingKeyManager.KEY_SECRET_1;
+    testUtil.reloadCore();
+    testUtil.assertQueryReturns("weather", 3);
+    testUtil.assertQueryReturns("sunny", 1);
+    mockDir.clearMockValues();
+  }
+
+  /**
+   * Verifies the re-encryption of an index, moving from one key to another key.
+   */
+  @Test
+  public void testReEncryptionFromOneKeyToAnotherKey() throws Exception {
+    // Prepare an encrypted index with two segments.
+    indexAndEncryptTwoSegments();
+
+    // Set the new encryption key id in the commit user data,
+    // and run an optimized commit to rewrite the index, now encrypted with the new key.
+    mockDir.setKeysInCommitUserData(TestingKeyManager.KEY_ID_1, TestingKeyManager.KEY_ID_2);
+    solrClient.optimize();
+
+    // Verify that without key id, we cannot decrypt the index.
+    mockDir.forceClearText = true;
+    testUtil.assertCannotReloadCore();
+    // Verify that with a wrong key id, we cannot decrypt the index.
+    mockDir.forceClearText = false;
+    mockDir.forceKeySecret = TestingKeyManager.KEY_SECRET_1;
+    testUtil.assertCannotReloadCore();
+    // Verify that with the right key id, we can decrypt the index and search it.
+    mockDir.forceKeySecret = null;
+    mockDir.expectedKeySecret = TestingKeyManager.KEY_SECRET_2;
+    testUtil.reloadCore();
+    testUtil.assertQueryReturns("weather", 3);
+    testUtil.assertQueryReturns("sunny", 1);
+  }
+
+  /**
+   * Verifies the decryption of an index, moving from one key to no keys.
+   */
+  @Test
+  public void testDecryptionFromOneKeyToNoKeys() throws Exception {
+    // Prepare an encrypted index with two segments.
+    indexAndEncryptTwoSegments();
+
+    // Remove the active key parameter from the commit user data,
+    // and run an optimized commit to rewrite the index, now cleartext with no keys.
+    mockDir.setKeysInCommitUserData(TestingKeyManager.KEY_ID_1, null);
+    solrClient.optimize();
+
+    // Verify that without key id, we can reload the index because it is not encrypted.
+    mockDir.forceClearText = true;
+    testUtil.reloadCore();
+    testUtil.assertQueryReturns("weather", 3);
+    testUtil.assertQueryReturns("sunny", 1);
+  }
+
+  public static class MockFactory implements EncryptionDirectoryFactory.InnerFactory {
+    @Override
+    public EncryptionDirectory create(Directory delegate,
+                                      AesCtrEncrypterFactory encrypterFactory,
+                                      KeyManager keyManager) throws IOException {
+      return mockDir = new MockEncryptionDirectory(delegate, encrypterFactory, keyManager);
+    }
+  }
+
+  private static class MockEncryptionDirectory extends EncryptionDirectory {
+
+    final KeyManager keyManager;
+    boolean forceClearText;
+    byte[] forceKeySecret;
+    byte[] expectedKeySecret;
+
+    MockEncryptionDirectory(Directory delegate, AesCtrEncrypterFactory encrypterFactory, KeyManager keyManager)
+      throws IOException {
+      super(delegate, encrypterFactory, keyManager);
+      this.keyManager = keyManager;
+    }
+
+    void clearMockValues() {
+      commitUserData.data.clear();
+      forceClearText = false;
+      forceKeySecret = null;
+      expectedKeySecret = null;
+    }
+
+    /**
+     * Clears the commit user data, then sets the provided key ids. The last key id is the active one.
+     */
+    void setKeysInCommitUserData(String... keyIds) throws IOException {
+      commitUserData.data.clear();
+      for (String keyId : keyIds) {
+        if (keyId == null) {
+          EncryptionUtil.removeActiveKeyRefFromCommit(commitUserData.data);
+        } else {
+          EncryptionUtil.setNewActiveKeyIdInCommit(keyId, keyManager.getKeyCookie(keyId), commitUserData.data);
+        }
+      }
+    }
+
+    @Override
+    public IndexInput openInput(String fileName, IOContext context) throws IOException {
+      return forceClearText ? in.openInput(fileName, context) : super.openInput(fileName, context);
+    }
+
+    @Override
+    protected CommitUserData readLatestCommitUserData() {
+      // Keep the same data map because it contains the mock values for the test,
+      // so the test is easier to write and clearer to read.
+      Map<String, String> data = commitUserData == null ? new HashMap<>() : commitUserData.data;
+      return new CommitUserData("test", data);
+    }
+
+    @Override
+    protected byte[] getKeySecret(String keyRef) throws IOException {
+      if (forceKeySecret != null) {
+        return forceKeySecret;
+      }
+      byte[] keySecret = super.getKeySecret(keyRef);
+      if (expectedKeySecret != null) {
+        assertArrayEquals(expectedKeySecret, keySecret);
+      }
+      return keySecret;
+    }
+  }
+}
diff --git a/encryption/src/test/java/org/apache/solr/encryption/EncryptionHeavyLoadTest.java b/encryption/src/test/java/org/apache/solr/encryption/EncryptionHeavyLoadTest.java
new file mode 100644
index 0000000..20959e2
--- /dev/null
+++ b/encryption/src/test/java/org/apache/solr/encryption/EncryptionHeavyLoadTest.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.encryption;
+
+import com.carrotsearch.randomizedtesting.generators.RandomStrings;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.GenericSolrRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+
+import static org.apache.solr.encryption.EncryptionRequestHandler.*;
+import static org.apache.solr.encryption.TestingKeyManager.*;
+
+/**
+ * Tests the encryption handler under heavy concurrent load test.
+ * <p>
+ * Sends concurrent indexing and querying requests with high throughput while
+ * triggering re-encryption with the handler to verify concurrent segment merging
+ * is handled correctly without stopping indexing nor querying, and all encrypted
+ * files are decrypted correctly when refreshing the index searcher after each
+ * commit.
+ */
+@RunWith(com.carrotsearch.randomizedtesting.RandomizedRunner.class)
+public class EncryptionHeavyLoadTest extends SolrCloudTestCase {
+
+  // Change the test duration manually to run longer, e.g. 3 minutes.
+  private static final long TEST_DURATION_MS = TimeUnit.SECONDS.toMillis(10);
+  private static final int RANDOM_DELAY_BETWEEN_INDEXING_BATCHES_MS = 50;
+  private static final int RANDOM_NUM_DOCS_PER_BATCH = 200;
+  private static final float PROBABILITY_OF_COMMIT_PER_BATCH = 0.33f;
+  private static final int DICTIONARY_SIZE = 5000;
+  private static final int RANDOM_DELAY_BETWEEN_QUERIES_MS = 10;
+  private static final int NUM_INDEXING_THREADS = 3;
+  private static final int NUM_QUERYING_THREADS = 2;
+  private static final int RANDOM_DELAY_BETWEEN_REENCRYPTION_MS = 2000;
+  private static final String[] KEY_IDS = {KEY_ID_1, KEY_ID_2, KEY_ID_3, NO_KEY_ID};
+  private static final float PROBABILITY_OF_WAITING_ENCRYPTION_COMPLETION = 0.5f;
+
+  private static final String COLLECTION_PREFIX = EncryptionHeavyLoadTest.class.getSimpleName() + "-collection-";
+  private static final String SYSTEM_OUTPUT_MARKER = "*** ";
+
+  private volatile CloudSolrClient solrClient;
+  private volatile boolean stopTest;
+  private volatile Dictionary dictionary;
+  private List<Thread> threads;
+  private int nextKeyIndex;
+  private String keyId;
+  private volatile Exception exception;
+  private long startTimeMs;
+  private long endTimeMs;
+  private long lastDisplayTimeMs;
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    TestUtil.setInstallDirProperty();
+    cluster = new MiniSolrCloudCluster.Builder(1, createTempDir())
+      .addConfig("config", TestUtil.getConfigPath("collection1"))
+      .configure();
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    cluster.shutdown();
+  }
+
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    String collectionName = COLLECTION_PREFIX + UUID.randomUUID();
+    solrClient = cluster.getSolrClient();
+    solrClient.setDefaultCollection(collectionName);
+    CollectionAdminRequest.createCollection(collectionName, 1, 1).process(solrClient);
+    cluster.waitForActiveCollection(collectionName, 1, 1);
+    dictionary = new Dictionary.Builder().build(DICTIONARY_SIZE, random());
+    threads = new ArrayList<>();
+  }
+
+  @Override
+  @After
+  public void tearDown() throws Exception {
+    try {
+      stopTest = true;
+      for (Thread thread : threads) {
+        try {
+          thread.join(5000);
+          print(thread.getName() + " stopped");
+        } catch (InterruptedException e) {
+          System.err.println("Interrupted while closing " + thread.getName());
+        }
+      }
+      startTimeMs = lastDisplayTimeMs = System.currentTimeMillis();
+      endTimeMs = startTimeMs + TimeUnit.SECONDS.toMillis(20);
+      print("waiting for the final encryption completion");
+      assertTrue("Timeout waiting for the final encryption completion", encrypt(keyId, true));
+      print("final encryption complete");
+    } finally {
+      super.tearDown();
+    }
+  }
+
+  @Test
+  public void testReencryptionUnderHeavyConcurrentLoad() throws Exception {
+    print("Starting test");
+    startTimeMs = lastDisplayTimeMs = System.currentTimeMillis();
+    endTimeMs = startTimeMs + TEST_DURATION_MS;
+    Random random = random();
+    if (random.nextBoolean()) {
+      print("preparing empty index for encryption");
+      encrypt(nextKeyId(), waitForCompletion(random));
+    }
+    startThreads(NUM_INDEXING_THREADS, "Indexing", Indexer::new);
+    startThreads(NUM_QUERYING_THREADS, "Querying", Querier::new);
+    while (!isTimeElapsed()) {
+      Thread.sleep(random.nextInt(RANDOM_DELAY_BETWEEN_REENCRYPTION_MS));
+      encrypt(nextKeyId(), waitForCompletion(random));
+    }
+    if (System.currentTimeMillis() - lastDisplayTimeMs >= 1000) {
+      print("elapsed time = " + ((System.currentTimeMillis() - startTimeMs) / 1000) + " s");
+    }
+    print("Stopping test");
+    if (exception != null) {
+      throw exception;
+    }
+  }
+
+  private void startThreads(int numThreads, String namePrefix, Supplier<Runnable> runnableSupplier) {
+    for (int i = 0; i < numThreads; i++) {
+      String name = namePrefix + "-" + i;
+      print("Start " + name);
+      Thread thread = new Thread(runnableSupplier.get(), name);
+      thread.setDaemon(true);
+      threads.add(thread);
+      thread.start();
+    }
+  }
+
+  private boolean isTimeElapsed() {
+    long timeMs = System.currentTimeMillis();
+    if (timeMs - lastDisplayTimeMs >= 10000) {
+      print("elapsed time = " + ((timeMs - startTimeMs) / 1000) + " s");
+      lastDisplayTimeMs = timeMs;
+    }
+    return timeMs >= endTimeMs;
+  }
+
+  private String nextKeyId() {
+    keyId = KEY_IDS[nextKeyIndex++];
+    if (nextKeyIndex == KEY_IDS.length) {
+      nextKeyIndex = 0;
+    }
+    return keyId;
+  }
+
+  private boolean encrypt(String keyId, boolean waitForCompletion) throws Exception {
+    NamedList<Object> response = sendEncryptionRequest(keyId);
+    if (response.get(ENCRYPTION_STATE).equals(STATE_PENDING)) {
+      if (!waitForCompletion) {
+        return false;
+      }
+      print("waiting for encryption completion for keyId=" + keyId);
+      while (response.get(ENCRYPTION_STATE).equals(STATE_PENDING)) {
+        if (isTimeElapsed()) {
+          return false;
+        }
+        Thread.sleep(500);
+        response = sendEncryptionRequest(keyId);
+      }
+      print("encryption complete for keyId=" + keyId);
+    }
+    return true;
+  }
+
+  private boolean waitForCompletion(Random random) {
+    return random.nextFloat() <= PROBABILITY_OF_WAITING_ENCRYPTION_COMPLETION;
+  }
+
+  private NamedList<Object> sendEncryptionRequest(String keyId) throws SolrServerException, IOException {
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.set(PARAM_KEY_ID, keyId);
+    NamedList<Object> response = solrClient.request(new GenericSolrRequest(SolrRequest.METHOD.GET, "/admin/encrypt", params));
+    print("encrypt keyId=" + keyId + " => response status=" + response.get(STATUS) + " state=" + response.get(ENCRYPTION_STATE));
+    return response;
+  }
+
+  private static void print(String message) {
+    System.out.println(SYSTEM_OUTPUT_MARKER + message);
+  }
+
+  private static void threadPrint(String message) {
+    print(Thread.currentThread().getName() + ": " + message);
+  }
+
+  private static class Dictionary {
+
+    final List<String> terms;
+
+    Dictionary(List<String> terms) {
+      this.terms = terms;
+    }
+
+    String getTerm(Random random) {
+      return terms.get(random.nextInt(terms.size()));
+    }
+
+    static class Builder {
+
+      Dictionary build(int size, Random random) {
+        Set<String> terms = new HashSet<>();
+        for (int i = 0; i < size;) {
+          String term = RandomStrings.randomUnicodeOfCodepointLengthBetween(random, 4, 12);
+          if (terms.add(term)) {
+            i++;
+          }
+        }
+        return new Dictionary(new ArrayList<>(terms));
+      }
+    }
+  }
+
+  private class Indexer implements Runnable {
+
+    final long seed;
+    final AtomicLong docNum = new AtomicLong();
+
+    Indexer() {
+      seed = random().nextLong();
+    }
+
+    @Override
+    public void run() {
+      long numBatches = 0;
+      long totalDocs = 0;
+      long numCommits = 0;
+      try {
+        Random random = new Random(seed);
+        while (!stopTest) {
+          Thread.sleep(random.nextInt(RANDOM_DELAY_BETWEEN_INDEXING_BATCHES_MS));
+          Collection<SolrInputDocument> docs = new ArrayList<>();
+          for (int i = random.nextInt(RANDOM_NUM_DOCS_PER_BATCH) + 1; i > 0; i--) {
+            docs.add(createDoc(random));
+          }
+          totalDocs += docs.size();
+          solrClient.add(docs);
+          if (random.nextFloat() <= PROBABILITY_OF_COMMIT_PER_BATCH) {
+            numCommits++;
+            solrClient.commit();
+          }
+          if (++numBatches % 10 == 0) {
+            threadPrint("sent " + numBatches + " indexing batches, totalDocs=" + totalDocs + ", numCommits=" + numCommits);
+          }
+        }
+      } catch (InterruptedException e) {
+        threadPrint("Indexing interrupted");
+        e.printStackTrace(System.err);
+      } catch (Exception e) {
+        exception = e;
+        threadPrint("Indexing stopped by exception");
+        e.printStackTrace(System.err);
+      } finally {
+        threadPrint("Stop indexing");
+        threadPrint("sent " + numBatches + " indexing batches, totalDocs=" + totalDocs + ", numCommits=" + numCommits);
+        stopTest = true;
+      }
+    }
+
+    SolrInputDocument createDoc(Random random) {
+      SolrInputDocument doc = new SolrInputDocument();
+      doc.addField("id", Long.toString(docNum.getAndIncrement()));
+      doc.addField("text", dictionary.getTerm(random));
+      return doc;
+    }
+  }
+
+  private class Querier implements Runnable {
+
+    final long seed;
+
+    Querier() {
+      seed = random().nextLong();
+    }
+
+    @Override
+    public void run() {
+      long totalResults = 0;
+      long numQueries = 0;
+      long numConsecutiveNoResults = 0;
+      try {
+        Random random = new Random(seed);
+        while (!stopTest) {
+          Thread.sleep(random.nextInt(RANDOM_DELAY_BETWEEN_QUERIES_MS));
+          QueryResponse response = null;
+          do {
+            try {
+              response = solrClient.query(new SolrQuery(dictionary.getTerm(random)));
+            } catch (Exception e) {
+              // Some queries might not be parseable due to the random terms. Just retry with another term.
+            }
+          } while (response == null);
+          int numResults = response.getResults().size();
+          totalResults += numResults;
+          numQueries++;
+          if (numResults == 0) {
+            numConsecutiveNoResults++;
+          } else {
+            numConsecutiveNoResults = 0;
+          }
+          if (numQueries % 500 == 0) {
+            threadPrint("sent " + numQueries + " queries, totalResults=" + totalResults + ", numConsecutiveNoResults=" + numConsecutiveNoResults);
+          }
+        }
+      } catch (InterruptedException e) {
+        threadPrint("Querying interrupted");
+        e.printStackTrace(System.err);
+      } catch (Exception e) {
+        exception = e;
+        threadPrint("Querying stopped by exception");
+        e.printStackTrace(System.err);
+      } finally {
+        threadPrint("Stop querying");
+        threadPrint("sent " + numQueries + " queries, totalResults=" + totalResults + ", numConsecutiveNoResults=" + numConsecutiveNoResults);
+        stopTest = true;
+      }
+    }
+  }
+}
diff --git a/encryption/src/test/java/org/apache/solr/encryption/EncryptionMergePolicyFactoryTest.java b/encryption/src/test/java/org/apache/solr/encryption/EncryptionMergePolicyFactoryTest.java
new file mode 100644
index 0000000..4ff3d38
--- /dev/null
+++ b/encryption/src/test/java/org/apache/solr/encryption/EncryptionMergePolicyFactoryTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.encryption;
+
+import org.apache.lucene.analysis.core.WhitespaceAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.ConcurrentMergeScheduler;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.MergePolicy;
+import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.index.TieredMergePolicy;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSLockFactory;
+import org.apache.lucene.store.MMapDirectory;
+import org.apache.lucene.tests.util.LuceneTestCase;
+import org.apache.solr.core.SolrResourceLoader;
+import org.apache.solr.encryption.crypto.LightAesCtrEncrypter;
+import org.apache.solr.index.MergePolicyFactoryArgs;
+import org.apache.solr.index.TieredMergePolicyFactory;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Tests {@link EncryptionMergePolicyFactory}.
+ */
+public class EncryptionMergePolicyFactoryTest extends LuceneTestCase {
+
+  private final SolrResourceLoader resourceLoader = new SolrResourceLoader(createTempDir());
+
+  /**
+   * Verifies the merge policy factory loading from solrconfig.xml.
+   */
+  @Test
+  public void testMergePolicyCreation() {
+    MergePolicy mergePolicy = createMergePolicy();
+    assertEquals(EncryptionMergePolicy.class, mergePolicy.getClass());
+    MergePolicy delegateMP = ((EncryptionMergePolicy) mergePolicy).getDelegate();
+    assertEquals(TieredMergePolicy.class, delegateMP.getClass());
+    TieredMergePolicy tieredMP = (TieredMergePolicy) delegateMP;
+    assertEquals(5, tieredMP.getMaxMergeAtOnce());
+  }
+
+  private MergePolicy createMergePolicy() {
+    final MergePolicyFactoryArgs args = new MergePolicyFactoryArgs();
+    String prefix = "delegate";
+    args.add("wrapped.prefix", prefix);
+    args.add(prefix + ".class", TieredMergePolicyFactory.class.getName());
+    args.add(prefix + ".maxMergeAtOnce", 5);
+    return new EncryptionMergePolicyFactory(resourceLoader, args, null).getMergePolicy();
+  }
+
+  /**
+   * Verifies that each segment is re-encrypted individually
+   * (not requiring an optimized commit to merge into a single segment).
+   */
+  @Test
+  public void testSegmentReencryption() throws Exception {
+    KeyManager keyManager = new TestingKeyManager.Supplier().getKeyManager();
+    try (Directory dir = new EncryptionDirectory(new MMapDirectory(createTempDir(), FSLockFactory.getDefault()),
+                                                 LightAesCtrEncrypter.FACTORY,
+                                                 keyManager)) {
+      IndexWriterConfig iwc = new IndexWriterConfig(new WhitespaceAnalyzer());
+      iwc.setMergeScheduler(new ConcurrentMergeScheduler());
+      iwc.setMergePolicy(createMergePolicy());
+      try (IndexWriter writer = new IndexWriter(dir, iwc)) {
+
+        // Index 3 segments with encryption key id 1.
+        commit(writer, keyManager, TestingKeyManager.KEY_ID_1);
+        int numSegments = 3;
+        for (int i = 0; i < numSegments; ++i) {
+          writer.addDocument(new Document());
+          commit(writer, keyManager, TestingKeyManager.KEY_ID_1);
+        }
+        Set<String> initialSegmentNames = readSegmentNames(dir);
+        assertEquals(numSegments, initialSegmentNames.size());
+
+        // Run a force merge with the special max num segments trigger.
+        writer.forceMerge(Integer.MAX_VALUE);
+        commit(writer, keyManager, TestingKeyManager.KEY_ID_1);
+        // Verify no segments are merged because they are encrypted with
+        // the latest active key id.
+        assertEquals(initialSegmentNames, readSegmentNames(dir));
+
+        // Set the latest encryption key id 2.
+        commit(writer, keyManager, TestingKeyManager.KEY_ID_1, TestingKeyManager.KEY_ID_2);
+
+        // Run a force merge with any non-special max num segments.
+        writer.forceMerge(10);
+        commit(writer, keyManager, TestingKeyManager.KEY_ID_1, TestingKeyManager.KEY_ID_2);
+        // Verify no segments are merged.
+        assertEquals(initialSegmentNames, readSegmentNames(dir));
+
+        // Run a force merge with the special max num segments trigger.
+        writer.forceMerge(Integer.MAX_VALUE);
+        commit(writer, keyManager, TestingKeyManager.KEY_ID_1, TestingKeyManager.KEY_ID_2);
+        // Verify each segment has been rewritten.
+        Set<String> segmentNames = readSegmentNames(dir);
+        assertEquals(initialSegmentNames.size(), segmentNames.size());
+        assertNotEquals(initialSegmentNames, segmentNames);
+        segmentNames.retainAll(initialSegmentNames);
+        assertTrue(segmentNames.isEmpty());
+      }
+    }
+  }
+
+  private void commit(IndexWriter writer, KeyManager keyManager, String... keyIds) throws IOException {
+    Map<String, String> commitData = new HashMap<>();
+    for (String keyId : keyIds) {
+      EncryptionUtil.setNewActiveKeyIdInCommit(keyId, keyManager.getKeyCookie(keyId), commitData);
+    }
+    writer.setLiveCommitData(commitData.entrySet());
+    writer.commit();
+  }
+
+  private Set<String> readSegmentNames(Directory dir) throws IOException {
+    SegmentInfos segmentInfos = SegmentInfos.readLatestCommit(dir);
+    return segmentInfos.asList().stream().map(sci -> sci.info.name).collect(Collectors.toSet());
+  }
+}
diff --git a/encryption/src/test/java/org/apache/solr/encryption/EncryptionRequestHandlerTest.java b/encryption/src/test/java/org/apache/solr/encryption/EncryptionRequestHandlerTest.java
new file mode 100644
index 0000000..5b7f506
--- /dev/null
+++ b/encryption/src/test/java/org/apache/solr/encryption/EncryptionRequestHandlerTest.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.encryption;
+
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.GenericSolrRequest;
+import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.RetryUtil;
+import org.apache.solr.encryption.crypto.AesCtrEncrypterFactory;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.solr.encryption.EncryptionRequestHandler.*;
+import static org.apache.solr.encryption.EncryptionUtil.getKeyIdFromCommit;
+
+/**
+ * Tests {@link EncryptionRequestHandler} (re)encryption logic.
+ * For a concurrent heavy load test, see {@link EncryptionHeavyLoadTest}.
+ */
+public class EncryptionRequestHandlerTest extends SolrCloudTestCase {
+
+  private static final String COLLECTION_PREFIX = EncryptionRequestHandlerTest.class.getSimpleName() + "-collection-";
+
+  private static MockEncryptionDirectory mockDir;
+
+  private String collectionName;
+  private CloudSolrClient solrClient;
+  private TestUtil testUtil;
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    System.setProperty(EncryptionDirectoryFactory.PROPERTY_INNER_ENCRYPTION_DIRECTORY_FACTORY, MockFactory.class.getName());
+    TestUtil.setInstallDirProperty();
+    cluster = new MiniSolrCloudCluster.Builder(1, createTempDir())
+      .addConfig("config", TestUtil.getConfigPath("collection1"))
+      .configure();
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    System.clearProperty(EncryptionDirectoryFactory.PROPERTY_INNER_ENCRYPTION_DIRECTORY_FACTORY);
+    cluster.shutdown();
+  }
+
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    collectionName = COLLECTION_PREFIX + UUID.randomUUID();
+    solrClient = cluster.getSolrClient();
+    solrClient.setDefaultCollection(collectionName);
+    CollectionAdminRequest.createCollection(collectionName, 1, 1).process(solrClient);
+    cluster.waitForActiveCollection(collectionName, 1, 1);
+    testUtil = new TestUtil(solrClient, collectionName);
+  }
+
+  @Override
+  public void tearDown() throws Exception {
+    mockDir.clearMockValues();
+    CollectionAdminRequest.deleteCollection(collectionName).process(solrClient);
+    super.tearDown();
+  }
+
+  @Test
+  public void testEncryptionFromNoKeysToOneKey_NoIndex() throws Exception {
+    // Send an encrypt request with a key id on an empty index.
+    NamedList<Object> response = encrypt(TestingKeyManager.KEY_ID_1);
+    assertEquals(STATUS_SUCCESS, response.get(STATUS));
+    assertEquals(STATE_COMPLETE, response.get(ENCRYPTION_STATE));
+
+    // Index some documents to create a first segment.
+    testUtil.indexDocsAndCommit("weather broadcast");
+
+    // Verify that the segment is encrypted.
+    mockDir.forceClearText = true;
+    testUtil.assertCannotReloadCore();
+    mockDir.forceClearText = false;
+    testUtil.reloadCore();
+    testUtil.assertQueryReturns("weather", 1);
+  }
+
+  @Test
+  public void testEncryptionFromNoKeysToOneKeyToNoKeys_NoIndex() throws Exception {
+    // Send an encrypt request with a key id on an empty index.
+    NamedList<Object> response = encrypt(TestingKeyManager.KEY_ID_1);
+    assertEquals(STATUS_SUCCESS, response.get(STATUS));
+    assertEquals(STATE_COMPLETE, response.get(ENCRYPTION_STATE));
+
+    // Send another encrypt request with no key id, still on the empty index.
+    response = encrypt(NO_KEY_ID);
+    assertEquals(STATUS_SUCCESS, response.get(STATUS));
+    assertEquals(STATE_COMPLETE, response.get(ENCRYPTION_STATE));
+
+    // Index some documents to create a first segment.
+    testUtil.indexDocsAndCommit("weather broadcast");
+
+    // Verify that the segment is cleartext.
+    mockDir.forceClearText = true;
+    testUtil.reloadCore();
+    testUtil.assertQueryReturns("weather", 1);
+  }
+
+  @Test
+  public void testEncryptionFromNoKeysToOneKey_ExistingIndex() throws Exception {
+    createAndEncryptIndex();
+  }
+
+  private void createAndEncryptIndex() throws Exception {
+    // Index some documents to create multiple segments.
+    testUtil.indexDocsAndCommit("weather broadcast");
+    testUtil.indexDocsAndCommit("sunny weather");
+    // Verify that the segments are cleartext.
+    mockDir.forceClearText = true;
+    testUtil.reloadCore();
+    testUtil.assertQueryReturns("weather", 2);
+    mockDir.forceClearText = false;
+
+    // Send an encrypt request with a key id.
+    NamedList<Object> response = encrypt(TestingKeyManager.KEY_ID_1);
+    assertEquals(STATUS_SUCCESS, response.get(STATUS));
+    assertEquals(STATE_PENDING, response.get(ENCRYPTION_STATE));
+
+    waitUntilEncryptionIsComplete(TestingKeyManager.KEY_ID_1);
+
+    // Verify that the segment is encrypted.
+    mockDir.forceClearText = true;
+    testUtil.assertCannotReloadCore();
+    mockDir.forceClearText = false;
+    mockDir.soleKeyIdAllowed = TestingKeyManager.KEY_ID_1;
+    testUtil.reloadCore();
+    testUtil.assertQueryReturns("weather", 2);
+    mockDir.clearMockValues();
+  }
+
+  @Test
+  public void testEncryptionFromOneKeyToAnotherKey_ExistingIndex() throws Exception {
+    createAndEncryptIndex();
+
+    // Index some documents to ensure we have at least two segments.
+    testUtil.indexDocsAndCommit("foggy weather");
+
+    // Send an encrypt request with another key id.
+    NamedList<Object> response = encrypt(TestingKeyManager.KEY_ID_2);
+    assertEquals(STATUS_SUCCESS, response.get(STATUS));
+    assertEquals(STATE_PENDING, response.get(ENCRYPTION_STATE));
+
+    waitUntilEncryptionIsComplete(TestingKeyManager.KEY_ID_2);
+
+    // Verify that the segment is encrypted.
+    mockDir.forceClearText = true;
+    testUtil.assertCannotReloadCore();
+    mockDir.forceClearText = false;
+    mockDir.soleKeyIdAllowed = TestingKeyManager.KEY_ID_2;
+    testUtil.reloadCore();
+    testUtil.assertQueryReturns("weather", 3);
+  }
+
+  @Test
+  public void testEncryptionFromOneKeyToNoKeys_ExistingIndex() throws Exception {
+    createAndEncryptIndex();
+
+    // Index some documents to ensure we have at least two segments.
+    testUtil.indexDocsAndCommit("foggy weather");
+
+    // Send an encrypt request with no key id.
+    NamedList<Object> response = encrypt(NO_KEY_ID);
+    assertEquals(STATUS_SUCCESS, response.get(STATUS));
+    assertEquals(STATE_PENDING, response.get(ENCRYPTION_STATE));
+
+    waitUntilEncryptionIsComplete(NO_KEY_ID);
+
+    // Verify that the segment is cleartext.
+    mockDir.forceClearText = true;
+    testUtil.reloadCore();
+    testUtil.assertQueryReturns("weather", 3);
+    mockDir.clearMockValues();
+
+    // Index some documents to ensure we have at least two segments.
+    testUtil.indexDocsAndCommit("cloudy weather");
+
+    // Send an encrypt request with another key id.
+    response = encrypt(TestingKeyManager.KEY_ID_2);
+    assertEquals(STATUS_SUCCESS, response.get(STATUS));
+    assertEquals(STATE_PENDING, response.get(ENCRYPTION_STATE));
+
+    waitUntilEncryptionIsComplete(TestingKeyManager.KEY_ID_2);
+
+    // Verify that the segment is encrypted.
+    mockDir.forceClearText = true;
+    testUtil.assertCannotReloadCore();
+    mockDir.forceClearText = false;
+    mockDir.soleKeyIdAllowed = TestingKeyManager.KEY_ID_2;
+    testUtil.reloadCore();
+    testUtil.assertQueryReturns("weather", 4);
+  }
+
+  private NamedList<Object> encrypt(String keyId) throws Exception {
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.set(PARAM_KEY_ID, keyId);
+    return solrClient.request(new GenericSolrRequest(SolrRequest.METHOD.GET, "/admin/encrypt", params));
+  }
+
+  private void waitUntilEncryptionIsComplete(String keyId) throws InterruptedException {
+    RetryUtil.retryUntil("Timeout waiting for encryption completion",
+                         50,
+                         100,
+                         TimeUnit.MILLISECONDS,
+                         () -> {
+                           NamedList<Object> response;
+                           try {
+                             response = encrypt(keyId);
+                           } catch (Exception e) {
+                             throw new RuntimeException(e);
+                           }
+                           assertEquals(STATUS_SUCCESS, response.get(STATUS));
+                           return response.get(ENCRYPTION_STATE).equals(STATE_COMPLETE);
+                         });
+  }
+
+  public static class MockFactory implements EncryptionDirectoryFactory.InnerFactory {
+    @Override
+    public EncryptionDirectory create(Directory delegate,
+                                      AesCtrEncrypterFactory encrypterFactory,
+                                      KeyManager keyManager) throws IOException {
+      return mockDir = new MockEncryptionDirectory(delegate, encrypterFactory, keyManager);
+    }
+  }
+
+  private static class MockEncryptionDirectory extends EncryptionDirectory {
+
+    boolean forceClearText;
+    String soleKeyIdAllowed;
+
+    MockEncryptionDirectory(Directory delegate, AesCtrEncrypterFactory encrypterFactory, KeyManager keyManager)
+      throws IOException {
+      super(delegate, encrypterFactory, keyManager);
+    }
+
+    void clearMockValues() {
+      forceClearText = false;
+      soleKeyIdAllowed = null;
+    }
+
+    @Override
+    public IndexInput openInput(String fileName, IOContext context) throws IOException {
+      return forceClearText ? in.openInput(fileName, context) : super.openInput(fileName, context);
+    }
+
+    @Override
+    protected byte[] getKeySecret(String keyRef) throws IOException {
+      if (soleKeyIdAllowed != null) {
+        String keyId = getKeyIdFromCommit(keyRef, getLatestCommitData().data);
+        assertEquals(soleKeyIdAllowed, keyId);
+      }
+      return super.getKeySecret(keyRef);
+    }
+  }
+}
diff --git a/encryption/src/test/java/org/apache/solr/encryption/EncryptionUpdateHandlerTest.java b/encryption/src/test/java/org/apache/solr/encryption/EncryptionUpdateHandlerTest.java
new file mode 100644
index 0000000..cf87cd5
--- /dev/null
+++ b/encryption/src/test/java/org/apache/solr/encryption/EncryptionUpdateHandlerTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.encryption;
+
+import org.apache.lucene.index.SegmentInfos;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.request.LocalSolrQueryRequest;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.update.CommitUpdateCommand;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+
+import static org.apache.solr.encryption.CommitUtil.readLatestCommit;
+
+/**
+ * Tests {@link EncryptionUpdateHandler}.
+ */
+public class EncryptionUpdateHandlerTest extends SolrTestCaseJ4 {
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    TestUtil.setInstallDirProperty();
+    initCore("solrconfig.xml", "schema.xml", TestUtil.getConfigPath().toString());
+  }
+
+  /**
+   * Verifies that when a commit command contains some transferable user data,
+   * it should allow an empty commit with no change, to persist the
+   * commit-transferable user data.
+   */
+  @Test
+  public void testEmptyCommitWithTransferableUserData() throws Exception {
+    // Given an empty core.
+    SolrCore core = h.getCore();
+    SegmentInfos segmentInfos = readLatestCommit(core);
+    assertEquals(0, segmentInfos.size());
+    assertTrue(segmentInfos.getUserData().isEmpty());
+
+    // When we commit with no change but with commit-transferable data.
+    SolrQueryRequest req = new LocalSolrQueryRequest(core, Collections.emptyMap());
+    CommitUpdateCommand commitCmd = new CommitUpdateCommand(req, false);
+    commitCmd.commitData = new HashMap<>();
+    String transferableDataKey1 = EncryptionUpdateHandler.TRANSFERABLE_COMMIT_DATA + "key1";
+    String transferableDataValue1 = "myValue1";
+    commitCmd.commitData.put(transferableDataKey1, transferableDataValue1);
+    req.getCore().getUpdateHandler().commit(commitCmd);
+
+    // Then the empty commit is persisted with the data.
+    segmentInfos = readLatestCommit(core);
+    assertEquals(0, segmentInfos.size());
+    assertEquals(transferableDataValue1, segmentInfos.getUserData().get(transferableDataKey1));
+
+    // When we commit again with different commit-transferable data.
+    commitCmd = new CommitUpdateCommand(req, false);
+    commitCmd.commitData = new HashMap<>();
+    String transferableDataKey2 = EncryptionUpdateHandler.TRANSFERABLE_COMMIT_DATA + "key2";
+    String transferableDataValue2 = "myValue2";
+    commitCmd.commitData.put(transferableDataKey2, transferableDataValue2);
+    req.getCore().getUpdateHandler().commit(commitCmd);
+
+    // Then the empty commit is persisted and the commit data is replaced,
+    // it does not add up.
+    segmentInfos = readLatestCommit(core);
+    assertEquals(0, segmentInfos.size());
+    assertNull(segmentInfos.getUserData().get(transferableDataKey1));
+    assertEquals(transferableDataValue2, segmentInfos.getUserData().get(transferableDataKey2));
+
+    // When we commit again without commit data in the command.
+    commitCmd = new CommitUpdateCommand(req, false);
+    req.getCore().getUpdateHandler().commit(commitCmd);
+
+    // Then the empty commit is persisted and transfers commit-transferable data
+    // from the previous commit.
+    segmentInfos = readLatestCommit(core);
+    assertEquals(0, segmentInfos.size());
+    assertNull(segmentInfos.getUserData().get(transferableDataKey1));
+    assertEquals(transferableDataValue2, segmentInfos.getUserData().get(transferableDataKey2));
+  }
+}
diff --git a/encryption/src/test/java/org/apache/solr/encryption/TestUtil.java b/encryption/src/test/java/org/apache/solr/encryption/TestUtil.java
new file mode 100644
index 0000000..9dba03c
--- /dev/null
+++ b/encryption/src/test/java/org/apache/solr/encryption/TestUtil.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.encryption;
+
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CoreAdminRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Slice;
+import org.junit.Assert;
+
+import java.nio.file.Path;
+
+/**
+ * Utility methods for encryption tests.
+ */
+public class TestUtil {
+
+  private final CloudSolrClient solrClient;
+  private final String collectionName;
+  private int docId;
+
+  public TestUtil(CloudSolrClient solrClient, String collectionName) {
+    this.solrClient = solrClient;
+    this.collectionName = collectionName;
+  }
+
+  /**
+   * Sets the "solr.install.dir" system property.
+   */
+  public static void setInstallDirProperty() {
+    System.setProperty("solr.install.dir", SolrTestCaseJ4.getFile("..").getPath());
+  }
+
+  /**
+   * Gets the path to the encryption module test config.
+   */
+  public static Path getConfigPath() {
+    return getConfigPath("");
+  }
+
+  /**
+   * Gets the path of a specific sub-dir of the encryption module test config.
+   */
+  public static Path getConfigPath(String configDir) {
+    return SolrTestCaseJ4.getFile("src/test/resources/configs/" + configDir).toPath();
+  }
+
+  /**
+   * Adds one doc per provided text, and commits.
+   */
+  public void indexDocsAndCommit(String... texts) throws Exception {
+    for (String text : texts) {
+      SolrInputDocument doc = new SolrInputDocument();
+      doc.addField("id", Integer.toString(docId++));
+      doc.addField("text", text);
+      solrClient.add(doc);
+    }
+    solrClient.commit(collectionName);
+  }
+
+  /**
+   * Verifies that the provided query returns the expected number of results.
+   */
+  public void assertQueryReturns(String query, int expectedNumResults) throws Exception {
+    QueryResponse response = solrClient.query(new SolrQuery(query));
+    Assert.assertEquals(expectedNumResults, response.getResults().size());
+  }
+
+  /**
+   * Reloads the leader replica core of the first shard of the collection.
+   */
+  public void reloadCore() throws Exception {
+    try {
+      DocCollection collection = solrClient.getClusterState().getCollection(collectionName);
+      Slice slice = collection.getSlices().iterator().next();
+      CoreAdminRequest.reloadCore(slice.getLeader().core, solrClient);
+    } catch (SolrException e) {
+      throw new CoreReloadException("The index cannot be reloaded. There is probably an issue with the encryption key ids.", e);
+    }
+  }
+
+  /**
+   * Verifies that {@link #reloadCore()} fails.
+   */
+  public void assertCannotReloadCore() throws Exception {
+    try {
+      reloadCore();
+      Assert.fail("Core reloaded whereas it was not expected to be possible");
+    } catch (CoreReloadException e) {
+      // Expected.
+    }
+  }
+
+  private static class CoreReloadException extends Exception {
+    CoreReloadException(String msg, SolrException cause) {
+      super(msg, cause);
+    }
+  }
+}
diff --git a/encryption/src/test/java/org/apache/solr/encryption/TestingKeyManager.java b/encryption/src/test/java/org/apache/solr/encryption/TestingKeyManager.java
new file mode 100644
index 0000000..5e37940
--- /dev/null
+++ b/encryption/src/test/java/org/apache/solr/encryption/TestingKeyManager.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.encryption;
+
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.solr.common.util.NamedList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Mocked implementation of {@link KeyManager}.
+ */
+public class TestingKeyManager implements KeyManager {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  public static final String KEY_ID_1 = "mock1";
+  public static final String KEY_ID_2 = "mock2";
+  public static final String KEY_ID_3 = "mock3";
+  public static final byte[] KEY_COOKIE_1 = "ABCDE".getBytes(StandardCharsets.UTF_8);
+  public static final byte[] KEY_COOKIE_2 = "BCDEF".getBytes(StandardCharsets.UTF_8);
+  public static final byte[] KEY_COOKIE_3 = "CDEFG".getBytes(StandardCharsets.UTF_8);
+  private static final Map<String, byte[]> MOCK_COOKIES = Map.of(KEY_ID_1, KEY_COOKIE_1, KEY_ID_2, KEY_COOKIE_2, KEY_ID_3, KEY_COOKIE_3);
+  public static final byte[] KEY_SECRET_1 = "12345678901234567890123456789012".getBytes(StandardCharsets.UTF_8);
+  public static final byte[] KEY_SECRET_2 = "34567890123456789012345678901234".getBytes(StandardCharsets.UTF_8);
+  public static final byte[] KEY_SECRET_3 = "78901234567890123456789012345678".getBytes(StandardCharsets.UTF_8);
+  private static final Map<String, byte[]> MOCK_KEYS = Map.of(KEY_ID_1, KEY_SECRET_1, KEY_ID_2, KEY_SECRET_2, KEY_ID_3, KEY_SECRET_3);
+
+  /**
+   * File name extensions/suffixes that do NOT need to be encrypted because it lacks user/external data.
+   * Other files should be encrypted.
+   * There is some human judgement here as some files may contain vague clues as to the shape of the data.
+   */
+  private static final Set<String> CLEARTEXT_EXTENSIONS = Set.of(
+    "doc",    // Document number, frequencies, and skip data
+    "pos",    // Positions
+    "pay",    // Payloads and offsets
+    "dvm",    // Doc values metadata
+    "fdm",    // Stored fields metadata
+    "fdx",    // Stored fields index
+    "nvd",    // Norms data
+    "nvm",    // Norms metadata
+    "fnm",    // Field Infos
+    "si",     // Segment Infos
+    "cfe"     // Compound file entries
+    );
+  // Extensions known to contain sensitive user data, and thus that need to be encrypted:
+  // tip    - BlockTree terms index (FST)
+  // tim    - BlockTree terms
+  // tmd    - BlockTree metadata (contains first and last term)
+  // fdt    - Stored fields data
+  // dvd    - Doc values data
+  // ustd   - UniformSplit index (FST)
+  // ustb   - UniformSplit terms (including metadata)
+  // stustd - HintDriven UniformSplit index (FST)
+  // stustb - HintDriven UniformSplit terms (including metadata)
+  // cfs    - Compound file (contains all the above files data)
+
+  // Cleartext temporary files:
+  private static final String TMP_EXTENSION = "tmp";
+  private static final String TMP_DOC_IDS = "-doc_ids"; // FieldsIndexWriter
+  private static final String TMP_FILE_POINTERS = "file_pointers"; // FieldsIndexWriter
+
+  static {
+    try {
+      assert false;
+      log.error(TestingKeyManager.class.getSimpleName() + " must not be used in production");
+    } catch (AssertionError e) {
+      // Ok.
+    }
+  }
+
+  private TestingKeyManager() {}
+
+  @Override
+  public boolean isEncryptable(String fileName) {
+    String extension = IndexFileNames.getExtension(fileName);
+    if (extension == null) {
+      // segments and pending_segments are never passed as parameter of this method.
+      assert !fileName.startsWith(IndexFileNames.SEGMENTS) && !fileName.startsWith(IndexFileNames.PENDING_SEGMENTS);
+    } else if (CLEARTEXT_EXTENSIONS.contains(extension)) {
+      // The file extension tells us it does not need to be encrypted.
+      return false;
+    } else if (extension.equals(TMP_EXTENSION)) {
+      // We know some tmp files do not need to be encrypted.
+      int tmpCounterIndex = fileName.lastIndexOf('_');
+      assert tmpCounterIndex != -1;
+      if (endsWith(fileName, TMP_DOC_IDS, tmpCounterIndex)
+      || endsWith(fileName, TMP_FILE_POINTERS, tmpCounterIndex)) {
+        return false;
+      }
+    }
+    // By default, all other files should be encrypted.
+    return true;
+  }
+
+  private static boolean endsWith(String s, String suffix, int endIndex) {
+    // Inspired from JDK String where endsWith calls startsWith.
+    // Here we look for [suffix] from index [endIndex - suffix.length()].
+    // This is equivalent to
+    // s.substring(0, endIndex).endsWith(suffix)
+    // without creating a substring.
+    return s.startsWith(suffix, endIndex - suffix.length());
+  }
+
+  @Override
+  public byte[] getKeyCookie(String keyId) {
+    //TODO: Replace this mock. This class should be the key cache.
+    byte[] cookie = MOCK_COOKIES.get(keyId);
+    if (cookie == null) {
+      throw new NoSuchElementException("No key defined for " + keyId);
+    }
+    return cookie;
+  }
+
+  @Override
+  public byte[] getKeySecret(String keyId, String keyRef, Function<String, byte[]> cookieSupplier) {
+    //TODO: Replace this mock. This class should be the key cache.
+    byte[] secret = MOCK_KEYS.get(keyId);
+    if (secret == null) {
+      throw new NoSuchElementException("No key defined for " + keyId);
+    }
+    byte[] cookie = cookieSupplier.apply(keyRef);
+    byte[] expectedCookie = MOCK_COOKIES.get(keyId);
+    if (cookie != null && expectedCookie != null && !Arrays.equals(cookie, expectedCookie)
+      || (cookie == null || expectedCookie == null) && cookie != expectedCookie) {
+      throw new IllegalStateException("Wrong cookie provided");
+    }
+    return secret;
+  }
+
+  /**
+   * Supplies the {@link TestingKeyManager} singleton.
+   */
+  public static class Supplier implements KeyManager.Supplier {
+
+    private static final KeyManager SINGLETON = new TestingKeyManager();
+
+    @Override
+    public void init(NamedList<?> args) {
+      // Do nothing.
+    }
+
+    @Override
+    public KeyManager getKeyManager() {
+      return SINGLETON;
+    }
+  }
+}
diff --git a/encryption/src/test/java/org/apache/solr/encryption/crypto/AesCtrEncrypterTest.java b/encryption/src/test/java/org/apache/solr/encryption/crypto/AesCtrEncrypterTest.java
new file mode 100644
index 0000000..7852424
--- /dev/null
+++ b/encryption/src/test/java/org/apache/solr/encryption/crypto/AesCtrEncrypterTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.encryption.crypto;
+
+import java.nio.ByteBuffer;
+
+import org.apache.lucene.tests.util.LuceneTestCase;
+import org.junit.Test;
+
+import static org.apache.solr.encryption.crypto.AesCtrUtil.*;
+
+/**
+ * Tests {@link AesCtrEncrypter} implementations.
+ */
+public class AesCtrEncrypterTest extends LuceneTestCase {
+
+  /**
+   * Verifies that {@link AesCtrEncrypter} implementations encrypt and decrypt data exactly the
+   * same way. They produce the same encrypted data and can decrypt each other.
+   */
+  @Test
+  public void testEncryptionDecryption() {
+    for (int i = 0; i < 100; i++) {
+      ByteBuffer clearData = generateRandomData(10000);
+      byte[] key = generateRandomBytes(AES_BLOCK_SIZE);
+      byte[] iv = generateRandomAesCtrIv(SecureRandomProvider.get());
+      AesCtrEncrypter encrypter1 = encrypterFactory().create(key, iv);
+      AesCtrEncrypter encrypter2 = encrypterFactory().create(key, iv);
+
+      ByteBuffer encryptedDataLight = crypt(clearData, encrypter1);
+      ByteBuffer encryptedDataCipher = crypt(clearData, encrypter2);
+      assertEquals(encryptedDataCipher, encryptedDataLight);
+
+      ByteBuffer decryptedData = crypt(encryptedDataLight, encrypter1);
+      assertEquals(clearData, decryptedData);
+      decryptedData = crypt(encryptedDataLight, encrypter2);
+      assertEquals(clearData, decryptedData);
+    }
+  }
+
+  private AesCtrEncrypterFactory encrypterFactory() {
+    if (LightAesCtrEncrypter.isSupported()) {
+      return random().nextBoolean() ? CipherAesCtrEncrypter.FACTORY : LightAesCtrEncrypter.FACTORY;
+    }
+    return CipherAesCtrEncrypter.FACTORY;
+  }
+
+  private static ByteBuffer generateRandomData(int numBytes) {
+    ByteBuffer buffer = ByteBuffer.allocate(numBytes);
+    for (int i = 0; i < numBytes; i++) {
+      buffer.put((byte) random().nextInt());
+    }
+    buffer.position(0);
+    return buffer;
+  }
+
+  private static byte[] generateRandomBytes(int numBytes) {
+    byte[] b = new byte[numBytes];
+    // Random.nextBytes(byte[]) does not produce good enough randomness here,
+    // it has a bias to produce 0 and -1 bytes.
+    for (int i = 0; i < numBytes; i++) {
+      b[i] = (byte) random().nextInt();
+    }
+    return b;
+  }
+
+  private ByteBuffer crypt(ByteBuffer inputBuffer, AesCtrEncrypter encrypter) {
+    encrypter = randomClone(encrypter);
+    encrypter.init(0);
+    int inputInitialPosition = inputBuffer.position();
+    ByteBuffer outputBuffer = ByteBuffer.allocate(inputBuffer.capacity());
+    while (inputBuffer.remaining() > 0) {
+      int length = Math.min(random().nextInt(51) + 1, inputBuffer.remaining());
+      ByteBuffer inputSlice = inputBuffer.slice();
+      inputSlice.limit(inputSlice.position() + length);
+      encrypter.process(inputSlice, outputBuffer);
+      inputBuffer.position(inputBuffer.position() + length);
+    }
+    inputBuffer.position(inputInitialPosition);
+    outputBuffer.position(0);
+    return outputBuffer;
+  }
+
+  private static AesCtrEncrypter randomClone(AesCtrEncrypter encrypter) {
+    return random().nextBoolean() ? encrypter.clone() : encrypter;
+  }
+}
\ No newline at end of file
diff --git a/encryption/src/test/java/org/apache/solr/encryption/crypto/BaseDataOutputTestCase.java b/encryption/src/test/java/org/apache/solr/encryption/crypto/BaseDataOutputTestCase.java
new file mode 100644
index 0000000..621dfa1
--- /dev/null
+++ b/encryption/src/test/java/org/apache/solr/encryption/crypto/BaseDataOutputTestCase.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.encryption.crypto;
+
+import com.carrotsearch.randomizedtesting.Xoroshiro128PlusRandom;
+import com.carrotsearch.randomizedtesting.generators.RandomBytes;
+import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
+import com.carrotsearch.randomizedtesting.generators.RandomPicks;
+import com.carrotsearch.randomizedtesting.generators.RandomStrings;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.lucene.store.ByteBuffersDataInput;
+import org.apache.lucene.store.ByteBuffersDataOutput;
+import org.apache.lucene.store.DataInput;
+import org.apache.lucene.store.DataOutput;
+import org.apache.lucene.store.OutputStreamDataOutput;
+import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.IOConsumer;
+import org.apache.lucene.tests.util.LuceneTestCase;
+import org.junit.Test;
+
+/**
+ * Copy of {@code org.apache.lucene.store.BaseDataOutputTestCase}.
+ */
+public abstract class BaseDataOutputTestCase<T extends DataOutput> extends LuceneTestCase {
+  protected abstract T newInstance();
+
+  protected abstract byte[] toBytes(T instance);
+
+  @FunctionalInterface
+  private interface ThrowingBiFunction<T, U, R> {
+    R apply(T t, U u) throws Exception;
+  }
+
+  @Test
+  public void testRandomizedWrites() throws IOException {
+    T dst = newInstance();
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutput ref = new OutputStreamDataOutput(baos);
+
+    long seed = random().nextLong();
+    int max = 50_000;
+    addRandomData(dst, new Xoroshiro128PlusRandom(seed), max);
+    addRandomData(ref, new Xoroshiro128PlusRandom(seed), max);
+    assertArrayEquals(baos.toByteArray(), toBytes(dst));
+  }
+
+  protected static List<IOConsumer<DataInput>> addRandomData(
+    DataOutput dst, Random rnd, int maxAddCalls) throws IOException {
+    try {
+      List<IOConsumer<DataInput>> reply = new ArrayList<>();
+      for (int i = 0; i < maxAddCalls; i++) {
+        reply.add(RandomPicks.randomFrom(rnd, GENERATORS).apply(dst, rnd));
+      }
+      return reply;
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  private static final List<ThrowingBiFunction<DataOutput, Random, IOConsumer<DataInput>>> GENERATORS;
+
+  static {
+    GENERATORS = new ArrayList<>();
+
+    // writeByte/ readByte
+    GENERATORS.add(
+      (dst, rnd) -> {
+        byte value = (byte) rnd.nextInt();
+        dst.writeByte(value);
+        return (src) -> assertEquals("readByte()", value, src.readByte());
+      });
+
+    // writeBytes/ readBytes (array and buffer version).
+    GENERATORS.add(
+      (dst, rnd) -> {
+        byte[] bytes = RandomBytes.randomBytesOfLengthBetween(rnd, 0, 100);
+        ByteBuffersDataOutput rdo =
+          dst instanceof ByteBuffersDataOutput ? (ByteBuffersDataOutput) dst : null;
+
+        if (rnd.nextBoolean() && rdo != null) {
+          rdo.writeBytes(ByteBuffer.wrap(bytes));
+        } else {
+          dst.writeBytes(bytes, bytes.length);
+        }
+
+        boolean useBuffersForRead = rnd.nextBoolean();
+        return (src) -> {
+          byte[] read = new byte[bytes.length];
+          if (useBuffersForRead && src instanceof ByteBuffersDataInput) {
+            ((ByteBuffersDataInput) src).readBytes(ByteBuffer.wrap(read), read.length);
+            assertArrayEquals("readBytes(ByteBuffer)", bytes, read);
+          } else {
+            src.readBytes(read, 0, read.length);
+            assertArrayEquals("readBytes(byte[])", bytes, read);
+          }
+        };
+      });
+
+    // writeBytes/ readBytes (array + offset).
+    GENERATORS.add(
+      (dst, rnd) -> {
+        byte[] bytes = RandomBytes.randomBytesOfLengthBetween(rnd, 0, 100);
+        int off = RandomNumbers.randomIntBetween(rnd, 0, bytes.length);
+        int len = RandomNumbers.randomIntBetween(rnd, 0, bytes.length - off);
+        dst.writeBytes(bytes, off, len);
+
+        return (src) -> {
+          byte[] read = new byte[bytes.length + off];
+          src.readBytes(read, off, len);
+          assertArrayEquals(
+            "readBytes(byte[], off)",
+            ArrayUtil.copyOfSubArray(bytes, off, len + off),
+            ArrayUtil.copyOfSubArray(read, off, len + off));
+        };
+      });
+
+    GENERATORS.add(
+      (dst, rnd) -> {
+        int v = rnd.nextInt();
+        dst.writeInt(v);
+        return (src) -> assertEquals("readInt()", v, src.readInt());
+      });
+
+    GENERATORS.add(
+      (dst, rnd) -> {
+        long v = rnd.nextLong();
+        dst.writeLong(v);
+        return (src) -> assertEquals("readLong()", v, src.readLong());
+      });
+
+    GENERATORS.add(
+      (dst, rnd) -> {
+        short v = (short) rnd.nextInt();
+        dst.writeShort(v);
+        return (src) -> assertEquals("readShort()", v, src.readShort());
+      });
+
+    GENERATORS.add(
+      (dst, rnd) -> {
+        int v = rnd.nextInt();
+        dst.writeVInt(v);
+        return (src) -> assertEquals("readVInt()", v, src.readVInt());
+      });
+
+    GENERATORS.add(
+      (dst, rnd) -> {
+        int v = rnd.nextInt();
+        dst.writeZInt(v);
+        return (src) -> assertEquals("readZInt()", v, src.readZInt());
+      });
+
+    GENERATORS.add(
+      (dst, rnd) -> {
+        long v = rnd.nextLong() & (-1L >>> 1);
+        dst.writeVLong(v);
+        return (src) -> assertEquals("readVLong()", v, src.readVLong());
+      });
+
+    GENERATORS.add(
+      (dst, rnd) -> {
+        long v = rnd.nextLong();
+        dst.writeZLong(v);
+        return (src) -> assertEquals("readZLong()", v, src.readZLong());
+      });
+
+    GENERATORS.add(
+      (dst, rnd) -> {
+        String v;
+        if (rnd.nextInt(50) == 0) {
+          // Occasionally a large blob.
+          v =
+            RandomStrings.randomUnicodeOfLength(
+              rnd, RandomNumbers.randomIntBetween(rnd, 2048, 4096));
+        } else {
+          v =
+            RandomStrings.randomUnicodeOfLength(
+              rnd, RandomNumbers.randomIntBetween(rnd, 0, 10));
+        }
+        dst.writeString(v);
+        return (src) -> assertEquals("readString()", v, src.readString());
+      });
+  }
+}
\ No newline at end of file
diff --git a/encryption/src/test/java/org/apache/solr/encryption/crypto/DecryptingIndexInputTest.java b/encryption/src/test/java/org/apache/solr/encryption/crypto/DecryptingIndexInputTest.java
new file mode 100644
index 0000000..393f8c0
--- /dev/null
+++ b/encryption/src/test/java/org/apache/solr/encryption/crypto/DecryptingIndexInputTest.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.encryption.crypto;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import com.carrotsearch.randomizedtesting.RandomizedTest;
+import com.carrotsearch.randomizedtesting.Xoroshiro128PlusRandom;
+import org.apache.lucene.store.ByteBuffersDataOutput;
+import org.apache.lucene.store.ByteBuffersIndexInput;
+import org.apache.lucene.store.ByteBuffersIndexOutput;
+import org.apache.lucene.store.DataInput;
+import org.apache.lucene.store.DataOutput;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.IOConsumer;
+import org.apache.lucene.tests.util.LuceneTestCase;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests {@link DecryptingIndexInput}.
+ */
+public class DecryptingIndexInputTest extends RandomizedTest {
+
+  private byte[] key;
+
+  @Before
+  public void initializeEncryption() {
+    // AES key length can either 16, 24 or 32 bytes.
+    key = randomBytesOfLength(randomIntBetween(2, 4) * 8);
+  }
+
+  @Test
+  public void testSanity() throws IOException {
+    ByteBuffersDataOutput dataOutput = new ByteBuffersDataOutput();
+    EncryptingIndexOutput indexOutput = createEncryptingIndexOutput(dataOutput);
+    indexOutput.close();
+    DecryptingIndexInput indexInput1 = createDecryptingIndexInput(dataOutput, 0);
+    assertEquals(0, indexInput1.length());
+    LuceneTestCase.expectThrows(EOFException.class, indexInput1::readByte);
+
+    dataOutput = new ByteBuffersDataOutput();
+    indexOutput = createEncryptingIndexOutput(dataOutput);
+    indexOutput.writeByte((byte) 1);
+    indexOutput.close();
+
+    DecryptingIndexInput indexInput2 = createDecryptingIndexInput(dataOutput, 0);
+    assertEquals(1, indexInput2.length());
+    assertEquals(0, indexInput2.getFilePointer());
+    assertEquals(0, indexInput1.length());
+    indexInput1.close();
+
+    assertEquals(1, indexInput2.readByte());
+    assertEquals(1, indexInput2.getFilePointer());
+    assertEquals(1, indexInput2.randomAccessSlice(0, 1).readByte(0));
+
+    LuceneTestCase.expectThrows(EOFException.class, indexInput2::readByte);
+
+    assertEquals(1, indexInput2.getFilePointer());
+    indexInput2.close();
+  }
+
+  @Test
+  public void testRandomReads() throws Exception {
+    ByteBuffersDataOutput dataOutput = new ByteBuffersDataOutput();
+    int offset = writeRandomBytes(dataOutput);
+    EncryptingIndexOutput indexOutput = createEncryptingIndexOutput(dataOutput);
+
+    long seed = randomLong();
+    int maxAddCalls = 100_000;
+    List<IOConsumer<DataInput>> reply =
+      BaseDataOutputTestCase.addRandomData(indexOutput, new Xoroshiro128PlusRandom(seed), maxAddCalls);
+    indexOutput.close();
+
+    DecryptingIndexInput indexInput = createDecryptingIndexInput(dataOutput, offset);
+    for (IOConsumer<DataInput> c : reply) {
+      c.accept(indexInput);
+    }
+
+    LuceneTestCase.expectThrows(EOFException.class, indexInput::readByte);
+    indexInput.close();
+  }
+
+  @Test
+  public void testRandomReadsOnSlices() throws Exception {
+    for (int reps = randomIntBetween(1, 20); --reps > 0; ) {
+      ByteBuffersDataOutput dataOutput = new ByteBuffersDataOutput();
+      int offset = writeRandomBytes(dataOutput);
+      EncryptingIndexOutput indexOutput = createEncryptingIndexOutput(dataOutput);
+
+      byte[] prefix = new byte[randomIntBetween(0, 1024 * 8)];
+      indexOutput.writeBytes(prefix, 0, prefix.length);
+
+      long seed = randomLong();
+      int max = 10_000;
+      List<IOConsumer<DataInput>> reply =
+        BaseDataOutputTestCase.addRandomData(indexOutput, new Xoroshiro128PlusRandom(seed), max);
+
+      byte[] suffix = new byte[randomIntBetween(0, 1024 * 8)];
+      indexOutput.writeBytes(suffix, 0, suffix.length);
+      long outputLength = indexOutput.getFilePointer();
+      indexOutput.close();
+
+      IndexInput indexInput = createDecryptingIndexInput(dataOutput, offset).slice("Test", prefix.length, outputLength - prefix.length - suffix.length);
+
+      assertEquals(0, indexInput.getFilePointer());
+      assertEquals(outputLength - prefix.length - suffix.length, indexInput.length());
+      for (IOConsumer<DataInput> c : reply) {
+        c.accept(indexInput);
+      }
+
+      LuceneTestCase.expectThrows(EOFException.class, indexInput::readByte);
+      indexInput.close();
+    }
+  }
+
+  @Test
+  public void testSeekEmpty() throws Exception {
+    ByteBuffersDataOutput dataOutput = new ByteBuffersDataOutput();
+    createEncryptingIndexOutput(dataOutput).close();
+    DecryptingIndexInput indexInput = createDecryptingIndexInput(dataOutput, 0);
+
+    indexInput.seek(0);
+    LuceneTestCase.expectThrows(EOFException.class, () -> indexInput.seek(1));
+
+    indexInput.seek(0);
+    LuceneTestCase.expectThrows(EOFException.class, indexInput::readByte);
+    indexInput.close();
+  }
+
+  @Test
+  public void testSeek() throws Exception {
+    for (int reps = randomIntBetween(1, 200); --reps > 0; ) {
+      ByteBuffersDataOutput dataOutput = new ByteBuffersDataOutput();
+      int offset = writeRandomBytes(dataOutput);
+      ByteBuffersDataOutput clearDataOutput = new ByteBuffersDataOutput();
+      EncryptingIndexOutput indexOutput = createEncryptingIndexOutput(dataOutput);
+
+      byte[] prefix = {};
+      if (randomBoolean()) {
+        prefix = new byte[randomIntBetween(1, 1024 * 8)];
+        indexOutput.writeBytes(prefix, 0, prefix.length);
+        clearDataOutput.writeBytes(prefix);
+      }
+
+      long seed = randomLong();
+      int max = 1000;
+      List<IOConsumer<DataInput>> reply =
+        BaseDataOutputTestCase.addRandomData(indexOutput, new Xoroshiro128PlusRandom(seed), max);
+      BaseDataOutputTestCase.addRandomData(clearDataOutput, new Xoroshiro128PlusRandom(seed), max);
+      assertEquals(clearDataOutput.size(), indexOutput.getFilePointer());
+      long outputLength = indexOutput.getFilePointer();
+      indexOutput.close();
+
+      IndexInput indexInput = createDecryptingIndexInput(dataOutput, offset).slice("Test", prefix.length, outputLength - prefix.length);
+
+      indexInput.seek(0);
+      for (IOConsumer<DataInput> c : reply) {
+        c.accept(indexInput);
+      }
+
+      indexInput.seek(0);
+      for (IOConsumer<DataInput> c : reply) {
+        c.accept(indexInput);
+      }
+
+      byte[] clearData = clearDataOutput.toArrayCopy();
+      clearData = ArrayUtil.copyOfSubArray(clearData, prefix.length, clearData.length);
+
+      for (int i = 0; i < 1000; i++) {
+        int offs = randomIntBetween(0, clearData.length - 1);
+        indexInput.seek(offs);
+        assertEquals(offs, indexInput.getFilePointer());
+        assertEquals("reps=" + reps + " i=" + i + ", offs=" + offs, clearData[offs], indexInput.readByte());
+      }
+      indexInput.seek(indexInput.length());
+      assertEquals(indexInput.length(), indexInput.getFilePointer());
+      LuceneTestCase.expectThrows(EOFException.class, indexInput::readByte);
+      indexInput.close();
+    }
+  }
+
+  @Test
+  public void testClone() throws Exception {
+    for (int reps = randomIntBetween(1, 200); --reps > 0; ) {
+      ByteBuffersDataOutput dataOutput = new ByteBuffersDataOutput();
+      int offset = writeRandomBytes(dataOutput);
+      ByteBuffersDataOutput clearDataOutput = new ByteBuffersDataOutput();
+      EncryptingIndexOutput indexOutput = createEncryptingIndexOutput(dataOutput);
+
+      long seed = randomLong();
+      int max = 1000;
+      BaseDataOutputTestCase.addRandomData(indexOutput, new Xoroshiro128PlusRandom(seed), max);
+      BaseDataOutputTestCase.addRandomData(clearDataOutput, new Xoroshiro128PlusRandom(seed), max);
+      assertEquals(clearDataOutput.size(), indexOutput.getFilePointer());
+      indexOutput.close();
+
+      IndexInput indexInput = createDecryptingIndexInput(dataOutput, offset);
+      byte[] clearData = clearDataOutput.toArrayCopy();
+      byte[] readBuffer = new byte[100];
+      for (int i = 0; i < 1000; i++) {
+        int readLength = randomIntBetween(1, readBuffer.length);
+        int offs = randomIntBetween(0, clearData.length - 1 - 2 * readLength);
+        indexInput.seek(offs);
+        assertEquals("reps=" + reps + " i=" + i + ", offs=" + offs, clearData[offs], indexInput.readByte());
+        indexInput.readBytes(readBuffer, 0, readLength);
+        assertTrue(Arrays.equals(clearData, offs + 1, offs + 1 + readLength, readBuffer, 0, readLength));
+
+        IndexInput clone = indexInput.clone();
+        if (randomBoolean()) {
+          clone.readBytes(readBuffer, 0, readLength);
+          assertTrue(Arrays.equals(clearData, offs + 1 + readLength, offs + 1 + 2 * readLength, readBuffer, 0, readLength));
+        }
+        int cloneOffs = Math.max(offs - readLength + randomIntBetween(0, 2 * readLength), 0);
+        clone.seek(cloneOffs);
+        clone.readBytes(readBuffer, 0, readLength);
+        assertTrue(Arrays.equals(clearData, cloneOffs, cloneOffs + readLength, readBuffer, 0, readLength));
+        clone.close();
+      }
+      indexInput.close();
+    }
+  }
+
+  private int writeRandomBytes(DataOutput dataOutput) throws IOException {
+    int numBytes = randomIntBetween(0, 10);
+    for (int i = 0; i < numBytes; i++) {
+      dataOutput.writeByte((byte) 1);
+    }
+    return numBytes;
+  }
+
+  private EncryptingIndexOutput createEncryptingIndexOutput(ByteBuffersDataOutput dataOutput) throws IOException {
+    return new EncryptingIndexOutput(new ByteBuffersIndexOutput(dataOutput, "Test", "Test"),
+      key, encrypterFactory());
+  }
+
+  private DecryptingIndexInput createDecryptingIndexInput(ByteBuffersDataOutput dataOutput, int offset) throws IOException {
+    IndexInput indexInput = new ByteBuffersIndexInput(dataOutput.toDataInput(), "Test");
+    indexInput.seek(offset);
+    return new DecryptingIndexInput(indexInput, key, encrypterFactory());
+  }
+
+  private AesCtrEncrypterFactory encrypterFactory() {
+    return randomBoolean() ? CipherAesCtrEncrypter.FACTORY : LightAesCtrEncrypter.FACTORY;
+  }
+}
\ No newline at end of file
diff --git a/encryption/src/test/java/org/apache/solr/encryption/crypto/EncryptingIndexOutputTest.java b/encryption/src/test/java/org/apache/solr/encryption/crypto/EncryptingIndexOutputTest.java
new file mode 100644
index 0000000..2de88e4
--- /dev/null
+++ b/encryption/src/test/java/org/apache/solr/encryption/crypto/EncryptingIndexOutputTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.encryption.crypto;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Random;
+
+import org.apache.lucene.store.ByteBuffersDataInput;
+import org.apache.lucene.store.ByteBuffersDataOutput;
+import org.apache.lucene.store.ByteBuffersIndexInput;
+import org.apache.lucene.store.ByteBuffersIndexOutput;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.OutputStreamIndexOutput;
+import org.apache.lucene.tests.util.LuceneTestCase;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.carrotsearch.randomizedtesting.RandomizedTest.*;
+import static org.apache.solr.encryption.crypto.AesCtrUtil.*;
+
+/**
+ * Tests {@link EncryptingIndexOutput}.
+ */
+public class EncryptingIndexOutputTest extends BaseDataOutputTestCase<EncryptingIndexOutput> {
+
+  private byte[] key;
+  private boolean shouldSimulateWrongKey;
+
+  @Before
+  public void initializeEncryption() {
+    // AES key length can either 16, 24 or 32 bytes.
+    key = randomBytesOfLength(randomIntBetween(2, 4) * 8);
+    shouldSimulateWrongKey = false;
+  }
+
+  /**
+   * Verifies that the length of the encrypted output is the same as the original data.
+   * Verifies that all the file pointers in the encrypted output are the same as the original data pointers.
+   */
+  @Test
+  public void testEncryptionLength() throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    OutputStreamIndexOutput delegateIndexOutput = new OutputStreamIndexOutput("test", "test", baos, 10);
+    byte[] key = new byte[32];
+    Arrays.fill(key, (byte) 1);
+    EncryptingIndexOutput indexOutput = new EncryptingIndexOutput(delegateIndexOutput, key, encrypterFactory()) {
+      @Override
+      protected int getBufferCapacity() {
+        // Reduce the buffer capacity to make sure we often write to the index output.
+        return AesCtrUtil.AES_BLOCK_SIZE;
+      }
+    };
+    indexOutput.writeByte((byte) 3);
+    // Check same file pointer.
+    assertEquals(1, indexOutput.getFilePointer());
+    byte[] bytes = "tomorrow morning".getBytes(StandardCharsets.UTF_16);
+    indexOutput.writeBytes(bytes, 0, bytes.length);
+    // Check same file pointer.
+    assertEquals(1 + bytes.length, indexOutput.getFilePointer());
+    indexOutput.close();
+    // Check the output size is equal to the original size + IV length.
+    assertEquals(1 + bytes.length + IV_LENGTH, baos.size());
+  }
+
+  /**
+   * Verify that with a wrong key we don't get the original data when decrypting.
+   */
+  @Test
+  public void testWrongKey() {
+    shouldSimulateWrongKey = true;
+    // Run the testRandomizedWrites which encrypts data and then calls
+    // toBytes() which decrypts with a wrong key.
+    LuceneTestCase.expectThrows(AssertionError.class, this::testRandomizedWrites);
+  }
+
+  @Override
+  protected EncryptingIndexOutput newInstance() {
+    try {
+      return new TestBufferedEncryptingIndexOutput(new ByteBuffersDataOutput(), key, encrypterFactory());
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  @Override
+  protected byte[] toBytes(EncryptingIndexOutput indexOutput) {
+    try {
+      indexOutput.close();
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+    ByteBuffersDataInput dataInput = ((TestBufferedEncryptingIndexOutput) indexOutput).dataOutput.toDataInput();
+    IndexInput indexInput = new ByteBuffersIndexInput(dataInput, "Test");
+    byte[] key = this.key.clone();
+    if (shouldSimulateWrongKey) {
+      key[0]++;
+    }
+    try (DecryptingIndexInput decryptingIndexInput = new DecryptingIndexInput(indexInput, key, encrypterFactory())) {
+      byte[] b = new byte[(int) decryptingIndexInput.length()];
+      decryptingIndexInput.readBytes(b, 0, b.length);
+      return b;
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  private AesCtrEncrypterFactory encrypterFactory() {
+    return randomBoolean() ? CipherAesCtrEncrypter.FACTORY : LightAesCtrEncrypter.FACTORY;
+  }
+
+  /**
+   * Replaces the {@link java.security.SecureRandom} by a repeatable {@link Random} for tests.
+   * This is used to generate a repeatable random IV.
+   */
+  private static class TestBufferedEncryptingIndexOutput extends EncryptingIndexOutput {
+
+    private final ByteBuffersDataOutput dataOutput;
+
+    TestBufferedEncryptingIndexOutput(ByteBuffersDataOutput dataOutput, byte[] key, AesCtrEncrypterFactory encrypterFactory) throws IOException {
+      super(new ByteBuffersIndexOutput(dataOutput, "Test", "Test"), key, encrypterFactory);
+      this.dataOutput = dataOutput;
+    }
+
+    @Override
+    protected byte[] generateRandomIv() {
+      return randomBytesOfLength(IV_LENGTH);
+    }
+  }
+}
\ No newline at end of file
diff --git a/encryption/src/test/resources/configs/collection1/schema.xml b/encryption/src/test/resources/configs/collection1/schema.xml
new file mode 100644
index 0000000..bcfb7a9
--- /dev/null
+++ b/encryption/src/test/resources/configs/collection1/schema.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<schema name="minimal" version="1.1">
+    <fieldType name="string" class="solr.StrField"/>
+    <fieldType name="int" class="${solr.tests.IntegerFieldType}" docValues="${solr.tests.numeric.dv}" precisionStep="0" omitNorms="true" positionIncrementGap="0"/>
+    <fieldType name="long" class="${solr.tests.LongFieldType}" docValues="${solr.tests.numeric.dv}" precisionStep="0" omitNorms="true" positionIncrementGap="0"/>
+    <fieldType name="text" class="solr.TextField">
+        <analyzer>
+            <tokenizer class="solr.StandardTokenizerFactory"/>
+            <filter class="solr.LowerCaseFilterFactory"/>
+        </analyzer>
+    </fieldType>
+    <field name="_version_" type="long" indexed="true" stored="true"/>
+    <field name="_root_" type="string" indexed="true" stored="true" multiValued="false" required="false"/>
+    <field name="id" type="string" indexed="true" stored="true"/>
+    <field name="text" type="text" indexed="true" stored="false"/>
+    <dynamicField name="*_s"  type="string"  indexed="true"  stored="true" />
+    <uniqueKey>id</uniqueKey>
+</schema>
diff --git a/encryption/src/test/resources/configs/collection1/solrconfig.xml b/encryption/src/test/resources/configs/collection1/solrconfig.xml
new file mode 100644
index 0000000..05ee468
--- /dev/null
+++ b/encryption/src/test/resources/configs/collection1/solrconfig.xml
@@ -0,0 +1,70 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- Minimal solrconfig.xml with /select, /admin and /update only -->
+
+<config>
+
+  <dataDir>${solr.data.dir:}</dataDir>
+
+  <directoryFactory name="DirectoryFactory"
+                    class="org.apache.solr.encryption.EncryptionDirectoryFactory">
+    <str name="keyManagerSupplier">org.apache.solr.encryption.TestingKeyManager$Supplier</str>
+    <str name="encrypterFactory">org.apache.solr.encryption.crypto.LightAesCtrEncrypter$Factory</str>
+  </directoryFactory>
+
+  <schemaFactory class="ClassicIndexSchemaFactory"/>
+
+  <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
+
+  <!-- EncryptionUpdateHandler transfers the encryption key ids from a commit to the next. -->
+  <updateHandler class="org.apache.solr.encryption.EncryptionUpdateHandler">
+    <commitWithin>
+      <softCommit>${solr.commitwithin.softcommit:true}</softCommit>
+    </commitWithin>
+    <updateLog class="${solr.ulog:solr.UpdateLog}"></updateLog>
+  </updateHandler>
+
+  <requestHandler name="/select" class="solr.SearchHandler">
+    <lst name="defaults">
+      <str name="echoParams">explicit</str>
+      <str name="indent">true</str>
+      <str name="df">text</str>
+    </lst>
+  </requestHandler>
+
+  <!-- Encryption handler -->
+  <requestHandler name="/admin/encrypt" class="org.apache.solr.encryption.EncryptionRequestHandler"/>
+
+  <indexConfig>
+    <mergeScheduler class="${solr.mscheduler:org.apache.lucene.index.ConcurrentMergeScheduler}"/>
+
+    <!-- Chain of MergePolicy factories:
+         - EncryptionMergePolicy detects when a force-merge is triggered with a special max
+           number of segments equal to Integer.MAX_VALUE, in this case it merges (rewrites) individually
+           each segment which is not encrypted with the latest active key id.
+         - TieredMergePolicy is the standard merge policy.
+    -->
+    <mergePolicyFactory class="org.apache.solr.encryption.EncryptionMergePolicyFactory">
+      <str name="wrapped.prefix">delegate</str>
+      <str name="delegate.class">org.apache.solr.index.TieredMergePolicyFactory</str>
+    </mergePolicyFactory>
+
+  </indexConfig>
+</config>
diff --git a/encryption/src/test/resources/log4j2.xml b/encryption/src/test/resources/log4j2.xml
new file mode 100644
index 0000000..4590aae
--- /dev/null
+++ b/encryption/src/test/resources/log4j2.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<!-- We're configuring testing to be synchronous due to "logging polution", see SOLR-13268 -->
+<Configuration>
+  <Appenders>
+    <Console name="STDERR" target="SYSTEM_ERR">
+      <PatternLayout>
+        <Pattern>
+          %maxLen{%-4r %-5p (%t) [%X{node_name} %X{collection} %X{shard} %X{replica} %X{core} %X{trace_id}] %c{1.} %m%notEmpty{
+          =>%ex{short}}}{10240}%n
+        </Pattern>
+      </PatternLayout>
+    </Console>
+  </Appenders>
+  <Loggers>
+    <!-- Use <AsyncLogger/<AsyncRoot and <Logger/<Root for asynchronous logging or synchonous logging respectively -->
+
+    <!--
+    <Logger name="org.apache.solr.encryption" level="DEBUG"/>
+    -->
+
+    <Root level="ERROR">
+      <AppenderRef ref="STDERR"/>
+    </Root>
+  </Loggers>
+</Configuration>
diff --git a/gradlew b/gradlew
old mode 100644
new mode 100755
diff --git a/settings.gradle b/settings.gradle
index 924bbaf..f0d9385 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -12,3 +12,4 @@
 include 'crossdc-consumer'
 include 'crossdc-producer'
 include 'crossdc-commons'
+include 'encryption'