// blob: f746a7f8b04e5927a0ab32ea740d13520c611c0b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.statefun.flink.common.json;
import java.time.Duration;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.util.TimeUtils;
/**
 * Static helpers that read typed values out of a {@link JsonNode} tree at the location addressed
 * by a {@link JsonPointer}.
 *
 * <p>The accessors come in two flavours:
 *
 * <ul>
 *   <li><b>required</b> accessors ({@code textAt}, {@code durationAt}, {@code integerAt}, {@code
 *       longAt}) throw a {@link MissingKeyException} when nothing is present at the pointer;
 *   <li><b>optional</b> accessors ({@code optional*At}, {@code listAt}, {@code textListAt},
 *       {@code propertiesAt}, {@code longPropertiesAt}) return an empty optional / collection
 *       instead.
 * </ul>
 *
 * <p>Both flavours throw a {@link WrongTypeException} when a node is present at the pointer but
 * does not have the expected JSON type.
 */
public final class Selectors {

  /** Static utility class; not meant to be instantiated. */
  private Selectors() {}

  // -------------------------------------------------------------------------------------------
  // objects
  // -------------------------------------------------------------------------------------------

  /**
   * Reads the JSON object located at {@code pointer}.
   *
   * @param node the node the pointer is resolved against.
   * @param pointer the location of the object.
   * @return the object at {@code pointer}, or an empty optional if nothing is present there.
   * @throws WrongTypeException if a node is present at {@code pointer} but it is not an object.
   */
  public static Optional<ObjectNode> optionalObjectAt(JsonNode node, JsonPointer pointer) {
    JsonNode target = node.at(pointer);
    if (target.isMissingNode()) {
      return Optional.empty();
    }
    if (!target.isObject()) {
      throw new WrongTypeException(pointer, "not an object");
    }
    return Optional.of((ObjectNode) target);
  }

  // -------------------------------------------------------------------------------------------
  // text
  // -------------------------------------------------------------------------------------------

  /**
   * Reads the string located at {@code pointer}.
   *
   * @param node the node the pointer is resolved against.
   * @param pointer the location of the string.
   * @return the text value at {@code pointer}.
   * @throws MissingKeyException if nothing is present at {@code pointer}.
   * @throws WrongTypeException if the node at {@code pointer} is not textual.
   */
  public static String textAt(JsonNode node, JsonPointer pointer) {
    return requireText(dereference(node, pointer), pointer);
  }

  /**
   * Reads the string located at {@code pointer}, if any.
   *
   * @param node the node the pointer is resolved against.
   * @param pointer the location of the string.
   * @return the text value at {@code pointer}, or an empty optional if nothing is present there.
   * @throws WrongTypeException if a node is present at {@code pointer} but it is not textual.
   */
  public static Optional<String> optionalTextAt(JsonNode node, JsonPointer pointer) {
    JsonNode target = node.at(pointer);
    if (target.isMissingNode()) {
      return Optional.empty();
    }
    return Optional.of(requireText(target, pointer));
  }

  // -------------------------------------------------------------------------------------------
  // durations
  // -------------------------------------------------------------------------------------------

  /**
   * Reads the duration located at {@code pointer}. The value must be a JSON string that {@link
   * TimeUtils#parseDuration(String)} accepts.
   *
   * @param node the node the pointer is resolved against.
   * @param pointer the location of the duration.
   * @return the parsed duration.
   * @throws MissingKeyException if nothing is present at {@code pointer}.
   * @throws WrongTypeException if the node at {@code pointer} is not textual, or its text can not
   *     be parsed as a duration.
   */
  public static Duration durationAt(JsonNode node, JsonPointer pointer) {
    return requireDuration(dereference(node, pointer), pointer);
  }

  /**
   * Reads the duration located at {@code pointer}, if any. The value must be a JSON string that
   * {@link TimeUtils#parseDuration(String)} accepts.
   *
   * @param node the node the pointer is resolved against.
   * @param pointer the location of the duration.
   * @return the parsed duration, or an empty optional if nothing is present at {@code pointer}.
   * @throws WrongTypeException if a node is present at {@code pointer} but it is not textual, or
   *     its text can not be parsed as a duration.
   */
  public static Optional<Duration> optionalDurationAt(JsonNode node, JsonPointer pointer) {
    JsonNode target = node.at(pointer);
    if (target.isMissingNode()) {
      return Optional.empty();
    }
    return Optional.of(requireDuration(target, pointer));
  }

  // -------------------------------------------------------------------------------------------
  // numbers
  // -------------------------------------------------------------------------------------------

