[pulsar-io-hdfs2] Add config to create subdirectory from current time (#7771)

### Motivation

Adding a subdirectory associated with the current time will make it easier to process HDFS files in batches.

For example, a user can run multiple sink instances with the `yyyy-MM-dd-hh` pattern and then stop all of the instances at the next hour. The files in that hour's subdirectory will then contain all messages consumed during that hour.

### Modifications

- Add a `subdirectoryPattern` field to `HdfsSinkConfig`
- Update some simple tests for `HdfsSinkConfig`
- Update the doc of HDFS2 sink

### Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (docs)
diff --git a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsAbstractSink.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsAbstractSink.java
index dbc5881..1d2096d 100644
--- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsAbstractSink.java
+++ b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsAbstractSink.java
@@ -19,10 +19,13 @@
 package org.apache.pulsar.io.hdfs2.sink;
 
 import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -39,6 +42,7 @@
  * A Simple abstract class for HDFS sink.
  * Users need to implement extractKeyValue function to use this sink.
  */
+@Slf4j
 public abstract class HdfsAbstractSink<K, V> extends AbstractHdfsConnector implements Sink<V> {
 
     protected HdfsSinkConfig hdfsSinkConfig;
@@ -46,6 +50,7 @@
     protected HdfsSyncThread<V> syncThread;
     private Path path;
     private FSDataOutputStream hdfsStream;
+    private DateTimeFormatter subdirectoryFormatter;
 
     public abstract KeyValue<K, V> extractKeyValue(Record<V> record);
     protected abstract void createWriter() throws IOException;
@@ -56,6 +61,9 @@
        hdfsSinkConfig.validate();
        connectorConfig = hdfsSinkConfig;
        unackedRecords = new LinkedBlockingQueue<Record<V>> (hdfsSinkConfig.getMaxPendingRecords());
+       if (hdfsSinkConfig.getSubdirectoryPattern() != null) {
+           subdirectoryFormatter = DateTimeFormatter.ofPattern(hdfsSinkConfig.getSubdirectoryPattern());
+       }
        connectToHdfs();
        createWriter();
        launchSyncThread();
@@ -99,8 +107,13 @@
                 ext = getCompressionCodec().getDefaultExtension();
             }
 
-            path = new Path(FilenameUtils.concat(hdfsSinkConfig.getDirectory(),
+            String directory = hdfsSinkConfig.getDirectory();
+            if (subdirectoryFormatter != null) {
+                directory = FilenameUtils.concat(directory, LocalDateTime.now().format(subdirectoryFormatter));
+            }
+            path = new Path(FilenameUtils.concat(directory,
                     hdfsSinkConfig.getFilenamePrefix() + "-" + System.currentTimeMillis() + ext));
+            log.info("Create path: {}", path);
         }
         return path;
     }
diff --git a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfig.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfig.java
index fa52b6a..2af24fc 100644
--- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfig.java
+++ b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfig.java
@@ -24,6 +24,8 @@
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
 import java.util.Map;
 
 import lombok.Data;
@@ -73,6 +75,14 @@
      */
     private int maxPendingRecords = Integer.MAX_VALUE;
 
+    /**
+     * A subdirectory associated with the created time of the sink.
+     * The pattern is the formatted pattern of {@link AbstractHdfsConfig#getDirectory()}'s subdirectory.
+     *
+     * @see java.time.format.DateTimeFormatter for pattern's syntax
+     */
+    private String subdirectoryPattern;
+
     public static HdfsSinkConfig load(String yamlFile) throws IOException {
        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
        return mapper.readValue(new File(yamlFile), HdfsSinkConfig.class);
@@ -87,16 +97,24 @@
     public void validate() {
         super.validate();
         if ((StringUtils.isEmpty(fileExtension) && getCompression() == null)
-            || StringUtils.isEmpty(filenamePrefix)) {
-           throw new IllegalArgumentException("Required property not set.");
+                || StringUtils.isEmpty(filenamePrefix)) {
+            throw new IllegalArgumentException("Required property not set.");
         }
 
         if (syncInterval < 0) {
-          throw new IllegalArgumentException("Sync Interval cannot be negative");
+            throw new IllegalArgumentException("Sync Interval cannot be negative");
         }
 
         if (maxPendingRecords < 1) {
-          throw new IllegalArgumentException("Max Pending Records must be a positive integer");
+            throw new IllegalArgumentException("Max Pending Records must be a positive integer");
+        }
+
+        if (subdirectoryPattern != null) {
+            try {
+                LocalDateTime.of(2020, 1, 1, 12, 0).format(DateTimeFormatter.ofPattern(subdirectoryPattern));
+            } catch (Exception e) {
+                throw new IllegalArgumentException(subdirectoryPattern + " is not a valid pattern: " + e.getMessage());
+            }
         }
     }
 }
diff --git a/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfigTests.java b/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfigTests.java
index d4e1f03..aa76064 100644
--- a/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfigTests.java
+++ b/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfigTests.java
@@ -44,6 +44,7 @@
 		assertEquals("/foo/bar", config.getDirectory());
 		assertEquals("prefix", config.getFilenamePrefix());
 		assertEquals(Compression.SNAPPY, config.getCompression());
+		assertEquals("yyyy-MM-dd", config.getSubdirectoryPattern());
 	}
 	
 	@Test
@@ -53,6 +54,7 @@
 		map.put("directory", "/foo/bar");
 		map.put("filenamePrefix", "prefix");
 		map.put("compression", "SNAPPY");
+		map.put("subdirectoryPattern", "yy-MM-dd");
 		
 		HdfsSinkConfig config = HdfsSinkConfig.load(map);
 		assertNotNull(config);
@@ -60,6 +62,7 @@
 		assertEquals("/foo/bar", config.getDirectory());
 		assertEquals("prefix", config.getFilenamePrefix());
 		assertEquals(Compression.SNAPPY, config.getCompression());
+		assertEquals("yy-MM-dd", config.getSubdirectoryPattern());
 	}
 	
 	@Test
diff --git a/pulsar-io/hdfs2/src/test/resources/sinkConfig.yaml b/pulsar-io/hdfs2/src/test/resources/sinkConfig.yaml
index 5a19ee0..47ab4f9 100644
--- a/pulsar-io/hdfs2/src/test/resources/sinkConfig.yaml
+++ b/pulsar-io/hdfs2/src/test/resources/sinkConfig.yaml
@@ -21,5 +21,6 @@
 "hdfsConfigResources": "core-site.xml",
 "directory": "/foo/bar",
 "filenamePrefix": "prefix",
-"compression": "SNAPPY"
+"compression": "SNAPPY",
+"subdirectoryPattern": "yyyy-MM-dd"
 }
\ No newline at end of file