// blob: b118ddbcace4b476091b54f4e4daa604d7324289 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.streams.plugins.pig;
import org.apache.streams.util.schema.FieldType;
import org.apache.streams.util.schema.FieldUtil;
import org.apache.streams.util.schema.FileUtil;
import org.apache.streams.util.schema.Schema;
import org.apache.streams.util.schema.SchemaStore;
import org.apache.streams.util.schema.SchemaStoreImpl;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.commons.lang3.StringUtils;
import org.jsonschema2pojo.util.URLUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.net.URL;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import static org.apache.streams.util.schema.FileUtil.dropExtension;
import static org.apache.streams.util.schema.FileUtil.dropSourcePathPrefix;
import static org.apache.streams.util.schema.FileUtil.swapExtension;
import static org.apache.streams.util.schema.FileUtil.writeFile;
/**
 * Generates Apache Pig {@code LOAD ... USING JsonLoader('...')} statements from JSON schema files.
 *
 * <p>Embed within your own java code:
 *
 * <pre>{@code
 * StreamsPigGenerationConfig config = new StreamsPigGenerationConfig();
 * config.setSourceDirectory("src/main/jsonschema");
 * config.setTargetDirectory("target/generated-resources");
 * StreamsPigResourceGenerator generator = new StreamsPigResourceGenerator(config);
 * generator.run();
 * }</pre>
 *
 * <p>An instance keeps mutable state (the schema store and a recursion depth counter) that is
 * updated without synchronization, so a single instance should not be shared across threads.
 */
public class StreamsPigResourceGenerator implements Runnable {

  private static final Logger LOGGER = LoggerFactory.getLogger(StreamsPigResourceGenerator.class);

  private StreamsPigGenerationConfig config;

  private SchemaStore schemaStore = new SchemaStoreImpl();

  // Nesting level of the object/array currently being rendered; compared against config.getMaxDepth().
  private int currentDepth = 0;

  /**
   * Run from CLI without Maven.
   *
   * <pre>
   * java -jar streams-plugin-pig-jar-with-dependencies.jar StreamsPigResourceGenerator src/main/jsonschema target/generated-resources
   * </pre>
   *
   * @param args [sourceDirectory, targetDirectory]; both optional, defaulting to
   *     {@code src/main/jsonschema} and {@code target/generated-resources/pig-cli}
   */
  public static void main(String[] args) {
    StreamsPigGenerationConfig config = new StreamsPigGenerationConfig();
    String sourceDirectory = "src/main/jsonschema";
    String targetDirectory = "target/generated-resources/pig-cli";
    if (args.length > 0) {
      sourceDirectory = args[0];
    }
    if (args.length > 1) {
      targetDirectory = args[1];
    }
    config.setSourceDirectory(sourceDirectory);
    config.setTargetDirectory(targetDirectory);
    StreamsPigResourceGenerator streamsPigResourceGenerator = new StreamsPigResourceGenerator(config);
    streamsPigResourceGenerator.run();
  }

  public StreamsPigResourceGenerator(StreamsPigGenerationConfig config) {
    this.config = config;
  }

  /**
   * Generates resources using the config passed to the constructor.
   *
   * @throws NullPointerException if the generator was constructed with a null config
   */
  @Override
  public void run() {
    Objects.requireNonNull(config);
    generate(config);
  }

