Merge pull request #4717 from apache/nouveau-purge
Add clustered purge support in nouveau
diff --git a/Makefile b/Makefile
index 7123eae..67971c4 100644
--- a/Makefile
+++ b/Makefile
@@ -547,6 +547,7 @@
# Build nouveau
nouveau:
ifeq ($(with_nouveau), 1)
+ @cd nouveau && ./gradlew spotlessApply
@cd nouveau && ./gradlew build -x test
endif
diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplication.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplication.java
index 6eaf941..3039214 100644
--- a/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplication.java
+++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplication.java
@@ -18,7 +18,6 @@
import io.swagger.v3.jaxrs2.integration.resources.OpenApiResource;
import java.util.concurrent.ForkJoinPool;
import org.apache.couchdb.nouveau.core.IndexManager;
-import org.apache.couchdb.nouveau.core.UpdatesOutOfOrderExceptionMapper;
import org.apache.couchdb.nouveau.health.AnalyzeHealthCheck;
import org.apache.couchdb.nouveau.health.IndexHealthCheck;
import org.apache.couchdb.nouveau.lucene9.Lucene9Module;
@@ -41,8 +40,6 @@
@Override
public void run(NouveauApplicationConfiguration configuration, Environment environment) throws Exception {
- environment.jersey().register(new UpdatesOutOfOrderExceptionMapper());
-
// configure index manager
final IndexManager indexManager = new IndexManager();
indexManager.setCommitIntervalSeconds(configuration.getCommitIntervalSeconds());
diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java
index 6f22eff..99f6f5d 100644
--- a/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java
+++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java
@@ -24,15 +24,18 @@
@Positive
private long seq;
+ private boolean purge;
+
public DocumentDeleteRequest() {
// Jackson deserialization
}
- public DocumentDeleteRequest(long seq) {
+ public DocumentDeleteRequest(long seq, final boolean purge) {
if (seq < 1) {
throw new IllegalArgumentException("seq must be 1 or greater");
}
this.seq = seq;
+ this.purge = purge;
}
@JsonProperty
@@ -40,8 +43,13 @@
return seq;
}
+ @JsonProperty
+ public boolean isPurge() {
+ return purge;
+ }
+
@Override
public String toString() {
- return "DocumentDeleteRequest [seq=" + seq + "]";
+ return "DocumentDeleteRequest [seq=" + seq + ", purge=" + purge + "]";
}
}
diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/api/IndexInfo.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/IndexInfo.java
index e7380a5..9958c77 100644
--- a/nouveau/src/main/java/org/apache/couchdb/nouveau/api/IndexInfo.java
+++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/IndexInfo.java
@@ -19,42 +19,50 @@
import jakarta.validation.constraints.PositiveOrZero;
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
-public class IndexInfo {
+public final class IndexInfo {
@PositiveOrZero
- private long updateSeq;
+ private final long updateSeq;
@PositiveOrZero
- private int numDocs;
+ private final long purgeSeq;
@PositiveOrZero
- private long diskSize;
+ private final int numDocs;
- public IndexInfo() {}
+ @PositiveOrZero
+ private final long diskSize;
- public IndexInfo(final long updateSeq, final int numDocs, final long diskSize) {
+ public IndexInfo(
+ @JsonProperty("update_seq") final long updateSeq,
+ @JsonProperty("purge_seq") final long purgeSeq,
+ @JsonProperty("num_docs") final int numDocs,
+ @JsonProperty("disk_size") final long diskSize) {
this.updateSeq = updateSeq;
+ this.purgeSeq = purgeSeq;
this.numDocs = numDocs;
this.diskSize = diskSize;
}
- @JsonProperty
public int getNumDocs() {
return numDocs;
}
- @JsonProperty
public long getDiskSize() {
return diskSize;
}
- @JsonProperty
public long getUpdateSeq() {
return updateSeq;
}
+ public long getPurgeSeq() {
+ return purgeSeq;
+ }
+
@Override
public String toString() {
- return "IndexInfo [updateSeq=" + updateSeq + ", numDocs=" + numDocs + ", diskSize=" + diskSize + "]";
+ return "IndexInfo [updateSeq=" + updateSeq + ", purgeSeq=" + purgeSeq + ", numDocs=" + numDocs + ", diskSize="
+ + diskSize + "]";
}
}
diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/api/IndexInfoRequest.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/IndexInfoRequest.java
new file mode 100644
index 0000000..67e8060
--- /dev/null
+++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/IndexInfoRequest.java
@@ -0,0 +1,45 @@
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.apache.couchdb.nouveau.api;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import jakarta.validation.constraints.Positive;
+import java.util.OptionalLong;
+
+public final class IndexInfoRequest {
+
+ private OptionalLong updateSeq;
+
+ private OptionalLong purgeSeq;
+
+ public IndexInfoRequest(
+ @JsonProperty("update_seq") @Positive OptionalLong updateSeq,
+ @JsonProperty("purge_seq") @Positive OptionalLong purgeSeq) {
+ this.updateSeq = updateSeq;
+ this.purgeSeq = purgeSeq;
+ }
+
+ public OptionalLong getUpdateSeq() {
+ return updateSeq;
+ }
+
+ public OptionalLong getPurgeSeq() {
+ return purgeSeq;
+ }
+
+ @Override
+ public String toString() {
+ return "IndexInfoRequest [updateSeq=" + updateSeq + ", purgeSeq=" + purgeSeq + "]";
+ }
+}
diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java
index 5b31938..ca0cd69 100644
--- a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java
+++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java
@@ -13,6 +13,8 @@
package org.apache.couchdb.nouveau.core;
+import jakarta.ws.rs.WebApplicationException;
+import jakarta.ws.rs.core.Response.Status;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.Semaphore;
@@ -36,13 +38,15 @@
public abstract class Index implements Closeable {
private long updateSeq;
+ private long purgeSeq;
private boolean deleteOnClose = false;
private long lastCommit = now();
private volatile boolean closed;
private final Semaphore permits = new Semaphore(Integer.MAX_VALUE);
- protected Index(final long updateSeq) {
+ protected Index(final long updateSeq, final long purgeSeq) {
this.updateSeq = updateSeq;
+ this.purgeSeq = purgeSeq;
}
public final boolean tryAcquire() {
@@ -74,7 +78,7 @@
public final IndexInfo info() throws IOException {
final int numDocs = doNumDocs();
final long diskSize = doDiskSize();
- return new IndexInfo(updateSeq, numDocs, diskSize);
+ return new IndexInfo(updateSeq, purgeSeq, numDocs, diskSize);
}
protected abstract int doNumDocs() throws IOException;
@@ -90,9 +94,15 @@
protected abstract void doUpdate(final String docId, final DocumentUpdateRequest request) throws IOException;
public final synchronized void delete(final String docId, final DocumentDeleteRequest request) throws IOException {
- assertUpdateSeqIsLower(request.getSeq());
- doDelete(docId, request);
- incrementUpdateSeq(request.getSeq());
+ if (request.isPurge()) {
+ assertPurgeSeqIsLower(request.getSeq());
+ doDelete(docId, request);
+ incrementPurgeSeq(request.getSeq());
+ } else {
+ assertUpdateSeqIsLower(request.getSeq());
+ doDelete(docId, request);
+ incrementUpdateSeq(request.getSeq());
+ }
}
protected abstract void doDelete(final String docId, final DocumentDeleteRequest request) throws IOException;
@@ -105,10 +115,12 @@
public final boolean commit() throws IOException {
final long updateSeq;
+ final long purgeSeq;
synchronized (this) {
updateSeq = this.updateSeq;
+ purgeSeq = this.purgeSeq;
}
- final boolean result = doCommit(updateSeq);
+ final boolean result = doCommit(updateSeq, purgeSeq);
if (result) {
final long now = now();
synchronized (this) {
@@ -118,7 +130,27 @@
return result;
}
- protected abstract boolean doCommit(final long updateSeq) throws IOException;
+ protected abstract boolean doCommit(final long updateSeq, final long purgeSeq) throws IOException;
+
+ public final synchronized void setUpdateSeq(final long updateSeq) throws IOException {
+ if (updateSeq < this.updateSeq) {
+ throw new WebApplicationException(
+ "update_seq must be equal or greater than current update_seq", Status.BAD_REQUEST);
+ }
+ if (updateSeq > this.updateSeq) {
+ incrementUpdateSeq(updateSeq);
+ }
+ }
+
+ public final synchronized void setPurgeSeq(final long purgeSeq) throws IOException {
+ if (purgeSeq < this.purgeSeq) {
+ throw new WebApplicationException(
+ "purge_seq must be equal or greater than current purge_seq", Status.BAD_REQUEST);
+ }
+ if (purgeSeq > this.purgeSeq) {
+ incrementPurgeSeq(purgeSeq);
+ }
+ }
@Override
public final void close() throws IOException {
@@ -159,6 +191,19 @@
this.updateSeq = updateSeq;
}
+ protected final void assertPurgeSeqIsLower(final long purgeSeq) throws UpdatesOutOfOrderException {
+ assert Thread.holdsLock(this);
+ if (!(purgeSeq > this.purgeSeq)) {
+ throw new UpdatesOutOfOrderException(this.purgeSeq, purgeSeq);
+ }
+ }
+
+ protected final void incrementPurgeSeq(final long purgeSeq) throws IOException {
+ assert Thread.holdsLock(this);
+ assertPurgeSeqIsLower(purgeSeq);
+ this.purgeSeq = purgeSeq;
+ }
+
public boolean needsCommit(final long duration, final TimeUnit unit) {
final long commitNeededSince = now() - unit.toNanos(duration);
synchronized (this) {
diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/UpdatesOutOfOrderException.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/UpdatesOutOfOrderException.java
index 14d77c3..acda8be 100644
--- a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/UpdatesOutOfOrderException.java
+++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/UpdatesOutOfOrderException.java
@@ -13,12 +13,16 @@
package org.apache.couchdb.nouveau.core;
-import java.io.IOException;
+import jakarta.ws.rs.WebApplicationException;
+import jakarta.ws.rs.core.Response.Status;
-public final class UpdatesOutOfOrderException extends IOException {
+public final class UpdatesOutOfOrderException extends WebApplicationException {
public UpdatesOutOfOrderException(final long currentSeq, final long attemptedSeq) {
- super(String.format(
- "Updates applied in the wrong order (current seq: %d, attempted seq: %d)", currentSeq, attemptedSeq));
+ super(
+ String.format(
+ "Updates applied in the wrong order (current seq: %d, attempted seq: %d)",
+ currentSeq, attemptedSeq),
+ Status.BAD_REQUEST);
}
}
diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/UpdatesOutOfOrderExceptionMapper.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/UpdatesOutOfOrderExceptionMapper.java
deleted file mode 100644
index ac35680..0000000
--- a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/UpdatesOutOfOrderExceptionMapper.java
+++ /dev/null
@@ -1,31 +0,0 @@
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package org.apache.couchdb.nouveau.core;
-
-import io.dropwizard.jersey.errors.ErrorMessage;
-import jakarta.ws.rs.core.MediaType;
-import jakarta.ws.rs.core.Response;
-import jakarta.ws.rs.core.Response.Status;
-import jakarta.ws.rs.ext.ExceptionMapper;
-
-public class UpdatesOutOfOrderExceptionMapper implements ExceptionMapper<UpdatesOutOfOrderException> {
-
- @Override
- public Response toResponse(final UpdatesOutOfOrderException exception) {
- return Response.status(Status.BAD_REQUEST)
- .type(MediaType.APPLICATION_JSON_TYPE)
- .entity(new ErrorMessage(Status.BAD_REQUEST.getStatusCode(), exception.getMessage()))
- .build();
- }
-}
diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/lucene9/Lucene9Index.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/lucene9/Lucene9Index.java
index 69e6b12..ce7f7d7 100644
--- a/nouveau/src/main/java/org/apache/couchdb/nouveau/lucene9/Lucene9Index.java
+++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/lucene9/Lucene9Index.java
@@ -25,7 +25,6 @@
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -104,8 +103,9 @@
final Analyzer analyzer,
final IndexWriter writer,
final long updateSeq,
+ final long purgeSeq,
final SearcherManager searcherManager) {
- super(updateSeq);
+ super(updateSeq, purgeSeq);
this.analyzer = Objects.requireNonNull(analyzer);
this.writer = Objects.requireNonNull(writer);
this.searcherManager = Objects.requireNonNull(searcherManager);
@@ -144,12 +144,12 @@
}
@Override
- public boolean doCommit(final long updateSeq) throws IOException {
+ public boolean doCommit(final long updateSeq, final long purgeSeq) throws IOException {
if (!writer.hasUncommittedChanges()) {
return false;
}
- writer.setLiveCommitData(
- Collections.singletonMap("update_seq", Long.toString(updateSeq)).entrySet());
+ writer.setLiveCommitData(Map.of("update_seq", Long.toString(updateSeq), "purge_seq", Long.toString(purgeSeq))
+ .entrySet());
writer.commit();
return true;
}
diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java
index 0776df5..ce99830 100644
--- a/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java
+++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java
@@ -35,6 +35,7 @@
import org.apache.couchdb.nouveau.api.DocumentUpdateRequest;
import org.apache.couchdb.nouveau.api.IndexDefinition;
import org.apache.couchdb.nouveau.api.IndexInfo;
+import org.apache.couchdb.nouveau.api.IndexInfoRequest;
import org.apache.couchdb.nouveau.api.SearchRequest;
import org.apache.couchdb.nouveau.api.SearchResults;
import org.apache.couchdb.nouveau.core.IndexLoader;
@@ -91,13 +92,27 @@
}
@GET
- public IndexInfo indexInfo(@PathParam("name") String name) throws Exception {
+ public IndexInfo getIndexInfo(@PathParam("name") String name) throws Exception {
return indexManager.with(name, indexLoader(), (index) -> {
return index.info();
});
}
@POST
+ public void setIndexInfo(@PathParam("name") String name, @NotNull @Valid IndexInfoRequest request)
+ throws Exception {
+ indexManager.with(name, indexLoader(), (index) -> {
+ if (request.getUpdateSeq().isPresent()) {
+ index.setUpdateSeq(request.getUpdateSeq().getAsLong());
+ }
+ if (request.getPurgeSeq().isPresent()) {
+ index.setPurgeSeq(request.getPurgeSeq().getAsLong());
+ }
+ return null;
+ });
+ }
+
+ @POST
@Path("/search")
public SearchResults searchIndex(@PathParam("name") String name, @NotNull @Valid SearchRequest request)
throws Exception {
@@ -126,19 +141,20 @@
final IndexWriterConfig config = new IndexWriterConfig(analyzer);
config.setUseCompoundFile(false);
final IndexWriter writer = new IndexWriter(dir, config);
- final long updateSeq = getUpdateSeq(writer);
+ final long updateSeq = getSeq(writer, "update_seq");
+ final long purgeSeq = getSeq(writer, "purge_seq");
final SearcherManager searcherManager = new SearcherManager(writer, searcherFactory);
- return new Lucene9Index(analyzer, writer, updateSeq, searcherManager);
+ return new Lucene9Index(analyzer, writer, updateSeq, purgeSeq, searcherManager);
};
}
- private static long getUpdateSeq(final IndexWriter writer) throws IOException {
+ private static long getSeq(final IndexWriter writer, final String key) throws IOException {
final Iterable<Map.Entry<String, String>> commitData = writer.getLiveCommitData();
if (commitData == null) {
return 0L;
}
for (Map.Entry<String, String> entry : commitData) {
- if (entry.getKey().equals("update_seq")) {
+ if (entry.getKey().equals(key)) {
return Long.parseLong(entry.getValue());
}
}
diff --git a/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene9/Lucene9IndexTest.java b/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene9/Lucene9IndexTest.java
index 79151a2..5d98980 100644
--- a/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene9/Lucene9IndexTest.java
+++ b/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene9/Lucene9IndexTest.java
@@ -199,11 +199,34 @@
IndexInfo info = index.info();
assertThat(info.getNumDocs()).isEqualTo(1);
- index.delete("foo", new DocumentDeleteRequest(3));
+ index.delete("foo", new DocumentDeleteRequest(3, false));
index.commit();
info = index.info();
assertThat(info.getNumDocs()).isEqualTo(0);
+ assertThat(info.getUpdateSeq()).isEqualTo(3);
+ } finally {
+ cleanup(index);
+ }
+ }
+
+ @Test
+ public void testPurge(@TempDir Path path) throws IOException {
+ Index index = setup(path);
+ try {
+ final Collection<Field> fields = List.of(new DoubleField("bar", 12.0, false));
+ index.update("foo", new DocumentUpdateRequest(2, null, fields));
+ index.commit();
+
+ IndexInfo info = index.info();
+ assertThat(info.getNumDocs()).isEqualTo(1);
+
+ index.delete("foo", new DocumentDeleteRequest(3, true));
+ index.commit();
+
+ info = index.info();
+ assertThat(info.getNumDocs()).isEqualTo(0);
+ assertThat(info.getPurgeSeq()).isEqualTo(3);
} finally {
cleanup(index);
}
@@ -217,7 +240,7 @@
config.setUseCompoundFile(false);
final IndexWriter writer = new IndexWriter(dir, config);
final SearcherManager searcherManager = new SearcherManager(writer, null);
- return new Lucene9Index(analyzer, writer, 0L, searcherManager);
+ return new Lucene9Index(analyzer, writer, 0L, 0L, searcherManager);
};
}
}
diff --git a/src/nouveau/src/nouveau_api.erl b/src/nouveau/src/nouveau_api.erl
index 7744eb1..39cc2b3 100644
--- a/src/nouveau/src/nouveau_api.erl
+++ b/src/nouveau/src/nouveau_api.erl
@@ -24,8 +24,11 @@
delete_path/1,
delete_path/2,
delete_doc/3,
+ purge_doc/3,
update_doc/5,
- search/2
+ search/2,
+ set_purge_seq/2,
+ set_update_seq/2
]).
-define(JSON_CONTENT_TYPE, {"Content-Type", "application/json"}).
@@ -97,7 +100,17 @@
delete_doc(#index{} = Index, DocId, UpdateSeq) when
is_binary(DocId), is_integer(UpdateSeq)
->
- ReqBody = {[{<<"seq">>, UpdateSeq}]},
+ delete_doc(Index, DocId, UpdateSeq, false).
+
+purge_doc(#index{} = Index, DocId, PurgeSeq) when
+ is_binary(DocId), is_integer(PurgeSeq)
+->
+ delete_doc(Index, DocId, PurgeSeq, true).
+
+delete_doc(#index{} = Index, DocId, Seq, IsPurge) when
+ is_binary(DocId), is_integer(Seq), is_boolean(IsPurge)
+->
+ ReqBody = #{seq => Seq, purge => IsPurge},
Resp = send_if_enabled(
doc_url(Index, DocId), [?JSON_CONTENT_TYPE], delete, jiffy:encode(ReqBody)
),
@@ -146,6 +159,27 @@
send_error(Reason)
end.
+set_update_seq(#index{} = Index, UpdateSeq) ->
+ set_seq(Index, update_seq, UpdateSeq).
+set_purge_seq(#index{} = Index, PurgeSeq) ->
+ set_seq(Index, purge_seq, PurgeSeq).
+
+set_seq(#index{} = Index, Key, Value) when is_atom(Key), is_integer(Value) ->
+ ReqBody = #{
+ Key => Value
+ },
+ Resp = send_if_enabled(
+ index_url(Index), [?JSON_CONTENT_TYPE], post, jiffy:encode(ReqBody)
+ ),
+ case Resp of
+ {ok, "204", _, _} ->
+ ok;
+ {ok, StatusCode, _, RespBody} ->
+ {error, jaxrs_error(StatusCode, RespBody)};
+ {error, Reason} ->
+ send_error(Reason)
+ end.
+
%% private functions
index_path(Path) ->
diff --git a/src/nouveau/src/nouveau_epi.erl b/src/nouveau/src/nouveau_epi.erl
index f42e179..9f34ae5 100644
--- a/src/nouveau/src/nouveau_epi.erl
+++ b/src/nouveau/src/nouveau_epi.erl
@@ -31,7 +31,10 @@
nouveau.
providers() ->
- [{chttpd_handlers, nouveau_httpd_handlers}].
+ [
+ {couch_db, nouveau_plugin_couch_db},
+ {chttpd_handlers, nouveau_httpd_handlers}
+ ].
services() ->
[].
diff --git a/src/nouveau/src/nouveau_index_updater.erl b/src/nouveau/src/nouveau_index_updater.erl
index a78b1ff..33134bc 100644
--- a/src/nouveau/src/nouveau_index_updater.erl
+++ b/src/nouveau/src/nouveau_index_updater.erl
@@ -28,9 +28,10 @@
outdated(#index{} = Index) ->
case open_or_create_index(Index) of
- {ok, IndexSeq} ->
- DbSeq = get_db_seq(Index),
- DbSeq > IndexSeq;
+ {ok, #{} = Info} ->
+ #{<<"update_seq">> := IndexUpdateSeq, <<"purge_seq">> := IndexPurgeSeq} = Info,
+ {DbUpdateSeq, DbPurgeSeq} = get_db_info(Index),
+ DbUpdateSeq > IndexUpdateSeq orelse DbPurgeSeq > IndexPurgeSeq;
{error, Reason} ->
{error, Reason}
end.
@@ -38,11 +39,14 @@
update(#index{} = Index) ->
{ok, Db} = couch_db:open_int(Index#index.dbname, []),
try
- case open_or_create_index(Index) of
+ case open_or_create_index(Db, Index) of
{error, Reason} ->
exit({error, Reason});
- {ok, CurSeq} ->
- TotalChanges = couch_db:count_changes_since(Db, CurSeq),
+ {ok, #{} = Info} ->
+ #{<<"update_seq">> := IndexUpdateSeq, <<"purge_seq">> := IndexPurgeSeq} = Info,
+ ChangesSince = couch_db:count_changes_since(Db, IndexUpdateSeq),
+ PurgesSince = couch_db:get_purge_seq(Db) - IndexPurgeSeq,
+ TotalChanges = ChangesSince + PurgesSince,
couch_task_status:add_task([
{type, search_indexer},
{database, Index#index.dbname},
@@ -56,11 +60,15 @@
%% update status every half second
couch_task_status:set_update_frequency(500),
+ {ok, ExcludeIdRevs} = purge_index(Db, Index, IndexPurgeSeq),
+
+ NewCurSeq = couch_db:get_update_seq(Db),
Proc = get_os_process(Index#index.def_lang),
try
true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def, <<"nouveau">>]),
- Acc0 = {Db, Index, Proc, 0, TotalChanges},
- {ok, _} = couch_db:fold_changes(Db, CurSeq, fun load_docs/2, Acc0, [])
+ Acc0 = {Db, Index, Proc, 0, TotalChanges, ExcludeIdRevs},
+ {ok, _} = couch_db:fold_changes(Db, IndexUpdateSeq, fun load_docs/2, Acc0, []),
+ ok = nouveau_api:set_update_seq(Index, NewCurSeq)
after
ret_os_process(Proc)
end
@@ -71,14 +79,20 @@
load_docs(#full_doc_info{id = <<"_design/", _/binary>>}, Acc) ->
{ok, Acc};
-load_docs(FDI, {Db, Index, Proc, ChangesDone, TotalChanges}) ->
+load_docs(FDI, {Db, Index, Proc, ChangesDone, TotalChanges, ExcludeIdRevs}) ->
couch_task_status:update([
{changes_done, ChangesDone}, {progress, (ChangesDone * 100) div TotalChanges}
]),
-
DI = couch_doc:to_doc_info(FDI),
- #doc_info{id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]} = DI,
+ #doc_info{id = Id, revs = [#rev_info{rev = Rev} | _]} = DI,
+ case lists:member({Id, Rev}, ExcludeIdRevs) of
+ true -> ok;
+ false -> update_or_delete_index(Db, Index, DI, Proc)
+ end,
+ {ok, {Db, Index, Proc, ChangesDone + 1, TotalChanges, ExcludeIdRevs}}.
+update_or_delete_index(Db, #index{} = Index, #doc_info{} = DI, Proc) ->
+ #doc_info{id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]} = DI,
case Del of
true ->
ok = nouveau_api:delete_doc(Index, Id, Seq);
@@ -104,17 +118,16 @@
exit({error, Reason})
end
end
- end,
- {ok, {Db, Index, Proc, ChangesDone + 1, TotalChanges}}.
+ end.
open_or_create_index(#index{} = Index) ->
- case get_index_seq(Index) of
- {ok, UpdateSeq} ->
- {ok, UpdateSeq};
+ case nouveau_api:index_info(Index) of
+ {ok, #{} = Info} ->
+ {ok, Info};
{error, {not_found, _}} ->
case nouveau_api:create_index(Index, index_definition(Index)) of
ok ->
- {ok, 0};
+ nouveau_api:index_info(Index);
{error, Reason} ->
{error, Reason}
end;
@@ -122,20 +135,23 @@
{error, Reason}
end.
-get_db_seq(#index{} = Index) ->
- {ok, Db} = couch_db:open_int(Index#index.dbname, []),
- try
- couch_db:get_update_seq(Db)
- after
- couch_db:close(Db)
+open_or_create_index(Db, #index{} = Index) ->
+ case open_or_create_index(Index) of
+ {ok, #{} = Info} ->
+ nouveau_util:maybe_create_local_purge_doc(Db, Index),
+ {ok, Info};
+ Else ->
+ Else
end.
-get_index_seq(#index{} = Index) ->
- case nouveau_api:index_info(Index) of
- {ok, #{<<"update_seq">> := Seq}} ->
- {ok, Seq};
- {error, Reason} ->
- {error, Reason}
+get_db_info(#index{} = Index) ->
+ {ok, Db} = couch_db:open_int(Index#index.dbname, []),
+ try
+ UpdateSeq = couch_db:get_update_seq(Db),
+ PurgeSeq = couch_db:get_purge_seq(Db),
+ {UpdateSeq, PurgeSeq}
+ after
+ couch_db:close(Db)
end.
index_definition(#index{} = Index) ->
@@ -143,3 +159,55 @@
<<"default_analyzer">> => Index#index.default_analyzer,
<<"field_analyzers">> => Index#index.field_analyzers
}.
+
+purge_index(Db, Index, IndexPurgeSeq) ->
+ Proc = get_os_process(Index#index.def_lang),
+ try
+ true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def, <<"nouveau">>]),
+ FoldFun = fun({PurgeSeq, _UUID, Id, _Revs}, {Acc, _}) ->
+ Acc0 =
+ case couch_db:get_full_doc_info(Db, Id) of
+ not_found ->
+ ok = nouveau_api:purge_doc(Index, Id, PurgeSeq),
+ Acc;
+ FDI ->
+ DI = couch_doc:to_doc_info(FDI),
+ #doc_info{id = Id, revs = [#rev_info{rev = Rev} | _]} = DI,
+ case lists:member({Id, Rev}, Acc) of
+ true ->
+ Acc;
+ false ->
+ update_or_delete_index(Db, Index, DI, Proc),
+ [{Id, Rev} | Acc]
+ end
+ end,
+ update_task(1),
+ {ok, {Acc0, PurgeSeq}}
+ end,
+
+ {ok, {ExcludeList, NewPurgeSeq}} = couch_db:fold_purge_infos(
+ Db, IndexPurgeSeq, FoldFun, {[], 0}, []
+ ),
+ nouveau_api:set_purge_seq(Index, NewPurgeSeq),
+ update_local_doc(Db, Index, NewPurgeSeq),
+ {ok, ExcludeList}
+ after
+ ret_os_process(Proc)
+ end.
+
+update_task(NumChanges) ->
+ [Changes, Total] = couch_task_status:get([changes_done, total_changes]),
+ Changes2 = Changes + NumChanges,
+ Progress =
+ case Total of
+ 0 ->
+ 0;
+ _ ->
+ (Changes2 * 100) div Total
+ end,
+ couch_task_status:update([{progress, Progress}, {changes_done, Changes2}]).
+
+update_local_doc(Db, #index{} = Index, PurgeSeq) ->
+ DocId = nouveau_util:get_local_purge_doc_id(Index#index.sig),
+ DocContent = nouveau_util:get_local_purge_doc_body(DocId, PurgeSeq, Index),
+ couch_db:update_doc(Db, DocContent, []).
diff --git a/src/nouveau/src/nouveau_plugin_couch_db.erl b/src/nouveau/src/nouveau_plugin_couch_db.erl
new file mode 100644
index 0000000..dcd3ae1
--- /dev/null
+++ b/src/nouveau/src/nouveau_plugin_couch_db.erl
@@ -0,0 +1,24 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(nouveau_plugin_couch_db).
+
+-export([
+ is_valid_purge_client/2,
+ on_compact/2
+]).
+
+is_valid_purge_client(DbName, Props) ->
+ nouveau_util:verify_index_exists(DbName, Props).
+
+on_compact(DbName, DDocs) ->
+ nouveau_util:ensure_local_purge_docs(DbName, DDocs).
diff --git a/src/nouveau/src/nouveau_util.erl b/src/nouveau/src/nouveau_util.erl
index b7da7e8..86fc3a4 100644
--- a/src/nouveau/src/nouveau_util.erl
+++ b/src/nouveau/src/nouveau_util.erl
@@ -22,6 +22,11 @@
index_name/1,
design_doc_to_indexes/2,
design_doc_to_index/3,
+ verify_index_exists/2,
+ ensure_local_purge_docs/2,
+ maybe_create_local_purge_doc/2,
+ get_local_purge_doc_id/1,
+ get_local_purge_doc_body/3,
nouveau_url/0
]).
@@ -93,5 +98,101 @@
{error, InvalidDDocError}
end.
+verify_index_exists(DbName, Props) ->
+ try
+ Type = couch_util:get_value(<<"type">>, Props),
+ if
+ Type =/= <<"nouveau">> ->
+ false;
+ true ->
+ DDocId = couch_util:get_value(<<"ddoc_id">>, Props),
+ IndexName = couch_util:get_value(<<"indexname">>, Props),
+ Sig = couch_util:get_value(<<"signature">>, Props),
+ couch_util:with_db(DbName, fun(Db) ->
+ case couch_db:get_design_doc(Db, DDocId) of
+ {ok, #doc{} = DDoc} ->
+ {ok, IdxState} = design_doc_to_index(
+ DbName, DDoc, IndexName
+ ),
+ IdxState#index.sig == Sig;
+ {not_found, _} ->
+ false
+ end
+ end)
+ end
+ catch
+ _:_ ->
+ false
+ end.
+
+ensure_local_purge_docs(DbName, DDocs) ->
+ couch_util:with_db(DbName, fun(Db) ->
+ lists:foreach(
+ fun(DDoc) ->
+ #doc{body = {Props}} = DDoc,
+ case couch_util:get_value(<<"indexes">>, Props) of
+ undefined ->
+ false;
+ _ ->
+ try design_doc_to_indexes(DbName, DDoc) of
+ SIndexes -> ensure_local_purge_doc(Db, SIndexes)
+ catch
+ _:_ ->
+ ok
+ end
+ end
+ end,
+ DDocs
+ )
+ end).
+
+ensure_local_purge_doc(Db, SIndexes) ->
+ if
+ SIndexes =/= [] ->
+ lists:map(
+ fun(SIndex) ->
+ maybe_create_local_purge_doc(Db, SIndex)
+ end,
+ SIndexes
+ );
+ true ->
+ ok
+ end.
+
+maybe_create_local_purge_doc(Db, Index) ->
+ DocId = get_local_purge_doc_id(Index#index.sig),
+ case couch_db:open_doc(Db, DocId) of
+ {ok, _Doc} ->
+ ok;
+ {not_found, _} ->
+ DbPurgeSeq = couch_db:get_purge_seq(Db),
+ DocContent = get_local_purge_doc_body(
+ DocId, DbPurgeSeq, Index
+ ),
+ couch_db:update_doc(Db, DocContent, [])
+ end.
+
+get_local_purge_doc_id(Sig) ->
+ iolist_to_binary([?LOCAL_DOC_PREFIX, "purge-", "nouveau-", Sig]).
+
+get_local_purge_doc_body(LocalDocId, PurgeSeq, Index) ->
+ #index{
+ name = IdxName,
+ ddoc_id = DDocId,
+ sig = Sig
+ } = Index,
+ NowSecs = os:system_time(second),
+ JsonList =
+ {[
+ {<<"_id">>, LocalDocId},
+ {<<"purge_seq">>, PurgeSeq},
+ {<<"updated_on">>, NowSecs},
+ {<<"indexname">>, IdxName},
+ {<<"ddoc_id">>, DDocId},
+ {<<"signature">>, Sig},
+ {<<"type">>, <<"nouveau">>}
+ ]},
+ couch_doc:from_json_obj(JsonList).
+
nouveau_url() ->
config:get("nouveau", "url", "http://127.0.0.1:8080").
diff --git a/test/elixir/test/config/nouveau.elixir b/test/elixir/test/config/nouveau.elixir
index 733aa9d..53f28a5 100644
--- a/test/elixir/test/config/nouveau.elixir
+++ b/test/elixir/test/config/nouveau.elixir
@@ -21,6 +21,9 @@
"mango sort by string",
"search GET (partitioned)",
"search POST (partitioned)",
- "mango (partitioned)"
+ "mango (partitioned)",
+ "delete",
+ "purge",
+ "purge with conflicts"
]
}
diff --git a/test/elixir/test/nouveau_test.exs b/test/elixir/test/nouveau_test.exs
index 28cc273..8e3a1bd 100644
--- a/test/elixir/test/nouveau_test.exs
+++ b/test/elixir/test/nouveau_test.exs
@@ -18,6 +18,48 @@
]}
)
assert resp.status_code in [201]
+ resp
+ end
+
+ def create_conflicted_search_docs(db_name) do
+ resp = Couch.post("/#{db_name}/_bulk_docs",
+ headers: ["Content-Type": "application/json"],
+ body: %{:docs => [
+ # doc4: conflict between 1-a and 1-b, 1-b wins, 1-b will be purged
+ %{"_id" => "doc4", "foo" => "foo", "bar" => 42, "baz" => "hello there",
+ "_revisions" => %{:start => 1, :ids => ["a"]}
+ },
+ %{"_id" => "doc4", "foo" => "fooX", "bar" => 43, "baz" => "hello thereX",
+ "_revisions" => %{:start => 1, :ids => ["b"]}
+ },
+
+ # doc3: conflict between 1-a deleted and 2-c, 1-a is deleted,
+ %{"_id" => "doc3", "foo" => "bar", "bar" => 12.0, "baz" => "hello",
+ "_revisions" => %{:start => 1, :ids => ["a"]},
+ "_deleted" => true
+ },
+ %{"_id" => "doc3", "foo" => "barX", "bar" => 13.0, "baz" => "helloX",
+ "_revisions" => %{:start => 2, :ids => ["c", "b"]}
+ },
+
+ # doc1: conflict between 1-a and 2-c, 2-c is deleted
+ %{"_id" => "doc1", "foo" => "baz", "bar" => 0, "baz" => "there",
+ "_revisions" => %{:start => 1, :ids => ["a"]}
+ },
+ %{"_id" => "doc1", "foo" => "bazX", "bar" => 1, "baz" => "thereX",
+ "_revisions" => %{:start => 2, :ids => ["c", "b"]},
+ "_deleted" => true
+ },
+
+ # doc2: 2-b is deleted
+ %{"_id" => "doc2", "foo" => "foobar", "bar" => 100, "baz" => "hi",
+ "_revisions" => %{:start => 2, :ids => ["b", "a"]},
+ "_deleted" => true
+ }
+ ], :new_edits => false}
+ )
+ assert resp.status_code in [201]
+ resp
end
def create_partitioned_search_docs(db_name) do
@@ -87,6 +129,11 @@
bookmark
end
+ def get_total_hits(resp) do
+ %{:body => %{"total_hits" => total_hits}} = resp
+ total_hits
+ end
+
def assert_status_code(resp, code) do
assert resp.status_code == code,
"status code: #{resp.status_code}, resp body: #{:jiffy.encode(resp.body)}"
@@ -420,4 +467,133 @@
assert ids == ["bar:doc3"]
end
+ @tag :with_db
+ test "delete", context do
+ db_name = context[:db_name]
+ create_resp = create_search_docs(db_name)
+ create_ddoc(db_name)
+
+ search_url = "/#{db_name}/_design/foo/_nouveau/bar"
+
+ # confirm all hits
+ resp = Couch.get(search_url, query: %{q: "*:*", include_docs: true})
+ assert_status_code(resp, 200)
+ assert get_total_hits(resp) == 4
+
+ # delete a doc
+ doc = hd(create_resp.body)
+ resp = Couch.delete("/#{db_name}/#{doc["id"]}?rev=#{doc["rev"]}")
+ assert_status_code(resp, 200)
+
+ # confirm it is gone
+ resp = Couch.get(search_url, query: %{q: "*:*", include_docs: true})
+ assert_status_code(resp, 200)
+ assert get_total_hits(resp) == 3
+
+ resp = Couch.get("/#{db_name}/_design/foo/_nouveau_info/bar")
+ assert_status_code(resp, 200)
+ assert resp.body["search_index"]["update_seq"] == 6
+ assert resp.body["search_index"]["purge_seq"] == 0
+ end
+
+ @tag :with_db
+ test "purge", context do
+ db_name = context[:db_name]
+ create_resp = create_search_docs(db_name)
+ create_ddoc(db_name)
+
+ search_url = "/#{db_name}/_design/foo/_nouveau/bar"
+
+ # confirm all hits
+ resp = Couch.get(search_url, query: %{q: "*:*", include_docs: true})
+ assert_status_code(resp, 200)
+ assert get_total_hits(resp) == 4
+
+ # purge a doc
+ doc = hd(create_resp.body)
+ resp =
+ Couch.post("/#{db_name}/_purge",
+ body: %{doc["id"] => [doc["rev"]]}
+ )
+ assert_status_code(resp, 201)
+
+ # confirm it is gone
+ resp = Couch.get(search_url, query: %{q: "*:*", include_docs: true})
+ assert_status_code(resp, 200)
+ assert get_total_hits(resp) == 3
+
+ resp = Couch.get("/#{db_name}/_design/foo/_nouveau_info/bar")
+ assert_status_code(resp, 200)
+ assert resp.body["search_index"]["update_seq"] == 6
+ assert resp.body["search_index"]["purge_seq"] == 1
+ end
+
+ @tag :with_db
+ test "purge with conflicts", context do
+ db_name = context[:db_name]
+ create_resp = create_conflicted_search_docs(db_name)
+ create_ddoc(db_name)
+
+ search_url = "/#{db_name}/_design/foo/_nouveau/bar"
+
+ # confirm all hits
+ resp = Couch.get(search_url, query: %{q: "*:*", include_docs: true})
+ assert_status_code(resp, 200)
+
+ assert get_total_hits(resp) == 3
+ [hit1, hit2, hit3] = Enum.sort(resp.body["hits"])
+
+ assert hit1["doc"]["_id"] == "doc1"
+ assert hit1["doc"]["_rev"] == "1-a"
+ assert hit1["fields"] == %{"bar" => 0.0, "foo" => "baz"}
+
+ assert hit2["doc"]["_id"] == "doc3"
+ assert hit2["doc"]["_rev"] == "2-c"
+ assert hit2["fields"] == %{"bar" => 13.0, "foo" => "barX"}
+
+ assert hit3["doc"]["_id"] == "doc4"
+ assert hit3["doc"]["_rev"] == "1-b"
+ assert hit3["fields"] == %{"bar" => 43.0, "foo" => "fooX"}
+
+ # purge docs
+ purge_body = %{
+ "doc1" => ["2-c", "3-nonexistentrev"],
+ "doc2" => ["2-b"],
+ "doc3" => ["2-c"],
+ "doc4" => ["1-b"]
+ }
+ resp = Couch.post("/#{db_name}/_purge", body: purge_body)
+ assert_status_code(resp, 201)
+
+ resp = Couch.get(search_url, query: %{q: "*:*", include_docs: true})
+ assert_status_code(resp, 200)
+ hits = Enum.sort(resp.body["hits"])
+
+ assert get_total_hits(resp) == 2
+ [hit1, hit2] = hits
+
+ # doc1: deleted rev 2-c was purged, 1-a is still the winner
+ assert hit1["doc"]["_id"] == "doc1"
+ assert hit1["doc"]["_rev"] == "1-a"
+ assert hit1["fields"] == %{"bar" => 0.0, "foo" => "baz"}
+
+ # doc2: doc was deleted and now it's completely purged
+
+ # doc3: live revision 2-c was purged, we're left with the deleted rev 1-a only
+
+ # doc4: 1-b was purged, 1-a is the new winner
+ assert hit2["doc"]["_id"] == "doc4"
+ assert hit2["doc"]["_rev"] == "1-a"
+ assert hit2["fields"] == %{"bar" => 42.0, "foo" => "foo"}
+
+ resp = Couch.get("/#{db_name}")
+ db_purge_seq = resp.body["purge_seq"]
+ # Double-check db purge sequence (sum of purge sequences on shards) is 4
+ assert String.starts_with?(db_purge_seq, "4-")
+
+ resp = Couch.get("/#{db_name}/_design/foo/_nouveau_info/bar")
+ assert_status_code(resp, 200)
+ assert resp.body["search_index"]["update_seq"] == 8
+ assert resp.body["search_index"]["purge_seq"] == 4
+ end
end