blob: 8d6fe9395e34c60048bbaf1686b8dc7f1eaa8153 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.json;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.record.NullSuppression;
import org.apache.nifi.schema.access.SchemaAccessWriter;
import org.apache.nifi.serialization.AbstractRecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.RawRecordWriter;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SerializedForm;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.serialization.record.type.MapDataType;
import org.apache.nifi.serialization.record.type.RecordDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.util.MinimalPrettyPrinter;
import java.io.IOException;
import java.io.OutputStream;
import java.math.BigInteger;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSetWriter, RawRecordWriter {
private final ComponentLog logger;
private final SchemaAccessWriter schemaAccess;
private final RecordSchema recordSchema;
private final JsonGenerator generator;
private final NullSuppression nullSuppression;
private final OutputGrouping outputGrouping;
private final Supplier<DateFormat> LAZY_DATE_FORMAT;
private final Supplier<DateFormat> LAZY_TIME_FORMAT;
private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
private String mimeType = "application/json";
private static final ObjectMapper objectMapper = new ObjectMapper();
public WriteJsonResult(final ComponentLog logger, final RecordSchema recordSchema, final SchemaAccessWriter schemaAccess, final OutputStream out, final boolean prettyPrint,
final NullSuppression nullSuppression, final OutputGrouping outputGrouping, final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException {
this(logger, recordSchema, schemaAccess, out, prettyPrint, nullSuppression, outputGrouping, dateFormat, timeFormat, timestampFormat, "application/json");
}
public WriteJsonResult(final ComponentLog logger, final RecordSchema recordSchema, final SchemaAccessWriter schemaAccess, final OutputStream out, final boolean prettyPrint,
final NullSuppression nullSuppression, final OutputGrouping outputGrouping, final String dateFormat, final String timeFormat, final String timestampFormat,
final String mimeType) throws IOException {
super(out);
this.logger = logger;
this.recordSchema = recordSchema;
this.schemaAccess = schemaAccess;
this.nullSuppression = nullSuppression;
this.outputGrouping = outputGrouping;
this.mimeType = mimeType;
// Use DateFormat with default TimeZone to avoid unexpected conversion of year-month-day
final DateFormat df = dateFormat == null ? null : new SimpleDateFormat(dateFormat);
final DateFormat tf = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat);
final DateFormat tsf = timestampFormat == null ? null : DataTypeUtils.getDateFormat(timestampFormat);
LAZY_DATE_FORMAT = () -> df;
LAZY_TIME_FORMAT = () -> tf;
LAZY_TIMESTAMP_FORMAT = () -> tsf;
final JsonFactory factory = new JsonFactory();
factory.setCodec(objectMapper);
this.generator = factory.createJsonGenerator(out);
if (prettyPrint) {
generator.useDefaultPrettyPrinter();
} else if (OutputGrouping.OUTPUT_ONELINE.equals(outputGrouping)) {
// Use a minimal pretty printer with a newline object separator, will output one JSON object per line
generator.setPrettyPrinter(new MinimalPrettyPrinter("\n"));
}
}
@Override
protected void onBeginRecordSet() throws IOException {
final OutputStream out = getOutputStream();
schemaAccess.writeHeader(recordSchema, out);
if (outputGrouping == OutputGrouping.OUTPUT_ARRAY) {
generator.writeStartArray();
}
}
@Override
protected Map<String, String> onFinishRecordSet() throws IOException {
if (outputGrouping == OutputGrouping.OUTPUT_ARRAY) {
generator.writeEndArray();
}
return schemaAccess.getAttributes(recordSchema);
}
@Override
public void close() throws IOException {
if (generator != null) {
generator.close();
}
super.close();
}
@Override
public void flush() throws IOException {
if (generator != null) {
generator.flush();
}
}
@Override
public Map<String, String> writeRecord(final Record record) throws IOException {
// If we are not writing an active record set, then we need to ensure that we write the
// schema information.
if (!isActiveRecordSet()) {
generator.flush();
schemaAccess.writeHeader(recordSchema, getOutputStream());
}
writeRecord(record, recordSchema, generator, JsonGenerator::writeStartObject, JsonGenerator::writeEndObject, true);
return schemaAccess.getAttributes(recordSchema);
}
@Override
public WriteResult writeRawRecord(final Record record) throws IOException {
// If we are not writing an active record set, then we need to ensure that we write the
// schema information.
if (!isActiveRecordSet()) {
generator.flush();
schemaAccess.writeHeader(recordSchema, getOutputStream());
}
writeRecord(record, recordSchema, generator, JsonGenerator::writeStartObject, JsonGenerator::writeEndObject, false);
final Map<String, String> attributes = schemaAccess.getAttributes(recordSchema);
return WriteResult.of(incrementRecordCount(), attributes);
}
private void writeRecord(final Record record, final RecordSchema writeSchema, final JsonGenerator generator,
final GeneratorTask startTask, final GeneratorTask endTask, final boolean schemaAware) throws IOException {
final Optional<SerializedForm> serializedForm = record.getSerializedForm();
if (serializedForm.isPresent()) {
final SerializedForm form = serializedForm.get();
if (form.getMimeType().equals(getMimeType()) && record.getSchema().equals(writeSchema)) {
final Object serialized = form.getSerialized();
if (serialized instanceof String) {
generator.writeRawValue((String) serialized);
return;
}
}
}
try {
startTask.apply(generator);
if (schemaAware) {
for (final RecordField field : writeSchema.getFields()) {
final String fieldName = field.getFieldName();
final Object value = record.getValue(field);
if (value == null) {
if (nullSuppression == NullSuppression.NEVER_SUPPRESS || (nullSuppression == NullSuppression.SUPPRESS_MISSING) && isFieldPresent(field, record)) {
generator.writeNullField(fieldName);
}
continue;
}
generator.writeFieldName(fieldName);
final DataType dataType = writeSchema.getDataType(fieldName).get();
writeValue(generator, value, fieldName, dataType);
}
} else {
for (final String fieldName : record.getRawFieldNames()) {
final Object value = record.getValue(fieldName);
if (value == null) {
if (nullSuppression == NullSuppression.NEVER_SUPPRESS || (nullSuppression == NullSuppression.SUPPRESS_MISSING) && record.getRawFieldNames().contains(fieldName)) {
generator.writeNullField(fieldName);
}
continue;
}
generator.writeFieldName(fieldName);
writeRawValue(generator, value, fieldName);
}
}
endTask.apply(generator);
} catch (final Exception e) {
logger.error("Failed to write {} with schema {} as a JSON Object due to {}", new Object[] {record, record.getSchema(), e.toString(), e});
throw e;
}
}
private boolean isFieldPresent(final RecordField field, final Record record) {
final Set<String> rawFieldNames = record.getRawFieldNames();
if (rawFieldNames.contains(field.getFieldName())) {
return true;
}
for (final String alias : field.getAliases()) {
if (rawFieldNames.contains(alias)) {
return true;
}
}
return false;
}
@SuppressWarnings("unchecked")
private void writeRawValue(final JsonGenerator generator, final Object value, final String fieldName) throws IOException {
if (value == null) {
generator.writeNull();
return;
}
if (value instanceof Record) {
final Record record = (Record) value;
writeRecord(record, record.getSchema(), generator, JsonGenerator::writeStartObject, JsonGenerator::writeEndObject, false);
return;
}
if (value instanceof Map) {
final Map<String, ?> map = (Map<String, ?>) value;
generator.writeStartObject();
for (final Map.Entry<String, ?> entry : map.entrySet()) {
final String mapKey = entry.getKey();
final Object mapValue = entry.getValue();
generator.writeFieldName(mapKey);
writeRawValue(generator, mapValue, fieldName + "." + mapKey);
}
generator.writeEndObject();
return;
}
if (value instanceof Object[]) {
final Object[] values = (Object[]) value;
generator.writeStartArray();
for (final Object element : values) {
writeRawValue(generator, element, fieldName);
}
generator.writeEndArray();
return;
}
if (value instanceof java.sql.Time) {
final Object formatted = format((java.sql.Time) value, LAZY_TIME_FORMAT);
generator.writeObject(formatted);
return;
}
if (value instanceof java.sql.Date) {
final Object formatted = format((java.sql.Date) value, LAZY_DATE_FORMAT);
generator.writeObject(formatted);
return;
}
if (value instanceof java.util.Date) {
final Object formatted = format((java.util.Date) value, LAZY_TIMESTAMP_FORMAT);
generator.writeObject(formatted);
return;
}
generator.writeObject(value);
}
private Object format(final java.util.Date value, final Supplier<DateFormat> formatSupplier) {
if (value == null) {
return null;
}
if (formatSupplier == null) {
return value.getTime();
}
final DateFormat format = formatSupplier.get();
if (format == null) {
return value.getTime();
}
return format.format(value);
}
@SuppressWarnings("unchecked")
private void writeValue(final JsonGenerator generator, final Object value, final String fieldName, final DataType dataType) throws IOException {
if (value == null) {
generator.writeNull();
return;
}
final DataType chosenDataType = dataType.getFieldType() == RecordFieldType.CHOICE ? DataTypeUtils.chooseDataType(value, (ChoiceDataType) dataType) : dataType;
if (chosenDataType == null) {
logger.debug("Could not find a suitable field type in the CHOICE for field {} and value {}; will use null value", new Object[] {fieldName, value});
generator.writeNull();
return;
}
final Object coercedValue = DataTypeUtils.convertType(value, chosenDataType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName);
if (coercedValue == null) {
generator.writeNull();
return;
}
switch (chosenDataType.getFieldType()) {
case DATE: {
final String stringValue = DataTypeUtils.toString(coercedValue, LAZY_DATE_FORMAT);
if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
generator.writeNumber(DataTypeUtils.toLong(coercedValue, fieldName));
} else {
generator.writeString(stringValue);
}
break;
}
case TIME: {
final String stringValue = DataTypeUtils.toString(coercedValue, LAZY_TIME_FORMAT);
if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
generator.writeNumber(DataTypeUtils.toLong(coercedValue, fieldName));
} else {
generator.writeString(stringValue);
}
break;
}
case TIMESTAMP: {
final String stringValue = DataTypeUtils.toString(coercedValue, LAZY_TIMESTAMP_FORMAT);
if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
generator.writeNumber(DataTypeUtils.toLong(coercedValue, fieldName));
} else {
generator.writeString(stringValue);
}
break;
}
case DOUBLE:
generator.writeNumber(DataTypeUtils.toDouble(coercedValue, fieldName));
break;
case FLOAT:
generator.writeNumber(DataTypeUtils.toFloat(coercedValue, fieldName));
break;
case LONG:
generator.writeNumber(DataTypeUtils.toLong(coercedValue, fieldName));
break;
case INT:
case BYTE:
case SHORT:
generator.writeNumber(DataTypeUtils.toInteger(coercedValue, fieldName));
break;
case CHAR:
case STRING:
generator.writeString(coercedValue.toString());
break;
case DECIMAL:
generator.writeNumber(DataTypeUtils.toBigDecimal(coercedValue, fieldName));
break;
case BIGINT:
if (coercedValue instanceof Long) {
generator.writeNumber(((Long) coercedValue).longValue());
} else {
generator.writeNumber((BigInteger) coercedValue);
}
break;
case BOOLEAN:
final String stringValue = coercedValue.toString();
if ("true".equalsIgnoreCase(stringValue)) {
generator.writeBoolean(true);
} else if ("false".equalsIgnoreCase(stringValue)) {
generator.writeBoolean(false);
} else {
generator.writeString(stringValue);
}
break;
case RECORD: {
final Record record = (Record) coercedValue;
final RecordDataType recordDataType = (RecordDataType) chosenDataType;
final RecordSchema childSchema = recordDataType.getChildSchema();
writeRecord(record, childSchema, generator, JsonGenerator::writeStartObject, JsonGenerator::writeEndObject, true);
break;
}
case MAP: {
final MapDataType mapDataType = (MapDataType) chosenDataType;
final DataType valueDataType = mapDataType.getValueType();
final Map<String, ?> map = (Map<String, ?>) coercedValue;
generator.writeStartObject();
for (final Map.Entry<String, ?> entry : map.entrySet()) {
final String mapKey = entry.getKey();
final Object mapValue = entry.getValue();
generator.writeFieldName(mapKey);
writeValue(generator, mapValue, fieldName + "." + mapKey, valueDataType);
}
generator.writeEndObject();
break;
}
case ARRAY:
default:
if (coercedValue instanceof Object[]) {
final Object[] values = (Object[]) coercedValue;
final ArrayDataType arrayDataType = (ArrayDataType) chosenDataType;
final DataType elementType = arrayDataType.getElementType();
writeArray(values, fieldName, generator, elementType);
} else {
generator.writeString(coercedValue.toString());
}
break;
}
}
private void writeArray(final Object[] values, final String fieldName, final JsonGenerator generator, final DataType elementType) throws IOException {
generator.writeStartArray();
for (final Object element : values) {
writeValue(generator, element, fieldName, elementType);
}
generator.writeEndArray();
}
@Override
public String getMimeType() {
return this.mimeType;
}
private interface GeneratorTask {
void apply(JsonGenerator generator) throws IOException;
}
}