Merge pull request #4717 from apache/nouveau-purge

Add clustered purge support in nouveau
diff --git a/Makefile b/Makefile
index 7123eae..67971c4 100644
--- a/Makefile
+++ b/Makefile
@@ -547,6 +547,7 @@
 # Build nouveau
 nouveau:
 ifeq ($(with_nouveau), 1)
+	@cd nouveau && ./gradlew spotlessApply
 	@cd nouveau && ./gradlew build -x test
 endif
 
diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplication.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplication.java
index 6eaf941..3039214 100644
--- a/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplication.java
+++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplication.java
@@ -18,7 +18,6 @@
 import io.swagger.v3.jaxrs2.integration.resources.OpenApiResource;
 import java.util.concurrent.ForkJoinPool;
 import org.apache.couchdb.nouveau.core.IndexManager;
-import org.apache.couchdb.nouveau.core.UpdatesOutOfOrderExceptionMapper;
 import org.apache.couchdb.nouveau.health.AnalyzeHealthCheck;
 import org.apache.couchdb.nouveau.health.IndexHealthCheck;
 import org.apache.couchdb.nouveau.lucene9.Lucene9Module;
@@ -41,8 +40,6 @@
 
     @Override
     public void run(NouveauApplicationConfiguration configuration, Environment environment) throws Exception {
-        environment.jersey().register(new UpdatesOutOfOrderExceptionMapper());
-
         // configure index manager
         final IndexManager indexManager = new IndexManager();
         indexManager.setCommitIntervalSeconds(configuration.getCommitIntervalSeconds());
diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java
index 6f22eff..99f6f5d 100644
--- a/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java
+++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java
@@ -24,15 +24,18 @@
     @Positive
     private long seq;
 
+    private boolean purge;
+
     public DocumentDeleteRequest() {
         // Jackson deserialization
     }
 
-    public DocumentDeleteRequest(long seq) {
+    public DocumentDeleteRequest(long seq, final boolean purge) {
         if (seq < 1) {
             throw new IllegalArgumentException("seq must be 1 or greater");
         }
         this.seq = seq;
+        this.purge = purge;
     }
 
     @JsonProperty
@@ -40,8 +43,13 @@
         return seq;
     }
 
+    @JsonProperty
+    public boolean isPurge() {
+        return purge;
+    }
+
     @Override
     public String toString() {
-        return "DocumentDeleteRequest [seq=" + seq + "]";
+        return "DocumentDeleteRequest [seq=" + seq + ", purge=" + purge + "]";
     }
 }
diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/api/IndexInfo.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/IndexInfo.java
index e7380a5..9958c77 100644
--- a/nouveau/src/main/java/org/apache/couchdb/nouveau/api/IndexInfo.java
+++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/IndexInfo.java
@@ -19,42 +19,50 @@
 import jakarta.validation.constraints.PositiveOrZero;
 
 @JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
