blob: fdd53f5168ffc0e43ca117106f639a21ddd64b90 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.kafkaconnector.catalog;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.camel.kafkaconnector.CamelSinkConnectorConfig;
import org.apache.camel.kafkaconnector.CamelSourceConnectorConfig;
import org.apache.camel.kafkaconnector.model.CamelKafkaConnectorModel;
import org.apache.camel.kafkaconnector.model.CamelKafkaConnectorOptionModel;
import org.apache.camel.tooling.model.JsonMapper;
import org.apache.camel.util.json.JsonObject;
import org.apache.kafka.common.config.ConfigDef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CamelKafkaConnectorCatalog {
private static final Logger LOG = LoggerFactory.getLogger(CamelKafkaConnectorCatalog.class);
private static final String CONNECTORS_DIR = "connectors";
private static final String DESCRIPTORS_DIR = "descriptors";
private static final String CONNECTORS_PROPERTIES = "connectors.properties";
private List<String> connectorsName = new ArrayList<>();
private Map<String, CamelKafkaConnectorModel> connectorsModel = new HashMap<>();
public CamelKafkaConnectorCatalog() {
initCatalog();
generateModel();
}
private void generateModel() {
for (String connector : connectorsName) {
connectorsModel.put(connector, getConnectorModel(connector));
}
}
private void initCatalog() {
try (InputStream input = CamelKafkaConnectorCatalog.class.getResourceAsStream("/" + DESCRIPTORS_DIR + "/" + CONNECTORS_PROPERTIES)) {
BufferedReader reader = new BufferedReader(new InputStreamReader(input));
while (reader.ready()) {
String connector = reader.readLine();
if (connector.equalsIgnoreCase("camel-coap-tcp-source")) {
connectorsName.add("camel-coap+tcp-source");
} else if (connector.equalsIgnoreCase("camel-coaps-tcp-source")) {
connectorsName.add("camel-coaps+tcp-source");
} else if (connector.equalsIgnoreCase("camel-coaps-tcp-sink")) {
connectorsName.add("camel-coaps+tcp-sink");
} else if (connector.equalsIgnoreCase("camel-coap-tcp-sink")) {
connectorsName.add("camel-coap+tcp-sink");
} else {
connectorsName.add(connector);
}
}
} catch (FileNotFoundException e) {
LOG.error("Cannot find file: {}", e.getMessage(), e);
} catch (IOException e) {
LOG.error("IO Exception: {}", e.getMessage(), e);
}
}
private String loadConnectorAsJsonFromEmbeddedCatalog(String connectorName) {
String result = null;
try (InputStream connectorModelInputSream = CamelKafkaConnectorCatalog.class.getResourceAsStream("/" + CONNECTORS_DIR + "/" + connectorName + ".json")) {
result = new BufferedReader(new InputStreamReader(connectorModelInputSream, StandardCharsets.UTF_8))
.lines()
.map(String::trim) // to change line
.collect(Collectors.joining());
} catch (IOException e) {
LOG.error("IO Exception: {}", e.getMessage(), e);
}
return result;
}
private CamelKafkaConnectorModel getConnectorModel(String connectorName) {
String json = loadConnectorAsJsonFromEmbeddedCatalog(connectorName);
return createModel(json);
}
private CamelKafkaConnectorModel createModel(String json) {
CamelKafkaConnectorModel model = new CamelKafkaConnectorModel();
JsonObject obj = JsonMapper.deserialize(json);
JsonObject wrapper = (JsonObject)obj.get("connector");
model.setConnectorClass((String)wrapper.get("class"));
model.setArtifactId((String)wrapper.get("artifactId"));
model.setGroupId((String)wrapper.get("groupId"));
model.setType((String)wrapper.get("type"));
model.setVersion((String)wrapper.get("version"));
model.setDescription((String)wrapper.get("description"));
model.setOptions(getConnectorOptionModel(obj));
if (obj.get("aggregationStrategies") != null) {
model.setAggregationStrategies((List<String>) obj.get("aggregationStrategies"));
}
if (obj.get("converters") != null) {
model.setConverters((List<String>) obj.get("converters"));
}
if (obj.get("transforms") != null) {
model.setTransforms((List<String>) obj.get("transforms"));
}
return model;
}
private List<CamelKafkaConnectorOptionModel> getConnectorOptionModel(JsonObject obj) {
List<CamelKafkaConnectorOptionModel> model = new ArrayList<>();
JsonObject wrapper = (JsonObject)obj.get("properties");
Set<String> options = wrapper.keySet();
for (String string : options) {
JsonObject object = (JsonObject)wrapper.get(string);
CamelKafkaConnectorOptionModel singleModel = new CamelKafkaConnectorOptionModel();
singleModel.setDefaultValue((String)object.get("defaultValue"));
singleModel.setPriority((String)object.get("priority"));
singleModel.setDescription((String)object.get("description"));
singleModel.setName((String)object.get("name"));
singleModel.setRequired((String)object.get("required"));
singleModel.setPossibleEnumValues((List<String>)object.get("enum"));
model.add(singleModel);
}
return model;
}
public List<String> getConnectorsName() {
return connectorsName;
}
public Map<String, CamelKafkaConnectorModel> getConnectorsModel() {
return connectorsModel;
}
public CamelKafkaConnectorOptionModel getOptionModel(String connectorName, String optionName) {
List<CamelKafkaConnectorOptionModel> options = getConnectorsModel().get(connectorName).getOptions();
for (CamelKafkaConnectorOptionModel camelKafkaConnectorOptionModel : options) {
if (camelKafkaConnectorOptionModel.getName().equals(optionName)) {
return camelKafkaConnectorOptionModel;
}
}
return null;
}
/**
* Register a new Connector definition in the catalog.
* If it already exists, the previous one is overwritten.
*
* @param connectorName - the connector name
* @param connectorDefinitionAsJson - the definition of the connector provided as a String with Json format
*/
public void addConnector(String connectorName, String connectorDefinitionAsJson) {
connectorsName.add(connectorName);
connectorsModel.put(connectorName, createModel(connectorDefinitionAsJson));
}
/**
* @param connectorName The connector name to remove from the Catalog
*/
public void removeConnector(String connectorName) {
connectorsName.remove(connectorName);
connectorsModel.remove(connectorName);
}
public ConfigDef getBasicConfigurationForSink() {
return CamelSinkConnectorConfig.conf();
}
public ConfigDef getBasicConfigurationForSource() {
return CamelSourceConnectorConfig.conf();
}
}