[hotfix][test][connectors/mongodb] Update MongoWriterITCase to be compatible with updated SinkV2 interfaces
This closes #22.
diff --git a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
index 84767e8..bd3ca66 100644
--- a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
+++ b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
@@ -17,8 +17,9 @@
package org.apache.flink.connector.mongodb.sink.writer;
+import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
-import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
+import org.apache.flink.connector.mongodb.sink.MongoSink;
import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;
import org.apache.flink.connector.mongodb.sink.writer.context.MongoSinkContext;
import org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializationSchema;
@@ -49,6 +50,7 @@
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
+import java.io.IOException;
import java.util.Optional;
import static org.apache.flink.connector.mongodb.testutils.MongoTestUtil.assertThatIdsAreNotWritten;
@@ -235,7 +237,7 @@
MongoWriteOptions.builder()
.setBatchSize(batchSize)
.setBatchIntervalMs(batchIntervalMs)
- .setMaxRetries(0)
+ .setDeliveryGuarantee(DeliveryGuarantee.NONE)
.build();
MongoSerializationSchema<Document> testSerializationSchema =
@@ -269,7 +271,8 @@
}
private static MongoWriter<Document> createWriter(
- String collection, int batchSize, long batchIntervalMs, boolean flushOnCheckpoint) {
+ String collection, int batchSize, long batchIntervalMs, boolean flushOnCheckpoint)
+ throws IOException {
return createWriter(
collection,
batchSize,
@@ -283,28 +286,24 @@
int batchSize,
long batchIntervalMs,
boolean flushOnCheckpoint,
- MongoSerializationSchema<Document> serializationSchema) {
+ MongoSerializationSchema<Document> serializationSchema)
+ throws IOException {
- MongoConnectionOptions connectionOptions =
- MongoConnectionOptions.builder()
+ MongoSink<Document> mongoSink =
+ MongoSink.<Document>builder()
.setUri(MONGO_CONTAINER.getConnectionString())
.setDatabase(TEST_DATABASE)
.setCollection(collection)
- .build();
-
- MongoWriteOptions writeOptions =
- MongoWriteOptions.builder()
.setBatchSize(batchSize)
.setBatchIntervalMs(batchIntervalMs)
- .setMaxRetries(0)
+ .setDeliveryGuarantee(
+ flushOnCheckpoint
+ ? DeliveryGuarantee.AT_LEAST_ONCE
+ : DeliveryGuarantee.NONE)
+ .setSerializationSchema(serializationSchema)
.build();
- return new MongoWriter<>(
- connectionOptions,
- writeOptions,
- flushOnCheckpoint,
- sinkInitContext,
- serializationSchema);
+ return (MongoWriter<Document>) mongoSink.createWriter(sinkInitContext);
}
private static Document buildMessage(int id) {