[FLINK-32446][connectors/mongodb] MongoWriter should periodically check whether the batch flush interval has elapsed

This closes #12.
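
Buffered records were previously flushed only when the batch size limit was
reached or a checkpoint completed, so with an interval-only configuration
writes could sit in the buffer indefinitely. This change registers a
background timer that performs a bulk write once the configured interval has
elapsed. Below is a minimal, illustrative sketch of the pattern; the
PeriodicFlusher name and its constructor are hypothetical and not part of
this patch:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    // Runs a flush action at a fixed delay, serializing with the writer's
    // lock and surfacing failures to the task thread on its next call.
    class PeriodicFlusher implements AutoCloseable {
        private final ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor();
        private volatile Exception flushException;

        PeriodicFlusher(Object writerLock, Runnable flushAction, long intervalMs) {
            scheduler.scheduleWithFixedDelay(
                    () -> {
                        synchronized (writerLock) {
                            try {
                                flushAction.run();
                            } catch (Exception e) {
                                // Remember the failure; the task thread rethrows it.
                                flushException = e;
                            }
                        }
                    },
                    intervalMs,
                    intervalMs,
                    TimeUnit.MILLISECONDS);
        }

        // Called at the top of write()/flush() to fail fast on async errors.
        void checkFlushException() {
            if (flushException != null) {
                throw new RuntimeException("Periodic flush failed.", flushException);
            }
        }

        @Override
        public void close() {
            scheduler.shutdown();
        }
    }

In the writer itself, batchIntervalMs and batchSize come from
MongoWriteOptions, where -1 disables the respective limit, and the timer
thread is named via ExecutorThreadFactory.
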
diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriter.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriter.java
index df76891..5319959 100644
--- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriter.java
+++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriter.java
@@ -33,6 +33,7 @@
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
 import com.mongodb.MongoException;
 import com.mongodb.client.MongoClient;
@@ -45,6 +46,10 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -68,11 +73,18 @@
     private final Collector<WriteModel<BsonDocument>> collector;
     private final Counter numRecordsOut;
     private final MongoClient mongoClient;
+    private final long batchIntervalMs;
+    private final int batchSize;
 
     private boolean checkpointInProgress = false;
     private volatile long lastSendTime = 0L;
     private volatile long ackTime = Long.MAX_VALUE;
 
+    private transient volatile boolean closed = false;
+    private transient ScheduledExecutorService scheduler;
+    private transient ScheduledFuture<?> scheduledFuture;
+    private transient volatile Exception flushException;
+
     public MongoWriter(
             MongoConnectionOptions connectionOptions,
             MongoWriteOptions writeOptions,
@@ -83,6 +95,8 @@
         this.writeOptions = checkNotNull(writeOptions);
         this.serializationSchema = checkNotNull(serializationSchema);
         this.flushOnCheckpoint = flushOnCheckpoint;
+        this.batchIntervalMs = writeOptions.getBatchIntervalMs();
+        this.batchSize = writeOptions.getBatchSize();
 
         checkNotNull(initContext);
         this.mailboxExecutor = checkNotNull(initContext.getMailboxExecutor());
@@ -105,10 +119,37 @@
 
         // Initialize the mongo client.
         this.mongoClient = MongoClients.create(connectionOptions.getUri());
+
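+        // If neither a batch interval nor a batch size is configured, records are
+        // flushed only on checkpoint, so no background flush timer is needed.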
+        boolean flushOnlyOnCheckpoint = batchIntervalMs == -1 && batchSize == -1;
+
+        if (!flushOnlyOnCheckpoint && batchIntervalMs > 0) {
+            this.scheduler =
+                    Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("mongo-writer"));
+
+            this.scheduledFuture =
+                    this.scheduler.scheduleWithFixedDelay(
+                            () -> {
+                                synchronized (MongoWriter.this) {
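+                                    // Hold the writer lock so the timer thread never
+                                    // bulk-writes concurrently with write()/flush()/close().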
+                                    if (!closed && isOverMaxBatchIntervalLimit()) {
+                                        try {
+                                            doBulkWrite();
+                                        } catch (Exception e) {
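+                                            // Remember the failure; it is rethrown on the
+                                            // task thread by checkFlushException().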
+                                            flushException = e;
+                                        }
+                                    }
+                                }
+                            },
+                            batchIntervalMs,
+                            batchIntervalMs,
+                            TimeUnit.MILLISECONDS);
+        }
     }
 
     @Override
-    public void write(IN element, Context context) throws IOException, InterruptedException {
+    public synchronized void write(IN element, Context context)
+            throws IOException, InterruptedException {
+        checkFlushException();
+
         // do not allow new bulk writes until all actions are flushed
         while (checkpointInProgress) {
             mailboxExecutor.yield();
@@ -122,7 +163,9 @@
     }
 
     @Override
-    public void flush(boolean endOfInput) throws IOException {
+    public synchronized void flush(boolean endOfInput) throws IOException {
+        checkFlushException();
+
         checkpointInProgress = true;
         while (!bulkRequests.isEmpty() && (flushOnCheckpoint || endOfInput)) {
             doBulkWrite();
@@ -131,8 +174,28 @@
     }
 
     @Override
-    public void close() {
-        mongoClient.close();
+    public synchronized void close() throws Exception {
+        if (!closed) {
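+            // Cancel the flush timer first so no bulk write races with shutdown,
+            // then flush any remaining buffered requests before closing the client.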
+            if (scheduledFuture != null) {
+                scheduledFuture.cancel(false);
+                scheduler.shutdown();
+            }
+
+            if (!bulkRequests.isEmpty()) {
+                try {
+                    doBulkWrite();
+                } catch (Exception e) {
+                    LOG.error("Writing records to MongoDB failed when closing MongoWriter", e);
+                    throw new IOException("Writing records to MongoDB failed.", e);
+                } finally {
+                    mongoClient.close();
+                    closed = true;
+                }
+            } else {
+                mongoClient.close();
+                closed = true;
+            }
+        }
     }
 
     @VisibleForTesting
@@ -172,13 +235,17 @@
     }
 
     private boolean isOverMaxBatchSizeLimit() {
-        int bulkActions = writeOptions.getBatchSize();
-        return bulkActions != -1 && bulkRequests.size() >= bulkActions;
+        return batchSize != -1 && bulkRequests.size() >= batchSize;
     }
 
     private boolean isOverMaxBatchIntervalLimit() {
-        long bulkFlushInterval = writeOptions.getBatchIntervalMs();
         long lastSentInterval = System.currentTimeMillis() - lastSendTime;
-        return bulkFlushInterval != -1 && lastSentInterval >= bulkFlushInterval;
+        return batchIntervalMs != -1 && lastSentInterval >= batchIntervalMs;
+    }
+
+    private void checkFlushException() {
+        if (flushException != null) {
+            throw new RuntimeException("Writing records to MongoDB failed.", flushException);
+        }
     }
 }
diff --git a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
index c071721..84767e8 100644
--- a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
+++ b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
@@ -53,6 +53,7 @@
 
 import static org.apache.flink.connector.mongodb.testutils.MongoTestUtil.assertThatIdsAreNotWritten;
 import static org.apache.flink.connector.mongodb.testutils.MongoTestUtil.assertThatIdsAreWritten;
+import static org.apache.flink.connector.mongodb.testutils.MongoTestUtil.assertThatIdsAreWrittenWithMaxWaitTime;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.fail;
 
@@ -138,12 +139,12 @@
                 createWriter(collection, batchSize, batchIntervalMs, flushOnCheckpoint)) {
             writer.write(buildMessage(1), null);
             writer.write(buildMessage(2), null);
+            writer.doBulkWrite();
             writer.write(buildMessage(3), null);
             writer.write(buildMessage(4), null);
-            writer.doBulkWrite();
-        }
 
-        assertThatIdsAreWritten(collectionOf(collection), 1, 2, 3, 4);
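+            // The remaining buffered records are flushed asynchronously by the
+            // interval timer, so poll with a timeout instead of asserting immediately.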
+            assertThatIdsAreWrittenWithMaxWaitTime(collectionOf(collection), 10000L, 1, 2, 3, 4);
+        }
     }
 
     @Test
diff --git a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/testutils/MongoTestUtil.java b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/testutils/MongoTestUtil.java
index 7d8c3d0..246f2bc 100644
--- a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/testutils/MongoTestUtil.java
+++ b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/testutils/MongoTestUtil.java
@@ -77,4 +77,17 @@
 
         assertThat(actualIds).containsExactlyInAnyOrder(ids);
     }
+
+    public static void assertThatIdsAreWrittenWithMaxWaitTime(
+            MongoCollection<Document> coll, long maxWaitTimeMs, Integer... ids)
+            throws InterruptedException {
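+        // Poll until all expected ids are visible or the timeout elapses,
+        // then delegate to assertThatIdsAreWritten for the final assertion.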
+        long startTimeMillis = System.currentTimeMillis();
+        while (System.currentTimeMillis() - startTimeMillis < maxWaitTimeMs) {
+            if (coll.countDocuments(Filters.in("_id", ids)) == ids.length) {
+                break;
+            }
+            Thread.sleep(1000L);
+        }
+        assertThatIdsAreWritten(coll, ids);
+    }
 }