// blob: fa5395707270c2d5c6a4a15097c99994c3b8d7d2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.statefun.extensions;
import java.util.Objects;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.statefun.sdk.TypeName;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
/**
* A {@link ComponentJsonObject} consists of one or more application entities (i.e. function
* providers, ingresses, routers, or egresses) that should be bound to a remote
* {@link StatefulFunctionModule}.
*
* <p>Each component is represented in remote module YAML specification files as a single YAML
* document of the following format:
*
* <pre>
* kind: com.foo.bar.v5/some.component (typename)
* spec:
* ... (specification document of the component)
* </pre>
*
* <p>The {@code kind} is a {@link TypeName} that should be universally unique within the
* application. It identifies which {@link ComponentBinder} recognizes this component and knows how
* to parse it to resolve application entities to be bound to the module.
*
* @see ComponentBinder
*/
@PublicEvolving
public final class ComponentJsonObject {

  /** Name of the top-level field whose text value is parsed into the binder {@link TypeName}. */
  public static final String BINDER_KIND_FIELD = "kind";

  /** Name of the top-level field that holds the component's specification node. */
  public static final String SPEC_FIELD = "spec";

  private final ObjectNode rawObjectNode;
  private final TypeName binderTypename;
  private final JsonNode specJsonNode;

  /**
   * Wraps the given JSON node as a component, validating its top-level structure.
   *
   * <p>Validation happens in this order: the node must be a JSON object, it must contain the
   * {@value #BINDER_KIND_FIELD} field whose text value parses as a {@link TypeName}, and it must
   * contain the {@value #SPEC_FIELD} field.
   *
   * @param jsonNode the component JSON node; must not be {@code null}.
   * @throws NullPointerException if {@code jsonNode} is {@code null}.
   * @throws ComponentJsonFormatException if the node is not a JSON object, if either required
   *     field is absent, or if the binder kind cannot be parsed.
   */
  public ComponentJsonObject(JsonNode jsonNode) {
    Objects.requireNonNull(jsonNode, "jsonNode");

    this.rawObjectNode = requireObjectNode(jsonNode);
    this.binderTypename = parseBinderTypename(rawObjectNode);
    this.specJsonNode = extractSpecJsonNode(rawObjectNode);
  }

  /**
   * Returns the complete component JSON object.
   *
   * <p>The returned node is the same instance that was passed to the constructor, not a copy.
   *
   * @return the complete component JSON object.
   */
  public ObjectNode get() {
    return rawObjectNode;
  }

  /**
   * Returns the {@link TypeName} of the binder for this component.
   *
   * @return the {@link TypeName} parsed from the {@value #BINDER_KIND_FIELD} field at construction
   *     time.
   */
  public TypeName binderTypename() {
    return binderTypename;
  }

  /**
   * Returns the specification JSON node for this component.
   *
   * @return the node stored under the {@value #SPEC_FIELD} field at construction time.
   */
  public JsonNode specJsonNode() {
    return specJsonNode;
  }

  /** Returns the string form of the complete component JSON object. */
  @Override
  public String toString() {
    return rawObjectNode.toString();
  }

  /** Verifies that the node is a JSON object and narrows it to {@link ObjectNode}. */
  private static ObjectNode requireObjectNode(JsonNode jsonNode) {
    if (!jsonNode.isObject()) {
      throw invalidFormat();
    }
    return (ObjectNode) jsonNode;
  }

  /** Reads the binder kind field and parses its text value into a {@link TypeName}. */
  private static TypeName parseBinderTypename(ObjectNode componentObject) {
    final JsonNode binderKindObject = componentObject.get(BINDER_KIND_FIELD);
    if (binderKindObject == null) {
      // Explicit throw so the dereference below is provably unreachable with a null value.
      throw invalidFormat();
    }

    try {
      return TypeName.parseFrom(binderKindObject.asText());
    } catch (Exception e) {
      // Any parsing failure is reported as a component format problem, keeping the cause.
      throw new ComponentJsonFormatException("Invalid binder kind format.", e);
    }
  }

  /** Reads the spec field, failing if the field is absent from the component object. */
  private static JsonNode extractSpecJsonNode(ObjectNode componentObject) {
    final JsonNode specJsonNode = componentObject.get(SPEC_FIELD);
    if (specJsonNode == null) {
      throw invalidFormat();
    }
    return specJsonNode;
  }

  /**
   * Creates (but does not throw) the exception describing the expected component layout. Callers
   * {@code throw} the result themselves so that control flow stays visible to the compiler.
   */
  private static ComponentJsonFormatException invalidFormat() {
    return new ComponentJsonFormatException(
        "Invalid ComponentJsonObject; components should be a JSON object with the required fields [kind] and [spec].");
  }
}