Merge pull request #3933 from cloudant/debug-for-sharded-index-server

Debug for sharded index server
diff --git a/src/couch/src/couch_debug.erl b/src/couch/src/couch_debug.erl
index a2f4cdc..cfa9947 100644
--- a/src/couch/src/couch_debug.erl
+++ b/src/couch/src/couch_debug.erl
@@ -31,11 +31,31 @@
     map_tree/2,
     fold_tree/3,
     linked_processes_info/2,
-    print_linked_processes/1
+    print_linked_processes/1,
+    busy/2,
+    busy/3,
+    restart/1,
+    restart_busy/2,
+    restart_busy/3,
+    restart_busy/4
 ]).
 
+%% busy_properties() is referenced by couch_index_debug specs, so export it.
+-export_type([busy_properties/0]).
+
+-type throw(_Reason) :: no_return().
+
+-type process_name() :: atom().
+-type function_name() :: atom().
+%% process_info/2 items accepted as the Property argument of busy/3.
+-type busy_properties() ::
+    heap_size | memory | message_queue_len | reductions | total_heap_size.
+
+-spec help() -> [function_name()].
+
 help() ->
     [
+        busy,
         opened_files,
         opened_files_by_regexp,
         opened_files_contains,
@@ -45,11 +65,34 @@
         map,
         fold,
         linked_processes_info,
-        print_linked_processes
+        print_linked_processes,
+        restart,
+        restart_busy
     ].
 
--spec help(Function :: atom()) -> ok.
+-spec help(Function :: function_name()) -> ok.
 %% erlfmt-ignore
