Merge remote-tracking branch 'cloudant/3184-retry-recompact-failure'
diff --git a/src/couch_mrview_compactor.erl b/src/couch_mrview_compactor.erl
index 9dba094..e343ac8 100644
--- a/src/couch_mrview_compactor.erl
+++ b/src/couch_mrview_compactor.erl
@@ -26,6 +26,7 @@
total_changes
}).
+-define(DEFAULT_RECOMPACT_RETRY_COUNT, 3).
compact(_Db, State, Opts) ->
case lists:member(recompact, Opts) of
@@ -148,16 +149,41 @@
recompact(State) ->
+ recompact(State, recompact_retry_count()).
+
+recompact(_State, 0) ->
+ erlang:error(exceeded_recompact_retry_count);
+
+recompact(State, RetryCount) ->
+ Self = self(),
link(State#mrst.fd),
{Pid, Ref} = erlang:spawn_monitor(fun() ->
- couch_index_updater:update(couch_mrview_index, State)
+ couch_index_updater:update(Self, couch_mrview_index, State)
end),
+ recompact_loop(Pid, Ref, State, RetryCount).
+
+recompact_loop(Pid, Ref, State, RetryCount) ->
receive
+ {'$gen_cast', {new_state, State2}} ->
+ % We've made progress so reset RetryCount
+ recompact_loop(Pid, Ref, State2, recompact_retry_count());
{'DOWN', Ref, _, _, {updated, Pid, State2}} ->
unlink(State#mrst.fd),
- {ok, State2}
+ {ok, State2};
+ {'DOWN', Ref, _, _, Reason} ->
+ unlink(State#mrst.fd),
+ couch_log:warning("Error during recompaction: ~r", [Reason]),
+ recompact(State, RetryCount - 1)
end.
+recompact_retry_count() ->
+ config:get_integer(
+ "view_compaction",
+ "recompact_retry_count",
+ ?DEFAULT_RECOMPACT_RETRY_COUNT
+ ).
+
+
compact_log(LogBtree, BufferSize, Acc0) ->
FoldFun = fun(KV, Acc) ->
#acc{btree = Bt, kvs = Kvs, kvs_size = KvsSize} = Acc,
@@ -265,3 +291,39 @@
erlang:demonitor(OldState#mrst.fd_monitor, [flush]),
{ok, NewState#mrst{fd_monitor=Ref}}.
+
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+recompact_test_() ->
+ [
+ recompact_success_after_progress(),
+ recompact_exceeded_retry_count()
+ ].
+
+recompact_success_after_progress() ->
+ ?_test(begin
+ ok = meck:expect(couch_index_updater, update, fun
+ (Pid, _, #mrst{update_seq=0} = State) ->
+ Pid ! {'$gen_cast', {new_state, State#mrst{update_seq=1}}};
+ (_, _, State) ->
+ exit({updated, self(), State})
+ end),
+ State = #mrst{fd=self(), update_seq=0},
+ ?assertEqual({ok, State#mrst{update_seq=1}}, recompact(State)),
+ meck:unload(couch_index_updater)
+ end).
+
+recompact_exceeded_retry_count() ->
+ ?_test(begin
+ ok = meck:expect(couch_index_updater, update,
+ fun(_, _, _) ->
+ exit(error)
+ end),
+ State = #mrst{fd=self()},
+ ?assertError(exceeded_recompact_retry_count, recompact(State)),
+ meck:unload(couch_index_updater)
+ end).
+
+-endif.