  /**
   * Runs generation using the supplied StreamsPigGenerationConfig.
   *
   * <p>The supplied config becomes this generator's active config. Field exclusions, maximum depth
   * and source paths are therefore taken from it, not from the constructor's config.
   *
   * @param config StreamsPigGenerationConfig
   */
  public void generate(StreamsPigGenerationConfig config) {
    // The helper methods read the field, so install the supplied config before doing any work.
    this.config = Objects.requireNonNull(config);

    List<File> sourceFiles = new LinkedList<>();
    for (Iterator<URL> sources = config.getSource(); sources.hasNext(); ) {
      sourceFiles.add(URLUtil.getFileFromURL(sources.next()));
    }
    LOGGER.info("Seeded with {} source paths:", sourceFiles.size());
    FileUtil.resolveRecursive(config, sourceFiles);
    LOGGER.info("Resolved {} schema files:", sourceFiles.size());
    for (File item : sourceFiles) {
      schemaStore.create(item.toURI());
    }
    LOGGER.info("Identified {} objects:", schemaStore.getSize());

    for (Iterator<Schema> schemaIterator = schemaStore.getSchemaIterator(); schemaIterator.hasNext(); ) {
      Schema schema = schemaIterator.next();
      currentDepth = 0;
      // Only schemas loaded from local files are written out.
      if (isFileSchema(schema)) {
        String resourcePath = resolveResourcePath(schema);
        String outputFile = config.getTargetDirectory() + "/" + swapExtension(resourcePath, "json", "pig");
        LOGGER.info("Processing {}:", resourcePath);
        String resourceId = schemaSymbol(schema);
        String resourceContent = generateResource(schema, resourceId);
        writeFile(outputFile, resourceContent);
        LOGGER.info("Wrote {}:", outputFile);
      }
    }
  }

  /**
   * Renders a Pig LOAD statement for a schema.
   *
   * <p>The output has the form {@code <resourceId> = LOAD '' USING JsonLoader('<fields>');}, where
   * {@code <fields>} is the comma-separated Pig schema for the schema's properties.
   *
   * @param schema Schema
   * @param resourceId String alias assigned to the loaded relation
   * @return mapping
   */
  public String generateResource(Schema schema, String resourceId) {
    StringBuilder resourceBuilder = new StringBuilder();
    resourceBuilder.append(pigEscape(resourceId));
    resourceBuilder.append(" = ");
    resourceBuilder.append("LOAD '' USING JsonLoader('");
    resourceBuilder = appendRootObject(resourceBuilder, schema, resourceId, ':');
    resourceBuilder.append("');");
    return resourceBuilder.toString();
  }

  /**
   * Appends the Pig field list for the schema's top-level properties.
   *
   * <p>Appends nothing when the schema resolves to no properties.
   */
  protected StringBuilder appendRootObject(StringBuilder builder, Schema schema, String resourceId, Character separator) {
    ObjectNode propertiesNode = schemaStore.resolveProperties(schema, null, resourceId);
    if (propertiesNode != null && propertiesNode.isObject() && propertiesNode.size() > 0) {
      builder = appendPropertiesNode(builder, schema, propertiesNode, separator);
    }
    return builder;
  }

  /**
   * Appends a comma-separated list of Pig field definitions, one per non-excluded property.
   *
   * <p>Array and object properties are rendered only while the current depth is within
   * {@code config.getMaxDepth()}. Properties whose type cannot be determined are skipped.
   */
  private StringBuilder appendPropertiesNode(StringBuilder builder, Schema schema, ObjectNode propertiesNode, Character seperator) {
    Objects.requireNonNull(builder);
    Objects.requireNonNull(propertiesNode);
    List<String> fieldStrings = new ArrayList<>();
    Iterator<Map.Entry<String, JsonNode>> fields = propertiesNode.fields();
    while (fields.hasNext()) {
      Map.Entry<String, JsonNode> field = fields.next();
      String fieldId = field.getKey();
      if (config.getExclusions().contains(fieldId) || !field.getValue().isObject()) {
        continue;
      }
      ObjectNode fieldNode = (ObjectNode) field.getValue();
      FieldType fieldType = FieldUtil.determineFieldType(fieldNode);
      if (fieldType == null) {
        continue;
      }
      StringBuilder fieldBuilder = new StringBuilder();
      switch (fieldType) {
        case ARRAY:
          ObjectNode resolvedItems = schemaStore.resolveItems(schema, fieldNode, fieldId);
          // NOTE(review): arrays recurse while currentDepth <= maxDepth but objects only while
          // currentDepth < maxDepth; kept as-is, confirm whether the asymmetry is intended.
          if (resolvedItems != null && currentDepth <= config.getMaxDepth()) {
            fieldBuilder = appendArrayItems(fieldBuilder, schema, fieldId, resolvedItems, seperator);
          }
          break;
        case OBJECT:
          ObjectNode childProperties = schemaStore.resolveProperties(schema, fieldNode, fieldId);
          // resolveProperties may return null (appendRootObject guards for it); skip such fields.
          if (childProperties != null && currentDepth < config.getMaxDepth()) {
            fieldBuilder = appendStructField(fieldBuilder, schema, fieldId, childProperties, seperator);
          }
          break;
        default:
          fieldBuilder = appendValueField(fieldBuilder, schema, fieldId, fieldType, seperator);
          break;
      }
      if (StringUtils.isNotBlank(fieldBuilder.toString())) {
        fieldStrings.add(fieldBuilder.toString());
      }
    }
    builder.append(String.join(", ", fieldStrings));
    return builder;
  }