+help(busy) ->
+    io:format("
+    busy(ProcessList, Threshold)
+    busy(ProcessList, Threshold, Property)
+    --------------
+
+    Iterate over given list of named processes and returns the ones with
+    a Property value greater than provided Threshold.
+
+    If Property is not specified we use message box size
+
+    Properties which can be used are listed below
+
+    - heap_size
+    - memory
+    - message_queue_len (default)
+    - reductions
+    - total_heap_size
+
+    ---
+    ", []);
 help(opened_files) ->
     io:format("
     opened_files()
@@ -93,6 +136,42 @@
 
     ---
     ", []);
+help(restart) ->
+    io:format("
+    restart(ServerName)
+    --------------
+
+    Restart a process with given ServerName and wait for
+    replacement process to start.
+    ---
+    ", []);
+help(restart_busy) ->
+    io:format("
+    restart_busy(ProcessList, Threshold)
+    restart_busy(ProcessList, Threshold, DelayInMsec)
+    restart_busy(ProcessList, Threshold, DelayInMsec, Property)
+    --------------
+
+    Iterate over given list of named processes and find the ones with
+    a Property value greater than provided Threshold.
+
+    Then it restarts the identified processes.
+
+    If Property is not specified we use message box size
+
+    Properties which can be used are listed below
+
+    - heap_size
+    - memory
+    - message_queue_len (default)
+    - reductions
+    - total_heap_size
+
+    The restarts happen sequentially with a given DelayInMsec between them.
+    If DelayInMsec is not provided the default value is one second.
+    The function doesn't proceed to next process until its replacement starts.
+    ---
+    ", []);
 help(link_tree) ->
     io:format("
     link_tree(Pid)
@@ -202,6 +281,30 @@
     io:format("    ---~n", []),
     ok.
 
+-spec busy(ProcessList :: [process_name()], Threshold :: pos_integer()) ->
+    [Name :: process_name()].
+%% busy/3 with the default property: mailbox length (message_queue_len).
+busy(ProcessList, Threshold) when Threshold > 0 ->
+    busy(ProcessList, Threshold, message_queue_len).
+
+-spec busy(
+    ProcessList :: [process_name()], Threshold :: pos_integer(), Property :: busy_properties()
+) ->
+    [Name :: process_name()].
+
+busy(ProcessList, Threshold, Property) when Threshold > 0 ->
+    %% Names that are not registered (or whose process just died) are skipped,
+    %% but a misspelt Property raises badarg rather than silently yielding [].
+    IsBusy = fun
+        (Pid) when is_pid(Pid) ->
+            case process_info(Pid, Property) of
+                {Property, Value} when is_integer(Value) -> Value > Threshold;
+                _ -> false
+            end;
+        (_NotAProcess) -> false
+    end,
+    [Name || Name <- ProcessList, IsBusy(whereis(Name))].
+
 -spec opened_files() ->
     [{port(), CouchFilePid :: pid(), Fd :: pid() | tuple(), FilePath :: string()}].
 
@@ -421,6 +524,15 @@
 
 print_couch_index_server_processes() ->
     Info = [reductions, message_queue_len, memory],
+    Trees = lists:map(
+        fun(Name) ->
+            link_tree(whereis(Name), Info, fun(P, Props) ->
+                IdStr = process_name(P),
+                {IdStr, [{id, id(IdStr, P, Props)} | Props]}
+            end)
+        end,
+        couch_index_server:names()
+    ),
     TableSpec = [
         {50, left, name},
         {12, centre, reductions},
@@ -428,12 +540,7 @@
         {14, centre, memory},
         {id}
     ],
-
-    Tree = link_tree(whereis(couch_index_server), Info, fun(P, Props) ->
-        IdStr = process_name(P),
-        {IdStr, [{id, id(IdStr, P, Props)} | Props]}
-    end),
-    print_tree(Tree, TableSpec).
+    print_trees(Trees, TableSpec).
 
 shorten_path(Path) ->
     ViewDir = list_to_binary(config:get("couchdb", "view_index_dir")),
@@ -446,6 +553,57 @@
     <<_:Len/binary, Rest/binary>> = File,
     binary_to_list(Rest).
 
+%% Kills the process registered as Name and passes on the outcome reported by
+%% test_util:with_process_restart/2 (presumably the replacement pid, or timeout).
+-spec restart(Name :: process_name()) ->
+    Pid :: pid() | timeout.
+
+restart(Name) ->
+    Kill = fun() -> exit(whereis(Name), kill) end,
+    case test_util:with_process_restart(Name, Kill) of
+        {NewPid, true} ->
+            NewPid;
+        timeout ->
+            timeout
+    end.
+
+-spec restart_busy(ProcessList :: [process_name()], Threshold :: pos_integer()) ->
+    throw({timeout, Name :: process_name()}) | ok.
+%% Restart busy processes with the default 1000 ms pause after each restart.
+restart_busy(ProcessList, Threshold) ->
+    restart_busy(ProcessList, Threshold, 1000).
+
+-spec restart_busy(
+    ProcessList :: [process_name()], Threshold :: pos_integer(), DelayInMsec :: pos_integer()
+) ->
+    throw({timeout, Name :: process_name()}) | ok.
+%% Same as restart_busy/4, judging busyness by message_queue_len.
+restart_busy(ProcessList, Threshold, DelayInMsec) ->
+    restart_busy(ProcessList, Threshold, DelayInMsec, message_queue_len).
+
+-spec restart_busy(
+    ProcessList :: [process_name()],
+    Threshold :: pos_integer(),
+    DelayInMsec :: pos_integer(),
+    Property :: busy_properties()
+) ->
+    throw({timeout, Name :: process_name()}) | ok.
+%% Restarts, one at a time, every process that busy/3 reports for Property.
+restart_busy(ProcessList, Threshold, DelayInMsec, Property) when
+    Threshold > 0 andalso DelayInMsec > 0
+->
+    lists:foreach(
+        fun(Name) ->
+            case restart(Name) of
+                timeout ->
+                    throw({timeout, Name});
+                _ ->
+                    timer:sleep(DelayInMsec)
+            end
+        end,
+        busy(ProcessList, Threshold, Property)
+    ).
+
 %% Pretty print functions
 
 %% Limmitations:
@@ -464,9 +622,36 @@
     end),
     ok.
 
+%% Prints the header and a dashed rule, then each tree's rows plus a blank row.
+print_trees(Trees, TableSpec) ->
+    io:format("~s~n", [format(TableSpec)]),
+    io:format("~s~n", [separator(TableSpec)]),
+    PrintRow = fun(_, {Id, Props}, Pos) ->
+        io:format("~s~n", [table_row(Id, Pos * 2, Props, TableSpec)])
+    end,
+    PrintTree = fun(Tree) ->
+        map_tree(Tree, PrintRow),
+        io:format("~s~n", [space(TableSpec)])
+    end,
+    lists:foreach(PrintTree, Trees),
+    ok.
+
 format(Spec) ->
     Fields = [format_value(Format) || Format <- Spec],
-    string:join(Fields, "|").
+    [$| | string:join(Fields, "|")].
+
+%% A row shaped like format(Spec) whose every column is filled with Char.
+fill(Spec, [Char]) ->
+    fill(Spec, Char);
+fill(Spec, Char) when is_integer(Char) ->
+    Columns = [string:chars(Char, length(format_value(Col))) || Col <- Spec],
+    [$| | [string:join(Columns, "|")]].
+
+space(Spec) ->
+    fill(Spec, $\s).
+
+separator(Spec) ->
+    fill(Spec, $-).
 
 format_value({Value}) -> term2str(Value);
 format_value({Width, Align, Value}) -> string:Align(term2str(Value), Width).
@@ -486,7 +671,7 @@
 table_row(Key, Indent, Props, [{KeyWidth, Align, _} | Spec]) ->
     Values = [bind_value(Format, Props) || Format <- Spec],
     KeyStr = string:Align(term2str(Key), KeyWidth - Indent),
-    [string:copies(" ", Indent), KeyStr, "|" | format(Values)].
+    [$|, string:copies(" ", Indent), KeyStr | format(Values)].
 
 -ifdef(TEST).
 -include_lib("couch/include/couch_eunit.hrl").
diff --git a/src/couch_index/src/couch_index_debug.erl b/src/couch_index/src/couch_index_debug.erl
new file mode 100644
index 0000000..3de7fad
--- /dev/null
+++ b/src/couch_index/src/couch_index_debug.erl
@@ -0,0 +1,171 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_index_debug).
+
+-export([
+    help/0,
+    help/1
+]).
+
+-export([
+    names/0,
+    print_linked_processes/0,
+    busy/1,
+    busy/2,
+    restart_busy/1,
+    restart_busy/2,
+    restart_busy/3
+]).
+
+-type throw(_Reason) :: no_return().
+
+-type process_name() :: atom().
+-type function_name() :: atom().
+
+help() ->
+    [
+        %% every command listed here has a matching help/1 clause
+        names,
+        print_linked_processes,
+        busy,
+        restart_busy
+    ].
+
+-spec help(Function :: function_name()) -> ok.
+%% erlfmt-ignore
+help(names) ->
+    io:format("
+    names()
+    --------------
+
+    Returns the list of named processes which constitute
+    a sharded couch_index_server
+    ---
+    ", []);
+help(print_linked_processes) ->
+    io:format("
+    print_linked_processes()
+    --------------
+
+    Print cluster of linked processes. The output would look similar to:
+
+    |name                                              | reductions | message_queue_len |    memory    |id
+    |--------------------------------------------------|------------|-------------------|--------------|--
+    |index_server_1[<0.320.0>]                         |    1115    |         0         |    17000     |
+    |  couch_secondary_services[<0.312.0>]             |   93258    |         0         |    68600     |
+    |  couch_event_listener:do_init/3[<0.323.0>]       |    195     |         0         |     2856     |
+    |    index_server_1[<0.320.0>]                     |    1115    |         0         |    17000     |
+    |                                                  |            |                   |              |
+    |index_server_2[<0.324.0>]                         |    278     |         0         |     6088     |
+    |  couch_secondary_services[<0.312.0>]             |   93260    |         0         |    68600     |
+    |  couch_event_listener:do_init/3[<0.326.0>]       |    161     |         0         |     2856     |
+    |    index_server_2[<0.324.0>]                     |    278     |         0         |     6088     |
+    ---
+    ", []);
+help(busy) ->
+    io:format("
+    busy(Threshold)
+    busy(Threshold, Property)
+    --------------
+
+    Finds list of couch_index_server processes and returns the ones with
+    a Property value greater than provided Threshold.
+
+    If Property is not specified we use message box size
+
+    Properties which can be used are listed below
+
+    - heap_size
+    - memory
+    - message_queue_len (default)
+    - reductions
+    - total_heap_size
+    ---
+    ", []);
+help(restart_busy) ->
+    io:format("
+    restart_busy(Threshold)
+    restart_busy(Threshold, DelayInMsec)
+    restart_busy(Threshold, DelayInMsec, Property)
+    --------------
+
+    Finds list of couch_index_server processes and returns the ones with
+    a Property value greater than provided Threshold.
+
+    Then it restarts the identified processes.
+
+    If Property is not specified we use message box size
+
+    Properties which can be used are listed below
+
+    - heap_size
+    - memory
+    - message_queue_len (default)
+    - reductions
+    - total_heap_size
+
+    The restarts happen sequentially with a given DelayInMsec between them.
+    If DelayInMsec is not provided the default value is one second.
+    The function doesn't proceed to next server until the replacement server
+    process starts.
+    ---
+    ", []);
+help(Unknown) ->
+    io:format("Unknown function: `~p`. Please try one of the following:~n", [Unknown]),
+    lists:foreach(fun(Function) -> io:format("    - ~s~n", [Function]) end, help()),
+    io:format("    ---~n", []),
+    ok.
+
+-spec names() -> [process_name()].
+%% Thin wrapper: the names come straight from couch_index_server:names/0.
+names() ->
+    couch_index_server:names().
+
+-spec print_linked_processes() -> ok.
+%% Delegates to couch_debug:print_linked_processes/1 for couch_index_server.
+print_linked_processes() ->
+    couch_debug:print_linked_processes(couch_index_server).
+
+-spec busy(Threshold :: pos_integer()) ->
+    [Name :: process_name()].
+%% Index server processes whose message queue is longer than Threshold.
+busy(Threshold) when Threshold > 0 ->
+    couch_debug:busy(names(), Threshold).
+
+-spec busy(Threshold :: pos_integer(), Property :: couch_debug:busy_properties()) ->
+    [Name :: process_name()].
+%% Index server processes whose Property value exceeds Threshold.
+busy(Threshold, Property) when Threshold > 0 ->
+    couch_debug:busy(names(), Threshold, Property).
+
+-spec restart_busy(Threshold :: pos_integer()) ->
+    throw({timeout, Name :: process_name()}) | ok.
+%% Uses couch_debug's default delay rather than repeating the constant here.
+restart_busy(Threshold) ->
+    couch_debug:restart_busy(names(), Threshold).
+
+-spec restart_busy(Threshold :: pos_integer(), DelayInMsec :: pos_integer()) ->
+    throw({timeout, Name :: process_name()}) | ok.
+%% Restarts busy index servers, judged by message_queue_len, DelayInMsec apart.
+restart_busy(Threshold, DelayInMsec) ->
+    couch_debug:restart_busy(names(), Threshold, DelayInMsec).
+
+-spec restart_busy(
+    Threshold :: pos_integer(),
+    DelayInMsec :: pos_integer(),
+    Property :: couch_debug:busy_properties()
+) ->
+    throw({timeout, Name :: process_name()}) | ok.
+%% Restarts index servers whose Property exceeds Threshold, DelayInMsec apart.
+restart_busy(Threshold, DelayInMsec, Property) ->
+    couch_debug:restart_busy(names(), Threshold, DelayInMsec, Property).
diff --git a/src/couch_index/src/couch_index_server.erl b/src/couch_index/src/couch_index_server.erl
index a72ec3b..2e368bf 100644
--- a/src/couch_index/src/couch_index_server.erl
+++ b/src/couch_index/src/couch_index_server.erl
@@ -23,7 +23,7 @@
 
 % Sharding functions
 -export([num_servers/0, server_name/1, by_sig/1, by_pid/1, by_db/1]).
--export([aggregate_queue_len/0]).
+-export([aggregate_queue_len/0, names/0]).
 
 % Exported for callbacks
 -export([
@@ -382,6 +382,10 @@
 name(BaseName, N) when is_integer(N), N > 0 ->
     list_to_atom(BaseName ++ "_" ++ integer_to_list(N)).
 
+%% One server_name/1 result per shard index in 1..num_servers().
+names() ->
+    [server_name(Shard) || Shard <- lists:seq(1, num_servers())].
+
 aggregate_queue_len() ->
     N = num_servers(),
     Names = [server_name(I) || I <- lists:seq(1, N)],