blob: 4f5fc8d0839c58cb58200ec09cc031624382abc6 [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_views_fdb).
-export([
get_view_state/2,
new_interactive_index/3,
new_creation_vs/3,
get_creation_vs/2,
get_build_status/2,
set_build_status/3,
get_update_seq/2,
set_update_seq/3,
list_signatures/1,
clear_index/2,
persist_chunks/3,
update_kv_size/4
]).
-ifdef(TEST).
-compile(export_all).
-compile(nowarn_export_all).
-endif.
-include("couch_views.hrl").
-include_lib("fabric/include/fabric2.hrl").
get_view_state(Db, #mrst{} = Mrst) ->
get_view_state(Db, Mrst#mrst.sig);
get_view_state(Db, Sig) when is_binary(Sig) ->
#{
tx := Tx
} = Db,
VersionF = erlfdb:get(Tx, version_key(Db, Sig)),
ViewSeqF = erlfdb:get(Tx, seq_key(Db, Sig)),
ViewVSF = erlfdb:get(Tx, creation_vs_key(Db, Sig)),
BuildStatusF = erlfdb:get(Tx, build_status_key(Db, Sig)),
Version =
case erlfdb:wait(VersionF) of
not_found -> not_found;
VsnVal -> element(1, erlfdb_tuple:unpack(VsnVal))
end,
ViewSeq =
case erlfdb:wait(ViewSeqF) of
not_found -> <<>>;
SeqVal -> SeqVal
end,
ViewVS =
case erlfdb:wait(ViewVSF) of
not_found -> not_found;
VSVal -> element(1, erlfdb_tuple:unpack(VSVal))
end,
State = #{
version => Version,
view_seq => ViewSeq,
view_vs => ViewVS,
build_status => erlfdb:wait(BuildStatusF)
},
maybe_upgrade_view(Db, Sig, State).
new_interactive_index(Db, #mrst{} = Mrst, VS) ->
new_interactive_index(Db, Mrst#mrst.sig, VS);
new_interactive_index(Db, Sig, VS) ->
set_version(Db, Sig),
new_creation_vs(Db, Sig, VS),
set_build_status(Db, Sig, ?INDEX_BUILDING).
%Interactive View Creation Versionstamp
%(<db>, ?DB_VIEWS, ?VIEW_INFO, ?VIEW_CREATION_VS, Sig) = VS
new_creation_vs(TxDb, #mrst{} = Mrst, VS) ->
new_creation_vs(TxDb, Mrst#mrst.sig, VS);
new_creation_vs(TxDb, Sig, VS) ->
#{
tx := Tx
} = TxDb,
Key = creation_vs_key(TxDb, Sig),
Value = erlfdb_tuple:pack_vs({VS}),
ok = erlfdb:set_versionstamped_value(Tx, Key, Value).
get_creation_vs(TxDb, MrstOrSig) ->
#{
view_vs := ViewVS
} = get_view_state(TxDb, MrstOrSig),
ViewVS.
%Interactive View Build Status
%(<db>, ?DB_VIEWS, ?VIEW_INFO, ?VIEW_BUILD_STATUS, Sig) = INDEX_BUILDING | INDEX_READY
get_build_status(TxDb, MrstOrSig) ->
#{
build_status := BuildStatus
} = get_view_state(TxDb, MrstOrSig),
BuildStatus.
set_build_status(TxDb, #mrst{} = Mrst, State) ->
set_build_status(TxDb, Mrst#mrst.sig, State);
set_build_status(TxDb, Sig, State) ->
#{
tx := Tx
} = TxDb,
Key = build_status_key(TxDb, Sig),
ok = erlfdb:set(Tx, Key, State).
% View Build Sequence Access
% (<db>, ?DB_VIEWS, Sig, ?VIEW_UPDATE_SEQ) = Sequence
get_update_seq(TxDb, MrstOrSig) ->
#{
view_seq := ViewSeq
} = get_view_state(TxDb, MrstOrSig),
ViewSeq.
set_update_seq(TxDb, Sig, Seq) ->
#{
tx := Tx
} = TxDb,
ok = erlfdb:set(Tx, seq_key(TxDb, Sig), Seq).
list_signatures(Db) ->
#{
db_prefix := DbPrefix
} = Db,
ViewSeqRange = {?DB_VIEWS, ?VIEW_INFO, ?VIEW_UPDATE_SEQ},
RangePrefix = erlfdb_tuple:pack(ViewSeqRange, DbPrefix),
fabric2_fdb:fold_range(
Db,
RangePrefix,
fun({Key, _Val}, Acc) ->
{Sig} = erlfdb_tuple:unpack(Key, RangePrefix),
[Sig | Acc]
end,
[],
[]
).
clear_index(Db, Signature) ->
#{
tx := Tx,
db_prefix := DbPrefix
} = Db,
% Get view size to remove from global counter
SizeTuple = {?DB_VIEWS, ?VIEW_INFO, ?VIEW_KV_SIZE, Signature},
SizeKey = erlfdb_tuple:pack(SizeTuple, DbPrefix),
ViewSize =
case erlfdb:wait(erlfdb:get(Tx, SizeKey)) of
not_found -> 0;
SizeVal -> ?bin2uint(SizeVal)
end,
% Clear index info keys
Keys = [
{?DB_VIEWS, ?VIEW_INFO, ?VIEW_UPDATE_SEQ, Signature},
{?DB_VIEWS, ?VIEW_INFO, ?VIEW_ROW_COUNT, Signature},
{?DB_VIEWS, ?VIEW_INFO, ?VIEW_KV_SIZE, Signature}
],
lists:foreach(
fun(Key) ->
FDBKey = erlfdb_tuple:pack(Key, DbPrefix),
erlfdb:clear(Tx, FDBKey)
end,
Keys
),
% Clear index data
DataTuple = {?DB_VIEWS, ?VIEW_DATA, Signature},
DataPrefix = erlfdb_tuple:pack(DataTuple, DbPrefix),
erlfdb:clear_range_startswith(Tx, DataPrefix),
% Clear tree data
TreeTuple = {?DB_VIEWS, ?VIEW_TREES, Signature},
TreePrefix = erlfdb_tuple:pack(TreeTuple, DbPrefix),
erlfdb:clear_range_startswith(Tx, TreePrefix),
% Decrement db wide view size counter
DbSizeTuple = {?DB_STATS, <<"sizes">>, <<"views">>},
DbSizeKey = erlfdb_tuple:pack(DbSizeTuple, DbPrefix),
erlfdb:add(Tx, DbSizeKey, -ViewSize).
persist_chunks(Tx, set, [Key, Value]) ->
Chunks = fabric2_fdb:chunkify_binary(Value),
LastId = lists:foldl(
fun(Chunk, Id) ->
ChunkKey = erlfdb_tuple:pack({Id}, Key),
erlfdb:set(Tx, ChunkKey, Chunk),
Id + 1
end,
0,
Chunks
),
% We update nodes in place, so its possible that
% a node shrank. This clears any keys that we haven't
% just overwritten for the provided key.
LastIdKey = erlfdb_tuple:pack({LastId}, Key),
EndRange = <<Key/binary, 16#FF>>,
erlfdb:clear_range(Tx, LastIdKey, EndRange);
persist_chunks(Tx, get, Key) ->
Rows = erlfdb:get_range_startswith(Tx, Key),
Values = [V || {_K, V} <- Rows],
iolist_to_binary(Values);
persist_chunks(Tx, clear, Key) ->
erlfdb:clear_range_startswith(Tx, Key).
update_kv_size(TxDb, Sig, OldSize, NewSize) ->
#{
tx := Tx,
db_prefix := DbPrefix
} = TxDb,
ViewTuple = {?DB_VIEWS, ?VIEW_INFO, ?VIEW_KV_SIZE, Sig},
ViewKey = erlfdb_tuple:pack(ViewTuple, DbPrefix),
erlfdb:set(Tx, ViewKey, ?uint2bin(NewSize)),
DbTuple = {?DB_STATS, <<"sizes">>, <<"views">>},
DbKey = erlfdb_tuple:pack(DbTuple, DbPrefix),
erlfdb:add(Tx, DbKey, NewSize - OldSize).
maybe_upgrade_view(_Db, _Sig, #{version := ?CURRENT_VIEW_IMPL_VERSION} = St) ->
St;
maybe_upgrade_view(Db, Sig, #{version := not_found, view_seq := <<>>} = St) ->
% If we haven't started building the view yet
% then we don't change view_vs and build_status
% as they're still correct.
set_version(Db, Sig),
St#{
version => ?CURRENT_VIEW_IMPL_VERSION,
view_seq => <<>>
};
maybe_upgrade_view(Db, Sig, #{version := not_found} = St) ->
clear_index(Db, Sig),
set_version(Db, Sig),
{ViewVS, BuildStatus} = reset_interactive_index(Db, Sig, St),
#{
version => ?CURRENT_VIEW_IMPL_VERSION,
view_seq => <<>>,
view_vs => ViewVS,
build_status => BuildStatus
}.
set_version(Db, Sig) ->
#{
tx := Tx
} = Db,
Key = version_key(Db, Sig),
Val = erlfdb_tuple:pack({?CURRENT_VIEW_IMPL_VERSION}),
erlfdb:set(Tx, Key, Val).
reset_interactive_index(_Db, _Sig, #{view_vs := not_found}) ->
% Not an interactive index
{not_found, not_found};
reset_interactive_index(Db, Sig, _St) ->
% We have to reset the creation versionstamp
% to the current update seq of the database
% or else we'll not have indexed any documents
% inserted since the creation of the interactive
% index.
#{
tx := Tx
} = Db,
DbSeq = fabric2_db:get_update_seq(Db),
VS = fabric2_fdb:seq_to_vs(DbSeq),
Key = creation_vs_key(Db, Sig),
Val = erlfdb_tuple:pack({VS}),
ok = erlfdb:set(Tx, Key, Val),
set_build_status(Db, Sig, ?INDEX_BUILDING),
{VS, ?INDEX_BUILDING}.
version_key(Db, Sig) ->
#{
db_prefix := DbPrefix
} = Db,
Key = {?DB_VIEWS, ?VIEW_INFO, ?VIEW_IMPL_VERSION, Sig},
erlfdb_tuple:pack(Key, DbPrefix).
seq_key(Db, Sig) ->
#{
db_prefix := DbPrefix
} = Db,
Key = {?DB_VIEWS, ?VIEW_INFO, ?VIEW_UPDATE_SEQ, Sig},
erlfdb_tuple:pack(Key, DbPrefix).
creation_vs_key(Db, Sig) ->
#{
db_prefix := DbPrefix
} = Db,
Key = {?DB_VIEWS, ?VIEW_INFO, ?VIEW_CREATION_VS, Sig},
erlfdb_tuple:pack(Key, DbPrefix).
build_status_key(Db, Sig) ->
#{
db_prefix := DbPrefix
} = Db,
Key = {?DB_VIEWS, ?VIEW_INFO, ?VIEW_BUILD_STATUS, Sig},
erlfdb_tuple:pack(Key, DbPrefix).