blob: 115ffddab3d66acc90057001c9cbddace1467e38 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.pgp;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.pgp.service.api.PGPPublicKeyService;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processors.pgp.exception.PGPProcessException;
import org.apache.nifi.processors.pgp.io.KeyIdentifierConverter;
import org.apache.nifi.stream.io.StreamUtils;
import org.bouncycastle.openpgp.PGPCompressedData;
import org.bouncycastle.openpgp.PGPException;
import org.bouncycastle.openpgp.PGPLiteralData;
import org.bouncycastle.openpgp.PGPObjectFactory;
import org.bouncycastle.openpgp.PGPOnePassSignature;
import org.bouncycastle.openpgp.PGPOnePassSignatureList;
import org.bouncycastle.openpgp.PGPPublicKey;
import org.bouncycastle.openpgp.PGPSignature;
import org.bouncycastle.openpgp.PGPSignatureList;
import org.bouncycastle.openpgp.PGPUtil;
import org.bouncycastle.openpgp.jcajce.JcaPGPObjectFactory;
import org.bouncycastle.openpgp.operator.jcajce.JcaPGPContentVerifierBuilderProvider;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
/**
* Verify Content using Open Pretty Good Privacy Public Keys
*/
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"PGP", "GPG", "OpenPGP", "Encryption", "Signing", "RFC 4880"})
@CapabilityDescription("Verify signatures using OpenPGP Public Keys")
@SeeAlso({ DecryptContentPGP.class, EncryptContentPGP.class, SignContentPGP.class })
@WritesAttributes({
@WritesAttribute(attribute = PGPAttributeKey.LITERAL_DATA_FILENAME, description = "Filename from Literal Data"),
@WritesAttribute(attribute = PGPAttributeKey.LITERAL_DATA_MODIFIED, description = "Modified Date Time from Literal Data in milliseconds"),
@WritesAttribute(attribute = PGPAttributeKey.SIGNATURE_CREATED, description = "Signature Creation Time in milliseconds"),
@WritesAttribute(attribute = PGPAttributeKey.SIGNATURE_ALGORITHM, description = "Signature Algorithm including key and hash algorithm names"),
@WritesAttribute(attribute = PGPAttributeKey.SIGNATURE_HASH_ALGORITHM_ID, description = "Signature Hash Algorithm Identifier"),
@WritesAttribute(attribute = PGPAttributeKey.SIGNATURE_KEY_ALGORITHM_ID, description = "Signature Key Algorithm Identifier"),
@WritesAttribute(attribute = PGPAttributeKey.SIGNATURE_KEY_ID, description = "Signature Public Key Identifier"),
@WritesAttribute(attribute = PGPAttributeKey.SIGNATURE_TYPE_ID, description = "Signature Type Identifier"),
@WritesAttribute(attribute = PGPAttributeKey.SIGNATURE_VERSION, description = "Signature Version Number"),
})
public class VerifyContentPGP extends AbstractProcessor {
public static final Relationship SUCCESS = new Relationship.Builder()
.name("success")
.description("Signature Verification Succeeded")
.build();
public static final Relationship FAILURE = new Relationship.Builder()
.name("failure")
.description("Signature Verification Failed")
.build();
public static final PropertyDescriptor PUBLIC_KEY_SERVICE = new PropertyDescriptor.Builder()
.name("public-key-service")
.displayName("Public Key Service")
.description("PGP Public Key Service for verifying signatures with Public Key Encryption")
.identifiesControllerService(PGPPublicKeyService.class)
.required(true)
.build();
private static final Set<Relationship> RELATIONSHIPS = new HashSet<>(Arrays.asList(SUCCESS, FAILURE));
private static final List<PropertyDescriptor> DESCRIPTORS = Collections.singletonList(
PUBLIC_KEY_SERVICE
);
private static final int BUFFER_SIZE = 8192;
private static final String KEY_ID_UNKNOWN = "UNKNOWN";
/**
* Get Relationships
*
* @return Processor Relationships
*/
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
/**
* Get Supported Property Descriptors
*
* @return Processor Supported Property Descriptors
*/
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return DESCRIPTORS;
}
/**
* On Trigger verifies signatures found in Flow File contents using configured properties
*
* @param context Process Context
* @param session Process Session
*/
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final PGPPublicKeyService publicKeyService = context.getProperty(PUBLIC_KEY_SERVICE).asControllerService(PGPPublicKeyService.class);
final VerifyStreamCallback callback = new VerifyStreamCallback(publicKeyService);
try {
flowFile = session.write(flowFile, callback);
flowFile = session.putAllAttributes(flowFile, callback.attributes);
final String keyId = flowFile.getAttribute(PGPAttributeKey.SIGNATURE_KEY_ID);
getLogger().info("Signature Key ID [{}] Verification Completed {}", keyId, flowFile);
session.transfer(flowFile, SUCCESS);
} catch (final RuntimeException e) {
flowFile = session.putAllAttributes(flowFile, callback.attributes);
getLogger().error("Processing Failed {}", flowFile, e);
session.transfer(flowFile, FAILURE);
}
}
private class VerifyStreamCallback implements StreamCallback {
private final PGPPublicKeyService publicKeyService;
private final Map<String, String> attributes = new HashMap<>();
private boolean verified;
private VerifyStreamCallback(final PGPPublicKeyService publicKeyService) {
this.publicKeyService = publicKeyService;
}
/**
* Process Input Stream containing binary or ASCII Armored OpenPGP messages and write literal data after verification
*
* @param inputStream Input Stream to be read
* @param outputStream Output Stream for literal data contents
* @throws IOException Thrown when unable to read or write streams
*/
@Override
public void process(final InputStream inputStream, final OutputStream outputStream) throws IOException {
final InputStream decoderInputStream = PGPUtil.getDecoderStream(inputStream);
final PGPObjectFactory pgpObjectFactory = new JcaPGPObjectFactory(decoderInputStream);
final Iterator<?> objects = pgpObjectFactory.iterator();
if (objects.hasNext()) {
processObjectFactory(objects, outputStream);
}
if (verified) {
getLogger().debug("One-Pass Signature Algorithm [{}] Verified", attributes.get(PGPAttributeKey.SIGNATURE_ALGORITHM));
} else {
final String keyId = attributes.getOrDefault(PGPAttributeKey.SIGNATURE_KEY_ID, KEY_ID_UNKNOWN);
throw new PGPProcessException(String.format("Signature Key ID [%s] Verification Failed", keyId));
}
}
private void processObjectFactory(final Iterator<?> objects, final OutputStream outputStream) throws IOException {
PGPOnePassSignature onePassSignature = null;
while (objects.hasNext()) {
final Object object = objects.next();
getLogger().debug("PGP Object Read [{}]", object.getClass().getSimpleName());
if (object instanceof PGPCompressedData) {
final PGPCompressedData compressedData = (PGPCompressedData) object;
try {
final PGPObjectFactory compressedObjectFactory = new JcaPGPObjectFactory(compressedData.getDataStream());
processObjectFactory(compressedObjectFactory.iterator(), outputStream);
} catch (final PGPException e) {
throw new PGPProcessException("Read Compressed Data Failed", e);
}
} else if (object instanceof PGPOnePassSignatureList) {
final PGPOnePassSignatureList onePassSignatureList = (PGPOnePassSignatureList) object;
onePassSignature = processOnePassSignatures(onePassSignatureList);
} else if (object instanceof PGPLiteralData) {
final PGPLiteralData literalData = (PGPLiteralData) object;
processLiteralData(literalData, outputStream, onePassSignature);
} else if (object instanceof PGPSignatureList) {
final PGPSignatureList signatureList = (PGPSignatureList) object;
processSignatures(signatureList, onePassSignature);
}
}
}
private PGPOnePassSignature processOnePassSignatures(final PGPOnePassSignatureList onePassSignatureList) {
getLogger().debug("One-Pass Signatures Found [{}]", onePassSignatureList.size());
PGPOnePassSignature initializedOnePassSignature = null;
final Iterator<PGPOnePassSignature> onePassSignatures = onePassSignatureList.iterator();
if (onePassSignatures.hasNext()) {
final PGPOnePassSignature onePassSignature = onePassSignatures.next();
setOnePassSignatureAttributes(onePassSignature);
final String keyId = KeyIdentifierConverter.format(onePassSignature.getKeyID());
final Optional<PGPPublicKey> optionalPublicKey = publicKeyService.findPublicKey(keyId);
if (optionalPublicKey.isPresent()) {
getLogger().debug("One-Pass Signature Key ID [{}] found", keyId);
final PGPPublicKey publicKey = optionalPublicKey.get();
try {
onePassSignature.init(new JcaPGPContentVerifierBuilderProvider(), publicKey);
initializedOnePassSignature = onePassSignature;
} catch (final PGPException e) {
throw new PGPProcessException(String.format("One-Pass Signature Key ID [%s] Initialization Failed", keyId), e);
}
} else {
getLogger().warn("One-Pass Signature Key ID [{}] not found in Public Key Service", keyId);
}
}
return initializedOnePassSignature;
}
private void processLiteralData(final PGPLiteralData literalData,
final OutputStream outputStream,
final PGPOnePassSignature onePassSignature) throws IOException {
setLiteralDataAttributes(literalData);
final InputStream literalInputStream = literalData.getInputStream();
if (onePassSignature == null) {
StreamUtils.copy(literalInputStream, outputStream);
} else {
processSignedStream(literalInputStream, outputStream, onePassSignature);
}
}
private void processSignatures(final PGPSignatureList signatureList, final PGPOnePassSignature onePassSignature) {
getLogger().debug("Signatures Found [{}]", signatureList.size());
final Iterator<PGPSignature> signatures = signatureList.iterator();
if (signatures.hasNext()) {
final PGPSignature signature = signatures.next();
setSignatureAttributes(signature);
if (onePassSignature == null) {
getLogger().debug("One-Pass Signature not found: Verification Failed");
} else {
try {
verified = onePassSignature.verify(signature);
} catch (final PGPException e) {
final String keyId = KeyIdentifierConverter.format(onePassSignature.getKeyID());
throw new PGPProcessException(String.format("One-Pass Signature Key ID [%s] Verification Failed", keyId), e);
}
}
}
}
private void processSignedStream(final InputStream inputStream, final OutputStream outputStream, final PGPOnePassSignature onePassSignature) throws IOException {
final String keyId = KeyIdentifierConverter.format(onePassSignature.getKeyID());
getLogger().debug("Processing Data for One-Pass Signature with Key ID [{}]", keyId);
final byte[] buffer = new byte[BUFFER_SIZE];
int read;
while ((read = inputStream.read(buffer)) >= 0) {
onePassSignature.update(buffer, 0, read);
outputStream.write(buffer, 0, read);
}
}
private void setOnePassSignatureAttributes(final PGPOnePassSignature onePassSignature) {
setSignatureAlgorithm(onePassSignature.getKeyAlgorithm(), onePassSignature.getHashAlgorithm());
attributes.put(PGPAttributeKey.SIGNATURE_KEY_ID, KeyIdentifierConverter.format(onePassSignature.getKeyID()));
attributes.put(PGPAttributeKey.SIGNATURE_TYPE_ID, Integer.toString(onePassSignature.getSignatureType()));
}
private void setSignatureAttributes(final PGPSignature signature) {
setSignatureAlgorithm(signature.getKeyAlgorithm(), signature.getHashAlgorithm());
attributes.put(PGPAttributeKey.SIGNATURE_CREATED, Long.toString(signature.getCreationTime().getTime()));
attributes.put(PGPAttributeKey.SIGNATURE_KEY_ID, KeyIdentifierConverter.format(signature.getKeyID()));
attributes.put(PGPAttributeKey.SIGNATURE_TYPE_ID, Integer.toString(signature.getSignatureType()));
attributes.put(PGPAttributeKey.SIGNATURE_VERSION, Integer.toString(signature.getVersion()));
}
private void setLiteralDataAttributes(final PGPLiteralData literalData) {
attributes.put(PGPAttributeKey.LITERAL_DATA_FILENAME, literalData.getFileName());
attributes.put(PGPAttributeKey.LITERAL_DATA_MODIFIED, Long.toString(literalData.getModificationTime().getTime()));
}
private void setSignatureAlgorithm(final int keyAlgorithm, final int hashAlgorithm) {
attributes.put(PGPAttributeKey.SIGNATURE_HASH_ALGORITHM_ID, Integer.toString(hashAlgorithm));
attributes.put(PGPAttributeKey.SIGNATURE_KEY_ALGORITHM_ID, Integer.toString(keyAlgorithm));
try {
final String algorithm = PGPUtil.getSignatureName(keyAlgorithm, hashAlgorithm);
attributes.put(PGPAttributeKey.SIGNATURE_ALGORITHM, algorithm);
} catch (final PGPException e) {
getLogger().debug("Signature Algorithm Key Identifier [{}] Hash Identifier [{}] not found", keyAlgorithm, hashAlgorithm);
}
}
}
}