Support index version hint for greater consistency when paginating
diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/api/SearchRequest.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/SearchRequest.java
index 2fc9e1f..969cce5 100644
--- a/nouveau/src/main/java/org/apache/couchdb/nouveau/api/SearchRequest.java
+++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/SearchRequest.java
@@ -24,6 +24,8 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Objects;
+import java.util.OptionalLong;
import org.apache.couchdb.nouveau.core.ser.PrimitiveWrapper;
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
@@ -32,6 +34,8 @@
@NotNull
private String query;
+ private OptionalLong indexVersion = OptionalLong.empty();
+
private Locale locale;
private String partition;
@@ -64,6 +68,14 @@
return query;
}
+ public OptionalLong getIndexVersion() {
+ return indexVersion;
+ }
+
+ public void setIndexVersion(final OptionalLong indexVersion) {
+ this.indexVersion = Objects.requireNonNull(indexVersion);
+ }
+
public void setLocale(final Locale locale) {
this.locale = locale;
}
@@ -154,7 +166,7 @@
@Override
public String toString() {
- return "SearchRequest [query=" + query + ", locale=" + locale + ", sort=" + sort + ", limit=" + limit
- + ", after=" + after + ", counts=" + counts + ", ranges=" + ranges + "]";
+ return "SearchRequest [query=" + query + ", indexVersion=" + indexVersion + ", locale=" + locale + ", sort="
+ + sort + ", limit=" + limit + ", after=" + after + ", counts=" + counts + ", ranges=" + ranges + "]";
}
}
diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/api/SearchResults.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/SearchResults.java
index a273e6e..7903769 100644
--- a/nouveau/src/main/java/org/apache/couchdb/nouveau/api/SearchResults.java
+++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/api/SearchResults.java
@@ -28,6 +28,8 @@
@PositiveOrZero
private long totalHits;
+ private long indexVersion;
+
@NotNull
private Relation totalHitsRelation;
@@ -49,6 +51,15 @@
return totalHits;
}
+ @JsonProperty
+ public long getIndexVersion() {
+ return indexVersion;
+ }
+
+ public void setIndexVersion(long indexVersion) {
+ this.indexVersion = indexVersion;
+ }
+
public Relation getTotalHitsRelation() {
return totalHitsRelation;
}
@@ -86,7 +97,7 @@
@Override
public String toString() {
- return "SearchResults [hits=" + hits + ", totalHits=" + totalHits + ", counts=" + counts + ", ranges=" + ranges
- + "]";
+ return "SearchResults [indexVersion=" + indexVersion + ", hits=" + hits + ", totalHits=" + totalHits
+ + ", counts=" + counts + ", ranges=" + ranges + "]";
}
}
diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/lucene9/Lucene9Index.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/lucene9/Lucene9Index.java
index ce7f7d7..5af4d30 100644
--- a/nouveau/src/main/java/org/apache/couchdb/nouveau/lucene9/Lucene9Index.java
+++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/lucene9/Lucene9Index.java
@@ -30,6 +30,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
+import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.couchdb.nouveau.api.DocumentDeleteRequest;
@@ -78,6 +79,8 @@
import org.apache.lucene.search.MultiCollectorManager;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.SearcherLifetimeManager;
+import org.apache.lucene.search.SearcherLifetimeManager.PruneByAge;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
@@ -98,17 +101,20 @@
private final Analyzer analyzer;
private final IndexWriter writer;
private final SearcherManager searcherManager;
+ private final SearcherLifetimeManager searcherLifetimeManager;
public Lucene9Index(
final Analyzer analyzer,
final IndexWriter writer,
final long updateSeq,
final long purgeSeq,
- final SearcherManager searcherManager) {
+ final SearcherManager searcherManager,
+ final SearcherLifetimeManager searcherLifetimeManager) {
super(updateSeq, purgeSeq);
this.analyzer = Objects.requireNonNull(analyzer);
this.writer = Objects.requireNonNull(writer);
this.searcherManager = Objects.requireNonNull(searcherManager);
+ this.searcherLifetimeManager = Objects.requireNonNull(searcherLifetimeManager);
}
@Override
@@ -158,6 +164,9 @@
public void doClose() throws IOException {
IOUtils.runAll(
() -> {
+ searcherLifetimeManager.close();
+ },
+ () -> {
searcherManager.close();
},
() -> {
@@ -186,17 +195,39 @@
cm = new MultiCollectorManager(hits);
}
- searcherManager.maybeRefreshBlocking();
-
- final IndexSearcher searcher = searcherManager.acquire();
- try {
- final Object[] reduces = searcher.search(query, cm);
- return toSearchResults(request, searcher, reduces);
- } catch (final IllegalStateException e) {
- throw new WebApplicationException(e.getMessage(), e, Status.BAD_REQUEST);
- } finally {
- searcherManager.release(searcher);
+ // Search latest version of index
+ if (request.getIndexVersion().isEmpty()) {
+ searcherManager.maybeRefreshBlocking();
+ var searcher = searcherManager.acquire();
+ var indexVersion = searcherLifetimeManager.record(searcher);
+ try {
+ final Object[] reduces = searcher.search(query, cm);
+ return toSearchResults(request, indexVersion, searcher, reduces);
+ } catch (final IllegalStateException e) {
+ throw new WebApplicationException(e.getMessage(), e, Status.BAD_REQUEST);
+ } finally {
+ searcherLifetimeManager.release(searcher);
+ searcherLifetimeManager.prune(new PruneByAge(600.0));
+ }
}
+
+ // Try to search specific version of index if available
+ var indexVersion = request.getIndexVersion().getAsLong();
+ var searcher = searcherLifetimeManager.acquire(indexVersion);
+ if (searcher != null) {
+ try {
+ final Object[] reduces = searcher.search(query, cm);
+ return toSearchResults(request, indexVersion, searcher, reduces);
+ } catch (final IllegalStateException e) {
+ throw new WebApplicationException(e.getMessage(), e, Status.BAD_REQUEST);
+ } finally {
+ searcherLifetimeManager.release(searcher);
+ searcherLifetimeManager.prune(new PruneByAge(600.0));
+ }
+ }
+ // We failed to find the requested version, fallback to latest.
+ request.setIndexVersion(OptionalLong.empty());
+ return doSearch(request);
}
private CollectorManager<?, ? extends TopDocs> hitCollector(final SearchRequest searchRequest) {
@@ -224,9 +255,10 @@
}
private SearchResults toSearchResults(
- final SearchRequest searchRequest, final IndexSearcher searcher, final Object[] reduces)
+ final SearchRequest searchRequest, final long version, final IndexSearcher searcher, final Object[] reduces)
throws IOException {
final SearchResults result = new SearchResults();
+ result.setIndexVersion(version);
collectHits(searcher, (TopDocs) reduces[0], result);
if (reduces.length == 2) {
collectFacets(searchRequest, searcher, (FacetsCollector) reduces[1], result);
diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java
index 9c7a100..7926fd0 100644
--- a/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java
+++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java
@@ -47,6 +47,7 @@
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.misc.store.DirectIODirectory;
import org.apache.lucene.search.SearcherFactory;
+import org.apache.lucene.search.SearcherLifetimeManager;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
@@ -149,7 +150,8 @@
final long updateSeq = getSeq(writer, "update_seq");
final long purgeSeq = getSeq(writer, "purge_seq");
final SearcherManager searcherManager = new SearcherManager(writer, searcherFactory);
- return new Lucene9Index(analyzer, writer, updateSeq, purgeSeq, searcherManager);
+ final SearcherLifetimeManager searcherLifetimeManager = new SearcherLifetimeManager();
+ return new Lucene9Index(analyzer, writer, updateSeq, purgeSeq, searcherManager, searcherLifetimeManager);
};
}
diff --git a/nouveau/src/test/java/org/apache/couchdb/nouveau/api/SearchRequestTest.java b/nouveau/src/test/java/org/apache/couchdb/nouveau/api/SearchRequestTest.java
index 01489b0..5b664a5 100644
--- a/nouveau/src/test/java/org/apache/couchdb/nouveau/api/SearchRequestTest.java
+++ b/nouveau/src/test/java/org/apache/couchdb/nouveau/api/SearchRequestTest.java
@@ -16,6 +16,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import java.util.List;
import java.util.Map;
import org.junit.jupiter.api.BeforeAll;
@@ -28,6 +29,7 @@
@BeforeAll
public static void setupMapper() {
mapper = new ObjectMapper();
+ mapper.registerModule(new Jdk8Module());
}
@Test
diff --git a/nouveau/src/test/java/org/apache/couchdb/nouveau/health/IndexHealthCheckTest.java b/nouveau/src/test/java/org/apache/couchdb/nouveau/health/IndexHealthCheckTest.java
index c71c281..6372f31 100644
--- a/nouveau/src/test/java/org/apache/couchdb/nouveau/health/IndexHealthCheckTest.java
+++ b/nouveau/src/test/java/org/apache/couchdb/nouveau/health/IndexHealthCheckTest.java
@@ -17,6 +17,7 @@
import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import java.nio.file.Path;
import java.util.concurrent.Executors;
import org.apache.couchdb.nouveau.core.IndexManager;
@@ -31,7 +32,9 @@
public void testIndexHealthCheck(@TempDir final Path tempDir) throws Exception {
var scheduler = Executors.newSingleThreadScheduledExecutor();
var manager = new IndexManager();
- manager.setObjectMapper(new ObjectMapper());
+ var mapper = new ObjectMapper();
+ mapper.registerModule(new Jdk8Module());
+ manager.setObjectMapper(mapper);
manager.setMetricRegistry(new MetricRegistry());
manager.setRootDir(tempDir);
manager.setScheduler(scheduler);
diff --git a/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene9/Lucene9IndexTest.java b/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene9/Lucene9IndexTest.java
index eaaad17..62a36e5 100644
--- a/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene9/Lucene9IndexTest.java
+++ b/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene9/Lucene9IndexTest.java
@@ -22,6 +22,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.OptionalLong;
import org.apache.couchdb.nouveau.api.DocumentDeleteRequest;
import org.apache.couchdb.nouveau.api.DocumentUpdateRequest;
import org.apache.couchdb.nouveau.api.DoubleField;
@@ -39,6 +40,7 @@
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.misc.store.DirectIODirectory;
+import org.apache.lucene.search.SearcherLifetimeManager;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
@@ -85,6 +87,38 @@
}
@Test
+ public void testSearchingWithVersion(@TempDir Path path) throws IOException {
+ final Index index = setup(path);
+ try {
+ final int count = 100;
+ final Collection<Field> fields = List.of(new StringField("foo", "bar", false));
+ for (int i = 1; i <= count; i++) {
+ final DocumentUpdateRequest request = new DocumentUpdateRequest(i - 1, i, null, fields);
+ index.update("doc" + i, request);
+ }
+ final SearchRequest request = new SearchRequest();
+ request.setQuery("*:*");
+ SearchResults results = index.search(request);
+ assertThat(results.getTotalHits()).isEqualTo(count);
+
+ // add another doc
+ index.update("doc" + count + 1, new DocumentUpdateRequest(count, count + 1, null, fields));
+
+ // Search with previous version and get same results.
+ request.setIndexVersion(OptionalLong.of(results.getIndexVersion()));
+ results = index.search(request);
+ assertThat(results.getTotalHits()).isEqualTo(count);
+
+ // Search without version and get the extra result.
+ request.setIndexVersion(OptionalLong.empty());
+ results = index.search(request);
+ assertThat(results.getTotalHits()).isEqualTo(count + 1);
+ } finally {
+ cleanup(index);
+ }
+ }
+
+ @Test
public void testSort(@TempDir Path path) throws IOException {
final Index index = setup(path);
try {
@@ -245,7 +279,8 @@
config.setUseCompoundFile(false);
final IndexWriter writer = new IndexWriter(dir, config);
final SearcherManager searcherManager = new SearcherManager(writer, null);
- return new Lucene9Index(analyzer, writer, 0L, 0L, searcherManager);
+ final SearcherLifetimeManager searcherLifetimeManager = new SearcherLifetimeManager();
+ return new Lucene9Index(analyzer, writer, 0L, 0L, searcherManager, searcherLifetimeManager);
};
}
}
diff --git a/src/nouveau/src/nouveau_bookmark.erl b/src/nouveau/src/nouveau_bookmark.erl
index b919534..67bc7af 100644
--- a/src/nouveau/src/nouveau_bookmark.erl
+++ b/src/nouveau/src/nouveau_bookmark.erl
@@ -58,7 +58,10 @@
pack({EJson}) when is_list(EJson) ->
pack(from_ejson(EJson));
pack(UnpackedBookmark) when is_map(UnpackedBookmark) ->
- base64:encode(jiffy:encode(maps:values(UnpackedBookmark))).
+ Values = maps:values(UnpackedBookmark),
+ Encoded = jiffy:encode(Values),
+ Compressed = zlib:compress(Encoded),
+ base64:encode(Compressed).
%% legacy use of ejson within mango
from_ejson({Props}) ->
diff --git a/src/nouveau/src/nouveau_fabric_search.erl b/src/nouveau/src/nouveau_fabric_search.erl
index d11b556..2167486 100644
--- a/src/nouveau/src/nouveau_fabric_search.erl
+++ b/src/nouveau/src/nouveau_fabric_search.erl
@@ -86,10 +86,14 @@
%% already heard from someone else in this range
{ok, State};
nil ->
- SearchResults = merge_search_results(State#state.search_results, Response, State),
+ SearchResults = merge_search_results(
+ State#state.search_results, Shard, Response, State
+ ),
Counters1 = fabric_dict:store(Shard, ok, State#state.counters),
Counters2 = fabric_view:remove_overlapping_shards(Shard, Counters1),
- State1 = State#state{counters = Counters2, search_results = SearchResults},
+ State1 = State#state{
+ counters = Counters2, search_results = SearchResults
+ },
case fabric_dict:any(nil, Counters2) of
true ->
{ok, State1};
@@ -110,29 +114,42 @@
handle_message(Else, _Shard, _State) ->
{error, Else}.
-merge_search_results(A, B, #state{} = State) ->
+merge_search_results(Acc, #shard{} = Shard, Response, #state{} = State) ->
#{
+ <<"index_versions">> => merge_index_version(
+ maps:get(<<"index_versions">>, Acc, #{}),
+ Shard,
+ maps:get(<<"index_version">>, Response)
+ ),
<<"total_hits">> => merge_total_hits(
- maps:get(<<"total_hits">>, A, 0), maps:get(<<"total_hits">>, B, 0)
+ maps:get(<<"total_hits">>, Acc, 0), maps:get(<<"total_hits">>, Response, 0)
),
<<"total_hits_relation">> => merge_total_hits_relation(
- maps:get(<<"total_hits_relation">>, A, null),
- maps:get(<<"total_hits_relation">>, B, null)
+ maps:get(<<"total_hits_relation">>, Acc, null),
+ maps:get(<<"total_hits_relation">>, Response, null)
),
<<"hits">> => merge_hits(
- maps:get(<<"hits">>, A, []),
- maps:get(<<"hits">>, B, []),
+ maps:get(<<"hits">>, Acc, []),
+ maps:get(<<"hits">>, Response, []),
State#state.sort,
State#state.limit
),
<<"counts">> => merge_facets(
- maps:get(<<"counts">>, A, null), maps:get(<<"counts">>, B, null), State#state.limit
+ maps:get(<<"counts">>, Acc, null),
+ maps:get(<<"counts">>, Response, null),
+ State#state.limit
),
<<"ranges">> => merge_facets(
- maps:get(<<"ranges">>, A, null), maps:get(<<"ranges">>, B, null), State#state.limit
+ maps:get(<<"ranges">>, Acc, null),
+ maps:get(<<"ranges">>, Response, null),
+ State#state.limit
)
}.
+merge_index_version(Acc, #shard{} = Shard, IndexVersion) ->
+ Range = ?l2b(io_lib:format("~16.16b-~.16b", Shard#shard.range)),
+ Acc#{Range => IndexVersion}.
+
merge_total_hits(TotalHitsA, TotalHitsB) ->
TotalHitsA + TotalHitsB.
diff --git a/src/nouveau/src/nouveau_httpd.erl b/src/nouveau/src/nouveau_httpd.erl
index 10917b3..9375785 100644
--- a/src/nouveau/src/nouveau_httpd.erl
+++ b/src/nouveau/src/nouveau_httpd.erl
@@ -104,6 +104,9 @@
{ok, SearchResults} ->
RespBody = #{
<<"bookmark">> => nouveau_bookmark:pack(maps:get(bookmark, SearchResults)),
+ <<"hints">> => pack_hints(
+ maps:get(<<"index_versions">>, SearchResults)
+ ),
<<"total_hits">> => maps:get(<<"total_hits">>, SearchResults),
<<"total_hits_relation">> => maps:get(<<"total_hits_relation">>, SearchResults),
<<"hits">> => include_docs(
@@ -279,6 +282,11 @@
true -> throw({query_parser_error, <<"all values in ranges parameter must be objects">>})
end.
+pack_hints(IndexVersions) ->
+ Encoded = jiffy:encode(#{iv => IndexVersions}),
+ Compressed = zlib:compress(Encoded),
+ base64:encode(Compressed).
+
check_if_enabled() ->
case nouveau:enabled() of
true ->