blob: 43dfe86768eb19547faf92c003fdc07149c0e80d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.statefun.flink.io.kafka;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.statefun.flink.common.json.NamespaceNamePair;
import org.apache.flink.statefun.flink.common.json.Selectors;
import org.apache.flink.statefun.flink.io.generated.RoutingConfig;
import org.apache.flink.statefun.flink.io.generated.TargetFunctionType;
import org.apache.flink.statefun.sdk.kafka.KafkaIngressAutoResetPosition;
import org.apache.flink.statefun.sdk.kafka.KafkaIngressDeserializer;
import org.apache.flink.statefun.sdk.kafka.KafkaIngressStartupPosition;
import org.apache.flink.statefun.sdk.kafka.KafkaTopicPartition;
final class KafkaIngressSpecJsonParser {
private KafkaIngressSpecJsonParser() {}
private static final JsonPointer DESCRIPTOR_SET_POINTER =
JsonPointer.compile("/ingress/spec/descriptorSet");
private static final JsonPointer TOPICS_POINTER = JsonPointer.compile("/ingress/spec/topics");
private static final JsonPointer MESSAGE_TYPE_POINTER =
JsonPointer.compile("/ingress/spec/messageType");
private static final JsonPointer PROPERTIES_POINTER =
JsonPointer.compile("/ingress/spec/properties");
private static final JsonPointer ADDRESS_POINTER = JsonPointer.compile("/ingress/spec/address");
private static final JsonPointer GROUP_ID_POINTER =
JsonPointer.compile("/ingress/spec/consumerGroupId");
private static final JsonPointer AUTO_RESET_POS_POINTER =
JsonPointer.compile("/ingress/spec/autoOffsetResetPosition");
private static final JsonPointer STARTUP_POS_POINTER =
JsonPointer.compile("/ingress/spec/startupPosition");
private static final JsonPointer STARTUP_POS_TYPE_POINTER =
JsonPointer.compile("/ingress/spec/startupPosition/type");
private static final JsonPointer STARTUP_SPECIFIC_OFFSETS_POINTER =
JsonPointer.compile("/ingress/spec/startupPosition/offsets");
private static final JsonPointer STARTUP_DATE_POINTER =
JsonPointer.compile("/ingress/spec/startupPosition/date");
private static final JsonPointer ROUTABLE_TOPIC_NAME_POINTER = JsonPointer.compile("/topic");
private static final JsonPointer ROUTABLE_TOPIC_VALUE_TYPE_POINTER =
JsonPointer.compile("/valueType");
private static final JsonPointer ROUTABLE_TOPIC_TARGETS_POINTER = JsonPointer.compile("/targets");
private static final String STARTUP_DATE_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS Z";
private static final DateTimeFormatter STARTUP_DATE_FORMATTER =
DateTimeFormatter.ofPattern(STARTUP_DATE_PATTERN);
static List<String> topics(JsonNode json) {
return Selectors.textListAt(json, TOPICS_POINTER);
}
static Map<String, RoutingConfig> routableTopics(JsonNode json) {
Map<String, RoutingConfig> routableTopics = new HashMap<>();
for (JsonNode routableTopicNode : Selectors.listAt(json, TOPICS_POINTER)) {
final String topic = Selectors.textAt(routableTopicNode, ROUTABLE_TOPIC_NAME_POINTER);
final String typeUrl = Selectors.textAt(routableTopicNode, ROUTABLE_TOPIC_VALUE_TYPE_POINTER);
final List<TargetFunctionType> targets = parseRoutableTargetFunctionTypes(routableTopicNode);
routableTopics.put(
topic,
RoutingConfig.newBuilder()
.setTypeUrl(typeUrl)
.addAllTargetFunctionTypes(targets)
.build());
}
return routableTopics;
}
static Properties kafkaClientProperties(JsonNode json) {
Map<String, String> kvs = Selectors.propertiesAt(json, PROPERTIES_POINTER);
Properties properties = new Properties();
kvs.forEach(properties::setProperty);
return properties;
}
static String kafkaAddress(JsonNode json) {
return Selectors.textAt(json, ADDRESS_POINTER);
}
@SuppressWarnings("unchecked")
static <T> KafkaIngressDeserializer<T> deserializer(JsonNode json) {
String descriptorSetPath = Selectors.textAt(json, DESCRIPTOR_SET_POINTER);
String messageType = Selectors.textAt(json, MESSAGE_TYPE_POINTER);
// this cast is safe since we validate that the produced message type (T) is assignable to a
// Message.
// see asJsonIngressSpec()
return (KafkaIngressDeserializer<T>)
new ProtobufKafkaIngressDeserializer(descriptorSetPath, messageType);
}
static Optional<String> optionalConsumerGroupId(JsonNode json) {
return Selectors.optionalTextAt(json, GROUP_ID_POINTER);
}
static Optional<KafkaIngressAutoResetPosition> optionalAutoOffsetResetPosition(JsonNode json) {
Optional<String> conf = Selectors.optionalTextAt(json, AUTO_RESET_POS_POINTER);
if (!conf.isPresent()) {
return Optional.empty();
}
String autoOffsetResetConfig = conf.get().toUpperCase(Locale.ENGLISH);
try {
return Optional.of(KafkaIngressAutoResetPosition.valueOf(autoOffsetResetConfig));
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(
"Invalid autoOffsetResetPosition: "
+ autoOffsetResetConfig
+ "; valid values are "
+ Arrays.toString(KafkaIngressAutoResetPosition.values()),
e);
}
}
static Optional<KafkaIngressStartupPosition> optionalStartupPosition(JsonNode json) {
if (json.at(STARTUP_POS_POINTER).isMissingNode()) {
return Optional.empty();
}
String startupType =
Selectors.textAt(json, STARTUP_POS_TYPE_POINTER).toLowerCase(Locale.ENGLISH);
switch (startupType) {
case "group-offsets":
return Optional.of(KafkaIngressStartupPosition.fromGroupOffsets());
case "earliest":
return Optional.of(KafkaIngressStartupPosition.fromEarliest());
case "latest":
return Optional.of(KafkaIngressStartupPosition.fromLatest());
case "specific-offsets":
return Optional.of(
KafkaIngressStartupPosition.fromSpecificOffsets(specificOffsetsStartupMap(json)));
case "date":
return Optional.of(KafkaIngressStartupPosition.fromDate(startupDate(json)));
default:
throw new IllegalArgumentException(
"Invalid startup position type: "
+ startupType
+ "; valid values are [group-offsets, earliest, latest, specific-offsets, date]");
}
}
private static Map<KafkaTopicPartition, Long> specificOffsetsStartupMap(JsonNode json) {
Map<String, Long> kvs = Selectors.longPropertiesAt(json, STARTUP_SPECIFIC_OFFSETS_POINTER);
Map<KafkaTopicPartition, Long> offsets = new HashMap<>(kvs.size());
kvs.forEach(
(partition, offset) ->
offsets.put(KafkaTopicPartition.fromString(partition), validateOffsetLong(offset)));
return offsets;
}
private static ZonedDateTime startupDate(JsonNode json) {
String dateStr = Selectors.textAt(json, STARTUP_DATE_POINTER);
try {
return ZonedDateTime.parse(dateStr, STARTUP_DATE_FORMATTER);
} catch (DateTimeParseException e) {
throw new IllegalArgumentException(
"Unable to parse date string for startup position: "
+ dateStr
+ "; the date should conform to the pattern "
+ STARTUP_DATE_PATTERN,
e);
}
}
private static Long validateOffsetLong(Long offset) {
if (offset < 0) {
throw new IllegalArgumentException(
"Invalid offset value: "
+ offset
+ "; must be a numeric integer with value between 0 and "
+ Long.MAX_VALUE);
}
return offset;
}
private static List<TargetFunctionType> parseRoutableTargetFunctionTypes(
JsonNode routableTopicNode) {
final List<TargetFunctionType> targets = new ArrayList<>();
for (String namespaceAndName :
Selectors.textListAt(routableTopicNode, ROUTABLE_TOPIC_TARGETS_POINTER)) {
NamespaceNamePair namespaceNamePair = NamespaceNamePair.from(namespaceAndName);
targets.add(
TargetFunctionType.newBuilder()
.setNamespace(namespaceNamePair.namespace())
.setType(namespaceNamePair.name())
.build());
}
return targets;
}
}