[HUDI-1775] Add option for compaction parallelism (#2785)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index b66be2b..b120714 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -72,6 +72,15 @@
           + " column value is null/empty string");
 
   // ------------------------------------------------------------------------
+  //  Index Options
+  // ------------------------------------------------------------------------
+  public static final ConfigOption<Boolean> INDEX_BOOTSTRAP_ENABLED = ConfigOptions
+      .key("index.bootstrap.enabled")
+      .booleanType()
+      .defaultValue(false)
+      .withDescription("Whether to bootstrap the index state from existing hoodie table, default false");
+
+  // ------------------------------------------------------------------------
   //  Read Options
   // ------------------------------------------------------------------------
   public static final ConfigOption<Integer> READ_TASKS = ConfigOptions
@@ -255,8 +264,14 @@
   public static final ConfigOption<Double> WRITE_BATCH_SIZE = ConfigOptions
       .key("write.batch.size.MB")
       .doubleType()
-      .defaultValue(2D) // 2MB
-      .withDescription("Batch buffer size in MB to flush data into the underneath filesystem");
+      .defaultValue(64D) // 64MB
+      .withDescription("Batch buffer size in MB to flush data into the underneath filesystem, default 64MB");
+
+  public static final ConfigOption<Integer> WRITE_LOG_BLOCK_SIZE = ConfigOptions
+      .key("write.log_block.size.MB")
+      .intType()
+      .defaultValue(128)
+      .withDescription("Max log block size in MB for log file, default 128MB");
 
   // ------------------------------------------------------------------------
   //  Compaction Options
@@ -268,6 +283,12 @@
       .defaultValue(true) // default true for MOR write
       .withDescription("Async Compaction, enabled by default for MOR");
 
+  public static final ConfigOption<Integer> COMPACTION_TASKS = ConfigOptions
+      .key("compaction.tasks")
+      .intType()
+      .defaultValue(10) // default WRITE_TASKS * COMPACTION_DELTA_COMMITS * 0.5 (assumes two commits generate one bucket)
+      .withDescription("Parallelism of tasks that do actual compaction, default is 10");
+
   public static final String NUM_COMMITS = "num_commits";
   public static final String TIME_ELAPSED = "time_elapsed";
   public static final String NUM_AND_TIME = "num_and_time";
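Note: for context, a minimal sketch of how the options introduced above could be set on the job's Flink Configuration before the Hudi pipeline is built; the Configuration setters are standard Flink API, while the class name and values are illustrative and not part of this patch:

    import org.apache.flink.configuration.Configuration;

    import org.apache.hudi.configuration.FlinkOptions;

    public class CompactionOptionsSketch {
      public static Configuration buildConf() {
        Configuration conf = new Configuration();
        // bootstrap the index state from the existing hoodie table (default false)
        conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);
        // max log block size in MB for the log files (default 128)
        conf.setInteger(FlinkOptions.WRITE_LOG_BLOCK_SIZE, 128);
        // parallelism of the actual compaction tasks (default 10)
        conf.setInteger(FlinkOptions.COMPACTION_TASKS, 4);
        return conf;
      }
    }
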
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
index 9c23259..7e017cc 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
@@ -31,7 +31,6 @@
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.index.HoodieIndexUtils;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.commit.BucketInfo;
@@ -103,6 +102,8 @@
 
   private final boolean isChangingRecords;
 
+  private final boolean bootstrapIndex;
+
   /**
    * State to book-keep which partition is loaded into the index state {@code indexState}.
    */
@@ -112,6 +113,7 @@
     this.conf = conf;
     this.isChangingRecords = WriteOperationType.isChangingRecords(
         WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)));
+    this.bootstrapIndex = conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED);
   }
 
   @Override
@@ -143,9 +145,11 @@
             TypeInformation.of(HoodieKey.class),
             TypeInformation.of(HoodieRecordLocation.class));
     indexState = context.getKeyedStateStore().getMapState(indexStateDesc);
-    MapStateDescriptor<String, Integer> partitionLoadStateDesc =
-        new MapStateDescriptor<>("partitionLoadState", Types.STRING, Types.INT);
-    partitionLoadState = context.getKeyedStateStore().getMapState(partitionLoadStateDesc);
+    if (bootstrapIndex) {
+      MapStateDescriptor<String, Integer> partitionLoadStateDesc =
+          new MapStateDescriptor<>("partitionLoadState", Types.STRING, Types.INT);
+      partitionLoadState = context.getKeyedStateStore().getMapState(partitionLoadStateDesc);
+    }
   }
 
   @SuppressWarnings("unchecked")
@@ -159,7 +163,9 @@
     final BucketInfo bucketInfo;
     final HoodieRecordLocation location;
 
-    if (!partitionLoadState.contains(hoodieKey.getPartitionPath())) {
+    // The dataset may be huge and loading it would block the processing for long,
+    // so the index bootstrap is disabled by default.
+    if (bootstrapIndex && !partitionLoadState.contains(hoodieKey.getPartitionPath())) {
       // If the partition records are never loaded, load the records first.
       loadRecords(hoodieKey.getPartitionPath());
     }
@@ -205,6 +211,7 @@
    * @throws Exception when error occurs for state update
    */
   private void loadRecords(String partitionPath) throws Exception {
+    LOG.info("Start loading records under partition {} into the index state", partitionPath);
     HoodieTable<?, ?, ?, ?> hoodieTable = bucketAssigner.getTable();
     List<HoodieBaseFile> latestBaseFiles =
         HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, hoodieTable);
@@ -212,8 +219,16 @@
     final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
     final int taskID = getRuntimeContext().getIndexOfThisSubtask();
     for (HoodieBaseFile baseFile : latestBaseFiles) {
-      List<HoodieKey> hoodieKeys =
-          ParquetUtils.fetchRecordKeyPartitionPathFromParquet(hadoopConf, new Path(baseFile.getPath()));
+      final List<HoodieKey> hoodieKeys;
+      try {
+        hoodieKeys =
+            ParquetUtils.fetchRecordKeyPartitionPathFromParquet(hadoopConf, new Path(baseFile.getPath()));
+      } catch (Exception e) {
+        // in case an empty parquet file was left behind when the pipeline
+        // crashed unexpectedly.
+        LOG.error("Error when loading record keys from file: {}", baseFile, e);
+        continue;
+      }
       hoodieKeys.forEach(hoodieKey -> {
         try {
           // Reference: org.apache.flink.streaming.api.datastream.KeyedStream,
@@ -224,12 +239,13 @@
             this.indexState.put(hoodieKey, new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId()));
           }
         } catch (Exception e) {
-          throw new HoodieIOException("Error when load record keys from file: " + baseFile);
+          LOG.error("Error when putting record keys into the state from file: {}", baseFile);
         }
       });
     }
     // Mark the partition path as loaded.
     partitionLoadState.put(partitionPath, 0);
+    LOG.info("Finish loading records under partition {} into the index state", partitionPath);
   }
 
   @VisibleForTesting
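Note: the condition referenced by the KeyedStream comment above (which decides the keys a subtask keeps in its indexState) boils down to Flink's key-group routing; the following is a hedged sketch using Flink's KeyGroupRangeAssignment utility, not code from this patch, with an illustrative helper name:

    import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

    final class IndexBootstrapSketch {
      // A loaded record key is only put into this subtask's indexState when the keyed
      // routing would deliver that key to this subtask.
      static boolean ownedBySubtask(String recordKey, int maxParallelism, int parallelism, int taskID) {
        return KeyGroupRangeAssignment.assignKeyToParallelOperator(recordKey, maxParallelism, parallelism) == taskID;
      }
    }
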
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index a568a3f..b52e0ca 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -91,6 +91,7 @@
             .transform("compact_task",
                 TypeInformation.of(CompactionCommitEvent.class),
                 new KeyedProcessOperator<>(new CompactFunction(conf)))
+            .setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS))
             .addSink(new CompactionCommitSink(conf))
             .name("compact_commit")
             .setParallelism(1); // compaction commit should be singleton
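Note: a worked example of the default heuristic noted for compaction.tasks in FlinkOptions above, assuming a write.tasks value of 4 and compaction.delta_commits of 5 (the values are illustrative, not from this patch):

    // two delta commits are assumed to produce roughly one compaction bucket
    int writeTasks = 4;
    int deltaCommits = 5;
    int compactionTasks = (int) (writeTasks * deltaCommits * 0.5); // = 10, the chosen default
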
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 8fed30b..3cc5d56 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -34,6 +34,7 @@
 import org.apache.hudi.common.util.TablePathUtils;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieMemoryConfig;
+import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
@@ -210,6 +211,9 @@
                         conf.getInteger(FlinkOptions.COMPACTION_MAX_MEMORY) * 1024 * 1024L
                         ).build())
             .forTable(conf.getString(FlinkOptions.TABLE_NAME))
+            .withStorageConfig(HoodieStorageConfig.newBuilder()
+                .logFileDataBlockMaxSize(conf.getInteger(FlinkOptions.WRITE_LOG_BLOCK_SIZE) * 1024 * 1024)
+                .build())
             .withAutoCommit(false)
             .withProps(flinkConf2TypedProperties(FlinkOptions.flatOptions(conf)));
 
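Note: the write.log_block.size.MB option is MB-valued and converted to bytes before being handed to HoodieStorageConfig above; a minimal arithmetic sketch (variable names are illustrative):

    int logBlockSizeMB = 128;                             // FlinkOptions.WRITE_LOG_BLOCK_SIZE default
    int logBlockSizeBytes = logBlockSizeMB * 1024 * 1024; // 134217728 bytes, still within int range
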
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index 2384f7e..f373ab8 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -449,6 +449,10 @@
 
   @Test
   public void testIndexStateBootstrap() throws Exception {
+    // enable the index bootstrap and re-create the function wrapper
+    conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);
+    funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
+
     // open the function and ingest data
     funcWrapper.openFunction();
     for (RowData rowData : TestData.DATA_SET_INSERT) {
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index ffb9859..56cbb55 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -169,6 +169,7 @@
     options.put(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
     options.put(FlinkOptions.QUERY_TYPE.key(), FlinkOptions.QUERY_TYPE_READ_OPTIMIZED);
     options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "1");
+    options.put(FlinkOptions.COMPACTION_TASKS.key(), "1");
     String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
     streamTableEnv.executeSql(hoodieTableDDL);
     String insertInto = "insert into t1 select * from source";
@@ -180,7 +181,7 @@
   }
 
   @Test
-  void testStreamWriteWithCleaning() throws InterruptedException {
+  void testStreamWriteWithCleaning() {
     // create filesystem table named source
 
     // the source generates 4 commits but the cleaning task