/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
 */

package org.apache.flink.connector.mongodb.sink.writer;

import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
import org.apache.flink.connector.mongodb.sink.MongoSink;
import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;
import org.apache.flink.connector.mongodb.sink.writer.context.MongoSinkContext;
import org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializationSchema;
import org.apache.flink.connector.mongodb.testutils.MongoTestUtil;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.junit5.MiniClusterExtension;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.DeleteOneModel;
import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.WriteModel;
import org.bson.BsonDocument;
import org.bson.Document;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.MongoDBContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

import java.io.IOException;
import java.util.Optional;

import static org.apache.flink.connector.mongodb.testutils.MongoTestUtil.assertThatIdsAreNotWritten;
import static org.apache.flink.connector.mongodb.testutils.MongoTestUtil.assertThatIdsAreWritten;
import static org.apache.flink.connector.mongodb.testutils.MongoTestUtil.assertThatIdsAreWrittenWithMaxWaitTime;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;

/** Tests for {@link MongoWriter}. */
@Testcontainers
public class MongoWriterITCase {
private static final Logger LOG = LoggerFactory.getLogger(MongoWriterITCase.class);
private static final String TEST_DATABASE = "test_writer";

    @RegisterExtension
static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
new MiniClusterExtension(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(2)
.build());
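
    // Single-node MongoDB instance started via Testcontainers and shared by all tests.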
@Container
private static final MongoDBContainer MONGO_CONTAINER =
MongoTestUtil.createMongoDBContainer(LOG);
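
    // Client used by the assertions to read written documents back from the container.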
private static MongoClient mongoClient;
private static TestSinkInitContext sinkInitContext;

    @BeforeAll
static void beforeAll() {
mongoClient = MongoClients.create(MONGO_CONTAINER.getConnectionString());
}

    @AfterAll
static void afterAll() {
if (mongoClient != null) {
mongoClient.close();
}
}

    @BeforeEach
void setUp() {
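        // Use a fresh init context per test so metric state does not leak between tests.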
sinkInitContext = new TestSinkInitContext();
}

    @Test
void testWriteOnBulkFlush() throws Exception {
final String collection = "test-bulk-flush-without-checkpoint";
final boolean flushOnCheckpoint = false;
final int batchSize = 5;
final int batchIntervalMs = -1;
try (final MongoWriter<Document> writer =
createWriter(collection, batchSize, batchIntervalMs, flushOnCheckpoint)) {
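            // Buffer four records; with a batch size of 5, nothing should be flushed yet.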
writer.write(buildMessage(1), null);
writer.write(buildMessage(2), null);
writer.write(buildMessage(3), null);
writer.write(buildMessage(4), null);
            // flushOnCheckpoint is disabled, so this flush(false) call is a no-op
writer.flush(false);
assertThatIdsAreNotWritten(collectionOf(collection), 1, 2, 3, 4);
            // The fifth record reaches the batch size and triggers a bulk flush
writer.write(buildMessage(5), null);
assertThatIdsAreWritten(collectionOf(collection), 1, 2, 3, 4, 5);
writer.write(buildMessage(6), null);
assertThatIdsAreNotWritten(collectionOf(collection), 6);
            // Force the remaining buffered record out with an explicit bulk write
writer.doBulkWrite();
assertThatIdsAreWritten(collectionOf(collection), 1, 2, 3, 4, 5, 6);
}
}

    @Test
void testWriteOnBatchIntervalFlush() throws Exception {
final String collection = "test-bulk-flush-with-interval";
final boolean flushOnCheckpoint = false;
final int batchSize = -1;
final int batchIntervalMs = 1000;
try (final MongoWriter<Document> writer =
createWriter(collection, batchSize, batchIntervalMs, flushOnCheckpoint)) {
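            // Size-based flushing is disabled (batchSize -1); the 1s interval flushes instead.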
writer.write(buildMessage(1), null);
writer.write(buildMessage(2), null);
writer.doBulkWrite();
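            // Records 3 and 4 should be flushed by the scheduled interval without an explicit call.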
writer.write(buildMessage(3), null);
writer.write(buildMessage(4), null);
assertThatIdsAreWrittenWithMaxWaitTime(collectionOf(collection), 10000L, 1, 2, 3, 4);
}
}

    @Test
void testWriteOnCheckpoint() throws Exception {
final String collection = "test-bulk-flush-with-checkpoint";
final boolean flushOnCheckpoint = true;
final int batchSize = -1;
final int batchIntervalMs = -1;
        // Size- and interval-based flushing are disabled; records are flushed only on checkpoint
try (final MongoWriter<Document> writer =
createWriter(collection, batchSize, batchIntervalMs, flushOnCheckpoint)) {
writer.write(buildMessage(1), null);
writer.write(buildMessage(2), null);
writer.write(buildMessage(3), null);
assertThatIdsAreNotWritten(collectionOf(collection), 1, 2, 3);
// Trigger flush
writer.flush(false);
assertThatIdsAreWritten(collectionOf(collection), 1, 2, 3);
}
}

    @Test
void testIncrementRecordsSendMetric() throws Exception {
final String collection = "test-inc-records-send";
final boolean flushOnCheckpoint = false;
final int batchSize = 2;
final int batchIntervalMs = -1;
try (final MongoWriter<Document> writer =
createWriter(collection, batchSize, batchIntervalMs, flushOnCheckpoint)) {
final Counter recordsSend = sinkInitContext.getNumRecordsOutCounter();
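            // Each of the following writes should count as exactly one sent record.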
writer.write(buildMessage(1), null);
            // Upsert a document (op "u")
writer.write(buildMessage(2, "u"), null);
            // Delete a document (op "d")
writer.write(buildMessage(3, "d"), null);
writer.doBulkWrite();
assertThat(recordsSend.getCount()).isEqualTo(3L);
}
}

    @Test
void testCurrentSendTime() throws Exception {
final String collection = "test-current-send-time";
final boolean flushOnCheckpoint = false;
final int batchSize = 1;
final int batchIntervalMs = -1;
final int retryTimes = 5;
try (final MongoWriter<Document> writer =
createWriter(collection, batchSize, batchIntervalMs, flushOnCheckpoint)) {
final Optional<Gauge<Long>> currentSendTime = sinkInitContext.getCurrentSendTimeGauge();
assertThat(currentSendTime.isPresent()).isTrue();
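            // The gauge keeps its initial Long.MAX_VALUE until the first bulk write completes.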
assertThat(currentSendTime.get().getValue()).isEqualTo(Long.MAX_VALUE);
            // System.currentTimeMillis() is not guaranteed to be monotonic,
            // so retry a few times to keep the test stable.
for (int i = 0; i < retryTimes; i++) {
writer.write(buildMessage(i), null);
writer.doBulkWrite();
// currentSendTime should be larger than 0.
if (currentSendTime.get().getValue() > 0L) {
return;
}
}
fail("Test currentSendTime should be larger than 0 failed over max retry times.");
}
}

    @Test
void testSinkContext() throws Exception {
final String collection = "test-sink-context";
final boolean flushOnCheckpoint = false;
final int batchSize = 2;
final int batchIntervalMs = -1;
MongoWriteOptions expectOptions =
MongoWriteOptions.builder()
.setBatchSize(batchSize)
.setBatchIntervalMs(batchIntervalMs)
.setDeliveryGuarantee(DeliveryGuarantee.NONE)
.build();
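        // The schema below executes inside the writer and verifies the context handed to it.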
MongoSerializationSchema<Document> testSerializationSchema =
(element, context) -> {
assertThat(context.getInitContext().getSubtaskId()).isEqualTo(0);
assertThat(context.getWriteOptions()).isEqualTo(expectOptions);
assertThat(context.processTime())
.isEqualTo(
sinkInitContext
.getProcessingTimeService()
.getCurrentProcessingTime());
return new InsertOneModel<>(element.toBsonDocument());
};
try (MongoWriter<Document> writer =
createWriter(
collection,
batchSize,
batchIntervalMs,
flushOnCheckpoint,
testSerializationSchema)) {
writer.write(buildMessage(1), null);
writer.write(buildMessage(2), null);
writer.doBulkWrite();
}
}

    private static MongoCollection<Document> collectionOf(String collection) {
return mongoClient.getDatabase(TEST_DATABASE).getCollection(collection);
}
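
    // Convenience overload that defaults to the upsert-based test serialization schema.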
private static MongoWriter<Document> createWriter(
String collection, int batchSize, long batchIntervalMs, boolean flushOnCheckpoint)
throws IOException {
return createWriter(
collection,
batchSize,
batchIntervalMs,
flushOnCheckpoint,
new UpsertSerializationSchema());
}

    private static MongoWriter<Document> createWriter(
String collection,
int batchSize,
long batchIntervalMs,
boolean flushOnCheckpoint,
MongoSerializationSchema<Document> serializationSchema)
throws IOException {
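        // flushOnCheckpoint is expressed through the delivery guarantee:
        // AT_LEAST_ONCE flushes pending records on checkpoint, NONE does not.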
MongoSink<Document> mongoSink =
MongoSink.<Document>builder()
.setUri(MONGO_CONTAINER.getConnectionString())
.setDatabase(TEST_DATABASE)
.setCollection(collection)
.setBatchSize(batchSize)
.setBatchIntervalMs(batchIntervalMs)
.setDeliveryGuarantee(
flushOnCheckpoint
? DeliveryGuarantee.AT_LEAST_ONCE
: DeliveryGuarantee.NONE)
.setSerializationSchema(serializationSchema)
.build();
return (MongoWriter<Document>) mongoSink.createWriter(sinkInitContext);
}

    private static Document buildMessage(int id) {
return buildMessage(id, "i");
}
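
    // Supported op codes: "i" = insert, "u" = upsert, "d" = delete.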
private static Document buildMessage(int id, String op) {
return new Document("_id", id).append("op", op);
}
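
    /**
     * Maps test documents to insert, upsert, or delete write models based on their {@code op}
     * field.
     */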
private static class UpsertSerializationSchema implements MongoSerializationSchema<Document> {
@Override
public WriteModel<BsonDocument> serialize(Document element, MongoSinkContext sinkContext) {
String operation = element.getString("op");
switch (operation) {
case "i":
return new InsertOneModel<>(element.toBsonDocument());
case "u":
{
BsonDocument document = element.toBsonDocument();
BsonDocument filter = new BsonDocument("_id", document.getInt32("_id"));
                        // The _id field is immutable, so remove it from the update to prevent an exception.
document.remove("_id");
BsonDocument update = new BsonDocument("$set", document);
return new UpdateOneModel<>(
filter, update, new UpdateOptions().upsert(true));
}
case "d":
{
BsonDocument document = element.toBsonDocument();
BsonDocument filter = new BsonDocument("_id", document.getInt32("_id"));
return new DeleteOneModel<>(filter);
}
default:
                    throw new UnsupportedOperationException("Unsupported op: " + operation);
}
}
}
}