| % Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| % use this file except in compliance with the License. You may obtain a copy of |
| % the License at |
| % |
| % http://www.apache.org/licenses/LICENSE-2.0 |
| % |
| % Unless required by applicable law or agreed to in writing, software |
| % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| % License for the specific language governing permissions and limitations under |
| % the License. |
| |
| -module(couch_stats_resource_tracker). |
| |
| -behaviour(gen_server). |
| |
| -export([ |
| start_link/0, |
| init/1, |
| handle_call/3, |
| handle_cast/2, |
| handle_info/2, |
| code_change/3, |
| terminate/2 |
| ]). |
| |
| -export([ |
| pause_eviction/0, |
| enable_eviction/0, |
| eviction_status/0 |
| ]). |
| |
| -export([ |
| inc/1, inc/2, |
| maybe_inc/2, |
| get_pid_ref/0, |
| accumulate_delta/1 |
| ]). |
| |
| -export([ |
| close_pid_ref/0, close_pid_ref/1, |
| create_context/3, |
| create_coordinator_context/1, create_coordinator_context/2, |
| destroy_context/0, destroy_context/1, |
| is_enabled/0, |
| get_resource/0, |
| get_resource/1, |
| set_context_dbname/1, |
| set_context_handler_fun/1, |
| set_context_username/1, |
| track/1, tracker/1, |
| stop_tracker/0, stop_tracker/1, stop_tracker/2, |
| should_track/1 |
| ]). |
| |
| -export([ |
| active/0, |
| active_coordinators/0, |
| active_workers/0, |
| find_unmonitored/0 |
| ]). |
| |
| -export([ |
| count_by/1, |
| group_by/2, |
| group_by/3, |
| group_by/4, |
| sorted/1, |
| sorted_by/1, |
| sorted_by/2, |
| sorted_by/3, |
| |
| find_by_pid/1, |
| |
| unsafe_foldl/3, |
| |
| term_to_flat_json/1 |
| ]). |
| |
| -export([ |
| make_delta/0 |
| ]). |
| |
| %% Singular increment operations |
| -export([ |
| db_opened/0, |
| doc_read/0, |
| row_read/0, |
| btree_fold/0, |
| ioq_called/0, |
| js_evaled/0, |
| js_filtered/0, |
| js_filtered_error/0, |
| js_filtered_doc/0, |
| mango_match_evaled/0, |
| get_kv_node/0, |
| get_kp_node/0 |
| ]). |
| |
| %% Plural increment operations |
| -export([ |
| js_filtered_docs/1, |
| io_bytes_read/1, |
| io_bytes_written/1 |
| ]). |
| |
| -export([ |
| field/2, |
| curry_field/1 |
| ]). |
| |
| -include_lib("couch/include/couch_db.hrl"). |
| |
| %% Use these for record upgrades over the wire and in ETS tables |
| %% TODO: alternatively, just delete these. Currently using a map |
| %% for shipping deltas over the wire, avoiding much of the |
| %% problem here. We'll likely still need to handle upgrades to |
| %% map format over time, so let's decide a course of action here. |
| -define(RCTX_V1, rctx_v1). |
| -define(RCTX, ?RCTX_V1). |
| |
| -define(MANGO_EVAL_MATCH, mango_eval_match). |
| -define(DB_OPEN_DOC, docs_read). |
| -define(DB_OPEN, db_open). |
| -define(COUCH_SERVER_OPEN, db_open). |
| -define(COUCH_BT_FOLDS, btree_folds). |
| -define(COUCH_BT_GET_KP_NODE, get_kp_node). |
| -define(COUCH_BT_GET_KV_NODE, get_kv_node). |
| -define(COUCH_JS_FILTER, js_filter). |
| -define(COUCH_JS_FILTER_ERROR, js_filter_error). |
| -define(COUCH_JS_FILTERED_DOCS, js_filtered_docs). |
| -define(ROWS_READ, rows_read). |
| |
| %% TODO: overlap between this and couch btree fold invocations |
%% TODO: need some way to distinguish folds on views vs find vs all_docs
| -define(FRPC_CHANGES_ROW, changes_processed). |
| -define(FRPC_CHANGES_RETURNED, changes_returned). |
| %%-define(FRPC_CHANGES_ROW, ?ROWS_READ). |
| |
| %% Module pdict markers |
| -define(DELTA_TA, csrt_delta_ta). |
| -define(DELTA_TZ, csrt_delta_tz). %% T Zed instead of T0 |
| -define(PID_REF, csrt_pid_ref). %% track local ID |
| -define(TRACKER_PID, csrt_tracker). %% tracker pid |
| |
| |
%% gen_server state for the tracker/eviction loop.
-record(st, {
    eviction = enabled, %% or paused
    ev_queue = [], %% eviction queue for when eviction is paused
    eviction_delay = 10 * 1000, %% How many ms dead processes are visible
    scan_interval = 2048, %% How regularly (ms) to perform scans
    tracking = #{} %% track active processes for eventual eviction
}).
| |
| |
%% TODO: switch to:
%% -record(?RCTX, {
%% Per-process resource context row, stored in the ?MODULE ets table
%% keyed on pid_ref (see init/1 keypos).
-record(rctx, {
    %% Metadata
    %% NOTE: function-call defaults are evaluated each time a record is
    %% constructed, so new rows get a fresh timestamp.
    started_at = tnow(),
    updated_at = tnow(),
    exited_at, %% TODO: do we need a final exit time and additional update times afterwards?
    pid_ref, %% {Pid, Ref} primary key
    mon_ref, %% monitor ref, set when scanning/tracking is active
    mfa,
    nonce,
    from,
    type = unknown, %% unknown/background/system/rpc/coordinator/fabric_rpc/etc_rpc/etc
    state = alive,
    dbname,
    username,
    path,

    %% Stats counters
    db_open = 0,
    docs_read = 0,
    rows_read = 0,
    btree_folds = 0,
    changes_processed = 0,
    changes_returned = 0,
    ioq_calls = 0,
    io_bytes_read = 0,
    io_bytes_written = 0,
    js_evals = 0,
    js_filter = 0,
    js_filter_error = 0,
    js_filtered_docs = 0,
    mango_eval_match = 0,
    %% TODO: switch record definitions to be macro based, eg:
    %% ?COUCH_BT_GET_KP_NODE = 0,
    get_kv_node = 0,
    get_kp_node = 0
}).
| |
%% Monotonic time now in milliseconds; safe for interval arithmetic,
%% not a wall-clock timestamp.
tnow() ->
    erlang:monotonic_time(millisecond).

%% Whether resource tracking is enabled at all (default: true).
is_enabled() ->
    config:get_boolean(?MODULE_STRING, "enabled", true).

%% Whether the gen_server should periodically scan for untracked
%% contexts (default: false).
should_scan() ->
    config:get_boolean(?MODULE_STRING, "scans_enabled", false).
| |
%% Singular increment operations: bump the named #rctx counter by one
%% for the calling process's context.
%%
%% BUGFIX: db_opened/0 previously called inc(db_opened), which matched
%% no inc/1 clause (the counter key is db_open / ?DB_OPEN) and so
%% silently counted nothing. Route it through ?DB_OPEN like the others.
db_opened() -> inc(?DB_OPEN).
doc_read() -> inc(docs_read).
row_read() -> inc(rows_read).
btree_fold() -> inc(?COUCH_BT_FOLDS).
%% TODO: do we need ioq_called and this access pattern?
%% (update_counter/2 already checks is_enabled(), so the extra guard
%% here only short-circuits the inc/1 dispatch.)
ioq_called() -> is_enabled() andalso inc(ioq_calls).
js_evaled() -> inc(js_evals).
js_filtered() -> inc(js_filter).
js_filtered_error() -> inc(js_filter_error).
js_filtered_doc() -> inc(js_filtered_docs).
mango_match_evaled() -> inc(mango_eval_match).
get_kv_node() -> inc(get_kv_node).
get_kp_node() -> inc(get_kp_node).
| |
%% Plural increment operations: bump the named #rctx counter by N.
js_filtered_docs(N) -> inc(js_filtered_docs, N).
io_bytes_read(N) -> inc(io_bytes_read, N).
io_bytes_written(N) -> inc(io_bytes_written, N).
| |
%% inc/1: increment a known stat counter by one for the calling
%% process's context. Unknown keys return 0 rather than crashing so
%% delta maps from newer nodes can flow through accumulate_delta/1.
inc(Key) ->
    Known = [
        ?DB_OPEN,
        docs_read,
        ?ROWS_READ,
        ?FRPC_CHANGES_RETURNED,
        ?COUCH_BT_FOLDS,
        ioq_calls,
        io_bytes_read,
        io_bytes_written,
        js_evals,
        ?COUCH_JS_FILTER,
        ?COUCH_JS_FILTER_ERROR,
        ?COUCH_JS_FILTERED_DOCS,
        ?MANGO_EVAL_MATCH,
        ?COUCH_BT_GET_KP_NODE,
        ?COUCH_BT_GET_KV_NODE
    ],
    case lists:member(Key, Known) of
        true ->
            inc(Key, 1);
        false ->
            0
    end.
| |
| |
%% inc/2: add N to the matching #rctx counter field via an atomic
%% ets:update_counter (see update_counter/2). The macro keys expand to
%% plain counter atoms (eg ?DB_OPEN -> db_open), which is how delta map
%% keys dispatch here directly. Unknown keys return 0.
%%
%% BUGFIX: there was no clause for ?COUCH_BT_FOLDS (btree_folds), so
%% btree_fold/0, inc(?COUCH_BT_FOLDS) and the [couchdb, btree, folds]
%% metric all fell through to the catch-all and counted nothing, even
%% though #rctx carries a btree_folds counter.
inc(?DB_OPEN, N) ->
    update_counter(#rctx.?DB_OPEN, N);
inc(?ROWS_READ, N) ->
    update_counter(#rctx.?ROWS_READ, N);
inc(?FRPC_CHANGES_RETURNED, N) ->
    update_counter(#rctx.?FRPC_CHANGES_RETURNED, N);
inc(?COUCH_BT_FOLDS, N) ->
    update_counter(#rctx.?COUCH_BT_FOLDS, N);
inc(ioq_calls, N) ->
    update_counter(#rctx.ioq_calls, N);
inc(io_bytes_read, N) ->
    update_counter(#rctx.io_bytes_read, N);
inc(io_bytes_written, N) ->
    update_counter(#rctx.io_bytes_written, N);
inc(js_evals, N) ->
    update_counter(#rctx.js_evals, N);
inc(?COUCH_JS_FILTER, N) ->
    update_counter(#rctx.?COUCH_JS_FILTER, N);
inc(?COUCH_JS_FILTER_ERROR, N) ->
    update_counter(#rctx.?COUCH_JS_FILTER_ERROR, N);
inc(?COUCH_JS_FILTERED_DOCS, N) ->
    update_counter(#rctx.?COUCH_JS_FILTERED_DOCS, N);
inc(?MANGO_EVAL_MATCH, N) ->
    update_counter(#rctx.?MANGO_EVAL_MATCH, N);
inc(?DB_OPEN_DOC, N) ->
    update_counter(#rctx.?DB_OPEN_DOC, N);
inc(?FRPC_CHANGES_ROW, N) ->
    update_counter(#rctx.?ROWS_READ, N); %% TODO: rework double use of rows_read
inc(?COUCH_BT_GET_KP_NODE, N) ->
    update_counter(#rctx.?COUCH_BT_GET_KP_NODE, N);
inc(?COUCH_BT_GET_KV_NODE, N) ->
    update_counter(#rctx.?COUCH_BT_GET_KV_NODE, N);
inc(_, _) ->
    %% inc needs to allow unknown types to pass for accumulate_update to handle
    %% updates from nodes with newer data formats
    0.
| |
%% maybe_inc/2: map a couch_stats metric path onto the local context
%% counter it feeds; metrics outside the mapping are ignored (return 0).
maybe_inc([mango, evaluate_selector], Val) ->
    inc(?MANGO_EVAL_MATCH, Val);
maybe_inc([couchdb, database_reads], Val) ->
    inc(?DB_OPEN_DOC, Val);
maybe_inc([fabric_rpc, changes, processed], Val) ->
    inc(?FRPC_CHANGES_ROW, Val);
maybe_inc([fabric_rpc, changes, returned], Val) ->
    inc(?FRPC_CHANGES_RETURNED, Val);
maybe_inc([fabric_rpc, view, rows_read], Val) ->
    inc(?ROWS_READ, Val);
maybe_inc([couchdb, couch_server, open], Val) ->
    inc(?DB_OPEN, Val);
maybe_inc([couchdb, btree, folds], Val) ->
    inc(?COUCH_BT_FOLDS, Val);
maybe_inc([couchdb, btree, kp_node], Val) ->
    inc(?COUCH_BT_GET_KP_NODE, Val);
maybe_inc([couchdb, btree, kv_node], Val) ->
    inc(?COUCH_BT_GET_KV_NODE, Val);
maybe_inc([couchdb, query_server, js_filter_error], Val) ->
    inc(?COUCH_JS_FILTER_ERROR, Val);
maybe_inc([couchdb, query_server, js_filter], Val) ->
    inc(?COUCH_JS_FILTER, Val);
maybe_inc([couchdb, query_server, js_filtered_docs], Val) ->
    inc(?COUCH_JS_FILTERED_DOCS, Val);
maybe_inc(_Metric, _Val) ->
    %%io:format("SKIPPING MAYBE_INC METRIC[~p]: ~p~n", [Val, Metric]),
    0.
| |
| |
%% TODO: update stats_descriptions.cfg for relevant apps
%% should_track/1: whether a couch_stats metric should participate in
%% resource tracking. Only a fixed allowlist of metrics qualifies, and
%% only while tracking is enabled; everything else is false.
should_track(Metric) ->
    Tracked = [
        [fabric_rpc, all_docs, spawned],
        [fabric_rpc, changes, spawned],
        [fabric_rpc, changes, processed],
        [fabric_rpc, changes, returned],
        [fabric_rpc, map_view, spawned],
        [fabric_rpc, reduce_view, spawned],
        [fabric_rpc, get_all_security, spawned],
        [fabric_rpc, open_doc, spawned],
        [fabric_rpc, update_docs, spawned],
        [fabric_rpc, open_shard, spawned],
        [mango_cursor, view, all_docs],
        [mango_cursor, view, idx]
    ],
    %% is_enabled() is only consulted for allowlisted metrics, matching
    %% the original clause-per-metric dispatch.
    lists:member(Metric, Tracked) andalso is_enabled().
| |
%% accumulate_delta/1: fold a delta map (as produced by make_delta/0 on
%% a remote worker) into the local context, one inc/2 per field.
%% Unknown fields are tolerated by inc/2, so deltas from nodes with
%% newer formats do not crash.
accumulate_delta(Delta) when is_map(Delta) ->
    %% TODO: switch to creating a batch of updates to invoke a single
    %% update_counter rather than sequentially invoking it for each field
    is_enabled() andalso maps:foreach(fun inc/2, Delta);
accumulate_delta(undefined) ->
    ok;
accumulate_delta(Other) ->
    %% NOTE(review): io:format rather than a logger call -- looks like
    %% leftover debugging output; confirm before shipping.
    io:format("CSRT:ACC_DELTA UNKNOWN DELTA: ~p~n", [Other]).
| |
| |
%% update_counter/2: bump a #rctx field (by record index) for the
%% calling process's context.
update_counter(Field, Count) ->
    is_enabled() andalso update_counter(get_pid_ref(), Field, Count).


%% update_counter/3: atomic ets counter update; the #rctx default row
%% is inserted first if the pid_ref is not present yet.
update_counter(undefined, _Field, _Count) ->
    ok;
update_counter({_Pid,_Ref}=PidRef, Field, Count) ->
    %% TODO: mem3 crashes without catch, why do we lose the stats table?
    is_enabled() andalso catch ets:update_counter(?MODULE, PidRef, {Field, Count}, #rctx{pid_ref=PidRef}).
| |
| |
%% Snapshot listings of tracked contexts from the ets table.
active() -> active_int(all).
active_coordinators() -> active_int(coordinators).
active_workers() -> active_int(workers).


%% active_int/1: coordinators/workers return raw #rctx records selected
%% by type tag; 'all' returns every row rendered as a flat json-able map.
active_int(coordinators) ->
    select_by_type(coordinators);
active_int(workers) ->
    select_by_type(workers);
active_int(all) ->
    lists:map(fun to_flat_json/1, ets:tab2list(?MODULE)).


%% select_by_type/1: match rows by their {coordinator,_,_} / {worker,_,_}
%% type tag. Uses ?MODULE as the table name for consistency with the
%% rest of the module (previously hardcoded as the module atom, which
%% would silently break if the table/module were renamed).
select_by_type(coordinators) ->
    ets:select(?MODULE,
        [{#rctx{type = {coordinator,'_','_'}, _ = '_'}, [], ['$_']}]);
select_by_type(workers) ->
    ets:select(?MODULE,
        [{#rctx{type = {worker,'_','_'}, _ = '_'}, [], ['$_']}]);
select_by_type(all) ->
    lists:map(fun to_flat_json/1, ets:tab2list(?MODULE)).
| |
| |
%% field/2: extract the value of the named field from an #rctx record;
%% mfa and type are converted to flat json-able terms on the way out.
field(#rctx{pid_ref=Val}, pid_ref) -> Val;
%% NOTE: Pros and cons to doing these convert functions here
%% Ideally, this would be done later so as to prefer the core data structures
%% as long as possible, but we currently need the output of this function to
%% be jiffy:encode'able. The tricky bit is dynamically encoding the group_by
%% structure provided by the caller of *_by aggregator functions below.
%% For now, we just always return jiffy:encode'able data types.
field(#rctx{mfa=Val}, mfa) -> convert_mfa(Val);
field(#rctx{nonce=Val}, nonce) -> Val;
field(#rctx{from=Val}, from) -> Val;
field(#rctx{type=Val}, type) -> convert_type(Val);
field(#rctx{state=Val}, state) -> Val;
field(#rctx{dbname=Val}, dbname) -> Val;
field(#rctx{username=Val}, username) -> Val;
field(#rctx{path=Val}, path) -> Val;
field(#rctx{db_open=Val}, db_open) -> Val;
field(#rctx{docs_read=Val}, docs_read) -> Val;
field(#rctx{rows_read=Val}, rows_read) -> Val;
field(#rctx{btree_folds=Val}, btree_folds) -> Val;
field(#rctx{changes_processed=Val}, changes_processed) -> Val;
field(#rctx{changes_returned=Val}, changes_returned) -> Val;
field(#rctx{ioq_calls=Val}, ioq_calls) -> Val;
field(#rctx{io_bytes_read=Val}, io_bytes_read) -> Val;
field(#rctx{io_bytes_written=Val}, io_bytes_written) -> Val;
field(#rctx{js_evals=Val}, js_evals) -> Val;
field(#rctx{js_filter=Val}, js_filter) -> Val;
field(#rctx{js_filter_error=Val}, js_filter_error) -> Val;
field(#rctx{js_filtered_docs=Val}, js_filtered_docs) -> Val;
field(#rctx{mango_eval_match=Val}, mango_eval_match) -> Val;
field(#rctx{get_kv_node=Val}, get_kv_node) -> Val;
field(#rctx{get_kp_node=Val}, get_kp_node) -> Val.
| |
| |
%% curry_field/1: partially apply field/2, producing a one-argument
%% extractor usable as a key or value function in the *_by aggregators.
curry_field(Field) ->
    fun(Ele) -> field(Ele, Field) end.


%% count_by/1: tally rows per key (each row contributes 1).
count_by(KeyFun) ->
    group_by(KeyFun, fun(_) -> 1 end).


%% group_by/2: aggregate with addition by default.
group_by(KeyFun, ValFun) ->
    group_by(KeyFun, ValFun, fun erlang:'+'/2).
| |
| |
%% group_by/3: aggregate with the configured ets fold strategy.
group_by(KeyFun, ValFun, AggFun) ->
    group_by(KeyFun, ValFun, AggFun, configured_fold_fun()).

%% Choose the fold implementation from config: "unsafe" selects the
%% dirty unsafe_foldl/3 iteration, anything else the regular ets:foldl/3.
configured_fold_fun() ->
    case conf_get("fold_fun") of
        "unsafe" ->
            fun unsafe_foldl/3;
        _ ->
            fun ets:foldl/3
    end.
| |
| |
%% group_by/4: fold over the context table, aggregating ValFun(Row) per
%% KeyFun(Row) using AggFun. A list of field names becomes a tuple key;
%% bare atoms are curried through field/2 for key or value extraction.
%%
%% eg: group_by(mfa, docs_read).
%% eg: group_by(fun(#rctx{mfa=MFA,docs_read=DR}) -> {MFA, DR} end, ioq_calls).
%% eg: ^^ or: group_by([mfa, docs_read], ioq_calls).
%% eg: group_by([username, dbname, mfa], docs_read).
%% eg: group_by([username, dbname, mfa], ioq_calls).
%% eg: group_by([username, dbname, mfa], js_filters).
group_by(KeyL, ValFun, AggFun, Fold) when is_list(KeyL) ->
    KeyFun = fun(Ele) -> list_to_tuple([field(Ele, Key) || Key <- KeyL]) end,
    group_by(KeyFun, ValFun, AggFun, Fold);
group_by(Key, ValFun, AggFun, Fold) when is_atom(Key) ->
    group_by(curry_field(Key), ValFun, AggFun, Fold);
group_by(KeyFun, Val, AggFun, Fold) when is_atom(Val) ->
    group_by(KeyFun, curry_field(Val), AggFun, Fold);
group_by(KeyFun, ValFun, AggFun, Fold) ->
    FoldFun = fun(Ele, Acc) ->
        Key = KeyFun(Ele),
        Val = ValFun(Ele),
        CurrVal = maps:get(Key, Acc, 0),
        NewVal = AggFun(CurrVal, Val),
        %% TODO: should we skip here? how to make this optional?
        %% Only keys whose aggregate stays positive are kept.
        case NewVal > 0 of
            true ->
                maps:put(Key, NewVal, Acc);
            false ->
                Acc
        end
    end,
    %% NOTE(review): this re-reads the config rather than inspecting the
    %% Fold argument, so a caller passing unsafe_foldl/3 directly while
    %% config says otherwise will fold without pausing eviction.
    case conf_get("fold_fun") of
        "unsafe" ->
            %% When fold_fun is unsafe we must pause deletion of keys as
            %% otherwise `ets:next/2` will fail when the supplied key has
            %% already been deleted
            pause_eviction(),
            try
                Fold(FoldFun, #{}, ?MODULE)
            after
                enable_eviction()
            end;
        _ ->
            Fold(FoldFun, #{}, ?MODULE)
    end.
| |
| |
%% Sort a map's entries by value, largest value first.
sorted(Map) when is_map(Map) ->
    Descending = fun({_KeyA, ValA}, {_KeyB, ValB}) -> ValB < ValA end,
    lists:sort(Descending, maps:to_list(Map)).

%% Cap a report at its top ten entries.
shortened(Entries) ->
    lists:sublist(Entries, 10).
| |
| |
%% Top-ten reports: aggregate, sort largest first, keep the first ten.
%% eg: sorted_by([username, dbname, mfa], ioq_calls)
%% eg: sorted_by([dbname, mfa], doc_reads)
sorted_by(KeyFun) -> shortened(sorted(count_by(KeyFun))).
sorted_by(KeyFun, ValFun) -> shortened(sorted(group_by(KeyFun, ValFun))).
sorted_by(KeyFun, ValFun, AggFun) -> shortened(sorted(group_by(KeyFun, ValFun, AggFun))).
| |
| |
%% term_to_flat_json/1: best-effort conversion of an arbitrary term into
%% something jiffy:encode'able (binary, list, number, or null).
%%
%% BUGFIX: the undefined/null clauses used to appear AFTER the generic
%% is_atom/1 clause and were therefore unreachable -- undefined rendered
%% as <<"undefined">> instead of null. They now precede it.
term_to_flat_json({shutdown, Reason0}) when is_atom(Reason0) ->
    Reason = atom_to_binary(Reason0),
    <<"shutdown: ", Reason/binary>>;
term_to_flat_json({type, Atom}) when is_atom(Atom) ->
    atom_to_binary(Atom);
term_to_flat_json({type, {coordinator, Verb0, Path0}}=_Type) ->
    Verb = atom_to_binary(Verb0),
    Path = list_to_binary(Path0),
    <<"coordinator:", Verb/binary, ":", Path/binary>>;
term_to_flat_json({type, {worker, M0, F0}}=_Type) ->
    M = atom_to_binary(M0),
    F = atom_to_binary(F0),
    <<"worker:", M/binary, ":", F/binary>>;
term_to_flat_json(Tuple) when is_tuple(Tuple) ->
    %% NOTE(review): tuple elements are not recursively converted, so
    %% pids/refs nested inside will not be json-safe -- confirm callers
    %% only pass flat tuples here.
    erlang:tuple_to_list(Tuple);
term_to_flat_json(Pid) when is_pid(Pid) ->
    ?l2b(pid_to_list(Pid));
term_to_flat_json(Ref) when is_reference(Ref) ->
    ?l2b(ref_to_list(Ref));
term_to_flat_json(undefined) ->
    null;
term_to_flat_json(null) ->
    null;
term_to_flat_json(Atom) when is_atom(Atom) ->
    atom_to_binary(Atom);
term_to_flat_json(T) ->
    T.
| |
%% to_flat_json/1: render an #rctx row as a flat, jiffy:encode'able map.
%% NOTE(review): #rctx.btree_folds is bound to a variable named
%% ChangesProcessed yet emitted under the btree_folds key -- the variable
%% name looks stale; verify the intended field. Also note several
%% counters (changes_processed, io_bytes_*, js_evals, mango_eval_match)
%% and the path field are not included in the output map -- confirm
%% that is intentional.
to_flat_json(#rctx{}=Rctx) ->
    #rctx{
        updated_at = TP,
        started_at = TInit,
        pid_ref = {Pid0, Ref0},
        mfa = MFA0,
        nonce = Nonce0,
        from = From0,
        dbname = DbName,
        username = UserName,
        db_open = DbOpens,
        docs_read = DocsRead,
        rows_read = RowsRead,
        js_filter = JSFilters,
        js_filter_error = JSFilterErrors,
        js_filtered_docs = JSFilteredDocss,
        state = State0,
        type = Type,
        get_kp_node = KpNodes,
        get_kv_node = KvNodes,
        btree_folds = ChangesProcessed,
        changes_returned = ChangesReturned,
        ioq_calls = IoqCalls
    } = Rctx,
    Pid = term_to_flat_json(Pid0),
    Ref = term_to_flat_json(Ref0),
    PidRef = <<Pid/binary, ":", Ref/binary>>,
    %% mfa may be a {M,F,A} triple, a preformatted string, or unset
    MFA = case MFA0 of
        {M0, F0, A0} ->
            M = atom_to_binary(M0),
            F = atom_to_binary(F0),
            A = integer_to_binary(A0),
            <<M/binary, ":", F/binary, "/", A/binary>>;
        MFA0 when is_list(MFA0) ->
            MFA0;
        undefined ->
            null;
        OtherMFA ->
            throw({error, {unexpected, OtherMFA}})
    end,
    From = case From0 of
        {Parent0, ParentRef0} ->
            Parent = term_to_flat_json(Parent0),
            ParentRef = term_to_flat_json(ParentRef0),
            <<Parent/binary, ":", ParentRef/binary>>;
        undefined ->
            null
    end,
    State = case State0 of
        alive ->
            alive;
        {down, Reason0} ->
            Reason = term_to_flat_json(Reason0),
            <<"down: ", Reason/binary>>
    end,
    Nonce = case Nonce0 of
        undefined ->
            null;
        Nonce0 ->
            list_to_binary(Nonce0)
    end,
    #{
        updated_at => TP,
        started_at => TInit,
        pid_ref => PidRef,
        mfa => MFA,
        nonce => Nonce,
        from => From,
        dbname => DbName,
        username => UserName,
        db_open => DbOpens,
        docs_read => DocsRead,
        js_filter => JSFilters,
        js_filter_error => JSFilterErrors,
        js_filtered_docs => JSFilteredDocss,
        rows_read => RowsRead,
        state => State,
        type => term_to_flat_json({type, Type}),
        kp_nodes => KpNodes,
        kv_nodes => KvNodes,
        btree_folds => ChangesProcessed,
        changes_returned => ChangesReturned,
        ioq_calls => IoqCalls
    }.
| |
| |
%% convert_mfa/1: flatten an mfa value to a json-able binary:
%% {M,F,A} -> <<"M:F/A">>, preformatted strings become binaries,
%% and an unset value maps to null.
convert_mfa(undefined) ->
    null;
convert_mfa(MFA) when is_list(MFA) ->
    list_to_binary(MFA);
convert_mfa({Mod, Fun, Arity}) ->
    iolist_to_binary([
        atom_to_binary(Mod), $:, atom_to_binary(Fun), $/, integer_to_binary(Arity)
    ]).
| |
%% convert_type/1: flatten a context type tag to a json-able binary.
%% Coordinator types carry either an atom handler name or a string
%% path in the third position; worker types carry {M, F}.
convert_type(Type) when is_atom(Type) ->
    atom_to_binary(Type);
convert_type({coordinator, Verb, Handler}) when is_atom(Handler) ->
    iolist_to_binary([
        <<"coordinator:">>, atom_to_binary(Verb), $:, atom_to_binary(Handler)
    ]);
convert_type({coordinator, Verb, Path}) ->
    iolist_to_binary([<<"coordinator:">>, atom_to_binary(Verb), $:, Path]);
convert_type({worker, Mod, Fun}) ->
    iolist_to_binary([<<"worker:">>, atom_to_binary(Mod), $:, atom_to_binary(Fun)]).
| |
%% get_pid_ref/0: this process's {Pid, Ref} context id from the pdict,
%% or undefined when no context has been created.
get_pid_ref() ->
    get(?PID_REF).
| |
| |
%% create_pid_ref/0: mint a fresh {self(), make_ref()} context id and
%% store it in the pdict. Throws {epidexist, Existing} if a context is
%% already live for this process rather than silently clobbering it.
%%
%% FIX: the close_pid_ref(PidRef0) call that followed the throw was
%% unreachable dead code and has been removed.
create_pid_ref() ->
    case get(?PID_REF) of
        undefined ->
            ok;
        PidRef0 ->
            %% TODO: what to do when it already exists?
            throw({epidexist, PidRef0})
    end,
    PidRef = {self(), make_ref()},
    set_pid_ref(PidRef),
    PidRef.
| |
| |
close_pid_ref() ->
    close_pid_ref(get_pid_ref()).


%% Drop the pdict context id. The argument only distinguishes "nothing
%% to close"; whatever ref is currently stored is erased regardless.
close_pid_ref(undefined) ->
    undefined;
close_pid_ref(_PidRef) ->
    erase(?PID_REF).
| |
destroy_context() ->
    destroy_context(get_pid_ref()).


%% Tear down the calling process's context: stop its tracker process,
%% drop the pdict marker, and ask the gen_server to schedule eviction
%% of the ets row.
destroy_context(undefined) ->
    ok;
destroy_context({_, _} = PidRef) ->
    stop_tracker(PidRef),
    close_pid_ref(PidRef),
    gen_server:cast(?MODULE, {destroy, PidRef}),
    ok.
| |
| |
%% Insert the context row. The catch shields callers when the table is
%% unavailable (eg during shutdown) -- NOTE(review): it also swallows
%% genuine insert failures; see the commented-out assertive version.
create_resource(#rctx{} = Rctx) ->
    %% true = ets:insert(?MODULE, Rctx).
    catch ets:insert(?MODULE, Rctx).
| |
| |
%% add type to distinguish coordinator vs rpc_worker
%% create_context/3: establish a worker (rpc) resource context for the
%% calling process. Returns the new pid_ref, or undefined when tracking
%% is disabled.
create_context(From, {M,F,_A} = MFA, Nonce) ->
    case is_enabled() of
        false ->
            undefined;
        true ->
            PidRef = create_pid_ref(),
            %% TODO: extract user_ctx and db/shard from
            Rctx = #rctx{
                pid_ref = PidRef,
                from = From,
                mfa = MFA,
                type = {worker, M, F},
                nonce = Nonce
            },
            track(Rctx),
            %% Seed the T-zero snapshot used by make_delta/0
            erlang:put(?DELTA_TZ, Rctx),
            create_resource(Rctx),
            PidRef
    end.
| |
create_coordinator_context(#httpd{path_parts=Parts} = Req) ->
    is_enabled() andalso create_coordinator_context(Req, io_lib:format("~p", [Parts])).

%% create_coordinator_context/2: establish an http coordinator context.
%% NOTE(review): returns ok when disabled while create_context/3 returns
%% undefined in the same situation -- confirm no caller matches on the
%% disabled-path return value.
create_coordinator_context(#httpd{} = Req, Path) ->
    case is_enabled() of
        false ->
            ok;
        true ->
            #httpd{
                method = Verb,
                nonce = Nonce
            } = Req,
            PidRef = create_pid_ref(),
            Rctx = #rctx{
                pid_ref = PidRef,
                %%type = {coordinator, Verb, Path},
                %% the 'init' placeholder is replaced with the real
                %% handler name by set_context_handler_fun/1 later on
                type = {coordinator, Verb, init},
                nonce = Nonce,
                path = list_to_binary([$/ | Path])
            },
            track(Rctx),
            erlang:put(?DELTA_TZ, Rctx),
            create_resource(Rctx),
            PidRef
    end.
| |
set_context_dbname(DbName) ->
    set_context_dbname(DbName, get_pid_ref()).

%% Record the dbname on the context row.
%% NOTE(review): the failure branch below logs via io:format, sleeps and
%% then halts the ENTIRE VM (erlang:halt) -- clearly crash-loud debug
%% scaffolding that must not ship as-is.
set_context_dbname(_, undefined) ->
    ok;
set_context_dbname(DbName, PidRef) ->
    case is_enabled() of
        false ->
            ok;
        true ->
            catch case ets:update_element(?MODULE, PidRef, [{#rctx.dbname, DbName}]) of
                false ->
                    Stk = try throw(42) catch _:_:Stk0 -> Stk0 end,
                    io:format("UPDATING DBNAME[~p] FAILURE WITH CONTEXT: ~p AND STACK:~n~pFOO:: ~p~n~n", [DbName, get_resource(), Stk, process_info(self(), current_stacktrace)]),
                    timer:sleep(1000),
                    erlang:halt(kaboomz);
                true ->
                    true
            end
    end.
| |
set_context_handler_fun(Fun) when is_function(Fun) ->
    set_context_handler_fun(Fun, get_pid_ref()).

%% Replace the placeholder third element of a coordinator type with the
%% actual http handler fun name (see create_coordinator_context/2).
%% Crashes with badmatch if the current context is not a coordinator.
%% NOTE(review): same VM-halting debug branch concern as
%% set_context_dbname/2.
set_context_handler_fun(_, undefined) ->
    ok;
set_context_handler_fun(Fun, PidRef) when is_function(Fun) ->
    case is_enabled() of
        false ->
            ok;
        true ->
            FunName = erlang:fun_to_list(Fun),
            #rctx{type={coordinator, Verb, _}} = get_resource(),
            Update = [{#rctx.type, {coordinator, Verb, FunName}}],
            catch case ets:update_element(?MODULE, PidRef, Update) of
                false ->
                    Stk = try throw(42) catch _:_:Stk0 -> Stk0 end,
                    io:format("UPDATING HANDLER FUN[~p] FAILURE WITH CONTEXT: ~p AND STACK:~n~pFOO:: ~p~n~n", [FunName, get_resource(), Stk, process_info(self(), current_stacktrace)]),
                    timer:sleep(1000),
                    erlang:halt(kaboomz);
                true ->
                    true
            end
    end.
| |
%% Skip recording the anonymous (null) user.
set_context_username(null) ->
    ok;
set_context_username(UserName) ->
    set_context_username(UserName, get_pid_ref()).

%% Record the username on the context row.
%% NOTE(review): same VM-halting debug branch concern as
%% set_context_dbname/2.
set_context_username(_, undefined) ->
    ok;
set_context_username(UserName, PidRef) ->
    case is_enabled() of
        false ->
            ok;
        true ->
            catch case ets:update_element(?MODULE, PidRef, [{#rctx.username, UserName}]) of
                false ->
                    Stk = try throw(42) catch _:_:Stk0 -> Stk0 end,
                    io:format("UPDATING USERNAME[~p] FAILURE WITH CONTEXT: ~p AND STACK:~n~pFOO:: ~p~n~n", [UserName, get_resource(), Stk, process_info(self(), current_stacktrace)]),
                    timer:sleep(1000),
                    erlang:halt(kaboomz);
                true ->
                    true
            end
    end.
| |
%% track/1: dispatch to the configured tracking mode -- a dedicated
%% spawn_monitor tracker process (default) or gen_server-side tracking.
track(#rctx{} = Rctx) ->
    Mode =
        case conf_get("spawn_monitor", "true") of
            "true" -> spawn_monitor;
            _ -> gen_server
        end,
    track(Rctx, Mode).
| |
%% track/2: spawn_monitor mode runs one dedicated tracker process per
%% tracked pid (cached in the pdict); gen_server mode hands the pid_ref
%% to the server to monitor instead.
track(#rctx{pid_ref=PidRef}, spawn_monitor) ->
    case get_tracker() of
        undefined ->
            %% NOTE(review): bare spawn -- the tracker itself is neither
            %% linked nor monitored, so if it dies the context may leak.
            Pid = spawn(?MODULE, tracker, [PidRef]),
            put_tracker(Pid);
        Pid when is_pid(Pid) ->
            Pid
    end;
track(#rctx{pid_ref=PidRef}, gen_server) ->
    %% TODO: should this block or not? If no, what cleans up zombies?
    %% gen_server:call(?MODULE, {track, PR}).
    gen_server:cast(?MODULE, {track, PidRef}).
| |
%% Pdict accessors for this process's dedicated tracker pid.
get_tracker() ->
    get(?TRACKER_PID).

put_tracker(Pid) when is_pid(Pid) ->
    put(?TRACKER_PID, Pid).
| |
%% tracker/1: body of the dedicated tracker process. Monitors the
%% tracked pid, then waits for either an explicit stop message or the
%% DOWN notification, finalizing the context in the latter case.
tracker({Pid, _Ref}=PidRef) ->
    MonRef = erlang:monitor(process, Pid),
    should_scan() andalso catch ets:update_element(?MODULE, PidRef, [{#rctx.mon_ref, MonRef}]),
    receive
        stop ->
            %% TODO: do we need cleanup here?
            %%cleanup_tracker(PidRef, <<"shutdown:stopped">>),
            demonitor(MonRef),
            ok;
        {'DOWN', MonRef, _Type, _0DPid, Reason0} ->
            %% Normalize {shutdown, Atom} exit reasons to a flat binary
            Reason = case Reason0 of
                {shutdown, Shutdown0} ->
                    Shutdown = atom_to_binary(Shutdown0),
                    <<"shutdown: ", Shutdown/binary>>;
                Reason0 ->
                    Reason0
            end,
            cleanup_tracker(PidRef, Reason)
    end.
| |
%% cleanup_tracker/2: emit the process lifetime report (when logging is
%% enabled and the row qualifies), mark the row down, and hand the
%% context over for eviction.
cleanup_tracker(PidRef, Reason) ->
    case is_logging_enabled() andalso get_resource(PidRef) of
        #rctx{} = Rctx ->
            should_log(Rctx) andalso log_process_lifetime_report(PidRef, Rctx);
        _ ->
            ok
    end,
    %% update stats
    should_scan() andalso catch ets:update_element(?MODULE, PidRef,
        [{#rctx.state, {down, Reason}}, {#rctx.updated_at, tnow()}]),
    catch evict(PidRef).
| |
| |
stop_tracker() ->
    stop_tracker(get_tracker(), get_pid_ref()).

%% stop_tracker/1: accepts either a {Pid, Ref} context id or a tracker pid.
stop_tracker({_Pid, _Ref}=PidRef) ->
    stop_tracker(get_tracker(), PidRef);
stop_tracker(Pid) when is_pid(Pid) ->
    stop_tracker(Pid, get_pid_ref()).

%% stop_tracker/2: clear the monitor marker on the row and tell the
%% tracker process to exit; no-op when there is no tracker.
stop_tracker(undefined, _) ->
    ok;
stop_tracker(Pid, PidRef) ->
    should_scan() andalso catch ets:update_element(?MODULE, PidRef, [{#rctx.mon_ref, undefined}]),
    Pid ! stop.
| |
| |
%% make_delta/0: stats accumulated since the last snapshot (?DELTA_TA),
%% advancing the snapshot to the current context state afterwards.
%% Falls back to the context-creation snapshot (?DELTA_TZ) or a
%% synthesized backdated base when no T0 was recorded.
make_delta() ->
    TA = case get(?DELTA_TA) of
        undefined ->
            %% Need to handle this better, can't just make a new T0 at T' as
            %% the timestamps will be identical causing a divide by zero error.
            %%
            %% Realistically need to ensure that all invocations of database
            %% operations sets T0 appropriately. Perhaps it's possible to do
            %% this is the couch_db:open chain, and then similarly, in
            %% couch_server, and uhhhh... couch_file, and...
            %%
            %% I think we need some type of approach for establishing a T0 that
            %% doesn't result in outrageous deltas. For now zero out the
            %% microseconds field, or subtract a second on the off chance that
            %% microseconds is zero. I'm not uptodate on the latest Erlang time
            %% libraries and don't remember how to easily get an
            %% `os:timestamp()` out of now() - 100ms or some such.
            %%
            %% I think it's unavoidable that we'll have some codepaths that do
            %% not properly instantiate the T0 at spawn resulting in needing to
            %% do some type of "time warp" or ignoring the timing collection
            %% entirely. Perhaps if we hoisted out the stats collection into
            %% the primary flow of the database and funnel that through all the
            %% function clauses we could then utilize Dialyzer to statically
            %% analyze and assert all code paths that invoke database
            %% operations have properly instantiated a T0 at the appropriate
            %% start time such that we don't have to "fudge" deltas with a
            %% missing start point, but we're a long ways from that happening
            %% so I feel it necessary to address the NULL start time.

            %% Track how often we fail to initiate T0 correctly
            %% Perhaps somewhat naughty we're incrementing stats from within
            %% couch_stats itself? Might need to handle this differently
            %% TODO: determine appropriate course of action here
            %% io:format("~n**********MISSING STARTING DELTA************~n~n", []),
            couch_stats:increment_counter(
                [couchdb, csrt, delta_missing_t0]),
                %%[couch_stats_resource_tracker, delta_missing_t0]),

            case erlang:get(?DELTA_TZ) of
                undefined ->
                    TA0 = make_delta_base(),
                    %% TODO: handling missing deltas, otherwise divide by zero
                    set_delta_a(TA0),
                    TA0;
                TA0 ->
                    TA0
            end;
        #rctx{} = TA0 ->
            TA0
    end,
    TB = get_resource(),
    Delta = make_delta(TA, TB),
    set_delta_a(TB),
    Delta.
| |
| |
%% make_delta/2: positive-only counter deltas between two context
%% snapshots, as a map suitable for shipping over the wire; error
%% markers are returned when either endpoint snapshot is missing.
make_delta(#rctx{} = TA, #rctx{} = TB) ->
    %% Counter fields shipped in deltas, keyed by wire name and record
    %% index; dt is the elapsed time between the two snapshots.
    Fields = [
        {docs_read, #rctx.docs_read},
        {js_filter, #rctx.js_filter},
        {js_filter_error, #rctx.js_filter_error},
        {js_filtered_docs, #rctx.js_filtered_docs},
        {rows_read, #rctx.rows_read},
        {changes_returned, #rctx.changes_returned},
        {btree_folds, #rctx.btree_folds},
        {get_kp_node, #rctx.get_kp_node},
        {get_kv_node, #rctx.get_kv_node},
        {db_open, #rctx.db_open},
        {ioq_calls, #rctx.ioq_calls},
        {dt, #rctx.updated_at}
    ],
    %% TODO: reevaluate this decision
    %% Only return non zero (and also positive) delta fields
    maps:from_list([
        {Key, element(Idx, TB) - element(Idx, TA)}
     || {Key, Idx} <- Fields,
        element(Idx, TB) - element(Idx, TA) > 0
    ]);
make_delta(_, #rctx{}) ->
    #{error => missing_beg_rctx};
make_delta(#rctx{}, _) ->
    #{error => missing_fin_rctx}.
| |
make_delta_base() ->
    make_delta_base(get_pid_ref()).

%% TODO: what to do when PidRef=undefined?
%% make_delta_base/1: synthesize a T0 snapshot for deltas whose real
%% start point was never recorded; backdates started_at so elapsed time
%% is never zero (avoiding divide-by-zero downstream).
make_delta_base(PidRef) ->
    %% TODO: extract user_ctx and db/shard from request
    Now = tnow(),
    #rctx{
        pid_ref = PidRef,
        %% tnow/0 is monotonic milliseconds, so plain subtraction is valid
        started_at = Now - 100, %% give us 100ms rewind time for missing T0
        updated_at = Now
    }.
| |
%% Pdict setters for the delta baseline snapshot and the context id.
set_delta_a(TA) ->
    erlang:put(?DELTA_TA, TA).

set_pid_ref(PidRef) ->
    erlang:put(?PID_REF, PidRef),
    PidRef.
| |
get_resource() ->
    get_resource(get_pid_ref()).

%% Fetch the #rctx row for a context id, or undefined.
%% NOTE(review): the old-style catch means callers can also receive
%% {'EXIT', _} when the table is missing, not just undefined.
get_resource(undefined) ->
    undefined;
get_resource(PidRef) ->
    catch get_resource_int(PidRef).


get_resource_int(undefined) ->
    undefined;
get_resource_int(PidRef) ->
    case ets:lookup(?MODULE, PidRef) of
        [#rctx{}=TP] ->
            TP;
        [] ->
            undefined
    end.
| |
| |
%% find_unmonitored/0: pid_refs of context rows with no monitor set.
%% Resolves the old TODO: uses a select that projects just the pid_ref
%% instead of materializing full #rctx records via match_object and
%% list-comprehending over them (avoids copying whole rows out of ets).
find_unmonitored() ->
    ets:select(?MODULE, [{#rctx{pid_ref = '$1', mon_ref = undefined, _ = '_'}, [], ['$1']}]).
| |
| |
%% All context rows belonging to a given pid (any ref).
find_by_pid(Pid) ->
    [R || #rctx{} = R <- ets:match_object(?MODULE, #rctx{pid_ref={Pid, '_'}, _ = '_'})].
| |
| |
%% Pause background eviction; evictions queue up until re-enabled.
pause_eviction() ->
    gen_server:call(?MODULE, pause_eviction).


%% Re-enable eviction and flush any queued evictions.
enable_eviction() ->
    gen_server:call(?MODULE, enable_eviction).


%% Report the current eviction mode and queue length.
%% BUGFIX: this previously sent enable_eviction, which re-enabled
%% eviction and flushed the queue as a side effect of merely asking for
%% status; it now sends eviction_status, which handle_call/3 answers
%% with {ok, Mode, QueueLen}.
eviction_status() ->
    gen_server:call(?MODULE, eviction_status).
| |
| |
start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

%% init/1: create the public context table (keyed on #rctx.pid_ref) and,
%% when scanning is enabled, schedule the first periodic scan.
init([]) ->
    ets:new(?MODULE, [
        named_table,
        public,
        {decentralized_counters, true}, %% TODO: test impact of this
        {write_concurrency, true},
        {read_concurrency, true},
        {keypos, #rctx.pid_ref}
    ]),
    St = #st{},
    case is_enabled() andalso should_scan() of
        false ->
            ok;
        true ->
            _TimerRef = erlang:send_after(St#st.scan_interval, self(), scan)
    end,
    {ok, St}.
| |
%% Report the current eviction mode and the queued eviction count.
handle_call(eviction_status, _From, #st{eviction = Eviction, ev_queue = EVQ} = St) ->
    {reply, {ok, Eviction, length(EVQ)}, St};
%% Suspend evictions; {evict, PidRef} messages queue up meanwhile.
%% The reply carries the mode in effect before the change.
handle_call(pause_eviction, _From, #st{eviction = Eviction} = St) ->
    {reply, {ok, Eviction}, St#st{eviction = paused}};
%% Resume evictions, first draining everything queued while paused.
handle_call(enable_eviction, _From, #st{eviction = Eviction, ev_queue = EVQ} = St) ->
    evict(EVQ),
    {reply, {ok, Eviction}, St#st{eviction = enabled, ev_queue = []}};
%% Expose the full server state (debug/introspection).
handle_call(fetch, _From, #st{} = St) ->
    {reply, {ok, St}, St};
%% Synchronous variant of the {track, PidRef} cast; reuses its logic.
handle_call({track, _} = Msg, _From, St) ->
    {noreply, St1} = handle_cast(Msg, St),
    {reply, ok, St1};
handle_call(Msg, _From, St) ->
    {stop, {unknown_call, Msg}, St}.
| |
%% Tear down the context for PidRef; the tracking map is threaded through
%% so the associated monitor bookkeeping is dropped as well.
handle_cast({destroy, {_, _} = PidRef}, #st{tracking = Tracking0} = St) ->
    Tracking = destroy_context_int(PidRef, Tracking0, <<"shutdown:completed">>),
    {noreply, St#st{tracking = Tracking}};
%% Begin monitoring PidRef unless it is already tracked.
handle_cast({track, {_, _} = PidRef}, #st{tracking = Tracking0} = St) ->
    Tracking = maybe_track(PidRef, Tracking0),
    {noreply, St#st{tracking = Tracking}};
handle_cast(Msg, St) ->
    {stop, {unknown_cast, Msg}, St}.
| |
%% scan: attach monitors to any tracked contexts that lack one, then
%% re-arm the periodic scan timer.
handle_info(scan, #st{tracking=AT0} = St0) ->
    Unmonitored = find_unmonitored(),
    AT = maybe_track(Unmonitored, AT0),
    _TimerRef = erlang:send_after(St0#st.scan_interval, self(), scan),
    {noreply, St0#st{tracking=AT}};
%% 'DOWN': a monitored context process exited; destroy its context,
%% recording a human-readable reason for shutdown exits.
handle_info({'DOWN', MonRef, _Type, _DPid, Reason0}, #st{tracking=AT0} = St) ->
    Reason = case Reason0 of
        %% Guarded with is_atom/1: OTP allows {shutdown, Term} with an
        %% arbitrary Term, and atom_to_binary/1 badargs on non-atoms,
        %% which would have crashed the tracker here. Non-atom payloads
        %% now fall through to the catch-all and are kept as-is.
        {shutdown, Shutdown0} when is_atom(Shutdown0) ->
            Shutdown = atom_to_binary(Shutdown0),
            <<"shutdown: ", Shutdown/binary>>;
        Reason0 ->
            Reason0
    end,
    %% TODO: moving to destroy_context_int lost assertion against RPid =:= DPid
    %% NOTE: we can't flush with demonitor, so we might get a DOWN message for
    %% an already destroyed context.
    %% if
    %%     RPid =:= DPid -> ok;
    %%     true -> erlang:halt(...)
    %% end,
    AT = destroy_context_int(MonRef, AT0, Reason),
    {noreply, St#st{tracking=AT}};
%% Eviction requests queue up while eviction is paused...
handle_info({evict, {_Pid, _Ref}=PidRef}, #st{eviction=paused, ev_queue=EVQ}=St) ->
    {noreply, St#st{ev_queue=[PidRef|EVQ]}};
%% ...and are applied immediately otherwise.
handle_info({evict, {_Pid, _Ref}=PidRef}, #st{}=St) ->
    evict(PidRef),
    {noreply, St};
handle_info(Msg, St) ->
    {stop, {unknown_info, Msg}, St}.
| |
%% Nothing to clean up: the ETS table dies with this (owning) process.
terminate(_Reason, _St) ->
    ok.
| |
%% Hot upgrade hook; state shape is unchanged across versions.
code_change(_OldVsn, St, _Extra) ->
    {ok, St}.
| |
| |
%% Tear down a tracked context, addressed either by its monitor ref (the
%% 'DOWN' path) or by its pid_ref (the explicit destroy path). The
%% tracking map double-bookkeeps MonRef => PidRef and PidRef => MonRef,
%% so both directions are removed; returns the updated map.
destroy_context_int(MonRef, AT0, Reason) when is_reference(MonRef) ->
    drop_monitor(MonRef),
    case maps:get(MonRef, AT0, undefined) of
        undefined ->
            %% Due to not flushing in demonitor we can now get an extra DOWN msg
            AT0;
        {_Pid, _Ref} = PidRef ->
            AT = maps:remove(MonRef, maps:remove(PidRef, AT0)),
            %% TODO: Assert Pid matches Object from MonRef DOWN msg
            %% Re-dispatch on the pid_ref clause to finish the teardown.
            destroy_context_int(PidRef, AT, Reason)
    end;
destroy_context_int({_Pid, _Ref}=PidRef, AT0, Reason) ->
    AT = case maps:get(PidRef, AT0, undefined) of
        undefined ->
            AT0;
        MonRef ->
            drop_monitor(MonRef),
            maps:remove(MonRef, maps:remove(PidRef, AT0))
    end,
    %% Mark the row as down (with reason) before reporting, so the
    %% lifetime report reflects the final state.
    should_scan() andalso ets:update_element(?MODULE, PidRef,
        [{#rctx.state, {down, Reason}}, {#rctx.updated_at, tnow()}]),
    log_process_lifetime_report(PidRef),
    %% Delay eviction to allow human visibility on short lived pids
    %%erlang:send_after(St0#st.eviction_delay, self(), {evict, PidRef}),
    erlang:send_after(0, self(), {evict, PidRef}),
    AT.
| |
| |
%% Demonitor without flushing; stray 'DOWN' messages are tolerated by
%% destroy_context_int/3's undefined branch.
drop_monitor(MonRef) ->
    %% can't use [flush] here or we melt erts_internal:flush_monitor_messages/3
    demonitor(MonRef).
| |
| |
%% Remove one tracked context -- or, given a list, each of them -- from
%% the ETS table. Always returns true, mirroring ets:delete/2.
evict(PidRefs) when is_list(PidRefs) ->
    lists:foreach(fun evict/1, PidRefs),
    true;
evict(PidRef) ->
    ets:delete(?MODULE, PidRef).
| |
| |
%% Ensure every given PidRef is monitored, folding the updated tracking
%% map through. Accepts a single pid_ref tuple or a list of them.
maybe_track(PidRef, AT) when is_tuple(PidRef) ->
    track_one(PidRef, AT);
maybe_track(PidRefs, AT) when is_list(PidRefs) ->
    lists:foldl(fun track_one/2, AT, PidRefs).

%% Monitor one pid_ref (if new) and double-bookkeep MonRef <=> PidRef.
track_one({Pid, _Ref} = PidRef, AT) ->
    case maps:is_key(PidRef, AT) of
        true ->
            %% noop, we're already tracking this PidRef
            AT;
        false ->
            Mon = erlang:monitor(process, Pid),
            %% TODO: decide whether we want the true match to crash this process on failure
            %%true = ets:update_element(?MODULE, PidRef, [{#rctx.mon_ref, Mon}]),
            should_scan() andalso ets:update_element(?MODULE, PidRef, [{#rctx.mon_ref, Mon}]),
            maps:put(Mon, PidRef, maps:put(PidRef, Mon, AT))
    end.
| |
| |
%% Emit a lifetime usage report for PidRef's context, subject to the
%% global enable switch, the logging config, and per-context filtering.
log_process_lifetime_report(PidRef) ->
    %% TODO: catch error out of here, report crashes on depth>1 json
    %% TODO: clean this up
    case is_enabled() andalso is_logging_enabled() of
        false ->
            ok;
        true ->
            Rctx = get_resource_int(PidRef),
            case should_log(Rctx) of
                true -> log_process_lifetime_report(PidRef, Rctx);
                false -> false
            end
    end.
| |
%% Emit the structured lifetime report for a context via couch_log.
log_process_lifetime_report(_PidRef, Rctx) ->
    couch_log:report("csrt-pid-usage-lifetime", to_flat_json(Rctx)).
| |
%% True when logging_enabled/0 yields any mode other than false
%% (i.e. coordinator or true).
is_logging_enabled() ->
    logging_enabled() =/= false.
| |
%% Resolve the log_pid_usage_report config knob into one of the internal
%% modes: coordinator | true | false. Defaults to "coordinator", i.e.
%% only coordinator contexts are reported.
logging_enabled() ->
    logging_mode(conf_get("log_pid_usage_report", "coordinator")).

%% Map the raw config string onto the internal mode atom; anything
%% unrecognized disables logging.
logging_mode("coordinator") -> coordinator;
logging_mode("true") -> true;
logging_mode(_) -> false.
| |
| |
%% Decide whether a context should be reported; a missing (undefined)
%% context never logs, and short-circuits before reading the config.
should_log(undefined) ->
    false;
should_log(#rctx{}=Rctx) ->
    should_log(Rctx, logging_enabled()).
| |
| |
%% Filter a context against the logging mode. Clause order is load
%% bearing: the blanket true/false modes match before the mode-specific
%% coordinator and fabric_rpc clauses below.
should_log(undefined, _) ->
    false;
should_log(#rctx{}, true) ->
    true;
should_log(#rctx{}, false) ->
    false;
%% In coordinator mode, coordinator contexts always log...
should_log(#rctx{type = {coordinator, _, _}}, coordinator) ->
    true;
%% ...and fabric_rpc workers log only if log_fabric_rpc opts them in,
%% either globally ("true") or by naming one specific RPC function.
should_log(#rctx{type = {worker, fabric_rpc, FName}}, _) ->
    case conf_get("log_fabric_rpc") of
        "true" ->
            true;
        undefined ->
            false;
        Name ->
            Name =:= atom_to_list(FName)
    end;
should_log(#rctx{}, _) ->
    false.
| |
| |
%% Read a config value from this module's section, defaulting to undefined.
conf_get(Key) ->
    conf_get(Key, undefined).
| |
| |
%% Read a config value from the section named after this module.
conf_get(Key, Default) ->
    config:get(?MODULE_STRING, Key, Default).
| |
| |
%% Reimplementation of: https://github.com/erlang/otp/blob/b2ee4fc9a0b81a139dad2033e9b2bfc178146886/lib/stdlib/src/ets.erl#L633-L658
%% with wrapping of ets:safe_fixtable/2 removed.
%% Fold F over every object in table T without fixing the table, so
%% concurrent inserts/deletes during the traversal are tolerated.
unsafe_foldl(F, Acc0, T) ->
    do_foldl(F, Acc0, ets:first(T), T).

%% Walk the table key by key, folding over every object stored under
%% each key, until ets:next/2 reports the end of the table.
do_foldl(_F, Acc, '$end_of_table', _T) ->
    Acc;
do_foldl(F, Acc, Key, T) ->
    Acc1 = lists:foldl(F, Acc, ets:lookup(T, Key)),
    do_foldl(F, Acc1, ets:next(T, Key), T).