[FLINK-32446][connectors/mongodb] MongoWriter should periodically check whether the batch flush interval has elapsed
This closes #12.
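
With this patch, a writer configured with a positive batch interval flushes its buffered write models from a background scheduler thread, so records on a low-traffic stream are no longer held back until the next checkpoint; setting both batchIntervalMs and batchSize to -1 keeps the previous flush-only-on-checkpoint behaviour and starts no scheduler. A minimal usage sketch of the periodic-flush path follows; the MongoSink builder setters shown are assumed to mirror the MongoWriteOptions accessors getBatchSize() and getBatchIntervalMs() used in this patch, so treat the snippet as illustrative rather than authoritative:

    import com.mongodb.client.model.InsertOneModel;
    import org.apache.flink.connector.mongodb.sink.MongoSink;
    import org.bson.BsonDocument;

    // Sketch only (assumed builder method names): flush either when 1000 write
    // models are buffered or at least once per second, whichever comes first.
    MongoSink<String> sink =
            MongoSink.<String>builder()
                    .setUri("mongodb://127.0.0.1:27017")
                    .setDatabase("test_db")
                    .setCollection("test_collection")
                    .setBatchSize(1000)
                    .setBatchIntervalMs(1000L)
                    .setSerializationSchema(
                            (input, context) -> new InsertOneModel<>(BsonDocument.parse(input)))
                    .build();
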
diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriter.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriter.java
index df76891..5319959 100644
--- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriter.java
+++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriter.java
@@ -33,6 +33,7 @@
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.util.Collector;
import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import com.mongodb.MongoException;
import com.mongodb.client.MongoClient;
@@ -45,6 +46,10 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -68,11 +73,18 @@
private final Collector<WriteModel<BsonDocument>> collector;
private final Counter numRecordsOut;
private final MongoClient mongoClient;
+ private final long batchIntervalMs;
+ private final int batchSize;
private boolean checkpointInProgress = false;
private volatile long lastSendTime = 0L;
private volatile long ackTime = Long.MAX_VALUE;
+ private transient volatile boolean closed = false;
+ private transient ScheduledExecutorService scheduler;
+ private transient ScheduledFuture<?> scheduledFuture;
+ private transient volatile Exception flushException;
+
public MongoWriter(
MongoConnectionOptions connectionOptions,
MongoWriteOptions writeOptions,
@@ -83,6 +95,8 @@
this.writeOptions = checkNotNull(writeOptions);
this.serializationSchema = checkNotNull(serializationSchema);
this.flushOnCheckpoint = flushOnCheckpoint;
+ this.batchIntervalMs = writeOptions.getBatchIntervalMs();
+ this.batchSize = writeOptions.getBatchSize();
checkNotNull(initContext);
this.mailboxExecutor = checkNotNull(initContext.getMailboxExecutor());
@@ -105,10 +119,37 @@
// Initialize the mongo client.
this.mongoClient = MongoClients.create(connectionOptions.getUri());
+
+ boolean flushOnlyOnCheckpoint = batchIntervalMs == -1 && batchSize == -1;
+
+ if (!flushOnlyOnCheckpoint && batchIntervalMs > 0) {
+ this.scheduler =
+ Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("mongo-writer"));
+
+ this.scheduledFuture =
+ this.scheduler.scheduleWithFixedDelay(
+ () -> {
+ synchronized (MongoWriter.this) {
+ if (!closed && isOverMaxBatchIntervalLimit()) {
+ try {
+ doBulkWrite();
+ } catch (Exception e) {
+ flushException = e;
+ }
+ }
+ }
+ },
+ batchIntervalMs,
+ batchIntervalMs,
+ TimeUnit.MILLISECONDS);
+ }
}
@Override
- public void write(IN element, Context context) throws IOException, InterruptedException {
+ public synchronized void write(IN element, Context context)
+ throws IOException, InterruptedException {
+ checkFlushException();
+
// do not allow new bulk writes until all actions are flushed
while (checkpointInProgress) {
mailboxExecutor.yield();
@@ -122,7 +163,9 @@
}
@Override
- public void flush(boolean endOfInput) throws IOException {
+ public synchronized void flush(boolean endOfInput) throws IOException {
+ checkFlushException();
+
checkpointInProgress = true;
while (!bulkRequests.isEmpty() && (flushOnCheckpoint || endOfInput)) {
doBulkWrite();
@@ -131,8 +174,28 @@
}
@Override
- public void close() {
- mongoClient.close();
+ public synchronized void close() throws Exception {
+ if (!closed) {
+ if (scheduledFuture != null) {
+ scheduledFuture.cancel(false);
+ scheduler.shutdown();
+ }
+
+ if (!bulkRequests.isEmpty()) {
+ try {
+ doBulkWrite();
+ } catch (Exception e) {
+ LOG.error("Writing records to MongoDB failed when closing MongoWriter", e);
+ throw new IOException("Writing records to MongoDB failed.", e);
+ } finally {
+ mongoClient.close();
+ closed = true;
+ }
+ } else {
+ mongoClient.close();
+ closed = true;
+ }
+ }
}
@VisibleForTesting
@@ -172,13 +235,17 @@
}
private boolean isOverMaxBatchSizeLimit() {
- int bulkActions = writeOptions.getBatchSize();
- return bulkActions != -1 && bulkRequests.size() >= bulkActions;
+ return batchSize != -1 && bulkRequests.size() >= batchSize;
}
private boolean isOverMaxBatchIntervalLimit() {
- long bulkFlushInterval = writeOptions.getBatchIntervalMs();
long lastSentInterval = System.currentTimeMillis() - lastSendTime;
- return bulkFlushInterval != -1 && lastSentInterval >= bulkFlushInterval;
+ return batchIntervalMs != -1 && lastSentInterval >= batchIntervalMs;
+ }
+
+ private void checkFlushException() {
+ if (flushException != null) {
+ throw new RuntimeException("Writing records to MongoDB failed.", flushException);
+ }
}
}
diff --git a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
index c071721..84767e8 100644
--- a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
+++ b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
@@ -53,6 +53,7 @@
import static org.apache.flink.connector.mongodb.testutils.MongoTestUtil.assertThatIdsAreNotWritten;
import static org.apache.flink.connector.mongodb.testutils.MongoTestUtil.assertThatIdsAreWritten;
+import static org.apache.flink.connector.mongodb.testutils.MongoTestUtil.assertThatIdsAreWrittenWithMaxWaitTime;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
@@ -138,12 +139,12 @@
createWriter(collection, batchSize, batchIntervalMs, flushOnCheckpoint)) {
writer.write(buildMessage(1), null);
writer.write(buildMessage(2), null);
+ writer.doBulkWrite();
writer.write(buildMessage(3), null);
writer.write(buildMessage(4), null);
- writer.doBulkWrite();
- }
- assertThatIdsAreWritten(collectionOf(collection), 1, 2, 3, 4);
+ assertThatIdsAreWrittenWithMaxWaitTime(collectionOf(collection), 10000L, 1, 2, 3, 4);
+ }
}
@Test
diff --git a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/testutils/MongoTestUtil.java b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/testutils/MongoTestUtil.java
index 7d8c3d0..246f2bc 100644
--- a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/testutils/MongoTestUtil.java
+++ b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/testutils/MongoTestUtil.java
@@ -77,4 +77,17 @@
assertThat(actualIds).containsExactlyInAnyOrder(ids);
}
+
+ public static void assertThatIdsAreWrittenWithMaxWaitTime(
+ MongoCollection<Document> coll, long maxWaitTimeMs, Integer... ids)
+ throws InterruptedException {
+ long startTimeMillis = System.currentTimeMillis();
+ while (System.currentTimeMillis() - startTimeMillis < maxWaitTimeMs) {
+ if (coll.countDocuments(Filters.in("_id", ids)) == ids.length) {
+ break;
+ }
+ Thread.sleep(1000L);
+ }
+ assertThatIdsAreWritten(coll, ids);
+ }
}