| % Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| % use this file except in compliance with the License. You may obtain a copy of |
| % the License at |
| % |
| % http://www.apache.org/licenses/LICENSE-2.0 |
| % |
| % Unless required by applicable law or agreed to in writing, software |
| % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| % License for the specific language governing permissions and limitations under |
| % the License. |
| |
| -module(couch_mrview_index). |
| |
| -export([get/2]). |
| -export([init/2, open/2, close/1, reset/1, delete/1, shutdown/1]). |
| -export([start_update/4, purge/4, process_doc/3, finish_update/1, commit/1]). |
| -export([compact/3, swap_compacted/2, remove_compacted/1]). |
| -export([index_file_exists/1]). |
| -export([update_local_purge_doc/2, verify_index_exists/2]). |
| -export([ensure_local_purge_docs/2]). |
| |
| -include_lib("couch/include/couch_db.hrl"). |
| -include_lib("couch_mrview/include/couch_mrview.hrl"). |
| |
| get(db_name, #mrst{db_name = DbName}) -> |
| DbName; |
| get(idx_name, #mrst{idx_name = IdxName}) -> |
| IdxName; |
| get(signature, #mrst{sig = Signature}) -> |
| Signature; |
| get(update_seq, #mrst{update_seq = UpdateSeq}) -> |
| UpdateSeq; |
| get(purge_seq, #mrst{purge_seq = PurgeSeq}) -> |
| PurgeSeq; |
| get(update_options, #mrst{design_opts = Opts}) -> |
| IncDesign = couch_util:get_value(<<"include_design">>, Opts, false), |
| LocalSeq = couch_util:get_value(<<"local_seq">>, Opts, false), |
| Partitioned = couch_util:get_value(<<"partitioned">>, Opts, false), |
| if |
| IncDesign -> [include_design]; |
| true -> [] |
| end ++ |
| if |
| LocalSeq -> [local_seq]; |
| true -> [] |
| end ++ |
| if |
| Partitioned -> [partitioned]; |
| true -> [] |
| end; |
| get(fd, #mrst{fd = Fd}) -> |
| Fd; |
| get(language, #mrst{language = Language}) -> |
| Language; |
| get(views, #mrst{views = Views}) -> |
| Views; |
| get(info, State) -> |
| #mrst{ |
| fd = Fd, |
| sig = Sig, |
| id_btree = IdBtree, |
| language = Lang, |
| update_seq = UpdateSeq, |
| purge_seq = PurgeSeq, |
| views = Views, |
| view_info = ViewInfo |
| } = State, |
| {ok, FileSize} = couch_file:bytes(Fd), |
| {ok, ExternalSize} = couch_mrview_util:calculate_external_size(Views), |
| {ok, ActiveViewSize} = couch_mrview_util:calculate_active_size(Views), |
| ActiveSize = couch_btree:size(IdBtree) + ActiveViewSize, |
| |
| UpdateOptions0 = get(update_options, State), |
| UpdateOptions = [atom_to_binary(O, latin1) || O <- UpdateOptions0], |
| CollVsTups = couch_mrview_util:get_collator_versions(ViewInfo), |
| CollVsBins = [couch_util:version_to_binary(V) || V <- CollVsTups], |
| {ok, [ |
| {signature, list_to_binary(couch_index_util:hexsig(Sig))}, |
| {language, Lang}, |
| {sizes, |
| {[ |
| {file, FileSize}, |
| {active, ActiveSize}, |
| {external, ExternalSize} |
| ]}}, |
| {update_seq, UpdateSeq}, |
| {purge_seq, PurgeSeq}, |
| {update_options, UpdateOptions}, |
| {collator_versions, CollVsBins} |
| ]}; |
| get(Other, _) -> |
| throw({unknown_index_property, Other}). |
| |
| init(Db, DDoc) -> |
| {ok, State} = couch_mrview_util:ddoc_to_mrst(couch_db:name(Db), DDoc), |
| {ok, set_partitioned(Db, State)}. |
| |
| open(Db, State0) -> |
| #mrst{ |
| db_name = DbName, |
| sig = Sig |
| } = State = set_partitioned(Db, State0), |
| IndexFName = couch_mrview_util:index_file(DbName, Sig), |
| |
| % If we are upgrading from <= 2.x, we upgrade the view |
| % index file on the fly, avoiding an index reset. |
| % We are making commit with a new state |
| % right after the upgrade to ensure |
| % that we have a proper sig in the header |
| % when open the view next time |
| % |
| % OldSig is `ok` if no upgrade happened. |
| % |
| % To remove support for 2.x auto-upgrades in the |
| % future, just remove the next line and the code |
| % between "upgrade code for <= 2.x" and |
| % "end of upgrade code for <= 2.x" and the corresponding |
| % code in couch_mrview_util |
| |
| OldSig = couch_mrview_util:maybe_update_index_file(State), |
| |
| case couch_mrview_util:open_file(IndexFName) of |
| {ok, Fd} -> |
| case couch_file:read_header(Fd) of |
| % upgrade code for <= 2.x |
| {ok, {OldSig, Header}} -> |
| % Matching view signatures. |
| NewSt = init_and_upgrade_state(Db, Fd, State, Header), |
| ensure_local_purge_doc(Db, NewSt), |
| {ok, NewSt}; |
| % end of upgrade code for <= 2.x |
| {ok, {Sig, Header}} -> |
| % Matching view signatures. |
| NewSt = init_and_upgrade_state(Db, Fd, State, Header), |
| ensure_local_purge_doc(Db, NewSt), |
| check_collator_versions(DbName, NewSt), |
| {ok, NewSt}; |
| {ok, {WrongSig, _}} -> |
| couch_log:error( |
| "~s has the wrong signature: expected: ~p but got ~p", |
| [IndexFName, Sig, WrongSig] |
| ), |
| NewSt = couch_mrview_util:reset_index(Db, Fd, State), |
| ensure_local_purge_doc(Db, NewSt), |
| {ok, NewSt}; |
| {ok, Else} -> |
| couch_log:error( |
| "~s has a bad header: got ~p", |
| [IndexFName, Else] |
| ), |
| NewSt = couch_mrview_util:reset_index(Db, Fd, State), |
| ensure_local_purge_doc(Db, NewSt), |
| {ok, NewSt}; |
| no_valid_header -> |
| NewSt = couch_mrview_util:reset_index(Db, Fd, State), |
| ensure_local_purge_doc(Db, NewSt), |
| {ok, NewSt} |
| end; |
| {error, Reason} = Error -> |
| couch_log:error( |
| "Failed to open view file '~s': ~s", |
| [IndexFName, file:format_error(Reason)] |
| ), |
| Error |
| end. |
| |
| close(State) -> |
| erlang:demonitor(State#mrst.fd_monitor, [flush]), |
| couch_file:close(State#mrst.fd). |
| |
| % This called after ddoc_updated event occurrs, and |
| % before we shutdown couch_index process. |
| % We unlink couch_index from corresponding couch_file and demonitor it. |
| % This allows all outstanding queries that are currently streaming |
| % data from couch_file finish successfully. |
| % couch_file will be closed automatically after all |
| % outstanding queries are done. |
| shutdown(State) -> |
| erlang:demonitor(State#mrst.fd_monitor, [flush]), |
| unlink(State#mrst.fd). |
| |
| delete(#mrst{db_name = DbName, sig = Sig} = State) -> |
| couch_file:close(State#mrst.fd), |
| catch couch_mrview_util:delete_files(DbName, Sig). |
| |
| reset(State) -> |
| couch_util:with_db(State#mrst.db_name, fun(Db) -> |
| NewState = couch_mrview_util:reset_index(Db, State#mrst.fd, State), |
| {ok, NewState} |
| end). |
| |
| start_update(PartialDest, State, NumChanges, NumChangesDone) -> |
| couch_mrview_updater:start_update( |
| PartialDest, |
| State, |
| NumChanges, |
| NumChangesDone |
| ). |
| |
| purge(Db, PurgeSeq, PurgedIdRevs, State) -> |
| couch_mrview_updater:purge(Db, PurgeSeq, PurgedIdRevs, State). |
| |
| process_doc(Doc, Seq, State) -> |
| couch_mrview_updater:process_doc(Doc, Seq, State). |
| |
| finish_update(State) -> |
| couch_mrview_updater:finish_update(State). |
| |
| commit(State) -> |
| Header = {State#mrst.sig, couch_mrview_util:make_header(State)}, |
| couch_file:write_header(State#mrst.fd, Header). |
| |
| compact(Db, State, Opts) -> |
| couch_mrview_compactor:compact(Db, State, Opts). |
| |
| swap_compacted(OldState, NewState) -> |
| couch_mrview_compactor:swap_compacted(OldState, NewState). |
| |
| remove_compacted(State) -> |
| couch_mrview_compactor:remove_compacted(State). |
| |
| index_file_exists(State) -> |
| #mrst{ |
| db_name = DbName, |
| sig = Sig |
| } = State, |
| IndexFName = couch_mrview_util:index_file(DbName, Sig), |
| filelib:is_file(IndexFName). |
| |
| verify_index_exists(DbName, Props) -> |
| try |
| Type = couch_util:get_value(<<"type">>, Props), |
| if |
| Type =/= <<"mrview">> -> |
| false; |
| true -> |
| DDocId = couch_util:get_value(<<"ddoc_id">>, Props), |
| couch_util:with_db(DbName, fun(Db) -> |
| case couch_db:get_design_doc(Db, DDocId) of |
| {ok, #doc{} = DDoc} -> |
| {ok, IdxState} = couch_mrview_util:ddoc_to_mrst( |
| DbName, DDoc |
| ), |
| IdxSig = IdxState#mrst.sig, |
| SigInLocal = couch_util:get_value( |
| <<"signature">>, Props |
| ), |
| couch_index_util:hexsig(IdxSig) == SigInLocal; |
| {not_found, _} -> |
| false |
| end |
| end) |
| end |
| catch |
| _:_ -> |
| false |
| end. |
| |
| set_partitioned(Db, State) -> |
| #mrst{ |
| design_opts = DesignOpts |
| } = State, |
| DbPartitioned = couch_db:is_partitioned(Db), |
| ViewPartitioned = couch_util:get_value( |
| <<"partitioned">>, DesignOpts, DbPartitioned |
| ), |
| IsPartitioned = DbPartitioned andalso ViewPartitioned, |
| State#mrst{partitioned = IsPartitioned}. |
| |
| ensure_local_purge_docs(DbName, DDocs) -> |
| couch_util:with_db(DbName, fun(Db) -> |
| lists:foreach( |
| fun(DDoc) -> |
| try couch_mrview_util:ddoc_to_mrst(DbName, DDoc) of |
| {ok, MRSt} -> |
| ensure_local_purge_doc(Db, MRSt) |
| catch |
| _:_ -> |
| ok |
| end |
| end, |
| DDocs |
| ) |
| end). |
| |
| ensure_local_purge_doc(Db, #mrst{} = State) -> |
| Sig = couch_index_util:hexsig(get(signature, State)), |
| DocId = couch_mrview_util:get_local_purge_doc_id(Sig), |
| case couch_db:open_doc(Db, DocId, []) of |
| {not_found, _} -> |
| create_local_purge_doc(Db, State); |
| {ok, _} -> |
| ok |
| end. |
| |
| create_local_purge_doc(Db, State) -> |
| PurgeSeq = couch_db:get_purge_seq(Db), |
| update_local_purge_doc(Db, State, PurgeSeq). |
| |
| update_local_purge_doc(Db, State) -> |
| update_local_purge_doc(Db, State, get(purge_seq, State)). |
| |
| update_local_purge_doc(Db, State, PSeq) -> |
| Sig = couch_index_util:hexsig(State#mrst.sig), |
| DocId = couch_mrview_util:get_local_purge_doc_id(Sig), |
| {Mega, Secs, _} = os:timestamp(), |
| NowSecs = Mega * 1000000 + Secs, |
| BaseDoc = couch_doc:from_json_obj( |
| {[ |
| {<<"_id">>, DocId}, |
| {<<"type">>, <<"mrview">>}, |
| {<<"purge_seq">>, PSeq}, |
| {<<"updated_on">>, NowSecs}, |
| {<<"ddoc_id">>, get(idx_name, State)}, |
| {<<"signature">>, Sig} |
| ]} |
| ), |
| Doc = |
| case couch_db:open_doc(Db, DocId, []) of |
| {ok, #doc{revs = Revs}} -> |
| BaseDoc#doc{revs = Revs}; |
| {not_found, _} -> |
| BaseDoc |
| end, |
| couch_db:update_doc(Db, Doc, []). |
| |
| init_and_upgrade_state(Db, Fd, State, Header) -> |
| {Commit, #mrst{} = Mrst} = couch_mrview_util:init_state(Db, Fd, State, Header), |
| case Commit of |
| true -> |
| case couch_mrview_util:commit_on_header_upgrade() of |
| true -> |
| LogMsg = "~p : Index ~s ~s was upgraded", |
| DbName = couch_db:name(Db), |
| IdxName = State#mrst.idx_name, |
| couch_log:warning(LogMsg, [?MODULE, DbName, IdxName]), |
| ok = commit(Mrst), |
| Mrst; |
| false -> |
| Mrst |
| end; |
| false -> |
| Mrst |
| end. |
| |
| % Check if there are multiple collator versions used to build this view |
| check_collator_versions(DbName, #mrst{} = Mrst) -> |
| case couch_mrview_util:compact_on_collator_upgrade() of |
| true -> |
| #mrst{view_info = ViewInfo, idx_name = IdxName} = Mrst, |
| Vers = couch_mrview_util:get_collator_versions(ViewInfo), |
| case length(Vers) >= 2 of |
| true -> |
| Event = {index_collator_upgrade, IdxName}, |
| couch_event:notify(DbName, Event); |
| false -> |
| ok |
| end; |
| false -> |
| ok |
| end. |