blob: 4f526c3c1d099b218bd09d7ede602ba56f7738e2 [file] [log] [blame]
/**
* @file ProcessSession.cpp
* ProcessSession class implementation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <vector>
#include <queue>
#include <map>
#include <set>
#include <sys/time.h>
#include <time.h>
#include <chrono>
#include <thread>
#include <iostream>
#include "ProcessSession.h"
/**
 * Create a brand new, attribute-less flow file owned by this session.
 * The record is registered in _addedFlowFiles (keyed by its UUID) so that
 * commit() routes it and rollback() frees it.
 * @return the new record
 */
FlowFileRecord* ProcessSession::create()
{
	std::map<std::string, std::string> noAttributes;
	FlowFileRecord *flowFile = new FlowFileRecord(noAttributes);
	if (flowFile == NULL)
		return flowFile;

	const std::string uuid = flowFile->getUUIDStr();
	_addedFlowFiles[uuid] = flowFile;
	_logger->log_debug("Create FlowFile with UUID %s", uuid.c_str());
	return flowFile;
}
/**
 * Create a new session-owned flow file derived from parent.
 * Every parent attribute is inherited except ALTERNATE_IDENTIFIER, DISCARD_REASON
 * and UUID. The lineage start date and lineage identifiers are copied, and the
 * parent's UUID is added to the child's lineage identifiers. Content (claim) is NOT shared.
 * @param parent source flow file (must be non-NULL; it is dereferenced unconditionally)
 * @return the new record
 */
FlowFileRecord* ProcessSession::create(FlowFileRecord *parent)
{
	FlowFileRecord *child = this->create();
	if (!child)
		return child;

	std::map<std::string, std::string> inherited = parent->getAttributes();
	for (std::map<std::string, std::string>::iterator attr = inherited.begin(); attr != inherited.end(); ++attr)
	{
		// Identity / discard markers must be unique per flow file, so never inherit them
		const bool special = attr->first == FlowAttributeKey(ALTERNATE_IDENTIFIER)
				|| attr->first == FlowAttributeKey(DISCARD_REASON)
				|| attr->first == FlowAttributeKey(UUID);
		if (!special)
			child->setAttribute(attr->first, attr->second);
	}

	// Lineage: same start date, parent's ancestry plus the parent itself
	child->_lineageStartDate = parent->_lineageStartDate;
	child->_lineageIdentifiers = parent->_lineageIdentifiers;
	child->_lineageIdentifiers.insert(parent->_uuidStr);
	return child;
}
/**
 * Clone parent into a new session-owned flow file that shares the parent's content.
 * Attributes and lineage are derived via create(parent). If the parent has a resource
 * claim, the clone references the same claim with the same offset/size, and the
 * claim's owner count is incremented.
 * @param parent flow file to clone (must be non-NULL)
 * @return the clone
 */
FlowFileRecord* ProcessSession::clone(FlowFileRecord *parent)
{
	FlowFileRecord *twin = this->create(parent);
	if (twin == NULL)
		return NULL;

	twin->_claim = parent->_claim;
	if (twin->_claim != NULL)
	{
		// Same content window inside the shared claim; one more owner of that claim
		twin->_offset = parent->_offset;
		twin->_size = parent->_size;
		twin->_claim->increaseFlowFileRecordOwnedCount();
	}
	return twin;
}
/**
 * Clone parent for routing to an additional connection during commit().
 * Unlike clone(), the record is tracked in _clonedFlowFiles rather than
 * _addedFlowFiles. Attributes (minus ALTERNATE_IDENTIFIER, DISCARD_REASON and UUID),
 * lineage and the resource claim (with offset/size) are taken from parent; a shared
 * claim gets its owner count incremented.
 * @param parent flow file being fanned out (must be non-NULL)
 * @return the clone
 */
