blob: 446fc6d9814523a6a71efd3896cc2ed30e064491 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.crypto.propertiesfactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.crypto.ColumnEncryptionProperties;
import org.apache.parquet.crypto.DecryptionKeyRetrieverMock;
import org.apache.parquet.crypto.DecryptionPropertiesFactory;
import org.apache.parquet.crypto.EncryptionPropertiesFactory;
import org.apache.parquet.crypto.FileDecryptionProperties;
import org.apache.parquet.crypto.FileEncryptionProperties;
import org.apache.parquet.crypto.ParquetCipher;
import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
import org.apache.parquet.hadoop.api.WriteSupport.WriteContext;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.schema.MessageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
/**
 * Crypto properties factory driven by the Hadoop {@link Configuration} and the file schema.
 *
 * <p>On the write side, a column is encrypted when the configuration contains an entry named
 * {@link #PATH_NAME_PREFIX} followed by the dot-joined column path. On the read side, a key
 * retriever pre-loaded with the footer and column keys is returned.
 *
 * <p>All keys are hard-coded constants of this class, so the factory offers no real
 * confidentiality; it relies on {@link DecryptionKeyRetrieverMock} for key lookup.
 */
public class SchemaCryptoPropertiesFactory implements EncryptionPropertiesFactory, DecryptionPropertiesFactory {

  /** Prefix of the configuration entries that mark a column (by dot-joined path) as encrypted. */
  public static final String PATH_NAME_PREFIX = "column_encryption_1178_";

  /** Configuration entry naming the {@link ParquetCipher} constant to use. */
  public static final String CONF_ENCRYPTION_ALGORITHM = "parquet.encryption.algorithm";

  /** Boolean configuration entry; when true the footer is encrypted, otherwise it stays plaintext. */
  public static final String CONF_ENCRYPTION_FOOTER = "parquet.encrypt.footer";

  private static final Logger log = LoggerFactory.getLogger(SchemaCryptoPropertiesFactory.class);

  /** Algorithm name used when {@link #CONF_ENCRYPTION_ALGORITHM} is not set. */
  private static final String DEFAULT_ENCRYPTION_ALGORITHM = "AES_GCM_CTR_V1";

  // Key ids are written as key metadata on encryption and used as retriever lookup names on
  // decryption; sharing the constants keeps both sides in sync.
  private static final String FOOTER_KEY_ID = "footkey";
  private static final String COL_KEY_ID = "col";

  private static final byte[] FOOTER_KEY = {0x01, 0x02, 0x03, 0x4, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a,
    0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10};
  private static final byte[] FOOTER_KEY_METADATA = FOOTER_KEY_ID.getBytes(StandardCharsets.UTF_8);
  private static final byte[] COL_KEY = {0x02, 0x03, 0x4, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b,
    0x0c, 0x0d, 0x0e, 0x0f, 0x10, 0x11};
  private static final byte[] COL_KEY_METADATA = COL_KEY_ID.getBytes(StandardCharsets.UTF_8);

  /**
   * Builds the encryption properties for a file about to be written.
   *
   * @param conf configuration consulted for the per-column entries, the algorithm and the footer mode
   * @param tempFilePath not used by this implementation
   * @param fileWriteContext supplies the schema whose column paths are examined
   * @return the encryption properties, or {@code null} when no column is configured for encryption
   * @throws ParquetCryptoRuntimeException if the schema reports no column paths
   */
  @Override
  public FileEncryptionProperties getFileEncryptionProperties(Configuration conf, Path tempFilePath,
                                                              WriteContext fileWriteContext) throws ParquetCryptoRuntimeException {
    MessageType schema = fileWriteContext.getSchema();
    List<String[]> paths = schema.getPaths();
    if (paths == null || paths.isEmpty()) {
      throw new ParquetCryptoRuntimeException("Null or empty fields is found");
    }

    Map<ColumnPath, ColumnEncryptionProperties> columnPropertyMap = new HashMap<>();
    for (String[] path : paths) {
      getColumnEncryptionProperties(path, columnPropertyMap, conf);
    }

    if (columnPropertyMap.isEmpty()) {
      log.debug("No column is encrypted. Returning null so that Parquet can skip. Empty properties will cause Parquet exception");
      return null;
    }

    /*
     * Why is footerKeyMetadata still needed when the footer is not encrypted? According to the
     * 'Plaintext Footer' section of
     * https://github.com/apache/parquet-format/blob/encryption/Encryption.md, a plaintext footer
     * is signed to prevent tampering with the FileMetaData contents, so the footer key metadata
     * is always required. Readers built with PARQUET-1178 support verify this signature; other
     * readers ignore it.
     */
    boolean shouldEncryptFooter = getEncryptFooter(conf);
    FileEncryptionProperties.Builder encryptionPropertiesBuilder =
      FileEncryptionProperties.builder(FOOTER_KEY)
        .withFooterKeyMetadata(FOOTER_KEY_METADATA)
        .withAlgorithm(getParquetCipherOrDefault(conf))
        .withEncryptedColumns(columnPropertyMap);
    if (!shouldEncryptFooter) {
      encryptionPropertiesBuilder = encryptionPropertiesBuilder.withPlaintextFooter();
    }

    FileEncryptionProperties encryptionProperties = encryptionPropertiesBuilder.build();
    log.info(
      "FileEncryptionProperties is built with, algorithm:{}, footerEncrypted:{}",
      encryptionProperties.getAlgorithm(),
      encryptionProperties.encryptedFooter());
    return encryptionProperties;
  }