  /**
   * Reads the integer located at {@code pointer}.
   *
   * @param node the node the pointer is resolved against.
   * @param pointer the location of the integer.
   * @return the {@code int} value at {@code pointer}.
   * @throws MissingKeyException if nothing is present at {@code pointer}.
   * @throws WrongTypeException if {@link JsonNode#isInt()} is false for the node at {@code
   *     pointer}.
   */
  public static int integerAt(JsonNode node, JsonPointer pointer) {
    return requireInt(dereference(node, pointer), pointer);
  }

  /**
   * Reads the long located at {@code pointer}. Nodes for which either {@link JsonNode#isLong()}
   * or {@link JsonNode#isInt()} holds are accepted.
   *
   * @param node the node the pointer is resolved against.
   * @param pointer the location of the long.
   * @return the {@code long} value at {@code pointer}.
   * @throws MissingKeyException if nothing is present at {@code pointer}.
   * @throws WrongTypeException if the node at {@code pointer} is neither a long nor an int.
   */
  public static long longAt(JsonNode node, JsonPointer pointer) {
    return requireLong(dereference(node, pointer), pointer);
  }

  /**
   * Reads the long located at {@code pointer}, if any. Nodes for which either {@link
   * JsonNode#isLong()} or {@link JsonNode#isInt()} holds are accepted.
   *
   * @param node the node the pointer is resolved against.
   * @param pointer the location of the long.
   * @return the {@code long} value at {@code pointer}, or an empty optional if nothing is present
   *     there.
   * @throws WrongTypeException if a node is present at {@code pointer} but it is neither a long
   *     nor an int.
   */
  public static OptionalLong optionalLongAt(JsonNode node, JsonPointer pointer) {
    JsonNode target = node.at(pointer);
    if (target.isMissingNode()) {
      return OptionalLong.empty();
    }
    return OptionalLong.of(requireLong(target, pointer));
  }

  /**
   * Reads the integer located at {@code pointer}, if any.
   *
   * @param node the node the pointer is resolved against.
   * @param pointer the location of the integer.
   * @return the {@code int} value at {@code pointer}, or an empty optional if nothing is present
   *     there.
   * @throws WrongTypeException if a node is present at {@code pointer} but {@link
   *     JsonNode#isInt()} is false for it.
   */
  public static OptionalInt optionalIntegerAt(JsonNode node, JsonPointer pointer) {
    JsonNode target = node.at(pointer);
    if (target.isMissingNode()) {
      return OptionalInt.empty();
    }
    return OptionalInt.of(requireInt(target, pointer));
  }

  // -------------------------------------------------------------------------------------------
  // lists
  // -------------------------------------------------------------------------------------------

  /**
   * Returns the elements of the JSON array located at {@code pointer}.
   *
   * @param node the node the pointer is resolved against.
   * @param pointer the location of the array.
   * @return the array node itself (iterable over its elements), or an empty list if nothing is
   *     present at {@code pointer}.
   * @throws WrongTypeException if a node is present at {@code pointer} but it is not an array.
   */
  public static Iterable<? extends JsonNode> listAt(JsonNode node, JsonPointer pointer) {
    JsonNode target = node.at(pointer);
    if (target.isMissingNode()) {
      return Collections.emptyList();
    }
    requireArray(target, pointer, "not a list");
    return target;
  }

  /**
   * Returns the textual elements of the JSON array located at {@code pointer}.
   *
   * <p>Elements that are not textual are skipped rather than reported as an error.
   *
   * @param node the node the pointer is resolved against.
   * @param pointer the location of the array.
   * @return the text of every textual element, in array order; an empty list if nothing is
   *     present at {@code pointer}.
   * @throws WrongTypeException if a node is present at {@code pointer} but it is not an array.
   */
  public static List<String> textListAt(JsonNode node, JsonPointer pointer) {
    JsonNode target = node.at(pointer);
    if (target.isMissingNode()) {
      return Collections.emptyList();
    }
    requireArray(target, pointer, "not a list");
    return StreamSupport.stream(target.spliterator(), false)
        .filter(JsonNode::isTextual)
        .map(JsonNode::asText)
        .collect(Collectors.toList());
  }

  // -------------------------------------------------------------------------------------------
  // key-value lists
  // -------------------------------------------------------------------------------------------