FlowFileRecord* ProcessSession::cloneDuringTransfer(FlowFileRecord *parent)
{
	std::map<std::string, std::string> noAttributes;
	FlowFileRecord *twin = new FlowFileRecord(noAttributes);
	if (twin == NULL)
		return twin;

	const std::string uuid = twin->getUUIDStr();
	this->_clonedFlowFiles[uuid] = twin;
	_logger->log_debug("Clone FlowFile with UUID %s during transfer", uuid.c_str());

	std::map<std::string, std::string> inherited = parent->getAttributes();
	for (std::map<std::string, std::string>::iterator attr = inherited.begin(); attr != inherited.end(); ++attr)
	{
		// Identity / discard markers are per flow file and are never copied
		const bool special = attr->first == FlowAttributeKey(ALTERNATE_IDENTIFIER)
				|| attr->first == FlowAttributeKey(DISCARD_REASON)
				|| attr->first == FlowAttributeKey(UUID);
		if (!special)
			twin->setAttribute(attr->first, attr->second);
	}

	twin->_lineageStartDate = parent->_lineageStartDate;
	twin->_lineageIdentifiers = parent->_lineageIdentifiers;
	twin->_lineageIdentifiers.insert(parent->_uuidStr);

	// Share the parent's content window
	twin->_claim = parent->_claim;
	if (twin->_claim != NULL)
	{
		twin->_offset = parent->_offset;
		twin->_size = parent->_size;
		twin->_claim->increaseFlowFileRecordOwnedCount();
	}
	return twin;
}
/**
 * Clone parent, restricting the clone's content to a sub-range of the parent's content.
 * The clone shares the parent's resource claim; its window starts `offset` bytes into
 * the parent's own window and is `size` bytes long.
 * If the parent has no claim, this behaves like create(parent).
 * @param parent flow file to clone (must be non-NULL)
 * @param offset start of the range, relative to the parent's content (must be >= 0)
 * @param size   length of the range (must be >= 0)
 * @return the clone, or NULL if the range does not fit inside the parent's content
 *         (the half-created record is unregistered and freed in that case)
 */
FlowFileRecord* ProcessSession::clone(FlowFileRecord *parent, long offset, long size)
{
	FlowFileRecord *record = this->create(parent);
	if (record)
	{
		if (parent->_claim)
		{
			const long parentSize = (long) parent->_size;
			// Check bounds without computing offset + size, which could overflow a signed long
			if (offset < 0 || size < 0 || offset > parentSize || size > parentSize - offset)
			{
				_logger->log_error("clone offset %lld and size %lld exceed parent size %llu",
						(long long) offset, (long long) size, (unsigned long long) parent->_size);
				// Undo the registration create() performed before freeing the record
				this->_addedFlowFiles.erase(record->getUUIDStr());
				delete record;
				return NULL;
			}
			// The requested range is relative to the parent's window inside the shared claim
			record->_offset = parent->_offset + offset;
			record->_size = size;
			// Share the parent's resource claim
			record->_claim = parent->_claim;
			record->_claim->increaseFlowFileRecordOwnedCount();
		}
	}
	return record;
}
/**
 * Mark flow for removal. The record is flagged and remembered in _deletedFlowFiles;
 * commit() deletes it instead of routing it.
 * @param flow flow file to drop (must be non-NULL)
 */
void ProcessSession::remove(FlowFileRecord *flow)
{
	const std::string key = flow->getUUIDStr();
	flow->_markedDelete = true;
	_deletedFlowFiles[key] = flow;
}
/**
 * Set attribute `key` to `value` on flow (delegates to FlowFileRecord::setAttribute).
 * @param flow  target flow file (must be non-NULL)
 * @param key   attribute name
 * @param value attribute value
 */
void ProcessSession::putAttribute(FlowFileRecord *flow, std::string key, std::string value)
{
	FlowFileRecord &target = *flow;
	target.setAttribute(key, value);
}
/**
 * Remove attribute `key` from flow (delegates to FlowFileRecord::removeAttribute).
 * @param flow target flow file (must be non-NULL)
 * @param key  attribute name
 */
void ProcessSession::removeAttribute(FlowFileRecord *flow, std::string key)
{
	FlowFileRecord &target = *flow;
	target.removeAttribute(key);
}
/**
 * Penalize flow: its penalty expires at the current time (getTimeMillis()) plus the
 * owning processor's penalization period in milliseconds.
 * @param flow flow file to penalize (must be non-NULL)
 */
void ProcessSession::penalize(FlowFileRecord *flow)
{
	const auto penaltyMs = this->_processContext->getProcessor()->getPenalizationPeriodMsec();
	flow->_penaltyExpirationMs = getTimeMillis() + penaltyMs;
}
/**
 * Record that flow should be routed to `relationship` when the session commits.
 * A later transfer of the same flow file overwrites the earlier choice.
 * @param flow         flow file to route (must be non-NULL)
 * @param relationship destination relationship
 */