  /** Appends a scalar field definition: {@code <fieldId><seperator><pigType>}. */
  private StringBuilder appendValueField(StringBuilder builder, Schema schema, String fieldId, FieldType fieldType, Character seperator) {
    Objects.requireNonNull(builder);
    builder.append(pigEscape(fieldId));
    builder.append(seperator);
    builder.append(pigType(fieldType));
    return builder;
  }

  /**
   * Appends a Pig bag definition for an array field's item schema.
   *
   * <p>Object items become a bag of tuples with the item's properties, and primitive items a
   * single-field bag. Nested arrays are unwrapped recursively into their innermost item type.
   * Nothing is appended if the item type cannot be resolved; a warning is logged instead.
   */
  protected StringBuilder appendArrayItems(StringBuilder builder, Schema schema, String fieldId, ObjectNode itemsNode, Character seperator) {
    Objects.requireNonNull(builder);
    if (itemsNode == null) {
      return builder;
    }
    FieldType itemType = FieldUtil.determineFieldType(itemsNode);
    if (itemType == null) {
      LOGGER.warn("No item type resolvable for {}", fieldId);
      return builder;
    }
    try {
      switch (itemType) {
        case OBJECT:
          builder = appendArrayObject(builder, schema, fieldId, itemsNode, seperator);
          break;
        case ARRAY:
          JsonNode subArrayItems = itemsNode.get("items");
          if (subArrayItems == null || subArrayItems instanceof ObjectNode) {
            builder = appendArrayItems(builder, schema, fieldId, (ObjectNode) subArrayItems, seperator);
          } else {
            // e.g. tuple-style "items": [...] has no single item schema to render
            LOGGER.warn("No item type resolvable for {}", fieldId);
          }
          break;
        default:
          builder = appendArrayField(builder, schema, fieldId, itemType, seperator);
          break;
      }
    } catch (RuntimeException ex) {
      // Best effort: an unresolvable item schema drops this field instead of aborting generation.
      LOGGER.warn("No item type resolvable for {}", fieldId, ex);
    }
    return builder;
  }

  /** Appends a bag of single-field tuples: {@code {t: (<fieldId><seperator><pigType>)}}. */
  private StringBuilder appendArrayField(StringBuilder builder, Schema schema, String fieldId, FieldType fieldType, Character seperator) {
    Objects.requireNonNull(builder);
    Objects.requireNonNull(fieldId);
    builder.append("{t: (");
    builder.append(pigEscape(fieldId));
    builder.append(seperator);
    builder.append(pigType(fieldType));
    builder.append(")}");
    return builder;
  }

