blob: 288986719b62d1f067d985e3f9d8c5ee82517ec0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.integ.testsuite.dag;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator.Feature;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
/**
* Utility class to SerDe workflow dag.
*/
public class DagUtils {
static final ObjectMapper MAPPER = new ObjectMapper();
/**
* Converts a YAML path to {@link WorkflowDag}.
*/
public static WorkflowDag convertYamlPathToDag(FileSystem fs, String path) throws IOException {
InputStream is = fs.open(new Path(path));
return convertYamlToDag(toString(is));
}
/**
* Converts a YAML representation to {@link WorkflowDag}.
*/
public static WorkflowDag convertYamlToDag(String yaml) throws IOException {
Map<String, DagNode> allNodes = new HashMap<>();
final ObjectMapper yamlReader = new ObjectMapper(new YAMLFactory());
final JsonNode jsonNode = yamlReader.readTree(yaml);
Iterator<Entry<String, JsonNode>> itr = jsonNode.fields();
while (itr.hasNext()) {
Entry<String, JsonNode> dagNode = itr.next();
allNodes.put(dagNode.getKey(), convertJsonToDagNode(allNodes, dagNode.getValue()));
}
return new WorkflowDag(findRootNodes(allNodes));
}
/**
* Converts {@link WorkflowDag} to a YAML representation.
*/
public static String convertDagToYaml(WorkflowDag dag) throws IOException {
final ObjectMapper yamlWriter = new ObjectMapper(new YAMLFactory().disable(Feature.WRITE_DOC_START_MARKER)
.enable(Feature.MINIMIZE_QUOTES).enable(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES));
JsonNode yamlNode = MAPPER.createObjectNode();
convertDagToYaml(yamlNode, dag.getNodeList());
return yamlWriter.writerWithDefaultPrettyPrinter().writeValueAsString(yamlNode);
}
private static void convertDagToYaml(JsonNode yamlNode, List<DagNode> dagNodes) throws IOException {
for (DagNode dagNode : dagNodes) {
String name = dagNode.getConfig().getOtherConfigs().getOrDefault(DeltaConfig.Config.NODE_NAME, dagNode.getName()).toString();
((ObjectNode) yamlNode).put(name, convertDagNodeToJsonNode(dagNode));
if (dagNode.getChildNodes().size() > 0) {
convertDagToYaml(yamlNode, dagNode.getChildNodes());
}
}
}
private static DagNode convertJsonToDagNode(Map<String, DagNode> allNodes, JsonNode node) throws IOException {
String type = node.get(DeltaConfig.Config.TYPE).asText();
final DagNode retNode = convertJsonToDagNode(node, type);
Arrays.asList(node.get(DeltaConfig.Config.DEPENDENCIES).textValue().split(",")).stream().forEach(dep -> {
DagNode parentNode = allNodes.get(dep);
if (parentNode != null) {
parentNode.addChildNode(retNode);
}
});
return retNode;
}
private static List<DagNode> findRootNodes(Map<String, DagNode> allNodes) {
final List<DagNode> rootNodes = new ArrayList<>();
allNodes.entrySet().stream().forEach(entry -> {
if (entry.getValue().getParentNodes().size() < 1) {
rootNodes.add(entry.getValue());
}
});
return rootNodes;
}
private static DagNode convertJsonToDagNode(JsonNode node, String type) {
try {
DeltaConfig.Config config = DeltaConfig.Config.newBuilder().withConfigsMap(convertJsonNodeToMap(node)).build();
return (DagNode) ReflectionUtils.loadClass(generateFQN(type), config);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
private static String generateFQN(String name) throws ClassNotFoundException {
return Class.forName(StringUtils.joinUsingDelim(".",
DagNode.class.getName().substring(0, DagNode.class.getName().lastIndexOf(".")), name)).getName();
}
private static JsonNode convertDagNodeToJsonNode(DagNode node) throws IOException {
return createJsonNode(node, node.getClass().getSimpleName());
}
private static Map<String, Object> convertJsonNodeToMap(JsonNode node) {
Map<String, Object> configsMap = new HashMap<>();
Iterator<Entry<String, JsonNode>> itr = node.get(DeltaConfig.Config.CONFIG_NAME).fields();
while (itr.hasNext()) {
Entry<String, JsonNode> entry = itr.next();
switch (entry.getKey()) {
case DeltaConfig.Config.HIVE_QUERIES:
configsMap.put(DeltaConfig.Config.HIVE_QUERIES, getHiveQueries(entry));
break;
case DeltaConfig.Config.HIVE_PROPERTIES:
configsMap.put(DeltaConfig.Config.HIVE_PROPERTIES, getProperties(entry));
break;
default:
configsMap.put(entry.getKey(), getValue(entry.getValue()));
break;
}
}
return configsMap;
}
private static List<Pair<String, Integer>> getHiveQueries(Entry<String, JsonNode> entry) {
List<Pair<String, Integer>> queries = new ArrayList<>();
Iterator<Entry<String, JsonNode>> queriesItr = entry.getValue().fields();
while (queriesItr.hasNext()) {
queries.add(Pair.of(queriesItr.next().getValue().textValue(), queriesItr.next().getValue().asInt()));
}
return queries;
}
private static List<String> getProperties(Entry<String, JsonNode> entry) {
List<String> properties = new ArrayList<>();
Iterator<Entry<String, JsonNode>> queriesItr = entry.getValue().fields();
while (queriesItr.hasNext()) {
properties.add(queriesItr.next().getValue().textValue());
}
return properties;
}
private static Object getValue(JsonNode node) {
if (node.isInt()) {
return node.asInt();
} else if (node.isLong()) {
return node.asLong();
} else if (node.isShort()) {
return node.asInt();
} else if (node.isBoolean()) {
return node.asBoolean();
} else if (node.isDouble()) {
return node.asDouble();
} else if (node.isFloat()) {
return node.asDouble();
}
return node.textValue();
}
private static JsonNode createJsonNode(DagNode node, String type) throws IOException {
JsonNode configNode = MAPPER.readTree(node.getConfig().toString());
JsonNode jsonNode = MAPPER.createObjectNode();
((ObjectNode) jsonNode).put(DeltaConfig.Config.CONFIG_NAME, configNode);
((ObjectNode) jsonNode).put(DeltaConfig.Config.TYPE, type);
((ObjectNode) jsonNode).put(DeltaConfig.Config.DEPENDENCIES, getDependencyNames(node));
return jsonNode;
}
private static String getDependencyNames(DagNode node) {
return node.getParentNodes().stream()
.map(e -> ((DagNode) e).getConfig().getOtherConfigs().getOrDefault(DeltaConfig.Config.NODE_NAME, node.getName()).toString())
.collect(Collectors.joining(",")).toString();
}
public static String toString(InputStream inputStream) throws IOException {
ByteArrayOutputStream result = new ByteArrayOutputStream();
byte[] buffer = new byte[1024];
int length;
while ((length = inputStream.read(buffer)) != -1) {
result.write(buffer, 0, length);
}
return result.toString("utf-8");
}
}