blob: 114e3110a1b7ff238093b5fa874dde544b8b319a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.kafkaconnector.catalog;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.camel.kafkaconnector.CamelSinkConnectorConfig;
import org.apache.camel.kafkaconnector.CamelSourceConnectorConfig;
import org.apache.camel.kafkaconnector.model.CamelKafkaConnectorModel;
import org.apache.camel.kafkaconnector.model.CamelKafkaConnectorOptionModel;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.ConfigKey;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
class CamelKafkaConnectorCatalogTest {
static CamelKafkaConnectorCatalog catalog;
@BeforeAll
public static void createCamelCatalog() {
catalog = new CamelKafkaConnectorCatalog();
}
@Test
void testConnectors() throws Exception {
List<String> list = catalog.getConnectorsName();
assertTrue(list.contains("camel-aws2-s3-sink"));
assertTrue(list.contains("camel-aws2-s3-source"));
}
@Test
void testAws2S3Options() throws Exception {
Map<String, CamelKafkaConnectorModel> p = catalog.getConnectorsModel();
CamelKafkaConnectorModel model = p.get("camel-aws2-s3-sink");
assertEquals("org.apache.camel.kafkaconnector", model.getGroupId());
assertEquals("sink", model.getType());
assertEquals("org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SinkConnector", model.getConnectorClass());
assertEquals("camel.sink.path.bucketNameOrArn", model.getOptions().get(0).getName());
assertEquals("camel.sink.endpoint.amazonS3Client", model.getOptions().get(1).getName());
assertEquals("camel.sink.endpoint.amazonS3Presigner", model.getOptions().get(2).getName());
assertEquals(1, model.getConverters().size());
assertEquals(1, model.getTransforms().size());
assertEquals(1, model.getAggregationStrategies().size());
}
@Test
void testAws2SnsOptions() throws Exception {
Map<String, CamelKafkaConnectorModel> p = catalog.getConnectorsModel();
CamelKafkaConnectorModel model = p.get("camel-aws2-sns-sink");
assertEquals("org.apache.camel.kafkaconnector", model.getGroupId());
assertEquals("sink", model.getType());
assertEquals("org.apache.camel.kafkaconnector.aws2sns.CamelAws2snsSinkConnector", model.getConnectorClass());
assertEquals("camel.sink.path.topicNameOrArn", model.getOptions().get(0).getName());
assertEquals("camel.sink.endpoint.amazonSNSClient", model.getOptions().get(1).getName());
assertEquals("camel.sink.endpoint.autoCreateTopic", model.getOptions().get(2).getName());
assertEquals("false", model.getOptions().get(2).getDefaultValue());
assertNull(model.getOptions().get(1).getDefaultValue());
assertNull(model.getConverters());
assertNull(model.getTransforms());
assertNull(model.getAggregationStrategies());
}
@Test
void testCouchbaseOptions() throws Exception {
Map<String, CamelKafkaConnectorModel> p = catalog.getConnectorsModel();
CamelKafkaConnectorModel model = p.get("camel-couchbase-source");
assertEquals("org.apache.camel.kafkaconnector", model.getGroupId());
assertEquals("source", model.getType());
assertEquals("org.apache.camel.kafkaconnector.couchbase.CamelCouchbaseSourceConnector", model.getConnectorClass());
assertEquals("camel.source.path.protocol", model.getOptions().get(0).getName());
assertNull(model.getOptions().get(0).getDefaultValue());
assertNull(model.getConverters());
assertNull(model.getTransforms());
assertNull(model.getAggregationStrategies());
}
@Test
void testAddConnector() throws Exception {
String connectorName = "my-test-connector";
catalog.addConnector(connectorName, "{\n"
+ " \"connector\": {\n"
+ " \"class\": \"org.apache.camel.kafkaconnector.my-test-connector.TestDemoConnector\",\n"
+ " \"artifactId\": \"camel-my-test-connector-kafka-connector\",\n"
+ " \"groupId\": \"org.apache.camel.kafkaconnector\",\n"
+ " \"id\": \"my-test-connector\",\n"
+ " \"type\": \"sink\",\n"
+ " \"version\": \"0.6.0-SNAPSHOT\"\n"
+ " },\n"
+ " \"properties\": {\n"
+ " \"camel.component.my-test-connector.demo\": {\n"
+ " \"name\": \"camel.component.my-test-connector.demo\",\n"
+ " \"description\": \"A demo description of the component\",\n"
+ " \"defaultValue\": \"\\\"firstValue\\\"\",\n"
+ " \"priority\": \"MEDIUM\",\n"
+ " \"enum\": [\"firstValue\",\"secondValue\"]\n"
+ " }\n"
+ " }\n"
+ "}\n");
assertTrue(catalog.getConnectorsName().contains(connectorName), "The new Connector wasn't added in the ConnectorNames list.");
assertNotNull(catalog.getConnectorsModel().get(connectorName), "The new Connector wasn't added in the ConnectorModel map.");
checkAddedConnectorContainsCorrectPropertyValues(connectorName);
}
private void checkAddedConnectorContainsCorrectPropertyValues(String connectorName) {
CamelKafkaConnectorOptionModel camelKafkaConnectorOptionModel = catalog.getConnectorsModel().get(connectorName).getOptions().get(0);
assertEquals("\"firstValue\"", camelKafkaConnectorOptionModel.getDefaultValue());
assertEquals("camel.component.my-test-connector.demo", camelKafkaConnectorOptionModel.getName());
assertEquals("MEDIUM", camelKafkaConnectorOptionModel.getPriority());
assertEquals("A demo description of the component", camelKafkaConnectorOptionModel.getDescription());
assertEquals(Arrays.asList("firstValue", "secondValue"), camelKafkaConnectorOptionModel.getPossibleEnumValues());
}
@Test
void testRemoveConnector() throws Exception {
String connectorName = "my-test-to-remove-connector";
catalog.addConnector(connectorName, "{\n"
+ " \"connector\": {\n"
+ " \"class\": \"org.apache.camel.kafkaconnector.my-test-connector.TestDemoConnector\",\n"
+ " \"artifactId\": \"camel-my-test-connector-kafka-connector\",\n"
+ " \"groupId\": \"org.apache.camel.kafkaconnector\",\n"
+ " \"id\": \"my-test-to-remove-connector\",\n"
+ " \"type\": \"sink\",\n"
+ " \"version\": \"0.6.0-SNAPSHOT\"\n"
+ " },\n"
+ " \"properties\": {}\n"
+ "}\n");
catalog.removeConnector(connectorName);
assertFalse(catalog.getConnectorsName().contains(connectorName), "The connector is still present in ConnectorNames list.");
assertNull(catalog.getConnectorsModel().get(connectorName), "The connector model is still present in the ConnectorsModel map.");
}
@Test
void testAws2SnsGetSingleOption() throws Exception {
Map<String, CamelKafkaConnectorModel> p = catalog.getConnectorsModel();
CamelKafkaConnectorOptionModel existingOption = catalog.getOptionModel("camel-aws2-sns-sink", "camel.sink.path.topicNameOrArn");
assertNotNull(existingOption);
assertEquals("true", existingOption.getRequired());
assertEquals("Topic name or ARN", existingOption.getDescription());
CamelKafkaConnectorOptionModel nonExistingOption = catalog.getOptionModel("camel-aws2-sns-sink", "camel.sink.path.topiNameOrAr");
assertNull(nonExistingOption);
}
@Test
void testConnectorContainsDescription() throws Exception {
Map<String, CamelKafkaConnectorModel> p = catalog.getConnectorsModel();
CamelKafkaConnectorModel model = p.get("camel-aws2-s3-sink");
assertEquals("Store and retrieve objects from AWS S3 Storage Service using AWS SDK version 2.x.", model.getDescription());
}
@Test
void testBasicConfigurationForSink() throws Exception {
ConfigDef sinkConfigDef = catalog.getBasicConfigurationForSink();
ConfigKey marshalConfigKey = sinkConfigDef.configKeys().get(CamelSinkConnectorConfig.CAMEL_SINK_MARSHAL_CONF);
assertEquals(CamelSinkConnectorConfig.CAMEL_SINK_MARSHAL_CONF, marshalConfigKey.name);
assertEquals(CamelSinkConnectorConfig.CAMEL_SINK_MARSHAL_DOC, marshalConfigKey.documentation);
assertEquals(Type.STRING, marshalConfigKey.type);
}
@Test
void testBasicConfigurationForSource() throws Exception {
ConfigDef sourceConfigDef = catalog.getBasicConfigurationForSource();
ConfigKey marshalConfigKey = sourceConfigDef.configKeys().get(CamelSourceConnectorConfig.CAMEL_SOURCE_MARSHAL_CONF);
assertEquals(CamelSourceConnectorConfig.CAMEL_SOURCE_MARSHAL_CONF, marshalConfigKey.name);
assertEquals(CamelSourceConnectorConfig.CAMEL_SOURCE_MARSHAL_DOC, marshalConfigKey.documentation);
assertEquals(Type.STRING, marshalConfigKey.type);
}
}