  /**
   * Appends a bag of tuples whose fields are the object item's properties: {@code {t: (a:..., b:...)}}.
   *
   * <p>Appends nothing when the item schema has no renderable properties.
   */
  private StringBuilder appendArrayObject(StringBuilder builder, Schema schema, String fieldId, ObjectNode fieldNode, Character seperator) {
    Objects.requireNonNull(builder);
    Objects.requireNonNull(fieldId);
    Objects.requireNonNull(fieldNode);
    ObjectNode propertiesNode = schemaStore.resolveProperties(schema, fieldNode, fieldId);
    if (propertiesNode == null || propertiesNode.size() == 0) {
      return builder;
    }
    // Render the item properties directly: appendStructField emits nothing for a blank fieldId,
    // which previously produced an empty "{t: ()}" bag for every array of objects.
    StringBuilder itemFields = new StringBuilder();
    currentDepth += 1;
    try {
      itemFields = appendPropertiesNode(itemFields, schema, propertiesNode, ':');
    } finally {
      currentDepth -= 1;
    }
    if (itemFields.length() > 0) {
      builder.append("{t: (");
      builder.append(itemFields);
      builder.append(")}");
    }
    return builder;
  }

  /**
   * Appends a named tuple field: {@code <fieldId><seperator>(<nested fields>)}.
   *
   * <p>Appends nothing when {@code fieldId} is blank or {@code propertiesNode} is null or empty.
   */
  private StringBuilder appendStructField(StringBuilder builder, Schema schema, String fieldId, ObjectNode propertiesNode, Character seperator) {
    Objects.requireNonNull(builder);
    if (propertiesNode == null || propertiesNode.size() == 0 || StringUtils.isBlank(fieldId)) {
      return builder;
    }
    currentDepth += 1;
    try {
      builder.append(pigEscape(fieldId));
      builder.append(seperator);
      builder.append("(");
      builder = appendPropertiesNode(builder, schema, propertiesNode, ':');
      builder.append(")");
    } finally {
      // Restore depth even if rendering fails, since appendArrayItems swallows exceptions and continues.
      currentDepth -= 1;
    }
    return builder;
  }

  /**
   * Returns the identifier to emit for a field or alias.
   *
   * <p>This is currently a pass-through. NOTE(review): names that are not valid Pig identifiers
   * (e.g. containing '.', '@' or '-') are emitted unchanged.
   */
  private static String pigEscape(String fieldId) {
    return fieldId;
  }

  /** Maps a schema field type to a Pig type name; unmapped types use the lower-cased enum name. */
  private static String pigType(FieldType fieldType) {
    switch (fieldType) {
      case STRING:
        return "chararray";
      case INTEGER:
        return "int";
      case NUMBER:
        return "double";
      case OBJECT:
        return "tuple";
      default:
        // Locale.ROOT keeps the output independent of the JVM default locale (e.g. Turkish 'I').
        return fieldType.name().toLowerCase(Locale.ROOT);
    }
  }

  /**
   * Derives the Pig relation alias for a schema.
   *
   * <p>For file schemas this is the relative path without extension, with '/' replaced by '_' and
   * '-' removed. Other schemas get "IDK", and a null schema gets null.
   */
  private String schemaSymbol(Schema schema) {
    if (schema == null) {
      return null;
    }
    if (isFileSchema(schema)) {
      return dropExtension(resolveResourcePath(schema)).replace("/", "_").replace("-", "");
    }
    return "IDK";
  }

  /** Returns true when the schema URI uses the "file" scheme; null-safe for scheme-less URIs. */
  private static boolean isFileSchema(Schema schema) {
    return "file".equals(schema.getUri().getScheme());
  }

  /** Returns the schema's path with the configured source directory and source path prefixes removed. */
  private String resolveResourcePath(Schema schema) {
    String resourcePath = dropSourcePathPrefix(schema.getUri().getPath(), config.getSourceDirectory());
    for (String sourcePath : config.getSourcePaths()) {
      resourcePath = dropSourcePathPrefix(resourcePath, sourcePath);
    }
    return resourcePath;
  }
}