blob: 8c26ddd3ce654b19f213144bd135ee6438c2e63e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.kafkaconnector.couchbase.sink;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import com.couchbase.client.core.diagnostics.EndpointPingReport;
import com.couchbase.client.core.diagnostics.PingResult;
import com.couchbase.client.core.diagnostics.PingState;
import com.couchbase.client.core.service.ServiceType;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.json.JsonObject;
import com.couchbase.client.java.manager.bucket.BucketSettings;
import com.couchbase.client.java.query.QueryResult;
import org.apache.camel.kafkaconnector.CamelSinkTask;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
import org.apache.camel.test.infra.common.TestUtils;
import org.apache.camel.test.infra.couchbase.services.CouchbaseService;
import org.apache.camel.test.infra.couchbase.services.CouchbaseServiceFactory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.fail;
/*
This test is slow and flaky. It tends to fail on systems with limited resources and slow I/O. Therefore, it is
disabled by default. Also, suffers from bugs in the couchbase test container:
- https://github.com/testcontainers/testcontainers-java/issues/2993
Therefore, this test is marked as flaky and only runs if specifically enabled.
*/
@EnabledIfSystemProperty(named = "enable.flaky.tests", matches = "true")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class CamelSinkCouchbaseITCase extends CamelSinkTestSupport {
@RegisterExtension
public static CouchbaseService service = CouchbaseServiceFactory.createService();
private static final Logger LOG = LoggerFactory.getLogger(CamelSinkCouchbaseITCase.class);
private String bucketName;
private String topic;
private Cluster cluster;
private final int expect = 10;
private static class CustomProducer extends StringMessageProducer {
public CustomProducer(String bootstrapServer, String topicName, int count) {
super(bootstrapServer, topicName, count);
}
@Override
public String testMessageContent(int current) {
JsonObject jsonObject = JsonObject.create().put("data", String.format("test-%d", current));
return jsonObject.toString();
}
@Override
public Map<String, String> messageHeaders(String text, int current) {
Map<String, String> parameters = new HashMap<>();
parameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CCB_ID", String.valueOf(current));
return parameters;
}
}
@Override
protected String[] getConnectorsInTest() {
return new String[] {"camel-couchbase-kafka-connector"};
}
@BeforeEach
public void setUp() {
bucketName = "testBucket" + TestUtils.randomWithRange(0, 100);
cluster = Cluster.connect(service.getConnectionString(), service.getUsername(), service.getPassword());
cluster.ping().endpoints().entrySet().forEach(this::checkEndpoints);
LOG.debug("Creating a new bucket named {}", bucketName);
cluster.buckets().createBucket(BucketSettings.create(bucketName));
PingResult pingResult = cluster.bucket(bucketName).ping();
pingResult.endpoints().entrySet().forEach(this::checkEndpoints);
LOG.debug("Bucket created");
topic = getTopicForTest(this);
try {
String startDelay = System.getProperty("couchbase.test.start.delay", "1000");
int delay = Integer.parseInt(startDelay);
Thread.sleep(delay);
} catch (InterruptedException e) {
Thread.currentThread().interrupted();
}
}
@AfterEach
public void tearDown() {
LOG.debug("Dropping the test bucket named {}", bucketName);
cluster.buckets().dropBucket(bucketName);
LOG.debug("Bucket dropped");
cluster.disconnect();
}
@Override
protected void consumeMessages(CountDownLatch latch) {
try {
TestUtils.waitFor(this::waitForMinimumRecordCount);
} finally {
latch.countDown();
}
}
@Override
protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
if (latch.await(110, TimeUnit.SECONDS)) {
verifyRecords();
} else {
fail("Failed to receive the records within the specified time");
}
}
private void checkEndpoints(Map.Entry<ServiceType, List<EndpointPingReport>> entries) {
entries.getValue().forEach(this::checkStatus);
}
private void checkStatus(EndpointPingReport endpointPingReport) {
if (endpointPingReport.state() == PingState.OK) {
LOG.debug("Endpoint {} is ok", endpointPingReport.id());
} else {
LOG.warn("Endpoint {} is not OK", endpointPingReport.id());
}
}
private boolean waitForMinimumRecordCount() {
try {
String query = String.format("select count(*) as count from `%s`", bucketName);
QueryResult queryResult = cluster.query(query);
List<JsonObject> results = queryResult.rowsAsObject();
if (results.isEmpty()) {
return false;
}
int size = results.get(0).getInt("count");
if (size < expect) {
LOG.info("There are only {} records at the moment", size);
return false;
}
return size == expect;
} catch (Exception e) {
LOG.warn("Exception while waiting for the records to arrive: {}", e.getMessage(), e);
}
return false;
}
private void verifyRecords() {
String query = String.format("select * from `%s` USE KEYS \"1\"", bucketName);
QueryResult queryResult = cluster.query(query);
List<JsonObject> results = queryResult.rowsAsObject();
assertFalse(results.isEmpty(), "There should be at least 1 record on the result");
LOG.debug("Received record: {}", results.get(0));
}
@Disabled("Not formatting the URL correctly - issue #629")
@Test
@Timeout(90)
public void testBasicSendReceive() throws Exception {
ConnectorPropertyFactory factory = CamelCouchbasePropertyFactory.basic()
.withTopics(topic)
.withBucket(bucketName)
.withProtocol("http")
.withHostname(service.getHostname())
.withPort(service.getPort())
.withUsername(service.getUsername())
.withPassword(service.getPassword());
runTest(factory, new CustomProducer(getKafkaService().getBootstrapServers(), topic, expect));
}
@RepeatedTest(10)
@Timeout(90)
public void testBasicSendReceiveUsingUrl() throws Exception {
ConnectorPropertyFactory factory = CamelCouchbasePropertyFactory.basic()
.withTopics(topic)
.withUrl("http", service.getHostname(), service.getPort())
.append("bucket", bucketName)
.append("username", service.getUsername())
.append("password", service.getPassword())
.append("connectTimeout", 5000)
.append("queryTimeout", 5000)
.append("producerRetryAttempts", 10)
.append("producerRetryPause", 7500)
.buildUrl();
runTest(factory, new CustomProducer(getKafkaService().getBootstrapServers(), topic, expect));
}
}