// blob: fdd4d5745cbc249a65d4f32c3230286e922d60c6 [file] [log] [blame]
/**
* @file CompressContent.cpp
* CompressContent class implementation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "CompressContent.h"
#include <stdio.h>
#include <algorithm>
#include <memory>
#include <string>
#include <map>
#include <set>
#include "utils/TimeUtil.h"
#include "utils/StringUtils.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace processors {
// Definitions of the processor's static property descriptors. Each is built from three
// string arguments: a name, a description, and a third value that is presumably the
// default (TODO confirm against the core::Property constructor). onSchedule() looks each
// property up by its name.

// Parsed as an integer into compressLevel_ by onSchedule().
core::Property CompressContent::CompressLevel("Compression Level", "The compression level to use; this is valid only when using GZIP compression.", "1");
// Stored verbatim in compressMode_; onTrigger() compares it against MODE_COMPRESS.
core::Property CompressContent::CompressMode("Mode", "Indicates whether the processor should compress content or decompress content.", MODE_COMPRESS);
// Stored verbatim in compressFormat_; when equal to COMPRESSION_FORMAT_ATTRIBUTE, onTrigger()
// derives the format from the flow file's MIME type attribute instead.
core::Property CompressContent::CompressFormat("Compression Format", "The compression format to use.", COMPRESSION_FORMAT_ATTRIBUTE);
// Parsed as a boolean into updateFileName_ by onSchedule().
core::Property CompressContent::UpdateFileName("Update Filename", "Determines if filename extension need to be updated", "false");

// Definitions of the relationships onTrigger() routes flow files to (name, description).
core::Relationship CompressContent::Success("success", "FlowFiles will be transferred to the success relationship after successfully being compressed or decompressed");
core::Relationship CompressContent::Failure("failure", "FlowFiles will be transferred to the failure relationship if they fail to compress/decompress");
void CompressContent::initialize() {
// Set the supported properties
std::set<core::Property> properties;
properties.insert(CompressLevel);
properties.insert(CompressMode);
properties.insert(CompressFormat);
properties.insert(UpdateFileName);
setSupportedProperties(properties);
// Set the supported relationships
std::set<core::Relationship> relationships;
relationships.insert(Failure);
relationships.insert(Success);
setSupportedRelationships(relationships);
}
/**
 * Loads the processor configuration from the context and fills the lookup tables
 * used by onTrigger().
 *
 * A member is only overwritten when its property is present and non-empty;
 * otherwise it keeps whatever value it already had.
 *
 * @param context        source of the configured property values
 * @param sessionFactory not used by this implementation
 */
void CompressContent::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
  std::string text;
  // Fetches the named property into `text` (cleared first) and reports whether a
  // non-empty value was obtained.
  const auto lookup = [context, &text](const std::string &name) {
    text.clear();
    return context->getProperty(name, text) && !text.empty();
  };

  if (lookup(CompressLevel.getName())) {
    core::Property::StringToInt(text, compressLevel_);
  }
  if (lookup(CompressMode.getName())) {
    compressMode_ = text;
  }
  if (lookup(CompressFormat.getName())) {
    compressFormat_ = text;
  }
  if (lookup(UpdateFileName.getName())) {
    org::apache::nifi::minifi::utils::StringUtils::StringToBool(text, updateFileName_);
  }

  logger_->log_info("Compress Content: Mode [%s] Format [%s] Level [%d] UpdateFileName [%d]", compressMode_, compressFormat_, compressLevel_, updateFileName_);

  // Compression format -> filename extension.
  fileExtension_[COMPRESSION_FORMAT_XZ_LZMA2] = ".xz";
  fileExtension_[COMPRESSION_FORMAT_BZIP2] = ".bz2";
  fileExtension_[COMPRESSION_FORMAT_LZMA] = ".lzma";
  fileExtension_[COMPRESSION_FORMAT_GZIP] = ".gz";

  // MIME type -> compression format; bzip2 is reachable through two MIME types.
  compressionFormatMimeTypeMap_["application/x-xz"] = COMPRESSION_FORMAT_XZ_LZMA2;
  compressionFormatMimeTypeMap_["application/x-lzma"] = COMPRESSION_FORMAT_LZMA;
  compressionFormatMimeTypeMap_["application/x-bzip2"] = COMPRESSION_FORMAT_BZIP2;
  compressionFormatMimeTypeMap_["application/bzip2"] = COMPRESSION_FORMAT_BZIP2;
  compressionFormatMimeTypeMap_["application/gzip"] = COMPRESSION_FORMAT_GZIP;
}
/**
 * Processes one flow file from the session.
 *
 * Steps:
 *  1. Fetch a flow file; return immediately if none is available.
 *  2. Resolve the compression format. If the configured format equals
 *     COMPRESSION_FORMAT_ATTRIBUTE, the format is looked up from the flow file's
 *     MIME type attribute:
 *       - attribute missing/empty  -> flow file goes to Failure;
 *       - MIME type not in the map -> flow file goes to Success unchanged.
 *  3. Lower-case the format and map it to a MIME type; an unrecognised format
 *     sends the flow file to Failure.
 *  4. Create a new flow file from the input and fill it through WriteCallback.
 *     A negative callback status sends the input to Failure and removes the
 *     new flow file.
 *  5. On success, update the MIME type and (optionally) filename attributes of
 *     the new flow file, transfer it to Success and remove the input.
 *
 * @param context process context (not used by this implementation)
 * @param session session used to fetch, create, write, transfer and remove flow files
 */
