| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.connector.opensearch.sink; |
| |
| import org.apache.flink.api.common.operators.MailboxExecutor; |
| import org.apache.flink.api.connector.sink2.SinkWriter; |
| import org.apache.flink.api.java.tuple.Tuple2; |
| import org.apache.flink.connector.opensearch.OpensearchUtil; |
| import org.apache.flink.connector.opensearch.sink.OpensearchWriter.DefaultBulkResponseInspector; |
| import org.apache.flink.connector.opensearch.sink.OpensearchWriter.DefaultFailureHandler; |
| import org.apache.flink.connector.opensearch.test.DockerImageVersions; |
| import org.apache.flink.metrics.Counter; |
| import org.apache.flink.metrics.Gauge; |
| import org.apache.flink.metrics.groups.OperatorIOMetricGroup; |
| import org.apache.flink.metrics.groups.SinkWriterMetricGroup; |
| import org.apache.flink.metrics.testutils.MetricListener; |
| import org.apache.flink.runtime.metrics.MetricNames; |
| import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; |
| import org.apache.flink.util.FlinkRuntimeException; |
| import org.apache.flink.util.TestLoggerExtension; |
| import org.apache.flink.util.function.ThrowingRunnable; |
| |
| import org.apache.http.HttpHost; |
| import org.junit.jupiter.api.AfterEach; |
| import org.junit.jupiter.api.BeforeEach; |
| import org.junit.jupiter.api.Test; |
| import org.junit.jupiter.api.extension.ExtendWith; |
| import org.opensearch.action.delete.DeleteRequest; |
| import org.opensearch.action.index.IndexRequest; |
| import org.opensearch.action.update.UpdateRequest; |
| import org.opensearch.client.RestHighLevelClient; |
| import org.opensearch.testcontainers.OpensearchContainer; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.testcontainers.junit.jupiter.Container; |
| import org.testcontainers.junit.jupiter.Testcontainers; |
| |
| import java.io.IOException; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.Optional; |
| |
| import static org.apache.flink.connector.opensearch.sink.OpensearchTestClient.buildMessage; |
| import static org.assertj.core.api.Assertions.assertThat; |
| |
| /** Tests for {@link OpensearchWriter}. */ |
| @Testcontainers |
| @ExtendWith(TestLoggerExtension.class) |
| class OpensearchWriterITCase { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(OpensearchWriterITCase.class); |
| |
| @Container |
| private static final OpensearchContainer OS_CONTAINER = |
| OpensearchUtil.createOpensearchContainer(DockerImageVersions.OPENSEARCH_1, LOG); |
| |
| private RestHighLevelClient client; |
| private OpensearchTestClient context; |
| private MetricListener metricListener; |
| |
| @BeforeEach |
| void setUp() { |
| metricListener = new MetricListener(); |
| client = OpensearchUtil.createClient(OS_CONTAINER); |
| context = new OpensearchTestClient(client); |
| } |
| |
| @AfterEach |
| void tearDown() throws IOException { |
| if (client != null) { |
| client.close(); |
| } |
| } |
| |
| @Test |
| void testWriteOnBulkFlush() throws Exception { |
| final String index = "test-bulk-flush-without-checkpoint"; |
| final int flushAfterNActions = 5; |
| final BulkProcessorConfig bulkProcessorConfig = |
| new BulkProcessorConfig(flushAfterNActions, -1, -1, FlushBackoffType.NONE, 0, 0); |
| |
| try (final OpensearchWriter<Tuple2<Integer, String>> writer = |
| createWriter(index, false, bulkProcessorConfig)) { |
| writer.write(Tuple2.of(1, buildMessage(1)), null); |
| writer.write(Tuple2.of(2, buildMessage(2)), null); |
| writer.write(Tuple2.of(3, buildMessage(3)), null); |
| writer.write(Tuple2.of(4, buildMessage(4)), null); |
| |
| // Ignore flush on checkpoint |
| writer.flush(false); |
| |
| context.assertThatIdsAreNotWritten(index, 1, 2, 3, 4); |
| |
| // Trigger flush |
| writer.write(Tuple2.of(5, "test-5"), null); |
| context.assertThatIdsAreWritten(index, 1, 2, 3, 4, 5); |
| |
| writer.write(Tuple2.of(6, "test-6"), null); |
| context.assertThatIdsAreNotWritten(index, 6); |
| |
| // Force flush |
| writer.blockingFlushAllActions(); |
| context.assertThatIdsAreWritten(index, 1, 2, 3, 4, 5, 6); |
| } |
| } |
| |
| @Test |
| void testWriteOnBulkIntervalFlush() throws Exception { |
| final String index = "test-bulk-flush-with-interval"; |
| |
| // Configure bulk processor to flush every 1s; |
| final BulkProcessorConfig bulkProcessorConfig = |
| new BulkProcessorConfig(-1, -1, 1000, FlushBackoffType.NONE, 0, 0); |
| |
| try (final OpensearchWriter<Tuple2<Integer, String>> writer = |
| createWriter(index, false, bulkProcessorConfig)) { |
| writer.write(Tuple2.of(1, buildMessage(1)), null); |
| writer.write(Tuple2.of(2, buildMessage(2)), null); |
| writer.write(Tuple2.of(3, buildMessage(3)), null); |
| writer.write(Tuple2.of(4, buildMessage(4)), null); |
| writer.blockingFlushAllActions(); |
| } |
| |
| context.assertThatIdsAreWritten(index, 1, 2, 3, 4); |
| } |
| |
| @Test |
| void testWriteOnCheckpoint() throws Exception { |
| final String index = "test-bulk-flush-with-checkpoint"; |
| final BulkProcessorConfig bulkProcessorConfig = |
| new BulkProcessorConfig(-1, -1, -1, FlushBackoffType.NONE, 0, 0); |
| |
| // Enable flush on checkpoint |
| try (final OpensearchWriter<Tuple2<Integer, String>> writer = |
| createWriter(index, true, bulkProcessorConfig)) { |
| writer.write(Tuple2.of(1, buildMessage(1)), null); |
| writer.write(Tuple2.of(2, buildMessage(2)), null); |
| writer.write(Tuple2.of(3, buildMessage(3)), null); |
| |
| context.assertThatIdsAreNotWritten(index, 1, 2, 3); |
| |
| // Trigger flush |
| writer.flush(false); |
| |
| context.assertThatIdsAreWritten(index, 1, 2, 3); |
| } |
| } |
| |
| @Test |
| void testIncrementByteOutMetric() throws Exception { |
| final String index = "test-inc-byte-out"; |
| final OperatorIOMetricGroup operatorIOMetricGroup = |
| UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup(); |
| final int flushAfterNActions = 2; |
| final BulkProcessorConfig bulkProcessorConfig = |
| new BulkProcessorConfig(flushAfterNActions, -1, -1, FlushBackoffType.NONE, 0, 0); |
| |
| try (final OpensearchWriter<Tuple2<Integer, String>> writer = |
| createWriter( |
| index, |
| false, |
| bulkProcessorConfig, |
| TestingSinkWriterMetricGroup.getSinkWriterMetricGroup( |
| operatorIOMetricGroup, metricListener.getMetricGroup()))) { |
| final Counter numBytesOut = operatorIOMetricGroup.getNumBytesOutCounter(); |
| assertThat(numBytesOut.getCount()).isZero(); |
| writer.write(Tuple2.of(1, buildMessage(1)), null); |
| writer.write(Tuple2.of(2, buildMessage(2)), null); |
| |
| writer.blockingFlushAllActions(); |
| long first = numBytesOut.getCount(); |
| |
| assertThat(first).isGreaterThan(0); |
| |
| writer.write(Tuple2.of(1, buildMessage(1)), null); |
| writer.write(Tuple2.of(2, buildMessage(2)), null); |
| |
| writer.blockingFlushAllActions(); |
| assertThat(numBytesOut.getCount()).isGreaterThan(first); |
| } |
| } |
| |
| @Test |
| void testIncrementRecordsSendMetric() throws Exception { |
| final String index = "test-inc-records-send"; |
| final int flushAfterNActions = 2; |
| final BulkProcessorConfig bulkProcessorConfig = |
| new BulkProcessorConfig(flushAfterNActions, -1, -1, FlushBackoffType.NONE, 0, 0); |
| |
| try (final OpensearchWriter<Tuple2<Integer, String>> writer = |
| createWriter(index, false, bulkProcessorConfig)) { |
| final Optional<Counter> recordsSend = |
| metricListener.getCounter(MetricNames.NUM_RECORDS_SEND); |
| writer.write(Tuple2.of(1, buildMessage(1)), null); |
| // Update existing index |
| writer.write(Tuple2.of(1, "u" + buildMessage(2)), null); |
| // Delete index |
| writer.write(Tuple2.of(1, "d" + buildMessage(3)), null); |
| |
| writer.blockingFlushAllActions(); |
| |
| assertThat(recordsSend).isPresent(); |
| assertThat(recordsSend.get().getCount()).isEqualTo(3L); |
| } |
| } |
| |
| @Test |
| void testCurrentSendTime() throws Exception { |
| final String index = "test-current-send-time"; |
| final int flushAfterNActions = 2; |
| final BulkProcessorConfig bulkProcessorConfig = |
| new BulkProcessorConfig(flushAfterNActions, -1, -1, FlushBackoffType.NONE, 0, 0); |
| |
| try (final OpensearchWriter<Tuple2<Integer, String>> writer = |
| createWriter(index, false, bulkProcessorConfig)) { |
| final Optional<Gauge<Long>> currentSendTime = |
| metricListener.getGauge("currentSendTime"); |
| writer.write(Tuple2.of(1, buildMessage(1)), null); |
| writer.write(Tuple2.of(2, buildMessage(2)), null); |
| |
| writer.blockingFlushAllActions(); |
| |
| assertThat(currentSendTime).isPresent(); |
| assertThat(currentSendTime.get().getValue()).isGreaterThan(0L); |
| } |
| } |
| |
| private static class TestHandler implements FailureHandler { |
| private boolean failed = false; |
| |
| private synchronized void setFailed() { |
| failed = true; |
| } |
| |
| public boolean isFailed() { |
| return failed; |
| } |
| |
| @Override |
| public void onFailure(Throwable failure) { |
| setFailed(); |
| } |
| } |
| |
| @Test |
| void testWriteErrorOnUpdate() throws Exception { |
| final String index = "test-bulk-flush-with-error"; |
| final int flushAfterNActions = 1; |
| final BulkProcessorConfig bulkProcessorConfig = |
| new BulkProcessorConfig(flushAfterNActions, -1, -1, FlushBackoffType.NONE, 0, 0); |
| |
| final TestHandler testHandler = new TestHandler(); |
| try (final OpensearchWriter<Tuple2<Integer, String>> writer = |
| createWriter(index, true, bulkProcessorConfig, testHandler)) { |
| // Trigger an error by updating non-existing document |
| writer.write(Tuple2.of(1, "u" + buildMessage(1)), null); |
| context.assertThatIdsAreNotWritten(index, 1); |
| assertThat(testHandler.isFailed()).isEqualTo(true); |
| } |
| } |
| |
| private OpensearchWriter<Tuple2<Integer, String>> createWriter( |
| String index, boolean flushOnCheckpoint, BulkProcessorConfig bulkProcessorConfig) { |
| return createWriter( |
| index, |
| flushOnCheckpoint, |
| bulkProcessorConfig, |
| TestingSinkWriterMetricGroup.getSinkWriterMetricGroup( |
| metricListener.getMetricGroup()), |
| new DefaultFailureHandler()); |
| } |
| |
| private OpensearchWriter<Tuple2<Integer, String>> createWriter( |
| String index, |
| boolean flushOnCheckpoint, |
| BulkProcessorConfig bulkProcessorConfig, |
| FailureHandler failureHandler) { |
| return createWriter( |
| index, |
| flushOnCheckpoint, |
| bulkProcessorConfig, |
| TestingSinkWriterMetricGroup.getSinkWriterMetricGroup( |
| metricListener.getMetricGroup()), |
| failureHandler); |
| } |
| |
| private OpensearchWriter<Tuple2<Integer, String>> createWriter( |
| String index, |
| boolean flushOnCheckpoint, |
| BulkProcessorConfig bulkProcessorConfig, |
| SinkWriterMetricGroup metricGroup) { |
| return createWriter( |
| index, |
| flushOnCheckpoint, |
| bulkProcessorConfig, |
| metricGroup, |
| new DefaultFailureHandler()); |
| } |
| |
| private OpensearchWriter<Tuple2<Integer, String>> createWriter( |
| String index, |
| boolean flushOnCheckpoint, |
| BulkProcessorConfig bulkProcessorConfig, |
| SinkWriterMetricGroup metricGroup, |
| FailureHandler failureHandler) { |
| return new OpensearchWriter<Tuple2<Integer, String>>( |
| Collections.singletonList(HttpHost.create(OS_CONTAINER.getHttpHostAddress())), |
| new UpdatingEmitter(index, context.getDataFieldName()), |
| flushOnCheckpoint, |
| bulkProcessorConfig, |
| new NetworkClientConfig( |
| OS_CONTAINER.getUsername(), |
| OS_CONTAINER.getPassword(), |
| null, |
| null, |
| null, |
| null, |
| true), |
| metricGroup, |
| new TestMailbox(), |
| new DefaultRestClientFactory(), |
| new DefaultBulkResponseInspector(failureHandler)); |
| } |
| |
| private static class UpdatingEmitter implements OpensearchEmitter<Tuple2<Integer, String>> { |
| private static final long serialVersionUID = 1L; |
| |
| private final String dataFieldName; |
| private final String index; |
| |
| UpdatingEmitter(String index, String dataFieldName) { |
| this.index = index; |
| this.dataFieldName = dataFieldName; |
| } |
| |
| @Override |
| public void emit( |
| Tuple2<Integer, String> element, |
| SinkWriter.Context context, |
| RequestIndexer indexer) { |
| |
| Map<String, Object> document = new HashMap<>(); |
| document.put(dataFieldName, element.f1); |
| |
| final char action = element.f1.charAt(0); |
| final String id = element.f0.toString(); |
| switch (action) { |
| case 'd': |
| { |
| indexer.add(new DeleteRequest(index).id(id)); |
| break; |
| } |
| case 'u': |
| { |
| indexer.add(new UpdateRequest().index(index).id(id).doc(document)); |
| break; |
| } |
| default: |
| { |
| indexer.add(new IndexRequest(index).id(id).source(document)); |
| } |
| } |
| } |
| } |
| |
| private static class TestMailbox implements MailboxExecutor { |
| |
| @Override |
| public void execute( |
| ThrowingRunnable<? extends Exception> command, |
| String descriptionFormat, |
| Object... descriptionArgs) { |
| try { |
| command.run(); |
| } catch (Exception e) { |
| throw new RuntimeException("Unexpected error", e); |
| } |
| } |
| |
| @Override |
| public void yield() throws InterruptedException, FlinkRuntimeException { |
| Thread.sleep(100); |
| } |
| |
| @Override |
| public boolean tryYield() throws FlinkRuntimeException { |
| return false; |
| } |
| } |
| } |