void ProcessSession::transfer(FlowFileRecord *flow, Relationship relationship)
{
	const std::string uuid = flow->getUUIDStr();
	_transferRelationship[uuid] = relationship;
}
/**
 * Replace flow's content with whatever callback writes.
 * A fresh ResourceClaim is created in DEFAULT_CONTENT_DIRECTORY and its file is opened
 * truncated. After the callback succeeds, the flow's previous claim (if any) has its
 * owner count decremented, and the new claim is attached with offset 0 and size equal
 * to the final write position.
 * @param flow     flow file whose content is replaced (must be non-NULL)
 * @param callback writer invoked with the open output stream
 * @throws Exception FILE_OPERATION_EXCEPTION on open or write failure; any exception
 *         (including from the callback) is logged and rethrown after the new claim is released
 */
void ProcessSession::write(FlowFileRecord *flow, OutputStreamCallback *callback)
{
	ResourceClaim *claim = new ResourceClaim(DEFAULT_CONTENT_DIRECTORY);

	// Failure cleanup: detach the new claim if it already got attached, then free it
	auto discardClaim = [flow, claim]()
	{
		if (flow && flow->_claim == claim)
		{
			flow->_claim->decreaseFlowFileRecordOwnedCount();
			flow->_claim = NULL;
		}
		if (claim)
			delete claim;
	};

	try
	{
		std::ofstream fs;
		fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
		if (!fs.is_open())
			throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error");

		callback->process(&fs);
		if (!fs.good() || fs.tellp() < 0)
		{
			fs.close();
			throw Exception(FILE_OPERATION_EXCEPTION, "File Write Error");
		}

		flow->_size = fs.tellp();
		flow->_offset = 0;
		if (flow->_claim)
		{
			// Release the content the flow file used to point at
			flow->_claim->decreaseFlowFileRecordOwnedCount();
			flow->_claim = NULL;
		}
		flow->_claim = claim;
		claim->increaseFlowFileRecordOwnedCount();
		fs.close();
	}
	catch (std::exception &exception)
	{
		discardClaim();
		_logger->log_debug("Caught Exception %s", exception.what());
		throw;
	}
	catch (...)
	{
		discardClaim();
		_logger->log_debug("Caught Exception during process session write");
		throw;
	}
}
/**
 * Append whatever callback writes to flow's existing content.
 * If the flow has no claim yet, this is equivalent to write(). Otherwise the claim's
 * file is opened in append mode, and flow->_size grows by the number of bytes written.
 * NOTE(review): bytes always land at the end of the claim file. If the flow's window
 * (offset + size) does not end at EOF (e.g. a sub-range clone, or another flow file
 * sharing the claim), the appended bytes will not be contiguous with this flow's
 * content — confirm callers never append to such flow files.
 * @param flow     flow file to extend (must be non-NULL)
 * @param callback writer invoked with the open output stream
 * @throws Exception FILE_OPERATION_EXCEPTION on open or write failure; any exception
 *         is logged and rethrown
 */
void ProcessSession::append(FlowFileRecord *flow, OutputStreamCallback *callback)
{
	if (flow->_claim == NULL)
	{
		// No existing claim to append to, so create a new one
		return write(flow, callback);
	}
	ResourceClaim *claim = flow->_claim;
	try
	{
		std::ofstream fs;
		fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::app);
		if (fs.is_open())
		{
			// Opening in append mode does not move the put position to EOF before the first
			// write (with O_APPEND the descriptor offset starts at 0). Seek explicitly;
			// otherwise tellp() reports 0 here and the whole file size would be counted as appended.
			fs.seekp(0, std::ios_base::end);
			std::streampos oldPos = fs.tellp();
			callback->process(&fs);
			// A failed seekp sets failbit, so it is also caught by this check
			if (fs.good() && fs.tellp() >= 0)
			{
				uint64_t appendSize = fs.tellp() - oldPos;
				flow->_size += appendSize;
				fs.close();
			}
			else
			{
				fs.close();
				throw Exception(FILE_OPERATION_EXCEPTION, "File Write Error");
			}
		}
		else
		{
			throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error");
		}
	}
	catch (std::exception &exception)
	{
		_logger->log_debug("Caught Exception %s", exception.what());
		throw;
	}
	catch (...)
	{
		_logger->log_debug("Caught Exception during process session append");
		throw;
	}
}
/**
 * Hand flow's content to callback as an input stream positioned at flow->_offset.
 * NOTE(review): the stream is not limited to flow->_size bytes; the callback is
 * expected to stop after the flow's size — confirm against InputStreamCallback implementations.
 * @param flow     flow file to read (must be non-NULL)
 * @param callback reader invoked with the open, positioned stream
 * @throws Exception FILE_OPERATION_EXCEPTION if the flow has no claim, or the file
 *         cannot be opened or positioned; any exception is logged and rethrown
 */
