Updated PoC of cost accounting
diff --git a/src/couch/src/couch_btree.erl b/src/couch/src/couch_btree.erl
index 2c75287..2d64ea4 100644
--- a/src/couch/src/couch_btree.erl
+++ b/src/couch/src/couch_btree.erl
@@ -472,6 +472,7 @@
get_node(#btree{fd = Fd}, NodePos) ->
{ok, {NodeType, NodeList}} = couch_file:pread_term(Fd, NodePos),
+ couch_cost:inc_get_node(NodeType),
{NodeType, NodeList}.
write_node(#btree{fd = Fd, compression = Comp} = Bt, NodeType, NodeList) ->
@@ -1077,7 +1078,6 @@
stream_node(Bt, Reds, Node, InRange, Dir, Fun, Acc) ->
Pointer = element(1, Node),
{NodeType, NodeList} = get_node(Bt, Pointer),
- couch_cost:inc_get_node(NodeType),
case NodeType of
kp_node ->
stream_kp_node(Bt, Reds, adjust_dir(Dir, NodeList), InRange, Dir, Fun, Acc);
@@ -1164,6 +1164,7 @@
false ->
{stop, {PrevKVs, Reds}, Acc};
true ->
+ couch_cost:inc_changes_processed(),
AssembledKV = assemble(Bt, K, V),
case Fun(visit, AssembledKV, {PrevKVs, Reds}, Acc) of
{ok, Acc2} ->
diff --git a/src/couch/src/couch_cost.erl b/src/couch/src/couch_cost.erl
index 2592950..ebb25cf 100644
--- a/src/couch/src/couch_cost.erl
+++ b/src/couch/src/couch_cost.erl
@@ -3,22 +3,33 @@
-export([
inc_doc/0, inc_doc/1, inc_doc/2,
inc_ioq/0, inc_ioq/1, inc_ioq/2,
- inc_get_node/1
+ inc_get_node/1,
+ inc_db_open/0,
+ inc_js_filter/0, inc_js_filter/1, inc_js_filter/2,
+ inc_js_filtered_docs/1, inc_js_filtered_docs/2,
+ inc_changes_processed/0, inc_changes_processed/1, inc_changes_processed/2
%%io_bytes_read/1, io_bytes_read/2,
%%io_bytes_written/1, io_bytes_written/2,
%%inc_js_evals/0, inc_js_evals/1, inc_js_evals/2
]).
-export([
- get_cost/0
+ get_cost/0,
+ get_costs/0,
+ accumulate_costs/1
]).
+
-record(cost, {
+ db_open = 0,
docs_read = 0,
+ changes_processed = 0,
ioq_calls = 0,
io_bytes_read = 0,
io_bytes_written = 0,
js_evals = 0,
+ js_filter = 0,
+ js_filtered_docs = 0,
get_kv_node = 0,
get_kp_node = 0
}).
@@ -33,9 +44,23 @@
Cost
end.
+get_costs() ->
+ case get(cost_accounting_context_aggregation) of
+ undefined ->
+ Costs = [],
+ put(cost_accounting_context_aggregation, Costs),
+ Costs;
+ Costs when is_list(Costs) ->
+ Costs
+ end.
+
update_cost(#cost{}=Cost) ->
put(cost_accounting_context, Cost).
+accumulate_costs(#cost{}=Cost) ->
+ Costs = get_costs(),
+ put(cost_accounting_context_aggregation, [Cost | Costs]).
+
inc_doc() -> inc_doc(1).
inc_doc(N) -> inc_doc(N, get_cost()).
inc_doc(N, #cost{docs_read=DR0}=Cost) -> update_cost(Cost#cost{docs_read=DR0+N}).
@@ -50,6 +75,25 @@
update_cost(inc(Type, get_cost())).
+inc_js_filter() -> inc_js_filter(1).
+inc_js_filter(N) -> inc_js_filter(N, get_cost()).
+inc_js_filter(N, #cost{}=Cost) -> update_cost(inc(js_filter, Cost, N)).
+
+
+inc_js_filtered_docs(N) -> inc_js_filtered_docs(N, get_cost()).
+inc_js_filtered_docs(N, #cost{}=Cost) -> update_cost(inc(js_filtered_docs, Cost, N)).
+
+
+inc_changes_processed() -> inc_changes_processed(1).
+inc_changes_processed(N) -> inc_changes_processed(N, get_cost()).
+inc_changes_processed(N, #cost{}=Cost) -> update_cost(inc(changes_processed, Cost, N)).
+
+
+inc_db_open() -> inc_db_open(1).
+inc_db_open(N) -> inc_db_open(N, get_cost()).
+inc_db_open(N, #cost{}=Cost) -> update_cost(inc(db_open, Cost, N)).
+
+
inc(Key, Cost) ->
inc(Key, Cost, 1).
@@ -57,4 +101,12 @@
inc(kp_node, #cost{get_kp_node=GKP}=Cost, N) ->
Cost#cost{get_kp_node = GKP + N};
inc(kv_node, #cost{get_kv_node=GKV}=Cost, N) ->
- Cost#cost{get_kp_node = GKV + N}.
+ Cost#cost{get_kv_node = GKV + N};
+inc(changes_processed, #cost{changes_processed=CP}=Cost, N) ->
+ Cost#cost{changes_processed = CP + N};
+inc(db_open, #cost{db_open=DBO}=Cost, N) ->
+ Cost#cost{db_open = DBO + N};
+inc(js_filter, #cost{js_filter=JSF}=Cost, N) ->
+ Cost#cost{js_filter = JSF + N};
+inc(js_filtered_docs, #cost{js_filtered_docs=JSFD}=Cost, N) ->
+ Cost#cost{js_filtered_docs = JSFD + N}.
diff --git a/src/couch/src/couch_query_servers.erl b/src/couch/src/couch_query_servers.erl
index cb61940..641e832 100644
--- a/src/couch/src/couch_query_servers.erl
+++ b/src/couch/src/couch_query_servers.erl
@@ -527,6 +527,8 @@
{ok, Passes}.
filter_docs(Req, Db, DDoc, FName, Docs) ->
+ couch_cost:inc_js_filter(),
+ couch_cost:inc_js_filtered_docs(length(Docs)),
JsonReq =
case Req of
{json_req, JsonObj} ->
diff --git a/src/couch/src/couch_server.erl b/src/couch/src/couch_server.erl
index 23fdd58..4b4c213 100644
--- a/src/couch/src/couch_server.erl
+++ b/src/couch/src/couch_server.erl
@@ -96,6 +96,7 @@
gen_server:start_link({local, couch_server(N)}, couch_server, [N], []).
open(DbName, Options) ->
+ couch_cost:inc_db_open(),
try
validate_open_or_create(DbName, Options),
open_int(DbName, Options)
diff --git a/src/fabric/src/fabric_util.erl b/src/fabric/src/fabric_util.erl
index 34e0954..3715fa7 100644
--- a/src/fabric/src/fabric_util.erl
+++ b/src/fabric/src/fabric_util.erl
@@ -138,7 +138,8 @@
Ref = rexi:cast(Node, self(), MFA, [sync]),
try
receive
- {Ref, {ok, Db}} ->
+ {Ref, {ok, Db}, {cost, Cost}} ->
+ couch_cost:accumulate_costs(Cost),
{ok, Db};
{Ref, {'rexi_EXIT', {{unauthorized, _} = Error, _}}} ->
throw(Error);
@@ -146,7 +147,10 @@
throw(Error);
{Ref, Reason} ->
couch_log:debug("Failed to open shard ~p because: ~p", [Name, Reason]),
- get_shard(Rest, Opts, Timeout, Factor)
+ get_shard(Rest, Opts, Timeout, Factor);
+ Other ->
+ io:format("GOT UNEXPECTED MESSAGE FORMAT: ~p~n", [Other]),
+ erlang:error(Other)
after Timeout ->
couch_log:debug("Failed to open shard ~p after: ~p", [Name, Timeout]),
get_shard(Rest, Opts, Factor * Timeout, Factor)
diff --git a/src/rexi/src/rexi_utils.erl b/src/rexi/src/rexi_utils.erl
index ae05bf7..b0bb79a 100644
--- a/src/rexi/src/rexi_utils.erl
+++ b/src/rexi/src/rexi_utils.erl
@@ -84,7 +84,8 @@
{rexi, '$rexi_ping'} ->
{ok, Acc0};
{Ref, Msg, {cost,Cost}} ->
- io:format("GOT COST: ~p -- ~p~n", [Cost, Msg]),
+ io:format("[~p]GOT COST: ~p -- ~p~n", [self(), Cost, Msg]),
+ couch_cost:accumulate_costs(Cost),
case lists:keyfind(Ref, Keypos, RefList) of
false ->
% this was some non-matching message which we will ignore
@@ -94,7 +95,8 @@
end;
{Ref, From, Msg, {cost,Cost}} ->
%%io:format("GOT COST: ~p~n", [Cost]),
- io:format("GOT COST: ~p -- ~p~n", [Cost, Msg]),
+ io:format("[~p]GOT COST: ~p -- ~p~n", [self(), Cost, Msg]),
+ couch_cost:accumulate_costs(Cost),
case lists:keyfind(Ref, Keypos, RefList) of
false ->
{ok, Acc0};
@@ -102,7 +104,7 @@
Fun(Msg, {Worker, From}, Acc0)
end;
{Ref, Msg} ->
- io:format("GOT NON COST MSG: ~p~n", [Msg]),
+ %%io:format("GOT NON COST MSG: ~p~n", [Msg]),
case lists:keyfind(Ref, Keypos, RefList) of
false ->
% this was some non-matching message which we will ignore
@@ -111,7 +113,7 @@
Fun(Msg, Worker, Acc0)
end;
{Ref, From, Msg} ->
- io:format("GOT NON COST MSG: ~p~n", [Msg]),
+ %%io:format("GOT NON COST MSG: ~p~n", [Msg]),
case lists:keyfind(Ref, Keypos, RefList) of
false ->
{ok, Acc0};