[Bug] Fix log printing error (#315)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
index 604eb5c..5e2a697 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
@@ -85,6 +85,7 @@
private final CloseableHttpClient httpClient;
private final ExecutorService executorService;
private boolean loadBatchFirstRecord;
+ private volatile String currentLabel;
public DorisStreamLoad(
String hostPort,
@@ -246,9 +247,9 @@
throw new StreamLoadException("stream load error: " + response.getStatusLine().toString());
}
- public RespContent stopLoad(String label) throws IOException {
+ public RespContent stopLoad() throws IOException {
recordStream.endInput();
- LOG.info("table {} stream load stopped for {} on host {}", table, label, hostPort);
+ LOG.info("table {} stream load stopped for {} on host {}", table, currentLabel, hostPort);
Preconditions.checkState(pendingLoadFuture != null);
try {
return handlePreCommitResponse(pendingLoadFuture.get());
@@ -268,6 +269,7 @@
HttpPutBuilder putBuilder = new HttpPutBuilder();
recordStream.startInput(isResume);
LOG.info("table {} stream load started for {} on host {}", table, label, hostPort);
+ this.currentLabel = label;
try {
InputStreamEntity entity = new InputStreamEntity(recordStream);
putBuilder
@@ -284,7 +286,7 @@
pendingLoadFuture =
executorService.submit(
() -> {
- LOG.info("table {} start execute load", table);
+ LOG.info("table {} start execute load for label {}", table, label);
return httpClient.execute(putBuilder.build());
});
} catch (Exception e) {
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index c6f8124..54facc7 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -233,9 +233,7 @@
continue;
}
DorisStreamLoad dorisStreamLoad = streamLoader.getValue();
- LabelGenerator labelGenerator = getLabelGenerator(tableIdentifier);
- String currentLabel = labelGenerator.generateTableLabel(curCheckpointId);
- RespContent respContent = dorisStreamLoad.stopLoad(currentLabel);
+ RespContent respContent = dorisStreamLoad.stopLoad();
// refresh metrics
if (sinkMetricsMap.containsKey(tableIdentifier)) {
DorisWriteMetrics dorisWriteMetrics = sinkMetricsMap.get(tableIdentifier);
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
index fab2fcd..5ead3e8 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
@@ -120,7 +120,7 @@
httpClient);
dorisStreamLoad.startLoad("1", false);
dorisStreamLoad.writeRecord(writeBuffer);
- dorisStreamLoad.stopLoad("label");
+ dorisStreamLoad.stopLoad();
byte[] buff = new byte[4];
int n = dorisStreamLoad.getRecordStream().read(buff);
dorisStreamLoad.getRecordStream().read(new byte[4]);
@@ -147,7 +147,7 @@
dorisStreamLoad.startLoad("1", false);
dorisStreamLoad.writeRecord(writeBuffer);
dorisStreamLoad.writeRecord(writeBuffer);
- dorisStreamLoad.stopLoad("label");
+ dorisStreamLoad.stopLoad();
byte[] buff = new byte[9];
int n = dorisStreamLoad.getRecordStream().read(buff);
int ret = dorisStreamLoad.getRecordStream().read(new byte[9]);
@@ -179,7 +179,7 @@
dorisStreamLoad.startLoad("1", false);
dorisStreamLoad.writeRecord("{\"id\": 1}".getBytes(StandardCharsets.UTF_8));
dorisStreamLoad.writeRecord("{\"id\": 2}".getBytes(StandardCharsets.UTF_8));
- dorisStreamLoad.stopLoad("label");
+ dorisStreamLoad.stopLoad();
byte[] buff = new byte[expectBuffer.length];
int n = dorisStreamLoad.getRecordStream().read(buff);