  /**
   * Reads a key-value list located at {@code pointer}: a JSON array whose elements each
   * contribute one property.
   *
   * <p>Only the first field of each element is used, and its value is converted with {@link
   * JsonNode#asText()}. Properties are returned in array order; if a key appears more than once
   * the last value wins.
   *
   * @param node the node the pointer is resolved against.
   * @param pointer the location of the key-value list.
   * @return the properties, or an empty map if nothing is present at {@code pointer}.
   * @throws WrongTypeException if a node is present at {@code pointer} but it is not an array, or
   *     one of its elements has no fields.
   */
  public static Map<String, String> propertiesAt(JsonNode node, JsonPointer pointer) {
    JsonNode target = node.at(pointer);
    if (target.isMissingNode()) {
      return Collections.emptyMap();
    }
    requireArray(target, pointer, "not a key-value list");

    Map<String, String> properties = new LinkedHashMap<>();
    for (JsonNode listElement : target) {
      Map.Entry<String, JsonNode> field = firstField(listElement, pointer);
      properties.put(field.getKey(), field.getValue().asText());
    }
    return properties;
  }

  /**
   * Reads a key-value list with long values located at {@code pointer}: a JSON array whose
   * elements each contribute one property.
   *
   * <p>Only the first field of each element is used. Properties are returned in array order; if a
   * key appears more than once the last value wins.
   *
   * @param node the node the pointer is resolved against.
   * @param pointer the location of the key-value list.
   * @return the properties, or an empty map if nothing is present at {@code pointer}.
   * @throws WrongTypeException if a node is present at {@code pointer} but it is not an array, if
   *     one of its elements has no fields, or if a value is neither a long nor an int.
   */
  public static Map<String, Long> longPropertiesAt(JsonNode node, JsonPointer pointer) {
    JsonNode target = node.at(pointer);
    if (target.isMissingNode()) {
      return Collections.emptyMap();
    }
    requireArray(target, pointer, "not a key-value list");

    Map<String, Long> longProperties = new LinkedHashMap<>();
    for (JsonNode listElement : target) {
      Map.Entry<String, JsonNode> field = firstField(listElement, pointer);
      String key = field.getKey();
      JsonNode value = field.getValue();
      if (!isLongOrInt(value)) {
        throw new WrongTypeException(
            pointer, "value for key-value pair at " + key + " is not a long: " + value);
      }
      longProperties.put(key, value.asLong());
    }
    return longProperties;
  }

  // -------------------------------------------------------------------------------------------
  // internal helpers
  // -------------------------------------------------------------------------------------------

  /** Resolves {@code pointer} against {@code node}, failing if nothing is present there. */
  private static JsonNode dereference(JsonNode node, JsonPointer pointer) {
    JsonNode target = node.at(pointer);
    if (target.isMissingNode()) {
      throw new MissingKeyException(pointer);
    }
    return target;
  }

  /** Returns the text of {@code value}, failing if it is not a textual node. */
  private static String requireText(JsonNode value, JsonPointer pointer) {
    if (!value.isTextual()) {
      throw new WrongTypeException(pointer, "not a string");
    }
    return value.asText();
  }

  /** Parses {@code value} as a duration, failing if it is not textual or not parsable. */
  private static Duration requireDuration(JsonNode value, JsonPointer pointer) {
    if (!value.isTextual()) {
      throw new WrongTypeException(pointer, "not a duration");
    }
    try {
      return TimeUtils.parseDuration(value.asText());
    } catch (IllegalArgumentException ignored) {
      // A parse failure is reported with the same error as a non-textual value, so that the
      // caller always sees the offending pointer.
      throw new WrongTypeException(pointer, "not a duration");
    }
  }

  /** Returns {@code value} as an int, failing if it is not an int node. */
  private static int requireInt(JsonNode value, JsonPointer pointer) {
    if (!value.isInt()) {
      throw new WrongTypeException(pointer, "not an integer");
    }
    return value.asInt();
  }

  /** Returns {@code value} as a long, failing if it is neither a long nor an int node. */
  private static long requireLong(JsonNode value, JsonPointer pointer) {
    if (!isLongOrInt(value)) {
      throw new WrongTypeException(pointer, "not a long");
    }
    return value.asLong();
  }

  /** Tells whether {@code value} is a long node or an int node. */
  private static boolean isLongOrInt(JsonNode value) {
    return value.isLong() || value.isInt();
  }

  /** Fails with {@code errorDescription} if {@code value} is not an array node. */
  private static void requireArray(JsonNode value, JsonPointer pointer, String errorDescription) {
    if (!value.isArray()) {
      throw new WrongTypeException(pointer, errorDescription);
    }
  }

  /** Returns the first field of a key-value list element, failing if it has no fields. */
  private static Map.Entry<String, JsonNode> firstField(
      JsonNode listElement, JsonPointer pointer) {
    Iterator<Map.Entry<String, JsonNode>> fields = listElement.fields();
    if (!fields.hasNext()) {
      throw new WrongTypeException(pointer, "not a key-value list");
    }
    return fields.next();
  }
}