Merge remote-tracking branch 'cloudant/3184-retry-recompact-failure'
diff --git a/src/couch_mrview_compactor.erl b/src/couch_mrview_compactor.erl
index 9dba094..e343ac8 100644
--- a/src/couch_mrview_compactor.erl
+++ b/src/couch_mrview_compactor.erl
@@ -26,6 +26,7 @@
    total_changes
 }).
 
+-define(DEFAULT_RECOMPACT_RETRY_COUNT, 3). % default passed to config:get_integer/3 below
 
 compact(_Db, State, Opts) ->
     case lists:member(recompact, Opts) of
@@ -148,16 +149,41 @@
 
 
 recompact(State) ->
+    recompact(State, recompact_retry_count()). % start with the configured retry budget
+
+recompact(_State, 0) ->
+    erlang:error(exceeded_recompact_retry_count); % no attempts left: give up loudly
+
+recompact(State, RetryCount) ->
+    Self = self(), % captured here because the fun below runs in the spawned process
     link(State#mrst.fd),
     {Pid, Ref} = erlang:spawn_monitor(fun() ->
-        couch_index_updater:update(couch_mrview_index, State)
+        couch_index_updater:update(Self, couch_mrview_index, State)
     end),
+    recompact_loop(Pid, Ref, State, RetryCount). % wait for progress, success or death
+
+recompact_loop(Pid, Ref, State, RetryCount) ->
     receive
+        {'$gen_cast', {new_state, State2}} ->
+            % We've made progress so reset RetryCount
+            recompact_loop(Pid, Ref, State2, recompact_retry_count());
         {'DOWN', Ref, _, _, {updated, Pid, State2}} ->
             unlink(State#mrst.fd),
-            {ok, State2}
+            {ok, State2};
+        {'DOWN', Ref, _, _, Reason} -> % updater exited without reporting {updated, ...}
+            unlink(State#mrst.fd),
+            couch_log:warning("Error during recompaction: ~p", [Reason]),
+            recompact(State, RetryCount - 1) % retry from the most recent state we hold
     end.
 
+%% Retry budget for recompaction: the integer stored under the
+%% "view_compaction" section, key "recompact_retry_count", in config.
+recompact_retry_count() ->
+    Section = "view_compaction",
+    Key = "recompact_retry_count",
+    config:get_integer(Section, Key, ?DEFAULT_RECOMPACT_RETRY_COUNT).
+
+
 compact_log(LogBtree, BufferSize, Acc0) ->
     FoldFun = fun(KV, Acc) ->
         #acc{btree = Bt, kvs = Kvs, kvs_size = KvsSize} = Acc,
@@ -265,3 +291,39 @@
     erlang:demonitor(OldState#mrst.fd_monitor, [flush]),
     
     {ok, NewState#mrst{fd_monitor=Ref}}.
+
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+recompact_test_() ->
+    {foreach,
+        fun() -> ok = meck:new(couch_index_updater) end, % meck:expect/3 needs a mock
+        fun(_) -> meck:unload() end, % runs even when a test fails before its own unload
+        [recompact_success_after_progress(), recompact_exceeded_retry_count()]}.
+
+recompact_success_after_progress() ->
+    ?_test(begin
+        % The first updater run only reports progress; the retried run completes.
+        Initial = #mrst{fd=self(), update_seq=0},
+        ok = meck:expect(couch_index_updater, update, fun
+            (Parent, _, #mrst{update_seq=0} = St) ->
+                Parent ! {'$gen_cast', {new_state, St#mrst{update_seq=1}}};
+            (_, _, St) -> exit({updated, self(), St})
+        end),
+        ?assertEqual({ok, Initial#mrst{update_seq=1}}, recompact(Initial)),
+        meck:unload(couch_index_updater)
+    end).
+
+recompact_exceeded_retry_count() ->
+    ?_test(begin
+        % An updater that dies on every attempt, without ever reporting
+        % progress, must end in the exceeded_recompact_retry_count error.
+        AlwaysCrash = fun(_, _, _) -> exit(error) end,
+        ok = meck:expect(couch_index_updater, update, AlwaysCrash),
+        Initial = #mrst{fd=self()},
+        ?assertError(exceeded_recompact_retry_count, recompact(Initial)),
+        meck:unload(couch_index_updater)
+    end).
+
+-endif.