void CompressContent::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
  std::shared_ptr<core::FlowFile> flowFile = session->get();
  if (!flowFile) {
    return;
  }

  std::string compressFormat = compressFormat_;
  if (compressFormat_ == COMPRESSION_FORMAT_ATTRIBUTE) {
    // The format is not fixed by configuration: derive it from the MIME type attribute.
    std::string attr;
    flowFile->getAttribute(FlowAttributeKey(MIME_TYPE), attr);
    if (attr.empty()) {
      logger_->log_error("No %s attribute existed for the flow, route to failure", FlowAttributeKey(MIME_TYPE));
      session->transfer(flowFile, Failure);
      return;
    }
    auto search = compressionFormatMimeTypeMap_.find(attr);
    if (search != compressionFormatMimeTypeMap_.end()) {
      compressFormat = search->second;
    } else {
      // Unknown MIME type: pass the flow file through untouched.
      logger_->log_info("Mime type of %s is not indicated a support format, route to success", attr);
      session->transfer(flowFile, Success);
      return;
    }
  }

  // tolower() has undefined behaviour when handed a negative value other than EOF,
  // which a plain (signed) char holding a non-ASCII byte would be. Route every
  // character through unsigned char before the call.
  std::transform(compressFormat.begin(), compressFormat.end(), compressFormat.begin(),
                 [](unsigned char c) { return static_cast<char>(::tolower(c)); });

  // MIME type to stamp on the output when compressing; also validates the format.
  std::string mimeType;
  if (compressFormat == COMPRESSION_FORMAT_GZIP) {
    mimeType = "application/gzip";
  } else if (compressFormat == COMPRESSION_FORMAT_BZIP2) {
    mimeType = "application/bzip2";
  } else if (compressFormat == COMPRESSION_FORMAT_LZMA) {
    mimeType = "application/x-lzma";
  } else if (compressFormat == COMPRESSION_FORMAT_XZ_LZMA2) {
    mimeType = "application/x-xz";
  } else {
    logger_->log_error("Compress format is invalid %s", compressFormat);
    session->transfer(flowFile, Failure);
    return;
  }

  // Filename extension for the format; stays empty if the table has no entry.
  std::string fileExtension;
  const auto search = fileExtension_.find(compressFormat);
  if (search != fileExtension_.end()) {
    fileExtension = search->second;
  }

  // The result is written into a new flow file created from the input one.
  std::shared_ptr<core::FlowFile> processFlowFile = session->create(flowFile);
  CompressContent::WriteCallback callback(compressMode_, compressLevel_, compressFormat, flowFile, session);
  session->write(processFlowFile, &callback);

  if (callback.status_ < 0) {
    // Processing failed: route the untouched input to Failure and discard the output.
    logger_->log_error("Compress Content processing fail for the flow with UUID %s", flowFile->getUUIDStr());
    session->transfer(flowFile, Failure);
    session->remove(processFlowFile);
    return;
  }

  std::string fileName;
  processFlowFile->getAttribute(FlowAttributeKey(FILENAME), fileName);
  if (compressMode_ == MODE_COMPRESS) {
    session->putAttribute(processFlowFile, FlowAttributeKey(MIME_TYPE), mimeType);
    if (updateFileName_) {
      fileName = fileName + fileExtension;
      session->putAttribute(processFlowFile, FlowAttributeKey(FILENAME), fileName);
    }
  } else {
    // Any mode other than MODE_COMPRESS: drop the MIME type and, if requested,
    // strip the format's extension when the filename ends with it.
    session->removeAttribute(processFlowFile, FlowAttributeKey(MIME_TYPE));
    if (updateFileName_) {
      if (fileName.size() >= fileExtension.size() && fileName.compare(fileName.size() - fileExtension.size(), fileExtension.size(), fileExtension) == 0) {
        fileName = fileName.substr(0, fileName.size() - fileExtension.size());
        session->putAttribute(processFlowFile, FlowAttributeKey(FILENAME), fileName);
      }
    }
  }
  logger_->log_debug("Compress Content processing success for the flow with UUID %s name %s", processFlowFile->getUUIDStr(), fileName);
  session->transfer(processFlowFile, Success);
  session->remove(flowFile);
}
} /* namespace processors */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */