| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.nifi.reporting; |
| |
| import org.apache.nifi.annotation.lifecycle.OnStopped; |
| import org.apache.nifi.components.ConfigVerificationResult; |
| import org.apache.nifi.components.ConfigVerificationResult.Outcome; |
| import org.apache.nifi.components.PropertyDescriptor; |
| import org.apache.nifi.context.PropertyContext; |
| import org.apache.nifi.controller.ConfigurationContext; |
| import org.apache.nifi.flowfile.attributes.CoreAttributes; |
| import org.apache.nifi.logging.ComponentLog; |
| import org.apache.nifi.processor.exception.ProcessException; |
| import org.apache.nifi.remote.Transaction; |
| import org.apache.nifi.remote.TransferDirection; |
| import org.apache.nifi.remote.client.SiteToSiteClient; |
| import org.apache.nifi.reporting.s2s.SiteToSiteUtils; |
| import org.apache.nifi.schema.access.SchemaNotFoundException; |
| import org.apache.nifi.serialization.MalformedRecordException; |
| import org.apache.nifi.serialization.RecordReader; |
| import org.apache.nifi.serialization.RecordSetWriter; |
| import org.apache.nifi.serialization.RecordSetWriterFactory; |
| import org.apache.nifi.serialization.SimpleRecordSchema; |
| import org.apache.nifi.serialization.WriteResult; |
| import org.apache.nifi.serialization.record.DataType; |
| import org.apache.nifi.serialization.record.MapRecord; |
| import org.apache.nifi.serialization.record.Record; |
| import org.apache.nifi.serialization.record.RecordField; |
| import org.apache.nifi.serialization.record.RecordFieldType; |
| import org.apache.nifi.serialization.record.RecordSchema; |
| import org.apache.nifi.serialization.record.SerializedForm; |
| import org.apache.nifi.serialization.record.type.ArrayDataType; |
| import org.apache.nifi.serialization.record.type.MapDataType; |
| import org.apache.nifi.serialization.record.type.RecordDataType; |
| import org.apache.nifi.serialization.record.util.DataTypeUtils; |
| import org.codehaus.jackson.JsonFactory; |
| import org.codehaus.jackson.JsonNode; |
| import org.codehaus.jackson.JsonParseException; |
| import org.codehaus.jackson.JsonParser; |
| import org.codehaus.jackson.JsonToken; |
| import org.codehaus.jackson.map.ObjectMapper; |
| import org.codehaus.jackson.node.ArrayNode; |
| |
| import javax.json.JsonArray; |
| import javax.json.JsonObjectBuilder; |
| import javax.json.JsonValue; |
| import java.io.ByteArrayInputStream; |
| import java.io.ByteArrayOutputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.nio.charset.StandardCharsets; |
| import java.text.DateFormat; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.function.Supplier; |
| |
| /** |
| * Base class for ReportingTasks that send data over site-to-site. |
| */ |
| public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingTask implements VerifiableReportingTask { |
| private static final String ESTABLISH_COMMUNICATION = "Establish Site-to-Site Connection"; |
| |
| protected static final String LAST_EVENT_ID_KEY = "last_event_id"; |
| protected static final String DESTINATION_URL_PATH = "/nifi"; |
| protected static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; |
| |
| static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder() |
| .name("record-writer") |
| .displayName("Record Writer") |
| .description("Specifies the Controller Service to use for writing out the records.") |
| .identifiesControllerService(RecordSetWriterFactory.class) |
| .required(false) |
| .build(); |
| |
| static final PropertyDescriptor ALLOW_NULL_VALUES = new PropertyDescriptor.Builder() |
| .name("include-null-values") |
| .displayName("Include Null Values") |
| .description("Indicate if null values should be included in records. Default will be false") |
| .required(true) |
| .allowableValues("true", "false") |
| .defaultValue("false") |
| .build(); |
| |
| protected volatile SiteToSiteClient siteToSiteClient; |
| protected volatile RecordSchema recordSchema; |
| |
| @Override |
| protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { |
| final List<PropertyDescriptor> properties = new ArrayList<>(); |
| properties.add(SiteToSiteUtils.DESTINATION_URL); |
| properties.add(SiteToSiteUtils.PORT_NAME); |
| properties.add(SiteToSiteUtils.SSL_CONTEXT); |
| properties.add(SiteToSiteUtils.INSTANCE_URL); |
| properties.add(SiteToSiteUtils.COMPRESS); |
| properties.add(SiteToSiteUtils.TIMEOUT); |
| properties.add(SiteToSiteUtils.BATCH_SIZE); |
| properties.add(SiteToSiteUtils.TRANSPORT_PROTOCOL); |
| properties.add(SiteToSiteUtils.HTTP_PROXY_HOSTNAME); |
| properties.add(SiteToSiteUtils.HTTP_PROXY_PORT); |
| properties.add(SiteToSiteUtils.HTTP_PROXY_USERNAME); |
| properties.add(SiteToSiteUtils.HTTP_PROXY_PASSWORD); |
| properties.add(RECORD_WRITER); |
| properties.add(ALLOW_NULL_VALUES); |
| return properties; |
| } |
| |
| public void setup(final PropertyContext reportContext) throws IOException { |
| if (siteToSiteClient == null) { |
| siteToSiteClient = SiteToSiteUtils.getClient(reportContext, getLogger(), null); |
| } |
| } |
| |
| @OnStopped |
| public void shutdown() throws IOException { |
| final SiteToSiteClient client = getClient(); |
| if (client != null) { |
| client.close(); |
| siteToSiteClient = null; |
| } |
| } |
| |
| // this getter is intended explicitly for testing purposes |
| protected SiteToSiteClient getClient() { |
| return this.siteToSiteClient; |
| } |
| |
| protected void sendData(final ReportingContext context, final Transaction transaction, Map<String, String> attributes, final JsonArray jsonArray) throws IOException { |
| if(context.getProperty(RECORD_WRITER).isSet()) { |
| transaction.send(getData(context, new ByteArrayInputStream(jsonArray.toString().getBytes(StandardCharsets.UTF_8)), attributes), attributes); |
| } else { |
| transaction.send(jsonArray.toString().getBytes(StandardCharsets.UTF_8), attributes); |
| } |
| } |
| |
| protected byte[] getData(final ReportingContext context, InputStream in, Map<String, String> attributes) { |
| try (final JsonRecordReader reader = new JsonRecordReader(in, recordSchema)) { |
| |
| final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); |
| final RecordSchema writeSchema = writerFactory.getSchema(null, recordSchema); |
| final ByteArrayOutputStream out = new ByteArrayOutputStream(); |
| |
| try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, out, attributes)) { |
| writer.beginRecordSet(); |
| |
| Record record; |
| while ((record = reader.nextRecord()) != null) { |
| writer.write(record); |
| } |
| |
| final WriteResult writeResult = writer.finishRecordSet(); |
| |
| attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType()); |
| attributes.putAll(writeResult.getAttributes()); |
| } |
| |
| return out.toByteArray(); |
| } catch (IOException | SchemaNotFoundException | MalformedRecordException e) { |
| throw new ProcessException("Failed to write metrics using record writer: " + e.getMessage(), e); |
| } |
| } |
| |
| protected void addField(final JsonObjectBuilder builder, final String key, final Boolean value, final boolean allowNullValues) { |
| if (value != null) { |
| builder.add(key, value); |
| }else if(allowNullValues){ |
| builder.add(key,JsonValue.NULL); |
| } |
| } |
| |
| protected void addField(final JsonObjectBuilder builder, final String key, final Long value, boolean allowNullValues) { |
| if (value != null) { |
| builder.add(key, value); |
| }else if(allowNullValues){ |
| builder.add(key,JsonValue.NULL); |
| } |
| } |
| |
| protected void addField(final JsonObjectBuilder builder, final String key, final Integer value, boolean allowNullValues) { |
| if (value != null) { |
| builder.add(key, value); |
| }else if(allowNullValues){ |
| builder.add(key,JsonValue.NULL); |
| } |
| } |
| |
| protected void addField(final JsonObjectBuilder builder, final String key, final String value, boolean allowNullValues) { |
| if (value != null) { |
| builder.add(key, value); |
| }else if(allowNullValues){ |
| builder.add(key,JsonValue.NULL); |
| } |
| } |
| |
| private class JsonRecordReader implements RecordReader { |
| |
| private RecordSchema recordSchema; |
| private final JsonParser jsonParser; |
| private final boolean array; |
| private final JsonNode firstJsonNode; |
| private boolean firstObjectConsumed = false; |
| |
| private final Supplier<DateFormat> dateFormat = () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat()); |
| private final Supplier<DateFormat> timeFormat = () -> DataTypeUtils.getDateFormat(RecordFieldType.TIME.getDefaultFormat()); |
| private final Supplier<DateFormat> timestampFormat = () -> DataTypeUtils.getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat()); |
| |
| public JsonRecordReader(final InputStream in, RecordSchema recordSchema) throws IOException, MalformedRecordException { |
| this.recordSchema = recordSchema; |
| try { |
| jsonParser = new JsonFactory().createJsonParser(in); |
| jsonParser.setCodec(new ObjectMapper()); |
| JsonToken token = jsonParser.nextToken(); |
| if (token == JsonToken.START_ARRAY) { |
| array = true; |
| token = jsonParser.nextToken(); |
| } else { |
| array = false; |
| } |
| if (token == JsonToken.START_OBJECT) { |
| firstJsonNode = jsonParser.readValueAsTree(); |
| } else { |
| firstJsonNode = null; |
| } |
| } catch (final JsonParseException e) { |
| throw new MalformedRecordException("Could not parse data as JSON", e); |
| } |
| } |
| |
| @Override |
| public void close() throws IOException { |
| jsonParser.close(); |
| } |
| |
| @Override |
| public Record nextRecord(boolean coerceTypes, boolean dropUnknownFields) throws IOException, MalformedRecordException { |
| if (firstObjectConsumed && !array) { |
| return null; |
| } |
| |
| JsonNode nextNode = getNextJsonNode(); |
| if(nextNode == null) { |
| return null; |
| } |
| |
| try { |
| return convertJsonNodeToRecord(nextNode, getSchema(), null, coerceTypes, dropUnknownFields); |
| } catch (final MalformedRecordException mre) { |
| throw mre; |
| } catch (final IOException ioe) { |
| throw ioe; |
| } catch (final Exception e) { |
| throw new MalformedRecordException("Failed to convert data into a Record object with the given schema", e); |
| } |
| } |
| |
| @Override |
| public RecordSchema getSchema() throws MalformedRecordException { |
| return recordSchema; |
| } |
| |
| private JsonNode getNextJsonNode() throws IOException, MalformedRecordException { |
| if (!firstObjectConsumed) { |
| firstObjectConsumed = true; |
| return firstJsonNode; |
| } |
| while (true) { |
| final JsonToken token = jsonParser.nextToken(); |
| if (token == null) { |
| return null; |
| } |
| switch (token) { |
| case END_OBJECT: |
| continue; |
| case START_OBJECT: |
| return jsonParser.readValueAsTree(); |
| case END_ARRAY: |
| case START_ARRAY: |
| return null; |
| default: |
| throw new MalformedRecordException("Expected to get a JSON Object but got a token of type " + token.name()); |
| } |
| } |
| } |
| |
| private Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSchema schema, final String fieldNamePrefix, |
| final boolean coerceTypes, final boolean dropUnknown) throws IOException, MalformedRecordException { |
| |
| final Map<String, Object> values = new HashMap<>(schema.getFieldCount() * 2); |
| |
| if (dropUnknown) { |
| for (final RecordField recordField : schema.getFields()) { |
| final JsonNode childNode = getChildNode(jsonNode, recordField); |
| if (childNode == null) { |
| continue; |
| } |
| |
| final String fieldName = recordField.getFieldName(); |
| final Object value; |
| |
| if (coerceTypes) { |
| final DataType desiredType = recordField.getDataType(); |
| final String fullFieldName = fieldNamePrefix == null ? fieldName : fieldNamePrefix + fieldName; |
| value = convertField(childNode, fullFieldName, desiredType, dropUnknown); |
| } else { |
| value = getRawNodeValue(childNode, recordField == null ? null : recordField.getDataType()); |
| } |
| |
| values.put(fieldName, value); |
| } |
| } else { |
| final Iterator<String> fieldNames = jsonNode.getFieldNames(); |
| while (fieldNames.hasNext()) { |
| final String fieldName = fieldNames.next(); |
| final JsonNode childNode = jsonNode.get(fieldName); |
| final RecordField recordField = schema.getField(fieldName).orElse(null); |
| final Object value; |
| |
| if (coerceTypes && recordField != null) { |
| final DataType desiredType = recordField.getDataType(); |
| final String fullFieldName = fieldNamePrefix == null ? fieldName : fieldNamePrefix + fieldName; |
| value = convertField(childNode, fullFieldName, desiredType, dropUnknown); |
| } else { |
| value = getRawNodeValue(childNode, recordField == null ? null : recordField.getDataType()); |
| } |
| |
| values.put(fieldName, value); |
| } |
| } |
| |
| final Supplier<String> supplier = () -> jsonNode.toString(); |
| return new MapRecord(schema, values, SerializedForm.of(supplier, "application/json"), false, dropUnknown); |
| } |
| |
| private JsonNode getChildNode(final JsonNode jsonNode, final RecordField field) { |
| if (jsonNode.has(field.getFieldName())) { |
| return jsonNode.get(field.getFieldName()); |
| } |
| for (final String alias : field.getAliases()) { |
| if (jsonNode.has(alias)) { |
| return jsonNode.get(alias); |
| } |
| } |
| return null; |
| } |
| |
| protected Object convertField(final JsonNode fieldNode, final String fieldName, final DataType desiredType, final boolean dropUnknown) throws IOException, MalformedRecordException { |
| if (fieldNode == null || fieldNode.isNull()) { |
| return null; |
| } |
| |
| switch (desiredType.getFieldType()) { |
| case BOOLEAN: |
| case BYTE: |
| case CHAR: |
| case DOUBLE: |
| case FLOAT: |
| case INT: |
| case BIGINT: |
| case DECIMAL: |
| case LONG: |
| case SHORT: |
| case STRING: |
| case DATE: |
| case TIME: |
| case TIMESTAMP: { |
| final Object rawValue = getRawNodeValue(fieldNode, null); |
| final Object converted = DataTypeUtils.convertType(rawValue, desiredType, dateFormat, timeFormat, timestampFormat, fieldName); |
| return converted; |
| } |
| case MAP: { |
| final DataType valueType = ((MapDataType) desiredType).getValueType(); |
| |
| final Map<String, Object> map = new HashMap<>(); |
| final Iterator<String> fieldNameItr = fieldNode.getFieldNames(); |
| while (fieldNameItr.hasNext()) { |
| final String childName = fieldNameItr.next(); |
| final JsonNode childNode = fieldNode.get(childName); |
| final Object childValue = convertField(childNode, fieldName, valueType, dropUnknown); |
| map.put(childName, childValue); |
| } |
| |
| return map; |
| } |
| case ARRAY: { |
| final ArrayNode arrayNode = (ArrayNode) fieldNode; |
| final int numElements = arrayNode.size(); |
| final Object[] arrayElements = new Object[numElements]; |
| int count = 0; |
| for (final JsonNode node : arrayNode) { |
| final DataType elementType = ((ArrayDataType) desiredType).getElementType(); |
| final Object converted = convertField(node, fieldName, elementType, dropUnknown); |
| arrayElements[count++] = converted; |
| } |
| |
| return arrayElements; |
| } |
| case RECORD: { |
| if (fieldNode.isObject()) { |
| RecordSchema childSchema; |
| if (desiredType instanceof RecordDataType) { |
| childSchema = ((RecordDataType) desiredType).getChildSchema(); |
| } else { |
| return null; |
| } |
| |
| if (childSchema == null) { |
| final List<RecordField> fields = new ArrayList<>(); |
| final Iterator<String> fieldNameItr = fieldNode.getFieldNames(); |
| while (fieldNameItr.hasNext()) { |
| fields.add(new RecordField(fieldNameItr.next(), RecordFieldType.STRING.getDataType())); |
| } |
| |
| childSchema = new SimpleRecordSchema(fields); |
| } |
| |
| return convertJsonNodeToRecord(fieldNode, childSchema, fieldName + ".", true, dropUnknown); |
| } else { |
| return null; |
| } |
| } |
| case CHOICE: { |
| return DataTypeUtils.convertType(getRawNodeValue(fieldNode, null), desiredType, fieldName); |
| } |
| } |
| |
| return null; |
| } |
| |
| protected Object getRawNodeValue(final JsonNode fieldNode, final DataType dataType) throws IOException { |
| if (fieldNode == null || fieldNode.isNull()) { |
| return null; |
| } |
| |
| if (fieldNode.isNumber()) { |
| return fieldNode.getNumberValue(); |
| } |
| |
| if (fieldNode.isBinary()) { |
| return fieldNode.getBinaryValue(); |
| } |
| |
| if (fieldNode.isBoolean()) { |
| return fieldNode.getBooleanValue(); |
| } |
| |
| if (fieldNode.isTextual()) { |
| return fieldNode.getTextValue(); |
| } |
| |
| if (fieldNode.isArray()) { |
| final ArrayNode arrayNode = (ArrayNode) fieldNode; |
| final int numElements = arrayNode.size(); |
| final Object[] arrayElements = new Object[numElements]; |
| int count = 0; |
| |
| final DataType elementDataType; |
| if (dataType != null && dataType.getFieldType() == RecordFieldType.ARRAY) { |
| final ArrayDataType arrayDataType = (ArrayDataType) dataType; |
| elementDataType = arrayDataType.getElementType(); |
| } else { |
| elementDataType = null; |
| } |
| |
| for (final JsonNode node : arrayNode) { |
| final Object value = getRawNodeValue(node, elementDataType); |
| arrayElements[count++] = value; |
| } |
| |
| return arrayElements; |
| } |
| |
| if (fieldNode.isObject()) { |
| RecordSchema childSchema; |
| if (dataType != null && RecordFieldType.RECORD == dataType.getFieldType()) { |
| final RecordDataType recordDataType = (RecordDataType) dataType; |
| childSchema = recordDataType.getChildSchema(); |
| } else { |
| childSchema = null; |
| } |
| |
| if (childSchema == null) { |
| childSchema = new SimpleRecordSchema(Collections.emptyList()); |
| } |
| |
| final Iterator<String> fieldNames = fieldNode.getFieldNames(); |
| final Map<String, Object> childValues = new HashMap<>(); |
| while (fieldNames.hasNext()) { |
| final String childFieldName = fieldNames.next(); |
| final Object childValue = getRawNodeValue(fieldNode.get(childFieldName), dataType); |
| childValues.put(childFieldName, childValue); |
| } |
| |
| final MapRecord record = new MapRecord(childSchema, childValues); |
| return record; |
| } |
| |
| return null; |
| } |
| |
| } |
| |
| @Override |
| public List<ConfigVerificationResult> verify(final ConfigurationContext context, final ComponentLog verificationLogger) { |
| final List<ConfigVerificationResult> verificationResults = new ArrayList<>(); |
| |
| try (final SiteToSiteClient client = SiteToSiteUtils.getClient(context, verificationLogger, null)) { |
| final Transaction transaction = client.createTransaction(TransferDirection.SEND); |
| |
| // If transaction is null, indicates that all nodes are penalized |
| if (transaction == null) { |
| verificationResults.add(new ConfigVerificationResult.Builder() |
| .verificationStepName(ESTABLISH_COMMUNICATION) |
| .outcome(Outcome.SKIPPED) |
| .explanation("All nodes in destination NiFi are currently 'penalized', meaning that there have been recent failures communicating with the destination NiFi, or that" + |
| " the NiFi instance is applying backpressure") |
| .build()); |
| } else { |
| transaction.cancel("Just verifying configuration"); |
| |
| verificationResults.add(new ConfigVerificationResult.Builder() |
| .verificationStepName(ESTABLISH_COMMUNICATION) |
| .outcome(Outcome.SUCCESSFUL) |
| .explanation("Established connection to destination NiFi instance and Received indication that it is ready to ready to receive data") |
| .build()); |
| } |
| } catch (final Exception e) { |
| verificationLogger.error("Failed to establish site-to-site connection", e); |
| |
| verificationResults.add(new ConfigVerificationResult.Builder() |
| .verificationStepName(ESTABLISH_COMMUNICATION) |
| .outcome(Outcome.FAILED) |
| .explanation("Failed to establish Site-to-Site Connection: " + e) |
| .build()); |
| } |
| |
| return verificationResults; |
| } |
| } |