blob: 7ca2fc420ac8a74d14cf8bb850ee0149e28d471c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.schema.inference;
import avro.shaded.com.google.common.annotations.VisibleForTesting;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.serialization.RecordSchemaCacheService;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.RecordDataType;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import static org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY;
import static org.apache.nifi.processor.util.StandardValidators.POSITIVE_INTEGER_VALIDATOR;
@CapabilityDescription("Provides a Schema Cache that evicts elements based on a Least-Recently-Used algorithm. This cache is not persisted, so any restart of NiFi will result in " +
    "the cache being cleared. Additionally, the cache will be cleared any time that the Controller Service is stopped and restarted.")
@Tags({"record", "schema", "cache"})
public class VolatileSchemaCache extends AbstractControllerService implements RecordSchemaCacheService {

    static final PropertyDescriptor MAX_SIZE = new Builder()
        .name("max-cache-size")
        .displayName("Maximum Cache Size")
        .description("The maximum number of Schemas to cache.")
        .required(true)
        .addValidator(POSITIVE_INTEGER_VALIDATOR)
        .expressionLanguageSupported(VARIABLE_REGISTRY)
        .defaultValue("100")
        .build();

    // Padding-free Base64 keeps generated identifiers shorter; safe because identifiers are never decoded.
    private static final Base64.Encoder ENCODER = Base64.getEncoder().withoutPadding();

    // Re-created on every @OnEnabled call, so the cache is cleared whenever the service is restarted.
    private volatile Cache<String, RecordSchema> cache;

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return Collections.singletonList(MAX_SIZE);
    }

    /**
     * Builds the backing Caffeine cache, sized from the Maximum Cache Size property
     * (evaluated against the variable registry).
     *
     * @param context the configuration context supplying the property values
     */
    @OnEnabled
    public void setup(final ConfigurationContext context) {
        final int maxSize = context.getProperty(MAX_SIZE).evaluateAttributeExpressions().asInteger();
        cache = Caffeine.newBuilder()
            .maximumSize(maxSize)
            .build();
    }

    /**
     * Caches the given schema and returns the identifier under which it is stored. The identifier
     * is derived from a SHA-256 digest of the schema. If two non-equal schemas ever produce the
     * same digest, the collision is resolved by caching the new schema under the digest plus a
     * randomly generated UUID suffix.
     *
     * @param schema the schema to cache
     * @return the identifier that can later be passed to {@link #getSchema(String)}
     */
    @Override
    public String cacheSchema(final RecordSchema schema) {
        final String identifier = createIdentifier(schema);

        // Use putIfAbsent (via the cache's ConcurrentMap view) so that a null return value genuinely
        // means "this call inserted the schema". The previous implementation used
        // cache.get(identifier, id -> schema), which returns the value the mapping function just
        // computed -- it can never return null for a non-null schema, so its null-check branch was
        // unreachable and "inserted" could not be distinguished from "already present".
        final RecordSchema existingSchema = cache.asMap().putIfAbsent(identifier, schema);
        if (existingSchema == null) {
            // We successfully inserted into the cache.
            getLogger().debug("Successfully cached schema with ID {} (no existing schema with this ID)", new Object[] {identifier});
            return identifier;
        }

        // There was already a Schema in the cache with that identifier.
        if (existingSchema.equals(schema)) {
            // Schemas match. Already cached successfully.
            getLogger().debug("Successfully cached schema with ID {} (existing schema with this ID was equal)", new Object[] {identifier});
            return identifier;
        }

        // Schemas hashed to the same digest but do not equal one another. Append a randomly
        // generated UUID and add the schema to the cache under that unique identifier.
        final String updatedIdentifier = identifier + "-" + UUID.randomUUID().toString();
        cache.put(updatedIdentifier, schema);
        getLogger().debug("Schema with ID {} conflicted with new Schema. Resolved by using generated identifier {}", new Object[] {identifier, updatedIdentifier});
        return updatedIdentifier;
    }

    /**
     * Looks up a previously cached schema.
     *
     * @param schemaIdentifier the identifier returned by {@link #cacheSchema(RecordSchema)}
     * @return the cached schema, or an empty Optional if it is absent (never cached, or evicted)
     */
    @Override
    public Optional<RecordSchema> getSchema(final String schemaIdentifier) {
        return Optional.ofNullable(cache.getIfPresent(schemaIdentifier));
    }

    /**
     * Computes the cache identifier for the given schema: the padding-free Base64 encoding of a
     * SHA-256 digest over the schema's text when it is available, otherwise over the schema's
     * structure (see {@link #computeHash(RecordSchema, MessageDigest)}).
     */
    @VisibleForTesting
    protected String createIdentifier(final RecordSchema schema) {
        final MessageDigest digest;
        try {
            digest = MessageDigest.getInstance("SHA-256");
        } catch (final NoSuchAlgorithmException e) {
            // Every conforming JRE is required to provide SHA-256, so this cannot happen.
            throw new AssertionError(e);
        }

        final Optional<String> suppliedText = schema.getSchemaText();
        if (suppliedText.isPresent()) {
            digest.update(suppliedText.get().getBytes(StandardCharsets.UTF_8));
        } else {
            computeHash(schema, digest);
        }

        return ENCODER.encodeToString(digest.digest());
    }

    // Folds each field's name, type name, optional format, and (recursively) any child record
    // schema into the digest, so structurally different schemas yield different identifiers.
    private void computeHash(final RecordSchema schema, final MessageDigest digest) {
        for (final RecordField field : schema.getFields()) {
            digest.update(field.getFieldName().getBytes(StandardCharsets.UTF_8));

            final DataType dataType = field.getDataType();
            final RecordFieldType fieldType = dataType.getFieldType();
            digest.update(fieldType.name().getBytes(StandardCharsets.UTF_8));

            final String format = dataType.getFormat();
            if (format != null) {
                digest.update(format.getBytes(StandardCharsets.UTF_8));
            }

            if (fieldType == RecordFieldType.RECORD) {
                final RecordSchema childSchema = ((RecordDataType) dataType).getChildSchema();
                if (childSchema != null) {
                    computeHash(childSchema, digest);
                }
            }
        }
    }
}