blob: 34daa65b658406b14ef31445b53fc15038897685 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIBMINIFI_INCLUDE_CORE_FLOWFILE_H_
#define LIBMINIFI_INCLUDE_CORE_FLOWFILE_H_
#include <map>
#include <memory>
#include <set>
#include <string>
#include <utility>
#include "utils/TimeUtil.h"
#include "ResourceClaim.h"
#include "Connectable.h"
#include "WeakReference.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace core {
class FlowFile : public core::Connectable, public ReferenceContainer {
public:
FlowFile();
~FlowFile() override;
FlowFile& operator=(const FlowFile& other);
/**
* Returns a pointer to this flow file record's
* claim
*/
std::shared_ptr<ResourceClaim> getResourceClaim();
/**
* Sets _claim to the inbound claim argument
*/
void setResourceClaim(const std::shared_ptr<ResourceClaim>& claim);
/**
* clear the resource claim
*/
void clearResourceClaim();
/**
* Returns a pointer to this flow file record's
* claim at the given stash key
*/
std::shared_ptr<ResourceClaim> getStashClaim(const std::string& key);
/**
* Sets the given stash key to the inbound claim argument
*/
void setStashClaim(const std::string& key, const std::shared_ptr<ResourceClaim>& claim);
/**
* Clear the resource claim at the given stash key
*/
void clearStashClaim(const std::string& key);
/**
* Return true if the given stash claim exists
*/
bool hasStashClaim(const std::string& key);
/**
* Get lineage identifiers
*/
std::set<std::string>& getlineageIdentifiers();
/**
* Returns whether or not this flow file record
* is marked as deleted.
* @return marked deleted
*/
bool isDeleted() const;
/**
* Sets whether to mark this flow file record
* as deleted
* @param deleted deleted flag
*/
void setDeleted(bool deleted);
/**
* Get entry date for this record
* @return entry date uint64_t
*/
uint64_t getEntryDate() const;
/**
* Gets the event time.
* @return event time.
*/
uint64_t getEventTime() const;
/**
* Get lineage start date
* @return lineage start date uint64_t
*/
uint64_t getlineageStartDate() const;
/**
* Sets the lineage start date
* @param date new lineage start date
*/
void setLineageStartDate(const uint64_t date);
void setLineageIdentifiers(std::set<std::string> lineage_Identifiers) {
lineage_Identifiers_ = lineage_Identifiers;
}
/**
* Obtains an attribute if it exists. If it does the value is
* copied into value
* @param key key to look for
* @param value value to set
* @return result of finding key
*/
bool getAttribute(std::string key, std::string& value) const;
/**
* Updates the value in the attribute map that corresponds
* to key
* @param key attribute name
* @param value value to set to attribute name
* @return result of finding key
*/
bool updateAttribute(std::string key, std::string value);
/**
* Removes the attribute
* @param key attribute name to remove
* @return result of finding key
*/
bool removeAttribute(std::string key);
/**
* setAttribute, if attribute already there, update it, else, add it
*/
void setAttribute(const std::string& key, const std::string& value) {
attributes_[key] = value;
}
/**
* Returns the map of attributes
* @return attributes.
*/
std::map<std::string, std::string> getAttributes() const {
return attributes_;
}
/**
* Returns the map of attributes
* @return attributes.
*/
std::map<std::string, std::string> *getAttributesPtr() {
return &attributes_;
}
/**
* adds an attribute if it does not exist
*
*/
bool addAttribute(const std::string& key, const std::string& value);
/**
* Set the size of this record.
* @param size size of record to set.Ï
*/
void setSize(const uint64_t size) {
size_ = size;
}
/**
* Returns the size of corresponding flow file
* @return size as a uint64_t
*/
uint64_t getSize() const;
/**
* Sets the offset
* @param offset offset to apply to this record.
*/
void setOffset(const uint64_t offset) {
offset_ = offset;
}
/**
* Sets the penalty expiration
* @param penaltyExp new penalty expiration
*/
void setPenaltyExpiration(const uint64_t penaltyExp) {
penaltyExpiration_ms_ = penaltyExp;
}
uint64_t getPenaltyExpiration() const {
return penaltyExpiration_ms_;
}
/**
* Gets the offset within the flow file
* @return size as a uint64_t
*/
uint64_t getOffset() const;
bool getUUID(utils::Identifier& other) {
other = uuid_;
return true;
}
// Check whether it is still being penalized
bool isPenalized() const {
return penaltyExpiration_ms_ > 0 && penaltyExpiration_ms_ > utils::timeutils::getTimeMillis();
}
uint64_t getId() const {
return id_;
}
/**
* Yield
*/
void yield() override {
}
/**
* Determines if we are connected and operating
*/
bool isRunning() override {
return true;
}
/**
* Determines if work is available by this connectable
* @return boolean if work is available.
*/
bool isWorkAvailable() override {
return true;
}
/**
* Sets the original connection with a shared pointer.
* @param connection shared connection.
*/
void setConnection(std::shared_ptr<core::Connectable>& connection);
/**
* Sets the original connection with a shared pointer.
* @param connection shared connection.
*/
void setConnection(std::shared_ptr<core::Connectable>&& connection);
/**
* Returns the connection referenced by this record.
* @return shared connection pointer.
*/
std::shared_ptr<core::Connectable> getConnection() const;
/**
* Sets the original connection with a shared pointer.
* @param connection shared connection.
*/
void setOriginalConnection(std::shared_ptr<core::Connectable>& connection);
/**
* Returns the original connection referenced by this record.
* @return shared original connection pointer.
*/
std::shared_ptr<core::Connectable> getOriginalConnection() const;
void setStoredToRepository(bool storedInRepository) {
stored = storedInRepository;
}
bool isStored() const {
return stored;
}
protected:
bool stored;
// Mark for deletion
bool marked_delete_;
// Date at which the flow file entered the flow
uint64_t entry_date_;
// event time
uint64_t event_time_;
// Date at which the origin of this flow file entered the flow
uint64_t lineage_start_date_;
// Date at which the flow file was queued
uint64_t last_queue_date_;
// Size in bytes of the data corresponding to this flow file
uint64_t size_;
// A global unique identifier
// A local unique identifier
uint64_t id_;
// Offset to the content
uint64_t offset_;
// Penalty expiration
uint64_t penaltyExpiration_ms_;
// Attributes key/values pairs for the flow record
std::map<std::string, std::string> attributes_;
// Pointer to the associated content resource claim
std::shared_ptr<ResourceClaim> claim_;
// Pointers to stashed content resource claims
std::map<std::string, std::shared_ptr<ResourceClaim>> stashedContent_;
// UUID string
// std::string uuid_str_;
// UUID string for all parents
std::set<std::string> lineage_Identifiers_;
// Connection queue that this flow file will be transfer or current in
std::shared_ptr<core::Connectable> connection_;
// Orginal connection queue that this flow file was dequeued from
std::shared_ptr<core::Connectable> original_connection_;
private:
static std::shared_ptr<logging::Logger> logger_;
static std::shared_ptr<utils::IdGenerator> id_generator_;
static std::shared_ptr<utils::NonRepeatingStringGenerator> numeric_id_generator_;
};
} // namespace core
} // namespace minifi
} // namespace nifi
} // namespace apache
} // namespace org
#endif // LIBMINIFI_INCLUDE_CORE_FLOWFILE_H_