-public class IndexInfo {
+public final class IndexInfo {
 
     @PositiveOrZero
-    private long updateSeq;
+    private final long updateSeq;
 
     @PositiveOrZero
-    private int numDocs;
+    private final long purgeSeq;
 
     @PositiveOrZero
-    private long diskSize;
+    private final int numDocs;
 
-    public IndexInfo() {}
+    @PositiveOrZero
+    private final long diskSize;
 
-    public IndexInfo(final long updateSeq, final int numDocs, final long diskSize) {
+    public IndexInfo(
+            @JsonProperty("update_seq") final long updateSeq,
+            @JsonProperty("purge_seq") final long purgeSeq,
+            @JsonProperty("num_docs") final int numDocs,
+            @JsonProperty("disk_size") final long diskSize) {
         this.updateSeq = updateSeq;
+        this.purgeSeq = purgeSeq;
         this.numDocs = numDocs;
         this.diskSize = diskSize;
     }
 
-    @JsonProperty
     public int getNumDocs() {
         return numDocs;
     }
 
-    @JsonProperty
     public long getDiskSize() {
         return diskSize;
     }
 
-    @JsonProperty
     public long getUpdateSeq() {
         return updateSeq;
     }
 
+    public long getPurgeSeq() {
+        return purgeSeq;
+    }
+
     @Override
     public String toString() {
-        return "IndexInfo [updateSeq=" + updateSeq + ", numDocs=" + numDocs + ", diskSize=" + diskSize + "]";
+        return "IndexInfo [updateSeq=" + updateSeq + ", purgeSeq=" + purgeSeq + ", numDocs=" + numDocs + ", diskSize="
+                + diskSize + "]";
     }
 }
diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/api/IndexInfoRequest.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/IndexInfoRequest.java
new file mode 100644
index 0000000..67e8060
--- /dev/null
+++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/IndexInfoRequest.java
@@ -0,0 +1,45 @@
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.apache.couchdb.nouveau.api;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import jakarta.validation.constraints.Positive;
+import java.util.OptionalLong;
+
+public final class IndexInfoRequest {
+
+    private OptionalLong updateSeq;
+
+    private OptionalLong purgeSeq;
+
+    public IndexInfoRequest(
+            @JsonProperty("update_seq") @Positive OptionalLong updateSeq,
+            @JsonProperty("purge_seq") @Positive OptionalLong purgeSeq) {
+        this.updateSeq = updateSeq;
+        this.purgeSeq = purgeSeq;
+    }
+
+    public OptionalLong getUpdateSeq() {
+        return updateSeq;
+    }
+
+    public OptionalLong getPurgeSeq() {
+        return purgeSeq;
+    }
+
+    @Override
+    public String toString() {
+        return "IndexInfoRequest [updateSeq=" + updateSeq + ", purgeSeq=" + purgeSeq + "]";
+    }
+}
diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java
index 5b31938..ca0cd69 100644
--- a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java
+++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java
@@ -13,6 +13,8 @@
 
 package org.apache.couchdb.nouveau.core;
 
+import jakarta.ws.rs.WebApplicationException;
+import jakarta.ws.rs.core.Response.Status;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.concurrent.Semaphore;
@@ -36,13 +38,15 @@
 public abstract class Index implements Closeable {
 
     private long updateSeq;
+    private long purgeSeq;
     private boolean deleteOnClose = false;
     private long lastCommit = now();
     private volatile boolean closed;
     private final Semaphore permits = new Semaphore(Integer.MAX_VALUE);
 
-    protected Index(final long updateSeq) {
+    protected Index(final long updateSeq, final long purgeSeq) {
         this.updateSeq = updateSeq;
+        this.purgeSeq = purgeSeq;
     }
 
     public final boolean tryAcquire() {
@@ -74,7 +78,7 @@
     public final IndexInfo info() throws IOException {
         final int numDocs = doNumDocs();
         final long diskSize = doDiskSize();
-        return new IndexInfo(updateSeq, numDocs, diskSize);
+        return new IndexInfo(updateSeq, purgeSeq, numDocs, diskSize);
     }
 
     protected abstract int doNumDocs() throws IOException;
@@ -90,9 +94,15 @@
     protected abstract void doUpdate(final String docId, final DocumentUpdateRequest request) throws IOException;
 
     public final synchronized void delete(final String docId, final DocumentDeleteRequest request) throws IOException {
-        assertUpdateSeqIsLower(request.getSeq());
-        doDelete(docId, request);
-        incrementUpdateSeq(request.getSeq());
+        if (request.isPurge()) {
+            assertPurgeSeqIsLower(request.getSeq());
+            doDelete(docId, request);
+            incrementPurgeSeq(request.getSeq());
+        } else {
+            assertUpdateSeqIsLower(request.getSeq());
+            doDelete(docId, request);
+            incrementUpdateSeq(request.getSeq());
+        }
     }
 
     protected abstract void doDelete(final String docId, final DocumentDeleteRequest request) throws IOException;
@@ -105,10 +115,12 @@
 
     public final boolean commit() throws IOException {
         final long updateSeq;
+        final long purgeSeq;
         synchronized (this) {
             updateSeq = this.updateSeq;
+            purgeSeq = this.purgeSeq;
         }
-        final boolean result = doCommit(updateSeq);
+        final boolean result = doCommit(updateSeq, purgeSeq);
         if (result) {
             final long now = now();
             synchronized (this) {
@@ -118,7 +130,27 @@
         return result;
     }
 
-    protected abstract boolean doCommit(final long updateSeq) throws IOException;
+    protected abstract boolean doCommit(final long updateSeq, final long purgeSeq) throws IOException;
+
+    public final synchronized void setUpdateSeq(final long updateSeq) throws IOException {
+        if (updateSeq < this.updateSeq) {
+            throw new WebApplicationException(
+                    "update_seq must be equal or greater than current update_seq", Status.BAD_REQUEST);
+        }
+        if (updateSeq > this.updateSeq) {
+            incrementUpdateSeq(updateSeq);
+        }
+    }
+
+    public final synchronized void setPurgeSeq(final long purgeSeq) throws IOException {
+        if (purgeSeq < this.purgeSeq) {
+            throw new WebApplicationException(
+                    "purge_seq must be equal or greater than current purge_seq", Status.BAD_REQUEST);
+        }
+        if (purgeSeq > this.purgeSeq) {
+            incrementPurgeSeq(purgeSeq);
+        }
+    }
 
     @Override
     public final void close() throws IOException {
@@ -159,6 +191,19 @@
         this.updateSeq = updateSeq;
     }
 
+    protected final void assertPurgeSeqIsLower(final long purgeSeq) throws UpdatesOutOfOrderException {
+        assert Thread.holdsLock(this);
+        if (!(purgeSeq > this.purgeSeq)) {
+            throw new UpdatesOutOfOrderException(this.purgeSeq, purgeSeq);
+        }
+    }
+
+    protected final void incrementPurgeSeq(final long purgeSeq) throws IOException {
+        assert Thread.holdsLock(this);
+        assertPurgeSeqIsLower(purgeSeq);
+        this.purgeSeq = purgeSeq;
+    }
+
     public boolean needsCommit(final long duration, final TimeUnit unit) {
         final long commitNeededSince = now() - unit.toNanos(duration);
         synchronized (this) {
diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/UpdatesOutOfOrderException.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/UpdatesOutOfOrderException.java
index 14d77c3..acda8be 100644
--- a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/UpdatesOutOfOrderException.java
+++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/UpdatesOutOfOrderException.java
@@ -13,12 +13,16 @@
 
 package org.apache.couchdb.nouveau.core;
 
-import java.io.IOException;
+import jakarta.ws.rs.WebApplicationException;
+import jakarta.ws.rs.core.Response.Status;
 
-public final class UpdatesOutOfOrderException extends IOException {
+public final class UpdatesOutOfOrderException extends WebApplicationException {
 
     public UpdatesOutOfOrderException(final long currentSeq, final long attemptedSeq) {
-        super(String.format(
-                "Updates applied in the wrong order (current seq: %d, attempted seq: %d)", currentSeq, attemptedSeq));
+        super(
+                String.format(
+                        "Updates applied in the wrong order (current seq: %d, attempted seq: %d)",
+                        currentSeq, attemptedSeq),
+                Status.BAD_REQUEST);
     }
 }
diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/UpdatesOutOfOrderExceptionMapper.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/UpdatesOutOfOrderExceptionMapper.java
deleted file mode 100644
index ac35680..0000000
--- a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/UpdatesOutOfOrderExceptionMapper.java
+++ /dev/null
@@ -1,31 +0,0 @@
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package org.apache.couchdb.nouveau.core;
-
-import io.dropwizard.jersey.errors.ErrorMessage;
-import jakarta.ws.rs.core.MediaType;
-import jakarta.ws.rs.core.Response;
-import jakarta.ws.rs.core.Response.Status;
-import jakarta.ws.rs.ext.ExceptionMapper;
-
-public class UpdatesOutOfOrderExceptionMapper implements ExceptionMapper<UpdatesOutOfOrderException> {
-
-    @Override
-    public Response toResponse(final UpdatesOutOfOrderException exception) {
-        return Response.status(Status.BAD_REQUEST)
-                .type(MediaType.APPLICATION_JSON_TYPE)
-                .entity(new ErrorMessage(Status.BAD_REQUEST.getStatusCode(), exception.getMessage()))
-                .build();
-    }
-}
diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/lucene9/Lucene9Index.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/lucene9/Lucene9Index.java
index 69e6b12..ce7f7d7 100644
--- a/nouveau/src/main/java/org/apache/couchdb/nouveau/lucene9/Lucene9Index.java
+++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/lucene9/Lucene9Index.java
@@ -25,7 +25,6 @@
 import java.nio.file.NoSuchFileException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -104,8 +103,9 @@
             final Analyzer analyzer,
             final IndexWriter writer,
             final long updateSeq,
+            final long purgeSeq,
             final SearcherManager searcherManager) {
-        super(updateSeq);
+        super(updateSeq, purgeSeq);
         this.analyzer = Objects.requireNonNull(analyzer);
         this.writer = Objects.requireNonNull(writer);
         this.searcherManager = Objects.requireNonNull(searcherManager);
@@ -144,12 +144,12 @@
     }
 
     @Override
-    public boolean doCommit(final long updateSeq) throws IOException {
+    public boolean doCommit(final long updateSeq, final long purgeSeq) throws IOException {
         if (!writer.hasUncommittedChanges()) {
             return false;
         }
-        writer.setLiveCommitData(
-                Collections.singletonMap("update_seq", Long.toString(updateSeq)).entrySet());
+        writer.setLiveCommitData(Map.of("update_seq", Long.toString(updateSeq), "purge_seq", Long.toString(purgeSeq))
+                .entrySet());
         writer.commit();
         return true;
     }
diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java
index 0776df5..ce99830 100644
--- a/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java
+++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java
@@ -35,6 +35,7 @@
 import org.apache.couchdb.nouveau.api.DocumentUpdateRequest;
 import org.apache.couchdb.nouveau.api.IndexDefinition;
 import org.apache.couchdb.nouveau.api.IndexInfo;
+import org.apache.couchdb.nouveau.api.IndexInfoRequest;
 import org.apache.couchdb.nouveau.api.SearchRequest;
 import org.apache.couchdb.nouveau.api.SearchResults;
 import org.apache.couchdb.nouveau.core.IndexLoader;
@@ -91,13 +92,27 @@
     }
 
     @GET
-    public IndexInfo indexInfo(@PathParam("name") String name) throws Exception {
+    public IndexInfo getIndexInfo(@PathParam("name") String name) throws Exception {
         return indexManager.with(name, indexLoader(), (index) -> {
             return index.info();
         });
     }
 
     @POST
+    public void setIndexInfo(@PathParam("name") String name, @NotNull @Valid IndexInfoRequest request)
+            throws Exception {
+        indexManager.with(name, indexLoader(), (index) -> {
+            if (request.getUpdateSeq().isPresent()) {
+                index.setUpdateSeq(request.getUpdateSeq().getAsLong());
+            }
+            if (request.getPurgeSeq().isPresent()) {
+                index.setPurgeSeq(request.getPurgeSeq().getAsLong());
+            }
+            return null;
+        });
+    }
+
+    @POST
     @Path("/search")
     public SearchResults searchIndex(@PathParam("name") String name, @NotNull @Valid SearchRequest request)
             throws Exception {
@@ -126,19 +141,20 @@
             final IndexWriterConfig config = new IndexWriterConfig(analyzer);
             config.setUseCompoundFile(false);
             final IndexWriter writer = new IndexWriter(dir, config);
-            final long updateSeq = getUpdateSeq(writer);
+            final long updateSeq = getSeq(writer, "update_seq");
+            final long purgeSeq = getSeq(writer, "purge_seq");
             final SearcherManager searcherManager = new SearcherManager(writer, searcherFactory);
-            return new Lucene9Index(analyzer, writer, updateSeq, searcherManager);
+            return new Lucene9Index(analyzer, writer, updateSeq, purgeSeq, searcherManager);
         };
     }
 
-    private static long getUpdateSeq(final IndexWriter writer) throws IOException {
+    private static long getSeq(final IndexWriter writer, final String key) throws IOException {
         final Iterable<Map.Entry<String, String>> commitData = writer.getLiveCommitData();
         if (commitData == null) {
             return 0L;
         }
         for (Map.Entry<String, String> entry : commitData) {
-            if (entry.getKey().equals("update_seq")) {
+            if (entry.getKey().equals(key)) {
                 return Long.parseLong(entry.getValue());
             }
         }
diff --git a/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene9/Lucene9IndexTest.java b/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene9/Lucene9IndexTest.java
index 79151a2..5d98980 100644
--- a/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene9/Lucene9IndexTest.java
+++ b/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene9/Lucene9IndexTest.java
@@ -199,11 +199,34 @@
             IndexInfo info = index.info();
             assertThat(info.getNumDocs()).isEqualTo(1);
 
-            index.delete("foo", new DocumentDeleteRequest(3));
+            index.delete("foo", new DocumentDeleteRequest(3, false));
             index.commit();
 
             info = index.info();
             assertThat(info.getNumDocs()).isEqualTo(0);
+            assertThat(info.getUpdateSeq()).isEqualTo(3);
+        } finally {
+            cleanup(index);
+        }
+    }
+
+    @Test
+    public void testPurge(@TempDir Path path) throws IOException {
+        Index index = setup(path);
+        try {
+            final Collection<Field> fields = List.of(new DoubleField("bar", 12.0, false));
+            index.update("foo", new DocumentUpdateRequest(2, null, fields));
+            index.commit();
+
+            IndexInfo info = index.info();
+            assertThat(info.getNumDocs()).isEqualTo(1);
+
+            index.delete("foo", new DocumentDeleteRequest(3, true));
+            index.commit();
+
+            info = index.info();
+            assertThat(info.getNumDocs()).isEqualTo(0);
+            assertThat(info.getPurgeSeq()).isEqualTo(3);
         } finally {
             cleanup(index);
         }
@@ -217,7 +240,7 @@
             config.setUseCompoundFile(false);
             final IndexWriter writer = new IndexWriter(dir, config);
             final SearcherManager searcherManager = new SearcherManager(writer, null);
-            return new Lucene9Index(analyzer, writer, 0L, searcherManager);
+            return new Lucene9Index(analyzer, writer, 0L, 0L, searcherManager);
         };
     }
 }
diff --git a/src/nouveau/src/nouveau_api.erl b/src/nouveau/src/nouveau_api.erl
index 7744eb1..39cc2b3 100644
--- a/src/nouveau/src/nouveau_api.erl
+++ b/src/nouveau/src/nouveau_api.erl
@@ -24,8 +24,11 @@
     delete_path/1,
     delete_path/2,
     delete_doc/3,
+    purge_doc/3,
     update_doc/5,
-    search/2
+    search/2,
+    set_purge_seq/2,
+    set_update_seq/2
 ]).
 
 -define(JSON_CONTENT_TYPE, {"Content-Type", "application/json"}).
@@ -97,7 +100,17 @@
 delete_doc(#index{} = Index, DocId, UpdateSeq) when
     is_binary(DocId), is_integer(UpdateSeq)
 ->
-    ReqBody = {[{<<"seq">>, UpdateSeq}]},
+    delete_doc(Index, DocId, UpdateSeq, false).
+
+purge_doc(#index{} = Index, DocId, PurgeSeq) when
+    is_binary(DocId), is_integer(PurgeSeq)
+->
+    delete_doc(Index, DocId, PurgeSeq, true).
+
+delete_doc(#index{} = Index, DocId, Seq, IsPurge) when
+    is_binary(DocId), is_integer(Seq), is_boolean(IsPurge)
+->
+    ReqBody = #{seq => Seq, purge => IsPurge},
     Resp = send_if_enabled(
         doc_url(Index, DocId), [?JSON_CONTENT_TYPE], delete, jiffy:encode(ReqBody)
     ),
@@ -146,6 +159,27 @@
             send_error(Reason)
     end.
 
+set_update_seq(#index{} = Index, UpdateSeq) ->
+    set_seq(Index, update_seq, UpdateSeq).
+set_purge_seq(#index{} = Index, PurgeSeq) ->
+    set_seq(Index, purge_seq, PurgeSeq).
+
+set_seq(#index{} = Index, Key, Value) when is_atom(Key), is_integer(Value) ->
+    ReqBody = #{
+        Key => Value
+    },
+    Resp = send_if_enabled(
+        index_url(Index), [?JSON_CONTENT_TYPE], post, jiffy:encode(ReqBody)
+    ),
+    case Resp of
+        {ok, "204", _, _} ->
+            ok;
+        {ok, StatusCode, _, RespBody} ->
+            {error, jaxrs_error(StatusCode, RespBody)};
+        {error, Reason} ->
+            send_error(Reason)
+    end.
+
 %% private functions
 
 index_path(Path) ->
diff --git a/src/nouveau/src/nouveau_epi.erl b/src/nouveau/src/nouveau_epi.erl
index f42e179..9f34ae5 100644
--- a/src/nouveau/src/nouveau_epi.erl
+++ b/src/nouveau/src/nouveau_epi.erl
@@ -31,7 +31,10 @@
     nouveau.
 
 providers() ->
-    [{chttpd_handlers, nouveau_httpd_handlers}].
+    [
+        {couch_db, nouveau_plugin_couch_db},
+        {chttpd_handlers, nouveau_httpd_handlers}
+    ].
 
 services() ->
     [].
diff --git a/src/nouveau/src/nouveau_index_updater.erl b/src/nouveau/src/nouveau_index_updater.erl
index a78b1ff..33134bc 100644
--- a/src/nouveau/src/nouveau_index_updater.erl
+++ b/src/nouveau/src/nouveau_index_updater.erl
@@ -28,9 +28,10 @@
 
 outdated(#index{} = Index) ->
     case open_or_create_index(Index) of
-        {ok, IndexSeq} ->
-            DbSeq = get_db_seq(Index),
-            DbSeq > IndexSeq;
+        {ok, #{} = Info} ->
+            #{<<"update_seq">> := IndexUpdateSeq, <<"purge_seq">> := IndexPurgeSeq} = Info,
+            {DbUpdateSeq, DbPurgeSeq} = get_db_info(Index),
+            DbUpdateSeq > IndexUpdateSeq orelse DbPurgeSeq > IndexPurgeSeq;
         {error, Reason} ->
             {error, Reason}
     end.
@@ -38,11 +39,14 @@
 update(#index{} = Index) ->
     {ok, Db} = couch_db:open_int(Index#index.dbname, []),
     try
-        case open_or_create_index(Index) of
+        case open_or_create_index(Db, Index) of
             {error, Reason} ->
                 exit({error, Reason});
-            {ok, CurSeq} ->
-                TotalChanges = couch_db:count_changes_since(Db, CurSeq),
+            {ok, #{} = Info} ->
+                #{<<"update_seq">> := IndexUpdateSeq, <<"purge_seq">> := IndexPurgeSeq} = Info,
+                ChangesSince = couch_db:count_changes_since(Db, IndexUpdateSeq),
+                PurgesSince = couch_db:get_purge_seq(Db) - IndexPurgeSeq,
+                TotalChanges = ChangesSince + PurgesSince,
                 couch_task_status:add_task([
                     {type, search_indexer},
                     {database, Index#index.dbname},
@@ -56,11 +60,15 @@
                 %% update status every half second
                 couch_task_status:set_update_frequency(500),
 
+                {ok, ExcludeIdRevs} = purge_index(Db, Index, IndexPurgeSeq),
+
+                NewCurSeq = couch_db:get_update_seq(Db),
                 Proc = get_os_process(Index#index.def_lang),
                 try
                     true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def, <<"nouveau">>]),
-                    Acc0 = {Db, Index, Proc, 0, TotalChanges},
-                    {ok, _} = couch_db:fold_changes(Db, CurSeq, fun load_docs/2, Acc0, [])
+                    Acc0 = {Db, Index, Proc, 0, TotalChanges, ExcludeIdRevs},
+                    {ok, _} = couch_db:fold_changes(Db, IndexUpdateSeq, fun load_docs/2, Acc0, []),
+                    ok = nouveau_api:set_update_seq(Index, NewCurSeq)
                 after
                     ret_os_process(Proc)
                 end
@@ -71,14 +79,20 @@
 
 load_docs(#full_doc_info{id = <<"_design/", _/binary>>}, Acc) ->
     {ok, Acc};
-load_docs(FDI, {Db, Index, Proc, ChangesDone, TotalChanges}) ->
+load_docs(FDI, {Db, Index, Proc, ChangesDone, TotalChanges, ExcludeIdRevs}) ->
     couch_task_status:update([
         {changes_done, ChangesDone}, {progress, (ChangesDone * 100) div TotalChanges}
     ]),
-
     DI = couch_doc:to_doc_info(FDI),
-    #doc_info{id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]} = DI,
+    #doc_info{id = Id, revs = [#rev_info{rev = Rev} | _]} = DI,
+    case lists:member({Id, Rev}, ExcludeIdRevs) of
+        true -> ok;
+        false -> update_or_delete_index(Db, Index, DI, Proc)
+    end,
+    {ok, {Db, Index, Proc, ChangesDone + 1, TotalChanges, ExcludeIdRevs}}.
 
+update_or_delete_index(Db, #index{} = Index, #doc_info{} = DI, Proc) ->
+    #doc_info{id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]} = DI,
     case Del of
         true ->
             ok = nouveau_api:delete_doc(Index, Id, Seq);
@@ -104,17 +118,16 @@
                             exit({error, Reason})
                     end
             end
-    end,
-    {ok, {Db, Index, Proc, ChangesDone + 1, TotalChanges}}.
+    end.
 
 open_or_create_index(#index{} = Index) ->
-    case get_index_seq(Index) of
-        {ok, UpdateSeq} ->
-            {ok, UpdateSeq};
+    case nouveau_api:index_info(Index) of
+        {ok, #{} = Info} ->
+            {ok, Info};
         {error, {not_found, _}} ->
             case nouveau_api:create_index(Index, index_definition(Index)) of
                 ok ->
-                    {ok, 0};
+                    nouveau_api:index_info(Index);
                 {error, Reason} ->
                     {error, Reason}
             end;
@@ -122,20 +135,23 @@
             {error, Reason}
     end.
 
-get_db_seq(#index{} = Index) ->
-    {ok, Db} = couch_db:open_int(Index#index.dbname, []),
-    try
-        couch_db:get_update_seq(Db)
-    after
-        couch_db:close(Db)
+open_or_create_index(Db, #index{} = Index) ->
+    case open_or_create_index(Index) of
+        {ok, #{} = Info} ->
+            nouveau_util:maybe_create_local_purge_doc(Db, Index),
+            {ok, Info};
+        Else ->
+            Else
     end.
 
-get_index_seq(#index{} = Index) ->
-    case nouveau_api:index_info(Index) of
-        {ok, #{<<"update_seq">> := Seq}} ->
-            {ok, Seq};
-        {error, Reason} ->
-            {error, Reason}
+get_db_info(#index{} = Index) ->
+    {ok, Db} = couch_db:open_int(Index#index.dbname, []),
+    try
+        UpdateSeq = couch_db:get_update_seq(Db),
+        PurgeSeq = couch_db:get_purge_seq(Db),
+        {UpdateSeq, PurgeSeq}
+    after
+        couch_db:close(Db)
     end.
 
 index_definition(#index{} = Index) ->
@@ -143,3 +159,55 @@
         <<"default_analyzer">> => Index#index.default_analyzer,
         <<"field_analyzers">> => Index#index.field_analyzers
     }.
+
+purge_index(Db, Index, IndexPurgeSeq) ->
+    Proc = get_os_process(Index#index.def_lang),
+    try
+        true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def, <<"nouveau">>]),
+        FoldFun = fun({PurgeSeq, _UUID, Id, _Revs}, {Acc, _}) ->
+            Acc0 =
+                case couch_db:get_full_doc_info(Db, Id) of
+                    not_found ->
+                        ok = nouveau_api:purge_doc(Index, Id, PurgeSeq),
+                        Acc;
+                    FDI ->
+                        DI = couch_doc:to_doc_info(FDI),
+                        #doc_info{id = Id, revs = [#rev_info{rev = Rev} | _]} = DI,
+                        case lists:member({Id, Rev}, Acc) of
+                            true ->
+                                Acc;
+                            false ->
+                                update_or_delete_index(Db, Index, DI, Proc),
+                                [{Id, Rev} | Acc]
+                        end
+                end,
+            update_task(1),
+            {ok, {Acc0, PurgeSeq}}
+        end,
+
+        {ok, {ExcludeList, NewPurgeSeq}} = couch_db:fold_purge_infos(
+            Db, IndexPurgeSeq, FoldFun, {[], 0}, []
+        ),
+        nouveau_api:set_purge_seq(Index, NewPurgeSeq),
+        update_local_doc(Db, Index, NewPurgeSeq),
+        {ok, ExcludeList}
+    after
+        ret_os_process(Proc)
+    end.
+
+update_task(NumChanges) ->
+    [Changes, Total] = couch_task_status:get([changes_done, total_changes]),
+    Changes2 = Changes + NumChanges,
+    Progress =
+        case Total of
+            0 ->
+                0;
+            _ ->
+                (Changes2 * 100) div Total
+        end,
+    couch_task_status:update([{progress, Progress}, {changes_done, Changes2}]).
+
+update_local_doc(Db, #index{} = Index, PurgeSeq) ->
+    DocId = nouveau_util:get_local_purge_doc_id(Index#index.sig),
+    DocContent = nouveau_util:get_local_purge_doc_body(DocId, PurgeSeq, Index),
+    couch_db:update_doc(Db, DocContent, []).
diff --git a/src/nouveau/src/nouveau_plugin_couch_db.erl b/src/nouveau/src/nouveau_plugin_couch_db.erl
new file mode 100644
index 0000000..dcd3ae1
--- /dev/null
+++ b/src/nouveau/src/nouveau_plugin_couch_db.erl
@@ -0,0 +1,24 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(nouveau_plugin_couch_db).
+
+-export([
+    is_valid_purge_client/2,
+    on_compact/2
+]).
+
+is_valid_purge_client(DbName, Props) ->
+    nouveau_util:verify_index_exists(DbName, Props).
+
+on_compact(DbName, DDocs) ->
+    nouveau_util:ensure_local_purge_docs(DbName, DDocs).
diff --git a/src/nouveau/src/nouveau_util.erl b/src/nouveau/src/nouveau_util.erl
index b7da7e8..86fc3a4 100644
--- a/src/nouveau/src/nouveau_util.erl
+++ b/src/nouveau/src/nouveau_util.erl
@@ -22,6 +22,11 @@
     index_name/1,
     design_doc_to_indexes/2,
     design_doc_to_index/3,
+    verify_index_exists/2,
+    ensure_local_purge_docs/2,
+    maybe_create_local_purge_doc/2,
+    get_local_purge_doc_id/1,
+    get_local_purge_doc_body/3,
     nouveau_url/0
 ]).
 
@@ -93,5 +98,101 @@
             {error, InvalidDDocError}
     end.
 
+%% True iff Props (from a purge checkpoint _local doc) still describes an
+%% existing nouveau index: same ddoc, same index name, same signature.
+verify_index_exists(DbName, Props) ->
+    try
+        case couch_util:get_value(<<"type">>, Props) of
+            <<"nouveau">> ->
+                DDocId = couch_util:get_value(<<"ddoc_id">>, Props),
+                IndexName = couch_util:get_value(<<"indexname">>, Props),
+                Sig = couch_util:get_value(<<"signature">>, Props),
+                couch_util:with_db(DbName, fun(Db) ->
+                    case couch_db:get_design_doc(Db, DDocId) of
+                        {ok, #doc{} = DDoc} ->
+                            {ok, Index} = design_doc_to_index(
+                                DbName, DDoc, IndexName
+                            ),
+                            Index#index.sig == Sig;
+                        {not_found, _} ->
+                            false
+                    end
+                end);
+            _ ->
+                false
+        end
+    catch
+        %% Best-effort check: any failure (missing db, bad ddoc, ...)
+        %% is treated as "index does not exist".
+        _:_ ->
+            false
+    end.
+
+%% For every design doc in DDocs that declares nouveau indexes, make sure
+%% each index has a _local purge checkpoint document in DbName.
+ensure_local_purge_docs(DbName, DDocs) ->
+    couch_util:with_db(DbName, fun(Db) ->
+        Ensure = fun(#doc{body = {Props}} = DDoc) ->
+            case couch_util:get_value(<<"indexes">>, Props) of
+                undefined ->
+                    false;
+                _ ->
+                    %% A malformed ddoc must not abort the whole sweep.
+                    try design_doc_to_indexes(DbName, DDoc) of
+                        Indexes -> ensure_local_purge_doc(Db, Indexes)
+                    catch
+                        _:_ ->
+                            ok
+                    end
+            end
+        end,
+        lists:foreach(Ensure, DDocs)
+    end).
+
+%% Create a purge checkpoint doc for each index; no-op for an empty list.
+ensure_local_purge_doc(_Db, []) ->
+    ok;
+ensure_local_purge_doc(Db, SIndexes) ->
+    lists:map(
+        fun(SIndex) -> maybe_create_local_purge_doc(Db, SIndex) end,
+        SIndexes
+    ).
+
+%% Create the _local purge checkpoint doc for Index unless it already
+%% exists; a new checkpoint starts at the db's current purge sequence.
+maybe_create_local_purge_doc(Db, Index) ->
+    LocalDocId = get_local_purge_doc_id(Index#index.sig),
+    case couch_db:open_doc(Db, LocalDocId) of
+        {ok, _} ->
+            ok;
+        {not_found, _} ->
+            PurgeSeq = couch_db:get_purge_seq(Db),
+            Doc = get_local_purge_doc_body(LocalDocId, PurgeSeq, Index),
+            couch_db:update_doc(Db, Doc, [])
+    end.
+
+%% _local doc id for the purge checkpoint of the index with signature Sig.
+get_local_purge_doc_id(Sig) ->
+    iolist_to_binary([?LOCAL_DOC_PREFIX, <<"purge-nouveau-">>, Sig]).
+
+%% Build the body of the _local purge checkpoint document for Index at
+%% PurgeSeq; the props mirror what verify_index_exists/2 reads back.
+get_local_purge_doc_body(LocalDocId, PurgeSeq, Index) ->
+    #index{name = IdxName, ddoc_id = DDocId, sig = Sig} = Index,
+    Body =
+        {[
+            {<<"_id">>, LocalDocId},
+            {<<"purge_seq">>, PurgeSeq},
+            {<<"updated_on">>, os:system_time(second)},
+            {<<"indexname">>, IdxName},
+            {<<"ddoc_id">>, DDocId},
+            {<<"signature">>, Sig},
+            {<<"type">>, <<"nouveau">>}
+        ]},
+    couch_doc:from_json_obj(Body).
+
 nouveau_url() ->
     config:get("nouveau", "url", "http://127.0.0.1:8080").
diff --git a/test/elixir/test/config/nouveau.elixir b/test/elixir/test/config/nouveau.elixir
index 733aa9d..53f28a5 100644
--- a/test/elixir/test/config/nouveau.elixir
+++ b/test/elixir/test/config/nouveau.elixir
@@ -21,6 +21,9 @@
     "mango sort by string",
     "search GET (partitioned)",
     "search POST (partitioned)",
-    "mango (partitioned)"
+    "mango (partitioned)",
+    "delete",
+    "purge",
+    "purge with conflicts"
   ]
 }
diff --git a/test/elixir/test/nouveau_test.exs b/test/elixir/test/nouveau_test.exs
index 28cc273..8e3a1bd 100644
--- a/test/elixir/test/nouveau_test.exs
+++ b/test/elixir/test/nouveau_test.exs
@@ -18,6 +18,48 @@
       ]}
     )
     assert resp.status_code in [201]
+    resp
+  end
+
+  # Seed db_name with conflicting revision trees (posted with
+  # new_edits: false) exercised by the "purge with conflicts" test.
+  def create_conflicted_search_docs(db_name) do
+    resp = Couch.post("/#{db_name}/_bulk_docs",
+      headers: ["Content-Type": "application/json"],
+      body: %{:docs => [
+                # doc4: conflict between 1-a and 1-b; 1-b wins, 1-b will be purged
+                %{"_id" => "doc4", "foo" => "foo", "bar" => 42, "baz" => "hello there",
+                  "_revisions" => %{:start => 1, :ids => ["a"]}
+                },
+                %{"_id" => "doc4", "foo" => "fooX", "bar" => 43, "baz" => "hello thereX",
+                  "_revisions" => %{:start => 1, :ids => ["b"]}
+                },
+
+                # doc3: conflict between deleted 1-a and live 2-c; 2-c wins
+                %{"_id" => "doc3", "foo" => "bar", "bar" => 12.0, "baz" => "hello",
+                  "_revisions" => %{:start => 1, :ids => ["a"]},
+                  "_deleted" => true
+                },
+                %{"_id" => "doc3", "foo" => "barX", "bar" => 13.0, "baz" => "helloX",
+                  "_revisions" => %{:start => 2, :ids => ["c", "b"]}
+                },
+
+                # doc1: conflict between 1-a and 2-c; 2-c is deleted, so 1-a wins
+                %{"_id" => "doc1", "foo" => "baz", "bar" => 0, "baz" => "there",
+                  "_revisions" => %{:start => 1, :ids => ["a"]}
+                },
+                %{"_id" => "doc1", "foo" => "bazX", "bar" => 1, "baz" => "thereX",
+                  "_revisions" => %{:start => 2, :ids => ["c", "b"]},
+                  "_deleted" => true
+                },
+
+                # doc2: single branch, 2-b is deleted
+                %{"_id" => "doc2", "foo" => "foobar", "bar" => 100, "baz" => "hi",
+                  "_revisions" => %{:start => 2, :ids => ["b", "a"]},
+                  "_deleted" => true
+                }
+      ], :new_edits => false}
+    )
+    assert resp.status_code in [201]
+    resp
   end
 
   def create_partitioned_search_docs(db_name) do
@@ -87,6 +129,11 @@
     bookmark
   end
 
+  # Extract total_hits from a search response, failing loudly if absent.
+  def get_total_hits(resp) do
+    %{"total_hits" => total_hits} = resp.body
+    total_hits
+  end
+
   def assert_status_code(resp, code) do
     assert resp.status_code == code,
       "status code: #{resp.status_code}, resp body: #{:jiffy.encode(resp.body)}"
@@ -420,4 +467,133 @@
     assert ids == ["bar:doc3"]
   end
 
+  @tag :with_db
+  test "delete", context do
+    db_name = context[:db_name]
+    created = create_search_docs(db_name)
+    create_ddoc(db_name)
+
+    search_url = "/#{db_name}/_design/foo/_nouveau/bar"
+
+    # all four docs are indexed initially
+    resp = Couch.get(search_url, query: %{q: "*:*", include_docs: true})
+    assert_status_code(resp, 200)
+    assert get_total_hits(resp) == 4
+
+    # delete the first doc from the bulk-create response
+    doc = hd(created.body)
+    resp = Couch.delete("/#{db_name}/#{doc["id"]}?rev=#{doc["rev"]}")
+    assert_status_code(resp, 200)
+
+    # the deleted doc must drop out of the index
+    resp = Couch.get(search_url, query: %{q: "*:*", include_docs: true})
+    assert_status_code(resp, 200)
+    assert get_total_hits(resp) == 3
+
+    # a plain delete advances update_seq but leaves purge_seq untouched
+    resp = Couch.get("/#{db_name}/_design/foo/_nouveau_info/bar")
+    assert_status_code(resp, 200)
+    assert resp.body["search_index"]["update_seq"] == 6
+    assert resp.body["search_index"]["purge_seq"] == 0
+  end
+
+  @tag :with_db
+  test "purge", context do
+    db_name = context[:db_name]
+    created = create_search_docs(db_name)
+    create_ddoc(db_name)
+
+    search_url = "/#{db_name}/_design/foo/_nouveau/bar"
+
+    # all four docs are indexed initially
+    resp = Couch.get(search_url, query: %{q: "*:*", include_docs: true})
+    assert_status_code(resp, 200)
+    assert get_total_hits(resp) == 4
+
+    # purge the first doc from the bulk-create response
+    doc = hd(created.body)
+    resp = Couch.post("/#{db_name}/_purge", body: %{doc["id"] => [doc["rev"]]})
+    assert_status_code(resp, 201)
+
+    # the purged doc must drop out of the index
+    resp = Couch.get(search_url, query: %{q: "*:*", include_docs: true})
+    assert_status_code(resp, 200)
+    assert get_total_hits(resp) == 3
+
+    # a purge advances purge_seq as well
+    resp = Couch.get("/#{db_name}/_design/foo/_nouveau_info/bar")
+    assert_status_code(resp, 200)
+    assert resp.body["search_index"]["update_seq"] == 6
+    assert resp.body["search_index"]["purge_seq"] == 1
+  end
+
+  @tag :with_db
+  test "purge with conflicts", context do
+    db_name = context[:db_name]
+    # return value unused here, unlike in the simple purge test
+    create_conflicted_search_docs(db_name)
+    create_ddoc(db_name)
+
+    search_url = "/#{db_name}/_design/foo/_nouveau/bar"
+
+    # confirm all (winning-revision) hits before purging
+    resp = Couch.get(search_url, query: %{q: "*:*", include_docs: true})
+    assert_status_code(resp, 200)
+
+    assert get_total_hits(resp) == 3
+    [hit1, hit2, hit3] = Enum.sort(resp.body["hits"])
+
+    assert hit1["doc"]["_id"] == "doc1"
+    assert hit1["doc"]["_rev"] == "1-a"
+    assert hit1["fields"] == %{"bar" => 0.0, "foo" => "baz"}
+
+    assert hit2["doc"]["_id"] == "doc3"
+    assert hit2["doc"]["_rev"] == "2-c"
+    assert hit2["fields"] == %{"bar" => 13.0, "foo" => "barX"}
+
+    assert hit3["doc"]["_id"] == "doc4"
+    assert hit3["doc"]["_rev"] == "1-b"
+    assert hit3["fields"] == %{"bar" => 43.0, "foo" => "fooX"}
+
+    # purge docs (note: Elixir map literals reject trailing commas)
+    purge_body = %{
+      "doc1" => ["2-c", "3-nonexistentrev"],
+      "doc2" => ["2-b"],
+      "doc3" => ["2-c"],
+      "doc4" => ["1-b"]
+    }
+    resp = Couch.post("/#{db_name}/_purge", body: purge_body)
+    assert_status_code(resp, 201)
+
+    resp = Couch.get(search_url, query: %{q: "*:*", include_docs: true})
+    assert_status_code(resp, 200)
+
+    assert get_total_hits(resp) == 2
+    [hit1, hit2] = Enum.sort(resp.body["hits"])
+
+    # doc1: deleted 2-c was purged, 1-a is still the winner
+    assert hit1["doc"]["_id"] == "doc1"
+    assert hit1["doc"]["_rev"] == "1-a"
+    assert hit1["fields"] == %{"bar" => 0.0, "foo" => "baz"}
+
+    # doc2: doc was deleted and now it's completely purged
+
+    # doc3: live revision was purged, we're left with the deleted rev only
+
+    # doc4: 1-b was purged, 1-a is the new winner
+    assert hit2["doc"]["_id"] == "doc4"
+    assert hit2["doc"]["_rev"] == "1-a"
+    assert hit2["fields"] == %{"bar" => 42.0, "foo" => "foo"}
+
+    # Double-check db purge sequence (sum of purge sequences on shards) is 4
+    resp = Couch.get("/#{db_name}")
+    db_purge_seq = resp.body["purge_seq"]
+    assert String.starts_with?(db_purge_seq, "4-")
+
+    resp = Couch.get("/#{db_name}/_design/foo/_nouveau_info/bar")
+    assert_status_code(resp, 200)
+    assert resp.body["search_index"]["update_seq"] == 8
+    assert resp.body["search_index"]["purge_seq"] == 4
+  end
 end