// blob: bc02984a87651be45f35bf5d1baf133854b9b669 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.kafkaconnector.timer.source;
import java.util.concurrent.ExecutionException;
import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
 * Integration test that checks whether the timer source connector produces the
 * expected number of messages on the test topic.
 *
 * <p>The test instance is shared by all test methods ({@code PER_CLASS} lifecycle),
 * so the received-message counter is reset before each test in {@link #setUp()}.
 *
 * <p>NOTE(review): both test methods use the same topic
 * ({@code TestUtils.getDefaultTestTopic(this.getClass())}); confirm that the test
 * harness isolates records between test methods.
 */
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class CamelSourceTimerITCase extends AbstractKafkaTest {
    private static final Logger LOG = LoggerFactory.getLogger(CamelSourceTimerITCase.class);

    /** Number of records seen by {@link #checkRecord(ConsumerRecord)} in the current test. */
    private int received;

    /** Number of timer messages each test configures the connector to emit and expects to consume. */
    private final int expect = 10;

    /**
     * Lists the connectors this test depends on.
     *
     * @return a single-element array naming the timer connector
     */
    @Override
    protected String[] getConnectorsInTest() {
        return new String[] {"camel-timer-kafka-connector"};
    }

    /** Resets the received-message counter, since the instance is reused across tests. */
    @BeforeEach
    public void setUp() {
        received = 0;
    }

    /**
     * Record callback handed to the Kafka consumer: counts one more received record.
     *
     * @param record the consumed record (its contents are not inspected)
     * @return {@code true} while fewer than {@link #expect} records have been counted,
     *         {@code false} once the target has been reached. Presumably the consumer
     *         stops polling when this returns {@code false} - confirm against
     *         {@code KafkaClient.consume}.
     */
    private boolean checkRecord(ConsumerRecord<String, String> record) {
        received++;

        // '<' rather than '!=' so that consumption cannot continue past the target.
        return received < expect;
    }

    /**
     * Starts the connector described by the given properties, consumes from the
     * default test topic and verifies the number of records received.
     *
     * @param connectorPropertyFactory the connector configuration to launch
     * @throws ExecutionException declared by the underlying connector/consumer calls
     * @throws InterruptedException declared by the underlying connector/consumer calls
     */
    private void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
        connectorPropertyFactory.log();
        getKafkaConnectService().initializeConnector(connectorPropertyFactory);

        LOG.debug("Creating the consumer ...");
        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
        kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
        LOG.debug("Created the consumer ...");

        // JUnit 5 signature is assertEquals(expected, actual, message).
        assertEquals(expect, received, "Did not receive the same number of messages as configured for the timer");
    }

    /**
     * Launches the connector configured through the timer name and repeat count
     * property setters.
     */
    @Test
    @Timeout(90)
    public void testLaunchConnector() throws ExecutionException, InterruptedException {
        CamelTimerPropertyFactory connectorPropertyFactory = CamelTimerPropertyFactory
                .basic()
                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
                .withTimerName("launchTest")
                .withRepeatCount(expect);

        runTest(connectorPropertyFactory);
    }

    /**
     * Launches the connector configured through a URL, with the repeat count
     * appended as a URL parameter.
     */
    @Test
    @Timeout(90)
    public void testLaunchConnectorUsingUrl() throws ExecutionException, InterruptedException {
        CamelTimerPropertyFactory connectorPropertyFactory = CamelTimerPropertyFactory
                .basic()
                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
                .withUrl("launchTestUsingUrl")
                .append("repeatCount", expect)
                .buildUrl();

        runTest(connectorPropertyFactory);
    }
}