  /**
   * Resolves the cipher named by {@link #CONF_ENCRYPTION_ALGORITHM}, falling back to
   * {@code AES_GCM_CTR_V1} when the entry is absent. The name is matched case-insensitively.
   *
   * <p>An unrecognised name is rejected by {@code ParquetCipher.valueOf}; presumably with an
   * {@link IllegalArgumentException}, as for any enum — that exception is not translated here.
   */
  private ParquetCipher getParquetCipherOrDefault(Configuration conf) {
    String algorithm = conf.get(CONF_ENCRYPTION_ALGORITHM, DEFAULT_ENCRYPTION_ALGORITHM);
    log.debug("Encryption algorithm is {}", algorithm);
    // Locale.ROOT: the value is a machine identifier, so casing must not depend on the JVM locale.
    return ParquetCipher.valueOf(algorithm.toUpperCase(Locale.ROOT));
  }

  /** Reads {@link #CONF_ENCRYPTION_FOOTER}; the footer is left in plaintext unless it is set to true. */
  private boolean getEncryptFooter(Configuration conf) {
    boolean encryptFooter = conf.getBoolean(CONF_ENCRYPTION_FOOTER, false);
    log.debug("Encrypt Footer: {}", encryptFooter);
    return encryptFooter;
  }

  /**
   * Registers encryption properties for one column if the configuration marks it as encrypted.
   *
   * <p>The column is looked up under {@link #PATH_NAME_PREFIX} plus the dot-joined path. Only the
   * presence of the entry matters; its value is not used to select a key, and every encrypted
   * column shares the same hard-coded column key.
   *
   * @param path schema path of the column
   * @param columnPropertyMap map that receives the column's properties when it is encrypted
   * @param conf configuration holding the per-column entries
   */
  private void getColumnEncryptionProperties(String[] path, Map<ColumnPath, ColumnEncryptionProperties> columnPropertyMap,
                                             Configuration conf) throws ParquetCryptoRuntimeException {
    String pathName = String.join(".", path);
    String columnKeyName = conf.get(PATH_NAME_PREFIX + pathName, null);
    if (columnKeyName == null) {
      return;
    }

    ColumnPath columnPath = ColumnPath.get(path);
    ColumnEncryptionProperties colEncProp = ColumnEncryptionProperties.builder(columnPath)
      .withKey(COL_KEY)
      .withKeyMetaData(COL_KEY_METADATA)
      .build();
    columnPropertyMap.put(columnPath, colEncProp);
  }

  /**
   * Builds decryption properties backed by a key retriever that knows the footer and column keys
   * of this factory. Plaintext files are allowed.
   *
   * @param hadoopConfig not used by this implementation
   * @param filePath not used by this implementation
   * @return decryption properties usable for files written with this factory's keys
   */
  @Override
  public FileDecryptionProperties getFileDecryptionProperties(Configuration hadoopConfig, Path filePath)
    throws ParquetCryptoRuntimeException {
    DecryptionKeyRetrieverMock keyRetriever = new DecryptionKeyRetrieverMock();
    keyRetriever.putKey(FOOTER_KEY_ID, FOOTER_KEY);
    keyRetriever.putKey(COL_KEY_ID, COL_KEY);
    return FileDecryptionProperties.builder().withPlaintextFilesAllowed().withKeyRetriever(keyRetriever).build();
  }
}