void ProcessSession::read(FlowFileRecord *flow, InputStreamCallback *callback)
{
	try
	{
		ResourceClaim *claim = flow->_claim;
		if (claim == NULL)
			throw Exception(FILE_OPERATION_EXCEPTION, "No Content Claim existed for read");

		std::ifstream fs;
		fs.open(claim->getContentFullPath().c_str(), std::fstream::in | std::fstream::binary);
		if (!fs.is_open())
			throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error");

		// Jump to this flow file's window inside the (possibly shared) claim file
		fs.seekg(flow->_offset, fs.beg);
		if (!fs.good())
		{
			fs.close();
			throw Exception(FILE_OPERATION_EXCEPTION, "File Read Error");
		}

		callback->process(&fs);
		fs.close();
	}
	catch (std::exception &exception)
	{
		_logger->log_debug("Caught Exception %s", exception.what());
		throw;
	}
	catch (...)
	{
		_logger->log_debug("Caught Exception during process session read");
		throw;
	}
}
/**
 * Import the contents of file `source` (starting at byte `offset`) as flow's new content.
 * Data is copied into a fresh ResourceClaim in DEFAULT_CONTENT_DIRECTORY. On success,
 * the flow's previous claim (if any) has its owner count decremented, the new claim is
 * attached with offset 0, and, unless keepSource is true, `source` is deleted.
 * @param source     path of the file to import
 * @param flow       flow file that receives the content (must be non-NULL)
 * @param keepSource if false, the source file is removed after a successful import
 * @param offset     byte position in `source` at which copying starts
 * @throws Exception FILE_OPERATION_EXCEPTION if either file cannot be opened, or the
 *         read or write fails; the new claim is released and the source is kept in that case
 */
