blob: ebd7a2d413f841ec5560cbe3cab12718f9ea16ac [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_view).
-behaviour(gen_server).
-export([start_link/0,fold/4,less_json/2,less_json_ids/2,expand_dups/2,
detuple_kvs/2,init/1,terminate/2,handle_call/3,handle_cast/2,handle_info/2,
code_change/3,get_reduce_view/4,get_temp_reduce_view/5,get_temp_map_view/4,
get_map_view/4,get_row_count/1,reduce_to_count/1,fold_reduce/4,
extract_map_view/1,get_group_server/2,get_group_info/2,cleanup_index_files/1]).
-include("couch_db.hrl").
-record(server,{
root_dir = []}).
start_link() ->
gen_server:start_link({local, couch_view}, couch_view, [], []).
get_temp_updater(DbName, Language, DesignOptions, MapSrc, RedSrc) ->
{ok, Group} =
couch_view_group:open_temp_group(DbName, Language, DesignOptions, MapSrc, RedSrc),
case gen_server:call(couch_view, {get_group_server, DbName, Group}, infinity) of
{ok, Pid} ->
Pid;
Error ->
throw(Error)
end.
get_group_server(DbName, GroupId) ->
case couch_view_group:open_db_group(DbName, GroupId) of
{ok, Group} ->
case gen_server:call(couch_view, {get_group_server, DbName, Group}, infinity) of
{ok, Pid} ->
Pid;
Error ->
throw(Error)
end;
Error ->
throw(Error)
end.
get_group(Db, GroupId, Stale) ->
MinUpdateSeq = case Stale of
ok -> 0;
_Else -> couch_db:get_update_seq(Db)
end,
couch_view_group:request_group(
get_group_server(couch_db:name(Db), GroupId),
MinUpdateSeq).
get_temp_group(Db, Language, DesignOptions, MapSrc, RedSrc) ->
couch_view_group:request_group(
get_temp_updater(couch_db:name(Db), Language, DesignOptions, MapSrc, RedSrc),
couch_db:get_update_seq(Db)).
get_group_info(Db, GroupId) ->
couch_view_group:request_group_info(
get_group_server(couch_db:name(Db), GroupId)).
cleanup_index_files(Db) ->
% load all ddocs
{ok, DesignDocs} = couch_db:get_design_docs(Db),
% make unique list of group sigs
Sigs = lists:map(fun(#doc{id = GroupId}) ->
{ok, Info} = get_group_info(Db, GroupId),
?b2l(couch_util:get_value(signature, Info))
end, [DD||DD <- DesignDocs, DD#doc.deleted == false]),
FileList = list_index_files(Db),
% regex that matches all ddocs
RegExp = "("++ string:join(Sigs, "|") ++")",
% filter out the ones in use
DeleteFiles = [FilePath
|| FilePath <- FileList,
re:run(FilePath, RegExp, [{capture, none}]) =:= nomatch],
% delete unused files
?LOG_DEBUG("deleting unused view index files: ~p",[DeleteFiles]),
RootDir = couch_config:get("couchdb", "view_index_dir"),
[couch_file:delete(RootDir,File,false)||File <- DeleteFiles],
ok.
list_index_files(Db) ->
% call server to fetch the index files
RootDir = couch_config:get("couchdb", "view_index_dir"),
filelib:wildcard(RootDir ++ "/." ++ ?b2l(couch_db:name(Db)) ++ "_design"++"/*").
get_row_count(#view{btree=Bt}) ->
{ok, {Count, _Reds}} = couch_btree:full_reduce(Bt),
{ok, Count}.
get_temp_reduce_view(Db, Language, DesignOptions, MapSrc, RedSrc) ->
{ok, #group{views=[View]}=Group} =
get_temp_group(Db, Language, DesignOptions, MapSrc, RedSrc),
{ok, {temp_reduce, View}, Group}.
get_reduce_view(Db, GroupId, Name, Update) ->
case get_group(Db, GroupId, Update) of
{ok, #group{views=Views,def_lang=Lang}=Group} ->
case get_reduce_view0(Name, Lang, Views) of
{ok, View} ->
{ok, View, Group};
Else ->
Else
end;
Error ->
Error
end.
get_reduce_view0(_Name, _Lang, []) ->
{not_found, missing_named_view};
get_reduce_view0(Name, Lang, [#view{reduce_funs=RedFuns}=View|Rest]) ->
case get_key_pos(Name, RedFuns, 0) of
0 -> get_reduce_view0(Name, Lang, Rest);
N -> {ok, {reduce, N, Lang, View}}
end.
extract_map_view({reduce, _N, _Lang, View}) ->
View.
detuple_kvs([], Acc) ->
lists:reverse(Acc);
detuple_kvs([KV | Rest], Acc) ->
{{Key,Id},Value} = KV,
NKV = [[Key, Id], Value],
detuple_kvs(Rest, [NKV | Acc]).
expand_dups([], Acc) ->
lists:reverse(Acc);
expand_dups([{Key, {dups, Vals}} | Rest], Acc) ->
Expanded = [{Key, Val} || Val <- Vals],
expand_dups(Rest, Expanded ++ Acc);
expand_dups([KV | Rest], Acc) ->
expand_dups(Rest, [KV | Acc]).
fold_reduce({temp_reduce, #view{btree=Bt}}, Fun, Acc, Options) ->
WrapperFun = fun({GroupedKey, _}, PartialReds, Acc0) ->
{_, [Red]} = couch_btree:final_reduce(Bt, PartialReds),
Fun(GroupedKey, Red, Acc0)
end,
couch_btree:fold_reduce(Bt, WrapperFun, Acc, Options);
fold_reduce({reduce, NthRed, Lang, #view{btree=Bt, reduce_funs=RedFuns}}, Fun, Acc, Options) ->
PreResultPadding = lists:duplicate(NthRed - 1, []),
PostResultPadding = lists:duplicate(length(RedFuns) - NthRed, []),
{_Name, FunSrc} = lists:nth(NthRed,RedFuns),
ReduceFun =
fun(reduce, KVs) ->
{ok, Reduced} = couch_query_servers:reduce(Lang, [FunSrc], detuple_kvs(expand_dups(KVs, []),[])),
{0, PreResultPadding ++ Reduced ++ PostResultPadding};
(rereduce, Reds) ->
UserReds = [[lists:nth(NthRed, UserRedsList)] || {_, UserRedsList} <- Reds],
{ok, Reduced} = couch_query_servers:rereduce(Lang, [FunSrc], UserReds),
{0, PreResultPadding ++ Reduced ++ PostResultPadding}
end,
WrapperFun = fun({GroupedKey, _}, PartialReds, Acc0) ->
{_, Reds} = couch_btree:final_reduce(ReduceFun, PartialReds),
Fun(GroupedKey, lists:nth(NthRed, Reds), Acc0)
end,
couch_btree:fold_reduce(Bt, WrapperFun, Acc, Options).
get_key_pos(_Key, [], _N) ->
0;
get_key_pos(Key, [{Key1,_Value}|_], N) when Key == Key1 ->
N + 1;
get_key_pos(Key, [_|Rest], N) ->
get_key_pos(Key, Rest, N+1).
get_temp_map_view(Db, Language, DesignOptions, Src) ->
{ok, #group{views=[View]}=Group} = get_temp_group(Db, Language, DesignOptions, Src, []),
{ok, View, Group}.
get_map_view(Db, GroupId, Name, Stale) ->
case get_group(Db, GroupId, Stale) of
{ok, #group{views=Views}=Group} ->
case get_map_view0(Name, Views) of
{ok, View} ->
{ok, View, Group};
Else ->
Else
end;
Error ->
Error
end.
get_map_view0(_Name, []) ->
{not_found, missing_named_view};
get_map_view0(Name, [#view{map_names=MapNames}=View|Rest]) ->
case lists:member(Name, MapNames) of
true -> {ok, View};
false -> get_map_view0(Name, Rest)
end.
reduce_to_count(Reductions) ->
{Count, _} =
couch_btree:final_reduce(
fun(reduce, KVs) ->
Count = lists:sum(
[case V of {dups, Vals} -> length(Vals); _ -> 1 end
|| {_,V} <- KVs]),
{Count, []};
(rereduce, Reds) ->
{lists:sum([Count0 || {Count0, _} <- Reds]), []}
end, Reductions),
Count.
fold_fun(_Fun, [], _, Acc) ->
{ok, Acc};
fold_fun(Fun, [KV|Rest], {KVReds, Reds}, Acc) ->
case Fun(KV, {KVReds, Reds}, Acc) of
{ok, Acc2} ->
fold_fun(Fun, Rest, {[KV|KVReds], Reds}, Acc2);
{stop, Acc2} ->
{stop, Acc2}
end.
fold(#view{btree=Btree}, Fun, Acc, Options) ->
WrapperFun =
fun(KV, Reds, Acc2) ->
fold_fun(Fun, expand_dups([KV],[]), Reds, Acc2)
end,
{ok, _LastReduce, _AccResult} = couch_btree:fold(Btree, WrapperFun, Acc, Options).
init([]) ->
% read configuration settings and register for configuration changes
RootDir = couch_config:get("couchdb", "view_index_dir"),
Self = self(),
ok = couch_config:register(
fun("couchdb", "view_index_dir")->
exit(Self, config_change)
end),
couch_db_update_notifier:start_link(
fun({deleted, DbName}) ->
gen_server:cast(couch_view, {reset_indexes, DbName});
({created, DbName}) ->
gen_server:cast(couch_view, {reset_indexes, DbName});
(_Else) ->
ok
end),
ets:new(couch_groups_by_db, [bag, private, named_table]),
ets:new(group_servers_by_sig, [set, protected, named_table]),
ets:new(couch_groups_by_updater, [set, private, named_table]),
process_flag(trap_exit, true),
ok = couch_file:init_delete_dir(RootDir),
{ok, #server{root_dir=RootDir}}.
terminate(_Reason, _Srv) ->
[couch_util:shutdown_sync(Pid) || {Pid, _} <-
ets:tab2list(couch_groups_by_updater)],
ok.
handle_call({get_group_server, DbName,
#group{name=GroupId,sig=Sig}=Group}, _From, #server{root_dir=Root}=Server) ->
case ets:lookup(group_servers_by_sig, {DbName, Sig}) of
[] ->
?LOG_DEBUG("Spawning new group server for view group ~s in database ~s.",
[GroupId, DbName]),
case (catch couch_view_group:start_link({Root, DbName, Group})) of
{ok, NewPid} ->
add_to_ets(NewPid, DbName, Sig),
{reply, {ok, NewPid}, Server};
{error, invalid_view_seq} ->
do_reset_indexes(DbName, Root),
case (catch couch_view_group:start_link({Root, DbName, Group})) of
{ok, NewPid} ->
add_to_ets(NewPid, DbName, Sig),
{reply, {ok, NewPid}, Server};
Error ->
{reply, Error, Server}
end;
Error ->
{reply, Error, Server}
end;
[{_, ExistingPid}] ->
{reply, {ok, ExistingPid}, Server}
end.
handle_cast({reset_indexes, DbName}, #server{root_dir=Root}=Server) ->
do_reset_indexes(DbName, Root),
{noreply, Server}.
do_reset_indexes(DbName, Root) ->
% shutdown all the updaters and clear the files, the db got changed
Names = ets:lookup(couch_groups_by_db, DbName),
lists:foreach(
fun({_DbName, Sig}) ->
?LOG_DEBUG("Killing update process for view group ~s. in database ~s.", [Sig, DbName]),
[{_, Pid}] = ets:lookup(group_servers_by_sig, {DbName, Sig}),
couch_util:shutdown_sync(Pid),
delete_from_ets(Pid, DbName, Sig)
end, Names),
delete_index_dir(Root, DbName),
RootDelDir = couch_config:get("couchdb", "view_index_dir"),
couch_file:delete(RootDelDir, Root ++ "/." ++ ?b2l(DbName) ++ "_temp").
handle_info({'EXIT', FromPid, Reason}, Server) ->
case ets:lookup(couch_groups_by_updater, FromPid) of
[] ->
if Reason /= normal ->
% non-updater linked process died, we propagate the error
?LOG_ERROR("Exit on non-updater process: ~p", [Reason]),
exit(Reason);
true -> ok
end;
[{_, {DbName, GroupId}}] ->
delete_from_ets(FromPid, DbName, GroupId)
end,
{noreply, Server}.
add_to_ets(Pid, DbName, Sig) ->
true = ets:insert(couch_groups_by_updater, {Pid, {DbName, Sig}}),
true = ets:insert(group_servers_by_sig, {{DbName, Sig}, Pid}),
true = ets:insert(couch_groups_by_db, {DbName, Sig}).
delete_from_ets(Pid, DbName, Sig) ->
true = ets:delete(couch_groups_by_updater, Pid),
true = ets:delete(group_servers_by_sig, {DbName, Sig}),
true = ets:delete_object(couch_groups_by_db, {DbName, Sig}).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
delete_index_dir(RootDir, DbName) ->
nuke_dir(RootDir, RootDir ++ "/." ++ ?b2l(DbName) ++ "_design").
nuke_dir(RootDelDir, Dir) ->
case file:list_dir(Dir) of
{error, enoent} -> ok; % doesn't exist
{ok, Files} ->
lists:foreach(
fun(File)->
Full = Dir ++ "/" ++ File,
case couch_file:delete(RootDelDir, Full, false) of
ok -> ok;
{error, eperm} ->
ok = nuke_dir(RootDelDir, Full)
end
end,
Files),
ok = file:del_dir(Dir)
end.
% keys come back in the language of btree - tuples.
less_json_ids({JsonA, IdA}, {JsonB, IdB}) ->
case less_json0(JsonA, JsonB) of
0 ->
IdA < IdB;
Result ->
Result < 0
end.
less_json(A,B) ->
less_json0(A,B) < 0.
less_json0(A,A) -> 0;
less_json0(A,B) when is_atom(A), is_atom(B) -> atom_sort(A) - atom_sort(B);
less_json0(A,_) when is_atom(A) -> -1;
less_json0(_,B) when is_atom(B) -> 1;
less_json0(A,B) when is_number(A), is_number(B) -> A - B;
less_json0(A,_) when is_number(A) -> -1;
less_json0(_,B) when is_number(B) -> 1;
less_json0(A,B) when is_binary(A), is_binary(B) -> couch_util:collate(A,B);
less_json0(A,_) when is_binary(A) -> -1;
less_json0(_,B) when is_binary(B) -> 1;
less_json0(A,B) when is_list(A), is_list(B) -> less_list(A,B);
less_json0(A,_) when is_list(A) -> -1;
less_json0(_,B) when is_list(B) -> 1;
less_json0({A},{B}) when is_list(A), is_list(B) -> less_props(A,B);
less_json0({A},_) when is_list(A) -> -1;
less_json0(_,{B}) when is_list(B) -> 1.
atom_sort(null) -> 1;
atom_sort(false) -> 2;
atom_sort(true) -> 3.
less_props([], [_|_]) ->
-1;
less_props(_, []) ->
1;
less_props([{AKey, AValue}|RestA], [{BKey, BValue}|RestB]) ->
case couch_util:collate(AKey, BKey) of
0 ->
case less_json0(AValue, BValue) of
0 ->
less_props(RestA, RestB);
Result ->
Result
end;
Result ->
Result
end.
less_list([], [_|_]) ->
-1;
less_list(_, []) ->
1;
less_list([A|RestA], [B|RestB]) ->
case less_json0(A,B) of
0 ->
less_list(RestA, RestB);
Result ->
Result
end.