add thread-num args for backup/restore cmd (#83)

improve: hugegraph/hugegraph#1390

Change-Id: Ia41b3dfe35c7d465759c1a1bf12fce946ff8c721
diff --git a/src/main/java/com/baidu/hugegraph/base/RetryManager.java b/src/main/java/com/baidu/hugegraph/base/RetryManager.java
index 364123b..cc66fb2 100644
--- a/src/main/java/com/baidu/hugegraph/base/RetryManager.java
+++ b/src/main/java/com/baidu/hugegraph/base/RetryManager.java
@@ -32,10 +32,9 @@
 
 public class RetryManager extends ToolManager {
 
-    private static int threadsNum = Math.min(10,
-            Math.max(4, Runtime.getRuntime().availableProcessors() / 2));
-    private final ExecutorService pool =
-            Executors.newFixedThreadPool(threadsNum);
+    private int CPUS = Runtime.getRuntime().availableProcessors();
+    private int threadsNum = Math.min(10, Math.max(4, CPUS / 2));
+    private ExecutorService pool;
     private final Queue<Future<?>> futures = new ConcurrentLinkedQueue<>();
     private int retry = 0;
 
@@ -43,6 +42,11 @@
         super(info, type);
     }
 
+    public void initExecutors() {
+        Printer.print("Init %s executors", this.threadsNum);
+        this.pool = Executors.newFixedThreadPool(this.threadsNum);
+    }
+
     public <R> R retry(Supplier<R> supplier, String description) {
         int retries = 0;
         R r = null;
@@ -79,6 +83,9 @@
     }
 
     public void shutdown(String taskType) {
+        if (this.pool == null) {
+            return;
+        }
         this.pool.shutdown();
         try {
             this.pool.awaitTermination(24, TimeUnit.HOURS);
@@ -96,7 +103,13 @@
         this.retry = retry;
     }
 
-    public static int threadsNum() {
-        return threadsNum;
+    public int threadsNum() {
+        return this.threadsNum;
+    }
+
+    public void threadsNum(int threadsNum) {
+        if (threadsNum > 0) {
+            this.threadsNum = threadsNum;
+        }
     }
 }
diff --git a/src/main/java/com/baidu/hugegraph/cmd/SubCommands.java b/src/main/java/com/baidu/hugegraph/cmd/SubCommands.java
index f6b67e5..92bd2e0 100644
--- a/src/main/java/com/baidu/hugegraph/cmd/SubCommands.java
+++ b/src/main/java/com/baidu/hugegraph/cmd/SubCommands.java
@@ -586,6 +586,12 @@
                    description = "Directory of log")
         public String logDir = "./logs";
 
+        @Parameter(names = {"--thread-num", "-T"}, arity = 1,
+                   validateWith = {PositiveValidator.class},
+                   description = "Threads number to use, default is " +
+                                 "Math.min(10, Math.max(4, CPUs / 2))")
+        public int threadsNum;
+
         @ParametersDelegate
         private Retry retry = new Retry();
 
@@ -601,6 +607,10 @@
             return this.logDir;
         }
 
+        public int threadsNum() {
+            return this.threadsNum;
+        }
+
         public int retry() {
             return this.retry.retry;
         }
diff --git a/src/main/java/com/baidu/hugegraph/manager/BackupRestoreBaseManager.java b/src/main/java/com/baidu/hugegraph/manager/BackupRestoreBaseManager.java
index 2d5d0d4..8fa2e26 100644
--- a/src/main/java/com/baidu/hugegraph/manager/BackupRestoreBaseManager.java
+++ b/src/main/java/com/baidu/hugegraph/manager/BackupRestoreBaseManager.java
@@ -75,11 +75,13 @@
     }
 
     public void init(SubCommands.BackupRestore cmd) {
+        this.threadsNum(cmd.threadsNum());
         assert cmd.retry() > 0;
         this.retry(cmd.retry());
         LocalDirectory.ensureDirectoryExist(cmd.logDir());
         this.logDir(cmd.logDir());
         this.directory(cmd.directory(), cmd.hdfsConf());
+        this.initExecutors();
     }
 
     public void logDir(String logDir) {