blob: d5b97d9788ec88bce0076d20fcc4a591e476845d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.services.protobuf;
import com.squareup.wire.schema.CoreLoaderKt;
import com.squareup.wire.schema.Location;
import com.squareup.wire.schema.Schema;
import com.squareup.wire.schema.SchemaLoader;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schema.access.SchemaAccessStrategy;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.SchemaRegistryService;
import org.apache.nifi.services.protobuf.schema.ProtoSchemaStrategy;
import org.apache.nifi.services.protobuf.validation.ProtoValidationResource;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.FileSystems;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
@Tags({"protobuf", "record", "reader", "parser"})
@CapabilityDescription("Parses a Protocol Buffers message from binary format.")
public class ProtobufReader extends SchemaRegistryService implements RecordReaderFactory {
private static final String ANY_PROTO = "google/protobuf/any.proto";
private static final String DURATION_PROTO = "google/protobuf/duration.proto";
private static final String EMPTY_PROTO = "google/protobuf/empty.proto";
private static final String STRUCT_PROTO = "google/protobuf/struct.proto";
private static final String TIMESTAMP_PROTO = "google/protobuf/timestamp.proto";
private static final String WRAPPERS_PROTO = "google/protobuf/wrappers.proto";
private static final AllowableValue GENERATE_FROM_PROTO_FILE = new AllowableValue("generate-from-proto-file",
"Generate from Proto file", "The record schema is generated from the provided proto file");
private volatile String messageType;
private volatile Schema protoSchema;
// Holder of cached proto information so validation does not reload the same proto file over and over
private final AtomicReference<ProtoValidationResource> validationResourceHolder = new AtomicReference<>();
public static final PropertyDescriptor PROTOBUF_DIRECTORY = new PropertyDescriptor.Builder()
.name("Proto Directory")
.displayName("Proto Directory")
.description("Directory containing Protocol Buffers message definition (.proto) file(s).")
.required(true)
.addValidator(StandardValidators.createDirectoryExistsValidator(true, false))
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.build();
public static final PropertyDescriptor MESSAGE_TYPE = new PropertyDescriptor.Builder()
.name("Message Type")
.displayName("Message Type")
.description("Fully qualified name of the Protocol Buffers message type including its package (eg. mypackage.MyMessage). " +
"The .proto files configured in '" + PROTOBUF_DIRECTORY.getDisplayName() + "' must contain the definition of this message type.")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.build();
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
properties.add(PROTOBUF_DIRECTORY);
properties.add(MESSAGE_TYPE);
return properties;
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> problems = new ArrayList<>();
final String protoDirectory = validationContext.getProperty(PROTOBUF_DIRECTORY).evaluateAttributeExpressions().getValue();
final String messageType = validationContext.getProperty(MESSAGE_TYPE).evaluateAttributeExpressions().getValue();
if (protoDirectory != null && messageType != null) {
final Schema protoSchema = getSchemaForValidation(protoDirectory);
if (protoSchema.getType(messageType) == null) {
problems.add(new ValidationResult.Builder()
.subject(MESSAGE_TYPE.getDisplayName())
.valid(false)
.explanation(String.format("'%s' message type cannot be found in the provided proto files.", messageType))
.build());
}
}
return problems;
}
@OnEnabled
public void onEnabled(final ConfigurationContext context) {
final String protoDirectory = context.getProperty(PROTOBUF_DIRECTORY).evaluateAttributeExpressions().getValue();
messageType = context.getProperty(MESSAGE_TYPE).evaluateAttributeExpressions().getValue();
protoSchema = loadProtoSchema(protoDirectory);
}
@Override
protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final PropertyContext context) {
if (allowableValue.equalsIgnoreCase(GENERATE_FROM_PROTO_FILE.getValue())) {
return new ProtoSchemaStrategy(messageType, protoSchema);
}
return SchemaAccessUtils.getSchemaAccessStrategy(allowableValue, schemaRegistry, context);
}
@Override
protected List<AllowableValue> getSchemaAccessStrategyValues() {
final List<AllowableValue> allowableValues = new ArrayList<>(super.getSchemaAccessStrategyValues());
allowableValues.add(GENERATE_FROM_PROTO_FILE);
return allowableValues;
}
@Override
protected AllowableValue getDefaultSchemaAccessStrategy() {
return GENERATE_FROM_PROTO_FILE;
}
@Override
public RecordReader createRecordReader(Map<String, String> variables, InputStream in, long inputLength, ComponentLog logger) throws IOException, SchemaNotFoundException {
return new ProtobufRecordReader(protoSchema, messageType, in, getSchema(variables, in, null));
}
private Schema loadProtoSchema(final String protoDirectory) {
final SchemaLoader schemaLoader = new SchemaLoader(FileSystems.getDefault());
schemaLoader.initRoots(Arrays.asList(Location.get(protoDirectory),
Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, ANY_PROTO),
Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, DURATION_PROTO),
Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, EMPTY_PROTO),
Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, STRUCT_PROTO),
Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, TIMESTAMP_PROTO),
Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, WRAPPERS_PROTO)), Collections.emptyList());
return schemaLoader.loadSchema();
}
private Schema getSchemaForValidation(final String protoDirectory) {
ProtoValidationResource validationResource = validationResourceHolder.get();
if (validationResource == null || !protoDirectory.equals(validationResource.getProtoDirectory())) {
validationResource = new ProtoValidationResource(protoDirectory, loadProtoSchema(protoDirectory));
validationResourceHolder.set(validationResource);
}
return validationResource.getProtoSchema();
}
}