[pulsar-io-hdfs2] Add config to create subdirectory from current time (#7771)
### Motivation
Adding a subdirectory associated with current time willmake it easier to process HDFS files in batch.
For example, user can create multiple running sink instances with `yyyy-MM-dd-hh` pattern. Then stop all instances at next hour. Eventually, files of the subdirectory will contain all messages consumed during this hour.
### Modifications
- Add a `subdirectoryPattern` field to `HdfsSinkConfig`
- Update some simple tests for `HdfsSinkConfig`
- Update the doc of HDFS2 sink
### Documentation
- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (docs)
diff --git a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsAbstractSink.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsAbstractSink.java
index dbc5881..1d2096d 100644
--- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsAbstractSink.java
+++ b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsAbstractSink.java
@@ -19,10 +19,13 @@
package org.apache.pulsar.io.hdfs2.sink;
import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -39,6 +42,7 @@
* A Simple abstract class for HDFS sink.
* Users need to implement extractKeyValue function to use this sink.
*/
+@Slf4j
public abstract class HdfsAbstractSink<K, V> extends AbstractHdfsConnector implements Sink<V> {
protected HdfsSinkConfig hdfsSinkConfig;
@@ -46,6 +50,7 @@
protected HdfsSyncThread<V> syncThread;
private Path path;
private FSDataOutputStream hdfsStream;
+ private DateTimeFormatter subdirectoryFormatter;
public abstract KeyValue<K, V> extractKeyValue(Record<V> record);
protected abstract void createWriter() throws IOException;
@@ -56,6 +61,9 @@
hdfsSinkConfig.validate();
connectorConfig = hdfsSinkConfig;
unackedRecords = new LinkedBlockingQueue<Record<V>> (hdfsSinkConfig.getMaxPendingRecords());
+ if (hdfsSinkConfig.getSubdirectoryPattern() != null) {
+ subdirectoryFormatter = DateTimeFormatter.ofPattern(hdfsSinkConfig.getSubdirectoryPattern());
+ }
connectToHdfs();
createWriter();
launchSyncThread();
@@ -99,8 +107,13 @@
ext = getCompressionCodec().getDefaultExtension();
}
- path = new Path(FilenameUtils.concat(hdfsSinkConfig.getDirectory(),
+ String directory = hdfsSinkConfig.getDirectory();
+ if (subdirectoryFormatter != null) {
+ directory = FilenameUtils.concat(directory, LocalDateTime.now().format(subdirectoryFormatter));
+ }
+ path = new Path(FilenameUtils.concat(directory,
hdfsSinkConfig.getFilenamePrefix() + "-" + System.currentTimeMillis() + ext));
+ log.info("Create path: {}", path);
}
return path;
}
diff --git a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfig.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfig.java
index fa52b6a..2af24fc 100644
--- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfig.java
+++ b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfig.java
@@ -24,6 +24,8 @@
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
import java.util.Map;
import lombok.Data;
@@ -73,6 +75,14 @@
*/
private int maxPendingRecords = Integer.MAX_VALUE;
+ /**
+ * A subdirectory associated with the created time of the sink.
+ * The pattern is the formatted pattern of {@link AbstractHdfsConfig#getDirectory()}'s subdirectory.
+ *
+ * @see java.time.format.DateTimeFormatter for pattern's syntax
+ */
+ private String subdirectoryPattern;
+
public static HdfsSinkConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
return mapper.readValue(new File(yamlFile), HdfsSinkConfig.class);
@@ -87,16 +97,24 @@
public void validate() {
super.validate();
if ((StringUtils.isEmpty(fileExtension) && getCompression() == null)
- || StringUtils.isEmpty(filenamePrefix)) {
- throw new IllegalArgumentException("Required property not set.");
+ || StringUtils.isEmpty(filenamePrefix)) {
+ throw new IllegalArgumentException("Required property not set.");
}
if (syncInterval < 0) {
- throw new IllegalArgumentException("Sync Interval cannot be negative");
+ throw new IllegalArgumentException("Sync Interval cannot be negative");
}
if (maxPendingRecords < 1) {
- throw new IllegalArgumentException("Max Pending Records must be a positive integer");
+ throw new IllegalArgumentException("Max Pending Records must be a positive integer");
+ }
+
+ if (subdirectoryPattern != null) {
+ try {
+ LocalDateTime.of(2020, 1, 1, 12, 0).format(DateTimeFormatter.ofPattern(subdirectoryPattern));
+ } catch (Exception e) {
+ throw new IllegalArgumentException(subdirectoryPattern + " is not a valid pattern: " + e.getMessage());
+ }
}
}
}
diff --git a/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfigTests.java b/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfigTests.java
index d4e1f03..aa76064 100644
--- a/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfigTests.java
+++ b/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfigTests.java
@@ -44,6 +44,7 @@
assertEquals("/foo/bar", config.getDirectory());
assertEquals("prefix", config.getFilenamePrefix());
assertEquals(Compression.SNAPPY, config.getCompression());
+ assertEquals("yyyy-MM-dd", config.getSubdirectoryPattern());
}
@Test
@@ -53,6 +54,7 @@
map.put("directory", "/foo/bar");
map.put("filenamePrefix", "prefix");
map.put("compression", "SNAPPY");
+ map.put("subdirectoryPattern", "yy-MM-dd");
HdfsSinkConfig config = HdfsSinkConfig.load(map);
assertNotNull(config);
@@ -60,6 +62,7 @@
assertEquals("/foo/bar", config.getDirectory());
assertEquals("prefix", config.getFilenamePrefix());
assertEquals(Compression.SNAPPY, config.getCompression());
+ assertEquals("yy-MM-dd", config.getSubdirectoryPattern());
}
@Test
diff --git a/pulsar-io/hdfs2/src/test/resources/sinkConfig.yaml b/pulsar-io/hdfs2/src/test/resources/sinkConfig.yaml
index 5a19ee0..47ab4f9 100644
--- a/pulsar-io/hdfs2/src/test/resources/sinkConfig.yaml
+++ b/pulsar-io/hdfs2/src/test/resources/sinkConfig.yaml
@@ -21,5 +21,6 @@
"hdfsConfigResources": "core-site.xml",
"directory": "/foo/bar",
"filenamePrefix": "prefix",
-"compression": "SNAPPY"
+"compression": "SNAPPY",
+"subdirectoryPattern": "yyyy-MM-dd"
}
\ No newline at end of file