void ProcessSession::import(std::string source, FlowFileRecord *flow, bool keepSource, uint64_t offset)
{
	// Copy buffer owned by a vector so it is released on every exit path. It is allocated
	// before the claim so a failed allocation cannot leak the claim.
	std::vector<char> buf(4096);
	ResourceClaim *claim = new ResourceClaim(DEFAULT_CONTENT_DIRECTORY);
	try
	{
		std::ofstream fs;
		fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
		std::ifstream input;
		input.open(source.c_str(), std::fstream::in | std::fstream::binary);
		if (fs.is_open() && input.is_open())
		{
			// Stream the source file into the flow file's claim
			input.seekg(offset, std::ios_base::beg);
			while (input.good())
			{
				input.read(&buf[0], static_cast<std::streamsize>(buf.size()));
				// gcount() is the full buffer size for a complete read and the remainder on the last one
				fs.write(&buf[0], input.gcount());
			}
			// input.bad() signals a real read error (not just EOF). Treat the copy as incomplete,
			// so a partially imported source is never deleted when keepSource is false.
			if (fs.good() && fs.tellp() >= 0 && !input.bad())
			{
				flow->_size = fs.tellp();
				flow->_offset = 0;
				if (flow->_claim)
				{
					// Remove the old claim
					flow->_claim->decreaseFlowFileRecordOwnedCount();
					flow->_claim = NULL;
				}
				flow->_claim = claim;
				claim->increaseFlowFileRecordOwnedCount();
				fs.close();
				input.close();
				if (!keepSource)
					std::remove(source.c_str());
			}
			else
			{
				fs.close();
				input.close();
				throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
			}
		}
		else
		{
			throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
		}
	}
	catch (std::exception &exception)
	{
		if (flow && flow->_claim == claim)
		{
			flow->_claim->decreaseFlowFileRecordOwnedCount();
			flow->_claim = NULL;
		}
		if (claim)
			delete claim;
		_logger->log_debug("Caught Exception %s", exception.what());
		throw;
	}
	catch (...)
	{
		if (flow && flow->_claim == claim)
		{
			flow->_claim->decreaseFlowFileRecordOwnedCount();
			flow->_claim = NULL;
		}
		if (claim)
			delete claim;
		_logger->log_debug("Caught Exception during process session import");
		throw;
	}
}
void ProcessSession::commit()
{
try
{
// First we clone the flow record based on the transfered relationship for updated flow record
std::map<std::string, FlowFileRecord *>::iterator it;
for (it = _updatedFlowFiles.begin(); it!= _updatedFlowFiles.end(); it++)
{
FlowFileRecord *record = it->second;
if (record->_markedDelete)
continue;
std::map<std::string, Relationship>::iterator itRelationship =
this->_transferRelationship.find(record->getUUIDStr());
if (itRelationship != _transferRelationship.end())
{
Relationship relationship = itRelationship->second;
// Find the relationship, we need to find the connections for that relationship
std::set<Connection *> connections =
_processContext->getProcessor()->getOutGoingConnections(relationship.getName());
if (connections.empty())
{
// No connection
if (!_processContext->getProcessor()->isAutoTerminated(relationship))
{
// Not autoterminate, we should have the connect
std::string message = "Connect empty for non auto terminated relationship" + relationship.getName();
throw Exception(PROCESS_SESSION_EXCEPTION, message.c_str());
}
else
{
// Autoterminated
remove(record);
}
}
else
{
// We connections, clone the flow and assign the connection accordingly
for (std::set<Connection *>::iterator itConnection = connections.begin(); itConnection != connections.end(); ++itConnection)
{
Connection *connection(*itConnection);
if (itConnection == connections.begin())
{
// First connection which the flow need be routed to
record->_connection = connection;
}
else
{
// Clone the flow file and route to the connection
FlowFileRecord *cloneRecord;
cloneRecord = this->cloneDuringTransfer(record);
if (cloneRecord)
cloneRecord->_connection = connection;
else
throw Exception(PROCESS_SESSION_EXCEPTION, "Can not clone the flow for transfer");
}
}
}
}
else
{
// Can not find relationship for the flow
throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer relationship for the flow");
}
}
// Do the samething for added flow file
for (it = _addedFlowFiles.begin(); it!= _addedFlowFiles.end(); it++)
{
FlowFileRecord *record = it->second;
if (record->_markedDelete)
continue;
std::map<std::string, Relationship>::iterator itRelationship =
this->_transferRelationship.find(record->getUUIDStr());
if (itRelationship != _transferRelationship.end())
{
Relationship relationship = itRelationship->second;
// Find the relationship, we need to find the connections for that relationship
std::set<Connection *> connections =
_processContext->getProcessor()->getOutGoingConnections(relationship.getName());
if (connections.empty())
{
// No connection
if (!_processContext->getProcessor()->isAutoTerminated(relationship))
{
// Not autoterminate, we should have the connect
std::string message = "Connect empty for non auto terminated relationship " + relationship.getName();
throw Exception(PROCESS_SESSION_EXCEPTION, message.c_str());
}
else
{
// Autoterminated
remove(record);
}
}
else
{
// We connections, clone the flow and assign the connection accordingly
for (std::set<Connection *>::iterator itConnection = connections.begin(); itConnection != connections.end(); ++itConnection)
{
Connection *connection(*itConnection);
if (itConnection == connections.begin())
{
// First connection which the flow need be routed to
record->_connection = connection;
}
else
{
// Clone the flow file and route to the connection
FlowFileRecord *cloneRecord;
cloneRecord = this->cloneDuringTransfer(record);
if (cloneRecord)
cloneRecord->_connection = connection;
else
throw Exception(PROCESS_SESSION_EXCEPTION, "Can not clone the flow for transfer");
}
}
}
}
else
{
// Can not find relationship for the flow
throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer relationship for the flow");
}
}
// Complete process the added and update flow files for the session, send the flow file to its queue
for (it = _updatedFlowFiles.begin(); it!= _updatedFlowFiles.end(); it++)
{
FlowFileRecord *record = it->second;
if (record->_markedDelete)
{
continue;
}
if (record->_connection)
record->_connection->put(record);
else
delete record;
}
for (it = _addedFlowFiles.begin(); it!= _addedFlowFiles.end(); it++)
{
FlowFileRecord *record = it->second;
if (record->_markedDelete)
{
continue;
}
if (record->_connection)
record->_connection->put(record);
else
delete record;
}
// Process the clone flow files
for (it = _clonedFlowFiles.begin(); it!= _clonedFlowFiles.end(); it++)
{
FlowFileRecord *record = it->second;
if (record->_markedDelete)
{
continue;
}
if (record->_connection)
record->_connection->put(record);
else
delete record;
}
// Delete the deleted flow files
for (it = _deletedFlowFiles.begin(); it!= _deletedFlowFiles.end(); it++)
{
FlowFileRecord *record = it->second;
delete record;
}
// Delete the snapshot
for (it = _originalFlowFiles.begin(); it!= _originalFlowFiles.end(); it++)
{
FlowFileRecord *record = it->second;
delete record;
}
// All done
_updatedFlowFiles.clear();
_addedFlowFiles.clear();
_clonedFlowFiles.clear();
_deletedFlowFiles.clear();
_originalFlowFiles.clear();
_logger->log_trace("ProcessSession committed for %s", _processContext->getProcessor()->getName().c_str());
}
catch (std::exception &exception)
{
_logger->log_debug("Caught Exception %s", exception.what());
throw;
}
catch (...)
{
_logger->log_debug("Caught Exception during process session commit");
throw;
}
}
void ProcessSession::rollback()
{
try
{
std::map<std::string, FlowFileRecord *>::iterator it;
// Requeue the snapshot of the flowfile back
for (it = _originalFlowFiles.begin(); it!= _originalFlowFiles.end(); it++)
{
FlowFileRecord *record = it->second;
if (record->_orginalConnection)
{
record->_snapshot = false;
record->_orginalConnection->put(record);
}
else
delete record;
}
_originalFlowFiles.clear();
// Process the clone flow files
for (it = _clonedFlowFiles.begin(); it!= _clonedFlowFiles.end(); it++)
{
FlowFileRecord *record = it->second;
delete record;
}
_clonedFlowFiles.clear();
for (it = _addedFlowFiles.begin(); it!= _addedFlowFiles.end(); it++)
{
FlowFileRecord *record = it->second;
delete record;
}
_addedFlowFiles.clear();
for (it = _updatedFlowFiles.begin(); it!= _updatedFlowFiles.end(); it++)
{
FlowFileRecord *record = it->second;
delete record;
}
_updatedFlowFiles.clear();
_deletedFlowFiles.clear();
_logger->log_trace("ProcessSession rollback for %s", _processContext->getProcessor()->getName().c_str());
}
catch (std::exception &exception)
{
_logger->log_debug("Caught Exception %s", exception.what());
throw;
}
catch (...)
{
_logger->log_debug("Caught Exception during process session roll back");
throw;
}
}
/**
 * Fetch the next available flow file from the processor's incoming connections.
 * Connections are visited via getNextIncomingConnection() until one yields a flow file,
 * the sequence returns NULL, or it comes back around to the first connection visited.
 * Expired flow files reported by poll() are deleted. A fetched flow file is tracked in
 * _updatedFlowFiles, and a duplicate snapshot is saved in _originalFlowFiles for rollback().
 * @return the fetched flow file, or NULL if none is available
 */
FlowFileRecord *ProcessSession::get()
{
	Connection *start = _processContext->getProcessor()->getNextIncomingConnection();
	for (Connection *conn = start; conn != NULL; )
	{
		std::set<FlowFileRecord *> expiredRecords;
		FlowFileRecord *polled = conn->poll(expiredRecords);
		// Expired flow files are handed back to us for disposal
		for (std::set<FlowFileRecord *>::iterator e = expiredRecords.begin(); e != expiredRecords.end(); ++e)
			delete (*e);

		if (polled != NULL)
		{
			polled->_markedDelete = false;
			_updatedFlowFiles[polled->getUUIDStr()] = polled;
			// Keep an untouched copy so rollback() can requeue the original state
			std::map<std::string, std::string> noAttributes;
			FlowFileRecord *snapshot = new FlowFileRecord(noAttributes);
			_logger->log_debug("Create Snapshot FlowFile with UUID %s", snapshot->getUUIDStr().c_str());
			snapshot->duplicate(polled);
			_originalFlowFiles[snapshot->getUUIDStr()] = snapshot;
			return polled;
		}

		conn = _processContext->getProcessor()->getNextIncomingConnection();
		if (conn == start)
			break;
	}
	return NULL;
}