blob: 4be184269896f01f3050b2ceff371f7b6aa3456d [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.plc4x.kafka;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.plc4x.kafka.util.VersionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URL;
import java.util.*;
import java.util.stream.Collectors;
/**
 * Kafka Connect {@link SourceConnector} that reads field values from PLCs via PLC4X
 * and publishes them to Kafka.
 *
 * <p>Tasks can be configured in two ways:
 * <ul>
 *   <li>Directly, via the {@code queries} list ({@code "<host>#<field-query>"} entries),
 *       grouped so that each task owns exactly one PLC connection.</li>
 *   <li>Via an external JSON document fetched from {@code json.url}, containing a
 *       {@code "PLCs"} array; one task is created per PLC entry.</li>
 * </ul>
 */
public class Plc4xSourceConnector extends SourceConnector {

    private static final Logger log = LoggerFactory.getLogger(Plc4xSourceConnector.class);

    private static final String TOPIC_CONFIG = "topic";
    private static final String TOPIC_DOC = "Kafka topic to publish to";

    private static final String QUERIES_CONFIG = "queries";
    private static final String QUERIES_DOC = "Field queries to be sent to the PLC";

    private static final String JSON_CONFIG = "json.url";
    private static final String JSON_DEFAULT = "";
    private static final String JSON_DOC = "JSON configuration";

    private static final String RATE_CONFIG = "rate";
    private static final Integer RATE_DEFAULT = 1000;
    private static final String RATE_DOC = "Polling rate";

    private static final ConfigDef CONFIG_DEF = new ConfigDef()
        .define(TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, TOPIC_DOC)
        .define(QUERIES_CONFIG, ConfigDef.Type.LIST, new LinkedList<>(), ConfigDef.Importance.HIGH, QUERIES_DOC)
        .define(JSON_CONFIG, ConfigDef.Type.STRING, JSON_DEFAULT, ConfigDef.Importance.HIGH, JSON_DOC)
        .define(RATE_CONFIG, ConfigDef.Type.INT, RATE_DEFAULT, ConfigDef.Importance.MEDIUM, RATE_DOC);

    private String topic;
    private List<String> queries;
    private String json;
    private Integer rate;

    @Override
    public Class<? extends Task> taskClass() {
        return Plc4xSourceTask.class;
    }

    /**
     * Builds one task configuration per PLC connection.
     *
     * @param maxTasks maximum number of tasks the framework allows
     * @return one config map per PLC, or an empty list if more PLCs exist than
     *         {@code maxTasks} or the JSON configuration cannot be read
     */
    @Override
    @SuppressWarnings("unchecked")
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        List<Map<String, String>> configs = new LinkedList<>();
        if (json.isEmpty()) {
            // Group queries ("<host>#<field-query>") by host so each task owns one connection.
            Map<String, List<String>> groupedByHost = queries.stream()
                .map(query -> query.split("#", 2))
                .collect(Collectors.groupingBy(parts -> parts[0],
                    Collectors.mapping(parts -> parts[1], Collectors.toList())));
            if (groupedByHost.size() > maxTasks) {
                // Each host needs a dedicated task; with fewer tasks than hosts we cannot proceed.
                // TODO: throw exception?
                log.warn("NOT ENOUGH TASKS!");
                return Collections.emptyList();
            }
            groupedByHost.forEach((host, hostQueries) -> {
                Map<String, String> taskConfig = new HashMap<>();
                taskConfig.put(Plc4xSourceTask.TOPIC_CONFIG, topic);
                taskConfig.put(Plc4xSourceTask.URL_CONFIG, host);
                taskConfig.put(Plc4xSourceTask.QUERIES_CONFIG, String.join(",", hostQueries));
                taskConfig.put(Plc4xSourceTask.RATE_CONFIG, rate.toString());
                configs.add(taskConfig);
            });
        } else {
            // JSON-driven configuration: fetch the document from the configured URL and
            // derive one task per entry in the "PLCs" array.
            // try-with-resources: the original leaked the Scanner/URL stream.
            try (Scanner scanner = new Scanner(new URL(json).openStream(), "UTF-8")) {
                // "\\A" delimiter slurps the whole stream in one token.
                String config = scanner.useDelimiter("\\A").next();
                ObjectMapper mapper = new ObjectMapper();
                Map<String, Object> values = mapper.readValue(config, new TypeReference<Map<String, Object>>() {});
                List<Map<String, Object>> plcs = (List<Map<String, Object>>) values.get("PLCs");
                if (plcs == null) {
                    // Guard: a document without a "PLCs" entry previously caused an NPE below.
                    log.error("JSON configuration contains no 'PLCs' entry: {}", json);
                    return Collections.emptyList();
                }
                log.info("TASKS REQUIRED: {}", plcs.size());
                if (plcs.size() > maxTasks) {
                    log.warn("NOT ENOUGH TASKS!");
                    return Collections.emptyList();
                }
                for (Map<String, Object> plc : plcs) {
                    Map<String, String> taskConfig = new HashMap<>();
                    String ip = plc.get("IP").toString();
                    // The PLC's IP address doubles as the per-device Kafka topic
                    // (renamed from "topic" to avoid shadowing the connector field).
                    String plcTopic = ip;
                    String url = "s7://" + ip + "/1/" + plc.get("Slot");
                    List<String> plcQueries = new LinkedList<>();
                    for (Map<String, Object> operand : (List<Map<String, Object>>) plc.get("operands")) {
                        plcQueries.add("%" + operand.get("Operand") + ":" + operand.get("Datatype"));
                    }
                    taskConfig.put(Plc4xSourceTask.TOPIC_CONFIG, plcTopic);
                    taskConfig.put(Plc4xSourceTask.URL_CONFIG, url);
                    // Fix: use the task's key, consistent with the non-JSON branch above.
                    // The original wrote the connector-level RATE_CONFIG key here.
                    taskConfig.put(Plc4xSourceTask.RATE_CONFIG, rate.toString());
                    taskConfig.put(Plc4xSourceTask.QUERIES_CONFIG, String.join(",", plcQueries));
                    configs.add(taskConfig);
                }
            } catch (IOException e) {
                log.error("ERROR CONFIGURING TASK", e);
            }
        }
        return configs;
    }

    /**
     * Caches the validated connector configuration for use in {@link #taskConfigs(int)}.
     *
     * @param props raw connector properties supplied by the Connect framework
     */
    @Override
    public void start(Map<String, String> props) {
        AbstractConfig config = new AbstractConfig(CONFIG_DEF, props);
        topic = config.getString(TOPIC_CONFIG);
        queries = config.getList(QUERIES_CONFIG);
        rate = config.getInt(RATE_CONFIG);
        json = config.getString(JSON_CONFIG);
    }

    /** Nothing to release: connections are owned by the tasks, not the connector. */
    @Override
    public void stop() {}

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    @Override
    public String version() {
        return VersionUtil.getVersion();
    }
}